Compare commits

..

17 Commits

Author SHA1 Message Date
29433dda02 Merge pull request '命令监听配置重构:SQLite → YAML + 黑白名单过滤优化' (#2) from feat/command-scope into main
Reviewed-on: #2
2026-05-04 00:09:56 +08:00
783049257e ♻️ refactor(command): 黑白名单过滤从 AND 改为 OR 逻辑
- 白名单模式:用户在名单 OR 群在名单 → 放行
- 黑名单模式:用户在名单 OR 群在名单 → 拒绝
- 名单为空时表示不限制
- 更新前端提示说明 OR 逻辑的语义
2026-05-04 00:05:18 +08:00
58e53c8aec feat(command): 黑白名单开关、自动保存与管理界面重构
- 新增 list_enabled 开关控制是否启用名单过滤
- 表单变更后 800ms 自动保存,去掉手动保存按钮
- Header 显示"未保存"指示器,保存中 toast 提示
- 内容区限制最大宽度 900px,优化宽屏显示
- 侧边栏增加圆角选中态,运行状态带脉冲动画
- 白名单模式灰掉黑名单输入,关闭名单时显示遮罩
- 命令测试结果增加成功/失败颜色反馈
- 回调格式改用等宽字体代码块
2026-05-03 21:56:48 +08:00
f82363f45f ♻️ refactor(command): 配置系统从 SQLite 迁移至 YAML 并修复白名单失效
- 用 CommandConfig dataclass 单例替代模块级变量,解决 from import 造成的本地绑定不随 global 更新的 bug
- 删除 db.py,改用 settings.yaml 存储动态配置,首次启动自动创建并合并 .env 默认值
- 新增文件轮询 watcher(2 秒),检测 YAML 变更自动热重载
- 管理界面 API 改为直接读写 YAML,即时生效
- 依赖 aiosqlite 替换为 pyyaml
2026-05-03 18:23:29 +08:00
9ffe78a9c2 feat(command): 添加动态配置、黑白名单与后台管理界面
- 新增 SQLite 数据库层(db.py)持久化命令监听配置,支持热更新无需重启
- 命令过滤从白名单扩展为黑白名单双模式(COMMAND_LIST_MODE: allow/deny)
- 新增后台管理页面 /admin/,侧边栏布局,支持在线修改所有命令监听配置
- 新增 REST API:GET/PUT /api/settings、POST /api/settings/reload
- 新增 rebuild_pattern() 支持配置变更后正则动态重编译
- 中间件放行 /admin 和 /api 路径免鉴权
- 添加 aiosqlite 依赖
2026-05-03 15:22:53 +08:00
ed6e27f162 feat(command): 添加监听范围过滤和回复 @控制
- 新增 COMMAND_SCOPE 配置,支持 all/group/private 过滤消息来源
- 新增 COMMAND_ALLOWED_GROUPS 群号白名单,逗号分隔,留空不限制
- 新增 COMMAND_ALLOWED_USERS QQ 号白名单,逗号分隔,留空不限制
- 新增 COMMAND_AT_SENDER 配置,控制回复时是否 @发送者(默认 true)
- 回调响应中 at_sender 字段可覆盖全局配置
- 更新 .env.example 和 README.md 文档
2026-05-03 12:26:44 +08:00
84f671741b 🐛 fix(command): 回调回复图片时解析本地文件路径
- 复用 message handler 的 _resolve_url 解析上传文件的相对路径
- 回调返回的 image/video/file url 若为已上传文件则补全为绝对路径
- 修复本地文件名被当作 URL 发送导致识别失败的问题
2026-05-03 00:51:07 +08:00
a7f52e33fb 🐛 fix(command): 回调超时从 10s 改为可配置,默认 180s
- 新增 COMMAND_CALLBACK_TIMEOUT 配置项,默认 180 秒
- 生图等耗时命令不再因超时中断
2026-05-03 00:37:16 +08:00
fa5d61dbfa Merge pull request 'feat: 添加命令监听与外接回调功能' (#1) from test/command-listener into main
Reviewed-on: #1
2026-05-02 21:39:18 +08:00
e62fc13f7c feat(command): 命令名支持中文、数字、字母等任意非空白字符
- 正则从仅匹配中文改为匹配任意非空白字符(\S)
- 中文、数字、字母、其他字符均按 1 个字符计数
- 长度范围仍由 COMMAND_LENGTH_MIN/MAX 控制
2026-05-02 21:30:19 +08:00
7c72b1c97a 🐛 fix(command): 修复 plugin.py 引用已删除的 COMMAND_LENGTH 导致启动报错
- 导入改为 COMMAND_LENGTH_MIN / COMMAND_LENGTH_MAX
- 启动日志适配范围格式 %d~%d
2026-05-02 21:20:57 +08:00
d4962a840d feat(command): 命令名长度改为可配置范围 2~4 个中文字
- 将 COMMAND_LENGTH 拆分为 COMMAND_LENGTH_MIN 和 COMMAND_LENGTH_MAX
- 正则匹配支持 2~4 个中文字,范围通过 .env 配置
- 默认最小 2 字、最大 4 字
2026-05-02 21:13:22 +08:00
c6ba7e2e37 feat(command): 支持无参数命令触发
- 正则改为允许命令后无空格和内容,如 `#测试命令` 也可触发
- 命令内容为空时 content 返回空字符串而非匹配失败
2026-05-02 21:01:33 +08:00
89461b6ed6 🐛 fix(command): 修复 Event 对象无 reply 方法
- events() 返回的是 Event(data=GroupMessageEvent) 包装对象
- 改用 event.data(消息事件实体)构建回复
- 手动构建 MessageArray + add_reply 实现引用回复
- 群聊默认 @发送者,at_sender=false 可关闭
2026-05-02 20:38:09 +08:00
601bce8847 feat(command): 回复时引用原消息
- 使用 event.reply() 替代手动调用 send_group/private_text
- 自动引用触发命令的原消息,回复带引用效果
- 群聊默认 @发送者,可通过 at_sender=false 关闭
2026-05-02 19:58:47 +08:00
af0f6c7ec6 feat(command): 回调响应自动回复到 QQ
- 回调服务器可返回 reply 或 messages 字段,插件自动回复到原消息来源
- reply 为纯文本回复,messages 格式同 /webhook 接口
- 支持通过 group_id/user_id 覆盖回复目标
- 无需回复时返回空 JSON 即可
- 更新 README 文档说明回调响应格式
2026-05-02 19:29:18 +08:00
ee1bd583d8 feat(command): 添加命令监听与外接回调功能
- 新增 `#四个中文字+空格` 消息匹配规则,可配置前缀和长度
- 匹配成功后 POST 到 COMMAND_CALLBACK_URL,携带命令名、内容、用户信息
- 使用 EventMixin.events() 订阅消息流,on_close 自动取消监听
- 新增配置项:COMMAND_PREFIX、COMMAND_LENGTH、COMMAND_CALLBACK_URL
- 更新 .env.example 和 README 文档
2026-05-02 19:02:40 +08:00
10 changed files with 1727 additions and 21 deletions

View File

@@ -15,3 +15,23 @@ ALLOWED_EXTENSIONS=
# ── QQ API ── # ── QQ API ──
QQ_API_TIMEOUT=10 QQ_API_TIMEOUT=10
QQ_API_MAX_RETRIES=2 QQ_API_MAX_RETRIES=2
# ── 命令监听 ──
# 命令前缀,默认 #
COMMAND_PREFIX=#
# 命令名最小字符数,默认 2
COMMAND_LENGTH_MIN=2
# 命令名最大字符数,默认 4
COMMAND_LENGTH_MAX=4
# 监听范围all群+私、group仅群、private仅私默认 all
COMMAND_SCOPE=all
# 允许的群号逗号分隔留空不限制123456,789012
COMMAND_ALLOWED_GROUPS=
# 允许的 QQ 号逗号分隔留空不限制111111,222222
COMMAND_ALLOWED_USERS=
# 回复时是否 @发送者,默认 true
COMMAND_AT_SENDER=true
# 回调超时秒数,默认 180生图等耗时命令需要较长超时
COMMAND_CALLBACK_TIMEOUT=180
# 匹配到命令后的回调 URL留空则不监听
COMMAND_CALLBACK_URL=

6
.gitignore vendored
View File

@@ -15,3 +15,9 @@ uploads/
# Environment # Environment
.env .env
# Dynamic config
settings.yaml
# Legacy data
data/

View File

@@ -41,6 +41,15 @@ uv run python -m ncatbot
| `ALLOWED_EXTENSIONS` | 否 | 空(不限) | 允许的扩展名,逗号分隔,如 `jpg,png,pdf` | | `ALLOWED_EXTENSIONS` | 否 | 空(不限) | 允许的扩展名,逗号分隔,如 `jpg,png,pdf` |
| `QQ_API_TIMEOUT` | 否 | `10` | QQ API 超时秒数 | | `QQ_API_TIMEOUT` | 否 | `10` | QQ API 超时秒数 |
| `QQ_API_MAX_RETRIES` | 否 | `2` | QQ API 失败重试次数 | | `QQ_API_MAX_RETRIES` | 否 | `2` | QQ API 失败重试次数 |
| `COMMAND_PREFIX` | 否 | `#` | 命令前缀 |
| `COMMAND_LENGTH_MIN` | 否 | `2` | 命令名最小字符数 |
| `COMMAND_LENGTH_MAX` | 否 | `4` | 命令名最大字符数 |
| `COMMAND_SCOPE` | 否 | `all` | 监听范围:`all`(群+私)、`group`(仅群)、`private`(仅私) |
| `COMMAND_ALLOWED_GROUPS` | 否 | 空(不限) | 允许的群号,逗号分隔,如 `123456,789012` |
| `COMMAND_ALLOWED_USERS` | 否 | 空(不限) | 允许的 QQ 号,逗号分隔,如 `111111,222222` |
| `COMMAND_AT_SENDER` | 否 | `true` | 回复时是否 @发送者 |
| `COMMAND_CALLBACK_TIMEOUT` | 否 | `180` | 回调超时秒数 |
| `COMMAND_CALLBACK_URL` | 否 | 空(不监听) | 命令匹配后的回调 URL |
## 接口说明 ## 接口说明
@@ -182,6 +191,88 @@ curl -X POST http://localhost:8081/webhook \
每条消息独立处理,一条失败不影响其他消息。`results` 数组按原始 `index` 排序,包含每条消息的发送结果。 每条消息独立处理,一条失败不影响其他消息。`results` 数组按原始 `index` 排序,包含每条消息的发送结果。
### 命令监听
插件会自动监听 QQ 消息,当消息以 `#命令名` 开头时,将命令内容 POST 到 `COMMAND_CALLBACK_URL`
**匹配规则:**
```
#测试命令 你好世界
│ │ │ │
│ │ │ └── 命令内容content可选
│ └── 空格分隔(可选)
└── 命令名2~4个字符
└── 前缀(默认 #
```
- 命令名支持中文、数字、字母等任意非空白字符,每个字符计 1
- 前缀通过 `COMMAND_PREFIX` 配置,长度范围通过 `COMMAND_LENGTH_MIN` / `COMMAND_LENGTH_MAX` 配置
- `#测试命令``#1a``#abc` 均可触发
- 不配置 `COMMAND_CALLBACK_URL` 则不监听
**监听范围过滤:**
- `COMMAND_SCOPE`:控制监听范围
- `all`(默认):群聊 + 私聊都监听
- `group`:仅监听群聊
- `private`:仅监听私聊
- `COMMAND_ALLOWED_GROUPS`:群号白名单,逗号分隔,留空不限制
- `COMMAND_ALLOWED_USERS`QQ 号白名单,逗号分隔,留空不限制
- 三个条件同时生效,必须全部满足才触发回调
**回调请求体POST JSON**
```json
{
"command": "测试命令",
"content": "你好世界",
"raw_message": "#测试命令 你好世界",
"user_id": "123456",
"group_id": "789012",
"message_id": "abc123"
}
```
| 字段 | 说明 |
|---|---|
| `command` | 命令名2~4个字符 |
| `content` | 命令后的内容 |
| `raw_message` | 原始消息文本 |
| `user_id` | 发送者 QQ 号 |
| `group_id` | 群号(私聊消息无此字段) |
| `message_id` | 消息 ID |
**回调响应格式(自动回复到 QQ**
回调服务器返回 JSON插件会自动将内容回复到原消息来源群/私聊)。
纯文本回复:
```json
{"reply": "收到你的命令了"}
```
批量回复text/image/video 组合发送file 单独发送):
```json
{
"messages": [
{"type": "text", "msg": "处理结果如下"},
{"type": "image", "url": "https://example.com/result.png"},
{"type": "file", "url": "report.pdf"}
]
}
```
| 字段 | 说明 |
|---|---|
| `reply` | 纯文本回复(与 `messages` 二选一,`messages` 优先) |
| `messages` | 批量回复数组,格式同 `/webhook``messages` 字段 |
| `at_sender` | 是否 @发送者(默认取 `COMMAND_AT_SENDER` 配置,仅群聊生效) |
| `group_id` | 可选,覆盖回复目标群号(默认回复到原群) |
| `user_id` | 可选,覆盖回复目标 QQ 号(默认回复到原发送者) |
不需要回复时返回 `{}` 或空响应即可。
## 项目结构 ## 项目结构
``` ```
@@ -191,6 +282,7 @@ curl -X POST http://localhost:8081/webhook \
├── response.py # 统一响应格式 ├── response.py # 统一响应格式
├── handlers/ ├── handlers/
│ ├── __init__.py │ ├── __init__.py
│ ├── command.py # 命令监听匹配与回调
│ ├── health.py # GET /healthz │ ├── health.py # GET /healthz
│ ├── message.py # POST /webhook │ ├── message.py # POST /webhook
│ └── upload.py # POST /upload │ └── upload.py # POST /upload

267
config.py
View File

@@ -1,33 +1,284 @@
"""项目配置:所有值从环境变量读取,未配置时使用安全默认值""" """项目配置:静态配置从环境变量读取,命令监听配置通过 settings.yaml 动态管理"""
import logging
import os import os
import uuid import uuid
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import yaml
from dotenv import load_dotenv from dotenv import load_dotenv
# 加载 .env 文件(优先从插件目录查找) # 加载 .env 文件(优先从插件目录查找)
load_dotenv(Path(__file__).parent / ".env") load_dotenv(Path(__file__).parent / ".env")
# ── 鉴权 ──────────────────────────────────────────────────── logger = logging.getLogger("webhook-plugin.config")
WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex
# ── 网络 ───────────────────────────────────────────────────── # ── 静态配置(环境变量,运行时不变)────────────────────────────
WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex
HOST: str = os.environ.get("WEBHOOK_HOST", "0.0.0.0") HOST: str = os.environ.get("WEBHOOK_HOST", "0.0.0.0")
try: try:
PORT: int = int(os.environ.get("WEBHOOK_PORT", "8081")) PORT: int = int(os.environ.get("WEBHOOK_PORT", "8081"))
except ValueError: except ValueError:
PORT = 8081 PORT = 8081
# ── 上传 ─────────────────────────────────────────────────────
UPLOAD_DIR: Path = Path(os.environ.get("UPLOAD_DIR", str(Path(__file__).parent / "uploads"))) UPLOAD_DIR: Path = Path(os.environ.get("UPLOAD_DIR", str(Path(__file__).parent / "uploads")))
# 单个文件最大 20 MB
MAX_UPLOAD_SIZE: int = int(os.environ.get("MAX_UPLOAD_SIZE", str(20 * 1024 * 1024))) MAX_UPLOAD_SIZE: int = int(os.environ.get("MAX_UPLOAD_SIZE", str(20 * 1024 * 1024)))
# 允许的文件扩展名(小写,不含点),为空则不限制
ALLOWED_EXTENSIONS: set[str] = set( ALLOWED_EXTENSIONS: set[str] = set(
filter(None, os.environ.get("ALLOWED_EXTENSIONS", "").lower().split(",")) filter(None, os.environ.get("ALLOWED_EXTENSIONS", "").lower().split(","))
) )
# ── QQ API ───────────────────────────────────────────────────
QQ_API_TIMEOUT: float = float(os.environ.get("QQ_API_TIMEOUT", "10")) QQ_API_TIMEOUT: float = float(os.environ.get("QQ_API_TIMEOUT", "10"))
QQ_API_MAX_RETRIES: int = int(os.environ.get("QQ_API_MAX_RETRIES", "2")) QQ_API_MAX_RETRIES: int = int(os.environ.get("QQ_API_MAX_RETRIES", "2"))
# ── YAML 文件路径 ────────────────────────────────────────────
SETTINGS_YAML_PATH: Path = Path(__file__).parent / "settings.yaml"
# ── 动态配置 dataclass ──────────────────────────────────────
@dataclass
class CommandConfig:
"""命令监听动态配置,通过 settings.yaml 热重载。"""
prefix: str = "#"
length_min: int = 2
length_max: int = 4
scope: str = "all" # all / group / private
list_enabled: bool = False # 是否启用黑白名单过滤
list_mode: str = "allow" # allow / deny
allowed_groups: frozenset[str] = frozenset()
denied_groups: frozenset[str] = frozenset()
allowed_users: frozenset[str] = frozenset()
denied_users: frozenset[str] = frozenset()
at_sender: bool = True
callback_url: str = ""
callback_timeout: int = 180
# 全局单例 —— 所有消费者通过 config.command.xxx 访问,避免 stale import 问题
command = CommandConfig()
# ── 环境变量默认值YAML 缺失 key 时的回退)────────────────────
_ENV_DEFAULTS: dict = {
"prefix": os.environ.get("COMMAND_PREFIX", "#"),
"length_min": int(os.environ.get("COMMAND_LENGTH_MIN", "2")),
"length_max": int(os.environ.get("COMMAND_LENGTH_MAX", "4")),
"scope": os.environ.get("COMMAND_SCOPE", "all"),
"list_enabled": os.environ.get("COMMAND_LIST_ENABLED", "false").lower() in ("true", "1", "yes"),
"list_mode": os.environ.get("COMMAND_LIST_MODE", "allow"),
"allowed_groups": os.environ.get("COMMAND_ALLOWED_GROUPS", ""),
"denied_groups": os.environ.get("COMMAND_DENIED_GROUPS", ""),
"allowed_users": os.environ.get("COMMAND_ALLOWED_USERS", ""),
"denied_users": os.environ.get("COMMAND_DENIED_USERS", ""),
"at_sender": os.environ.get("COMMAND_AT_SENDER", "true").lower() in ("true", "1", "yes"),
"callback_url": os.environ.get("COMMAND_CALLBACK_URL", ""),
"callback_timeout": int(os.environ.get("COMMAND_CALLBACK_TIMEOUT", "180")),
}
# ── YAML I/O ─────────────────────────────────────────────────
_YAML_HEADER = (
"# settings.yaml - 命令监听动态配置\n"
"# 可通过后台管理界面修改,也可手动编辑(自动热重载)\n\n"
)
def _yaml_defaults() -> dict:
"""返回默认 YAML 结构(首次启动时写入)。"""
return {
"command": {
"prefix": "#",
"length_min": 2,
"length_max": 4,
"scope": "all",
"list_enabled": False,
"list_mode": "allow",
"allowed_groups": [],
"denied_groups": [],
"allowed_users": [],
"denied_users": [],
"at_sender": True,
"callback_url": "",
"callback_timeout": 180,
}
}
def load_yaml() -> dict:
"""读取 settings.yaml文件不存在时返回空字典。"""
if not SETTINGS_YAML_PATH.exists():
return {}
with open(SETTINGS_YAML_PATH, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
return data if isinstance(data, dict) else {}
def save_yaml(data: dict) -> None:
"""将配置写入 settings.yaml。"""
with open(SETTINGS_YAML_PATH, "w", encoding="utf-8") as f:
f.write(_YAML_HEADER)
yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
def ensure_settings_yaml() -> None:
"""首次启动时创建 settings.yaml合并环境变量覆盖。"""
if SETTINGS_YAML_PATH.exists():
return
data = _yaml_defaults()
# 用环境变量覆盖默认值
env_map = {
"prefix": ("COMMAND_PREFIX", str),
"length_min": ("COMMAND_LENGTH_MIN", int),
"length_max": ("COMMAND_LENGTH_MAX", int),
"scope": ("COMMAND_SCOPE", str),
"list_mode": ("COMMAND_LIST_MODE", str),
"list_enabled": ("COMMAND_LIST_ENABLED", "bool"),
"allowed_groups": ("COMMAND_ALLOWED_GROUPS", "csv_list"),
"denied_groups": ("COMMAND_DENIED_GROUPS", "csv_list"),
"allowed_users": ("COMMAND_ALLOWED_USERS", "csv_list"),
"denied_users": ("COMMAND_DENIED_USERS", "csv_list"),
"at_sender": ("COMMAND_AT_SENDER", "bool"),
"callback_url": ("COMMAND_CALLBACK_URL", str),
"callback_timeout": ("COMMAND_CALLBACK_TIMEOUT", int),
}
for key, (env_name, conv) in env_map.items():
env_val = os.environ.get(env_name)
if env_val is not None:
if conv == "csv_list":
data["command"][key] = [v.strip() for v in env_val.split(",") if v.strip()]
elif conv == "bool":
data["command"][key] = env_val.lower() in ("true", "1", "yes")
else:
data["command"][key] = conv(env_val)
save_yaml(data)
logger.info("Created settings.yaml with defaults (env-var overrides applied)")
# ── 将 YAML 数据应用到全局 command 对象 ───────────────────────
def _apply_yaml_to_command(data: dict) -> None:
"""将 YAML command 段合并到全局 command 对象,缺失 key 回退到环境变量默认值。"""
cmd_data = data.get("command", {})
command.prefix = str(cmd_data.get("prefix", _ENV_DEFAULTS["prefix"]))
command.length_min = int(cmd_data.get("length_min", _ENV_DEFAULTS["length_min"]))
command.length_max = int(cmd_data.get("length_max", _ENV_DEFAULTS["length_max"]))
command.scope = str(cmd_data.get("scope", _ENV_DEFAULTS["scope"]))
command.list_mode = str(cmd_data.get("list_mode", _ENV_DEFAULTS["list_mode"]))
# list_enabled: YAML bool → Python bool
list_enabled_val = cmd_data.get("list_enabled", _ENV_DEFAULTS["list_enabled"])
if isinstance(list_enabled_val, str):
command.list_enabled = list_enabled_val.lower() in ("true", "1", "yes")
else:
command.list_enabled = bool(list_enabled_val)
command.callback_url = str(cmd_data.get("callback_url", _ENV_DEFAULTS["callback_url"]))
command.callback_timeout = int(cmd_data.get("callback_timeout", _ENV_DEFAULTS["callback_timeout"]))
# at_sender: YAML bool → Python bool
at_sender_val = cmd_data.get("at_sender", _ENV_DEFAULTS["at_sender"])
if isinstance(at_sender_val, str):
command.at_sender = at_sender_val.lower() in ("true", "1", "yes")
else:
command.at_sender = bool(at_sender_val)
# 列表字段: YAML list → frozenset
for field_name in ("allowed_groups", "denied_groups", "allowed_users", "denied_users"):
raw = cmd_data.get(field_name)
if raw is None:
# 回退到环境变量默认值
env_default = _ENV_DEFAULTS[field_name]
if isinstance(env_default, str):
setattr(command, field_name, frozenset(
v.strip() for v in env_default.split(",") if v.strip()
))
else:
setattr(command, field_name, frozenset(env_default) if env_default else frozenset())
elif isinstance(raw, str):
setattr(command, field_name, frozenset(
v.strip() for v in raw.split(",") if v.strip()
))
elif isinstance(raw, list):
setattr(command, field_name, frozenset(str(v).strip() for v in raw if str(v).strip()))
else:
setattr(command, field_name, frozenset())
# 重建命令匹配正则
from .handlers.command import rebuild_pattern
rebuild_pattern()
# ── 公共 API ─────────────────────────────────────────────────
def reload_settings() -> None:
"""重新读取 settings.yaml 并应用到全局 command 对象。"""
data = load_yaml()
_apply_yaml_to_command(data)
logger.info("Config reloaded from settings.yaml")
def get_settings_flat() -> dict[str, str]:
"""返回所有动态配置的扁平字典 {key: str_value},供管理 API 使用。"""
return {
"command_prefix": command.prefix,
"command_length_min": str(command.length_min),
"command_length_max": str(command.length_max),
"command_scope": command.scope,
"command_list_mode": command.list_mode,
"command_list_enabled": str(command.list_enabled).lower(),
"command_allowed_groups": ",".join(sorted(command.allowed_groups)),
"command_denied_groups": ",".join(sorted(command.denied_groups)),
"command_allowed_users": ",".join(sorted(command.allowed_users)),
"command_denied_users": ",".join(sorted(command.denied_users)),
"command_at_sender": str(command.at_sender).lower(),
"command_callback_url": command.callback_url,
"command_callback_timeout": str(command.callback_timeout),
}
# API 平坦 key → (YAML key, 类型转换) 映射
_API_KEY_MAP: dict[str, tuple[str, ...]] = {
"command_prefix": ("prefix", str),
"command_length_min": ("length_min", int),
"command_length_max": ("length_max", int),
"command_scope": ("scope", str),
"command_list_mode": ("list_mode", str),
"command_list_enabled": ("list_enabled", "bool"),
"command_at_sender": ("at_sender", "bool"),
"command_callback_url": ("callback_url", str),
"command_callback_timeout": ("callback_timeout", int),
"command_allowed_groups": ("allowed_groups", "csv_list"),
"command_denied_groups": ("denied_groups", "csv_list"),
"command_allowed_users": ("allowed_users", "csv_list"),
"command_denied_users": ("denied_users", "csv_list"),
}
def update_settings_from_api(data: dict[str, str]) -> dict[str, str]:
"""从管理 API 接收扁平字典,合并到 settings.yaml 并热重载。返回已应用的字段。"""
allowed_keys = set(_API_KEY_MAP.keys())
filtered = {k: str(v) for k, v in data.items() if k in allowed_keys}
if not filtered:
return {}
# 读取当前 YAML合并修改
yaml_data = load_yaml()
if "command" not in yaml_data:
yaml_data["command"] = {}
cmd = yaml_data["command"]
for flat_key, value in filtered.items():
yaml_key, conv = _API_KEY_MAP[flat_key]
if conv == "csv_list":
cmd[yaml_key] = [v.strip() for v in value.split(",") if v.strip()]
elif conv == "bool":
cmd[yaml_key] = value.lower() in ("true", "1", "yes")
elif conv is int:
try:
cmd[yaml_key] = int(value)
except ValueError:
pass
else:
cmd[yaml_key] = value
save_yaml(yaml_data)
# 写入后立即应用(不等文件 watcher
_apply_yaml_to_command(yaml_data)
return filtered

1042
handlers/admin.py Normal file

File diff suppressed because it is too large Load Diff

176
handlers/command.py Normal file
View File

@@ -0,0 +1,176 @@
"""命令监听处理器:匹配 #命令名 格式的消息,转发到外部回调 URL 并自动回复。"""
import re
import aiohttp
from ..config import UPLOAD_DIR, command
from ..handlers.message import _resolve_url
def build_command_pattern() -> re.Pattern:
"""构建命令匹配正则:# + N个字符中文/数字/字母/下划线等),后面可跟空格+内容或无内容。
每个"字符"按 Unicode 码点计:
- 一个中文字 = 1
- 一个数字 = 1
- 一个英文字母 = 1
- 其他非空白字符 = 1
"""
return re.compile(
rf"^{re.escape(command.prefix)}(\S{{{command.length_min},{command.length_max}}})(?:\s+(.+))?$",
re.DOTALL,
)
COMMAND_PATTERN = build_command_pattern()
def rebuild_pattern() -> None:
"""动态配置变更后重新编译正则。"""
global COMMAND_PATTERN
COMMAND_PATTERN = build_command_pattern()
def parse_command(raw_message: str) -> dict | None:
"""解析消息,匹配命令模式。返回 {command, content, raw_message} 或 None。"""
match = COMMAND_PATTERN.match(raw_message.strip())
if not match:
return None
return {
"command": match.group(1),
"content": match.group(2).strip() if match.group(2) else "",
"raw_message": raw_message.strip(),
}
async def send_command_callback(data: dict, event, api, logger) -> None:
"""将命令数据 POST 到外部回调 URL根据响应自动回复到 QQ。
回调服务器返回格式:
{
"reply": "回复文本", // 纯文本回复(引用原消息)
"messages": [ // 批量回复(可选,优先于 reply
{"type": "text", "msg": "..."},
{"type": "image", "url": "..."},
{"type": "file", "url": "..."},
{"type": "video", "url": "..."}
],
"at_sender": true // 是否 @发送者(默认取配置,仅群聊)
}
所有字段均为可选,无回复内容时返回空 JSON 即可。
回复会引用触发命令的原消息。
"""
if not command.callback_url:
logger.warning("callback_url 未配置,跳过命令回调")
return
try:
async with aiohttp.ClientSession() as session:
async with session.post(
command.callback_url,
json=data,
timeout=aiohttp.ClientTimeout(total=command.callback_timeout),
) as resp:
if resp.status >= 400:
body = await resp.text()
logger.error(
"命令回调失败: status=%d url=%s body=%s",
resp.status, command.callback_url, body[:200],
)
return
# 解析响应,自动回复
try:
result = await resp.json(content_type=None)
except Exception:
return
if not isinstance(result, dict):
return
await _handle_reply(result, event.data, api, logger)
except Exception as exc:
logger.error("命令回调异常: url=%s error=%s", command.callback_url, exc)
async def _handle_reply(result: dict, msg_event, api, logger) -> None:
"""处理回调响应引用原消息自动回复。msg_event 是 GroupMessageEvent / PrivateMessageEvent。"""
# at_sender: 回调响应中的值优先,未指定则使用全局配置
at_sender = result.get("at_sender", command.at_sender)
messages = result.get("messages")
reply = result.get("reply")
group_id = getattr(msg_event, "group_id", None)
user_id = msg_event.user_id
message_id = msg_event.message_id
# 构造引用消息段
from ncatbot.types import MessageArray, Reply
def build_reply_msg(text=None, image=None, video=None) -> MessageArray:
"""构建带引用的消息段。"""
msg = MessageArray()
msg.add_reply(message_id)
if group_id and at_sender:
msg.add_at(user_id)
msg.add_text(" ")
if text is not None:
msg.add_text(text)
if image is not None:
msg.add_image(image)
if video is not None:
msg.add_video(video)
return msg
# 批量回复:引用 + 批量消息段
if messages and isinstance(messages, list):
text_parts: list[str] = []
image_url: str | None = None
video_url: str | None = None
file_msgs: list[dict] = []
for msg in messages:
msg_type = msg.get("type", "text")
if msg_type == "text":
text_parts.append(msg.get("msg", ""))
elif msg_type == "image":
image_url = _resolve_url(msg.get("url", ""))
elif msg_type == "video":
video_url = _resolve_url(msg.get("url", ""))
elif msg_type == "file":
file_msgs.append(msg)
text = "\n".join(text_parts) if text_parts else None
try:
# 组合消息(带引用)
if text or image_url or video_url:
reply_msg = build_reply_msg(text=text, image=image_url, video=video_url)
if group_id:
await api.qq.post_group_array_msg(group_id=group_id, msg=reply_msg)
else:
await api.qq.post_private_array_msg(user_id=user_id, msg=reply_msg)
# 文件单独发
for fm in file_msgs:
url = _resolve_url(fm.get("url", ""))
filename = url.split("/")[-1]
if group_id:
await api.qq.send_group_file(group_id=group_id, file=url, name=filename)
else:
await api.qq.send_private_file(user_id=user_id, file=url, name=filename)
except Exception as exc:
logger.error("命令回复失败: %s", exc)
return
# 纯文本回复:引用原消息
if reply and isinstance(reply, str):
try:
reply_msg = build_reply_msg(text=reply)
if group_id:
await api.qq.post_group_array_msg(group_id=group_id, msg=reply_msg)
else:
await api.qq.post_private_array_msg(user_id=user_id, msg=reply_msg)
except Exception as exc:
logger.error("命令回复失败: %s", exc)

View File

@@ -10,9 +10,9 @@ from .response import error
@web.middleware @web.middleware
async def auth_middleware(request: web.Request, handler): async def auth_middleware(request: web.Request, handler):
""" /upload 和 /webhook 路径强制校验 API Key""" """需要鉴权的路径校验 API Key。/healthz 和 /admin/ 及 /api/ 开头的路径不需要鉴权"""
# 健康检查不需要鉴权 # 不需要鉴权的路径
if request.path == "/healthz": if request.path == "/healthz" or request.path.startswith("/admin") or request.path.startswith("/api/"):
return await handler(request) return await handler(request)
auth_header = request.headers.get("Authorization", "") auth_header = request.headers.get("Authorization", "")

128
plugin.py
View File

@@ -7,7 +7,18 @@ import os
from aiohttp import web from aiohttp import web
from ncatbot.plugin import NcatBotPlugin from ncatbot.plugin import NcatBotPlugin
from .config import HOST, PORT, UPLOAD_DIR, WEBHOOK_API_KEY from .config import (
HOST,
PORT,
UPLOAD_DIR,
WEBHOOK_API_KEY,
SETTINGS_YAML_PATH,
command,
ensure_settings_yaml,
reload_settings,
)
from .handlers.admin import admin_page_handler, api_get_settings, api_reload_settings, api_update_settings
from .handlers.command import parse_command, send_command_callback
from .handlers.health import health_handler from .handlers.health import health_handler
from .handlers.message import webhook_handler from .handlers.message import webhook_handler
from .handlers.upload import cleanup_expired_files, upload_handler from .handlers.upload import cleanup_expired_files, upload_handler
@@ -26,21 +37,44 @@ class WebHookPlugin(NcatBotPlugin):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._webhook_runner: web.AppRunner | None = None self._webhook_runner: web.AppRunner | None = None
self._cleanup_task: asyncio.Task | None = None self._cleanup_task: asyncio.Task | None = None
self._listener_task: asyncio.Task | None = None
self._watcher_task: asyncio.Task | None = None
async def on_load(self): async def on_load(self):
# 初始化 settings.yaml 并加载配置
ensure_settings_yaml()
reload_settings()
self.logger.info("Webhook 插件已加载") self.logger.info("Webhook 插件已加载")
self.logger.info("WEBHOOK_API_KEY: %s", "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成") self.logger.info(
"WEBHOOK_API_KEY: %s",
"已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成",
)
self.logger.info(
"命令监听: 前缀=%s 长度=%d~%d 范围=%s 名单=%s(%s) 回调=%s",
command.prefix,
command.length_min,
command.length_max,
command.scope,
"" if command.list_enabled else "",
command.list_mode,
command.callback_url or "未配置",
)
asyncio.create_task(self._start_webhook()) asyncio.create_task(self._start_webhook())
self._cleanup_task = asyncio.create_task(self._cleanup_loop()) self._cleanup_task = asyncio.create_task(self._cleanup_loop())
self._listener_task = asyncio.create_task(self._message_listener())
self._watcher_task = asyncio.create_task(self._config_watcher())
async def on_close(self): async def on_close(self):
if self._cleanup_task is not None: for task_attr in ("_watcher_task", "_listener_task", "_cleanup_task"):
self._cleanup_task.cancel() task = getattr(self, task_attr)
if task is not None:
task.cancel()
try: try:
await self._cleanup_task await task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
self._cleanup_task = None setattr(self, task_attr, None)
await self._stop_webhook() await self._stop_webhook()
self.logger.info("Webhook 插件已卸载") self.logger.info("Webhook 插件已卸载")
@@ -53,6 +87,82 @@ class WebHookPlugin(NcatBotPlugin):
except Exception as exc: except Exception as exc:
self.logger.error("清理过期文件失败: %s", exc) self.logger.error("清理过期文件失败: %s", exc)
async def _config_watcher(self) -> None:
"""轮询 settings.yaml 的 mtime变更时热重载配置。"""
last_mtime: float = 0.0
while True:
await asyncio.sleep(2)
try:
if SETTINGS_YAML_PATH.exists():
current_mtime = SETTINGS_YAML_PATH.stat().st_mtime
if current_mtime != last_mtime:
last_mtime = current_mtime
reload_settings()
self.logger.info("settings.yaml changed, config reloaded")
except Exception as exc:
self.logger.error("Config watcher error: %s", exc)
async def _message_listener(self) -> None:
"""监听 QQ 消息,匹配命令模式后转发到外部回调。"""
try:
async with self.events("message") as stream:
async for event in stream:
try:
raw_message = event.data.raw_message
if not raw_message:
continue
parsed = parse_command(raw_message)
if not parsed:
continue
# 范围过滤group / private / all
is_group = hasattr(event.data, "group_id")
if command.scope == "group" and not is_group:
continue
if command.scope == "private" and is_group:
continue
# 黑白名单过滤
if command.list_enabled:
if command.list_mode == "allow":
# 白名单模式OR 逻辑):用户在名单 OR 群在名单 → 放行
# 名单为空时视为不限制
if command.allowed_users or command.allowed_groups:
user_ok = event.data.user_id in command.allowed_users
group_ok = is_group and event.data.group_id in command.allowed_groups
if not (user_ok or group_ok):
continue
elif command.list_mode == "deny":
# 黑名单模式:用户在名单 OR 群在名单 → 拒绝
user_blocked = event.data.user_id in command.denied_users
group_blocked = is_group and event.data.group_id in command.denied_groups
if user_blocked or group_blocked:
continue
# 构建回调数据
data = {
"command": parsed["command"],
"content": parsed["content"],
"raw_message": parsed["raw_message"],
"user_id": event.data.user_id,
"message_id": event.data.message_id,
}
if is_group:
data["group_id"] = event.data.group_id
self.logger.info(
"命令监听匹配: command=%s user=%s group=%s",
parsed["command"],
data["user_id"],
data.get("group_id", "-"),
)
asyncio.create_task(
send_command_callback(data, event, self.api, self.logger)
)
except Exception as exc:
self.logger.error("消息处理异常: %s", exc)
except asyncio.CancelledError:
return
def _create_app(self) -> web.Application: def _create_app(self) -> web.Application:
app = web.Application(middlewares=[request_id_middleware, auth_middleware]) app = web.Application(middlewares=[request_id_middleware, auth_middleware])
app["qq_api"] = self.api app["qq_api"] = self.api
@@ -60,6 +170,11 @@ class WebHookPlugin(NcatBotPlugin):
app.router.add_get("/healthz", health_handler) app.router.add_get("/healthz", health_handler)
app.router.add_post("/webhook", webhook_handler) app.router.add_post("/webhook", webhook_handler)
app.router.add_post("/upload", upload_handler) app.router.add_post("/upload", upload_handler)
# 后台管理
app.router.add_get("/admin/", admin_page_handler)
app.router.add_get("/api/settings", api_get_settings)
app.router.add_put("/api/settings", api_update_settings)
app.router.add_post("/api/settings/reload", api_reload_settings)
return app return app
async def _start_webhook(self): async def _start_webhook(self):
@@ -71,6 +186,7 @@ class WebHookPlugin(NcatBotPlugin):
await site.start() await site.start()
self.logger.info("Webhook 已启动: %s:%d", HOST, PORT) self.logger.info("Webhook 已启动: %s:%d", HOST, PORT)
self.logger.info("上传目录: %s", UPLOAD_DIR) self.logger.info("上传目录: %s", UPLOAD_DIR)
self.logger.info("后台管理: http://%s:%d/admin/", HOST, PORT)
async def _stop_webhook(self): async def _stop_webhook(self):
if self._webhook_runner is not None: if self._webhook_runner is not None:

View File

@@ -8,4 +8,5 @@ dependencies = [
"ncatbot5>=5.5.2.post3", "ncatbot5>=5.5.2.post3",
"aiohttp>=3.9", "aiohttp>=3.9",
"python-dotenv>=1.0", "python-dotenv>=1.0",
"pyyaml>=6.0",
] ]

2
uv.lock generated
View File

@@ -911,6 +911,7 @@ dependencies = [
{ name = "aiohttp" }, { name = "aiohttp" },
{ name = "ncatbot5" }, { name = "ncatbot5" },
{ name = "python-dotenv" }, { name = "python-dotenv" },
{ name = "pyyaml" },
] ]
[package.metadata] [package.metadata]
@@ -918,6 +919,7 @@ requires-dist = [
{ name = "aiohttp", specifier = ">=3.9" }, { name = "aiohttp", specifier = ">=3.9" },
{ name = "ncatbot5", specifier = ">=5.5.2.post3" }, { name = "ncatbot5", specifier = ">=5.5.2.post3" },
{ name = "python-dotenv", specifier = ">=1.0" }, { name = "python-dotenv", specifier = ">=1.0" },
{ name = "pyyaml", specifier = ">=6.0" },
] ]
[[package]] [[package]]