- **Bug 修复** - `message.py`: 批量发送时使用 `(index, msg)` 元组替代 `messages.index(msg)`,避免重复 dict 查找错误 - `message.py`: 多张图片逐张发送,不再静默丢弃后续图片 - `plugin.py`: API Key 日志只打印"已配置/自动生成",不再泄露密钥 - **潜在问题修复** - `message.py`: lambda 闭包添加默认参数绑定,防止循环变量捕获问题 - `upload.py`: 文件超限后消费剩余 multipart 数据,避免 reader 状态异常 - `config.py`: PORT 环境变量非法值容错,默认回退 8081 - `plugin.py`: cleanup task 保存引用,on_close 时正确取消,避免热重载泄漏 - **代码风格** - `message.py`: 无插值 f-string 改为普通字符串 - `upload.py`: read_chunk 硬编码提取为 CHUNK_SIZE 常量
308 lines
12 KiB
Python
308 lines
12 KiB
Python
"""消息发送处理器:JSON 解析、参数校验、QQ API 超时与重试、批量发送。"""
|
||
|
||
import asyncio
|
||
from pathlib import Path
|
||
|
||
from aiohttp import web
|
||
|
||
from ..config import QQ_API_MAX_RETRIES, QQ_API_TIMEOUT, UPLOAD_DIR
|
||
from ..response import error, ok
|
||
|
||
VALID_MSG_TYPES = {"text", "image", "file", "video"}
|
||
|
||
|
||
def _resolve_url(url: str) -> str:
|
||
"""如果 url 是本地已上传的文件(相对路径),补全为绝对路径。"""
|
||
file_path = Path(url)
|
||
if not file_path.is_absolute() and (UPLOAD_DIR / url).exists():
|
||
return str((UPLOAD_DIR / url).resolve())
|
||
return url
|
||
|
||
|
||
def _validate_message(msg: dict) -> str | None:
|
||
"""校验单条消息,返回 None 或错误信息。"""
|
||
msg_type = msg.get("type", "text")
|
||
if msg_type not in VALID_MSG_TYPES:
|
||
return f"invalid type: {msg_type}, must be one of {VALID_MSG_TYPES}"
|
||
|
||
# text 必须有 msg,image/file/video 必须有 url
|
||
if msg_type == "text" and not msg.get("msg"):
|
||
return "missing required field: msg"
|
||
if msg_type in {"image", "file", "video"} and not msg.get("url"):
|
||
return "missing required field: url"
|
||
return None
|
||
|
||
|
||
async def _call_with_retry(coro_factory, logger, rid: str) -> None:
|
||
"""带超时和重试的 API 调用。"""
|
||
last_exc: Exception | None = None
|
||
for attempt in range(1, QQ_API_MAX_RETRIES + 1):
|
||
try:
|
||
await asyncio.wait_for(coro_factory(), timeout=QQ_API_TIMEOUT)
|
||
return
|
||
except asyncio.TimeoutError:
|
||
last_exc = asyncio.TimeoutError()
|
||
logger.warning(f"[{rid}] QQ API timeout, attempt {attempt}/{QQ_API_MAX_RETRIES}")
|
||
except Exception as exc:
|
||
last_exc = exc
|
||
logger.error(f"[{rid}] QQ API error: {exc}, attempt {attempt}/{QQ_API_MAX_RETRIES}")
|
||
|
||
raise Exception(f"qq api failed after {QQ_API_MAX_RETRIES} retries: {last_exc}")
|
||
|
||
|
||
async def _send_single(api, msg_type: str, url: str | None, msg_text: str,
|
||
group_id: str | None, user_id: str | None,
|
||
logger, rid: str) -> None:
|
||
"""发送单条消息(非组合模式)。"""
|
||
filename = url.split("/")[-1] if url else None
|
||
|
||
if group_id:
|
||
match msg_type:
|
||
case "text":
|
||
await _call_with_retry(
|
||
lambda t=msg_text: api.qq.send_group_text(group_id=group_id, text=t),
|
||
logger, rid
|
||
)
|
||
case "image":
|
||
await _call_with_retry(
|
||
lambda u=url: api.qq.send_group_image(group_id=group_id, image=u),
|
||
logger, rid
|
||
)
|
||
case "file":
|
||
await _call_with_retry(
|
||
lambda u=url, n=filename: api.qq.send_group_file(group_id=group_id, file=u, name=n),
|
||
logger, rid
|
||
)
|
||
case "video":
|
||
await _call_with_retry(
|
||
lambda u=url: api.qq.send_group_video(group_id=group_id, video=u),
|
||
logger, rid
|
||
)
|
||
else:
|
||
match msg_type:
|
||
case "text":
|
||
await _call_with_retry(
|
||
lambda t=msg_text: api.qq.send_private_text(user_id=user_id, text=t),
|
||
logger, rid
|
||
)
|
||
case "image":
|
||
await _call_with_retry(
|
||
lambda u=url: api.qq.send_private_image(user_id=user_id, image=u),
|
||
logger, rid
|
||
)
|
||
case "file":
|
||
await _call_with_retry(
|
||
lambda u=url, n=filename: api.qq.send_private_file(user_id=user_id, file=u, name=n),
|
||
logger, rid
|
||
)
|
||
case "video":
|
||
await _call_with_retry(
|
||
lambda u=url: api.qq.send_private_video(user_id=user_id, video=u),
|
||
logger, rid
|
||
)
|
||
|
||
|
||
async def _send_batch(api, messages: list[dict], group_id: str | None, user_id: str | None,
|
||
logger, rid: str) -> list[dict]:
|
||
"""批量发送消息。
|
||
|
||
- text/image/video 组合为一条消息(post_group_msg / post_private_msg)
|
||
- 多张图片拆成多条消息发送
|
||
- file 单独发送(独占段)
|
||
"""
|
||
results: list[dict] = []
|
||
|
||
# 分组:每条消息记录原始 index
|
||
combined_items: list[tuple[int, dict]] = [] # (index, msg) — text/image/video
|
||
file_items: list[tuple[int, dict]] = [] # (index, msg) — file
|
||
|
||
for i, msg in enumerate(messages):
|
||
if msg.get("type") == "file":
|
||
file_items.append((i, msg))
|
||
else:
|
||
combined_items.append((i, msg))
|
||
|
||
# 处理组合消息:按图片拆组,每组最多一张图片
|
||
if combined_items:
|
||
# 收集所有 text、image、video
|
||
text_parts: list[str] = []
|
||
image_items: list[tuple[int, dict]] = []
|
||
video_url: str | None = None
|
||
video_index: int | None = None
|
||
combined_indices: list[int] = []
|
||
|
||
for idx, msg in combined_items:
|
||
msg_type = msg.get("type", "text")
|
||
if msg_type == "text":
|
||
text_parts.append(msg.get("msg", ""))
|
||
combined_indices.append(idx)
|
||
elif msg_type == "image":
|
||
image_items.append((idx, msg))
|
||
elif msg_type == "video":
|
||
video_url = _resolve_url(msg.get("url", ""))
|
||
video_index = idx
|
||
|
||
text = "\n".join(text_parts) if text_parts else None
|
||
|
||
# 如果有视频,视频单独发(独占段会吞掉其他内容)
|
||
if video_url:
|
||
try:
|
||
if group_id:
|
||
await _call_with_retry(
|
||
lambda u=video_url: api.qq.post_group_msg(
|
||
group_id=group_id, video=u
|
||
),
|
||
logger, rid
|
||
)
|
||
else:
|
||
await _call_with_retry(
|
||
lambda u=video_url: api.qq.post_private_msg(
|
||
user_id=user_id, video=u
|
||
),
|
||
logger, rid
|
||
)
|
||
results.append({"index": video_index, "success": True})
|
||
except Exception as exc:
|
||
logger.error(f"[{rid}] Video message failed: {exc}")
|
||
results.append({"index": video_index, "success": False, "error": str(exc)})
|
||
|
||
# 文本 + 第一张图片组合
|
||
if text or image_items:
|
||
first_image_url = _resolve_url(image_items[0][1].get("url", "")) if image_items else None
|
||
text_image_indices = combined_indices + ([image_items[0][0]] if image_items else [])
|
||
|
||
try:
|
||
if group_id:
|
||
await _call_with_retry(
|
||
lambda t=text, img=first_image_url: api.qq.post_group_msg(
|
||
group_id=group_id, text=t, image=img
|
||
),
|
||
logger, rid
|
||
)
|
||
else:
|
||
await _call_with_retry(
|
||
lambda t=text, img=first_image_url: api.qq.post_private_msg(
|
||
user_id=user_id, text=t, image=img
|
||
),
|
||
logger, rid
|
||
)
|
||
for idx in text_image_indices:
|
||
results.append({"index": idx, "success": True})
|
||
except Exception as exc:
|
||
logger.error(f"[{rid}] Text+image message failed: {exc}")
|
||
for idx in text_image_indices:
|
||
results.append({"index": idx, "success": False, "error": str(exc)})
|
||
|
||
# 剩余图片逐张发送
|
||
for img_idx, img_msg in image_items[1:]:
|
||
img_url = _resolve_url(img_msg.get("url", ""))
|
||
try:
|
||
if group_id:
|
||
await _call_with_retry(
|
||
lambda u=img_url: api.qq.send_group_image(group_id=group_id, image=u),
|
||
logger, rid
|
||
)
|
||
else:
|
||
await _call_with_retry(
|
||
lambda u=img_url: api.qq.send_private_image(user_id=user_id, image=u),
|
||
logger, rid
|
||
)
|
||
results.append({"index": img_idx, "success": True})
|
||
except Exception as exc:
|
||
logger.error(f"[{rid}] Image message failed: {exc}")
|
||
results.append({"index": img_idx, "success": False, "error": str(exc)})
|
||
|
||
# 文件单独发送
|
||
for idx, msg in file_items:
|
||
url = _resolve_url(msg.get("url", ""))
|
||
filename = url.split("/")[-1]
|
||
|
||
try:
|
||
if group_id:
|
||
await _call_with_retry(
|
||
lambda u=url, n=filename: api.qq.send_group_file(group_id=group_id, file=u, name=n),
|
||
logger, rid
|
||
)
|
||
else:
|
||
await _call_with_retry(
|
||
lambda u=url, n=filename: api.qq.send_private_file(user_id=user_id, file=u, name=n),
|
||
logger, rid
|
||
)
|
||
results.append({"index": idx, "success": True})
|
||
except Exception as exc:
|
||
logger.error(f"[{rid}] File message failed: {exc}")
|
||
results.append({"index": idx, "success": False, "error": str(exc)})
|
||
|
||
results.sort(key=lambda x: x["index"])
|
||
return results
|
||
|
||
|
||
async def webhook_handler(request: web.Request) -> web.Response:
|
||
"""处理消息发送请求,支持单条和批量发送。"""
|
||
try:
|
||
data = await request.json()
|
||
except Exception:
|
||
return error("invalid json")
|
||
|
||
if not isinstance(data, dict):
|
||
return error("request body must be a json object")
|
||
|
||
group_id = data.get("group_id")
|
||
user_id = data.get("user_id")
|
||
|
||
if not group_id and not user_id:
|
||
return error("need group_id or user_id")
|
||
|
||
api = request.app["qq_api"]
|
||
logger = request.app["logger"]
|
||
rid = request.get("request_id", "-")
|
||
|
||
messages = data.get("messages")
|
||
|
||
if messages is not None:
|
||
# 批量模式
|
||
if not isinstance(messages, list):
|
||
return error("messages must be an array")
|
||
|
||
if not messages:
|
||
return error("messages cannot be empty")
|
||
|
||
# 校验所有消息
|
||
for i, msg in enumerate(messages):
|
||
if not isinstance(msg, dict):
|
||
return error(f"messages[{i}] must be an object")
|
||
err = _validate_message(msg)
|
||
if err:
|
||
return error(f"messages[{i}]: {err}")
|
||
|
||
try:
|
||
results = await _send_batch(api, messages, group_id, user_id, logger, rid)
|
||
except Exception as exc:
|
||
logger.error(f"[{rid}] Batch send failed: {exc}")
|
||
return error(f"batch send failed: {exc}", code=502, status=502)
|
||
|
||
success_count = sum(1 for r in results if r["success"])
|
||
return ok(data={
|
||
"total": len(results),
|
||
"success": success_count,
|
||
"failed": len(results) - success_count,
|
||
"results": results,
|
||
})
|
||
|
||
else:
|
||
# 单条模式
|
||
err = _validate_message(data)
|
||
if err:
|
||
return error(err)
|
||
|
||
msg_type = data.get("type", "text")
|
||
url = _resolve_url(data.get("url", "")) if msg_type != "text" else None
|
||
msg_text = data.get("msg", "")
|
||
|
||
try:
|
||
await _send_single(api, msg_type, url, msg_text, group_id, user_id, logger, rid)
|
||
return ok()
|
||
except Exception as exc:
|
||
logger.error(f"[{rid}] QQ API error: {exc}")
|
||
return error(f"qq api failed: {exc}", code=502, status=502)
|