"""Entry point for ncatbot-webhook-plugin: wires the modules together and starts the HTTP service."""

import asyncio
import logging
import os

from aiohttp import web
from ncatbot.plugin import NcatBotPlugin

from .config import (
    HOST,
    PORT,
    UPLOAD_DIR,
    WEBHOOK_API_KEY,
    SETTINGS_YAML_PATH,
    command,
    ensure_settings_yaml,
    reload_settings,
)
from .handlers.admin import admin_page_handler, api_get_settings, api_reload_settings, api_update_settings
from .handlers.command import parse_command, send_command_callback
from .handlers.health import health_handler
from .handlers.message import webhook_handler
from .handlers.upload import cleanup_expired_files, upload_handler
from .middleware import auth_middleware, request_id_middleware

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)


class WebHookPlugin(NcatBotPlugin):
    """NcatBot plugin that exposes an HTTP interface and forwards external messages to QQ."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._webhook_runner: web.AppRunner | None = None
        self._start_task: asyncio.Task | None = None
        self._cleanup_task: asyncio.Task | None = None
        self._listener_task: asyncio.Task | None = None
        self._watcher_task: asyncio.Task | None = None
        # Strong references to every task spawned by this plugin. The event
        # loop itself only keeps weak references, so an otherwise
        # unreferenced task may be garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    def _spawn(self, coro, *, name: str) -> asyncio.Task:
        """Schedule *coro* as a task, keep it referenced, and report its failure.

        Args:
            coro: The coroutine object to run.
            name: Task name used in the failure log line.

        Returns:
            The created task.
        """
        task = asyncio.create_task(coro, name=name)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task: asyncio.Task) -> None:
        """Drop the reference to a finished task and log its exception, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception also prevents asyncio's
        # "Task exception was never retrieved" warning.
        exc = task.exception()
        if exc is not None:
            self.logger.error("Background task %s failed: %r", task.get_name(), exc)

    async def on_load(self):
        """Load the configuration, log it, and start the server and background loops."""
        # Create settings.yaml if needed, then load the configuration.
        ensure_settings_yaml()
        reload_settings()

        self.logger.info("Webhook 插件已加载")
        self.logger.info(
            "WEBHOOK_API_KEY: %s",
            "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成",
        )
        self.logger.info(
            "命令监听: 前缀=%s 长度=%d~%d 范围=%s 名单=%s(%s) 回调=%s",
            command.prefix,
            command.length_min,
            command.length_max,
            command.scope,
            "开" if command.list_enabled else "关",
            command.list_mode,
            command.callback_url or "未配置",
        )

        # The startup task is tracked like the others so that on_close() can
        # cancel it and so that a startup failure gets logged.
        self._start_task = self._spawn(self._start_webhook(), name="webhook-start")
        self._cleanup_task = self._spawn(self._cleanup_loop(), name="webhook-cleanup")
        self._listener_task = self._spawn(self._message_listener(), name="webhook-listener")
        self._watcher_task = self._spawn(self._config_watcher(), name="webhook-config-watcher")

    async def on_close(self):
        """Cancel the plugin's long-running tasks, then stop the HTTP server.

        Command-callback tasks that are already in flight are not cancelled
        here; they are left to finish on their own.
        """
        # The startup task goes first so that it cannot keep working on a
        # runner that _stop_webhook() is about to clean up.
        for task_attr in ("_start_task", "_watcher_task", "_listener_task", "_cleanup_task"):
            task = getattr(self, task_attr)
            if task is None:
                continue
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception:
                # The task had already failed; _on_task_done reports it.
                # Swallow it here so the HTTP server is still shut down.
                pass
            setattr(self, task_attr, None)
        await self._stop_webhook()
        self.logger.info("Webhook 插件已卸载")

    async def _cleanup_loop(self) -> None:
        """Remove expired uploaded files once per hour."""
        while True:
            await asyncio.sleep(3600)
            try:
                await cleanup_expired_files()
            except Exception as exc:
                self.logger.error("清理过期文件失败: %s", exc)

    async def _config_watcher(self) -> None:
        """Poll the mtime of settings.yaml and hot-reload the configuration on change."""
        # Seed the baseline with the current mtime: on_load() has already
        # loaded the configuration, so the first poll must not report a change.
        try:
            last_mtime: float = SETTINGS_YAML_PATH.stat().st_mtime
        except OSError:
            last_mtime = 0.0

        while True:
            await asyncio.sleep(2)
            try:
                if SETTINGS_YAML_PATH.exists():
                    current_mtime = SETTINGS_YAML_PATH.stat().st_mtime
                    if current_mtime != last_mtime:
                        last_mtime = current_mtime
                        reload_settings()
                        self.logger.info("settings.yaml changed, config reloaded")
            except Exception as exc:
                self.logger.error("Config watcher error: %s", exc)

    @staticmethod
    def _is_command_allowed(event_data, is_group: bool) -> bool:
        """Return True when the message passes the scope and allow/deny list filters.

        Args:
            event_data: The event payload; ``user_id`` is read from it, and
                ``group_id`` as well when *is_group* is true.
            is_group: Whether the payload carries a ``group_id`` attribute.
        """
        # Scope filter: group / private / all.
        if command.scope == "group" and not is_group:
            return False
        if command.scope == "private" and is_group:
            return False

        if not command.list_enabled:
            return True

        if command.list_mode == "allow":
            # Allow-list mode: each non-empty list must contain the sender.
            if command.allowed_groups and is_group:
                if event_data.group_id not in command.allowed_groups:
                    return False
            if command.allowed_users and event_data.user_id not in command.allowed_users:
                return False
        elif command.list_mode == "deny":
            # Deny-list mode: reject when the sender is on either list.
            if command.denied_groups and is_group:
                if event_data.group_id in command.denied_groups:
                    return False
            if command.denied_users and event_data.user_id in command.denied_users:
                return False
        return True

    async def _message_listener(self) -> None:
        """Listen for QQ messages and forward those matching the command pattern to the callback."""
        try:
            async with self.events("message") as stream:
                async for event in stream:
                    try:
                        raw_message = event.data.raw_message
                        if not raw_message:
                            continue

                        parsed = parse_command(raw_message)
                        if not parsed:
                            continue

                        is_group = hasattr(event.data, "group_id")
                        if not self._is_command_allowed(event.data, is_group):
                            continue

                        # Build the callback payload.
                        data = {
                            "command": parsed["command"],
                            "content": parsed["content"],
                            "raw_message": parsed["raw_message"],
                            "user_id": event.data.user_id,
                            "message_id": event.data.message_id,
                        }
                        if is_group:
                            data["group_id"] = event.data.group_id

                        self.logger.info(
                            "命令监听匹配: command=%s user=%s group=%s",
                            parsed["command"],
                            data["user_id"],
                            data.get("group_id", "-"),
                        )

                        # Run the callback in its own task so that a slow
                        # callback does not stall the message stream.
                        self._spawn(
                            send_command_callback(data, event, self.api, self.logger),
                            name="webhook-command-callback",
                        )
                    except Exception as exc:
                        self.logger.error("消息处理异常: %s", exc)
        except asyncio.CancelledError:
            return

    def _create_app(self) -> web.Application:
        """Build the aiohttp application with its middlewares, shared objects and routes."""
        app = web.Application(middlewares=[request_id_middleware, auth_middleware])
        app["qq_api"] = self.api
        app["logger"] = self.logger
        app.router.add_get("/healthz", health_handler)
        app.router.add_post("/webhook", webhook_handler)
        app.router.add_post("/upload", upload_handler)
        # Admin console routes.
        app.router.add_get("/admin/", admin_page_handler)
        app.router.add_get("/api/settings", api_get_settings)
        app.router.add_put("/api/settings", api_update_settings)
        app.router.add_post("/api/settings/reload", api_reload_settings)
        return app

    async def _start_webhook(self) -> None:
        """Stop any running server, then start a fresh one on HOST:PORT."""
        await self._stop_webhook()
        app = self._create_app()
        self._webhook_runner = web.AppRunner(app)
        await self._webhook_runner.setup()
        site = web.TCPSite(self._webhook_runner, HOST, PORT)
        await site.start()
        self.logger.info("Webhook 已启动: %s:%d", HOST, PORT)
        self.logger.info("上传目录: %s", UPLOAD_DIR)
        self.logger.info("后台管理: http://%s:%d/admin/", HOST, PORT)

    async def _stop_webhook(self) -> None:
        """Clean up the running server, if any; do nothing otherwise."""
        if self._webhook_runner is not None:
            await self._webhook_runner.cleanup()
            self._webhook_runner = None
            self.logger.info("Webhook 已停止")