Files
webhook/plugin.py
zhilv ed6e27f162 feat(command): 添加监听范围过滤和回复 @控制
- 新增 COMMAND_SCOPE 配置,支持 all/group/private 过滤消息来源
- 新增 COMMAND_ALLOWED_GROUPS 群号白名单,逗号分隔,留空不限制
- 新增 COMMAND_ALLOWED_USERS QQ 号白名单,逗号分隔,留空不限制
- 新增 COMMAND_AT_SENDER 配置,控制回复时是否 @发送者(默认 true)
- 回调响应中 at_sender 字段可覆盖全局配置
- 更新 .env.example 和 README.md 文档
2026-05-03 12:26:44 +08:00

167 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ncatbot-webhook-plugin 入口:组装各模块,启动 HTTP 服务。"""
import asyncio
import logging
import os
from aiohttp import web
from ncatbot.plugin import NcatBotPlugin
from .config import (
COMMAND_ALLOWED_GROUPS,
COMMAND_ALLOWED_USERS,
COMMAND_AT_SENDER,
COMMAND_CALLBACK_URL,
COMMAND_LENGTH_MAX,
COMMAND_LENGTH_MIN,
COMMAND_PREFIX,
COMMAND_SCOPE,
HOST,
PORT,
UPLOAD_DIR,
WEBHOOK_API_KEY,
)
from .handlers.command import parse_command, send_command_callback
from .handlers.health import health_handler
from .handlers.message import webhook_handler
from .handlers.upload import cleanup_expired_files, upload_handler
from .middleware import auth_middleware, request_id_middleware
# Root logger setup: INFO level, "timestamp [LEVEL] logger-name: message" lines.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
class WebHookPlugin(NcatBotPlugin):
    """NcatBot plugin that exposes an HTTP interface and relays external messages to QQ.

    On load it starts three background jobs: the aiohttp server
    (``/healthz``, ``/webhook``, ``/upload``), an hourly cleanup of expired
    uploads, and a listener that forwards matching QQ commands to the
    external callback. ``on_close`` tears all of them down.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._webhook_runner: web.AppRunner | None = None
        self._start_task: asyncio.Task | None = None
        self._cleanup_task: asyncio.Task | None = None
        self._listener_task: asyncio.Task | None = None
        # Strong references to every task this plugin spawns. The event loop
        # only keeps weak references, so an unreferenced task could be
        # garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    async def on_load(self):
        """Log the effective configuration and start the background jobs."""
        self.logger.info("Webhook 插件已加载")
        # Reports whether the key was supplied through the environment.
        # NOTE(review): presumably the config module generates one otherwise — confirm.
        self.logger.info(
            "WEBHOOK_API_KEY: %s",
            "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成",
        )
        self.logger.info(
            "命令监听: 前缀=%s 长度=%d~%d 范围=%s 回调=%s",
            COMMAND_PREFIX,
            COMMAND_LENGTH_MIN,
            COMMAND_LENGTH_MAX,
            COMMAND_SCOPE,
            COMMAND_CALLBACK_URL or "未配置",
        )
        # The server is started in the background so on_load returns promptly;
        # the handle is kept so on_close can cancel a start still in progress.
        self._start_task = self._spawn(self._start_webhook())
        self._cleanup_task = self._spawn(self._cleanup_loop())
        self._listener_task = self._spawn(self._message_listener())

    async def on_close(self):
        """Cancel the background jobs, then shut the HTTP server down."""
        await self._cancel_task(self._listener_task)
        self._listener_task = None
        await self._cancel_task(self._cleanup_task)
        self._cleanup_task = None
        # Stop a still-running startup before cleaning the runner, otherwise
        # the server could come up after the plugin was closed.
        await self._cancel_task(self._start_task)
        self._start_task = None
        await self._stop_webhook()
        self.logger.info("Webhook 插件已卸载")

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule ``coro`` as a task that is tracked until it completes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task: asyncio.Task) -> None:
        """Drop a finished task from tracking and log an unexpected failure."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self.logger.error("后台任务异常退出: %s", exc)

    async def _cancel_task(self, task: asyncio.Task | None) -> None:
        """Cancel ``task`` (if any) and wait until it has actually stopped."""
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            # The task had already failed on its own; _on_task_done reports
            # that failure. It must not abort the rest of the shutdown.
            pass

    async def _cleanup_loop(self) -> None:
        """Remove expired uploaded files once per hour."""
        while True:
            await asyncio.sleep(3600)
            try:
                await cleanup_expired_files()
            except Exception as exc:
                # A failed run must not end the loop; try again next hour.
                self.logger.error("清理过期文件失败: %s", exc)

    async def _message_listener(self) -> None:
        """Listen for QQ messages and forward matching commands to the callback."""
        async with self.events("message") as stream:
            async for event in stream:
                try:
                    self._dispatch_command(event)
                except Exception as exc:
                    # One malformed event must not stop the listener.
                    self.logger.error("消息处理异常: %s", exc)

    def _is_allowed_source(self, event_data) -> bool:
        """Return True when the message passes the scope and whitelist filters."""
        # A missing or None group_id is treated as a private message.
        group_id = getattr(event_data, "group_id", None)
        is_group = group_id is not None

        # Scope filter: group / private / all.
        if COMMAND_SCOPE == "group" and not is_group:
            return False
        if COMMAND_SCOPE == "private" and is_group:
            return False

        # Whitelists: an empty whitelist means "no restriction".
        # NOTE(review): membership assumes the whitelists hold ids of the same
        # type as the event fields — confirm against the config module.
        if COMMAND_ALLOWED_GROUPS and is_group and group_id not in COMMAND_ALLOWED_GROUPS:
            return False
        if COMMAND_ALLOWED_USERS and event_data.user_id not in COMMAND_ALLOWED_USERS:
            return False
        return True

    def _dispatch_command(self, event) -> None:
        """Parse one message event and, if it is an accepted command, send the callback."""
        raw_message = event.data.raw_message
        if not raw_message:
            return
        parsed = parse_command(raw_message)
        if not parsed:
            return
        if not self._is_allowed_source(event.data):
            return

        # Payload handed to the external callback.
        data = {
            "command": parsed["command"],
            "content": parsed["content"],
            "raw_message": parsed["raw_message"],
            "user_id": event.data.user_id,
            "message_id": event.data.message_id,
        }
        group_id = getattr(event.data, "group_id", None)
        if group_id is not None:
            data["group_id"] = group_id

        self.logger.info(
            "命令监听匹配: command=%s user=%s group=%s",
            parsed["command"],
            data["user_id"],
            data.get("group_id", "-"),
        )
        # Run the callback concurrently so a slow endpoint does not block
        # the message stream.
        self._spawn(send_command_callback(data, event, self.api, self.logger))

    def _create_app(self) -> web.Application:
        """Build the aiohttp application with its middlewares and routes."""
        app = web.Application(middlewares=[request_id_middleware, auth_middleware])
        # Shared objects stored on the application for the handlers.
        app["qq_api"] = self.api
        app["logger"] = self.logger
        app.router.add_get("/healthz", health_handler)
        app.router.add_post("/webhook", webhook_handler)
        app.router.add_post("/upload", upload_handler)
        return app

    async def _start_webhook(self):
        """(Re)start the HTTP server on ``HOST:PORT``."""
        # Shut down any previous runner first so the port is free.
        await self._stop_webhook()
        app = self._create_app()
        self._webhook_runner = web.AppRunner(app)
        await self._webhook_runner.setup()
        site = web.TCPSite(self._webhook_runner, HOST, PORT)
        await site.start()
        self.logger.info("Webhook 已启动: %s:%d", HOST, PORT)
        self.logger.info("上传目录: %s", UPLOAD_DIR)

    async def _stop_webhook(self):
        """Stop the HTTP server if it is running; otherwise do nothing."""
        # Detach the runner before awaiting so overlapping calls cannot
        # clean the same runner twice.
        runner, self._webhook_runner = self._webhook_runner, None
        if runner is not None:
            await runner.cleanup()
            self.logger.info("Webhook 已停止")