Files
webhook/handlers/message.py

132 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""消息发送处理器JSON 解析、参数校验、QQ API 超时与重试。"""
import asyncio
from pathlib import Path
from aiohttp import web
from ..config import QQ_API_MAX_RETRIES, QQ_API_TIMEOUT, UPLOAD_DIR
from ..response import error, ok
VALID_MSG_TYPES = {"text", "image", "file", "video"}
# 每种消息类型必填的字段
REQUIRED_FIELDS: dict[str, list[str]] = {
"text": ["msg"],
"image": ["url"],
"file": ["url"],
"video": ["url"],
}
def _validate_payload(data: dict) -> tuple[dict | None, web.Response | None]:
"""校验请求体,返回 (data, None) 或 (None, error_response)。"""
group_id = data.get("group_id")
user_id = data.get("user_id")
if not group_id and not user_id:
return None, error("need group_id or user_id")
msg_type = data.get("type", "text")
if msg_type not in VALID_MSG_TYPES:
return None, error(f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}")
# 检查必填字段
missing = [f for f in REQUIRED_FIELDS.get(msg_type, []) if not data.get(f)]
if missing:
return None, error(f"missing required fields: {', '.join(missing)}")
return data, None
async def _call_qq_api(coro_factory, request: web.Request) -> web.Response:
"""带超时和重试的 QQ API 调用。"""
logger = request.app["logger"]
rid = request.get("request_id", "-")
last_exc: Exception | None = None
for attempt in range(1, QQ_API_MAX_RETRIES + 1):
try:
await asyncio.wait_for(coro_factory(), timeout=QQ_API_TIMEOUT)
return ok()
except asyncio.TimeoutError:
last_exc = asyncio.TimeoutError()
logger.warning(f"[{rid}] QQ API timeout, attempt {attempt}/{QQ_API_MAX_RETRIES}")
except Exception as exc:
last_exc = exc
logger.error(f"[{rid}] QQ API error: {exc}, attempt {attempt}/{QQ_API_MAX_RETRIES}")
logger.error(f"[{rid}] QQ API failed after {QQ_API_MAX_RETRIES} retries: {last_exc}")
return error(f"qq api failed: {last_exc}", code=502, status=502)
async def webhook_handler(request: web.Request) -> web.Response:
"""处理消息发送请求。"""
# 安全解析 JSONaiohttp 可能抛 JSONDecodeError 或 ContentTypeError
try:
data = await request.json()
except Exception:
return error("invalid json")
if not isinstance(data, dict):
return error("request body must be a json object")
data, err = _validate_payload(data)
if err:
return err
msg_type = data.get("type", "text")
group_id = data.get("group_id")
user_id = data.get("user_id")
msg = data.get("msg", "")
url = data.get("url", "")
# 如果 url 是本地已上传的文件(相对路径),补全为绝对路径
file_path = Path(url)
if not file_path.is_absolute() and (UPLOAD_DIR / url).exists():
url = str((UPLOAD_DIR / url).resolve())
# 获取 ncatbot API 实例
api = request.app["qq_api"]
if group_id:
match msg_type:
case "text":
return await _call_qq_api(
lambda: api.qq.send_group_text(group_id=group_id, text=msg), request
)
case "image":
return await _call_qq_api(
lambda: api.qq.send_group_image(group_id=group_id, image=url), request
)
case "file":
return await _call_qq_api(
lambda: api.qq.send_group_file(group_id=group_id, file=url, name=url.split("/")[-1]),
request,
)
case "video":
return await _call_qq_api(
lambda: api.qq.send_group_video(group_id=group_id, video=url), request
)
else:
match msg_type:
case "text":
return await _call_qq_api(
lambda: api.qq.send_private_text(user_id=user_id, text=msg), request
)
case "image":
return await _call_qq_api(
lambda: api.qq.send_private_image(user_id=user_id, image=url), request
)
case "file":
return await _call_qq_api(
lambda: api.qq.send_private_file(user_id=user_id, file=url, name=url.split("/")[-1]),
request,
)
case "video":
return await _call_qq_api(
lambda: api.qq.send_private_video(user_id=user_id, video=url), request
)
return error("unreachable", code=500, status=500)