mod actions; mod app; mod config; mod system_info; mod terminal; use anyhow::{Context, Result}; use clap::Parser; use config::AgentConfig; use dashmap::DashMap; use futures_util::{SinkExt, StreamExt}; use lightops_common::protocol::{AgentCapabilities, AgentMessage, ServerMessage}; use std::{ sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; use tokio::sync::mpsc; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const CONNECT_TIMEOUT_SECS: u64 = 15; const HANDSHAKE_TIMEOUT_SECS: u64 = 15; const READ_GRACE_SECS: u64 = 100; const MAX_RECONNECT_BACKOFF_SECS: u64 = 60; #[derive(Debug, Parser)] struct Args { #[arg(long)] server: Option, #[arg(long)] token: Option, #[arg(long)] config: Option, #[arg(long)] name: Option, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), ) .with(tracing_subscriber::fmt::layer()) .init(); let args = Args::parse(); let config_path = args.config.clone().unwrap_or_else(|| default_config_path()); let mut cfg = AgentConfig::load_optional(&config_path)?; if let Some(server) = args.server { cfg.server_url = server; } if let Some(token) = args.token { cfg.token = Some(token); } if let Some(name) = args.name { cfg.name = Some(name); } cfg.config_path = Some(config_path); run_forever(cfg).await } fn default_config_path() -> String { #[cfg(windows)] { "agent.toml".to_string() } #[cfg(not(windows))] { "/etc/lightops/agent.toml".to_string() } } async fn run_forever(mut cfg: AgentConfig) -> Result<()> { let mut backoff = 1u64; loop { match run_once(cfg.clone()).await { Ok(updated) => { cfg = updated; tracing::warn!("Agent 连接已断开,准备重连"); tokio::time::sleep(reconnect_delay(1)).await; backoff = 1; } Err(err) => { tracing::warn!(?err, backoff, "Agent 连接失败,等待后重试"); tokio::time::sleep(reconnect_delay(backoff)).await; backoff = (backoff * 2).min(MAX_RECONNECT_BACKOFF_SECS); } } } } async fn run_once(mut cfg: AgentConfig) -> Result { let ws_url = cfg.ws_url()?; tracing::info!("正在连接主控端 {}", ws_url); let (ws, _) = tokio::time::timeout( Duration::from_secs(CONNECT_TIMEOUT_SECS), connect_async(ws_url), ) .await .context("连接主控端超时")? .context("连接主控端 WebSocket 失败")?; let (mut write, mut read) = ws.split(); let (tx, mut rx) = mpsc::unbounded_channel::(); let streams = Arc::new(DashMap::new()); let hello = AgentMessage::AgentHello { agent_id: cfg.agent_id.clone(), token: cfg.token.clone(), secret: cfg.secret.clone(), hostname: hostname(), os: std::env::consts::OS.to_string(), arch: std::env::consts::ARCH.to_string(), version: env!("CARGO_PKG_VERSION").to_string(), capabilities: AgentCapabilities::default(), }; write .send(Message::Text(serde_json::to_string(&hello)?)) .await?; let Some(Ok(Message::Text(first))) = tokio::time::timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), read.next()) .await .context("等待主控端握手响应超时")? else { anyhow::bail!("Server 在接受 Agent 前关闭了连接"); }; match serde_json::from_str::(&first)? { ServerMessage::AgentAccepted { agent_id, secret } => { cfg.agent_id = Some(agent_id); if let Some(id) = cfg.agent_id.as_deref() { std::env::set_var("LIGHTOPS_AGENT_ID", id); } if let Some(secret) = secret { cfg.secret = Some(secret); cfg.token = None; } if let Some(path) = cfg.config_path.as_deref() { cfg.save(path)?; } } ServerMessage::ErrorMessage { message, .. } => anyhow::bail!(message), _ => anyhow::bail!("Server 首条消息不符合预期"), } let writer = tokio::spawn(async move { while let Some(msg) = rx.recv().await { let Ok(text) = serde_json::to_string(&msg) else { continue; }; if write.send(Message::Text(text)).await.is_err() { break; } } }); let heartbeat_tx = tx.clone(); let heartbeat_id = cfg.agent_id.clone().unwrap_or_default(); let heartbeat_interval = cfg.heartbeat_interval.unwrap_or(30).max(10); let heartbeat = tokio::spawn(async move { loop { let metrics = system_info::collect_metrics(); let _ = heartbeat_tx.send(AgentMessage::AgentHeartbeat { agent_id: heartbeat_id.clone(), metrics: Some(metrics), }); tokio::time::sleep(Duration::from_secs(heartbeat_interval)).await; } }); loop { let msg = tokio::time::timeout(Duration::from_secs(READ_GRACE_SECS), read.next()).await; match msg { Ok(Some(Ok(Message::Text(text)))) => { let server_msg = serde_json::from_str::(&text)?; handle_server_message(server_msg, tx.clone(), streams.clone(), &cfg).await; } Ok(Some(Ok(Message::Close(_)))) | Ok(None) => break, Ok(Some(Ok(_))) => {} Ok(Some(Err(err))) => return Err(err).context("读取主控端消息失败"), Err(_) => anyhow::bail!("主控端连接静默超时"), } } heartbeat.abort(); writer.abort(); Ok(cfg) } fn reconnect_delay(base_secs: u64) -> Duration { let jitter_ms = SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| (d.subsec_millis() % 1000) as u64) .unwrap_or(0); Duration::from_millis(base_secs.saturating_mul(1000).saturating_add(jitter_ms)) } async fn handle_server_message( msg: ServerMessage, tx: mpsc::UnboundedSender, streams: Arc>, cfg: &AgentConfig, ) { match msg { ServerMessage::ServerPing { timestamp } => { let agent_id = cfg.agent_id.clone().unwrap_or_default(); let _ = tx.send(AgentMessage::AgentPong { agent_id, timestamp, }); } ServerMessage::TaskRequest { task_id, action, params, } => { tokio::spawn(async move { let _ = tx.send(AgentMessage::TaskEvent { task_id: task_id.clone(), level: "info".into(), message: format!("开始执行 {action}"), data: serde_json::json!({ "action": action }), }); let result = actions::handle(&action, params).await; let response = match result { Ok(data) => { emit_task_output_events(&tx, &task_id, &data); let _ = tx.send(AgentMessage::TaskEvent { task_id: task_id.clone(), level: "info".into(), message: "任务执行完成".into(), data: serde_json::json!({ "success": true }), }); AgentMessage::TaskResponse { task_id, success: true, data, error: None, } } Err(err) => { let error = err.to_string(); let _ = tx.send(AgentMessage::TaskEvent { task_id: task_id.clone(), level: "error".into(), message: "任务执行失败".into(), data: serde_json::json!({ "error": error }), }); AgentMessage::TaskResponse { task_id, success: false, data: serde_json::json!({}), error: Some(error), } } }; let _ = tx.send(response); }); } ServerMessage::StreamOpen { stream_id, kind, meta, } => { if kind == "terminal" || kind == "docker.exec" { let result = if kind == "docker.exec" { terminal::open_docker_exec(stream_id.clone(), tx.clone(), meta) } else { terminal::open(stream_id.clone(), tx.clone(), meta) }; match result { Ok(handle) => { streams.insert(stream_id, handle); } Err(err) => { let _ = tx.send(AgentMessage::StreamClose { stream_id, reason: Some(err.to_string()), }); } } } } ServerMessage::StreamData { stream_id, data, binary, } => { if let Some(handle) = streams.get(&stream_id) { let _ = handle.write(data, binary); } } ServerMessage::StreamClose { stream_id, .. } => { streams.remove(&stream_id); } ServerMessage::AgentAccepted { .. } => {} ServerMessage::ErrorMessage { code, message } => { tracing::warn!(%code, %message, "主控端返回连接错误"); } } } fn emit_task_output_events( tx: &mpsc::UnboundedSender, task_id: &str, data: &serde_json::Value, ) { for key in ["stdout", "stderr", "pull_stdout", "pull_stderr"] { let Some(value) = data.get(key).and_then(serde_json::Value::as_str) else { continue; }; let text = value.trim(); if text.is_empty() { continue; } let level = if key.contains("stderr") { "warn" } else { "info" }; let _ = tx.send(AgentMessage::TaskEvent { task_id: task_id.to_string(), level: level.into(), message: key.to_string(), data: serde_json::json!({ "output": truncate_event_text(text) }), }); } } fn truncate_event_text(text: &str) -> String { const MAX_EVENT_TEXT: usize = 16 * 1024; if text.len() <= MAX_EVENT_TEXT { text.to_string() } else { format!("{}...(输出过长,已截断)", &text[..MAX_EVENT_TEXT]) } } fn hostname() -> String { std::env::var("HOSTNAME") .or_else(|_| std::env::var("COMPUTERNAME")) .unwrap_or_else(|_| "lightops-node".to_string()) }