Files
webhook/plugin.py
zhilv 9ffe78a9c2 feat(command): 添加动态配置、黑白名单与后台管理界面
- 新增 SQLite 数据库层(db.py)持久化命令监听配置,支持热更新无需重启
- 命令过滤从白名单扩展为黑白名单双模式(COMMAND_LIST_MODE: allow/deny)
- 新增后台管理页面 /admin/,侧边栏布局,支持在线修改所有命令监听配置
- 新增 REST API:GET/PUT /api/settings、POST /api/settings/reload
- 新增 rebuild_pattern() 支持配置变更后正则动态重编译
- 中间件放行 /admin 和 /api 路径免鉴权
- 添加 aiosqlite 依赖
2026-05-03 15:22:53 +08:00

192 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ncatbot-webhook-plugin 入口:组装各模块,启动 HTTP 服务。"""
import asyncio
import logging
import os

from aiohttp import web
from ncatbot.plugin import NcatBotPlugin

from . import config as live_config
from .config import (
    COMMAND_ALLOWED_GROUPS,
    COMMAND_ALLOWED_USERS,
    COMMAND_AT_SENDER,
    COMMAND_CALLBACK_URL,
    COMMAND_DENIED_GROUPS,
    COMMAND_DENIED_USERS,
    COMMAND_LENGTH_MAX,
    COMMAND_LENGTH_MIN,
    COMMAND_LIST_MODE,
    COMMAND_PREFIX,
    COMMAND_SCOPE,
    HOST,
    PORT,
    UPLOAD_DIR,
    WEBHOOK_API_KEY,
    reload_settings,
)
from .db import get_settings, init_db
from .handlers.admin import (
    admin_page_handler,
    api_get_settings,
    api_reload_settings,
    api_update_settings,
)
from .handlers.command import parse_command, send_command_callback
from .handlers.health import health_handler
from .handlers.message import webhook_handler
from .handlers.upload import cleanup_expired_files, upload_handler
from .middleware import auth_middleware, request_id_middleware
# Configure the root logger at import time: INFO level with a
# "timestamp [LEVEL] logger-name: message" line format.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
class WebHookPlugin(NcatBotPlugin):
    """NcatBot plugin that exposes an HTTP interface and forwards external messages to QQ.

    On load it initialises the settings database, applies the stored settings,
    and starts three background tasks: the aiohttp server, an hourly cleanup of
    expired uploads, and a listener that forwards matching QQ commands to an
    external callback.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._webhook_runner: web.AppRunner | None = None
        self._start_task: asyncio.Task | None = None
        self._cleanup_task: asyncio.Task | None = None
        self._listener_task: asyncio.Task | None = None
        # Strong references to in-flight callback tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._callback_tasks: set[asyncio.Task] = set()

    async def on_load(self):
        """Initialise the database, apply stored settings and start background tasks."""
        await init_db()
        settings = await get_settings()
        reload_settings(settings)
        self.logger.info("Webhook 插件已加载")
        self.logger.info(
            "WEBHOOK_API_KEY: %s",
            "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成",
        )
        # Read through the config module rather than the names imported by
        # value, so values rebound by reload_settings() above are the ones shown.
        self.logger.info(
            "命令监听: 前缀=%s 长度=%d~%d 范围=%s 名单=%s 回调=%s",
            live_config.COMMAND_PREFIX,
            live_config.COMMAND_LENGTH_MIN,
            live_config.COMMAND_LENGTH_MAX,
            live_config.COMMAND_SCOPE,
            live_config.COMMAND_LIST_MODE,
            live_config.COMMAND_CALLBACK_URL or "未配置",
        )
        # Keep a reference to the startup task and report failures (for
        # example the port already being in use) instead of losing them.
        self._start_task = asyncio.create_task(self._start_webhook())
        self._start_task.add_done_callback(self._on_start_done)
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())
        self._listener_task = asyncio.create_task(self._message_listener())

    async def on_close(self):
        """Cancel the background tasks and shut the HTTP server down."""
        # Stop a still-running startup first so it cannot race with the
        # server shutdown below.
        await self._cancel_task(self._start_task)
        self._start_task = None
        await self._cancel_task(self._listener_task)
        self._listener_task = None
        await self._cancel_task(self._cleanup_task)
        self._cleanup_task = None
        await self._stop_webhook()
        self.logger.info("Webhook 插件已卸载")

    async def _cancel_task(self, task: asyncio.Task | None) -> None:
        """Cancel *task* (if any) and wait for it to finish.

        A failure the task had already hit is logged rather than re-raised,
        so the rest of the shutdown sequence still runs.
        """
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            self.logger.error("后台任务异常退出: %s", exc)

    def _on_start_done(self, task: asyncio.Task) -> None:
        """Done-callback for the startup task: drop the reference and log a failure."""
        if self._start_task is task:
            self._start_task = None
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self.logger.error("Webhook 启动失败: %s", exc)

    async def _cleanup_loop(self) -> None:
        """Remove expired uploaded files once per hour."""
        while True:
            await asyncio.sleep(3600)
            try:
                await cleanup_expired_files()
            except Exception as exc:
                self.logger.error("清理过期文件失败: %s", exc)

    @staticmethod
    def _passes_filters(event_data, is_group: bool) -> bool:
        """Return True when the message passes the scope and allow/deny list filters.

        Settings are read from the config module on every call so that values
        rebound by a settings reload take effect without a restart.
        """
        # Scope filter: group / private / all.
        scope = live_config.COMMAND_SCOPE
        if scope == "group" and not is_group:
            return False
        if scope == "private" and is_group:
            return False

        mode = live_config.COMMAND_LIST_MODE
        if mode == "allow":
            # Allow-list mode: an empty list means "no restriction".
            allowed_groups = live_config.COMMAND_ALLOWED_GROUPS
            if allowed_groups and is_group and event_data.group_id not in allowed_groups:
                return False
            allowed_users = live_config.COMMAND_ALLOWED_USERS
            if allowed_users and event_data.user_id not in allowed_users:
                return False
        elif mode == "deny":
            # Deny-list mode: anything on a list is rejected.
            denied_groups = live_config.COMMAND_DENIED_GROUPS
            if denied_groups and is_group and event_data.group_id in denied_groups:
                return False
            denied_users = live_config.COMMAND_DENIED_USERS
            if denied_users and event_data.user_id in denied_users:
                return False
        return True

    async def _message_listener(self) -> None:
        """Listen for QQ messages and forward matching commands to the external callback.

        Cancellation is allowed to propagate; on_close() absorbs it.
        """
        async with self.events("message") as stream:
            async for event in stream:
                try:
                    raw_message = event.data.raw_message
                    if not raw_message:
                        continue
                    parsed = parse_command(raw_message)
                    if not parsed:
                        continue
                    # A message counts as a group message only when it carries
                    # a real group_id; a present-but-None attribute is treated
                    # as private.
                    is_group = getattr(event.data, "group_id", None) is not None
                    if not self._passes_filters(event.data, is_group):
                        continue
                    # Build the callback payload.
                    data = {
                        "command": parsed["command"],
                        "content": parsed["content"],
                        "raw_message": parsed["raw_message"],
                        "user_id": event.data.user_id,
                        "message_id": event.data.message_id,
                    }
                    if is_group:
                        data["group_id"] = event.data.group_id
                    self.logger.info(
                        "命令监听匹配: command=%s user=%s group=%s",
                        parsed["command"],
                        data["user_id"],
                        data.get("group_id", "-"),
                    )
                    # Send the callback concurrently so a slow endpoint does
                    # not block the listener; hold a reference until it is done.
                    callback_task = asyncio.create_task(
                        send_command_callback(data, event, self.api, self.logger)
                    )
                    self._callback_tasks.add(callback_task)
                    callback_task.add_done_callback(self._callback_tasks.discard)
                except Exception as exc:
                    self.logger.error("消息处理异常: %s", exc)

    def _create_app(self) -> web.Application:
        """Build the aiohttp application with its middlewares and routes."""
        app = web.Application(middlewares=[request_id_middleware, auth_middleware])
        app["qq_api"] = self.api
        app["logger"] = self.logger
        app.router.add_get("/healthz", health_handler)
        app.router.add_post("/webhook", webhook_handler)
        app.router.add_post("/upload", upload_handler)
        # Admin console and settings API.
        # NOTE(review): the change description says the middleware lets /admin
        # and /api through without authentication; if so, PUT /api/settings is
        # writable by anyone who can reach the listening address - confirm
        # this is intended.
        app.router.add_get("/admin/", admin_page_handler)
        app.router.add_get("/api/settings", api_get_settings)
        app.router.add_put("/api/settings", api_update_settings)
        app.router.add_post("/api/settings/reload", api_reload_settings)
        return app

    async def _start_webhook(self):
        """(Re)start the HTTP server on HOST:PORT, stopping any previous runner first."""
        await self._stop_webhook()
        app = self._create_app()
        self._webhook_runner = web.AppRunner(app)
        await self._webhook_runner.setup()
        site = web.TCPSite(self._webhook_runner, HOST, PORT)
        await site.start()
        self.logger.info("Webhook 已启动: %s:%d", HOST, PORT)
        self.logger.info("上传目录: %s", UPLOAD_DIR)
        self.logger.info("后台管理: http://%s:%d/admin/", HOST, PORT)

    async def _stop_webhook(self):
        """Clean up the HTTP runner if one exists; a no-op otherwise."""
        if self._webhook_runner is not None:
            await self._webhook_runner.cleanup()
            self._webhook_runner = None
            self.logger.info("Webhook 已停止")