Files
webhook/handlers/message.py
zhilv b86a2d4c4e 🐛 fix(*): 修复代码审查中发现的问题
- **Bug 修复**
  - `message.py`: 批量发送时使用 `(index, msg)` 元组替代 `messages.index(msg)`,避免重复 dict 查找错误
  - `message.py`: 多张图片逐张发送,不再静默丢弃后续图片
  - `plugin.py`: API Key 日志只打印"已配置/自动生成",不再泄露密钥

- **潜在问题修复**
  - `message.py`: lambda 闭包添加默认参数绑定,防止循环变量捕获问题
  - `upload.py`: 文件超限后消费剩余 multipart 数据,避免 reader 状态异常
  - `config.py`: PORT 环境变量非法值容错,默认回退 8081
  - `plugin.py`: cleanup task 保存引用,on_close 时正确取消,避免热重载泄漏

- **代码风格**
  - `message.py`: 无插值 f-string 改为普通字符串
  - `upload.py`: read_chunk 硬编码提取为 CHUNK_SIZE 常量
2026-05-02 15:28:54 +08:00

308 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""消息发送处理器JSON 解析、参数校验、QQ API 超时与重试、批量发送。"""
import asyncio
from pathlib import Path
from aiohttp import web
from ..config import QQ_API_MAX_RETRIES, QQ_API_TIMEOUT, UPLOAD_DIR
from ..response import error, ok
# Values accepted for a message's "type" field; checked by _validate_message.
VALID_MSG_TYPES = {"text", "image", "file", "video"}
def _resolve_url(url: str) -> str:
    """Expand a relative reference to an uploaded file into an absolute path.

    Args:
        url: Value of a message's ``url`` field. It originates from the
            request body, so it is treated as untrusted input.

    Returns:
        The absolute path of the entry inside ``UPLOAD_DIR`` when ``url`` is a
        relative path naming something that exists strictly inside that
        directory. In every other case (remote URL, absolute path, unknown
        name, path escaping the upload directory, value the filesystem cannot
        probe) ``url`` is returned unchanged.
    """
    # Non-string or empty values cannot name an upload; hand them back as-is
    # instead of letting Path() raise TypeError.
    if not isinstance(url, str) or not url:
        return url
    try:
        if Path(url).is_absolute():
            return url
        upload_root = Path(UPLOAD_DIR).resolve()
        candidate = (upload_root / url).resolve()
        # Only accept paths strictly inside the upload directory, so that
        # "../" segments (or symlinks) cannot be used to reach other files.
        if upload_root in candidate.parents and candidate.exists():
            return str(candidate)
    except (OSError, ValueError, RuntimeError):
        # Over-long names (e.g. inline base64 payloads), embedded NUL bytes or
        # symlink loops make filesystem probes raise; such values are not
        # uploads, so fall through to returning the input untouched.
        return url
    return url
def _validate_message(msg: dict) -> str | None:
"""校验单条消息,返回 None 或错误信息。"""
msg_type = msg.get("type", "text")
if msg_type not in VALID_MSG_TYPES:
return f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}"
# text 必须有 msgimage/file/video 必须有 url
if msg_type == "text" and not msg.get("msg"):
return "missing required field: msg"
if msg_type in {"image", "file", "video"} and not msg.get("url"):
return "missing required field: url"
return None
async def _call_with_retry(coro_factory, logger, rid: str) -> None:
    """Await an API call with a per-attempt timeout, retrying on failure.

    Args:
        coro_factory: Zero-argument callable returning a fresh awaitable for
            each attempt (an awaitable cannot be awaited twice).
        logger: Object exposing ``warning`` and ``error`` methods.
        rid: Request id used as a prefix in log lines.

    Raises:
        Exception: When every attempt timed out or raised. The last underlying
            error is attached as ``__cause__``.
    """
    # Always try at least once, even if the configured count is zero or
    # negative; otherwise the call would be reported as failed without ever
    # having been made.
    attempts = max(1, QQ_API_MAX_RETRIES)
    last_exc: Exception | None = None
    for attempt in range(1, attempts + 1):
        try:
            await asyncio.wait_for(coro_factory(), timeout=QQ_API_TIMEOUT)
        except asyncio.TimeoutError as exc:
            last_exc = exc
            logger.warning(f"[{rid}] QQ API timeout, attempt {attempt}/{attempts}")
        except Exception as exc:
            last_exc = exc
            logger.error(f"[{rid}] QQ API error: {exc}, attempt {attempt}/{attempts}")
        else:
            return
    # Chain the last failure so tracebacks show what actually went wrong.
    raise Exception(f"qq api failed after {attempts} retries: {last_exc}") from last_exc
async def _send_single(api, msg_type: str, url: str | None, msg_text: str,
group_id: str | None, user_id: str | None,
logger, rid: str) -> None:
"""发送单条消息(非组合模式)。"""
filename = url.split("/")[-1] if url else None
if group_id:
match msg_type:
case "text":
await _call_with_retry(
lambda t=msg_text: api.qq.send_group_text(group_id=group_id, text=t),
logger, rid
)
case "image":
await _call_with_retry(
lambda u=url: api.qq.send_group_image(group_id=group_id, image=u),
logger, rid
)
case "file":
await _call_with_retry(
lambda u=url, n=filename: api.qq.send_group_file(group_id=group_id, file=u, name=n),
logger, rid
)
case "video":
await _call_with_retry(
lambda u=url: api.qq.send_group_video(group_id=group_id, video=u),
logger, rid
)
else:
match msg_type:
case "text":
await _call_with_retry(
lambda t=msg_text: api.qq.send_private_text(user_id=user_id, text=t),
logger, rid
)
case "image":
await _call_with_retry(
lambda u=url: api.qq.send_private_image(user_id=user_id, image=u),
logger, rid
)
case "file":
await _call_with_retry(
lambda u=url, n=filename: api.qq.send_private_file(user_id=user_id, file=u, name=n),
logger, rid
)
case "video":
await _call_with_retry(
lambda u=url: api.qq.send_private_video(user_id=user_id, video=u),
logger, rid
)
async def _send_batch(api, messages: list[dict], group_id: str | None, user_id: str | None,
logger, rid: str) -> list[dict]:
"""批量发送消息。
- text/image/video 组合为一条消息post_group_msg / post_private_msg
- 多张图片拆成多条消息发送
- file 单独发送(独占段)
"""
results: list[dict] = []
# 分组:每条消息记录原始 index
combined_items: list[tuple[int, dict]] = [] # (index, msg) — text/image/video
file_items: list[tuple[int, dict]] = [] # (index, msg) — file
for i, msg in enumerate(messages):
if msg.get("type") == "file":
file_items.append((i, msg))
else:
combined_items.append((i, msg))
# 处理组合消息:按图片拆组,每组最多一张图片
if combined_items:
# 收集所有 text、image、video
text_parts: list[str] = []
image_items: list[tuple[int, dict]] = []
video_url: str | None = None
video_index: int | None = None
combined_indices: list[int] = []
for idx, msg in combined_items:
msg_type = msg.get("type", "text")
if msg_type == "text":
text_parts.append(msg.get("msg", ""))
combined_indices.append(idx)
elif msg_type == "image":
image_items.append((idx, msg))
elif msg_type == "video":
video_url = _resolve_url(msg.get("url", ""))
video_index = idx
text = "\n".join(text_parts) if text_parts else None
# 如果有视频,视频单独发(独占段会吞掉其他内容)
if video_url:
try:
if group_id:
await _call_with_retry(
lambda u=video_url: api.qq.post_group_msg(
group_id=group_id, video=u
),
logger, rid
)
else:
await _call_with_retry(
lambda u=video_url: api.qq.post_private_msg(
user_id=user_id, video=u
),
logger, rid
)
results.append({"index": video_index, "success": True})
except Exception as exc:
logger.error(f"[{rid}] Video message failed: {exc}")
results.append({"index": video_index, "success": False, "error": str(exc)})
# 文本 + 第一张图片组合
if text or image_items:
first_image_url = _resolve_url(image_items[0][1].get("url", "")) if image_items else None
text_image_indices = combined_indices + ([image_items[0][0]] if image_items else [])
try:
if group_id:
await _call_with_retry(
lambda t=text, img=first_image_url: api.qq.post_group_msg(
group_id=group_id, text=t, image=img
),
logger, rid
)
else:
await _call_with_retry(
lambda t=text, img=first_image_url: api.qq.post_private_msg(
user_id=user_id, text=t, image=img
),
logger, rid
)
for idx in text_image_indices:
results.append({"index": idx, "success": True})
except Exception as exc:
logger.error(f"[{rid}] Text+image message failed: {exc}")
for idx in text_image_indices:
results.append({"index": idx, "success": False, "error": str(exc)})
# 剩余图片逐张发送
for img_idx, img_msg in image_items[1:]:
img_url = _resolve_url(img_msg.get("url", ""))
try:
if group_id:
await _call_with_retry(
lambda u=img_url: api.qq.send_group_image(group_id=group_id, image=u),
logger, rid
)
else:
await _call_with_retry(
lambda u=img_url: api.qq.send_private_image(user_id=user_id, image=u),
logger, rid
)
results.append({"index": img_idx, "success": True})
except Exception as exc:
logger.error(f"[{rid}] Image message failed: {exc}")
results.append({"index": img_idx, "success": False, "error": str(exc)})
# 文件单独发送
for idx, msg in file_items:
url = _resolve_url(msg.get("url", ""))
filename = url.split("/")[-1]
try:
if group_id:
await _call_with_retry(
lambda u=url, n=filename: api.qq.send_group_file(group_id=group_id, file=u, name=n),
logger, rid
)
else:
await _call_with_retry(
lambda u=url, n=filename: api.qq.send_private_file(user_id=user_id, file=u, name=n),
logger, rid
)
results.append({"index": idx, "success": True})
except Exception as exc:
logger.error(f"[{rid}] File message failed: {exc}")
results.append({"index": idx, "success": False, "error": str(exc)})
results.sort(key=lambda x: x["index"])
return results
async def webhook_handler(request: web.Request) -> web.Response:
    """Handle a message-send request in single or batch mode.

    The JSON body must be an object containing ``group_id`` or ``user_id``.
    If it has a ``messages`` key the request is a batch: every entry is
    validated first, then all are sent and a per-message summary is returned.
    Otherwise the body itself is treated as one message.

    Args:
        request: Incoming aiohttp request. ``request.app`` must provide
            ``"qq_api"`` and ``"logger"``; ``request["request_id"]`` is used
            as the log prefix when present.

    Returns:
        The response built by the project's ``ok`` / ``error`` helpers. Send
        failures are reported with ``code=502, status=502``.
    """
    try:
        data = await request.json()
    except Exception:
        return error("invalid json")
    if not isinstance(data, dict):
        return error("request body must be a json object")

    group_id = data.get("group_id")
    user_id = data.get("user_id")
    if not group_id and not user_id:
        return error("need group_id or user_id")

    api = request.app["qq_api"]
    logger = request.app["logger"]
    rid = request.get("request_id", "-")

    messages = data.get("messages")
    if messages is None:
        # Single mode: the request body is the message.
        err = _validate_message(data)
        if err:
            return error(err)
        msg_type = data.get("type", "text")
        msg_text = data.get("msg", "")
        try:
            url = _resolve_url(data.get("url", "")) if msg_type != "text" else None
        except (TypeError, ValueError, OSError) as exc:
            # A url that cannot be interpreted is a client error, not a crash.
            logger.error(f"[{rid}] invalid url: {exc}")
            return error(f"invalid url: {exc}")
        try:
            await _send_single(api, msg_type, url, msg_text, group_id, user_id, logger, rid)
        except Exception as exc:
            logger.error(f"[{rid}] QQ API error: {exc}")
            return error(f"qq api failed: {exc}", code=502, status=502)
        return ok()

    # Batch mode.
    if not isinstance(messages, list):
        return error("messages must be an array")
    if not messages:
        return error("messages cannot be empty")

    # Validate everything before sending anything.
    for i, msg in enumerate(messages):
        if not isinstance(msg, dict):
            return error(f"messages[{i}] must be an object")
        err = _validate_message(msg)
        if err:
            return error(f"messages[{i}]: {err}")

    try:
        results = await _send_batch(api, messages, group_id, user_id, logger, rid)
    except Exception as exc:
        logger.error(f"[{rid}] Batch send failed: {exc}")
        return error(f"batch send failed: {exc}", code=502, status=502)

    # Individual failures do not fail the request; they are reported in the
    # summary below.
    success_count = sum(1 for r in results if r["success"])
    return ok(data={
        "total": len(results),
        "success": success_count,
        "failed": len(results) - success_count,
        "results": results,
    })