89 lines
2.4 KiB
Python
89 lines
2.4 KiB
Python
import httpx
|
|
import config
|
|
from pathlib import Path
|
|
|
|
|
|
async def send_image(base64_str: str, md5: str):
    """Send an image message to the qyapi.weixin.qq.com bot webhook.

    The bot key is read from ``config.WECHAT_COMANY_BOT_KEY`` and placed in
    the ``key`` query parameter of the webhook URL.

    Args:
        base64_str: Value sent as the ``image.base64`` field of the message.
        md5: Value sent as the ``image.md5`` field of the message.

    Raises:
        httpx.HTTPStatusError: If the webhook replies with an error status.
    """
    endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={config.WECHAT_COMANY_BOT_KEY}"

    # Assemble the message body step by step: image part first, then envelope.
    image_part = {"base64": base64_str, "md5": md5}
    message = {"msgtype": "image", "image": image_part}

    async with httpx.AsyncClient(timeout=10) as http:
        reply = await http.post(endpoint, json=message)
        reply.raise_for_status()
|
|
|
|
|
|
async def upload_file(file_path: str, filename: str = "") -> str:
    """Upload a local file to the webhook service's ``/upload`` endpoint.

    Args:
        file_path: Path of the local file to upload.
        filename: Name to upload the file under. When empty (the default)
            or ``None``, the base name of ``file_path`` is used.

    Returns:
        The remote file path reported by the service: the ``data`` field of
        the JSON response, or else the first entry of its ``files`` list.
        ``None`` is returned when the response carries neither.

    Raises:
        OSError: If ``file_path`` cannot be opened for reading.
        httpx.HTTPStatusError: If the service replies with an error status.
    """
    # Any "not specified" value (empty string or None) falls back to the
    # original file name. A plain ``is None`` test is not enough because
    # callers pass an empty string to mean "unspecified".
    if not filename:
        filename = Path(file_path).name

    url = f"{config.WEBHOOK_API_URL}/upload"
    headers = {"Authorization": f"Bearer {config.WEBHOOK_API_TOKEN}"}

    # The file handle must stay open until the request has been sent, so the
    # HTTP call lives inside the ``with open`` block.
    with open(file_path, "rb") as f:
        files = {"file": (filename, f)}
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(url, files=files, headers=headers)
            resp.raise_for_status()
            data = resp.json()

    # Only consult "files" when "data" is absent, and tolerate an empty or
    # missing list instead of failing with IndexError.
    remote_path = data.get("data")
    if remote_path is None:
        uploaded = data.get("files") or [None]
        remote_path = uploaded[0]
    return remote_path
|
|
|
|
|
|
async def send_webhook(group_id: str, file_url: str) -> dict:
    """Send a webhook request that notifies a group about a file.

    POSTs one ``image`` message pointing at ``file_url`` to
    ``{config.WEBHOOK_API_URL}/webhook``, authenticated with the bearer
    token from ``config.WEBHOOK_API_TOKEN``.

    Args:
        group_id: ID of the target group.
        file_url: Remote path of the file.

    Returns:
        The parsed JSON body of the API response.

    Raises:
        httpx.HTTPStatusError: If the service replies with an error status.
    """
    endpoint = f"{config.WEBHOOK_API_URL}/webhook"
    auth_headers = {"Authorization": f"Bearer {config.WEBHOOK_API_TOKEN}"}

    # A single image message wrapped in the envelope the endpoint receives.
    image_message = {"type": "image", "url": file_url}
    body = {"group_id": group_id, "messages": [image_message]}

    async with httpx.AsyncClient(timeout=10) as http:
        reply = await http.post(endpoint, json=body, headers=auth_headers)
        reply.raise_for_status()
        return reply.json()
|
|
|
|
|
|
async def upload_and_notify(file_path: str, group_id: str, filename: str = "") -> dict:
    """Upload a file, then send the webhook notification for it.

    Args:
        file_path: Path of the local file to upload.
        group_id: ID of the group to notify.
        filename: Name to upload the file under. When empty (the default),
            the base name of ``file_path`` is used.

    Returns:
        The API response dictionary returned by ``send_webhook``.
    """
    # Resolve the default name here so that an empty string is never
    # forwarded as the upload name.
    upload_name = filename or Path(file_path).name

    remote_path = await upload_file(file_path, upload_name)
    return await send_webhook(group_id, remote_path)
|