diff --git a/handlers/message.py b/handlers/message.py index 2afa656..163cf2f 100644 --- a/handlers/message.py +++ b/handlers/message.py @@ -1,4 +1,4 @@ -"""消息发送处理器:JSON 解析、参数校验、QQ API 超时与重试。""" +"""消息发送处理器:JSON 解析、参数校验、QQ API 超时与重试、批量发送。""" import asyncio from pathlib import Path @@ -10,45 +10,36 @@ from ..response import error, ok VALID_MSG_TYPES = {"text", "image", "file", "video"} -# 每种消息类型必填的字段 -REQUIRED_FIELDS: dict[str, list[str]] = { - "text": ["msg"], - "image": ["url"], - "file": ["url"], - "video": ["url"], -} + +def _resolve_url(url: str) -> str: + """如果 url 是本地已上传的文件(相对路径),补全为绝对路径。""" + file_path = Path(url) + if not file_path.is_absolute() and (UPLOAD_DIR / url).exists(): + return str((UPLOAD_DIR / url).resolve()) + return url -def _validate_payload(data: dict) -> tuple[dict | None, web.Response | None]: - """校验请求体,返回 (data, None) 或 (None, error_response)。""" - group_id = data.get("group_id") - user_id = data.get("user_id") - - if not group_id and not user_id: - return None, error("need group_id or user_id") - - msg_type = data.get("type", "text") +def _validate_message(msg: dict) -> str | None: + """校验单条消息,返回 None 或错误信息。""" + msg_type = msg.get("type", "text") if msg_type not in VALID_MSG_TYPES: - return None, error(f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}") + return f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}" - # 检查必填字段 - missing = [f for f in REQUIRED_FIELDS.get(msg_type, []) if not data.get(f)] - if missing: - return None, error(f"missing required fields: {', '.join(missing)}") - - return data, None + # text 必须有 msg,image/file/video 必须有 url + if msg_type == "text" and not msg.get("msg"): + return "missing required field: msg" + if msg_type in {"image", "file", "video"} and not msg.get("url"): + return f"missing required field: url" + return None -async def _call_qq_api(coro_factory, request: web.Request) -> web.Response: - """带超时和重试的 QQ API 调用。""" - logger = request.app["logger"] - rid = request.get("request_id", "-") - +async def _call_with_retry(coro_factory, logger, rid: str) -> None: + """带超时和重试的 API 调用。""" last_exc: Exception | None = None for attempt in range(1, QQ_API_MAX_RETRIES + 1): try: await asyncio.wait_for(coro_factory(), timeout=QQ_API_TIMEOUT) - return ok() + return except asyncio.TimeoutError: last_exc = asyncio.TimeoutError() logger.warning(f"[{rid}] QQ API timeout, attempt {attempt}/{QQ_API_MAX_RETRIES}") @@ -56,13 +47,31 @@ async def _call_qq_api(coro_factory, request: web.Request) -> web.Response: last_exc = exc logger.error(f"[{rid}] QQ API error: {exc}, attempt {attempt}/{QQ_API_MAX_RETRIES}") - logger.error(f"[{rid}] QQ API failed after {QQ_API_MAX_RETRIES} retries: {last_exc}") - return error(f"qq api failed: {last_exc}", code=502, status=502) + raise Exception(f"qq api failed after {QQ_API_MAX_RETRIES} retries: {last_exc}") async def webhook_handler(request: web.Request) -> web.Response: - """处理消息发送请求。""" - # 安全解析 JSON(aiohttp 可能抛 JSONDecodeError 或 ContentTypeError) + """处理消息发送请求,支持单条和批量发送。 + + 单条格式: + { + "group_id": "123", + "type": "text", + "msg": "hello" + } + + 批量格式(使用 post_group_msg 组合消息段): + { + "group_id": "123", + "messages": [ + {"type": "text", "msg": "hello"}, + {"type": "image", "url": "photo.jpg"} + ] + } + + 批量发送时,text/image/video 会在一条消息里组合发送(框架自动拆分冲突段), + file 单独发送(因为 file 是独占段)。 + """ try: data = await request.json() except Exception: @@ -71,61 +80,191 @@ async def webhook_handler(request: web.Request) -> web.Response: if not isinstance(data, dict): return error("request body must be a json object") - data, err = _validate_payload(data) - if err: - return err - - msg_type = data.get("type", "text") group_id = data.get("group_id") user_id = data.get("user_id") - msg = data.get("msg", "") - url = data.get("url", "") - # 如果 url 是本地已上传的文件(相对路径),补全为绝对路径 - file_path = Path(url) - if not file_path.is_absolute() and (UPLOAD_DIR / url).exists(): - url = str((UPLOAD_DIR / url).resolve()) + if not group_id and not user_id: + return error("need group_id or user_id") - # 获取 ncatbot API 实例 api = request.app["qq_api"] + logger = request.app["logger"] + rid = request.get("request_id", "-") + + messages = data.get("messages") + + if messages is not None: + # 批量模式 + if not isinstance(messages, list): + return error("messages must be an array") + + if not messages: + return error("messages cannot be empty") + + # 校验所有消息 + for i, msg in enumerate(messages): + if not isinstance(msg, dict): + return error(f"messages[{i}] must be an object") + err = _validate_message(msg) + if err: + return error(f"messages[{i}]: {err}") + + # 分组:text/image/video 可以组合,file 必须单独发 + combined_msgs: list[dict] = [] # text/image/video 组合 + file_msgs: list[dict] = [] # file 单独发 + + for msg in messages: + if msg.get("type") == "file": + file_msgs.append(msg) + else: + combined_msgs.append(msg) + + results: list[dict] = [] + + # 发送组合消息(text + image + video) + if combined_msgs: + text_parts: list[str] = [] + image_urls: list[str] = [] + video_url: str | None = None + + for msg in combined_msgs: + msg_type = msg.get("type", "text") + if msg_type == "text": + text_parts.append(msg.get("msg", "")) + elif msg_type == "image": + image_urls.append(_resolve_url(msg.get("url", ""))) + elif msg_type == "video": + video_url = _resolve_url(msg.get("url", "")) + + text = "\n".join(text_parts) if text_parts else None + image = image_urls[0] if image_urls else None # post_group_msg 只支持单张图片 + + try: + if group_id: + await _call_with_retry( + lambda: api.qq.post_group_msg( + group_id=group_id, + text=text, + image=image, + video=video_url, + ), + logger, rid + ) + else: + await _call_with_retry( + lambda: api.qq.post_private_msg( + user_id=user_id, + text=text, + image=image, + video=video_url, + ), + logger, rid + ) + + # 所有组合消息标记为成功 + for i, msg in enumerate(messages): + if msg.get("type") != "file": + results.append({"index": i, "success": True}) + except Exception as exc: + logger.error(f"[{rid}] Combined message failed: {exc}") + for i, msg in enumerate(messages): + if msg.get("type") != "file": + results.append({"index": i, "success": False, "error": str(exc)}) + + # 发送文件消息(每个文件单独一条) + for msg in file_msgs: + idx = messages.index(msg) + url = _resolve_url(msg.get("url", "")) + filename = url.split("/")[-1] + + try: + if group_id: + await _call_with_retry( + lambda u=url, n=filename: api.qq.send_group_file( + group_id=group_id, file=u, name=n + ), + logger, rid + ) + else: + await _call_with_retry( + lambda u=url, n=filename: api.qq.send_private_file( + user_id=user_id, file=u, name=n + ), + logger, rid + ) + results.append({"index": idx, "success": True}) + except Exception as exc: + logger.error(f"[{rid}] File message failed: {exc}") + results.append({"index": idx, "success": False, "error": str(exc)}) + + # 按 index 排序结果 + results.sort(key=lambda x: x["index"]) + success_count = sum(1 for r in results if r["success"]) + + return ok(data={ + "total": len(results), + "success": success_count, + "failed": len(results) - success_count, + "results": results, + }) - if group_id: - match msg_type: - case "text": - return await _call_qq_api( - lambda: api.qq.send_group_text(group_id=group_id, text=msg), request - ) - case "image": - return await _call_qq_api( - lambda: api.qq.send_group_image(group_id=group_id, image=url), request - ) - case "file": - return await _call_qq_api( - lambda: api.qq.send_group_file(group_id=group_id, file=url, name=url.split("/")[-1]), - request, - ) - case "video": - return await _call_qq_api( - lambda: api.qq.send_group_video(group_id=group_id, video=url), request - ) else: - match msg_type: - case "text": - return await _call_qq_api( - lambda: api.qq.send_private_text(user_id=user_id, text=msg), request - ) - case "image": - return await _call_qq_api( - lambda: api.qq.send_private_image(user_id=user_id, image=url), request - ) - case "file": - return await _call_qq_api( - lambda: api.qq.send_private_file(user_id=user_id, file=url, name=url.split("/")[-1]), - request, - ) - case "video": - return await _call_qq_api( - lambda: api.qq.send_private_video(user_id=user_id, video=url), request - ) + # 单条模式(兼容旧格式) + err = _validate_message(data) + if err: + return error(err) - return error("unreachable", code=500, status=500) + msg_type = data.get("type", "text") + url = _resolve_url(data.get("url", "")) if msg_type != "text" else None + msg_text = data.get("msg", "") + filename = url.split("/")[-1] if url else None + + try: + if group_id: + match msg_type: + case "text": + await _call_with_retry( + lambda: api.qq.send_group_text(group_id=group_id, text=msg_text), + logger, rid + ) + case "image": + await _call_with_retry( + lambda: api.qq.send_group_image(group_id=group_id, image=url), + logger, rid + ) + case "file": + await _call_with_retry( + lambda: api.qq.send_group_file(group_id=group_id, file=url, name=filename), + logger, rid + ) + case "video": + await _call_with_retry( + lambda: api.qq.send_group_video(group_id=group_id, video=url), + logger, rid + ) + else: + match msg_type: + case "text": + await _call_with_retry( + lambda: api.qq.send_private_text(user_id=user_id, text=msg_text), + logger, rid + ) + case "image": + await _call_with_retry( + lambda: api.qq.send_private_image(user_id=user_id, image=url), + logger, rid + ) + case "file": + await _call_with_retry( + lambda: api.qq.send_private_file(user_id=user_id, file=url, name=filename), + logger, rid + ) + case "video": + await _call_with_retry( + lambda: api.qq.send_private_video(user_id=user_id, video=url), + logger, rid + ) + + return ok() + except Exception as exc: + logger.error(f"[{rid}] QQ API error: {exc}") + return error(f"qq api failed: {exc}", code=502, status=502)