Files
yidaima_tools/image_uploader.py
王鹏 a2f5875d1b init
2026-04-09 14:55:54 +08:00

257 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import re
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
import requests
from config_loader import get_config
class WeChatImageUploader:
    """Re-host externally hosted images on WeChat's servers.

    Each image is downloaded from its original URL and posted to the WeChat
    ``cgi-bin/media/uploadimg`` endpoint; the URL found in WeChat's response
    is handed back to the caller.
    """

    def __init__(self, access_token: str):
        """Remember the credentials and endpoint used for every upload.

        Args:
            access_token: WeChat access token, sent as the ``access_token``
                query parameter of each upload request.
        """
        self.access_token = access_token
        self.upload_url = "https://api.weixin.qq.com/cgi-bin/media/uploadimg"

    @staticmethod
    def _detect_image_type(content_type: str, image_url: str) -> tuple[str, str]:
        """Choose the file extension and MIME type to upload an image under.

        The ``Content-Type`` header of the download wins when it names an
        image type; otherwise the extension is guessed from the URL path,
        defaulting to JPEG.

        Args:
            content_type: Raw ``Content-Type`` header value (may be empty).
            image_url: URL the image was downloaded from.

        Returns:
            ``(extension, mime_type)``, e.g. ``('jpg', 'image/jpeg')``.
        """
        # Drop parameters such as "; charset=binary" before inspecting the type,
        # and compare case-insensitively.
        media_type = content_type.split(';', 1)[0].strip().lower()
        ext = media_type[len('image/'):] if media_type.startswith('image/') else ''

        if not ext:
            # No usable header: infer the type from the URL path instead.
            path = urlparse(image_url).path.lower()
            if path.endswith('.png'):
                ext = 'png'
            elif path.endswith('.gif'):
                ext = 'gif'
            else:
                # Covers .jpg / .jpeg as well as anything unrecognised.
                ext = 'jpg'

        if ext in ('jpeg', 'jpg'):
            # The file name uses ".jpg" but the registered MIME type is
            # "image/jpeg"; "image/jpg" is not a valid media type.
            return 'jpg', 'image/jpeg'
        return ext, f'image/{ext}'

    def upload_image(self, image_url: str) -> Optional[str]:
        """Download an image and upload it to the WeChat server.

        Args:
            image_url: Original URL of the image (e.g. hosted on Qiniu Cloud).

        Returns:
            The value of the ``url`` field in WeChat's JSON response, or
            ``None`` when the download or upload fails, WeChat reports a
            non-zero ``errcode``, or the response carries no URL. All
            exceptions are caught and logged; none propagate to the caller.
        """
        try:
            # 1. Download the image.
            print(f"[ImageUploader] 正在下载图片: {image_url}")
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(image_url, headers=headers, timeout=30)
            response.raise_for_status()

            # 2. Work out the file extension and MIME type.
            ext, mime_type = self._detect_image_type(
                response.headers.get('Content-Type', ''), image_url
            )

            # 3. Upload to WeChat as a multipart form field named "media".
            print("[ImageUploader] 正在上传到微信服务器...")
            files = {
                'media': (f'image.{ext}', response.content, mime_type)
            }
            params = {
                'access_token': self.access_token
            }
            upload_response = requests.post(
                self.upload_url,
                params=params,
                files=files,
                timeout=30
            )
            upload_response.raise_for_status()
            result = upload_response.json()
            print(f"[ImageUploader] 微信返回: {result}")

            # A present, non-zero errcode means WeChat rejected the upload.
            if 'errcode' in result and result['errcode'] != 0:
                print(f"[ImageUploader] 上传失败: {result}")
                return None

            # Extract the WeChat-hosted image URL.
            wechat_url = result.get('url')
            if wechat_url:
                print(f"[ImageUploader] 上传成功: {wechat_url}")
                return wechat_url
            print(f"[ImageUploader] 上传失败,未返回 URL: {result}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"[ImageUploader] 网络请求出错: {e}")
            return None
        except Exception as e:
            # Best-effort API: log unexpected errors with a traceback instead
            # of letting them reach the caller.
            print(f"[ImageUploader] 上传出错: {e}")
            import traceback
            traceback.print_exc()
            return None
def process_html_images(html_content: str, access_token: str, external_domain: Optional[str] = None) -> tuple[str, Optional[str]]:
    """Upload the external images referenced by an HTML document to WeChat.

    Every ``<img ... src="...">`` URL that is not already hosted on
    ``mmbiz.qpic.cn`` and is considered external is uploaded, and its
    ``src`` / ``data-src`` attributes are rewritten to the WeChat URL. The
    second image of the document doubles as the cover image.

    A URL that uploads successfully is uploaded only once, even when it is
    both the cover and a body image or appears in several ``<img>`` tags.
    Failed uploads are not cached, so a later occurrence is retried.

    Args:
        html_content: HTML to process.
        access_token: WeChat access token used for the uploads.
        external_domain: Domain of the external image host (e.g.
            ``yidaima.cn``); any URL containing it is treated as external.
            Absolute http(s) URLs not pointing at localhost are treated as
            external regardless.

    Returns:
        ``(processed_html, cover_url)`` where ``cover_url`` is the WeChat URL
        of the second image, or ``None`` when there are fewer than two images
        or the cover upload failed.
    """
    uploader = WeChatImageUploader(access_token)

    # Successful uploads keyed by original URL, so the same image is never
    # sent to WeChat twice.
    uploaded: dict[str, str] = {}

    def upload_once(url: str) -> Optional[str]:
        """Return the WeChat URL for *url*, uploading only on a cache miss."""
        if url in uploaded:
            return uploaded[url]
        wechat_url = uploader.upload_image(url)
        if wechat_url:
            uploaded[url] = wechat_url
        return wechat_url

    def needs_upload(url: str) -> bool:
        """Tell whether *url* is an external image that must be re-hosted."""
        if 'mmbiz.qpic.cn' in url:
            # Already hosted by WeChat.
            return False
        if external_domain and external_domain in url:
            return True
        # Any absolute http(s) image counts as external, except local ones.
        return (
            url.startswith(('http://', 'https://'))
            and not url.startswith(('http://localhost', 'https://localhost'))
        )

    # Collect the src attribute of every img tag (WeChat-hosted ones included).
    img_pattern = r'<img[^>]+src="([^"]+)"'
    all_img_urls = re.findall(img_pattern, html_content)
    print(f"[ImageUploader] HTML 中共有 {len(all_img_urls)} 张图片")

    # Use the second image as the cover, when there is one.
    cover_image_url: Optional[str] = None
    if len(all_img_urls) >= 2:
        second_img = all_img_urls[1]
        print(f"[ImageUploader] 第二张图片: {second_img}")
        if 'mmbiz.qpic.cn' not in second_img:
            # Not on WeChat yet: upload it so it can serve as the cover.
            print("[ImageUploader] 上传第二张图片作为封面...")
            cover_image_url = upload_once(second_img)
            if cover_image_url:
                print(f"[ImageUploader] 封面上传成功: {cover_image_url}")
            else:
                print("[ImageUploader] 封面上传失败")
        else:
            cover_image_url = second_img
            print("[ImageUploader] 第二张已是微信图片,直接用作封面")
    else:
        print("[ImageUploader] 图片数量不足2张无法获取封面")

    # Distinct external URLs, in document order.
    urls_to_process = list(dict.fromkeys(url for url in all_img_urls if needs_upload(url)))
    if not urls_to_process:
        print("[ImageUploader] 没有需要上传的外部图片")
        return html_content, cover_image_url
    print(f"[ImageUploader] 发现 {len(urls_to_process)} 张外部图片需要上传")

    # Upload each image (reusing the cover upload if it is one of them).
    url_mapping = {}
    for url in urls_to_process:
        print(f"[ImageUploader] 处理图片: {url}")
        wechat_url = upload_once(url)
        if wechat_url:
            url_mapping[url] = wechat_url
        else:
            print(f"[ImageUploader] 上传失败,保留原 URL: {url}")

    # Rewrite both src and data-src attributes in the HTML.
    for old_url, new_url in url_mapping.items():
        html_content = html_content.replace(f'src="{old_url}"', f'src="{new_url}"')
        html_content = html_content.replace(f'data-src="{old_url}"', f'data-src="{new_url}"')
        print(f"[ImageUploader] 已替换: {old_url} -> {new_url}")

    # If the cover upload failed earlier but the retry above succeeded,
    # pick up the URL obtained by the retry.
    if len(all_img_urls) >= 2 and all_img_urls[1] in url_mapping:
        cover_image_url = url_mapping[all_img_urls[1]]
    return html_content, cover_image_url
def get_wechat_access_token(appid: str, appsecret: str) -> Optional[str]:
    """Request an access token from the WeChat ``cgi-bin/token`` endpoint.

    Args:
        appid: AppID of the WeChat official account.
        appsecret: AppSecret of the WeChat official account.

    Returns:
        The ``access_token`` value from the JSON response, or ``None`` when
        the response has no such field or any exception occurs (the failure
        is printed, never raised).
    """
    try:
        reply = requests.get(
            "https://api.weixin.qq.com/cgi-bin/token",
            params={
                "grant_type": "client_credential",
                "appid": appid,
                "secret": appsecret,
            },
            timeout=30,
        )
        reply.raise_for_status()
        payload = reply.json()

        # Guard clause: a response without a token is reported and rejected.
        if "access_token" not in payload:
            print(f"[ImageUploader] 获取 access_token 失败: {payload}")
            return None
        return payload["access_token"]
    except Exception as e:
        print(f"[ImageUploader] 获取 access_token 出错: {e}")
        return None
def main():
    """Manually exercise the image upload pipeline on a small HTML sample.

    Reads the WeChat credentials from the project configuration, obtains an
    access token, runs ``process_html_images`` on a sample document and
    prints the rewritten HTML together with the cover image URL.
    """
    config = get_config()

    # Obtain the access token; nothing can be uploaded without it.
    access_token = get_wechat_access_token(config.wechat_appid, config.wechat_appsecret)
    if not access_token:
        print("[ImageUploader] 无法获取 access_token请检查配置")
        return

    # Sample HTML: one external image, one WeChat-hosted image, one local image.
    test_html = """
<div>
<img src="http://img.yidaima.cn/test.png" alt="测试">
<img src="http://mmbiz.qpic.cn/some/path" alt="微信图片">
<img src="./local.png" alt="本地图片">
</div>
"""
    # process_html_images returns (html, cover_url); unpack it so the HTML is
    # printed as text rather than as the repr of a tuple.
    processed_html, cover_url = process_html_images(test_html, access_token, external_domain="yidaima.cn")
    print("\n处理后的 HTML:")
    print(processed_html)
    print(f"封面图 URL: {cover_url}")
# Run the manual upload test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()