Files
yidaima_tools/db_manager.py

284 lines
8.7 KiB
Python
Raw Normal View History

2026-04-09 14:55:54 +08:00
"""
数据库管理模块 - 连接 MySQL 管理文章发布数据
"""
from __future__ import annotations
import mysql.connector
from mysql.connector import Error
from dataclasses import dataclass
from typing import List, Optional, Tuple
import os
import sys
@dataclass
class ProjectOrder:
"""项目订单数据类"""
name: Optional[str] = None
paths: Optional[str] = None
zlzt: Optional[str] = None # 整理状态
sfxxm: Optional[str] = None
name1: Optional[str] = None
name2: Optional[str] = None
bianhao: Optional[str] = None
desc: Optional[str] = None
sql_paths: Optional[str] = None
error_message: Optional[str] = None
class DatabaseManager:
"""MySQL 数据库管理器"""
def __init__(self, host: str = "localhost", database: str = "test",
user: str = "root", password: str = "123456", port: int = 3306):
self.host = host
self.database = database
self.user = user
self.password = password
self.port = port
self.connection = None
def connect(self) -> bool:
"""连接数据库"""
try:
self.connection = mysql.connector.connect(
host=self.host,
database=self.database,
user=self.user,
password=self.password,
port=self.port,
charset='utf8mb4',
collation='utf8mb4_unicode_ci',
connection_timeout=5,
use_pure=True
)
return True
except Error as e:
print(f"[DatabaseManager] 连接数据库失败: {e}")
return False
except Exception as e:
print(f"[DatabaseManager] 连接数据库时发生未知错误: {e}")
return False
def disconnect(self):
"""断开数据库连接"""
if self.connection and self.connection.is_connected():
self.connection.close()
self.connection = None
def is_connected(self) -> bool:
"""检查是否已连接"""
return self.connection is not None and self.connection.is_connected()
def ensure_connected(self) -> bool:
"""确保已连接数据库"""
if not self.is_connected():
return self.connect()
return True
def get_projects(self, page: int = 1, page_size: int = 20,
search_name: Optional[str] = None) -> Tuple[List[ProjectOrder], int]:
"""
获取项目列表分页
Args:
page: 页码从1开始
page_size: 每页数量
search_name: 搜索名称模糊匹配
Returns:
(项目列表, 总数量)
"""
if not self.ensure_connected():
return [], 0
try:
cursor = self.connection.cursor(dictionary=True)
# 构建查询条件
where_clause = ""
params = []
if search_name:
where_clause = "WHERE name LIKE %s"
params.append(f"%{search_name}%")
# 获取总数
count_sql = f"SELECT COUNT(*) as total FROM t_new_project_order_new {where_clause}"
cursor.execute(count_sql, params)
total = cursor.fetchone()['total']
# 获取分页数据
offset = (page - 1) * page_size
sql = f"""
SELECT name, paths, zlzt, sfxxm, name1, name2, bianhao,
`desc`, sql_paths, error_message
FROM t_new_project_order_new
{where_clause}
ORDER BY name2 ASC
LIMIT %s OFFSET %s
"""
cursor.execute(sql, params + [page_size, offset])
rows = cursor.fetchall()
projects = []
for row in rows:
projects.append(ProjectOrder(
name=row.get('name'),
paths=row.get('paths'),
zlzt=row.get('zlzt'),
sfxxm=row.get('sfxxm'),
name1=row.get('name1'),
name2=row.get('name2'),
bianhao=row.get('bianhao'),
desc=row.get('desc'),
sql_paths=row.get('sql_paths'),
error_message=row.get('error_message')
))
cursor.close()
return projects, total
except Error as e:
print(f"[DatabaseManager] 查询失败: {e}")
return [], 0
def update_zlzt(self, name: str, zlzt: str) -> bool:
"""
更新整理状态
Args:
name: 项目名称
zlzt: 新的整理状态值
Returns:
是否成功
"""
if not self.ensure_connected():
return False
try:
cursor = self.connection.cursor()
sql = "UPDATE t_new_project_order_new SET zlzt = %s WHERE name = %s"
cursor.execute(sql, (zlzt, name))
self.connection.commit()
affected = cursor.rowcount
cursor.close()
return affected > 0
except Error as e:
print(f"[DatabaseManager] 更新失败: {e}")
return False
def get_project_by_name(self, name: str) -> Optional[ProjectOrder]:
"""
根据名称获取项目
Args:
name: 项目名称
Returns:
项目对象如果找不到返回 None
"""
if not self.ensure_connected():
return None
try:
cursor = self.connection.cursor(dictionary=True)
sql = """
SELECT name, paths, zlzt, sfxxm, name1, name2, bianhao,
`desc`, sql_paths, error_message
FROM t_new_project_order_new
WHERE name = %s
"""
cursor.execute(sql, (name,))
row = cursor.fetchone()
cursor.close()
if row:
return ProjectOrder(
name=row.get('name'),
paths=row.get('paths'),
zlzt=row.get('zlzt'),
sfxxm=row.get('sfxxm'),
name1=row.get('name1'),
name2=row.get('name2'),
bianhao=row.get('bianhao'),
desc=row.get('desc'),
sql_paths=row.get('sql_paths'),
error_message=row.get('error_message')
)
return None
except Error as e:
print(f"[DatabaseManager] 查询失败: {e}")
return None
def delete_project(self, name: str) -> bool:
"""
删除项目
Args:
name: 项目名称
Returns:
是否成功
"""
if not self.ensure_connected():
return False
try:
cursor = self.connection.cursor()
sql = "DELETE FROM t_new_project_order_new WHERE name = %s"
cursor.execute(sql, (name,))
self.connection.commit()
affected = cursor.rowcount
cursor.close()
return affected > 0
except Error as e:
print(f"[DatabaseManager] 删除失败: {e}")
return False
def test_connection(self) -> Tuple[bool, str]:
"""
测试数据库连接
Returns:
(是否成功, 错误信息)
"""
try:
conn = mysql.connector.connect(
host=self.host,
database=self.database,
user=self.user,
password=self.password,
port=self.port,
connect_timeout=5,
use_pure=True
)
conn.close()
return True, ""
except Error as e:
return False, str(e)
except Exception as e:
return False, f"未知错误: {str(e)}"
# 全局数据库管理器实例
_db_manager: Optional[DatabaseManager] = None
def get_db_manager(host: str = "localhost", database: str = "test",
user: str = "root", password: str = "123456",
port: int = 3306) -> DatabaseManager:
"""获取数据库管理器实例"""
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager(host, database, user, password, port)
return _db_manager
def reset_db_manager():
"""重置数据库管理器(用于配置变更后)"""
global _db_manager
if _db_manager:
_db_manager.disconnect()
_db_manager = None