Files
yidaima_tools/baidu_uploader.py
2026-05-12 17:49:51 +08:00

825 lines
33 KiB
Python

from __future__ import annotations
import os
import re
import time
from typing import Callable, Optional
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
class BaiduUploader:
"""
百度网盘上传工具 - 纯净上传版 (移除分享逻辑)
"""
def __init__(
self,
chrome_path: str,
cookies_dir: str,
log_callback: Optional[Callable[[str], None]] = None,
):
self.chrome_path = chrome_path
self.cookies_dir = cookies_dir
self.log_callback = log_callback
self.url = "https://pan.baidu.com/"
if not os.path.exists(self.cookies_dir):
os.makedirs(self.cookies_dir, exist_ok=True)
def _log(self, message: str):
print(message)
if self.log_callback:
self.log_callback(message)
def upload_file(
self,
file_path: str,
target_folder_name: str,
root_path: str = "精品项目整理",
) -> bool:
"""
上传文件到百度网盘
"""
if not os.path.exists(file_path):
self._log(f"错误: 本地文件不存在 {file_path}")
return False
filename = os.path.basename(file_path)
file_size_mb = os.path.getsize(file_path) / 1024 / 1024
context = None
# 提取项目编号
project_id = ""
match = re.search(r"(【[A-Za-z0-9]+】)", target_folder_name)
if match:
project_id = match.group(1)
try:
with sync_playwright() as p:
self._log("正在启动浏览器...")
launch_args = {
"user_data_dir": self.cookies_dir,
"headless": False,
"executable_path": self.chrome_path
if self.chrome_path and os.path.exists(self.chrome_path)
else None,
"viewport": {"width": 1280, "height": 800},
}
if not launch_args["executable_path"]:
launch_args.pop("executable_path")
context = p.chromium.launch_persistent_context(**launch_args)
context.add_init_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
page = context.new_page()
page.set_default_timeout(60000)
# 1. 登录
self._log("检测登录状态...")
page.goto(self.url, wait_until="domcontentloaded", timeout=60000)
if not self._wait_for_login(page, timeout=20):
self._log("请手动扫码登录,登录后会自动继续...")
if not self._wait_for_login(page, timeout=180):
raise RuntimeError("等待百度网盘登录超时")
# 2. 导航
self._log("登录成功,进入目标路径...")
self._open_all_files(page)
self._log("等待文件列表加载...")
if not self._wait_for_file_list_loaded(page):
self._log("警告:文件列表加载超时,继续尝试目录检查...")
self._log(f"准备进入根目录: {root_path}")
if not self._ensure_folder(page, root_path):
raise RuntimeError(f"无法进入或创建根目录: {root_path}")
search_key = project_id if project_id else target_folder_name
self._log(f"准备进入项目目录: {target_folder_name}")
if not self._ensure_folder(
page,
search_key,
create_name=target_folder_name,
is_fuzzy=bool(project_id),
):
raise RuntimeError(f"无法进入或创建项目目录: {target_folder_name}")
# 3. 查重
if self._check_file_exists_robust(page, filename):
self._log(f"云端已存在文件 '{filename}',跳过上传。")
time.sleep(2)
return True
# 4. 执行上传
self._log(f"开始传输: {filename} ({file_size_mb:.2f} MB)")
self._clear_old_notifications(page)
self._select_upload_file(page, file_path)
if not self._wait_for_upload_started(page, filename):
self._log("警告:未检测到明显的上传任务,继续监测文件列表。")
else:
self._log("检测到上传任务已建立。")
# 5. 监测列表确认成功
self._log("上传已启动,监测云端状态中...")
success = self._wait_for_upload_finished(page, filename)
if success:
self._log("\n" + "=" * 30)
self._log("百度网盘上传圆满成功!")
self._log("=" * 30 + "\n")
time.sleep(3)
else:
self._log("警告:监测超时,未能在列表中看到文件,请手动核实。")
return success
except Exception as e:
self._log(f"程序异常: {str(e)}")
return False
finally:
if context:
try:
context.close()
except Exception:
pass
def _wait_for_login(self, page, timeout: float = 20) -> bool:
start_time = time.time()
while time.time() - start_time < timeout:
try:
body_text = self._body_text(page)
if "全部文件" in body_text or "我的文件" in body_text:
return True
except Exception:
pass
time.sleep(1)
return False
def _open_all_files(self, page):
if self._click_text(page, ["全部文件", "我的文件"]):
time.sleep(1)
return
self._log("未找到“全部文件”入口,尝试直接在当前页面继续。")
def _wait_for_file_list_loaded(
self,
page,
timeout: float = 8,
poll_interval: float = 0.3,
) -> bool:
"""
等待目录页真正进入可操作状态。
百度网盘目录可能为空,所以不能只靠“列表里有项目”来判断。
"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
state = page.evaluate(
"""
() => {
const bodyText = document.body ? document.body.innerText : '';
const itemCount = document.querySelectorAll(
'span[title], a[title], .wp-s-file-list-drag-copy__item-title-text, .file-name'
).length;
const breadcrumbCount = document.querySelectorAll(
'.wp-s-file-list-drag-copy__header-breadcrumb-item'
).length;
const hasCreateFolderButton = bodyText.includes('新建文件夹');
const hasAllFilesText = bodyText.includes('全部文件');
const hasEmptyState = ['暂无文件', '空文件夹', '拖拽文件到此处上传', '上传文件到百度网盘']
.some(keyword => bodyText.includes(keyword));
const hasLoadingText = ['加载中', '正在加载', '请稍候']
.some(keyword => bodyText.includes(keyword));
const hasLoadingMask = !!document.querySelector(
'[class*="loading"], [class*="Loading"], [class*="skeleton"], [class*="Skeleton"], [class*="spin"], [class*="Spin"]'
);
return {
itemCount,
breadcrumbCount,
hasCreateFolderButton,
hasAllFilesText,
hasEmptyState,
hasLoadingText,
hasLoadingMask,
};
}
"""
)
ready = (
(state["breadcrumbCount"] > 0 or state["hasAllFilesText"])
and state["hasCreateFolderButton"]
and (
state["itemCount"] > 0
or state["hasEmptyState"]
or (not state["hasLoadingText"] and not state["hasLoadingMask"])
)
)
if ready:
return True
except Exception:
pass
time.sleep(poll_interval)
return False
def _find_matching_item(
self,
page,
target_name: str,
is_fuzzy: bool = False,
) -> Optional[str]:
try:
return page.evaluate(
"""
(args) => {
const { name, fuzzy } = args;
const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
const visible = element => {
if (!element) return false;
const style = window.getComputedStyle(element);
const rect = element.getBoundingClientRect();
return style.display !== 'none'
&& style.visibility !== 'hidden'
&& rect.width > 0
&& rect.height > 0;
};
const els = [
...document.querySelectorAll(
'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"], [class*="file"], [class*="File"]'
),
].filter(visible);
const target = els.find(e => {
const values = [
normalize(e.getAttribute('title')),
normalize(e.getAttribute('aria-label')),
normalize(e.innerText || e.textContent),
].filter(Boolean);
if (!values.length) {
return false;
}
return values.some(value => fuzzy ? value.includes(name) : value === name);
});
if (!target) {
return null;
}
return normalize(
target.getAttribute('title')
|| target.getAttribute('aria-label')
|| target.innerText
|| target.textContent
);
}
""",
{"name": target_name, "fuzzy": is_fuzzy},
)
except Exception:
return None
def _matches_name(self, source: str, target_name: str, is_fuzzy: bool = False) -> bool:
source = (source or "").strip()
if not source:
return False
return target_name in source if is_fuzzy else source == target_name
def _page_contains_text(self, page, text: str) -> bool:
try:
return bool(
page.evaluate(
"""
(target) => {
const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
const bodyText = normalize(document.body ? document.body.innerText : '');
return !!target && bodyText.includes(normalize(target));
}
""",
text,
)
)
except Exception:
return False
def _get_navigation_context(self, page) -> dict:
try:
return page.evaluate(
"""
() => {
const normalize = text => (text || '').replace(/\\s+/g, ' ').trim();
const breadcrumbs = [
...document.querySelectorAll(
'.wp-s-file-list-drag-copy__header-breadcrumb-item, [class*="breadcrumb"] a, [class*="breadcrumb"] span, [class*="Breadcrumb"] a, [class*="Breadcrumb"] span'
),
]
.map(node => normalize(node.getAttribute('title') || node.innerText || node.textContent))
.filter(Boolean);
const itemTitles = [
...document.querySelectorAll(
'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"]'
),
]
.map(node => normalize(
node.getAttribute('title')
|| node.getAttribute('aria-label')
|| node.innerText
|| node.textContent
))
.filter(Boolean);
return {
url: window.location.href,
breadcrumbs,
itemTitles,
};
}
"""
)
except Exception:
return {
"url": "",
"breadcrumbs": [],
"itemTitles": [],
}
def _wait_for_folder_visible(
self,
page,
target_name: str,
is_fuzzy: bool = False,
exact_name: Optional[str] = None,
timeout: float = 8,
poll_interval: float = 0.3,
) -> bool:
start_time = time.time()
while time.time() - start_time < timeout:
if exact_name and self._find_matching_item(page, exact_name, False):
return True
if self._find_matching_item(page, target_name, is_fuzzy):
return True
if exact_name and self._page_contains_text(page, exact_name):
return True
time.sleep(poll_interval)
return False
def _wait_for_folder_entered(
self,
page,
target_name: str,
is_fuzzy: bool = False,
timeout: float = 8,
before_url: str = "",
before_had_target: bool = False,
poll_interval: float = 0.25,
) -> bool:
start_time = time.time()
while time.time() - start_time < timeout:
context = self._get_navigation_context(page)
breadcrumbs = context.get("breadcrumbs", [])
current_url = context.get("url", "")
item_titles = context.get("itemTitles", [])
breadcrumb_match = next(
(crumb for crumb in breadcrumbs if self._matches_name(crumb, target_name, is_fuzzy)),
"",
)
url_changed = bool(before_url and current_url and current_url != before_url)
list_no_longer_shows_target = not any(
self._matches_name(title, target_name, is_fuzzy) for title in item_titles
)
if breadcrumb_match or url_changed or (before_had_target and list_no_longer_shows_target):
if not self._wait_for_file_list_loaded(page, timeout=4, poll_interval=0.2):
display_name = breadcrumb_match or target_name
self._log(f"警告:已进入目录 {display_name},但列表就绪确认超时,继续后续流程。")
return True
time.sleep(poll_interval)
context = self._get_navigation_context(page)
breadcrumbs = " > ".join(context.get("breadcrumbs", [])) or ""
current_url = context.get("url", "") or ""
self._log(
f"进入目录确认超时: 目标={target_name}, 当前面包屑={breadcrumbs}, 当前URL={current_url}"
)
return False
def _ensure_folder(
self,
page,
target_name: str,
create_name: Optional[str] = None,
is_fuzzy: bool = False,
) -> bool:
create_name = create_name or target_name
if not self._wait_for_file_list_loaded(page, timeout=5, poll_interval=0.2):
self._log("警告:当前目录列表未完全确认加载,但仍继续检查目录。")
matched_name = self._find_matching_item(page, target_name, is_fuzzy)
if matched_name:
self._log(f"已找到目录: {matched_name},准备进入...")
return self._wait_and_enter_folder(
page,
target_name,
is_fuzzy=is_fuzzy,
exact_name=matched_name,
timeout=18,
)
self._log(f"未找到目录,开始创建: {create_name}")
if not self._create_folder(page, create_name):
return False
if not self._wait_and_enter_folder(
page,
target_name,
is_fuzzy=is_fuzzy,
exact_name=create_name,
timeout=35,
):
self._log(f"目录创建后仍未能进入: {create_name}")
return False
return True
def _wait_and_enter_folder(
self,
page,
target_name: str,
is_fuzzy: bool = False,
exact_name: Optional[str] = None,
timeout: float = 30,
poll_interval: float = 1,
) -> bool:
start_time = time.time()
attempt = 0
while time.time() - start_time < timeout:
attempt += 1
if exact_name and self._find_matching_item(page, exact_name, False):
self._log(f"确认目录已出现: {exact_name},尝试进入...")
if self._scan_and_enter(page, exact_name, is_fuzzy=False):
return True
matched_name = self._find_matching_item(page, target_name, is_fuzzy)
if matched_name:
self._log(f"确认目录已出现: {matched_name},尝试进入...")
if self._scan_and_enter(page, target_name, is_fuzzy=is_fuzzy):
return True
if attempt % 5 == 0:
self._log(f"目录暂未稳定可进入,刷新当前目录后重试: {exact_name or target_name}")
self._refresh_current_folder(page)
time.sleep(poll_interval)
context = self._get_navigation_context(page)
item_titles = " | ".join(context.get("itemTitles", [])[:20]) or ""
self._log(f"目录确认进入失败: {exact_name or target_name},当前列表: {item_titles}")
return False
def _scan_and_enter(self, page, target_name: str, is_fuzzy: bool = False) -> bool:
try:
page.evaluate("window.scrollTo(0, 300)")
before_context = self._get_navigation_context(page)
before_url = before_context.get("url", "")
before_item_titles = before_context.get("itemTitles", [])
before_had_target = any(
self._matches_name(title, target_name, is_fuzzy) for title in before_item_titles
)
found = page.evaluate(
"""
(args) => {
const { name, fuzzy } = args;
const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
const visible = element => {
if (!element) return false;
const style = window.getComputedStyle(element);
const rect = element.getBoundingClientRect();
return style.display !== 'none'
&& style.visibility !== 'hidden'
&& rect.width > 0
&& rect.height > 0;
};
const els = [
...document.querySelectorAll(
'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="name"], [class*="Name"]'
)
].filter(visible);
const target = els.find(e => {
const values = [
normalize(e.getAttribute('title')),
normalize(e.getAttribute('aria-label')),
normalize(e.innerText || e.textContent),
].filter(Boolean);
return values.some(value => fuzzy ? value.includes(name) : value === name);
});
if (target) {
const row = target.closest('tr, li, [class*="item"], [class*="Item"], [class*="row"], [class*="Row"], [class*="file"], [class*="File"]') || target;
row.scrollIntoView({block: 'center'});
row.click();
const ev = new MouseEvent('dblclick', { view: window, bubbles: true, cancelable: true });
row.dispatchEvent(ev);
return true;
}
return false;
}
""",
{"name": target_name, "fuzzy": is_fuzzy},
)
if not found:
try:
locator = page.get_by_text(target_name, exact=not is_fuzzy).first
if locator.count() == 0:
return False
locator.click(force=True)
locator.dblclick(force=True)
found = True
except Exception:
return False
time.sleep(0.2)
return self._wait_for_folder_entered(
page,
target_name,
is_fuzzy=is_fuzzy,
before_url=before_url,
before_had_target=before_had_target,
poll_interval=0.2,
)
except Exception:
return False
def _create_folder(self, page, folder_name: str) -> bool:
try:
page.keyboard.press("Escape")
btn = page.get_by_text("新建文件夹").first
if btn.count() == 0:
self._log("未找到“新建文件夹”按钮")
return False
btn.click(force=True)
time.sleep(0.8)
page.keyboard.type(folder_name, delay=50)
page.keyboard.press("Enter")
if self._wait_for_folder_visible(
page,
folder_name,
exact_name=folder_name,
timeout=8,
poll_interval=0.2,
):
self._log(f"文件夹创建完成: {folder_name}")
return True
self._log(f"文件夹创建后未立即在列表中看到,刷新后继续确认: {folder_name}")
self._refresh_current_folder(page)
if self._wait_for_folder_visible(
page,
folder_name,
exact_name=folder_name,
timeout=8,
poll_interval=0.2,
):
self._log(f"文件夹创建完成: {folder_name}")
return True
self._log(f"已提交文件夹创建请求,交给后续流程继续确认: {folder_name}")
return True
except Exception as exc:
self._log(f"创建文件夹失败: {folder_name},原因: {exc}")
return False
def _check_file_exists_robust(self, page, filename: str) -> bool:
try:
return bool(
page.evaluate(
"""
(name) => {
const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
const visible = element => {
if (!element) return false;
const style = window.getComputedStyle(element);
const rect = element.getBoundingClientRect();
return style.display !== 'none'
&& style.visibility !== 'hidden'
&& rect.width > 0
&& rect.height > 0;
};
const els = [
...document.querySelectorAll(
'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name'
),
].filter(visible);
return els.some(e => {
const title = normalize(e.getAttribute('title'));
const label = normalize(e.getAttribute('aria-label'));
const text = normalize(e.innerText || e.textContent);
return title === name || label === name || text === name;
});
}
""",
filename,
)
)
except Exception:
try:
locator = page.get_by_text(filename, exact=True)
count = locator.count()
for index in range(min(count, 5)):
if locator.nth(index).is_visible():
return True
except Exception:
pass
return False
def _select_upload_file(self, page, file_path: str):
input_locator = page.locator("input[type=file]")
if input_locator.count() == 0:
if not self._click_text(page, ["上传", "上传文件"]):
self._log("未找到“上传”按钮,继续等待上传控件。")
page.wait_for_selector("input[type=file]", timeout=10000)
page.locator("input[type=file]").first.set_input_files(file_path)
def _wait_for_upload_started(
self,
page,
filename: str,
timeout: float = 20,
) -> bool:
start_time = time.time()
while time.time() - start_time < timeout:
state = self._upload_state(page, filename)
if state["uploading"] or state["item_active"] or state["file_exists"]:
return True
time.sleep(0.5)
return False
def _wait_for_upload_finished(
self,
page,
filename: str,
timeout: float = 1800,
) -> bool:
start_time = time.time()
stable_count = 0
last_active_log = 0.0
last_refresh = time.time()
while time.time() - start_time < timeout:
state = self._upload_state(page, filename)
if state["has_error"]:
self._log("检测到页面提示上传失败。")
return False
if state["file_exists"] and not state["item_active"] and not state["uploading"]:
stable_count += 1
if stable_count >= 3:
self._log("文件已出现在列表中,且上传状态连续稳定。")
time.sleep(3)
return self._check_file_exists_robust(page, filename)
self._log(f"上传疑似完成,稳定性校验 ({stable_count}/3)...")
else:
stable_count = 0
now = time.time()
if (state["uploading"] or state["item_active"]) and now - last_active_log >= 15:
self._log("监测到活跃传输流,继续等待...")
last_active_log = now
if time.time() - last_refresh > 25:
self._refresh_current_folder(page)
last_refresh = time.time()
time.sleep(3)
return False
def _upload_state(self, page, filename: str) -> dict:
try:
state = page.evaluate(
"""
(filename) => {
const normalize = value => (value || '').replace(/\\s+/g, ' ').trim();
const visible = element => {
if (!element) return false;
const style = window.getComputedStyle(element);
const rect = element.getBoundingClientRect();
return style.display !== 'none'
&& style.visibility !== 'hidden'
&& rect.width > 0
&& rect.height > 0;
};
const bodyText = normalize(document.body ? document.body.innerText : '');
const nodes = [
...document.querySelectorAll(
'span[title], a[title], [aria-label], .wp-s-file-list-drag-copy__item-title-text, .file-name, [class*="file"], [class*="File"]'
)
].filter(visible);
const matchingNode = nodes.find(node => {
const title = normalize(node.getAttribute('title'));
const label = normalize(node.getAttribute('aria-label'));
const text = normalize(node.innerText || node.textContent);
return title === filename || label === filename || text === filename;
});
const container = matchingNode
? matchingNode.closest('tr, li, [class*="item"], [class*="Item"], [class*="row"], [class*="Row"], [class*="file"], [class*="File"]')
: null;
const containerText = normalize(container ? container.innerText : '');
const itemActiveWords = ['正在上传', '上传中', '等待上传', '传输队列', '传输中', '0B/s', '校验中', '处理中'];
const pageActiveWords = ['正在上传', '上传中', '等待上传', '传输队列', '传输中', '校验中', '处理中'];
const errorWords = ['上传失败', '传输失败', '网络异常', '上传出错', '文件上传失败'];
const progressPattern = /(?:^|\\D)(?:100|\\d{1,2})(?:\\.\\d+)?%/;
const itemHasProgress = container
? [...container.querySelectorAll('[class*="progress"], [class*="Progress"]')].some(visible)
: false;
const itemActive = itemActiveWords.some(word => containerText.includes(word))
|| progressPattern.test(containerText)
|| itemHasProgress;
const uploading = itemActive
|| (bodyText.includes(filename)
&& (pageActiveWords.some(word => bodyText.includes(word))
|| progressPattern.test(bodyText)));
const hasError = errorWords.some(word => bodyText.includes(word));
return {
file_exists: Boolean(matchingNode),
item_active: itemActive,
uploading,
has_error: hasError,
};
}
""",
filename,
)
return {
"file_exists": bool(state.get("file_exists")),
"item_active": bool(state.get("item_active")),
"uploading": bool(state.get("uploading")),
"has_error": bool(state.get("has_error")),
}
except Exception:
return {
"file_exists": self._check_file_exists_robust(page, filename),
"item_active": False,
"uploading": False,
"has_error": False,
}
def _refresh_current_folder(self, page):
try:
page.locator(".wp-s-file-list-drag-copy__header-breadcrumb-item").last.click()
time.sleep(2)
except Exception:
try:
page.keyboard.press("F5")
page.wait_for_load_state("domcontentloaded", timeout=10000)
except Exception:
pass
def _click_text(self, page, texts: list[str]) -> bool:
for text in texts:
for exact in (True, False):
try:
locator = page.get_by_text(text, exact=exact)
count = locator.count()
for index in range(min(count, 5)):
item = locator.nth(index)
if item.is_visible():
item.click(force=True, timeout=5000)
return True
except PlaywrightTimeoutError:
continue
except Exception:
continue
return False
def _clear_old_notifications(self, page):
try:
page.evaluate(
"""
() => {
document
.querySelectorAll('.ant-message-notice, .u-toast, [class*="toast"], [class*="Toast"]')
.forEach(element => element.remove());
}
"""
)
except Exception:
pass
def _body_text(self, page) -> str:
return page.evaluate("() => document.body ? document.body.innerText : ''")
def _wait_for_file_in_list(self, page, filename: str) -> bool:
return self._wait_for_upload_finished(page, filename)