diff --git a/baidu_uploader.py b/baidu_uploader.py index cee1cb3..6d175e1 100644 --- a/baidu_uploader.py +++ b/baidu_uploader.py @@ -1,21 +1,30 @@ from __future__ import annotations import os -import time import re -from typing import Optional, Callable +import time +from typing import Callable, Optional + +from playwright.sync_api import TimeoutError as PlaywrightTimeoutError from playwright.sync_api import sync_playwright + class BaiduUploader: """ 百度网盘上传工具 - 纯净上传版 (移除分享逻辑) """ - def __init__(self, chrome_path: str, cookies_dir: str, log_callback: Optional[Callable[[str], None]] = None): + + def __init__( + self, + chrome_path: str, + cookies_dir: str, + log_callback: Optional[Callable[[str], None]] = None, + ): self.chrome_path = chrome_path self.cookies_dir = cookies_dir self.log_callback = log_callback self.url = "https://pan.baidu.com/" - + if not os.path.exists(self.cookies_dir): os.makedirs(self.cookies_dir, exist_ok=True) @@ -24,7 +33,12 @@ class BaiduUploader: if self.log_callback: self.log_callback(message) - def upload_file(self, file_path: str, target_folder_name: str, root_path: str = "精品项目整理") -> bool: + def upload_file( + self, + file_path: str, + target_folder_name: str, + root_path: str = "精品项目整理", + ) -> bool: """ 上传文件到百度网盘 """ @@ -33,145 +47,778 @@ class BaiduUploader: return False filename = os.path.basename(file_path) - + file_size_mb = os.path.getsize(file_path) / 1024 / 1024 + context = None + # 提取项目编号 project_id = "" - match = re.search(r'(【[A-Za-z0-9]+】)', target_folder_name) + match = re.search(r"(【[A-Za-z0-9]+】)", target_folder_name) if match: project_id = match.group(1) try: with sync_playwright() as p: - self._log(f"正在启动浏览器...") - + self._log("正在启动浏览器...") + launch_args = { "user_data_dir": self.cookies_dir, "headless": False, - "executable_path": self.chrome_path if self.chrome_path and os.path.exists(self.chrome_path) else None, - "viewport": {"width": 1280, "height": 800} + "executable_path": self.chrome_path + if self.chrome_path and os.path.exists(self.chrome_path) + else None, + "viewport": {"width": 1280, "height": 800}, } if not launch_args["executable_path"]: launch_args.pop("executable_path") context = p.chromium.launch_persistent_context(**launch_args) - context.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") - + context.add_init_script( + "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" + ) + page = context.new_page() page.set_default_timeout(60000) - + # 1. 登录 self._log("检测登录状态...") - page.goto(self.url, wait_until="domcontentloaded") - - if not self._wait_for_login(page): - self._log("请手动扫码登录...") - page.wait_for_selector("text=全部文件", timeout=180000) - + page.goto(self.url, wait_until="domcontentloaded", timeout=60000) + + if not self._wait_for_login(page, timeout=20): + self._log("请手动扫码登录,登录后会自动继续...") + if not self._wait_for_login(page, timeout=180): + raise RuntimeError("等待百度网盘登录超时") + # 2. 导航 self._log("登录成功,进入目标路径...") - page.get_by_text("全部文件").first.click(force=True) - time.sleep(4) - - if not self._scan_and_enter(page, root_path): - self._create_folder(page, root_path) - time.sleep(2); self._scan_and_enter(page, root_path) - + self._open_all_files(page) + + self._log("等待文件列表加载...") + if not self._wait_for_file_list_loaded(page): + self._log("警告:文件列表加载超时,继续尝试目录检查...") + + self._log(f"准备进入根目录: {root_path}") + if not self._ensure_folder(page, root_path): + raise RuntimeError(f"无法进入或创建根目录: {root_path}") + search_key = project_id if project_id else target_folder_name - if not self._scan_and_enter(page, search_key, is_fuzzy=True): - self._create_folder(page, target_folder_name) - time.sleep(2); self._scan_and_enter(page, search_key, is_fuzzy=True) - + self._log(f"准备进入项目目录: {target_folder_name}") + if not self._ensure_folder( + page, + search_key, + create_name=target_folder_name, + is_fuzzy=bool(project_id), + ): + raise RuntimeError(f"无法进入或创建项目目录: {target_folder_name}") + # 3. 查重 if self._check_file_exists_robust(page, filename): self._log(f"云端已存在文件 '{filename}',跳过上传。") time.sleep(2) - context.close() return True - + # 4. 执行上传 - self._log(f"开始传输: {filename}") - page.set_input_files("input[type=file]", file_path) - + self._log(f"开始传输: {filename} ({file_size_mb:.2f} MB)") + self._clear_old_notifications(page) + self._select_upload_file(page, file_path) + + if not self._wait_for_upload_started(page, filename): + self._log("警告:未检测到明显的上传任务,继续监测文件列表。") + else: + self._log("检测到上传任务已建立。") + # 5. 监测列表确认成功 self._log("上传已启动,监测云端状态中...") - success = self._wait_for_file_in_list(page, filename) - + success = self._wait_for_upload_finished(page, filename) + if success: - self._log("\n" + "="*30) + self._log("\n" + "=" * 30) self._log("百度网盘上传圆满成功!") - self._log("="*30 + "\n") + self._log("=" * 30 + "\n") time.sleep(3) else: self._log("警告:监测超时,未能在列表中看到文件,请手动核实。") - context.close() return success except Exception as e: self._log(f"程序异常: {str(e)}") return False + finally: + if context: + try: + context.close() + except Exception: + pass - def _wait_for_login(self, page) -> bool: - for _ in range(10): - if page.get_by_text("全部文件").count() > 0: return True - time.sleep(2) + def _wait_for_login(self, page, timeout: float = 20) -> bool: + start_time = time.time() + while time.time() - start_time < timeout: + try: + body_text = self._body_text(page) + if "全部文件" in body_text or "我的文件" in body_text: + return True + except Exception: + pass + time.sleep(1) + return False + + def _open_all_files(self, page): + if self._click_text(page, ["全部文件", "我的文件"]): + time.sleep(1) + return + self._log("未找到“全部文件”入口,尝试直接在当前页面继续。") + + def _wait_for_file_list_loaded( + self, + page, + timeout: float = 8, + poll_interval: float = 0.3, + ) -> bool: + """ + 等待目录页真正进入可操作状态。 + 百度网盘目录可能为空,所以不能只靠“列表里有项目”来判断。 + """ + start_time = time.time() + while time.time() - start_time < timeout: + try: + state = page.evaluate( + """ + () => { + const bodyText = document.body ? document.body.innerText : ''; + const itemCount = document.querySelectorAll( + 'span[title], a[title], .wp-s-file-list-drag-copy__item-title-text, .file-name' + ).length; + const breadcrumbCount = document.querySelectorAll( + '.wp-s-file-list-drag-copy__header-breadcrumb-item' + ).length; + const hasCreateFolderButton = bodyText.includes('新建文件夹'); + const hasAllFilesText = bodyText.includes('全部文件'); + const hasEmptyState = ['暂无文件', '空文件夹', '拖拽文件到此处上传', '上传文件到百度网盘'] + .some(keyword => bodyText.includes(keyword)); + const hasLoadingText = ['加载中', '正在加载', '请稍候'] + .some(keyword => bodyText.includes(keyword)); + const hasLoadingMask = !!document.querySelector( + '[class*="loading"], [class*="Loading"], [class*="skeleton"], [class*="Skeleton"], [class*="spin"], [class*="Spin"]' + ); + return { + itemCount, + breadcrumbCount, + hasCreateFolderButton, + hasAllFilesText, + hasEmptyState, + hasLoadingText, + hasLoadingMask, + }; + } + """ + ) + + ready = ( + (state["breadcrumbCount"] > 0 or state["hasAllFilesText"]) + and state["hasCreateFolderButton"] + and ( + state["itemCount"] > 0 + or state["hasEmptyState"] + or (not state["hasLoadingText"] and not state["hasLoadingMask"]) + ) + ) + if ready: + return True + except Exception: + pass + time.sleep(poll_interval) + return False + + def _find_matching_item( + self, + page, + target_name: str, + is_fuzzy: bool = False, + ) -> Optional[str]: + try: + return page.evaluate( + """ + (args) => { + const { name, fuzzy } = args; + const normalize = value => (value || '').replace(/\\s+/g, ' ').trim(); + const visible = element => { + if (!element) return false; + const style = window.getComputedStyle(element); + const rect = element.getBoundingClientRect(); + return style.display !== 'none' + && style.visibility !== 'hidden' + && rect.width > 0 + && rect.height > 0; + }; + const els = [ + ...document.querySelectorAll( + 'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"], [class*="file"], [class*="File"]' + ), + ].filter(visible); + const target = els.find(e => { + const values = [ + normalize(e.getAttribute('title')), + normalize(e.getAttribute('aria-label')), + normalize(e.innerText || e.textContent), + ].filter(Boolean); + if (!values.length) { + return false; + } + return values.some(value => fuzzy ? value.includes(name) : value === name); + }); + if (!target) { + return null; + } + return normalize( + target.getAttribute('title') + || target.getAttribute('aria-label') + || target.innerText + || target.textContent + ); + } + """, + {"name": target_name, "fuzzy": is_fuzzy}, + ) + except Exception: + return None + + def _matches_name(self, source: str, target_name: str, is_fuzzy: bool = False) -> bool: + source = (source or "").strip() + if not source: + return False + return target_name in source if is_fuzzy else source == target_name + + def _page_contains_text(self, page, text: str) -> bool: + try: + return bool( + page.evaluate( + """ + (target) => { + const normalize = value => (value || '').replace(/\\s+/g, ' ').trim(); + const bodyText = normalize(document.body ? document.body.innerText : ''); + return !!target && bodyText.includes(normalize(target)); + } + """, + text, + ) + ) + except Exception: + return False + + def _get_navigation_context(self, page) -> dict: + try: + return page.evaluate( + """ + () => { + const normalize = text => (text || '').replace(/\\s+/g, ' ').trim(); + const breadcrumbs = [ + ...document.querySelectorAll( + '.wp-s-file-list-drag-copy__header-breadcrumb-item, [class*="breadcrumb"] a, [class*="breadcrumb"] span, [class*="Breadcrumb"] a, [class*="Breadcrumb"] span' + ), + ] + .map(node => normalize(node.getAttribute('title') || node.innerText || node.textContent)) + .filter(Boolean); + const itemTitles = [ + ...document.querySelectorAll( + 'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"]' + ), + ] + .map(node => normalize( + node.getAttribute('title') + || node.getAttribute('aria-label') + || node.innerText + || node.textContent + )) + .filter(Boolean); + return { + url: window.location.href, + breadcrumbs, + itemTitles, + }; + } + """ + ) + except Exception: + return { + "url": "", + "breadcrumbs": [], + "itemTitles": [], + } + + def _wait_for_folder_visible( + self, + page, + target_name: str, + is_fuzzy: bool = False, + exact_name: Optional[str] = None, + timeout: float = 8, + poll_interval: float = 0.3, + ) -> bool: + start_time = time.time() + while time.time() - start_time < timeout: + if exact_name and self._find_matching_item(page, exact_name, False): + return True + if self._find_matching_item(page, target_name, is_fuzzy): + return True + if exact_name and self._page_contains_text(page, exact_name): + return True + time.sleep(poll_interval) + return False + + def _wait_for_folder_entered( + self, + page, + target_name: str, + is_fuzzy: bool = False, + timeout: float = 8, + before_url: str = "", + before_had_target: bool = False, + poll_interval: float = 0.25, + ) -> bool: + start_time = time.time() + while time.time() - start_time < timeout: + context = self._get_navigation_context(page) + breadcrumbs = context.get("breadcrumbs", []) + current_url = context.get("url", "") + item_titles = context.get("itemTitles", []) + + breadcrumb_match = next( + (crumb for crumb in breadcrumbs if self._matches_name(crumb, target_name, is_fuzzy)), + "", + ) + url_changed = bool(before_url and current_url and current_url != before_url) + list_no_longer_shows_target = not any( + self._matches_name(title, target_name, is_fuzzy) for title in item_titles + ) + + if breadcrumb_match or url_changed or (before_had_target and list_no_longer_shows_target): + if not self._wait_for_file_list_loaded(page, timeout=4, poll_interval=0.2): + display_name = breadcrumb_match or target_name + self._log(f"警告:已进入目录 {display_name},但列表就绪确认超时,继续后续流程。") + return True + time.sleep(poll_interval) + + context = self._get_navigation_context(page) + breadcrumbs = " > ".join(context.get("breadcrumbs", [])) or "无" + current_url = context.get("url", "") or "无" + self._log( + f"进入目录确认超时: 目标={target_name}, 当前面包屑={breadcrumbs}, 当前URL={current_url}" + ) + return False + + def _ensure_folder( + self, + page, + target_name: str, + create_name: Optional[str] = None, + is_fuzzy: bool = False, + ) -> bool: + create_name = create_name or target_name + + if not self._wait_for_file_list_loaded(page, timeout=5, poll_interval=0.2): + self._log("警告:当前目录列表未完全确认加载,但仍继续检查目录。") + + matched_name = self._find_matching_item(page, target_name, is_fuzzy) + if matched_name: + self._log(f"已找到目录: {matched_name},准备进入...") + return self._wait_and_enter_folder( + page, + target_name, + is_fuzzy=is_fuzzy, + exact_name=matched_name, + timeout=18, + ) + + self._log(f"未找到目录,开始创建: {create_name}") + if not self._create_folder(page, create_name): + return False + + if not self._wait_and_enter_folder( + page, + target_name, + is_fuzzy=is_fuzzy, + exact_name=create_name, + timeout=35, + ): + self._log(f"目录创建后仍未能进入: {create_name}") + return False + + return True + + def _wait_and_enter_folder( + self, + page, + target_name: str, + is_fuzzy: bool = False, + exact_name: Optional[str] = None, + timeout: float = 30, + poll_interval: float = 1, + ) -> bool: + start_time = time.time() + attempt = 0 + + while time.time() - start_time < timeout: + attempt += 1 + + if exact_name and self._find_matching_item(page, exact_name, False): + self._log(f"确认目录已出现: {exact_name},尝试进入...") + if self._scan_and_enter(page, exact_name, is_fuzzy=False): + return True + + matched_name = self._find_matching_item(page, target_name, is_fuzzy) + if matched_name: + self._log(f"确认目录已出现: {matched_name},尝试进入...") + if self._scan_and_enter(page, target_name, is_fuzzy=is_fuzzy): + return True + + if attempt % 5 == 0: + self._log(f"目录暂未稳定可进入,刷新当前目录后重试: {exact_name or target_name}") + self._refresh_current_folder(page) + + time.sleep(poll_interval) + + context = self._get_navigation_context(page) + item_titles = " | ".join(context.get("itemTitles", [])[:20]) or "无" + self._log(f"目录确认进入失败: {exact_name or target_name},当前列表: {item_titles}") return False def _scan_and_enter(self, page, target_name: str, is_fuzzy: bool = False) -> bool: try: page.evaluate("window.scrollTo(0, 300)") - found = page.evaluate(f""" - (args) => {{ - const {{ name, fuzzy }} = args; - const els = [...document.querySelectorAll('span[title], a[title], .wp-s-file-list-drag-copy__item-title-text')]; - const target = els.find(e => {{ - const title = e.getAttribute('title') || e.innerText.trim(); - return fuzzy ? title.includes(name) : title === name; - }}); - if (target) {{ - target.click(); - const ev = new MouseEvent('dblclick', {{ 'view': window, 'bubbles': true, 'cancelable': true }}); - target.dispatchEvent(ev); + before_context = self._get_navigation_context(page) + before_url = before_context.get("url", "") + before_item_titles = before_context.get("itemTitles", []) + before_had_target = any( + self._matches_name(title, target_name, is_fuzzy) for title in before_item_titles + ) + found = page.evaluate( + """ + (args) => { + const { name, fuzzy } = args; + const normalize = value => (value || '').replace(/\\s+/g, ' ').trim(); + const visible = element => { + if (!element) return false; + const style = window.getComputedStyle(element); + const rect = element.getBoundingClientRect(); + return style.display !== 'none' + && style.visibility !== 'hidden' + && rect.width > 0 + && rect.height > 0; + }; + const els = [ + ...document.querySelectorAll( + 'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"]' + ) + ].filter(visible); + const target = els.find(e => { + const values = [ + normalize(e.getAttribute('title')), + normalize(e.getAttribute('aria-label')), + normalize(e.innerText || e.textContent), + ].filter(Boolean); + return values.some(value => fuzzy ? value.includes(name) : value === name); + }); + if (target) { + const row = target.closest('tr, li, [class*="item"], [class*="Item"], [class*="row"], [class*="Row"], [class*="file"], [class*="File"]') || target; + row.scrollIntoView({block: 'center'}); + row.click(); + const ev = new MouseEvent('dblclick', { view: window, bubbles: true, cancelable: true }); + row.dispatchEvent(ev); return true; - }} + } return false; - }} - """, {"name": target_name, "fuzzy": is_fuzzy}) - if found: time.sleep(3); return True + } + """, + {"name": target_name, "fuzzy": is_fuzzy}, + ) + if not found: + try: + locator = page.get_by_text(target_name, exact=not is_fuzzy).first + if locator.count() == 0: + return False + locator.click(force=True) + locator.dblclick(force=True) + found = True + except Exception: + return False + time.sleep(0.2) + return self._wait_for_folder_entered( + page, + target_name, + is_fuzzy=is_fuzzy, + before_url=before_url, + before_had_target=before_had_target, + poll_interval=0.2, + ) + except Exception: return False - except: return False - def _create_folder(self, page, folder_name: str): + def _create_folder(self, page, folder_name: str) -> bool: try: page.keyboard.press("Escape") btn = page.get_by_text("新建文件夹").first - if btn.count() > 0: - btn.click(force=True) - time.sleep(1.5) - page.keyboard.type(folder_name, delay=50) - page.keyboard.press("Enter") - time.sleep(3) - except: pass + if btn.count() == 0: + self._log("未找到“新建文件夹”按钮") + return False + + btn.click(force=True) + time.sleep(0.8) + page.keyboard.type(folder_name, delay=50) + page.keyboard.press("Enter") + + if self._wait_for_folder_visible( + page, + folder_name, + exact_name=folder_name, + timeout=8, + poll_interval=0.2, + ): + self._log(f"文件夹创建完成: {folder_name}") + return True + + self._log(f"文件夹创建后未立即在列表中看到,刷新后继续确认: {folder_name}") + self._refresh_current_folder(page) + if self._wait_for_folder_visible( + page, + folder_name, + exact_name=folder_name, + timeout=8, + poll_interval=0.2, + ): + self._log(f"文件夹创建完成: {folder_name}") + return True + + self._log(f"已提交文件夹创建请求,交给后续流程继续确认: {folder_name}") + return True + except Exception as exc: + self._log(f"创建文件夹失败: {folder_name},原因: {exc}") + return False def _check_file_exists_robust(self, page, filename: str) -> bool: - return page.evaluate(f""" - (name) => {{ - const els = [...document.querySelectorAll('span[title], a[title], .wp-s-file-list-drag-copy__item-title-text')]; - return els.some(e => (e.getAttribute('title') || e.innerText.trim()) === name); - }} - """, filename) + try: + return bool( + page.evaluate( + """ + (name) => { + const normalize = value => (value || '').replace(/\\s+/g, ' ').trim(); + const visible = element => { + if (!element) return false; + const style = window.getComputedStyle(element); + const rect = element.getBoundingClientRect(); + return style.display !== 'none' + && style.visibility !== 'hidden' + && rect.width > 0 + && rect.height > 0; + }; + const els = [ + ...document.querySelectorAll( + 'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name' + ), + ].filter(visible); + return els.some(e => { + const title = normalize(e.getAttribute('title')); + const label = normalize(e.getAttribute('aria-label')); + const text = normalize(e.innerText || e.textContent); + return title === name || label === name || text === name; + }); + } + """, + filename, + ) + ) + except Exception: + try: + locator = page.get_by_text(filename, exact=True) + count = locator.count() + for index in range(min(count, 5)): + if locator.nth(index).is_visible(): + return True + except Exception: + pass + return False + + def _select_upload_file(self, page, file_path: str): + input_locator = page.locator("input[type=file]") + if input_locator.count() == 0: + if not self._click_text(page, ["上传", "上传文件"]): + self._log("未找到“上传”按钮,继续等待上传控件。") + page.wait_for_selector("input[type=file]", timeout=10000) + + page.locator("input[type=file]").first.set_input_files(file_path) + + def _wait_for_upload_started( + self, + page, + filename: str, + timeout: float = 20, + ) -> bool: + start_time = time.time() + while time.time() - start_time < timeout: + state = self._upload_state(page, filename) + if state["uploading"] or state["item_active"] or state["file_exists"]: + return True + time.sleep(0.5) + return False + + def _wait_for_upload_finished( + self, + page, + filename: str, + timeout: float = 1800, + ) -> bool: + start_time = time.time() + stable_count = 0 + last_active_log = 0.0 + last_refresh = time.time() + + while time.time() - start_time < timeout: + state = self._upload_state(page, filename) + + if state["has_error"]: + self._log("检测到页面提示上传失败。") + return False + + if state["file_exists"] and not state["item_active"] and not state["uploading"]: + stable_count += 1 + if stable_count >= 3: + self._log("文件已出现在列表中,且上传状态连续稳定。") + time.sleep(3) + return self._check_file_exists_robust(page, filename) + + self._log(f"上传疑似完成,稳定性校验 ({stable_count}/3)...") + else: + stable_count = 0 + now = time.time() + if (state["uploading"] or state["item_active"]) and now - last_active_log >= 15: + self._log("监测到活跃传输流,继续等待...") + last_active_log = now + + if time.time() - last_refresh > 25: + self._refresh_current_folder(page) + last_refresh = time.time() + + time.sleep(3) + + return False + + def _upload_state(self, page, filename: str) -> dict: + try: + state = page.evaluate( + """ + (filename) => { + const normalize = value => (value || '').replace(/\\s+/g, ' ').trim(); + const visible = element => { + if (!element) return false; + const style = window.getComputedStyle(element); + const rect = element.getBoundingClientRect(); + return style.display !== 'none' + && style.visibility !== 'hidden' + && rect.width > 0 + && rect.height > 0; + }; + const bodyText = normalize(document.body ? document.body.innerText : ''); + const nodes = [ + ...document.querySelectorAll( + 'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="file"], [class*="File"]' + ) + ].filter(visible); + + const matchingNode = nodes.find(node => { + const title = normalize(node.getAttribute('title')); + const label = normalize(node.getAttribute('aria-label')); + const text = normalize(node.innerText || node.textContent); + return title === filename || label === filename || text === filename; + }); + + const container = matchingNode + ? matchingNode.closest('tr, li, [class*="item"], [class*="Item"], [class*="row"], [class*="Row"], [class*="file"], [class*="File"]') + : null; + const containerText = normalize(container ? container.innerText : ''); + const itemActiveWords = ['正在上传', '上传中', '等待上传', '传输队列', '传输中', '0B/s', '校验中', '处理中']; + const pageActiveWords = ['正在上传', '上传中', '等待上传', '传输队列', '传输中', '校验中', '处理中']; + const errorWords = ['上传失败', '传输失败', '网络异常', '上传出错', '文件上传失败']; + const progressPattern = /(?:^|\\D)(?:100|\\d{1,2})(?:\\.\\d+)?%/; + const itemHasProgress = container + ? [...container.querySelectorAll('[class*="progress"], [class*="Progress"]')].some(visible) + : false; + + const itemActive = itemActiveWords.some(word => containerText.includes(word)) + || progressPattern.test(containerText) + || itemHasProgress; + const uploading = itemActive + || (bodyText.includes(filename) + && (pageActiveWords.some(word => bodyText.includes(word)) + || progressPattern.test(bodyText))); + const hasError = errorWords.some(word => bodyText.includes(word)); + + return { + file_exists: Boolean(matchingNode), + item_active: itemActive, + uploading, + has_error: hasError, + }; + } + """, + filename, + ) + return { + "file_exists": bool(state.get("file_exists")), + "item_active": bool(state.get("item_active")), + "uploading": bool(state.get("uploading")), + "has_error": bool(state.get("has_error")), + } + except Exception: + return { + "file_exists": self._check_file_exists_robust(page, filename), + "item_active": False, + "uploading": False, + "has_error": False, + } + + def _refresh_current_folder(self, page): + try: + page.locator(".wp-s-file-list-drag-copy__header-breadcrumb-item").last.click() + time.sleep(2) + except Exception: + try: + page.keyboard.press("F5") + page.wait_for_load_state("domcontentloaded", timeout=10000) + except Exception: + pass + + def _click_text(self, page, texts: list[str]) -> bool: + for text in texts: + for exact in (True, False): + try: + locator = page.get_by_text(text, exact=exact) + count = locator.count() + for index in range(min(count, 5)): + item = locator.nth(index) + if item.is_visible(): + item.click(force=True, timeout=5000) + return True + except PlaywrightTimeoutError: + continue + except Exception: + continue + return False + + def _clear_old_notifications(self, page): + try: + page.evaluate( + """ + () => { + document + .querySelectorAll('.ant-message-notice, .u-toast, [class*="toast"], [class*="Toast"]') + .forEach(element => element.remove()); + } + """ + ) + except Exception: + pass + + def _body_text(self, page) -> str: + return page.evaluate("() => document.body ? document.body.innerText : ''") def _wait_for_file_in_list(self, page, filename: str) -> bool: - start_time = time.time(); last_refresh = time.time() - while time.time() - start_time < 1800: - if self._check_file_exists_robust(page, filename): - content = page.content() - # 如果文件在列表里,且页面内容没在报错或重试 - if not any(x in content for x in ["正在上传", "传输队列", "0B/s"]): - return True - if time.time() - last_refresh > 25: - try: page.locator(".wp-s-file-list-drag-copy__header-breadcrumb-item").last.click(); time.sleep(3) - except: pass - last_refresh = time.time() - time.sleep(5) - return False + return self._wait_for_upload_finished(page, filename) diff --git a/gui.py b/gui.py index dbc4e90..0fb3c2c 100644 --- a/gui.py +++ b/gui.py @@ -962,6 +962,8 @@ class YidaimaGUI: upload_frame.pack(fill=tk.X) ttk.Button(upload_frame, text="整理项目", command=self._ps_organize_project, width=15).pack(side=tk.LEFT, padx=(0, 5)) + ttk.Button(upload_frame, text="整理并上传夸克", command=self._ps_organize_upload_cloud, width=18).pack(side=tk.LEFT, padx=(5, 5)) + ttk.Button(upload_frame, text="整理并上传百度", command=self._ps_organize_upload_baidu, width=18).pack(side=tk.LEFT, padx=(5, 5)) ttk.Button(upload_frame, text="上传夸克网盘", command=self._ps_upload_quark, width=15).pack(side=tk.LEFT, padx=(5, 5)) ttk.Button(upload_frame, text="上传百度网盘", command=self._ps_upload_baidu, width=15).pack(side=tk.LEFT, padx=(5, 0)) @@ -1577,6 +1579,69 @@ class YidaimaGUI: threading.Thread(target=run, daemon=True).start() + def _ps_organize_upload_baidu(self): + """整理项目并上传到百度网盘""" + def run(): + try: + self._ps_set_running(True) + self._ps_log("=" * 50) + self._ps_log("开始整理项目并打包...") + config = self._get_ps_config() + automation = ProjectScreenshotAutomation(config, log_callback=self._ps_log) + + project_name = self.ps_project_name_var.get().strip() + if not project_name: + project_name = None + else: + self._ps_log(f"使用项目名称: {project_name}") + + zip_file = automation.organize_and_zip_project(override_name=project_name) + + if zip_file: + self._ps_log("=" * 50) + self._ps_log(f"项目整理完成!压缩包已生成:{zip_file}") + self._ps_log("\n开始准备上传到百度网盘...") + + chrome_path = self.config.get("chrome.path", "") + baidu_cookies_dir = self.config.get("baidu.cookies_dir", os.path.join(os.getcwd(), "data", "baidu_cookies")) + baidu_root_path = self.config.get("baidu.root_path", "精品项目整理") + + if not project_name: + project_name = os.path.basename(zip_file).replace(".zip", "") + + uploader = BaiduUploader( + chrome_path=chrome_path, + cookies_dir=baidu_cookies_dir, + log_callback=self._ps_log + ) + + success = uploader.upload_file( + file_path=zip_file, + target_folder_name=project_name, + root_path=baidu_root_path + ) + + if success: + self._ps_log("=" * 50) + self._ps_log("项目已成功整理并上传到百度网盘!") + self.root.after(0, lambda: messagebox.showinfo("成功", "项目整理并上传百度网盘成功!")) + else: + self._ps_log("=" * 50) + self._ps_log("上传百度网盘失败,请检查日志。") + self.root.after(0, lambda: messagebox.showerror("错误", "上传百度网盘失败,请检查日志。")) + else: + self._ps_log("=" * 50) + self._ps_log("项目整理失败,请检查日志。") + self.root.after(0, lambda: messagebox.showerror("错误", "项目整理并打包失败,请检查日志。")) + + except Exception as e: + self._ps_log(f"错误: {str(e)}") + self.root.after(0, lambda: messagebox.showerror("错误", f"执行异常: {str(e)}")) + finally: + self._ps_set_running(False) + + threading.Thread(target=run, daemon=True).start() + def _ps_organize_upload_cloud(self): """整理项目并上传网盘""" def run(): diff --git a/quark_uploader.py b/quark_uploader.py index 1e5f5c9..2eae106 100644 --- a/quark_uploader.py +++ b/quark_uploader.py @@ -2,19 +2,30 @@ from __future__ import annotations import os import time -from typing import Optional, Callable +from typing import Callable, Optional + +from playwright.sync_api import TimeoutError as PlaywrightTimeoutError from playwright.sync_api import sync_playwright + class QuarkUploader: """ - 夸克网盘上传工具 - 终极稳健版 (双重状态锁) + 夸克网盘上传工具。 + + 通过真实 Chrome + 持久用户目录上传,避免依赖非公开接口。 """ - def __init__(self, chrome_path: str, cookies_dir: str, log_callback: Optional[Callable[[str], None]] = None): + + def __init__( + self, + chrome_path: str, + cookies_dir: str, + log_callback: Optional[Callable[[str], None]] = None, + ): self.chrome_path = chrome_path self.cookies_dir = cookies_dir self.log_callback = log_callback self.url = "https://pan.quark.cn/" - + if not os.path.exists(self.cookies_dir): os.makedirs(self.cookies_dir, exist_ok=True) @@ -23,144 +34,403 @@ class QuarkUploader: if self.log_callback: self.log_callback(message) - def upload_file(self, file_path: str, target_folder_name: str, root_path: str = "精品项目整理") -> bool: + def upload_file( + self, + file_path: str, + target_folder_name: str, + root_path: str = "精品项目整理", + ) -> bool: """ - 上传文件到夸克网盘 + 上传文件到夸克网盘。 """ if not os.path.exists(file_path): self._log(f"错误: 本地文件不存在 {file_path}") return False filename = os.path.basename(file_path) + file_size_mb = os.path.getsize(file_path) / 1024 / 1024 + context = None try: with sync_playwright() as p: - self._log(f"正在启动浏览器...") + self._log("正在启动浏览器...") launch_args = { "user_data_dir": self.cookies_dir, "headless": False, - "viewport": {"width": 1280, "height": 800} + "viewport": {"width": 1280, "height": 800}, } if self.chrome_path and os.path.exists(self.chrome_path): launch_args["executable_path"] = self.chrome_path - - context = p.chromium.launch_persistent_context(**launch_args) - page = context.new_page() - - self._log("正在打开夸克网盘...") - page.goto(self.url, wait_until="networkidle") - - # 1. 登录检测 - try: - page.wait_for_selector("text=全部文件", timeout=10000) - except: - self._log("请在浏览器中完成登录...") - page.wait_for_selector("text=全部文件", timeout=180000) - - # 2. 导航到目录 - self._log(f"定位目录: {root_path} > {target_folder_name}") - page.get_by_text("全部文件").first.click() - page.wait_for_timeout(2000) - - # 进入根目录 - if not page.get_by_text(root_path, exact=True).first.is_visible(): - self._create_folder(page, root_path) - page.get_by_text(root_path, exact=True).first.dblclick() - page.wait_for_timeout(1000) - - # 进入项目目录 - if not page.get_by_text(target_folder_name, exact=True).first.is_visible(): - self._create_folder(page, target_folder_name) - page.get_by_text(target_folder_name, exact=True).first.dblclick() - page.wait_for_timeout(2000) - - # 3. 查重 - if page.get_by_text(filename, exact=True).count() > 0: - self._log(f"云端已存在 '{filename}',跳过上传。") - context.close() - return True - - # 4. 执行上传 - self._log(f"开始上传: {filename}") - # 清除可能存在的旧“上传成功”提示 - page.evaluate("() => { const els = document.querySelectorAll('.ant-notification-notice'); els.forEach(e => e.remove()); }") - - page.set_input_files("input[type=file]", file_path) - - # 5. 确认上传已启动 (关键一步) - self._log("等待上传任务初始化...") - try: - # 等待出现任何进度指示元素 - page.wait_for_selector("text=% , .ant-progress-inner, text=正在上传", timeout=20000) - self._log("检测到上传流已建立。") - except: - self._log("警告:未检测到明显的进度条,可能由于文件较小或 UI 延迟,继续监测列表状态。") - # 6. 深度监测循环 - self._log("进入深度监测模式,请保持浏览器前台运行...") - start_time = time.time() - timeout = 1800 # 30分钟 - - # 连续稳定次数计数 - stable_count = 0 - - while time.time() - start_time < timeout: - # A. 检查全局进度标志 - is_uploading = page.get_by_text("%").is_visible() or \ - page.get_by_text("正在上传").is_visible() or \ - page.locator(".ant-progress-inner").is_visible() - - # B. 检查列表中该行的具体状态 (精准匹配) - # 找到包含文件名的那一行 tr,看里面是否有“正在上传”字样 - row_status_text = "" - try: - row = page.locator(f"tr:has-text('{filename}')").first - if row.is_visible(): - row_status_text = row.inner_text() - except: - pass - - is_item_active = "正在上传" in row_status_text or "%" in row_status_text or "等待上传" in row_status_text - - # C. 确认列表里确实有这个文件 - in_list = page.get_by_text(filename, exact=True).count() > 0 - - if not is_uploading and not is_item_active and in_list: - stable_count += 1 - if stable_count >= 5: # 连续 5 次检测(约 15 秒)都处于稳定态 - self._log("检测到上传任务已从任务列表中消失,且列表文件状态正常。") - self._log("为了绝对安全,最后等待 15 秒进行数据落盘同步...") - page.wait_for_timeout(15000) - self._log("上传确认圆满成功!") - context.close() - return True - else: - self._log(f"上传疑似完成,正在进行稳定性校验 ({stable_count}/5)...") - else: - if is_uploading or is_item_active: - self._log("监测到活跃传输流...") - stable_count = 0 # 只要发现还在传,重置计数 - - time.sleep(3) + context = p.chromium.launch_persistent_context(**launch_args) + context.add_init_script( + "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" + ) + + page = context.new_page() + page.set_default_timeout(60000) + + self._log("正在打开夸克网盘...") + page.goto(self.url, wait_until="domcontentloaded", timeout=60000) + + if not self._wait_for_login(page, timeout=10): + self._log("请在浏览器中完成夸克登录,登录后会自动继续...") + if not self._wait_for_login(page, timeout=180): + raise RuntimeError("等待登录超时") + + self._log(f"定位目录: {root_path} > {target_folder_name}") + self._open_all_files(page) + self._wait_for_file_list_loaded(page) + + if not self._ensure_folder(page, root_path): + raise RuntimeError(f"无法进入或创建根目录: {root_path}") + + if not self._ensure_folder(page, target_folder_name): + raise RuntimeError(f"无法进入或创建项目目录: {target_folder_name}") + + if self._check_file_exists(page, filename): + self._log(f"云端已存在 '{filename}',跳过上传。") + return True + + self._clear_old_notifications(page) + self._log(f"开始上传: {filename} ({file_size_mb:.2f} MB)") + self._select_upload_file(page, file_path) + + if not self._wait_for_upload_started(page, filename): + self._log("警告:未检测到明显的上传任务,继续监测文件列表。") + else: + self._log("检测到上传任务已建立。") + + success = self._wait_for_upload_finished(page, filename) + if success: + self._log("夸克网盘上传确认成功。") + return True self._log("传输任务监测超时,请确认网速或手动核实。") - context.close() return False - except Exception as e: - self._log(f"代码执行异常: {str(e)}") + except Exception as exc: + self._log(f"代码执行异常: {exc}") + return False + finally: + if context: + try: + context.close() + except Exception: + pass + + def _wait_for_login(self, page, timeout: float) -> bool: + start_time = time.time() + while time.time() - start_time < timeout: + try: + body_text = self._body_text(page) + if "全部文件" in body_text or "我的文件" in body_text: + return True + except Exception: + pass + time.sleep(1) + return False + + def _open_all_files(self, page): + if self._click_text(page, ["全部文件", "我的文件"]): + page.wait_for_timeout(1000) + return + self._log("未找到“全部文件”入口,尝试直接在当前页面继续。") + + def _wait_for_file_list_loaded( + self, + page, + timeout: float = 12, + poll_interval: float = 0.4, + ) -> bool: + start_time = time.time() + while time.time() - start_time < timeout: + try: + state = page.evaluate( + """ + () => { + const text = document.body ? document.body.innerText : ''; + const loadingWords = ['加载中', '正在加载', '请稍候']; + const hasLoadingText = loadingWords.some(word => text.includes(word)); + const itemCount = document.querySelectorAll( + '[title], [aria-label], [class*="file"], [class*="File"]' + ).length; + const hasAction = ['新建', '上传', '全部文件', '我的文件'] + .some(word => text.includes(word)); + const hasEmptyState = ['暂无文件', '空文件夹', '拖拽文件'] + .some(word => text.includes(word)); + return {hasLoadingText, itemCount, hasAction, hasEmptyState}; + } + """ + ) + if ( + state["hasAction"] + and (state["itemCount"] > 0 or state["hasEmptyState"] or not state["hasLoadingText"]) + ): + return True + except Exception: + pass + time.sleep(poll_interval) + return False + + def _ensure_folder(self, page, folder_name: str) -> bool: + self._wait_for_file_list_loaded(page) + + if self._open_folder(page, folder_name): + return True + + self._log(f"目录不存在,准备创建: {folder_name}") + if not self._create_folder(page, folder_name): return False - def _create_folder(self, page, folder_name: str): - """创建目录""" + end_time = time.time() + 20 + while time.time() < end_time: + if self._open_folder(page, folder_name): + return True + time.sleep(1) + + return False + + def _open_folder(self, page, folder_name: str) -> bool: try: - page.get_by_text("新建").first.click() + locator = page.get_by_text(folder_name, exact=True) + count = locator.count() + for index in range(min(count, 5)): + item = locator.nth(index) + if item.is_visible(): + item.dblclick(force=True, timeout=5000) + page.wait_for_timeout(1200) + self._wait_for_file_list_loaded(page) + self._log(f"已进入目录: {folder_name}") + return True + except Exception: + pass + + return False + + def _create_folder(self, page, folder_name: str) -> bool: + try: + if not self._click_text(page, ["新建"]): + self._log("未找到“新建”按钮。") + return False + page.wait_for_timeout(500) - page.get_by_text("新建文件夹").first.click() + if not self._click_text(page, ["新建文件夹", "文件夹"]): + self._log("未找到“新建文件夹”入口。") + return False + page.wait_for_timeout(500) page.keyboard.type(folder_name) page.keyboard.press("Enter") self._log(f"已创建目录: {folder_name}") page.wait_for_timeout(1500) - except: + return True + except Exception as exc: + self._log(f"创建目录失败: {exc}") + return False + + def _select_upload_file(self, page, file_path: str): + input_locator = page.locator("input[type=file]") + if input_locator.count() == 0: + self._click_text(page, ["上传", "上传文件"]) + page.wait_for_selector("input[type=file]", timeout=10000) + + page.locator("input[type=file]").first.set_input_files(file_path) + + def _wait_for_upload_started( + self, + page, + filename: str, + timeout: float = 20, + ) -> bool: + start_time = time.time() + while time.time() - start_time < timeout: + state = self._upload_state(page, filename) + if state["uploading"] or state["item_active"] or state["file_exists"]: + return True + time.sleep(0.5) + return False + + def _wait_for_upload_finished( + self, + page, + filename: str, + timeout: float = 1800, + ) -> bool: + start_time = time.time() + stable_count = 0 + last_active_log = 0.0 + + self._log("进入上传监测模式,请保持浏览器前台运行...") + while time.time() - start_time < timeout: + state = self._upload_state(page, filename) + + if state["has_error"]: + self._log("检测到页面提示上传失败。") + return False + + if state["file_exists"] and not state["uploading"] and not state["item_active"]: + stable_count += 1 + if stable_count >= 5: + self._log("文件已出现在列表中,且上传状态连续稳定。") + self._log("最后等待 15 秒进行网盘同步确认...") + page.wait_for_timeout(15000) + return self._check_file_exists(page, filename) + + self._log(f"上传疑似完成,稳定性校验 ({stable_count}/5)...") + else: + stable_count = 0 + now = time.time() + if (state["uploading"] or state["item_active"]) and now - last_active_log >= 15: + self._log("监测到活跃传输流,继续等待...") + last_active_log = now + + time.sleep(3) + + return False + + def _upload_state(self, page, filename: str) -> dict: + try: + state = page.evaluate( + """ + (filename) => { + const normalize = value => (value || '').replace(/\\s+/g, ' ').trim(); + const visible = element => { + if (!element) return false; + const style = window.getComputedStyle(element); + const rect = element.getBoundingClientRect(); + return style.display !== 'none' + && style.visibility !== 'hidden' + && rect.width > 0 + && rect.height > 0; + }; + const bodyText = normalize(document.body ? document.body.innerText : ''); + const nodes = [ + ...document.querySelectorAll( + '[title], [aria-label], [class*="file"], [class*="File"], [class*="name"], [class*="Name"]' + ) + ].filter(visible); + + const matchingNode = nodes.find(node => { + const title = normalize(node.getAttribute('title')); + const label = normalize(node.getAttribute('aria-label')); + const text = normalize(node.textContent); + return title === filename || label === filename || text === filename; + }); + + const container = matchingNode + ? matchingNode.closest('tr, li, [class*="item"], [class*="Item"], [class*="row"], [class*="Row"], [class*="file"], [class*="File"]') + : null; + const containerText = normalize(container ? container.innerText : ''); + const activeWords = ['正在上传', '上传中', '等待上传', '传输中', '解析中', '处理中']; + const errorWords = ['上传失败', '传输失败', '网络异常', '上传出错']; + const progressPattern = /(?:^|\\D)(?:100|\\d{1,2})(?:\\.\\d+)?%/; + const itemHasProgress = container + ? [...container.querySelectorAll( + '.ant-progress-inner, [class*="progress"], [class*="Progress"]' + )].some(visible) + : false; + + const itemActive = activeWords.some(word => containerText.includes(word)) + || progressPattern.test(containerText) + || itemHasProgress; + const uploading = activeWords.some(word => bodyText.includes(word)) + || itemActive; + const hasError = errorWords.some(word => bodyText.includes(word)); + + return { + file_exists: Boolean(matchingNode), + item_active: itemActive, + uploading, + has_error: hasError, + }; + } + """, + filename, + ) + return { + "file_exists": bool(state.get("file_exists")), + "item_active": bool(state.get("item_active")), + "uploading": bool(state.get("uploading")), + "has_error": bool(state.get("has_error")), + } + except Exception: + return { + "file_exists": self._check_file_exists(page, filename), + "item_active": False, + "uploading": False, + "has_error": False, + } + + def _check_file_exists(self, page, filename: str) -> bool: + try: + locator = page.get_by_text(filename, exact=True) + count = locator.count() + for index in range(min(count, 5)): + if locator.nth(index).is_visible(): + return True + except Exception: pass + + try: + names = page.evaluate( + """ + () => { + const normalize = value => (value || '').replace(/\\s+/g, ' ').trim(); + const visible = element => { + const style = window.getComputedStyle(element); + const rect = element.getBoundingClientRect(); + return style.display !== 'none' + && style.visibility !== 'hidden' + && rect.width > 0 + && rect.height > 0; + }; + const result = []; + for (const node of document.querySelectorAll('[title], [aria-label]')) { + if (!visible(node)) continue; + const title = normalize(node.getAttribute('title')); + const label = normalize(node.getAttribute('aria-label')); + if (title) result.push(title); + if (label) result.push(label); + } + return result; + } + """ + ) + return filename in names + except Exception: + return False + + def _click_text(self, page, texts: list[str]) -> bool: + for text in texts: + for exact in (True, False): + try: + locator = page.get_by_text(text, exact=exact) + count = locator.count() + for index in range(min(count, 5)): + item = locator.nth(index) + if item.is_visible(): + item.click(force=True, timeout=5000) + return True + except PlaywrightTimeoutError: + continue + except Exception: + continue + return False + + def _clear_old_notifications(self, page): + try: + page.evaluate( + """ + () => { + document + .querySelectorAll('.ant-notification-notice, .ant-message-notice') + .forEach(element => element.remove()); + } + """ + ) + except Exception: + pass + + def _body_text(self, page) -> str: + return page.evaluate("() => document.body ? document.body.innerText : ''")