diff --git a/.env.example b/.env.example index 0d349d8..1b356e5 100644 --- a/.env.example +++ b/.env.example @@ -15,3 +15,11 @@ ALLOWED_EXTENSIONS= # ── QQ API ── QQ_API_TIMEOUT=10 QQ_API_MAX_RETRIES=2 + +# ── 命令监听 ── +# 命令前缀,默认 # +COMMAND_PREFIX=# +# 命令名长度(中文字数),默认 4 +COMMAND_LENGTH=4 +# 匹配到命令后的回调 URL,留空则不监听 +COMMAND_CALLBACK_URL= diff --git a/README.md b/README.md index 7bca71d..999e728 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,9 @@ uv run python -m ncatbot | `ALLOWED_EXTENSIONS` | 否 | 空(不限) | 允许的扩展名,逗号分隔,如 `jpg,png,pdf` | | `QQ_API_TIMEOUT` | 否 | `10` | QQ API 超时秒数 | | `QQ_API_MAX_RETRIES` | 否 | `2` | QQ API 失败重试次数 | +| `COMMAND_PREFIX` | 否 | `#` | 命令前缀 | +| `COMMAND_LENGTH` | 否 | `4` | 命令名字符数(中文字数) | +| `COMMAND_CALLBACK_URL` | 否 | 空(不监听) | 命令匹配后的回调 URL | ## 接口说明 @@ -182,6 +185,75 @@ curl -X POST http://localhost:8081/webhook \ 每条消息独立处理,一条失败不影响其他消息。`results` 数组按原始 `index` 排序,包含每条消息的发送结果。 +### 命令监听 + +插件会自动监听 QQ 消息,当消息以 `#四个中文字+空格` 开头时,将命令内容 POST 到 `COMMAND_CALLBACK_URL`。 + +**匹配规则:** + +``` +#测试命令 你好世界 +│ │ │ │ +│ │ │ └── 命令内容(content) +│ └── 空格分隔 +└── 命令名(4个中文字) + └── 前缀(默认 #) +``` + +- 前缀、命令名长度可通过 `COMMAND_PREFIX`、`COMMAND_LENGTH` 配置 +- 不配置 `COMMAND_CALLBACK_URL` 则不监听 + +**回调请求体(POST JSON):** + +```json +{ + "command": "测试命令", + "content": "你好世界", + "raw_message": "#测试命令 你好世界", + "user_id": "123456", + "group_id": "789012", + "message_id": "abc123" +} +``` + +| 字段 | 说明 | +|---|---| +| `command` | 命令名(4个中文字) | +| `content` | 命令后的内容 | +| `raw_message` | 原始消息文本 | +| `user_id` | 发送者 QQ 号 | +| `group_id` | 群号(私聊消息无此字段) | +| `message_id` | 消息 ID | + +**回调响应格式(自动回复到 QQ):** + +回调服务器返回 JSON,插件会自动将内容回复到原消息来源(群/私聊)。 + +纯文本回复: +```json +{"reply": "收到你的命令了"} +``` + +批量回复(text/image/video 组合发送,file 单独发送): +```json +{ + "messages": [ + {"type": "text", "msg": "处理结果如下"}, + {"type": "image", "url": "https://example.com/result.png"}, + {"type": "file", "url": "report.pdf"} + ] +} +``` + +| 字段 | 说明 | +|---|---| +| `reply` | 纯文本回复(与 `messages` 二选一,`messages` 优先) | +| `messages` | 批量回复数组,格式同 `/webhook` 的 `messages` 字段 | +| `group_id` | 可选,覆盖回复目标群号(默认回复到原群) | +| `user_id` | 可选,覆盖回复目标 QQ 号(默认回复到原发送者) | + +不需要回复时返回 `{}` 或空响应即可。 + ## 项目结构 ``` @@ -191,6 +263,7 @@ curl -X POST http://localhost:8081/webhook \ ├── response.py # 统一响应格式 ├── handlers/ │ ├── __init__.py +│ ├── command.py # 命令监听匹配与回调 │ ├── health.py # GET /healthz │ ├── message.py # POST /webhook │ └── upload.py # POST /upload diff --git a/config.py b/config.py index bb2083a..28d6416 100644 --- a/config.py +++ b/config.py @@ -31,3 +31,9 @@ ALLOWED_EXTENSIONS: set[str] = set( # ── QQ API ─────────────────────────────────────────────────── QQ_API_TIMEOUT: float = float(os.environ.get("QQ_API_TIMEOUT", "10")) QQ_API_MAX_RETRIES: int = int(os.environ.get("QQ_API_MAX_RETRIES", "2")) + +# ── 命令监听 ──────────────────────────────────────────────── +COMMAND_PREFIX: str = os.environ.get("COMMAND_PREFIX", "#") +COMMAND_LENGTH_MIN: int = int(os.environ.get("COMMAND_LENGTH_MIN", "2")) +COMMAND_LENGTH_MAX: int = int(os.environ.get("COMMAND_LENGTH_MAX", "4")) +COMMAND_CALLBACK_URL: str = os.environ.get("COMMAND_CALLBACK_URL", "") diff --git a/handlers/command.py b/handlers/command.py new file mode 100644 index 0000000..54afa68 --- /dev/null +++ b/handlers/command.py @@ -0,0 +1,168 @@ +"""命令监听处理器:匹配 #命令名 格式的消息,转发到外部回调 URL 并自动回复。""" + +import re + +import aiohttp + +from ..config import COMMAND_CALLBACK_URL, COMMAND_LENGTH_MAX, COMMAND_LENGTH_MIN, COMMAND_PREFIX + + +def build_command_pattern() -> re.Pattern: + """构建命令匹配正则:# + N个字符(中文/数字/字母/下划线等),后面可跟空格+内容或无内容。 + + 每个"字符"按 Unicode 码点计: + - 一个中文字 = 1 + - 一个数字 = 1 + - 一个英文字母 = 1 + - 其他非空白字符 = 1 + """ + return re.compile( + rf"^{re.escape(COMMAND_PREFIX)}(\S{{{COMMAND_LENGTH_MIN},{COMMAND_LENGTH_MAX}}})(?:\s+(.+))?$", + re.DOTALL, + ) + + +COMMAND_PATTERN = build_command_pattern() + + +def parse_command(raw_message: str) -> dict | None: + """解析消息,匹配命令模式。返回 {command, content, raw_message} 或 None。""" + match = COMMAND_PATTERN.match(raw_message.strip()) + if not match: + return None + return { + "command": match.group(1), + "content": match.group(2).strip() if match.group(2) else "", + "raw_message": raw_message.strip(), + } + + +async def send_command_callback(data: dict, event, api, logger) -> None: + """将命令数据 POST 到外部回调 URL,根据响应自动回复到 QQ。 + + 回调服务器返回格式: + { + "reply": "回复文本", // 纯文本回复(引用原消息) + "messages": [ // 批量回复(可选,优先于 reply) + {"type": "text", "msg": "..."}, + {"type": "image", "url": "..."}, + {"type": "file", "url": "..."}, + {"type": "video", "url": "..."} + ], + "at_sender": true // 是否 @发送者(默认 true,仅群聊) + } + + 所有字段均为可选,无回复内容时返回空 JSON 即可。 + 回复会引用触发命令的原消息。 + """ + if not COMMAND_CALLBACK_URL: + logger.warning("COMMAND_CALLBACK_URL 未配置,跳过命令回调") + return + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + COMMAND_CALLBACK_URL, + json=data, + timeout=aiohttp.ClientTimeout(total=10), + ) as resp: + if resp.status >= 400: + body = await resp.text() + logger.error( + "命令回调失败: status=%d url=%s body=%s", + resp.status, COMMAND_CALLBACK_URL, body[:200], + ) + return + + # 解析响应,自动回复 + try: + result = await resp.json(content_type=None) + except Exception: + return + + if not isinstance(result, dict): + return + + await _handle_reply(result, event.data, api, logger) + + except Exception as exc: + logger.error("命令回调异常: url=%s error=%s", COMMAND_CALLBACK_URL, exc) + + +async def _handle_reply(result: dict, msg_event, api, logger) -> None: + """处理回调响应,引用原消息自动回复。msg_event 是 GroupMessageEvent / PrivateMessageEvent。""" + at_sender = result.get("at_sender", True) + messages = result.get("messages") + reply = result.get("reply") + group_id = getattr(msg_event, "group_id", None) + user_id = msg_event.user_id + message_id = msg_event.message_id + + # 构造引用消息段 + from ncatbot.types import MessageArray, Reply + + def build_reply_msg(text=None, image=None, video=None) -> MessageArray: + """构建带引用的消息段。""" + msg = MessageArray() + msg.add_reply(message_id) + if group_id and at_sender: + msg.add_at(user_id) + msg.add_text(" ") + if text is not None: + msg.add_text(text) + if image is not None: + msg.add_image(image) + if video is not None: + msg.add_video(video) + return msg + + # 批量回复:引用 + 批量消息段 + if messages and isinstance(messages, list): + text_parts: list[str] = [] + image_url: str | None = None + video_url: str | None = None + file_msgs: list[dict] = [] + + for msg in messages: + msg_type = msg.get("type", "text") + if msg_type == "text": + text_parts.append(msg.get("msg", "")) + elif msg_type == "image": + image_url = msg.get("url") + elif msg_type == "video": + video_url = msg.get("url") + elif msg_type == "file": + file_msgs.append(msg) + + text = "\n".join(text_parts) if text_parts else None + + try: + # 组合消息(带引用) + if text or image_url or video_url: + reply_msg = build_reply_msg(text=text, image=image_url, video=video_url) + if group_id: + await api.qq.post_group_array_msg(group_id=group_id, msg=reply_msg) + else: + await api.qq.post_private_array_msg(user_id=user_id, msg=reply_msg) + # 文件单独发 + for fm in file_msgs: + url = fm.get("url", "") + filename = url.split("/")[-1] + if group_id: + await api.qq.send_group_file(group_id=group_id, file=url, name=filename) + else: + await api.qq.send_private_file(user_id=user_id, file=url, name=filename) + except Exception as exc: + logger.error("命令回复失败: %s", exc) + return + + # 纯文本回复:引用原消息 + if reply and isinstance(reply, str): + try: + reply_msg = build_reply_msg(text=reply) + if group_id: + await api.qq.post_group_array_msg(group_id=group_id, msg=reply_msg) + else: + await api.qq.post_private_array_msg(user_id=user_id, msg=reply_msg) + except Exception as exc: + logger.error("命令回复失败: %s", exc) diff --git a/plugin.py b/plugin.py index 87054f1..85cf13b 100644 --- a/plugin.py +++ b/plugin.py @@ -7,7 +7,8 @@ import os from aiohttp import web from ncatbot.plugin import NcatBotPlugin -from .config import HOST, PORT, UPLOAD_DIR, WEBHOOK_API_KEY +from .config import COMMAND_CALLBACK_URL, COMMAND_LENGTH_MAX, COMMAND_LENGTH_MIN, COMMAND_PREFIX, HOST, PORT, UPLOAD_DIR, WEBHOOK_API_KEY +from .handlers.command import parse_command, send_command_callback from .handlers.health import health_handler from .handlers.message import webhook_handler from .handlers.upload import cleanup_expired_files, upload_handler @@ -26,14 +27,25 @@ class WebHookPlugin(NcatBotPlugin): super().__init__(*args, **kwargs) self._webhook_runner: web.AppRunner | None = None self._cleanup_task: asyncio.Task | None = None + self._listener_task: asyncio.Task | None = None async def on_load(self): self.logger.info("Webhook 插件已加载") self.logger.info("WEBHOOK_API_KEY: %s", "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成") + self.logger.info("命令监听: 前缀=%s 长度=%d~%d 回调=%s", COMMAND_PREFIX, COMMAND_LENGTH_MIN, COMMAND_LENGTH_MAX, + COMMAND_CALLBACK_URL or "未配置") asyncio.create_task(self._start_webhook()) self._cleanup_task = asyncio.create_task(self._cleanup_loop()) + self._listener_task = asyncio.create_task(self._message_listener()) async def on_close(self): + if self._listener_task is not None: + self._listener_task.cancel() + try: + await self._listener_task + except asyncio.CancelledError: + pass + self._listener_task = None if self._cleanup_task is not None: self._cleanup_task.cancel() try: @@ -53,6 +65,40 @@ class WebHookPlugin(NcatBotPlugin): except Exception as exc: self.logger.error("清理过期文件失败: %s", exc) + async def _message_listener(self) -> None: + """监听 QQ 消息,匹配命令模式后转发到外部回调。""" + try: + async with self.events("message") as stream: + async for event in stream: + try: + raw_message = event.data.raw_message + if not raw_message: + continue + parsed = parse_command(raw_message) + if not parsed: + continue + # 构建回调数据 + data = { + "command": parsed["command"], + "content": parsed["content"], + "raw_message": parsed["raw_message"], + "user_id": event.data.user_id, + "message_id": event.data.message_id, + } + if hasattr(event.data, "group_id"): + data["group_id"] = event.data.group_id + self.logger.info( + "命令监听匹配: command=%s user=%s group=%s", + parsed["command"], data["user_id"], data.get("group_id", "-"), + ) + asyncio.create_task( + send_command_callback(data, event, self.api, self.logger) + ) + except Exception as exc: + self.logger.error("消息处理异常: %s", exc) + except asyncio.CancelledError: + return + def _create_app(self) -> web.Application: app = web.Application(middlewares=[request_id_middleware, auth_middleware]) app["qq_api"] = self.api