"""上传处理器:文件上传、大小/类型限制、异步写入、自动清理过期文件。""" import asyncio import logging import time from pathlib import Path from aiohttp import BodyPartReader, web from ..config import ALLOWED_EXTENSIONS, MAX_UPLOAD_SIZE, UPLOAD_DIR from ..response import error, ok logger = logging.getLogger("webhook-plugin.upload") # 文件最大保留秒数(默认 24 小时) FILE_TTL_SECONDS: int = 24 * 60 * 60 # 读取块大小 CHUNK_SIZE: int = 65536 def _check_extension(filename: str) -> bool: """检查文件扩展名是否在允许列表内。""" if not ALLOWED_EXTENSIONS: return True ext = Path(filename).suffix.lstrip(".").lower() return ext in ALLOWED_EXTENSIONS async def cleanup_expired_files() -> None: """删除上传目录中超过 FILE_TTL_SECONDS 的文件。""" if not UPLOAD_DIR.exists(): return now = time.time() for path in UPLOAD_DIR.iterdir(): if path.is_file() and (now - path.stat().st_mtime) > FILE_TTL_SECONDS: try: path.unlink() logger.info("已清理过期文件: %s", path.name) except OSError as exc: logger.warning("清理文件失败 %s: %s", path.name, exc) async def upload_handler(request: web.Request) -> web.Response: """接收 multipart/form-data 上传,保存到 uploads 目录,返回相对文件 ID。""" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) reader = await request.multipart() saved_ids: list[str] = [] async for part in reader: if not isinstance(part, BodyPartReader) or not part.filename: continue filename: str = Path(part.filename).name # 防路径穿越 if not _check_extension(filename): return error(f"file type not allowed: {filename}", code=415) # 读取文件内容并检查大小 chunks: list[bytes] = [] total_size = 0 while True: chunk = await part.read_chunk(CHUNK_SIZE) if not chunk: break total_size += len(chunk) if total_size > MAX_UPLOAD_SIZE: # 消费剩余数据,避免 multipart reader 状态异常 while await part.read_chunk(CHUNK_SIZE): pass return error( f"file too large, max {MAX_UPLOAD_SIZE // (1024*1024)} MB", code=413, ) chunks.append(chunk) if total_size == 0: return error("empty file", code=400) # 同名文件自动重命名 save_path = UPLOAD_DIR / filename stem = Path(filename).stem suffix = Path(filename).suffix counter = 1 while save_path.exists(): save_path = UPLOAD_DIR / f"{stem}_{counter}{suffix}" counter += 1 # 使用线程池写文件,避免阻塞事件循环 data = b"".join(chunks) loop = asyncio.get_running_loop() await loop.run_in_executor(None, _write_file, save_path, data) # 返回相对路径作为文件 ID file_id = save_path.relative_to(UPLOAD_DIR).as_posix() saved_ids.append(file_id) if not saved_ids: return error("no file uploaded", code=400) return ok(data={"files": saved_ids, "path": saved_ids[0] if len(saved_ids) == 1 else None}) def _write_file(path: Path, data: bytes) -> None: """同步写文件,由线程池调用。""" with open(path, "wb") as f: f.write(data)