"""命令监听处理器:匹配 #命令名+空格 格式的消息,转发到外部回调 URL 并自动回复。""" import re import aiohttp from ..config import COMMAND_CALLBACK_URL, COMMAND_LENGTH, COMMAND_PREFIX def build_command_pattern() -> re.Pattern: """构建命令匹配正则:# + N个中文字 + 空格。""" return re.compile( rf"^{re.escape(COMMAND_PREFIX)}([\u4e00-\u9fff]{{{COMMAND_LENGTH}}})\s+(.+)", re.DOTALL, ) COMMAND_PATTERN = build_command_pattern() def parse_command(raw_message: str) -> dict | None: """解析消息,匹配命令模式。返回 {command, content, raw_message} 或 None。""" match = COMMAND_PATTERN.match(raw_message.strip()) if not match: return None return { "command": match.group(1), "content": match.group(2).strip(), "raw_message": raw_message.strip(), } async def send_command_callback(data: dict, event, api, logger) -> None: """将命令数据 POST 到外部回调 URL,根据响应自动回复到 QQ。 回调服务器返回格式: { "reply": "回复文本", // 纯文本回复(引用原消息) "messages": [ // 批量回复(可选,优先于 reply) {"type": "text", "msg": "..."}, {"type": "image", "url": "..."}, {"type": "file", "url": "..."}, {"type": "video", "url": "..."} ], "at_sender": true // 是否 @发送者(默认 true,仅群聊) } 所有字段均为可选,无回复内容时返回空 JSON 即可。 回复会引用触发命令的原消息。 """ if not COMMAND_CALLBACK_URL: logger.warning("COMMAND_CALLBACK_URL 未配置,跳过命令回调") return try: async with aiohttp.ClientSession() as session: async with session.post( COMMAND_CALLBACK_URL, json=data, timeout=aiohttp.ClientTimeout(total=10), ) as resp: if resp.status >= 400: body = await resp.text() logger.error( "命令回调失败: status=%d url=%s body=%s", resp.status, COMMAND_CALLBACK_URL, body[:200], ) return # 解析响应,自动回复 try: result = await resp.json(content_type=None) except Exception: return if not isinstance(result, dict): return await _handle_reply(result, event.data, api, logger) except Exception as exc: logger.error("命令回调异常: url=%s error=%s", COMMAND_CALLBACK_URL, exc) async def _handle_reply(result: dict, msg_event, api, logger) -> None: """处理回调响应,引用原消息自动回复。msg_event 是 GroupMessageEvent / PrivateMessageEvent。""" at_sender = result.get("at_sender", True) messages = result.get("messages") reply = result.get("reply") group_id = getattr(msg_event, "group_id", None) user_id = msg_event.user_id message_id = msg_event.message_id # 构造引用消息段 from ncatbot.types import MessageArray, Reply def build_reply_msg(text=None, image=None, video=None) -> MessageArray: """构建带引用的消息段。""" msg = MessageArray() msg.add_reply(message_id) if group_id and at_sender: msg.add_at(user_id) msg.add_text(" ") if text is not None: msg.add_text(text) if image is not None: msg.add_image(image) if video is not None: msg.add_video(video) return msg # 批量回复:引用 + 批量消息段 if messages and isinstance(messages, list): text_parts: list[str] = [] image_url: str | None = None video_url: str | None = None file_msgs: list[dict] = [] for msg in messages: msg_type = msg.get("type", "text") if msg_type == "text": text_parts.append(msg.get("msg", "")) elif msg_type == "image": image_url = msg.get("url") elif msg_type == "video": video_url = msg.get("url") elif msg_type == "file": file_msgs.append(msg) text = "\n".join(text_parts) if text_parts else None try: # 组合消息(带引用) if text or image_url or video_url: reply_msg = build_reply_msg(text=text, image=image_url, video=video_url) if group_id: await api.qq.post_group_array_msg(group_id=group_id, msg=reply_msg) else: await api.qq.post_private_array_msg(user_id=user_id, msg=reply_msg) # 文件单独发 for fm in file_msgs: url = fm.get("url", "") filename = url.split("/")[-1] if group_id: await api.qq.send_group_file(group_id=group_id, file=url, name=filename) else: await api.qq.send_private_file(user_id=user_id, file=url, name=filename) except Exception as exc: logger.error("命令回复失败: %s", exc) return # 纯文本回复:引用原消息 if reply and isinstance(reply, str): try: reply_msg = build_reply_msg(text=reply) if group_id: await api.qq.post_group_array_msg(group_id=group_id, msg=reply_msg) else: await api.qq.post_private_array_msg(user_id=user_id, msg=reply_msg) except Exception as exc: logger.error("命令回复失败: %s", exc)