commit 68a169980d3d977a916a1b087e115755d776f778 Author: zpooi <448859157@qq.com> Date: Wed Mar 11 23:23:40 2026 +0800 自动提交token diff --git a/.env b/.env new file mode 100644 index 0000000..645b789 --- /dev/null +++ b/.env @@ -0,0 +1,16 @@ +UPLOAD_TOKEN= + +# 多个上传接口 URL,用 ; 或 , 分隔(不要用 ':' 分隔) +# 例:UPLOAD_ENDPOINTS=http://127.0.0.1:8317/v0/management/auth-files;http://127.0.0.1:46344/v0/management/auth-files +UPLOAD_ENDPOINTS= + +# 默认 endpoint(当未设置 UPLOAD_ENDPOINTS 且未传 --endpoint 时使用) +UPLOAD_ENDPOINT=http://127.0.0.1:8317/v0/management/auth-files + +# 默认 SQLite DB 文件名(当未传 --db 时使用) +UPLOAD_DB=upload_state.sqlite3 + +# JSON 子目录:支持绝对路径或相对 --dir 的路径 +# Linux 示例:/usr/local/openai_register/tokens +# Windows 示例:C:\Users\\Desktop\tokens\tokens +UPLOAD_JSON_SUBDIR= diff --git a/README.md b/README.md new file mode 100644 index 0000000..fdfbce8 --- /dev/null +++ b/README.md @@ -0,0 +1,152 @@ +# uploader + +一个常驻扫描器:定期扫描目录中的 `.json` 文件,并以 `multipart/form-data` 方式上传到一个或多个 HTTP 接口;上传成功/跳过/失败会写入本地 SQLite 状态库,避免重复上传。 + +## 运行环境 + +- Python 3.9+(建议 3.10/3.11) +- 依赖见 `requirements.txt` + +## 快速开始(本地) + +```bash +cd /opt/uploader +python3 -m venv .venv +source .venv/bin/activate +pip install -U pip +pip install -r requirements.txt + +# 运行(示例) +./.venv/bin/python uploader.py --dir /opt/uploader --interval 360 +``` + +## 配置(环境变量 / .env) + +脚本会在启动时尝试加载当前工作目录下的 `.env`(需要安装了 `python-dotenv`)。 + +建议在 `/opt/uploader/.env` 配置: + +```ini +# 鉴权 token(必填) +UPLOAD_TOKEN=your_token_here + +# 上传接口列表(推荐用 ; 或 , 分隔;不要用 ':' 分隔) +# 示例使用本机地址;请按你的实际服务地址修改 +UPLOAD_ENDPOINTS=http://127.0.0.1:8317/v0/management/auth-files;http://127.0.0.1:46344/v0/management/auth-files + +# JSON 子目录(可选):从这些目录递归扫描 *.json +# 注意:该变量支持 os.pathsep 分隔(Linux 为 ':',Windows 为 ';'),也支持逗号 +UPLOAD_JSON_SUBDIR=/usr/local/openai_register/tokens +# 或多个:UPLOAD_JSON_SUBDIRS=/path/a:/path/b + +# 默认 endpoint(当未设置 UPLOAD_ENDPOINTS 且未传 --endpoint 时使用) +UPLOAD_ENDPOINT=http://127.0.0.1:8317/v0/management/auth-files + +# 默认 SQLite DB 文件名(当未传 --db 时使用) +UPLOAD_DB=upload_state.sqlite3 +``` + +### 环境变量说明 + +- `UPLOAD_TOKEN`:上传鉴权 token(支持 `Bearer xxx` 或直接 token) +- `UPLOAD_ENDPOINTS`:多个上传接口 URL,用 `;` 或 `,` 分隔(不要用 `:` 分隔,因为 URL 自己包含 `http://` 和端口) +- `UPLOAD_ENDPOINT`:默认接口 URL(兜底用) +- `UPLOAD_DB`:默认 SQLite 状态库文件名(兜底用) +- `UPLOAD_JSON_SUBDIRS` / `UPLOAD_JSON_SUBDIR`:扫描子目录(可选;不设置则扫描 `--dir` 本身) + +## 命令行参数 + +```bash +./.venv/bin/python uploader.py \ + --dir /opt/uploader \ + --interval 360 \ + --endpoint http://XXX:8317/v0/management/auth-files \ + --db upload_state.sqlite3 \ + --timeout 60 \ + --once +``` + +常用参数: +- `--dir`:工作目录 +- `--interval`:扫描间隔(秒) +- `--endpoint`:当未配置 `UPLOAD_ENDPOINTS` 时使用的单个 endpoint +- `--db`:SQLite 文件名/路径 +- `--once`:只跑一轮就退出 + +## Linux 部署(systemd 推荐) + +### 1) 放置项目 + +```bash +mkdir -p /opt/uploader +# 将 uploader.py、requirements.txt、.env 放到 /opt/uploader +``` + +### 2) 安装依赖 + +```bash +cd /opt/uploader +python3 -m venv .venv +source .venv/bin/activate +pip install -U pip +pip install -r requirements.txt +``` + +### 3) 创建 systemd 服务 + +创建 `/etc/systemd/system/uploader.service`: + +```ini +[Unit] +Description=uploader daemon +After=network.target + +[Service] +Type=simple +WorkingDirectory=/opt/uploader +ExecStart=/opt/uploader/.venv/bin/python /opt/uploader/uploader.py --dir /opt/uploader --interval 360 +Restart=always +RestartSec=3 +Environment=PYTHONUNBUFFERED=1 + +[Install] +WantedBy=multi-user.target +``` + +启用并启动: + +```bash +systemctl daemon-reload +systemctl enable --now uploader +systemctl status uploader +``` + +## 实时看日志 + +### 方式 A:直接看 systemd 日志(推荐) + +```bash +journalctl -u uploader -f -o cat +``` + +### 方式 B:输出到文件再 tail + +在 `uploader.service` 的 `[Service]` 增加: + +```ini +StandardOutput=append:/var/log/uploader/uploader.log +StandardError=append:/var/log/uploader/uploader.err.log +``` + +并创建目录(如果服务用非 root 用户运行,请确保有写权限): + +```bash +mkdir -p /var/log/uploader +# 如果 service 里设置了 User=uploader: +# chown -R uploader:uploader /var/log/uploader + +systemctl daemon-reload +systemctl restart uploader + +tail -n 200 -f /var/log/uploader/uploader.log +``` diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2b1be2a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests>=2.31.0 +python-dotenv>=1.0.0 diff --git a/uploader.py b/uploader.py new file mode 100644 index 0000000..4a5f915 --- /dev/null +++ b/uploader.py @@ -0,0 +1,615 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + + +from __future__ import annotations + +import argparse +import hashlib +import json +import logging +import os +import sqlite3 +import sys +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, Optional, Tuple + +import requests + +try: + # 可选:用于从 .env 文件加载环境变量(UPLOAD_JSON_SUBDIRS / UPLOAD_JSON_SUBDIR 等) + from dotenv import load_dotenv # type: ignore +except Exception: # pragma: no cover + load_dotenv = None # type: ignore + +DEFAULT_ENDPOINT = ( + os.environ.get("UPLOAD_ENDPOINT") + or "http://127.0.0.1:8317/v0/management/auth-files" +).strip() +DEFAULT_DB_NAME = (os.environ.get("UPLOAD_DB") or "upload_state.sqlite3").strip() + +# 环境变量:指定 JSON 所在子目录(相对 --dir / 当前目录),可配置多个 +# 例: +# Windows: set UPLOAD_JSON_SUBDIRS=data;data2 +# bash: export UPLOAD_JSON_SUBDIRS="data:data2" (注意:在 Windows 上 os.pathsep 是 ;) +# 同时也支持逗号分隔:data,data2 +ENV_JSON_SUBDIRS = "UPLOAD_JSON_SUBDIRS" +ENV_JSON_SUBDIR = "UPLOAD_JSON_SUBDIR" # 单个子目录(兼容/简化用) + +# 多个上传站点(接口地址)列表,建议配置在 .env +# 例:UPLOAD_ENDPOINTS=http://a/v0/management/auth-files;http://b/v0/management/auth-files +ENV_ENDPOINTS = "UPLOAD_ENDPOINTS" +ENV_TOKEN = "UPLOAD_TOKEN" + + +@dataclass(frozen=True) +class UploadResult: + ok: bool + status_code: Optional[int] + text: str + + +def setup_logger() -> logging.Logger: + logger = logging.getLogger("uploader") + logger.setLevel(logging.INFO) + + class _CnFormatter(logging.Formatter): + LEVEL_MAP = { + "DEBUG": "调试", + "INFO": "信息", + "WARNING": "警告", + "ERROR": "错误", + "CRITICAL": "严重", + } + + def format(self, record: logging.LogRecord) -> str: + record.levelname_cn = self.LEVEL_MAP.get(record.levelname, record.levelname) + return super().format(record) + + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(logging.INFO) + formatter = _CnFormatter( + fmt="[%(asctime)s] [%(levelname_cn)s] %(message)s", + datefmt="%H:%M:%S", + ) + handler.setFormatter(formatter) + + # 防止重复添加 handler(例如某些 IDE 运行方式) + if not logger.handlers: + logger.addHandler(handler) + + return logger + + +def init_db(db_path: Path) -> sqlite3.Connection: + conn = sqlite3.connect(str(db_path)) + + # v2: 按 endpoint + file_path 去重 + conn.execute( + """ + CREATE TABLE IF NOT EXISTS uploads_v2 ( + endpoint TEXT NOT NULL, + file_path TEXT NOT NULL, + file_name TEXT NOT NULL, + sha256 TEXT NOT NULL, + size_bytes INTEGER NOT NULL, + mtime_ns INTEGER NOT NULL, + status TEXT NOT NULL, -- success | skipped | failed + http_status INTEGER, + response_text TEXT, + updated_at INTEGER NOT NULL, + PRIMARY KEY (endpoint, file_path) + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_uploads_v2_status ON uploads_v2(status)" + ) + + # 兼容迁移:如果旧表 uploads 存在且 v2 没数据,则把旧数据迁移到 v2(按 DEFAULT_ENDPOINT) + try: + old_exists = ( + conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='uploads'" + ).fetchone() + is not None + ) + if old_exists: + v2_count = conn.execute("SELECT COUNT(1) FROM uploads_v2").fetchone()[0] + if v2_count == 0: + conn.execute( + """ + INSERT OR IGNORE INTO uploads_v2( + endpoint, file_path, file_name, sha256, size_bytes, mtime_ns, + status, http_status, response_text, updated_at + ) + SELECT + ?, file_path, file_name, sha256, size_bytes, mtime_ns, + status, http_status, response_text, updated_at + FROM uploads + """, + (DEFAULT_ENDPOINT,), + ) + except Exception: + # 迁移失败不影响主流程 + pass + + conn.commit() + return conn + + +def sha256_of_file(path: Path, chunk_size: int = 1024 * 1024) -> str: + h = hashlib.sha256() + with path.open("rb") as f: + while True: + chunk = f.read(chunk_size) + if not chunk: + break + h.update(chunk) + return h.hexdigest() + + +def get_file_fingerprint(path: Path) -> Tuple[int, int, str]: + st = path.stat() + size = int(st.st_size) + mtime_ns = int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1e9))) + digest = sha256_of_file(path) + return size, mtime_ns, digest + + +def is_file_stable(path: Path, stable_seconds: float = 1.5) -> bool: + """避免上传正在写入中的文件:短暂等待并检查 size/mtime 是否变化。""" + try: + st1 = path.stat() + except FileNotFoundError: + return False + time.sleep(stable_seconds) + try: + st2 = path.stat() + except FileNotFoundError: + return False + + return (st1.st_size == st2.st_size) and (st1.st_mtime_ns == st2.st_mtime_ns) + + +def iter_json_files(folder: Path) -> Iterable[Path]: + # 递归扫描:如果你只想扫描当前目录(不含子目录),把 rglob 改成 glob + for p in folder.rglob("*.json"): + if p.is_file(): + yield p + + +def parse_list_from_env(var_name: str) -> list[str]: + raw = os.environ.get(var_name, "").strip() + if not raw: + return [] + + parts: list[str] = [] + # 同时兼容 "a;b" / "a:b" / "a,b" + for token in raw.replace(",", os.pathsep).split(os.pathsep): + token = token.strip() + if token: + parts.append(token) + + # 去重并保持顺序 + seen: set[str] = set() + uniq: list[str] = [] + for p in parts: + if p not in seen: + seen.add(p) + uniq.append(p) + return uniq + + +def parse_endpoints_from_env(fallback_endpoint: str) -> list[str]: + """读取需要上传的 endpoint 列表。 + + - 优先使用环境变量 UPLOAD_ENDPOINTS + - endpoints 只支持用 ; 或 , 分隔(不要用 ':'),因为 URL 自身包含 'http://', 端口等 ':' + - 如果未配置,则回退到命令行 --endpoint + """ + + raw = os.environ.get(ENV_ENDPOINTS, "").strip() + if raw: + parts = [p.strip() for p in raw.replace(",", ";").split(";")] + endpoints = [p for p in parts if p] + if endpoints: + return endpoints + + return [fallback_endpoint] + + +def normalize_token(raw: str) -> str: + t = (raw or "").strip() + if not t: + return "" + if t.lower().startswith("bearer "): + return t.split(" ", 1)[1].strip() + return t + + +def parse_subdirs_from_env(base_dir: Path) -> list[Path]: + """从环境变量读取 JSON 子目录列表。 + + 支持: + - UPLOAD_JSON_SUBDIRS:多个子目录,用 os.pathsep 分隔(Windows 是 ;,Linux/mac 是 :) + - 也支持逗号分隔 + - UPLOAD_JSON_SUBDIR:单个子目录(兼容/简化) + + 返回:绝对路径列表(不存在的会被过滤掉) + + 说明: + - 如果传入的是绝对路径,则直接使用 + - 如果是相对路径,则拼接 base_dir + """ + + raw = os.environ.get(ENV_JSON_SUBDIRS, "").strip() + if not raw: + raw = os.environ.get(ENV_JSON_SUBDIR, "").strip() + + if not raw: + return [] + + parts = ( + parse_list_from_env(ENV_JSON_SUBDIRS) + if os.environ.get(ENV_JSON_SUBDIRS) + else [] + ) + if not parts: + parts = [raw] + + dirs: list[Path] = [] + for sub in parts: + candidate = Path(sub).expanduser() + if candidate.is_absolute(): + p = candidate.resolve() + else: + p = (base_dir / sub).expanduser().resolve() + + if p.exists() and p.is_dir(): + dirs.append(p) + + # 去重并保持顺序 + seen: set[str] = set() + uniq: list[Path] = [] + for d in dirs: + s = str(d) + if s not in seen: + seen.add(s) + uniq.append(d) + + return uniq + + +def db_has_success_or_skipped( + conn: sqlite3.Connection, endpoint: str, file_path: str +) -> bool: + row = conn.execute( + "SELECT status FROM uploads_v2 WHERE endpoint = ? AND file_path = ?", + (endpoint, file_path), + ).fetchone() + if not row: + return False + return row[0] in ("success", "skipped") + + +def db_upsert( + conn: sqlite3.Connection, + *, + endpoint: str, + file_path: str, + file_name: str, + sha256: str, + size_bytes: int, + mtime_ns: int, + status: str, + http_status: Optional[int], + response_text: str, +) -> None: + now = int(time.time()) + conn.execute( + """ + INSERT INTO uploads_v2( + endpoint, file_path, file_name, sha256, size_bytes, mtime_ns, + status, http_status, response_text, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(endpoint, file_path) DO UPDATE SET + file_name=excluded.file_name, + sha256=excluded.sha256, + size_bytes=excluded.size_bytes, + mtime_ns=excluded.mtime_ns, + status=excluded.status, + http_status=excluded.http_status, + response_text=excluded.response_text, + updated_at=excluded.updated_at + """, + ( + endpoint, + file_path, + file_name, + sha256, + size_bytes, + mtime_ns, + status, + http_status, + response_text[:4000], + now, + ), + ) + conn.commit() + + +def should_treat_as_duplicate(resp: requests.Response) -> bool: + """无法提前拿到服务器已有列表时,依赖响应判断“已存在”。 + + 你可以按实际接口返回格式在这里加强判断逻辑。 + """ + if resp.status_code in (409, 208): + return True + + text = (resp.text or "").lower() + # 常见提示关键字(按需增删) + keywords = [ + "already", + "exists", + "duplicate", + "重复", + "已存在", + "已上传", + ] + return any(k in text for k in keywords) + + +def upload_file( + *, + endpoint: str, + token: str, + path: Path, + timeout_s: int, + verify_tls: bool, + extra_headers_json: Optional[str] = None, +) -> UploadResult: + headers = { + "Accept": "application/json, text/plain, */*", + "Authorization": f"Bearer {token}", + } + + if extra_headers_json: + try: + extra = json.loads(extra_headers_json) + if not isinstance(extra, dict): + raise ValueError("extra headers must be a JSON object") + for k, v in extra.items(): + headers[str(k)] = str(v) + except Exception as e: + return UploadResult(False, None, f"Invalid --extra-headers-json: {e}") + + # requests 会自动生成 multipart boundary;不要手工设置 Content-Type(否则 boundary 不匹配) + with path.open("rb") as f: + files = { + "file": (path.name, f, "application/json"), + } + try: + resp = requests.post( + endpoint, + headers=headers, + files=files, + timeout=timeout_s, + verify=verify_tls, + ) + except Exception as e: + return UploadResult(False, None, f"Request error: {e}") + + # 只要 2xx 就认为成功 + if 200 <= resp.status_code < 300: + return UploadResult(True, resp.status_code, resp.text or "") + + # 如果服务器返回“已存在”,可当作跳过(ok=True 但由上层标记为 skipped) + if should_treat_as_duplicate(resp): + return UploadResult(True, resp.status_code, resp.text or "") + + return UploadResult(False, resp.status_code, resp.text or "") + + +def main() -> int: + parser = argparse.ArgumentParser(description="Upload local JSON files to target site") + parser.add_argument("--dir", default=".", help="工作目录(默认当前目录)") + parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT, help="上传接口 URL") + parser.add_argument( + "--token", + default=None, + help=f"上传鉴权 token(也可用环境变量 {ENV_TOKEN} 配置)", + ) + parser.add_argument( + "--interval", + type=int, + default=120, + help="扫描间隔秒数(默认 120 秒)", + ) + parser.add_argument( + "--db", + default=DEFAULT_DB_NAME, + help="本地状态数据库文件名(默认 upload_state.sqlite3)", + ) + parser.add_argument( + "--timeout", + type=int, + default=60, + help="单次上传超时秒数(默认 60)", + ) + parser.add_argument( + "--verify-tls", + action="store_true", + help="开启 TLS 证书校验(仅 https 有意义;默认关闭)", + ) + parser.add_argument( + "--once", + action="store_true", + help="只扫描并上传一轮后退出(默认持续运行)", + ) + parser.add_argument( + "--extra-headers-json", + default=None, + help='附加请求头(JSON对象),如 {"Origin":"...","Referer":"..."}', + ) + + args = parser.parse_args() + + logger = setup_logger() + + # 如果安装了 python-dotenv,则自动加载当前目录下的 .env + if load_dotenv is not None: + load_dotenv() + + base_dir = Path(args.dir).expanduser().resolve() + if not base_dir.exists() or not base_dir.is_dir(): + logger.error(f"目录不存在或不是文件夹: {base_dir}") + return 2 + + # 从环境变量读取子目录;如果没配,则默认扫描 base_dir 本身 + subdirs = parse_subdirs_from_env(base_dir) + scan_dirs = subdirs if subdirs else [base_dir] + + db_path = Path(args.db).expanduser().resolve() + conn = init_db(db_path) + + # token:优先命令行,其次环境变量/.env + token = normalize_token(args.token or os.environ.get(ENV_TOKEN, "")) + if not token: + logger.error(f"未配置 token:请使用 --token 或在 .env 设置 {ENV_TOKEN}") + return 2 + + endpoints = parse_endpoints_from_env(args.endpoint) + + logger.info(f"工作目录: {base_dir}") + if subdirs: + logger.info( + f"扫描子目录: {', '.join(str(d) for d in scan_dirs)}(来自环境变量 {ENV_JSON_SUBDIRS}/{ENV_JSON_SUBDIR})" + ) + else: + logger.info("未设置子目录环境变量:将扫描工作目录本身") + logger.info(f"上传站点数: {len(endpoints)}") + for i, ep in enumerate(endpoints, 1): + logger.info(f"站点[{i}]: {ep}") + logger.info(f"状态库: {db_path}") + logger.info(f"扫描间隔: {args.interval} 秒") + + while True: + any_new = False + + # 汇总所有扫描目录的文件 + all_files: list[Path] = [] + for d in scan_dirs: + all_files.extend(iter_json_files(d)) + + # 为了可控,按 mtime 升序上传(更接近“按生成时间”) + files = sorted(all_files, key=lambda p: p.stat().st_mtime_ns) + + for path in files: + file_path_str = str(path) + + for endpoint in endpoints: + if db_has_success_or_skipped(conn, endpoint, file_path_str): + continue + + if not is_file_stable(path): + logger.info(f"跳过(正在写入): {path.name}") + break + + any_new = True + + try: + size_bytes, mtime_ns, digest = get_file_fingerprint(path) + except Exception as e: + logger.error(f"失败(读取/计算哈希): {path.name}: {e}") + db_upsert( + conn, + endpoint=endpoint, + file_path=file_path_str, + file_name=path.name, + sha256="", + size_bytes=0, + mtime_ns=0, + status="failed", + http_status=None, + response_text=str(e), + ) + continue + result = upload_file( + endpoint=endpoint, + token=token, + path=path, + timeout_s=args.timeout, + verify_tls=args.verify_tls, + extra_headers_json=args.extra_headers_json, + ) + + if result.ok: + status = "success" + if result.status_code is not None and not ( + 200 <= result.status_code < 300 + ): + status = "skipped" + + if status == "success": + logger.info( + f"成功: {path.name} -> {endpoint} | HTTP={result.status_code}" + ) + else: + logger.info( + f"跳过(已存在): {path.name} -> {endpoint} | HTTP={result.status_code}" + ) + + db_upsert( + conn, + endpoint=endpoint, + file_path=file_path_str, + file_name=path.name, + sha256=digest, + size_bytes=size_bytes, + mtime_ns=mtime_ns, + status=status, + http_status=result.status_code, + response_text=result.text, + ) + else: + logger.error( + f"失败: {path.name} -> {endpoint} | HTTP={result.status_code} | {result.text[:300]}" + ) + db_upsert( + conn, + endpoint=endpoint, + file_path=file_path_str, + file_name=path.name, + sha256=digest, + size_bytes=size_bytes, + mtime_ns=mtime_ns, + status="failed", + http_status=result.status_code, + response_text=result.text, + ) + + if args.once: + logger.info("已完成(单次模式)。") + return 0 + + wait_s = max(5, int(args.interval)) + + if not any_new: + # 倒计时读秒(同一行刷新),避免刷屏 + for remaining in range(wait_s, 0, -1): + sys.stdout.write( + f"\r[{time.strftime('%H:%M:%S')}] [信息] 暂无新文件,等待中... {remaining} 秒" + ) + sys.stdout.flush() + time.sleep(1) + sys.stdout.write("\n") + sys.stdout.flush() + else: + time.sleep(wait_s) + + +if __name__ == "__main__": + raise SystemExit(main())