From 6afa8aa3823b027cea61a0d50281b322cabe33e7 Mon Sep 17 00:00:00 2001
From: perfectra1n
Date: Sun, 18 Jan 2026 11:42:03 -0800
Subject: [PATCH 1/3] fix: add retry logic and preserve cached data for
 discovery member fetch

---
 .../talos-pilot-tui/src/components/cluster.rs | 28 +++++++++++++---
 .../src/components/lifecycle.rs               | 12 ++++---
 crates/talos-rs/src/lib.rs                    |  6 ++--
 crates/talos-rs/src/talosctl.rs               | 32 +++++++++++++++++++
 4 files changed, 65 insertions(+), 13 deletions(-)

diff --git a/crates/talos-pilot-tui/src/components/cluster.rs b/crates/talos-pilot-tui/src/components/cluster.rs
index 17db137..6912395 100644
--- a/crates/talos-pilot-tui/src/components/cluster.rs
+++ b/crates/talos-pilot-tui/src/components/cluster.rs
@@ -14,7 +14,7 @@ use ratatui::{
 use std::collections::HashMap;
 use talos_rs::{
     DiscoveryMember, EtcdMemberInfo, MemInfo, NodeCpuInfo, NodeLoadAvg, NodeMemory, NodeServices,
-    ServiceInfo, TalosClient, TalosConfig, VersionInfo, get_discovery_members_for_context,
+    ServiceInfo, TalosClient, TalosConfig, VersionInfo, get_discovery_members_with_retry,
 };
 
 /// Simple etcd status for header display
@@ -463,9 +463,9 @@ impl ClusterComponent {
         }
 
         // Try to get discovery members (ALL nodes including workers)
-        // Use context-aware async function to avoid blocking and to use correct certificates
+        // Use context-aware async function with retry to avoid blocking and to use correct certificates
         let context_name = cluster.name.clone();
-        match get_discovery_members_for_context(&context_name, self.config_path.as_deref()).await {
+        match get_discovery_members_with_retry(&context_name, self.config_path.as_deref()).await {
             Ok(members) => {
                 cluster.node_ips.clear();
                 for member in &members {
@@ -477,11 +477,11 @@ impl ClusterComponent {
             }
             Err(e) => {
                 tracing::warn!(
-                    "Failed to fetch discovery members for {}: {}",
+                    "Failed to fetch discovery members for {} after retries: {} (using cached data)",
                     cluster.name,
                     e
                 );
-                cluster.discovery_members.clear();
+                // DO NOT clear - preserve existing data for resilience
             }
         }
 
@@ -516,7 +516,25 @@ impl ClusterComponent {
                 })
             })
             .collect()
+        } else if !cluster.versions.is_empty() {
+            // Tertiary fallback: use nodes from previous version queries
+            tracing::debug!(
+                "Using version info as fallback node source for {}",
+                cluster.name
+            );
+            cluster
+                .versions
+                .iter()
+                .map(|v| {
+                    let ip = v.node.split(':').next().unwrap_or(&v.node).to_string();
+                    (v.node.clone(), ip)
+                })
+                .collect()
         } else {
+            tracing::warn!(
+                "No node sources available for {}, skipping queries",
+                cluster.name
+            );
             Vec::new()
         };

diff --git a/crates/talos-pilot-tui/src/components/lifecycle.rs b/crates/talos-pilot-tui/src/components/lifecycle.rs
index db8bf14..af64bb3 100644
--- a/crates/talos-pilot-tui/src/components/lifecycle.rs
+++ b/crates/talos-pilot-tui/src/components/lifecycle.rs
@@ -23,7 +23,7 @@ use std::time::Duration;
 use talos_pilot_core::{AsyncState, HasHealth, HealthIndicator, SelectableList};
 use talos_rs::{
     DiscoveryMember, NodeTimeInfo, TalosClient, TalosConfig, VersionInfo,
-    get_discovery_members_for_context,
+    get_discovery_members_with_retry,
 };
 
 /// Auto-refresh interval in seconds
@@ -284,15 +284,17 @@ impl LifecycleComponent {
         };
 
         if !context_name.is_empty() {
-            match get_discovery_members_for_context(&context_name, self.config_path.as_deref())
-                .await
+            match get_discovery_members_with_retry(&context_name, self.config_path.as_deref()).await
             {
                 Ok(members) => {
                     data.discovery_members = members;
                 }
                 Err(e) => {
-                    tracing::debug!("Failed to get discovery members: {}", e);
-                    data.discovery_members.clear();
+                    tracing::debug!(
+                        "Failed to get discovery members after retries: {} (preserving cached)",
+                        e
+                    );
+                    // DO NOT clear - preserve existing data
                 }
             }
         }

diff --git a/crates/talos-rs/src/lib.rs b/crates/talos-rs/src/lib.rs
index b614c02..2b8c0c1 100644
--- a/crates/talos-rs/src/lib.rs
+++ b/crates/talos-rs/src/lib.rs
@@ -110,8 +110,8 @@ pub use talosctl::{
     AddressStatus, DiscoveryMember, DiskInfo, GenConfigResult, InsecureApplyResult,
     InsecureVersionInfo, KubeSpanPeerStatus, MachineConfigInfo, VolumeStatus,
     apply_config_insecure, check_insecure_connection, gen_config, get_address_status,
-    get_discovery_members, get_discovery_members_for_context, get_disks, get_disks_for_context,
-    get_disks_for_node, get_disks_insecure, get_kubespan_peers, get_machine_config,
-    get_version_insecure, get_volume_status, get_volume_status_for_node,
+    get_discovery_members, get_discovery_members_for_context, get_discovery_members_with_retry,
+    get_disks, get_disks_for_context, get_disks_for_node, get_disks_insecure, get_kubespan_peers,
+    get_machine_config, get_version_insecure, get_volume_status, get_volume_status_for_node,
     get_volume_status_insecure, is_kubespan_enabled, reboot_insecure, shutdown_insecure,
 };

diff --git a/crates/talos-rs/src/talosctl.rs b/crates/talos-rs/src/talosctl.rs
index a928791..ae116cc 100644
--- a/crates/talos-rs/src/talosctl.rs
+++ b/crates/talos-rs/src/talosctl.rs
@@ -547,6 +547,38 @@ pub async fn get_discovery_members_for_context(
     parse_discovery_members_yaml(&output)
 }
 
+/// Get discovery members with automatic retry on transient failures.
+/// Retries up to 3 times with exponential backoff (100ms, 200ms, 400ms).
+pub async fn get_discovery_members_with_retry(
+    context: &str,
+    config_path: Option<&str>,
+) -> Result<Vec<DiscoveryMember>, TalosError> {
+    const MAX_RETRIES: u32 = 3;
+    const BASE_DELAY_MS: u64 = 100;
+
+    let mut last_error = None;
+
+    for attempt in 0..=MAX_RETRIES {
+        match get_discovery_members_for_context(context, config_path).await {
+            Ok(members) => return Ok(members),
+            Err(e) => {
+                last_error = Some(e);
+                if attempt < MAX_RETRIES {
+                    let delay_ms = BASE_DELAY_MS * (1 << attempt);
+                    tracing::debug!(
+                        "Discovery fetch attempt {} failed, retrying in {}ms",
+                        attempt + 1,
+                        delay_ms
+                    );
+                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
+                }
+            }
+        }
+    }
+
+    Err(last_error.unwrap())
+}
+
 /// Get address status for a node (for VIP detection)
 ///
 /// Executes: talosctl get addressstatus --nodes <node> -o yaml

From 63f0839ea7dea021e834bc67c7b04b30a5df1978 Mon Sep 17 00:00:00 2001
From: perfectra1n
Date: Sun, 18 Jan 2026 12:14:01 -0800
Subject: [PATCH 2/3] fix: add direct node fallback for discovery member fetch

---
 .../talos-pilot-tui/src/components/cluster.rs |  8 ++-
 .../src/components/lifecycle.rs               |  8 ++-
 crates/talos-rs/src/talosctl.rs               | 61 +++++++++++++++++--
 3 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/crates/talos-pilot-tui/src/components/cluster.rs b/crates/talos-pilot-tui/src/components/cluster.rs
index 6912395..b743e57 100644
--- a/crates/talos-pilot-tui/src/components/cluster.rs
+++ b/crates/talos-pilot-tui/src/components/cluster.rs
@@ -464,8 +464,14 @@ impl ClusterComponent {
 
         // Try to get discovery members (ALL nodes including workers)
         // Use context-aware async function with retry to avoid blocking and to use correct certificates
+        // Pass etcd member IPs as fallback in case VIP-based discovery fails
         let context_name = cluster.name.clone();
-        match get_discovery_members_with_retry(&context_name, self.config_path.as_deref()).await {
+        let fallback_ips: Vec<String> = cluster
+            .etcd_members
+            .iter()
+            .filter_map(|m| m.ip_address())
+            .collect();
+        match get_discovery_members_with_retry(&context_name, self.config_path.as_deref(), &fallback_ips).await {
             Ok(members) => {
                 cluster.node_ips.clear();
                 for member in &members {

diff --git a/crates/talos-pilot-tui/src/components/lifecycle.rs b/crates/talos-pilot-tui/src/components/lifecycle.rs
index af64bb3..072f7de 100644
--- a/crates/talos-pilot-tui/src/components/lifecycle.rs
+++ b/crates/talos-pilot-tui/src/components/lifecycle.rs
@@ -284,7 +284,13 @@ impl LifecycleComponent {
         };
 
         if !context_name.is_empty() {
-            match get_discovery_members_with_retry(&context_name, self.config_path.as_deref()).await
+            // Use version info nodes as fallback in case VIP-based discovery fails
+            let fallback_ips: Vec<String> = data
+                .versions
+                .iter()
+                .map(|v| v.node.split(':').next().unwrap_or(&v.node).to_string())
+                .collect();
+            match get_discovery_members_with_retry(&context_name, self.config_path.as_deref(), &fallback_ips).await
             {
                 Ok(members) => {
                     data.discovery_members = members;

diff --git a/crates/talos-rs/src/talosctl.rs b/crates/talos-rs/src/talosctl.rs
index ae116cc..497757d 100644
--- a/crates/talos-rs/src/talosctl.rs
+++ b/crates/talos-rs/src/talosctl.rs
@@ -547,26 +547,54 @@ pub async fn get_discovery_members_for_context(
     parse_discovery_members_yaml(&output)
 }
 
-/// Get discovery members with automatic retry on transient failures.
-/// Retries up to 3 times with exponential backoff (100ms, 200ms, 400ms).
+/// Get discovery members for a specific node IP using context certificates (async).
+///
+/// This allows querying a specific control plane node directly instead of going through the VIP.
+async fn get_discovery_members_for_node_async(
+    context: &str,
+    node_ip: &str,
+) -> Result<Vec<DiscoveryMember>, TalosError> {
+    let output = exec_talosctl_async(&[
+        "--context",
+        context,
+        "-n",
+        node_ip,
+        "get",
+        "members",
+        "-o",
+        "yaml",
+    ])
+    .await?;
+    parse_discovery_members_yaml(&output)
+}
+
+/// Get discovery members with automatic retry and fallback to specific nodes.
+///
+/// First tries the VIP endpoint (via context), then falls back to querying
+/// individual control plane nodes directly if the VIP fails.
+///
+/// This handles transient "no request forwarding" errors that occur when
+/// the VIP routes to a node that can't forward the request.
 pub async fn get_discovery_members_with_retry(
     context: &str,
     config_path: Option<&str>,
+    fallback_node_ips: &[String],
 ) -> Result<Vec<DiscoveryMember>, TalosError> {
-    const MAX_RETRIES: u32 = 3;
+    // First, try the VIP-based approach with a couple retries
+    const VIP_RETRIES: u32 = 2;
     const BASE_DELAY_MS: u64 = 100;
 
     let mut last_error = None;
 
-    for attempt in 0..=MAX_RETRIES {
+    for attempt in 0..=VIP_RETRIES {
         match get_discovery_members_for_context(context, config_path).await {
             Ok(members) => return Ok(members),
             Err(e) => {
                 last_error = Some(e);
-                if attempt < MAX_RETRIES {
+                if attempt < VIP_RETRIES {
                     let delay_ms = BASE_DELAY_MS * (1 << attempt);
                     tracing::debug!(
-                        "Discovery fetch attempt {} failed, retrying in {}ms",
+                        "Discovery fetch via VIP attempt {} failed, retrying in {}ms",
                         attempt + 1,
                         delay_ms
                     );
@@ -576,6 +604,27 @@ pub async fn get_discovery_members_with_retry(
         }
     }
 
+    // VIP failed, try fallback nodes directly
+    if !fallback_node_ips.is_empty() {
+        tracing::debug!(
+            "VIP-based discovery failed, trying {} fallback nodes directly",
+            fallback_node_ips.len()
+        );
+
+        for node_ip in fallback_node_ips {
+            match get_discovery_members_for_node_async(context, node_ip).await {
+                Ok(members) => {
+                    tracing::debug!("Successfully fetched discovery members from fallback node {}", node_ip);
+                    return Ok(members);
+                }
+                Err(e) => {
+                    tracing::debug!("Fallback node {} failed: {}", node_ip, e);
+                    last_error = Some(e);
+                }
+            }
+        }
+    }
+
     Err(last_error.unwrap())
 }

From 5df74ac7390c7fe568bb03e0ebb9080868a9ba91 Mon Sep 17 00:00:00 2001
From: Kenny Udovic
Date: Tue, 20 Jan 2026 08:59:18 -0500
Subject: [PATCH 3/3] fix: improve discovery retry robustness and add tests

- Replace unwrap() with unwrap_or_else to prevent panic
- Shuffle fallback nodes to distribute load
- Add unit tests for discovery member parsing
- Fix formatting

Small improvements to the fix by perfectra1n
---
 Cargo.lock                                    |   1 +
 .../talos-pilot-tui/src/components/cluster.rs |   8 +-
 .../src/components/lifecycle.rs               |   7 +-
 crates/talos-rs/Cargo.toml                    |   3 +
 crates/talos-rs/src/talosctl.rs               | 132 +++++++++++++++++-
 5 files changed, 142 insertions(+), 9 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4377c37..ea33d39 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2833,6 +2833,7 @@ version = "0.1.7"
 dependencies = [
  "base64",
  "dirs-next",
+ "fastrand",
  "flate2",
  "futures",
  "prost",

diff --git a/crates/talos-pilot-tui/src/components/cluster.rs b/crates/talos-pilot-tui/src/components/cluster.rs
index b743e57..0421032 100644
--- a/crates/talos-pilot-tui/src/components/cluster.rs
+++ b/crates/talos-pilot-tui/src/components/cluster.rs
@@ -471,7 +471,13 @@ impl ClusterComponent {
             .iter()
             .filter_map(|m| m.ip_address())
             .collect();
-        match get_discovery_members_with_retry(&context_name, self.config_path.as_deref(), &fallback_ips).await {
+        match get_discovery_members_with_retry(
+            &context_name,
+            self.config_path.as_deref(),
+            &fallback_ips,
+        )
+        .await
+        {
             Ok(members) => {
                 cluster.node_ips.clear();
                 for member in &members {

diff --git a/crates/talos-pilot-tui/src/components/lifecycle.rs b/crates/talos-pilot-tui/src/components/lifecycle.rs
index 072f7de..603e04a 100644
--- a/crates/talos-pilot-tui/src/components/lifecycle.rs
+++ b/crates/talos-pilot-tui/src/components/lifecycle.rs
@@ -290,7 +290,12 @@ impl LifecycleComponent {
                 .iter()
                 .map(|v| v.node.split(':').next().unwrap_or(&v.node).to_string())
                 .collect();
-            match get_discovery_members_with_retry(&context_name, self.config_path.as_deref(), &fallback_ips).await
+            match get_discovery_members_with_retry(
+                &context_name,
+                self.config_path.as_deref(),
+                &fallback_ips,
+            )
+            .await
             {
                 Ok(members) => {
                     data.discovery_members = members;

diff --git a/crates/talos-rs/Cargo.toml b/crates/talos-rs/Cargo.toml
index ab0048b..bcb9ee2 100644
--- a/crates/talos-rs/Cargo.toml
+++ b/crates/talos-rs/Cargo.toml
@@ -41,6 +41,9 @@ tar = "0.4"
 # Logging
 tracing.workspace = true
 
+# Random (for fallback node shuffling)
+fastrand = "2"
+
 [build-dependencies]
 tonic-build = "0.12"

diff --git a/crates/talos-rs/src/talosctl.rs b/crates/talos-rs/src/talosctl.rs
index 497757d..a41dc92 100644
--- a/crates/talos-rs/src/talosctl.rs
+++ b/crates/talos-rs/src/talosctl.rs
@@ -580,18 +580,19 @@ pub async fn get_discovery_members_with_retry(
     config_path: Option<&str>,
     fallback_node_ips: &[String],
 ) -> Result<Vec<DiscoveryMember>, TalosError> {
-    // First, try the VIP-based approach with a couple retries
-    const VIP_RETRIES: u32 = 2;
+    // First, try the VIP-based approach with retries
+    // VIP_MAX_RETRIES=2 means 3 total attempts (initial + 2 retries)
+    const VIP_MAX_RETRIES: u32 = 2;
     const BASE_DELAY_MS: u64 = 100;
 
     let mut last_error = None;
 
-    for attempt in 0..=VIP_RETRIES {
+    for attempt in 0..=VIP_MAX_RETRIES {
         match get_discovery_members_for_context(context, config_path).await {
             Ok(members) => return Ok(members),
             Err(e) => {
                 last_error = Some(e);
-                if attempt < VIP_RETRIES {
+                if attempt < VIP_MAX_RETRIES {
                     let delay_ms = BASE_DELAY_MS * (1 << attempt);
                     tracing::debug!(
                         "Discovery fetch via VIP attempt {} failed, retrying in {}ms",
@@ -605,16 +606,23 @@ pub async fn get_discovery_members_with_retry(
     }
 
     // VIP failed, try fallback nodes directly
+    // Shuffle to distribute load and avoid repeatedly hitting a problematic node first
     if !fallback_node_ips.is_empty() {
         tracing::debug!(
             "VIP-based discovery failed, trying {} fallback nodes directly",
             fallback_node_ips.len()
         );
 
-        for node_ip in fallback_node_ips {
+        let mut shuffled_ips: Vec<&String> = fallback_node_ips.iter().collect();
+        fastrand::shuffle(&mut shuffled_ips);
+
+        for node_ip in shuffled_ips {
             match get_discovery_members_for_node_async(context, node_ip).await {
                 Ok(members) => {
-                    tracing::debug!("Successfully fetched discovery members from fallback node {}", node_ip);
+                    tracing::debug!(
+                        "Successfully fetched discovery members from fallback node {}",
+                        node_ip
+                    );
                     return Ok(members);
                 }
                 Err(e) => {
@@ -625,7 +633,7 @@ pub async fn get_discovery_members_with_retry(
         }
     }
 
-    Err(last_error.unwrap())
+    Err(last_error.unwrap_or_else(|| TalosError::NoEndpoints(context.to_string())))
 }
 
 /// Get address status for a node (for VIP detection)
@@ -1291,4 +1299,114 @@ spec:
     assert_eq!(disks[1].transport, Some("nvme".to_string()));
     assert!(!disks[1].rotational);
 }
+
+    #[test]
+    fn test_parse_discovery_members() {
+        let yaml = r#"
+node: 192.168.9.11
+metadata:
+  namespace: cluster
+  type: Members.cluster.talos.dev
+  id: 3xKYjp
+  version: 1
+  owner: cluster.DiscoveryService
+  phase: running
+spec:
+  nodeId: 3xKYjp
+  addresses:
+    - 192.168.9.11
+  hostname: cp-1
+  machineType: controlplane
+  operatingSystem: Talos (v1.9.2)
+---
+node: 192.168.9.11
+metadata:
+  namespace: cluster
+  type: Members.cluster.talos.dev
+  id: 4yLZkq
+  version: 1
+  owner: cluster.DiscoveryService
+  phase: running
+spec:
+  nodeId: 4yLZkq
+  addresses:
+    - 192.168.9.21
+    - 10.244.0.1
+  hostname: worker-1
+  machineType: worker
+  operatingSystem: Talos (v1.9.2)
+"#;
+
+        let members = parse_discovery_members_yaml(yaml).unwrap();
+        assert_eq!(members.len(), 2);
+
+        // Control plane node
+        assert_eq!(members[0].id, "3xKYjp");
+        assert_eq!(members[0].hostname, "cp-1");
+        assert_eq!(members[0].machine_type, "controlplane");
+        assert_eq!(members[0].addresses, vec!["192.168.9.11"]);
+        assert!(members[0].operating_system.contains("Talos"));
+
+        // Worker node with multiple addresses
+        assert_eq!(members[1].id, "4yLZkq");
+        assert_eq!(members[1].hostname, "worker-1");
+        assert_eq!(members[1].machine_type, "worker");
+        assert_eq!(members[1].addresses.len(), 2);
+        assert!(members[1].addresses.contains(&"192.168.9.21".to_string()));
+    }
+
+    #[test]
+    fn test_parse_discovery_members_empty() {
+        let yaml = "";
+        let members = parse_discovery_members_yaml(yaml).unwrap();
+        assert!(members.is_empty());
+    }
+
+    #[test]
+    fn test_parse_discovery_members_invalid_yaml() {
+        // Should skip invalid documents and not panic
+        let yaml = r#"
+not valid yaml: [
+---
+node: 192.168.9.11
+metadata:
+  namespace: cluster
+  type: Members.cluster.talos.dev
+  id: valid
+spec:
+  hostname: valid-host
+  machineType: controlplane
+"#;
+        let members = parse_discovery_members_yaml(yaml).unwrap();
+        // Should have parsed the valid document
+        assert_eq!(members.len(), 1);
+        assert_eq!(members[0].id, "valid");
+    }
+
+    #[test]
+    fn test_shuffle_fallback_ips_does_not_panic() {
+        // Test that shuffle works correctly on various inputs
+        let empty: Vec<String> = vec![];
+        let mut shuffled: Vec<&String> = empty.iter().collect();
+        fastrand::shuffle(&mut shuffled);
+        assert!(shuffled.is_empty());
+
+        let single = ["192.168.1.1".to_string()];
+        let mut shuffled: Vec<&String> = single.iter().collect();
+        fastrand::shuffle(&mut shuffled);
+        assert_eq!(shuffled.len(), 1);
+
+        let multiple = vec![
+            "192.168.1.1".to_string(),
+            "192.168.1.2".to_string(),
+            "192.168.1.3".to_string(),
+        ];
+        let mut shuffled: Vec<&String> = multiple.iter().collect();
+        fastrand::shuffle(&mut shuffled);
+        assert_eq!(shuffled.len(), 3);
+        // All original elements should still be present
+        for ip in &multiple {
+            assert!(shuffled.contains(&ip));
+        }
+    }
 }
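
Taken together, the series changes the caller contract for discovery fetches: callers pass candidate fallback IPs in, and keep previously cached members when the fetch still fails. A minimal sketch of that call pattern, assuming the `talos_rs` exports from patch 1; the context name "my-cluster" and the `refresh_members` helper are illustrative and not part of the series:

    use talos_rs::{DiscoveryMember, get_discovery_members_with_retry};

    /// Illustrative caller: refresh a cached member list, keeping stale data on failure.
    async fn refresh_members(cached: &mut Vec<DiscoveryMember>, fallback_ips: &[String]) {
        // Tries the VIP (with retries), then each fallback node directly.
        match get_discovery_members_with_retry("my-cluster", None, fallback_ips).await {
            // Fresh data replaces the cache only on success
            Ok(members) => *cached = members,
            // On error, `cached` is left untouched, mirroring the patched components
            Err(e) => tracing::debug!("discovery refresh failed, keeping cached members: {}", e),
        }
    }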