"""Database management module: connects to MySQL to manage article publishing data."""

from __future__ import annotations

import os
import sys
from dataclasses import dataclass, fields
from typing import List, Optional, Tuple

import mysql.connector
from mysql.connector import Error

# Table holding the project orders that every query in this module targets.
_PROJECT_TABLE = "t_new_project_order_new"

# Columns selected for each project row. `desc` is a reserved word in MySQL,
# so it has to stay back-quoted.
_PROJECT_COLUMNS = (
    "name, paths, zlzt, sfxxm, name1, name2, bianhao, `desc`, "
    "sql_paths, error_message"
)


@dataclass
class ProjectOrder:
    """One row of the project order table; every column is optional."""

    name: Optional[str] = None
    paths: Optional[str] = None
    zlzt: Optional[str] = None  # tidy-up ("整理") status
    sfxxm: Optional[str] = None
    name1: Optional[str] = None
    name2: Optional[str] = None
    bianhao: Optional[str] = None
    desc: Optional[str] = None
    sql_paths: Optional[str] = None
    error_message: Optional[str] = None

    @classmethod
    def from_row(cls, row: dict) -> ProjectOrder:
        """Build an instance from a dictionary-cursor row.

        Args:
            row: Mapping of column name to value.

        Returns:
            A ProjectOrder whose fields are taken from ``row``; columns that
            are absent from ``row`` are set to None.
        """
        return cls(**{f.name: row.get(f.name) for f in fields(cls)})


