diff --git a/Cargo.lock b/Cargo.lock index 4377c37..ea33d39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2833,6 +2833,7 @@ version = "0.1.7" dependencies = [ "base64", "dirs-next", + "fastrand", "flate2", "futures", "prost", diff --git a/crates/talos-pilot-tui/src/components/cluster.rs b/crates/talos-pilot-tui/src/components/cluster.rs index 17db137..0421032 100644 --- a/crates/talos-pilot-tui/src/components/cluster.rs +++ b/crates/talos-pilot-tui/src/components/cluster.rs @@ -14,7 +14,7 @@ use ratatui::{ use std::collections::HashMap; use talos_rs::{ DiscoveryMember, EtcdMemberInfo, MemInfo, NodeCpuInfo, NodeLoadAvg, NodeMemory, NodeServices, - ServiceInfo, TalosClient, TalosConfig, VersionInfo, get_discovery_members_for_context, + ServiceInfo, TalosClient, TalosConfig, VersionInfo, get_discovery_members_with_retry, }; /// Simple etcd status for header display @@ -463,9 +463,21 @@ impl ClusterComponent { } // Try to get discovery members (ALL nodes including workers) - // Use context-aware async function to avoid blocking and to use correct certificates + // Use context-aware async function with retry to avoid blocking and to use correct certificates + // Pass etcd member IPs as fallback in case VIP-based discovery fails let context_name = cluster.name.clone(); - match get_discovery_members_for_context(&context_name, self.config_path.as_deref()).await { + let fallback_ips: Vec<String> = cluster + .etcd_members + .iter() + .filter_map(|m| m.ip_address()) + .collect(); + match get_discovery_members_with_retry( + &context_name, + self.config_path.as_deref(), + &fallback_ips, + ) + .await + { Ok(members) => { cluster.node_ips.clear(); for member in &members { @@ -477,11 +489,11 @@ impl ClusterComponent { } Err(e) => { tracing::warn!( - "Failed to fetch discovery members for {}: {}", + "Failed to fetch discovery members for {} after retries: {} (using cached data)", cluster.name, e ); - cluster.discovery_members.clear(); + // DO NOT clear - preserve existing data for 
resilience } } @@ -516,7 +528,25 @@ impl ClusterComponent { }) }) .collect() + } else if !cluster.versions.is_empty() { + // Tertiary fallback: use nodes from previous version queries + tracing::debug!( + "Using version info as fallback node source for {}", + cluster.name + ); + cluster + .versions + .iter() + .map(|v| { + let ip = v.node.split(':').next().unwrap_or(&v.node).to_string(); + (v.node.clone(), ip) + }) + .collect() } else { + tracing::warn!( + "No node sources available for {}, skipping queries", + cluster.name + ); Vec::new() }; diff --git a/crates/talos-pilot-tui/src/components/lifecycle.rs b/crates/talos-pilot-tui/src/components/lifecycle.rs index db8bf14..603e04a 100644 --- a/crates/talos-pilot-tui/src/components/lifecycle.rs +++ b/crates/talos-pilot-tui/src/components/lifecycle.rs @@ -23,7 +23,7 @@ use std::time::Duration; use talos_pilot_core::{AsyncState, HasHealth, HealthIndicator, SelectableList}; use talos_rs::{ DiscoveryMember, NodeTimeInfo, TalosClient, TalosConfig, VersionInfo, - get_discovery_members_for_context, + get_discovery_members_with_retry, }; /// Auto-refresh interval in seconds @@ -284,15 +284,28 @@ impl LifecycleComponent { }; if !context_name.is_empty() { - match get_discovery_members_for_context(&context_name, self.config_path.as_deref()) - .await + // Use version info nodes as fallback in case VIP-based discovery fails + let fallback_ips: Vec<String> = data + .versions + .iter() + .map(|v| v.node.split(':').next().unwrap_or(&v.node).to_string()) + .collect(); + match get_discovery_members_with_retry( + &context_name, + self.config_path.as_deref(), + &fallback_ips, + ) + .await { Ok(members) => { data.discovery_members = members; } Err(e) => { - tracing::debug!("Failed to get discovery members: {}", e); - data.discovery_members.clear(); + tracing::debug!( + "Failed to get discovery members after retries: {} (preserving cached)", + e + ); + // DO NOT clear - preserve existing data } } } diff --git a/crates/talos-rs/Cargo.toml 
b/crates/talos-rs/Cargo.toml index ab0048b..bcb9ee2 100644 --- a/crates/talos-rs/Cargo.toml +++ b/crates/talos-rs/Cargo.toml @@ -41,6 +41,9 @@ tar = "0.4" # Logging tracing.workspace = true +# Random (for fallback node shuffling) +fastrand = "2" + [build-dependencies] tonic-build = "0.12" diff --git a/crates/talos-rs/src/lib.rs b/crates/talos-rs/src/lib.rs index b614c02..2b8c0c1 100644 --- a/crates/talos-rs/src/lib.rs +++ b/crates/talos-rs/src/lib.rs @@ -110,8 +110,8 @@ pub use talosctl::{ AddressStatus, DiscoveryMember, DiskInfo, GenConfigResult, InsecureApplyResult, InsecureVersionInfo, KubeSpanPeerStatus, MachineConfigInfo, VolumeStatus, apply_config_insecure, check_insecure_connection, gen_config, get_address_status, - get_discovery_members, get_discovery_members_for_context, get_disks, get_disks_for_context, - get_disks_for_node, get_disks_insecure, get_kubespan_peers, get_machine_config, - get_version_insecure, get_volume_status, get_volume_status_for_node, + get_discovery_members, get_discovery_members_for_context, get_discovery_members_with_retry, + get_disks, get_disks_for_context, get_disks_for_node, get_disks_insecure, get_kubespan_peers, + get_machine_config, get_version_insecure, get_volume_status, get_volume_status_for_node, get_volume_status_insecure, is_kubespan_enabled, reboot_insecure, shutdown_insecure, }; diff --git a/crates/talos-rs/src/talosctl.rs b/crates/talos-rs/src/talosctl.rs index a928791..a41dc92 100644 --- a/crates/talos-rs/src/talosctl.rs +++ b/crates/talos-rs/src/talosctl.rs @@ -547,6 +547,95 @@ pub async fn get_discovery_members_for_context( parse_discovery_members_yaml(&output) } +/// Get discovery members for a specific node IP using context certificates (async). +/// +/// This allows querying a specific control plane node directly instead of going through the VIP. 
+async fn get_discovery_members_for_node_async( + context: &str, + node_ip: &str, +) -> Result<Vec<DiscoveryMember>, TalosError> { + let output = exec_talosctl_async(&[ + "--context", + context, + "-n", + node_ip, + "get", + "members", + "-o", + "yaml", + ]) + .await?; + parse_discovery_members_yaml(&output) +} + +/// Get discovery members with automatic retry and fallback to specific nodes. +/// +/// First tries the VIP endpoint (via context), then falls back to querying +/// individual control plane nodes directly if the VIP fails. +/// +/// This handles transient "no request forwarding" errors that occur when +/// the VIP routes to a node that can't forward the request. +pub async fn get_discovery_members_with_retry( + context: &str, + config_path: Option<&str>, + fallback_node_ips: &[String], +) -> Result<Vec<DiscoveryMember>, TalosError> { + // First, try the VIP-based approach with retries + // VIP_MAX_RETRIES=2 means 3 total attempts (initial + 2 retries) + const VIP_MAX_RETRIES: u32 = 2; + const BASE_DELAY_MS: u64 = 100; + + let mut last_error = None; + + for attempt in 0..=VIP_MAX_RETRIES { + match get_discovery_members_for_context(context, config_path).await { + Ok(members) => return Ok(members), + Err(e) => { + last_error = Some(e); + if attempt < VIP_MAX_RETRIES { + let delay_ms = BASE_DELAY_MS * (1 << attempt); + tracing::debug!( + "Discovery fetch via VIP attempt {} failed, retrying in {}ms", + attempt + 1, + delay_ms + ); + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + } + } + } + } + + // VIP failed, try fallback nodes directly + // Shuffle to distribute load and avoid repeatedly hitting a problematic node first + if !fallback_node_ips.is_empty() { + tracing::debug!( + "VIP-based discovery failed, trying {} fallback nodes directly", + fallback_node_ips.len() + ); + + let mut shuffled_ips: Vec<&String> = fallback_node_ips.iter().collect(); + fastrand::shuffle(&mut shuffled_ips); + + for node_ip in shuffled_ips { + match get_discovery_members_for_node_async(context, 
node_ip).await { + Ok(members) => { + tracing::debug!( + "Successfully fetched discovery members from fallback node {}", + node_ip + ); + return Ok(members); + } + Err(e) => { + tracing::debug!("Fallback node {} failed: {}", node_ip, e); + last_error = Some(e); + } + } + } + } + + Err(last_error.unwrap_or_else(|| TalosError::NoEndpoints(context.to_string()))) +} + /// Get address status for a node (for VIP detection) /// /// Executes: talosctl get addressstatus --nodes -o yaml @@ -1210,4 +1299,114 @@ spec: assert_eq!(disks[1].transport, Some("nvme".to_string())); assert!(!disks[1].rotational); } + + #[test] + fn test_parse_discovery_members() { + let yaml = r#" +node: 192.168.9.11 +metadata: + namespace: cluster + type: Members.cluster.talos.dev + id: 3xKYjp + version: 1 + owner: cluster.DiscoveryService + phase: running +spec: + nodeId: 3xKYjp + addresses: + - 192.168.9.11 + hostname: cp-1 + machineType: controlplane + operatingSystem: Talos (v1.9.2) +--- +node: 192.168.9.11 +metadata: + namespace: cluster + type: Members.cluster.talos.dev + id: 4yLZkq + version: 1 + owner: cluster.DiscoveryService + phase: running +spec: + nodeId: 4yLZkq + addresses: + - 192.168.9.21 + - 10.244.0.1 + hostname: worker-1 + machineType: worker + operatingSystem: Talos (v1.9.2) +"#; + + let members = parse_discovery_members_yaml(yaml).unwrap(); + assert_eq!(members.len(), 2); + + // Control plane node + assert_eq!(members[0].id, "3xKYjp"); + assert_eq!(members[0].hostname, "cp-1"); + assert_eq!(members[0].machine_type, "controlplane"); + assert_eq!(members[0].addresses, vec!["192.168.9.11"]); + assert!(members[0].operating_system.contains("Talos")); + + // Worker node with multiple addresses + assert_eq!(members[1].id, "4yLZkq"); + assert_eq!(members[1].hostname, "worker-1"); + assert_eq!(members[1].machine_type, "worker"); + assert_eq!(members[1].addresses.len(), 2); + assert!(members[1].addresses.contains(&"192.168.9.21".to_string())); + } + + #[test] + fn 
test_parse_discovery_members_empty() { + let yaml = ""; + let members = parse_discovery_members_yaml(yaml).unwrap(); + assert!(members.is_empty()); + } + + #[test] + fn test_parse_discovery_members_invalid_yaml() { + // Should skip invalid documents and not panic + let yaml = r#" +not valid yaml: [ +--- +node: 192.168.9.11 +metadata: + namespace: cluster + type: Members.cluster.talos.dev + id: valid +spec: + hostname: valid-host + machineType: controlplane +"#; + let members = parse_discovery_members_yaml(yaml).unwrap(); + // Should have parsed the valid document + assert_eq!(members.len(), 1); + assert_eq!(members[0].id, "valid"); + } + + #[test] + fn test_shuffle_fallback_ips_does_not_panic() { + // Test that shuffle works correctly on various inputs + let empty: Vec<String> = vec![]; + let mut shuffled: Vec<&String> = empty.iter().collect(); + fastrand::shuffle(&mut shuffled); + assert!(shuffled.is_empty()); + + let single = ["192.168.1.1".to_string()]; + let mut shuffled: Vec<&String> = single.iter().collect(); + fastrand::shuffle(&mut shuffled); + assert_eq!(shuffled.len(), 1); + + let multiple = vec![ + "192.168.1.1".to_string(), + "192.168.1.2".to_string(), + "192.168.1.3".to_string(), + ]; + let mut shuffled: Vec<&String> = multiple.iter().collect(); + fastrand::shuffle(&mut shuffled); + assert_eq!(shuffled.len(), 3); + // All original elements should still be present + for ip in &multiple { + assert!(shuffled.contains(&ip)); + } + } }