"""消息发送处理器:JSON 解析、参数校验、QQ API 超时与重试、批量发送。""" import asyncio from pathlib import Path from aiohttp import web from ..config import QQ_API_MAX_RETRIES, QQ_API_TIMEOUT, UPLOAD_DIR from ..response import error, ok VALID_MSG_TYPES = {"text", "image", "file", "video"} def _resolve_url(url: str) -> str: """如果 url 是本地已上传的文件(相对路径),补全为绝对路径。""" file_path = Path(url) if not file_path.is_absolute() and (UPLOAD_DIR / url).exists(): return str((UPLOAD_DIR / url).resolve()) return url def _validate_message(msg: dict) -> str | None: """校验单条消息,返回 None 或错误信息。""" msg_type = msg.get("type", "text") if msg_type not in VALID_MSG_TYPES: return f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}" # text 必须有 msg,image/file/video 必须有 url if msg_type == "text" and not msg.get("msg"): return "missing required field: msg" if msg_type in {"image", "file", "video"} and not msg.get("url"): return "missing required field: url" return None async def _call_with_retry(coro_factory, logger, rid: str) -> None: """带超时和重试的 API 调用。""" last_exc: Exception | None = None for attempt in range(1, QQ_API_MAX_RETRIES + 1): try: await asyncio.wait_for(coro_factory(), timeout=QQ_API_TIMEOUT) return except asyncio.TimeoutError: last_exc = asyncio.TimeoutError() logger.warning(f"[{rid}] QQ API timeout, attempt {attempt}/{QQ_API_MAX_RETRIES}") except Exception as exc: last_exc = exc logger.error(f"[{rid}] QQ API error: {exc}, attempt {attempt}/{QQ_API_MAX_RETRIES}") raise Exception(f"qq api failed after {QQ_API_MAX_RETRIES} retries: {last_exc}") async def _send_single(api, msg_type: str, url: str | None, msg_text: str, group_id: str | None, user_id: str | None, logger, rid: str) -> None: """发送单条消息(非组合模式)。""" filename = url.split("/")[-1] if url else None if group_id: match msg_type: case "text": await _call_with_retry( lambda t=msg_text: api.qq.send_group_text(group_id=group_id, text=t), logger, rid ) case "image": await _call_with_retry( lambda u=url: api.qq.send_group_image(group_id=group_id, image=u), logger, rid ) case "file": await _call_with_retry( lambda u=url, n=filename: api.qq.send_group_file(group_id=group_id, file=u, name=n), logger, rid ) case "video": await _call_with_retry( lambda u=url: api.qq.send_group_video(group_id=group_id, video=u), logger, rid ) else: match msg_type: case "text": await _call_with_retry( lambda t=msg_text: api.qq.send_private_text(user_id=user_id, text=t), logger, rid ) case "image": await _call_with_retry( lambda u=url: api.qq.send_private_image(user_id=user_id, image=u), logger, rid ) case "file": await _call_with_retry( lambda u=url, n=filename: api.qq.send_private_file(user_id=user_id, file=u, name=n), logger, rid ) case "video": await _call_with_retry( lambda u=url: api.qq.send_private_video(user_id=user_id, video=u), logger, rid ) async def _send_batch(api, messages: list[dict], group_id: str | None, user_id: str | None, logger, rid: str) -> list[dict]: """批量发送消息。 - text/image/video 组合为一条消息(post_group_msg / post_private_msg) - 多张图片拆成多条消息发送 - file 单独发送(独占段) """ results: list[dict] = [] # 分组:每条消息记录原始 index combined_items: list[tuple[int, dict]] = [] # (index, msg) — text/image/video file_items: list[tuple[int, dict]] = [] # (index, msg) — file for i, msg in enumerate(messages): if msg.get("type") == "file": file_items.append((i, msg)) else: combined_items.append((i, msg)) # 处理组合消息:按图片拆组,每组最多一张图片 if combined_items: # 收集所有 text、image、video text_parts: list[str] = [] image_items: list[tuple[int, dict]] = [] video_url: str | None = None video_index: int | None = None combined_indices: list[int] = [] for idx, msg in combined_items: msg_type = msg.get("type", "text") if msg_type == "text": text_parts.append(msg.get("msg", "")) combined_indices.append(idx) elif msg_type == "image": image_items.append((idx, msg)) elif msg_type == "video": video_url = _resolve_url(msg.get("url", "")) video_index = idx text = "\n".join(text_parts) if text_parts else None # 如果有视频,视频单独发(独占段会吞掉其他内容) if video_url: try: if group_id: await _call_with_retry( lambda u=video_url: api.qq.post_group_msg( group_id=group_id, video=u ), logger, rid ) else: await _call_with_retry( lambda u=video_url: api.qq.post_private_msg( user_id=user_id, video=u ), logger, rid ) results.append({"index": video_index, "success": True}) except Exception as exc: logger.error(f"[{rid}] Video message failed: {exc}") results.append({"index": video_index, "success": False, "error": str(exc)}) # 文本 + 第一张图片组合 if text or image_items: first_image_url = _resolve_url(image_items[0][1].get("url", "")) if image_items else None text_image_indices = combined_indices + ([image_items[0][0]] if image_items else []) try: if group_id: await _call_with_retry( lambda t=text, img=first_image_url: api.qq.post_group_msg( group_id=group_id, text=t, image=img ), logger, rid ) else: await _call_with_retry( lambda t=text, img=first_image_url: api.qq.post_private_msg( user_id=user_id, text=t, image=img ), logger, rid ) for idx in text_image_indices: results.append({"index": idx, "success": True}) except Exception as exc: logger.error(f"[{rid}] Text+image message failed: {exc}") for idx in text_image_indices: results.append({"index": idx, "success": False, "error": str(exc)}) # 剩余图片逐张发送 for img_idx, img_msg in image_items[1:]: img_url = _resolve_url(img_msg.get("url", "")) try: if group_id: await _call_with_retry( lambda u=img_url: api.qq.send_group_image(group_id=group_id, image=u), logger, rid ) else: await _call_with_retry( lambda u=img_url: api.qq.send_private_image(user_id=user_id, image=u), logger, rid ) results.append({"index": img_idx, "success": True}) except Exception as exc: logger.error(f"[{rid}] Image message failed: {exc}") results.append({"index": img_idx, "success": False, "error": str(exc)}) # 文件单独发送 for idx, msg in file_items: url = _resolve_url(msg.get("url", "")) filename = url.split("/")[-1] try: if group_id: await _call_with_retry( lambda u=url, n=filename: api.qq.send_group_file(group_id=group_id, file=u, name=n), logger, rid ) else: await _call_with_retry( lambda u=url, n=filename: api.qq.send_private_file(user_id=user_id, file=u, name=n), logger, rid ) results.append({"index": idx, "success": True}) except Exception as exc: logger.error(f"[{rid}] File message failed: {exc}") results.append({"index": idx, "success": False, "error": str(exc)}) results.sort(key=lambda x: x["index"]) return results async def webhook_handler(request: web.Request) -> web.Response: """处理消息发送请求,支持单条和批量发送。""" try: data = await request.json() except Exception: return error("invalid json") if not isinstance(data, dict): return error("request body must be a json object") group_id = data.get("group_id") user_id = data.get("user_id") if not group_id and not user_id: return error("need group_id or user_id") api = request.app["qq_api"] logger = request.app["logger"] rid = request.get("request_id", "-") messages = data.get("messages") if messages is not None: # 批量模式 if not isinstance(messages, list): return error("messages must be an array") if not messages: return error("messages cannot be empty") # 校验所有消息 for i, msg in enumerate(messages): if not isinstance(msg, dict): return error(f"messages[{i}] must be an object") err = _validate_message(msg) if err: return error(f"messages[{i}]: {err}") try: results = await _send_batch(api, messages, group_id, user_id, logger, rid) except Exception as exc: logger.error(f"[{rid}] Batch send failed: {exc}") return error(f"batch send failed: {exc}", code=502, status=502) success_count = sum(1 for r in results if r["success"]) return ok(data={ "total": len(results), "success": success_count, "failed": len(results) - success_count, "results": results, }) else: # 单条模式 err = _validate_message(data) if err: return error(err) msg_type = data.get("type", "text") url = _resolve_url(data.get("url", "")) if msg_type != "text" else None msg_text = data.get("msg", "") try: await _send_single(api, msg_type, url, msg_text, group_id, user_id, logger, rid) return ok() except Exception as exc: logger.error(f"[{rid}] QQ API error: {exc}") return error(f"qq api failed: {exc}", code=502, status=502)