Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/talos-pilot-tui/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Action {
ShowNodeDetails(String, String), // cluster, node
/// Show multi-service logs: (node_ip, node_role, active_service_ids, all_service_ids)
ShowMultiLogs(String, String, Vec<String>, Vec<String>),
/// Show multi-node logs: (group_name, node_role, Vec<(hostname, ip)>, services)
ShowGroupLogs(String, String, Vec<(String, String)>, Vec<String>),
/// Show etcd cluster status
ShowEtcd,
/// Show processes for a node: (hostname, address)
Expand All @@ -46,6 +48,16 @@ pub enum Action {
ShowWorkloads,
/// Show storage/disks view for a node: (hostname, address)
ShowStorage(String, String),

/// Show group processes: (group_name, Vec<(hostname, ip)>)
ShowGroupProcesses(String, Vec<(String, String)>),
/// Show group network: (group_name, Vec<(hostname, ip)>)
ShowGroupNetwork(String, Vec<(String, String)>),
/// Show group storage: (group_name, Vec<(hostname, ip)>)
ShowGroupStorage(String, Vec<(String, String)>),
/// Show group diagnostics: (group_name, node_role, Vec<(hostname, ip)>, cp_endpoint)
ShowGroupDiagnostics(String, String, Vec<(String, String)>, Option<String>),

/// Show node operations overlay: (hostname, address, is_controlplane)
ShowNodeOperations(String, String, bool),
/// Show rolling operations overlay with node list: Vec<(hostname, address, is_controlplane)>
Expand Down
263 changes: 263 additions & 0 deletions crates/talos-pilot-tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,68 @@ impl App {
self.multi_logs = Some(multi_logs);
self.view = View::MultiLogs;
}
Action::ShowGroupLogs(group_name, node_role, nodes, services) => {
// Switch to group logs view (multiple nodes)
tracing::info!(
"Viewing group logs for {} ({} nodes)",
group_name,
nodes.len()
);

// Create multi-logs component in group mode
let mut multi_logs = MultiLogsComponent::new_group(
group_name.clone(),
node_role,
nodes.clone(),
services.clone(),
);

// Set up clients for each node and fetch initial logs
if let Some(client) = self.cluster.client() {
// Build per-node clients
let mut node_clients = std::collections::HashMap::new();
for (hostname, ip) in &nodes {
let node_client = client.with_node(ip);
node_clients.insert(hostname.clone(), node_client);
}

// Set the clients for streaming capability
multi_logs.set_multi_node_clients(node_clients.clone(), self.tail_lines);

// Fetch initial logs from all nodes
let mut all_logs: Vec<(String, String, String)> = Vec::new();
let service_refs: Vec<&str> = services.iter().map(|s| s.as_str()).collect();

for (hostname, node_client) in &node_clients {
match node_client.logs_multi(&service_refs, self.tail_lines).await {
Ok(logs) => {
// logs is Vec<(service_id, content)>
for (service_id, content) in logs {
all_logs.push((hostname.clone(), service_id, content));
}
}
Err(e) => {
tracing::warn!("Failed to fetch logs from {}: {}", hostname, e);
}
}
}

if !all_logs.is_empty() {
multi_logs.set_group_logs(all_logs);
// Auto-start streaming for live updates
multi_logs.start_streaming();
} else {
multi_logs.set_error(format!(
"Failed to fetch logs from {} nodes in {}",
nodes.len(),
group_name
));
}
}

self.multi_logs = Some(multi_logs);
self.view = View::MultiLogs;
}
Action::ShowNodeDetails(_, _) => {
// Legacy - no longer used, we use ShowMultiLogs now
}
Expand Down Expand Up @@ -1194,6 +1256,207 @@ impl App {
self.node_operations = Some(node_ops);
self.view = View::NodeOperations;
}
Action::ShowGroupProcesses(group_name, nodes) => {
// Switch to group processes view (multiple nodes)
tracing::info!(
"Viewing group processes for {} ({} nodes)",
group_name,
nodes.len()
);

// Create processes component in group mode
let mut processes =
ProcessesComponent::new_group(group_name.clone(), nodes.clone());

// Fetch processes from all nodes
if let Some(client) = self.cluster.client() {
// Set the base cluster client for group refresh capability
processes.set_client(client.clone());

for (hostname, ip) in &nodes {
let node_client = client.with_node(ip);
match node_client.processes().await {
Ok(node_processes_list) => {
// The API returns Vec<NodeProcesses>, extract processes from first (should only be one)
for np in node_processes_list {
processes.add_node_processes(hostname.clone(), np.processes);
}
}
Err(e) => {
tracing::warn!(
"Failed to fetch processes from {}: {}",
hostname,
e
);
}
}
}
}

self.processes = Some(processes);
self.view = View::Processes;
}
Action::ShowGroupNetwork(group_name, nodes) => {
// Switch to group network view (multiple nodes)
tracing::info!(
"Viewing group network for {} ({} nodes)",
group_name,
nodes.len()
);

// Create network component in group mode
let mut network =
NetworkStatsComponent::new_group(group_name.clone(), nodes.clone());

// Fetch network stats from all nodes
if let Some(client) = self.cluster.client() {
// Set the base cluster client for group refresh capability
network.set_client(client.clone());

for (hostname, ip) in &nodes {
let node_client = client.with_node(ip);
match node_client.network_device_stats().await {
Ok(node_stats_list) => {
// The API returns Vec<NodeNetworkStats>, extract devices from each
for ns in node_stats_list {
network.add_node_network(hostname.clone(), ns.devices);
}
}
Err(e) => {
tracing::warn!(
"Failed to fetch network stats from {}: {}",
hostname,
e
);
}
}
}
}

self.network = Some(network);
self.view = View::Network;
}
Action::ShowGroupStorage(group_name, nodes) => {
// Switch to group storage view (multiple nodes)
tracing::info!(
"Viewing group storage for {} ({} nodes)",
group_name,
nodes.len()
);

// Create storage component in group mode
let mut storage = StorageComponent::new_group(group_name.clone(), nodes.clone());

// Get context and config from cluster component
let context = self.cluster.current_context_name().map(|s| s.to_string());
let config_path = self.cluster.config_path().map(|s| s.to_string());

// Set context and config for group refresh capability
storage.set_context(context.clone(), config_path.clone());

// Fetch storage info from all nodes using talosctl
for (hostname, ip) in &nodes {
// Extract IP without port
let node_ip = ip.split(':').next().unwrap_or(ip);
if let Some(ctx) = &context {
// Fetch disks
match talos_rs::get_disks_for_node(ctx, node_ip, config_path.as_deref())
.await
{
Ok(disks) => {
// Fetch volumes
match talos_rs::get_volume_status_for_node(
ctx,
node_ip,
config_path.as_deref(),
)
.await
{
Ok(volumes) => {
storage.add_node_storage(hostname.clone(), disks, volumes);
}
Err(e) => {
tracing::warn!(
"Failed to fetch volumes from {}: {}",
hostname,
e
);
storage.add_node_storage(
hostname.clone(),
disks,
Vec::new(),
);
}
}
}
Err(e) => {
tracing::warn!("Failed to fetch disks from {}: {}", hostname, e);
}
}
}
}

self.storage = Some(storage);
self.view = View::Storage;
}
Action::ShowGroupDiagnostics(group_name, node_role, nodes, cp_endpoint) => {
// Switch to group diagnostics view (multiple nodes)
tracing::info!(
"Viewing group diagnostics for {} ({} nodes, role={})",
group_name,
nodes.len(),
node_role
);

// Create diagnostics component in group mode
let mut diagnostics = DiagnosticsComponent::new_group(
group_name.clone(),
node_role.clone(),
nodes.clone(),
cp_endpoint.clone(),
self.config_path.clone(),
);

// Fetch diagnostics from all nodes
if let Some(client) = self.cluster.client() {
// Set the base cluster client for group refresh capability
diagnostics.set_client(client.clone());

for (hostname, ip) in &nodes {
let node_client = client.with_node(ip);

// Create a temporary single-node diagnostics component to run checks
let mut temp_diag = DiagnosticsComponent::new(
hostname.clone(),
ip.clone(),
node_role.clone(),
self.config_path.clone(),
);
temp_diag.set_client(node_client);
temp_diag.set_controlplane_endpoint(cp_endpoint.clone());

// Run the diagnostics checks
match temp_diag.refresh().await {
Ok(_) => {
// Extract the data and add it to the group component
if let Some(data) = temp_diag.take_data() {
diagnostics.add_node_diagnostics(hostname.clone(), data);
}
}
Err(e) => {
tracing::warn!(
"Failed to fetch diagnostics from {}: {}",
hostname,
e
);
}
}
}
}

self.diagnostics = Some(diagnostics);
self.view = View::Diagnostics;
}
Action::ShowRollingOperations(nodes) => {
// Show rolling operations overlay
tracing::info!("Viewing rolling operations for {} nodes", nodes.len());
Expand Down
Loading