18 Commits
v0.1.1 ... main

Author SHA1 Message Date
832ed063a0 🔒 fix(auth): 管理面板添加 API Key 鉴权
- 移除 /api/ 路由的鉴权豁免,所有数据接口必须携带 API Key
- 仅 /healthz 和 /admin/(HTML 页面壳)免鉴权
- 前端新增登录遮罩层,401 时弹出 API Key 输入框
- Key 存储在 sessionStorage,所有 API 请求自动附加 X-API-Key header
- 支持 ?apiKey=xxx URL 参数自动登录(登录后从 URL 移除避免泄露)
2026-05-04 19:01:32 +08:00
29433dda02 Merge pull request '命令监听配置重构:SQLite → YAML + 黑白名单过滤优化' (#2) from feat/command-scope into main
Reviewed-on: #2
2026-05-04 00:09:56 +08:00
783049257e ♻️ refactor(command): 黑白名单过滤从 AND 改为 OR 逻辑
- 白名单模式:用户在名单 OR 群在名单 → 放行
- 黑名单模式:用户在名单 OR 群在名单 → 拒绝
- 名单为空时表示不限制
- 更新前端提示说明 OR 逻辑的语义
2026-05-04 00:05:18 +08:00
58e53c8aec feat(command): 黑白名单开关、自动保存与管理界面重构
- 新增 list_enabled 开关控制是否启用名单过滤
- 表单变更后 800ms 自动保存,去掉手动保存按钮
- Header 显示"未保存"指示器,保存中 toast 提示
- 内容区限制最大宽度 900px,优化宽屏显示
- 侧边栏增加圆角选中态,运行状态带脉冲动画
- 白名单模式灰掉黑名单输入,关闭名单时显示遮罩
- 命令测试结果增加成功/失败颜色反馈
- 回调格式改用等宽字体代码块
2026-05-03 21:56:48 +08:00
f82363f45f ♻️ refactor(command): 配置系统从 SQLite 迁移至 YAML 并修复白名单失效
- 用 CommandConfig dataclass 单例替代模块级变量,解决 from import 造成的本地绑定不随 global 更新的 bug
- 删除 db.py,改用 settings.yaml 存储动态配置,首次启动自动创建并合并 .env 默认值
- 新增文件轮询 watcher(2 秒),检测 YAML 变更自动热重载
- 管理界面 API 改为直接读写 YAML,即时生效
- 依赖 aiosqlite 替换为 pyyaml
2026-05-03 18:23:29 +08:00
9ffe78a9c2 feat(command): 添加动态配置、黑白名单与后台管理界面
- 新增 SQLite 数据库层(db.py)持久化命令监听配置,支持热更新无需重启
- 命令过滤从白名单扩展为黑白名单双模式(COMMAND_LIST_MODE: allow/deny)
- 新增后台管理页面 /admin/,侧边栏布局,支持在线修改所有命令监听配置
- 新增 REST API:GET/PUT /api/settings、POST /api/settings/reload
- 新增 rebuild_pattern() 支持配置变更后正则动态重编译
- 中间件放行 /admin 和 /api 路径免鉴权
- 添加 aiosqlite 依赖
2026-05-03 15:22:53 +08:00
ed6e27f162 feat(command): 添加监听范围过滤和回复 @控制
- 新增 COMMAND_SCOPE 配置,支持 all/group/private 过滤消息来源
- 新增 COMMAND_ALLOWED_GROUPS 群号白名单,逗号分隔,留空不限制
- 新增 COMMAND_ALLOWED_USERS QQ 号白名单,逗号分隔,留空不限制
- 新增 COMMAND_AT_SENDER 配置,控制回复时是否 @发送者(默认 true)
- 回调响应中 at_sender 字段可覆盖全局配置
- 更新 .env.example 和 README.md 文档
2026-05-03 12:26:44 +08:00
84f671741b 🐛 fix(command): 回调回复图片时解析本地文件路径
- 复用 message handler 的 _resolve_url 解析上传文件的相对路径
- 回调返回的 image/video/file url 若为已上传文件则补全为绝对路径
- 修复本地文件名被当作 URL 发送导致识别失败的问题
2026-05-03 00:51:07 +08:00
a7f52e33fb 🐛 fix(command): 回调超时从 10s 改为可配置,默认 180s
- 新增 COMMAND_CALLBACK_TIMEOUT 配置项,默认 180 秒
- 生图等耗时命令不再因超时中断
2026-05-03 00:37:16 +08:00
fa5d61dbfa Merge pull request 'feat: 添加命令监听与外接回调功能' (#1) from test/command-listener into main
Reviewed-on: #1
2026-05-02 21:39:18 +08:00
e62fc13f7c feat(command): 命令名支持中文、数字、字母等任意非空白字符
- 正则从仅匹配中文改为匹配任意非空白字符(\S)
- 中文、数字、字母、其他字符均按 1 个字符计数
- 长度范围仍由 COMMAND_LENGTH_MIN/MAX 控制
2026-05-02 21:30:19 +08:00
7c72b1c97a 🐛 fix(command): 修复 plugin.py 引用已删除的 COMMAND_LENGTH 导致启动报错
- 导入改为 COMMAND_LENGTH_MIN / COMMAND_LENGTH_MAX
- 启动日志适配范围格式 %d~%d
2026-05-02 21:20:57 +08:00
d4962a840d feat(command): 命令名长度改为可配置范围 2~4 个中文字
- 将 COMMAND_LENGTH 拆分为 COMMAND_LENGTH_MIN 和 COMMAND_LENGTH_MAX
- 正则匹配支持 2~4 个中文字,范围通过 .env 配置
- 默认最小 2 字、最大 4 字
2026-05-02 21:13:22 +08:00
c6ba7e2e37 feat(command): 支持无参数命令触发
- 正则改为允许命令后无空格和内容,如 `#测试命令` 也可触发
- 命令内容为空时 content 返回空字符串而非匹配失败
2026-05-02 21:01:33 +08:00
89461b6ed6 🐛 fix(command): 修复 Event 对象无 reply 方法
- events() 返回的是 Event(data=GroupMessageEvent) 包装对象
- 改用 event.data(消息事件实体)构建回复
- 手动构建 MessageArray + add_reply 实现引用回复
- 群聊默认 @发送者,at_sender=false 可关闭
2026-05-02 20:38:09 +08:00
601bce8847 feat(command): 回复时引用原消息
- 使用 event.reply() 替代手动调用 send_group/private_text
- 自动引用触发命令的原消息,回复带引用效果
- 群聊默认 @发送者,可通过 at_sender=false 关闭
2026-05-02 19:58:47 +08:00
af0f6c7ec6 feat(command): 回调响应自动回复到 QQ
- 回调服务器可返回 reply 或 messages 字段,插件自动回复到原消息来源
- reply 为纯文本回复,messages 格式同 /webhook 接口
- 支持通过 group_id/user_id 覆盖回复目标
- 无需回复时返回空 JSON 即可
- 更新 README 文档说明回调响应格式
2026-05-02 19:29:18 +08:00
ee1bd583d8 feat(command): 添加命令监听与外接回调功能
- 新增 `#四个中文字+空格` 消息匹配规则,可配置前缀和长度
- 匹配成功后 POST 到 COMMAND_CALLBACK_URL,携带命令名、内容、用户信息
- 使用 EventMixin.events() 订阅消息流,on_close 自动取消监听
- 新增配置项:COMMAND_PREFIX、COMMAND_LENGTH、COMMAND_CALLBACK_URL
- 更新 .env.example 和 README 文档
2026-05-02 19:02:40 +08:00
10 changed files with 1793 additions and 21 deletions

View File

@@ -15,3 +15,23 @@ ALLOWED_EXTENSIONS=
# ── QQ API ──
QQ_API_TIMEOUT=10
QQ_API_MAX_RETRIES=2
# ── 命令监听 ──
# 命令前缀,默认 #
COMMAND_PREFIX=#
# 命令名最小字符数,默认 2
COMMAND_LENGTH_MIN=2
# 命令名最大字符数,默认 4
COMMAND_LENGTH_MAX=4
# 监听范围all群+私、group仅群、private仅私默认 all
COMMAND_SCOPE=all
# 允许的群号逗号分隔留空不限制123456,789012
COMMAND_ALLOWED_GROUPS=
# 允许的 QQ 号逗号分隔留空不限制111111,222222
COMMAND_ALLOWED_USERS=
# 回复时是否 @发送者,默认 true
COMMAND_AT_SENDER=true
# 回调超时秒数,默认 180生图等耗时命令需要较长超时
COMMAND_CALLBACK_TIMEOUT=180
# 匹配到命令后的回调 URL留空则不监听
COMMAND_CALLBACK_URL=

9
.gitignore vendored
View File

@@ -7,6 +7,9 @@ wheels/
*.egg-info
.claude
CLAUDE.md
codestable
# Virtual environments
.venv
@@ -15,3 +18,9 @@ uploads/
# Environment
.env
# Dynamic config
settings.yaml
# Legacy data
data/

View File

@@ -41,6 +41,15 @@ uv run python -m ncatbot
| `ALLOWED_EXTENSIONS` | 否 | 空(不限) | 允许的扩展名,逗号分隔,如 `jpg,png,pdf` |
| `QQ_API_TIMEOUT` | 否 | `10` | QQ API 超时秒数 |
| `QQ_API_MAX_RETRIES` | 否 | `2` | QQ API 失败重试次数 |
| `COMMAND_PREFIX` | 否 | `#` | 命令前缀 |
| `COMMAND_LENGTH_MIN` | 否 | `2` | 命令名最小字符数 |
| `COMMAND_LENGTH_MAX` | 否 | `4` | 命令名最大字符数 |
| `COMMAND_SCOPE` | 否 | `all` | 监听范围:`all`(群+私)、`group`(仅群)、`private`(仅私) |
| `COMMAND_ALLOWED_GROUPS` | 否 | 空(不限) | 允许的群号,逗号分隔,如 `123456,789012` |
| `COMMAND_ALLOWED_USERS` | 否 | 空(不限) | 允许的 QQ 号,逗号分隔,如 `111111,222222` |
| `COMMAND_AT_SENDER` | 否 | `true` | 回复时是否 @发送者 |
| `COMMAND_CALLBACK_TIMEOUT` | 否 | `180` | 回调超时秒数 |
| `COMMAND_CALLBACK_URL` | 否 | 空(不监听) | 命令匹配后的回调 URL |
## 接口说明
@@ -182,6 +191,88 @@ curl -X POST http://localhost:8081/webhook \
每条消息独立处理,一条失败不影响其他消息。`results` 数组按原始 `index` 排序,包含每条消息的发送结果。
### 命令监听
插件会自动监听 QQ 消息,当消息以 `#命令名` 开头时,将命令内容 POST 到 `COMMAND_CALLBACK_URL`
**匹配规则:**
```
#测试命令 你好世界
│ │ │ │
│ │ │ └── 命令内容content可选
│ └── 空格分隔(可选)
└── 命令名2~4个字符
└── 前缀(默认 #
```
- 命令名支持中文、数字、字母等任意非空白字符,每个字符计 1
- 前缀通过 `COMMAND_PREFIX` 配置,长度范围通过 `COMMAND_LENGTH_MIN` / `COMMAND_LENGTH_MAX` 配置
- `#测试命令``#1a``#abc` 均可触发
- 不配置 `COMMAND_CALLBACK_URL` 则不监听
**监听范围过滤:**
- `COMMAND_SCOPE`:控制监听范围
- `all`(默认):群聊 + 私聊都监听
- `group`:仅监听群聊
- `private`:仅监听私聊
- `COMMAND_ALLOWED_GROUPS`:群号白名单,逗号分隔,留空不限制
- `COMMAND_ALLOWED_USERS`QQ 号白名单,逗号分隔,留空不限制
- 三个条件同时生效,必须全部满足才触发回调
**回调请求体POST JSON**
```json
{
"command": "测试命令",
"content": "你好世界",
"raw_message": "#测试命令 你好世界",
"user_id": "123456",
"group_id": "789012",
"message_id": "abc123"
}
```
| 字段 | 说明 |
|---|---|
| `command` | 命令名2~4个字符 |
| `content` | 命令后的内容 |
| `raw_message` | 原始消息文本 |
| `user_id` | 发送者 QQ 号 |
| `group_id` | 群号(私聊消息无此字段) |
| `message_id` | 消息 ID |
**回调响应格式(自动回复到 QQ**
回调服务器返回 JSON插件会自动将内容回复到原消息来源群/私聊)。
纯文本回复:
```json
{"reply": "收到你的命令了"}
```
批量回复text/image/video 组合发送file 单独发送):
```json
{
"messages": [
{"type": "text", "msg": "处理结果如下"},
{"type": "image", "url": "https://example.com/result.png"},
{"type": "file", "url": "report.pdf"}
]
}
```
| 字段 | 说明 |
|---|---|
| `reply` | 纯文本回复(与 `messages` 二选一,`messages` 优先) |
| `messages` | 批量回复数组,格式同 `/webhook``messages` 字段 |
| `at_sender` | 是否 @发送者(默认取 `COMMAND_AT_SENDER` 配置,仅群聊生效) |
| `group_id` | 可选,覆盖回复目标群号(默认回复到原群) |
| `user_id` | 可选,覆盖回复目标 QQ 号(默认回复到原发送者) |
不需要回复时返回 `{}` 或空响应即可。
## 项目结构
```
@@ -191,6 +282,7 @@ curl -X POST http://localhost:8081/webhook \
├── response.py # 统一响应格式
├── handlers/
│ ├── __init__.py
│ ├── command.py # 命令监听匹配与回调
│ ├── health.py # GET /healthz
│ ├── message.py # POST /webhook
│ └── upload.py # POST /upload

267
config.py
View File

@@ -1,33 +1,284 @@
"""项目配置:所有值从环境变量读取,未配置时使用安全默认值"""
"""项目配置:静态配置从环境变量读取,命令监听配置通过 settings.yaml 动态管理"""
import logging
import os
import uuid
from dataclasses import dataclass
from pathlib import Path
import yaml
from dotenv import load_dotenv
# 加载 .env 文件(优先从插件目录查找)
load_dotenv(Path(__file__).parent / ".env")
# ── 鉴权 ────────────────────────────────────────────────────
WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex
logger = logging.getLogger("webhook-plugin.config")
# ── 网络 ─────────────────────────────────────────────────────
# ── 静态配置(环境变量,运行时不变)────────────────────────────
WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex
HOST: str = os.environ.get("WEBHOOK_HOST", "0.0.0.0")
try:
PORT: int = int(os.environ.get("WEBHOOK_PORT", "8081"))
except ValueError:
PORT = 8081
# ── 上传 ─────────────────────────────────────────────────────
UPLOAD_DIR: Path = Path(os.environ.get("UPLOAD_DIR", str(Path(__file__).parent / "uploads")))
# 单个文件最大 20 MB
MAX_UPLOAD_SIZE: int = int(os.environ.get("MAX_UPLOAD_SIZE", str(20 * 1024 * 1024)))
# 允许的文件扩展名(小写,不含点),为空则不限制
ALLOWED_EXTENSIONS: set[str] = set(
filter(None, os.environ.get("ALLOWED_EXTENSIONS", "").lower().split(","))
)
# ── QQ API ───────────────────────────────────────────────────
QQ_API_TIMEOUT: float = float(os.environ.get("QQ_API_TIMEOUT", "10"))
QQ_API_MAX_RETRIES: int = int(os.environ.get("QQ_API_MAX_RETRIES", "2"))
# ── YAML 文件路径 ────────────────────────────────────────────
SETTINGS_YAML_PATH: Path = Path(__file__).parent / "settings.yaml"
# ── 动态配置 dataclass ──────────────────────────────────────
@dataclass
class CommandConfig:
"""命令监听动态配置,通过 settings.yaml 热重载。"""
prefix: str = "#"
length_min: int = 2
length_max: int = 4
scope: str = "all" # all / group / private
list_enabled: bool = False # 是否启用黑白名单过滤
list_mode: str = "allow" # allow / deny
allowed_groups: frozenset[str] = frozenset()
denied_groups: frozenset[str] = frozenset()
allowed_users: frozenset[str] = frozenset()
denied_users: frozenset[str] = frozenset()
at_sender: bool = True
callback_url: str = ""
callback_timeout: int = 180
# 全局单例 —— 所有消费者通过 config.command.xxx 访问,避免 stale import 问题
command = CommandConfig()
# ── 环境变量默认值YAML 缺失 key 时的回退)────────────────────
_ENV_DEFAULTS: dict = {
"prefix": os.environ.get("COMMAND_PREFIX", "#"),
"length_min": int(os.environ.get("COMMAND_LENGTH_MIN", "2")),
"length_max": int(os.environ.get("COMMAND_LENGTH_MAX", "4")),
"scope": os.environ.get("COMMAND_SCOPE", "all"),
"list_enabled": os.environ.get("COMMAND_LIST_ENABLED", "false").lower() in ("true", "1", "yes"),
"list_mode": os.environ.get("COMMAND_LIST_MODE", "allow"),
"allowed_groups": os.environ.get("COMMAND_ALLOWED_GROUPS", ""),
"denied_groups": os.environ.get("COMMAND_DENIED_GROUPS", ""),
"allowed_users": os.environ.get("COMMAND_ALLOWED_USERS", ""),
"denied_users": os.environ.get("COMMAND_DENIED_USERS", ""),
"at_sender": os.environ.get("COMMAND_AT_SENDER", "true").lower() in ("true", "1", "yes"),
"callback_url": os.environ.get("COMMAND_CALLBACK_URL", ""),
"callback_timeout": int(os.environ.get("COMMAND_CALLBACK_TIMEOUT", "180")),
}
# ── YAML I/O ─────────────────────────────────────────────────
_YAML_HEADER = (
"# settings.yaml - 命令监听动态配置\n"
"# 可通过后台管理界面修改,也可手动编辑(自动热重载)\n\n"
)
def _yaml_defaults() -> dict:
"""返回默认 YAML 结构(首次启动时写入)。"""
return {
"command": {
"prefix": "#",
"length_min": 2,
"length_max": 4,
"scope": "all",
"list_enabled": False,
"list_mode": "allow",
"allowed_groups": [],
"denied_groups": [],
"allowed_users": [],
"denied_users": [],
"at_sender": True,
"callback_url": "",
"callback_timeout": 180,
}
}
def load_yaml() -> dict:
"""读取 settings.yaml文件不存在时返回空字典。"""
if not SETTINGS_YAML_PATH.exists():
return {}
with open(SETTINGS_YAML_PATH, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
return data if isinstance(data, dict) else {}
def save_yaml(data: dict) -> None:
"""将配置写入 settings.yaml。"""
with open(SETTINGS_YAML_PATH, "w", encoding="utf-8") as f:
f.write(_YAML_HEADER)
yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
def ensure_settings_yaml() -> None:
"""首次启动时创建 settings.yaml合并环境变量覆盖。"""
if SETTINGS_YAML_PATH.exists():
return
data = _yaml_defaults()
# 用环境变量覆盖默认值
env_map = {
"prefix": ("COMMAND_PREFIX", str),
"length_min": ("COMMAND_LENGTH_MIN", int),
"length_max": ("COMMAND_LENGTH_MAX", int),
"scope": ("COMMAND_SCOPE", str),
"list_mode": ("COMMAND_LIST_MODE", str),
"list_enabled": ("COMMAND_LIST_ENABLED", "bool"),
"allowed_groups": ("COMMAND_ALLOWED_GROUPS", "csv_list"),
"denied_groups": ("COMMAND_DENIED_GROUPS", "csv_list"),
"allowed_users": ("COMMAND_ALLOWED_USERS", "csv_list"),
"denied_users": ("COMMAND_DENIED_USERS", "csv_list"),
"at_sender": ("COMMAND_AT_SENDER", "bool"),
"callback_url": ("COMMAND_CALLBACK_URL", str),
"callback_timeout": ("COMMAND_CALLBACK_TIMEOUT", int),
}
for key, (env_name, conv) in env_map.items():
env_val = os.environ.get(env_name)
if env_val is not None:
if conv == "csv_list":
data["command"][key] = [v.strip() for v in env_val.split(",") if v.strip()]
elif conv == "bool":
data["command"][key] = env_val.lower() in ("true", "1", "yes")
else:
data["command"][key] = conv(env_val)
save_yaml(data)
logger.info("Created settings.yaml with defaults (env-var overrides applied)")
# ── 将 YAML 数据应用到全局 command 对象 ───────────────────────
def _apply_yaml_to_command(data: dict) -> None:
"""将 YAML command 段合并到全局 command 对象,缺失 key 回退到环境变量默认值。"""
cmd_data = data.get("command", {})
command.prefix = str(cmd_data.get("prefix", _ENV_DEFAULTS["prefix"]))
command.length_min = int(cmd_data.get("length_min", _ENV_DEFAULTS["length_min"]))
command.length_max = int(cmd_data.get("length_max", _ENV_DEFAULTS["length_max"]))
command.scope = str(cmd_data.get("scope", _ENV_DEFAULTS["scope"]))
command.list_mode = str(cmd_data.get("list_mode", _ENV_DEFAULTS["list_mode"]))
# list_enabled: YAML bool → Python bool
list_enabled_val = cmd_data.get("list_enabled", _ENV_DEFAULTS["list_enabled"])
if isinstance(list_enabled_val, str):
command.list_enabled = list_enabled_val.lower() in ("true", "1", "yes")
else:
command.list_enabled = bool(list_enabled_val)
command.callback_url = str(cmd_data.get("callback_url", _ENV_DEFAULTS["callback_url"]))
command.callback_timeout = int(cmd_data.get("callback_timeout", _ENV_DEFAULTS["callback_timeout"]))
# at_sender: YAML bool → Python bool
at_sender_val = cmd_data.get("at_sender", _ENV_DEFAULTS["at_sender"])
if isinstance(at_sender_val, str):
command.at_sender = at_sender_val.lower() in ("true", "1", "yes")
else:
command.at_sender = bool(at_sender_val)
# 列表字段: YAML list → frozenset
for field_name in ("allowed_groups", "denied_groups", "allowed_users", "denied_users"):
raw = cmd_data.get(field_name)
if raw is None:
# 回退到环境变量默认值
env_default = _ENV_DEFAULTS[field_name]
if isinstance(env_default, str):
setattr(command, field_name, frozenset(
v.strip() for v in env_default.split(",") if v.strip()
))
else:
setattr(command, field_name, frozenset(env_default) if env_default else frozenset())
elif isinstance(raw, str):
setattr(command, field_name, frozenset(
v.strip() for v in raw.split(",") if v.strip()
))
elif isinstance(raw, list):
setattr(command, field_name, frozenset(str(v).strip() for v in raw if str(v).strip()))
else:
setattr(command, field_name, frozenset())
# 重建命令匹配正则
from .handlers.command import rebuild_pattern
rebuild_pattern()
# ── 公共 API ─────────────────────────────────────────────────
def reload_settings() -> None:
"""重新读取 settings.yaml 并应用到全局 command 对象。"""
data = load_yaml()
_apply_yaml_to_command(data)
logger.info("Config reloaded from settings.yaml")
def get_settings_flat() -> dict[str, str]:
"""返回所有动态配置的扁平字典 {key: str_value},供管理 API 使用。"""
return {
"command_prefix": command.prefix,
"command_length_min": str(command.length_min),
"command_length_max": str(command.length_max),
"command_scope": command.scope,
"command_list_mode": command.list_mode,
"command_list_enabled": str(command.list_enabled).lower(),
"command_allowed_groups": ",".join(sorted(command.allowed_groups)),
"command_denied_groups": ",".join(sorted(command.denied_groups)),
"command_allowed_users": ",".join(sorted(command.allowed_users)),
"command_denied_users": ",".join(sorted(command.denied_users)),
"command_at_sender": str(command.at_sender).lower(),
"command_callback_url": command.callback_url,
"command_callback_timeout": str(command.callback_timeout),
}
# API 平坦 key → (YAML key, 类型转换) 映射
_API_KEY_MAP: dict[str, tuple[str, ...]] = {
"command_prefix": ("prefix", str),
"command_length_min": ("length_min", int),
"command_length_max": ("length_max", int),
"command_scope": ("scope", str),
"command_list_mode": ("list_mode", str),
"command_list_enabled": ("list_enabled", "bool"),
"command_at_sender": ("at_sender", "bool"),
"command_callback_url": ("callback_url", str),
"command_callback_timeout": ("callback_timeout", int),
"command_allowed_groups": ("allowed_groups", "csv_list"),
"command_denied_groups": ("denied_groups", "csv_list"),
"command_allowed_users": ("allowed_users", "csv_list"),
"command_denied_users": ("denied_users", "csv_list"),
}
def update_settings_from_api(data: dict[str, str]) -> dict[str, str]:
"""从管理 API 接收扁平字典,合并到 settings.yaml 并热重载。返回已应用的字段。"""
allowed_keys = set(_API_KEY_MAP.keys())
filtered = {k: str(v) for k, v in data.items() if k in allowed_keys}
if not filtered:
return {}
# 读取当前 YAML合并修改
yaml_data = load_yaml()
if "command" not in yaml_data:
yaml_data["command"] = {}
cmd = yaml_data["command"]
for flat_key, value in filtered.items():
yaml_key, conv = _API_KEY_MAP[flat_key]
if conv == "csv_list":
cmd[yaml_key] = [v.strip() for v in value.split(",") if v.strip()]
elif conv == "bool":
cmd[yaml_key] = value.lower() in ("true", "1", "yes")
elif conv is int:
try:
cmd[yaml_key] = int(value)
except ValueError:
pass
else:
cmd[yaml_key] = value
save_yaml(yaml_data)
# 写入后立即应用(不等文件 watcher
_apply_yaml_to_command(yaml_data)
return filtered

1106
handlers/admin.py Normal file

File diff suppressed because it is too large Load Diff

176
handlers/command.py Normal file
View File

@@ -0,0 +1,176 @@
"""命令监听处理器:匹配 #命令名 格式的消息,转发到外部回调 URL 并自动回复。"""
import re
import aiohttp
from ..config import UPLOAD_DIR, command
from ..handlers.message import _resolve_url
def build_command_pattern() -> re.Pattern:
"""构建命令匹配正则:# + N个字符中文/数字/字母/下划线等),后面可跟空格+内容或无内容。
每个"字符"按 Unicode 码点计:
- 一个中文字 = 1
- 一个数字 = 1
- 一个英文字母 = 1
- 其他非空白字符 = 1
"""
return re.compile(
rf"^{re.escape(command.prefix)}(\S{{{command.length_min},{command.length_max}}})(?:\s+(.+))?$",
re.DOTALL,
)
COMMAND_PATTERN = build_command_pattern()
def rebuild_pattern() -> None:
"""动态配置变更后重新编译正则。"""
global COMMAND_PATTERN
COMMAND_PATTERN = build_command_pattern()
def parse_command(raw_message: str) -> dict | None:
"""解析消息,匹配命令模式。返回 {command, content, raw_message} 或 None。"""
match = COMMAND_PATTERN.match(raw_message.strip())
if not match:
return None
return {
"command": match.group(1),
"content": match.group(2).strip() if match.group(2) else "",
"raw_message": raw_message.strip(),
}
async def send_command_callback(data: dict, event, api, logger) -> None:
"""将命令数据 POST 到外部回调 URL根据响应自动回复到 QQ。
回调服务器返回格式:
{
"reply": "回复文本", // 纯文本回复(引用原消息)
"messages": [ // 批量回复(可选,优先于 reply
{"type": "text", "msg": "..."},
{"type": "image", "url": "..."},
{"type": "file", "url": "..."},
{"type": "video", "url": "..."}
],
"at_sender": true // 是否 @发送者(默认取配置,仅群聊)
}
所有字段均为可选,无回复内容时返回空 JSON 即可。
回复会引用触发命令的原消息。
"""
if not command.callback_url:
logger.warning("callback_url 未配置,跳过命令回调")
return
try:
async with aiohttp.ClientSession() as session:
async with session.post(
command.callback_url,
json=data,
timeout=aiohttp.ClientTimeout(total=command.callback_timeout),
) as resp:
if resp.status >= 400:
body = await resp.text()
logger.error(
"命令回调失败: status=%d url=%s body=%s",
resp.status, command.callback_url, body[:200],
)
return
# 解析响应,自动回复
try:
result = await resp.json(content_type=None)
except Exception:
return
if not isinstance(result, dict):
return
await _handle_reply(result, event.data, api, logger)
except Exception as exc:
logger.error("命令回调异常: url=%s error=%s", command.callback_url, exc)
async def _handle_reply(result: dict, msg_event, api, logger) -> None:
"""处理回调响应引用原消息自动回复。msg_event 是 GroupMessageEvent / PrivateMessageEvent。"""
# at_sender: 回调响应中的值优先,未指定则使用全局配置
at_sender = result.get("at_sender", command.at_sender)
messages = result.get("messages")
reply = result.get("reply")
group_id = getattr(msg_event, "group_id", None)
user_id = msg_event.user_id
message_id = msg_event.message_id
# 构造引用消息段
from ncatbot.types import MessageArray, Reply
def build_reply_msg(text=None, image=None, video=None) -> MessageArray:
"""构建带引用的消息段。"""
msg = MessageArray()
msg.add_reply(message_id)
if group_id and at_sender:
msg.add_at(user_id)
msg.add_text(" ")
if text is not None:
msg.add_text(text)
if image is not None:
msg.add_image(image)
if video is not None:
msg.add_video(video)
return msg
# 批量回复:引用 + 批量消息段
if messages and isinstance(messages, list):
text_parts: list[str] = []
image_url: str | None = None
video_url: str | None = None
file_msgs: list[dict] = []
for msg in messages:
msg_type = msg.get("type", "text")
if msg_type == "text":
text_parts.append(msg.get("msg", ""))
elif msg_type == "image":
image_url = _resolve_url(msg.get("url", ""))
elif msg_type == "video":
video_url = _resolve_url(msg.get("url", ""))
elif msg_type == "file":
file_msgs.append(msg)
text = "\n".join(text_parts) if text_parts else None
try:
# 组合消息(带引用)
if text or image_url or video_url:
reply_msg = build_reply_msg(text=text, image=image_url, video=video_url)
if group_id:
await api.qq.post_group_array_msg(group_id=group_id, msg=reply_msg)
else:
await api.qq.post_private_array_msg(user_id=user_id, msg=reply_msg)
# 文件单独发
for fm in file_msgs:
url = _resolve_url(fm.get("url", ""))
filename = url.split("/")[-1]
if group_id:
await api.qq.send_group_file(group_id=group_id, file=url, name=filename)
else:
await api.qq.send_private_file(user_id=user_id, file=url, name=filename)
except Exception as exc:
logger.error("命令回复失败: %s", exc)
return
# 纯文本回复:引用原消息
if reply and isinstance(reply, str):
try:
reply_msg = build_reply_msg(text=reply)
if group_id:
await api.qq.post_group_array_msg(group_id=group_id, msg=reply_msg)
else:
await api.qq.post_private_array_msg(user_id=user_id, msg=reply_msg)
except Exception as exc:
logger.error("命令回复失败: %s", exc)

View File

@@ -10,9 +10,8 @@ from .response import error
@web.middleware
async def auth_middleware(request: web.Request, handler):
""" /upload 和 /webhook 路径强制校验 API Key"""
# 健康检查不需要鉴权
if request.path == "/healthz":
"""需要鉴权的路径校验 API Key。仅 /healthz 和管理页面 HTML 无需鉴权"""
if request.path == "/healthz" or request.path == "/admin/":
return await handler(request)
auth_header = request.headers.get("Authorization", "")

136
plugin.py
View File

@@ -7,7 +7,18 @@ import os
from aiohttp import web
from ncatbot.plugin import NcatBotPlugin
from .config import HOST, PORT, UPLOAD_DIR, WEBHOOK_API_KEY
from .config import (
HOST,
PORT,
UPLOAD_DIR,
WEBHOOK_API_KEY,
SETTINGS_YAML_PATH,
command,
ensure_settings_yaml,
reload_settings,
)
from .handlers.admin import admin_page_handler, api_get_settings, api_reload_settings, api_update_settings
from .handlers.command import parse_command, send_command_callback
from .handlers.health import health_handler
from .handlers.message import webhook_handler
from .handlers.upload import cleanup_expired_files, upload_handler
@@ -26,21 +37,44 @@ class WebHookPlugin(NcatBotPlugin):
super().__init__(*args, **kwargs)
self._webhook_runner: web.AppRunner | None = None
self._cleanup_task: asyncio.Task | None = None
self._listener_task: asyncio.Task | None = None
self._watcher_task: asyncio.Task | None = None
async def on_load(self):
# 初始化 settings.yaml 并加载配置
ensure_settings_yaml()
reload_settings()
self.logger.info("Webhook 插件已加载")
self.logger.info("WEBHOOK_API_KEY: %s", "已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成")
self.logger.info(
"WEBHOOK_API_KEY: %s",
"已配置" if os.environ.get("WEBHOOK_API_KEY") else "自动生成",
)
self.logger.info(
"命令监听: 前缀=%s 长度=%d~%d 范围=%s 名单=%s(%s) 回调=%s",
command.prefix,
command.length_min,
command.length_max,
command.scope,
"" if command.list_enabled else "",
command.list_mode,
command.callback_url or "未配置",
)
asyncio.create_task(self._start_webhook())
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
self._listener_task = asyncio.create_task(self._message_listener())
self._watcher_task = asyncio.create_task(self._config_watcher())
async def on_close(self):
if self._cleanup_task is not None:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
self._cleanup_task = None
for task_attr in ("_watcher_task", "_listener_task", "_cleanup_task"):
task = getattr(self, task_attr)
if task is not None:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
setattr(self, task_attr, None)
await self._stop_webhook()
self.logger.info("Webhook 插件已卸载")
@@ -53,6 +87,82 @@ class WebHookPlugin(NcatBotPlugin):
except Exception as exc:
self.logger.error("清理过期文件失败: %s", exc)
async def _config_watcher(self) -> None:
"""轮询 settings.yaml 的 mtime变更时热重载配置。"""
last_mtime: float = 0.0
while True:
await asyncio.sleep(2)
try:
if SETTINGS_YAML_PATH.exists():
current_mtime = SETTINGS_YAML_PATH.stat().st_mtime
if current_mtime != last_mtime:
last_mtime = current_mtime
reload_settings()
self.logger.info("settings.yaml changed, config reloaded")
except Exception as exc:
self.logger.error("Config watcher error: %s", exc)
async def _message_listener(self) -> None:
"""监听 QQ 消息,匹配命令模式后转发到外部回调。"""
try:
async with self.events("message") as stream:
async for event in stream:
try:
raw_message = event.data.raw_message
if not raw_message:
continue
parsed = parse_command(raw_message)
if not parsed:
continue
# 范围过滤group / private / all
is_group = hasattr(event.data, "group_id")
if command.scope == "group" and not is_group:
continue
if command.scope == "private" and is_group:
continue
# 黑白名单过滤
if command.list_enabled:
if command.list_mode == "allow":
# 白名单模式OR 逻辑):用户在名单 OR 群在名单 → 放行
# 名单为空时视为不限制
if command.allowed_users or command.allowed_groups:
user_ok = event.data.user_id in command.allowed_users
group_ok = is_group and event.data.group_id in command.allowed_groups
if not (user_ok or group_ok):
continue
elif command.list_mode == "deny":
# 黑名单模式:用户在名单 OR 群在名单 → 拒绝
user_blocked = event.data.user_id in command.denied_users
group_blocked = is_group and event.data.group_id in command.denied_groups
if user_blocked or group_blocked:
continue
# 构建回调数据
data = {
"command": parsed["command"],
"content": parsed["content"],
"raw_message": parsed["raw_message"],
"user_id": event.data.user_id,
"message_id": event.data.message_id,
}
if is_group:
data["group_id"] = event.data.group_id
self.logger.info(
"命令监听匹配: command=%s user=%s group=%s",
parsed["command"],
data["user_id"],
data.get("group_id", "-"),
)
asyncio.create_task(
send_command_callback(data, event, self.api, self.logger)
)
except Exception as exc:
self.logger.error("消息处理异常: %s", exc)
except asyncio.CancelledError:
return
def _create_app(self) -> web.Application:
app = web.Application(middlewares=[request_id_middleware, auth_middleware])
app["qq_api"] = self.api
@@ -60,6 +170,11 @@ class WebHookPlugin(NcatBotPlugin):
app.router.add_get("/healthz", health_handler)
app.router.add_post("/webhook", webhook_handler)
app.router.add_post("/upload", upload_handler)
# 后台管理
app.router.add_get("/admin/", admin_page_handler)
app.router.add_get("/api/settings", api_get_settings)
app.router.add_put("/api/settings", api_update_settings)
app.router.add_post("/api/settings/reload", api_reload_settings)
return app
async def _start_webhook(self):
@@ -71,9 +186,10 @@ class WebHookPlugin(NcatBotPlugin):
await site.start()
self.logger.info("Webhook 已启动: %s:%d", HOST, PORT)
self.logger.info("上传目录: %s", UPLOAD_DIR)
self.logger.info("后台管理: http://%s:%d/admin/", HOST, PORT)
async def _stop_webhook(self):
if self._webhook_runner is not None:
await self._webhook_runner.cleanup()
self._webhook_runner = None
self.logger.info("Webhook 已停止")
self.logger.info("Webhook 已停止")

View File

@@ -8,4 +8,5 @@ dependencies = [
"ncatbot5>=5.5.2.post3",
"aiohttp>=3.9",
"python-dotenv>=1.0",
"pyyaml>=6.0",
]

2
uv.lock generated
View File

@@ -911,6 +911,7 @@ dependencies = [
{ name = "aiohttp" },
{ name = "ncatbot5" },
{ name = "python-dotenv" },
{ name = "pyyaml" },
]
[package.metadata]
@@ -918,6 +919,7 @@ requires-dist = [
{ name = "aiohttp", specifier = ">=3.9" },
{ name = "ncatbot5", specifier = ">=5.5.2.post3" },
{ name = "python-dotenv", specifier = ">=1.0" },
{ name = "pyyaml", specifier = ">=6.0" },
]
[[package]]