1612 lines
64 KiB
Python
1612 lines
64 KiB
Python
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import re
|
|||
|
|
import glob
|
|||
|
|
import random
|
|||
|
|
import shutil
|
|||
|
|
import subprocess
|
|||
|
|
import time
|
|||
|
|
import asyncio
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Optional, Callable, List, Tuple
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
|
|||
|
|
# 预设后台登录背景图片列表
|
|||
|
|
LOGIN_BACKGROUND_IMAGES = [
|
|||
|
|
"http://img.yidaima.cn/7-dcd4fc1e8d0144aaa064bc282d1af289",
|
|||
|
|
"http://img.yidaima.cn/6-3db67e2e6ff64e018d48282638900a67",
|
|||
|
|
"http://img.yidaima.cn/5-74ee6fe2f0954a8db2574e8f7fbb8ef2",
|
|||
|
|
"http://img.yidaima.cn/4-a926593ba8c444a3984100e99ae322bb",
|
|||
|
|
"http://img.yidaima.cn/3-f91e9a0ed55d45c9aff80a08f554c625",
|
|||
|
|
"http://img.yidaima.cn/2-2bd41cdd57b54b529dc3a139e66233e0",
|
|||
|
|
"http://img.yidaima.cn/1-5a0b7ee2344a46bd987c1cafff19b3ba",
|
|||
|
|
"http://img.yidaima.cn/8-495b9767a2f34197b963a13226d3e1d2",
|
|||
|
|
"http://img.yidaima.cn/10-991483f941914f3d85bf87a4f8748fee",
|
|||
|
|
"http://img.yidaima.cn/11-d274f1d695f7472a8fcd2c8c6238a79e",
|
|||
|
|
"http://img.yidaima.cn/14-0e30653336d849379060ad8c436fe512",
|
|||
|
|
"http://img.yidaima.cn/16-7a58235a2a4040f99e3431c3cac9ea7a",
|
|||
|
|
"http://img.yidaima.cn/17-3406a546b81c412d9e3501cdb0f88b12",
|
|||
|
|
"http://img.yidaima.cn/18-0a2fae1d6634480dabf5471fa86b3623",
|
|||
|
|
"http://img.yidaima.cn/19-107196938e074c92a40b62c60aed18a4"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class ProjectConfig:
|
|||
|
|
"""项目配置数据类"""
|
|||
|
|
project_path: str
|
|||
|
|
has_front: bool = True
|
|||
|
|
desktop_path: str = ""
|
|||
|
|
|
|||
|
|
# 脚本路径(相对于 bat 文件夹或绝对路径)
|
|||
|
|
bat_folder: str = "new" # bat 文件所在文件夹(相对于桌面路径)
|
|||
|
|
install_server: str = "run_install_server.bat"
|
|||
|
|
install_front: str = "run_install_front.bat"
|
|||
|
|
install_admin: str = "run_install_admin.bat"
|
|||
|
|
run_server: str = "run_server.bat"
|
|||
|
|
run_front: str = "run_front.bat"
|
|||
|
|
run_admin: str = "run_admin.bat"
|
|||
|
|
run_sql: str = "run_sql.bat"
|
|||
|
|
|
|||
|
|
# SQL 配置
|
|||
|
|
sql_script_path: str = "server/db/init.sql"
|
|||
|
|
|
|||
|
|
# 服务配置
|
|||
|
|
backend_url: str = "http://localhost:8080"
|
|||
|
|
backend_startup_timeout: int = 30
|
|||
|
|
|
|||
|
|
front_url: str = "http://localhost:8082"
|
|||
|
|
front_login_url: str = "http://localhost:8082/#/login"
|
|||
|
|
front_startup_timeout: int = 60
|
|||
|
|
|
|||
|
|
admin_url: str = "http://localhost:8081"
|
|||
|
|
admin_login_url: str = "http://localhost:8081/#/login"
|
|||
|
|
admin_startup_timeout: int = 60
|
|||
|
|
|
|||
|
|
# 登录配置
|
|||
|
|
admin_username: str = "admin"
|
|||
|
|
admin_password: str = "admin"
|
|||
|
|
front_username: str = "2"
|
|||
|
|
front_password: str = "123456"
|
|||
|
|
|
|||
|
|
# 截图配置
|
|||
|
|
screenshot_output_dir: str = "./screenshots"
|
|||
|
|
screenshot_delay: int = 2000
|
|||
|
|
admin_menu_selector: str = ".el-menu-item, .el-sub-menu__title"
|
|||
|
|
front_menu_selector: str = ".nav-item, .menu-item"
|
|||
|
|
|
|||
|
|
# 轮播图片配置
|
|||
|
|
swiper_source_folder: str = "bg_pic"
|
|||
|
|
swiper_target_subpath: str = "src/main/resources/static/file"
|
|||
|
|
swiper_target_names: List[str] = None
|
|||
|
|
swiper_count: int = 3
|
|||
|
|
|
|||
|
|
# 登录背景图配置
|
|||
|
|
login_vue_subpath: str = "src/views/login.vue"
|
|||
|
|
login_background_images: List[str] = None
|
|||
|
|
|
|||
|
|
# 代码整理配置
|
|||
|
|
code_source_folder: str = "code" # 桌面上的 code 文件夹名称
|
|||
|
|
code_target_path: str = r"D:\code" # 整理代码的目标目录(默认 D:\code)
|
|||
|
|
|
|||
|
|
# 执行选项
|
|||
|
|
show_cmd_window: bool = True # 是否显示 CMD 窗口
|
|||
|
|
|
|||
|
|
def __post_init__(self):
|
|||
|
|
if self.swiper_target_names is None:
|
|||
|
|
self.swiper_target_names = ["swiperPicture1.jpg", "swiperPicture2.jpg", "swiperPicture3.jpg"]
|
|||
|
|
if self.login_background_images is None:
|
|||
|
|
self.login_background_images = LOGIN_BACKGROUND_IMAGES.copy()
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ProjectScreenshotAutomation:
|
|||
|
|
"""项目截图自动化类"""
|
|||
|
|
|
|||
|
|
def __init__(self, config: ProjectConfig, log_callback: Optional[Callable[[str], None]] = None):
|
|||
|
|
self.config = config
|
|||
|
|
self.log_callback = log_callback or print
|
|||
|
|
self.processes: List[subprocess.Popen] = []
|
|||
|
|
|
|||
|
|
def log(self, message: str):
|
|||
|
|
"""输出日志"""
|
|||
|
|
self.log_callback(message)
|
|||
|
|
|
|||
|
|
def execute_bat(self, bat_path: str, cwd: str = None, timeout: int = 300) -> Tuple[bool, str]:
|
|||
|
|
"""
|
|||
|
|
执行 bat 脚本并返回结果
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
bat_path: bat 文件路径
|
|||
|
|
cwd: 工作目录
|
|||
|
|
timeout: 超时时间(秒)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
(success, message)
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
if not os.path.exists(bat_path):
|
|||
|
|
return False, f"脚本文件不存在: {bat_path}"
|
|||
|
|
|
|||
|
|
self.log(f"执行脚本: {bat_path}")
|
|||
|
|
self.log(f"项目路径参数: {self.config.project_path}")
|
|||
|
|
|
|||
|
|
# 根据配置决定是否显示 CMD 窗口
|
|||
|
|
startupinfo = subprocess.STARTUPINFO()
|
|||
|
|
creationflags = 0
|
|||
|
|
|
|||
|
|
if self.config.show_cmd_window:
|
|||
|
|
# 显示 CMD 窗口
|
|||
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
|||
|
|
startupinfo.wShowWindow = 1 # SW_SHOWNORMAL
|
|||
|
|
creationflags = 0x00000010 # CREATE_NEW_CONSOLE
|
|||
|
|
self.log("CMD 窗口: 显示")
|
|||
|
|
else:
|
|||
|
|
# 隐藏 CMD 窗口
|
|||
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
|||
|
|
startupinfo.wShowWindow = 0 # SW_HIDE
|
|||
|
|
self.log("CMD 窗口: 隐藏")
|
|||
|
|
|
|||
|
|
# 构建命令,传入项目路径作为参数
|
|||
|
|
cmd = f'"{bat_path}" "{self.config.project_path}"'
|
|||
|
|
|
|||
|
|
process = subprocess.Popen(
|
|||
|
|
cmd,
|
|||
|
|
cwd=cwd,
|
|||
|
|
shell=True,
|
|||
|
|
stdout=subprocess.PIPE,
|
|||
|
|
stderr=subprocess.PIPE,
|
|||
|
|
startupinfo=startupinfo,
|
|||
|
|
creationflags=creationflags
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
self.processes.append(process)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
stdout_bytes, stderr_bytes = process.communicate(timeout=timeout)
|
|||
|
|
# 尝试多种编码解码输出
|
|||
|
|
stdout = self._decode_output(stdout_bytes)
|
|||
|
|
stderr = self._decode_output(stderr_bytes)
|
|||
|
|
|
|||
|
|
if process.returncode == 0:
|
|||
|
|
return True, stdout
|
|||
|
|
else:
|
|||
|
|
return False, f"脚本执行失败: {stderr}"
|
|||
|
|
except subprocess.TimeoutExpired:
|
|||
|
|
process.kill()
|
|||
|
|
return False, f"脚本执行超时({timeout}秒)"
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
return False, f"执行脚本出错: {str(e)}"
|
|||
|
|
|
|||
|
|
def _decode_output(self, data: bytes) -> str:
|
|||
|
|
"""尝试多种编码解码字节数据"""
|
|||
|
|
if not data:
|
|||
|
|
return ""
|
|||
|
|
|
|||
|
|
# 尝试的编码列表(GBK/GB2312/CP936 优先,因为 bat 脚本使用 chcp 936)
|
|||
|
|
encodings = ['gbk', 'gb2312', 'cp936', 'utf-8', 'latin-1']
|
|||
|
|
|
|||
|
|
for encoding in encodings:
|
|||
|
|
try:
|
|||
|
|
return data.decode(encoding, errors='replace')
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 如果都失败,使用 latin-1 解码(不会丢失数据)
|
|||
|
|
return data.decode('latin-1', errors='replace')
|
|||
|
|
|
|||
|
|
def clear_directory(self, path: str):
|
|||
|
|
"""删除目录下的所有内容"""
|
|||
|
|
if os.path.exists(path):
|
|||
|
|
for item in os.listdir(path):
|
|||
|
|
item_path = os.path.join(path, item)
|
|||
|
|
if os.path.isfile(item_path) or os.path.islink(item_path):
|
|||
|
|
os.unlink(item_path)
|
|||
|
|
elif os.path.isdir(item_path):
|
|||
|
|
shutil.rmtree(item_path)
|
|||
|
|
self.log(f"已清空目录: {path}")
|
|||
|
|
else:
|
|||
|
|
self.log(f"目录不存在: {path}")
|
|||
|
|
|
|||
|
|
def get_first_subdirectory(self, path: str) -> Optional[str]:
|
|||
|
|
"""获取指定目录下的第一个子文件夹"""
|
|||
|
|
if os.path.exists(path):
|
|||
|
|
for item in os.listdir(path):
|
|||
|
|
item_path = os.path.join(path, item)
|
|||
|
|
if os.path.isdir(item_path):
|
|||
|
|
return item_path
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def copy_directory(self, src: str, dst: str):
|
|||
|
|
"""复制目录及其内容"""
|
|||
|
|
if os.path.exists(src):
|
|||
|
|
shutil.copytree(src, dst, dirs_exist_ok=True)
|
|||
|
|
self.log(f"已复制 {src} 到 {dst}")
|
|||
|
|
else:
|
|||
|
|
self.log(f"源目录不存在: {src}")
|
|||
|
|
|
|||
|
|
def get_filename_without_extension(self, file_path: str) -> str:
|
|||
|
|
"""获取文件名(不带后缀)"""
|
|||
|
|
return os.path.splitext(os.path.basename(file_path))[0]
|
|||
|
|
|
|||
|
|
def organize_project_code(self) -> Optional[str]:
|
|||
|
|
"""
|
|||
|
|
整理项目代码
|
|||
|
|
|
|||
|
|
步骤:
|
|||
|
|
1. 清空桌面 code 文件夹
|
|||
|
|
2. 把项目路径的代码复制到桌面 code 文件夹
|
|||
|
|
3. 在桌面 code 文件夹中找前后端代码和 db
|
|||
|
|
4. 放到代码目标路径下的 \front、\server、\db 文件夹
|
|||
|
|
5. 返回 SQL 目录下的第一个文件名(不带后缀)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
第一个 SQL 文件名(不带后缀),失败返回 None
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
self.log("开始整理项目代码...")
|
|||
|
|
|
|||
|
|
# 步骤 1: 清空桌面 code 文件夹和截图文件夹
|
|||
|
|
desktop_code_path = os.path.join(self.config.desktop_path, self.config.code_source_folder)
|
|||
|
|
screenshot_path = os.path.join(self.config.desktop_path, "截图")
|
|||
|
|
|
|||
|
|
self.log(f"步骤1: 清空桌面 code 文件夹 {desktop_code_path}...")
|
|||
|
|
self.clear_directory(desktop_code_path)
|
|||
|
|
os.makedirs(desktop_code_path, exist_ok=True)
|
|||
|
|
|
|||
|
|
self.log(f"步骤1: 清空桌面截图文件夹 {screenshot_path}...")
|
|||
|
|
self.clear_directory(screenshot_path)
|
|||
|
|
os.makedirs(screenshot_path, exist_ok=True)
|
|||
|
|
|
|||
|
|
# 步骤 2: 把项目路径的代码复制到桌面 code 文件夹
|
|||
|
|
self.log("\n步骤2: 复制项目代码到桌面 code 文件夹...")
|
|||
|
|
project_source_path = os.path.join(self.config.project_path, "源码")
|
|||
|
|
|
|||
|
|
if os.path.exists(project_source_path):
|
|||
|
|
self.log(f"从 {project_source_path} 复制代码到 {desktop_code_path}")
|
|||
|
|
self.copy_directory(project_source_path, desktop_code_path)
|
|||
|
|
else:
|
|||
|
|
self.log(f"警告:项目源码文件夹不存在: {project_source_path}")
|
|||
|
|
self.log("将直接使用桌面 code 文件夹中的现有代码")
|
|||
|
|
|
|||
|
|
# 步骤 3: 在桌面 code 文件夹中找第一个子文件夹
|
|||
|
|
self.log("\n步骤3: 查找项目子目录...")
|
|||
|
|
first_subdir = self.get_first_subdirectory(desktop_code_path)
|
|||
|
|
|
|||
|
|
if not first_subdir:
|
|||
|
|
self.log("未找到桌面 code 文件夹下的子目录")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
self.log(f"找到的子目录: {first_subdir}")
|
|||
|
|
|
|||
|
|
# 步骤 4: 清空并创建代码目标路径下的目录
|
|||
|
|
self.log(f"\n步骤4: 准备代码目标路径 {self.config.code_target_path}...")
|
|||
|
|
target_dirs = [
|
|||
|
|
os.path.join(self.config.code_target_path, "front"),
|
|||
|
|
os.path.join(self.config.code_target_path, "server"),
|
|||
|
|
os.path.join(self.config.code_target_path, "db"),
|
|||
|
|
]
|
|||
|
|
for dir_path in target_dirs:
|
|||
|
|
self.clear_directory(dir_path)
|
|||
|
|
os.makedirs(dir_path, exist_ok=True)
|
|||
|
|
|
|||
|
|
# 步骤 5: 复制 client_code 和 manage_code 到代码目标路径的 front
|
|||
|
|
self.log("\n步骤5: 复制前端代码...")
|
|||
|
|
front_target = os.path.join(self.config.code_target_path, "front")
|
|||
|
|
for folder in ["client_code", "manage_code"]:
|
|||
|
|
src = os.path.join(first_subdir, folder)
|
|||
|
|
dst = os.path.join(front_target, folder)
|
|||
|
|
self.copy_directory(src, dst)
|
|||
|
|
|
|||
|
|
# 步骤 6: 复制 server_code 到代码目标路径的 server
|
|||
|
|
self.log("\n步骤6: 复制后端代码...")
|
|||
|
|
server_src = os.path.join(first_subdir, "server_code")
|
|||
|
|
server_dst = os.path.join(self.config.code_target_path, "server", "server_code")
|
|||
|
|
self.copy_directory(server_src, server_dst)
|
|||
|
|
|
|||
|
|
# 步骤 7: 复制 sql 文件到代码目标路径的 db
|
|||
|
|
self.log("\n步骤7: 复制 SQL 文件...")
|
|||
|
|
sql_src = os.path.join(first_subdir, "server_code", "sql")
|
|||
|
|
db_target = os.path.join(self.config.code_target_path, "db")
|
|||
|
|
first_sql_filename = None
|
|||
|
|
|
|||
|
|
if os.path.exists(sql_src):
|
|||
|
|
for item in os.listdir(sql_src):
|
|||
|
|
item_src = os.path.join(sql_src, item)
|
|||
|
|
item_dst = os.path.join(db_target, item)
|
|||
|
|
if os.path.isfile(item_src):
|
|||
|
|
shutil.copy2(item_src, item_dst)
|
|||
|
|
self.log(f"已复制文件 {item} 到 {db_target}")
|
|||
|
|
if first_sql_filename is None:
|
|||
|
|
first_sql_filename = self.get_filename_without_extension(item)
|
|||
|
|
elif os.path.isdir(item_src):
|
|||
|
|
shutil.copytree(item_src, item_dst, dirs_exist_ok=True)
|
|||
|
|
self.log(f"已复制目录 {item} 到 {db_target}")
|
|||
|
|
else:
|
|||
|
|
self.log(f"SQL 目录不存在: {sql_src}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# 步骤 8: 返回 SQL 目录下的第一个文件名(不带后缀)
|
|||
|
|
self.log("\n步骤8: 代码整理完成!")
|
|||
|
|
if first_sql_filename:
|
|||
|
|
self.log(f"SQL 文件名(不带后缀): {first_sql_filename}")
|
|||
|
|
return first_sql_filename
|
|||
|
|
else:
|
|||
|
|
self.log("SQL 目录中没有文件,无法获取文件名")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"整理项目代码出错: {str(e)}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def execute_bat_async(self, bat_path: str, cwd: str = None) -> subprocess.Popen:
|
|||
|
|
"""
|
|||
|
|
异步执行 bat 脚本(用于启动服务)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
bat_path: bat 文件路径
|
|||
|
|
cwd: 工作目录
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
subprocess.Popen 对象
|
|||
|
|
"""
|
|||
|
|
if not os.path.exists(bat_path):
|
|||
|
|
raise FileNotFoundError(f"脚本文件不存在: {bat_path}")
|
|||
|
|
|
|||
|
|
self.log(f"异步执行脚本: {bat_path}")
|
|||
|
|
self.log(f"项目路径参数: {self.config.project_path}")
|
|||
|
|
|
|||
|
|
# 根据配置决定是否显示 CMD 窗口
|
|||
|
|
startupinfo = subprocess.STARTUPINFO()
|
|||
|
|
creationflags = 0
|
|||
|
|
|
|||
|
|
if self.config.show_cmd_window:
|
|||
|
|
# 显示 CMD 窗口
|
|||
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
|||
|
|
startupinfo.wShowWindow = 1 # SW_SHOWNORMAL
|
|||
|
|
creationflags = 0x00000010 # CREATE_NEW_CONSOLE
|
|||
|
|
self.log("CMD 窗口: 显示")
|
|||
|
|
else:
|
|||
|
|
# 隐藏 CMD 窗口
|
|||
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
|||
|
|
startupinfo.wShowWindow = 0 # SW_HIDE
|
|||
|
|
self.log("CMD 窗口: 隐藏")
|
|||
|
|
|
|||
|
|
# 构建命令,传入项目路径作为参数
|
|||
|
|
cmd = f'"{bat_path}" "{self.config.project_path}"'
|
|||
|
|
|
|||
|
|
# 对于启动服务的场景,不捕获输出,让用户可以直接在 CMD 窗口中看到输出并交互
|
|||
|
|
if self.config.show_cmd_window:
|
|||
|
|
# 显示 CMD 窗口,不捕获输出
|
|||
|
|
# 使用 cmd /c 启动 bat 文件,确保 CMD 窗口正确显示
|
|||
|
|
full_cmd = f'cmd /c "{cmd}"'
|
|||
|
|
process = subprocess.Popen(
|
|||
|
|
full_cmd,
|
|||
|
|
cwd=cwd,
|
|||
|
|
shell=False,
|
|||
|
|
startupinfo=startupinfo,
|
|||
|
|
creationflags=creationflags
|
|||
|
|
)
|
|||
|
|
else:
|
|||
|
|
# 隐藏 CMD 窗口,捕获输出
|
|||
|
|
process = subprocess.Popen(
|
|||
|
|
cmd,
|
|||
|
|
cwd=cwd,
|
|||
|
|
shell=True,
|
|||
|
|
stdout=subprocess.PIPE,
|
|||
|
|
stderr=subprocess.PIPE,
|
|||
|
|
startupinfo=startupinfo,
|
|||
|
|
creationflags=creationflags
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
self.processes.append(process)
|
|||
|
|
return process
|
|||
|
|
|
|||
|
|
def wait_for_service(self, url: str, timeout: int = 120) -> bool:
|
|||
|
|
"""
|
|||
|
|
等待服务启动完成
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
url: 服务健康检查 URL
|
|||
|
|
timeout: 超时时间(秒)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
是否启动成功
|
|||
|
|
"""
|
|||
|
|
import urllib.request
|
|||
|
|
import urllib.error
|
|||
|
|
|
|||
|
|
self.log(f"等待服务启动: {url} (超时时间: {timeout}秒)")
|
|||
|
|
start_time = time.time()
|
|||
|
|
check_count = 0
|
|||
|
|
|
|||
|
|
while time.time() - start_time < timeout:
|
|||
|
|
check_count += 1
|
|||
|
|
elapsed = int(time.time() - start_time)
|
|||
|
|
|
|||
|
|
# 每10秒输出一次日志
|
|||
|
|
if check_count % 5 == 0:
|
|||
|
|
self.log(f" 已等待 {elapsed} 秒,继续检查...")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
req = urllib.request.Request(url, method='GET')
|
|||
|
|
req.add_header('User-Agent', 'Mozilla/5.0')
|
|||
|
|
with urllib.request.urlopen(req, timeout=5) as response:
|
|||
|
|
if response.status == 200:
|
|||
|
|
self.log(f"服务已启动: {url} (共等待 {elapsed} 秒)")
|
|||
|
|
return True
|
|||
|
|
except urllib.error.HTTPError as e:
|
|||
|
|
# 某些服务返回 401/403 也表示服务已启动
|
|||
|
|
if e.code in [401, 403]:
|
|||
|
|
self.log(f"服务已启动(需要认证): {url} (共等待 {elapsed} 秒)")
|
|||
|
|
return True
|
|||
|
|
except Exception as e:
|
|||
|
|
# 每20秒输出一次错误信息
|
|||
|
|
if check_count % 10 == 0:
|
|||
|
|
self.log(f" 检查失败: {str(e)[:50]}")
|
|||
|
|
|
|||
|
|
time.sleep(2)
|
|||
|
|
|
|||
|
|
self.log(f"服务启动超时: {url} (超时时间: {timeout}秒)")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def import_sql(self, sql_file: str, db_config: dict = None) -> Tuple[bool, str]:
|
|||
|
|
"""
|
|||
|
|
导入 SQL 脚本到数据库
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
sql_file: SQL 文件路径
|
|||
|
|
db_config: 数据库配置
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
(success, message)
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
if not os.path.exists(sql_file):
|
|||
|
|
return False, f"SQL 文件不存在: {sql_file}"
|
|||
|
|
|
|||
|
|
self.log(f"导入 SQL 文件: {sql_file}")
|
|||
|
|
|
|||
|
|
# 使用 mysql 命令行导入
|
|||
|
|
if db_config:
|
|||
|
|
host = db_config.get('host', 'localhost')
|
|||
|
|
port = db_config.get('port', 3306)
|
|||
|
|
user = db_config.get('user', 'root')
|
|||
|
|
password = db_config.get('password', '')
|
|||
|
|
database = db_config.get('database', '')
|
|||
|
|
|
|||
|
|
cmd = f'mysql -h {host} -P {port} -u {user}'
|
|||
|
|
if password:
|
|||
|
|
cmd += f' -p{password}'
|
|||
|
|
if database:
|
|||
|
|
cmd += f' {database}'
|
|||
|
|
cmd += f' < "{sql_file}"'
|
|||
|
|
|
|||
|
|
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
|||
|
|
if result.returncode != 0:
|
|||
|
|
return False, f"SQL 导入失败: {result.stderr}"
|
|||
|
|
|
|||
|
|
self.log("SQL 导入成功")
|
|||
|
|
|
|||
|
|
# 执行额外的 SQL 语句
|
|||
|
|
self.log("执行额外的 SQL 更新操作...")
|
|||
|
|
self._execute_post_import_sql(host, port, user, password, database)
|
|||
|
|
|
|||
|
|
return True, "SQL 导入成功"
|
|||
|
|
else:
|
|||
|
|
# 如果没有数据库配置,尝试执行 run_sql.bat
|
|||
|
|
bat_path = os.path.join(self.config.project_path, self.config.run_sql)
|
|||
|
|
return self.execute_bat(bat_path, cwd=self.config.project_path)
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
return False, f"导入 SQL 出错: {str(e)}"
|
|||
|
|
|
|||
|
|
def _execute_post_import_sql(self, host: str, port: int, user: str, password: str, database: str):
|
|||
|
|
"""
|
|||
|
|
导入 SQL 后执行的额外操作
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
host: 数据库主机
|
|||
|
|
port: 数据库端口
|
|||
|
|
user: 用户名
|
|||
|
|
password: 密码
|
|||
|
|
database: 数据库名
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
import pymysql
|
|||
|
|
|
|||
|
|
# 连接数据库
|
|||
|
|
conn = pymysql.connect(
|
|||
|
|
host=host,
|
|||
|
|
port=port,
|
|||
|
|
user=user,
|
|||
|
|
password=password,
|
|||
|
|
database=database,
|
|||
|
|
charset='utf8mb4'
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
with conn.cursor() as cursor:
|
|||
|
|
# 查询包含 ming 或 hao 的列
|
|||
|
|
query = """
|
|||
|
|
SELECT TABLE_NAME, COLUMN_NAME
|
|||
|
|
FROM INFORMATION_SCHEMA.COLUMNS
|
|||
|
|
WHERE (COLUMN_NAME LIKE '%ming%' OR COLUMN_NAME LIKE '%hao%')
|
|||
|
|
AND TABLE_SCHEMA = %s
|
|||
|
|
"""
|
|||
|
|
cursor.execute(query, (database,))
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
|
|||
|
|
self.log(f"找到 {len(results)} 个匹配的列")
|
|||
|
|
|
|||
|
|
# 执行更新
|
|||
|
|
for table_name, column_name in results:
|
|||
|
|
update_sql = f"UPDATE `{table_name}` SET `{column_name}` = '2' WHERE `{column_name}` LIKE '%2%'"
|
|||
|
|
try:
|
|||
|
|
cursor.execute(update_sql)
|
|||
|
|
affected_rows = cursor.rowcount
|
|||
|
|
conn.commit()
|
|||
|
|
self.log(f"更新 {table_name}.{column_name}: 影响 {affected_rows} 行")
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"更新 {table_name}.{column_name} 失败: {str(e)}")
|
|||
|
|
conn.rollback()
|
|||
|
|
|
|||
|
|
conn.close()
|
|||
|
|
self.log("额外 SQL 操作执行完成")
|
|||
|
|
|
|||
|
|
except ImportError:
|
|||
|
|
self.log("警告:未安装 pymysql,跳过额外 SQL 操作")
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"执行额外 SQL 操作失败: {str(e)}")
|
|||
|
|
|
|||
|
|
def get_random_images(self, bg_pic_folder: str, count: int = 3) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
从指定文件夹随机获取指定数量的图片文件
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
bg_pic_folder: 图片文件夹路径
|
|||
|
|
count: 需要获取的图片数量
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
随机选择的图片文件路径列表
|
|||
|
|
"""
|
|||
|
|
image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.gif',
|
|||
|
|
'*.JPG', '*.JPEG', '*.PNG', '*.BMP', '*.GIF']
|
|||
|
|
|
|||
|
|
all_images = []
|
|||
|
|
for extension in image_extensions:
|
|||
|
|
pattern = os.path.join(bg_pic_folder, extension)
|
|||
|
|
all_images.extend(glob.glob(pattern))
|
|||
|
|
|
|||
|
|
if len(all_images) < count:
|
|||
|
|
self.log(f"警告:文件夹中只有 {len(all_images)} 张图片,少于需要的 {count} 张")
|
|||
|
|
return all_images
|
|||
|
|
|
|||
|
|
selected_images = random.sample(all_images, count)
|
|||
|
|
self.log(f"从 {len(all_images)} 张图片中随机选择了 {len(selected_images)} 张:")
|
|||
|
|
for i, img in enumerate(selected_images, 1):
|
|||
|
|
self.log(f" {i}. {os.path.basename(img)}")
|
|||
|
|
|
|||
|
|
return selected_images
|
|||
|
|
|
|||
|
|
def copy_images_to_target(self, source_images: List[str], target_folder: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
将源图片复制到目标文件夹并重命名
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
source_images: 源图片文件路径列表
|
|||
|
|
target_folder: 目标文件夹路径
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
是否复制成功
|
|||
|
|
"""
|
|||
|
|
os.makedirs(target_folder, exist_ok=True)
|
|||
|
|
|
|||
|
|
for i, source_image in enumerate(source_images):
|
|||
|
|
if i >= len(self.config.swiper_target_names):
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
target_path = os.path.join(target_folder, self.config.swiper_target_names[i])
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
shutil.copy2(source_image, target_path)
|
|||
|
|
self.log(f"成功复制: {os.path.basename(source_image)} -> {self.config.swiper_target_names[i]}")
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"复制失败: {os.path.basename(source_image)} -> {self.config.swiper_target_names[i]}, 错误: {str(e)}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
def replace_swiper_images(self) -> str:
|
|||
|
|
"""
|
|||
|
|
更换前台轮播图片
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
执行结果信息
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
bg_pic_folder = os.path.join(self.config.desktop_path, self.config.swiper_source_folder)
|
|||
|
|
# 使用代码目标路径下的 server/server_code 目录
|
|||
|
|
target_folder = os.path.join(self.config.code_target_path, "server", "server_code", "src", "main", "resources", "static", "file")
|
|||
|
|
|
|||
|
|
self.log(f"源文件夹: {bg_pic_folder}")
|
|||
|
|
self.log(f"目标文件夹: {target_folder}")
|
|||
|
|
|
|||
|
|
if not os.path.exists(bg_pic_folder):
|
|||
|
|
error_msg = f"错误:源文件夹不存在: {bg_pic_folder}"
|
|||
|
|
self.log(error_msg)
|
|||
|
|
return error_msg
|
|||
|
|
|
|||
|
|
selected_images = self.get_random_images(bg_pic_folder, self.config.swiper_count)
|
|||
|
|
|
|||
|
|
if not selected_images:
|
|||
|
|
error_msg = "错误:源文件夹中没有找到图片文件"
|
|||
|
|
self.log(error_msg)
|
|||
|
|
return error_msg
|
|||
|
|
|
|||
|
|
success = self.copy_images_to_target(selected_images, target_folder)
|
|||
|
|
|
|||
|
|
if success:
|
|||
|
|
result_msg = f"成功!已随机选择 {len(selected_images)} 张图片并覆盖到目标位置"
|
|||
|
|
self.log(result_msg)
|
|||
|
|
return result_msg
|
|||
|
|
else:
|
|||
|
|
error_msg = "复制过程中出现错误"
|
|||
|
|
self.log(error_msg)
|
|||
|
|
return error_msg
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
error_msg = f"程序执行出错: {str(e)}"
|
|||
|
|
self.log(error_msg)
|
|||
|
|
return error_msg
|
|||
|
|
|
|||
|
|
def update_login_vue_background(self) -> bool:
|
|||
|
|
"""
|
|||
|
|
修改 login.vue 文件中的背景图片链接
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
是否更新成功
|
|||
|
|
"""
|
|||
|
|
# 使用代码目标路径下的 front/manage_code 目录
|
|||
|
|
file_path = os.path.join(self.config.code_target_path, "front", "manage_code", "src", "views", "login.vue")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
if not os.path.exists(file_path):
|
|||
|
|
self.log(f"错误:文件 {file_path} 不存在")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|||
|
|
content = file.read()
|
|||
|
|
|
|||
|
|
self.log(f"成功读取文件:{file_path}")
|
|||
|
|
|
|||
|
|
selected_image = random.choice(self.config.login_background_images)
|
|||
|
|
self.log(f"随机选择的图片:{selected_image}")
|
|||
|
|
|
|||
|
|
# 使用正则表达式匹配并替换背景图片URL
|
|||
|
|
pattern = r'background-image:\s*url\([^)]+\)'
|
|||
|
|
|
|||
|
|
if not re.search(pattern, content):
|
|||
|
|
self.log("警告:未找到匹配的背景图片URL模式")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
new_content = re.sub(pattern, f'background-image: url({selected_image})', content)
|
|||
|
|
|
|||
|
|
with open(file_path, 'w', encoding='utf-8') as file:
|
|||
|
|
file.write(new_content)
|
|||
|
|
|
|||
|
|
self.log(f"成功更新背景图片为:{selected_image}")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"处理文件时出错:{str(e)}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def _get_bat_path(self, bat_name: str) -> str:
|
|||
|
|
"""获取 bat 文件的完整路径(优先从桌面 new 文件夹查找)"""
|
|||
|
|
# 首先检查桌面 new 文件夹
|
|||
|
|
desktop_bat_path = os.path.join(self.config.desktop_path, self.config.bat_folder, bat_name)
|
|||
|
|
if os.path.exists(desktop_bat_path):
|
|||
|
|
return desktop_bat_path
|
|||
|
|
|
|||
|
|
# 其次检查项目路径
|
|||
|
|
project_bat_path = os.path.join(self.config.project_path, bat_name)
|
|||
|
|
if os.path.exists(project_bat_path):
|
|||
|
|
return project_bat_path
|
|||
|
|
|
|||
|
|
# 默认返回桌面 new 文件夹路径(即使不存在,让调用方处理错误)
|
|||
|
|
return desktop_bat_path
|
|||
|
|
|
|||
|
|
def install_server(self) -> Tuple[bool, str]:
|
|||
|
|
"""Maven 构建后端"""
|
|||
|
|
bat_path = self._get_bat_path(self.config.install_server)
|
|||
|
|
return self.execute_bat(bat_path, cwd=self.config.project_path, timeout=600)
|
|||
|
|
|
|||
|
|
def install_front(self) -> Tuple[bool, str]:
|
|||
|
|
"""安装前台前端依赖"""
|
|||
|
|
bat_path = self._get_bat_path(self.config.install_front)
|
|||
|
|
return self.execute_bat(bat_path, cwd=self.config.project_path, timeout=300)
|
|||
|
|
|
|||
|
|
def install_admin(self) -> Tuple[bool, str]:
|
|||
|
|
"""安装后台前端依赖"""
|
|||
|
|
bat_path = self._get_bat_path(self.config.install_admin)
|
|||
|
|
return self.execute_bat(bat_path, cwd=self.config.project_path, timeout=300)
|
|||
|
|
|
|||
|
|
def run_sql_script(self) -> Tuple[bool, str]:
|
|||
|
|
"""导入 SQL 脚本"""
|
|||
|
|
# SQL 脚本优先从桌面 new 文件夹查找
|
|||
|
|
desktop_sql_path = os.path.join(self.config.desktop_path, self.config.bat_folder, self.config.run_sql)
|
|||
|
|
if os.path.exists(desktop_sql_path):
|
|||
|
|
# 执行 bat 文件导入 SQL
|
|||
|
|
success, msg = self.execute_bat(desktop_sql_path, cwd=self.config.project_path, timeout=300)
|
|||
|
|
return success, msg
|
|||
|
|
|
|||
|
|
# 其次检查项目路径下的 SQL 脚本
|
|||
|
|
sql_file = os.path.join(self.config.project_path, self.config.sql_script_path)
|
|||
|
|
# 尝试从 SQL 文件中提取数据库名
|
|||
|
|
database_name = self._extract_database_from_sql(sql_file)
|
|||
|
|
if database_name:
|
|||
|
|
db_config = {
|
|||
|
|
'host': 'localhost',
|
|||
|
|
'port': 3306,
|
|||
|
|
'user': 'root',
|
|||
|
|
'password': '',
|
|||
|
|
'database': database_name
|
|||
|
|
}
|
|||
|
|
return self.import_sql(sql_file, db_config)
|
|||
|
|
else:
|
|||
|
|
return self.import_sql(sql_file)
|
|||
|
|
|
|||
|
|
def _extract_database_from_sql(self, sql_file: str) -> str:
|
|||
|
|
"""
|
|||
|
|
从 SQL 文件中提取数据库名
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
sql_file: SQL 文件路径
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
数据库名,如果无法提取则返回空字符串
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
if not os.path.exists(sql_file):
|
|||
|
|
return ""
|
|||
|
|
|
|||
|
|
with open(sql_file, 'r', encoding='utf-8') as f:
|
|||
|
|
content = f.read()
|
|||
|
|
|
|||
|
|
# 尝试匹配 USE database; 或 CREATE DATABASE database; 等语句
|
|||
|
|
import re
|
|||
|
|
patterns = [
|
|||
|
|
r"USE\s+`?(\w+)`?\s*;",
|
|||
|
|
r"CREATE\s+DATABASE\s+(?:IF\s+NOT\s+EXISTS\s+)?`?(\w+)`?",
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
for pattern in patterns:
|
|||
|
|
match = re.search(pattern, content, re.IGNORECASE)
|
|||
|
|
if match:
|
|||
|
|
database_name = match.group(1)
|
|||
|
|
self.log(f"从 SQL 文件中提取到数据库名: {database_name}")
|
|||
|
|
return database_name
|
|||
|
|
|
|||
|
|
return ""
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"提取数据库名失败: {str(e)}")
|
|||
|
|
return ""
|
|||
|
|
|
|||
|
|
def start_backend(self) -> subprocess.Popen:
|
|||
|
|
"""启动后端服务"""
|
|||
|
|
bat_path = self._get_bat_path(self.config.run_server)
|
|||
|
|
return self.execute_bat_async(bat_path, cwd=self.config.project_path)
|
|||
|
|
|
|||
|
|
def start_front(self) -> subprocess.Popen:
|
|||
|
|
"""启动前台前端"""
|
|||
|
|
bat_path = self._get_bat_path(self.config.run_front)
|
|||
|
|
return self.execute_bat_async(bat_path, cwd=self.config.project_path)
|
|||
|
|
|
|||
|
|
def start_admin(self) -> subprocess.Popen:
|
|||
|
|
"""启动后台前端"""
|
|||
|
|
bat_path = self._get_bat_path(self.config.run_admin)
|
|||
|
|
return self.execute_bat_async(bat_path, cwd=self.config.project_path)
|
|||
|
|
|
|||
|
|
def stop_all_processes(self):
|
|||
|
|
"""停止所有启动的进程"""
|
|||
|
|
self.log("停止所有服务进程...")
|
|||
|
|
for process in self.processes:
|
|||
|
|
try:
|
|||
|
|
process.terminate()
|
|||
|
|
process.wait(timeout=5)
|
|||
|
|
except Exception:
|
|||
|
|
try:
|
|||
|
|
process.kill()
|
|||
|
|
except Exception:
|
|||
|
|
pass
|
|||
|
|
self.processes.clear()
|
|||
|
|
|
|||
|
|
async def capture_screenshots_with_playwright(self, output_dir: str) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
使用 Playwright 进行截图(同时截取前后台)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
output_dir: 截图输出目录
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
截图文件路径列表
|
|||
|
|
"""
|
|||
|
|
from playwright.async_api import async_playwright
|
|||
|
|
|
|||
|
|
screenshots = []
|
|||
|
|
os.makedirs(output_dir, exist_ok=True)
|
|||
|
|
|
|||
|
|
async with async_playwright() as p:
|
|||
|
|
browser = await p.chromium.launch(headless=False)
|
|||
|
|
context = await browser.new_context(viewport={'width': 1920, 'height': 1080})
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 后台截图
|
|||
|
|
self.log("开始后台截图...")
|
|||
|
|
admin_screenshots = await self._capture_admin_screenshots(context, output_dir)
|
|||
|
|
screenshots.extend(admin_screenshots)
|
|||
|
|
|
|||
|
|
# 前台截图(如果有)
|
|||
|
|
if self.config.has_front:
|
|||
|
|
self.log("开始前台截图...")
|
|||
|
|
front_screenshots = await self._capture_front_screenshots(context, output_dir)
|
|||
|
|
screenshots.extend(front_screenshots)
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
await context.close()
|
|||
|
|
await browser.close()
|
|||
|
|
|
|||
|
|
return screenshots
|
|||
|
|
|
|||
|
|
async def capture_admin_screenshots_only(self, output_dir: str) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
仅截取后台页面
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
output_dir: 截图输出目录
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
截图文件路径列表
|
|||
|
|
"""
|
|||
|
|
from playwright.async_api import async_playwright
|
|||
|
|
|
|||
|
|
screenshots = []
|
|||
|
|
os.makedirs(output_dir, exist_ok=True)
|
|||
|
|
|
|||
|
|
async with async_playwright() as p:
|
|||
|
|
browser = await p.chromium.launch(headless=False)
|
|||
|
|
context = await browser.new_context(viewport={'width': 1920, 'height': 1080})
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
self.log("开始后台截图...")
|
|||
|
|
admin_screenshots = await self._capture_admin_screenshots(context, output_dir)
|
|||
|
|
screenshots.extend(admin_screenshots)
|
|||
|
|
finally:
|
|||
|
|
await context.close()
|
|||
|
|
await browser.close()
|
|||
|
|
|
|||
|
|
return screenshots
|
|||
|
|
|
|||
|
|
async def capture_front_screenshots_only(self, output_dir: str) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
仅截取前台页面
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
output_dir: 截图输出目录
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
截图文件路径列表
|
|||
|
|
"""
|
|||
|
|
from playwright.async_api import async_playwright
|
|||
|
|
|
|||
|
|
screenshots = []
|
|||
|
|
os.makedirs(output_dir, exist_ok=True)
|
|||
|
|
|
|||
|
|
async with async_playwright() as p:
|
|||
|
|
browser = await p.chromium.launch(headless=False)
|
|||
|
|
context = await browser.new_context(viewport={'width': 1920, 'height': 1080})
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
self.log("开始前台截图...")
|
|||
|
|
front_screenshots = await self._capture_front_screenshots(context, output_dir)
|
|||
|
|
screenshots.extend(front_screenshots)
|
|||
|
|
finally:
|
|||
|
|
await context.close()
|
|||
|
|
await browser.close()
|
|||
|
|
|
|||
|
|
return screenshots
|
|||
|
|
|
|||
|
|
async def _capture_menu_recursive(self, page, output_dir: str, screenshots: List[str], captured_menus: set, depth: int, max_depth: int = 3, processed_items: set = None):
|
|||
|
|
"""
|
|||
|
|
递归截图菜单
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
page: Playwright page 对象
|
|||
|
|
output_dir: 截图输出目录
|
|||
|
|
screenshots: 截图路径列表
|
|||
|
|
captured_menus: 已截图的菜单名称集合
|
|||
|
|
depth: 当前递归深度
|
|||
|
|
max_depth: 最大递归深度
|
|||
|
|
processed_items: 已处理的菜单项文本集合(避免重复处理)
|
|||
|
|
"""
|
|||
|
|
if depth > max_depth:
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
if processed_items is None:
|
|||
|
|
processed_items = set()
|
|||
|
|
|
|||
|
|
# 查找所有可见的菜单项(包括子菜单和普通菜单项)
|
|||
|
|
# 使用更通用的选择器获取所有 el-menu 下的 li 元素
|
|||
|
|
menu_selectors = [
|
|||
|
|
"xpath=//ul[contains(@class, 'el-menu')]//li",
|
|||
|
|
self.config.admin_menu_selector
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
menu_items = []
|
|||
|
|
for selector in menu_selectors:
|
|||
|
|
try:
|
|||
|
|
items = await page.query_selector_all(selector)
|
|||
|
|
if items:
|
|||
|
|
menu_items = items
|
|||
|
|
self.log(f"使用选择器定位菜单成功: {selector}, 找到 {len(items)} 个")
|
|||
|
|
break
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"选择器 {selector} 失败: {str(e)}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
self.log(f"第 {depth} 层发现 {len(menu_items)} 个菜单项")
|
|||
|
|
|
|||
|
|
for i, item in enumerate(menu_items):
|
|||
|
|
try:
|
|||
|
|
# 获取菜单文本
|
|||
|
|
text = await item.text_content()
|
|||
|
|
if not text:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
text = text.strip().replace('/', '_').replace('\\', '_')
|
|||
|
|
if not text or len(text) > 50:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 检查是否已经处理过这个菜单项
|
|||
|
|
if text in processed_items:
|
|||
|
|
self.log(f"菜单 '{text}' 已处理过,跳过")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 检查元素是否可见
|
|||
|
|
is_visible = await item.is_visible()
|
|||
|
|
if not is_visible:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 标记为已处理
|
|||
|
|
processed_items.add(text)
|
|||
|
|
|
|||
|
|
# 检查是否是子菜单(有 el-sub-menu 类)
|
|||
|
|
try:
|
|||
|
|
is_submenu = await item.evaluate('el => el.classList.contains("el-sub-menu")')
|
|||
|
|
except Exception:
|
|||
|
|
is_submenu = False
|
|||
|
|
|
|||
|
|
# 检查是否有子菜单项(ul 元素)
|
|||
|
|
has_children = False
|
|||
|
|
try:
|
|||
|
|
children = await item.query_selector_all(':scope > ul.el-menu--inline, :scope > ul.el-menu')
|
|||
|
|
has_children = len(children) > 0
|
|||
|
|
except Exception:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
self.log(f"处理菜单: {text}, is_submenu={is_submenu}, has_children={has_children}")
|
|||
|
|
|
|||
|
|
if is_submenu or has_children:
|
|||
|
|
# 点击展开子菜单
|
|||
|
|
self.log(f"点击展开子菜单: {text}")
|
|||
|
|
try:
|
|||
|
|
await item.click(force=True, timeout=3000)
|
|||
|
|
except Exception:
|
|||
|
|
try:
|
|||
|
|
await item.evaluate('el => el.click()')
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
await asyncio.sleep(1.5)
|
|||
|
|
|
|||
|
|
# 递归处理子菜单(只处理当前子菜单下的子项)
|
|||
|
|
await self._capture_submenu_items(page, item, output_dir, screenshots, captured_menus, depth + 1, max_depth, processed_items)
|
|||
|
|
else:
|
|||
|
|
# 普通菜单项,检查是否已截图
|
|||
|
|
if text in captured_menus:
|
|||
|
|
self.log(f"菜单 '{text}' 已截图,跳过")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 点击菜单
|
|||
|
|
self.log(f"点击菜单: {text}")
|
|||
|
|
try:
|
|||
|
|
await item.click(force=True, timeout=3000)
|
|||
|
|
except Exception:
|
|||
|
|
try:
|
|||
|
|
await item.evaluate('el => el.click()')
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
await asyncio.sleep(self.config.screenshot_delay / 1000)
|
|||
|
|
|
|||
|
|
# 截图
|
|||
|
|
screenshot_path = os.path.join(output_dir, f'{text}管理.png')
|
|||
|
|
await page.screenshot(path=screenshot_path, full_page=True)
|
|||
|
|
screenshots.append(screenshot_path)
|
|||
|
|
captured_menus.add(text)
|
|||
|
|
self.log(f"截图已保存: {screenshot_path}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"菜单处理失败: {str(e)}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
async def _capture_submenu_items(self, page, parent_item, output_dir: str, screenshots: List[str], captured_menus: set, depth: int, max_depth: int, processed_items: set):
|
|||
|
|
"""
|
|||
|
|
处理子菜单项(只在父元素下查找)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
page: Playwright page 对象
|
|||
|
|
parent_item: 父菜单元素
|
|||
|
|
output_dir: 截图输出目录
|
|||
|
|
screenshots: 截图路径列表
|
|||
|
|
captured_menus: 已截图的菜单名称集合
|
|||
|
|
depth: 当前递归深度
|
|||
|
|
max_depth: 最大递归深度
|
|||
|
|
processed_items: 已处理的菜单项文本集合
|
|||
|
|
"""
|
|||
|
|
if depth > max_depth:
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 在父元素下查找子菜单项
|
|||
|
|
submenu_selectors = [
|
|||
|
|
':scope > ul.el-menu--inline > li',
|
|||
|
|
':scope > ul > li',
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
submenu_items = []
|
|||
|
|
for selector in submenu_selectors:
|
|||
|
|
try:
|
|||
|
|
items = await parent_item.query_selector_all(selector)
|
|||
|
|
if items:
|
|||
|
|
submenu_items = items
|
|||
|
|
self.log(f"在子菜单下找到 {len(items)} 个子项")
|
|||
|
|
break
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
for item in submenu_items:
|
|||
|
|
try:
|
|||
|
|
# 获取菜单文本
|
|||
|
|
text = await item.text_content()
|
|||
|
|
if not text:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
text = text.strip().replace('/', '_').replace('\\', '_')
|
|||
|
|
if not text or len(text) > 50:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 检查是否已经处理过
|
|||
|
|
if text in processed_items:
|
|||
|
|
self.log(f"子菜单 '{text}' 已处理过,跳过")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 检查元素是否可见
|
|||
|
|
is_visible = await item.is_visible()
|
|||
|
|
if not is_visible:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 标记为已处理
|
|||
|
|
processed_items.add(text)
|
|||
|
|
|
|||
|
|
# 检查是否有子菜单
|
|||
|
|
try:
|
|||
|
|
is_submenu = await item.evaluate('el => el.classList.contains("el-sub-menu")')
|
|||
|
|
except Exception:
|
|||
|
|
is_submenu = False
|
|||
|
|
|
|||
|
|
has_children = False
|
|||
|
|
try:
|
|||
|
|
children = await item.query_selector_all(':scope > ul.el-menu--inline, :scope > ul.el-menu')
|
|||
|
|
has_children = len(children) > 0
|
|||
|
|
except Exception:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
self.log(f"处理子菜单: {text}, is_submenu={is_submenu}, has_children={has_children}")
|
|||
|
|
|
|||
|
|
if is_submenu or has_children:
|
|||
|
|
# 点击展开子菜单
|
|||
|
|
self.log(f"点击展开子菜单: {text}")
|
|||
|
|
try:
|
|||
|
|
await item.click(force=True, timeout=3000)
|
|||
|
|
except Exception:
|
|||
|
|
try:
|
|||
|
|
await item.evaluate('el => el.click()')
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
await asyncio.sleep(1.5)
|
|||
|
|
|
|||
|
|
# 递归处理子菜单
|
|||
|
|
await self._capture_submenu_items(page, item, output_dir, screenshots, captured_menus, depth + 1, max_depth, processed_items)
|
|||
|
|
else:
|
|||
|
|
# 普通菜单项,检查是否已截图
|
|||
|
|
if text in captured_menus:
|
|||
|
|
self.log(f"菜单 '{text}' 已截图,跳过")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 点击菜单
|
|||
|
|
self.log(f"点击菜单: {text}")
|
|||
|
|
try:
|
|||
|
|
await item.click(force=True, timeout=3000)
|
|||
|
|
except Exception:
|
|||
|
|
try:
|
|||
|
|
await item.evaluate('el => el.click()')
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
await asyncio.sleep(self.config.screenshot_delay / 1000)
|
|||
|
|
|
|||
|
|
# 截图
|
|||
|
|
screenshot_path = os.path.join(output_dir, f'{text}管理.png')
|
|||
|
|
await page.screenshot(path=screenshot_path, full_page=True)
|
|||
|
|
screenshots.append(screenshot_path)
|
|||
|
|
captured_menus.add(text)
|
|||
|
|
self.log(f"截图已保存: {screenshot_path}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"子菜单处理失败: {str(e)}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
async def _capture_admin_screenshots(self, context, output_dir: str) -> List[str]:
|
|||
|
|
"""截取后台页面"""
|
|||
|
|
screenshots = []
|
|||
|
|
page = await context.new_page()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 访问登录页面
|
|||
|
|
self.log(f"访问后台登录页面: {self.config.admin_login_url}")
|
|||
|
|
await page.goto(self.config.admin_login_url, wait_until='networkidle')
|
|||
|
|
await asyncio.sleep(2)
|
|||
|
|
|
|||
|
|
# 截取登录页面
|
|||
|
|
login_screenshot_path = os.path.join(output_dir, '后台登录.png')
|
|||
|
|
await page.screenshot(path=login_screenshot_path, full_page=True)
|
|||
|
|
screenshots.append(login_screenshot_path)
|
|||
|
|
self.log(f"登录页面截图已保存: {login_screenshot_path}")
|
|||
|
|
|
|||
|
|
# 输入用户名密码
|
|||
|
|
self.log(f"登录后台: {self.config.admin_username}")
|
|||
|
|
|
|||
|
|
# 用户名输入框选择器(按优先级顺序)
|
|||
|
|
username_selectors = [
|
|||
|
|
"xpath=//div[contains(text(), '用户名:')]/following-sibling::input",
|
|||
|
|
"xpath=//input[@placeholder='请输入账号']",
|
|||
|
|
"input[placeholder='请输入用户名']",
|
|||
|
|
"input[name='username']",
|
|||
|
|
"#username",
|
|||
|
|
"input[type='text']"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 密码输入框选择器(按优先级顺序)
|
|||
|
|
password_selectors = [
|
|||
|
|
"xpath=//div[contains(text(), '密码:')]/following-sibling::input",
|
|||
|
|
"xpath=//input[@placeholder='请输入密码']",
|
|||
|
|
"input[placeholder='请输入密码']",
|
|||
|
|
"input[name='password']",
|
|||
|
|
"#password",
|
|||
|
|
"input[type='password']"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 登录按钮选择器
|
|||
|
|
submit_selectors = [
|
|||
|
|
"xpath=//button[.//span[text()='登录']]",
|
|||
|
|
"button[type='submit']",
|
|||
|
|
".login-btn",
|
|||
|
|
".el-button--primary",
|
|||
|
|
"button:has-text('登录')"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 输入用户名
|
|||
|
|
username_filled = False
|
|||
|
|
for selector in username_selectors:
|
|||
|
|
try:
|
|||
|
|
await page.fill(selector, self.config.admin_username, timeout=2000)
|
|||
|
|
self.log(f"用户名输入框定位成功: {selector}")
|
|||
|
|
username_filled = True
|
|||
|
|
break
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
if not username_filled:
|
|||
|
|
self.log("警告:未能找到用户名输入框")
|
|||
|
|
|
|||
|
|
# 输入密码
|
|||
|
|
password_filled = False
|
|||
|
|
for selector in password_selectors:
|
|||
|
|
try:
|
|||
|
|
await page.fill(selector, self.config.admin_password, timeout=2000)
|
|||
|
|
self.log(f"密码输入框定位成功: {selector}")
|
|||
|
|
password_filled = True
|
|||
|
|
break
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
if not password_filled:
|
|||
|
|
self.log("警告:未能找到密码输入框")
|
|||
|
|
|
|||
|
|
# 点击登录
|
|||
|
|
login_clicked = False
|
|||
|
|
for selector in submit_selectors:
|
|||
|
|
try:
|
|||
|
|
await page.click(selector, timeout=2000)
|
|||
|
|
self.log(f"登录按钮定位成功: {selector}")
|
|||
|
|
login_clicked = True
|
|||
|
|
break
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
if not login_clicked:
|
|||
|
|
self.log("警告:未能找到登录按钮")
|
|||
|
|
|
|||
|
|
# 等待登录成功
|
|||
|
|
await asyncio.sleep(3)
|
|||
|
|
|
|||
|
|
# 访问后台首页
|
|||
|
|
self.log(f"访问后台首页: {self.config.admin_url}")
|
|||
|
|
await page.goto(self.config.admin_url, wait_until='networkidle')
|
|||
|
|
await asyncio.sleep(self.config.screenshot_delay / 1000)
|
|||
|
|
|
|||
|
|
# 截取首页
|
|||
|
|
screenshot_path = os.path.join(output_dir, '后台首页.png')
|
|||
|
|
await page.screenshot(path=screenshot_path, full_page=True)
|
|||
|
|
screenshots.append(screenshot_path)
|
|||
|
|
self.log(f"截图已保存: {screenshot_path}")
|
|||
|
|
|
|||
|
|
# 识别菜单并截图
|
|||
|
|
try:
|
|||
|
|
# 用于记录已截图的菜单名称,避免重复
|
|||
|
|
captured_menus = set()
|
|||
|
|
|
|||
|
|
# 递归截图菜单
|
|||
|
|
await self._capture_menu_recursive(page, output_dir, screenshots, captured_menus, 0)
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"菜单识别失败: {str(e)}")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
await page.close()
|
|||
|
|
|
|||
|
|
return screenshots
|
|||
|
|
|
|||
|
|
async def _capture_front_screenshots(self, context, output_dir: str) -> List[str]:
|
|||
|
|
"""截取前台页面"""
|
|||
|
|
screenshots = []
|
|||
|
|
page = await context.new_page()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 访问登录页面
|
|||
|
|
self.log(f"访问前台登录页面: {self.config.front_login_url}")
|
|||
|
|
await page.goto(self.config.front_login_url, wait_until='networkidle')
|
|||
|
|
await asyncio.sleep(2)
|
|||
|
|
|
|||
|
|
# 输入用户名密码
|
|||
|
|
self.log(f"登录前台: {self.config.front_username}")
|
|||
|
|
|
|||
|
|
# 用户名输入框选择器(按优先级顺序)
|
|||
|
|
username_selectors = [
|
|||
|
|
"xpath=//div[contains(text(), '账号:')]/following-sibling::input",
|
|||
|
|
"xpath=//input[@placeholder='请输入账号']",
|
|||
|
|
"input[placeholder='请输入用户名']",
|
|||
|
|
"input[name='username']",
|
|||
|
|
"#username",
|
|||
|
|
"input[type='text']"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 密码输入框选择器(按优先级顺序)
|
|||
|
|
password_selectors = [
|
|||
|
|
"xpath=//div[contains(text(), '密码:')]/following-sibling::input",
|
|||
|
|
"xpath=//input[@placeholder='请输入密码']",
|
|||
|
|
"input[placeholder='请输入密码']",
|
|||
|
|
"input[name='password']",
|
|||
|
|
"#password",
|
|||
|
|
"input[type='password']"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 登录按钮选择器
|
|||
|
|
submit_selectors = [
|
|||
|
|
"xpath=//button[.//span[text()='登录']]",
|
|||
|
|
"button[type='submit']",
|
|||
|
|
".login-btn",
|
|||
|
|
".el-button--primary",
|
|||
|
|
"button:has-text('登录')"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 输入用户名
|
|||
|
|
username_filled = False
|
|||
|
|
for selector in username_selectors:
|
|||
|
|
try:
|
|||
|
|
await page.fill(selector, self.config.front_username, timeout=2000)
|
|||
|
|
self.log(f"用户名输入框定位成功: {selector}")
|
|||
|
|
username_filled = True
|
|||
|
|
break
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
if not username_filled:
|
|||
|
|
self.log("警告:未能找到用户名输入框")
|
|||
|
|
|
|||
|
|
# 输入密码
|
|||
|
|
password_filled = False
|
|||
|
|
for selector in password_selectors:
|
|||
|
|
try:
|
|||
|
|
await page.fill(selector, self.config.front_password, timeout=2000)
|
|||
|
|
self.log(f"密码输入框定位成功: {selector}")
|
|||
|
|
password_filled = True
|
|||
|
|
break
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
if not password_filled:
|
|||
|
|
self.log("警告:未能找到密码输入框")
|
|||
|
|
|
|||
|
|
# 点击登录
|
|||
|
|
login_clicked = False
|
|||
|
|
for selector in submit_selectors:
|
|||
|
|
try:
|
|||
|
|
await page.click(selector, timeout=2000)
|
|||
|
|
self.log(f"登录按钮定位成功: {selector}")
|
|||
|
|
login_clicked = True
|
|||
|
|
break
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
if not login_clicked:
|
|||
|
|
self.log("警告:未能找到登录按钮")
|
|||
|
|
|
|||
|
|
# 等待登录成功
|
|||
|
|
await asyncio.sleep(3)
|
|||
|
|
|
|||
|
|
# 访问前台首页 http://localhost:8082
|
|||
|
|
front_home_url = "http://localhost:8082"
|
|||
|
|
self.log(f"访问前台首页: {front_home_url}")
|
|||
|
|
await page.goto(front_home_url, wait_until='networkidle')
|
|||
|
|
await asyncio.sleep(self.config.screenshot_delay / 1000)
|
|||
|
|
|
|||
|
|
# 识别菜单并截图
|
|||
|
|
try:
|
|||
|
|
# 使用 xpath 查找 el-menu 下的菜单项
|
|||
|
|
menu_selector = "xpath=//ul[contains(@class, 'el-menu')]//li"
|
|||
|
|
menu_items = await page.query_selector_all(menu_selector)
|
|||
|
|
self.log(f"发现 {len(menu_items)} 个菜单项")
|
|||
|
|
|
|||
|
|
# 用于记录已截图的菜单名称,避免重复
|
|||
|
|
captured_front_menus = set()
|
|||
|
|
|
|||
|
|
for i, item in enumerate(menu_items[:15]): # 最多截图15个菜单
|
|||
|
|
try:
|
|||
|
|
# 获取菜单文本
|
|||
|
|
text = await item.text_content()
|
|||
|
|
if not text:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
text = text.strip().replace('/', '_').replace('\\', '_')
|
|||
|
|
if not text or len(text) > 50:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 检查是否已截图
|
|||
|
|
if text in captured_front_menus:
|
|||
|
|
self.log(f"菜单 '{text}' 已截图,跳过")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 检查元素是否可见
|
|||
|
|
is_visible = await item.is_visible()
|
|||
|
|
if not is_visible:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 点击菜单
|
|||
|
|
self.log(f"点击菜单: {text}")
|
|||
|
|
try:
|
|||
|
|
await item.click(force=True, timeout=3000)
|
|||
|
|
except Exception:
|
|||
|
|
try:
|
|||
|
|
await item.evaluate('el => el.click()')
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
await asyncio.sleep(self.config.screenshot_delay / 1000)
|
|||
|
|
|
|||
|
|
# 截图,文件名就是菜单名称
|
|||
|
|
screenshot_path = os.path.join(output_dir, f'{text}.png')
|
|||
|
|
await page.screenshot(path=screenshot_path, full_page=True)
|
|||
|
|
screenshots.append(screenshot_path)
|
|||
|
|
captured_front_menus.add(text)
|
|||
|
|
self.log(f"截图已保存: {screenshot_path}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"菜单截图失败: {str(e)}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"菜单识别失败: {str(e)}")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
await page.close()
|
|||
|
|
|
|||
|
|
return screenshots
|
|||
|
|
|
|||
|
|
def finalize_screenshots(self, folder_path: str):
|
|||
|
|
"""
|
|||
|
|
截图收尾工作:按创建时间排序并重命名截图文件
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
folder_path: 截图文件夹路径
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
self.log("=" * 50)
|
|||
|
|
self.log("开始执行截图收尾工作...")
|
|||
|
|
|
|||
|
|
# 获取文件夹中所有文件
|
|||
|
|
files = os.listdir(folder_path)
|
|||
|
|
|
|||
|
|
# 筛选出图片文件
|
|||
|
|
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']
|
|||
|
|
image_files = [f for f in files if os.path.splitext(f)[1].lower() in image_extensions]
|
|||
|
|
|
|||
|
|
if not image_files:
|
|||
|
|
self.log("未找到图片文件")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
self.log(f"找到 {len(image_files)} 个图片文件")
|
|||
|
|
|
|||
|
|
# 根据创建时间排序图片文件
|
|||
|
|
image_files.sort(key=lambda f: os.path.getctime(os.path.join(folder_path, f)))
|
|||
|
|
|
|||
|
|
# 开始重命名
|
|||
|
|
renamed_count = 0
|
|||
|
|
for index, filename in enumerate(image_files, start=1):
|
|||
|
|
# 获取文件扩展名
|
|||
|
|
ext = os.path.splitext(filename)[1]
|
|||
|
|
file_new_name = os.path.splitext(filename)[0]
|
|||
|
|
|
|||
|
|
# 构建新文件名
|
|||
|
|
new_name = f"{index}_{file_new_name}{ext}"
|
|||
|
|
|
|||
|
|
# 构建完整旧路径和新路径
|
|||
|
|
old_path = os.path.join(folder_path, filename)
|
|||
|
|
new_path = os.path.join(folder_path, new_name)
|
|||
|
|
|
|||
|
|
# 重命名文件
|
|||
|
|
try:
|
|||
|
|
os.rename(old_path, new_path)
|
|||
|
|
self.log(f"重命名: {filename} -> {new_name}")
|
|||
|
|
renamed_count += 1
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"无法重命名 {filename}: {e}")
|
|||
|
|
|
|||
|
|
self.log(f"截图收尾工作完成,共重命名 {renamed_count} 个文件")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"截图收尾工作失败: {str(e)}")
|
|||
|
|
|
|||
|
|
def run_full_flow(self) -> bool:
|
|||
|
|
"""
|
|||
|
|
执行完整流程(按照按钮排序顺序)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
是否执行成功
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
self.log("=" * 50)
|
|||
|
|
self.log("开始执行项目截图完整流程")
|
|||
|
|
self.log("=" * 50)
|
|||
|
|
|
|||
|
|
# 1. 初始化变量和准备工作
|
|||
|
|
self.log("\n[1/11] 初始化变量和准备工作...")
|
|||
|
|
if not os.path.exists(self.config.project_path):
|
|||
|
|
self.log(f"错误:项目路径不存在: {self.config.project_path}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
# 截图保存到桌面路径下的"截图"文件夹
|
|||
|
|
output_dir = os.path.join(self.config.desktop_path, "截图")
|
|||
|
|
os.makedirs(output_dir, exist_ok=True)
|
|||
|
|
self.log(f"截图输出目录: {output_dir}")
|
|||
|
|
|
|||
|
|
# 2. 整理项目代码
|
|||
|
|
self.log("\n[2/11] 整理项目代码...")
|
|||
|
|
sql_filename = self.organize_project_code()
|
|||
|
|
if sql_filename:
|
|||
|
|
self.log(f"代码整理完成,SQL文件名: {sql_filename}")
|
|||
|
|
else:
|
|||
|
|
self.log("警告:代码整理可能未完全成功,继续执行...")
|
|||
|
|
|
|||
|
|
# 3. 安装后端依赖 (Maven构建)
|
|||
|
|
self.log("\n[3/11] Maven 构建后端...")
|
|||
|
|
success, msg = self.install_server()
|
|||
|
|
if not success:
|
|||
|
|
self.log(f"警告:Maven 构建失败: {msg}")
|
|||
|
|
else:
|
|||
|
|
self.log("Maven 构建完成")
|
|||
|
|
|
|||
|
|
# 4. 安装前端依赖
|
|||
|
|
self.log("\n[4/11] 安装前端依赖...")
|
|||
|
|
|
|||
|
|
self.log("安装后台前端依赖...")
|
|||
|
|
success, msg = self.install_admin()
|
|||
|
|
if not success:
|
|||
|
|
self.log(f"警告:后台依赖安装失败: {msg}")
|
|||
|
|
|
|||
|
|
if self.config.has_front:
|
|||
|
|
self.log("安装前台前端依赖...")
|
|||
|
|
success, msg = self.install_front()
|
|||
|
|
if not success:
|
|||
|
|
self.log(f"警告:前台依赖安装失败: {msg}")
|
|||
|
|
|
|||
|
|
# 5. 启动后端服务
|
|||
|
|
self.log("\n[5/11] 启动后端服务...")
|
|||
|
|
backend_process = self.start_backend()
|
|||
|
|
if not self.wait_for_service(
|
|||
|
|
self.config.backend_url,
|
|||
|
|
self.config.backend_startup_timeout
|
|||
|
|
):
|
|||
|
|
self.log("警告:后端服务启动超时,继续执行...")
|
|||
|
|
|
|||
|
|
# 6. 启动后台前端
|
|||
|
|
self.log("\n[6/11] 启动后台前端...")
|
|||
|
|
admin_process = self.start_admin()
|
|||
|
|
if not self.wait_for_service(self.config.admin_url, self.config.admin_startup_timeout):
|
|||
|
|
self.log("警告:后台前端启动超时,继续执行...")
|
|||
|
|
|
|||
|
|
# 7. 启动前台前端
|
|||
|
|
if self.config.has_front:
|
|||
|
|
self.log("\n[7/11] 启动前台前端...")
|
|||
|
|
front_process = self.start_front()
|
|||
|
|
if not self.wait_for_service(self.config.front_url, self.config.front_startup_timeout):
|
|||
|
|
self.log("警告:前台前端启动超时,继续执行...")
|
|||
|
|
|
|||
|
|
# 8. 更换图片
|
|||
|
|
self.log("\n[8/11] 更换图片...")
|
|||
|
|
|
|||
|
|
# 更换前台轮播图片
|
|||
|
|
self.log("更换前台轮播图片...")
|
|||
|
|
self.replace_swiper_images()
|
|||
|
|
|
|||
|
|
# 更换后台登录背景图
|
|||
|
|
self.log("更换后台登录背景图...")
|
|||
|
|
self.update_login_vue_background()
|
|||
|
|
|
|||
|
|
# 9. 导入 SQL
|
|||
|
|
self.log("\n[9/11] 导入 SQL...")
|
|||
|
|
success, msg = self.run_sql_script()
|
|||
|
|
if not success:
|
|||
|
|
self.log(f"警告:SQL 导入失败: {msg}")
|
|||
|
|
else:
|
|||
|
|
self.log("SQL 导入成功")
|
|||
|
|
|
|||
|
|
# 10. 截图
|
|||
|
|
self.log("\n[10/11] 开始截图...")
|
|||
|
|
try:
|
|||
|
|
screenshots = asyncio.run(self.capture_screenshots_with_playwright(output_dir))
|
|||
|
|
self.log(f"共生成 {len(screenshots)} 张截图")
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"截图失败: {str(e)}")
|
|||
|
|
|
|||
|
|
# 11. 收尾工作
|
|||
|
|
self.log("\n[11/11] 执行收尾工作...")
|
|||
|
|
self.finalize_screenshots(output_dir)
|
|||
|
|
self.log("完整流程执行完成!")
|
|||
|
|
self.log("=" * 50)
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log(f"\n[ERROR] 流程执行失败: {str(e)}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
# 测试代码
|
|||
|
|
config = ProjectConfig(
|
|||
|
|
project_path=r"D:\new_project",
|
|||
|
|
desktop_path=r"C:\Users\Administrator\Desktop",
|
|||
|
|
has_front=True
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
automation = ProjectScreenshotAutomation(config)
|
|||
|
|
automation.run_full_flow()
|