Files
yidaima_tools/project_screenshot.py

1705 lines
68 KiB
Python
Raw Permalink Normal View History

2026-04-10 14:37:23 +08:00
from __future__ import annotations
import os
import re
import glob
import random
import shutil
import subprocess
import time
import asyncio
from pathlib import Path
from typing import Optional, Callable, List, Tuple
from dataclasses import dataclass
# 预设后台登录背景图片列表
LOGIN_BACKGROUND_IMAGES = [
"http://img.yidaima.cn/7-dcd4fc1e8d0144aaa064bc282d1af289",
"http://img.yidaima.cn/6-3db67e2e6ff64e018d48282638900a67",
"http://img.yidaima.cn/5-74ee6fe2f0954a8db2574e8f7fbb8ef2",
"http://img.yidaima.cn/4-a926593ba8c444a3984100e99ae322bb",
"http://img.yidaima.cn/3-f91e9a0ed55d45c9aff80a08f554c625",
"http://img.yidaima.cn/2-2bd41cdd57b54b529dc3a139e66233e0",
"http://img.yidaima.cn/1-5a0b7ee2344a46bd987c1cafff19b3ba",
"http://img.yidaima.cn/8-495b9767a2f34197b963a13226d3e1d2",
"http://img.yidaima.cn/10-991483f941914f3d85bf87a4f8748fee",
"http://img.yidaima.cn/11-d274f1d695f7472a8fcd2c8c6238a79e",
"http://img.yidaima.cn/14-0e30653336d849379060ad8c436fe512",
"http://img.yidaima.cn/16-7a58235a2a4040f99e3431c3cac9ea7a",
"http://img.yidaima.cn/17-3406a546b81c412d9e3501cdb0f88b12",
"http://img.yidaima.cn/18-0a2fae1d6634480dabf5471fa86b3623",
"http://img.yidaima.cn/19-107196938e074c92a40b62c60aed18a4"
]
@dataclass
class ProjectConfig:
"""项目配置数据类"""
project_path: str
has_front: bool = True
desktop_path: str = ""
# 脚本路径(相对于 bat 文件夹或绝对路径)
bat_folder: str = "new" # bat 文件所在文件夹(相对于桌面路径)
install_server: str = "run_install_server.bat"
install_front: str = "run_install_front.bat"
install_admin: str = "run_install_admin.bat"
run_server: str = "run_server.bat"
run_front: str = "run_front.bat"
run_admin: str = "run_admin.bat"
run_sql: str = "run_sql.bat"
# SQL 配置
sql_script_path: str = "server/db/init.sql"
# 服务配置
backend_url: str = "http://localhost:8080"
backend_startup_timeout: int = 30
front_url: str = "http://localhost:8082"
front_login_url: str = "http://localhost:8082/#/login"
front_startup_timeout: int = 60
admin_url: str = "http://localhost:8081"
admin_login_url: str = "http://localhost:8081/#/login"
admin_startup_timeout: int = 60
# 登录配置
admin_username: str = "admin"
admin_password: str = "admin"
front_username: str = "2"
front_password: str = "123456"
# 截图配置
screenshot_output_dir: str = "./screenshots"
screenshot_delay: int = 2000
admin_menu_selector: str = ".el-menu-item, .el-sub-menu__title"
front_menu_selector: str = ".nav-item, .menu-item"
# 轮播图片配置
swiper_source_folder: str = "bg_pic"
swiper_target_subpath: str = "src/main/resources/static/file"
swiper_target_names: List[str] = None
swiper_count: int = 3
# 登录背景图配置
login_vue_subpath: str = "src/views/login.vue"
login_background_images: List[str] = None
# 代码整理配置
code_source_folder: str = "code" # 桌面上的 code 文件夹名称
code_target_path: str = r"D:\code" # 整理代码的目标目录(默认 D:\code
# 执行选项
show_cmd_window: bool = True # 是否显示 CMD 窗口
def __post_init__(self):
if self.swiper_target_names is None:
self.swiper_target_names = ["swiperPicture1.jpg", "swiperPicture2.jpg", "swiperPicture3.jpg"]
if self.login_background_images is None:
self.login_background_images = LOGIN_BACKGROUND_IMAGES.copy()
class ProjectScreenshotAutomation:
"""项目截图自动化类"""
def __init__(self, config: ProjectConfig, log_callback: Optional[Callable[[str], None]] = None):
self.config = config
self.log_callback = log_callback or print
self.processes: List[subprocess.Popen] = []
def log(self, message: str):
"""输出日志"""
self.log_callback(message)
def execute_bat(self, bat_path: str, cwd: str = None, timeout: int = 300) -> Tuple[bool, str]:
"""
执行 bat 脚本并返回结果
Args:
bat_path: bat 文件路径
cwd: 工作目录
timeout: 超时时间
Returns:
(success, message)
"""
try:
if not os.path.exists(bat_path):
return False, f"脚本文件不存在: {bat_path}"
self.log(f"执行脚本: {bat_path}")
self.log(f"项目路径参数: {self.config.project_path}")
# 根据配置决定是否显示 CMD 窗口
startupinfo = subprocess.STARTUPINFO()
creationflags = 0
if self.config.show_cmd_window:
# 显示 CMD 窗口
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 1 # SW_SHOWNORMAL
creationflags = 0x00000010 # CREATE_NEW_CONSOLE
self.log("CMD 窗口: 显示")
else:
# 隐藏 CMD 窗口
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 0 # SW_HIDE
self.log("CMD 窗口: 隐藏")
# 构建命令,传入项目路径作为参数
cmd = f'"{bat_path}" "{self.config.project_path}"'
process = subprocess.Popen(
cmd,
cwd=cwd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
startupinfo=startupinfo,
creationflags=creationflags
)
self.processes.append(process)
try:
stdout_bytes, stderr_bytes = process.communicate(timeout=timeout)
# 尝试多种编码解码输出
stdout = self._decode_output(stdout_bytes)
stderr = self._decode_output(stderr_bytes)
if process.returncode == 0:
return True, stdout
else:
return False, f"脚本执行失败: {stderr}"
except subprocess.TimeoutExpired:
process.kill()
return False, f"脚本执行超时({timeout}秒)"
except Exception as e:
return False, f"执行脚本出错: {str(e)}"
def _decode_output(self, data: bytes) -> str:
"""尝试多种编码解码字节数据"""
if not data:
return ""
# 尝试的编码列表GBK/GB2312/CP936 优先,因为 bat 脚本使用 chcp 936
encodings = ['gbk', 'gb2312', 'cp936', 'utf-8', 'latin-1']
for encoding in encodings:
try:
return data.decode(encoding, errors='replace')
except Exception:
continue
# 如果都失败,使用 latin-1 解码(不会丢失数据)
return data.decode('latin-1', errors='replace')
def clear_directory(self, path: str):
"""删除目录下的所有内容"""
if os.path.exists(path):
for item in os.listdir(path):
item_path = os.path.join(path, item)
if os.path.isfile(item_path) or os.path.islink(item_path):
os.unlink(item_path)
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
self.log(f"已清空目录: {path}")
else:
self.log(f"目录不存在: {path}")
def get_first_subdirectory(self, path: str) -> Optional[str]:
"""获取指定目录下的第一个子文件夹"""
if os.path.exists(path):
for item in os.listdir(path):
item_path = os.path.join(path, item)
if os.path.isdir(item_path):
return item_path
return None
def copy_directory(self, src: str, dst: str):
"""复制目录及其内容"""
if os.path.exists(src):
shutil.copytree(src, dst, dirs_exist_ok=True)
self.log(f"已复制 {src}{dst}")
else:
self.log(f"源目录不存在: {src}")
def get_filename_without_extension(self, file_path: str) -> str:
"""获取文件名(不带后缀)"""
return os.path.splitext(os.path.basename(file_path))[0]
def organize_project_code(self) -> Optional[str]:
"""
整理项目代码
步骤
1. 清空桌面 code 文件夹
2. 把项目路径的代码复制到桌面 code 文件夹
3. 在桌面 code 文件夹中找前后端代码和 db
4. 放到代码目标路径下的 \front\server\db 文件夹
5. 返回 SQL 目录下的第一个文件名不带后缀
Returns:
第一个 SQL 文件名不带后缀失败返回 None
"""
try:
self.log("开始整理项目代码...")
# 步骤 1: 清空桌面 code 文件夹和截图文件夹
desktop_code_path = os.path.join(self.config.desktop_path, self.config.code_source_folder)
screenshot_path = os.path.join(self.config.desktop_path, "截图")
self.log(f"步骤1: 清空桌面 code 文件夹 {desktop_code_path}...")
self.clear_directory(desktop_code_path)
os.makedirs(desktop_code_path, exist_ok=True)
self.log(f"步骤1: 清空桌面截图文件夹 {screenshot_path}...")
self.clear_directory(screenshot_path)
os.makedirs(screenshot_path, exist_ok=True)
# 步骤 2: 把项目路径的代码复制到桌面 code 文件夹
self.log("\n步骤2: 复制项目代码到桌面 code 文件夹...")
project_source_path = os.path.join(self.config.project_path, "源码")
if os.path.exists(project_source_path):
self.log(f"{project_source_path} 复制代码到 {desktop_code_path}")
self.copy_directory(project_source_path, desktop_code_path)
else:
self.log(f"警告:项目源码文件夹不存在: {project_source_path}")
self.log("将直接使用桌面 code 文件夹中的现有代码")
# 步骤 3: 在桌面 code 文件夹中找第一个子文件夹
self.log("\n步骤3: 查找项目子目录...")
first_subdir = self.get_first_subdirectory(desktop_code_path)
if not first_subdir:
self.log("未找到桌面 code 文件夹下的子目录")
return None
self.log(f"找到的子目录: {first_subdir}")
# 步骤 4: 清空并创建代码目标路径下的目录
self.log(f"\n步骤4: 准备代码目标路径 {self.config.code_target_path}...")
target_dirs = [
os.path.join(self.config.code_target_path, "front"),
os.path.join(self.config.code_target_path, "server"),
os.path.join(self.config.code_target_path, "db"),
]
for dir_path in target_dirs:
self.clear_directory(dir_path)
os.makedirs(dir_path, exist_ok=True)
# 步骤 5: 复制 client_code 和 manage_code 到代码目标路径的 front
self.log("\n步骤5: 复制前端代码...")
front_target = os.path.join(self.config.code_target_path, "front")
for folder in ["client_code", "manage_code"]:
src = os.path.join(first_subdir, folder)
dst = os.path.join(front_target, folder)
self.copy_directory(src, dst)
# 步骤 6: 复制 server_code 到代码目标路径的 server
self.log("\n步骤6: 复制后端代码...")
server_src = os.path.join(first_subdir, "server_code")
server_dst = os.path.join(self.config.code_target_path, "server", "server_code")
self.copy_directory(server_src, server_dst)
# 步骤 7: 复制 sql 文件到代码目标路径的 db
self.log("\n步骤7: 复制 SQL 文件...")
sql_src = os.path.join(first_subdir, "server_code", "sql")
db_target = os.path.join(self.config.code_target_path, "db")
first_sql_filename = None
if os.path.exists(sql_src):
for item in os.listdir(sql_src):
item_src = os.path.join(sql_src, item)
item_dst = os.path.join(db_target, item)
if os.path.isfile(item_src):
shutil.copy2(item_src, item_dst)
self.log(f"已复制文件 {item}{db_target}")
if first_sql_filename is None:
first_sql_filename = self.get_filename_without_extension(item)
elif os.path.isdir(item_src):
shutil.copytree(item_src, item_dst, dirs_exist_ok=True)
self.log(f"已复制目录 {item}{db_target}")
else:
self.log(f"SQL 目录不存在: {sql_src}")
return None
# 步骤 8: 返回 SQL 目录下的第一个文件名(不带后缀)
self.log("\n步骤8: 代码整理完成!")
if first_sql_filename:
self.log(f"SQL 文件名(不带后缀): {first_sql_filename}")
return first_sql_filename
else:
self.log("SQL 目录中没有文件,无法获取文件名")
return None
except Exception as e:
self.log(f"整理项目代码出错: {str(e)}")
return None
def execute_bat_async(self, bat_path: str, cwd: str = None) -> subprocess.Popen:
"""
异步执行 bat 脚本用于启动服务
Args:
bat_path: bat 文件路径
cwd: 工作目录
Returns:
subprocess.Popen 对象
"""
if not os.path.exists(bat_path):
raise FileNotFoundError(f"脚本文件不存在: {bat_path}")
self.log(f"异步执行脚本: {bat_path}")
self.log(f"项目路径参数: {self.config.project_path}")
# 根据配置决定是否显示 CMD 窗口
startupinfo = subprocess.STARTUPINFO()
creationflags = 0
if self.config.show_cmd_window:
# 显示 CMD 窗口
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 1 # SW_SHOWNORMAL
creationflags = 0x00000010 # CREATE_NEW_CONSOLE
self.log("CMD 窗口: 显示")
else:
# 隐藏 CMD 窗口
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 0 # SW_HIDE
self.log("CMD 窗口: 隐藏")
# 构建命令,传入项目路径作为参数
cmd = f'"{bat_path}" "{self.config.project_path}"'
# 对于启动服务的场景,不捕获输出,让用户可以直接在 CMD 窗口中看到输出并交互
if self.config.show_cmd_window:
# 显示 CMD 窗口,不捕获输出
# 使用 cmd /c 启动 bat 文件,确保 CMD 窗口正确显示
full_cmd = f'cmd /c "{cmd}"'
process = subprocess.Popen(
full_cmd,
cwd=cwd,
shell=False,
startupinfo=startupinfo,
creationflags=creationflags
)
else:
# 隐藏 CMD 窗口,捕获输出
process = subprocess.Popen(
cmd,
cwd=cwd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
startupinfo=startupinfo,
creationflags=creationflags
)
self.processes.append(process)
return process
def wait_for_service(self, url: str, timeout: int = 120) -> bool:
"""
等待服务启动完成
Args:
url: 服务健康检查 URL
timeout: 超时时间
Returns:
是否启动成功
"""
import urllib.request
import urllib.error
self.log(f"等待服务启动: {url} (超时时间: {timeout}秒)")
start_time = time.time()
check_count = 0
while time.time() - start_time < timeout:
check_count += 1
elapsed = int(time.time() - start_time)
# 每10秒输出一次日志
if check_count % 5 == 0:
self.log(f" 已等待 {elapsed} 秒,继续检查...")
try:
req = urllib.request.Request(url, method='GET')
req.add_header('User-Agent', 'Mozilla/5.0')
with urllib.request.urlopen(req, timeout=5) as response:
if response.status == 200:
self.log(f"服务已启动: {url} (共等待 {elapsed} 秒)")
return True
except urllib.error.HTTPError as e:
# 某些服务返回 401/403 也表示服务已启动
if e.code in [401, 403]:
self.log(f"服务已启动(需要认证): {url} (共等待 {elapsed} 秒)")
return True
except Exception as e:
# 每20秒输出一次错误信息
if check_count % 10 == 0:
self.log(f" 检查失败: {str(e)[:50]}")
time.sleep(2)
self.log(f"服务启动超时: {url} (超时时间: {timeout}秒)")
return False
def import_sql(self, sql_file: str, db_config: dict = None) -> Tuple[bool, str]:
"""
导入 SQL 脚本到数据库
Args:
sql_file: SQL 文件路径
db_config: 数据库配置
Returns:
(success, message)
"""
try:
if not os.path.exists(sql_file):
return False, f"SQL 文件不存在: {sql_file}"
self.log(f"导入 SQL 文件: {sql_file}")
# 使用 mysql 命令行导入
if db_config:
host = db_config.get('host', 'localhost')
port = db_config.get('port', 3306)
user = db_config.get('user', 'root')
password = db_config.get('password', '')
database = db_config.get('database', '')
cmd = f'mysql -h {host} -P {port} -u {user}'
if password:
cmd += f' -p{password}'
if database:
cmd += f' {database}'
cmd += f' < "{sql_file}"'
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
return False, f"SQL 导入失败: {result.stderr}"
self.log("SQL 导入成功")
# 执行额外的 SQL 语句
self.log("执行额外的 SQL 更新操作...")
self._execute_post_import_sql(host, port, user, password, database)
return True, "SQL 导入成功"
else:
# 如果没有数据库配置,尝试执行 run_sql.bat
bat_path = os.path.join(self.config.project_path, self.config.run_sql)
return self.execute_bat(bat_path, cwd=self.config.project_path)
except Exception as e:
return False, f"导入 SQL 出错: {str(e)}"
def _execute_post_import_sql(self, host: str, port: int, user: str, password: str, database: str):
"""
导入 SQL 后执行的额外操作
Args:
host: 数据库主机
port: 数据库端口
user: 用户名
password: 密码
database: 数据库名
"""
try:
import pymysql
# 连接数据库
conn = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
database=database,
charset='utf8mb4'
)
with conn.cursor() as cursor:
# 查询包含 ming 或 hao 的列
query = """
SELECT TABLE_NAME, COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE (COLUMN_NAME LIKE '%ming%' OR COLUMN_NAME LIKE '%hao%')
AND TABLE_SCHEMA = %s
"""
cursor.execute(query, (database,))
results = cursor.fetchall()
self.log(f"找到 {len(results)} 个匹配的列")
# 执行更新
for table_name, column_name in results:
update_sql = f"UPDATE `{table_name}` SET `{column_name}` = '2' WHERE `{column_name}` LIKE '%2%'"
try:
cursor.execute(update_sql)
affected_rows = cursor.rowcount
conn.commit()
self.log(f"更新 {table_name}.{column_name}: 影响 {affected_rows}")
except Exception as e:
self.log(f"更新 {table_name}.{column_name} 失败: {str(e)}")
conn.rollback()
conn.close()
self.log("额外 SQL 操作执行完成")
except ImportError:
self.log("警告:未安装 pymysql跳过额外 SQL 操作")
except Exception as e:
self.log(f"执行额外 SQL 操作失败: {str(e)}")
def get_random_images(self, bg_pic_folder: str, count: int = 3) -> List[str]:
"""
从指定文件夹随机获取指定数量的图片文件
Args:
bg_pic_folder: 图片文件夹路径
count: 需要获取的图片数量
Returns:
随机选择的图片文件路径列表
"""
image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.gif',
'*.JPG', '*.JPEG', '*.PNG', '*.BMP', '*.GIF']
all_images = []
for extension in image_extensions:
pattern = os.path.join(bg_pic_folder, extension)
all_images.extend(glob.glob(pattern))
if len(all_images) < count:
self.log(f"警告:文件夹中只有 {len(all_images)} 张图片,少于需要的 {count}")
return all_images
selected_images = random.sample(all_images, count)
self.log(f"{len(all_images)} 张图片中随机选择了 {len(selected_images)} 张:")
for i, img in enumerate(selected_images, 1):
self.log(f" {i}. {os.path.basename(img)}")
return selected_images
def copy_images_to_target(self, source_images: List[str], target_folder: str) -> bool:
"""
将源图片复制到目标文件夹并重命名
Args:
source_images: 源图片文件路径列表
target_folder: 目标文件夹路径
Returns:
是否复制成功
"""
os.makedirs(target_folder, exist_ok=True)
for i, source_image in enumerate(source_images):
if i >= len(self.config.swiper_target_names):
break
target_path = os.path.join(target_folder, self.config.swiper_target_names[i])
try:
shutil.copy2(source_image, target_path)
self.log(f"成功复制: {os.path.basename(source_image)} -> {self.config.swiper_target_names[i]}")
except Exception as e:
self.log(f"复制失败: {os.path.basename(source_image)} -> {self.config.swiper_target_names[i]}, 错误: {str(e)}")
return False
return True
def replace_swiper_images(self) -> str:
"""
更换前台轮播图片
Returns:
执行结果信息
"""
try:
bg_pic_folder = os.path.join(self.config.desktop_path, self.config.swiper_source_folder)
# 使用代码目标路径下的 server/server_code 目录
target_folder = os.path.join(self.config.code_target_path, "server", "server_code", "src", "main", "resources", "static", "file")
self.log(f"源文件夹: {bg_pic_folder}")
self.log(f"目标文件夹: {target_folder}")
if not os.path.exists(bg_pic_folder):
error_msg = f"错误:源文件夹不存在: {bg_pic_folder}"
self.log(error_msg)
return error_msg
selected_images = self.get_random_images(bg_pic_folder, self.config.swiper_count)
if not selected_images:
error_msg = "错误:源文件夹中没有找到图片文件"
self.log(error_msg)
return error_msg
success = self.copy_images_to_target(selected_images, target_folder)
if success:
result_msg = f"成功!已随机选择 {len(selected_images)} 张图片并覆盖到目标位置"
self.log(result_msg)
return result_msg
else:
error_msg = "复制过程中出现错误"
self.log(error_msg)
return error_msg
except Exception as e:
error_msg = f"程序执行出错: {str(e)}"
self.log(error_msg)
return error_msg
def update_login_vue_background(self) -> bool:
"""
修改 login.vue 文件中的背景图片链接
Returns:
是否更新成功
"""
# 使用代码目标路径下的 front/manage_code 目录
file_path = os.path.join(self.config.code_target_path, "front", "manage_code", "src", "views", "login.vue")
try:
if not os.path.exists(file_path):
self.log(f"错误:文件 {file_path} 不存在")
return False
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
self.log(f"成功读取文件:{file_path}")
selected_image = random.choice(self.config.login_background_images)
self.log(f"随机选择的图片:{selected_image}")
# 使用正则表达式匹配并替换背景图片URL
pattern = r'background-image:\s*url\([^)]+\)'
if not re.search(pattern, content):
self.log("警告未找到匹配的背景图片URL模式")
return False
new_content = re.sub(pattern, f'background-image: url({selected_image})', content)
with open(file_path, 'w', encoding='utf-8') as file:
file.write(new_content)
self.log(f"成功更新背景图片为:{selected_image}")
return True
except Exception as e:
self.log(f"处理文件时出错:{str(e)}")
return False
def _get_bat_path(self, bat_name: str) -> str:
"""获取 bat 文件的完整路径(优先从桌面 new 文件夹查找)"""
# 首先检查桌面 new 文件夹
desktop_bat_path = os.path.join(self.config.desktop_path, self.config.bat_folder, bat_name)
if os.path.exists(desktop_bat_path):
return desktop_bat_path
# 其次检查项目路径
project_bat_path = os.path.join(self.config.project_path, bat_name)
if os.path.exists(project_bat_path):
return project_bat_path
# 默认返回桌面 new 文件夹路径(即使不存在,让调用方处理错误)
return desktop_bat_path
def install_server(self) -> Tuple[bool, str]:
"""Maven 构建后端"""
bat_path = self._get_bat_path(self.config.install_server)
return self.execute_bat(bat_path, cwd=self.config.project_path, timeout=600)
def install_front(self) -> Tuple[bool, str]:
"""安装前台前端依赖"""
bat_path = self._get_bat_path(self.config.install_front)
return self.execute_bat(bat_path, cwd=self.config.project_path, timeout=300)
def install_admin(self) -> Tuple[bool, str]:
"""安装后台前端依赖"""
bat_path = self._get_bat_path(self.config.install_admin)
return self.execute_bat(bat_path, cwd=self.config.project_path, timeout=300)
def run_sql_script(self) -> Tuple[bool, str]:
"""导入 SQL 脚本"""
# SQL 脚本优先从桌面 new 文件夹查找
desktop_sql_path = os.path.join(self.config.desktop_path, self.config.bat_folder, self.config.run_sql)
if os.path.exists(desktop_sql_path):
# 执行 bat 文件导入 SQL
success, msg = self.execute_bat(desktop_sql_path, cwd=self.config.project_path, timeout=300)
return success, msg
# 其次检查项目路径下的 SQL 脚本
sql_file = os.path.join(self.config.project_path, self.config.sql_script_path)
# 尝试从 SQL 文件中提取数据库名
database_name = self._extract_database_from_sql(sql_file)
if database_name:
db_config = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': '',
'database': database_name
}
return self.import_sql(sql_file, db_config)
else:
return self.import_sql(sql_file)
def _extract_database_from_sql(self, sql_file: str) -> str:
"""
SQL 文件中提取数据库名
Args:
sql_file: SQL 文件路径
Returns:
数据库名如果无法提取则返回空字符串
"""
try:
if not os.path.exists(sql_file):
return ""
with open(sql_file, 'r', encoding='utf-8') as f:
content = f.read()
# 尝试匹配 USE database; 或 CREATE DATABASE database; 等语句
import re
patterns = [
r"USE\s+`?(\w+)`?\s*;",
r"CREATE\s+DATABASE\s+(?:IF\s+NOT\s+EXISTS\s+)?`?(\w+)`?",
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
database_name = match.group(1)
self.log(f"从 SQL 文件中提取到数据库名: {database_name}")
return database_name
return ""
except Exception as e:
self.log(f"提取数据库名失败: {str(e)}")
return ""
def start_backend(self) -> subprocess.Popen:
"""启动后端服务"""
bat_path = self._get_bat_path(self.config.run_server)
return self.execute_bat_async(bat_path, cwd=self.config.project_path)
def start_front(self) -> subprocess.Popen:
"""启动前台前端"""
bat_path = self._get_bat_path(self.config.run_front)
return self.execute_bat_async(bat_path, cwd=self.config.project_path)
def start_admin(self) -> subprocess.Popen:
"""启动后台前端"""
bat_path = self._get_bat_path(self.config.run_admin)
return self.execute_bat_async(bat_path, cwd=self.config.project_path)
def stop_all_processes(self):
"""停止所有启动的进程"""
self.log("停止所有服务进程...")
for process in self.processes:
try:
process.terminate()
process.wait(timeout=5)
except Exception:
try:
process.kill()
except Exception:
pass
self.processes.clear()
async def capture_screenshots_with_playwright(self, output_dir: str) -> List[str]:
"""
使用 Playwright 进行截图同时截取前后台
Args:
output_dir: 截图输出目录
Returns:
截图文件路径列表
"""
from playwright.async_api import async_playwright
screenshots = []
os.makedirs(output_dir, exist_ok=True)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=False)
context = await browser.new_context(viewport={'width': 1920, 'height': 1080})
try:
# 后台截图
self.log("开始后台截图...")
admin_screenshots = await self._capture_admin_screenshots(context, output_dir)
screenshots.extend(admin_screenshots)
# 前台截图(如果有)
if self.config.has_front:
self.log("开始前台截图...")
front_screenshots = await self._capture_front_screenshots(context, output_dir)
screenshots.extend(front_screenshots)
finally:
await context.close()
await browser.close()
return screenshots
async def capture_admin_screenshots_only(self, output_dir: str) -> List[str]:
"""
仅截取后台页面
Args:
output_dir: 截图输出目录
Returns:
截图文件路径列表
"""
from playwright.async_api import async_playwright
screenshots = []
os.makedirs(output_dir, exist_ok=True)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=False)
context = await browser.new_context(viewport={'width': 1920, 'height': 1080})
try:
self.log("开始后台截图...")
admin_screenshots = await self._capture_admin_screenshots(context, output_dir)
screenshots.extend(admin_screenshots)
finally:
await context.close()
await browser.close()
return screenshots
async def capture_front_screenshots_only(self, output_dir: str) -> List[str]:
"""
仅截取前台页面
Args:
output_dir: 截图输出目录
Returns:
截图文件路径列表
"""
from playwright.async_api import async_playwright
screenshots = []
os.makedirs(output_dir, exist_ok=True)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=False)
context = await browser.new_context(viewport={'width': 1920, 'height': 1080})
try:
self.log("开始前台截图...")
front_screenshots = await self._capture_front_screenshots(context, output_dir)
screenshots.extend(front_screenshots)
finally:
await context.close()
await browser.close()
return screenshots
async def _capture_menu_recursive(self, page, output_dir: str, screenshots: List[str], captured_menus: set, depth: int, max_depth: int = 3, processed_items: set = None):
"""
递归截图菜单
Args:
page: Playwright page 对象
output_dir: 截图输出目录
screenshots: 截图路径列表
captured_menus: 已截图的菜单名称集合
depth: 当前递归深度
max_depth: 最大递归深度
processed_items: 已处理的菜单项文本集合避免重复处理
"""
if depth > max_depth:
return
if processed_items is None:
processed_items = set()
# 查找所有可见的菜单项(包括子菜单和普通菜单项)
# 使用更通用的选择器获取所有 el-menu 下的 li 元素
menu_selectors = [
"xpath=//ul[contains(@class, 'el-menu')]//li",
self.config.admin_menu_selector
]
menu_items = []
for selector in menu_selectors:
try:
items = await page.query_selector_all(selector)
if items:
menu_items = items
self.log(f"使用选择器定位菜单成功: {selector}, 找到 {len(items)}")
break
except Exception as e:
self.log(f"选择器 {selector} 失败: {str(e)}")
continue
self.log(f"{depth} 层发现 {len(menu_items)} 个菜单项")
for i, item in enumerate(menu_items):
try:
# 获取菜单文本
text = await item.text_content()
if not text:
continue
text = text.strip().replace('/', '_').replace('\\', '_')
if not text or len(text) > 50:
continue
# 检查是否已经处理过这个菜单项
if text in processed_items:
self.log(f"菜单 '{text}' 已处理过,跳过")
continue
# 检查元素是否可见
is_visible = await item.is_visible()
if not is_visible:
continue
# 标记为已处理
processed_items.add(text)
# 检查是否是子菜单(有 el-sub-menu 类)
try:
is_submenu = await item.evaluate('el => el.classList.contains("el-sub-menu")')
except Exception:
is_submenu = False
# 检查是否有子菜单项ul 元素)
has_children = False
try:
children = await item.query_selector_all(':scope > ul.el-menu--inline, :scope > ul.el-menu')
has_children = len(children) > 0
except Exception:
pass
self.log(f"处理菜单: {text}, is_submenu={is_submenu}, has_children={has_children}")
if is_submenu or has_children:
# 点击展开子菜单
self.log(f"点击展开子菜单: {text}")
try:
await item.click(force=True, timeout=3000)
except Exception:
try:
await item.evaluate('el => el.click()')
except Exception:
continue
await asyncio.sleep(1.5)
# 递归处理子菜单(只处理当前子菜单下的子项)
await self._capture_submenu_items(page, item, output_dir, screenshots, captured_menus, depth + 1, max_depth, processed_items)
else:
# 普通菜单项,检查是否已截图
if text in captured_menus:
self.log(f"菜单 '{text}' 已截图,跳过")
continue
# 点击菜单
self.log(f"点击菜单: {text}")
try:
await item.click(force=True, timeout=3000)
except Exception:
try:
await item.evaluate('el => el.click()')
except Exception:
continue
await asyncio.sleep(self.config.screenshot_delay / 1000)
# 截图
screenshot_path = os.path.join(output_dir, f'{text}管理.png')
await page.screenshot(path=screenshot_path, full_page=True)
screenshots.append(screenshot_path)
captured_menus.add(text)
self.log(f"截图已保存: {screenshot_path}")
except Exception as e:
self.log(f"菜单处理失败: {str(e)}")
continue
async def _capture_submenu_items(self, page, parent_item, output_dir: str, screenshots: List[str], captured_menus: set, depth: int, max_depth: int, processed_items: set):
"""
处理子菜单项只在父元素下查找
Args:
page: Playwright page 对象
parent_item: 父菜单元素
output_dir: 截图输出目录
screenshots: 截图路径列表
captured_menus: 已截图的菜单名称集合
depth: 当前递归深度
max_depth: 最大递归深度
processed_items: 已处理的菜单项文本集合
"""
if depth > max_depth:
return
# 在父元素下查找子菜单项
submenu_selectors = [
':scope > ul.el-menu--inline > li',
':scope > ul > li',
]
submenu_items = []
for selector in submenu_selectors:
try:
items = await parent_item.query_selector_all(selector)
if items:
submenu_items = items
self.log(f"在子菜单下找到 {len(items)} 个子项")
break
except Exception:
continue
for item in submenu_items:
try:
# 获取菜单文本
text = await item.text_content()
if not text:
continue
text = text.strip().replace('/', '_').replace('\\', '_')
if not text or len(text) > 50:
continue
# 检查是否已经处理过
if text in processed_items:
self.log(f"子菜单 '{text}' 已处理过,跳过")
continue
# 检查元素是否可见
is_visible = await item.is_visible()
if not is_visible:
continue
# 标记为已处理
processed_items.add(text)
# 检查是否有子菜单
try:
is_submenu = await item.evaluate('el => el.classList.contains("el-sub-menu")')
except Exception:
is_submenu = False
has_children = False
try:
children = await item.query_selector_all(':scope > ul.el-menu--inline, :scope > ul.el-menu')
has_children = len(children) > 0
except Exception:
pass
self.log(f"处理子菜单: {text}, is_submenu={is_submenu}, has_children={has_children}")
if is_submenu or has_children:
# 点击展开子菜单
self.log(f"点击展开子菜单: {text}")
try:
await item.click(force=True, timeout=3000)
except Exception:
try:
await item.evaluate('el => el.click()')
except Exception:
continue
await asyncio.sleep(1.5)
# 递归处理子菜单
await self._capture_submenu_items(page, item, output_dir, screenshots, captured_menus, depth + 1, max_depth, processed_items)
else:
# 普通菜单项,检查是否已截图
if text in captured_menus:
self.log(f"菜单 '{text}' 已截图,跳过")
continue
# 点击菜单
self.log(f"点击菜单: {text}")
try:
await item.click(force=True, timeout=3000)
except Exception:
try:
await item.evaluate('el => el.click()')
except Exception:
continue
await asyncio.sleep(self.config.screenshot_delay / 1000)
# 截图
screenshot_path = os.path.join(output_dir, f'{text}管理.png')
await page.screenshot(path=screenshot_path, full_page=True)
screenshots.append(screenshot_path)
captured_menus.add(text)
self.log(f"截图已保存: {screenshot_path}")
except Exception as e:
self.log(f"子菜单处理失败: {str(e)}")
continue
async def _capture_admin_screenshots(self, context, output_dir: str) -> List[str]:
"""截取后台页面"""
screenshots = []
page = await context.new_page()
try:
# 访问登录页面
self.log(f"访问后台登录页面: {self.config.admin_login_url}")
await page.goto(self.config.admin_login_url, wait_until='networkidle')
await asyncio.sleep(2)
# 截取登录页面
login_screenshot_path = os.path.join(output_dir, '后台登录.png')
await page.screenshot(path=login_screenshot_path, full_page=True)
screenshots.append(login_screenshot_path)
self.log(f"登录页面截图已保存: {login_screenshot_path}")
# 输入用户名密码
self.log(f"登录后台: {self.config.admin_username}")
# 用户名输入框选择器(按优先级顺序)
username_selectors = [
"xpath=//div[contains(text(), '用户名:')]/following-sibling::input",
"xpath=//input[@placeholder='请输入账号']",
"input[placeholder='请输入用户名']",
"input[name='username']",
"#username",
"input[type='text']"
]
# 密码输入框选择器(按优先级顺序)
password_selectors = [
"xpath=//div[contains(text(), '密码:')]/following-sibling::input",
"xpath=//input[@placeholder='请输入密码']",
"input[placeholder='请输入密码']",
"input[name='password']",
"#password",
"input[type='password']"
]
# 登录按钮选择器
submit_selectors = [
"xpath=//button[.//span[text()='登录']]",
"button[type='submit']",
".login-btn",
".el-button--primary",
"button:has-text('登录')"
]
# 输入用户名
username_filled = False
for selector in username_selectors:
try:
await page.fill(selector, self.config.admin_username, timeout=2000)
self.log(f"用户名输入框定位成功: {selector}")
username_filled = True
break
except Exception:
continue
if not username_filled:
self.log("警告:未能找到用户名输入框")
# 输入密码
password_filled = False
for selector in password_selectors:
try:
await page.fill(selector, self.config.admin_password, timeout=2000)
self.log(f"密码输入框定位成功: {selector}")
password_filled = True
break
except Exception:
continue
if not password_filled:
self.log("警告:未能找到密码输入框")
# 点击登录
login_clicked = False
for selector in submit_selectors:
try:
await page.click(selector, timeout=2000)
self.log(f"登录按钮定位成功: {selector}")
login_clicked = True
break
except Exception:
continue
if not login_clicked:
self.log("警告:未能找到登录按钮")
# 等待登录成功
await asyncio.sleep(3)
# 访问后台首页
self.log(f"访问后台首页: {self.config.admin_url}")
await page.goto(self.config.admin_url, wait_until='networkidle')
await asyncio.sleep(self.config.screenshot_delay / 1000)
# 截取首页
screenshot_path = os.path.join(output_dir, '后台首页.png')
await page.screenshot(path=screenshot_path, full_page=True)
screenshots.append(screenshot_path)
self.log(f"截图已保存: {screenshot_path}")
# 识别菜单并截图
try:
# 用于记录已截图的菜单名称,避免重复
captured_menus = set()
# 递归截图菜单
await self._capture_menu_recursive(page, output_dir, screenshots, captured_menus, 0)
except Exception as e:
self.log(f"菜单识别失败: {str(e)}")
finally:
await page.close()
return screenshots
async def _capture_front_screenshots(self, context, output_dir: str) -> List[str]:
"""截取前台页面"""
screenshots = []
page = await context.new_page()
try:
# 访问登录页面
self.log(f"访问前台登录页面: {self.config.front_login_url}")
await page.goto(self.config.front_login_url, wait_until='networkidle')
await asyncio.sleep(2)
# 输入用户名密码
self.log(f"登录前台: {self.config.front_username}")
# 用户名输入框选择器(按优先级顺序)
username_selectors = [
"xpath=//div[contains(text(), '账号:')]/following-sibling::input",
"xpath=//input[@placeholder='请输入账号']",
"input[placeholder='请输入用户名']",
"input[name='username']",
"#username",
"input[type='text']"
]
# 密码输入框选择器(按优先级顺序)
password_selectors = [
"xpath=//div[contains(text(), '密码:')]/following-sibling::input",
"xpath=//input[@placeholder='请输入密码']",
"input[placeholder='请输入密码']",
"input[name='password']",
"#password",
"input[type='password']"
]
# 登录按钮选择器
submit_selectors = [
"xpath=//button[.//span[text()='登录']]",
"button[type='submit']",
".login-btn",
".el-button--primary",
"button:has-text('登录')"
]
# 输入用户名
username_filled = False
for selector in username_selectors:
try:
await page.fill(selector, self.config.front_username, timeout=2000)
self.log(f"用户名输入框定位成功: {selector}")
username_filled = True
break
except Exception:
continue
if not username_filled:
self.log("警告:未能找到用户名输入框")
# 输入密码
password_filled = False
for selector in password_selectors:
try:
await page.fill(selector, self.config.front_password, timeout=2000)
self.log(f"密码输入框定位成功: {selector}")
password_filled = True
break
except Exception:
continue
if not password_filled:
self.log("警告:未能找到密码输入框")
# 点击登录
login_clicked = False
for selector in submit_selectors:
try:
await page.click(selector, timeout=2000)
self.log(f"登录按钮定位成功: {selector}")
login_clicked = True
break
except Exception:
continue
if not login_clicked:
self.log("警告:未能找到登录按钮")
# 等待登录成功
await asyncio.sleep(3)
# 访问前台首页 http://localhost:8082
front_home_url = "http://localhost:8082"
self.log(f"访问前台首页: {front_home_url}")
await page.goto(front_home_url, wait_until='networkidle')
await asyncio.sleep(self.config.screenshot_delay / 1000)
# 识别菜单并截图
try:
# 使用 xpath 查找 el-menu 下的菜单项
menu_selector = "xpath=//ul[contains(@class, 'el-menu')]//li"
menu_items = await page.query_selector_all(menu_selector)
self.log(f"发现 {len(menu_items)} 个菜单项")
# 用于记录已截图的菜单名称,避免重复
captured_front_menus = set()
for i, item in enumerate(menu_items[:15]): # 最多截图15个菜单
try:
# 获取菜单文本
text = await item.text_content()
if not text:
continue
text = text.strip().replace('/', '_').replace('\\', '_')
if not text or len(text) > 50:
continue
# 检查是否已截图
if text in captured_front_menus:
self.log(f"菜单 '{text}' 已截图,跳过")
continue
# 检查元素是否可见
is_visible = await item.is_visible()
if not is_visible:
continue
# 点击菜单
self.log(f"点击菜单: {text}")
try:
await item.click(force=True, timeout=3000)
except Exception:
try:
await item.evaluate('el => el.click()')
except Exception:
continue
await asyncio.sleep(self.config.screenshot_delay / 1000)
# 截图,文件名就是菜单名称
screenshot_path = os.path.join(output_dir, f'{text}.png')
await page.screenshot(path=screenshot_path, full_page=True)
screenshots.append(screenshot_path)
captured_front_menus.add(text)
self.log(f"截图已保存: {screenshot_path}")
except Exception as e:
self.log(f"菜单截图失败: {str(e)}")
continue
except Exception as e:
self.log(f"菜单识别失败: {str(e)}")
finally:
await page.close()
return screenshots
def finalize_screenshots(self, folder_path: str):
"""
截图收尾工作按创建时间排序并重命名截图文件
Args:
folder_path: 截图文件夹路径
"""
try:
self.log("=" * 50)
self.log("开始执行截图收尾工作...")
# 获取文件夹中所有文件
files = os.listdir(folder_path)
# 筛选出图片文件
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']
image_files = [f for f in files if os.path.splitext(f)[1].lower() in image_extensions]
if not image_files:
self.log("未找到图片文件")
return
self.log(f"找到 {len(image_files)} 个图片文件")
# 根据创建时间排序图片文件
image_files.sort(key=lambda f: os.path.getctime(os.path.join(folder_path, f)))
# 开始重命名
renamed_count = 0
for index, filename in enumerate(image_files, start=1):
# 获取文件扩展名
ext = os.path.splitext(filename)[1]
file_new_name = os.path.splitext(filename)[0]
# 构建新文件名
new_name = f"{index}_{file_new_name}{ext}"
# 构建完整旧路径和新路径
old_path = os.path.join(folder_path, filename)
new_path = os.path.join(folder_path, new_name)
# 重命名文件
try:
os.rename(old_path, new_path)
self.log(f"重命名: {filename} -> {new_name}")
renamed_count += 1
except Exception as e:
self.log(f"无法重命名 {filename}: {e}")
self.log(f"截图收尾工作完成,共重命名 {renamed_count} 个文件")
except Exception as e:
self.log(f"截图收尾工作失败: {str(e)}")
def organize_and_zip_project(self, override_name: Optional[str] = None) -> Optional[str]:
"""
整理项目并打包成压缩包
将桌面路径下的code文件夹下的第一个子文件夹名称改成项目名称并打包
Args:
override_name: 如果提供则使用此名称作为项目名称否则从路径提取
Returns:
压缩包路径
"""
try:
self.log("=" * 50)
self.log("开始整理项目并打包...")
# 1. 确定项目名称
if override_name:
project_name = override_name
self.log(f"使用提供的项目名称: {project_name}")
elif not self.config.project_path:
self.log("未配置项目路径,且未提供覆盖名称,无法提取项目名称")
return None
else:
project_name = os.path.basename(self.config.project_path.rstrip('/\\'))
self.log(f"从路径提取项目名称: {project_name}")
# 去掉前后的空格
project_name = project_name.strip()
# 移除非法文件名字符
project_name = re.sub(r'[\\/:*?"<>|]', '_', project_name)
self.log(f"最终使用的项目名称: {project_name}")
# 2. 找到 code 文件夹
code_dir = os.path.join(self.config.desktop_path, "code")
if not os.path.exists(code_dir):
self.log(f"未找到 code 文件夹: {code_dir}")
return None
# 3. 找到 code 下的第一个子文件夹
subdirs = [d for d in os.listdir(code_dir) if os.path.isdir(os.path.join(code_dir, d))]
if not subdirs:
self.log(f"code 文件夹下没有子文件夹: {code_dir}")
return None
# 排除掉一些可能的非项目文件夹
# subdirs = [d for d in subdirs if d not in ['.git', '__pycache__']]
first_subdir = subdirs[0]
first_subdir_path = os.path.join(code_dir, first_subdir)
# 4. 重命名
new_subdir_path = os.path.join(code_dir, project_name)
# 如果新路径和旧路径不同,则重命名
if first_subdir != project_name:
if os.path.exists(new_subdir_path):
self.log(f"目标文件夹已存在,删除旧文件夹: {new_subdir_path}")
shutil.rmtree(new_subdir_path)
try:
os.rename(first_subdir_path, new_subdir_path)
self.log(f"重命名文件夹: {first_subdir} -> {project_name}")
except Exception as e:
self.log(f"重命名失败: {str(e)}")
return None
else:
self.log(f"文件夹已经是项目名称,无需重命名: {first_subdir}")
# 5. 打包成压缩包
zip_target_path = os.path.join(self.config.desktop_path, project_name)
self.log(f"开始打包到桌面: {project_name}.zip")
# make_archive(base_name, format, root_dir, base_dir)
# base_name 是输出文件的路径(不含后缀)
# root_dir 是要打包的根目录
# base_dir 是相对于 root_dir 的要打包的目录(包含在压缩包内的顶级目录)
try:
zip_file = shutil.make_archive(
base_name=zip_target_path,
format='zip',
root_dir=code_dir,
base_dir=project_name
)
self.log(f"打包完成: {zip_file}")
return zip_file
except Exception as e:
self.log(f"打包失败: {str(e)}")
return None
except Exception as e:
self.log(f"整理打包失败: {str(e)}")
return None
2026-04-10 14:37:23 +08:00
def run_full_flow(self) -> bool:
"""
执行完整流程按照按钮排序顺序
Returns:
是否执行成功
"""
try:
self.log("=" * 50)
self.log("开始执行项目截图完整流程")
self.log("=" * 50)
# 1. 初始化变量和准备工作
self.log("\n[1/11] 初始化变量和准备工作...")
if not os.path.exists(self.config.project_path):
self.log(f"错误:项目路径不存在: {self.config.project_path}")
return False
# 截图保存到桌面路径下的"截图"文件夹
output_dir = os.path.join(self.config.desktop_path, "截图")
os.makedirs(output_dir, exist_ok=True)
self.log(f"截图输出目录: {output_dir}")
# 2. 整理项目代码
self.log("\n[2/11] 整理项目代码...")
sql_filename = self.organize_project_code()
if sql_filename:
self.log(f"代码整理完成SQL文件名: {sql_filename}")
else:
self.log("警告:代码整理可能未完全成功,继续执行...")
# 3. 安装后端依赖 (Maven构建)
self.log("\n[3/11] Maven 构建后端...")
success, msg = self.install_server()
if not success:
self.log(f"警告Maven 构建失败: {msg}")
else:
self.log("Maven 构建完成")
# 4. 安装前端依赖
self.log("\n[4/11] 安装前端依赖...")
self.log("安装后台前端依赖...")
success, msg = self.install_admin()
if not success:
self.log(f"警告:后台依赖安装失败: {msg}")
if self.config.has_front:
self.log("安装前台前端依赖...")
success, msg = self.install_front()
if not success:
self.log(f"警告:前台依赖安装失败: {msg}")
# 5. 启动后端服务
self.log("\n[5/11] 启动后端服务...")
backend_process = self.start_backend()
if not self.wait_for_service(
self.config.backend_url,
self.config.backend_startup_timeout
):
self.log("警告:后端服务启动超时,继续执行...")
# 6. 启动后台前端
self.log("\n[6/11] 启动后台前端...")
admin_process = self.start_admin()
if not self.wait_for_service(self.config.admin_url, self.config.admin_startup_timeout):
self.log("警告:后台前端启动超时,继续执行...")
# 7. 启动前台前端
if self.config.has_front:
self.log("\n[7/11] 启动前台前端...")
front_process = self.start_front()
if not self.wait_for_service(self.config.front_url, self.config.front_startup_timeout):
self.log("警告:前台前端启动超时,继续执行...")
# 8. 更换图片
self.log("\n[8/11] 更换图片...")
# 更换前台轮播图片
self.log("更换前台轮播图片...")
self.replace_swiper_images()
# 更换后台登录背景图
self.log("更换后台登录背景图...")
self.update_login_vue_background()
# 9. 导入 SQL
self.log("\n[9/11] 导入 SQL...")
success, msg = self.run_sql_script()
if not success:
self.log(f"警告SQL 导入失败: {msg}")
else:
self.log("SQL 导入成功")
# 10. 截图
self.log("\n[10/11] 开始截图...")
try:
screenshots = asyncio.run(self.capture_screenshots_with_playwright(output_dir))
self.log(f"共生成 {len(screenshots)} 张截图")
except Exception as e:
self.log(f"截图失败: {str(e)}")
# 11. 收尾工作
self.log("\n[11/11] 执行收尾工作...")
self.finalize_screenshots(output_dir)
self.log("完整流程执行完成!")
self.log("=" * 50)
return True
except Exception as e:
self.log(f"\n[ERROR] 流程执行失败: {str(e)}")
return False
if __name__ == "__main__":
# 测试代码
config = ProjectConfig(
project_path=r"D:\new_project",
desktop_path=r"C:\Users\Administrator\Desktop",
has_front=True
)
automation = ProjectScreenshotAutomation(config)
automation.run_full_flow()