use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; use chrono::Utc; use lightops_common::protocol::{ Application, ApplicationDetail, ApplicationProviderType, ApplicationRelation, ApplicationStatus, ApplicationType, }; use serde_json::{json, Value}; use std::{ collections::HashSet, fs, path::{Path, PathBuf}, time::Instant, }; use tokio::{net::TcpStream, process::Command, time::Duration}; use uuid::Uuid; const NGINX_AVAILABLE: &str = "/etc/nginx/sites-available"; const NGINX_ENABLED: &str = "/etc/nginx/sites-enabled"; const NGINX_CONF_D: &str = "/etc/nginx/conf.d"; #[async_trait] #[allow(dead_code)] pub trait ApplicationProvider { async fn detect(&self) -> Result; async fn list_apps(&self) -> Result>; async fn get_app(&self, id: &str) -> Result; async fn start(&self, id: &str) -> Result<()>; async fn stop(&self, id: &str) -> Result<()>; async fn restart(&self, id: &str) -> Result<()>; async fn reload(&self, id: &str) -> Result<()>; async fn logs(&self, id: &str, lines: usize) -> Result>; async fn update(&self, id: &str) -> Result<()>; async fn uninstall(&self, id: &str) -> Result<()>; } pub async fn handle(action: &str, params: Value) -> Result { match action { "app.discover" | "app.list" => discover().await, "app.get" => app_detail(params).await, "app.start" => provider_action(params, ProviderVerb::Start).await, "app.stop" => provider_action(params, ProviderVerb::Stop).await, "app.restart" => provider_action(params, ProviderVerb::Restart).await, "app.reload" => provider_action(params, ProviderVerb::Reload).await, "app.logs" => app_logs(params).await, "app.health" => app_health(params).await, "app.update" => bail!("暂不支持操作:应用更新"), "app.uninstall" => bail!("暂不支持操作:应用卸载需要先实现备份能力"), "app.backup" => app_backup(params).await, "app.restore" => bail!("暂不支持操作:应用恢复"), "app.manage_custom" => manage_custom(params, false).await, "app.create_systemd_service" => manage_custom(params, true).await, "app.unmanage" => Ok(json!({ "ok": true })), _ => 
bail!("不支持的操作"), } } async fn discover() -> Result { let agent_id = agent_id(); let mut apps = Vec::new(); let mut relations = Vec::new(); let providers: Vec> = vec![ Box::new(SystemdAppProvider::new(agent_id.clone())), Box::new(DockerComposeAppProvider::new(agent_id.clone())), Box::new(DockerAppProvider::new(agent_id.clone())), Box::new(NginxSiteAppProvider::new(agent_id.clone())), Box::new(PackageAppProvider::new(agent_id.clone())), Box::new(LightOpsManagedAppProvider::new(agent_id.clone())), ]; for provider in providers { if provider.detect().await.unwrap_or(false) { match provider.list_apps().await { Ok(found) => merge_apps(&mut apps, found), Err(err) => tracing::debug!(?err, "应用 Provider 执行失败"), } } } enrich_with_ports(&mut apps).await; build_relations(&apps, &mut relations); Ok(json!({ "applications": apps, "relations": relations })) } fn merge_apps(apps: &mut Vec, found: Vec) { for mut app in found { if let Some(existing) = apps.iter_mut().find(|existing| same_app(existing, &app)) { if existing.version.is_none() { existing.version = app.version.take(); } if existing.install_path.is_none() { existing.install_path = app.install_path.take(); } if existing.work_dir.is_none() { existing.work_dir = app.work_dir.take(); } merge_vec(&mut existing.config_paths, app.config_paths); merge_vec(&mut existing.log_paths, app.log_paths); merge_vec(&mut existing.data_paths, app.data_paths); merge_vec(&mut existing.ports, app.ports); merge_vec(&mut existing.domains, app.domains); existing.package_name = existing.package_name.take().or(app.package_name.take()); existing.service_name = existing.service_name.take().or(app.service_name.take()); existing.container_id = existing.container_id.take().or(app.container_id.take()); existing.nginx_site = existing.nginx_site.take().or(app.nginx_site.take()); existing.metadata = merge_metadata(existing.metadata.clone(), app.metadata); } else { apps.push(app); } } } fn same_app(left: &Application, right: &Application) -> bool { left.id == 
right.id || left.name == right.name || (left.service_name.is_some() && left.service_name == right.service_name) || (left.container_id.is_some() && left.container_id == right.container_id) || (left.nginx_site.is_some() && left.nginx_site == right.nginx_site) } fn merge_vec(target: &mut Vec, incoming: Vec) { let mut seen = target.iter().cloned().collect::>(); for item in incoming { if seen.insert(item.clone()) { target.push(item); } } } fn merge_metadata(left: Value, right: Value) -> Value { let mut merged = left.as_object().cloned().unwrap_or_default(); if let Some(map) = right.as_object() { for (key, value) in map { merged.insert(key.clone(), value.clone()); } } Value::Object(merged) } async fn app_detail(params: Value) -> Result { let app = app_param(¶ms)?; let detail = ApplicationDetail { application: app, relations: Vec::new(), recent_actions: Vec::new(), runtime_info: json!({}), available_actions: vec![ "start".into(), "stop".into(), "restart".into(), "logs".into(), "health".into(), "backup".into(), ], risk_level: "normal".into(), provider_specific_info: json!({}), }; Ok(json!(detail)) } enum ProviderVerb { Start, Stop, Restart, Reload, } async fn provider_action(params: Value, verb: ProviderVerb) -> Result { let app = app_param(¶ms)?; if app.is_system { bail!("系统应用默认只读"); } match app.provider { ApplicationProviderType::Systemd | ApplicationProviderType::LightOpsManaged => { let service = app .service_name .ok_or_else(|| anyhow!("缺少 service_name"))?; let provider = SystemdAppProvider::new(app.agent_id); match verb { ProviderVerb::Start => provider.start(&service).await?, ProviderVerb::Stop => provider.stop(&service).await?, ProviderVerb::Restart => provider.restart(&service).await?, ProviderVerb::Reload => provider.reload(&service).await?, } } ApplicationProviderType::Docker => { let container = app .container_id .ok_or_else(|| anyhow!("缺少 container_id"))?; let provider = DockerAppProvider::new(app.agent_id); match verb { ProviderVerb::Start => 
provider.start(&container).await?, ProviderVerb::Stop => provider.stop(&container).await?, ProviderVerb::Restart => provider.restart(&container).await?, ProviderVerb::Reload => bail!("Docker 容器不支持重载操作"), } } ApplicationProviderType::DockerCompose => { let project = app .compose_project .ok_or_else(|| anyhow!("缺少 compose_project"))?; let provider = DockerComposeAppProvider::new(app.agent_id); match verb { ProviderVerb::Start => provider.start(&project).await?, ProviderVerb::Stop => provider.stop(&project).await?, ProviderVerb::Restart => provider.restart(&project).await?, ProviderVerb::Reload => bail!("Docker Compose 项目不支持重载操作"), } } ApplicationProviderType::NginxSite => { if matches!(verb, ProviderVerb::Reload) { command_checked(Command::new("systemctl").args(["reload", "nginx"])).await?; } else { bail!("Nginx 站点不支持此操作"); } } _ => bail!("此应用来源不支持该操作"), } Ok(json!({ "ok": true })) } async fn app_logs(params: Value) -> Result { let app = app_param(¶ms)?; let lines = params .get("params") .and_then(|p| p.get("lines")) .and_then(Value::as_u64) .unwrap_or(200) .min(2000) as usize; let logs = match app.provider { ApplicationProviderType::Docker => { let id = app .container_id .ok_or_else(|| anyhow!("缺少 container_id"))?; DockerAppProvider::new(app.agent_id) .logs(&id, lines) .await? } ApplicationProviderType::DockerCompose => { let project = app .compose_project .ok_or_else(|| anyhow!("缺少 compose_project"))?; DockerComposeAppProvider::new(app.agent_id) .logs(&project, lines) .await? } ApplicationProviderType::Systemd | ApplicationProviderType::LightOpsManaged => { let service = app .service_name .ok_or_else(|| anyhow!("缺少 service_name"))?; SystemdAppProvider::new(app.agent_id) .logs(&service, lines) .await? 
} ApplicationProviderType::NginxSite => { let mut out = Vec::new(); for path in app.log_paths.iter().take(4) { out.extend(tail_file(path, lines / 2).await.unwrap_or_default()); } out } _ => { let mut out = Vec::new(); for path in app.log_paths.iter().take(4) { out.extend(tail_file(path, lines / 2).await.unwrap_or_default()); } out } }; Ok(json!({ "lines": logs, "content": logs.join("\n") })) } async fn app_health(params: Value) -> Result { let app = app_param(¶ms)?; let options = params.get("params").cloned().unwrap_or_else(|| json!({})); let timeout_secs = options .get("timeout_secs") .and_then(Value::as_u64) .unwrap_or(5) .clamp(1, 30); let timeout_duration = Duration::from_secs(timeout_secs); let kind = options .get("kind") .and_then(Value::as_str) .unwrap_or_default() .to_lowercase(); if kind == "http" || options.get("url").and_then(Value::as_str).is_some() || (!kind.eq("tcp") && !app.domains.is_empty()) { return http_health(&app, &options, timeout_secs).await; } tcp_health(&app, &options, timeout_duration).await } async fn app_backup(params: Value) -> Result { let app = app_param(¶ms)?; if app.is_system { bail!("系统应用默认不允许备份"); } let paths = backup_paths(&app); if paths.is_empty() { bail!("当前应用没有可备份路径"); } let backup_dir = Path::new("/var/backups/lightops/apps"); fs::create_dir_all(backup_dir).context("创建应用备份目录失败")?; let timestamp = Utc::now().format("%Y%m%d%H%M%S").to_string(); let archive_name = format!("{}-{timestamp}.tar.gz", safe_app_name(&app.name)?); let archive_path = backup_dir.join(archive_name); let mut cmd = Command::new("tar"); cmd.arg("-czf").arg(&archive_path).arg("-C").arg("/"); for path in &paths { let relative = path .strip_prefix("/") .ok_or_else(|| anyhow!("备份路径必须是绝对路径"))?; cmd.arg(relative); } command_checked(&mut cmd).await?; let size = fs::metadata(&archive_path) .map(|meta| meta.len()) .unwrap_or(0); Ok(json!({ "ok": true, "archive_path": archive_path.to_string_lossy(), "size": size, "paths": paths, })) } fn backup_paths(app: 
&Application) -> Vec { let mut out = Vec::new(); let mut seen = HashSet::new(); for value in app .install_path .iter() .chain(app.work_dir.iter()) .chain(app.config_paths.iter()) .chain(app.data_paths.iter()) { let path = Path::new(value); if !path.is_absolute() || !path.exists() || value == "/" || value.starts_with("/proc/") { continue; } if seen.insert(value.clone()) { out.push(value.clone()); } } out.truncate(64); out } async fn http_health(app: &Application, options: &Value, timeout_secs: u64) -> Result { let target = options .get("url") .and_then(Value::as_str) .map(ToString::to_string) .or_else(|| app.domains.first().map(|domain| format!("http://{domain}"))) .or_else(|| { app.ports .first() .map(|port| format!("http://127.0.0.1:{port}")) }) .ok_or_else(|| anyhow!("缺少 HTTP 健康检查目标"))?; validate_url_target(&target)?; let mut cmd = Command::new("curl"); cmd.arg("-I") .arg("-L") .arg("-sS") .arg("-o") .arg("/dev/null") .arg("-w") .arg("%{http_code}") .arg("--max-time") .arg(timeout_secs.to_string()) .arg(&target); let start = Instant::now(); let output = tokio::time::timeout(Duration::from_secs(timeout_secs + 2), cmd.output()).await??; let latency_ms = start.elapsed().as_millis() as u64; let status_text = String::from_utf8_lossy(&output.stdout).trim().to_string(); let status_code = status_text.parse::().ok(); let ok = output.status.success() && status_code.is_some_and(|code| (200..500).contains(&code)); let error = if ok { None } else { Some(String::from_utf8_lossy(&output.stderr).trim().to_string()) .filter(|text| !text.is_empty()) .or_else(|| Some("HTTP 健康检查失败".to_string())) }; Ok(json!({ "ok": ok, "kind": "http", "target": target, "latency_ms": latency_ms, "status_code": status_code, "error": error, })) } async fn tcp_health( app: &Application, options: &Value, timeout_duration: Duration, ) -> Result { let host = options .get("host") .and_then(Value::as_str) .unwrap_or("127.0.0.1") .to_string(); validate_host_target(&host)?; let port = options .get("port") 
.and_then(Value::as_u64) .and_then(|port| u16::try_from(port).ok()) .or_else(|| app.ports.first().copied()) .ok_or_else(|| anyhow!("缺少 TCP 健康检查端口"))?; let target = format!("{host}:{port}"); let start = Instant::now(); let result = tokio::time::timeout(timeout_duration, TcpStream::connect((&host[..], port))).await; let latency_ms = start.elapsed().as_millis() as u64; let (ok, error) = match result { Ok(Ok(_stream)) => (true, None), Ok(Err(err)) => (false, Some(err.to_string())), Err(_) => (false, Some("TCP 健康检查超时".to_string())), }; Ok(json!({ "ok": ok, "kind": "tcp", "target": target, "latency_ms": latency_ms, "status_code": null, "error": error, })) } async fn manage_custom(params: Value, create_service: bool) -> Result { let name = string_param(¶ms, "name")?; let safe_name = safe_app_name(&name)?; let work_dir = string_param(¶ms, "work_dir")?; let run_user = params .get("run_user") .and_then(Value::as_str) .unwrap_or("root") .to_string(); if !Path::new(&work_dir).is_dir() { bail!("工作目录不存在"); } ensure_user_exists(&run_user).await?; let ports = params .get("ports") .and_then(Value::as_array) .map(|items| { items .iter() .filter_map(|v| v.as_u64().map(|n| n as u16)) .collect::>() }) .unwrap_or_default(); let log_paths = params .get("log_paths") .and_then(Value::as_array) .map(|items| { items .iter() .filter_map(|v| v.as_str().map(ToString::to_string)) .collect::>() }) .unwrap_or_default(); let mut service_name = None; let mut generated_service_path = None; if create_service || params .get("create_systemd") .and_then(Value::as_bool) .unwrap_or(false) { let start_command = string_param(¶ms, "start_command")?; validate_start_command(&start_command)?; let unit = format!("lightops-{safe_name}.service"); let path = Path::new("/etc/systemd/system").join(&unit); if path.exists() { bail!("服务已存在"); } let env = params .get("env") .and_then(Value::as_object) .cloned() .unwrap_or_default(); let env_lines = env .iter() .map(|(key, value)| { Ok(format!( "Environment=\"{}={}\"", 
sanitize_env_key(key)?, value.as_str().unwrap_or_default().replace('"', "\\\"") )) }) .collect::>>()? .join("\n"); let content = format!( "[Unit]\nDescription=LightOps App {name}\nAfter=network.target\n\n[Service]\nType=simple\nWorkingDirectory={work_dir}\nExecStart={start_command}\nRestart=always\nUser={run_user}\n{env_lines}\n\n[Install]\nWantedBy=multi-user.target\n" ); fs::write(&path, content)?; let result = async { command_checked(Command::new("systemctl").arg("daemon-reload")).await?; if params .get("enable") .and_then(Value::as_bool) .unwrap_or(true) { command_checked(Command::new("systemctl").args(["enable", &unit])).await?; } if params.get("start").and_then(Value::as_bool).unwrap_or(true) { command_checked(Command::new("systemctl").args(["start", &unit])).await?; } Ok::<_, anyhow::Error>(()) } .await; if let Err(err) = result { let _ = fs::remove_file(&path); let _ = command_checked(Command::new("systemctl").arg("daemon-reload")).await; return Err(err); } service_name = Some(unit); generated_service_path = Some(path.to_string_lossy().to_string()); } let app = Application { id: format!("lightops:{}", safe_name), agent_id: agent_id(), name: safe_name.clone(), display_name: name, description: params .get("description") .and_then(Value::as_str) .map(ToString::to_string), app_type: ApplicationType::Custom, provider: ApplicationProviderType::LightOpsManaged, status: if params.get("start").and_then(Value::as_bool).unwrap_or(true) { ApplicationStatus::Running } else { ApplicationStatus::Stopped }, version: None, install_path: Some(work_dir.clone()), work_dir: Some(work_dir), config_paths: generated_service_path.iter().cloned().collect(), log_paths, data_paths: Vec::new(), ports, domains: Vec::new(), service_name, container_id: None, compose_project: None, package_name: None, nginx_site: None, run_user: Some(run_user), is_system: false, is_managed: true, is_lightops_managed: true, metadata: json!({ "source": "lightops", "generated_service": generated_service_path 
}), created_at: None, updated_at: None, }; Ok(json!({ "application": app })) } fn app_param(params: &Value) -> Result { serde_json::from_value( params .get("application") .cloned() .ok_or_else(|| anyhow!("缺少应用数据"))?, ) .context("解析应用数据失败") } struct SystemdAppProvider { agent_id: String, } impl SystemdAppProvider { fn new(agent_id: String) -> Self { Self { agent_id } } } #[async_trait] impl ApplicationProvider for SystemdAppProvider { async fn detect(&self) -> Result { Ok(Path::new("/bin/systemctl").exists() || Path::new("/usr/bin/systemctl").exists()) } async fn list_apps(&self) -> Result> { let output = command_output(Command::new("systemctl").args([ "list-units", "--type=service", "--all", "--no-pager", "--plain", ])) .await?; let mut apps = Vec::new(); for line in output.lines() { if let Some(app) = parse_systemd_line(&self.agent_id, line) { apps.push(app); } } Ok(apps) } async fn get_app(&self, id: &str) -> Result { let app = self .list_apps() .await? .into_iter() .find(|app| app.id == id || app.service_name.as_deref() == Some(id)) .ok_or_else(|| anyhow!("应用不存在"))?; Ok(detail(app)) } async fn start(&self, id: &str) -> Result<()> { validate_unit(id)?; command_checked(Command::new("systemctl").args(["start", id])).await } async fn stop(&self, id: &str) -> Result<()> { validate_unit(id)?; command_checked(Command::new("systemctl").args(["stop", id])).await } async fn restart(&self, id: &str) -> Result<()> { validate_unit(id)?; command_checked(Command::new("systemctl").args(["restart", id])).await } async fn reload(&self, id: &str) -> Result<()> { validate_unit(id)?; command_checked(Command::new("systemctl").args(["reload", id])).await } async fn logs(&self, id: &str, lines: usize) -> Result> { validate_unit(id)?; let line_count = lines.min(2000).to_string(); let output = command_output( Command::new("journalctl") .arg("-u") .arg(id) .arg("-n") .arg(line_count) .arg("--no-pager"), ) .await?; Ok(output.lines().map(ToString::to_string).collect()) } async fn 
update(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn uninstall(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } } fn parse_systemd_line(agent_id: &str, line: &str) -> Option { let parts = line.split_whitespace().collect::>(); if parts.len() < 4 || !parts[0].ends_with(".service") { return None; } let service = parts[0].to_string(); let base = service.trim_end_matches(".service").to_string(); let description = parts.get(4..).unwrap_or(&[]).join(" "); if !is_manageable_service(&base, &service, &description) { return None; } let is_system = is_core_system_name(&base); Some(Application { id: format!("systemd:{service}"), agent_id: agent_id.to_string(), name: base.clone(), display_name: human_name(&base), description: (!description.is_empty()).then_some(description), app_type: infer_app_type(&base), provider: ApplicationProviderType::Systemd, status: match parts.get(2).copied().unwrap_or_default() { "active" => ApplicationStatus::Running, "failed" => ApplicationStatus::Failed, _ => ApplicationStatus::Stopped, }, version: None, install_path: None, work_dir: None, config_paths: common_config_paths(&base), log_paths: Vec::new(), data_paths: common_data_paths(&base), ports: common_ports(&base), domains: Vec::new(), service_name: Some(service), container_id: None, compose_project: None, package_name: package_for_service(&base), nginx_site: None, run_user: None, is_system, is_managed: true, is_lightops_managed: base.starts_with("lightops-"), metadata: json!({ "source": "systemd", "load": parts.get(1).copied().unwrap_or_default(), "sub": parts.get(3).copied().unwrap_or_default() }), created_at: None, updated_at: None, }) } struct DockerAppProvider { agent_id: String, } impl DockerAppProvider { fn new(agent_id: String) -> Self { Self { agent_id } } } #[async_trait] impl ApplicationProvider for DockerAppProvider { async fn detect(&self) -> Result { Ok(command_status(Command::new("docker").arg("version")) .await .unwrap_or(false)) } async fn list_apps(&self) -> 
Result> { let output = command_output(Command::new("docker").args(["ps", "-a", "--format", "{{json .}}"])) .await?; Ok(output .lines() .filter_map(|line| parse_docker_app(&self.agent_id, line)) .collect()) } async fn get_app(&self, id: &str) -> Result { let app = self .list_apps() .await? .into_iter() .find(|app| app.id == id || app.container_id.as_deref() == Some(id)) .ok_or_else(|| anyhow!("应用不存在"))?; Ok(detail(app)) } async fn start(&self, id: &str) -> Result<()> { validate_docker_id(id)?; command_checked(Command::new("docker").args(["start", id])).await } async fn stop(&self, id: &str) -> Result<()> { validate_docker_id(id)?; command_checked(Command::new("docker").args(["stop", id])).await } async fn restart(&self, id: &str) -> Result<()> { validate_docker_id(id)?; command_checked(Command::new("docker").args(["restart", id])).await } async fn reload(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn logs(&self, id: &str, lines: usize) -> Result> { validate_docker_id(id)?; let line_count = lines.min(2000).to_string(); let output = command_output( Command::new("docker") .arg("logs") .arg("--tail") .arg(line_count) .arg(id), ) .await?; Ok(output.lines().map(ToString::to_string).collect()) } async fn update(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn uninstall(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } } struct DockerComposeAppProvider { agent_id: String, } impl DockerComposeAppProvider { fn new(agent_id: String) -> Self { Self { agent_id } } } #[async_trait] impl ApplicationProvider for DockerComposeAppProvider { async fn detect(&self) -> Result { Ok(command_status(Command::new("docker").arg("version")) .await .unwrap_or(false)) } async fn list_apps(&self) -> Result> { let output = command_output(Command::new("docker").args(["ps", "-a", "--format", "{{json .}}"])) .await?; Ok(compose_project_apps(&self.agent_id, &output)) } async fn get_app(&self, id: &str) -> Result { let app = self .list_apps() .await? 
.into_iter() .find(|app| app.id == id || app.compose_project.as_deref() == Some(id)) .ok_or_else(|| anyhow!("应用不存在"))?; Ok(detail(app)) } async fn start(&self, id: &str) -> Result<()> { docker_compose_container_action(id, "start").await } async fn stop(&self, id: &str) -> Result<()> { docker_compose_container_action(id, "stop").await } async fn restart(&self, id: &str) -> Result<()> { docker_compose_container_action(id, "restart").await } async fn reload(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn logs(&self, id: &str, lines: usize) -> Result> { validate_compose_project(id)?; let ids = compose_container_ids(id).await?; let mut out = Vec::new(); let line_count = lines.min(2000).to_string(); for container_id in ids { out.push(format!("===== {container_id} =====")); let output = command_output(Command::new("docker").args([ "logs", "--tail", &line_count, &container_id, ])) .await .unwrap_or_else(|err| err.to_string()); out.extend(output.lines().map(ToString::to_string)); } Ok(out) } async fn update(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn uninstall(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } } fn compose_project_apps(agent_id: &str, output: &str) -> Vec { let mut projects = std::collections::BTreeMap::::new(); for line in output.lines() { let Ok(value) = serde_json::from_str::(line) else { continue; }; let labels = value .get("Labels") .and_then(Value::as_str) .unwrap_or_default(); let Some(project) = docker_label_value(labels, "com.docker.compose.project") else { continue; }; let service = docker_label_value(labels, "com.docker.compose.service").unwrap_or_default(); let status = value .get("Status") .and_then(Value::as_str) .unwrap_or_default(); let ports = parse_port_numbers( value .get("Ports") .and_then(Value::as_str) .unwrap_or_default(), ); let entry = projects.entry(project.clone()).or_default(); entry.running |= status.starts_with("Up"); if !service.is_empty() && !entry.services.contains(&service) { 
entry.services.push(service); } if let Some(id) = value.get("ID").and_then(Value::as_str) { entry.container_ids.push(id.to_string()); } for port in ports { if !entry.ports.contains(&port) { entry.ports.push(port); } } } projects .into_iter() .map(|(project, acc)| Application { id: format!("compose:{project}"), agent_id: agent_id.to_string(), name: project.clone(), display_name: project.clone(), description: Some(format!( "Docker Compose 项目,服务:{}", if acc.services.is_empty() { "-".into() } else { acc.services.join(", ") } )), app_type: ApplicationType::ComposeProject, provider: ApplicationProviderType::DockerCompose, status: if acc.running { ApplicationStatus::Running } else { ApplicationStatus::Stopped }, version: None, install_path: None, work_dir: None, config_paths: Vec::new(), log_paths: Vec::new(), data_paths: Vec::new(), ports: acc.ports, domains: Vec::new(), service_name: None, container_id: None, compose_project: Some(project.clone()), package_name: None, nginx_site: None, run_user: None, is_system: false, is_managed: true, is_lightops_managed: false, metadata: json!({ "source": "docker-compose", "services": acc.services, "containers": acc.container_ids }), created_at: None, updated_at: None, }) .collect() } #[derive(Default)] struct ComposeAccumulator { running: bool, services: Vec, container_ids: Vec, ports: Vec, } async fn docker_compose_container_action(project: &str, action: &str) -> Result<()> { validate_compose_project(project)?; let ids = compose_container_ids(project).await?; if ids.is_empty() { bail!("未找到 Compose 项目容器"); } let mut cmd = Command::new("docker"); cmd.arg(action); for id in ids { cmd.arg(id); } command_checked(&mut cmd).await } async fn compose_container_ids(project: &str) -> Result> { let label = format!("label=com.docker.compose.project={project}"); let output = command_output(Command::new("docker").args(["ps", "-aq", "--filter", &label])).await?; Ok(output .lines() .map(str::trim) .filter(|line| !line.is_empty()) 
.map(ToString::to_string) .collect()) } fn docker_label_value(labels: &str, key: &str) -> Option { labels.split(',').find_map(|item| { item.trim() .strip_prefix(&format!("{key}=")) .map(ToString::to_string) }) } fn parse_docker_app(agent_id: &str, line: &str) -> Option { let value = serde_json::from_str::(line).ok()?; let id = value.get("ID").and_then(Value::as_str)?.to_string(); let name = value .get("Names") .and_then(Value::as_str) .unwrap_or(&id) .trim_start_matches('/') .to_string(); let status_text = value .get("Status") .and_then(Value::as_str) .unwrap_or_default(); let labels = value .get("Labels") .and_then(Value::as_str) .unwrap_or_default(); let compose_project = labels.split(',').find_map(|item| { item.strip_prefix("com.docker.compose.project=") .map(ToString::to_string) }); Some(Application { id: format!("docker:{id}"), agent_id: agent_id.to_string(), name: name.clone(), display_name: name, description: value .get("Image") .and_then(Value::as_str) .map(ToString::to_string), app_type: ApplicationType::Container, provider: ApplicationProviderType::Docker, status: if status_text.starts_with("Up") { ApplicationStatus::Running } else { ApplicationStatus::Stopped }, version: None, install_path: None, work_dir: None, config_paths: Vec::new(), log_paths: Vec::new(), data_paths: Vec::new(), ports: parse_port_numbers( value .get("Ports") .and_then(Value::as_str) .unwrap_or_default(), ), domains: Vec::new(), service_name: None, container_id: Some(id), compose_project, package_name: None, nginx_site: None, run_user: None, is_system: false, is_managed: true, is_lightops_managed: false, metadata: value, created_at: None, updated_at: None, }) } struct NginxSiteAppProvider { agent_id: String, } impl NginxSiteAppProvider { fn new(agent_id: String) -> Self { Self { agent_id } } } #[async_trait] impl ApplicationProvider for NginxSiteAppProvider { async fn detect(&self) -> Result { Ok(Path::new(NGINX_AVAILABLE).exists() || Path::new(NGINX_CONF_D).exists()) } async fn 
list_apps(&self) -> Result> { let mut sites = Vec::new(); for dir in [NGINX_AVAILABLE, NGINX_CONF_D] { if let Ok(entries) = fs::read_dir(dir) { for entry in entries.flatten() { if entry.path().is_file() { sites.push(site_app(&self.agent_id, entry.path())); } } } } Ok(sites) } async fn get_app(&self, id: &str) -> Result { let app = self .list_apps() .await? .into_iter() .find(|app| app.id == id || app.nginx_site.as_deref() == Some(id)) .ok_or_else(|| anyhow!("应用不存在"))?; Ok(detail(app)) } async fn start(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn stop(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn restart(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn reload(&self, _id: &str) -> Result<()> { command_checked(Command::new("systemctl").args(["reload", "nginx"])).await } async fn logs(&self, id: &str, lines: usize) -> Result> { let app = self.get_app(id).await?.application; let mut out = Vec::new(); for path in app.log_paths { out.extend(tail_file(&path, lines / 2).await.unwrap_or_default()); } Ok(out) } async fn update(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } async fn uninstall(&self, _id: &str) -> Result<()> { bail!("不支持的操作") } } fn site_app(agent_id: &str, path: PathBuf) -> Application { let name = path .file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "nginx-site".into()); let content = fs::read_to_string(&path).unwrap_or_default(); let domains = parse_nginx_domains(&content); let ports = parse_nginx_listen_ports(&content); let enabled = Path::new(NGINX_ENABLED).join(&name).exists() || path.starts_with(NGINX_CONF_D); Application { id: format!("nginx:{name}"), agent_id: agent_id.to_string(), name: name.clone(), display_name: if domains.is_empty() { name.clone() } else { domains.join(", ") }, description: Some("Nginx site".into()), app_type: if content.contains("proxy_pass") { ApplicationType::ReverseProxy } else { ApplicationType::StaticSite }, provider: ApplicationProviderType::NginxSite, 
status: if enabled { ApplicationStatus::Enabled } else { ApplicationStatus::Disabled }, version: None, install_path: None, work_dir: parse_nginx_root(&content), config_paths: vec![path.to_string_lossy().to_string()], log_paths: parse_nginx_logs(&content), data_paths: Vec::new(), ports, domains, service_name: Some("nginx.service".into()), container_id: None, compose_project: None, package_name: Some("nginx".into()), nginx_site: Some(name), run_user: None, is_system: false, is_managed: true, is_lightops_managed: false, metadata: json!({ "enabled": enabled, "source": "nginx" }), created_at: None, updated_at: None, } }

/// Application provider backed by the dpkg package database.
///
/// Only packages named by `allowed_packages()` are reported; every lifecycle
/// operation is rejected because a package by itself cannot be started or stopped.
struct PackageAppProvider {
    /// Agent identifier stamped onto every reported application.
    agent_id: String,
}

impl PackageAppProvider {
    /// Creates a provider that reports packages on behalf of `agent_id`.
    fn new(agent_id: String) -> Self {
        Self { agent_id }
    }
}

#[async_trait]
impl ApplicationProvider for PackageAppProvider {
    /// Returns `true` when `/usr/bin/dpkg-query` exists on this host.
    async fn detect(&self) -> Result<bool> {
        Ok(Path::new("/usr/bin/dpkg-query").exists())
    }

    /// Lists installed packages that appear in the allow-list.
    ///
    /// # Errors
    /// Fails when `dpkg-query` cannot be run, times out, or exits unsuccessfully.
    async fn list_apps(&self) -> Result<Vec<Application>> {
        let output = command_output(
            Command::new("dpkg-query").args(["-W", "-f=${Package} ${Version}\n"]),
        )
        .await?;
        let allowed = allowed_packages();

        let apps = output
            .lines()
            .filter_map(|line| {
                // Each line is "<package> <version>", as requested by the `-f` format above.
                let mut fields = line.split_whitespace();
                let package = fields.next()?.to_string();
                if !allowed.contains(package.as_str()) {
                    return None;
                }
                let version = fields.next().map(ToString::to_string);

                Some(Application {
                    id: format!("apt:{package}"),
                    agent_id: self.agent_id.clone(),
                    name: package.clone(),
                    display_name: human_name(&package),
                    description: Some("APT package with operational value".into()),
                    app_type: infer_app_type(&package),
                    provider: ApplicationProviderType::Apt,
                    status: ApplicationStatus::Unknown,
                    version,
                    install_path: None,
                    work_dir: None,
                    config_paths: common_config_paths(&package),
                    log_paths: Vec::new(),
                    data_paths: common_data_paths(&package),
                    ports: common_ports(&package),
                    domains: Vec::new(),
                    service_name: service_for_package(&package),
                    container_id: None,
                    compose_project: None,
                    package_name: Some(package),
                    nginx_site: None,
                    run_user: None,
                    is_system: false,
                    is_managed: true,
                    is_lightops_managed: false,
                    metadata: json!({ "source": "apt" }),
                    created_at: None,
                    updated_at: None,
                })
            })
            .collect();
        Ok(apps)
    }

    /// Looks an application up by its `apt:<package>` id or by bare package name.
    ///
    /// # Errors
    /// Fails when listing fails or no allow-listed package matches `id`.
    async fn get_app(&self, id: &str) -> Result<ApplicationDetail> {
        let apps = self.list_apps().await?;
        let app = apps
            .into_iter()
            .find(|app| app.id == id || app.package_name.as_deref() == Some(id))
            .ok_or_else(|| anyhow!("应用不存在"))?;
        Ok(detail(app))
    }

    /// Always fails: packages have no start operation.
    async fn start(&self, _id: &str) -> Result<()> {
        bail!("不支持的操作")
    }

    /// Always fails: packages have no stop operation.
    async fn stop(&self, _id: &str) -> Result<()> {
        bail!("不支持的操作")
    }

    /// Always fails: packages have no restart operation.
    async fn restart(&self, _id: &str) -> Result<()> {
        bail!("不支持的操作")
    }

    /// Always fails: packages have no reload operation.
    async fn reload(&self, _id: &str) -> Result<()> {
        bail!("不支持的操作")
    }

    /// Always fails: packages have no log source.
    async fn logs(&self, _id: &str, _lines: usize) -> Result<Vec<String>> {
        bail!("不支持的操作")
    }

    /// Always fails: updating is not implemented for this provider.
    async fn update(&self, _id: &str) -> Result<()> {
        bail!("不支持的操作")
    }

    /// Always fails: uninstalling is not implemented for this provider.
    async fn uninstall(&self, _id: &str) -> Result<()> {
        bail!("不支持的操作")
    }
}

/// Provider for applications flagged `is_lightops_managed`.
///
/// Every supported operation is delegated to a `SystemdAppProvider` built with
/// the same agent id.
struct LightOpsManagedAppProvider {
    /// Agent identifier forwarded to the underlying systemd provider.
    agent_id: String,
}

impl LightOpsManagedAppProvider {
    /// Creates a provider that reports applications on behalf of `agent_id`.
    fn new(agent_id: String) -> Self {
        Self { agent_id }
    }

    /// Builds the systemd provider that all operations are delegated to.
    fn systemd(&self) -> SystemdAppProvider {
        SystemdAppProvider::new(self.agent_id.clone())
    }
}

#[async_trait]
impl ApplicationProvider for LightOpsManagedAppProvider {
    /// Always reports the provider as available.
    async fn detect(&self) -> Result<bool> {
        Ok(true)
    }

    /// Lists the systemd applications whose `is_lightops_managed` flag is set.
    async fn list_apps(&self) -> Result<Vec<Application>> {
        let apps = self.systemd().list_apps().await?;
        Ok(apps
            .into_iter()
            .filter(|app| app.is_lightops_managed)
            .collect())
    }

    /// Delegates the lookup to the systemd provider.
    async fn get_app(&self, id: &str) -> Result<ApplicationDetail> {
        self.systemd().get_app(id).await
    }

    /// Delegates to the systemd provider.
    async fn start(&self, id: &str) -> Result<()> {
        self.systemd().start(id).await
    }

    /// Delegates to the systemd provider.
    async fn stop(&self, id: &str) -> Result<()> {
        self.systemd().stop(id).await
    }

    /// Delegates to the systemd provider.
    async fn restart(&self, id: &str) -> Result<()> {
        self.systemd().restart(id).await
    }

    /// Delegates to the systemd provider.
    async fn reload(&self, id: &str) -> Result<()> {
        self.systemd().reload(id).await
    }

    /// Delegates to the systemd provider.
    async fn logs(&self, id: &str, lines: usize) -> Result<Vec<String>> {
        self.systemd().logs(id, lines).await
    }

    /// Always fails: updating is not implemented for this provider.
    async fn update(&self, _id: &str) -> Result<()> {
        bail!("不支持的操作")
    }

    /// Always fails: uninstalling is not implemented for this provider.
    async fn uninstall(&self, _id: &str) -> Result<()> {
        bail!("不支持的操作")
    }
}

/// Wraps an application in an `ApplicationDetail` with default values: no
/// relations, no recent actions, empty runtime/provider info, the generic
/// action list and the `"normal"` risk level.
fn detail(application: Application) -> ApplicationDetail {
    let available_actions = ["start", "stop", "restart", "logs", "health", "backup"]
        .into_iter()
        .map(Into::into)
        .collect();

    ApplicationDetail {
        application,
        relations: Vec::new(),
        recent_actions: Vec::new(),
        runtime_info: json!({}),
        available_actions,
        risk_level: "normal".into(),
        provider_specific_info: json!({}),
    }
}

/// Appends a `proxy_to_port` relation for every (nginx site, port, other app)
/// triple in which the other application lists the same port as the site.
fn build_relations(apps: &[Application], relations: &mut Vec<ApplicationRelation>) {
    // `partition` keeps the input order on both sides, so relations are emitted
    // in the same order as nested scans over `apps` would produce.
    let (sites, backends): (Vec<&Application>, Vec<&Application>) = apps
        .iter()
        .partition(|app| app.provider == ApplicationProviderType::NginxSite);

    for nginx in sites {
        for port in &nginx.ports {
            for target in backends.iter().filter(|app| app.ports.contains(port)) {
                relations.push(ApplicationRelation {
                    id: Uuid::new_v4().to_string(),
                    agent_id: nginx.agent_id.clone(),
                    app_id: nginx.id.clone(),
                    relation_type: "proxy_to_port".into(),
                    target_id: Some(target.id.clone()),
                    target_name: Some(target.display_name.clone()),
                    metadata: json!({ "port": port }),
                });
            }
        }
    }
}

/// Adds listening ports reported by `ss -tulpen` to applications whose name
/// occurs (case-insensitively) anywhere in the corresponding socket line.
///
/// Best effort: when `ss` cannot be run the applications are left untouched.
async fn enrich_with_ports(apps: &mut [Application]) {
    let Ok(output) = command_output(Command::new("ss").args(["-tulpen"])).await else {
        return;
    };

    // Lower-case each application name once instead of once per socket line.
    let needles: Vec<String> = apps.iter().map(|app| app.name.to_lowercase()).collect();

    for line in output.lines() {
        // The fifth column of `ss -tulpen` is "Local Address:Port". Scanning the
        // line from the right instead would pick up the `ino:`/`uid:`/`sk:`
        // values that `-e` appends. The header row fails the numeric parse and
        // is skipped. The port's integer type is inferred from `Application::ports`.
        let Some(port) = line
            .split_whitespace()
            .nth(4)
            .and_then(|local| local.rsplit(':').next())
            .and_then(|port| port.parse().ok())
        else {
            continue;
        };

        let haystack = line.to_lowercase();
        for (app, needle) in apps.iter_mut().zip(&needles) {
            if haystack.contains(needle.as_str()) && !app.ports.contains(&port) {
                app.ports.push(port);
            }
        }
    }
}

/// Returns the last `lines` lines of `path` (capped at 2000) using `tail`.
///
/// # Errors
/// Fails when `tail` cannot be run, times out, or exits unsuccessfully.
async fn tail_file(path: &str, lines: usize) -> Result<Vec<String>> {
    let line_count = lines.min(2000).to_string();
    // `--` ends option parsing so a path beginning with `-` is read as a file name.
    let output = command_output(
        Command::new("tail")
            .arg("-n")
            .arg(line_count)
            .arg("--")
            .arg(path),
    )
    .await?;
    Ok(output.lines().map(ToString::to_string).collect())
}

/// Runs `cmd` with an 8 second timeout and returns its stdout (lossy UTF-8).
///
/// # Errors
/// Fails on timeout, on spawn failure, or with the command's stderr as the
/// message when the exit status is unsuccessful.
async fn command_output(cmd: &mut Command) -> Result<String> {
    // Without `kill_on_drop` a child that outlives the timeout keeps running
    // after its future has been dropped.
    let output =
        tokio::time::timeout(Duration::from_secs(8), cmd.kill_on_drop(true).output()).await??;
    if !output.status.success() {
        bail!("{}", String::from_utf8_lossy(&output.stderr));
    }
    Ok(String::from_utf8_lossy(&output.stdout).to_string())
}

/// Runs `cmd` like [`command_output`] but discards its stdout.
async fn command_checked(cmd: &mut Command) -> Result<()> {
    command_output(cmd).await.map(|_| ())
}

/// Runs `cmd` with a 5 second timeout and reports whether it exited successfully.
///
/// # Errors
/// Fails on timeout or when the process cannot be spawned.
async fn command_status(cmd: &mut Command) -> Result<bool> {
    // See `command_output`: make sure a timed-out child is killed.
    let output =
        tokio::time::timeout(Duration::from_secs(5), cmd.kill_on_drop(true).output()).await??;
    Ok(output.status.success())
}

/// Checks that `user` is a plausible account name and that `id -u` resolves it.
///
/// # Errors
/// Fails when the name is empty, longer than 64 bytes, starts with `-`,
/// contains anything but ASCII alphanumerics, `_` or `-`, or when `id` fails.
async fn ensure_user_exists(user: &str) -> Result<()> {
    let charset_ok = user
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || "_-".contains(c));
    // An empty name can never resolve, and a leading `-` would be parsed by
    // `id` as an option instead of a user name.
    if user.is_empty() || user.len() > 64 || user.starts_with('-') || !charset_ok {
        bail!("运行用户无效");
    }
    command_checked(Command::new("id").arg("-u").arg(user)).await
}

/// Extracts the string parameter `name` from a JSON object.
///
/// # Errors
/// Fails when the key is absent or its value is not a string.
fn string_param(params: &Value, name: &str) -> Result<String> {
    params
        .get(name)
        .and_then(Value::as_str)
        .map(ToString::to_string)
        .ok_or_else(|| anyhow!("缺少参数:{name}"))
}

/// Normalises an application name: lower-cases it, replaces every character
/// other than ASCII alphanumerics, `-` and `_` with `-`, and trims `-` from
/// both ends.
///
/// # Errors
/// Fails when the result is empty or longer than 64 bytes.
fn safe_app_name(name: &str) -> Result<String> {
    let replaced: String = name
        .to_lowercase()
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
                c
            } else {
                '-'
            }
        })
        .collect();
    let safe = replaced.trim_matches('-').to_string();
    if safe.is_empty() || safe.len() > 64 {
        bail!("应用名称无效");
    }
    Ok(safe)
}

/// Rejects start commands that are too long or contain shell chaining,
/// command substitution, NUL bytes or line breaks.
///
/// # Errors
/// Fails with a fixed message when any of the checks trips.
fn validate_start_command(command: &str) -> Result<()> {
    const FORBIDDEN: [&str; 5] = ["&&", "||", ";", "`", "$("];
    // NOTE(review): line breaks are refused because the command is presumably
    // written into a line-oriented service definition (see the
    // `app.create_systemd_service` action) where a newline would start a new
    // directive; confirm against the caller.
    let has_control = command.contains(['\0', '\n', '\r']);
    if command.len() > 500 || has_control || FORBIDDEN.iter().any(|seq| command.contains(*seq)) {
        bail!("启动命令不安全");
    }
    Ok(())
}

/// Accepts only `http://` / `https://` URLs of at most 500 bytes without NUL
/// bytes or whitespace.
fn validate_url_target(url: &str) -> Result<()> {
    let scheme_ok = url.starts_with("http://") || url.starts_with("https://");
    if url.len() > 500
        || url.contains('\0')
        || url.chars().any(char::is_whitespace)
        || !scheme_ok
    {
        bail!("健康检查 URL 无效");
    }
    Ok(())
}

/// Accepts a non-empty host of at most 253 bytes that contains no NUL bytes or
/// whitespace and does not start with `-`.
fn validate_host_target(host: &str) -> Result<()> {
    if host.is_empty()
        || host.len() > 253
        || host.contains('\0')
        || host.chars().any(char::is_whitespace)
        || host.starts_with('-')
    {
        bail!("健康检查主机无效");
    }
    Ok(())
}

/// Returns `key` unchanged when it is 1 to 80 bytes of ASCII upper-case
/// letters, digits and `_`.
///
/// # Errors
/// Fails for any other key.
fn sanitize_env_key(key: &str) -> Result<&str> {
    let charset_ok = key
        .chars()
        .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_');
    if key.is_empty() || key.len() > 80 || !charset_ok {
        bail!("环境变量名无效");
    }
    Ok(key)
}
fn validate_unit(name: &str) -> Result<()> { if name.len() > 200 || !name .chars() .all(|c| c.is_ascii_alphanumeric() || ".@_-".contains(c)) { bail!("服务名称无效"); } Ok(()) } fn validate_docker_id(id: &str) -> Result<()> { if id.len() > 200 || !id .chars() .all(|c| c.is_ascii_alphanumeric() || ".:/@_-".contains(c)) { bail!("Docker 标识无效"); } Ok(()) } fn validate_compose_project(project: &str) -> Result<()> { if project.is_empty() || project.len() > 128 || !project .chars() .all(|c| c.is_ascii_alphanumeric() || "._-".contains(c)) { bail!("Compose 项目名称无效"); } Ok(()) } fn is_manageable_service(base: &str, service: &str, description: &str) -> bool { base.starts_with("lightops-") || allowed_packages().contains(base) || allowed_packages().contains(service.trim_end_matches(".service")) || description.to_lowercase().contains("docker") || description.to_lowercase().contains("nginx") || description.to_lowercase().contains("redis") } fn is_core_system_name(name: &str) -> bool { let blocked = [ "systemd", "dbus", "udev", "apt", "cron", "ssh", "getty", "networking", "rsyslog", "polkit", ]; blocked .iter() .any(|prefix| name == *prefix || name.starts_with(&format!("{prefix}-"))) } fn allowed_packages() -> HashSet<&'static str> { [ "nginx", "apache2", "caddy", "redis-server", "mysql-server", "mariadb-server", "postgresql", "docker", "docker-ce", "containerd", "fail2ban", "supervisor", "frps", "frpc", "gitea", "alist", "minio", "prometheus", "grafana", ] .into_iter() .collect() } fn infer_app_type(name: &str) -> ApplicationType { match name { "redis-server" | "mysql-server" | "mariadb-server" | "postgresql" => { ApplicationType::Database } "nginx" | "apache2" | "caddy" => ApplicationType::WebApp, "docker" | "docker-ce" | "containerd" => ApplicationType::Runtime, _ => ApplicationType::Service, } } fn human_name(name: &str) -> String { name.split(['-', '_']) .filter(|part| !part.is_empty()) .map(|part| { let mut chars = part.chars(); match chars.next() { Some(first) => 
first.to_uppercase().collect::() + chars.as_str(), None => String::new(), } }) .collect::>() .join(" ") } fn package_for_service(base: &str) -> Option { allowed_packages().contains(base).then(|| base.to_string()) } fn service_for_package(package: &str) -> Option { Some(match package { "docker-ce" => "docker.service".to_string(), "redis-server" => "redis-server.service".to_string(), _ => format!("{package}.service"), }) } fn common_config_paths(name: &str) -> Vec { match name { "nginx" => vec!["/etc/nginx/nginx.conf".into()], "redis-server" => vec!["/etc/redis/redis.conf".into()], "postgresql" => vec!["/etc/postgresql".into()], "mysql-server" | "mariadb-server" => vec!["/etc/mysql".into()], _ => Vec::new(), } } fn common_data_paths(name: &str) -> Vec { match name { "redis-server" => vec!["/var/lib/redis".into()], "postgresql" => vec!["/var/lib/postgresql".into()], "mysql-server" | "mariadb-server" => vec!["/var/lib/mysql".into()], _ => Vec::new(), } } fn common_ports(name: &str) -> Vec { match name { "nginx" | "apache2" | "caddy" => vec![80, 443], "redis-server" => vec![6379], "mysql-server" | "mariadb-server" => vec![3306], "postgresql" => vec![5432], "prometheus" => vec![9090], "grafana" => vec![3000], _ => Vec::new(), } } fn parse_port_numbers(input: &str) -> Vec { let mut ports = Vec::new(); for token in input.split([',', ' ', '-', '>']) { let cleaned = token .trim() .trim_end_matches("/tcp") .trim_end_matches("/udp"); if let Some(port) = cleaned .rsplit(':') .next() .and_then(|v| v.parse::().ok()) .or_else(|| cleaned.parse::().ok()) { if !ports.contains(&port) { ports.push(port); } } } ports } fn parse_nginx_domains(content: &str) -> Vec { content .lines() .filter_map(|line| line.trim().strip_prefix("server_name")) .flat_map(|line| { line.trim() .trim_end_matches(';') .split_whitespace() .map(ToString::to_string) .collect::>() }) .filter(|domain| domain != "_") .collect() } fn parse_nginx_listen_ports(content: &str) -> Vec { content .lines() .filter_map(|line| 
line.trim().strip_prefix("listen")) .filter_map(|line| { line.split_whitespace() .next()? .trim_end_matches(';') .parse::() .ok() }) .collect() } fn parse_nginx_root(content: &str) -> Option { content.lines().find_map(|line| { line.trim() .strip_prefix("root") .map(|v| v.trim().trim_end_matches(';').to_string()) }) } fn parse_nginx_logs(content: &str) -> Vec { let mut paths = content .lines() .filter_map(|line| { let line = line.trim(); if line.starts_with("access_log") || line.starts_with("error_log") { line.split_whitespace() .nth(1) .map(|v| v.trim_end_matches(';').to_string()) } else { None } }) .collect::>(); if paths.is_empty() { paths.push("/var/log/nginx/access.log".into()); paths.push("/var/log/nginx/error.log".into()); } paths } fn agent_id() -> String { std::env::var("LIGHTOPS_AGENT_ID").unwrap_or_else(|_| "local".into()) }