diff --git a/.env.example b/.env.example index 1b356e5..89736f9 100644 --- a/.env.example +++ b/.env.example @@ -19,7 +19,19 @@ QQ_API_MAX_RETRIES=2 # ── 命令监听 ── # 命令前缀,默认 # COMMAND_PREFIX=# -# 命令名长度(中文字数),默认 4 -COMMAND_LENGTH=4 +# 命令名最小字符数,默认 2 +COMMAND_LENGTH_MIN=2 +# 命令名最大字符数,默认 4 +COMMAND_LENGTH_MAX=4 +# 监听范围:all(群+私)、group(仅群)、private(仅私),默认 all +COMMAND_SCOPE=all +# 允许的群号,逗号分隔,留空不限制,例:123456,789012 +COMMAND_ALLOWED_GROUPS= +# 允许的 QQ 号,逗号分隔,留空不限制,例:111111,222222 +COMMAND_ALLOWED_USERS= +# 回复时是否 @发送者,默认 true +COMMAND_AT_SENDER=true +# 回调超时秒数,默认 180(生图等耗时命令需要较长超时) +COMMAND_CALLBACK_TIMEOUT=180 # 匹配到命令后的回调 URL,留空则不监听 COMMAND_CALLBACK_URL= diff --git a/.gitignore b/.gitignore index 9a2006c..89d7581 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,9 @@ uploads/ # Environment .env + +# Dynamic config +settings.yaml + +# Legacy data +data/ diff --git a/README.md b/README.md index 999e728..8906bde 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,13 @@ uv run python -m ncatbot | `QQ_API_TIMEOUT` | 否 | `10` | QQ API 超时秒数 | | `QQ_API_MAX_RETRIES` | 否 | `2` | QQ API 失败重试次数 | | `COMMAND_PREFIX` | 否 | `#` | 命令前缀 | -| `COMMAND_LENGTH` | 否 | `4` | 命令名字符数(中文字数) | +| `COMMAND_LENGTH_MIN` | 否 | `2` | 命令名最小字符数 | +| `COMMAND_LENGTH_MAX` | 否 | `4` | 命令名最大字符数 | +| `COMMAND_SCOPE` | 否 | `all` | 监听范围:`all`(群+私)、`group`(仅群)、`private`(仅私) | +| `COMMAND_ALLOWED_GROUPS` | 否 | 空(不限) | 允许的群号,逗号分隔,如 `123456,789012` | +| `COMMAND_ALLOWED_USERS` | 否 | 空(不限) | 允许的 QQ 号,逗号分隔,如 `111111,222222` | +| `COMMAND_AT_SENDER` | 否 | `true` | 回复时是否 @发送者 | +| `COMMAND_CALLBACK_TIMEOUT` | 否 | `180` | 回调超时秒数 | | `COMMAND_CALLBACK_URL` | 否 | 空(不监听) | 命令匹配后的回调 URL | ## 接口说明 @@ -187,22 +193,34 @@ curl -X POST http://localhost:8081/webhook \ ### 命令监听 -插件会自动监听 QQ 消息,当消息以 `#四个中文字+空格` 开头时,将命令内容 POST 到 `COMMAND_CALLBACK_URL`。 +插件会自动监听 QQ 消息,当消息以 `#命令名` 开头时,将命令内容 POST 到 `COMMAND_CALLBACK_URL`。 **匹配规则:** ``` #测试命令 你好世界 │ │ │ │ -│ │ │ └── 命令内容(content) -│ └── 空格分隔 -└── 命令名(4个中文字) +│ │ │ └── 命令内容(content,可选) +│ └── 空格分隔(可选) +└── 命令名(2~4个字符) └── 前缀(默认 #) ``` -- 前缀、命令名长度可通过 `COMMAND_PREFIX`、`COMMAND_LENGTH` 配置 +- 命令名支持中文、数字、字母等任意非空白字符,每个字符计 1 +- 前缀通过 `COMMAND_PREFIX` 配置,长度范围通过 `COMMAND_LENGTH_MIN` / `COMMAND_LENGTH_MAX` 配置 +- `#测试命令`、`#1a`、`#abc` 均可触发 - 不配置 `COMMAND_CALLBACK_URL` 则不监听 +**监听范围过滤:** + +- `COMMAND_SCOPE`:控制监听范围 + - `all`(默认):群聊 + 私聊都监听 + - `group`:仅监听群聊 + - `private`:仅监听私聊 +- `COMMAND_ALLOWED_GROUPS`:群号白名单,逗号分隔,留空不限制 +- `COMMAND_ALLOWED_USERS`:QQ 号白名单,逗号分隔,留空不限制 +- 三个条件同时生效,必须全部满足才触发回调 + **回调请求体(POST JSON):** ```json @@ -218,7 +236,7 @@ curl -X POST http://localhost:8081/webhook \ | 字段 | 说明 | |---|---| -| `command` | 命令名(4个中文字) | +| `command` | 命令名(2~4个字符) | | `content` | 命令后的内容 | | `raw_message` | 原始消息文本 | | `user_id` | 发送者 QQ 号 | @@ -249,6 +267,7 @@ curl -X POST http://localhost:8081/webhook \ |---|---| | `reply` | 纯文本回复(与 `messages` 二选一,`messages` 优先) | | `messages` | 批量回复数组,格式同 `/webhook` 的 `messages` 字段 | +| `at_sender` | 是否 @发送者(默认取 `COMMAND_AT_SENDER` 配置,仅群聊生效) | | `group_id` | 可选,覆盖回复目标群号(默认回复到原群) | | `user_id` | 可选,覆盖回复目标 QQ 号(默认回复到原发送者) | diff --git a/config.py b/config.py index 123c596..4c77428 100644 --- a/config.py +++ b/config.py @@ -1,40 +1,284 @@ -"""项目配置:所有值从环境变量读取,未配置时使用安全默认值。""" +"""项目配置:静态配置从环境变量读取,命令监听配置通过 settings.yaml 动态管理。""" +import logging import os import uuid +from dataclasses import dataclass from pathlib import Path +import yaml from dotenv import load_dotenv # 加载 .env 文件(优先从插件目录查找) load_dotenv(Path(__file__).parent / ".env") -# ── 鉴权 ──────────────────────────────────────────────────── -WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex +logger = logging.getLogger("webhook-plugin.config") -# ── 网络 ───────────────────────────────────────────────────── +# ── 静态配置(环境变量,运行时不变)──────────────────────────── +WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex HOST: str = os.environ.get("WEBHOOK_HOST", "0.0.0.0") try: PORT: int = int(os.environ.get("WEBHOOK_PORT", "8081")) except ValueError: PORT = 8081 -# ── 上传 ───────────────────────────────────────────────────── UPLOAD_DIR: Path = Path(os.environ.get("UPLOAD_DIR", str(Path(__file__).parent / "uploads"))) -# 单个文件最大 20 MB MAX_UPLOAD_SIZE: int = int(os.environ.get("MAX_UPLOAD_SIZE", str(20 * 1024 * 1024))) -# 允许的文件扩展名(小写,不含点),为空则不限制 ALLOWED_EXTENSIONS: set[str] = set( filter(None, os.environ.get("ALLOWED_EXTENSIONS", "").lower().split(",")) ) -# ── QQ API ─────────────────────────────────────────────────── QQ_API_TIMEOUT: float = float(os.environ.get("QQ_API_TIMEOUT", "10")) QQ_API_MAX_RETRIES: int = int(os.environ.get("QQ_API_MAX_RETRIES", "2")) -# ── 命令监听 ──────────────────────────────────────────────── -COMMAND_PREFIX: str = os.environ.get("COMMAND_PREFIX", "#") -COMMAND_LENGTH_MIN: int = int(os.environ.get("COMMAND_LENGTH_MIN", "2")) -COMMAND_LENGTH_MAX: int = int(os.environ.get("COMMAND_LENGTH_MAX", "4")) -COMMAND_CALLBACK_URL: str = os.environ.get("COMMAND_CALLBACK_URL", "") -COMMAND_CALLBACK_TIMEOUT: int = int(os.environ.get("COMMAND_CALLBACK_TIMEOUT", "180")) +# ── YAML 文件路径 ──────────────────────────────────────────── +SETTINGS_YAML_PATH: Path = Path(__file__).parent / "settings.yaml" + +# ── 动态配置 dataclass ────────────────────────────────────── +@dataclass +class CommandConfig: + """命令监听动态配置,通过 settings.yaml 热重载。""" + + prefix: str = "#" + length_min: int = 2 + length_max: int = 4 + scope: str = "all" # all / group / private + list_enabled: bool = False # 是否启用黑白名单过滤 + list_mode: str = "allow" # allow / deny + allowed_groups: frozenset[str] = frozenset() + denied_groups: frozenset[str] = frozenset() + allowed_users: frozenset[str] = frozenset() + denied_users: frozenset[str] = frozenset() + at_sender: bool = True + callback_url: str = "" + callback_timeout: int = 180 + + +# 全局单例 —— 所有消费者通过 config.command.xxx 访问,避免 stale import 问题 +command = CommandConfig() + +# ── 环境变量默认值(YAML 缺失 key 时的回退)──────────────────── +_ENV_DEFAULTS: dict = { + "prefix": os.environ.get("COMMAND_PREFIX", "#"), + "length_min": int(os.environ.get("COMMAND_LENGTH_MIN", "2")), + "length_max": int(os.environ.get("COMMAND_LENGTH_MAX", "4")), + "scope": os.environ.get("COMMAND_SCOPE", "all"), + "list_enabled": os.environ.get("COMMAND_LIST_ENABLED", "false").lower() in ("true", "1", "yes"), + "list_mode": os.environ.get("COMMAND_LIST_MODE", "allow"), + "allowed_groups": os.environ.get("COMMAND_ALLOWED_GROUPS", ""), + "denied_groups": os.environ.get("COMMAND_DENIED_GROUPS", ""), + "allowed_users": os.environ.get("COMMAND_ALLOWED_USERS", ""), + "denied_users": os.environ.get("COMMAND_DENIED_USERS", ""), + "at_sender": os.environ.get("COMMAND_AT_SENDER", "true").lower() in ("true", "1", "yes"), + "callback_url": os.environ.get("COMMAND_CALLBACK_URL", ""), + "callback_timeout": int(os.environ.get("COMMAND_CALLBACK_TIMEOUT", "180")), +} + +# ── YAML I/O ───────────────────────────────────────────────── +_YAML_HEADER = ( + "# settings.yaml - 命令监听动态配置\n" + "# 可通过后台管理界面修改,也可手动编辑(自动热重载)\n\n" +) + + +def _yaml_defaults() -> dict: + """返回默认 YAML 结构(首次启动时写入)。""" + return { + "command": { + "prefix": "#", + "length_min": 2, + "length_max": 4, + "scope": "all", + "list_enabled": False, + "list_mode": "allow", + "allowed_groups": [], + "denied_groups": [], + "allowed_users": [], + "denied_users": [], + "at_sender": True, + "callback_url": "", + "callback_timeout": 180, + } + } + + +def load_yaml() -> dict: + """读取 settings.yaml,文件不存在时返回空字典。""" + if not SETTINGS_YAML_PATH.exists(): + return {} + with open(SETTINGS_YAML_PATH, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + return data if isinstance(data, dict) else {} + + +def save_yaml(data: dict) -> None: + """将配置写入 settings.yaml。""" + with open(SETTINGS_YAML_PATH, "w", encoding="utf-8") as f: + f.write(_YAML_HEADER) + yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False) + + +def ensure_settings_yaml() -> None: + """首次启动时创建 settings.yaml,合并环境变量覆盖。""" + if SETTINGS_YAML_PATH.exists(): + return + data = _yaml_defaults() + # 用环境变量覆盖默认值 + env_map = { + "prefix": ("COMMAND_PREFIX", str), + "length_min": ("COMMAND_LENGTH_MIN", int), + "length_max": ("COMMAND_LENGTH_MAX", int), + "scope": ("COMMAND_SCOPE", str), + "list_mode": ("COMMAND_LIST_MODE", str), + "list_enabled": ("COMMAND_LIST_ENABLED", "bool"), + "allowed_groups": ("COMMAND_ALLOWED_GROUPS", "csv_list"), + "denied_groups": ("COMMAND_DENIED_GROUPS", "csv_list"), + "allowed_users": ("COMMAND_ALLOWED_USERS", "csv_list"), + "denied_users": ("COMMAND_DENIED_USERS", "csv_list"), + "at_sender": ("COMMAND_AT_SENDER", "bool"), + "callback_url": ("COMMAND_CALLBACK_URL", str), + "callback_timeout": ("COMMAND_CALLBACK_TIMEOUT", int), + } + for key, (env_name, conv) in env_map.items(): + env_val = os.environ.get(env_name) + if env_val is not None: + if conv == "csv_list": + data["command"][key] = [v.strip() for v in env_val.split(",") if v.strip()] + elif conv == "bool": + data["command"][key] = env_val.lower() in ("true", "1", "yes") + else: + data["command"][key] = conv(env_val) + save_yaml(data) + logger.info("Created settings.yaml with defaults (env-var overrides applied)") + + +# ── 将 YAML 数据应用到全局 command 对象 ─────────────────────── +def _apply_yaml_to_command(data: dict) -> None: + """将 YAML command 段合并到全局 command 对象,缺失 key 回退到环境变量默认值。""" + cmd_data = data.get("command", {}) + + command.prefix = str(cmd_data.get("prefix", _ENV_DEFAULTS["prefix"])) + command.length_min = int(cmd_data.get("length_min", _ENV_DEFAULTS["length_min"])) + command.length_max = int(cmd_data.get("length_max", _ENV_DEFAULTS["length_max"])) + command.scope = str(cmd_data.get("scope", _ENV_DEFAULTS["scope"])) + command.list_mode = str(cmd_data.get("list_mode", _ENV_DEFAULTS["list_mode"])) + + # list_enabled: YAML bool → Python bool + list_enabled_val = cmd_data.get("list_enabled", _ENV_DEFAULTS["list_enabled"]) + if isinstance(list_enabled_val, str): + command.list_enabled = list_enabled_val.lower() in ("true", "1", "yes") + else: + command.list_enabled = bool(list_enabled_val) + + command.callback_url = str(cmd_data.get("callback_url", _ENV_DEFAULTS["callback_url"])) + command.callback_timeout = int(cmd_data.get("callback_timeout", _ENV_DEFAULTS["callback_timeout"])) + + # at_sender: YAML bool → Python bool + at_sender_val = cmd_data.get("at_sender", _ENV_DEFAULTS["at_sender"]) + if isinstance(at_sender_val, str): + command.at_sender = at_sender_val.lower() in ("true", "1", "yes") + else: + command.at_sender = bool(at_sender_val) + + # 列表字段: YAML list → frozenset + for field_name in ("allowed_groups", "denied_groups", "allowed_users", "denied_users"): + raw = cmd_data.get(field_name) + if raw is None: + # 回退到环境变量默认值 + env_default = _ENV_DEFAULTS[field_name] + if isinstance(env_default, str): + setattr(command, field_name, frozenset( + v.strip() for v in env_default.split(",") if v.strip() + )) + else: + setattr(command, field_name, frozenset(env_default) if env_default else frozenset()) + elif isinstance(raw, str): + setattr(command, field_name, frozenset( + v.strip() for v in raw.split(",") if v.strip() + )) + elif isinstance(raw, list): + setattr(command, field_name, frozenset(str(v).strip() for v in raw if str(v).strip())) + else: + setattr(command, field_name, frozenset()) + + # 重建命令匹配正则 + from .handlers.command import rebuild_pattern + rebuild_pattern() + + +# ── 公共 API ───────────────────────────────────────────────── +def reload_settings() -> None: + """重新读取 settings.yaml 并应用到全局 command 对象。""" + data = load_yaml() + _apply_yaml_to_command(data) + logger.info("Config reloaded from settings.yaml") + + +def get_settings_flat() -> dict[str, str]: + """返回所有动态配置的扁平字典 {key: str_value},供管理 API 使用。""" + return { + "command_prefix": command.prefix, + "command_length_min": str(command.length_min), + "command_length_max": str(command.length_max), + "command_scope": command.scope, + "command_list_mode": command.list_mode, + "command_list_enabled": str(command.list_enabled).lower(), + "command_allowed_groups": ",".join(sorted(command.allowed_groups)), + "command_denied_groups": ",".join(sorted(command.denied_groups)), + "command_allowed_users": ",".join(sorted(command.allowed_users)), + "command_denied_users": ",".join(sorted(command.denied_users)), + "command_at_sender": str(command.at_sender).lower(), + "command_callback_url": command.callback_url, + "command_callback_timeout": str(command.callback_timeout), + } + + +# API 平坦 key → (YAML key, 类型转换) 映射 +_API_KEY_MAP: dict[str, tuple[str, ...]] = { + "command_prefix": ("prefix", str), + "command_length_min": ("length_min", int), + "command_length_max": ("length_max", int), + "command_scope": ("scope", str), + "command_list_mode": ("list_mode", str), + "command_list_enabled": ("list_enabled", "bool"), + "command_at_sender": ("at_sender", "bool"), + "command_callback_url": ("callback_url", str), + "command_callback_timeout": ("callback_timeout", int), + "command_allowed_groups": ("allowed_groups", "csv_list"), + "command_denied_groups": ("denied_groups", "csv_list"), + "command_allowed_users": ("allowed_users", "csv_list"), + "command_denied_users": ("denied_users", "csv_list"), +} + + +def update_settings_from_api(data: dict[str, str]) -> dict[str, str]: + """从管理 API 接收扁平字典,合并到 settings.yaml 并热重载。返回已应用的字段。""" + allowed_keys = set(_API_KEY_MAP.keys()) + filtered = {k: str(v) for k, v in data.items() if k in allowed_keys} + if not filtered: + return {} + + # 读取当前 YAML,合并修改 + yaml_data = load_yaml() + if "command" not in yaml_data: + yaml_data["command"] = {} + cmd = yaml_data["command"] + + for flat_key, value in filtered.items(): + yaml_key, conv = _API_KEY_MAP[flat_key] + if conv == "csv_list": + cmd[yaml_key] = [v.strip() for v in value.split(",") if v.strip()] + elif conv == "bool": + cmd[yaml_key] = value.lower() in ("true", "1", "yes") + elif conv is int: + try: + cmd[yaml_key] = int(value) + except ValueError: + pass + else: + cmd[yaml_key] = value + + save_yaml(yaml_data) + # 写入后立即应用(不等文件 watcher) + _apply_yaml_to_command(yaml_data) + return filtered diff --git a/handlers/admin.py b/handlers/admin.py new file mode 100644 index 0000000..1782de5 --- /dev/null +++ b/handlers/admin.py @@ -0,0 +1,1042 @@ +"""后台管理:提供 Web 管理界面和 REST API,动态修改命令监听配置。""" + +from aiohttp import web + +from ..config import command, get_settings_flat, reload_settings, update_settings_from_api +from ..response import error, ok + + +# ── API ────────────────────────────────────────────────────── + +async def api_get_settings(request: web.Request) -> web.Response: + """GET /api/settings — 返回全部动态配置。""" + return ok(data=get_settings_flat()) + + +async def api_update_settings(request: web.Request) -> web.Response: + """PUT /api/settings — 批量更新配置并立即生效。""" + try: + data = await request.json() + except Exception: + return error("invalid json") + + if not isinstance(data, dict): + return error("request body must be a json object") + + filtered = update_settings_from_api(data) + if not filtered: + return error("no valid settings to update") + + return ok(data=filtered, msg="配置已更新并生效") + + +async def api_reload_settings(request: web.Request) -> web.Response: + """POST /api/settings/reload — 从 settings.yaml 重新加载配置。""" + reload_settings() + return ok(msg="配置已从 settings.yaml 重新加载") + + +# ── 管理页面 ───────────────────────────────────────────────── + +ADMIN_HTML = r""" + + + + +NcatBot Webhook 管理 + + + + + + +
+
+
+ 命令监听配置 + ● 未保存 +
+
+
+ + 运行中 +
+ +
+
+ +
+
+ + +
+
+

