feat(command): 回调响应自动回复到 QQ

- 回调服务器可返回 reply 或 messages 字段,插件自动回复到原消息来源
- reply 为纯文本回复,messages 格式同 /webhook 接口
- 支持通过 group_id/user_id 覆盖回复目标
- 无需回复时返回空 JSON 即可
- 更新 README 文档说明回调响应格式
This commit is contained in:
2026-05-02 19:29:18 +08:00
parent ee1bd583d8
commit af0f6c7ec6
3 changed files with 126 additions and 9 deletions

View File

@@ -1,11 +1,10 @@
"""命令监听处理器:匹配 #命令名+空格 格式的消息,转发到外部回调 URL。"""
"""命令监听处理器:匹配 #命令名+空格 格式的消息,转发到外部回调 URL 并自动回复"""
import re
import aiohttp
from ..config import COMMAND_CALLBACK_URL, COMMAND_LENGTH, COMMAND_PREFIX
from ..response import error, ok
def build_command_pattern() -> re.Pattern:
@@ -31,11 +30,25 @@ def parse_command(raw_message: str) -> dict | None:
}
async def send_command_callback(data: dict, logger) -> bool:
"""将命令数据 POST 到外部回调 URL。返回是否成功。"""
async def send_command_callback(data: dict, event, api, logger) -> None:
"""将命令数据 POST 到外部回调 URL,根据响应自动回复到 QQ。
回调服务器返回格式:
{
"reply": "回复文本", // 纯文本回复
"messages": [ // 批量回复(可选,优先于 reply
{"type": "text", "msg": "..."},
{"type": "image", "url": "..."},
{"type": "file", "url": "..."},
{"type": "video", "url": "..."}
]
}
所有字段均为可选,无回复内容时返回空 JSON 即可。
"""
if not COMMAND_CALLBACK_URL:
logger.warning("COMMAND_CALLBACK_URL 未配置,跳过命令回调")
return False
return
try:
async with aiohttp.ClientSession() as session:
@@ -50,8 +63,81 @@ async def send_command_callback(data: dict, logger) -> bool:
"命令回调失败: status=%d url=%s body=%s",
resp.status, COMMAND_CALLBACK_URL, body[:200],
)
return False
return True
return
# 解析响应,自动回复
try:
result = await resp.json(content_type=None)
except Exception:
return
if not isinstance(result, dict):
return
await _handle_reply(result, event, api, logger)
except Exception as exc:
logger.error("命令回调异常: url=%s error=%s", COMMAND_CALLBACK_URL, exc)
return False
async def _handle_reply(result: dict, event, api, logger) -> None:
"""处理回调响应,自动回复到原消息来源。"""
group_id = result.get("group_id") or (getattr(event.data, "group_id", None))
user_id = result.get("user_id") or event.data.user_id
messages = result.get("messages")
reply = result.get("reply")
# 批量回复(使用 post_group_msg / post_private_msg 组合消息段)
if messages and isinstance(messages, list):
text_parts: list[str] = []
image_url: str | None = None
video_url: str | None = None
file_msgs: list[dict] = []
for msg in messages:
msg_type = msg.get("type", "text")
if msg_type == "text":
text_parts.append(msg.get("msg", ""))
elif msg_type == "image":
image_url = msg.get("url")
elif msg_type == "video":
video_url = msg.get("url")
elif msg_type == "file":
file_msgs.append(msg)
text = "\n".join(text_parts) if text_parts else None
try:
if group_id:
if text or image_url or video_url:
await api.qq.post_group_msg(
group_id=group_id, text=text, image=image_url, video=video_url,
)
for fm in file_msgs:
url = fm.get("url", "")
await api.qq.send_group_file(
group_id=group_id, file=url, name=url.split("/")[-1],
)
else:
if text or image_url or video_url:
await api.qq.post_private_msg(
user_id=user_id, text=text, image=image_url, video=video_url,
)
for fm in file_msgs:
url = fm.get("url", "")
await api.qq.send_private_file(
user_id=user_id, file=url, name=url.split("/")[-1],
)
except Exception as exc:
logger.error("命令回复失败: %s", exc)
return
# 纯文本回复
if reply and isinstance(reply, str):
try:
if group_id:
await api.qq.send_group_text(group_id=group_id, text=reply)
else:
await api.qq.send_private_text(user_id=user_id, text=reply)
except Exception as exc:
logger.error("命令回复失败: %s", exc)