Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 35 additions & 5 deletions crates/talos-pilot-tui/src/components/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ratatui::{
use std::collections::HashMap;
use talos_rs::{
DiscoveryMember, EtcdMemberInfo, MemInfo, NodeCpuInfo, NodeLoadAvg, NodeMemory, NodeServices,
ServiceInfo, TalosClient, TalosConfig, VersionInfo, get_discovery_members_for_context,
ServiceInfo, TalosClient, TalosConfig, VersionInfo, get_discovery_members_with_retry,
};

/// Simple etcd status for header display
Expand Down Expand Up @@ -463,9 +463,21 @@ impl ClusterComponent {
}

// Try to get discovery members (ALL nodes including workers)
// Use context-aware async function to avoid blocking and to use correct certificates
// Use context-aware async function with retry to avoid blocking and to use correct certificates
// Pass etcd member IPs as fallback in case VIP-based discovery fails
let context_name = cluster.name.clone();
match get_discovery_members_for_context(&context_name, self.config_path.as_deref()).await {
let fallback_ips: Vec<String> = cluster
.etcd_members
.iter()
.filter_map(|m| m.ip_address())
.collect();
match get_discovery_members_with_retry(
&context_name,
self.config_path.as_deref(),
&fallback_ips,
)
.await
{
Ok(members) => {
cluster.node_ips.clear();
for member in &members {
Expand All @@ -477,11 +489,11 @@ impl ClusterComponent {
}
Err(e) => {
tracing::warn!(
"Failed to fetch discovery members for {}: {}",
"Failed to fetch discovery members for {} after retries: {} (using cached data)",
cluster.name,
e
);
cluster.discovery_members.clear();
// DO NOT clear - preserve existing data for resilience
}
}

Expand Down Expand Up @@ -516,7 +528,25 @@ impl ClusterComponent {
})
})
.collect()
} else if !cluster.versions.is_empty() {
// Tertiary fallback: use nodes from previous version queries
tracing::debug!(
"Using version info as fallback node source for {}",
cluster.name
);
cluster
.versions
.iter()
.map(|v| {
let ip = v.node.split(':').next().unwrap_or(&v.node).to_string();
(v.node.clone(), ip)
})
.collect()
} else {
tracing::warn!(
"No node sources available for {}, skipping queries",
cluster.name
);
Vec::new()
};

Expand Down
23 changes: 18 additions & 5 deletions crates/talos-pilot-tui/src/components/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::time::Duration;
use talos_pilot_core::{AsyncState, HasHealth, HealthIndicator, SelectableList};
use talos_rs::{
DiscoveryMember, NodeTimeInfo, TalosClient, TalosConfig, VersionInfo,
get_discovery_members_for_context,
get_discovery_members_with_retry,
};

/// Auto-refresh interval in seconds
Expand Down Expand Up @@ -284,15 +284,28 @@ impl LifecycleComponent {
};