class DatabaseManager:
    """MySQL database manager for the project order table."""

    # NOTE(review): the default credentials look like development
    # placeholders; callers should pass real settings explicitly.
    def __init__(self, host: str = "localhost", database: str = "test",
                 user: str = "root", password: str = "123456",
                 port: int = 3306):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None

    def _connection_kwargs(self) -> dict:
        """Return the keyword arguments shared by every connection attempt."""
        return {
            "host": self.host,
            "database": self.database,
            "user": self.user,
            "password": self.password,
            "port": self.port,
            "charset": "utf8mb4",
            "collation": "utf8mb4_unicode_ci",
            "connection_timeout": 5,
            "use_pure": True,
        }

    def _rollback_quietly(self) -> None:
        """End the current transaction, ignoring driver errors (best effort).

        Used after reads so the next query sees fresh data instead of the
        snapshot of a transaction left open by an earlier SELECT, and after
        failed writes so no half-finished transaction lingers.
        """
        if self.connection is None:
            return
        try:
            self.connection.rollback()
        except Error:
            # The connection is most likely broken; ensure_connected() will
            # re-establish it on the next call.
            pass

    def _execute_write(self, sql: str, params: tuple,
                       failure_label: str) -> bool:
        """Run one data-modifying statement and commit it.

        Args:
            sql: Parameterized statement to execute.
            params: Values bound to the statement placeholders.
            failure_label: Text used in the message printed on failure.

        Returns:
            True if at least one row was affected, False otherwise or on
            a driver error.
        """
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute(sql, params)
                affected = cursor.rowcount
            finally:
                # Close the cursor even when execute() raises.
                cursor.close()
            self.connection.commit()
            return affected > 0
        except Error as e:
            self._rollback_quietly()
            print(f"[DatabaseManager] {failure_label}: {e}")
            return False

    def connect(self) -> bool:
        """Open the database connection.

        Returns:
            True on success, False if the connection could not be opened
            (the error is printed).
        """
        try:
            # Release any previous connection so reconnecting does not leak it.
            self.disconnect()
            self.connection = mysql.connector.connect(
                **self._connection_kwargs()
            )
            return True
        except Error as e:
            print(f"[DatabaseManager] 连接数据库失败: {e}")
            return False
        except Exception as e:
            print(f"[DatabaseManager] 连接数据库时发生未知错误: {e}")
            return False

    def disconnect(self):
        """Close the database connection, if any, and forget it."""
        # Drop the reference first so it is cleared even if close() raises.
        conn, self.connection = self.connection, None
        if conn is not None and conn.is_connected():
            conn.close()

    def is_connected(self) -> bool:
        """Return True if a connection exists and reports itself as connected."""
        return self.connection is not None and self.connection.is_connected()

    def ensure_connected(self) -> bool:
        """Connect if necessary.

        Returns:
            True if a usable connection is available afterwards.
        """
        if not self.is_connected():
            return self.connect()
        return True

    def get_projects(self, page: int = 1, page_size: int = 20,
                     search_name: Optional[str] = None
                     ) -> Tuple[List[ProjectOrder], int]:
        """Fetch one page of projects, ordered by ``name2``.

        Args:
            page: Page number, starting at 1; smaller values are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            search_name: Optional substring matched against ``name`` with
                LIKE. ``%`` and ``_`` inside it act as SQL wildcards.

        Returns:
            (projects on the page, total number of matching rows). On a
            connection or query failure returns ``([], 0)``.
        """
        if not self.ensure_connected():
            return [], 0

        # A negative LIMIT/OFFSET is a SQL error, so clamp instead of failing.
        page = max(page, 1)
        page_size = max(page_size, 0)
        offset = (page - 1) * page_size

        where_clause = ""
        params: list = []
        if search_name:
            where_clause = "WHERE name LIKE %s"
            params.append(f"%{search_name}%")

        count_sql = (
            f"SELECT COUNT(*) as total FROM {_PROJECT_TABLE} {where_clause}"
        )
        page_sql = (
            f"SELECT {_PROJECT_COLUMNS} FROM {_PROJECT_TABLE} {where_clause} "
            "ORDER BY name2 ASC LIMIT %s OFFSET %s"
        )

        try:
            cursor = self.connection.cursor(dictionary=True)
            try:
                cursor.execute(count_sql, params)
                total = cursor.fetchone()['total']

                cursor.execute(page_sql, params + [page_size, offset])
                rows = cursor.fetchall()
            finally:
                cursor.close()
            return [ProjectOrder.from_row(row) for row in rows], total
        except Error as e:
            print(f"[DatabaseManager] 查询失败: {e}")
            return [], 0
        finally:
            # End the read transaction so later reads are not served from
            # this query's snapshot.
            self._rollback_quietly()

    def update_zlzt(self, name: str, zlzt: str) -> bool:
        """Update the tidy-up status of a project.

        Args:
            name: Project name.
            zlzt: New status value.

        Returns:
            True if a row was affected, False otherwise or on failure.
            NOTE(review): MySQL normally counts only rows whose value
            actually changed, so writing the current value again would
            yield False - confirm callers expect that.
        """
        if not self.ensure_connected():
            return False

        sql = f"UPDATE {_PROJECT_TABLE} SET zlzt = %s WHERE name = %s"
        return self._execute_write(sql, (zlzt, name), "更新失败")

    def get_project_by_name(self, name: str) -> Optional[ProjectOrder]:
        """Fetch a project by its exact name.

        Args:
            name: Project name.

        Returns:
            The matching project, or None if it does not exist or the
            query failed.
        """
        if not self.ensure_connected():
            return None

        # LIMIT 1: only one row is consumed, and leaving further rows unread
        # on an unbuffered cursor makes closing it fail if names are not
        # unique.
        sql = (
            f"SELECT {_PROJECT_COLUMNS} FROM {_PROJECT_TABLE} "
            "WHERE name = %s LIMIT 1"
        )

        try:
            cursor = self.connection.cursor(dictionary=True)
            try:
                cursor.execute(sql, (name,))
                row = cursor.fetchone()
            finally:
                cursor.close()
            return ProjectOrder.from_row(row) if row else None
        except Error as e:
            print(f"[DatabaseManager] 查询失败: {e}")
            return None
        finally:
            # End the read transaction so later reads see fresh data.
            self._rollback_quietly()

    def delete_project(self, name: str) -> bool:
        """Delete a project.

        Args:
            name: Project name.

        Returns:
            True if a row was deleted, False otherwise or on failure.
        """
        if not self.ensure_connected():
            return False

        sql = f"DELETE FROM {_PROJECT_TABLE} WHERE name = %s"
        return self._execute_write(sql, (name,), "删除失败")

    def test_connection(self) -> Tuple[bool, str]:
        """Try to open and close a throw-away connection.

        Uses the same settings as connect() so the result reflects what the
        real connection will do. The manager's own connection is untouched.

        Returns:
            (success flag, error message - empty on success).
        """
        try:
            conn = mysql.connector.connect(**self._connection_kwargs())
            conn.close()
            return True, ""
        except Error as e:
            return False, str(e)
        except Exception as e:
            return False, f"未知错误: {str(e)}"


# Module-wide database manager instance, created lazily by get_db_manager().
_db_manager: Optional[DatabaseManager] = None


def get_db_manager(host: str = "localhost", database: str = "test",
                   user: str = "root", password: str = "123456",
                   port: int = 3306) -> DatabaseManager:
    """Return the shared database manager, creating it on first use.

    The arguments are only used when the instance is created; while an
    instance exists they are ignored. Call reset_db_manager() first to apply
    new settings.
    """
    global _db_manager
    if _db_manager is None:
        _db_manager = DatabaseManager(host, database, user, password, port)
    return _db_manager


def reset_db_manager():
    """Discard the shared database manager (used after a configuration change)."""
    global _db_manager
    # Clear the global first so the stale instance is dropped even if
    # disconnect() raises.
    manager, _db_manager = _db_manager, None
    if manager is not None:
        manager.disconnect()