实现 LightOps 运维面板基础功能
This commit is contained in:
343
crates/lightops-agent/src/main.rs
Normal file
343
crates/lightops-agent/src/main.rs
Normal file
@@ -0,0 +1,343 @@
|
||||
mod actions;
|
||||
mod app;
|
||||
mod config;
|
||||
mod system_info;
|
||||
mod terminal;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::Parser;
|
||||
use config::AgentConfig;
|
||||
use dashmap::DashMap;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use lightops_common::protocol::{AgentCapabilities, AgentMessage, ServerMessage};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_tungstenite::{connect_async, tungstenite::Message};
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
/// Maximum time, in seconds, allowed for establishing the WebSocket connection in `run_once`.
const CONNECT_TIMEOUT_SECS: u64 = 15;
|
||||
/// Maximum time, in seconds, `run_once` waits for the server's first reply after sending the hello message.
const HANDSHAKE_TIMEOUT_SECS: u64 = 15;
|
||||
/// Maximum time, in seconds, a single read from the server may take before `run_once` treats the connection as dead.
const READ_GRACE_SECS: u64 = 100;
|
||||
/// Upper bound, in seconds, on the retry backoff that `run_forever` doubles after each failed attempt.
const MAX_RECONNECT_BACKOFF_SECS: u64 = 60;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct Args {
|
||||
#[arg(long)]
|
||||
server: Option<String>,
|
||||
#[arg(long)]
|
||||
token: Option<String>,
|
||||
#[arg(long)]
|
||||
config: Option<String>,
|
||||
#[arg(long)]
|
||||
name: Option<String>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::registry()
|
||||
.with(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
|
||||
)
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
|
||||
let args = Args::parse();
|
||||
let config_path = args.config.clone().unwrap_or_else(|| default_config_path());
|
||||
let mut cfg = AgentConfig::load_optional(&config_path)?;
|
||||
if let Some(server) = args.server {
|
||||
cfg.server_url = server;
|
||||
}
|
||||
if let Some(token) = args.token {
|
||||
cfg.token = Some(token);
|
||||
}
|
||||
if let Some(name) = args.name {
|
||||
cfg.name = Some(name);
|
||||
}
|
||||
cfg.config_path = Some(config_path);
|
||||
run_forever(cfg).await
|
||||
}
|
||||
|
||||
/// Returns the configuration path used when `--config` is not given:
/// `agent.toml` (relative) on Windows, `/etc/lightops/agent.toml` everywhere else.
fn default_config_path() -> String {
    let path = if cfg!(windows) {
        "agent.toml"
    } else {
        "/etc/lightops/agent.toml"
    };
    path.to_string()
}
|
||||
|
||||
async fn run_forever(mut cfg: AgentConfig) -> Result<()> {
|
||||
let mut backoff = 1u64;
|
||||
loop {
|
||||
match run_once(cfg.clone()).await {
|
||||
Ok(updated) => {
|
||||
cfg = updated;
|
||||
tracing::warn!("Agent 连接已断开,准备重连");
|
||||
tokio::time::sleep(reconnect_delay(1)).await;
|
||||
backoff = 1;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(?err, backoff, "Agent 连接失败,等待后重试");
|
||||
tokio::time::sleep(reconnect_delay(backoff)).await;
|
||||
backoff = (backoff * 2).min(MAX_RECONNECT_BACKOFF_SECS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_once(mut cfg: AgentConfig) -> Result<AgentConfig> {
|
||||
let ws_url = cfg.ws_url()?;
|
||||
tracing::info!("正在连接主控端 {}", ws_url);
|
||||
let (ws, _) = tokio::time::timeout(
|
||||
Duration::from_secs(CONNECT_TIMEOUT_SECS),
|
||||
connect_async(ws_url),
|
||||
)
|
||||
.await
|
||||
.context("连接主控端超时")?
|
||||
.context("连接主控端 WebSocket 失败")?;
|
||||
let (mut write, mut read) = ws.split();
|
||||
let (tx, mut rx) = mpsc::unbounded_channel::<AgentMessage>();
|
||||
let streams = Arc::new(DashMap::new());
|
||||
|
||||
let hello = AgentMessage::AgentHello {
|
||||
agent_id: cfg.agent_id.clone(),
|
||||
token: cfg.token.clone(),
|
||||
secret: cfg.secret.clone(),
|
||||
hostname: hostname(),
|
||||
os: std::env::consts::OS.to_string(),
|
||||
arch: std::env::consts::ARCH.to_string(),
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
capabilities: AgentCapabilities::default(),
|
||||
};
|
||||
write
|
||||
.send(Message::Text(serde_json::to_string(&hello)?))
|
||||
.await?;
|
||||
|
||||
let Some(Ok(Message::Text(first))) =
|
||||
tokio::time::timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), read.next())
|
||||
.await
|
||||
.context("等待主控端握手响应超时")?
|
||||
else {
|
||||
anyhow::bail!("Server 在接受 Agent 前关闭了连接");
|
||||
};
|
||||
match serde_json::from_str::<ServerMessage>(&first)? {
|
||||
ServerMessage::AgentAccepted { agent_id, secret } => {
|
||||
cfg.agent_id = Some(agent_id);
|
||||
if let Some(id) = cfg.agent_id.as_deref() {
|
||||
std::env::set_var("LIGHTOPS_AGENT_ID", id);
|
||||
}
|
||||
if let Some(secret) = secret {
|
||||
cfg.secret = Some(secret);
|
||||
cfg.token = None;
|
||||
}
|
||||
if let Some(path) = cfg.config_path.as_deref() {
|
||||
cfg.save(path)?;
|
||||
}
|
||||
}
|
||||
ServerMessage::ErrorMessage { message, .. } => anyhow::bail!(message),
|
||||
_ => anyhow::bail!("Server 首条消息不符合预期"),
|
||||
}
|
||||
|
||||
let writer = tokio::spawn(async move {
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let Ok(text) = serde_json::to_string(&msg) else {
|
||||
continue;
|
||||
};
|
||||
if write.send(Message::Text(text)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let heartbeat_tx = tx.clone();
|
||||
let heartbeat_id = cfg.agent_id.clone().unwrap_or_default();
|
||||
let heartbeat_interval = cfg.heartbeat_interval.unwrap_or(30).max(10);
|
||||
let heartbeat = tokio::spawn(async move {
|
||||
loop {
|
||||
let metrics = system_info::collect_metrics();
|
||||
let _ = heartbeat_tx.send(AgentMessage::AgentHeartbeat {
|
||||
agent_id: heartbeat_id.clone(),
|
||||
metrics: Some(metrics),
|
||||
});
|
||||
tokio::time::sleep(Duration::from_secs(heartbeat_interval)).await;
|
||||
}
|
||||
});
|
||||
|
||||
loop {
|
||||
let msg = tokio::time::timeout(Duration::from_secs(READ_GRACE_SECS), read.next()).await;
|
||||
match msg {
|
||||
Ok(Some(Ok(Message::Text(text)))) => {
|
||||
let server_msg = serde_json::from_str::<ServerMessage>(&text)?;
|
||||
handle_server_message(server_msg, tx.clone(), streams.clone(), &cfg).await;
|
||||
}
|
||||
Ok(Some(Ok(Message::Close(_)))) | Ok(None) => break,
|
||||
Ok(Some(Ok(_))) => {}
|
||||
Ok(Some(Err(err))) => return Err(err).context("读取主控端消息失败"),
|
||||
Err(_) => anyhow::bail!("主控端连接静默超时"),
|
||||
}
|
||||
}
|
||||
|
||||
heartbeat.abort();
|
||||
writer.abort();
|
||||
Ok(cfg)
|
||||
}
|
||||
|
||||
/// Builds the pause before the next connection attempt: `base_secs` seconds plus a jitter
/// of 0–999 ms taken from the sub-second part of the current wall-clock time.
///
/// The jitter is 0 when the system clock reads earlier than the Unix epoch. All arithmetic
/// saturates, so very large inputs clamp instead of overflowing.
fn reconnect_delay(base_secs: u64) -> Duration {
    let base_ms = base_secs.saturating_mul(1000);
    let jitter_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::from(since_epoch.subsec_millis() % 1000),
        Err(_) => 0,
    };
    Duration::from_millis(base_ms.saturating_add(jitter_ms))
}
|
||||
|
||||
async fn handle_server_message(
|
||||
msg: ServerMessage,
|
||||
tx: mpsc::UnboundedSender<AgentMessage>,
|
||||
streams: Arc<DashMap<String, terminal::TerminalHandle>>,
|
||||
cfg: &AgentConfig,
|
||||
) {
|
||||
match msg {
|
||||
ServerMessage::ServerPing { timestamp } => {
|
||||
let agent_id = cfg.agent_id.clone().unwrap_or_default();
|
||||
let _ = tx.send(AgentMessage::AgentPong {
|
||||
agent_id,
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
ServerMessage::TaskRequest {
|
||||
task_id,
|
||||
action,
|
||||
params,
|
||||
} => {
|
||||
tokio::spawn(async move {
|
||||
let _ = tx.send(AgentMessage::TaskEvent {
|
||||
task_id: task_id.clone(),
|
||||
level: "info".into(),
|
||||
message: format!("开始执行 {action}"),
|
||||
data: serde_json::json!({ "action": action }),
|
||||
});
|
||||
let result = actions::handle(&action, params).await;
|
||||
let response = match result {
|
||||
Ok(data) => {
|
||||
emit_task_output_events(&tx, &task_id, &data);
|
||||
let _ = tx.send(AgentMessage::TaskEvent {
|
||||
task_id: task_id.clone(),
|
||||
level: "info".into(),
|
||||
message: "任务执行完成".into(),
|
||||
data: serde_json::json!({ "success": true }),
|
||||
});
|
||||
AgentMessage::TaskResponse {
|
||||
task_id,
|
||||
success: true,
|
||||
data,
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let error = err.to_string();
|
||||
let _ = tx.send(AgentMessage::TaskEvent {
|
||||
task_id: task_id.clone(),
|
||||
level: "error".into(),
|
||||
message: "任务执行失败".into(),
|
||||
data: serde_json::json!({ "error": error }),
|
||||
});
|
||||
AgentMessage::TaskResponse {
|
||||
task_id,
|
||||
success: false,
|
||||
data: serde_json::json!({}),
|
||||
error: Some(error),
|
||||
}
|
||||
}
|
||||
};
|
||||
let _ = tx.send(response);
|
||||
});
|
||||
}
|
||||
ServerMessage::StreamOpen {
|
||||
stream_id,
|
||||
kind,
|
||||
meta,
|
||||
} => {
|
||||
if kind == "terminal" || kind == "docker.exec" {
|
||||
let result = if kind == "docker.exec" {
|
||||
terminal::open_docker_exec(stream_id.clone(), tx.clone(), meta)
|
||||
} else {
|
||||
terminal::open(stream_id.clone(), tx.clone(), meta)
|
||||
};
|
||||
match result {
|
||||
Ok(handle) => {
|
||||
streams.insert(stream_id, handle);
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = tx.send(AgentMessage::StreamClose {
|
||||
stream_id,
|
||||
reason: Some(err.to_string()),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ServerMessage::StreamData {
|
||||
stream_id,
|
||||
data,
|
||||
binary,
|
||||
} => {
|
||||
if let Some(handle) = streams.get(&stream_id) {
|
||||
let _ = handle.write(data, binary);
|
||||
}
|
||||
}
|
||||
ServerMessage::StreamClose { stream_id, .. } => {
|
||||
streams.remove(&stream_id);
|
||||
}
|
||||
ServerMessage::AgentAccepted { .. } => {}
|
||||
ServerMessage::ErrorMessage { code, message } => {
|
||||
tracing::warn!(%code, %message, "主控端返回连接错误");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_task_output_events(
|
||||
tx: &mpsc::UnboundedSender<AgentMessage>,
|
||||
task_id: &str,
|
||||
data: &serde_json::Value,
|
||||
) {
|
||||
for key in ["stdout", "stderr", "pull_stdout", "pull_stderr"] {
|
||||
let Some(value) = data.get(key).and_then(serde_json::Value::as_str) else {
|
||||
continue;
|
||||
};
|
||||
let text = value.trim();
|
||||
if text.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let level = if key.contains("stderr") {
|
||||
"warn"
|
||||
} else {
|
||||
"info"
|
||||
};
|
||||
let _ = tx.send(AgentMessage::TaskEvent {
|
||||
task_id: task_id.to_string(),
|
||||
level: level.into(),
|
||||
message: key.to_string(),
|
||||
data: serde_json::json!({ "output": truncate_event_text(text) }),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Caps event text at 16 KiB.
///
/// Text of at most `MAX_EVENT_TEXT` bytes is returned unchanged. Longer text is cut at the
/// last UTF-8 character boundary at or below the limit and a truncation marker is appended.
/// Cutting on a boundary matters because slicing a `str` in the middle of a multi-byte
/// character panics.
fn truncate_event_text(text: &str) -> String {
    const MAX_EVENT_TEXT: usize = 16 * 1024;
    if text.len() <= MAX_EVENT_TEXT {
        return text.to_string();
    }
    // Index 0 is always a character boundary, so this loop terminates.
    let mut end = MAX_EVENT_TEXT;
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...(输出过长,已截断)", &text[..end])
}
|
||||
|
||||
/// Returns the host name reported to the server.
///
/// The `HOSTNAME` environment variable is tried first, then `COMPUTERNAME`; a variable
/// that is unset or not valid Unicode counts as missing. When neither is usable the fixed
/// name `lightops-node` is returned.
fn hostname() -> String {
    for var in ["HOSTNAME", "COMPUTERNAME"] {
        if let Ok(value) = std::env::var(var) {
            return value;
        }
    }
    "lightops-node".to_string()
}
|
||||
Reference in New Issue
Block a user