基本设置

+
+
+ 修改后自动保存,无需手动操作。 +
+
+
+ + +
+
+ + +
+
+
+
+ + +
+
+ + +
+
+
+
+ + 回复时 @发送者(仅群聊) +
+
+
+
+ +
+

命令匹配测试

+
+
+ +
+
输入消息后自动检测
+
+
+
+ + + + + + + + + + +
+
+
+ +
+ + + +""" + + +async def admin_page_handler(request: web.Request) -> web.Response: + """GET /admin/ — 返回管理页面 HTML。""" + return web.Response(text=ADMIN_HTML, content_type="text/html") diff --git a/handlers/command.py b/handlers/command.py index ada6358..9e47e9d 100644 --- a/handlers/command.py +++ b/handlers/command.py @@ -4,7 +4,7 @@ import re import aiohttp -from ..config import COMMAND_CALLBACK_TIMEOUT, COMMAND_CALLBACK_URL, COMMAND_LENGTH_MAX, COMMAND_LENGTH_MIN, COMMAND_PREFIX, UPLOAD_DIR +from ..config import UPLOAD_DIR, command from ..handlers.message import _resolve_url @@ -18,7 +18,7 @@ def build_command_pattern() -> re.Pattern: - 其他非空白字符 = 1 """ return re.compile( - rf"^{re.escape(COMMAND_PREFIX)}(\S{{{COMMAND_LENGTH_MIN},{COMMAND_LENGTH_MAX}}})(?:\s+(.+))?$", + rf"^{re.escape(command.prefix)}(\S{{{command.length_min},{command.length_max}}})(?:\s+(.+))?$", re.DOTALL, ) @@ -26,6 +26,12 @@ def build_command_pattern() -> re.Pattern: COMMAND_PATTERN = build_command_pattern() +def rebuild_pattern() -> None: + """动态配置变更后重新编译正则。""" + global COMMAND_PATTERN + COMMAND_PATTERN = build_command_pattern() + + def parse_command(raw_message: str) -> dict | None: """解析消息,匹配命令模式。返回 {command, content, raw_message} 或 None。""" match = COMMAND_PATTERN.match(raw_message.strip()) @@ -50,28 +56,28 @@ async def send_command_callback(data: dict, event, api, logger) -> None: {"type": "file", "url": "..."}, {"type": "video", "url": "..."} ], - "at_sender": true // 是否 @发送者(默认 true,仅群聊) + "at_sender": true // 是否 @发送者(默认取配置,仅群聊) } 所有字段均为可选,无回复内容时返回空 JSON 即可。 回复会引用触发命令的原消息。 """ - if not COMMAND_CALLBACK_URL: - logger.warning("COMMAND_CALLBACK_URL 未配置,跳过命令回调") + if not command.callback_url: + logger.warning("callback_url 未配置,跳过命令回调") return try: async with aiohttp.ClientSession() as session: async with session.post( - COMMAND_CALLBACK_URL, + command.callback_url, json=data, - timeout=aiohttp.ClientTimeout(total=COMMAND_CALLBACK_TIMEOUT), + timeout=aiohttp.ClientTimeout(total=command.callback_timeout), ) as resp: if resp.status >= 400: body = await resp.text() logger.error( "命令回调失败: status=%d url=%s body=%s", - resp.status, COMMAND_CALLBACK_URL, body[:200], + resp.status, command.callback_url, body[:200], ) return @@ -87,12 +93,13 @@ async def send_command_callback(data: dict, event, api, logger) -> None: await _handle_reply(result, event.data, api, logger) except Exception as exc: - logger.error("命令回调异常: url=%s error=%s", COMMAND_CALLBACK_URL, exc) + logger.error("命令回调异常: url=%s error=%s", command.callback_url, exc) async def _handle_reply(result: dict, msg_event, api, logger) -> None: """处理回调响应,引用原消息自动回复。msg_event 是 GroupMessageEvent / PrivateMessageEvent。""" - at_sender = result.get("at_sender", True) + # at_sender: 回调响应中的值优先,未指定则使用全局配置 + at_sender = result.get("at_sender", command.at_sender) messages = result.get("messages") reply = result.get("reply") group_id = getattr(msg_event, "group_id", None) diff --git a/middleware.py b/middleware.py index dba4e91..9ddd2ff 100644 --- a/middleware.py +++ b/middleware.py @@ -10,9 +10,9 @@ from .response import error @web.middleware async def auth_middleware(request: web.Request, handler): - """对 /upload 和 /webhook 路径强制校验 API Key。""" - # 健康检查不需要鉴权 - if request.path == "/healthz": + """对需要鉴权的路径校验 API Key。/healthz 和 /admin/ 及 /api/ 开头的路径不需要鉴权。""" + # 不需要鉴权的路径 + if request.path == "/healthz" or request.path.startswith("/admin") or request.path.startswith("/api/"): return await handler(request) auth_header = request.headers.get("Authorization", "") diff --git a/plugin.py b/plugin.py index 85cf13b..878b1a5 100644 --- a/plugin.py +++ b/plugin.py @@ -7,7 +7,17 @@ import os from aiohttp import web from ncatbot.plugin import NcatBotPlugin -from .config import COMMAND_CALLBACK_URL, COMMAND_LENGTH_MAX, COMMAND_LENGTH_MIN, COMMAND_PREFIX, HOST, PORT, UPLOAD_DIR, WEBHOOK_API_KEY +from .config import ( + HOST, + PORT, + UPLOAD_DIR, + WEBHOOK_API_KEY, + SETTINGS_YAML_PATH, + command, + ensure_settings_yaml, + reload_settings, +) +from .handlers.admin import admin_page_handler, api_get_settings, api_reload_settings, api_update_settings from .handlers.command import parse_command, send_command_callback from .handlers.health import health_handler from .handlers.message import webhook_handler @@ -28,31 +38,43 @@ class WebHookPlugin(NcatBotPlugin): self._webhook_runner: web.AppRunner | None = None self._cleanup_task: asyncio.Task | None = None self._listener_task: asyncio.Task | None = None + self._watcher_task: asyncio.Task | None = None async def on_load(self): + # 初始化 settings.yaml 并加载配置 + ensure_settings_yaml() + reload_settings() + self.logger.info("Webhook 插件已加载") - self.logger.info("WEBHOOK_API_KEY: %s", "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成") - self.logger.info("命令监听: 前缀=%s 长度=%d~%d 回调=%s", COMMAND_PREFIX, COMMAND_LENGTH_MIN, COMMAND_LENGTH_MAX, - COMMAND_CALLBACK_URL or "未配置") + self.logger.info( + "WEBHOOK_API_KEY: %s", + "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成", + ) + self.logger.info( + "命令监听: 前缀=%s 长度=%d~%d 范围=%s 名单=%s(%s) 回调=%s", + command.prefix, + command.length_min, + command.length_max, + command.scope, + "开" if command.list_enabled else "关", + command.list_mode, + command.callback_url or "未配置", + ) asyncio.create_task(self._start_webhook()) self._cleanup_task = asyncio.create_task(self._cleanup_loop()) self._listener_task = asyncio.create_task(self._message_listener()) + self._watcher_task = asyncio.create_task(self._config_watcher()) async def on_close(self): - if self._listener_task is not None: - self._listener_task.cancel() - try: - await self._listener_task - except asyncio.CancelledError: - pass - self._listener_task = None - if self._cleanup_task is not None: - self._cleanup_task.cancel() - try: - await self._cleanup_task - except asyncio.CancelledError: - pass - self._cleanup_task = None + for task_attr in ("_watcher_task", "_listener_task", "_cleanup_task"): + task = getattr(self, task_attr) + if task is not None: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + setattr(self, task_attr, None) await self._stop_webhook() self.logger.info("Webhook 插件已卸载") @@ -65,6 +87,21 @@ class WebHookPlugin(NcatBotPlugin): except Exception as exc: self.logger.error("清理过期文件失败: %s", exc) + async def _config_watcher(self) -> None: + """轮询 settings.yaml 的 mtime,变更时热重载配置。""" + last_mtime: float = 0.0 + while True: + await asyncio.sleep(2) + try: + if SETTINGS_YAML_PATH.exists(): + current_mtime = SETTINGS_YAML_PATH.stat().st_mtime + if current_mtime != last_mtime: + last_mtime = current_mtime + reload_settings() + self.logger.info("settings.yaml changed, config reloaded") + except Exception as exc: + self.logger.error("Config watcher error: %s", exc) + async def _message_listener(self) -> None: """监听 QQ 消息,匹配命令模式后转发到外部回调。""" try: @@ -77,6 +114,31 @@ class WebHookPlugin(NcatBotPlugin): parsed = parse_command(raw_message) if not parsed: continue + + # 范围过滤:group / private / all + is_group = hasattr(event.data, "group_id") + if command.scope == "group" and not is_group: + continue + if command.scope == "private" and is_group: + continue + + # 黑白名单过滤 + if command.list_enabled: + if command.list_mode == "allow": + # 白名单模式(OR 逻辑):用户在名单 OR 群在名单 → 放行 + # 名单为空时视为不限制 + if command.allowed_users or command.allowed_groups: + user_ok = event.data.user_id in command.allowed_users + group_ok = is_group and event.data.group_id in command.allowed_groups + if not (user_ok or group_ok): + continue + elif command.list_mode == "deny": + # 黑名单模式:用户在名单 OR 群在名单 → 拒绝 + user_blocked = event.data.user_id in command.denied_users + group_blocked = is_group and event.data.group_id in command.denied_groups + if user_blocked or group_blocked: + continue + # 构建回调数据 data = { "command": parsed["command"], @@ -85,11 +147,13 @@ class WebHookPlugin(NcatBotPlugin): "user_id": event.data.user_id, "message_id": event.data.message_id, } - if hasattr(event.data, "group_id"): + if is_group: data["group_id"] = event.data.group_id self.logger.info( "命令监听匹配: command=%s user=%s group=%s", - parsed["command"], data["user_id"], data.get("group_id", "-"), + parsed["command"], + data["user_id"], + data.get("group_id", "-"), ) asyncio.create_task( send_command_callback(data, event, self.api, self.logger) @@ -106,6 +170,11 @@ class WebHookPlugin(NcatBotPlugin): app.router.add_get("/healthz", health_handler) app.router.add_post("/webhook", webhook_handler) app.router.add_post("/upload", upload_handler) + # 后台管理 + app.router.add_get("/admin/", admin_page_handler) + app.router.add_get("/api/settings", api_get_settings) + app.router.add_put("/api/settings", api_update_settings) + app.router.add_post("/api/settings/reload", api_reload_settings) return app async def _start_webhook(self): @@ -117,9 +186,10 @@ class WebHookPlugin(NcatBotPlugin): await site.start() self.logger.info("Webhook 已启动: %s:%d", HOST, PORT) self.logger.info("上传目录: %s", UPLOAD_DIR) + self.logger.info("后台管理: http://%s:%d/admin/", HOST, PORT) async def _stop_webhook(self): if self._webhook_runner is not None: await self._webhook_runner.cleanup() self._webhook_runner = None - self.logger.info("Webhook 已停止") + self.logger.info("Webhook 已停止") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 4e87235..83028e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,4 +8,5 @@ dependencies = [ "ncatbot5>=5.5.2.post3", "aiohttp>=3.9", "python-dotenv>=1.0", + "pyyaml>=6.0", ] diff --git a/uv.lock b/uv.lock index 695243e..be4f8da 100644 --- a/uv.lock +++ b/uv.lock @@ -911,6 +911,7 @@ dependencies = [ { name = "aiohttp" }, { name = "ncatbot5" }, { name = "python-dotenv" }, + { name = "pyyaml" }, ] [package.metadata] @@ -918,6 +919,7 @@ requires-dist = [ { name = "aiohttp", specifier = ">=3.9" }, { name = "ncatbot5", specifier = ">=5.5.2.post3" }, { name = "python-dotenv", specifier = ">=1.0" }, + { name = "pyyaml", specifier = ">=6.0" }, ] [[package]]