from __future__ import annotations import re from pathlib import Path from typing import Optional from urllib.parse import urlparse import requests from config_loader import get_config class WeChatImageUploader: """微信图片上传器 - 将外部图片上传到微信服务器获取永久链接""" def __init__(self, access_token: str): self.access_token = access_token self.upload_url = "https://api.weixin.qq.com/cgi-bin/media/uploadimg" def upload_image(self, image_url: str) -> Optional[str]: """ 下载图片并上传到微信服务器 Args: image_url: 图片的原始 URL(七牛云等) Returns: 微信图片 URL,上传失败返回 None """ try: # 1. 下载图片 print(f"[ImageUploader] 正在下载图片: {image_url}") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(image_url, headers=headers, timeout=30) response.raise_for_status() # 2. 获取文件扩展名和 MIME 类型 content_type = response.headers.get('Content-Type', '') if 'image/' in content_type: ext = content_type.split('/')[-1].split(';')[0].strip() if ext == 'jpeg': ext = 'jpg' else: # 从 URL 推断 parsed = urlparse(image_url) path = parsed.path.lower() if path.endswith('.jpg') or path.endswith('.jpeg'): ext = 'jpg' elif path.endswith('.png'): ext = 'png' elif path.endswith('.gif'): ext = 'gif' else: ext = 'jpg' # 3. 上传到微信 print(f"[ImageUploader] 正在上传到微信服务器...") files = { 'media': (f'image.{ext}', response.content, f'image/{ext}') } params = { 'access_token': self.access_token } upload_response = requests.post( self.upload_url, params=params, files=files, timeout=30 ) upload_response.raise_for_status() result = upload_response.json() print(f"[ImageUploader] 微信返回: {result}") # 检查错误 if 'errcode' in result and result['errcode'] != 0: print(f"[ImageUploader] 上传失败: {result}") return None # 获取微信图片 URL wechat_url = result.get('url') if wechat_url: print(f"[ImageUploader] 上传成功: {wechat_url}") return wechat_url else: print(f"[ImageUploader] 上传失败,未返回 URL: {result}") return None except requests.exceptions.RequestException as e: print(f"[ImageUploader] 网络请求出错: {e}") return None except Exception as e: print(f"[ImageUploader] 上传出错: {e}") import traceback traceback.print_exc() return None def process_html_images(html_content: str, access_token: str, external_domain: Optional[str] = None) -> tuple[str, Optional[str]]: """ 处理 HTML 中的图片,将外部图片上传到微信并替换 URL Args: html_content: HTML 内容 access_token: 微信 access_token external_domain: 外部图片域名,用于识别需要上传的图片(如 yidaima.cn) Returns: (处理后的 HTML 内容, 第二张图片的微信 URL 作为封面图) """ uploader = WeChatImageUploader(access_token) # 匹配所有 img 标签的 src 属性 img_pattern = r']+src="([^"]+)"' # 收集所有图片 URL(包括微信图片) all_img_urls = [] for match in re.finditer(img_pattern, html_content): url = match.group(1) all_img_urls.append(url) print(f"[ImageUploader] HTML 中共有 {len(all_img_urls)} 张图片") # 获取第二张图片作为封面(如果存在) cover_image_url: Optional[str] = None if len(all_img_urls) >= 2: second_img = all_img_urls[1] print(f"[ImageUploader] 第二张图片: {second_img}") # 如果第二张是外部图片,需要上传 if 'mmbiz.qpic.cn' not in second_img: print(f"[ImageUploader] 上传第二张图片作为封面...") cover_image_url = uploader.upload_image(second_img) if cover_image_url: print(f"[ImageUploader] 封面上传成功: {cover_image_url}") else: print(f"[ImageUploader] 封面上传失败") else: # 已经是微信图片 cover_image_url = second_img print(f"[ImageUploader] 第二张已是微信图片,直接用作封面") else: print(f"[ImageUploader] 图片数量不足2张,无法获取封面") # 收集所有需要处理的外部图片 URL urls_to_process = [] for url in all_img_urls: # 跳过已经是微信图片的 if 'mmbiz.qpic.cn' in url: continue # 检查是否是外部图片 is_external = False if external_domain and external_domain in url: is_external = True elif url.startswith(('http://', 'https://')) and not url.startswith(('http://localhost', 'https://localhost')): # 所有外部 HTTP/HTTPS 图片(除了本地) is_external = True if is_external: urls_to_process.append(url) if not urls_to_process: print("[ImageUploader] 没有需要上传的外部图片") return html_content, cover_image_url print(f"[ImageUploader] 发现 {len(urls_to_process)} 张外部图片需要上传") # 上传图片并替换 URL url_mapping = {} for url in urls_to_process: print(f"[ImageUploader] 处理图片: {url}") wechat_url = uploader.upload_image(url) if wechat_url: url_mapping[url] = wechat_url else: print(f"[ImageUploader] 上传失败,保留原 URL: {url}") # 替换 HTML 中的 URL for old_url, new_url in url_mapping.items(): # 同时替换 src 和 data-src html_content = html_content.replace(f'src="{old_url}"', f'src="{new_url}"') html_content = html_content.replace(f'data-src="{old_url}"', f'data-src="{new_url}"') print(f"[ImageUploader] 已替换: {old_url} -> {new_url}") # 如果第二张图片被替换了,更新封面 URL if len(all_img_urls) >= 2 and all_img_urls[1] in url_mapping: cover_image_url = url_mapping[all_img_urls[1]] return html_content, cover_image_url def get_wechat_access_token(appid: str, appsecret: str) -> Optional[str]: """ 获取微信 access_token Args: appid: 微信公众号 AppID appsecret: 微信公众号 AppSecret Returns: access_token,失败返回 None """ try: url = "https://api.weixin.qq.com/cgi-bin/token" params = { "grant_type": "client_credential", "appid": appid, "secret": appsecret } response = requests.get(url, params=params, timeout=30) response.raise_for_status() data = response.json() if "access_token" in data: return data["access_token"] else: print(f"[ImageUploader] 获取 access_token 失败: {data}") return None except Exception as e: print(f"[ImageUploader] 获取 access_token 出错: {e}") return None def main(): """测试图片上传功能""" config = get_config() # 获取 access_token access_token = get_wechat_access_token(config.wechat_appid, config.wechat_appsecret) if not access_token: print("[ImageUploader] 无法获取 access_token,请检查配置") return # 测试 HTML test_html = """
测试 微信图片 本地图片
""" result = process_html_images(test_html, access_token, external_domain="yidaima.cn") print("\n处理后的 HTML:") print(result) if __name__ == "__main__": main()