Files
webhook/plugin.py
zhilv 783049257e ♻️ refactor(command): 黑白名单过滤从 AND 改为 OR 逻辑
- 白名单模式:用户在名单 OR 群在名单 → 放行
- 黑名单模式:用户在名单 OR 群在名单 → 拒绝
- 名单为空时表示不限制
- 更新前端提示说明 OR 逻辑的语义
2026-05-04 00:05:18 +08:00

195 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ncatbot-webhook-plugin 入口:组装各模块,启动 HTTP 服务。"""
import asyncio
import logging
import os
from aiohttp import web
from ncatbot.plugin import NcatBotPlugin
from .config import (
HOST,
PORT,
UPLOAD_DIR,
WEBHOOK_API_KEY,
SETTINGS_YAML_PATH,
command,
ensure_settings_yaml,
reload_settings,
)
from .handlers.admin import admin_page_handler, api_get_settings, api_reload_settings, api_update_settings
from .handlers.command import parse_command, send_command_callback
from .handlers.health import health_handler
from .handlers.message import webhook_handler
from .handlers.upload import cleanup_expired_files, upload_handler
from .middleware import auth_middleware, request_id_middleware
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
class WebHookPlugin(NcatBotPlugin):
"""NcatBot 插件:对外暴露 HTTP 接口,接收外部消息转发至 QQ。"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._webhook_runner: web.AppRunner | None = None
self._cleanup_task: asyncio.Task | None = None
self._listener_task: asyncio.Task | None = None
self._watcher_task: asyncio.Task | None = None
async def on_load(self):
# 初始化 settings.yaml 并加载配置
ensure_settings_yaml()
reload_settings()
self.logger.info("Webhook 插件已加载")
self.logger.info(
"WEBHOOK_API_KEY: %s",
"已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成",
)
self.logger.info(
"命令监听: 前缀=%s 长度=%d~%d 范围=%s 名单=%s(%s) 回调=%s",
command.prefix,
command.length_min,
command.length_max,
command.scope,
"" if command.list_enabled else "",
command.list_mode,
command.callback_url or "未配置",
)
asyncio.create_task(self._start_webhook())
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
self._listener_task = asyncio.create_task(self._message_listener())
self._watcher_task = asyncio.create_task(self._config_watcher())
async def on_close(self):
for task_attr in ("_watcher_task", "_listener_task", "_cleanup_task"):
task = getattr(self, task_attr)
if task is not None:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
setattr(self, task_attr, None)
await self._stop_webhook()
self.logger.info("Webhook 插件已卸载")
async def _cleanup_loop(self) -> None:
"""每小时清理一次过期上传文件。"""
while True:
await asyncio.sleep(3600)
try:
await cleanup_expired_files()
except Exception as exc:
self.logger.error("清理过期文件失败: %s", exc)
async def _config_watcher(self) -> None:
"""轮询 settings.yaml 的 mtime变更时热重载配置。"""
last_mtime: float = 0.0
while True:
await asyncio.sleep(2)
try:
if SETTINGS_YAML_PATH.exists():
current_mtime = SETTINGS_YAML_PATH.stat().st_mtime
if current_mtime != last_mtime:
last_mtime = current_mtime
reload_settings()
self.logger.info("settings.yaml changed, config reloaded")
except Exception as exc:
self.logger.error("Config watcher error: %s", exc)
async def _message_listener(self) -> None:
"""监听 QQ 消息,匹配命令模式后转发到外部回调。"""
try:
async with self.events("message") as stream:
async for event in stream:
try:
raw_message = event.data.raw_message
if not raw_message:
continue
parsed = parse_command(raw_message)
if not parsed:
continue
# 范围过滤group / private / all
is_group = hasattr(event.data, "group_id")
if command.scope == "group" and not is_group:
continue
if command.scope == "private" and is_group:
continue
# 黑白名单过滤
if command.list_enabled:
if command.list_mode == "allow":
# 白名单模式OR 逻辑):用户在名单 OR 群在名单 → 放行
# 名单为空时视为不限制
if command.allowed_users or command.allowed_groups:
user_ok = event.data.user_id in command.allowed_users
group_ok = is_group and event.data.group_id in command.allowed_groups
if not (user_ok or group_ok):
continue
elif command.list_mode == "deny":
# 黑名单模式:用户在名单 OR 群在名单 → 拒绝
user_blocked = event.data.user_id in command.denied_users
group_blocked = is_group and event.data.group_id in command.denied_groups
if user_blocked or group_blocked:
continue
# 构建回调数据
data = {
"command": parsed["command"],
"content": parsed["content"],
"raw_message": parsed["raw_message"],
"user_id": event.data.user_id,
"message_id": event.data.message_id,
}
if is_group:
data["group_id"] = event.data.group_id
self.logger.info(
"命令监听匹配: command=%s user=%s group=%s",
parsed["command"],
data["user_id"],
data.get("group_id", "-"),
)
asyncio.create_task(
send_command_callback(data, event, self.api, self.logger)
)
except Exception as exc:
self.logger.error("消息处理异常: %s", exc)
except asyncio.CancelledError:
return
def _create_app(self) -> web.Application:
app = web.Application(middlewares=[request_id_middleware, auth_middleware])
app["qq_api"] = self.api
app["logger"] = self.logger
app.router.add_get("/healthz", health_handler)
app.router.add_post("/webhook", webhook_handler)
app.router.add_post("/upload", upload_handler)
# 后台管理
app.router.add_get("/admin/", admin_page_handler)
app.router.add_get("/api/settings", api_get_settings)
app.router.add_put("/api/settings", api_update_settings)
app.router.add_post("/api/settings/reload", api_reload_settings)
return app
async def _start_webhook(self):
await self._stop_webhook()
app = self._create_app()
self._webhook_runner = web.AppRunner(app)
await self._webhook_runner.setup()
site = web.TCPSite(self._webhook_runner, HOST, PORT)
await site.start()
self.logger.info("Webhook 已启动: %s:%d", HOST, PORT)
self.logger.info("上传目录: %s", UPLOAD_DIR)
self.logger.info("后台管理: http://%s:%d/admin/", HOST, PORT)
async def _stop_webhook(self):
if self._webhook_runner is not None:
await self._webhook_runner.cleanup()
self._webhook_runner = None
self.logger.info("Webhook 已停止")