Files
webhook/handlers/message.py

271 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""消息发送处理器JSON 解析、参数校验、QQ API 超时与重试、批量发送。"""
import asyncio
from pathlib import Path
from aiohttp import web
from ..config import QQ_API_MAX_RETRIES, QQ_API_TIMEOUT, UPLOAD_DIR
from ..response import error, ok
# Values accepted in the "type" field of a message; checked by _validate_message.
VALID_MSG_TYPES = {"text", "image", "file", "video"}
def _resolve_url(url: str) -> str:
    """Expand a relative reference to an uploaded file into an absolute path.

    A relative ``url`` that names an existing regular file located inside
    ``UPLOAD_DIR`` is returned as that file's resolved absolute path.
    Anything else (remote URLs, absolute paths, empty strings, names that do
    not exist, paths that escape ``UPLOAD_DIR``) is returned unchanged.

    Args:
        url: File reference taken from the request payload (untrusted).

    Returns:
        The absolute path of the uploaded file, or ``url`` unchanged.
    """
    # An empty string would otherwise resolve to the upload directory itself.
    if not url or Path(url).is_absolute():
        return url
    try:
        upload_root = UPLOAD_DIR.resolve()
        candidate = (upload_root / url).resolve()
        # Containment is checked on the resolved path so that "../" segments
        # (or symlinks) cannot be used to reference files outside UPLOAD_DIR.
        if candidate.is_relative_to(upload_root) and candidate.is_file():
            return str(candidate)
    except (OSError, ValueError):
        # Long remote/base64 URLs can exceed the OS file-name limit or contain
        # NUL bytes; such values are not uploads, so pass them through.
        return url
    return url
def _validate_message(msg: dict) -> str | None:
"""校验单条消息,返回 None 或错误信息。"""
msg_type = msg.get("type", "text")
if msg_type not in VALID_MSG_TYPES:
return f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}"
# text 必须有 msgimage/file/video 必须有 url
if msg_type == "text" and not msg.get("msg"):
return "missing required field: msg"
if msg_type in {"image", "file", "video"} and not msg.get("url"):
return f"missing required field: url"
return None
async def _call_with_retry(coro_factory, logger, rid: str) -> None:
    """Await a QQ API call with a per-attempt timeout, retrying on failure.

    Args:
        coro_factory: Zero-argument callable returning a fresh awaitable for
            every attempt (an awaitable cannot be awaited twice).
        logger: Object exposing ``warning`` and ``error`` logging methods.
        rid: Request id used as a prefix in log lines.

    Raises:
        Exception: When every attempt timed out or raised. The message names
            the last failure, which is also chained as ``__cause__``.
    """
    # NOTE(review): a call that timed out may still have been processed by the
    # remote side, so a retry could deliver a duplicate - confirm acceptable.
    # A configured value below 1 still gets a single attempt; otherwise the
    # API would never be called and every request would fail.
    attempts = max(1, QQ_API_MAX_RETRIES)
    last_exc: Exception | None = None
    for attempt in range(1, attempts + 1):
        try:
            await asyncio.wait_for(coro_factory(), timeout=QQ_API_TIMEOUT)
        except asyncio.TimeoutError:
            # wait_for raises with an empty message; record a descriptive one
            # so the final error explains why the call failed.
            last_exc = asyncio.TimeoutError(f"timed out after {QQ_API_TIMEOUT}s")
            logger.warning(f"[{rid}] QQ API timeout, attempt {attempt}/{attempts}")
        except Exception as exc:
            last_exc = exc
            logger.error(f"[{rid}] QQ API error: {exc}, attempt {attempt}/{attempts}")
        else:
            return
    raise Exception(f"qq api failed after {attempts} retries: {last_exc}") from last_exc
def _build_sender(api, group_id, user_id, msg_type: str, *, text=None, url=None):
    """Return a zero-argument coroutine factory that sends one message.

    The group variant is chosen when ``group_id`` is truthy, otherwise the
    private variant addressed to ``user_id``. ``msg_type`` must already be
    validated; ``text`` is used for text messages and ``url`` for the others.
    """
    # File messages carry a display name: the last "/"-separated segment.
    filename = url.split("/")[-1] if url else None
    qq = api.qq
    if group_id:
        senders = {
            "text": lambda: qq.send_group_text(group_id=group_id, text=text),
            "image": lambda: qq.send_group_image(group_id=group_id, image=url),
            "file": lambda: qq.send_group_file(group_id=group_id, file=url, name=filename),
            "video": lambda: qq.send_group_video(group_id=group_id, video=url),
        }
    else:
        senders = {
            "text": lambda: qq.send_private_text(user_id=user_id, text=text),
            "image": lambda: qq.send_private_image(user_id=user_id, image=url),
            "file": lambda: qq.send_private_file(user_id=user_id, file=url, name=filename),
            "video": lambda: qq.send_private_video(user_id=user_id, video=url),
        }
    return senders[msg_type]


async def _send_batch(api, logger, rid: str, group_id, user_id, messages: list) -> list:
    """Deliver an already validated batch and return one result per entry.

    All text entries, the first image and the first video are folded into one
    combined message. Files, and any further images or videos, are sent as
    individual messages so nothing is dropped while being reported as sent.
    Results are ``{"index", "success"[, "error"]}`` dicts sorted by index.
    """
    text_parts: list[str] = []
    combined_indices: list[int] = []
    image_url = None
    video_url = None
    # (index, type, url) of entries that need their own message. Indices come
    # from enumerate so identical entries keep distinct positions.
    standalone: list[tuple] = []

    for index, msg in enumerate(messages):
        msg_type = msg.get("type", "text")
        if msg_type == "text":
            # str() keeps the join below safe if "msg" is not a string.
            text_parts.append(str(msg.get("msg", "")))
            combined_indices.append(index)
        elif msg_type == "image" and image_url is None:
            # The combined call takes a single image.
            image_url = msg.get("url", "")
            combined_indices.append(index)
        elif msg_type == "video" and video_url is None:
            video_url = msg.get("url", "")
            combined_indices.append(index)
        else:
            standalone.append((index, msg_type, msg.get("url", "")))

    results: list[dict] = []

    # Combined message: text + image + video.
    if combined_indices:
        try:
            text = "\n".join(text_parts) if text_parts else None
            image = _resolve_url(image_url) if image_url else None
            video = _resolve_url(video_url) if video_url else None
            if group_id:
                post, target = api.qq.post_group_msg, {"group_id": group_id}
            else:
                post, target = api.qq.post_private_msg, {"user_id": user_id}
            await _call_with_retry(
                lambda: post(**target, text=text, image=image, video=video),
                logger, rid
            )
        except Exception as exc:
            logger.error(f"[{rid}] Combined message failed: {exc}")
            outcome = {"success": False, "error": str(exc)}
        else:
            outcome = {"success": True}
        # Every entry folded into the combined message shares its outcome.
        results.extend({"index": index, **outcome} for index in combined_indices)

    # Files and overflow media: one message each, in request order.
    for index, msg_type, raw_url in standalone:
        try:
            sender = _build_sender(
                api, group_id, user_id, msg_type, url=_resolve_url(raw_url)
            )
            await _call_with_retry(sender, logger, rid)
        except Exception as exc:
            logger.error(f"[{rid}] {msg_type.capitalize()} message failed: {exc}")
            results.append({"index": index, "success": False, "error": str(exc)})
        else:
            results.append({"index": index, "success": True})

    results.sort(key=lambda item: item["index"])
    return results


async def webhook_handler(request: web.Request) -> web.Response:
    """Handle a message-sending request, single or batch.

    Single format:
        {
            "group_id": "123",
            "type": "text",
            "msg": "hello"
        }

    Batch format:
        {
            "group_id": "123",
            "messages": [
                {"type": "text", "msg": "hello"},
                {"type": "image", "url": "photo.jpg"}
            ]
        }

    ``user_id`` may be given instead of ``group_id`` to send a private
    message; when both are present the group is used.

    In batch mode the text entries, the first image and the first video are
    sent together as one combined message; files and any additional images or
    videos are sent as separate messages.

    Returns:
        An ``error(...)`` response when the body or a message is invalid.
        Single mode: ``ok()`` on success, or ``error(..., code=502,
        status=502)`` when the QQ API call fails.
        Batch mode: ``ok(data=...)`` with ``total``, ``success``, ``failed``
        and the per-entry ``results``, even if some entries failed.
    """
    try:
        data = await request.json()
    except Exception:
        return error("invalid json")
    if not isinstance(data, dict):
        return error("request body must be a json object")

    group_id = data.get("group_id")
    user_id = data.get("user_id")
    if not group_id and not user_id:
        return error("need group_id or user_id")

    api = request.app["qq_api"]
    logger = request.app["logger"]
    rid = request.get("request_id", "-")

    messages = data.get("messages")
    if messages is None:
        # Single mode (compatible with the legacy format).
        err = _validate_message(data)
        if err:
            return error(err)
        msg_type = data.get("type", "text")
        try:
            url = _resolve_url(data.get("url", "")) if msg_type != "text" else None
        except (OSError, TypeError, ValueError) as exc:
            # A malformed url is a client error, not a QQ API failure.
            return error(f"invalid url: {exc}")
        sender = _build_sender(
            api, group_id, user_id, msg_type, text=data.get("msg", ""), url=url
        )
        try:
            await _call_with_retry(sender, logger, rid)
        except Exception as exc:
            logger.error(f"[{rid}] QQ API error: {exc}")
            return error(f"qq api failed: {exc}", code=502, status=502)
        return ok()

    # Batch mode.
    if not isinstance(messages, list):
        return error("messages must be an array")
    if not messages:
        return error("messages cannot be empty")
    # Validate everything up front so nothing is sent for a bad request.
    for i, msg in enumerate(messages):
        if not isinstance(msg, dict):
            return error(f"messages[{i}] must be an object")
        err = _validate_message(msg)
        if err:
            return error(f"messages[{i}]: {err}")

    results = await _send_batch(api, logger, rid, group_id, user_id, messages)
    success_count = sum(1 for r in results if r["success"])
    return ok(data={
        "total": len(results),
        "success": success_count,
        "failed": len(results) - success_count,
        "results": results,
    })