7 Commits
v0.1.2 ... main

Author SHA1 Message Date
832ed063a0 🔒 fix(auth): 管理面板添加 API Key 鉴权
- 移除 /api/ 路由的鉴权豁免,所有数据接口必须携带 API Key
- 仅 /healthz 和 /admin/(HTML 页面壳)免鉴权
- 前端新增登录遮罩层,401 时弹出 API Key 输入框
- Key 存储在 sessionStorage,所有 API 请求自动附加 X-API-Key header
- 支持 ?apiKey=xxx URL 参数自动登录(登录后从 URL 移除避免泄露)
2026-05-04 19:01:32 +08:00
29433dda02 Merge pull request '命令监听配置重构:SQLite → YAML + 黑白名单过滤优化' (#2) from feat/command-scope into main
Reviewed-on: #2
2026-05-04 00:09:56 +08:00
783049257e ♻️ refactor(command): 黑白名单过滤从 AND 改为 OR 逻辑
- 白名单模式:用户在名单 OR 群在名单 → 放行
- 黑名单模式:用户在名单 OR 群在名单 → 拒绝
- 名单为空时表示不限制
- 更新前端提示说明 OR 逻辑的语义
2026-05-04 00:05:18 +08:00
58e53c8aec feat(command): 黑白名单开关、自动保存与管理界面重构
- 新增 list_enabled 开关控制是否启用名单过滤
- 表单变更后 800ms 自动保存,去掉手动保存按钮
- Header 显示"未保存"指示器,保存中 toast 提示
- 内容区限制最大宽度 900px,优化宽屏显示
- 侧边栏增加圆角选中态,运行状态带脉冲动画
- 白名单模式灰掉黑名单输入,关闭名单时显示遮罩
- 命令测试结果增加成功/失败颜色反馈
- 回调格式改用等宽字体代码块
2026-05-03 21:56:48 +08:00
f82363f45f ♻️ refactor(command): 配置系统从 SQLite 迁移至 YAML 并修复白名单失效
- 用 CommandConfig dataclass 单例替代模块级变量,解决 from import 造成的本地绑定不随 global 更新的 bug
- 删除 db.py,改用 settings.yaml 存储动态配置,首次启动自动创建并合并 .env 默认值
- 新增文件轮询 watcher(2 秒),检测 YAML 变更自动热重载
- 管理界面 API 改为直接读写 YAML,即时生效
- 依赖 aiosqlite 替换为 pyyaml
2026-05-03 18:23:29 +08:00
9ffe78a9c2 feat(command): 添加动态配置、黑白名单与后台管理界面
- 新增 SQLite 数据库层(db.py)持久化命令监听配置,支持热更新无需重启
- 命令过滤从白名单扩展为黑白名单双模式(COMMAND_LIST_MODE: allow/deny)
- 新增后台管理页面 /admin/,侧边栏布局,支持在线修改所有命令监听配置
- 新增 REST API:GET/PUT /api/settings、POST /api/settings/reload
- 新增 rebuild_pattern() 支持配置变更后正则动态重编译
- 中间件放行 /admin 和 /api 路径免鉴权
- 添加 aiosqlite 依赖
2026-05-03 15:22:53 +08:00
ed6e27f162 feat(command): 添加监听范围过滤和回复 @控制
- 新增 COMMAND_SCOPE 配置,支持 all/group/private 过滤消息来源
- 新增 COMMAND_ALLOWED_GROUPS 群号白名单,逗号分隔,留空不限制
- 新增 COMMAND_ALLOWED_USERS QQ 号白名单,逗号分隔,留空不限制
- 新增 COMMAND_AT_SENDER 配置,控制回复时是否 @发送者(默认 true)
- 回调响应中 at_sender 字段可覆盖全局配置
- 更新 .env.example 和 README.md 文档
2026-05-03 12:26:44 +08:00
10 changed files with 1526 additions and 57 deletions

View File

@@ -19,7 +19,19 @@ QQ_API_MAX_RETRIES=2
# ── 命令监听 ── # ── 命令监听 ──
# 命令前缀,默认 # # 命令前缀,默认 #
COMMAND_PREFIX=# COMMAND_PREFIX=#
# 命令名长度(中文字数),默认 4 # 命令名最小字符数,默认 2
COMMAND_LENGTH=4 COMMAND_LENGTH_MIN=2
# 命令名最大字符数,默认 4
COMMAND_LENGTH_MAX=4
# 监听范围all群+私、group仅群、private仅私默认 all
COMMAND_SCOPE=all
# 允许的群号逗号分隔留空不限制123456,789012
COMMAND_ALLOWED_GROUPS=
# 允许的 QQ 号逗号分隔留空不限制111111,222222
COMMAND_ALLOWED_USERS=
# 回复时是否 @发送者,默认 true
COMMAND_AT_SENDER=true
# 回调超时秒数,默认 180生图等耗时命令需要较长超时
COMMAND_CALLBACK_TIMEOUT=180
# 匹配到命令后的回调 URL留空则不监听 # 匹配到命令后的回调 URL留空则不监听
COMMAND_CALLBACK_URL= COMMAND_CALLBACK_URL=

9
.gitignore vendored
View File

@@ -7,6 +7,9 @@ wheels/
*.egg-info *.egg-info
.claude .claude
CLAUDE.md
codestable
# Virtual environments # Virtual environments
.venv .venv
@@ -15,3 +18,9 @@ uploads/
# Environment # Environment
.env .env
# Dynamic config
settings.yaml
# Legacy data
data/

View File

@@ -42,7 +42,13 @@ uv run python -m ncatbot
| `QQ_API_TIMEOUT` | 否 | `10` | QQ API 超时秒数 | | `QQ_API_TIMEOUT` | 否 | `10` | QQ API 超时秒数 |
| `QQ_API_MAX_RETRIES` | 否 | `2` | QQ API 失败重试次数 | | `QQ_API_MAX_RETRIES` | 否 | `2` | QQ API 失败重试次数 |
| `COMMAND_PREFIX` | 否 | `#` | 命令前缀 | | `COMMAND_PREFIX` | 否 | `#` | 命令前缀 |
| `COMMAND_LENGTH` | 否 | `4` | 命令名字符数(中文字数) | | `COMMAND_LENGTH_MIN` | 否 | `2` | 命令名最小字符数 |
| `COMMAND_LENGTH_MAX` | 否 | `4` | 命令名最大字符数 |
| `COMMAND_SCOPE` | 否 | `all` | 监听范围:`all`(群+私)、`group`(仅群)、`private`(仅私) |
| `COMMAND_ALLOWED_GROUPS` | 否 | 空(不限) | 允许的群号,逗号分隔,如 `123456,789012` |
| `COMMAND_ALLOWED_USERS` | 否 | 空(不限) | 允许的 QQ 号,逗号分隔,如 `111111,222222` |
| `COMMAND_AT_SENDER` | 否 | `true` | 回复时是否 @发送者 |
| `COMMAND_CALLBACK_TIMEOUT` | 否 | `180` | 回调超时秒数 |
| `COMMAND_CALLBACK_URL` | 否 | 空(不监听) | 命令匹配后的回调 URL | | `COMMAND_CALLBACK_URL` | 否 | 空(不监听) | 命令匹配后的回调 URL |
## 接口说明 ## 接口说明
@@ -187,22 +193,34 @@ curl -X POST http://localhost:8081/webhook \
### 命令监听 ### 命令监听
插件会自动监听 QQ 消息,当消息以 `#四个中文字+空格` 开头时,将命令内容 POST 到 `COMMAND_CALLBACK_URL` 插件会自动监听 QQ 消息,当消息以 `#命令名` 开头时,将命令内容 POST 到 `COMMAND_CALLBACK_URL`
**匹配规则:** **匹配规则:**
``` ```
#测试命令 你好世界 #测试命令 你好世界
│ │ │ │ │ │ │ │
│ │ │ └── 命令内容content │ │ │ └── 命令内容content,可选
│ └── 空格分隔 │ └── 空格分隔(可选)
└── 命令名(4个中文字 └── 命令名(2~4个字符
└── 前缀(默认 # └── 前缀(默认 #
``` ```
- 前缀、命令名长度可通过 `COMMAND_PREFIX``COMMAND_LENGTH` 配置 - 命令名支持中文、数字、字母等任意非空白字符,每个字符计 1
- 前缀通过 `COMMAND_PREFIX` 配置,长度范围通过 `COMMAND_LENGTH_MIN` / `COMMAND_LENGTH_MAX` 配置
- `#测试命令``#1a``#abc` 均可触发
- 不配置 `COMMAND_CALLBACK_URL` 则不监听 - 不配置 `COMMAND_CALLBACK_URL` 则不监听
**监听范围过滤:**
- `COMMAND_SCOPE`:控制监听范围
- `all`(默认):群聊 + 私聊都监听
- `group`:仅监听群聊
- `private`:仅监听私聊
- `COMMAND_ALLOWED_GROUPS`:群号白名单,逗号分隔,留空不限制
- `COMMAND_ALLOWED_USERS`QQ 号白名单,逗号分隔,留空不限制
- 三个条件同时生效,必须全部满足才触发回调
**回调请求体POST JSON** **回调请求体POST JSON**
```json ```json
@@ -218,7 +236,7 @@ curl -X POST http://localhost:8081/webhook \
| 字段 | 说明 | | 字段 | 说明 |
|---|---| |---|---|
| `command` | 命令名(4个中文字 | | `command` | 命令名(2~4个字符 |
| `content` | 命令后的内容 | | `content` | 命令后的内容 |
| `raw_message` | 原始消息文本 | | `raw_message` | 原始消息文本 |
| `user_id` | 发送者 QQ 号 | | `user_id` | 发送者 QQ 号 |
@@ -249,6 +267,7 @@ curl -X POST http://localhost:8081/webhook \
|---|---| |---|---|
| `reply` | 纯文本回复(与 `messages` 二选一,`messages` 优先) | | `reply` | 纯文本回复(与 `messages` 二选一,`messages` 优先) |
| `messages` | 批量回复数组,格式同 `/webhook``messages` 字段 | | `messages` | 批量回复数组,格式同 `/webhook``messages` 字段 |
| `at_sender` | 是否 @发送者(默认取 `COMMAND_AT_SENDER` 配置,仅群聊生效) |
| `group_id` | 可选,覆盖回复目标群号(默认回复到原群) | | `group_id` | 可选,覆盖回复目标群号(默认回复到原群) |
| `user_id` | 可选,覆盖回复目标 QQ 号(默认回复到原发送者) | | `user_id` | 可选,覆盖回复目标 QQ 号(默认回复到原发送者) |

272
config.py
View File

@@ -1,40 +1,284 @@
"""项目配置:所有值从环境变量读取,未配置时使用安全默认值""" """项目配置:静态配置从环境变量读取,命令监听配置通过 settings.yaml 动态管理"""
import logging
import os import os
import uuid import uuid
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import yaml
from dotenv import load_dotenv from dotenv import load_dotenv
# 加载 .env 文件(优先从插件目录查找) # 加载 .env 文件(优先从插件目录查找)
load_dotenv(Path(__file__).parent / ".env") load_dotenv(Path(__file__).parent / ".env")
# ── 鉴权 ──────────────────────────────────────────────────── logger = logging.getLogger("webhook-plugin.config")
WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex
# ── 网络 ───────────────────────────────────────────────────── # ── 静态配置(环境变量,运行时不变)────────────────────────────
WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex
HOST: str = os.environ.get("WEBHOOK_HOST", "0.0.0.0") HOST: str = os.environ.get("WEBHOOK_HOST", "0.0.0.0")
try: try:
PORT: int = int(os.environ.get("WEBHOOK_PORT", "8081")) PORT: int = int(os.environ.get("WEBHOOK_PORT", "8081"))
except ValueError: except ValueError:
PORT = 8081 PORT = 8081
# ── 上传 ─────────────────────────────────────────────────────
UPLOAD_DIR: Path = Path(os.environ.get("UPLOAD_DIR", str(Path(__file__).parent / "uploads"))) UPLOAD_DIR: Path = Path(os.environ.get("UPLOAD_DIR", str(Path(__file__).parent / "uploads")))
# 单个文件最大 20 MB
MAX_UPLOAD_SIZE: int = int(os.environ.get("MAX_UPLOAD_SIZE", str(20 * 1024 * 1024))) MAX_UPLOAD_SIZE: int = int(os.environ.get("MAX_UPLOAD_SIZE", str(20 * 1024 * 1024)))
# 允许的文件扩展名(小写,不含点),为空则不限制
ALLOWED_EXTENSIONS: set[str] = set( ALLOWED_EXTENSIONS: set[str] = set(
filter(None, os.environ.get("ALLOWED_EXTENSIONS", "").lower().split(",")) filter(None, os.environ.get("ALLOWED_EXTENSIONS", "").lower().split(","))
) )
# ── QQ API ───────────────────────────────────────────────────
QQ_API_TIMEOUT: float = float(os.environ.get("QQ_API_TIMEOUT", "10")) QQ_API_TIMEOUT: float = float(os.environ.get("QQ_API_TIMEOUT", "10"))
QQ_API_MAX_RETRIES: int = int(os.environ.get("QQ_API_MAX_RETRIES", "2")) QQ_API_MAX_RETRIES: int = int(os.environ.get("QQ_API_MAX_RETRIES", "2"))
# ── 命令监听 ──────────────────────────────────────────────── # ── YAML 文件路径 ────────────────────────────────────────────
COMMAND_PREFIX: str = os.environ.get("COMMAND_PREFIX", "#") SETTINGS_YAML_PATH: Path = Path(__file__).parent / "settings.yaml"
COMMAND_LENGTH_MIN: int = int(os.environ.get("COMMAND_LENGTH_MIN", "2"))
COMMAND_LENGTH_MAX: int = int(os.environ.get("COMMAND_LENGTH_MAX", "4")) # ── 动态配置 dataclass ──────────────────────────────────────
COMMAND_CALLBACK_URL: str = os.environ.get("COMMAND_CALLBACK_URL", "") @dataclass
COMMAND_CALLBACK_TIMEOUT: int = int(os.environ.get("COMMAND_CALLBACK_TIMEOUT", "180")) class CommandConfig:
"""命令监听动态配置,通过 settings.yaml 热重载。"""
prefix: str = "#"
length_min: int = 2
length_max: int = 4
scope: str = "all" # all / group / private
list_enabled: bool = False # 是否启用黑白名单过滤
list_mode: str = "allow" # allow / deny
allowed_groups: frozenset[str] = frozenset()
denied_groups: frozenset[str] = frozenset()
allowed_users: frozenset[str] = frozenset()
denied_users: frozenset[str] = frozenset()
at_sender: bool = True
callback_url: str = ""
callback_timeout: int = 180
# 全局单例 —— 所有消费者通过 config.command.xxx 访问,避免 stale import 问题
command = CommandConfig()
# ── 环境变量默认值YAML 缺失 key 时的回退)────────────────────
_ENV_DEFAULTS: dict = {
"prefix": os.environ.get("COMMAND_PREFIX", "#"),
"length_min": int(os.environ.get("COMMAND_LENGTH_MIN", "2")),
"length_max": int(os.environ.get("COMMAND_LENGTH_MAX", "4")),
"scope": os.environ.get("COMMAND_SCOPE", "all"),
"list_enabled": os.environ.get("COMMAND_LIST_ENABLED", "false").lower() in ("true", "1", "yes"),
"list_mode": os.environ.get("COMMAND_LIST_MODE", "allow"),
"allowed_groups": os.environ.get("COMMAND_ALLOWED_GROUPS", ""),
"denied_groups": os.environ.get("COMMAND_DENIED_GROUPS", ""),
"allowed_users": os.environ.get("COMMAND_ALLOWED_USERS", ""),
"denied_users": os.environ.get("COMMAND_DENIED_USERS", ""),
"at_sender": os.environ.get("COMMAND_AT_SENDER", "true").lower() in ("true", "1", "yes"),
"callback_url": os.environ.get("COMMAND_CALLBACK_URL", ""),
"callback_timeout": int(os.environ.get("COMMAND_CALLBACK_TIMEOUT", "180")),
}
# ── YAML I/O ─────────────────────────────────────────────────
_YAML_HEADER = (
"# settings.yaml - 命令监听动态配置\n"
"# 可通过后台管理界面修改,也可手动编辑(自动热重载)\n\n"
)
def _yaml_defaults() -> dict:
"""返回默认 YAML 结构(首次启动时写入)。"""
return {
"command": {
"prefix": "#",
"length_min": 2,
"length_max": 4,
"scope": "all",
"list_enabled": False,
"list_mode": "allow",
"allowed_groups": [],
"denied_groups": [],
"allowed_users": [],
"denied_users": [],
"at_sender": True,
"callback_url": "",
"callback_timeout": 180,
}
}
def load_yaml() -> dict:
"""读取 settings.yaml文件不存在时返回空字典。"""
if not SETTINGS_YAML_PATH.exists():
return {}
with open(SETTINGS_YAML_PATH, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
return data if isinstance(data, dict) else {}
def save_yaml(data: dict) -> None:
"""将配置写入 settings.yaml。"""
with open(SETTINGS_YAML_PATH, "w", encoding="utf-8") as f:
f.write(_YAML_HEADER)
yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
def ensure_settings_yaml() -> None:
"""首次启动时创建 settings.yaml合并环境变量覆盖。"""
if SETTINGS_YAML_PATH.exists():
return
data = _yaml_defaults()
# 用环境变量覆盖默认值
env_map = {
"prefix": ("COMMAND_PREFIX", str),
"length_min": ("COMMAND_LENGTH_MIN", int),
"length_max": ("COMMAND_LENGTH_MAX", int),
"scope": ("COMMAND_SCOPE", str),
"list_mode": ("COMMAND_LIST_MODE", str),
"list_enabled": ("COMMAND_LIST_ENABLED", "bool"),
"allowed_groups": ("COMMAND_ALLOWED_GROUPS", "csv_list"),
"denied_groups": ("COMMAND_DENIED_GROUPS", "csv_list"),
"allowed_users": ("COMMAND_ALLOWED_USERS", "csv_list"),
"denied_users": ("COMMAND_DENIED_USERS", "csv_list"),
"at_sender": ("COMMAND_AT_SENDER", "bool"),
"callback_url": ("COMMAND_CALLBACK_URL", str),
"callback_timeout": ("COMMAND_CALLBACK_TIMEOUT", int),
}
for key, (env_name, conv) in env_map.items():
env_val = os.environ.get(env_name)
if env_val is not None:
if conv == "csv_list":
data["command"][key] = [v.strip() for v in env_val.split(",") if v.strip()]
elif conv == "bool":
data["command"][key] = env_val.lower() in ("true", "1", "yes")
else:
data["command"][key] = conv(env_val)
save_yaml(data)
logger.info("Created settings.yaml with defaults (env-var overrides applied)")
# ── 将 YAML 数据应用到全局 command 对象 ───────────────────────
def _apply_yaml_to_command(data: dict) -> None:
"""将 YAML command 段合并到全局 command 对象,缺失 key 回退到环境变量默认值。"""
cmd_data = data.get("command", {})
command.prefix = str(cmd_data.get("prefix", _ENV_DEFAULTS["prefix"]))
command.length_min = int(cmd_data.get("length_min", _ENV_DEFAULTS["length_min"]))
command.length_max = int(cmd_data.get("length_max", _ENV_DEFAULTS["length_max"]))
command.scope = str(cmd_data.get("scope", _ENV_DEFAULTS["scope"]))
command.list_mode = str(cmd_data.get("list_mode", _ENV_DEFAULTS["list_mode"]))
# list_enabled: YAML bool → Python bool
list_enabled_val = cmd_data.get("list_enabled", _ENV_DEFAULTS["list_enabled"])
if isinstance(list_enabled_val, str):
command.list_enabled = list_enabled_val.lower() in ("true", "1", "yes")
else:
command.list_enabled = bool(list_enabled_val)
command.callback_url = str(cmd_data.get("callback_url", _ENV_DEFAULTS["callback_url"]))
command.callback_timeout = int(cmd_data.get("callback_timeout", _ENV_DEFAULTS["callback_timeout"]))
# at_sender: YAML bool → Python bool
at_sender_val = cmd_data.get("at_sender", _ENV_DEFAULTS["at_sender"])
if isinstance(at_sender_val, str):
command.at_sender = at_sender_val.lower() in ("true", "1", "yes")
else:
command.at_sender = bool(at_sender_val)
# 列表字段: YAML list → frozenset
for field_name in ("allowed_groups", "denied_groups", "allowed_users", "denied_users"):
raw = cmd_data.get(field_name)
if raw is None:
# 回退到环境变量默认值
env_default = _ENV_DEFAULTS[field_name]
if isinstance(env_default, str):
setattr(command, field_name, frozenset(
v.strip() for v in env_default.split(",") if v.strip()
))
else:
setattr(command, field_name, frozenset(env_default) if env_default else frozenset())
elif isinstance(raw, str):
setattr(command, field_name, frozenset(
v.strip() for v in raw.split(",") if v.strip()
))
elif isinstance(raw, list):
setattr(command, field_name, frozenset(str(v).strip() for v in raw if str(v).strip()))
else:
setattr(command, field_name, frozenset())
# 重建命令匹配正则
from .handlers.command import rebuild_pattern
rebuild_pattern()
# ── 公共 API ─────────────────────────────────────────────────
def reload_settings() -> None:
"""重新读取 settings.yaml 并应用到全局 command 对象。"""
data = load_yaml()
_apply_yaml_to_command(data)
logger.info("Config reloaded from settings.yaml")
def get_settings_flat() -> dict[str, str]:
"""返回所有动态配置的扁平字典 {key: str_value},供管理 API 使用。"""
return {
"command_prefix": command.prefix,
"command_length_min": str(command.length_min),
"command_length_max": str(command.length_max),
"command_scope": command.scope,
"command_list_mode": command.list_mode,
"command_list_enabled": str(command.list_enabled).lower(),
"command_allowed_groups": ",".join(sorted(command.allowed_groups)),
"command_denied_groups": ",".join(sorted(command.denied_groups)),
"command_allowed_users": ",".join(sorted(command.allowed_users)),
"command_denied_users": ",".join(sorted(command.denied_users)),
"command_at_sender": str(command.at_sender).lower(),
"command_callback_url": command.callback_url,
"command_callback_timeout": str(command.callback_timeout),
}
# API 平坦 key → (YAML key, 类型转换) 映射
_API_KEY_MAP: dict[str, tuple[str, ...]] = {
"command_prefix": ("prefix", str),
"command_length_min": ("length_min", int),
"command_length_max": ("length_max", int),
"command_scope": ("scope", str),
"command_list_mode": ("list_mode", str),
"command_list_enabled": ("list_enabled", "bool"),
"command_at_sender": ("at_sender", "bool"),
"command_callback_url": ("callback_url", str),
"command_callback_timeout": ("callback_timeout", int),
"command_allowed_groups": ("allowed_groups", "csv_list"),
"command_denied_groups": ("denied_groups", "csv_list"),
"command_allowed_users": ("allowed_users", "csv_list"),
"command_denied_users": ("denied_users", "csv_list"),
}
def update_settings_from_api(data: dict[str, str]) -> dict[str, str]:
"""从管理 API 接收扁平字典,合并到 settings.yaml 并热重载。返回已应用的字段。"""
allowed_keys = set(_API_KEY_MAP.keys())
filtered = {k: str(v) for k, v in data.items() if k in allowed_keys}
if not filtered:
return {}
# 读取当前 YAML合并修改
yaml_data = load_yaml()
if "command" not in yaml_data:
yaml_data["command"] = {}
cmd = yaml_data["command"]
for flat_key, value in filtered.items():
yaml_key, conv = _API_KEY_MAP[flat_key]
if conv == "csv_list":
cmd[yaml_key] = [v.strip() for v in value.split(",") if v.strip()]
elif conv == "bool":
cmd[yaml_key] = value.lower() in ("true", "1", "yes")
elif conv is int:
try:
cmd[yaml_key] = int(value)
except ValueError:
pass
else:
cmd[yaml_key] = value
save_yaml(yaml_data)
# 写入后立即应用(不等文件 watcher
_apply_yaml_to_command(yaml_data)
return filtered

1106
handlers/admin.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ import re
import aiohttp import aiohttp
from ..config import COMMAND_CALLBACK_TIMEOUT, COMMAND_CALLBACK_URL, COMMAND_LENGTH_MAX, COMMAND_LENGTH_MIN, COMMAND_PREFIX, UPLOAD_DIR from ..config import UPLOAD_DIR, command
from ..handlers.message import _resolve_url from ..handlers.message import _resolve_url
@@ -18,7 +18,7 @@ def build_command_pattern() -> re.Pattern:
- 其他非空白字符 = 1 - 其他非空白字符 = 1
""" """
return re.compile( return re.compile(
rf"^{re.escape(COMMAND_PREFIX)}(\S{{{COMMAND_LENGTH_MIN},{COMMAND_LENGTH_MAX}}})(?:\s+(.+))?$", rf"^{re.escape(command.prefix)}(\S{{{command.length_min},{command.length_max}}})(?:\s+(.+))?$",
re.DOTALL, re.DOTALL,
) )
@@ -26,6 +26,12 @@ def build_command_pattern() -> re.Pattern:
COMMAND_PATTERN = build_command_pattern() COMMAND_PATTERN = build_command_pattern()
def rebuild_pattern() -> None:
"""动态配置变更后重新编译正则。"""
global COMMAND_PATTERN
COMMAND_PATTERN = build_command_pattern()
def parse_command(raw_message: str) -> dict | None: def parse_command(raw_message: str) -> dict | None:
"""解析消息,匹配命令模式。返回 {command, content, raw_message} 或 None。""" """解析消息,匹配命令模式。返回 {command, content, raw_message} 或 None。"""
match = COMMAND_PATTERN.match(raw_message.strip()) match = COMMAND_PATTERN.match(raw_message.strip())
@@ -50,28 +56,28 @@ async def send_command_callback(data: dict, event, api, logger) -> None:
{"type": "file", "url": "..."}, {"type": "file", "url": "..."},
{"type": "video", "url": "..."} {"type": "video", "url": "..."}
], ],
"at_sender": true // 是否 @发送者(默认 true,仅群聊) "at_sender": true // 是否 @发送者(默认取配置,仅群聊)
} }
所有字段均为可选,无回复内容时返回空 JSON 即可。 所有字段均为可选,无回复内容时返回空 JSON 即可。
回复会引用触发命令的原消息。 回复会引用触发命令的原消息。
""" """
if not COMMAND_CALLBACK_URL: if not command.callback_url:
logger.warning("COMMAND_CALLBACK_URL 未配置,跳过命令回调") logger.warning("callback_url 未配置,跳过命令回调")
return return
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.post( async with session.post(
COMMAND_CALLBACK_URL, command.callback_url,
json=data, json=data,
timeout=aiohttp.ClientTimeout(total=COMMAND_CALLBACK_TIMEOUT), timeout=aiohttp.ClientTimeout(total=command.callback_timeout),
) as resp: ) as resp:
if resp.status >= 400: if resp.status >= 400:
body = await resp.text() body = await resp.text()
logger.error( logger.error(
"命令回调失败: status=%d url=%s body=%s", "命令回调失败: status=%d url=%s body=%s",
resp.status, COMMAND_CALLBACK_URL, body[:200], resp.status, command.callback_url, body[:200],
) )
return return
@@ -87,12 +93,13 @@ async def send_command_callback(data: dict, event, api, logger) -> None:
await _handle_reply(result, event.data, api, logger) await _handle_reply(result, event.data, api, logger)
except Exception as exc: except Exception as exc:
logger.error("命令回调异常: url=%s error=%s", COMMAND_CALLBACK_URL, exc) logger.error("命令回调异常: url=%s error=%s", command.callback_url, exc)
async def _handle_reply(result: dict, msg_event, api, logger) -> None: async def _handle_reply(result: dict, msg_event, api, logger) -> None:
"""处理回调响应引用原消息自动回复。msg_event 是 GroupMessageEvent / PrivateMessageEvent。""" """处理回调响应引用原消息自动回复。msg_event 是 GroupMessageEvent / PrivateMessageEvent。"""
at_sender = result.get("at_sender", True) # at_sender: 回调响应中的值优先,未指定则使用全局配置
at_sender = result.get("at_sender", command.at_sender)
messages = result.get("messages") messages = result.get("messages")
reply = result.get("reply") reply = result.get("reply")
group_id = getattr(msg_event, "group_id", None) group_id = getattr(msg_event, "group_id", None)

View File

@@ -10,9 +10,8 @@ from .response import error
@web.middleware @web.middleware
async def auth_middleware(request: web.Request, handler): async def auth_middleware(request: web.Request, handler):
""" /upload 和 /webhook 路径强制校验 API Key""" """需要鉴权的路径校验 API Key。仅 /healthz 和管理页面 HTML 无需鉴权"""
# 健康检查不需要鉴权 if request.path == "/healthz" or request.path == "/admin/":
if request.path == "/healthz":
return await handler(request) return await handler(request)
auth_header = request.headers.get("Authorization", "") auth_header = request.headers.get("Authorization", "")

112
plugin.py
View File

@@ -7,7 +7,17 @@ import os
from aiohttp import web from aiohttp import web
from ncatbot.plugin import NcatBotPlugin from ncatbot.plugin import NcatBotPlugin
from .config import COMMAND_CALLBACK_URL, COMMAND_LENGTH_MAX, COMMAND_LENGTH_MIN, COMMAND_PREFIX, HOST, PORT, UPLOAD_DIR, WEBHOOK_API_KEY from .config import (
HOST,
PORT,
UPLOAD_DIR,
WEBHOOK_API_KEY,
SETTINGS_YAML_PATH,
command,
ensure_settings_yaml,
reload_settings,
)
from .handlers.admin import admin_page_handler, api_get_settings, api_reload_settings, api_update_settings
from .handlers.command import parse_command, send_command_callback from .handlers.command import parse_command, send_command_callback
from .handlers.health import health_handler from .handlers.health import health_handler
from .handlers.message import webhook_handler from .handlers.message import webhook_handler
@@ -28,31 +38,43 @@ class WebHookPlugin(NcatBotPlugin):
self._webhook_runner: web.AppRunner | None = None self._webhook_runner: web.AppRunner | None = None
self._cleanup_task: asyncio.Task | None = None self._cleanup_task: asyncio.Task | None = None
self._listener_task: asyncio.Task | None = None self._listener_task: asyncio.Task | None = None
self._watcher_task: asyncio.Task | None = None
async def on_load(self): async def on_load(self):
# 初始化 settings.yaml 并加载配置
ensure_settings_yaml()
reload_settings()
self.logger.info("Webhook 插件已加载") self.logger.info("Webhook 插件已加载")
self.logger.info("WEBHOOK_API_KEY: %s", "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成") self.logger.info(
self.logger.info("命令监听: 前缀=%s 长度=%d~%d 回调=%s", COMMAND_PREFIX, COMMAND_LENGTH_MIN, COMMAND_LENGTH_MAX, "WEBHOOK_API_KEY: %s",
COMMAND_CALLBACK_URL or "未配置") "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成",
)
self.logger.info(
"命令监听: 前缀=%s 长度=%d~%d 范围=%s 名单=%s(%s) 回调=%s",
command.prefix,
command.length_min,
command.length_max,
command.scope,
"" if command.list_enabled else "",
command.list_mode,
command.callback_url or "未配置",
)
asyncio.create_task(self._start_webhook()) asyncio.create_task(self._start_webhook())
self._cleanup_task = asyncio.create_task(self._cleanup_loop()) self._cleanup_task = asyncio.create_task(self._cleanup_loop())
self._listener_task = asyncio.create_task(self._message_listener()) self._listener_task = asyncio.create_task(self._message_listener())
self._watcher_task = asyncio.create_task(self._config_watcher())
async def on_close(self): async def on_close(self):
if self._listener_task is not None: for task_attr in ("_watcher_task", "_listener_task", "_cleanup_task"):
self._listener_task.cancel() task = getattr(self, task_attr)
try: if task is not None:
await self._listener_task task.cancel()
except asyncio.CancelledError: try:
pass await task
self._listener_task = None except asyncio.CancelledError:
if self._cleanup_task is not None: pass
self._cleanup_task.cancel() setattr(self, task_attr, None)
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
self._cleanup_task = None
await self._stop_webhook() await self._stop_webhook()
self.logger.info("Webhook 插件已卸载") self.logger.info("Webhook 插件已卸载")
@@ -65,6 +87,21 @@ class WebHookPlugin(NcatBotPlugin):
except Exception as exc: except Exception as exc:
self.logger.error("清理过期文件失败: %s", exc) self.logger.error("清理过期文件失败: %s", exc)
async def _config_watcher(self) -> None:
"""轮询 settings.yaml 的 mtime变更时热重载配置。"""
last_mtime: float = 0.0
while True:
await asyncio.sleep(2)
try:
if SETTINGS_YAML_PATH.exists():
current_mtime = SETTINGS_YAML_PATH.stat().st_mtime
if current_mtime != last_mtime:
last_mtime = current_mtime
reload_settings()
self.logger.info("settings.yaml changed, config reloaded")
except Exception as exc:
self.logger.error("Config watcher error: %s", exc)
async def _message_listener(self) -> None: async def _message_listener(self) -> None:
"""监听 QQ 消息,匹配命令模式后转发到外部回调。""" """监听 QQ 消息,匹配命令模式后转发到外部回调。"""
try: try:
@@ -77,6 +114,31 @@ class WebHookPlugin(NcatBotPlugin):
parsed = parse_command(raw_message) parsed = parse_command(raw_message)
if not parsed: if not parsed:
continue continue
# 范围过滤group / private / all
is_group = hasattr(event.data, "group_id")
if command.scope == "group" and not is_group:
continue
if command.scope == "private" and is_group:
continue
# 黑白名单过滤
if command.list_enabled:
if command.list_mode == "allow":
# 白名单模式OR 逻辑):用户在名单 OR 群在名单 → 放行
# 名单为空时视为不限制
if command.allowed_users or command.allowed_groups:
user_ok = event.data.user_id in command.allowed_users
group_ok = is_group and event.data.group_id in command.allowed_groups
if not (user_ok or group_ok):
continue
elif command.list_mode == "deny":
# 黑名单模式:用户在名单 OR 群在名单 → 拒绝
user_blocked = event.data.user_id in command.denied_users
group_blocked = is_group and event.data.group_id in command.denied_groups
if user_blocked or group_blocked:
continue
# 构建回调数据 # 构建回调数据
data = { data = {
"command": parsed["command"], "command": parsed["command"],
@@ -85,11 +147,13 @@ class WebHookPlugin(NcatBotPlugin):
"user_id": event.data.user_id, "user_id": event.data.user_id,
"message_id": event.data.message_id, "message_id": event.data.message_id,
} }
if hasattr(event.data, "group_id"): if is_group:
data["group_id"] = event.data.group_id data["group_id"] = event.data.group_id
self.logger.info( self.logger.info(
"命令监听匹配: command=%s user=%s group=%s", "命令监听匹配: command=%s user=%s group=%s",
parsed["command"], data["user_id"], data.get("group_id", "-"), parsed["command"],
data["user_id"],
data.get("group_id", "-"),
) )
asyncio.create_task( asyncio.create_task(
send_command_callback(data, event, self.api, self.logger) send_command_callback(data, event, self.api, self.logger)
@@ -106,6 +170,11 @@ class WebHookPlugin(NcatBotPlugin):
app.router.add_get("/healthz", health_handler) app.router.add_get("/healthz", health_handler)
app.router.add_post("/webhook", webhook_handler) app.router.add_post("/webhook", webhook_handler)
app.router.add_post("/upload", upload_handler) app.router.add_post("/upload", upload_handler)
# 后台管理
app.router.add_get("/admin/", admin_page_handler)
app.router.add_get("/api/settings", api_get_settings)
app.router.add_put("/api/settings", api_update_settings)
app.router.add_post("/api/settings/reload", api_reload_settings)
return app return app
async def _start_webhook(self): async def _start_webhook(self):
@@ -117,9 +186,10 @@ class WebHookPlugin(NcatBotPlugin):
await site.start() await site.start()
self.logger.info("Webhook 已启动: %s:%d", HOST, PORT) self.logger.info("Webhook 已启动: %s:%d", HOST, PORT)
self.logger.info("上传目录: %s", UPLOAD_DIR) self.logger.info("上传目录: %s", UPLOAD_DIR)
self.logger.info("后台管理: http://%s:%d/admin/", HOST, PORT)
async def _stop_webhook(self): async def _stop_webhook(self):
if self._webhook_runner is not None: if self._webhook_runner is not None:
await self._webhook_runner.cleanup() await self._webhook_runner.cleanup()
self._webhook_runner = None self._webhook_runner = None
self.logger.info("Webhook 已停止") self.logger.info("Webhook 已停止")

View File

@@ -8,4 +8,5 @@ dependencies = [
"ncatbot5>=5.5.2.post3", "ncatbot5>=5.5.2.post3",
"aiohttp>=3.9", "aiohttp>=3.9",
"python-dotenv>=1.0", "python-dotenv>=1.0",
"pyyaml>=6.0",
] ]

2
uv.lock generated
View File

@@ -911,6 +911,7 @@ dependencies = [
{ name = "aiohttp" }, { name = "aiohttp" },
{ name = "ncatbot5" }, { name = "ncatbot5" },
{ name = "python-dotenv" }, { name = "python-dotenv" },
{ name = "pyyaml" },
] ]
[package.metadata] [package.metadata]
@@ -918,6 +919,7 @@ requires-dist = [
{ name = "aiohttp", specifier = ">=3.9" }, { name = "aiohttp", specifier = ">=3.9" },
{ name = "ncatbot5", specifier = ">=5.5.2.post3" }, { name = "ncatbot5", specifier = ">=5.5.2.post3" },
{ name = "python-dotenv", specifier = ">=1.0" }, { name = "python-dotenv", specifier = ">=1.0" },
{ name = "pyyaml", specifier = ">=6.0" },
] ]
[[package]] [[package]]