Files
yidaima_tools/db_manager.py

340 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据库管理模块 - 连接 MySQL 管理文章发布数据
"""
from __future__ import annotations
import mysql.connector
from mysql.connector import Error
from dataclasses import dataclass
from typing import List, Optional, Tuple
import os
import sys
@dataclass
class ProjectOrder:
    """One project-order record.

    Every field is optional and defaults to ``None``, so an instance can be
    built from a partially populated row.
    """

    name: Optional[str] = None
    paths: Optional[str] = None
    # Organizing ("zhengli") status of the project.
    zlzt: Optional[str] = None
    sfxxm: Optional[str] = None
    name1: Optional[str] = None
    name2: Optional[str] = None
    bianhao: Optional[str] = None
    desc: Optional[str] = None
    sql_paths: Optional[str] = None
    error_message: Optional[str] = None
class DatabaseManager:
    """Wrapper around a single MySQL connection for project-order rows.

    All queries target the ``t_new_project_order_new`` table. Query methods
    connect lazily through :meth:`ensure_connected`; a ``mysql.connector``
    ``Error`` raised while querying is printed and converted into a failure
    return value (empty result, ``None`` or ``False``) instead of propagating.
    """

    _TABLE = "t_new_project_order_new"
    # Single source of truth for the selected columns and for the
    # ProjectOrder keyword arguments built from each row.
    _FIELDS = (
        "name", "paths", "zlzt", "sfxxm", "name1", "name2", "bianhao",
        "desc", "sql_paths", "error_message",
    )
    # Every column is back-quoted because `desc` is a reserved word in MySQL.
    _SELECT_COLUMNS = ", ".join(f"`{column}`" for column in _FIELDS)

    # NOTE(review): the default credentials are kept only for backward
    # compatibility with existing callers; pass real settings explicitly.
    def __init__(self, host: str = "localhost", database: str = "test",
                 user: str = "root", password: str = "123456", port: int = 3306):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None

    def connect(self) -> bool:
        """Open a connection using the stored settings.

        Returns:
            True on success; False (after printing the error) on failure.
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port,
                charset='utf8mb4',
                collation='utf8mb4_unicode_ci',
                connection_timeout=5,
                use_pure=True
            )
            return True
        except Error as e:
            print(f"[DatabaseManager] 连接数据库失败: {e}")
            return False
        except Exception as e:
            print(f"[DatabaseManager] 连接数据库时发生未知错误: {e}")
            return False

    def disconnect(self):
        """Close the connection if it is open and forget the handle."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
        self.connection = None

    def is_connected(self) -> bool:
        """Return True when a connection object exists and reports itself connected."""
        return self.connection is not None and self.connection.is_connected()

    def ensure_connected(self) -> bool:
        """Connect if necessary; return whether a usable connection exists."""
        if not self.is_connected():
            return self.connect()
        return True

    @classmethod
    def _row_to_project(cls, row) -> ProjectOrder:
        """Build a ProjectOrder from a dictionary-cursor row (missing keys become None)."""
        return ProjectOrder(**{field: row.get(field) for field in cls._FIELDS})

    @staticmethod
    def _close_cursor(cursor) -> None:
        """Close *cursor* if one was opened, reporting (not raising) driver errors."""
        if cursor is None:
            return
        try:
            cursor.close()
        except Error as e:
            print(f"[DatabaseManager] 关闭游标失败: {e}")

    def _fetch_one_project(self, sql: str, params: tuple) -> Optional[ProjectOrder]:
        """Run a single-row SELECT and map the row, or return None on miss/error."""
        if not self.ensure_connected():
            return None
        cursor = None
        try:
            cursor = self.connection.cursor(dictionary=True)
            cursor.execute(sql, params)
            row = cursor.fetchone()
        except Error as e:
            print(f"[DatabaseManager] 查询失败: {e}")
            return None
        finally:
            # Release the cursor on the error path as well as on success.
            self._close_cursor(cursor)
        return self._row_to_project(row) if row else None

    def _execute_write(self, sql: str, params: tuple, action: str) -> bool:
        """Run and commit one write statement; True when at least one row changed.

        *action* is the verb interpolated into the printed failure message.
        """
        if not self.ensure_connected():
            return False
        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.execute(sql, params)
            self.connection.commit()
            return cursor.rowcount > 0
        except Error as e:
            print(f"[DatabaseManager] {action}失败: {e}")
            return False
        finally:
            self._close_cursor(cursor)

    def get_projects(self, page: int = 1, page_size: int = 20,
                     search_name: Optional[str] = None,
                     zlzt: Optional[str] = None) -> Tuple[List[ProjectOrder], int]:
        """Return one page of projects plus the total number of matches.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            search_name: Optional substring matched against ``name`` with LIKE.
            zlzt: Optional exact organizing-status filter.

        Returns:
            ``(projects, total)``; ``([], 0)`` when the connection or the
            query fails.
        """
        if not self.ensure_connected():
            return [], 0

        # A negative OFFSET/LIMIT is a SQL error, so clamp instead of failing.
        page = max(page, 1)
        page_size = max(page_size, 0)

        where_conditions = []
        params = []
        if search_name:
            where_conditions.append("name LIKE %s")
            params.append(f"%{search_name}%")
        if zlzt:
            where_conditions.append("zlzt = %s")
            params.append(zlzt)
        where_clause = ""
        if where_conditions:
            where_clause = "WHERE " + " AND ".join(where_conditions)

        cursor = None
        try:
            cursor = self.connection.cursor(dictionary=True)

            # Total number of matching rows, ignoring pagination.
            count_sql = f"SELECT COUNT(*) as total FROM {self._TABLE} {where_clause}"
            cursor.execute(count_sql, params)
            total = cursor.fetchone()['total']

            # The requested page.
            offset = (page - 1) * page_size
            sql = f"""
                SELECT {self._SELECT_COLUMNS}
                FROM {self._TABLE}
                {where_clause}
                ORDER BY name2 ASC
                LIMIT %s OFFSET %s
            """
            cursor.execute(sql, params + [page_size, offset])
            projects = [self._row_to_project(row) for row in cursor.fetchall()]
            return projects, total
        except Error as e:
            print(f"[DatabaseManager] 查询失败: {e}")
            return [], 0
        finally:
            self._close_cursor(cursor)

    def update_zlzt(self, name: str, zlzt: str) -> bool:
        """Set the organizing status of every row whose name or name2 equals *name*.

        Args:
            name: Project name, compared against both ``name`` and ``name2``.
            zlzt: New organizing-status value.

        Returns:
            True when at least one row was updated, otherwise False.
        """
        sql = f"UPDATE {self._TABLE} SET zlzt = %s WHERE name = %s OR name2 = %s"
        return self._execute_write(sql, (zlzt, name, name), "更新")

    def get_project_by_name(self, name: str) -> Optional[ProjectOrder]:
        """Return the project whose ``name`` equals *name*, or None.

        Args:
            name: Exact project name.

        Returns:
            The matching project, or None when nothing matches or the query fails.
        """
        # LIMIT 1: only one row is fetched, and leaving further rows unread on
        # an unbuffered cursor makes closing it fail with "Unread result found".
        sql = f"""
            SELECT {self._SELECT_COLUMNS}
            FROM {self._TABLE}
            WHERE name = %s
            LIMIT 1
        """
        return self._fetch_one_project(sql, (name,))

    def get_project_by_path(self, path: str) -> Optional[ProjectOrder]:
        """Return the first project whose stored path contains, or is contained in, *path*.

        Args:
            path: Path fragment to search for.

        Returns:
            The matching project, or None when *path* is empty, nothing
            matches, or the query fails.
        """
        if not path:
            # An empty fragment would become the pattern '%%' and match every row.
            return None
        # Substring matching in both directions, because the stored value and
        # the argument may each be only a part of the other.
        sql = f"""
            SELECT {self._SELECT_COLUMNS}
            FROM {self._TABLE}
            WHERE paths LIKE %s OR %s LIKE CONCAT('%', paths, '%')
            LIMIT 1
        """
        return self._fetch_one_project(sql, (f"%{path}%", path))

    def delete_project(self, name: str) -> bool:
        """Delete every row whose ``name`` equals *name*.

        Args:
            name: Exact project name.

        Returns:
            True when at least one row was deleted, otherwise False.
        """
        sql = f"DELETE FROM {self._TABLE} WHERE name = %s"
        return self._execute_write(sql, (name,), "删除")

    def test_connection(self) -> Tuple[bool, str]:
        """Open and immediately close a throwaway connection.

        The connection stored on this instance is left untouched.

        Returns:
            ``(True, "")`` on success, ``(False, message)`` on failure.
        """
        try:
            conn = mysql.connector.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port,
                # Same keyword as connect() uses, for consistency.
                connection_timeout=5,
                use_pure=True
            )
            conn.close()
            return True, ""
        except Error as e:
            return False, str(e)
        except Exception as e:
            return False, f"未知错误: {str(e)}"
# Module-level DatabaseManager instance handed out by get_db_manager(); None until one is created.
_db_manager: Optional[DatabaseManager] = None
def get_db_manager(host: str = "localhost", database: str = "test",
                   user: str = "root", password: str = "123456",
                   port: int = 3306) -> DatabaseManager:
    """Return the shared DatabaseManager, creating it on first use.

    The connection settings are consulted only when no shared instance exists
    yet; once one exists it is returned as-is and the arguments are ignored.
    """
    global _db_manager
    manager = _db_manager
    if manager is None:
        manager = DatabaseManager(
            host=host,
            database=database,
            user=user,
            password=password,
            port=port,
        )
        _db_manager = manager
    return manager
def reset_db_manager():
    """Disconnect and discard the shared manager (e.g. after a configuration change)."""
    global _db_manager
    if not _db_manager:
        # Nothing has been created yet, so there is nothing to tear down.
        return
    _db_manager.disconnect()
    _db_manager = None