from __future__ import annotations

import os
import re
import time
from typing import Callable, Optional

try:
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
    from playwright.sync_api import sync_playwright
except ImportError:
    # Playwright is only needed once a browser is actually launched. Deferring
    # the failure lets the module be imported without it; upload_file() then
    # reports the missing dependency through its normal error path.
    PlaywrightTimeoutError = TimeoutError
    sync_playwright = None


class BaiduUploader:
    """Baidu Netdisk upload helper: upload-only edition (share logic removed).

    Drives a persistent Chromium profile through Playwright's sync API:
    waits for login, navigates to (or creates) the target folders, uploads
    one local file and polls the page until the file shows up in the listing.
    """

    def __init__(
        self,
        chrome_path: str,
        cookies_dir: str,
        log_callback: Optional[Callable[[str], None]] = None,
    ):
        """Store configuration and make sure the profile directory exists.

        Args:
            chrome_path: Path to a Chrome/Chromium executable. When empty or
                not present on disk, Playwright's default browser is used.
            cookies_dir: Directory used as the persistent browser profile.
            log_callback: Optional callable that receives every log message.
        """
        self.chrome_path = chrome_path
        self.cookies_dir = cookies_dir
        self.log_callback = log_callback
        self.url = "https://pan.baidu.com/"
        if not os.path.exists(self.cookies_dir):
            os.makedirs(self.cookies_dir, exist_ok=True)

    def _log(self, message: str):
        """Print *message* and forward it to the optional log callback."""
        print(message)
        if self.log_callback:
            self.log_callback(message)

    def upload_file(
        self,
        file_path: str,
        target_folder_name: str,
        root_path: str = "精品项目整理",
    ) -> bool:
        """Upload a local file to Baidu Netdisk.

        Args:
            file_path: Path of the local file to upload.
            target_folder_name: Project folder (inside *root_path*) that
                receives the file. If it contains an id such as ``【AB12】``,
                an existing folder is located by that id (substring match).
            root_path: Name of the top-level folder on the drive.

        Returns:
            True when the file already exists remotely or the upload was
            confirmed in the file list; False on any error or timeout.
            Exceptions raised while driving the browser are logged, not
            propagated.
        """
        # isfile (not exists): a directory cannot be handed to the file
        # input, so reject it before a browser is launched.
        if not os.path.isfile(file_path):
            self._log(f"错误: 本地文件不存在 {file_path}")
            return False

        filename = os.path.basename(file_path)
        file_size_mb = os.path.getsize(file_path) / 1024 / 1024

        # Extract the project id (e.g. 【AB12】) used for fuzzy folder lookup.
        project_id = ""
        match = re.search(r"(【[A-Za-z0-9]+】)", target_folder_name)
        if match:
            project_id = match.group(1)

        try:
            if sync_playwright is None:
                raise RuntimeError("未安装 playwright,无法启动浏览器")
            with sync_playwright() as p:
                self._log("正在启动浏览器...")
                context = p.chromium.launch_persistent_context(
                    **self._build_launch_args()
                )
                try:
                    return self._run_upload_flow(
                        context,
                        file_path,
                        filename,
                        file_size_mb,
                        target_folder_name,
                        root_path,
                        project_id,
                    )
                finally:
                    # Close the context while the Playwright driver is still
                    # running; once the `with` block exits the driver is
                    # stopped and a close() call can no longer succeed.
                    try:
                        context.close()
                    except Exception:
                        pass
        except Exception as e:
            self._log(f"程序异常: {str(e)}")
            return False

    def _build_launch_args(self) -> dict:
        """Return keyword arguments for ``launch_persistent_context``."""
        launch_args = {
            "user_data_dir": self.cookies_dir,
            "headless": False,
            "viewport": {"width": 1280, "height": 800},
        }
        # Only pass an executable path that really exists; otherwise let
        # Playwright pick its bundled browser.
        if self.chrome_path and os.path.exists(self.chrome_path):
            launch_args["executable_path"] = self.chrome_path
        return launch_args

    def _run_upload_flow(
        self,
        context,
        file_path: str,
        filename: str,
        file_size_mb: float,
        target_folder_name: str,
        root_path: str,
        project_id: str,
    ) -> bool:
        """Run login, navigation, duplicate check and upload in *context*.

        Raises:
            RuntimeError: If login times out or a folder cannot be entered
                or created.
        """
        context.add_init_script(
            "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        )
        page = context.new_page()
        page.set_default_timeout(60000)

        # 1. Login
        self._log("检测登录状态...")
        page.goto(self.url, wait_until="domcontentloaded", timeout=60000)
        if not self._wait_for_login(page, timeout=20):
            self._log("请手动扫码登录,登录后会自动继续...")
            if not self._wait_for_login(page, timeout=180):
                raise RuntimeError("等待百度网盘登录超时")

        # 2. Navigation
        self._log("登录成功,进入目标路径...")
        self._open_all_files(page)
        self._log("等待文件列表加载...")
        if not self._wait_for_file_list_loaded(page):
            self._log("警告:文件列表加载超时,继续尝试目录检查...")

        self._log(f"准备进入根目录: {root_path}")
        if not self._ensure_folder(page, root_path):
            raise RuntimeError(f"无法进入或创建根目录: {root_path}")

        search_key = project_id if project_id else target_folder_name
        self._log(f"准备进入项目目录: {target_folder_name}")
        if not self._ensure_folder(
            page,
            search_key,
            create_name=target_folder_name,
            is_fuzzy=bool(project_id),
        ):
            raise RuntimeError(f"无法进入或创建项目目录: {target_folder_name}")

        # 3. Duplicate check
        if self._check_file_exists_robust(page, filename):
            self._log(f"云端已存在文件 '{filename}',跳过上传。")
            time.sleep(2)
            return True

        # 4. Upload
        self._log(f"开始传输: {filename} ({file_size_mb:.2f} MB)")
        self._clear_old_notifications(page)
        self._select_upload_file(page, file_path)
        if not self._wait_for_upload_started(page, filename):
            self._log("警告:未检测到明显的上传任务,继续监测文件列表。")
        else:
            self._log("检测到上传任务已建立。")

        # 5. Watch the listing until the upload is confirmed
        self._log("上传已启动,监测云端状态中...")
        success = self._wait_for_upload_finished(page, filename)
        if success:
            self._log("\n" + "=" * 30)
            self._log("百度网盘上传圆满成功!")
            self._log("=" * 30 + "\n")
            time.sleep(3)
        else:
            self._log("警告:监测超时,未能在列表中看到文件,请手动核实。")
        return success

    def _wait_for_login(self, page, timeout: float = 20) -> bool:
        """Poll once per second until the page text shows a logged-in marker.

        Returns True as soon as the body text contains "全部文件" or
        "我的文件", False when *timeout* seconds pass first.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                body_text = self._body_text(page)
                if "全部文件" in body_text or "我的文件" in body_text:
                    return True
            except Exception:
                # Best-effort poll: reading the page can fail transiently;
                # simply retry on the next tick.
                pass
            time.sleep(1)
        return False

    def _open_all_files(self, page):
        """Click the "all files" entry if it can be found; otherwise log."""
        if self._click_text(page, ["全部文件", "我的文件"]):
            time.sleep(1)
            return
        self._log("未找到“全部文件”入口,尝试直接在当前页面继续。")

    def _wait_for_file_list_loaded(
        self,
        page,
        timeout: float = 8,
        poll_interval: float = 0.3,
    ) -> bool:
        """Wait until the folder page is actually usable.

        A folder may legitimately be empty, so "the list has items" alone is
        not a sufficient readiness signal; the check also accepts an
        empty-state hint or the absence of any loading indicator.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                state = page.evaluate(
                    """
                    () => {
                        const bodyText = document.body ? document.body.innerText : '';
                        const itemCount = document.querySelectorAll(
                            'span[title], a[title], .wp-s-file-list-drag-copy__item-title-text, .file-name'
                        ).length;
                        const breadcrumbCount = document.querySelectorAll(
                            '.wp-s-file-list-drag-copy__header-breadcrumb-item'
                        ).length;
                        const hasCreateFolderButton = bodyText.includes('新建文件夹');
                        const hasAllFilesText = bodyText.includes('全部文件');
                        const hasEmptyState = ['暂无文件', '空文件夹', '拖拽文件到此处上传', '上传文件到百度网盘']
                            .some(keyword => bodyText.includes(keyword));
                        const hasLoadingText = ['加载中', '正在加载', '请稍候']
                            .some(keyword => bodyText.includes(keyword));
                        const hasLoadingMask = !!document.querySelector(
                            '[class*="loading"], [class*="Loading"], [class*="skeleton"], [class*="Skeleton"], [class*="spin"], [class*="Spin"]'
                        );
                        return {
                            itemCount,
                            breadcrumbCount,
                            hasCreateFolderButton,
                            hasAllFilesText,
                            hasEmptyState,
                            hasLoadingText,
                            hasLoadingMask,
                        };
                    }
                    """
                )
                ready = (
                    (state["breadcrumbCount"] > 0 or state["hasAllFilesText"])
                    and state["hasCreateFolderButton"]
                    and (
                        state["itemCount"] > 0
                        or state["hasEmptyState"]
                        or (not state["hasLoadingText"] and not state["hasLoadingMask"])
                    )
                )
                if ready:
                    return True
            except Exception:
                # Evaluation can fail transiently; keep polling.
                pass
            time.sleep(poll_interval)
        return False

    def _find_matching_item(
        self,
        page,
        target_name: str,
        is_fuzzy: bool = False,
    ) -> Optional[str]:
        """Return the displayed name of the first visible matching element.

        An element matches when its ``title``, ``aria-label`` or text equals
        *target_name* (or contains it when *is_fuzzy*). Returns None when
        nothing matches or the page evaluation fails.
        """
        try:
            return page.evaluate(
                """
                (args) => {
                    const { name, fuzzy } = args;
                    const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
                    const visible = element => {
                        if (!element) return false;
                        const style = window.getComputedStyle(element);
                        const rect = element.getBoundingClientRect();
                        return style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 0 && rect.height > 0;
                    };
                    const els = [
                        ...document.querySelectorAll(
                            'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"], [class*="file"], [class*="File"]'
                        ),
                    ].filter(visible);
                    const target = els.find(e => {
                        const values = [
                            normalize(e.getAttribute('title')),
                            normalize(e.getAttribute('aria-label')),
                            normalize(e.innerText || e.textContent),
                        ].filter(Boolean);
                        if (!values.length) {
                            return false;
                        }
                        return values.some(value => fuzzy ? value.includes(name) : value === name);
                    });
                    if (!target) {
                        return null;
                    }
                    return normalize(
                        target.getAttribute('title') ||
                        target.getAttribute('aria-label') ||
                        target.innerText ||
                        target.textContent
                    );
                }
                """,
                {"name": target_name, "fuzzy": is_fuzzy},
            )
        except Exception:
            return None

    def _matches_name(self, source: str, target_name: str, is_fuzzy: bool = False) -> bool:
        """Compare a displayed name with *target_name*.

        *source* is stripped first; an empty or None source never matches.
        Fuzzy mode is a substring test, otherwise equality is required.
        """
        source = (source or "").strip()
        if not source:
            return False
        return target_name in source if is_fuzzy else source == target_name

    def _page_contains_text(self, page, text: str) -> bool:
        """Return True if the whitespace-normalized body text contains *text*."""
        try:
            return bool(
                page.evaluate(
                    """
                    (target) => {
                        const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
                        const bodyText = normalize(document.body ? document.body.innerText : '');
                        return !!target && bodyText.includes(normalize(target));
                    }
                    """,
                    text,
                )
            )
        except Exception:
            return False

    def _get_navigation_context(self, page) -> dict:
        """Snapshot the current URL, breadcrumb labels and listed item titles.

        Returns a dict with keys ``url``, ``breadcrumbs`` and ``itemTitles``;
        all empty when the page evaluation fails.
        """
        try:
            return page.evaluate(
                """
                () => {
                    const normalize = text => (text || '').replace(/\\s+/g, ' ').trim();
                    const breadcrumbs = [
                        ...document.querySelectorAll(
                            '.wp-s-file-list-drag-copy__header-breadcrumb-item, [class*="breadcrumb"] a, [class*="breadcrumb"] span, [class*="Breadcrumb"] a, [class*="Breadcrumb"] span'
                        ),
                    ]
                        .map(node => normalize(node.getAttribute('title') || node.innerText || node.textContent))
                        .filter(Boolean);
                    const itemTitles = [
                        ...document.querySelectorAll(
                            'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"]'
                        ),
                    ]
                        .map(node => normalize(
                            node.getAttribute('title') ||
                            node.getAttribute('aria-label') ||
                            node.innerText ||
                            node.textContent
                        ))
                        .filter(Boolean);
                    return {
                        url: window.location.href,
                        breadcrumbs,
                        itemTitles,
                    };
                }
                """
            )
        except Exception:
            return {
                "url": "",
                "breadcrumbs": [],
                "itemTitles": [],
            }

    def _wait_for_folder_visible(
        self,
        page,
        target_name: str,
        is_fuzzy: bool = False,
        exact_name: Optional[str] = None,
        timeout: float = 8,
        poll_interval: float = 0.3,
    ) -> bool:
        """Poll until a folder named like the target is visible on the page.

        Accepts, in order: an exact element match for *exact_name*, a
        (possibly fuzzy) element match for *target_name*, or *exact_name*
        appearing anywhere in the body text.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if exact_name and self._find_matching_item(page, exact_name, False):
                return True
            if self._find_matching_item(page, target_name, is_fuzzy):
                return True
            if exact_name and self._page_contains_text(page, exact_name):
                return True
            time.sleep(poll_interval)
        return False

    def _wait_for_folder_entered(
        self,
        page,
        target_name: str,
        is_fuzzy: bool = False,
        timeout: float = 8,
        before_url: str = "",
        before_had_target: bool = False,
        poll_interval: float = 0.25,
    ) -> bool:
        """Poll until the page shows evidence that the folder was entered.

        Any of these counts as evidence: a breadcrumb matching the target,
        a URL different from *before_url*, or the target disappearing from
        the listing when it was listed before (*before_had_target*).
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            context = self._get_navigation_context(page)
            breadcrumbs = context.get("breadcrumbs", [])
            current_url = context.get("url", "")
            item_titles = context.get("itemTitles", [])

            breadcrumb_match = next(
                (
                    crumb
                    for crumb in breadcrumbs
                    if self._matches_name(crumb, target_name, is_fuzzy)
                ),
                "",
            )
            url_changed = bool(before_url and current_url and current_url != before_url)
            list_no_longer_shows_target = not any(
                self._matches_name(title, target_name, is_fuzzy) for title in item_titles
            )

            if breadcrumb_match or url_changed or (before_had_target and list_no_longer_shows_target):
                # Entering is already confirmed; a slow listing is only a warning.
                if not self._wait_for_file_list_loaded(page, timeout=4, poll_interval=0.2):
                    display_name = breadcrumb_match or target_name
                    self._log(f"警告:已进入目录 {display_name},但列表就绪确认超时,继续后续流程。")
                return True
            time.sleep(poll_interval)

        context = self._get_navigation_context(page)
        breadcrumbs = " > ".join(context.get("breadcrumbs", [])) or "无"
        current_url = context.get("url", "") or "无"
        self._log(
            f"进入目录确认超时: 目标={target_name}, 当前面包屑={breadcrumbs}, 当前URL={current_url}"
        )
        return False

    def _ensure_folder(
        self,
        page,
        target_name: str,
        create_name: Optional[str] = None,
        is_fuzzy: bool = False,
    ) -> bool:
        """Enter the folder matching *target_name*, creating it if missing.

        Args:
            target_name: Name (or substring when *is_fuzzy*) to look for.
            create_name: Name used when the folder has to be created;
                defaults to *target_name*.
            is_fuzzy: Match *target_name* as a substring.

        Returns:
            True once the folder has been entered, False otherwise.
        """
        create_name = create_name or target_name
        if not self._wait_for_file_list_loaded(page, timeout=5, poll_interval=0.2):
            self._log("警告:当前目录列表未完全确认加载,但仍继续检查目录。")

        matched_name = self._find_matching_item(page, target_name, is_fuzzy)
        if matched_name:
            self._log(f"已找到目录: {matched_name},准备进入...")
            return self._wait_and_enter_folder(
                page,
                target_name,
                is_fuzzy=is_fuzzy,
                exact_name=matched_name,
                timeout=18,
            )

        self._log(f"未找到目录,开始创建: {create_name}")
        if not self._create_folder(page, create_name):
            return False

        if not self._wait_and_enter_folder(
            page,
            target_name,
            is_fuzzy=is_fuzzy,
            exact_name=create_name,
            timeout=35,
        ):
            self._log(f"目录创建后仍未能进入: {create_name}")
            return False
        return True

    def _wait_and_enter_folder(
        self,
        page,
        target_name: str,
        is_fuzzy: bool = False,
        exact_name: Optional[str] = None,
        timeout: float = 30,
        poll_interval: float = 1,
    ) -> bool:
        """Repeatedly look for the folder and try to enter it until *timeout*.

        Each attempt tries *exact_name* first (exact match) and then
        *target_name*; every fifth attempt refreshes the current folder.
        """
        deadline = time.monotonic() + timeout
        attempt = 0
        while time.monotonic() < deadline:
            attempt += 1

            if exact_name and self._find_matching_item(page, exact_name, False):
                self._log(f"确认目录已出现: {exact_name},尝试进入...")
                if self._scan_and_enter(page, exact_name, is_fuzzy=False):
                    return True

            matched_name = self._find_matching_item(page, target_name, is_fuzzy)
            if matched_name:
                self._log(f"确认目录已出现: {matched_name},尝试进入...")
                if self._scan_and_enter(page, target_name, is_fuzzy=is_fuzzy):
                    return True

            if attempt % 5 == 0:
                self._log(f"目录暂未稳定可进入,刷新当前目录后重试: {exact_name or target_name}")
                self._refresh_current_folder(page)
            time.sleep(poll_interval)

        context = self._get_navigation_context(page)
        item_titles = " | ".join(context.get("itemTitles", [])[:20]) or "无"
        self._log(f"目录确认进入失败: {exact_name or target_name},当前列表: {item_titles}")
        return False

    def _scan_and_enter(self, page, target_name: str, is_fuzzy: bool = False) -> bool:
        """Click and double-click the matching row, then confirm the entry.

        Falls back to a Playwright text locator when the in-page script does
        not find the element. Returns False on any failure.
        """
        try:
            page.evaluate("window.scrollTo(0, 300)")
            # Capture the state before clicking so the entry check can
            # detect a URL change or the target leaving the listing.
            before_context = self._get_navigation_context(page)
            before_url = before_context.get("url", "")
            before_item_titles = before_context.get("itemTitles", [])
            before_had_target = any(
                self._matches_name(title, target_name, is_fuzzy)
                for title in before_item_titles
            )

            found = page.evaluate(
                """
                (args) => {
                    const { name, fuzzy } = args;
                    const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
                    const visible = element => {
                        if (!element) return false;
                        const style = window.getComputedStyle(element);
                        const rect = element.getBoundingClientRect();
                        return style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 0 && rect.height > 0;
                    };
                    const els = [
                        ...document.querySelectorAll(
                            'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"]'
                        )
                    ].filter(visible);
                    const target = els.find(e => {
                        const values = [
                            normalize(e.getAttribute('title')),
                            normalize(e.getAttribute('aria-label')),
                            normalize(e.innerText || e.textContent),
                        ].filter(Boolean);
                        return values.some(value => fuzzy ? value.includes(name) : value === name);
                    });
                    if (target) {
                        const row = target.closest('tr, li, [class*="item"], [class*="Item"], [class*="row"], [class*="Row"], [class*="file"], [class*="File"]') || target;
                        row.scrollIntoView({block: 'center'});
                        row.click();
                        const ev = new MouseEvent('dblclick', {
                            view: window,
                            bubbles: true,
                            cancelable: true
                        });
                        row.dispatchEvent(ev);
                        return true;
                    }
                    return false;
                }
                """,
                {"name": target_name, "fuzzy": is_fuzzy},
            )

            if not found:
                try:
                    locator = page.get_by_text(target_name, exact=not is_fuzzy).first
                    if locator.count() == 0:
                        return False
                    locator.click(force=True)
                    locator.dblclick(force=True)
                except Exception:
                    return False

            time.sleep(0.2)
            return self._wait_for_folder_entered(
                page,
                target_name,
                is_fuzzy=is_fuzzy,
                before_url=before_url,
                before_had_target=before_had_target,
                poll_interval=0.2,
            )
        except Exception:
            return False

    def _create_folder(self, page, folder_name: str) -> bool:
        """Create *folder_name* through the "new folder" button.

        Returns False only when the button is missing or an exception
        occurs. If the folder cannot be seen even after a refresh, True is
        still returned and confirmation is left to the caller.
        """
        try:
            page.keyboard.press("Escape")
            btn = page.get_by_text("新建文件夹").first
            if btn.count() == 0:
                self._log("未找到“新建文件夹”按钮")
                return False

            btn.click(force=True)
            time.sleep(0.8)
            page.keyboard.type(folder_name, delay=50)
            page.keyboard.press("Enter")

            if self._wait_for_folder_visible(
                page,
                folder_name,
                exact_name=folder_name,
                timeout=8,
                poll_interval=0.2,
            ):
                self._log(f"文件夹创建完成: {folder_name}")
                return True

            self._log(f"文件夹创建后未立即在列表中看到,刷新后继续确认: {folder_name}")
            self._refresh_current_folder(page)
            if self._wait_for_folder_visible(
                page,
                folder_name,
                exact_name=folder_name,
                timeout=8,
                poll_interval=0.2,
            ):
                self._log(f"文件夹创建完成: {folder_name}")
                return True

            self._log(f"已提交文件夹创建请求,交给后续流程继续确认: {folder_name}")
            return True
        except Exception as exc:
            self._log(f"创建文件夹失败: {folder_name},原因: {exc}")
            return False

    def _check_file_exists_robust(self, page, filename: str) -> bool:
        """Return True if a visible element is named exactly *filename*.

        Uses an in-page script first and falls back to a Playwright text
        locator (first five hits) when the script raises.
        """
        try:
            return bool(
                page.evaluate(
                    """
                    (name) => {
                        const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
                        const visible = element => {
                            if (!element) return false;
                            const style = window.getComputedStyle(element);
                            const rect = element.getBoundingClientRect();
                            return style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 0 && rect.height > 0;
                        };
                        const els = [
                            ...document.querySelectorAll(
                                'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name'
                            ),
                        ].filter(visible);
                        return els.some(e => {
                            const title = normalize(e.getAttribute('title'));
                            const label = normalize(e.getAttribute('aria-label'));
                            const text = normalize(e.innerText || e.textContent);
                            return title === name || label === name || text === name;
                        });
                    }
                    """,
                    filename,
                )
            )
        except Exception:
            try:
                locator = page.get_by_text(filename, exact=True)
                count = locator.count()
                for index in range(min(count, 5)):
                    if locator.nth(index).is_visible():
                        return True
            except Exception:
                pass
            return False

    def _select_upload_file(self, page, file_path: str):
        """Hand *file_path* to the page's file input, opening it if needed."""
        input_locator = page.locator("input[type=file]")
        if input_locator.count() == 0:
            if not self._click_text(page, ["上传", "上传文件"]):
                self._log("未找到“上传”按钮,继续等待上传控件。")
            # Wait for the input to be attached rather than visible (the
            # default state): a file input may be hidden and would then
            # never satisfy a visibility wait.
            page.wait_for_selector("input[type=file]", state="attached", timeout=10000)
        page.locator("input[type=file]").first.set_input_files(file_path)

    def _wait_for_upload_started(
        self,
        page,
        filename: str,
        timeout: float = 20,
    ) -> bool:
        """Poll until the page shows upload activity or the file itself."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            state = self._upload_state(page, filename)
            if state["uploading"] or state["item_active"] or state["file_exists"]:
                return True
            time.sleep(0.5)
        return False

    def _wait_for_upload_finished(
        self,
        page,
        filename: str,
        timeout: float = 1800,
    ) -> bool:
        """Poll every 3 seconds until the upload is confirmed or fails.

        The upload counts as finished after three consecutive polls in which
        the file is listed and no activity is reported, followed by one
        final existence check. Returns False when the page reports an error
        or *timeout* seconds pass.
        """
        deadline = time.monotonic() + timeout
        stable_count = 0
        # None means "never logged", so the first activity log is immediate.
        last_active_log: Optional[float] = None
        last_refresh = time.monotonic()

        while time.monotonic() < deadline:
            state = self._upload_state(page, filename)
            if state["has_error"]:
                self._log("检测到页面提示上传失败。")
                return False

            if state["file_exists"] and not state["item_active"] and not state["uploading"]:
                stable_count += 1
                if stable_count >= 3:
                    self._log("文件已出现在列表中,且上传状态连续稳定。")
                    time.sleep(3)
                    return self._check_file_exists_robust(page, filename)
                self._log(f"上传疑似完成,稳定性校验 ({stable_count}/3)...")
            else:
                stable_count = 0

            now = time.monotonic()
            if (state["uploading"] or state["item_active"]) and (
                last_active_log is None or now - last_active_log >= 15
            ):
                self._log("监测到活跃传输流,继续等待...")
                last_active_log = now

            if time.monotonic() - last_refresh > 25:
                self._refresh_current_folder(page)
                last_refresh = time.monotonic()
            time.sleep(3)
        return False

    def _upload_state(self, page, filename: str) -> dict:
        """Inspect the page and summarize the upload status of *filename*.

        Returns a dict of booleans: ``file_exists``, ``item_active``,
        ``uploading`` and ``has_error``. When the page evaluation fails,
        only ``file_exists`` is determined (via the fallback check) and the
        other flags are False.
        """
        try:
            state = page.evaluate(
                """
                (filename) => {
                    const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
                    const visible = element => {
                        if (!element) return false;
                        const style = window.getComputedStyle(element);
                        const rect = element.getBoundingClientRect();
                        return style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 0 && rect.height > 0;
                    };
                    const bodyText = normalize(document.body ? document.body.innerText : '');
                    const nodes = [
                        ...document.querySelectorAll(
                            'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="file"], [class*="File"]'
                        )
                    ].filter(visible);
                    const matchingNode = nodes.find(node => {
                        const title = normalize(node.getAttribute('title'));
                        const label = normalize(node.getAttribute('aria-label'));
                        const text = normalize(node.innerText || node.textContent);
                        return title === filename || label === filename || text === filename;
                    });
                    const container = matchingNode
                        ? matchingNode.closest('tr, li, [class*="item"], [class*="Item"], [class*="row"], [class*="Row"], [class*="file"], [class*="File"]')
                        : null;
                    const containerText = normalize(container ? container.innerText : '');
                    const itemActiveWords = ['正在上传', '上传中', '等待上传', '传输队列', '传输中', '0B/s', '校验中', '处理中'];
                    const pageActiveWords = ['正在上传', '上传中', '等待上传', '传输队列', '传输中', '校验中', '处理中'];
                    const errorWords = ['上传失败', '传输失败', '网络异常', '上传出错', '文件上传失败'];
                    const progressPattern = /(?:^|\\D)(?:100|\\d{1,2})(?:\\.\\d+)?%/;
                    const itemHasProgress = container
                        ? [...container.querySelectorAll('[class*="progress"], [class*="Progress"]')].some(visible)
                        : false;
                    const itemActive = itemActiveWords.some(word => containerText.includes(word))
                        || progressPattern.test(containerText)
                        || itemHasProgress;
                    const uploading = itemActive || (bodyText.includes(filename) && (pageActiveWords.some(word => bodyText.includes(word)) || progressPattern.test(bodyText)));
                    const hasError = errorWords.some(word => bodyText.includes(word));
                    return {
                        file_exists: Boolean(matchingNode),
                        item_active: itemActive,
                        uploading,
                        has_error: hasError,
                    };
                }
                """,
                filename,
            )
            return {
                "file_exists": bool(state.get("file_exists")),
                "item_active": bool(state.get("item_active")),
                "uploading": bool(state.get("uploading")),
                "has_error": bool(state.get("has_error")),
            }
        except Exception:
            return {
                "file_exists": self._check_file_exists_robust(page, filename),
                "item_active": False,
                "uploading": False,
                "has_error": False,
            }

    def _refresh_current_folder(self, page):
        """Refresh the listing by clicking the last breadcrumb item.

        Falls back to pressing F5 when the breadcrumb click raises; all
        errors in the fallback are ignored (best effort).
        """
        try:
            page.locator(".wp-s-file-list-drag-copy__header-breadcrumb-item").last.click()
            time.sleep(2)
        except Exception:
            try:
                # NOTE(review): a synthetic F5 key press may not trigger a
                # real browser reload; confirm. Left unchanged on purpose:
                # this runs while an upload may be in flight, and a real
                # page reload would presumably abort the transfer.
                page.keyboard.press("F5")
                page.wait_for_load_state("domcontentloaded", timeout=10000)
            except Exception:
                pass

    def _click_text(self, page, texts: list[str]) -> bool:
        """Click the first visible element whose text matches one of *texts*.

        Each text is tried with an exact match first, then a loose match;
        at most the first five hits per attempt are inspected. Returns True
        after a click, False when nothing could be clicked.
        """
        for text in texts:
            for exact in (True, False):
                try:
                    locator = page.get_by_text(text, exact=exact)
                    count = locator.count()
                    for index in range(min(count, 5)):
                        item = locator.nth(index)
                        if item.is_visible():
                            item.click(force=True, timeout=5000)
                            return True
                except PlaywrightTimeoutError:
                    continue
                except Exception:
                    continue
        return False

    def _clear_old_notifications(self, page):
        """Remove existing toast/notification elements from the page."""
        try:
            page.evaluate(
                """
                () => {
                    document
                        .querySelectorAll('.ant-message-notice, .u-toast, [class*="toast"], [class*="Toast"]')
                        .forEach(element => element.remove());
                }
                """
            )
        except Exception:
            pass

    def _body_text(self, page) -> str:
        """Return the page body's ``innerText`` ('' when there is no body)."""
        return page.evaluate("() => document.body ? document.body.innerText : ''")

    def _wait_for_file_in_list(self, page, filename: str) -> bool:
        """Alias for :meth:`_wait_for_upload_finished` with default timeout."""
        return self._wait_for_upload_finished(page, filename)