feat(*): 添加测试项目代码

This commit is contained in:
2026-05-01 21:22:57 +08:00
commit f4eba61365
15 changed files with 1599 additions and 0 deletions

17
.env.example Normal file
View File

@@ -0,0 +1,17 @@
# ── 鉴权 ──
# 不设置时自动生成 UUIDv4 随机密钥并打印到启动日志
WEBHOOK_API_KEY=
# ── 网络 ──
WEBHOOK_HOST=0.0.0.0
WEBHOOK_PORT=8081
# ── 上传 ──
UPLOAD_DIR=./uploads
MAX_UPLOAD_SIZE=20971520
# 留空不限制多个用逗号分隔jpg,png,gif,pdf
ALLOWED_EXTENSIONS=
# ── QQ API ──
QQ_API_TIMEOUT=10
QQ_API_MAX_RETRIES=2

17
.gitignore vendored Normal file
View File

@@ -0,0 +1,17 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
.claude
# Virtual environments
.venv
# Uploads
uploads/
# Environment
.env

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.12

115
README.md Normal file
View File

@@ -0,0 +1,115 @@
# ncatbot-webhook-plugin
NcatBot 插件,对外暴露 HTTP 接口,接收外部消息转发至 QQ。
## 快速开始
```bash
# 安装依赖
uv sync
# 复制环境变量并填写
cp .env.example .env
# 编辑 .env至少设置 WEBHOOK_API_KEY
# 启动
uv run python -m ncatbot
```
## 环境变量
| 变量 | 必填 | 默认值 | 说明 |
|---|---|---|---|
| `WEBHOOK_API_KEY` | 否 | 自动生成 | API 鉴权密钥,未设置时自动生成 UUIDv4 并打印到启动日志 |
| `WEBHOOK_HOST` | 否 | `0.0.0.0` | 监听地址 |
| `WEBHOOK_PORT` | 否 | `8081` | 监听端口 |
| `UPLOAD_DIR` | 否 | `./uploads` | 上传文件保存目录 |
| `MAX_UPLOAD_SIZE` | 否 | `20971520` | 单文件最大字节数(默认 20 MB |
| `ALLOWED_EXTENSIONS` | 否 | 空(不限) | 允许的扩展名,逗号分隔,如 `jpg,png,pdf` |
| `QQ_API_TIMEOUT` | 否 | `10` | QQ API 超时秒数 |
| `QQ_API_MAX_RETRIES` | 否 | `2` | QQ API 失败重试次数 |
## 接口说明
所有接口均需通过 `Authorization: Bearer <KEY>``X-API-Key: <KEY>` 鉴权。
### GET /healthz
健康检查,无需鉴权。
```json
{"status": "ok", "health": "ok"}
```
### POST /webhook
发送 QQ 消息。
**请求体:**
```json
{
"type": "text",
"group_id": "123456",
"msg": "Hello!"
}
```
| 字段 | 必填 | 说明 |
|---|---|---|
| `type` | 否 | 消息类型:`text`(默认)、`image``file``video` |
| `group_id` | 二选一 | 群号 |
| `user_id` | 二选一 | QQ 号(私聊) |
| `msg` | text 时必填 | 文本内容 |
| `url` | image/file/video 时必填 | 资源链接 |
**成功响应:**
```json
{"status": "ok"}
```
**错误响应:**
```json
{"status": "error", "error": "invalid json", "code": 400}
```
### POST /upload
上传文件到服务器,供后续消息引用。
**请求:** `multipart/form-data`,字段名不限,每个文件部分需带 `filename`
**成功响应:**
```json
{
"status": "ok",
"files": ["photo.jpg"],
"path": "photo.jpg"
}
```
`files` 为相对路径(文件 ID`path` 为单文件时的快捷字段。
## 项目结构
```
├── main.py # 入口,组装并启动服务
├── config.py # 配置(环境变量)
├── middleware.py # 鉴权 & 请求 ID 中间件
├── response.py # 统一响应格式
├── handlers/
│ ├── health.py # /healthz
│ ├── message.py # /webhook 消息发送
│ └── upload.py # /upload 文件上传
├── .env.example # 环境变量模板
└── pyproject.toml # 依赖声明
```
## 内网部署建议
- 放在 Nginx/Caddy 后面,启用 HTTPS 和限流
- 用 systemd 管理进程,配置 `EnvironmentFile=.env`
- 日志默认输出到 stdout可由 systemd/journald 收集

25
config.py Normal file
View File

@@ -0,0 +1,25 @@
"""项目配置:所有值从环境变量读取,未配置时使用安全默认值。"""
import os
import uuid
from pathlib import Path
# ── 鉴权 ────────────────────────────────────────────────────
WEBHOOK_API_KEY: str = os.environ.get("WEBHOOK_API_KEY", "") or uuid.uuid4().hex
# ── 网络 ─────────────────────────────────────────────────────
HOST: str = os.environ.get("WEBHOOK_HOST", "0.0.0.0")
PORT: int = int(os.environ.get("WEBHOOK_PORT", "8081"))
# ── 上传 ─────────────────────────────────────────────────────
UPLOAD_DIR: Path = Path(os.environ.get("UPLOAD_DIR", str(Path(__file__).parent / "uploads")))
# 单个文件最大 20 MB
MAX_UPLOAD_SIZE: int = int(os.environ.get("MAX_UPLOAD_SIZE", str(20 * 1024 * 1024)))
# 允许的文件扩展名(小写,不含点),为空则不限制
ALLOWED_EXTENSIONS: set[str] = set(
filter(None, os.environ.get("ALLOWED_EXTENSIONS", "").lower().split(","))
)
# ── QQ API ───────────────────────────────────────────────────
QQ_API_TIMEOUT: float = float(os.environ.get("QQ_API_TIMEOUT", "10"))
QQ_API_MAX_RETRIES: int = int(os.environ.get("QQ_API_MAX_RETRIES", "2"))

0
handlers/__init__.py Normal file
View File

9
handlers/health.py Normal file
View File

@@ -0,0 +1,9 @@
"""健康检查端点。"""
from aiohttp import web
from response import ok
async def health_handler(request: web.Request) -> web.Response:
return ok(data={"health": "ok"})

125
handlers/message.py Normal file
View File

@@ -0,0 +1,125 @@
"""消息发送处理器JSON 解析、参数校验、QQ API 超时与重试。"""
import asyncio
from aiohttp import web
from config import QQ_API_MAX_RETRIES, QQ_API_TIMEOUT
from response import error, ok
VALID_MSG_TYPES = {"text", "image", "file", "video"}
# 每种消息类型必填的字段
REQUIRED_FIELDS: dict[str, list[str]] = {
"text": ["msg"],
"image": ["url"],
"file": ["url"],
"video": ["url"],
}
def _validate_payload(data: dict) -> tuple[dict | None, web.Response | None]:
"""校验请求体,返回 (data, None) 或 (None, error_response)。"""
group_id = data.get("group_id")
user_id = data.get("user_id")
if not group_id and not user_id:
return None, error("need group_id or user_id")
msg_type = data.get("type", "text")
if msg_type not in VALID_MSG_TYPES:
return None, error(f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}")
# 检查必填字段
missing = [f for f in REQUIRED_FIELDS.get(msg_type, []) if not data.get(f)]
if missing:
return None, error(f"missing required fields: {', '.join(missing)}")
return data, None
async def _call_qq_api(coro_factory, request: web.Request) -> web.Response:
"""带超时和重试的 QQ API 调用。"""
logger = request.app["logger"]
rid = request.get("request_id", "-")
last_exc: Exception | None = None
for attempt in range(1, QQ_API_MAX_RETRIES + 1):
try:
await asyncio.wait_for(coro_factory(), timeout=QQ_API_TIMEOUT)
return ok()
except asyncio.TimeoutError:
last_exc = asyncio.TimeoutError()
logger.warning(f"[{rid}] QQ API timeout, attempt {attempt}/{QQ_API_MAX_RETRIES}")
except Exception as exc:
last_exc = exc
logger.error(f"[{rid}] QQ API error: {exc}, attempt {attempt}/{QQ_API_MAX_RETRIES}")
logger.error(f"[{rid}] QQ API failed after {QQ_API_MAX_RETRIES} retries: {last_exc}")
return error(f"qq api failed: {last_exc}", code=502, status=502)
async def webhook_handler(request: web.Request) -> web.Response:
"""处理消息发送请求。"""
# 安全解析 JSONaiohttp 可能抛 JSONDecodeError 或 ContentTypeError
try:
data = await request.json()
except Exception:
return error("invalid json")
if not isinstance(data, dict):
return error("request body must be a json object")
data, err = _validate_payload(data)
if err:
return err
msg_type = data.get("type", "text")
group_id = data.get("group_id")
user_id = data.get("user_id")
msg = data.get("msg", "")
url = data.get("url", "")
# 获取 ncatbot API 实例
api = request.app["qq_api"]
if group_id:
match msg_type:
case "text":
return await _call_qq_api(
lambda: api.qq.send_group_text(group_id=group_id, text=msg), request
)
case "image":
return await _call_qq_api(
lambda: api.qq.send_group_image(group_id=group_id, image=url), request
)
case "file":
return await _call_qq_api(
lambda: api.qq.send_group_file(group_id=group_id, file=url, name=url.split("/")[-1]),
request,
)
case "video":
return await _call_qq_api(
lambda: api.qq.send_group_video(group_id=group_id, video=url), request
)
else:
match msg_type:
case "text":
return await _call_qq_api(
lambda: api.qq.send_private_text(user_id=user_id, text=msg), request
)
case "image":
return await _call_qq_api(
lambda: api.qq.send_private_image(user_id=user_id, image=url), request
)
case "file":
return await _call_qq_api(
lambda: api.qq.send_private_file(user_id=user_id, file=url, name=url.split("/")[-1]),
request,
)
case "video":
return await _call_qq_api(
lambda: api.qq.send_private_video(user_id=user_id, video=url), request
)
return error("unreachable", code=500, status=500)

102
handlers/upload.py Normal file
View File

@@ -0,0 +1,102 @@
"""上传处理器:文件上传、大小/类型限制、异步写入、自动清理过期文件。"""
import asyncio
import logging
import time
from pathlib import Path
from aiohttp import BodyPartReader, web
from config import ALLOWED_EXTENSIONS, MAX_UPLOAD_SIZE, UPLOAD_DIR
from response import error, ok
logger = logging.getLogger("webhook-plugin.upload")
# 文件最大保留秒数(默认 24 小时)
FILE_TTL_SECONDS: int = 24 * 60 * 60
def _check_extension(filename: str) -> bool:
"""检查文件扩展名是否在允许列表内。"""
if not ALLOWED_EXTENSIONS:
return True
ext = Path(filename).suffix.lstrip(".").lower()
return ext in ALLOWED_EXTENSIONS
async def cleanup_expired_files() -> None:
"""删除上传目录中超过 FILE_TTL_SECONDS 的文件。"""
if not UPLOAD_DIR.exists():
return
now = time.time()
for path in UPLOAD_DIR.iterdir():
if path.is_file() and (now - path.stat().st_mtime) > FILE_TTL_SECONDS:
try:
path.unlink()
logger.info("已清理过期文件: %s", path.name)
except OSError as exc:
logger.warning("清理文件失败 %s: %s", path.name, exc)
async def upload_handler(request: web.Request) -> web.Response:
"""接收 multipart/form-data 上传,保存到 uploads 目录,返回相对文件 ID。"""
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
reader = await request.multipart()
saved_ids: list[str] = []
async for part in reader:
if not isinstance(part, BodyPartReader) or not part.filename:
continue
filename: str = Path(part.filename).name # 防路径穿越
if not _check_extension(filename):
return error(f"file type not allowed: {filename}", code=415)
# 读取文件内容并检查大小
chunks: list[bytes] = []
total_size = 0
while True:
chunk = await part.read_chunk(65536)
if not chunk:
break
total_size += len(chunk)
if total_size > MAX_UPLOAD_SIZE:
return error(
f"file too large, max {MAX_UPLOAD_SIZE // (1024*1024)} MB",
code=413,
)
chunks.append(chunk)
if total_size == 0:
return error("empty file", code=400)
# 同名文件自动重命名
save_path = UPLOAD_DIR / filename
stem = Path(filename).stem
suffix = Path(filename).suffix
counter = 1
while save_path.exists():
save_path = UPLOAD_DIR / f"{stem}_{counter}{suffix}"
counter += 1
# 使用线程池写文件,避免阻塞事件循环
data = b"".join(chunks)
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, _write_file, save_path, data)
# 返回相对路径作为文件 ID
file_id = save_path.relative_to(UPLOAD_DIR).as_posix()
saved_ids.append(file_id)
if not saved_ids:
return error("no file uploaded", code=400)
return ok(data={"files": saved_ids, "path": saved_ids[0] if len(saved_ids) == 1 else None})
def _write_file(path: Path, data: bytes) -> None:
"""同步写文件,由线程池调用。"""
with open(path, "wb") as f:
f.write(data)

70
main.py Normal file
View File

@@ -0,0 +1,70 @@
"""ncatbot-webhook-plugin 入口:组装各模块,启动 HTTP 服务。"""
import asyncio
import logging
from aiohttp import web
from ncatbot.plugin import NcatBotPlugin
from config import HOST, PORT, UPLOAD_DIR, WEBHOOK_API_KEY
from handlers.health import health_handler
from handlers.message import webhook_handler
from handlers.upload import cleanup_expired_files, upload_handler
from middleware import auth_middleware, request_id_middleware
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
class WebHookPlugin(NcatBotPlugin):
"""NcatBot 插件:对外暴露 HTTP 接口,接收外部消息转发至 QQ。"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._webhook_runner: web.AppRunner | None = None
async def on_load(self):
self.logger.info("Webhook 插件已加载")
self.logger.info("WEBHOOK_API_KEY: %s", WEBHOOK_API_KEY)
asyncio.create_task(self._start_webhook())
asyncio.create_task(self._cleanup_loop())
async def on_close(self):
await self._stop_webhook()
self.logger.info("Webhook 插件已卸载")
async def _cleanup_loop(self) -> None:
"""每小时清理一次过期上传文件。"""
while True:
await asyncio.sleep(3600)
try:
await cleanup_expired_files()
except Exception as exc:
self.logger.error("清理过期文件失败: %s", exc)
def _create_app(self) -> web.Application:
app = web.Application(middlewares=[request_id_middleware, auth_middleware])
app["qq_api"] = self.api
app["logger"] = self.logger
app.router.add_get("/healthz", health_handler)
app.router.add_post("/webhook", webhook_handler)
app.router.add_post("/upload", upload_handler)
return app
async def _start_webhook(self):
await self._stop_webhook()
app = self._create_app()
self._webhook_runner = web.AppRunner(app)
await self._webhook_runner.setup()
site = web.TCPSite(self._webhook_runner, HOST, PORT)
await site.start()
self.logger.info("Webhook 已启动: %s:%d", HOST, PORT)
self.logger.info("上传目录: %s", UPLOAD_DIR)
async def _stop_webhook(self):
if self._webhook_runner is not None:
await self._webhook_runner.cleanup()
self._webhook_runner = None
self.logger.info("Webhook 已停止")

0
manifest.toml Normal file
View File

37
middleware.py Normal file
View File

@@ -0,0 +1,37 @@
"""鉴权中间件:校验请求中的 API Key。"""
import uuid
from aiohttp import web
from config import WEBHOOK_API_KEY
from response import error
@web.middleware
async def auth_middleware(request: web.Request, handler):
"""对 /upload 和 /webhook 路径强制校验 API Key。"""
# 健康检查不需要鉴权
if request.path == "/healthz":
return await handler(request)
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
key = auth_header[len("Bearer "):]
else:
key = request.headers.get("X-API-Key", "")
if key != WEBHOOK_API_KEY:
return error("unauthorized", code=401, status=401)
return await handler(request)
@web.middleware
async def request_id_middleware(request: web.Request, handler):
"""为每个请求附加唯一 request_id便于日志追踪。"""
request_id = request.headers.get("X-Request-ID", uuid.uuid4().hex[:12])
request["request_id"] = request_id
response = await handler(request)
response.headers["X-Request-ID"] = request_id
return response

10
pyproject.toml Normal file
View File

@@ -0,0 +1,10 @@
[project]
name = "ncatbot-demo"
version = "0.1.0"
description = "NcatBot webhook plugin receive external messages and forward to QQ"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"ncatbot5>=5.5.2.post3",
"aiohttp>=3.9",
]

16
response.py Normal file
View File

@@ -0,0 +1,16 @@
"""规范化响应工具:统一返回 {"code": 0, "msg": "", "data": ...} 格式。"""
from aiohttp import web
def ok(data=None, *, msg: str = "ok", status: int = 200) -> web.Response:
"""成功响应code=0。"""
return web.json_response({"code": 0, "msg": msg, "data": data}, status=status)
def error(msg: str, *, code: int = 1, status: int | None = None) -> web.Response:
"""错误响应code 非 0。"""
return web.json_response(
{"code": code, "msg": msg, "data": None},
status=status or (code if 400 <= code < 600 else 400),
)

1055
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff