1789 lines
56 KiB
Rust
1789 lines
56 KiB
Rust
use anyhow::{anyhow, bail, Context, Result};
|
|
use async_trait::async_trait;
|
|
use chrono::Utc;
|
|
use lightops_common::protocol::{
|
|
Application, ApplicationDetail, ApplicationProviderType, ApplicationRelation,
|
|
ApplicationStatus, ApplicationType,
|
|
};
|
|
use serde_json::{json, Value};
|
|
use std::{
|
|
collections::HashSet,
|
|
fs,
|
|
path::{Path, PathBuf},
|
|
time::Instant,
|
|
};
|
|
use tokio::{net::TcpStream, process::Command, time::Duration};
|
|
use uuid::Uuid;
|
|
|
|
/// nginx `sites-available` directory; scanned by `NginxSiteAppProvider`.
const NGINX_AVAILABLE: &str = "/etc/nginx/sites-available";

/// nginx `sites-enabled` directory.
const NGINX_ENABLED: &str = "/etc/nginx/sites-enabled";

/// nginx `conf.d` directory; scanned by `NginxSiteAppProvider`.
const NGINX_CONF_D: &str = "/etc/nginx/conf.d";
|
|
|
|
#[async_trait]
|
|
#[allow(dead_code)]
|
|
pub trait ApplicationProvider {
|
|
async fn detect(&self) -> Result<bool>;
|
|
async fn list_apps(&self) -> Result<Vec<Application>>;
|
|
async fn get_app(&self, id: &str) -> Result<ApplicationDetail>;
|
|
async fn start(&self, id: &str) -> Result<()>;
|
|
async fn stop(&self, id: &str) -> Result<()>;
|
|
async fn restart(&self, id: &str) -> Result<()>;
|
|
async fn reload(&self, id: &str) -> Result<()>;
|
|
async fn logs(&self, id: &str, lines: usize) -> Result<Vec<String>>;
|
|
async fn update(&self, id: &str) -> Result<()>;
|
|
async fn uninstall(&self, id: &str) -> Result<()>;
|
|
}
|
|
|
|
pub async fn handle(action: &str, params: Value) -> Result<Value> {
|
|
match action {
|
|
"app.discover" | "app.list" => discover().await,
|
|
"app.get" => app_detail(params).await,
|
|
"app.start" => provider_action(params, ProviderVerb::Start).await,
|
|
"app.stop" => provider_action(params, ProviderVerb::Stop).await,
|
|
"app.restart" => provider_action(params, ProviderVerb::Restart).await,
|
|
"app.reload" => provider_action(params, ProviderVerb::Reload).await,
|
|
"app.logs" => app_logs(params).await,
|
|
"app.health" => app_health(params).await,
|
|
"app.update" => bail!("暂不支持操作:应用更新"),
|
|
"app.uninstall" => bail!("暂不支持操作:应用卸载需要先实现备份能力"),
|
|
"app.backup" => app_backup(params).await,
|
|
"app.restore" => bail!("暂不支持操作:应用恢复"),
|
|
"app.manage_custom" => manage_custom(params, false).await,
|
|
"app.create_systemd_service" => manage_custom(params, true).await,
|
|
"app.unmanage" => Ok(json!({ "ok": true })),
|
|
_ => bail!("不支持的操作"),
|
|
}
|
|
}
|
|
|
|
async fn discover() -> Result<Value> {
|
|
let agent_id = agent_id();
|
|
let mut apps = Vec::new();
|
|
let mut relations = Vec::new();
|
|
|
|
let providers: Vec<Box<dyn ApplicationProvider + Send + Sync>> = vec![
|
|
Box::new(SystemdAppProvider::new(agent_id.clone())),
|
|
Box::new(DockerComposeAppProvider::new(agent_id.clone())),
|
|
Box::new(DockerAppProvider::new(agent_id.clone())),
|
|
Box::new(NginxSiteAppProvider::new(agent_id.clone())),
|
|
Box::new(PackageAppProvider::new(agent_id.clone())),
|
|
Box::new(LightOpsManagedAppProvider::new(agent_id.clone())),
|
|
];
|
|
|
|
for provider in providers {
|
|
if provider.detect().await.unwrap_or(false) {
|
|
match provider.list_apps().await {
|
|
Ok(found) => merge_apps(&mut apps, found),
|
|
Err(err) => tracing::debug!(?err, "应用 Provider 执行失败"),
|
|
}
|
|
}
|
|
}
|
|
|
|
enrich_with_ports(&mut apps).await;
|
|
build_relations(&apps, &mut relations);
|
|
|
|
Ok(json!({ "applications": apps, "relations": relations }))
|
|
}
|
|
|
|
fn merge_apps(apps: &mut Vec<Application>, found: Vec<Application>) {
|
|
for mut app in found {
|
|
if let Some(existing) = apps.iter_mut().find(|existing| same_app(existing, &app)) {
|
|
if existing.version.is_none() {
|
|
existing.version = app.version.take();
|
|
}
|
|
if existing.install_path.is_none() {
|
|
existing.install_path = app.install_path.take();
|
|
}
|
|
if existing.work_dir.is_none() {
|
|
existing.work_dir = app.work_dir.take();
|
|
}
|
|
merge_vec(&mut existing.config_paths, app.config_paths);
|
|
merge_vec(&mut existing.log_paths, app.log_paths);
|
|
merge_vec(&mut existing.data_paths, app.data_paths);
|
|
merge_vec(&mut existing.ports, app.ports);
|
|
merge_vec(&mut existing.domains, app.domains);
|
|
existing.package_name = existing.package_name.take().or(app.package_name.take());
|
|
existing.service_name = existing.service_name.take().or(app.service_name.take());
|
|
existing.container_id = existing.container_id.take().or(app.container_id.take());
|
|
existing.nginx_site = existing.nginx_site.take().or(app.nginx_site.take());
|
|
existing.metadata = merge_metadata(existing.metadata.clone(), app.metadata);
|
|
} else {
|
|
apps.push(app);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Decides whether two discovered entries describe the same application.
///
/// They match when the id or the name is equal, or when both carry the same
/// (non-empty) service name, container id or nginx site.
fn same_app(left: &Application, right: &Application) -> bool {
    if left.id == right.id || left.name == right.name {
        return true;
    }

    // An absent identifier never counts as a match.
    let shared = |a: &Option<String>, b: &Option<String>| a.is_some() && a == b;

    shared(&left.service_name, &right.service_name)
        || shared(&left.container_id, &right.container_id)
        || shared(&left.nginx_site, &right.nginx_site)
}
|
|
|
|
/// Appends to `target` every item of `incoming` that is not already present.
///
/// Existing order is preserved, new items keep their relative order, and
/// duplicates inside `incoming` are added only once. Duplicates that are
/// already in `target` are left as they are.
fn merge_vec<T: Eq + std::hash::Hash + Clone>(target: &mut Vec<T>, incoming: Vec<T>) {
    let mut known: HashSet<T> = target.iter().cloned().collect();
    target.extend(
        incoming
            .into_iter()
            .filter(|candidate| known.insert(candidate.clone())),
    );
}
|
|
|
|
fn merge_metadata(left: Value, right: Value) -> Value {
|
|
let mut merged = left.as_object().cloned().unwrap_or_default();
|
|
if let Some(map) = right.as_object() {
|
|
for (key, value) in map {
|
|
merged.insert(key.clone(), value.clone());
|
|
}
|
|
}
|
|
Value::Object(merged)
|
|
}
|
|
|
|
async fn app_detail(params: Value) -> Result<Value> {
|
|
let app = app_param(¶ms)?;
|
|
let detail = ApplicationDetail {
|
|
application: app,
|
|
relations: Vec::new(),
|
|
recent_actions: Vec::new(),
|
|
runtime_info: json!({}),
|
|
available_actions: vec![
|
|
"start".into(),
|
|
"stop".into(),
|
|
"restart".into(),
|
|
"logs".into(),
|
|
"health".into(),
|
|
"backup".into(),
|
|
],
|
|
risk_level: "normal".into(),
|
|
provider_specific_info: json!({}),
|
|
};
|
|
Ok(json!(detail))
|
|
}
|
|
|
|
/// Lifecycle operation requested for an application.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProviderVerb {
    /// Bring the application up.
    Start,
    /// Shut the application down.
    Stop,
    /// Stop and start the application again.
    Restart,
    /// Ask the application to re-read its configuration.
    Reload,
}
|
|
|
|
async fn provider_action(params: Value, verb: ProviderVerb) -> Result<Value> {
|
|
let app = app_param(¶ms)?;
|
|
if app.is_system {
|
|
bail!("系统应用默认只读");
|
|
}
|
|
match app.provider {
|
|
ApplicationProviderType::Systemd | ApplicationProviderType::LightOpsManaged => {
|
|
let service = app
|
|
.service_name
|
|
.ok_or_else(|| anyhow!("缺少 service_name"))?;
|
|
let provider = SystemdAppProvider::new(app.agent_id);
|
|
match verb {
|
|
ProviderVerb::Start => provider.start(&service).await?,
|
|
ProviderVerb::Stop => provider.stop(&service).await?,
|
|
ProviderVerb::Restart => provider.restart(&service).await?,
|
|
ProviderVerb::Reload => provider.reload(&service).await?,
|
|
}
|
|
}
|
|
ApplicationProviderType::Docker => {
|
|
let container = app
|
|
.container_id
|
|
.ok_or_else(|| anyhow!("缺少 container_id"))?;
|
|
let provider = DockerAppProvider::new(app.agent_id);
|
|
match verb {
|
|
ProviderVerb::Start => provider.start(&container).await?,
|
|
ProviderVerb::Stop => provider.stop(&container).await?,
|
|
ProviderVerb::Restart => provider.restart(&container).await?,
|
|
ProviderVerb::Reload => bail!("Docker 容器不支持重载操作"),
|
|
}
|
|
}
|
|
ApplicationProviderType::DockerCompose => {
|
|
let project = app
|
|
.compose_project
|
|
.ok_or_else(|| anyhow!("缺少 compose_project"))?;
|
|
let provider = DockerComposeAppProvider::new(app.agent_id);
|
|
match verb {
|
|
ProviderVerb::Start => provider.start(&project).await?,
|
|
ProviderVerb::Stop => provider.stop(&project).await?,
|
|
ProviderVerb::Restart => provider.restart(&project).await?,
|
|
ProviderVerb::Reload => bail!("Docker Compose 项目不支持重载操作"),
|
|
}
|
|
}
|
|
ApplicationProviderType::NginxSite => {
|
|
if matches!(verb, ProviderVerb::Reload) {
|
|
command_checked(Command::new("systemctl").args(["reload", "nginx"])).await?;
|
|
} else {
|
|
bail!("Nginx 站点不支持此操作");
|
|
}
|
|
}
|
|
_ => bail!("此应用来源不支持该操作"),
|
|
}
|
|
Ok(json!({ "ok": true }))
|
|
}
|
|
|
|
async fn app_logs(params: Value) -> Result<Value> {
|
|
let app = app_param(¶ms)?;
|
|
let lines = params
|
|
.get("params")
|
|
.and_then(|p| p.get("lines"))
|
|
.and_then(Value::as_u64)
|
|
.unwrap_or(200)
|
|
.min(2000) as usize;
|
|
let logs = match app.provider {
|
|
ApplicationProviderType::Docker => {
|
|
let id = app
|
|
.container_id
|
|
.ok_or_else(|| anyhow!("缺少 container_id"))?;
|
|
DockerAppProvider::new(app.agent_id)
|
|
.logs(&id, lines)
|
|
.await?
|
|
}
|
|
ApplicationProviderType::DockerCompose => {
|
|
let project = app
|
|
.compose_project
|
|
.ok_or_else(|| anyhow!("缺少 compose_project"))?;
|
|
DockerComposeAppProvider::new(app.agent_id)
|
|
.logs(&project, lines)
|
|
.await?
|
|
}
|
|
ApplicationProviderType::Systemd | ApplicationProviderType::LightOpsManaged => {
|
|
let service = app
|
|
.service_name
|
|
.ok_or_else(|| anyhow!("缺少 service_name"))?;
|
|
SystemdAppProvider::new(app.agent_id)
|
|
.logs(&service, lines)
|
|
.await?
|
|
}
|
|
ApplicationProviderType::NginxSite => {
|
|
let mut out = Vec::new();
|
|
for path in app.log_paths.iter().take(4) {
|
|
out.extend(tail_file(path, lines / 2).await.unwrap_or_default());
|
|
}
|
|
out
|
|
}
|
|
_ => {
|
|
let mut out = Vec::new();
|
|
for path in app.log_paths.iter().take(4) {
|
|
out.extend(tail_file(path, lines / 2).await.unwrap_or_default());
|
|
}
|
|
out
|
|
}
|
|
};
|
|
Ok(json!({ "lines": logs, "content": logs.join("\n") }))
|
|
}
|
|
|
|
async fn app_health(params: Value) -> Result<Value> {
|
|
let app = app_param(¶ms)?;
|
|
let options = params.get("params").cloned().unwrap_or_else(|| json!({}));
|
|
let timeout_secs = options
|
|
.get("timeout_secs")
|
|
.and_then(Value::as_u64)
|
|
.unwrap_or(5)
|
|
.clamp(1, 30);
|
|
let timeout_duration = Duration::from_secs(timeout_secs);
|
|
let kind = options
|
|
.get("kind")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or_default()
|
|
.to_lowercase();
|
|
|
|
if kind == "http"
|
|
|| options.get("url").and_then(Value::as_str).is_some()
|
|
|| (!kind.eq("tcp") && !app.domains.is_empty())
|
|
{
|
|
return http_health(&app, &options, timeout_secs).await;
|
|
}
|
|
|
|
tcp_health(&app, &options, timeout_duration).await
|
|
}
|
|
|
|
async fn app_backup(params: Value) -> Result<Value> {
|
|
let app = app_param(¶ms)?;
|
|
if app.is_system {
|
|
bail!("系统应用默认不允许备份");
|
|
}
|
|
let paths = backup_paths(&app);
|
|
if paths.is_empty() {
|
|
bail!("当前应用没有可备份路径");
|
|
}
|
|
let backup_dir = Path::new("/var/backups/lightops/apps");
|
|
fs::create_dir_all(backup_dir).context("创建应用备份目录失败")?;
|
|
let timestamp = Utc::now().format("%Y%m%d%H%M%S").to_string();
|
|
let archive_name = format!("{}-{timestamp}.tar.gz", safe_app_name(&app.name)?);
|
|
let archive_path = backup_dir.join(archive_name);
|
|
|
|
let mut cmd = Command::new("tar");
|
|
cmd.arg("-czf").arg(&archive_path).arg("-C").arg("/");
|
|
for path in &paths {
|
|
let relative = path
|
|
.strip_prefix("/")
|
|
.ok_or_else(|| anyhow!("备份路径必须是绝对路径"))?;
|
|
cmd.arg(relative);
|
|
}
|
|
command_checked(&mut cmd).await?;
|
|
let size = fs::metadata(&archive_path)
|
|
.map(|meta| meta.len())
|
|
.unwrap_or(0);
|
|
Ok(json!({
|
|
"ok": true,
|
|
"archive_path": archive_path.to_string_lossy(),
|
|
"size": size,
|
|
"paths": paths,
|
|
}))
|
|
}
|
|
|
|
fn backup_paths(app: &Application) -> Vec<String> {
|
|
let mut out = Vec::new();
|
|
let mut seen = HashSet::new();
|
|
for value in app
|
|
.install_path
|
|
.iter()
|
|
.chain(app.work_dir.iter())
|
|
.chain(app.config_paths.iter())
|
|
.chain(app.data_paths.iter())
|
|
{
|
|
let path = Path::new(value);
|
|
if !path.is_absolute() || !path.exists() || value == "/" || value.starts_with("/proc/") {
|
|
continue;
|
|
}
|
|
if seen.insert(value.clone()) {
|
|
out.push(value.clone());
|
|
}
|
|
}
|
|
out.truncate(64);
|
|
out
|
|
}
|
|
|
|
async fn http_health(app: &Application, options: &Value, timeout_secs: u64) -> Result<Value> {
|
|
let target = options
|
|
.get("url")
|
|
.and_then(Value::as_str)
|
|
.map(ToString::to_string)
|
|
.or_else(|| app.domains.first().map(|domain| format!("http://{domain}")))
|
|
.or_else(|| {
|
|
app.ports
|
|
.first()
|
|
.map(|port| format!("http://127.0.0.1:{port}"))
|
|
})
|
|
.ok_or_else(|| anyhow!("缺少 HTTP 健康检查目标"))?;
|
|
validate_url_target(&target)?;
|
|
|
|
let mut cmd = Command::new("curl");
|
|
cmd.arg("-I")
|
|
.arg("-L")
|
|
.arg("-sS")
|
|
.arg("-o")
|
|
.arg("/dev/null")
|
|
.arg("-w")
|
|
.arg("%{http_code}")
|
|
.arg("--max-time")
|
|
.arg(timeout_secs.to_string())
|
|
.arg(&target);
|
|
|
|
let start = Instant::now();
|
|
let output =
|
|
tokio::time::timeout(Duration::from_secs(timeout_secs + 2), cmd.output()).await??;
|
|
let latency_ms = start.elapsed().as_millis() as u64;
|
|
let status_text = String::from_utf8_lossy(&output.stdout).trim().to_string();
|
|
let status_code = status_text.parse::<u16>().ok();
|
|
let ok = output.status.success() && status_code.is_some_and(|code| (200..500).contains(&code));
|
|
let error = if ok {
|
|
None
|
|
} else {
|
|
Some(String::from_utf8_lossy(&output.stderr).trim().to_string())
|
|
.filter(|text| !text.is_empty())
|
|
.or_else(|| Some("HTTP 健康检查失败".to_string()))
|
|
};
|
|
|
|
Ok(json!({
|
|
"ok": ok,
|
|
"kind": "http",
|
|
"target": target,
|
|
"latency_ms": latency_ms,
|
|
"status_code": status_code,
|
|
"error": error,
|
|
}))
|
|
}
|
|
|
|
async fn tcp_health(
|
|
app: &Application,
|
|
options: &Value,
|
|
timeout_duration: Duration,
|
|
) -> Result<Value> {
|
|
let host = options
|
|
.get("host")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or("127.0.0.1")
|
|
.to_string();
|
|
validate_host_target(&host)?;
|
|
let port = options
|
|
.get("port")
|
|
.and_then(Value::as_u64)
|
|
.and_then(|port| u16::try_from(port).ok())
|
|
.or_else(|| app.ports.first().copied())
|
|
.ok_or_else(|| anyhow!("缺少 TCP 健康检查端口"))?;
|
|
|
|
let target = format!("{host}:{port}");
|
|
let start = Instant::now();
|
|
let result =
|
|
tokio::time::timeout(timeout_duration, TcpStream::connect((&host[..], port))).await;
|
|
let latency_ms = start.elapsed().as_millis() as u64;
|
|
let (ok, error) = match result {
|
|
Ok(Ok(_stream)) => (true, None),
|
|
Ok(Err(err)) => (false, Some(err.to_string())),
|
|
Err(_) => (false, Some("TCP 健康检查超时".to_string())),
|
|
};
|
|
|
|
Ok(json!({
|
|
"ok": ok,
|
|
"kind": "tcp",
|
|
"target": target,
|
|
"latency_ms": latency_ms,
|
|
"status_code": null,
|
|
"error": error,
|
|
}))
|
|
}
|
|
|
|
async fn manage_custom(params: Value, create_service: bool) -> Result<Value> {
|
|
let name = string_param(¶ms, "name")?;
|
|
let safe_name = safe_app_name(&name)?;
|
|
let work_dir = string_param(¶ms, "work_dir")?;
|
|
let run_user = params
|
|
.get("run_user")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or("root")
|
|
.to_string();
|
|
if !Path::new(&work_dir).is_dir() {
|
|
bail!("工作目录不存在");
|
|
}
|
|
ensure_user_exists(&run_user).await?;
|
|
let ports = params
|
|
.get("ports")
|
|
.and_then(Value::as_array)
|
|
.map(|items| {
|
|
items
|
|
.iter()
|
|
.filter_map(|v| v.as_u64().map(|n| n as u16))
|
|
.collect::<Vec<_>>()
|
|
})
|
|
.unwrap_or_default();
|
|
let log_paths = params
|
|
.get("log_paths")
|
|
.and_then(Value::as_array)
|
|
.map(|items| {
|
|
items
|
|
.iter()
|
|
.filter_map(|v| v.as_str().map(ToString::to_string))
|
|
.collect::<Vec<_>>()
|
|
})
|
|
.unwrap_or_default();
|
|
|
|
let mut service_name = None;
|
|
let mut generated_service_path = None;
|
|
if create_service
|
|
|| params
|
|
.get("create_systemd")
|
|
.and_then(Value::as_bool)
|
|
.unwrap_or(false)
|
|
{
|
|
let start_command = string_param(¶ms, "start_command")?;
|
|
validate_start_command(&start_command)?;
|
|
let unit = format!("lightops-{safe_name}.service");
|
|
let path = Path::new("/etc/systemd/system").join(&unit);
|
|
if path.exists() {
|
|
bail!("服务已存在");
|
|
}
|
|
let env = params
|
|
.get("env")
|
|
.and_then(Value::as_object)
|
|
.cloned()
|
|
.unwrap_or_default();
|
|
let env_lines = env
|
|
.iter()
|
|
.map(|(key, value)| {
|
|
Ok(format!(
|
|
"Environment=\"{}={}\"",
|
|
sanitize_env_key(key)?,
|
|
value.as_str().unwrap_or_default().replace('"', "\\\"")
|
|
))
|
|
})
|
|
.collect::<Result<Vec<_>>>()?
|
|
.join("\n");
|
|
let content = format!(
|
|
"[Unit]\nDescription=LightOps App {name}\nAfter=network.target\n\n[Service]\nType=simple\nWorkingDirectory={work_dir}\nExecStart={start_command}\nRestart=always\nUser={run_user}\n{env_lines}\n\n[Install]\nWantedBy=multi-user.target\n"
|
|
);
|
|
fs::write(&path, content)?;
|
|
let result = async {
|
|
command_checked(Command::new("systemctl").arg("daemon-reload")).await?;
|
|
if params
|
|
.get("enable")
|
|
.and_then(Value::as_bool)
|
|
.unwrap_or(true)
|
|
{
|
|
command_checked(Command::new("systemctl").args(["enable", &unit])).await?;
|
|
}
|
|
if params.get("start").and_then(Value::as_bool).unwrap_or(true) {
|
|
command_checked(Command::new("systemctl").args(["start", &unit])).await?;
|
|
}
|
|
Ok::<_, anyhow::Error>(())
|
|
}
|
|
.await;
|
|
if let Err(err) = result {
|
|
let _ = fs::remove_file(&path);
|
|
let _ = command_checked(Command::new("systemctl").arg("daemon-reload")).await;
|
|
return Err(err);
|
|
}
|
|
service_name = Some(unit);
|
|
generated_service_path = Some(path.to_string_lossy().to_string());
|
|
}
|
|
|
|
let app = Application {
|
|
id: format!("lightops:{}", safe_name),
|
|
agent_id: agent_id(),
|
|
name: safe_name.clone(),
|
|
display_name: name,
|
|
description: params
|
|
.get("description")
|
|
.and_then(Value::as_str)
|
|
.map(ToString::to_string),
|
|
app_type: ApplicationType::Custom,
|
|
provider: ApplicationProviderType::LightOpsManaged,
|
|
status: if params.get("start").and_then(Value::as_bool).unwrap_or(true) {
|
|
ApplicationStatus::Running
|
|
} else {
|
|
ApplicationStatus::Stopped
|
|
},
|
|
version: None,
|
|
install_path: Some(work_dir.clone()),
|
|
work_dir: Some(work_dir),
|
|
config_paths: generated_service_path.iter().cloned().collect(),
|
|
log_paths,
|
|
data_paths: Vec::new(),
|
|
ports,
|
|
domains: Vec::new(),
|
|
service_name,
|
|
container_id: None,
|
|
compose_project: None,
|
|
package_name: None,
|
|
nginx_site: None,
|
|
run_user: Some(run_user),
|
|
is_system: false,
|
|
is_managed: true,
|
|
is_lightops_managed: true,
|
|
metadata: json!({ "source": "lightops", "generated_service": generated_service_path }),
|
|
created_at: None,
|
|
updated_at: None,
|
|
};
|
|
Ok(json!({ "application": app }))
|
|
}
|
|
|
|
fn app_param(params: &Value) -> Result<Application> {
|
|
serde_json::from_value(
|
|
params
|
|
.get("application")
|
|
.cloned()
|
|
.ok_or_else(|| anyhow!("缺少应用数据"))?,
|
|
)
|
|
.context("解析应用数据失败")
|
|
}
|
|
|
|
/// Application provider backed by systemd service units.
struct SystemdAppProvider {
    /// Agent identifier stamped onto every application this provider reports.
    agent_id: String,
}

impl SystemdAppProvider {
    /// Creates a provider that tags its applications with `agent_id`.
    fn new(agent_id: String) -> Self {
        SystemdAppProvider { agent_id }
    }
}
|
|
|
|
#[async_trait]
|
|
impl ApplicationProvider for SystemdAppProvider {
|
|
async fn detect(&self) -> Result<bool> {
|
|
Ok(Path::new("/bin/systemctl").exists() || Path::new("/usr/bin/systemctl").exists())
|
|
}
|
|
|
|
async fn list_apps(&self) -> Result<Vec<Application>> {
|
|
let output = command_output(Command::new("systemctl").args([
|
|
"list-units",
|
|
"--type=service",
|
|
"--all",
|
|
"--no-pager",
|
|
"--plain",
|
|
]))
|
|
.await?;
|
|
let mut apps = Vec::new();
|
|
for line in output.lines() {
|
|
if let Some(app) = parse_systemd_line(&self.agent_id, line) {
|
|
apps.push(app);
|
|
}
|
|
}
|
|
Ok(apps)
|
|
}
|
|
|
|
async fn get_app(&self, id: &str) -> Result<ApplicationDetail> {
|
|
let app = self
|
|
.list_apps()
|
|
.await?
|
|
.into_iter()
|
|
.find(|app| app.id == id || app.service_name.as_deref() == Some(id))
|
|
.ok_or_else(|| anyhow!("应用不存在"))?;
|
|
Ok(detail(app))
|
|
}
|
|
|
|
async fn start(&self, id: &str) -> Result<()> {
|
|
validate_unit(id)?;
|
|
command_checked(Command::new("systemctl").args(["start", id])).await
|
|
}
|
|
|
|
async fn stop(&self, id: &str) -> Result<()> {
|
|
validate_unit(id)?;
|
|
command_checked(Command::new("systemctl").args(["stop", id])).await
|
|
}
|
|
|
|
async fn restart(&self, id: &str) -> Result<()> {
|
|
validate_unit(id)?;
|
|
command_checked(Command::new("systemctl").args(["restart", id])).await
|
|
}
|
|
|
|
async fn reload(&self, id: &str) -> Result<()> {
|
|
validate_unit(id)?;
|
|
command_checked(Command::new("systemctl").args(["reload", id])).await
|
|
}
|
|
|
|
async fn logs(&self, id: &str, lines: usize) -> Result<Vec<String>> {
|
|
validate_unit(id)?;
|
|
let line_count = lines.min(2000).to_string();
|
|
let output = command_output(
|
|
Command::new("journalctl")
|
|
.arg("-u")
|
|
.arg(id)
|
|
.arg("-n")
|
|
.arg(line_count)
|
|
.arg("--no-pager"),
|
|
)
|
|
.await?;
|
|
Ok(output.lines().map(ToString::to_string).collect())
|
|
}
|
|
|
|
async fn update(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn uninstall(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
}
|
|
|
|
fn parse_systemd_line(agent_id: &str, line: &str) -> Option<Application> {
|
|
let parts = line.split_whitespace().collect::<Vec<_>>();
|
|
if parts.len() < 4 || !parts[0].ends_with(".service") {
|
|
return None;
|
|
}
|
|
let service = parts[0].to_string();
|
|
let base = service.trim_end_matches(".service").to_string();
|
|
let description = parts.get(4..).unwrap_or(&[]).join(" ");
|
|
if !is_manageable_service(&base, &service, &description) {
|
|
return None;
|
|
}
|
|
let is_system = is_core_system_name(&base);
|
|
Some(Application {
|
|
id: format!("systemd:{service}"),
|
|
agent_id: agent_id.to_string(),
|
|
name: base.clone(),
|
|
display_name: human_name(&base),
|
|
description: (!description.is_empty()).then_some(description),
|
|
app_type: infer_app_type(&base),
|
|
provider: ApplicationProviderType::Systemd,
|
|
status: match parts.get(2).copied().unwrap_or_default() {
|
|
"active" => ApplicationStatus::Running,
|
|
"failed" => ApplicationStatus::Failed,
|
|
_ => ApplicationStatus::Stopped,
|
|
},
|
|
version: None,
|
|
install_path: None,
|
|
work_dir: None,
|
|
config_paths: common_config_paths(&base),
|
|
log_paths: Vec::new(),
|
|
data_paths: common_data_paths(&base),
|
|
ports: common_ports(&base),
|
|
domains: Vec::new(),
|
|
service_name: Some(service),
|
|
container_id: None,
|
|
compose_project: None,
|
|
package_name: package_for_service(&base),
|
|
nginx_site: None,
|
|
run_user: None,
|
|
is_system,
|
|
is_managed: true,
|
|
is_lightops_managed: base.starts_with("lightops-"),
|
|
metadata: json!({ "source": "systemd", "load": parts.get(1).copied().unwrap_or_default(), "sub": parts.get(3).copied().unwrap_or_default() }),
|
|
created_at: None,
|
|
updated_at: None,
|
|
})
|
|
}
|
|
|
|
/// Application provider backed by individual Docker containers.
struct DockerAppProvider {
    /// Agent identifier stamped onto every application this provider reports.
    agent_id: String,
}

impl DockerAppProvider {
    /// Creates a provider that tags its applications with `agent_id`.
    fn new(agent_id: String) -> Self {
        DockerAppProvider { agent_id }
    }
}
|
|
|
|
#[async_trait]
|
|
impl ApplicationProvider for DockerAppProvider {
|
|
async fn detect(&self) -> Result<bool> {
|
|
Ok(command_status(Command::new("docker").arg("version"))
|
|
.await
|
|
.unwrap_or(false))
|
|
}
|
|
|
|
async fn list_apps(&self) -> Result<Vec<Application>> {
|
|
let output =
|
|
command_output(Command::new("docker").args(["ps", "-a", "--format", "{{json .}}"]))
|
|
.await?;
|
|
Ok(output
|
|
.lines()
|
|
.filter_map(|line| parse_docker_app(&self.agent_id, line))
|
|
.collect())
|
|
}
|
|
|
|
async fn get_app(&self, id: &str) -> Result<ApplicationDetail> {
|
|
let app = self
|
|
.list_apps()
|
|
.await?
|
|
.into_iter()
|
|
.find(|app| app.id == id || app.container_id.as_deref() == Some(id))
|
|
.ok_or_else(|| anyhow!("应用不存在"))?;
|
|
Ok(detail(app))
|
|
}
|
|
|
|
async fn start(&self, id: &str) -> Result<()> {
|
|
validate_docker_id(id)?;
|
|
command_checked(Command::new("docker").args(["start", id])).await
|
|
}
|
|
|
|
async fn stop(&self, id: &str) -> Result<()> {
|
|
validate_docker_id(id)?;
|
|
command_checked(Command::new("docker").args(["stop", id])).await
|
|
}
|
|
|
|
async fn restart(&self, id: &str) -> Result<()> {
|
|
validate_docker_id(id)?;
|
|
command_checked(Command::new("docker").args(["restart", id])).await
|
|
}
|
|
|
|
async fn reload(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn logs(&self, id: &str, lines: usize) -> Result<Vec<String>> {
|
|
validate_docker_id(id)?;
|
|
let line_count = lines.min(2000).to_string();
|
|
let output = command_output(
|
|
Command::new("docker")
|
|
.arg("logs")
|
|
.arg("--tail")
|
|
.arg(line_count)
|
|
.arg(id),
|
|
)
|
|
.await?;
|
|
Ok(output.lines().map(ToString::to_string).collect())
|
|
}
|
|
|
|
async fn update(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn uninstall(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
}
|
|
|
|
/// Application provider that groups Docker containers by Compose project.
struct DockerComposeAppProvider {
    /// Agent identifier stamped onto every application this provider reports.
    agent_id: String,
}

impl DockerComposeAppProvider {
    /// Creates a provider that tags its applications with `agent_id`.
    fn new(agent_id: String) -> Self {
        DockerComposeAppProvider { agent_id }
    }
}
|
|
|
|
#[async_trait]
|
|
impl ApplicationProvider for DockerComposeAppProvider {
|
|
async fn detect(&self) -> Result<bool> {
|
|
Ok(command_status(Command::new("docker").arg("version"))
|
|
.await
|
|
.unwrap_or(false))
|
|
}
|
|
|
|
async fn list_apps(&self) -> Result<Vec<Application>> {
|
|
let output =
|
|
command_output(Command::new("docker").args(["ps", "-a", "--format", "{{json .}}"]))
|
|
.await?;
|
|
Ok(compose_project_apps(&self.agent_id, &output))
|
|
}
|
|
|
|
async fn get_app(&self, id: &str) -> Result<ApplicationDetail> {
|
|
let app = self
|
|
.list_apps()
|
|
.await?
|
|
.into_iter()
|
|
.find(|app| app.id == id || app.compose_project.as_deref() == Some(id))
|
|
.ok_or_else(|| anyhow!("应用不存在"))?;
|
|
Ok(detail(app))
|
|
}
|
|
|
|
async fn start(&self, id: &str) -> Result<()> {
|
|
docker_compose_container_action(id, "start").await
|
|
}
|
|
|
|
async fn stop(&self, id: &str) -> Result<()> {
|
|
docker_compose_container_action(id, "stop").await
|
|
}
|
|
|
|
async fn restart(&self, id: &str) -> Result<()> {
|
|
docker_compose_container_action(id, "restart").await
|
|
}
|
|
|
|
async fn reload(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn logs(&self, id: &str, lines: usize) -> Result<Vec<String>> {
|
|
validate_compose_project(id)?;
|
|
let ids = compose_container_ids(id).await?;
|
|
let mut out = Vec::new();
|
|
let line_count = lines.min(2000).to_string();
|
|
for container_id in ids {
|
|
out.push(format!("===== {container_id} ====="));
|
|
let output = command_output(Command::new("docker").args([
|
|
"logs",
|
|
"--tail",
|
|
&line_count,
|
|
&container_id,
|
|
]))
|
|
.await
|
|
.unwrap_or_else(|err| err.to_string());
|
|
out.extend(output.lines().map(ToString::to_string));
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
async fn update(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn uninstall(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
}
|
|
|
|
fn compose_project_apps(agent_id: &str, output: &str) -> Vec<Application> {
|
|
let mut projects = std::collections::BTreeMap::<String, ComposeAccumulator>::new();
|
|
for line in output.lines() {
|
|
let Ok(value) = serde_json::from_str::<Value>(line) else {
|
|
continue;
|
|
};
|
|
let labels = value
|
|
.get("Labels")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or_default();
|
|
let Some(project) = docker_label_value(labels, "com.docker.compose.project") else {
|
|
continue;
|
|
};
|
|
let service = docker_label_value(labels, "com.docker.compose.service").unwrap_or_default();
|
|
let status = value
|
|
.get("Status")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or_default();
|
|
let ports = parse_port_numbers(
|
|
value
|
|
.get("Ports")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or_default(),
|
|
);
|
|
let entry = projects.entry(project.clone()).or_default();
|
|
entry.running |= status.starts_with("Up");
|
|
if !service.is_empty() && !entry.services.contains(&service) {
|
|
entry.services.push(service);
|
|
}
|
|
if let Some(id) = value.get("ID").and_then(Value::as_str) {
|
|
entry.container_ids.push(id.to_string());
|
|
}
|
|
for port in ports {
|
|
if !entry.ports.contains(&port) {
|
|
entry.ports.push(port);
|
|
}
|
|
}
|
|
}
|
|
projects
|
|
.into_iter()
|
|
.map(|(project, acc)| Application {
|
|
id: format!("compose:{project}"),
|
|
agent_id: agent_id.to_string(),
|
|
name: project.clone(),
|
|
display_name: project.clone(),
|
|
description: Some(format!(
|
|
"Docker Compose 项目,服务:{}",
|
|
if acc.services.is_empty() {
|
|
"-".into()
|
|
} else {
|
|
acc.services.join(", ")
|
|
}
|
|
)),
|
|
app_type: ApplicationType::ComposeProject,
|
|
provider: ApplicationProviderType::DockerCompose,
|
|
status: if acc.running {
|
|
ApplicationStatus::Running
|
|
} else {
|
|
ApplicationStatus::Stopped
|
|
},
|
|
version: None,
|
|
install_path: None,
|
|
work_dir: None,
|
|
config_paths: Vec::new(),
|
|
log_paths: Vec::new(),
|
|
data_paths: Vec::new(),
|
|
ports: acc.ports,
|
|
domains: Vec::new(),
|
|
service_name: None,
|
|
container_id: None,
|
|
compose_project: Some(project.clone()),
|
|
package_name: None,
|
|
nginx_site: None,
|
|
run_user: None,
|
|
is_system: false,
|
|
is_managed: true,
|
|
is_lightops_managed: false,
|
|
metadata: json!({ "source": "docker-compose", "services": acc.services, "containers": acc.container_ids }),
|
|
created_at: None,
|
|
updated_at: None,
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Per-project aggregate built while scanning the container list.
#[derive(Debug, Default)]
struct ComposeAccumulator {
    /// `true` once any container of the project reports an `Up` status.
    running: bool,
    /// Distinct Compose service names, in first-seen order.
    services: Vec<String>,
    /// Ids of all containers that belong to the project.
    container_ids: Vec<String>,
    /// Distinct ports exposed by the project's containers.
    ports: Vec<u16>,
}
|
|
|
|
async fn docker_compose_container_action(project: &str, action: &str) -> Result<()> {
|
|
validate_compose_project(project)?;
|
|
let ids = compose_container_ids(project).await?;
|
|
if ids.is_empty() {
|
|
bail!("未找到 Compose 项目容器");
|
|
}
|
|
let mut cmd = Command::new("docker");
|
|
cmd.arg(action);
|
|
for id in ids {
|
|
cmd.arg(id);
|
|
}
|
|
command_checked(&mut cmd).await
|
|
}
|
|
|
|
async fn compose_container_ids(project: &str) -> Result<Vec<String>> {
|
|
let label = format!("label=com.docker.compose.project={project}");
|
|
let output =
|
|
command_output(Command::new("docker").args(["ps", "-aq", "--filter", &label])).await?;
|
|
Ok(output
|
|
.lines()
|
|
.map(str::trim)
|
|
.filter(|line| !line.is_empty())
|
|
.map(ToString::to_string)
|
|
.collect())
|
|
}
|
|
|
|
/// Looks up `key` in a comma-separated `key=value` label list.
///
/// Items are trimmed before matching and the first item whose key equals
/// `key` exactly wins. Returns the text after the `=` (possibly empty), or
/// `None` when no item matches.
///
/// Values that themselves contain a comma are split by this format and will
/// be returned truncated.
fn docker_label_value(labels: &str, key: &str) -> Option<String> {
    labels.split(',').find_map(|item| {
        // Two-step strip avoids building a `"{key}="` string per item.
        item.trim()
            .strip_prefix(key)?
            .strip_prefix('=')
            .map(ToString::to_string)
    })
}
|
|
|
|
fn parse_docker_app(agent_id: &str, line: &str) -> Option<Application> {
|
|
let value = serde_json::from_str::<Value>(line).ok()?;
|
|
let id = value.get("ID").and_then(Value::as_str)?.to_string();
|
|
let name = value
|
|
.get("Names")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or(&id)
|
|
.trim_start_matches('/')
|
|
.to_string();
|
|
let status_text = value
|
|
.get("Status")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or_default();
|
|
let labels = value
|
|
.get("Labels")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or_default();
|
|
let compose_project = labels.split(',').find_map(|item| {
|
|
item.strip_prefix("com.docker.compose.project=")
|
|
.map(ToString::to_string)
|
|
});
|
|
Some(Application {
|
|
id: format!("docker:{id}"),
|
|
agent_id: agent_id.to_string(),
|
|
name: name.clone(),
|
|
display_name: name,
|
|
description: value
|
|
.get("Image")
|
|
.and_then(Value::as_str)
|
|
.map(ToString::to_string),
|
|
app_type: ApplicationType::Container,
|
|
provider: ApplicationProviderType::Docker,
|
|
status: if status_text.starts_with("Up") {
|
|
ApplicationStatus::Running
|
|
} else {
|
|
ApplicationStatus::Stopped
|
|
},
|
|
version: None,
|
|
install_path: None,
|
|
work_dir: None,
|
|
config_paths: Vec::new(),
|
|
log_paths: Vec::new(),
|
|
data_paths: Vec::new(),
|
|
ports: parse_port_numbers(
|
|
value
|
|
.get("Ports")
|
|
.and_then(Value::as_str)
|
|
.unwrap_or_default(),
|
|
),
|
|
domains: Vec::new(),
|
|
service_name: None,
|
|
container_id: Some(id),
|
|
compose_project,
|
|
package_name: None,
|
|
nginx_site: None,
|
|
run_user: None,
|
|
is_system: false,
|
|
is_managed: true,
|
|
is_lightops_managed: false,
|
|
metadata: value,
|
|
created_at: None,
|
|
updated_at: None,
|
|
})
|
|
}
|
|
|
|
/// Application provider that reports Nginx site configuration files.
struct NginxSiteAppProvider {
    /// Agent identifier copied into every application this provider reports.
    agent_id: String,
}

impl NginxSiteAppProvider {
    /// Creates a provider that tags its applications with `agent_id`.
    fn new(agent_id: String) -> Self {
        NginxSiteAppProvider { agent_id }
    }
}
|
|
|
|
#[async_trait]
|
|
impl ApplicationProvider for NginxSiteAppProvider {
|
|
async fn detect(&self) -> Result<bool> {
|
|
Ok(Path::new(NGINX_AVAILABLE).exists() || Path::new(NGINX_CONF_D).exists())
|
|
}
|
|
|
|
async fn list_apps(&self) -> Result<Vec<Application>> {
|
|
let mut sites = Vec::new();
|
|
for dir in [NGINX_AVAILABLE, NGINX_CONF_D] {
|
|
if let Ok(entries) = fs::read_dir(dir) {
|
|
for entry in entries.flatten() {
|
|
if entry.path().is_file() {
|
|
sites.push(site_app(&self.agent_id, entry.path()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(sites)
|
|
}
|
|
|
|
async fn get_app(&self, id: &str) -> Result<ApplicationDetail> {
|
|
let app = self
|
|
.list_apps()
|
|
.await?
|
|
.into_iter()
|
|
.find(|app| app.id == id || app.nginx_site.as_deref() == Some(id))
|
|
.ok_or_else(|| anyhow!("应用不存在"))?;
|
|
Ok(detail(app))
|
|
}
|
|
|
|
async fn start(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn stop(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn restart(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn reload(&self, _id: &str) -> Result<()> {
|
|
command_checked(Command::new("systemctl").args(["reload", "nginx"])).await
|
|
}
|
|
|
|
async fn logs(&self, id: &str, lines: usize) -> Result<Vec<String>> {
|
|
let app = self.get_app(id).await?.application;
|
|
let mut out = Vec::new();
|
|
for path in app.log_paths {
|
|
out.extend(tail_file(&path, lines / 2).await.unwrap_or_default());
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
async fn update(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn uninstall(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
}
|
|
|
|
fn site_app(agent_id: &str, path: PathBuf) -> Application {
|
|
let name = path
|
|
.file_name()
|
|
.map(|n| n.to_string_lossy().to_string())
|
|
.unwrap_or_else(|| "nginx-site".into());
|
|
let content = fs::read_to_string(&path).unwrap_or_default();
|
|
let domains = parse_nginx_domains(&content);
|
|
let ports = parse_nginx_listen_ports(&content);
|
|
let enabled = Path::new(NGINX_ENABLED).join(&name).exists() || path.starts_with(NGINX_CONF_D);
|
|
Application {
|
|
id: format!("nginx:{name}"),
|
|
agent_id: agent_id.to_string(),
|
|
name: name.clone(),
|
|
display_name: if domains.is_empty() {
|
|
name.clone()
|
|
} else {
|
|
domains.join(", ")
|
|
},
|
|
description: Some("Nginx site".into()),
|
|
app_type: if content.contains("proxy_pass") {
|
|
ApplicationType::ReverseProxy
|
|
} else {
|
|
ApplicationType::StaticSite
|
|
},
|
|
provider: ApplicationProviderType::NginxSite,
|
|
status: if enabled {
|
|
ApplicationStatus::Enabled
|
|
} else {
|
|
ApplicationStatus::Disabled
|
|
},
|
|
version: None,
|
|
install_path: None,
|
|
work_dir: parse_nginx_root(&content),
|
|
config_paths: vec![path.to_string_lossy().to_string()],
|
|
log_paths: parse_nginx_logs(&content),
|
|
data_paths: Vec::new(),
|
|
ports,
|
|
domains,
|
|
service_name: Some("nginx.service".into()),
|
|
container_id: None,
|
|
compose_project: None,
|
|
package_name: Some("nginx".into()),
|
|
nginx_site: Some(name),
|
|
run_user: None,
|
|
is_system: false,
|
|
is_managed: true,
|
|
is_lightops_managed: false,
|
|
metadata: json!({ "enabled": enabled, "source": "nginx" }),
|
|
created_at: None,
|
|
updated_at: None,
|
|
}
|
|
}
|
|
|
|
/// Application provider that reports allow-listed packages known to dpkg.
struct PackageAppProvider {
    /// Agent identifier copied into every application this provider reports.
    agent_id: String,
}

impl PackageAppProvider {
    /// Creates a provider that tags its applications with `agent_id`.
    fn new(agent_id: String) -> Self {
        PackageAppProvider { agent_id }
    }
}
|
|
|
|
#[async_trait]
|
|
impl ApplicationProvider for PackageAppProvider {
|
|
async fn detect(&self) -> Result<bool> {
|
|
Ok(Path::new("/usr/bin/dpkg-query").exists())
|
|
}
|
|
|
|
async fn list_apps(&self) -> Result<Vec<Application>> {
|
|
let output =
|
|
command_output(Command::new("dpkg-query").args(["-W", "-f=${Package} ${Version}\n"]))
|
|
.await?;
|
|
let allowed = allowed_packages();
|
|
Ok(output
|
|
.lines()
|
|
.filter_map(|line| {
|
|
let mut parts = line.split_whitespace();
|
|
let package = parts.next()?.to_string();
|
|
if !allowed.contains(package.as_str()) {
|
|
return None;
|
|
}
|
|
let version = parts.next().map(ToString::to_string);
|
|
Some(Application {
|
|
id: format!("apt:{package}"),
|
|
agent_id: self.agent_id.clone(),
|
|
name: package.clone(),
|
|
display_name: human_name(&package),
|
|
description: Some("APT package with operational value".into()),
|
|
app_type: infer_app_type(&package),
|
|
provider: ApplicationProviderType::Apt,
|
|
status: ApplicationStatus::Unknown,
|
|
version,
|
|
install_path: None,
|
|
work_dir: None,
|
|
config_paths: common_config_paths(&package),
|
|
log_paths: Vec::new(),
|
|
data_paths: common_data_paths(&package),
|
|
ports: common_ports(&package),
|
|
domains: Vec::new(),
|
|
service_name: service_for_package(&package),
|
|
container_id: None,
|
|
compose_project: None,
|
|
package_name: Some(package),
|
|
nginx_site: None,
|
|
run_user: None,
|
|
is_system: false,
|
|
is_managed: true,
|
|
is_lightops_managed: false,
|
|
metadata: json!({ "source": "apt" }),
|
|
created_at: None,
|
|
updated_at: None,
|
|
})
|
|
})
|
|
.collect())
|
|
}
|
|
|
|
async fn get_app(&self, id: &str) -> Result<ApplicationDetail> {
|
|
let app = self
|
|
.list_apps()
|
|
.await?
|
|
.into_iter()
|
|
.find(|app| app.id == id || app.package_name.as_deref() == Some(id))
|
|
.ok_or_else(|| anyhow!("应用不存在"))?;
|
|
Ok(detail(app))
|
|
}
|
|
|
|
async fn start(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn stop(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn restart(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn reload(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn logs(&self, _id: &str, _lines: usize) -> Result<Vec<String>> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn update(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn uninstall(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
}
|
|
|
|
/// Application provider restricted to applications flagged as LightOps-managed.
struct LightOpsManagedAppProvider {
    /// Agent identifier handed to the systemd provider this type delegates to.
    agent_id: String,
}

impl LightOpsManagedAppProvider {
    /// Creates a provider that tags its applications with `agent_id`.
    fn new(agent_id: String) -> Self {
        LightOpsManagedAppProvider { agent_id }
    }
}
|
|
|
|
#[async_trait]
|
|
impl ApplicationProvider for LightOpsManagedAppProvider {
|
|
async fn detect(&self) -> Result<bool> {
|
|
Ok(true)
|
|
}
|
|
|
|
async fn list_apps(&self) -> Result<Vec<Application>> {
|
|
SystemdAppProvider::new(self.agent_id.clone())
|
|
.list_apps()
|
|
.await
|
|
.map(|apps| {
|
|
apps.into_iter()
|
|
.filter(|app| app.is_lightops_managed)
|
|
.collect()
|
|
})
|
|
}
|
|
|
|
async fn get_app(&self, id: &str) -> Result<ApplicationDetail> {
|
|
SystemdAppProvider::new(self.agent_id.clone())
|
|
.get_app(id)
|
|
.await
|
|
}
|
|
|
|
async fn start(&self, id: &str) -> Result<()> {
|
|
SystemdAppProvider::new(self.agent_id.clone())
|
|
.start(id)
|
|
.await
|
|
}
|
|
|
|
async fn stop(&self, id: &str) -> Result<()> {
|
|
SystemdAppProvider::new(self.agent_id.clone())
|
|
.stop(id)
|
|
.await
|
|
}
|
|
|
|
async fn restart(&self, id: &str) -> Result<()> {
|
|
SystemdAppProvider::new(self.agent_id.clone())
|
|
.restart(id)
|
|
.await
|
|
}
|
|
|
|
async fn reload(&self, id: &str) -> Result<()> {
|
|
SystemdAppProvider::new(self.agent_id.clone())
|
|
.reload(id)
|
|
.await
|
|
}
|
|
|
|
async fn logs(&self, id: &str, lines: usize) -> Result<Vec<String>> {
|
|
SystemdAppProvider::new(self.agent_id.clone())
|
|
.logs(id, lines)
|
|
.await
|
|
}
|
|
|
|
async fn update(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
|
|
async fn uninstall(&self, _id: &str) -> Result<()> {
|
|
bail!("不支持的操作")
|
|
}
|
|
}
|
|
|
|
fn detail(application: Application) -> ApplicationDetail {
|
|
ApplicationDetail {
|
|
application,
|
|
relations: Vec::new(),
|
|
recent_actions: Vec::new(),
|
|
runtime_info: json!({}),
|
|
available_actions: vec![
|
|
"start".into(),
|
|
"stop".into(),
|
|
"restart".into(),
|
|
"logs".into(),
|
|
"health".into(),
|
|
"backup".into(),
|
|
],
|
|
risk_level: "normal".into(),
|
|
provider_specific_info: json!({}),
|
|
}
|
|
}
|
|
|
|
fn build_relations(apps: &[Application], relations: &mut Vec<ApplicationRelation>) {
|
|
for nginx in apps
|
|
.iter()
|
|
.filter(|app| app.provider == ApplicationProviderType::NginxSite)
|
|
{
|
|
for port in &nginx.ports {
|
|
for target in apps.iter().filter(|app| {
|
|
app.provider != ApplicationProviderType::NginxSite && app.ports.contains(port)
|
|
}) {
|
|
relations.push(ApplicationRelation {
|
|
id: Uuid::new_v4().to_string(),
|
|
agent_id: nginx.agent_id.clone(),
|
|
app_id: nginx.id.clone(),
|
|
relation_type: "proxy_to_port".into(),
|
|
target_id: Some(target.id.clone()),
|
|
target_name: Some(target.display_name.clone()),
|
|
metadata: json!({ "port": port }),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn enrich_with_ports(apps: &mut [Application]) {
|
|
let Ok(output) = command_output(Command::new("ss").args(["-tulpen"])).await else {
|
|
return;
|
|
};
|
|
for line in output.lines() {
|
|
let Some(port) = line
|
|
.rsplit(':')
|
|
.find_map(|part| part.split_whitespace().next()?.parse::<u16>().ok())
|
|
else {
|
|
continue;
|
|
};
|
|
for app in apps.iter_mut() {
|
|
if line.to_lowercase().contains(&app.name.to_lowercase()) && !app.ports.contains(&port)
|
|
{
|
|
app.ports.push(port);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn tail_file(path: &str, lines: usize) -> Result<Vec<String>> {
|
|
let line_count = lines.min(2000).to_string();
|
|
let output = command_output(Command::new("tail").arg("-n").arg(line_count).arg(path)).await?;
|
|
Ok(output.lines().map(ToString::to_string).collect())
|
|
}
|
|
|
|
async fn command_output(cmd: &mut Command) -> Result<String> {
|
|
let output = tokio::time::timeout(Duration::from_secs(8), cmd.output()).await??;
|
|
if !output.status.success() {
|
|
bail!("{}", String::from_utf8_lossy(&output.stderr));
|
|
}
|
|
Ok(String::from_utf8_lossy(&output.stdout).to_string())
|
|
}
|
|
|
|
async fn command_checked(cmd: &mut Command) -> Result<()> {
|
|
command_output(cmd).await.map(|_| ())
|
|
}
|
|
|
|
async fn command_status(cmd: &mut Command) -> Result<bool> {
|
|
let output = tokio::time::timeout(Duration::from_secs(5), cmd.output()).await??;
|
|
Ok(output.status.success())
|
|
}
|
|
|
|
async fn ensure_user_exists(user: &str) -> Result<()> {
|
|
if user.len() > 64
|
|
|| !user
|
|
.chars()
|
|
.all(|c| c.is_ascii_alphanumeric() || "_-".contains(c))
|
|
{
|
|
bail!("运行用户无效");
|
|
}
|
|
command_checked(Command::new("id").arg("-u").arg(user)).await
|
|
}
|
|
|
|
fn string_param(params: &Value, name: &str) -> Result<String> {
|
|
params
|
|
.get(name)
|
|
.and_then(Value::as_str)
|
|
.map(ToString::to_string)
|
|
.ok_or_else(|| anyhow!("缺少参数:{name}"))
|
|
}
|
|
|
|
fn safe_app_name(name: &str) -> Result<String> {
|
|
let safe = name
|
|
.to_lowercase()
|
|
.chars()
|
|
.map(|c| {
|
|
if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
|
|
c
|
|
} else {
|
|
'-'
|
|
}
|
|
})
|
|
.collect::<String>()
|
|
.trim_matches('-')
|
|
.to_string();
|
|
if safe.is_empty() || safe.len() > 64 {
|
|
bail!("应用名称无效");
|
|
}
|
|
Ok(safe)
|
|
}
|
|
|
|
fn validate_start_command(command: &str) -> Result<()> {
|
|
if command.len() > 500
|
|
|| command.contains('\0')
|
|
|| command.contains("&&")
|
|
|| command.contains("||")
|
|
|| command.contains(';')
|
|
|| command.contains('`')
|
|
|| command.contains("$(")
|
|
{
|
|
bail!("启动命令不安全");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_url_target(url: &str) -> Result<()> {
|
|
if url.len() > 500
|
|
|| url.contains('\0')
|
|
|| url.chars().any(char::is_whitespace)
|
|
|| !(url.starts_with("http://") || url.starts_with("https://"))
|
|
{
|
|
bail!("健康检查 URL 无效");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_host_target(host: &str) -> Result<()> {
|
|
if host.is_empty()
|
|
|| host.len() > 253
|
|
|| host.contains('\0')
|
|
|| host.chars().any(char::is_whitespace)
|
|
|| host.starts_with('-')
|
|
{
|
|
bail!("健康检查主机无效");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn sanitize_env_key(key: &str) -> Result<&str> {
|
|
if key.is_empty()
|
|
|| key.len() > 80
|
|
|| !key
|
|
.chars()
|
|
.all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_')
|
|
{
|
|
bail!("环境变量名无效");
|
|
}
|
|
Ok(key)
|
|
}
|
|
|
|
fn validate_unit(name: &str) -> Result<()> {
|
|
if name.len() > 200
|
|
|| !name
|
|
.chars()
|
|
.all(|c| c.is_ascii_alphanumeric() || ".@_-".contains(c))
|
|
{
|
|
bail!("服务名称无效");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_docker_id(id: &str) -> Result<()> {
|
|
if id.len() > 200
|
|
|| !id
|
|
.chars()
|
|
.all(|c| c.is_ascii_alphanumeric() || ".:/@_-".contains(c))
|
|
{
|
|
bail!("Docker 标识无效");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_compose_project(project: &str) -> Result<()> {
|
|
if project.is_empty()
|
|
|| project.len() > 128
|
|
|| !project
|
|
.chars()
|
|
.all(|c| c.is_ascii_alphanumeric() || "._-".contains(c))
|
|
{
|
|
bail!("Compose 项目名称无效");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn is_manageable_service(base: &str, service: &str, description: &str) -> bool {
|
|
base.starts_with("lightops-")
|
|
|| allowed_packages().contains(base)
|
|
|| allowed_packages().contains(service.trim_end_matches(".service"))
|
|
|| description.to_lowercase().contains("docker")
|
|
|| description.to_lowercase().contains("nginx")
|
|
|| description.to_lowercase().contains("redis")
|
|
}
|
|
|
|
/// Reports whether `name` belongs to a core system component.
///
/// A name matches when it equals one of the blocked names or starts with a
/// blocked name followed by `-`.
fn is_core_system_name(name: &str) -> bool {
    const BLOCKED: [&str; 10] = [
        "systemd",
        "dbus",
        "udev",
        "apt",
        "cron",
        "ssh",
        "getty",
        "networking",
        "rsyslog",
        "polkit",
    ];

    for prefix in BLOCKED.iter() {
        // After removing the blocked name, either nothing is left (exact
        // match) or the remainder begins with the `-` separator.
        if let Some(rest) = name.strip_prefix(*prefix) {
            if rest.is_empty() || rest.starts_with('-') {
                return true;
            }
        }
    }
    false
}
|
|
|
|
/// Returns the set of package names this agent treats as applications.
fn allowed_packages() -> HashSet<&'static str> {
    const PACKAGES: [&str; 19] = [
        "nginx",
        "apache2",
        "caddy",
        "redis-server",
        "mysql-server",
        "mariadb-server",
        "postgresql",
        "docker",
        "docker-ce",
        "containerd",
        "fail2ban",
        "supervisor",
        "frps",
        "frpc",
        "gitea",
        "alist",
        "minio",
        "prometheus",
        "grafana",
    ];

    let mut allowed = HashSet::with_capacity(PACKAGES.len());
    allowed.extend(PACKAGES.iter().copied());
    allowed
}
|
|
|
|
fn infer_app_type(name: &str) -> ApplicationType {
|
|
match name {
|
|
"redis-server" | "mysql-server" | "mariadb-server" | "postgresql" => {
|
|
ApplicationType::Database
|
|
}
|
|
"nginx" | "apache2" | "caddy" => ApplicationType::WebApp,
|
|
"docker" | "docker-ce" | "containerd" => ApplicationType::Runtime,
|
|
_ => ApplicationType::Service,
|
|
}
|
|
}
|
|
|
|
/// Turns a `-`/`_` separated identifier into space-separated capitalised words.
///
/// Empty segments are dropped; only the first character of each segment is
/// upper-cased and the rest is kept as written.
fn human_name(name: &str) -> String {
    let mut words: Vec<String> = Vec::new();
    for part in name.split(|c: char| c == '-' || c == '_') {
        let mut rest = part.chars();
        // An empty segment has no first character and is skipped.
        if let Some(initial) = rest.next() {
            let mut word: String = initial.to_uppercase().collect();
            word.push_str(rest.as_str());
            words.push(word);
        }
    }
    words.join(" ")
}
|
|
|
|
fn package_for_service(base: &str) -> Option<String> {
|
|
allowed_packages().contains(base).then(|| base.to_string())
|
|
}
|
|
|
|
/// Returns the systemd unit name expected for `package`.
///
/// Most packages use `<package>.service`. Packages whose unit is named
/// differently from the package are mapped explicitly. The result is always
/// `Some`.
fn service_for_package(package: &str) -> Option<String> {
    // Unit base names that differ from the package name on Debian/Ubuntu.
    let unit = match package {
        "docker-ce" => "docker",
        "mysql-server" => "mysql",
        "mariadb-server" => "mariadb",
        "grafana" => "grafana-server",
        other => other,
    };
    Some(format!("{unit}.service"))
}
|
|
|
|
/// Returns the well-known configuration paths for a package, or an empty list
/// for packages without a known location.
fn common_config_paths(name: &str) -> Vec<String> {
    let paths: &[&str] = match name {
        "nginx" => &["/etc/nginx/nginx.conf"],
        "redis-server" => &["/etc/redis/redis.conf"],
        "postgresql" => &["/etc/postgresql"],
        "mysql-server" | "mariadb-server" => &["/etc/mysql"],
        _ => &[],
    };
    paths.iter().map(|path| path.to_string()).collect()
}
|
|
|
|
/// Returns the well-known data directories for a package, or an empty list
/// for packages without a known location.
fn common_data_paths(name: &str) -> Vec<String> {
    let paths: &[&str] = match name {
        "redis-server" => &["/var/lib/redis"],
        "postgresql" => &["/var/lib/postgresql"],
        "mysql-server" | "mariadb-server" => &["/var/lib/mysql"],
        _ => &[],
    };
    paths.iter().map(|path| path.to_string()).collect()
}
|
|
|
|
/// Returns the default listening ports for a package, or an empty list for
/// packages without a known default.
fn common_ports(name: &str) -> Vec<u16> {
    let ports: &[u16] = match name {
        "nginx" | "apache2" | "caddy" => &[80, 443],
        "redis-server" => &[6379],
        "mysql-server" | "mariadb-server" => &[3306],
        "postgresql" => &[5432],
        "prometheus" => &[9090],
        "grafana" => &[3000],
        _ => &[],
    };
    ports.to_vec()
}
|
|
|
|
/// Extracts the distinct port numbers mentioned in a port-mapping string.
///
/// The input is split on `,`, space, `-` and `>`. For each piece, a trailing
/// `/tcp` or `/udp` is removed and the text after the last `:` (or the whole
/// piece when there is no `:`) is parsed as a `u16`. Ports are returned in
/// first-seen order without duplicates; pieces that do not parse are ignored.
fn parse_port_numbers(input: &str) -> Vec<u16> {
    let candidates = input
        .split(|c: char| c == ',' || c == ' ' || c == '-' || c == '>')
        .filter_map(|token| {
            let bare = token
                .trim()
                .trim_end_matches("/tcp")
                .trim_end_matches("/udp");
            // `rsplit` always yields at least one segment: the whole piece
            // when it contains no `:`.
            bare.rsplit(':').next()?.parse::<u16>().ok()
        });

    let mut ports: Vec<u16> = Vec::new();
    for port in candidates {
        if !ports.contains(&port) {
            ports.push(port);
        }
    }
    ports
}
|
|
|
|
/// Collects the server names declared by `server_name` directives.
///
/// Only lines whose first word is exactly `server_name` are considered, so
/// directives that merely start with the same letters (for example
/// `server_name_in_redirect`) are ignored. Arguments are read up to the
/// terminating `;` or a `#` comment. The catch-all name `_` is dropped and
/// each name is returned once, in first-seen order.
fn parse_nginx_domains(content: &str) -> Vec<String> {
    let mut domains: Vec<String> = Vec::new();
    for line in content.lines() {
        let Some(rest) = line.trim().strip_prefix("server_name") else {
            continue;
        };
        // Require a word boundary after the directive name.
        if !rest.starts_with(char::is_whitespace) {
            continue;
        }
        // Keep only the arguments: stop at the terminator or a comment.
        let args = rest
            .split(|c: char| c == ';' || c == '#')
            .next()
            .unwrap_or("");
        for domain in args.split_whitespace() {
            if domain != "_" && !domains.iter().any(|known| known == domain) {
                domains.push(domain.to_string());
            }
        }
    }
    domains
}
|
|
|
|
/// Collects the port numbers declared by `listen` directives.
///
/// The first argument of each `listen` line is examined. Both a bare port
/// (`listen 80;`) and an `address:port` pair (`listen [::]:80;`,
/// `listen 127.0.0.1:8080 ssl;`) are recognised; arguments with no numeric
/// port, such as a bare address or a `unix:` socket path, are ignored. Each
/// port is returned once, in first-seen order.
fn parse_nginx_listen_ports(content: &str) -> Vec<u16> {
    let mut ports: Vec<u16> = Vec::new();
    for line in content.lines() {
        let Some(rest) = line.trim().strip_prefix("listen") else {
            continue;
        };
        // Require a word boundary after the directive name.
        if !rest.starts_with(char::is_whitespace) {
            continue;
        }
        let endpoint = rest
            .split(|c: char| c == ';' || c == '#')
            .next()
            .and_then(|args| args.split_whitespace().next());
        let Some(endpoint) = endpoint else {
            continue;
        };
        // The port is whatever follows the last `:`; a bare port has no `:`.
        let candidate = endpoint.rsplit(':').next().unwrap_or(endpoint);
        if let Ok(port) = candidate.parse::<u16>() {
            if !ports.contains(&port) {
                ports.push(port);
            }
        }
    }
    ports
}
|
|
|
|
/// Returns the path of the first `root` directive in the configuration.
///
/// Only lines whose first word is exactly `root` are considered. The value is
/// read up to the terminating `;`, trimmed, and stripped of surrounding
/// quotes. Directives with an empty value are skipped. Returns `None` when no
/// usable `root` directive is found.
fn parse_nginx_root(content: &str) -> Option<String> {
    content.lines().find_map(|line| {
        let rest = line.trim().strip_prefix("root")?;
        // Require a word boundary after the directive name.
        if !rest.starts_with(char::is_whitespace) {
            return None;
        }
        // Drop the terminator and anything after it, such as a comment.
        let value = rest
            .split(';')
            .next()
            .unwrap_or(rest)
            .trim()
            .trim_matches(|c: char| c == '"' || c == '\'');
        if value.is_empty() {
            None
        } else {
            Some(value.to_string())
        }
    })
}
|
|
|
|
/// Collects the log file paths named by `access_log` and `error_log`
/// directives.
///
/// Only absolute file paths are kept, so non-file targets such as `off` or
/// `syslog:...` are ignored. Each path is returned once, in first-seen order.
/// When no path is found, the default Nginx access and error log paths are
/// returned instead.
fn parse_nginx_logs(content: &str) -> Vec<String> {
    let mut paths: Vec<String> = Vec::new();
    for line in content.lines() {
        let mut words = line.split_whitespace();
        match words.next() {
            Some("access_log") | Some("error_log") => {}
            _ => continue,
        }
        let Some(target) = words.next() else {
            continue;
        };
        let target = target.trim_end_matches(';');
        // Skip `off`, `syslog:` and other targets that are not files on disk.
        if !target.starts_with('/') {
            continue;
        }
        if !paths.iter().any(|known| known == target) {
            paths.push(target.to_string());
        }
    }

    if paths.is_empty() {
        paths.push("/var/log/nginx/access.log".into());
        paths.push("/var/log/nginx/error.log".into());
    }
    paths
}
|
|
|
|
/// Returns the agent identifier from the `LIGHTOPS_AGENT_ID` environment
/// variable, or `local` when the variable is unset or not valid Unicode.
fn agent_id() -> String {
    match std::env::var("LIGHTOPS_AGENT_ID") {
        Ok(configured) => configured,
        Err(_) => "local".into(),
    }
}
|