1
0
forked from Eeveid/lightOps
Files
lightOps/crates/lightops-agent/src/actions.rs
2026-05-25 11:14:39 +08:00

1952 lines
62 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use anyhow::{anyhow, bail, Context, Result};
use base64::{engine::general_purpose, Engine};
use chrono::{DateTime, NaiveDateTime, Utc};
use lightops_common::protocol::{DockerContainer, DockerImage, FileEntry, NginxSite, ServiceInfo};
use serde_json::{json, Value};
use std::{
fs,
io::{Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
use tokio::process::Command;
use tokio::time::{timeout, Duration};
const MAX_TEXT_FILE: u64 = 2 * 1024 * 1024;
const NGINX_AVAILABLE: &str = "/etc/nginx/sites-available";
const NGINX_ENABLED: &str = "/etc/nginx/sites-enabled";
/// Dispatches a single agent action identified by `action`, using `params`
/// as the JSON argument object, and returns the action's JSON result.
///
/// Actions whose name starts with `app.` are forwarded to
/// `crate::app::handle`. Every other name is matched against the table below;
/// an unrecognised name produces an error.
///
/// Required parameters are extracted with `string_param` / `path_param`, so a
/// missing or non-string parameter fails before the handler is called.
pub async fn handle(action: &str, params: Value) -> Result<Value> {
if action.starts_with("app.") {
return crate::app::handle(action, params).await;
}
match action {
// --- system overview ---
"system.snapshot" => system_snapshot().await,
// --- file management ---
"file.roots" => file_roots(),
"file.list" => file_list(path_param(&params, "path")?),
"file.search" => file_search(params),
"file.read" => file_read(path_param(&params, "path")?),
"file.write" => file_write(
path_param(&params, "path")?,
string_param(&params, "content")?,
),
"file.mkdir" => file_mkdir(path_param(&params, "path")?),
"file.delete" => file_delete(
path_param(&params, "path")?,
// `recursive` is optional and defaults to false.
params
.get("recursive")
.and_then(Value::as_bool)
.unwrap_or(false),
),
"file.rename" => file_rename(path_param(&params, "from")?, path_param(&params, "to")?),
"file.upload.chunk" => file_upload_chunk(params),
"file.download.chunk" => file_download_chunk(params),
"file.chmod" => file_chmod(params),
// Both log action names share the same tail-based implementation.
"log.tail_file" | "log.read_file" => log_tail(params).await,
// --- systemd services ---
"service.list" => service_list().await,
"service.status" => service_status(string_param(&params, "name")?).await,
"service.start" => service_action("start", string_param(&params, "name")?).await,
"service.stop" => service_action("stop", string_param(&params, "name")?).await,
"service.restart" => service_action("restart", string_param(&params, "name")?).await,
"service.enable" => service_action("enable", string_param(&params, "name")?).await,
"service.disable" => service_action("disable", string_param(&params, "name")?).await,
// --- nginx sites and certificates ---
"nginx.status" => nginx_status().await,
"nginx.sites" => nginx_sites(),
"nginx.site.get" => nginx_site_get(string_param(&params, "name")?),
"nginx.site.backups" => nginx_site_backups(string_param(&params, "name")?),
"nginx.site.restore_backup" => {
nginx_site_restore_backup(
string_param(&params, "name")?,
string_param(&params, "backup")?,
)
.await
}
"nginx.site.create" => nginx_site_create(params).await,
"nginx.site.update" => {
nginx_site_update(
string_param(&params, "name")?,
string_param(&params, "content")?,
)
.await
}
"nginx.site.enable" => nginx_site_enable(string_param(&params, "name")?).await,
"nginx.site.disable" => nginx_site_disable(string_param(&params, "name")?),
"nginx.test" => nginx_test().await,
"nginx.reload" => nginx_reload().await,
"nginx.ssl.status" => nginx_ssl_status().await,
"nginx.ssl.issue" => nginx_ssl_issue(params).await,
"nginx.ssl.renew" => nginx_ssl_renew().await,
"nginx.ssl.auto_renew" => nginx_ssl_auto_renew().await,
// --- docker containers, images, volumes and compose projects ---
"docker.status" => docker_status().await,
"docker.containers" => docker_containers().await,
"docker.container.inspect" => docker_inspect(string_param(&params, "id")?).await,
"docker.container.stats" => docker_stats(string_param(&params, "id")?).await,
"docker.container.run" => docker_run(params).await,
"docker.container.start" => docker_simple("start", string_param(&params, "id")?).await,
"docker.container.stop" => docker_simple("stop", string_param(&params, "id")?).await,
"docker.container.restart" => docker_simple("restart", string_param(&params, "id")?).await,
// Deleting a container maps to the `rm` docker sub-command.
"docker.container.delete" => docker_simple("rm", string_param(&params, "id")?).await,
"docker.container.logs" => {
docker_logs(
string_param(&params, "id")?,
// `tail` is optional and defaults to 200.
params.get("tail").and_then(Value::as_u64).unwrap_or(200),
)
.await
}
"docker.images" => docker_images().await,
"docker.image.pull" => docker_pull(string_param(&params, "image")?).await,
"docker.image.delete" => docker_rmi(string_param(&params, "id")?).await,
"docker.volumes" => docker_volumes().await,
"docker.volume.delete" => docker_volume_delete(string_param(&params, "name")?).await,
"docker.compose.projects" => docker_compose_projects().await,
"docker.compose.start" => {
docker_compose_action("start", string_param(&params, "project")?).await
}
"docker.compose.stop" => {
docker_compose_action("stop", string_param(&params, "project")?).await
}
"docker.compose.restart" => {
docker_compose_action("restart", string_param(&params, "project")?).await
}
"docker.compose.logs" => {
docker_compose_logs(
string_param(&params, "project")?,
// `tail` is optional and defaults to 200.
params.get("tail").and_then(Value::as_u64).unwrap_or(200),
)
.await
}
"docker.compose.preflight" => docker_compose_preflight(params).await,
"docker.compose.deploy" => docker_compose_deploy(params).await,
"docker.compose.update" => docker_compose_update(params).await,
"docker.compose.down" => docker_compose_down(params).await,
// Anything else is rejected with an "unsupported operation" error.
_ => bail!("不支持的操作"),
}
}
/// Abstraction over basic container lifecycle operations.
///
/// The only implementation visible in this file is `DockerCliProvider`.
/// NOTE(review): the trait is marked `dead_code`, so it appears to be unused
/// at the moment — confirm before relying on it.
#[allow(dead_code)]
pub trait DockerProvider {
/// Returns the known containers.
async fn list_containers(&self) -> Result<Vec<DockerContainer>>;
/// Starts the container identified by `id`.
async fn start_container(&self, id: &str) -> Result<()>;
/// Stops the container identified by `id`.
async fn stop_container(&self, id: &str) -> Result<()>;
/// Restarts the container identified by `id`.
async fn restart_container(&self, id: &str) -> Result<()>;
/// Removes the container identified by `id`.
async fn remove_container(&self, id: &str) -> Result<()>;
}
/// Stateless `DockerProvider` implementation that delegates to this module's
/// `docker_containers` and `docker_simple` helpers.
#[allow(dead_code)]
pub struct DockerCliProvider;
impl DockerProvider for DockerCliProvider {
    /// Lists containers by decoding the `containers` field returned by
    /// `docker_containers`.
    async fn list_containers(&self) -> Result<Vec<DockerContainer>> {
        let listing = docker_containers().await?;
        let containers: Vec<DockerContainer> =
            serde_json::from_value(listing["containers"].clone())?;
        Ok(containers)
    }

    /// Runs the `start` docker sub-command for `id`, discarding its output.
    async fn start_container(&self, id: &str) -> Result<()> {
        docker_simple("start", id.to_string()).await?;
        Ok(())
    }

    /// Runs the `stop` docker sub-command for `id`, discarding its output.
    async fn stop_container(&self, id: &str) -> Result<()> {
        docker_simple("stop", id.to_string()).await?;
        Ok(())
    }

    /// Runs the `restart` docker sub-command for `id`, discarding its output.
    async fn restart_container(&self, id: &str) -> Result<()> {
        docker_simple("restart", id.to_string()).await?;
        Ok(())
    }

    /// Runs the `rm` docker sub-command for `id`, discarding its output.
    async fn remove_container(&self, id: &str) -> Result<()> {
        docker_simple("rm", id.to_string()).await?;
        Ok(())
    }
}
/// Collects a best-effort overview of the host: top processes, listening
/// ports, disk usage and network interfaces.
///
/// Every probe is optional: when a command fails, its section is returned as
/// an empty list rather than failing the whole snapshot.
async fn system_snapshot() -> Result<Value> {
    /// Wraps an unparsed output line as `{ "raw": <line> }`.
    fn raw_entry(line: String) -> Value {
        json!({ "raw": line })
    }

    // Processes sorted by CPU usage, capped at 30 rows.
    let mut ps = Command::new("ps");
    ps.args(["-eo", "pid,ppid,user,comm,%cpu,%mem", "--sort=-%cpu"]);
    let processes = command_json_lines(&mut ps, 8, parse_process_line, 30)
        .await
        .unwrap_or_default();

    // Listening sockets: prefer `ss`, fall back to `netstat` when it fails.
    let mut ss = Command::new("ss");
    ss.args(["-H", "-tulpen"]);
    let port_lines = match command_lines(&mut ss, 8).await {
        Ok(lines) => lines,
        Err(_) => {
            let mut netstat = Command::new("netstat");
            netstat.args(["-tulpen"]);
            command_lines(&mut netstat, 8).await.unwrap_or_default()
        }
    };
    let ports: Vec<Value> = port_lines.into_iter().take(80).map(raw_entry).collect();

    // Disk usage, capped at 50 rows.
    let mut df = Command::new("df");
    df.args(["-hP"]);
    let disks = command_json_lines(&mut df, 8, parse_disk_line, 50)
        .await
        .unwrap_or_default();

    // Network interfaces in brief form, passed through unparsed.
    let mut ip = Command::new("ip");
    ip.args(["-brief", "addr"]);
    let networks: Vec<Value> = command_lines(&mut ip, 8)
        .await
        .unwrap_or_default()
        .into_iter()
        .map(raw_entry)
        .collect();

    Ok(json!({
        "processes": processes,
        "ports": ports,
        "disks": disks,
        "networks": networks
    }))
}
/// Runs `cmd` with a timeout of `timeout_secs` seconds and returns its text
/// output as trimmed, non-empty lines.
async fn command_lines(cmd: &mut Command, timeout_secs: u64) -> Result<Vec<String>> {
    let text = output_text(run_command(cmd, timeout_secs).await?)?;
    let mut lines = Vec::new();
    for line in text.lines() {
        let line = line.trim();
        if !line.is_empty() {
            lines.push(line.to_string());
        }
    }
    Ok(lines)
}
/// Runs `cmd`, skips the first output line (treated as a header) and converts
/// the remaining lines with `parser`.
///
/// Lines for which `parser` returns `None` are dropped; at most `limit`
/// parsed values are returned.
async fn command_json_lines<F>(
    cmd: &mut Command,
    timeout_secs: u64,
    parser: F,
    limit: usize,
) -> Result<Vec<Value>>
where
    F: Fn(&str) -> Option<Value>,
{
    let lines = command_lines(cmd, timeout_secs).await?;
    let mut parsed = Vec::new();
    for line in lines.into_iter().skip(1) {
        // Stop before parsing anything beyond the requested number of rows.
        if parsed.len() >= limit {
            break;
        }
        if let Some(value) = parser(&line) {
            parsed.push(value);
        }
    }
    Ok(parsed)
}
/// Parses one row of `ps -eo pid,ppid,user,comm,%cpu,%mem` output.
///
/// The first three columns and the last two are single tokens, while the
/// command name in between may itself contain spaces. The row is therefore
/// anchored at both ends and everything in the middle is joined back into
/// the name, so such names no longer shift the cpu/memory columns.
///
/// Returns `None` when the line has fewer than six whitespace-separated
/// fields.
fn parse_process_line(line: &str) -> Option<Value> {
    let parts = line.split_whitespace().collect::<Vec<_>>();
    let [pid, ppid, user, name @ .., cpu, memory] = parts.as_slice() else {
        return None;
    };
    if name.is_empty() {
        return None;
    }
    Some(json!({
        "pid": pid,
        "ppid": ppid,
        "user": user,
        "name": name.join(" "),
        "cpu": cpu,
        "memory": memory
    }))
}
/// Parses one row of `df -hP` output.
///
/// The first five columns are single tokens; the mount point is the rest of
/// the line, so mount points containing spaces are kept whole instead of
/// being cut at the first space.
///
/// Returns `None` when the line has fewer than six whitespace-separated
/// fields.
fn parse_disk_line(line: &str) -> Option<Value> {
    let parts = line.split_whitespace().collect::<Vec<_>>();
    let [filesystem, size, used, available, usage, mount @ ..] = parts.as_slice() else {
        return None;
    };
    if mount.is_empty() {
        return None;
    }
    Some(json!({
        "filesystem": filesystem,
        "size": size,
        "used": used,
        "available": available,
        "usage": usage,
        "mount": mount.join(" ")
    }))
}
/// Reads the string parameter `name` from `params`.
///
/// Fails when the key is absent or its value is not a JSON string.
fn string_param(params: &Value, name: &str) -> Result<String> {
    match params.get(name).and_then(Value::as_str) {
        Some(text) => Ok(text.to_string()),
        None => Err(anyhow!("缺少参数：{name}")),
    }
}
/// Reads the string parameter `name` from `params` as a filesystem path.
///
/// Fails when the parameter is missing or contains a NUL byte.
fn path_param(params: &Value, name: &str) -> Result<PathBuf> {
    let raw = string_param(params, name)?;
    match raw.find('\0') {
        Some(_) => bail!("路径无效"),
        None => Ok(PathBuf::from(raw)),
    }
}
/// Lists the filesystem roots a file browser can start from.
///
/// On Windows these are the drive letters `A:` to `Z:` that currently exist;
/// on every other platform it is the single root `/`.
fn file_roots() -> Result<Value> {
    #[cfg(windows)]
    let roots: Vec<Value> = (b'A'..=b'Z')
        .map(|letter| format!("{}:\\", letter as char))
        .filter(|drive| Path::new(drive).exists())
        .map(|drive| json!({ "name": drive, "path": drive }))
        .collect();
    #[cfg(not(windows))]
    let roots: Vec<Value> = vec![json!({ "name": "/", "path": "/" })];
    Ok(json!({ "roots": roots }))
}
fn file_list(path: PathBuf) -> Result<Value> {
let entries = fs::read_dir(&path)
.with_context(|| format!("read directory {}", path.display()))?
.filter_map(|entry| {
let entry = entry.ok()?;
let meta = entry.metadata().ok()?;
Some(FileEntry {
name: entry.file_name().to_string_lossy().to_string(),
path: entry.path().to_string_lossy().to_string(),
is_dir: meta.is_dir(),
size: meta.len(),
modified: meta.modified().ok().and_then(|t| {
t.duration_since(std::time::UNIX_EPOCH)
.ok()
.map(|d| d.as_secs().to_string())
}),
readonly: meta.permissions().readonly(),
})
})
.collect::<Vec<_>>();
Ok(json!({ "path": path, "entries": entries }))
}
/// Searches below `path` for entries whose name contains `keyword`
/// (case-insensitive).
///
/// Optional parameters: `max_depth` (default 5, capped at 10) and `limit`
/// (default 100, capped at 500). The keyword must be non-empty, at most 120
/// bytes long and must not contain NUL, `/` or `\`.
fn file_search(params: Value) -> Result<Value> {
    let root = path_param(&params, "path")?;
    let keyword = string_param(&params, "keyword")?.trim().to_lowercase();

    let has_forbidden_char = keyword.chars().any(|c| matches!(c, '\0' | '/' | '\\'));
    if keyword.is_empty() || keyword.len() > 120 || has_forbidden_char {
        bail!("搜索关键词无效");
    }

    // Reads an optional unsigned parameter and clamps it to `cap`.
    let bounded = |key: &str, default: u64, cap: u64| -> usize {
        params
            .get(key)
            .and_then(Value::as_u64)
            .unwrap_or(default)
            .min(cap) as usize
    };
    let max_depth = bounded("max_depth", 5, 10);
    let limit = bounded("limit", 100, 500);

    let mut results = Vec::new();
    search_dir(&root, &keyword, 0, max_depth, limit, &mut results)?;
    Ok(json!({ "path": root, "keyword": keyword, "entries": results }))
}
/// Recursive worker for `file_search`.
///
/// Walks `path` depth-first and appends every entry whose lower-cased name
/// contains `keyword` to `results`. Recursion stops once `depth` exceeds
/// `max_depth` or `results` holds `limit` entries. Unreadable directories are
/// skipped rather than reported.
fn search_dir(
    path: &Path,
    keyword: &str,
    depth: usize,
    max_depth: usize,
    limit: usize,
    results: &mut Vec<Value>,
) -> Result<()> {
    let within_budget = depth <= max_depth && results.len() < limit;
    if !within_budget {
        return Ok(());
    }
    let entries = match fs::read_dir(path) {
        Ok(entries) => entries,
        Err(_) => return Ok(()),
    };
    for entry in entries.flatten() {
        if results.len() >= limit {
            break;
        }
        let name = entry.file_name().to_string_lossy().to_string();
        let entry_path = entry.path();
        // Missing metadata is treated as "not a directory, size 0".
        let (is_dir, size) = match entry.metadata() {
            Ok(meta) => (meta.is_dir(), meta.len()),
            Err(_) => (false, 0),
        };
        if name.to_lowercase().contains(keyword) {
            results.push(json!({
                "name": name,
                "path": entry_path,
                "is_dir": is_dir,
                "size": size
            }));
        }
        if is_dir {
            let _ = search_dir(&entry_path, keyword, depth + 1, max_depth, limit, results);
        }
    }
    Ok(())
}
/// Reads the file at `path` as UTF-8 text.
///
/// Fails when the file is larger than `MAX_TEXT_FILE` bytes or cannot be
/// read as text.
fn file_read(path: PathBuf) -> Result<Value> {
    let size = fs::metadata(&path)?.len();
    if size > MAX_TEXT_FILE {
        bail!("文件过大，无法按文本读取");
    }
    let content = fs::read_to_string(&path)?;
    Ok(json!({ "path": path, "content": content }))
}
/// Writes `content` to `path`, replacing any existing file.
///
/// Content longer than `MAX_TEXT_FILE` bytes is rejected.
fn file_write(path: PathBuf, content: String) -> Result<Value> {
    let too_large = content.len() as u64 > MAX_TEXT_FILE;
    if too_large {
        bail!("写入内容过大");
    }
    fs::write(&path, content.as_bytes())?;
    Ok(json!({ "ok": true, "path": path }))
}
/// Creates the directory `path` together with any missing parents.
fn file_mkdir(path: PathBuf) -> Result<Value> {
    fs::create_dir_all(path.as_path())?;
    let response = json!({ "ok": true, "path": path });
    Ok(response)
}
/// Deletes the file, directory or symbolic link at `path`.
///
/// Directories are removed with `remove_dir`, or `remove_dir_all` when
/// `recursive` is true. The path is inspected without following symbolic
/// links, so a link is always removed as a link (never its target) and
/// dangling links can be deleted too.
fn file_delete(path: PathBuf, recursive: bool) -> Result<Value> {
    let file_type = fs::symlink_metadata(&path)?.file_type();
    if file_type.is_symlink() {
        // Most platforms remove links with `remove_file`; Windows directory
        // links need `remove_dir`, so retry with it and keep the first error
        // if both attempts fail.
        fs::remove_file(&path).or_else(|err| fs::remove_dir(&path).map_err(|_| err))?;
    } else if file_type.is_dir() {
        if recursive {
            fs::remove_dir_all(&path)?;
        } else {
            fs::remove_dir(&path)?;
        }
    } else {
        fs::remove_file(&path)?;
    }
    Ok(json!({ "ok": true }))
}
/// Renames (moves) `from` to `to` using `fs::rename`.
fn file_rename(from: PathBuf, to: PathBuf) -> Result<Value> {
    fs::rename(from.as_path(), to.as_path())?;
    let response = json!({ "ok": true });
    Ok(response)
}
/// Writes one base64-encoded upload chunk into the file at `path`.
///
/// Parameters: `path`, `data` (base64) and an optional byte `offset`
/// (default 0). A chunk may decode to at most 1 MiB. Missing parent
/// directories are created. The file is truncated only when the chunk is
/// written at offset 0.
fn file_upload_chunk(params: Value) -> Result<Value> {
    const MAX_CHUNK_BYTES: usize = 1024 * 1024;

    let path = path_param(&params, "path")?;
    let offset = params.get("offset").and_then(Value::as_u64).unwrap_or(0);
    let encoded = string_param(&params, "data")?;
    let bytes = general_purpose::STANDARD
        .decode(encoded)
        .context("分片数据不是有效 base64")?;
    if bytes.len() > MAX_CHUNK_BYTES {
        bail!("单个分片不能超过 1MB");
    }

    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }

    let mut options = fs::OpenOptions::new();
    options.create(true).write(true).truncate(offset == 0);
    let mut file = options.open(&path)?;
    file.seek(SeekFrom::Start(offset))?;
    file.write_all(&bytes)?;

    let written = bytes.len();
    Ok(json!({ "ok": true, "path": path, "written": written, "offset": offset }))
}
fn file_download_chunk(params: Value) -> Result<Value> {
let path = path_param(&params, "path")?;
let offset = params.get("offset").and_then(Value::as_u64).unwrap_or(0);
let size = params
.get("size")
.and_then(Value::as_u64)
.unwrap_or(512 * 1024)
.min(1024 * 1024) as usize;
let meta = fs::metadata(&path)?;
if !meta.is_file() {
bail!("只能下载普通文件");
}
let mut file = fs::File::open(&path)?;
file.seek(SeekFrom::Start(offset))?;
let mut buf = vec![0u8; size];
let read = file.read(&mut buf)?;
buf.truncate(read);
Ok(json!({
"path": path,
"offset": offset,
"size": meta.len(),
"read": read,
"eof": offset + read as u64 >= meta.len(),
"data": general_purpose::STANDARD.encode(buf)
}))
}
/// Changes the permission bits of `path` to the octal `mode` (e.g. `755`).
///
/// `mode` must consist of one to four octal digits. An empty mode is now
/// rejected with the same validation error instead of surfacing as an
/// integer-parse failure. On non-Unix platforms the action always fails.
fn file_chmod(params: Value) -> Result<Value> {
    let path = path_param(&params, "path")?;
    let mode = string_param(&params, "mode")?;
    let is_octal = !mode.is_empty()
        && mode.len() <= 4
        && mode.chars().all(|c| matches!(c, '0'..='7'));
    if !is_octal {
        bail!("权限模式无效，例如 755");
    }
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let value = u32::from_str_radix(&mode, 8)?;
        let mut permissions = fs::metadata(&path)?.permissions();
        permissions.set_mode(value);
        fs::set_permissions(&path, permissions)?;
        Ok(json!({ "ok": true, "path": path, "mode": mode }))
    }
    #[cfg(not(unix))]
    {
        let _ = path;
        bail!("当前系统不支持 chmod")
    }
}
/// Returns the last `lines` lines of the file at `path` by running `tail`.
///
/// `lines` is optional (default 200) and capped at 2000.
async fn log_tail(params: Value) -> Result<Value> {
    let path = path_param(&params, "path")?;
    let lines = params
        .get("lines")
        .and_then(Value::as_u64)
        .unwrap_or(200)
        .min(2000);
    let output = run_command(
        Command::new("tail")
            .arg("-n")
            .arg(lines.to_string())
            // End option parsing so a path beginning with `-` is read as a
            // file name and not as a `tail` option.
            .arg("--")
            .arg(path),
        8,
    )
    .await?;
    command_value(output)
}
async fn service_list() -> Result<Value> {
let output = run_command(
Command::new("systemctl").args([
"list-units",
"--type=service",
"--all",
"--no-pager",
"--plain",
]),
10,
)
.await?;
let text = output_text(output)?;
let services = text
.lines()
.filter(|line| line.ends_with(".service") || line.contains(".service "))
.filter_map(parse_service_line)
.collect::<Vec<_>>();
Ok(json!({ "services": services }))
}
/// Parses one row of `systemctl list-units` output into a `ServiceInfo`.
///
/// The first four whitespace-separated columns are the unit name, load
/// state, active state and sub state; the rest is the description. Returns
/// `None` when fewer than four columns are present or the first column does
/// not contain `.service`. `enabled` is always left as `None`.
fn parse_service_line(line: &str) -> Option<ServiceInfo> {
    let mut columns = line.split_whitespace();
    let name = columns.next()?;
    let load = columns.next()?;
    let active = columns.next()?;
    let sub = columns.next()?;
    if !name.contains(".service") {
        return None;
    }
    let description = columns.collect::<Vec<_>>().join(" ");
    Some(ServiceInfo {
        name: name.to_string(),
        load: load.to_string(),
        active: active.to_string(),
        sub: sub.to_string(),
        description,
        enabled: None,
    })
}
/// Returns the output of `systemctl status --no-pager <name>` for a
/// validated unit name.
async fn service_status(name: String) -> Result<Value> {
    validate_unit(&name)?;
    let mut systemctl = Command::new("systemctl");
    systemctl.args(["status", "--no-pager", &name]);
    let output = run_command(&mut systemctl, 10).await?;
    command_value(output)
}
/// Runs `systemctl <action> <name>` for a validated unit name, with a
/// 30-second timeout.
async fn service_action(action: &str, name: String) -> Result<Value> {
    validate_unit(&name)?;
    let mut systemctl = Command::new("systemctl");
    systemctl.args([action, &name]);
    let output = run_command(&mut systemctl, 30).await?;
    command_value(output)
}
/// Validates a systemd unit name before it is passed to `systemctl`.
///
/// A valid name is 1 to 200 bytes long, consists only of ASCII letters,
/// digits and `.`, `@`, `_`, `-`, and does not start with `-`. The
/// leading-dash rule keeps the name from being parsed by `systemctl` as a
/// command-line option.
fn validate_unit(name: &str) -> Result<()> {
    let well_formed = !name.is_empty()
        && name.len() <= 200
        && !name.starts_with('-')
        && name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || ".@_-".contains(c));
    if !well_formed {
        bail!("服务名称无效");
    }
    Ok(())
}
/// Reports whether nginx is installed by running `nginx -v`.
///
/// The version text is taken from the command's stderr. When the command
/// cannot be run at all, nginx is reported as not installed with a null
/// version.
async fn nginx_status() -> Result<Value> {
    let mut nginx = Command::new("nginx");
    nginx.arg("-v");
    let status = match run_command(&mut nginx, 8).await {
        Ok(output) => {
            json!({ "installed": output.status.success(), "version": stderr_text(&output) })
        }
        Err(_) => json!({ "installed": false, "version": null }),
    };
    Ok(status)
}
/// Lists every entry in the sites-available directory and flags those that
/// also have an entry of the same name in sites-enabled.
///
/// An unreadable sites-enabled directory is treated as "nothing enabled"; an
/// unreadable sites-available directory is an error.
fn nginx_sites() -> Result<Value> {
    let enabled: Vec<String> = match fs::read_dir(NGINX_ENABLED) {
        Ok(dir) => dir
            .flatten()
            .map(|entry| entry.file_name().to_string_lossy().to_string())
            .collect(),
        Err(_) => Vec::new(),
    };

    let mut sites = Vec::new();
    for entry in fs::read_dir(NGINX_AVAILABLE)?.flatten() {
        let name = entry.file_name().to_string_lossy().to_string();
        sites.push(NginxSite {
            enabled: enabled.contains(&name),
            path: entry.path().to_string_lossy().to_string(),
            name,
        });
    }
    Ok(json!({ "sites": sites }))
}
fn nginx_site_get(name: String) -> Result<Value> {
validate_site_name(&name)?;
let path = Path::new(NGINX_AVAILABLE).join(&name);
Ok(json!({ "name": name, "content": fs::read_to_string(path)? }))
}
/// Generates a site configuration from a template and stores it through
/// `nginx_site_update`.
///
/// Parameters: `name` (required), `server_name` (defaults to `name`) and
/// `mode`, one of `proxy`, `spa`, `load_balance`, `php`; any other value,
/// or a missing one, selects the static-site template. Template-specific
/// parameters (`upstream`, `root`, `fastcgi_pass`) fall back to built-in
/// defaults.
async fn nginx_site_create(params: Value) -> Result<Value> {
    /// Reads an optional string parameter, falling back to `fallback`.
    fn text_or<'a>(params: &'a Value, key: &str, fallback: &'a str) -> &'a str {
        params.get(key).and_then(Value::as_str).unwrap_or(fallback)
    }
    const DEFAULT_ROOT: &str = "/var/www/html";

    let name = string_param(&params, "name")?;
    validate_site_name(&name)?;
    let server_name = text_or(&params, "server_name", &name);

    let content = match text_or(&params, "mode", "static") {
        "proxy" => nginx_proxy_config(
            server_name,
            text_or(&params, "upstream", "http://127.0.0.1:3000"),
            &params,
        ),
        "spa" => nginx_spa_config(server_name, text_or(&params, "root", DEFAULT_ROOT)),
        "load_balance" => nginx_load_balance_config(server_name, &params)?,
        "php" => nginx_php_config(
            server_name,
            text_or(&params, "root", DEFAULT_ROOT),
            text_or(&params, "fastcgi_pass", "unix:/run/php/php8.2-fpm.sock"),
        ),
        _ => nginx_static_config(server_name, text_or(&params, "root", DEFAULT_ROOT)),
    };

    nginx_site_update(name.clone(), content).await?;
    Ok(json!({ "ok": true, "name": name }))
}
/// Lists the backup files of site `name`, i.e. the entries in
/// sites-available whose file name starts with `<name>.bak.`.
///
/// An unreadable directory yields an empty list; entries without readable
/// metadata are skipped.
fn nginx_site_backups(name: String) -> Result<Value> {
    validate_site_name(&name)?;
    let prefix = format!("{name}.bak.");

    let mut backups = Vec::new();
    if let Ok(entries) = fs::read_dir(NGINX_AVAILABLE) {
        for entry in entries.flatten() {
            let file_name = entry.file_name().to_string_lossy().to_string();
            if !file_name.starts_with(&prefix) {
                continue;
            }
            let Ok(meta) = entry.metadata() else {
                continue;
            };
            let modified = meta
                .modified()
                .ok()
                .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
                .map(|duration| duration.as_secs().to_string());
            backups.push(json!({
                "name": file_name,
                "path": entry.path().to_string_lossy().to_string(),
                "size": meta.len(),
                "modified": modified
            }));
        }
    }
    Ok(json!({ "name": name, "backups": backups }))
}
/// Restores the configuration of site `name` from the backup file `backup`.
///
/// The backup must live in sites-available and be named `<name>.bak.<...>`.
/// Before overwriting, the current configuration (if any) is saved as
/// `<name>.bak.<timestamp>`. The name is built by appending to the full site
/// name rather than replacing its extension, so a dotted name such as
/// `example.com` yields `example.com.bak.<timestamp>`, which matches the
/// prefix the backup listing and this function look for.
///
/// When `nginx -t` rejects the restored configuration, the previous
/// configuration is put back on a best-effort basis (or the file is removed
/// when there was none) and an error is returned.
async fn nginx_site_restore_backup(name: String, backup: String) -> Result<Value> {
    validate_site_name(&name)?;
    validate_site_name(&backup)?;
    if !backup.starts_with(&format!("{name}.bak.")) {
        bail!("备份文件不属于该站点");
    }
    let available = Path::new(NGINX_AVAILABLE);
    let backup_path = available.join(&backup);
    if !backup_path.exists() {
        bail!("备份文件不存在");
    }
    let target = available.join(&name);
    let rollback = if target.exists() {
        let rollback = available.join(format!("{name}.bak.{}", chrono_like_timestamp()));
        fs::copy(&target, &rollback)?;
        Some(rollback)
    } else {
        None
    };
    fs::copy(&backup_path, &target)?;
    match nginx_test().await {
        Ok(_) => Ok(json!({ "ok": true, "name": name, "backup": backup })),
        Err(err) => {
            // Do not leave a configuration in place that nginx refuses to load.
            match rollback {
                Some(previous) => {
                    let _ = fs::copy(previous, &target);
                }
                None => {
                    let _ = fs::remove_file(&target);
                }
            }
            bail!("恢复后 Nginx 配置测试失败：{err}")
        }
    }
}
/// Writes `content` as the configuration of site `name` and validates it
/// with `nginx -t`.
///
/// An existing configuration is first copied to `<name>.bak.<timestamp>`.
/// The backup name is built by appending to the full site name rather than
/// replacing its extension, so `example.com` is backed up as
/// `example.com.bak.<timestamp>`; replacing the extension produced
/// `example.bak.<timestamp>`, which the backup listing never finds.
///
/// When the test fails, the previous content is restored (or the new file is
/// removed when the site did not exist before) and an error is returned.
async fn nginx_site_update(name: String, content: String) -> Result<Value> {
    validate_site_name(&name)?;
    let available = Path::new(NGINX_AVAILABLE);
    let path = available.join(&name);
    let mut backup_path = None;
    if path.exists() {
        let backup = available.join(format!("{name}.bak.{}", chrono_like_timestamp()));
        fs::copy(&path, &backup)?;
        backup_path = Some(backup);
    }
    fs::write(&path, content)?;
    match nginx_test().await {
        Ok(_) => Ok(json!({ "ok": true })),
        Err(err) => {
            // Best-effort rollback; a failure here must not hide the test error.
            if let Some(backup) = backup_path {
                let _ = fs::copy(backup, &path);
            } else {
                let _ = fs::remove_file(&path);
            }
            bail!("Nginx 配置测试失败，已尝试回滚：{err}")
        }
    }
}
async fn nginx_site_enable(name: String) -> Result<Value> {
validate_site_name(&name)?;
let src = Path::new(NGINX_AVAILABLE).join(&name);
let dst = Path::new(NGINX_ENABLED).join(&name);
if !dst.exists() {
#[cfg(unix)]
std::os::unix::fs::symlink(&src, &dst)?;
#[cfg(windows)]
fs::copy(&src, &dst)?;
}
nginx_test().await?;
Ok(json!({ "ok": true }))
}
/// Disables site `name` by removing its entry from sites-enabled.
///
/// The entry is checked without following symbolic links, so a dangling link
/// (its sites-available file already deleted) is still removed. A missing
/// entry is not an error.
fn nginx_site_disable(name: String) -> Result<Value> {
    validate_site_name(&name)?;
    let dst = Path::new(NGINX_ENABLED).join(&name);
    if fs::symlink_metadata(&dst).is_ok() {
        fs::remove_file(&dst)?;
    }
    Ok(json!({ "ok": true }))
}
/// Runs `nginx -t` with a 15-second timeout and converts its output with
/// `command_value`.
async fn nginx_test() -> Result<Value> {
    let mut nginx = Command::new("nginx");
    nginx.arg("-t");
    let output = run_command(&mut nginx, 15).await?;
    command_value(output)
}
/// Reloads nginx through `systemctl reload nginx`, but only after the
/// configuration test has passed.
async fn nginx_reload() -> Result<Value> {
    nginx_test().await?;
    let mut systemctl = Command::new("systemctl");
    systemctl.args(["reload", "nginx"]);
    let output = run_command(&mut systemctl, 20).await?;
    command_value(output)
}
/// Reports the certificate tooling status: whether certbot is installed, its
/// version, which renewal timer (if any) is enabled, and the certificates
/// found by `list_letsencrypt_certs`.
///
/// The version text comes from certbot's stderr, or from stdout when stderr
/// is blank. `certbot.timer` takes precedence over the LightOps timer when
/// reporting the renewal provider.
async fn nginx_ssl_status() -> Result<Value> {
    let certbot = run_command(Command::new("certbot").arg("--version"), 8).await;
    let installed = certbot
        .as_ref()
        .map(|output| output.status.success())
        .unwrap_or(false);
    let version = match certbot.as_ref() {
        Ok(output) => {
            let stderr = stderr_text(output);
            if stderr.trim().is_empty() {
                stdout_text(output)
            } else {
                stderr
            }
        }
        Err(_) => Default::default(),
    };

    let certbot_timer = run_command(
        Command::new("systemctl").args(["is-enabled", "certbot.timer"]),
        8,
    )
    .await;
    let lightops_timer = run_command(
        Command::new("systemctl").args(["is-enabled", "lightops-certbot-renew.timer"]),
        8,
    )
    .await;
    let certbot_timer_on = certbot_timer
        .as_ref()
        .map(|output| output.status.success())
        .unwrap_or(false);
    let lightops_timer_on = lightops_timer
        .as_ref()
        .map(|output| output.status.success())
        .unwrap_or(false);

    let auto_renew_enabled = certbot_timer_on || lightops_timer_on;
    let auto_renew_provider = match (certbot_timer_on, lightops_timer_on) {
        (true, _) => "certbot.timer",
        (false, true) => "lightops-certbot-renew.timer",
        (false, false) => "none",
    };

    let certs = list_letsencrypt_certs().await;
    Ok(json!({
        "installed": installed,
        "version": version.trim(),
        "auto_renew_enabled": auto_renew_enabled,
        "auto_renew_provider": auto_renew_provider,
        "certs": certs
    }))
}
/// Requests a certificate for the given `domains` with `certbot --nginx`.
///
/// Parameters: `domains` (array of strings; blank entries are ignored, at
/// least one valid domain is required) and an optional `email`. A missing or
/// blank `email` registers without an address; previously a blank address
/// was rejected as invalid even though the later branch was written to treat
/// it as absent. After certbot succeeds, an nginx reload is attempted and its
/// outcome ignored.
async fn nginx_ssl_issue(params: Value) -> Result<Value> {
    ensure_certbot_installed().await?;
    let domains = params
        .get("domains")
        .and_then(Value::as_array)
        .ok_or_else(|| anyhow!("缺少域名列表"))?
        .iter()
        .filter_map(Value::as_str)
        .map(str::trim)
        .filter(|domain| !domain.is_empty())
        .map(ToString::to_string)
        .collect::<Vec<_>>();
    if domains.is_empty() {
        bail!("至少需要一个域名");
    }
    for domain in &domains {
        validate_domain(domain)?;
    }
    // Normalise first: a blank address means "no address supplied".
    let email = params
        .get("email")
        .and_then(Value::as_str)
        .map(str::trim)
        .filter(|value| !value.is_empty());
    if let Some(email) = email {
        validate_email(email)?;
    }
    let mut command = Command::new("certbot");
    command
        .args(["--nginx", "--non-interactive", "--agree-tos", "--redirect"])
        .arg("--keep-until-expiring");
    if let Some(email) = email {
        command.args(["-m", email]);
    } else {
        command.arg("--register-unsafely-without-email");
    }
    for domain in &domains {
        command.args(["-d", domain]);
    }
    let result = command_value(run_command(&mut command, 120).await?)?;
    let _ = nginx_reload().await;
    Ok(result)
}
/// Runs `certbot renew` (120-second timeout) and then attempts an nginx
/// reload whose outcome is ignored.
async fn nginx_ssl_renew() -> Result<Value> {
    ensure_certbot_installed().await?;
    let mut certbot = Command::new("certbot");
    certbot.arg("renew");
    let output = run_command(&mut certbot, 120).await?;
    let result = command_value(output)?;
    let _ = nginx_reload().await;
    Ok(result)
}
/// Enables automatic certificate renewal.
///
/// First tries to enable certbot's own `certbot.timer`. When that cannot be
/// run or exits unsuccessfully, the LightOps timer units are installed,
/// systemd is reloaded and `lightops-certbot-renew.timer` is enabled instead.
async fn nginx_ssl_auto_renew() -> Result<Value> {
    ensure_certbot_installed().await?;

    let mut enable_stock = Command::new("systemctl");
    enable_stock.args(["enable", "--now", "certbot.timer"]);
    if let Ok(output) = run_command(&mut enable_stock, 20).await {
        if output.status.success() {
            return command_value(output);
        }
    }

    // Fallback: ship our own service/timer pair.
    install_lightops_certbot_timer()?;
    let mut reload = Command::new("systemctl");
    reload.arg("daemon-reload");
    command_value(run_command(&mut reload, 20).await?)?;

    let mut enable_own = Command::new("systemctl");
    enable_own.args(["enable", "--now", "lightops-certbot-renew.timer"]);
    command_value(run_command(&mut enable_own, 20).await?)
}
/// Succeeds only when `certbot --version` can be run and exits successfully.
async fn ensure_certbot_installed() -> Result<()> {
    let mut certbot = Command::new("certbot");
    certbot.arg("--version");
    let available = match run_command(&mut certbot, 8).await {
        Ok(output) => output.status.success(),
        Err(_) => false,
    };
    if !available {
        bail!("未检测到 certbot请先在 Agent 主机安装 certbot 和 Nginx 插件");
    }
    Ok(())
}
/// Writes the LightOps certbot renewal units into `/etc/systemd/system`.
///
/// Two files are written: `lightops-certbot-renew.service`, a oneshot unit
/// that runs `certbot renew` with a deploy hook reloading nginx, and
/// `lightops-certbot-renew.timer`, a daily timer. This function only writes
/// the files; it does not reload systemd or enable the timer.
fn install_lightops_certbot_timer() -> Result<()> {
// Service unit: a single `certbot renew` run.
let service = r#"[Unit]
Description=LightOps Certbot 自动续期
Wants=network-online.target
After=network-online.target
[Service]
Type=oneshot
ExecStart=/usr/bin/env certbot renew --quiet --deploy-hook "systemctl reload nginx"
"#;
// Timer unit: fires daily at 03:35 with a randomized delay of up to 1800 seconds.
let timer = r#"[Unit]
Description=LightOps 每日检查 Certbot 证书续期
[Timer]
OnCalendar=*-*-* 03:35:00
RandomizedDelaySec=1800
Persistent=true
[Install]
WantedBy=timers.target
"#;
fs::write(
"/etc/systemd/system/lightops-certbot-renew.service",
service,
)?;
fs::write("/etc/systemd/system/lightops-certbot-renew.timer", timer)?;
Ok(())
}
/// Lists the certificate directories under `/etc/letsencrypt/live`.
///
/// Each non-hidden entry is reported with the paths of its `fullchain.pem`
/// and `privkey.pem` plus the expiry information computed by `cert_expiry`.
/// An unreadable directory yields an empty list.
async fn list_letsencrypt_certs() -> Vec<Value> {
    let live_dir = Path::new("/etc/letsencrypt/live");
    let entries = match fs::read_dir(live_dir) {
        Ok(entries) => entries,
        Err(_) => return Vec::new(),
    };

    let mut certs = Vec::new();
    for entry in entries.flatten() {
        let name = entry.file_name().to_string_lossy().to_string();
        if name.starts_with('.') {
            continue;
        }
        let cert_dir = entry.path();
        let fullchain = cert_dir.join("fullchain.pem");
        let (expires_at, days_remaining, status) = cert_expiry(&fullchain).await;
        certs.push(json!({
            "name": name,
            "path": cert_dir.to_string_lossy().to_string(),
            "fullchain": fullchain.to_string_lossy().to_string(),
            "privkey": cert_dir.join("privkey.pem").to_string_lossy().to_string(),
            "expires_at": expires_at,
            "days_remaining": days_remaining,
            "status": status
        }));
    }
    certs
}
/// Determines when the certificate at `path` expires by running
/// `openssl x509 -enddate -noout`.
///
/// Returns `(expires_at, days_remaining, status)`:
/// - `expires_at` is the expiry as RFC 3339, or the raw openssl text when it
///   cannot be parsed, or `None` when openssl failed;
/// - `days_remaining` is the number of whole days until expiry;
/// - `status` is `expired`, `critical` (7 days or less), `warning` (30 days
///   or less), `valid` or `unknown`.
///
/// Expiry is decided by comparing timestamps, not the whole-day count: that
/// count truncates toward zero, so a certificate that expired less than a
/// day ago has `days_remaining == 0` and must still be reported as expired.
async fn cert_expiry(path: &Path) -> (Option<String>, Option<i64>, String) {
    let output = run_command(
        Command::new("openssl")
            .args(["x509", "-enddate", "-noout", "-in"])
            .arg(path),
        8,
    )
    .await;
    let Ok(output) = output else {
        return (None, None, "unknown".into());
    };
    if !output.status.success() {
        return (None, None, "unknown".into());
    }
    let text = stdout_text(&output);
    let raw = text.trim().strip_prefix("notAfter=").unwrap_or(text.trim());
    let Ok(naive) = NaiveDateTime::parse_from_str(raw, "%b %e %H:%M:%S %Y GMT") else {
        return (Some(raw.to_string()), None, "unknown".into());
    };
    let expires = DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc);
    let now = Utc::now();
    let days = (expires - now).num_days();
    let status = if expires <= now {
        "expired"
    } else if days <= 7 {
        "critical"
    } else if days <= 30 {
        "warning"
    } else {
        "valid"
    };
    (Some(expires.to_rfc3339()), Some(days), status.into())
}
/// Renders the server block for a plain static site rooted at `root`.
fn nginx_static_config(server_name: &str, root: &str) -> String {
    format!(
        concat!(
            "server {{\n",
            " listen 80;\n",
            " server_name {server_name};\n",
            "\n",
            " root {root};\n",
            " index index.html index.htm;\n",
            "\n",
            " location / {{\n",
            " try_files $uri $uri/ =404;\n",
            " }}\n",
            "}}\n",
        ),
        server_name = server_name,
        root = root,
    )
}
/// Renders the server block for a single-page application: unknown paths
/// fall back to `/index.html` and common static assets get a 7-day cache
/// lifetime.
fn nginx_spa_config(server_name: &str, root: &str) -> String {
    format!(
        concat!(
            "server {{\n",
            " listen 80;\n",
            " server_name {server_name};\n",
            "\n",
            " root {root};\n",
            " index index.html;\n",
            "\n",
            " location / {{\n",
            " try_files $uri $uri/ /index.html;\n",
            " }}\n",
            "\n",
            " location ~* \\.(?:css|js|jpg|jpeg|gif|png|ico|svg|webp|woff2?)$ {{\n",
            " try_files $uri =404;\n",
            " expires 7d;\n",
            " add_header Cache-Control \"public, max-age=604800\";\n",
            " }}\n",
            "}}\n",
        ),
        server_name = server_name,
        root = root,
    )
}
/// Renders the server block for a PHP site whose `.php` requests are passed
/// to `fastcgi_pass`; access to `.ht*` files is denied.
fn nginx_php_config(server_name: &str, root: &str, fastcgi_pass: &str) -> String {
    format!(
        concat!(
            "server {{\n",
            " listen 80;\n",
            " server_name {server_name};\n",
            "\n",
            " root {root};\n",
            " index index.php index.html index.htm;\n",
            "\n",
            " location / {{\n",
            " try_files $uri $uri/ /index.php?$query_string;\n",
            " }}\n",
            "\n",
            " location ~ \\.php$ {{\n",
            " include snippets/fastcgi-php.conf;\n",
            " fastcgi_pass {fastcgi_pass};\n",
            " }}\n",
            "\n",
            " location ~ /\\.ht {{\n",
            " deny all;\n",
            " }}\n",
            "}}\n",
        ),
        server_name = server_name,
        root = root,
        fastcgi_pass = fastcgi_pass,
    )
}
fn nginx_load_balance_config(server_name: &str, params: &Value) -> Result<String> {
let upstream_name = format!("lightops_{}", sanitize_upstream_name(server_name));
let upstreams = params
.get("upstreams")
.and_then(Value::as_array)
.ok_or_else(|| anyhow!("缺少上游列表"))?
.iter()
.filter_map(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.collect::<Vec<_>>();
if upstreams.is_empty() {
bail!("至少需要一个上游地址");
}
let mut upstream_block = format!("upstream {upstream_name} {{\n");
for upstream in upstreams {
validate_nginx_upstream(upstream)?;
upstream_block.push_str(&format!(" server {upstream};\n"));
}
upstream_block.push_str(" keepalive 32;\n}\n\n");
let mut proxy_params = params.clone();
if let Some(obj) = proxy_params.as_object_mut() {
obj.insert("websocket".into(), json!(true));
obj.insert("gzip".into(), json!(true));
}
Ok(format!(
"{upstream_block}{}",
nginx_proxy_config(
server_name,
&format!("http://{upstream_name}"),
&proxy_params
)
))
}
/// Renders the server block for a reverse proxy to `upstream`.
///
/// Optional boolean parameters read from `params`: `websocket` (default
/// true), `gzip` (default true), `cache_static` (default false) and
/// `force_https` (default false). `client_max_body_size` defaults to `64m`
/// and is passed through `sanitize_nginx_size`.
///
/// The static-asset location is formatted with `upstream` directly. It used
/// to carry a literal `{upstream}` placeholder that was filled by a string
/// replacement over the whole rendered configuration, which would also have
/// rewritten that text if it appeared in caller-supplied values.
fn nginx_proxy_config(server_name: &str, upstream: &str, params: &Value) -> String {
    let websocket = params
        .get("websocket")
        .and_then(Value::as_bool)
        .unwrap_or(true);
    let gzip = params.get("gzip").and_then(Value::as_bool).unwrap_or(true);
    let cache_static = params
        .get("cache_static")
        .and_then(Value::as_bool)
        .unwrap_or(false);
    let force_https = params
        .get("force_https")
        .and_then(Value::as_bool)
        .unwrap_or(false);
    let client_max_body_size = params
        .get("client_max_body_size")
        .and_then(Value::as_str)
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .unwrap_or("64m");
    let client_max_body_size = sanitize_nginx_size(client_max_body_size);
    let mut extra = String::new();
    if force_https {
        extra.push_str(
            " if ($scheme = http) {\n return 301 https://$host$request_uri;\n }\n\n",
        );
    }
    if gzip {
        extra.push_str(" gzip on;\n gzip_types text/plain text/css application/json application/javascript application/xml image/svg+xml;\n\n");
    }
    let websocket_headers = if websocket {
        " proxy_http_version 1.1;\n proxy_set_header Upgrade $http_upgrade;\n proxy_set_header Connection \"upgrade\";\n"
    } else {
        ""
    };
    let static_cache = if cache_static {
        format!(
            "\n location ~* \\.(?:css|js|jpg|jpeg|gif|png|ico|svg|webp|woff2?)$ {{\n proxy_pass {upstream};\n expires 7d;\n add_header Cache-Control \"public, max-age=604800\";\n }}\n"
        )
    } else {
        String::new()
    };
    format!(
        "server {{\n listen 80;\n server_name {server_name};\n client_max_body_size {client_max_body_size};\n\n{extra} location / {{\n proxy_pass {upstream};\n proxy_set_header Host $host;\n proxy_set_header X-Real-IP $remote_addr;\n proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;\n proxy_set_header X-Forwarded-Proto $scheme;\n{websocket_headers} proxy_read_timeout 300s;\n proxy_send_timeout 300s;\n }}\n{static_cache}}}\n"
    )
}
/// Derives an identifier for an nginx `upstream` group from a server name.
///
/// ASCII letters and digits are kept (lower-cased), every other character
/// becomes `_`, leading and trailing underscores are stripped and the result
/// is cut to at most 80 characters.
fn sanitize_upstream_name(server_name: &str) -> String {
    let mut normalized = String::with_capacity(server_name.len());
    for ch in server_name.chars() {
        let mapped = if ch.is_ascii_alphanumeric() {
            ch.to_ascii_lowercase()
        } else {
            '_'
        };
        normalized.push(mapped);
    }
    normalized.trim_matches('_').chars().take(80).collect()
}
/// Validates an upstream address before it is written into an nginx
/// `server` directive.
///
/// Rejects empty or over-long (more than 200 bytes) values and any
/// character that could change the structure of the configuration:
/// whitespace of any kind (not only the plain space), control characters,
/// `;`, `{`, `}`, `$`, backtick, and `#`, which starts a comment.
fn validate_nginx_upstream(value: &str) -> Result<()> {
    let forbidden = |c: char| {
        c.is_whitespace() || c.is_control() || matches!(c, ';' | '{' | '}' | '$' | '`' | '#')
    };
    if value.is_empty() || value.len() > 200 || value.chars().any(forbidden) {
        bail!("上游地址无效：{value}");
    }
    Ok(())
}
/// Returns `value` when it is a well-formed nginx size and the default
/// `64m` otherwise.
///
/// A well-formed size is at most 16 bytes long and consists of one or more
/// ASCII digits optionally followed by a single unit suffix (`k`, `m` or
/// `g`, either case). Checking the structure, not just the set of
/// characters, keeps values such as `kkk`, `m10`, `10mm` or the empty string
/// out of the generated configuration.
fn sanitize_nginx_size(value: &str) -> String {
    let digits = value
        .strip_suffix(|c: char| matches!(c, 'k' | 'K' | 'm' | 'M' | 'g' | 'G'))
        .unwrap_or(value);
    let well_formed =
        value.len() <= 16 && !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit());
    if well_formed {
        value.to_string()
    } else {
        "64m".into()
    }
}
/// Validates a site (or backup) file name before it is joined onto the
/// nginx sites directories.
///
/// Rejects names that are empty or `.` (both resolve to the directory
/// itself), longer than 180 bytes, containing a path separator or `..`, or
/// containing any control character (which includes NUL).
fn validate_site_name(name: &str) -> Result<()> {
    if name.is_empty()
        || name == "."
        || name.len() > 180
        || name.contains('/')
        || name.contains('\\')
        || name.contains("..")
        || name.chars().any(char::is_control)
    {
        bail!("站点名称无效");
    }
    Ok(())
}
/// Validates a domain name before it is passed to certbot.
///
/// The name may be at most 253 bytes long, may contain only ASCII letters,
/// digits, `.` and `-`, must not begin or end with `-` and must not contain
/// `..`.
fn validate_domain(domain: &str) -> Result<()> {
    let allowed_char = |c: char| c.is_ascii_alphanumeric() || c == '.' || c == '-';
    let well_formed = domain.len() <= 253
        && !domain.starts_with('-')
        && !domain.ends_with('-')
        && !domain.contains("..")
        && domain.chars().all(allowed_char);
    if !well_formed {
        bail!("域名无效：{domain}");
    }
    Ok(())
}
/// Performs a minimal plausibility check on an e-mail address: at most 254
/// bytes, no NUL byte, and at least one `@`.
fn validate_email(email: &str) -> Result<()> {
    let plausible = email.len() <= 254 && !email.contains('\0') && email.contains('@');
    if !plausible {
        bail!("邮箱无效");
    }
    Ok(())
}
/// Returns the current time as whole seconds since the Unix epoch, rendered
/// as a decimal string. A clock set before the epoch yields `"0"`.
fn chrono_like_timestamp() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let seconds = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    seconds.to_string()
}
/// Reports whether Docker is usable by asking `docker version` for the
/// server version.
///
/// `version` is the trimmed stdout of the command. When the command cannot
/// be run at all, Docker is reported as not installed with a null version.
async fn docker_status() -> Result<Value> {
    let mut docker = Command::new("docker");
    docker.args(["version", "--format", "{{json .Server.Version}}"]);
    let status = match run_command(&mut docker, 8).await {
        Ok(output) => {
            json!({ "installed": output.status.success(), "version": stdout_text(&output).trim() })
        }
        Err(_) => json!({ "installed": false, "version": null }),
    };
    Ok(status)
}
/// Lists all containers (running or not) via `docker ps -a`.
///
/// Each stdout line is parsed as a JSON object; lines that fail to parse are
/// skipped. Missing or non-string fields become empty strings.
///
/// # Errors
/// Fails when the command cannot be run, times out, or exits unsuccessfully.
async fn docker_containers() -> Result<Value> {
    let mut cmd = Command::new("docker");
    cmd.args(["ps", "-a", "--format", "{{json .}}"]);
    let output = run_command(&mut cmd, 12).await?;
    let text = output_text(output)?;

    // Reads a string field from one row, defaulting to "".
    let text_field = |row: &Value, key: &str| -> String {
        row.get(key)
            .and_then(Value::as_str)
            .unwrap_or_default()
            .to_string()
    };

    let mut containers = Vec::new();
    for line in text.lines() {
        let Ok(row) = serde_json::from_str::<Value>(line) else {
            continue;
        };
        containers.push(DockerContainer {
            id: text_field(&row, "ID"),
            image: text_field(&row, "Image"),
            command: text_field(&row, "Command"),
            status: text_field(&row, "Status"),
            names: text_field(&row, "Names"),
            ports: text_field(&row, "Ports"),
        });
    }
    Ok(json!({ "containers": containers }))
}
/// Runs `docker inspect <id>` and returns the parsed JSON.
///
/// When the output is a JSON array, `detail` is its first element; otherwise
/// (or when the array is empty) `detail` is the whole parsed document.
///
/// # Errors
/// Fails when `id` is rejected by `validate_docker_id`, when the command
/// fails, or when its stdout is not valid JSON.
async fn docker_inspect(id: String) -> Result<Value> {
    validate_docker_id(&id)?;
    let mut cmd = Command::new("docker");
    cmd.args(["inspect", id.as_str()]);
    let output = run_command(&mut cmd, 20).await?;
    let text = output_text(output)?;
    let parsed: Value = serde_json::from_str(&text).context("Docker inspect 输出不是有效 JSON")?;
    let first_item = parsed.as_array().and_then(|items| items.first()).cloned();
    let detail = first_item.unwrap_or(parsed);
    Ok(json!({ "id": id, "detail": detail }))
}
/// Takes a single `docker stats --no-stream` sample for one container.
///
/// `stats` is the first stdout line that parses as JSON, or an empty object
/// when no line does.
///
/// # Errors
/// Fails when `id` is rejected by `validate_docker_id` or the command fails.
async fn docker_stats(id: String) -> Result<Value> {
    validate_docker_id(&id)?;
    let mut cmd = Command::new("docker");
    cmd.args(["stats", "--no-stream", "--format", "{{json .}}", id.as_str()]);
    let output = run_command(&mut cmd, 20).await?;
    let text = output_text(output)?;
    let first_sample = text
        .lines()
        .filter_map(|line| serde_json::from_str::<Value>(line).ok())
        .next();
    let stats = match first_sample {
        Some(sample) => sample,
        None => json!({}),
    };
    Ok(json!({ "id": id, "stats": stats }))
}
/// Lists local images via `docker images`.
///
/// Each stdout line is parsed as a JSON object; unparsable lines are skipped
/// and missing or non-string fields become empty strings.
///
/// # Errors
/// Fails when the command cannot be run, times out, or exits unsuccessfully.
async fn docker_images() -> Result<Value> {
    let mut cmd = Command::new("docker");
    cmd.args(["images", "--format", "{{json .}}"]);
    let output = run_command(&mut cmd, 12).await?;
    let text = output_text(output)?;

    // Reads a string field from one row, defaulting to "".
    let text_field = |row: &Value, key: &str| -> String {
        row.get(key)
            .and_then(Value::as_str)
            .unwrap_or_default()
            .to_string()
    };

    let mut images = Vec::new();
    for line in text.lines() {
        let Ok(row) = serde_json::from_str::<Value>(line) else {
            continue;
        };
        images.push(DockerImage {
            repository: text_field(&row, "Repository"),
            tag: text_field(&row, "Tag"),
            id: text_field(&row, "ID"),
            size: text_field(&row, "Size"),
        });
    }
    Ok(json!({ "images": images }))
}
/// Lists volumes via `docker volume ls`, returning each parsable JSON line
/// unchanged. Lines that are not valid JSON are skipped.
///
/// # Errors
/// Fails when the command cannot be run, times out, or exits unsuccessfully.
async fn docker_volumes() -> Result<Value> {
    let mut cmd = Command::new("docker");
    cmd.args(["volume", "ls", "--format", "{{json .}}"]);
    let output = run_command(&mut cmd, 12).await?;
    let text = output_text(output)?;
    let mut volumes: Vec<Value> = Vec::new();
    for line in text.lines() {
        if let Ok(row) = serde_json::from_str::<Value>(line) {
            volumes.push(row);
        }
    }
    Ok(json!({ "volumes": volumes }))
}
/// Removes a volume with `docker volume rm <name>` (60 s timeout).
///
/// # Errors
/// Fails when the name is rejected by `validate_docker_volume_name` or the
/// command fails.
async fn docker_volume_delete(name: String) -> Result<Value> {
    validate_docker_volume_name(&name)?;
    let mut cmd = Command::new("docker");
    cmd.args(["volume", "rm", name.as_str()]);
    let output = run_command(&mut cmd, 60).await?;
    command_value(output)
}
/// Runs `docker <action> <id>` (30 s timeout) and returns its captured output.
///
/// `action` is passed to Docker as-is; only `id` is validated here.
///
/// # Errors
/// Fails when `id` is rejected by `validate_docker_id` or the command fails.
async fn docker_simple(action: &str, id: String) -> Result<Value> {
    validate_docker_id(&id)?;
    let mut cmd = Command::new("docker");
    cmd.args([action, id.as_str()]);
    let output = run_command(&mut cmd, 30).await?;
    command_value(output)
}
/// Fetches the last `tail` log lines of a container (capped at 2000) using
/// `docker logs --tail`.
///
/// # Errors
/// Fails when `id` is rejected by `validate_docker_id` or the command fails.
async fn docker_logs(id: String, tail: u64) -> Result<Value> {
    validate_docker_id(&id)?;
    let line_limit = tail.min(2000).to_string();
    let mut cmd = Command::new("docker");
    cmd.args(["logs", "--tail", line_limit.as_str(), id.as_str()]);
    let output = run_command(&mut cmd, 30).await?;
    command_value(output)
}
/// Removes an image with `docker rmi <id>` (60 s timeout).
///
/// # Errors
/// Fails when `id` is rejected by `validate_docker_id` or the command fails.
async fn docker_rmi(id: String) -> Result<Value> {
    validate_docker_id(&id)?;
    let mut cmd = Command::new("docker");
    cmd.args(["rmi", id.as_str()]);
    let output = run_command(&mut cmd, 60).await?;
    command_value(output)
}
/// Pulls an image with `docker pull <image>` (600 s timeout).
///
/// # Errors
/// Fails when the image reference is rejected by `validate_docker_image` or
/// the command fails.
async fn docker_pull(image: String) -> Result<Value> {
    validate_docker_image(&image)?;
    let mut cmd = Command::new("docker");
    cmd.args(["pull", image.as_str()]);
    let output = run_command(&mut cmd, 600).await?;
    command_value(output)
}
async fn docker_run(params: Value) -> Result<Value> {
let image = string_param(&params, "image")?;
validate_docker_image(&image)?;
let mut cmd = Command::new("docker");
cmd.arg("run");
if params
.get("detach")
.and_then(Value::as_bool)
.unwrap_or(true)
{
cmd.arg("-d");
}
if let Some(name) = params.get("name").and_then(Value::as_str).map(str::trim) {
if !name.is_empty() {
validate_container_name(name)?;
cmd.args(["--name", name]);
}
}
if let Some(restart) = params.get("restart").and_then(Value::as_str) {
validate_restart_policy(restart)?;
if restart != "no" {
cmd.args(["--restart", restart]);
}
}
if let Some(items) = params.get("ports").and_then(Value::as_array) {
for item in items {
let host = item
.get("host")
.and_then(Value::as_u64)
.ok_or_else(|| anyhow!("端口映射缺少宿主机端口"))?;
let container = item
.get("container")
.and_then(Value::as_u64)
.ok_or_else(|| anyhow!("端口映射缺少容器端口"))?;
validate_port(host)?;
validate_port(container)?;
let protocol = item
.get("protocol")
.and_then(Value::as_str)
.unwrap_or("tcp");
if !matches!(protocol, "tcp" | "udp") {
bail!("端口协议只支持 tcp/udp");
}
cmd.args(["-p", &format!("{host}:{container}/{protocol}")]);
}
}
if let Some(items) = params.get("volumes").and_then(Value::as_array) {
for item in items {
let host = item
.get("host")
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("卷映射缺少宿主机路径"))?;
let container = item
.get("container")
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("卷映射缺少容器路径"))?;
validate_volume_path(host)?;
validate_volume_path(container)?;
let suffix = if item
.get("readonly")
.and_then(Value::as_bool)
.unwrap_or(false)
{
":ro"
} else {
""
};
cmd.args(["-v", &format!("{host}:{container}{suffix}")]);
}
}
if let Some(env) = params.get("env").and_then(Value::as_object) {
for (key, value) in env {
validate_env_key(key)?;
let value = value.as_str().unwrap_or_default();
if value.contains('\0') || value.len() > 2048 {
bail!("环境变量值无效");
}
cmd.args(["-e", &format!("{key}={value}")]);
}
}
cmd.arg(&image);
if let Some(command) = params.get("command").and_then(Value::as_str).map(str::trim) {
if !command.is_empty() {
validate_command_args(command)?;
for part in command.split_whitespace() {
cmd.arg(part);
}
}
}
command_value(run_command(&mut cmd, 120).await?)
}
/// Groups containers from `docker ps -a` by their
/// `com.docker.compose.project` label.
///
/// For every project the result contains its name, the distinct service
/// names, the container IDs, the number of running and stopped containers
/// (a container counts as running when its `Status` starts with `"Up"`), and
/// an aggregate `status` that is `"Running"` when at least one container is
/// running and `"Stopped"` otherwise. Lines that are not valid JSON and
/// containers without the project label are ignored.
///
/// # Errors
/// Fails when the command cannot be run, times out, or exits unsuccessfully.
async fn docker_compose_projects() -> Result<Value> {
    let output = run_command(
        Command::new("docker").args(["ps", "-a", "--format", "{{json .}}"]),
        12,
    )
    .await?;
    let text = output_text(output)?;
    let mut projects = serde_json::Map::new();
    for line in text.lines() {
        let Ok(value) = serde_json::from_str::<Value>(line) else {
            continue;
        };
        let labels = value
            .get("Labels")
            .and_then(Value::as_str)
            .unwrap_or_default();
        let Some(project) = label_value(labels, "com.docker.compose.project") else {
            continue;
        };
        let service = label_value(labels, "com.docker.compose.service").unwrap_or_default();
        let is_up = value
            .get("Status")
            .and_then(Value::as_str)
            .unwrap_or_default()
            .starts_with("Up");
        let entry = projects.entry(project.clone()).or_insert_with(|| {
            json!({
                "name": project,
                "services": [],
                "containers": [],
                "running": 0,
                "stopped": 0,
                "status": "Stopped"
            })
        });
        if let Some(obj) = entry.as_object_mut() {
            push_unique_json_string(obj, "services", &service);
            if let Some(id) = value.get("ID").and_then(Value::as_str) {
                push_unique_json_string(obj, "containers", id);
            }
            let key = if is_up { "running" } else { "stopped" };
            let next = obj.get(key).and_then(Value::as_u64).unwrap_or(0) + 1;
            obj.insert(key.into(), json!(next));
            // Derive the aggregate from the counters rather than from the
            // current container, so the result does not depend on the order
            // in which `docker ps` lists a mixed project.
            let any_running = obj.get("running").and_then(Value::as_u64).unwrap_or(0) > 0;
            obj.insert(
                "status".into(),
                json!(if any_running { "Running" } else { "Stopped" }),
            );
        }
    }
    Ok(json!({ "projects": projects.into_iter().map(|(_, value)| value).collect::<Vec<_>>() }))
}
/// Applies `docker <action>` to every container of a Compose project in a
/// single invocation (60 s timeout).
///
/// # Errors
/// Fails when the project name is invalid, when no containers carry the
/// project label, or when the command fails.
async fn docker_compose_action(action: &str, project: String) -> Result<Value> {
    validate_compose_project(&project)?;
    let container_ids = compose_container_ids(&project).await?;
    if container_ids.is_empty() {
        bail!("未找到 Compose 项目容器");
    }
    let mut cmd = Command::new("docker");
    cmd.arg(action).args(&container_ids);
    let output = run_command(&mut cmd, 60).await?;
    command_value(output)
}
/// Concatenates the last `tail` log lines (capped at 2000) of every container
/// in a Compose project.
///
/// Each container's section starts with a `===== <id> =====` header followed
/// by its stdout and then its stderr; a newline is appended when the
/// accumulated text does not already end with one. The exit status of the
/// individual `docker logs` calls is not checked.
///
/// # Errors
/// Fails when the project name is invalid, the container lookup fails, or a
/// `docker logs` call cannot be run or times out.
async fn docker_compose_logs(project: String, tail: u64) -> Result<Value> {
    validate_compose_project(&project)?;
    let container_ids = compose_container_ids(&project).await?;
    let line_limit = tail.min(2000).to_string();
    let mut combined = String::new();
    for container_id in container_ids {
        let mut cmd = Command::new("docker");
        cmd.args(["logs", "--tail", line_limit.as_str(), container_id.as_str()]);
        let output = run_command(&mut cmd, 30).await?;
        combined.push_str("===== ");
        combined.push_str(&container_id);
        combined.push_str(" =====\n");
        combined.push_str(&stdout_text(&output));
        combined.push_str(&stderr_text(&output));
        if !combined.ends_with('\n') {
            combined.push('\n');
        }
    }
    Ok(json!({ "ok": true, "stdout": combined, "stderr": "" }))
}
/// Checks whether a Compose project can be deployed without clobbering
/// anything.
///
/// Steps, in order:
/// 1. validate `project`, `work_dir` and `data_dir`;
/// 2. require `docker info` and `docker compose version` to succeed;
/// 3. require that no container already carries the project label;
/// 4. require that `work_dir/compose.yaml` does not exist;
/// 5. collect warnings for a non-empty work or data directory;
/// 6. require every port in the optional `ports` array to be valid and free.
///
/// # Errors
/// Any failed requirement is reported as an error; warnings are returned in
/// the success payload.
async fn docker_compose_preflight(params: Value) -> Result<Value> {
    let project = string_param(&params, "project")?;
    validate_compose_project(&project)?;
    let work_dir = path_param(&params, "work_dir")?;
    let data_dir = path_param(&params, "data_dir")?;
    validate_absolute_path(&work_dir)?;
    validate_absolute_path(&data_dir)?;

    let mut info_cmd = Command::new("docker");
    info_cmd.arg("info");
    let docker = run_command(&mut info_cmd, 15).await?;
    if !docker.status.success() {
        bail!("Docker 不可用：{}", stderr_text(&docker));
    }

    let mut version_cmd = Command::new("docker");
    version_cmd.args(["compose", "version"]);
    let compose = run_command(&mut version_cmd, 15).await?;
    if !compose.status.success() {
        bail!("Docker Compose 不可用：{}", stderr_text(&compose));
    }

    let existing = compose_container_ids(&project).await?;
    if !existing.is_empty() {
        bail!("Compose 项目名已存在：{project}");
    }
    if work_dir.join("compose.yaml").exists() {
        bail!("工作目录已存在 compose.yaml为避免覆盖请更换项目名或工作目录");
    }

    let mut warnings = Vec::new();
    if dir_has_entries(&work_dir)? {
        warnings.push(format!("工作目录 {} 非空", work_dir.display()));
    }
    if dir_has_entries(&data_dir)? {
        warnings.push(format!(
            "数据目录 {} 非空，安装会复用该目录",
            data_dir.display()
        ));
    }

    let requested_ports = params.get("ports").and_then(Value::as_array);
    for entry in requested_ports.into_iter().flatten() {
        let port = entry.as_u64().ok_or_else(|| anyhow!("端口必须是数字"))?;
        validate_port(port)?;
        // `validate_port` has already bounded the value to 1..=65535.
        if port_in_use(port as u16).await? {
            bail!("端口已被占用：{port}");
        }
    }

    Ok(json!({
        "ok": true,
        "checks": {
            "docker": true,
            "compose": true,
            "project_available": true,
            "ports_available": true,
            "work_dir_safe": true
        },
        "warnings": warnings
    }))
}
/// Writes `content` to `<work_dir>/compose.yaml` and runs
/// `docker compose -p <project> -f <file> up -d` (180 s timeout).
///
/// An existing `compose.yaml` is first copied to a timestamped
/// `.compose.yaml.lightops-bak-*` file next to it. If the command cannot be
/// run, times out, or exits unsuccessfully, the previous file is restored
/// from that backup (or the new file is removed when there was none);
/// failures of the restore itself are ignored.
///
/// # Errors
/// Fails when the project name is invalid, `work_dir` is not absolute,
/// `content` is empty or larger than 256 KiB, a filesystem operation fails,
/// or the compose command fails.
async fn docker_compose_deploy(params: Value) -> Result<Value> {
    let project = string_param(&params, "project")?;
    validate_compose_project(&project)?;
    let work_dir = path_param(&params, "work_dir")?;
    if !work_dir.is_absolute() {
        bail!("工作目录必须是绝对路径");
    }
    let content = string_param(&params, "content")?;
    if content.is_empty() || content.len() > 256 * 1024 {
        bail!("Compose 内容为空或超过 256KB");
    }

    fs::create_dir_all(&work_dir)?;
    let compose_path = work_dir.join("compose.yaml");
    let backup_name = format!(".compose.yaml.lightops-bak-{}", chrono_like_timestamp());
    let backup_path = work_dir.join(backup_name);
    let had_old = compose_path.exists();
    if had_old {
        fs::copy(&compose_path, &backup_path)?;
    }
    fs::write(&compose_path, content)?;

    let mut up_cmd = Command::new("docker");
    up_cmd
        .arg("compose")
        .args(["-p", project.as_str()])
        .arg("-f")
        .arg(&compose_path)
        .args(["up", "-d"]);
    let result = run_command(&mut up_cmd, 180).await;

    // Best-effort restore of the previous state; errors are deliberately
    // ignored so the original failure is what gets reported.
    let rollback = || {
        if had_old {
            let _ = fs::copy(&backup_path, &compose_path);
        } else {
            let _ = fs::remove_file(&compose_path);
        }
    };

    let output = match result {
        Ok(output) => output,
        Err(err) => {
            rollback();
            return Err(err);
        }
    };
    if !output.status.success() {
        rollback();
        bail!("{}", stderr_text(&output));
    }
    Ok(json!({
        "ok": true,
        "stdout": stdout_text(&output),
        "stderr": stderr_text(&output),
        "work_dir": work_dir,
        "compose_file": compose_path
    }))
}
/// Accepts `path` only when it is absolute, its lossy string form is at most
/// 300 bytes long, and that string contains no NUL.
///
/// # Errors
/// Returns an error when any of those conditions fails.
fn validate_absolute_path(path: &Path) -> Result<()> {
    let text = path.to_string_lossy();
    if !path.is_absolute() || text.len() > 300 || text.contains('\0') {
        bail!("路径必须是安全的绝对路径");
    }
    Ok(())
}
/// Reports whether the directory at `path` contains at least one entry.
///
/// A path that does not exist counts as empty (`Ok(false)`).
///
/// # Errors
/// Fails when `path` exists but is not a directory, or when the directory
/// cannot be opened for reading.
fn dir_has_entries(path: &Path) -> Result<bool> {
    if path.exists() {
        if path.is_dir() {
            let mut entries = fs::read_dir(path)?;
            Ok(entries.next().is_some())
        } else {
            bail!("路径不是目录：{}", path.display());
        }
    } else {
        Ok(false)
    }
}
/// Checks whether any listening socket reported by `ss -H -tuln` uses `port`.
///
/// When `ss` cannot be run or exits unsuccessfully, `netstat -tuln` is used
/// instead; if `netstat` runs but exits unsuccessfully the port is reported
/// as free.
///
/// # Errors
/// Fails when the `netstat` fallback itself cannot be run or times out.
async fn port_in_use(port: u16) -> Result<bool> {
    let mut ss_cmd = Command::new("ss");
    ss_cmd.args(["-H", "-tuln"]);
    let ss_listing = match run_command(&mut ss_cmd, 8).await {
        Ok(output) if output.status.success() => Some(stdout_text(&output)),
        _ => None,
    };
    let listing = match ss_listing {
        Some(text) => text,
        None => {
            let mut netstat_cmd = Command::new("netstat");
            netstat_cmd.args(["-tuln"]);
            let output = run_command(&mut netstat_cmd, 8).await?;
            if !output.status.success() {
                return Ok(false);
            }
            stdout_text(&output)
        }
    };
    let occupied = listing.lines().any(|line| line_has_port(line, port));
    Ok(occupied)
}
/// Returns `true` when any whitespace-separated field of `line` ends with
/// `:<port>` (for example `0.0.0.0:80` or `[::]:80` for port 80).
///
/// Fields come from `split_whitespace`, so they can never contain a space;
/// a suffix match is therefore the only check required.
fn line_has_port(line: &str, port: u16) -> bool {
    let suffix = format!(":{port}");
    line.split_whitespace()
        .any(|field| field.ends_with(&suffix))
}
/// Updates a Compose project: `docker compose pull` (300 s timeout) followed
/// by `docker compose up -d` (180 s timeout).
///
/// The success payload is the `up` result extended with `pull_stdout`,
/// `pull_stderr` and `compose_file`.
///
/// # Errors
/// Fails when the parameters are rejected by `compose_file_params`, when the
/// pull exits unsuccessfully (its stderr becomes the error), or when `up`
/// fails.
async fn docker_compose_update(params: Value) -> Result<Value> {
    let (project, compose_path) = compose_file_params(&params)?;

    // Builds the shared `docker compose -p <project> -f <file>` prefix.
    let compose_base = || {
        let mut cmd = Command::new("docker");
        cmd.arg("compose")
            .args(["-p", project.as_str()])
            .arg("-f")
            .arg(&compose_path);
        cmd
    };

    let mut pull_cmd = compose_base();
    pull_cmd.arg("pull");
    let pull = run_command(&mut pull_cmd, 300).await?;
    if !pull.status.success() {
        bail!("{}", stderr_text(&pull));
    }

    let mut up_cmd = compose_base();
    up_cmd.args(["up", "-d"]);
    let up = run_command(&mut up_cmd, 180).await?;

    let extra = json!({
        "pull_stdout": stdout_text(&pull),
        "pull_stderr": stderr_text(&pull),
        "compose_file": compose_path
    });
    command_value_with_extra(up, extra)
}
/// Runs `docker compose -p <project> -f <file> down` (120 s timeout).
///
/// # Errors
/// Fails when the parameters are rejected by `compose_file_params` or the
/// command fails.
async fn docker_compose_down(params: Value) -> Result<Value> {
    let (project, compose_path) = compose_file_params(&params)?;
    let mut cmd = Command::new("docker");
    cmd.arg("compose")
        .args(["-p", project.as_str()])
        .arg("-f")
        .arg(&compose_path)
        .arg("down");
    let output = run_command(&mut cmd, 120).await?;
    command_value(output)
}
/// Extracts the `project` name and the path of `<work_dir>/compose.yaml`
/// from `params`.
///
/// # Errors
/// Fails when the project name is invalid, `work_dir` is not absolute, or the
/// compose file does not exist.
fn compose_file_params(params: &Value) -> Result<(String, PathBuf)> {
    let project = string_param(params, "project")?;
    validate_compose_project(&project)?;
    let work_dir = path_param(params, "work_dir")?;
    if !work_dir.is_absolute() {
        bail!("工作目录必须是绝对路径");
    }
    let compose_path = work_dir.join("compose.yaml");
    if compose_path.exists() {
        Ok((project, compose_path))
    } else {
        bail!("Compose 文件不存在")
    }
}
/// Returns the IDs of all containers (running or not) labelled with
/// `com.docker.compose.project=<project>`, one per non-blank line of
/// `docker ps -aq` output.
///
/// # Errors
/// Fails when the command cannot be run, times out, or exits unsuccessfully.
async fn compose_container_ids(project: &str) -> Result<Vec<String>> {
    let filter = format!("label=com.docker.compose.project={project}");
    let mut cmd = Command::new("docker");
    cmd.args(["ps", "-aq", "--filter", filter.as_str()]);
    let output = run_command(&mut cmd, 12).await?;
    let text = output_text(output)?;
    let mut ids = Vec::new();
    for line in text.lines() {
        let id = line.trim();
        if !id.is_empty() {
            ids.push(id.to_string());
        }
    }
    Ok(ids)
}
/// Looks up `key` in a comma-separated `key=value` label list and returns the
/// value of the first matching entry.
///
/// Each entry is trimmed before matching. Because entries are split on `,`,
/// a value that itself contains a comma is cut at that comma.
fn label_value(labels: &str, key: &str) -> Option<String> {
    let prefix = format!("{key}=");
    for entry in labels.split(',') {
        if let Some(found) = entry.trim().strip_prefix(prefix.as_str()) {
            return Some(found.to_string());
        }
    }
    None
}
fn push_unique_json_string(obj: &mut serde_json::Map<String, Value>, key: &str, value: &str) {
if value.is_empty() {
return;
}
let values = obj.entry(key.to_string()).or_insert_with(|| json!([]));
if let Some(array) = values.as_array_mut() {
if !array.iter().any(|item| item.as_str() == Some(value)) {
array.push(json!(value));
}
}
}
/// Validates a container or image identifier before it is passed to the
/// Docker CLI as a positional argument.
///
/// The identifier must be non-empty, at most 200 bytes long, must not start
/// with `-` (so it cannot be parsed as a command-line option), and may only
/// contain ASCII letters, digits and the characters `. : / @ _ -`. The empty
/// and leading-dash rules match `validate_docker_image` and
/// `validate_docker_volume_name`.
///
/// # Errors
/// Returns an error when the identifier violates any rule.
fn validate_docker_id(id: &str) -> Result<()> {
    if id.is_empty()
        || id.len() > 200
        || id.starts_with('-')
        || !id
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || ".:/@_-".contains(c))
    {
        bail!("Docker 标识无效");
    }
    Ok(())
}
/// Validates an image reference.
///
/// It must be non-empty, at most 255 bytes long, must not start with `-`,
/// must not contain NUL or `://`, and may only use ASCII letters, digits and
/// the characters `. : / @ _ -`.
///
/// # Errors
/// Returns an error when the reference violates any rule.
fn validate_docker_image(image: &str) -> Result<()> {
    let allowed_char =
        |c: char| c.is_ascii_alphanumeric() || matches!(c, '.' | ':' | '/' | '@' | '_' | '-');
    let acceptable = !image.is_empty()
        && image.len() <= 255
        && !image.contains('\0')
        && !image.starts_with('-')
        && !image.contains("://")
        && image.chars().all(allowed_char);
    if acceptable {
        Ok(())
    } else {
        bail!("Docker 镜像名称无效")
    }
}
/// Validates a container name: at most 128 bytes, consisting only of ASCII
/// letters, digits, `.`, `_` and `-`.
///
/// # Errors
/// Returns an error when the name is too long or contains another character.
fn validate_container_name(name: &str) -> Result<()> {
    let has_illegal_char = name
        .chars()
        .any(|c| !(c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-')));
    if name.len() > 128 || has_illegal_char {
        bail!("容器名称无效");
    }
    Ok(())
}
/// Accepts exactly the restart policies `no`, `always`, `unless-stopped` and
/// `on-failure`.
///
/// # Errors
/// Returns an error for any other value.
fn validate_restart_policy(policy: &str) -> Result<()> {
    match policy {
        "no" | "always" | "unless-stopped" | "on-failure" => Ok(()),
        _ => bail!("重启策略无效"),
    }
}
/// Accepts port numbers in the range 1..=65535.
///
/// # Errors
/// Returns an error for 0 and for values above 65535.
fn validate_port(port: u64) -> Result<()> {
    if port == 0 || port > 65535 {
        bail!("端口无效");
    }
    Ok(())
}
/// Validates one side of a volume mapping: non-empty, at most 300 bytes, and
/// free of NUL, line feed and carriage return.
///
/// # Errors
/// Returns an error when the path violates any rule.
fn validate_volume_path(path: &str) -> Result<()> {
    let has_banned_char = path.chars().any(|c| matches!(c, '\0' | '\n' | '\r'));
    let acceptable = !path.is_empty() && path.len() <= 300 && !has_banned_char;
    if !acceptable {
        bail!("卷路径无效");
    }
    Ok(())
}
/// Validates a volume name: non-empty, at most 255 bytes, not starting with
/// `-`, and consisting only of ASCII letters, digits, `.`, `_` and `-`.
///
/// # Errors
/// Returns an error when the name violates any rule.
fn validate_docker_volume_name(name: &str) -> Result<()> {
    let allowed_char = |c: char| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-');
    let acceptable = !name.is_empty()
        && name.len() <= 255
        && !name.starts_with('-')
        && name.chars().all(allowed_char);
    if acceptable {
        Ok(())
    } else {
        bail!("Docker volume 名称无效")
    }
}
/// Validates an environment variable name: non-empty, at most 80 bytes, and
/// made up only of ASCII upper-case letters, ASCII digits and `_`.
///
/// # Errors
/// Returns an error when the name violates any rule.
fn validate_env_key(key: &str) -> Result<()> {
    let allowed_char = |c: char| matches!(c, 'A'..='Z' | '0'..='9' | '_');
    let acceptable = !key.is_empty() && key.len() <= 80 && key.chars().all(allowed_char);
    if !acceptable {
        bail!("环境变量名无效，建议使用大写字母、数字和下划线");
    }
    Ok(())
}
/// Rejects container command strings that are longer than 500 bytes or that
/// contain NUL, `&&`, `||`, `;`, a backtick or `$(`.
///
/// # Errors
/// Returns an error when the command violates any rule.
fn validate_command_args(command: &str) -> Result<()> {
    const FORBIDDEN: [&str; 6] = ["\0", "&&", "||", ";", "`", "$("];
    let has_forbidden = FORBIDDEN.iter().any(|needle| command.contains(*needle));
    if command.len() > 500 || has_forbidden {
        bail!("容器命令不安全");
    }
    Ok(())
}
/// Validates a Compose project name: non-empty, at most 128 bytes, and
/// consisting only of ASCII letters, digits, `.`, `_` and `-`.
///
/// # Errors
/// Returns an error when the name violates any rule.
fn validate_compose_project(project: &str) -> Result<()> {
    let allowed_char = |c: char| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-');
    let acceptable =
        !project.is_empty() && project.len() <= 128 && project.chars().all(allowed_char);
    if acceptable {
        Ok(())
    } else {
        bail!("Compose 项目名称无效")
    }
}
/// Runs `cmd` to completion, capturing its output, with a wall-clock limit of
/// `timeout_secs` seconds.
///
/// `kill_on_drop(true)` is set on the command so that, when the timeout
/// fires and the pending `output()` future is dropped, the spawned child is
/// killed instead of being left running in the background.
///
/// # Errors
/// Returns a timeout error when the limit is exceeded, or the underlying I/O
/// error when the process cannot be spawned or awaited. A non-zero exit
/// status is NOT an error here; callers inspect `Output::status`.
async fn run_command(cmd: &mut Command, timeout_secs: u64) -> Result<std::process::Output> {
    cmd.kill_on_drop(true);
    match timeout(Duration::from_secs(timeout_secs), cmd.output()).await {
        Ok(finished) => finished.map_err(Into::into),
        Err(_) => Err(anyhow!("命令执行超时")),
    }
}
/// Converts a finished process into the standard JSON result
/// `{ ok: true, stdout, stderr }`.
///
/// # Errors
/// When the process exited unsuccessfully, the error message is its stderr,
/// or its stdout if stderr is empty.
fn command_value(output: std::process::Output) -> Result<Value> {
    let stdout = stdout_text(&output);
    let stderr = stderr_text(&output);
    if output.status.success() {
        return Ok(json!({ "ok": true, "stdout": stdout, "stderr": stderr }));
    }
    let message = if stderr.is_empty() { stdout } else { stderr };
    bail!("{}", message)
}
/// Same as `command_value`, with every key of the `extra` object copied into
/// the result (overwriting keys that already exist).
///
/// When `extra` is not a JSON object the result is returned unchanged.
///
/// # Errors
/// Propagates the error from `command_value` for an unsuccessful process.
fn command_value_with_extra(output: std::process::Output, extra: Value) -> Result<Value> {
    let mut merged = command_value(output)?;
    if let Some(target) = merged.as_object_mut() {
        if let Some(additions) = extra.as_object() {
            target.extend(
                additions
                    .iter()
                    .map(|(key, item)| (key.clone(), item.clone())),
            );
        }
    }
    Ok(merged)
}
/// Returns the process's stdout as (lossily decoded) UTF-8 text.
///
/// # Errors
/// When the process exited unsuccessfully, its lossily decoded stderr is
/// returned as the error message.
fn output_text(output: std::process::Output) -> Result<String> {
    if !output.status.success() {
        bail!("{}", String::from_utf8_lossy(&output.stderr));
    }
    let text = String::from_utf8_lossy(&output.stdout);
    Ok(text.into_owned())
}
/// Decodes the captured stdout as UTF-8, replacing invalid sequences with
/// U+FFFD.
fn stdout_text(output: &std::process::Output) -> String {
    let decoded = String::from_utf8_lossy(&output.stdout);
    decoded.into_owned()
}
/// Decodes the captured stderr as UTF-8, replacing invalid sequences with
/// U+FFFD.
fn stderr_text(output: &std::process::Output) -> String {
    let decoded = String::from_utf8_lossy(&output.stderr);
    decoded.into_owned()
}