"""ncatbot-webhook-plugin 入口:组装各模块,启动 HTTP 服务。""" import asyncio import logging import os from aiohttp import web from ncatbot.plugin import NcatBotPlugin from .config import COMMAND_CALLBACK_URL, COMMAND_LENGTH, COMMAND_PREFIX, HOST, PORT, UPLOAD_DIR, WEBHOOK_API_KEY from .handlers.command import parse_command, send_command_callback from .handlers.health import health_handler from .handlers.message import webhook_handler from .handlers.upload import cleanup_expired_files, upload_handler from .middleware import auth_middleware, request_id_middleware logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) class WebHookPlugin(NcatBotPlugin): """NcatBot 插件:对外暴露 HTTP 接口,接收外部消息转发至 QQ。""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._webhook_runner: web.AppRunner | None = None self._cleanup_task: asyncio.Task | None = None self._listener_task: asyncio.Task | None = None async def on_load(self): self.logger.info("Webhook 插件已加载") self.logger.info("WEBHOOK_API_KEY: %s", "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成") self.logger.info("命令监听: 前缀=%s 长度=%d 回调=%s", COMMAND_PREFIX, COMMAND_LENGTH, COMMAND_CALLBACK_URL or "未配置") asyncio.create_task(self._start_webhook()) self._cleanup_task = asyncio.create_task(self._cleanup_loop()) self._listener_task = asyncio.create_task(self._message_listener()) async def on_close(self): if self._listener_task is not None: self._listener_task.cancel() try: await self._listener_task except asyncio.CancelledError: pass self._listener_task = None if self._cleanup_task is not None: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass self._cleanup_task = None await self._stop_webhook() self.logger.info("Webhook 插件已卸载") async def _cleanup_loop(self) -> None: """每小时清理一次过期上传文件。""" while True: await asyncio.sleep(3600) try: await cleanup_expired_files() except Exception as exc: self.logger.error("清理过期文件失败: %s", exc) async def _message_listener(self) -> None: """监听 QQ 消息,匹配命令模式后转发到外部回调。""" try: async with self.events("message") as stream: async for event in stream: try: raw_message = event.data.raw_message if not raw_message: continue parsed = parse_command(raw_message) if not parsed: continue # 构建回调数据 data = { "command": parsed["command"], "content": parsed["content"], "raw_message": parsed["raw_message"], "user_id": event.data.user_id, "message_id": event.data.message_id, } if hasattr(event.data, "group_id"): data["group_id"] = event.data.group_id self.logger.info( "命令监听匹配: command=%s user=%s group=%s", parsed["command"], data["user_id"], data.get("group_id", "-"), ) asyncio.create_task(send_command_callback(data, self.logger)) except Exception as exc: self.logger.error("消息处理异常: %s", exc) except asyncio.CancelledError: return def _create_app(self) -> web.Application: app = web.Application(middlewares=[request_id_middleware, auth_middleware]) app["qq_api"] = self.api app["logger"] = self.logger app.router.add_get("/healthz", health_handler) app.router.add_post("/webhook", webhook_handler) app.router.add_post("/upload", upload_handler) return app async def _start_webhook(self): await self._stop_webhook() app = self._create_app() self._webhook_runner = web.AppRunner(app) await self._webhook_runner.setup() site = web.TCPSite(self._webhook_runner, HOST, PORT) await site.start() self.logger.info("Webhook 已启动: %s:%d", HOST, PORT) self.logger.info("上传目录: %s", UPLOAD_DIR) async def _stop_webhook(self): if self._webhook_runner is not None: await self._webhook_runner.cleanup() self._webhook_runner = None self.logger.info("Webhook 已停止")