自动提交token

This commit is contained in:
2026-03-11 23:23:40 +08:00
commit 68a169980d
4 changed files with 785 additions and 0 deletions

16
.env Normal file
View File

@@ -0,0 +1,16 @@
UPLOAD_TOKEN=
# 多个上传接口 URL用 ; 或 , 分隔(不要用 ':' 分隔)
# 例UPLOAD_ENDPOINTS=http://127.0.0.1:8317/v0/management/auth-files;http://127.0.0.1:46344/v0/management/auth-files
UPLOAD_ENDPOINTS=
# 默认 endpoint当未设置 UPLOAD_ENDPOINTS 且未传 --endpoint 时使用)
UPLOAD_ENDPOINT=http://127.0.0.1:8317/v0/management/auth-files
# 默认 SQLite DB 文件名(当未传 --db 时使用)
UPLOAD_DB=upload_state.sqlite3
# JSON 子目录:支持绝对路径或相对 --dir 的路径
# Linux 示例:/usr/local/openai_register/tokens
# Windows 示例C:\Users\<you>\Desktop\tokens\tokens
UPLOAD_JSON_SUBDIR=

152
README.md Normal file
View File

@@ -0,0 +1,152 @@
# uploader
一个常驻扫描器:定期扫描目录中的 `.json` 文件,并以 `multipart/form-data` 方式上传到一个或多个 HTTP 接口;上传成功/跳过/失败会写入本地 SQLite 状态库,避免重复上传。
## 运行环境
- Python 3.9+(建议 3.10/3.11
- 依赖见 `requirements.txt`
## 快速开始(本地)
```bash
cd /opt/uploader
python3 -m venv .venv
source .venv/bin/activate
pip install -U pip
pip install -r requirements.txt
# 运行(示例)
./.venv/bin/python uploader.py --dir /opt/uploader --interval 360
```
## 配置(环境变量 / .env
脚本会在启动时尝试加载当前工作目录下的 `.env`(需要安装了 `python-dotenv`)。
建议在 `/opt/uploader/.env` 配置:
```ini
# 鉴权 token必填
UPLOAD_TOKEN=your_token_here
# 上传接口列表(推荐用 ; 或 , 分隔;不要用 ':' 分隔)
# 示例使用本机地址;请按你的实际服务地址修改
UPLOAD_ENDPOINTS=http://127.0.0.1:8317/v0/management/auth-files;http://127.0.0.1:46344/v0/management/auth-files
# JSON 子目录(可选):从这些目录递归扫描 *.json
# 注意:该变量支持 os.pathsep 分隔Linux 为 ':'Windows 为 ';'),也支持逗号
UPLOAD_JSON_SUBDIR=/usr/local/openai_register/tokens
# 或多个UPLOAD_JSON_SUBDIRS=/path/a:/path/b
# 默认 endpoint当未设置 UPLOAD_ENDPOINTS 且未传 --endpoint 时使用)
UPLOAD_ENDPOINT=http://127.0.0.1:8317/v0/management/auth-files
# 默认 SQLite DB 文件名(当未传 --db 时使用)
UPLOAD_DB=upload_state.sqlite3
```
### 环境变量说明
- `UPLOAD_TOKEN`:上传鉴权 token支持 `Bearer xxx` 或直接 token
- `UPLOAD_ENDPOINTS`:多个上传接口 URL`;``,` 分隔(不要用 `:` 分隔,因为 URL 自己包含 `http://` 和端口)
- `UPLOAD_ENDPOINT`:默认接口 URL兜底用
- `UPLOAD_DB`:默认 SQLite 状态库文件名(兜底用)
- `UPLOAD_JSON_SUBDIRS` / `UPLOAD_JSON_SUBDIR`:扫描子目录(可选;不设置则扫描 `--dir` 本身)
## 命令行参数
```bash
./.venv/bin/python uploader.py \
--dir /opt/uploader \
--interval 360 \
--endpoint http://XXX:8317/v0/management/auth-files \
--db upload_state.sqlite3 \
--timeout 60 \
--once
```
常用参数:
- `--dir`:工作目录
- `--interval`:扫描间隔(秒)
- `--endpoint`:当未配置 `UPLOAD_ENDPOINTS` 时使用的单个 endpoint
- `--db`SQLite 文件名/路径
- `--once`:只跑一轮就退出
## Linux 部署systemd 推荐)
### 1) 放置项目
```bash
mkdir -p /opt/uploader
# 将 uploader.py、requirements.txt、.env 放到 /opt/uploader
```
### 2) 安装依赖
```bash
cd /opt/uploader
python3 -m venv .venv
source .venv/bin/activate
pip install -U pip
pip install -r requirements.txt
```
### 3) 创建 systemd 服务
创建 `/etc/systemd/system/uploader.service`
```ini
[Unit]
Description=uploader daemon
After=network.target
[Service]
Type=simple
WorkingDirectory=/opt/uploader
ExecStart=/opt/uploader/.venv/bin/python /opt/uploader/uploader.py --dir /opt/uploader --interval 360
Restart=always
RestartSec=3
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
```
启用并启动:
```bash
systemctl daemon-reload
systemctl enable --now uploader
systemctl status uploader
```
## 实时看日志
### 方式 A直接看 systemd 日志(推荐)
```bash
journalctl -u uploader -f -o cat
```
### 方式 B输出到文件再 tail
`uploader.service``[Service]` 增加:
```ini
StandardOutput=append:/var/log/uploader/uploader.log
StandardError=append:/var/log/uploader/uploader.err.log
```
并创建目录(如果服务用非 root 用户运行,请确保有写权限):
```bash
mkdir -p /var/log/uploader
# 如果 service 里设置了 User=uploader
# chown -R uploader:uploader /var/log/uploader
systemctl daemon-reload
systemctl restart uploader
tail -n 200 -f /var/log/uploader/uploader.log
```

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
requests>=2.31.0
python-dotenv>=1.0.0

615
uploader.py Normal file
View File

@@ -0,0 +1,615 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import sqlite3
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional, Tuple
import requests
try:
# 可选:用于从 .env 文件加载环境变量UPLOAD_JSON_SUBDIRS / UPLOAD_JSON_SUBDIR 等)
from dotenv import load_dotenv # type: ignore
except Exception: # pragma: no cover
load_dotenv = None # type: ignore
DEFAULT_ENDPOINT = (
os.environ.get("UPLOAD_ENDPOINT")
or "http://127.0.0.1:8317/v0/management/auth-files"
).strip()
DEFAULT_DB_NAME = (os.environ.get("UPLOAD_DB") or "upload_state.sqlite3").strip()
# 环境变量:指定 JSON 所在子目录(相对 --dir / 当前目录),可配置多个
# 例:
# Windows: set UPLOAD_JSON_SUBDIRS=data;data2
# bash: export UPLOAD_JSON_SUBDIRS="data:data2" (注意:在 Windows 上 os.pathsep 是 ;)
# 同时也支持逗号分隔data,data2
ENV_JSON_SUBDIRS = "UPLOAD_JSON_SUBDIRS"
ENV_JSON_SUBDIR = "UPLOAD_JSON_SUBDIR" # 单个子目录(兼容/简化用)
# 多个上传站点(接口地址)列表,建议配置在 .env
# 例UPLOAD_ENDPOINTS=http://a/v0/management/auth-files;http://b/v0/management/auth-files
ENV_ENDPOINTS = "UPLOAD_ENDPOINTS"
ENV_TOKEN = "UPLOAD_TOKEN"
@dataclass(frozen=True)
class UploadResult:
ok: bool
status_code: Optional[int]
text: str
def setup_logger() -> logging.Logger:
logger = logging.getLogger("uploader")
logger.setLevel(logging.INFO)
class _CnFormatter(logging.Formatter):
LEVEL_MAP = {
"DEBUG": "调试",
"INFO": "信息",
"WARNING": "警告",
"ERROR": "错误",
"CRITICAL": "严重",
}
def format(self, record: logging.LogRecord) -> str:
record.levelname_cn = self.LEVEL_MAP.get(record.levelname, record.levelname)
return super().format(record)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = _CnFormatter(
fmt="[%(asctime)s] [%(levelname_cn)s] %(message)s",
datefmt="%H:%M:%S",
)
handler.setFormatter(formatter)
# 防止重复添加 handler例如某些 IDE 运行方式)
if not logger.handlers:
logger.addHandler(handler)
return logger
def init_db(db_path: Path) -> sqlite3.Connection:
conn = sqlite3.connect(str(db_path))
# v2: 按 endpoint + file_path 去重
conn.execute(
"""
CREATE TABLE IF NOT EXISTS uploads_v2 (
endpoint TEXT NOT NULL,
file_path TEXT NOT NULL,
file_name TEXT NOT NULL,
sha256 TEXT NOT NULL,
size_bytes INTEGER NOT NULL,
mtime_ns INTEGER NOT NULL,
status TEXT NOT NULL, -- success | skipped | failed
http_status INTEGER,
response_text TEXT,
updated_at INTEGER NOT NULL,
PRIMARY KEY (endpoint, file_path)
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_uploads_v2_status ON uploads_v2(status)"
)
# 兼容迁移:如果旧表 uploads 存在且 v2 没数据,则把旧数据迁移到 v2按 DEFAULT_ENDPOINT
try:
old_exists = (
conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='uploads'"
).fetchone()
is not None
)
if old_exists:
v2_count = conn.execute("SELECT COUNT(1) FROM uploads_v2").fetchone()[0]
if v2_count == 0:
conn.execute(
"""
INSERT OR IGNORE INTO uploads_v2(
endpoint, file_path, file_name, sha256, size_bytes, mtime_ns,
status, http_status, response_text, updated_at
)
SELECT
?, file_path, file_name, sha256, size_bytes, mtime_ns,
status, http_status, response_text, updated_at
FROM uploads
""",
(DEFAULT_ENDPOINT,),
)
except Exception:
# 迁移失败不影响主流程
pass
conn.commit()
return conn
def sha256_of_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
def get_file_fingerprint(path: Path) -> Tuple[int, int, str]:
st = path.stat()
size = int(st.st_size)
mtime_ns = int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1e9)))
digest = sha256_of_file(path)
return size, mtime_ns, digest
def is_file_stable(path: Path, stable_seconds: float = 1.5) -> bool:
"""避免上传正在写入中的文件:短暂等待并检查 size/mtime 是否变化。"""
try:
st1 = path.stat()
except FileNotFoundError:
return False
time.sleep(stable_seconds)
try:
st2 = path.stat()
except FileNotFoundError:
return False
return (st1.st_size == st2.st_size) and (st1.st_mtime_ns == st2.st_mtime_ns)
def iter_json_files(folder: Path) -> Iterable[Path]:
# 递归扫描:如果你只想扫描当前目录(不含子目录),把 rglob 改成 glob
for p in folder.rglob("*.json"):
if p.is_file():
yield p
def parse_list_from_env(var_name: str) -> list[str]:
raw = os.environ.get(var_name, "").strip()
if not raw:
return []
parts: list[str] = []
# 同时兼容 "a;b" / "a:b" / "a,b"
for token in raw.replace(",", os.pathsep).split(os.pathsep):
token = token.strip()
if token:
parts.append(token)
# 去重并保持顺序
seen: set[str] = set()
uniq: list[str] = []
for p in parts:
if p not in seen:
seen.add(p)
uniq.append(p)
return uniq
def parse_endpoints_from_env(fallback_endpoint: str) -> list[str]:
"""读取需要上传的 endpoint 列表。
- 优先使用环境变量 UPLOAD_ENDPOINTS
- endpoints 只支持用 ; 或 , 分隔(不要用 ':'),因为 URL 自身包含 'http://', 端口等 ':'
- 如果未配置,则回退到命令行 --endpoint
"""
raw = os.environ.get(ENV_ENDPOINTS, "").strip()
if raw:
parts = [p.strip() for p in raw.replace(",", ";").split(";")]
endpoints = [p for p in parts if p]
if endpoints:
return endpoints
return [fallback_endpoint]
def normalize_token(raw: str) -> str:
t = (raw or "").strip()
if not t:
return ""
if t.lower().startswith("bearer "):
return t.split(" ", 1)[1].strip()
return t
def parse_subdirs_from_env(base_dir: Path) -> list[Path]:
"""从环境变量读取 JSON 子目录列表。
支持:
- UPLOAD_JSON_SUBDIRS多个子目录用 os.pathsep 分隔Windows 是 ;Linux/mac 是 :
- 也支持逗号分隔
- UPLOAD_JSON_SUBDIR单个子目录兼容/简化)
返回:绝对路径列表(不存在的会被过滤掉)
说明:
- 如果传入的是绝对路径,则直接使用
- 如果是相对路径,则拼接 base_dir
"""
raw = os.environ.get(ENV_JSON_SUBDIRS, "").strip()
if not raw:
raw = os.environ.get(ENV_JSON_SUBDIR, "").strip()
if not raw:
return []
parts = (
parse_list_from_env(ENV_JSON_SUBDIRS)
if os.environ.get(ENV_JSON_SUBDIRS)
else []
)
if not parts:
parts = [raw]
dirs: list[Path] = []
for sub in parts:
candidate = Path(sub).expanduser()
if candidate.is_absolute():
p = candidate.resolve()
else:
p = (base_dir / sub).expanduser().resolve()
if p.exists() and p.is_dir():
dirs.append(p)
# 去重并保持顺序
seen: set[str] = set()
uniq: list[Path] = []
for d in dirs:
s = str(d)
if s not in seen:
seen.add(s)
uniq.append(d)
return uniq
def db_has_success_or_skipped(
conn: sqlite3.Connection, endpoint: str, file_path: str
) -> bool:
row = conn.execute(
"SELECT status FROM uploads_v2 WHERE endpoint = ? AND file_path = ?",
(endpoint, file_path),
).fetchone()
if not row:
return False
return row[0] in ("success", "skipped")
def db_upsert(
conn: sqlite3.Connection,
*,
endpoint: str,
file_path: str,
file_name: str,
sha256: str,
size_bytes: int,
mtime_ns: int,
status: str,
http_status: Optional[int],
response_text: str,
) -> None:
now = int(time.time())
conn.execute(
"""
INSERT INTO uploads_v2(
endpoint, file_path, file_name, sha256, size_bytes, mtime_ns,
status, http_status, response_text, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(endpoint, file_path) DO UPDATE SET
file_name=excluded.file_name,
sha256=excluded.sha256,
size_bytes=excluded.size_bytes,
mtime_ns=excluded.mtime_ns,
status=excluded.status,
http_status=excluded.http_status,
response_text=excluded.response_text,
updated_at=excluded.updated_at
""",
(
endpoint,
file_path,
file_name,
sha256,
size_bytes,
mtime_ns,
status,
http_status,
response_text[:4000],
now,
),
)
conn.commit()
def should_treat_as_duplicate(resp: requests.Response) -> bool:
"""无法提前拿到服务器已有列表时,依赖响应判断“已存在”。
你可以按实际接口返回格式在这里加强判断逻辑。
"""
if resp.status_code in (409, 208):
return True
text = (resp.text or "").lower()
# 常见提示关键字(按需增删)
keywords = [
"already",
"exists",
"duplicate",
"重复",
"已存在",
"已上传",
]
return any(k in text for k in keywords)
def upload_file(
*,
endpoint: str,
token: str,
path: Path,
timeout_s: int,
verify_tls: bool,
extra_headers_json: Optional[str] = None,
) -> UploadResult:
headers = {
"Accept": "application/json, text/plain, */*",
"Authorization": f"Bearer {token}",
}
if extra_headers_json:
try:
extra = json.loads(extra_headers_json)
if not isinstance(extra, dict):
raise ValueError("extra headers must be a JSON object")
for k, v in extra.items():
headers[str(k)] = str(v)
except Exception as e:
return UploadResult(False, None, f"Invalid --extra-headers-json: {e}")
# requests 会自动生成 multipart boundary不要手工设置 Content-Type否则 boundary 不匹配)
with path.open("rb") as f:
files = {
"file": (path.name, f, "application/json"),
}
try:
resp = requests.post(
endpoint,
headers=headers,
files=files,
timeout=timeout_s,
verify=verify_tls,
)
except Exception as e:
return UploadResult(False, None, f"Request error: {e}")
# 只要 2xx 就认为成功
if 200 <= resp.status_code < 300:
return UploadResult(True, resp.status_code, resp.text or "")
# 如果服务器返回“已存在”可当作跳过ok=True 但由上层标记为 skipped
if should_treat_as_duplicate(resp):
return UploadResult(True, resp.status_code, resp.text or "")
return UploadResult(False, resp.status_code, resp.text or "")
def main() -> int:
parser = argparse.ArgumentParser(description="Upload local JSON files to target site")
parser.add_argument("--dir", default=".", help="工作目录(默认当前目录)")
parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT, help="上传接口 URL")
parser.add_argument(
"--token",
default=None,
help=f"上传鉴权 token也可用环境变量 {ENV_TOKEN} 配置)",
)
parser.add_argument(
"--interval",
type=int,
default=120,
help="扫描间隔秒数(默认 120 秒)",
)
parser.add_argument(
"--db",
default=DEFAULT_DB_NAME,
help="本地状态数据库文件名(默认 upload_state.sqlite3",
)
parser.add_argument(
"--timeout",
type=int,
default=60,
help="单次上传超时秒数(默认 60",
)
parser.add_argument(
"--verify-tls",
action="store_true",
help="开启 TLS 证书校验(仅 https 有意义;默认关闭)",
)
parser.add_argument(
"--once",
action="store_true",
help="只扫描并上传一轮后退出(默认持续运行)",
)
parser.add_argument(
"--extra-headers-json",
default=None,
help='附加请求头(JSON对象),如 {"Origin":"...","Referer":"..."}',
)
args = parser.parse_args()
logger = setup_logger()
# 如果安装了 python-dotenv则自动加载当前目录下的 .env
if load_dotenv is not None:
load_dotenv()
base_dir = Path(args.dir).expanduser().resolve()
if not base_dir.exists() or not base_dir.is_dir():
logger.error(f"目录不存在或不是文件夹: {base_dir}")
return 2
# 从环境变量读取子目录;如果没配,则默认扫描 base_dir 本身
subdirs = parse_subdirs_from_env(base_dir)
scan_dirs = subdirs if subdirs else [base_dir]
db_path = Path(args.db).expanduser().resolve()
conn = init_db(db_path)
# token优先命令行其次环境变量/.env
token = normalize_token(args.token or os.environ.get(ENV_TOKEN, ""))
if not token:
logger.error(f"未配置 token请使用 --token 或在 .env 设置 {ENV_TOKEN}")
return 2
endpoints = parse_endpoints_from_env(args.endpoint)
logger.info(f"工作目录: {base_dir}")
if subdirs:
logger.info(
f"扫描子目录: {', '.join(str(d) for d in scan_dirs)}(来自环境变量 {ENV_JSON_SUBDIRS}/{ENV_JSON_SUBDIR}"
)
else:
logger.info("未设置子目录环境变量:将扫描工作目录本身")
logger.info(f"上传站点数: {len(endpoints)}")
for i, ep in enumerate(endpoints, 1):
logger.info(f"站点[{i}]: {ep}")
logger.info(f"状态库: {db_path}")
logger.info(f"扫描间隔: {args.interval}")
while True:
any_new = False
# 汇总所有扫描目录的文件
all_files: list[Path] = []
for d in scan_dirs:
all_files.extend(iter_json_files(d))
# 为了可控,按 mtime 升序上传(更接近“按生成时间”)
files = sorted(all_files, key=lambda p: p.stat().st_mtime_ns)
for path in files:
file_path_str = str(path)
for endpoint in endpoints:
if db_has_success_or_skipped(conn, endpoint, file_path_str):
continue
if not is_file_stable(path):
logger.info(f"跳过(正在写入): {path.name}")
break
any_new = True
try:
size_bytes, mtime_ns, digest = get_file_fingerprint(path)
except Exception as e:
logger.error(f"失败(读取/计算哈希): {path.name}: {e}")
db_upsert(
conn,
endpoint=endpoint,
file_path=file_path_str,
file_name=path.name,
sha256="",
size_bytes=0,
mtime_ns=0,
status="failed",
http_status=None,
response_text=str(e),
)
continue
result = upload_file(
endpoint=endpoint,
token=token,
path=path,
timeout_s=args.timeout,
verify_tls=args.verify_tls,
extra_headers_json=args.extra_headers_json,
)
if result.ok:
status = "success"
if result.status_code is not None and not (
200 <= result.status_code < 300
):
status = "skipped"
if status == "success":
logger.info(
f"成功: {path.name} -> {endpoint} | HTTP={result.status_code}"
)
else:
logger.info(
f"跳过(已存在): {path.name} -> {endpoint} | HTTP={result.status_code}"
)
db_upsert(
conn,
endpoint=endpoint,
file_path=file_path_str,
file_name=path.name,
sha256=digest,
size_bytes=size_bytes,
mtime_ns=mtime_ns,
status=status,
http_status=result.status_code,
response_text=result.text,
)
else:
logger.error(
f"失败: {path.name} -> {endpoint} | HTTP={result.status_code} | {result.text[:300]}"
)
db_upsert(
conn,
endpoint=endpoint,
file_path=file_path_str,
file_name=path.name,
sha256=digest,
size_bytes=size_bytes,
mtime_ns=mtime_ns,
status="failed",
http_status=result.status_code,
response_text=result.text,
)
if args.once:
logger.info("已完成(单次模式)。")
return 0
wait_s = max(5, int(args.interval))
if not any_new:
# 倒计时读秒(同一行刷新),避免刷屏
for remaining in range(wait_s, 0, -1):
sys.stdout.write(
f"\r[{time.strftime('%H:%M:%S')}] [信息] 暂无新文件,等待中... {remaining}"
)
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("\n")
sys.stdout.flush()
else:
time.sleep(wait_s)
if __name__ == "__main__":
raise SystemExit(main())