"""消息发送处理器:JSON 解析、参数校验、QQ API 超时与重试、批量发送。""" import asyncio from pathlib import Path from aiohttp import web from ..config import QQ_API_MAX_RETRIES, QQ_API_TIMEOUT, UPLOAD_DIR from ..response import error, ok VALID_MSG_TYPES = {"text", "image", "file", "video"} def _resolve_url(url: str) -> str: """如果 url 是本地已上传的文件(相对路径),补全为绝对路径。""" file_path = Path(url) if not file_path.is_absolute() and (UPLOAD_DIR / url).exists(): return str((UPLOAD_DIR / url).resolve()) return url def _validate_message(msg: dict) -> str | None: """校验单条消息,返回 None 或错误信息。""" msg_type = msg.get("type", "text") if msg_type not in VALID_MSG_TYPES: return f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}" # text 必须有 msg,image/file/video 必须有 url if msg_type == "text" and not msg.get("msg"): return "missing required field: msg" if msg_type in {"image", "file", "video"} and not msg.get("url"): return f"missing required field: url" return None async def _call_with_retry(coro_factory, logger, rid: str) -> None: """带超时和重试的 API 调用。""" last_exc: Exception | None = None for attempt in range(1, QQ_API_MAX_RETRIES + 1): try: await asyncio.wait_for(coro_factory(), timeout=QQ_API_TIMEOUT) return except asyncio.TimeoutError: last_exc = asyncio.TimeoutError() logger.warning(f"[{rid}] QQ API timeout, attempt {attempt}/{QQ_API_MAX_RETRIES}") except Exception as exc: last_exc = exc logger.error(f"[{rid}] QQ API error: {exc}, attempt {attempt}/{QQ_API_MAX_RETRIES}") raise Exception(f"qq api failed after {QQ_API_MAX_RETRIES} retries: {last_exc}") async def webhook_handler(request: web.Request) -> web.Response: """处理消息发送请求,支持单条和批量发送。 单条格式: { "group_id": "123", "type": "text", "msg": "hello" } 批量格式(使用 post_group_msg 组合消息段): { "group_id": "123", "messages": [ {"type": "text", "msg": "hello"}, {"type": "image", "url": "photo.jpg"} ] } 批量发送时,text/image/video 会在一条消息里组合发送(框架自动拆分冲突段), file 单独发送(因为 file 是独占段)。 """ try: data = await request.json() except Exception: return error("invalid json") if not isinstance(data, dict): return error("request body must be a json object") group_id = data.get("group_id") user_id = data.get("user_id") if not group_id and not user_id: return error("need group_id or user_id") api = request.app["qq_api"] logger = request.app["logger"] rid = request.get("request_id", "-") messages = data.get("messages") if messages is not None: # 批量模式 if not isinstance(messages, list): return error("messages must be an array") if not messages: return error("messages cannot be empty") # 校验所有消息 for i, msg in enumerate(messages): if not isinstance(msg, dict): return error(f"messages[{i}] must be an object") err = _validate_message(msg) if err: return error(f"messages[{i}]: {err}") # 分组:text/image/video 可以组合,file 必须单独发 combined_msgs: list[dict] = [] # text/image/video 组合 file_msgs: list[dict] = [] # file 单独发 for msg in messages: if msg.get("type") == "file": file_msgs.append(msg) else: combined_msgs.append(msg) results: list[dict] = [] # 发送组合消息(text + image + video) if combined_msgs: text_parts: list[str] = [] image_urls: list[str] = [] video_url: str | None = None for msg in combined_msgs: msg_type = msg.get("type", "text") if msg_type == "text": text_parts.append(msg.get("msg", "")) elif msg_type == "image": image_urls.append(_resolve_url(msg.get("url", ""))) elif msg_type == "video": video_url = _resolve_url(msg.get("url", "")) text = "\n".join(text_parts) if text_parts else None image = image_urls[0] if image_urls else None # post_group_msg 只支持单张图片 try: if group_id: await _call_with_retry( lambda: api.qq.post_group_msg( group_id=group_id, text=text, image=image, video=video_url, ), logger, rid ) else: await _call_with_retry( lambda: api.qq.post_private_msg( user_id=user_id, text=text, image=image, video=video_url, ), logger, rid ) # 所有组合消息标记为成功 for i, msg in enumerate(messages): if msg.get("type") != "file": results.append({"index": i, "success": True}) except Exception as exc: logger.error(f"[{rid}] Combined message failed: {exc}") for i, msg in enumerate(messages): if msg.get("type") != "file": results.append({"index": i, "success": False, "error": str(exc)}) # 发送文件消息(每个文件单独一条) for msg in file_msgs: idx = messages.index(msg) url = _resolve_url(msg.get("url", "")) filename = url.split("/")[-1] try: if group_id: await _call_with_retry( lambda u=url, n=filename: api.qq.send_group_file( group_id=group_id, file=u, name=n ), logger, rid ) else: await _call_with_retry( lambda u=url, n=filename: api.qq.send_private_file( user_id=user_id, file=u, name=n ), logger, rid ) results.append({"index": idx, "success": True}) except Exception as exc: logger.error(f"[{rid}] File message failed: {exc}") results.append({"index": idx, "success": False, "error": str(exc)}) # 按 index 排序结果 results.sort(key=lambda x: x["index"]) success_count = sum(1 for r in results if r["success"]) return ok(data={ "total": len(results), "success": success_count, "failed": len(results) - success_count, "results": results, }) else: # 单条模式(兼容旧格式) err = _validate_message(data) if err: return error(err) msg_type = data.get("type", "text") url = _resolve_url(data.get("url", "")) if msg_type != "text" else None msg_text = data.get("msg", "") filename = url.split("/")[-1] if url else None try: if group_id: match msg_type: case "text": await _call_with_retry( lambda: api.qq.send_group_text(group_id=group_id, text=msg_text), logger, rid ) case "image": await _call_with_retry( lambda: api.qq.send_group_image(group_id=group_id, image=url), logger, rid ) case "file": await _call_with_retry( lambda: api.qq.send_group_file(group_id=group_id, file=url, name=filename), logger, rid ) case "video": await _call_with_retry( lambda: api.qq.send_group_video(group_id=group_id, video=url), logger, rid ) else: match msg_type: case "text": await _call_with_retry( lambda: api.qq.send_private_text(user_id=user_id, text=msg_text), logger, rid ) case "image": await _call_with_retry( lambda: api.qq.send_private_image(user_id=user_id, image=url), logger, rid ) case "file": await _call_with_retry( lambda: api.qq.send_private_file(user_id=user_id, file=url, name=filename), logger, rid ) case "video": await _call_with_retry( lambda: api.qq.send_private_video(user_id=user_id, video=url), logger, rid ) return ok() except Exception as exc: logger.error(f"[{rid}] QQ API error: {exc}") return error(f"qq api failed: {exc}", code=502, status=502)