if !context_name.is_empty() {
match get_discovery_members_for_context(&context_name, self.config_path.as_deref())
.await
// Use version info nodes as fallback in case VIP-based discovery fails
let fallback_ips: Vec<String> = data
.versions
.iter()
.map(|v| v.node.split(':').next().unwrap_or(&v.node).to_string())
.collect();
match get_discovery_members_with_retry(
&context_name,
self.config_path.as_deref(),
&fallback_ips,
)
.await
{
Ok(members) => {
data.discovery_members = members;
}
Err(e) => {
tracing::debug!("Failed to get discovery members: {}", e);
data.discovery_members.clear();
tracing::debug!(
"Failed to get discovery members after retries: {} (preserving cached)",
e
);
// DO NOT clear - preserve existing data
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/talos-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ tar = "0.4"
# Logging
tracing.workspace = true

# Random (for fallback node shuffling)
fastrand = "2"

[build-dependencies]
tonic-build = "0.12"

Expand Down
6 changes: 3 additions & 3 deletions crates/talos-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ pub use talosctl::{
AddressStatus, DiscoveryMember, DiskInfo, GenConfigResult, InsecureApplyResult,
InsecureVersionInfo, KubeSpanPeerStatus, MachineConfigInfo, VolumeStatus,
apply_config_insecure, check_insecure_connection, gen_config, get_address_status,
get_discovery_members, get_discovery_members_for_context, get_disks, get_disks_for_context,
get_disks_for_node, get_disks_insecure, get_kubespan_peers, get_machine_config,
get_version_insecure, get_volume_status, get_volume_status_for_node,
get_discovery_members, get_discovery_members_for_context, get_discovery_members_with_retry,
get_disks, get_disks_for_context, get_disks_for_node, get_disks_insecure, get_kubespan_peers,
get_machine_config, get_version_insecure, get_volume_status, get_volume_status_for_node,
get_volume_status_insecure, is_kubespan_enabled, reboot_insecure, shutdown_insecure,
};
199 changes: 199 additions & 0 deletions crates/talos-rs/src/talosctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,95 @@ pub async fn get_discovery_members_for_context(
parse_discovery_members_yaml(&output)
}

/// Get discovery members for a specific node IP using context certificates (async).
///
/// This allows querying a specific control plane node directly instead of going
/// through the VIP. The optional `config_path` is forwarded as `--talosconfig`
/// so that direct-node fallback queries use the same configuration file as the
/// VIP-based path.
async fn get_discovery_members_for_node_async(
    context: &str,
    config_path: Option<&str>,
    node_ip: &str,
) -> Result<Vec<DiscoveryMember>, TalosError> {
    let mut args = vec![
        "--context",
        context,
        "-n",
        node_ip,
        "get",
        "members",
        "-o",
        "yaml",
    ];
    if let Some(path) = config_path {
        // Without this, fallback queries would silently use the default
        // talosconfig even when the caller supplied an explicit one, and the
        // named context might not exist in the default config at all.
        args.push("--talosconfig");
        args.push(path);
    }
    let output = exec_talosctl_async(&args).await?;
    parse_discovery_members_yaml(&output)
}

/// Get discovery members with automatic retry and fallback to specific nodes.
///
/// First tries the VIP endpoint (via context) with exponential backoff, then
/// falls back to querying individual control plane nodes directly if every VIP
/// attempt fails.
///
/// This handles transient "no request forwarding" errors that occur when
/// the VIP routes to a node that can't forward the request.
///
/// # Errors
///
/// Returns the last error observed (from the VIP attempts or a fallback node)
/// when nothing succeeds; `TalosError::NoEndpoints` as a defensive default if
/// no error was recorded.
pub async fn get_discovery_members_with_retry(
    context: &str,
    config_path: Option<&str>,
    fallback_node_ips: &[String],
) -> Result<Vec<DiscoveryMember>, TalosError> {
    // First, try the VIP-based approach with retries.
    // VIP_MAX_RETRIES=2 means 3 total attempts (initial + 2 retries).
    const VIP_MAX_RETRIES: u32 = 2;
    const BASE_DELAY_MS: u64 = 100;

    let mut last_error = None;

    for attempt in 0..=VIP_MAX_RETRIES {
        match get_discovery_members_for_context(context, config_path).await {
            Ok(members) => return Ok(members),
            Err(e) => {
                last_error = Some(e);
                if attempt < VIP_MAX_RETRIES {
                    // Exponential backoff: 100ms, 200ms, ...
                    let delay_ms = BASE_DELAY_MS * (1 << attempt);
                    tracing::debug!(
                        "Discovery fetch via VIP attempt {} failed, retrying in {}ms",
                        attempt + 1,
                        delay_ms
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                }
            }
        }
    }

    // VIP failed, try fallback nodes directly.
    // Shuffle to distribute load and avoid repeatedly hitting a problematic
    // node first.
    if !fallback_node_ips.is_empty() {
        tracing::debug!(
            "VIP-based discovery failed, trying {} fallback nodes directly",
            fallback_node_ips.len()
        );

        let mut shuffled_ips: Vec<&String> = fallback_node_ips.iter().collect();
        fastrand::shuffle(&mut shuffled_ips);

        for node_ip in shuffled_ips {
            // Forward config_path so fallback queries honor the same
            // talosconfig as the VIP-based attempts above.
            match get_discovery_members_for_node_async(context, config_path, node_ip).await {
                Ok(members) => {
                    tracing::debug!(
                        "Successfully fetched discovery members from fallback node {}",
                        node_ip
                    );
                    return Ok(members);
                }
                Err(e) => {
                    tracing::debug!("Fallback node {} failed: {}", node_ip, e);
                    last_error = Some(e);
                }
            }
        }
    }

    // `last_error` is always `Some` after at least one VIP attempt; the
    // `unwrap_or_else` exists purely as a defensive default.
    Err(last_error.unwrap_or_else(|| TalosError::NoEndpoints(context.to_string())))
}

/// Get address status for a node (for VIP detection)
///
/// Executes: talosctl get addressstatus --nodes <node> -o yaml
Expand Down Expand Up @@ -1210,4 +1299,114 @@ spec:
assert_eq!(disks[1].transport, Some("nvme".to_string()));
assert!(!disks[1].rotational);
}

#[test]
fn test_parse_discovery_members() {
    // Two YAML documents as emitted by `talosctl get members -o yaml`:
    // one control plane node and one worker with multiple addresses.
    let yaml = r#"
node: 192.168.9.11
metadata:
    namespace: cluster
    type: Members.cluster.talos.dev
    id: 3xKYjp
    version: 1
    owner: cluster.DiscoveryService
    phase: running
spec:
    nodeId: 3xKYjp
    addresses:
        - 192.168.9.11
    hostname: cp-1
    machineType: controlplane
    operatingSystem: Talos (v1.9.2)
---
node: 192.168.9.11
metadata:
    namespace: cluster
    type: Members.cluster.talos.dev
    id: 4yLZkq
    version: 1
    owner: cluster.DiscoveryService
    phase: running
spec:
    nodeId: 4yLZkq
    addresses:
        - 192.168.9.21
        - 10.244.0.1
    hostname: worker-1
    machineType: worker
    operatingSystem: Talos (v1.9.2)
"#;

    let parsed = parse_discovery_members_yaml(yaml).unwrap();
    assert_eq!(parsed.len(), 2);

    // First document: the control plane node.
    let cp = &parsed[0];
    assert_eq!(cp.id, "3xKYjp");
    assert_eq!(cp.hostname, "cp-1");
    assert_eq!(cp.machine_type, "controlplane");
    assert_eq!(cp.addresses, vec!["192.168.9.11"]);
    assert!(cp.operating_system.contains("Talos"));

    // Second document: the worker node, which carries two addresses.
    let worker = &parsed[1];
    assert_eq!(worker.id, "4yLZkq");
    assert_eq!(worker.hostname, "worker-1");
    assert_eq!(worker.machine_type, "worker");
    assert_eq!(worker.addresses.len(), 2);
    assert!(worker.addresses.contains(&"192.168.9.21".to_string()));
}

#[test]
fn test_parse_discovery_members_empty() {
    // An empty document stream should yield an empty list, not an error.
    let parsed = parse_discovery_members_yaml("").unwrap();
    assert!(parsed.is_empty());
}

#[test]
fn test_parse_discovery_members_invalid_yaml() {
    // A malformed document must be skipped without panicking, while valid
    // sibling documents in the same stream are still parsed.
    let yaml = r#"
not valid yaml: [
---
node: 192.168.9.11
metadata:
    namespace: cluster
    type: Members.cluster.talos.dev
    id: valid
spec:
    hostname: valid-host
    machineType: controlplane
"#;
    let parsed = parse_discovery_members_yaml(yaml).unwrap();
    // Only the well-formed document survives.
    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].id, "valid");
}

#[test]
fn test_shuffle_fallback_ips_does_not_panic() {
    // Shuffling must be safe for empty, single-element, and multi-element
    // inputs — the same shapes `get_discovery_members_with_retry` feeds it.

    // Empty input: nothing to shuffle, nothing to panic on.
    let empty: Vec<String> = Vec::new();
    let mut refs: Vec<&String> = empty.iter().collect();
    fastrand::shuffle(&mut refs);
    assert!(refs.is_empty());

    // Single element: shuffle is a no-op but must still be well-defined.
    let single = ["192.168.1.1".to_string()];
    let mut refs: Vec<&String> = single.iter().collect();
    fastrand::shuffle(&mut refs);
    assert_eq!(refs.len(), 1);

    // Multiple elements: length is preserved and the result is a permutation.
    let multiple = vec![
        "192.168.1.1".to_string(),
        "192.168.1.2".to_string(),
        "192.168.1.3".to_string(),
    ];
    let mut refs: Vec<&String> = multiple.iter().collect();
    fastrand::shuffle(&mut refs);
    assert_eq!(refs.len(), 3);
    for ip in &multiple {
        assert!(refs.contains(&ip));
    }
}
}