From fa3ec4c3ca1402a5f42ee264c70dc7fbc25ea613 Mon Sep 17 00:00:00 2001 From: Kenny Udovic Date: Sat, 17 Jan 2026 15:17:23 -0500 Subject: [PATCH 1/3] fix: utilize ips instead of hostnames - implement direct connection temporarily Temporary fix: we are creating a lot of clients in order to connect to nodes directly - VIP is causing issues --- crates/talos-pilot-tui/src/app.rs | 32 +- .../talos-pilot-tui/src/components/cluster.rs | 35 +- crates/talos-pilot-tui/src/components/etcd.rs | 26 +- .../src/components/lifecycle.rs | 145 ++- .../src/components/node_operations.rs | 7 +- crates/talos-rs/src/auth.rs | 23 +- crates/talos-rs/src/client.rs | 299 ++++- crates/talos-rs/src/config.rs | 9 + test-clusters/scripts/multi-cp-qemu.sh | 1036 +++++++++++++++++ 9 files changed, 1502 insertions(+), 110 deletions(-) create mode 100755 test-clusters/scripts/multi-cp-qemu.sh diff --git a/crates/talos-pilot-tui/src/app.rs b/crates/talos-pilot-tui/src/app.rs index a954c2f..aa2f9a8 100644 --- a/crates/talos-pilot-tui/src/app.rs +++ b/crates/talos-pilot-tui/src/app.rs @@ -922,7 +922,7 @@ impl App { // Create multi-logs component with all services, marking active ones let mut multi_logs = MultiLogsComponent::new( - node_ip, + node_ip.clone(), node_role, active_services.clone(), all_services, @@ -930,19 +930,27 @@ impl App { // Fetch logs from active services and set up client for streaming if let Some(client) = self.cluster.client() { - // Set the client for streaming capability - multi_logs.set_client(client.clone(), self.tail_lines); - - let service_refs: Vec<&str> = - active_services.iter().map(|s| s.as_str()).collect(); - match client.logs_multi(&service_refs, self.tail_lines).await { - Ok(logs) => { - multi_logs.set_logs(logs); - // Auto-start streaming for live updates - multi_logs.start_streaming(); + // Create a direct connection to the node (bypasses VIP issues) + match client.direct_to_node(&node_ip) { + Ok(node_client) => { + // Set the client for streaming capability + multi_logs.set_client(node_client.clone(), self.tail_lines); + + let service_refs: Vec<&str> = + active_services.iter().map(|s| s.as_str()).collect(); + match node_client.logs_multi(&service_refs, self.tail_lines).await { + Ok(logs) => { + multi_logs.set_logs(logs); + // Auto-start streaming for live updates + multi_logs.start_streaming(); + } + Err(e) => { + multi_logs.set_error(e.to_string()); + } + } } Err(e) => { - multi_logs.set_error(e.to_string()); + multi_logs.set_error(format!("Failed to connect to node: {}", e)); } } } diff --git a/crates/talos-pilot-tui/src/components/cluster.rs b/crates/talos-pilot-tui/src/components/cluster.rs index 8e21db2..17db137 100644 --- a/crates/talos-pilot-tui/src/components/cluster.rs +++ b/crates/talos-pilot-tui/src/components/cluster.rs @@ -573,12 +573,13 @@ impl ClusterComponent { // Fetch etcd status for header summary (target all control planes) if let Some(client) = &cluster.client { - let cp_hostnames: Vec = cluster + // Use IPs instead of hostnames (hostnames may not be resolvable) + let cp_ips: Vec = cluster .etcd_members .iter() - .map(|m| m.hostname.clone()) + .filter_map(|m| m.ip_address()) .collect(); - if let Ok(statuses) = client.etcd_status_for_nodes(&cp_hostnames).await { + if let Ok(statuses) = client.etcd_status_for_nodes(&cp_ips).await { let total = cluster.etcd_members.len(); let healthy = statuses.len(); let quorum_needed = total / 2 + 1; @@ -830,8 +831,13 @@ impl ClusterComponent { let service_ids = self.current_service_ids(); if !service_ids.is_empty() { let node_role = 
self.current_node_role(); + let node_ip = self + .node_ips() + .get(&node_name) + .cloned() + .unwrap_or(node_name.clone()); Ok(Some(Action::ShowMultiLogs( - node_name, + node_ip, node_role, service_ids.clone(), service_ids, @@ -1009,8 +1015,13 @@ impl Component for ClusterComponent { let service_ids = self.current_service_ids(); if !service_ids.is_empty() { let node_role = self.current_node_role(); + let node_ip = self + .node_ips() + .get(&node_name) + .cloned() + .unwrap_or(node_name.clone()); Ok(Some(Action::ShowMultiLogs( - node_name, + node_ip, node_role, service_ids.clone(), service_ids, @@ -1034,8 +1045,13 @@ impl Component for ClusterComponent { if let Some(service_id) = self.selected_service_id() { let node_role = self.current_node_role(); let all_services = self.current_service_ids(); + let node_ip = self + .node_ips() + .get(&node_name) + .cloned() + .unwrap_or(node_name.clone()); Ok(Some(Action::ShowMultiLogs( - node_name, + node_ip, node_role, vec![service_id], all_services, @@ -1056,8 +1072,13 @@ impl Component for ClusterComponent { let service_ids = self.current_service_ids(); if !service_ids.is_empty() { let node_role = self.current_node_role(); + let node_ip = self + .node_ips() + .get(&node_name) + .cloned() + .unwrap_or(node_name.clone()); Ok(Some(Action::ShowMultiLogs( - node_name, + node_ip, node_role, service_ids.clone(), service_ids, diff --git a/crates/talos-pilot-tui/src/components/etcd.rs b/crates/talos-pilot-tui/src/components/etcd.rs index 1519e35..d6c4a97 100644 --- a/crates/talos-pilot-tui/src/components/etcd.rs +++ b/crates/talos-pilot-tui/src/components/etcd.rs @@ -127,20 +127,14 @@ impl EtcdComponent { } }; - // Step 2: Extract control plane hostnames from members - let cp_hostnames: Vec = member_infos.iter().map(|m| m.hostname.clone()).collect(); + // Step 2: Extract control plane IPs from members (hostnames may not be resolvable) + let cp_ips: Vec = member_infos.iter().filter_map(|m| m.ip_address()).collect(); - tracing::debug!( - "Fetching etcd status from control planes: {:?}", - cp_hostnames - ); + tracing::debug!("Fetching etcd status from control planes: {:?}", cp_ips); - // Step 3: Fetch status (targeting all CPs) and alarms in parallel + // Step 3: Fetch status (targeting all CPs by IP) and alarms in parallel let fetch_result = tokio::time::timeout(timeout, async { - tokio::join!( - client.etcd_status_for_nodes(&cp_hostnames), - client.etcd_alarms() - ) + tokio::join!(client.etcd_status_for_nodes(&cp_ips), client.etcd_alarms()) }) .await; @@ -599,13 +593,13 @@ impl Component for EtcdComponent { if data.members.is_empty() { return Ok(None); } - // Use first member's hostname as the "node" but show all etcd services + // Use first member's IP as the "node" (hostnames may not be resolvable) // In practice, with the current API this shows etcd logs from all connected nodes let node = data .members .items() .first() - .map(|m| m.info.hostname.clone()) + .and_then(|m| m.info.ip_address()) .unwrap_or_else(|| "controlplane".to_string()); let etcd_vec = vec!["etcd".to_string()]; Ok(Some(Action::ShowMultiLogs( @@ -617,10 +611,14 @@ impl Component for EtcdComponent { } KeyCode::Enter => { // View etcd logs for selected member + // Use IP address since hostnames may not be resolvable if let Some(data) = self.data() && let Some(member) = data.members.selected() { - let node = member.info.hostname.clone(); + let node = member + .info + .ip_address() + .unwrap_or_else(|| member.info.hostname.clone()); let etcd_vec = vec!["etcd".to_string()]; return 
Ok(Some(Action::ShowMultiLogs( node, diff --git a/crates/talos-pilot-tui/src/components/lifecycle.rs b/crates/talos-pilot-tui/src/components/lifecycle.rs index 6d70e65..6725017 100644 --- a/crates/talos-pilot-tui/src/components/lifecycle.rs +++ b/crates/talos-pilot-tui/src/components/lifecycle.rs @@ -223,51 +223,8 @@ impl LifecycleComponent { // Get or create data let mut data = self.state.take_data().unwrap_or_default(); - // Fetch version information - match client.version().await { - Ok(versions) => { - // Get context name from first node - if !versions.is_empty() && data.context_name.is_empty() { - // Try to get from talosconfig - let config_result = match &self.config_path { - Some(path) => { - let path_buf = std::path::PathBuf::from(path); - TalosConfig::load_from(&path_buf) - } - None => TalosConfig::load_default(), - }; - if let Ok(config) = config_result { - data.context_name = config.context; - } - } - - data.versions = versions; - } - Err(e) => { - self.state - .set_error(format!("Failed to fetch versions: {}", e)); - // Re-store the data so far - self.state.set_data(data); - return Ok(()); - } - } - - // Fetch time sync status - match client.time().await { - Ok(times) => { - data.time_info = times; - } - Err(e) => { - // Time fetch failure is not fatal - just log it - tracing::warn!("Failed to fetch time status: {}", e); - data.time_info.clear(); - } - } - - // Fetch discovery members using context-aware async version - let context_name = if !data.context_name.is_empty() { - data.context_name.clone() - } else { + // First, determine context name + if data.context_name.is_empty() { let config_result = match &self.config_path { Some(path) => { let path_buf = std::path::PathBuf::from(path); @@ -275,11 +232,13 @@ impl LifecycleComponent { } None => TalosConfig::load_default(), }; - config_result - .map(|config| config.context) - .unwrap_or_default() - }; + if let Ok(config) = config_result { + data.context_name = config.context.clone(); + } + } + // Fetch discovery members early to get node IPs for direct connections + let context_name = data.context_name.clone(); if !context_name.is_empty() { match get_discovery_members_for_context(&context_name, self.config_path.as_deref()) .await @@ -294,6 +253,87 @@ impl LifecycleComponent { } } + // Extract node IPs from discovery members for direct connections + // Each member has addresses - use the first one as the node IP + let node_ips: Vec = data + .discovery_members + .iter() + .filter_map(|m| m.addresses.first().cloned()) + .collect(); + + // Fetch version information using direct node connections + if !node_ips.is_empty() { + match client.version_for_nodes(&node_ips).await { + Ok(versions) => { + data.versions = versions; + } + Err(e) => { + self.state + .set_error(format!("Failed to fetch versions: {}", e)); + self.state.set_data(data); + return Ok(()); + } + } + } else { + // Fallback: try etcd_members to get control plane IPs + match client.etcd_members().await { + Ok(members) => { + let cp_ips: Vec = + members.iter().filter_map(|m| m.ip_address()).collect(); + if !cp_ips.is_empty() { + match client.version_for_nodes(&cp_ips).await { + Ok(versions) => { + data.versions = versions; + } + Err(e) => { + self.state + .set_error(format!("Failed to fetch versions: {}", e)); + self.state.set_data(data); + return Ok(()); + } + } + } else { + self.state + .set_error("No node IPs available for version query"); + self.state.set_data(data); + return Ok(()); + } + } + Err(e) => { + self.state + .set_error(format!("Failed to fetch etcd members: 
{}", e)); + self.state.set_data(data); + return Ok(()); + } + } + } + + // Fetch time sync status using direct node connections + let time_node_ips: Vec = if !node_ips.is_empty() { + node_ips.clone() + } else { + // Use IPs from versions we just fetched + data.versions + .iter() + .map(|v| v.node.split(':').next().unwrap_or(&v.node).to_string()) + .collect() + }; + + if !time_node_ips.is_empty() { + match client.time_for_nodes(&time_node_ips).await { + Ok(times) => { + data.time_info = times; + } + Err(e) => { + // Time fetch failure is not fatal - just log it + tracing::warn!("Failed to fetch time status: {}", e); + data.time_info.clear(); + } + } + } else { + data.time_info.clear(); + } + // Build node statuses combining version and time info // Fetch config hash for each node individually for drift detection let statuses: Vec = data @@ -359,11 +399,10 @@ impl LifecycleComponent { let etcd_quorum = match client.etcd_members().await { Ok(members) => { let total = members.len(); - // Extract control plane hostnames to target status calls - let cp_hostnames: Vec = - members.iter().map(|m| m.hostname.clone()).collect(); + // Extract control plane IPs to target status calls (hostnames may not be resolvable) + let cp_ips: Vec = members.iter().filter_map(|m| m.ip_address()).collect(); // Try to get status from all control planes - let healthy = match client.etcd_status_for_nodes(&cp_hostnames).await { + let healthy = match client.etcd_status_for_nodes(&cp_ips).await { Ok(statuses) => { // Count members with status members diff --git a/crates/talos-pilot-tui/src/components/node_operations.rs b/crates/talos-pilot-tui/src/components/node_operations.rs index 8d98777..973a8b7 100644 --- a/crates/talos-pilot-tui/src/components/node_operations.rs +++ b/crates/talos-pilot-tui/src/components/node_operations.rs @@ -686,13 +686,12 @@ impl NodeOperationsComponent { .iter() .any(|m| m.peer_urls.iter().any(|url| url.contains(node_addr))); - // Extract control plane hostnames to target status calls - let cp_hostnames: Vec = - members.iter().map(|m| m.hostname.clone()).collect(); + // Extract control plane IPs to target status calls (hostnames may not be resolvable) + let cp_ips: Vec = members.iter().filter_map(|m| m.ip_address()).collect(); // Get status from all control planes (single call for both is_leader and healthy) let statuses = client - .etcd_status_for_nodes(&cp_hostnames) + .etcd_status_for_nodes(&cp_ips) .await .unwrap_or_default(); diff --git a/crates/talos-rs/src/auth.rs b/crates/talos-rs/src/auth.rs index b52c0fa..069e185 100644 --- a/crates/talos-rs/src/auth.rs +++ b/crates/talos-rs/src/auth.rs @@ -34,29 +34,38 @@ pub async fn create_channel(ctx: &Context) -> Result { .endpoint_url() .ok_or_else(|| TalosError::ConfigInvalid("No endpoints configured".to_string()))?; - tracing::debug!("Connecting to endpoint: {}", endpoint_url); - // Decode certificates from base64 let ca_pem = ctx.ca_pem()?; let client_cert_pem = ctx.client_cert_pem()?; let client_key_pem = ctx.client_key_pem()?; + create_channel_to_endpoint(&endpoint_url, &ca_pem, &client_cert_pem, &client_key_pem) +} + +/// Create a TLS-enabled gRPC channel to a specific endpoint using provided certificates +pub fn create_channel_to_endpoint( + endpoint_url: &str, + ca_pem: &[u8], + client_cert_pem: &[u8], + client_key_pem: &[u8], +) -> Result { + tracing::debug!("Connecting to endpoint: {}", endpoint_url); tracing::debug!("CA cert size: {} bytes", ca_pem.len()); tracing::debug!("Client cert size: {} bytes", client_cert_pem.len()); 
tracing::debug!("Client key size: {} bytes", client_key_pem.len()); // Convert Ed25519 key header to standard PKCS8 format if needed // Tonic expects "PRIVATE KEY" not "ED25519 PRIVATE KEY" - let client_key_pem = convert_ed25519_key_to_pkcs8(&client_key_pem); + let client_key_pem = convert_ed25519_key_to_pkcs8(client_key_pem); // Create TLS config - let ca = Certificate::from_pem(&ca_pem); - let identity = Identity::from_pem(&client_cert_pem, &client_key_pem); + let ca = Certificate::from_pem(ca_pem); + let identity = Identity::from_pem(client_cert_pem, &client_key_pem); let tls_config = ClientTlsConfig::new().ca_certificate(ca).identity(identity); // Build the channel (use connect_lazy to defer TLS handshake) - let endpoint = Channel::from_shared(endpoint_url.clone()) + let endpoint = Channel::from_shared(endpoint_url.to_string()) .map_err(|e| { TalosError::Connection(format!("Invalid endpoint URL '{}': {}", endpoint_url, e)) })? @@ -66,7 +75,7 @@ pub async fn create_channel(ctx: &Context) -> Result { // Use connect_lazy - connection happens on first request let channel = endpoint.connect_lazy(); - tracing::debug!("Successfully connected to {}", endpoint_url); + tracing::debug!("Successfully created channel to {}", endpoint_url); Ok(channel) } diff --git a/crates/talos-rs/src/client.rs b/crates/talos-rs/src/client.rs index 282c312..bfec036 100644 --- a/crates/talos-rs/src/client.rs +++ b/crates/talos-rs/src/client.rs @@ -33,6 +33,12 @@ pub struct TalosClient { nodes: Vec, /// Endpoints from configuration (used to filter out vIPs from node targeting) endpoints: Vec, + /// Decoded CA certificate PEM for creating additional connections + ca_pem: Vec, + /// Decoded client certificate PEM for creating additional connections + client_cert_pem: Vec, + /// Decoded client key PEM for creating additional connections + client_key_pem: Vec, } impl TalosClient { @@ -42,10 +48,18 @@ impl TalosClient { let nodes = ctx.target_nodes().to_vec(); let endpoints = ctx.endpoints.clone(); + // Store certificates for creating additional connections + let ca_pem = ctx.ca_pem()?; + let client_cert_pem = ctx.client_cert_pem()?; + let client_key_pem = ctx.client_key_pem()?; + Ok(Self { channel, nodes, endpoints, + ca_pem, + client_cert_pem, + client_key_pem, }) } @@ -68,14 +82,43 @@ impl TalosClient { /// Create a new client targeting a specific node /// /// This returns a clone of the client with requests directed to the specified node. + /// Note: This still uses the original channel (VIP), just with node targeting. + /// For direct connections, use `direct_to_node()` instead. pub fn with_node(&self, node: &str) -> Self { Self { channel: self.channel.clone(), nodes: vec![node.to_string()], endpoints: self.endpoints.clone(), + ca_pem: self.ca_pem.clone(), + client_cert_pem: self.client_cert_pem.clone(), + client_key_pem: self.client_key_pem.clone(), } } + /// Create a new client with a direct connection to a specific node IP + /// + /// This creates a new gRPC channel directly to the specified node, + /// bypassing any VIP or load balancer. Use this when node targeting + /// through VIP is unreliable. 
+ pub fn direct_to_node(&self, node_ip: &str) -> Result { + let endpoint_url = format!("https://{}:50000", node_ip); + let channel = crate::auth::create_channel_to_endpoint( + &endpoint_url, + &self.ca_pem, + &self.client_cert_pem, + &self.client_key_pem, + )?; + + Ok(Self { + channel, + nodes: vec![node_ip.to_string()], + endpoints: vec![endpoint_url], + ca_pem: self.ca_pem.clone(), + client_cert_pem: self.client_cert_pem.clone(), + client_key_pem: self.client_key_pem.clone(), + }) + } + /// Get a MachineService client fn machine_client(&self) -> MachineServiceClient { MachineServiceClient::new(self.channel.clone()) @@ -156,7 +199,7 @@ impl TalosClient { if !valid_nodes.is_empty() { let nodes_str = valid_nodes.join(","); if let Ok(value) = nodes_str.parse() { - request.metadata_mut().insert("nodes", value); + request.metadata_mut().insert("node", value); } } } @@ -312,6 +355,165 @@ impl TalosClient { Ok(times) } + /// Get version information from specific nodes using direct connections + /// + /// This bypasses VIP/node targeting and connects directly to each node. + /// Pass node IPs to query specific nodes. + pub async fn version_for_nodes(&self, nodes: &[String]) -> Result, TalosError> { + if nodes.is_empty() { + // No specific nodes, use standard method + return self.version().await; + } + + let mut all_versions = Vec::new(); + + for node_ip in nodes { + let endpoint_url = format!("https://{}:50000", node_ip); + + match crate::auth::create_channel_to_endpoint( + &endpoint_url, + &self.ca_pem, + &self.client_cert_pem, + &self.client_key_pem, + ) { + Ok(channel) => { + let mut client = MachineServiceClient::new(channel); + let request = Request::new(()); + + match client.version(request).await { + Ok(response) => { + let inner = response.into_inner(); + for msg in inner.messages { + all_versions.push(VersionInfo { + node: node_ip.clone(), + version: msg + .version + .as_ref() + .map(|v| v.tag.clone()) + .unwrap_or_default(), + sha: msg + .version + .as_ref() + .map(|v| v.sha.clone()) + .unwrap_or_default(), + built: msg + .version + .as_ref() + .map(|v| v.built.clone()) + .unwrap_or_default(), + go_version: msg + .version + .as_ref() + .map(|v| v.go_version.clone()) + .unwrap_or_default(), + os: msg + .version + .as_ref() + .map(|v| v.os.clone()) + .unwrap_or_default(), + arch: msg + .version + .as_ref() + .map(|v| v.arch.clone()) + .unwrap_or_default(), + platform: msg + .platform + .as_ref() + .map(|p| p.name.clone()) + .unwrap_or_default(), + }); + } + } + Err(e) => { + tracing::debug!("Failed to get version from {}: {}", node_ip, e); + } + } + } + Err(e) => { + tracing::debug!("Failed to connect to {}: {}", node_ip, e); + } + } + } + + Ok(all_versions) + } + + /// Get time synchronization status from specific nodes using direct connections + /// + /// This bypasses VIP/node targeting and connects directly to each node. 
+ pub async fn time_for_nodes(&self, nodes: &[String]) -> Result, TalosError> { + use crate::proto::time::time_service_client::TimeServiceClient; + + if nodes.is_empty() { + return self.time().await; + } + + const SYNC_TOLERANCE_SECS: f64 = 1.0; + let mut all_times = Vec::new(); + + for node_ip in nodes { + let endpoint_url = format!("https://{}:50000", node_ip); + + match crate::auth::create_channel_to_endpoint( + &endpoint_url, + &self.ca_pem, + &self.client_cert_pem, + &self.client_key_pem, + ) { + Ok(channel) => { + let mut client = TimeServiceClient::new(channel); + let request = Request::new(()); + + match client.time(request).await { + Ok(response) => { + let inner = response.into_inner(); + for msg in inner.messages { + let local_time = msg.localtime.map(|t| { + std::time::UNIX_EPOCH + + std::time::Duration::new(t.seconds as u64, t.nanos as u32) + }); + let remote_time = msg.remotetime.map(|t| { + std::time::UNIX_EPOCH + + std::time::Duration::new(t.seconds as u64, t.nanos as u32) + }); + + let offset_seconds = match (msg.localtime, msg.remotetime) { + (Some(local), Some(remote)) => { + let local_nanos = local.seconds as f64 * 1_000_000_000.0 + + local.nanos as f64; + let remote_nanos = remote.seconds as f64 * 1_000_000_000.0 + + remote.nanos as f64; + (local_nanos - remote_nanos) / 1_000_000_000.0 + } + _ => 0.0, + }; + + let synced = offset_seconds.abs() < SYNC_TOLERANCE_SECS; + + all_times.push(NodeTimeInfo { + node: node_ip.clone(), + server: msg.server, + local_time, + remote_time, + offset_seconds, + synced, + }); + } + } + Err(e) => { + tracing::debug!("Failed to get time from {}: {}", node_ip, e); + } + } + } + Err(e) => { + tracing::debug!("Failed to connect to {}: {}", node_ip, e); + } + } + } + + Ok(all_times) + } + /// Get list of services from all configured nodes pub async fn services(&self) -> Result, TalosError> { let mut client = self.machine_client(); @@ -678,31 +880,94 @@ impl TalosClient { /// Get etcd status from specific control plane nodes /// - /// Pass the hostnames from `etcd_members()` to get status from all control planes. + /// Pass the IPs from `etcd_members()` to get status from all control planes. /// If nodes is empty, queries only the endpoint node. + /// + /// Note: This makes individual calls to each node rather than using node targeting, + /// as node targeting through VIPs can be unreliable. 
pub async fn etcd_status_for_nodes( &self, nodes: &[String], ) -> Result, TalosError> { - let mut client = self.machine_client(); + if nodes.is_empty() { + // No specific nodes, query via current endpoint + let mut client = self.machine_client(); + let request = Request::new(()); + let response = client.etcd_status(request).await?; + return self.parse_etcd_status_response(response.into_inner()); + } - let mut request = Request::new(()); + tracing::debug!( + "etcd_status_for_nodes querying {} nodes individually", + nodes.len() + ); - // Add node targeting if specific nodes provided - if !nodes.is_empty() { - let nodes_str = nodes.join(","); - request - .metadata_mut() - .insert("nodes", nodes_str.parse().unwrap()); + // Query each node individually and collect results + let mut all_statuses = Vec::new(); + + for node_ip in nodes { + // Create endpoint URL for this specific node + let endpoint_url = format!("https://{}:50000", node_ip); + tracing::debug!("Querying etcd status from: {}", endpoint_url); + + // Try to create a direct connection to this node using stored certificates + match crate::auth::create_channel_to_endpoint( + &endpoint_url, + &self.ca_pem, + &self.client_cert_pem, + &self.client_key_pem, + ) { + Ok(channel) => { + let mut client = MachineServiceClient::new(channel); + let request = Request::new(()); + + match client.etcd_status(request).await { + Ok(response) => { + if let Ok(statuses) = + self.parse_etcd_status_response(response.into_inner()) + { + all_statuses.extend(statuses); + } + } + Err(e) => { + tracing::debug!("Failed to get etcd status from {}: {}", node_ip, e); + } + } + } + Err(e) => { + tracing::debug!("Failed to connect to {}: {}", node_ip, e); + } + } } - let response = client.etcd_status(request).await?; - let inner = response.into_inner(); + Ok(all_statuses) + } - let statuses: Vec = inner + /// Parse etcd status response into EtcdMemberStatus structs + fn parse_etcd_status_response( + &self, + response: crate::proto::machine::EtcdStatusResponse, + ) -> Result, TalosError> { + tracing::debug!( + "etcd_status response: {} messages received", + response.messages.len() + ); + + let statuses: Vec = response .messages .into_iter() .filter_map(|msg| { + let has_status = msg.member_status.is_some(); + let node_info = msg + .metadata + .as_ref() + .map(|m| m.hostname.as_str()) + .unwrap_or("unknown"); + tracing::debug!( + "etcd_status message from {}: has_status={}", + node_info, + has_status + ); msg.member_status.map(|status| EtcdMemberStatus { node: self.node_from_metadata(msg.metadata.as_ref(), 0), member_id: status.member_id, @@ -719,6 +984,10 @@ impl TalosClient { }) .collect(); + tracing::debug!( + "parse_etcd_status_response returning {} statuses", + statuses.len() + ); Ok(statuses) } @@ -2561,6 +2830,10 @@ mod tests { channel, nodes, endpoints, + // Empty certificates - not needed for filtering tests + ca_pem: Vec::new(), + client_cert_pem: Vec::new(), + client_key_pem: Vec::new(), } } diff --git a/crates/talos-rs/src/config.rs b/crates/talos-rs/src/config.rs index 0d7a606..0f96439 100644 --- a/crates/talos-rs/src/config.rs +++ b/crates/talos-rs/src/config.rs @@ -36,16 +36,23 @@ impl TalosConfig { /// Load configuration from the default location (~/.talos/config) pub fn load_default() -> Result { let path = Self::default_path()?; + tracing::debug!("Loading talosconfig from: {:?}", path); Self::load_from(&path) } /// Load configuration from a specific path pub fn load_from(path: &PathBuf) -> Result { + tracing::debug!("load_from called with path: {:?}", 
path); if !path.exists() { return Err(TalosError::ConfigNotFound(path.display().to_string())); } let content = std::fs::read_to_string(path)?; let config: TalosConfig = serde_yaml::from_str(&content)?; + tracing::debug!( + "Loaded config with {} contexts: {:?}", + config.contexts.len(), + config.contexts.keys().collect::>() + ); Ok(config) } @@ -53,9 +60,11 @@ impl TalosConfig { pub fn default_path() -> Result { // Check if TALOSCONFIG environment variable is set if let Ok(talosconfig) = std::env::var("TALOSCONFIG") { + tracing::debug!("TALOSCONFIG env var found: {}", talosconfig); return Ok(PathBuf::from(talosconfig)); } + tracing::debug!("TALOSCONFIG env var not set, using default path"); // Fallback to default location let home = dirs_next::home_dir().ok_or(TalosError::NoHomeDirectory)?; Ok(home.join(".talos").join("config")) diff --git a/test-clusters/scripts/multi-cp-qemu.sh b/test-clusters/scripts/multi-cp-qemu.sh new file mode 100755 index 0000000..7178a19 --- /dev/null +++ b/test-clusters/scripts/multi-cp-qemu.sh @@ -0,0 +1,1036 @@ +#!/bin/bash +# +# Multi-Control-Plane QEMU Test Cluster +# +# Creates a 3 control plane + 3 worker Talos cluster using QEMU +# for testing etcd quorum and multi-node scenarios. +# +# This reproduces the user's production config pattern: +# endpoints: [vip, cp1, cp2, cp3] +# nodes: [cp1, cp2, cp3, w1, w2, w3] +# +# Usage: +# ./multi-cp-qemu.sh create - Create and start all VMs +# ./multi-cp-qemu.sh destroy - Destroy all VMs and clean up +# ./multi-cp-qemu.sh status - Show cluster status +# ./multi-cp-qemu.sh bootstrap - Bootstrap the cluster (after config applied) +# ./multi-cp-qemu.sh config - Generate and show talosconfig +# ./multi-cp-qemu.sh help - Show this help +# + +set -e + +# Configuration +CLUSTER_NAME="multi-cp-test" +WORK_DIR="/tmp/talos-multi-cp" +CACHE_DIR="${XDG_CACHE_HOME:-$HOME/.cache}/talos-pilot" +TALOS_VERSION="v1.12.1" +ISO_URL="https://factory.talos.dev/image/376567988ad370138ad8b2698212367b8edcb69b5fd68c80be1f2ec7d603b4ba/${TALOS_VERSION}/metal-amd64.iso" +ISO_PATH="$CACHE_DIR/talos-${TALOS_VERSION}.iso" + +# Minimum requirements per Talos docs +CP_MEMORY="2048" # 2 GiB +CP_CPUS="2" +CP_DISK="10G" + +WORKER_MEMORY="1024" # 1 GiB +WORKER_CPUS="1" +WORKER_DISK="10G" + +# Network configuration - using a bridge network +# We'll use 192.168.100.0/24 subnet +BRIDGE_NAME="talos-br0" +SUBNET="192.168.100" + +# Node IPs (static) +CP1_IP="${SUBNET}.11" +CP2_IP="${SUBNET}.12" +CP3_IP="${SUBNET}.13" +W1_IP="${SUBNET}.21" +W2_IP="${SUBNET}.22" +W3_IP="${SUBNET}.23" +GATEWAY_IP="${SUBNET}.1" + +# Virtual IP for the cluster endpoint +VIP="${SUBNET}.10" + +# Hostnames (to simulate user's production config with DNS names) +VIP_HOSTNAME="cluster.test.local" +CP1_HOSTNAME="cp1.test.local" +CP2_HOSTNAME="cp2.test.local" +CP3_HOSTNAME="cp3.test.local" +W1_HOSTNAME="w1.test.local" +W2_HOSTNAME="w2.test.local" +W3_HOSTNAME="w3.test.local" + +# Port forwards from host (for accessing from localhost) +# We'll forward to CP1 by default +HOST_TALOS_PORT="50000" +HOST_K8S_PORT="6443" + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +CYAN='\033[0;36m' +NC='\033[0m' + +log_info() { echo -e "${BLUE}[INFO]${NC} $*"; } +log_success() { echo -e "${GREEN}[OK]${NC} $*"; } +log_warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } +log_error() { echo -e "${RED}[ERROR]${NC} $*" >&2; } + +usage() { + cat << EOF +Multi-Control-Plane QEMU Test Cluster + +Creates a 3 CP + 3 worker Talos cluster for testing etcd quorum. 
+Uses HOSTNAMES (via /etc/hosts) to match user's production config pattern. + +USAGE: + $0 + +COMMANDS: + create Create and start all 6 VMs (requires sudo for bridge + /etc/hosts) + destroy Destroy all VMs and clean up + status Show cluster status + apply Apply configs to VMs in maintenance mode + bootstrap Bootstrap the cluster + config Generate talosconfig for the cluster + connect Show connection info + help Show this help + +WORKFLOW: + 1. $0 create # Creates bridge, /etc/hosts entries, and starts 6 VMs + 2. Wait for VMs to boot into maintenance mode (~30s) + 3. $0 apply # Apply Talos configs to all nodes + 4. Wait for nodes to install and reboot (~2-3 min) + 5. $0 bootstrap # Bootstrap etcd on first control plane + 6. cargo run # Test with talos-pilot + +ALTERNATIVELY (wizard mode): + 1. $0 create + 2. cargo run -- --insecure --endpoint $CP1_IP + 3. Use wizard to bootstrap + +HOSTNAMES (added to /etc/hosts): + $VIP_HOSTNAME -> $VIP (Virtual IP) + $CP1_HOSTNAME -> $CP1_IP + $CP2_HOSTNAME -> $CP2_IP + $CP3_HOSTNAME -> $CP3_IP + $W1_HOSTNAME -> $W1_IP + $W2_HOSTNAME -> $W2_IP + $W3_HOSTNAME -> $W3_IP + +TALOSCONFIG PATTERN (matches user's production): + endpoints: [$VIP_HOSTNAME, $CP1_HOSTNAME, $CP2_HOSTNAME, $CP3_HOSTNAME] + nodes: [$CP1_HOSTNAME, $CP2_HOSTNAME, $CP3_HOSTNAME, $W1_HOSTNAME, $W2_HOSTNAME, $W3_HOSTNAME] + +REQUIREMENTS: + - qemu-system-x86_64 + - KVM enabled (/dev/kvm) + - sudo access (for bridge network + /etc/hosts) + - ~12 GB RAM available (6 GB for CPs + 3 GB for workers + overhead) + +EOF + exit 0 +} + +check_prereqs() { + local missing=() + + command -v qemu-system-x86_64 &>/dev/null || missing+=("qemu-system-x86_64") + command -v ip &>/dev/null || missing+=("iproute2") + command -v talosctl &>/dev/null || missing+=("talosctl") + command -v dnsmasq &>/dev/null || missing+=("dnsmasq") + + if [[ ${#missing[@]} -gt 0 ]]; then + log_error "Missing required tools: ${missing[*]}" + echo "Install with: sudo apt install qemu-system-x86 iproute2 dnsmasq" + exit 1 + fi + + if [[ ! -r /dev/kvm ]]; then + log_error "/dev/kvm not accessible. Add yourself to kvm group: sudo usermod -aG kvm \$USER" + exit 1 + fi + + # Check available memory + local avail_mem=$(awk '/MemAvailable/ {print int($2/1024)}' /proc/meminfo) + local needed_mem=$((CP_MEMORY * 3 + WORKER_MEMORY * 3)) + if [[ $avail_mem -lt $needed_mem ]]; then + log_warn "Low memory: ${avail_mem}MB available, ${needed_mem}MB needed" + log_warn "Cluster may be slow or fail to start" + fi +} + +download_iso() { + mkdir -p "$CACHE_DIR" + + if [[ -f "$ISO_PATH" ]]; then + log_info "ISO cached: $ISO_PATH" + return + fi + + log_info "Downloading Talos ISO..." + curl -L -o "$ISO_PATH" "$ISO_URL" + log_success "ISO downloaded" +} + +setup_bridge() { + log_info "Setting up bridge network (requires sudo)..." 
+ + # Check if bridge already exists + if ip link show "$BRIDGE_NAME" &>/dev/null; then + log_info "Bridge $BRIDGE_NAME already exists" + return + fi + + # Create bridge + sudo ip link add name "$BRIDGE_NAME" type bridge + sudo ip addr add "${GATEWAY_IP}/24" dev "$BRIDGE_NAME" + sudo ip link set "$BRIDGE_NAME" up + + # Save original ip_forward value before changing it + local original_ip_forward=$(cat /proc/sys/net/ipv4/ip_forward) + echo "$original_ip_forward" > "$WORK_DIR/ip_forward.orig" + + # Enable IP forwarding and NAT for internet access + sudo sysctl -w net.ipv4.ip_forward=1 >/dev/null + + # Get default interface for NAT + local default_iface=$(ip route | grep default | awk '{print $5}' | head -1) + if [[ -n "$default_iface" ]]; then + # Save default interface for teardown + echo "$default_iface" > "$WORK_DIR/default_iface" + sudo iptables -t nat -A POSTROUTING -s "${SUBNET}.0/24" -o "$default_iface" -j MASQUERADE + sudo iptables -A FORWARD -i "$BRIDGE_NAME" -o "$default_iface" -j ACCEPT + sudo iptables -A FORWARD -i "$default_iface" -o "$BRIDGE_NAME" -m state --state RELATED,ESTABLISHED -j ACCEPT + fi + + log_success "Bridge network created: $BRIDGE_NAME (${GATEWAY_IP}/24)" +} + +teardown_bridge() { + log_info "Tearing down bridge network..." + + # Use saved default interface if available (ensures we remove the same rules we added) + local default_iface="" + if [[ -f "$WORK_DIR/default_iface" ]]; then + default_iface=$(cat "$WORK_DIR/default_iface") + else + # Fallback to current default interface + default_iface=$(ip route | grep default | awk '{print $5}' | head -1) + fi + + # Remove iptables rules + if [[ -n "$default_iface" ]]; then + sudo iptables -t nat -D POSTROUTING -s "${SUBNET}.0/24" -o "$default_iface" -j MASQUERADE 2>/dev/null || true + sudo iptables -D FORWARD -i "$BRIDGE_NAME" -o "$default_iface" -j ACCEPT 2>/dev/null || true + sudo iptables -D FORWARD -i "$default_iface" -o "$BRIDGE_NAME" -m state --state RELATED,ESTABLISHED -j ACCEPT 2>/dev/null || true + fi + + # Restore original ip_forward value + if [[ -f "$WORK_DIR/ip_forward.orig" ]]; then + local original_ip_forward=$(cat "$WORK_DIR/ip_forward.orig") + sudo sysctl -w net.ipv4.ip_forward="$original_ip_forward" >/dev/null + log_info "Restored ip_forward to $original_ip_forward" + fi + + # Remove bridge + if ip link show "$BRIDGE_NAME" &>/dev/null; then + sudo ip link set "$BRIDGE_NAME" down + sudo ip link delete "$BRIDGE_NAME" + fi + + log_success "Bridge removed" +} + +setup_hosts() { + log_info "Setting up /etc/hosts entries (requires sudo)..." + + # Remove any existing entries for our test domain + sudo sed -i '/\.test\.local/d' /etc/hosts + + # Add new entries + { + echo "$VIP $VIP_HOSTNAME" + echo "$CP1_IP $CP1_HOSTNAME" + echo "$CP2_IP $CP2_HOSTNAME" + echo "$CP3_IP $CP3_HOSTNAME" + echo "$W1_IP $W1_HOSTNAME" + echo "$W2_IP $W2_HOSTNAME" + echo "$W3_IP $W3_HOSTNAME" + } | sudo tee -a /etc/hosts > /dev/null + + log_success "Added /etc/hosts entries for *.test.local" +} + +setup_dhcp() { + log_info "Starting DHCP server (dnsmasq) on bridge..." + + # Check if dnsmasq is installed + if ! command -v dnsmasq &>/dev/null; then + log_error "dnsmasq not installed. 
Install with: sudo apt install dnsmasq" + exit 1 + fi + + # Kill any existing dnsmasq for our bridge + sudo pkill -f "dnsmasq.*${BRIDGE_NAME}" 2>/dev/null || true + + # Create dnsmasq config in /run (AppArmor allows dnsmasq to read from /run) + local dnsmasq_conf="/run/talos-dnsmasq.conf" + local dnsmasq_pid="/run/talos-dnsmasq.pid" + local dnsmasq_lease="/run/talos-dnsmasq.leases" + + sudo tee "$dnsmasq_conf" > /dev/null << EOF +# DHCP server for Talos test cluster +interface=${BRIDGE_NAME} +bind-interfaces +port=0 +dhcp-range=${SUBNET}.100,${SUBNET}.199,12h +dhcp-option=option:router,${GATEWAY_IP} +dhcp-option=option:dns-server,8.8.8.8,8.8.4.4 +dhcp-leasefile=${dnsmasq_lease} +EOF + + # Store paths for cleanup + echo "$dnsmasq_conf" > "$WORK_DIR/dnsmasq_conf_path" + echo "$dnsmasq_pid" > "$WORK_DIR/dnsmasq_pid_path" + + # Start dnsmasq (use aa-exec to bypass AppArmor restrictions if available) + if command -v aa-exec &>/dev/null; then + sudo aa-exec -p unconfined -- dnsmasq --conf-file="$dnsmasq_conf" --pid-file="$dnsmasq_pid" --log-facility="$WORK_DIR/dnsmasq.log" + else + sudo dnsmasq --conf-file="$dnsmasq_conf" --pid-file="$dnsmasq_pid" --log-facility="$WORK_DIR/dnsmasq.log" + fi + + log_success "DHCP server started (range: ${SUBNET}.100-199)" +} + +stop_dhcp() { + # Try stored PID path first + local pid_path="/run/talos-dnsmasq.pid" + if [[ -f "$WORK_DIR/dnsmasq_pid_path" ]]; then + pid_path=$(cat "$WORK_DIR/dnsmasq_pid_path") + fi + + if [[ -f "$pid_path" ]]; then + local pid=$(cat "$pid_path") + if kill -0 "$pid" 2>/dev/null; then + sudo kill "$pid" 2>/dev/null || true + fi + sudo rm -f "$pid_path" + fi + + # Cleanup config and lease files + local conf_path="/run/talos-dnsmasq.conf" + if [[ -f "$WORK_DIR/dnsmasq_conf_path" ]]; then + conf_path=$(cat "$WORK_DIR/dnsmasq_conf_path") + fi + sudo rm -f "$conf_path" /run/talos-dnsmasq.leases + + # Fallback: kill any dnsmasq for our bridge + sudo pkill -f "dnsmasq.*${BRIDGE_NAME}" 2>/dev/null || true +} + +teardown_hosts() { + log_info "Removing /etc/hosts entries..." + sudo sed -i '/\.test\.local/d' /etc/hosts + log_success "Removed /etc/hosts entries" +} + +create_tap() { + local tap_name="$1" + + # Delete existing tap if it exists (might be orphaned from previous run) + if ip link show "$tap_name" &>/dev/null; then + sudo ip link set "$tap_name" down 2>/dev/null || true + sudo ip tuntap del dev "$tap_name" mode tap 2>/dev/null || true + fi + + sudo ip tuntap add dev "$tap_name" mode tap user "$USER" + sudo ip link set "$tap_name" master "$BRIDGE_NAME" + sudo ip link set "$tap_name" up +} + +delete_tap() { + local tap_name="$1" + + if ip link show "$tap_name" &>/dev/null; then + sudo ip link set "$tap_name" down + sudo ip tuntap del dev "$tap_name" mode tap + fi +} + +create_disk() { + local disk_path="$1" + local size="$2" + + if [[ ! 
-f "$disk_path" ]]; then + qemu-img create -f qcow2 "$disk_path" "$size" >/dev/null + fi +} + +start_vm() { + local name="$1" + local ip="$2" + local memory="$3" + local cpus="$4" + local disk_size="$5" + local role="$6" # controlplane or worker + + local tap_name="tap-${name}" + local disk_path="$WORK_DIR/${name}.qcow2" + local pid_file="$WORK_DIR/${name}.pid" + local mac=$(printf '52:54:00:%02x:%02x:%02x' $((RANDOM%256)) $((RANDOM%256)) $((RANDOM%256))) + + log_info "Starting $name ($role) - IP: $ip, RAM: ${memory}MB, CPUs: $cpus" + + # Create TAP interface + create_tap "$tap_name" + + # Create disk + create_disk "$disk_path" "$disk_size" + + # Start QEMU + qemu-system-x86_64 \ + -name "$name" \ + -m "$memory" \ + -smp "$cpus" \ + -cpu host \ + -enable-kvm \ + -drive "file=$disk_path,format=qcow2,if=virtio" \ + -cdrom "$ISO_PATH" \ + -boot d \ + -netdev "tap,id=net0,ifname=$tap_name,script=no,downscript=no" \ + -device "virtio-net-pci,netdev=net0,mac=$mac" \ + -display none \ + -daemonize \ + -pidfile "$pid_file" \ + 2>/dev/null + + # Store IP mapping + echo "$ip" > "$WORK_DIR/${name}.ip" + echo "$mac" > "$WORK_DIR/${name}.mac" +} + +stop_vm() { + local name="$1" + local pid_file="$WORK_DIR/${name}.pid" + local tap_name="tap-${name}" + + if [[ -f "$pid_file" ]]; then + local pid=$(cat "$pid_file") + if kill -0 "$pid" 2>/dev/null; then + kill "$pid" 2>/dev/null || true + fi + rm -f "$pid_file" + fi + + delete_tap "$tap_name" +} + +create_cluster() { + check_prereqs + + log_info "Creating multi-CP test cluster..." + echo "" + log_info "This will create:" + echo " - 3 control plane VMs (${CP_MEMORY}MB RAM, ${CP_CPUS} CPUs each)" + echo " - 3 worker VMs (${WORKER_MEMORY}MB RAM, ${WORKER_CPUS} CPU each)" + echo " - Bridge network: $BRIDGE_NAME (${SUBNET}.0/24)" + echo "" + + # Ensure work directory exists and is owned by current user + # (may have been left owned by root from a previous partial run) + if [[ -d "$WORK_DIR" ]]; then + sudo rm -rf "$WORK_DIR" + fi + mkdir -p "$WORK_DIR" + + download_iso + setup_bridge + setup_dhcp + setup_hosts + + # Start control plane VMs + start_vm "cp1" "$CP1_IP" "$CP_MEMORY" "$CP_CPUS" "$CP_DISK" "controlplane" + start_vm "cp2" "$CP2_IP" "$CP_MEMORY" "$CP_CPUS" "$CP_DISK" "controlplane" + start_vm "cp3" "$CP3_IP" "$CP_MEMORY" "$CP_CPUS" "$CP_DISK" "controlplane" + + # Start worker VMs + start_vm "w1" "$W1_IP" "$WORKER_MEMORY" "$WORKER_CPUS" "$WORKER_DISK" "worker" + start_vm "w2" "$W2_IP" "$WORKER_MEMORY" "$WORKER_CPUS" "$WORKER_DISK" "worker" + start_vm "w3" "$W3_IP" "$WORKER_MEMORY" "$WORKER_CPUS" "$WORKER_DISK" "worker" + + echo "" + log_success "All VMs started!" + echo "" + echo -e "${CYAN}=== Next Steps ===${NC}" + echo "" + echo "1. Wait for VMs to boot into maintenance mode (~30-60 seconds)" + echo " Check with: $0 status" + echo "" + echo "2. Apply Talos configuration to all nodes:" + echo " $0 apply" + echo "" + echo "3. Wait for nodes to install and reboot (~2-3 minutes)" + echo "" + echo "4. Bootstrap the cluster:" + echo " $0 bootstrap" + echo "" + echo "5. Test with talos-pilot:" + echo " cargo run" + echo "" + echo -e "${CYAN}=== OR use wizard mode ===${NC}" + echo "" + echo " cargo run -- --insecure --endpoint $CP1_IP" + echo "" +} + +generate_config() { + log_info "Generating Talos configuration..." 
+ + # Generate base config with VIP as endpoint + talosctl gen config "$CLUSTER_NAME" "https://${VIP}:6443" \ + --additional-sans "$VIP,$CP1_IP,$CP2_IP,$CP3_IP" \ + --output-dir "$WORK_DIR" \ + --force + + # Patch controlplane.yaml to add VIP and fix install disk (virtio = /dev/vda) + cat > "$WORK_DIR/vip-patch.yaml" << EOF +machine: + install: + disk: /dev/vda + network: + interfaces: + - interface: ens3 + dhcp: false + addresses: + - \${IP}/24 + routes: + - network: 0.0.0.0/0 + gateway: $GATEWAY_IP + vip: + ip: $VIP +EOF + + # Create per-node configs + for node in cp1 cp2 cp3; do + local ip_var="${node^^}_IP" + local ip="${!ip_var}" + + # Substitute IP and create node-specific config + sed "s/\${IP}/$ip/g" "$WORK_DIR/vip-patch.yaml" > "$WORK_DIR/${node}-patch.yaml" + + talosctl machineconfig patch "$WORK_DIR/controlplane.yaml" \ + --patch @"$WORK_DIR/${node}-patch.yaml" \ + --output "$WORK_DIR/${node}.yaml" + done + + # Create worker configs with static IPs and install disk + for node in w1 w2 w3; do + local ip_var="${node^^}_IP" + local ip="${!ip_var}" + + cat > "$WORK_DIR/${node}-patch.yaml" << EOF +machine: + install: + disk: /dev/vda + network: + interfaces: + - interface: ens3 + dhcp: false + addresses: + - ${ip}/24 + routes: + - network: 0.0.0.0/0 + gateway: $GATEWAY_IP +EOF + + talosctl machineconfig patch "$WORK_DIR/worker.yaml" \ + --patch @"$WORK_DIR/${node}-patch.yaml" \ + --output "$WORK_DIR/${node}.yaml" + done + + log_success "Generated configs in $WORK_DIR" +} + +apply_configs() { + log_info "Applying configurations to nodes..." + + # Generate configs first + generate_config + + # Discover nodes in maintenance mode (use leases file for speed) + log_info "Discovering nodes in maintenance mode..." + local maintenance_ips=() + local ips_to_scan=() + + # First try to get IPs from leases file + if [[ -f /run/talos-dnsmasq.leases ]] && [[ -s /run/talos-dnsmasq.leases ]]; then + while read -r _ _ ip _; do + ips_to_scan+=("$ip") + done < /run/talos-dnsmasq.leases + log_info "Found ${#ips_to_scan[@]} DHCP leases" + fi + + # Fallback to scanning if no leases + if [[ ${#ips_to_scan[@]} -eq 0 ]]; then + log_info "No leases found, scanning range..." + for i in $(seq 100 120); do + ips_to_scan+=("${SUBNET}.$i") + done + fi + + for ip in "${ips_to_scan[@]}"; do + # Check for maintenance mode error (means node is alive and in maintenance) + if timeout 2 talosctl version --insecure --nodes "$ip" 2>&1 | grep -q "maintenance mode"; then + maintenance_ips+=("$ip") + log_info " Found: $ip" + fi + done + + if [[ ${#maintenance_ips[@]} -eq 0 ]]; then + log_error "No nodes found in maintenance mode!" + log_error "Wait for VMs to boot and try again." + exit 1 + fi + + log_info "Found ${#maintenance_ips[@]} nodes in maintenance mode" + + if [[ ${#maintenance_ips[@]} -lt 6 ]]; then + log_warn "Expected 6 nodes but found ${#maintenance_ips[@]}" + log_warn "Some VMs may still be booting. Continue anyway? (y/n)" + read -r answer + if [[ "$answer" != "y" ]]; then + exit 1 + fi + fi + + # Apply configs in order: first 3 are CPs, next 3 are workers + local configs=(cp1 cp2 cp3 w1 w2 w3) + local idx=0 + for maint_ip in "${maintenance_ips[@]}"; do + if [[ $idx -ge ${#configs[@]} ]]; then + log_warn "More nodes than configs, skipping $maint_ip" + continue + fi + + local node="${configs[$idx]}" + log_info "Applying $node config to $maint_ip..." 
+ if talosctl apply-config --insecure --nodes "$maint_ip" --file "$WORK_DIR/${node}.yaml"; then + log_success "$node configured (was $maint_ip, will become ${node^^}_IP)" + else + log_warn "Failed to configure $node at $maint_ip" + fi + idx=$((idx + 1)) + done + + echo "" + log_success "Configs applied! Nodes are now installing Talos to disk." + echo "" + echo -e "${CYAN}=== IMPORTANT: Next Steps ===${NC}" + echo "" + echo "1. Wait ~2 minutes for installation to complete" + echo " (You can watch disk sizes grow: ls -lh $WORK_DIR/*.qcow2)" + echo "" + echo "2. Restart VMs to boot from disk (NOT the CD):" + echo -e " ${YELLOW}$0 reboot-disk${NC}" + echo "" + echo "3. Wait ~30 seconds, then verify nodes are up:" + echo " $0 check" + echo "" + echo "4. Once all nodes show ✓, bootstrap the cluster:" + echo " $0 bootstrap" + echo "" + echo "After reboot-disk, nodes will have these IPs:" + echo " cp1: $CP1_IP" + echo " cp2: $CP2_IP" + echo " cp3: $CP3_IP" + echo " w1: $W1_IP" + echo " w2: $W2_IP" + echo " w3: $W3_IP" +} + +setup_talosconfig() { + log_info "Setting up talosconfig with IPs..." + + # Merge the generated config + talosctl config merge "$WORK_DIR/talosconfig" + + # Set endpoints using IPs (vIP + CP IPs) + # Note: Using IPs because node certs have random hostnames + talosctl --context "$CLUSTER_NAME" config endpoint \ + "$VIP" "$CP1_IP" "$CP2_IP" "$CP3_IP" + + # Set nodes to all nodes using IPs + talosctl --context "$CLUSTER_NAME" config node \ + "$CP1_IP" "$CP2_IP" "$CP3_IP" \ + "$W1_IP" "$W2_IP" "$W3_IP" + + log_success "talosconfig updated" + echo "" + echo "Context: $CLUSTER_NAME" + echo "Endpoints: $VIP, $CP1_IP, $CP2_IP, $CP3_IP" + echo "Nodes: $CP1_IP, $CP2_IP, $CP3_IP, $W1_IP, $W2_IP, $W3_IP" +} + +bootstrap_cluster() { + log_info "Bootstrapping cluster on $CP1_IP..." + + # Setup talosconfig first + setup_talosconfig + + # Bootstrap on first control plane using direct talosconfig path and IP + if talosctl --talosconfig "$WORK_DIR/talosconfig" --endpoints "$CP1_IP" --nodes "$CP1_IP" bootstrap; then + log_success "Bootstrap initiated!" + echo "" + log_info "Waiting for cluster to form..." + sleep 15 + + # Check etcd members + log_info "Checking etcd members..." + talosctl --talosconfig "$WORK_DIR/talosconfig" --endpoints "$CP1_IP" --nodes "$CP1_IP" etcd members + + echo "" + log_success "Cluster bootstrapped! Test with: cargo run" + log_info "Select context: $CLUSTER_NAME" + else + log_error "Bootstrap failed. Are nodes configured and rebooted?" + log_error "Run: $0 check to verify nodes are up" + fi +} + +destroy_cluster() { + log_info "Destroying multi-CP cluster..." 
+ + # Stop all VMs + for node in cp1 cp2 cp3 w1 w2 w3; do + stop_vm "$node" + done + + # Stop DHCP server + stop_dhcp + + # Teardown bridge and hosts + teardown_bridge + teardown_hosts + + # Remove work directory + if [[ -d "$WORK_DIR" ]]; then + rm -rf "$WORK_DIR" + fi + + # Remove talosconfig context + talosctl config remove "$CLUSTER_NAME" --noconfirm 2>/dev/null || true + + log_success "Cluster destroyed" +} + +show_status() { + echo "" + echo -e "${CYAN}=== Multi-CP Cluster Status ===${NC}" + echo "" + + # Check bridge + if ip link show "$BRIDGE_NAME" &>/dev/null; then + echo -e "Bridge: ${GREEN}$BRIDGE_NAME (up)${NC}" + else + echo -e "Bridge: ${RED}not created${NC}" + fi + + # Check DHCP server + if [[ -f /run/talos-dnsmasq.pid ]] && kill -0 "$(cat /run/talos-dnsmasq.pid)" 2>/dev/null; then + echo -e "DHCP: ${GREEN}running${NC}" + else + echo -e "DHCP: ${RED}not running${NC}" + fi + echo "" + + # Check each VM process + echo "VMs (processes):" + for node in cp1 cp2 cp3 w1 w2 w3; do + local pid_file="$WORK_DIR/${node}.pid" + if [[ -f "$pid_file" ]] && kill -0 "$(cat "$pid_file")" 2>/dev/null; then + echo -e " $node: ${GREEN}running${NC}" + else + echo -e " $node: ${RED}stopped${NC}" + fi + done + echo "" + + # Show DHCP leases (maintenance mode IPs) + echo "DHCP Leases (maintenance mode):" + if [[ -f /run/talos-dnsmasq.leases ]]; then + cat /run/talos-dnsmasq.leases 2>/dev/null | while read ts mac ip host _; do + echo " $ip ($mac) - $host" + done + else + echo " (no leases yet)" + fi + echo "" + + # Scan for maintenance mode nodes (use leases file if available for speed) + echo "Scanning for Talos nodes in maintenance mode..." + local found=0 + local ips_to_scan=() + + # First try to get IPs from leases file + if [[ -f /run/talos-dnsmasq.leases ]] && [[ -s /run/talos-dnsmasq.leases ]]; then + while read -r _ _ ip _; do + ips_to_scan+=("$ip") + done < /run/talos-dnsmasq.leases + fi + + # Fallback to scanning if no leases + if [[ ${#ips_to_scan[@]} -eq 0 ]]; then + for i in $(seq 100 120); do + ips_to_scan+=("${SUBNET}.$i") + done + fi + + for ip in "${ips_to_scan[@]}"; do + # Check for maintenance mode error (means node is alive and in maintenance) + if timeout 2 talosctl version --insecure --nodes "$ip" 2>&1 | grep -q "maintenance mode"; then + echo -e " ${GREEN}Found:${NC} $ip (maintenance mode)" + found=$((found + 1)) + fi + done + if [[ $found -eq 0 ]]; then + echo " (no nodes found yet - still booting?)" + fi + echo "" + + # Check configured nodes (after apply) + echo "Configured nodes (static IPs):" + for node in cp1 cp2 cp3 w1 w2 w3; do + local ip_var="${node^^}_IP" + local ip="${!ip_var}" + if talosctl version --insecure --nodes "$ip" &>/dev/null; then + local mode=$(talosctl version --insecure --nodes "$ip" 2>&1 | grep -q "maintenance" && echo "maintenance" || echo "running") + echo -e " $node ($ip): ${GREEN}$mode${NC}" + else + echo -e " $node ($ip): ${YELLOW}not reachable${NC}" + fi + done + echo "" + + # Check talosconfig + if talosctl config contexts 2>/dev/null | grep -q "$CLUSTER_NAME"; then + echo "Talosconfig: $CLUSTER_NAME context exists" + + # Try to get etcd members + echo "" + echo "etcd members:" + if talosctl --context "$CLUSTER_NAME" etcd members --nodes "$CP1_IP" 2>/dev/null; then + : + else + echo " (cannot connect - cluster may not be bootstrapped)" + fi + else + echo "Talosconfig: not configured" + fi + echo "" +} + +show_connect_info() { + cat << EOF + +=== Multi-CP Cluster Connection Info === + +Network: $BRIDGE_NAME (${SUBNET}.0/24) + +Hostnames 
(/etc/hosts): + $VIP_HOSTNAME -> $VIP (Virtual IP) + $CP1_HOSTNAME -> $CP1_IP + $CP2_HOSTNAME -> $CP2_IP + $CP3_HOSTNAME -> $CP3_IP + $W1_HOSTNAME -> $W1_IP + $W2_HOSTNAME -> $W2_IP + $W3_HOSTNAME -> $W3_IP + +talosctl commands: + talosctl --context $CLUSTER_NAME get members + talosctl --context $CLUSTER_NAME etcd members + talosctl --context $CLUSTER_NAME dashboard + +talos-pilot: + cargo run + # Select '$CLUSTER_NAME' context + # Check etcd view - should show 3/3 members + +Test the fix: + The talosconfig uses HOSTNAMES like the user's production config: + endpoints: [$VIP_HOSTNAME, $CP1_HOSTNAME, $CP2_HOSTNAME, $CP3_HOSTNAME] + nodes: [$CP1_HOSTNAME, $CP2_HOSTNAME, $CP3_HOSTNAME, $W1_HOSTNAME, $W2_HOSTNAME, $W3_HOSTNAME] + + This matches the user's real config: + endpoints: [cluster.example.com, kubec01.example.com, kubec02.example.com, kubec03.example.com] + nodes: [kubec01.example.com, kubec02.example.com, kubec03.example.com, kubew01.example.com, ...] + + etcd should show 3/3 healthy members (not 1/3 or 0/3) + +EOF +} + +check_nodes() { + echo "Checking static IP nodes..." + for node in cp1 cp2 cp3 w1 w2 w3; do + local ip_var="${node^^}_IP" + local ip="${!ip_var}" + if timeout 1 bash -c "echo >/dev/tcp/$ip/50000" 2>/dev/null; then + echo -e " ${GREEN}✓${NC} $node ($ip)" + else + echo -e " ${RED}✗${NC} $node ($ip)" + fi + done +} + +restart_vms() { + local boot_mode="${1:-cd}" # cd or disk + + if [[ "$boot_mode" == "disk" ]]; then + log_info "Restarting VMs to boot from disk..." + else + log_info "Restarting VMs to boot from CD (maintenance mode)..." + fi + + for node in cp1 cp2 cp3 w1 w2 w3; do + local pid_file="$WORK_DIR/${node}.pid" + local disk_path="$WORK_DIR/${node}.qcow2" + local tap_name="tap-${node}" + local mac_file="$WORK_DIR/${node}.mac" + + # Get memory/cpu settings + local memory cpus + if [[ "$node" == cp* ]]; then + memory="$CP_MEMORY" + cpus="$CP_CPUS" + else + memory="$WORKER_MEMORY" + cpus="$WORKER_CPUS" + fi + + # Get MAC address + local mac="" + if [[ -f "$mac_file" ]]; then + mac=$(cat "$mac_file") + else + mac=$(printf '52:54:00:%02x:%02x:%02x' $((RANDOM%256)) $((RANDOM%256)) $((RANDOM%256))) + fi + + # Kill old VM + if [[ -f "$pid_file" ]]; then + local pid=$(cat "$pid_file") + kill "$pid" 2>/dev/null || true + fi + done + + sleep 2 + + for node in cp1 cp2 cp3 w1 w2 w3; do + local pid_file="$WORK_DIR/${node}.pid" + local disk_path="$WORK_DIR/${node}.qcow2" + local tap_name="tap-${node}" + local mac_file="$WORK_DIR/${node}.mac" + + local memory cpus + if [[ "$node" == cp* ]]; then + memory="$CP_MEMORY" + cpus="$CP_CPUS" + else + memory="$WORKER_MEMORY" + cpus="$WORKER_CPUS" + fi + + local mac=$(cat "$mac_file" 2>/dev/null || printf '52:54:00:%02x:%02x:%02x' $((RANDOM%256)) $((RANDOM%256)) $((RANDOM%256))) + + log_info "Starting $node..." 
+ + if [[ "$boot_mode" == "disk" ]]; then + # Boot from disk only + qemu-system-x86_64 \ + -name "$node" \ + -m "$memory" \ + -smp "$cpus" \ + -cpu host \ + -enable-kvm \ + -drive "file=$disk_path,format=qcow2,if=virtio" \ + -boot c \ + -netdev "tap,id=net0,ifname=$tap_name,script=no,downscript=no" \ + -device "virtio-net-pci,netdev=net0,mac=$mac" \ + -display none \ + -daemonize \ + -pidfile "$pid_file" \ + 2>/dev/null + else + # Boot from CD (maintenance mode) + qemu-system-x86_64 \ + -name "$node" \ + -m "$memory" \ + -smp "$cpus" \ + -cpu host \ + -enable-kvm \ + -drive "file=$disk_path,format=qcow2,if=virtio" \ + -cdrom "$ISO_PATH" \ + -boot d \ + -netdev "tap,id=net0,ifname=$tap_name,script=no,downscript=no" \ + -device "virtio-net-pci,netdev=net0,mac=$mac" \ + -display none \ + -daemonize \ + -pidfile "$pid_file" \ + 2>/dev/null + fi + done + + if [[ "$boot_mode" == "disk" ]]; then + log_success "VMs restarted (disk boot). Wait ~30s then: $0 check" + else + log_success "VMs restarted (CD boot). Wait ~30s then: $0 status" + fi +} + +# Main +case "${1:-help}" in + create) + create_cluster + ;; + destroy) + destroy_cluster + ;; + status) + show_status + ;; + check) + check_nodes + ;; + reboot) + restart_vms cd + ;; + reboot-disk) + restart_vms disk + ;; + apply) + apply_configs + ;; + bootstrap) + bootstrap_cluster + ;; + config) + generate_config + setup_talosconfig + ;; + connect) + show_connect_info + ;; + help|--help|-h) + usage + ;; + *) + log_error "Unknown command: $1" + usage + ;; +esac From 842b5edbc4e9e2d6b4df92511a8f47103326645b Mon Sep 17 00:00:00 2001 From: Kenny Udovic Date: Sat, 17 Jan 2026 15:39:23 -0500 Subject: [PATCH 2/3] feat: fix gRPC metadata format and VIP endpoint - reimplement VIP routing Add regression tests to gRPC metadata to prevent future problems chore: update readme --- README.md | 2 +- crates/talos-pilot-tui/src/app.rs | 33 +- .../src/components/lifecycle.rs | 136 ++--- crates/talos-rs/src/auth.rs | 23 +- crates/talos-rs/src/client.rs | 499 ++++++++---------- 5 files changed, 281 insertions(+), 412 deletions(-) diff --git a/README.md b/README.md index 4cb3872..e0fecc6 100644 --- a/README.md +++ b/README.md @@ -227,7 +227,7 @@ See [docs/local-talos-setup.md](docs/local-talos-setup.md) for setting up a loca ### Current Stats - **Core library**: ~1,760 lines across 8 modules -- **Tests**: 88 total (47 core + 8 TUI + 22 talos-rs + 11 doc) +- **Tests**: 98 total (47 core + 8 TUI + 32 talos-rs + 11 doc) - **Components**: 12 TUI components - **Build warnings**: 0 diff --git a/crates/talos-pilot-tui/src/app.rs b/crates/talos-pilot-tui/src/app.rs index aa2f9a8..9bbe67c 100644 --- a/crates/talos-pilot-tui/src/app.rs +++ b/crates/talos-pilot-tui/src/app.rs @@ -930,27 +930,22 @@ impl App { // Fetch logs from active services and set up client for streaming if let Some(client) = self.cluster.client() { - // Create a direct connection to the node (bypasses VIP issues) - match client.direct_to_node(&node_ip) { - Ok(node_client) => { - // Set the client for streaming capability - multi_logs.set_client(node_client.clone(), self.tail_lines); - - let service_refs: Vec<&str> = - active_services.iter().map(|s| s.as_str()).collect(); - match node_client.logs_multi(&service_refs, self.tail_lines).await { - Ok(logs) => { - multi_logs.set_logs(logs); - // Auto-start streaming for live updates - multi_logs.start_streaming(); - } - Err(e) => { - multi_logs.set_error(e.to_string()); - } - } + // Use node targeting through VIP + let node_client = client.with_node(&node_ip); + + // 
Set the client for streaming capability + multi_logs.set_client(node_client.clone(), self.tail_lines); + + let service_refs: Vec<&str> = + active_services.iter().map(|s| s.as_str()).collect(); + match node_client.logs_multi(&service_refs, self.tail_lines).await { + Ok(logs) => { + multi_logs.set_logs(logs); + // Auto-start streaming for live updates + multi_logs.start_streaming(); } Err(e) => { - multi_logs.set_error(format!("Failed to connect to node: {}", e)); + multi_logs.set_error(e.to_string()); } } } diff --git a/crates/talos-pilot-tui/src/components/lifecycle.rs b/crates/talos-pilot-tui/src/components/lifecycle.rs index 6725017..9dc877e 100644 --- a/crates/talos-pilot-tui/src/components/lifecycle.rs +++ b/crates/talos-pilot-tui/src/components/lifecycle.rs @@ -223,8 +223,49 @@ impl LifecycleComponent { // Get or create data let mut data = self.state.take_data().unwrap_or_default(); - // First, determine context name - if data.context_name.is_empty() { + // Fetch version information + match client.version().await { + Ok(versions) => { + // Get context name from talosconfig if not already set + if !versions.is_empty() && data.context_name.is_empty() { + let config_result = match &self.config_path { + Some(path) => { + let path_buf = std::path::PathBuf::from(path); + TalosConfig::load_from(&path_buf) + } + None => TalosConfig::load_default(), + }; + if let Ok(config) = config_result { + data.context_name = config.context; + } + } + + data.versions = versions; + } + Err(e) => { + self.state + .set_error(format!("Failed to fetch versions: {}", e)); + self.state.set_data(data); + return Ok(()); + } + } + + // Fetch time sync status + match client.time().await { + Ok(times) => { + data.time_info = times; + } + Err(e) => { + // Time fetch failure is not fatal - just log it + tracing::warn!("Failed to fetch time status: {}", e); + data.time_info.clear(); + } + } + + // Fetch discovery members using context-aware async version + let context_name = if !data.context_name.is_empty() { + data.context_name.clone() + } else { let config_result = match &self.config_path { Some(path) => { let path_buf = std::path::PathBuf::from(path); @@ -232,13 +273,11 @@ impl LifecycleComponent { } None => TalosConfig::load_default(), }; - if let Ok(config) = config_result { - data.context_name = config.context.clone(); - } - } + config_result + .map(|config| config.context) + .unwrap_or_default() + }; - // Fetch discovery members early to get node IPs for direct connections - let context_name = data.context_name.clone(); if !context_name.is_empty() { match get_discovery_members_for_context(&context_name, self.config_path.as_deref()) .await @@ -253,87 +292,6 @@ impl LifecycleComponent { } } - // Extract node IPs from discovery members for direct connections - // Each member has addresses - use the first one as the node IP - let node_ips: Vec = data - .discovery_members - .iter() - .filter_map(|m| m.addresses.first().cloned()) - .collect(); - - // Fetch version information using direct node connections - if !node_ips.is_empty() { - match client.version_for_nodes(&node_ips).await { - Ok(versions) => { - data.versions = versions; - } - Err(e) => { - self.state - .set_error(format!("Failed to fetch versions: {}", e)); - self.state.set_data(data); - return Ok(()); - } - } - } else { - // Fallback: try etcd_members to get control plane IPs - match client.etcd_members().await { - Ok(members) => { - let cp_ips: Vec = - members.iter().filter_map(|m| m.ip_address()).collect(); - if !cp_ips.is_empty() { - match 
client.version_for_nodes(&cp_ips).await { - Ok(versions) => { - data.versions = versions; - } - Err(e) => { - self.state - .set_error(format!("Failed to fetch versions: {}", e)); - self.state.set_data(data); - return Ok(()); - } - } - } else { - self.state - .set_error("No node IPs available for version query"); - self.state.set_data(data); - return Ok(()); - } - } - Err(e) => { - self.state - .set_error(format!("Failed to fetch etcd members: {}", e)); - self.state.set_data(data); - return Ok(()); - } - } - } - - // Fetch time sync status using direct node connections - let time_node_ips: Vec = if !node_ips.is_empty() { - node_ips.clone() - } else { - // Use IPs from versions we just fetched - data.versions - .iter() - .map(|v| v.node.split(':').next().unwrap_or(&v.node).to_string()) - .collect() - }; - - if !time_node_ips.is_empty() { - match client.time_for_nodes(&time_node_ips).await { - Ok(times) => { - data.time_info = times; - } - Err(e) => { - // Time fetch failure is not fatal - just log it - tracing::warn!("Failed to fetch time status: {}", e); - data.time_info.clear(); - } - } - } else { - data.time_info.clear(); - } - // Build node statuses combining version and time info // Fetch config hash for each node individually for drift detection let statuses: Vec = data diff --git a/crates/talos-rs/src/auth.rs b/crates/talos-rs/src/auth.rs index 069e185..b52c0fa 100644 --- a/crates/talos-rs/src/auth.rs +++ b/crates/talos-rs/src/auth.rs @@ -34,38 +34,29 @@ pub async fn create_channel(ctx: &Context) -> Result { .endpoint_url() .ok_or_else(|| TalosError::ConfigInvalid("No endpoints configured".to_string()))?; + tracing::debug!("Connecting to endpoint: {}", endpoint_url); + // Decode certificates from base64 let ca_pem = ctx.ca_pem()?; let client_cert_pem = ctx.client_cert_pem()?; let client_key_pem = ctx.client_key_pem()?; - create_channel_to_endpoint(&endpoint_url, &ca_pem, &client_cert_pem, &client_key_pem) -} - -/// Create a TLS-enabled gRPC channel to a specific endpoint using provided certificates -pub fn create_channel_to_endpoint( - endpoint_url: &str, - ca_pem: &[u8], - client_cert_pem: &[u8], - client_key_pem: &[u8], -) -> Result { - tracing::debug!("Connecting to endpoint: {}", endpoint_url); tracing::debug!("CA cert size: {} bytes", ca_pem.len()); tracing::debug!("Client cert size: {} bytes", client_cert_pem.len()); tracing::debug!("Client key size: {} bytes", client_key_pem.len()); // Convert Ed25519 key header to standard PKCS8 format if needed // Tonic expects "PRIVATE KEY" not "ED25519 PRIVATE KEY" - let client_key_pem = convert_ed25519_key_to_pkcs8(client_key_pem); + let client_key_pem = convert_ed25519_key_to_pkcs8(&client_key_pem); // Create TLS config - let ca = Certificate::from_pem(ca_pem); - let identity = Identity::from_pem(client_cert_pem, &client_key_pem); + let ca = Certificate::from_pem(&ca_pem); + let identity = Identity::from_pem(&client_cert_pem, &client_key_pem); let tls_config = ClientTlsConfig::new().ca_certificate(ca).identity(identity); // Build the channel (use connect_lazy to defer TLS handshake) - let endpoint = Channel::from_shared(endpoint_url.to_string()) + let endpoint = Channel::from_shared(endpoint_url.clone()) .map_err(|e| { TalosError::Connection(format!("Invalid endpoint URL '{}': {}", endpoint_url, e)) })? 
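The app.rs and lifecycle.rs hunks above stop opening a dedicated channel per node and instead send every request through the single configured endpoint (the VIP), steering it to the right machine with gRPC metadata; the client.rs hunks later in this patch implement that steering and the new tests pin down its format. As a minimal sketch of the format, assuming the tonic types this crate already uses (the target_nodes helper is illustrative, not part of the patch):

    use tonic::Request;
    use tonic::metadata::{Ascii, MetadataValue};

    /// Mirrors the with_nodes() behavior introduced below: a single target
    /// uses the singular "node" header, several targets get one "nodes" entry
    /// appended per node, never one comma-joined string.
    fn target_nodes(mut request: Request<()>, nodes: &[&str]) -> Request<()> {
        match nodes {
            [] => {}
            [single] => {
                if let Ok(value) = single.parse::<MetadataValue<Ascii>>() {
                    request.metadata_mut().insert("node", value);
                }
            }
            many => {
                for node in many {
                    if let Ok(value) = node.parse::<MetadataValue<Ascii>>() {
                        request.metadata_mut().append("nodes", value);
                    }
                }
            }
        }
        request
    }

Per the regression tests added below, the multi-node case must append one metadata value per node; sending a single comma-joined value was the root cause of the VIP node-targeting failures this patch fixes.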
@@ -75,7 +66,7 @@ pub fn create_channel_to_endpoint( // Use connect_lazy - connection happens on first request let channel = endpoint.connect_lazy(); - tracing::debug!("Successfully created channel to {}", endpoint_url); + tracing::debug!("Successfully connected to {}", endpoint_url); Ok(channel) } diff --git a/crates/talos-rs/src/client.rs b/crates/talos-rs/src/client.rs index bfec036..2fbab70 100644 --- a/crates/talos-rs/src/client.rs +++ b/crates/talos-rs/src/client.rs @@ -33,12 +33,6 @@ pub struct TalosClient { nodes: Vec, /// Endpoints from configuration (used to filter out vIPs from node targeting) endpoints: Vec, - /// Decoded CA certificate PEM for creating additional connections - ca_pem: Vec, - /// Decoded client certificate PEM for creating additional connections - client_cert_pem: Vec, - /// Decoded client key PEM for creating additional connections - client_key_pem: Vec, } impl TalosClient { @@ -48,18 +42,10 @@ impl TalosClient { let nodes = ctx.target_nodes().to_vec(); let endpoints = ctx.endpoints.clone(); - // Store certificates for creating additional connections - let ca_pem = ctx.ca_pem()?; - let client_cert_pem = ctx.client_cert_pem()?; - let client_key_pem = ctx.client_key_pem()?; - Ok(Self { channel, nodes, endpoints, - ca_pem, - client_cert_pem, - client_key_pem, }) } @@ -82,43 +68,15 @@ impl TalosClient { /// Create a new client targeting a specific node /// /// This returns a clone of the client with requests directed to the specified node. - /// Note: This still uses the original channel (VIP), just with node targeting. - /// For direct connections, use `direct_to_node()` instead. + /// Uses gRPC metadata to target the node through the VIP/endpoint. pub fn with_node(&self, node: &str) -> Self { Self { channel: self.channel.clone(), nodes: vec![node.to_string()], endpoints: self.endpoints.clone(), - ca_pem: self.ca_pem.clone(), - client_cert_pem: self.client_cert_pem.clone(), - client_key_pem: self.client_key_pem.clone(), } } - /// Create a new client with a direct connection to a specific node IP - /// - /// This creates a new gRPC channel directly to the specified node, - /// bypassing any VIP or load balancer. Use this when node targeting - /// through VIP is unreliable. 
- pub fn direct_to_node(&self, node_ip: &str) -> Result { - let endpoint_url = format!("https://{}:50000", node_ip); - let channel = crate::auth::create_channel_to_endpoint( - &endpoint_url, - &self.ca_pem, - &self.client_cert_pem, - &self.client_key_pem, - )?; - - Ok(Self { - channel, - nodes: vec![node_ip.to_string()], - endpoints: vec![endpoint_url], - ca_pem: self.ca_pem.clone(), - client_cert_pem: self.client_cert_pem.clone(), - client_key_pem: self.client_key_pem.clone(), - }) - } - /// Get a MachineService client fn machine_client(&self) -> MachineServiceClient { MachineServiceClient::new(self.channel.clone()) @@ -157,6 +115,10 @@ impl TalosClient { /// Add node targeting metadata to a request /// If no explicit nodes are configured, don't add the header /// (Talos will respond from the endpoint node itself) + /// + /// Uses the correct Talos API metadata format: + /// - "node" (singular) for single-node targeting (direct proxy) + /// - "nodes" (plural) with multiple values for multi-node targeting (aggregated response) fn with_nodes(&self, mut request: Request) -> Request { // Only add nodes metadata if explicitly configured (not just endpoints) // When nodes is empty or same as endpoints, skip the header @@ -196,11 +158,20 @@ impl TalosClient { .map(|n| n.split(':').next().unwrap_or(n).to_string()) .collect(); - if !valid_nodes.is_empty() { - let nodes_str = valid_nodes.join(","); - if let Ok(value) = nodes_str.parse() { + if valid_nodes.len() == 1 { + // Single node: use "node" header (direct proxy, no aggregation) + if let Ok(value) = valid_nodes[0].parse() { request.metadata_mut().insert("node", value); } + } else if !valid_nodes.is_empty() { + // Multiple nodes: use "nodes" header with multiple values + // Each node must be appended as a separate metadata value (not comma-separated) + // This matches the Go client's behavior: md.Set("nodes", nodes...) + for node in &valid_nodes { + if let Ok(value) = node.parse() { + request.metadata_mut().append("nodes", value); + } + } } } request @@ -355,165 +326,6 @@ impl TalosClient { Ok(times) } - /// Get version information from specific nodes using direct connections - /// - /// This bypasses VIP/node targeting and connects directly to each node. - /// Pass node IPs to query specific nodes. 
- pub async fn version_for_nodes(&self, nodes: &[String]) -> Result, TalosError> { - if nodes.is_empty() { - // No specific nodes, use standard method - return self.version().await; - } - - let mut all_versions = Vec::new(); - - for node_ip in nodes { - let endpoint_url = format!("https://{}:50000", node_ip); - - match crate::auth::create_channel_to_endpoint( - &endpoint_url, - &self.ca_pem, - &self.client_cert_pem, - &self.client_key_pem, - ) { - Ok(channel) => { - let mut client = MachineServiceClient::new(channel); - let request = Request::new(()); - - match client.version(request).await { - Ok(response) => { - let inner = response.into_inner(); - for msg in inner.messages { - all_versions.push(VersionInfo { - node: node_ip.clone(), - version: msg - .version - .as_ref() - .map(|v| v.tag.clone()) - .unwrap_or_default(), - sha: msg - .version - .as_ref() - .map(|v| v.sha.clone()) - .unwrap_or_default(), - built: msg - .version - .as_ref() - .map(|v| v.built.clone()) - .unwrap_or_default(), - go_version: msg - .version - .as_ref() - .map(|v| v.go_version.clone()) - .unwrap_or_default(), - os: msg - .version - .as_ref() - .map(|v| v.os.clone()) - .unwrap_or_default(), - arch: msg - .version - .as_ref() - .map(|v| v.arch.clone()) - .unwrap_or_default(), - platform: msg - .platform - .as_ref() - .map(|p| p.name.clone()) - .unwrap_or_default(), - }); - } - } - Err(e) => { - tracing::debug!("Failed to get version from {}: {}", node_ip, e); - } - } - } - Err(e) => { - tracing::debug!("Failed to connect to {}: {}", node_ip, e); - } - } - } - - Ok(all_versions) - } - - /// Get time synchronization status from specific nodes using direct connections - /// - /// This bypasses VIP/node targeting and connects directly to each node. - pub async fn time_for_nodes(&self, nodes: &[String]) -> Result, TalosError> { - use crate::proto::time::time_service_client::TimeServiceClient; - - if nodes.is_empty() { - return self.time().await; - } - - const SYNC_TOLERANCE_SECS: f64 = 1.0; - let mut all_times = Vec::new(); - - for node_ip in nodes { - let endpoint_url = format!("https://{}:50000", node_ip); - - match crate::auth::create_channel_to_endpoint( - &endpoint_url, - &self.ca_pem, - &self.client_cert_pem, - &self.client_key_pem, - ) { - Ok(channel) => { - let mut client = TimeServiceClient::new(channel); - let request = Request::new(()); - - match client.time(request).await { - Ok(response) => { - let inner = response.into_inner(); - for msg in inner.messages { - let local_time = msg.localtime.map(|t| { - std::time::UNIX_EPOCH - + std::time::Duration::new(t.seconds as u64, t.nanos as u32) - }); - let remote_time = msg.remotetime.map(|t| { - std::time::UNIX_EPOCH - + std::time::Duration::new(t.seconds as u64, t.nanos as u32) - }); - - let offset_seconds = match (msg.localtime, msg.remotetime) { - (Some(local), Some(remote)) => { - let local_nanos = local.seconds as f64 * 1_000_000_000.0 - + local.nanos as f64; - let remote_nanos = remote.seconds as f64 * 1_000_000_000.0 - + remote.nanos as f64; - (local_nanos - remote_nanos) / 1_000_000_000.0 - } - _ => 0.0, - }; - - let synced = offset_seconds.abs() < SYNC_TOLERANCE_SECS; - - all_times.push(NodeTimeInfo { - node: node_ip.clone(), - server: msg.server, - local_time, - remote_time, - offset_seconds, - synced, - }); - } - } - Err(e) => { - tracing::debug!("Failed to get time from {}: {}", node_ip, e); - } - } - } - Err(e) => { - tracing::debug!("Failed to connect to {}: {}", node_ip, e); - } - } - } - - Ok(all_times) - } - /// Get list of services from all 
configured nodes pub async fn services(&self) -> Result, TalosError> { let mut client = self.machine_client(); @@ -882,92 +694,33 @@ impl TalosClient { /// /// Pass the IPs from `etcd_members()` to get status from all control planes. /// If nodes is empty, queries only the endpoint node. - /// - /// Note: This makes individual calls to each node rather than using node targeting, - /// as node targeting through VIPs can be unreliable. pub async fn etcd_status_for_nodes( &self, nodes: &[String], ) -> Result, TalosError> { - if nodes.is_empty() { - // No specific nodes, query via current endpoint - let mut client = self.machine_client(); - let request = Request::new(()); - let response = client.etcd_status(request).await?; - return self.parse_etcd_status_response(response.into_inner()); - } - - tracing::debug!( - "etcd_status_for_nodes querying {} nodes individually", - nodes.len() - ); + let mut client = self.machine_client(); - // Query each node individually and collect results - let mut all_statuses = Vec::new(); - - for node_ip in nodes { - // Create endpoint URL for this specific node - let endpoint_url = format!("https://{}:50000", node_ip); - tracing::debug!("Querying etcd status from: {}", endpoint_url); - - // Try to create a direct connection to this node using stored certificates - match crate::auth::create_channel_to_endpoint( - &endpoint_url, - &self.ca_pem, - &self.client_cert_pem, - &self.client_key_pem, - ) { - Ok(channel) => { - let mut client = MachineServiceClient::new(channel); - let request = Request::new(()); - - match client.etcd_status(request).await { - Ok(response) => { - if let Ok(statuses) = - self.parse_etcd_status_response(response.into_inner()) - { - all_statuses.extend(statuses); - } - } - Err(e) => { - tracing::debug!("Failed to get etcd status from {}: {}", node_ip, e); - } - } - } - Err(e) => { - tracing::debug!("Failed to connect to {}: {}", node_ip, e); + // Build request with node targeting if specific nodes provided + let request = if !nodes.is_empty() { + let mut req = Request::new(()); + // Use proper multi-node targeting via "nodes" header + for node in nodes { + if let Ok(value) = node.parse() { + req.metadata_mut().append("nodes", value); } } - } - - Ok(all_statuses) - } + req + } else { + Request::new(()) + }; - /// Parse etcd status response into EtcdMemberStatus structs - fn parse_etcd_status_response( - &self, - response: crate::proto::machine::EtcdStatusResponse, - ) -> Result, TalosError> { - tracing::debug!( - "etcd_status response: {} messages received", - response.messages.len() - ); + let response = client.etcd_status(request).await?; + let inner = response.into_inner(); - let statuses: Vec = response + let statuses: Vec = inner .messages .into_iter() .filter_map(|msg| { - let has_status = msg.member_status.is_some(); - let node_info = msg - .metadata - .as_ref() - .map(|m| m.hostname.as_str()) - .unwrap_or("unknown"); - tracing::debug!( - "etcd_status message from {}: has_status={}", - node_info, - has_status - ); msg.member_status.map(|status| EtcdMemberStatus { node: self.node_from_metadata(msg.metadata.as_ref(), 0), member_id: status.member_id, @@ -984,10 +737,6 @@ impl TalosClient { }) .collect(); - tracing::debug!( - "parse_etcd_status_response returning {} statuses", - statuses.len() - ); Ok(statuses) } @@ -2830,10 +2579,6 @@ mod tests { channel, nodes, endpoints, - // Empty certificates - not needed for filtering tests - ca_pem: Vec::new(), - client_cert_pem: Vec::new(), - client_key_pem: Vec::new(), } } @@ -3095,4 +2840,184 @@ 
mod tests { // But here, nodes only contains node1, node2 - both should remain assert_eq!(filtered, vec!["node1", "node2"]); } + + // ========================================================================= + // gRPC Metadata Format Tests + // + // These tests verify the correct gRPC metadata format for node targeting. + // This was the root cause of VIP + node targeting failures: + // - WRONG: insert("node", "node1,node2,node3") - single comma-separated value + // - CORRECT: append("nodes", "node1"), append("nodes", "node2") - multiple values + // + // Talos Go client behavior: + // - Single node: md.Set("node", node) -> one value + // - Multiple nodes: md.Set("nodes", nodes...) -> multiple values for same key + // ========================================================================= + + #[tokio::test] + async fn test_metadata_single_node_uses_node_header() { + // Single node should use "node" (singular) header + let client = create_test_client( + vec!["node1".to_string()], + vec!["vip.example.com:50000".to_string()], + ); + + let request: Request<()> = Request::new(()); + let request = client.with_nodes(request); + + // Should have "node" header (singular), not "nodes" + let metadata = request.metadata(); + assert!( + metadata.get("node").is_some(), + "Single node should use 'node' header" + ); + assert!( + metadata.get("nodes").is_none(), + "Single node should NOT use 'nodes' header" + ); + assert_eq!(metadata.get("node").unwrap(), "node1"); + } + + #[tokio::test] + async fn test_metadata_multiple_nodes_uses_nodes_header() { + // Multiple nodes should use "nodes" (plural) header + let client = create_test_client( + vec!["node1".to_string(), "node2".to_string(), "node3".to_string()], + vec!["vip.example.com:50000".to_string()], + ); + + let request: Request<()> = Request::new(()); + let request = client.with_nodes(request); + + // Should have "nodes" header (plural), not "node" + let metadata = request.metadata(); + assert!( + metadata.get("node").is_none(), + "Multiple nodes should NOT use 'node' header" + ); + assert!( + metadata.get("nodes").is_some(), + "Multiple nodes should use 'nodes' header" + ); + } + + #[tokio::test] + async fn test_metadata_multiple_nodes_are_separate_values_not_comma_separated() { + // CRITICAL: This is the root cause test + // Multiple nodes must be separate metadata values, NOT comma-separated + // tonic's append() creates multiple values for the same key + // This matches Talos Go client's md.Set("nodes", nodes...) 
behavior + let client = create_test_client( + vec!["node1".to_string(), "node2".to_string(), "node3".to_string()], + vec!["vip.example.com:50000".to_string()], + ); + + let request: Request<()> = Request::new(()); + let request = client.with_nodes(request); + + let metadata = request.metadata(); + + // Get all values for "nodes" header + let nodes_values: Vec<&str> = metadata + .get_all("nodes") + .iter() + .filter_map(|v| v.to_str().ok()) + .collect(); + + // Should have 3 separate values, not 1 comma-separated value + assert_eq!( + nodes_values.len(), + 3, + "Expected 3 separate metadata values, got {}: {:?}", + nodes_values.len(), + nodes_values + ); + + // Each value should be a single node, not comma-separated + for value in &nodes_values { + assert!( + !value.contains(','), + "Node metadata values should NOT be comma-separated: {}", + value + ); + } + + // Verify the actual values + assert!(nodes_values.contains(&"node1")); + assert!(nodes_values.contains(&"node2")); + assert!(nodes_values.contains(&"node3")); + } + + #[tokio::test] + async fn test_metadata_empty_nodes_no_header() { + // Empty nodes should not add any header + let client = create_test_client(vec![], vec!["vip.example.com:50000".to_string()]); + + let request: Request<()> = Request::new(()); + let request = client.with_nodes(request); + + let metadata = request.metadata(); + assert!( + metadata.get("node").is_none(), + "Empty nodes should not add 'node' header" + ); + assert!( + metadata.get("nodes").is_none(), + "Empty nodes should not add 'nodes' header" + ); + } + + #[tokio::test] + async fn test_metadata_two_nodes_uses_nodes_header() { + // Two nodes should use "nodes" (plural) header, not "node" + // This is a boundary condition - even 2 nodes should use the plural form + let client = create_test_client( + vec!["node1".to_string(), "node2".to_string()], + vec!["vip.example.com:50000".to_string()], + ); + + let request: Request<()> = Request::new(()); + let request = client.with_nodes(request); + + let metadata = request.metadata(); + assert!( + metadata.get("node").is_none(), + "Two nodes should NOT use 'node' header" + ); + assert!( + metadata.get("nodes").is_some(), + "Two nodes should use 'nodes' header" + ); + + let nodes_values: Vec<&str> = metadata + .get_all("nodes") + .iter() + .filter_map(|v| v.to_str().ok()) + .collect(); + assert_eq!(nodes_values.len(), 2); + } + + #[tokio::test] + async fn test_metadata_preserves_node_names_without_ports() { + // Port stripping happens before metadata is set + let client = create_test_client( + vec!["node1:50000".to_string(), "node2:50000".to_string()], + vec!["vip.example.com:50000".to_string()], + ); + + let request: Request<()> = Request::new(()); + let request = client.with_nodes(request); + + let metadata = request.metadata(); + let nodes_values: Vec<&str> = metadata + .get_all("nodes") + .iter() + .filter_map(|v| v.to_str().ok()) + .collect(); + + // Ports should be stripped + assert!(nodes_values.contains(&"node1")); + assert!(nodes_values.contains(&"node2")); + assert!(!nodes_values.iter().any(|v| v.contains(':'))); + } } From 6defe377bc13cbcd65d73bc2117231056c64bd21 Mon Sep 17 00:00:00 2001 From: Kenny Udovic Date: Sat, 17 Jan 2026 15:57:24 -0500 Subject: [PATCH 3/3] refactor: better ui for Pods and PDBs chore: run linters --- .../src/components/diagnostics/k8s.rs | 100 +++++++++++++++--- .../src/components/lifecycle.rs | 80 +++++++++++++- crates/talos-rs/src/client.rs | 12 ++- 3 files changed, 170 insertions(+), 22 deletions(-) diff --git 
a/crates/talos-pilot-tui/src/components/diagnostics/k8s.rs b/crates/talos-pilot-tui/src/components/diagnostics/k8s.rs index 1b2c16a..ba9284b 100644 --- a/crates/talos-pilot-tui/src/components/diagnostics/k8s.rs +++ b/crates/talos-pilot-tui/src/components/diagnostics/k8s.rs @@ -24,42 +24,99 @@ pub enum K8sError { ApiError(String), } +/// Source of the kubeconfig used to create the K8s client +#[derive(Debug, Clone)] +pub enum KubeconfigSource { + /// From KUBECONFIG environment variable or default path (~/.kube/config) + Environment, + /// Fetched from a specific Talos control plane node + TalosNode(String), + /// Source unknown or unavailable + Unavailable(String), +} + /// Create a Kubernetes client from Talos-provided kubeconfig /// /// If `kubeconfig_client` is provided, it will be used to fetch the kubeconfig. /// This is useful when diagnosing worker nodes that don't have the kubeconfig endpoint. pub async fn create_k8s_client(talos_client: &TalosClient) -> Result { - create_k8s_client_with_kubeconfig_source(talos_client, None).await + let (client, _source) = create_k8s_client_with_source(talos_client, None, None).await?; + Ok(client) } -/// Create a Kubernetes client, optionally using a different client to fetch kubeconfig +/// Create a Kubernetes client and return the source of the kubeconfig /// -/// This allows fetching kubeconfig from a control plane node while diagnosing a worker node. -/// -/// The function tries sources in this order: +/// This function tries sources in this order: /// 1. KUBECONFIG environment variable (via Config::infer()) -/// 2. Fetching kubeconfig from Talos API (fallback) -pub async fn create_k8s_client_with_kubeconfig_source( - _talos_client: &TalosClient, +/// 2. Fetching kubeconfig from a specific control plane node (if cp_node_ip provided) +/// 3. 
Fetching kubeconfig from Talos API via VIP (fallback, may fail with multiple nodes) +/// +/// # Arguments +/// * `talos_client` - The main Talos client (connected to VIP or endpoint) +/// * `cp_node_ip` - Optional control plane node IP to target for kubeconfig fetch +/// * `kubeconfig_client` - Optional pre-configured client targeting a control plane node +/// +/// # Returns +/// A tuple of (Client, KubeconfigSource) on success +pub async fn create_k8s_client_with_source( + talos_client: &TalosClient, + cp_node_ip: Option<&str>, kubeconfig_client: Option<&TalosClient>, -) -> Result { +) -> Result<(Client, KubeconfigSource), K8sError> { // Try KUBECONFIG environment variable first (via Config::infer()) // This respects standard K8s tooling conventions if let Ok(config) = Config::infer().await && let Ok(client) = Client::try_from(config) { tracing::debug!("Using kubeconfig from environment (KUBECONFIG or default path)"); - return Ok(client); + return Ok((client, KubeconfigSource::Environment)); + } + + tracing::debug!("KUBECONFIG not available, falling back to Talos API"); + + // If a specific control plane node IP is provided, target that node + if let Some(node_ip) = cp_node_ip { + tracing::debug!("Targeting control plane node {} for kubeconfig", node_ip); + let node_client = talos_client.with_node(node_ip); + match fetch_kubeconfig_from_client(&node_client).await { + Ok(client) => { + return Ok((client, KubeconfigSource::TalosNode(node_ip.to_string()))); + } + Err(e) => { + tracing::warn!("Failed to fetch kubeconfig from node {}: {}", node_ip, e); + // Fall through to try other methods + } + } } - // Fall back to fetching kubeconfig from Talos API - tracing::debug!("Falling back to fetching kubeconfig from Talos API"); + // Try the provided kubeconfig_client if available + if let Some(kc_client) = kubeconfig_client { + tracing::debug!("Trying provided kubeconfig client"); + match fetch_kubeconfig_from_client(kc_client).await { + Ok(client) => { + return Ok(( + client, + KubeconfigSource::TalosNode("control-plane".to_string()), + )); + } + Err(e) => { + tracing::warn!("Failed to fetch kubeconfig from provided client: {}", e); + } + } + } - // Use the provided kubeconfig_client if available, otherwise use the main client - let client_for_kubeconfig = kubeconfig_client.unwrap_or(_talos_client); + // Last resort: try the main client (may fail with multiple nodes configured) + tracing::debug!("Trying main Talos client for kubeconfig (may fail with multiple nodes)"); + match fetch_kubeconfig_from_client(talos_client).await { + Ok(client) => Ok((client, KubeconfigSource::TalosNode("vip".to_string()))), + Err(e) => Err(e), + } +} +/// Fetch kubeconfig from a specific Talos client and create a K8s client +async fn fetch_kubeconfig_from_client(client: &TalosClient) -> Result { // Get kubeconfig from Talos - let kubeconfig_yaml = client_for_kubeconfig + let kubeconfig_yaml = client .kubeconfig() .await .map_err(|e| K8sError::KubeconfigFetch(e.to_string()))?; @@ -77,6 +134,19 @@ pub async fn create_k8s_client_with_kubeconfig_source( Client::try_from(config).map_err(|e| K8sError::ClientCreate(e.to_string())) } +/// Create a Kubernetes client, optionally using a different client to fetch kubeconfig +/// +/// This allows fetching kubeconfig from a control plane node while diagnosing a worker node. +/// Legacy function - prefer create_k8s_client_with_source for new code. 
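+/// The KubeconfigSource reported by create_k8s_client_with_source is discarded here.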
+pub async fn create_k8s_client_with_kubeconfig_source( + talos_client: &TalosClient, + kubeconfig_client: Option<&TalosClient>, +) -> Result { + let (client, _source) = + create_k8s_client_with_source(talos_client, None, kubeconfig_client).await?; + Ok(client) +} + /// Detected CNI information from K8s #[derive(Debug, Clone, Default)] pub struct CniInfo { diff --git a/crates/talos-pilot-tui/src/components/lifecycle.rs b/crates/talos-pilot-tui/src/components/lifecycle.rs index 9dc877e..db8bf14 100644 --- a/crates/talos-pilot-tui/src/components/lifecycle.rs +++ b/crates/talos-pilot-tui/src/components/lifecycle.rs @@ -5,7 +5,8 @@ use crate::action::Action; use crate::components::Component; use crate::components::diagnostics::k8s::{ - PdbHealthInfo, PodHealthInfo, check_pdb_health, check_pod_health, create_k8s_client, + KubeconfigSource, PdbHealthInfo, PodHealthInfo, check_pdb_health, check_pod_health, + create_k8s_client_with_source, }; use crate::ui_ext::HealthIndicatorExt; use color_eyre::Result; @@ -158,6 +159,9 @@ pub struct LifecycleComponent { /// K8s client for pod/PDB checks (reusable, not part of loaded data) k8s_client: Option, + /// Source of the kubeconfig (for display in UI) + k8s_source: Option, + /// Custom config file path (from --config flag) config_path: Option, } @@ -187,6 +191,7 @@ impl LifecycleComponent { auto_refresh: true, client: None, k8s_client: None, + k8s_source: None, config_path, } } @@ -341,14 +346,24 @@ impl LifecycleComponent { /// Fetch pre-operation health checks into data async fn fetch_pre_op_checks_into(&mut self, data: &mut LifecycleData, client: &TalosClient) { + // Get a control plane node IP for kubeconfig fetch + // We try to get this from etcd members (only control plane nodes run etcd) + let cp_node_ip = match client.etcd_members().await { + Ok(members) => members.first().and_then(|m| m.ip_address()), + Err(_) => None, + }; + // Initialize K8s client if not already done if self.k8s_client.is_none() { - match create_k8s_client(client).await { - Ok(k8s) => { + match create_k8s_client_with_source(client, cp_node_ip.as_deref(), None).await { + Ok((k8s, source)) => { + tracing::info!("K8s client created from: {:?}", source); self.k8s_client = Some(k8s); + self.k8s_source = Some(source); } Err(e) => { tracing::warn!("Failed to create K8s client: {}", e); + self.k8s_source = Some(KubeconfigSource::Unavailable(e.to_string())); } } } @@ -773,6 +788,43 @@ impl LifecycleComponent { ])); } + // K8s source info (show where kubeconfig came from) + let k8s_source_text = match &self.k8s_source { + Some(KubeconfigSource::Environment) => "via KUBECONFIG".to_string(), + Some(KubeconfigSource::TalosNode(node)) => format!("via node {}", node), + Some(KubeconfigSource::Unavailable(err)) => { + // Truncate long error messages + let short_err = if err.len() > 40 { + format!("{}...", &err[..40]) + } else { + err.clone() + }; + format!("unavailable: {}", short_err) + } + None => "not initialized".to_string(), + }; + + let k8s_color = match &self.k8s_source { + Some(KubeconfigSource::Environment) | Some(KubeconfigSource::TalosNode(_)) => { + Color::Green + } + _ => Color::DarkGray, + }; + + lines.push(Line::from(vec![ + Span::raw(" "), + Span::styled( + if self.k8s_client.is_some() { + "✓" + } else { + "?" 
+ }, + Style::default().fg(k8s_color), + ), + Span::raw(" K8s: "), + Span::styled(k8s_source_text, Style::default().fg(k8s_color)), + ])); + // Pod health check if let Some(ref pods) = pre_op_checks.pod_health { let has_issues = pods.has_issues(); @@ -801,11 +853,20 @@ impl LifecycleComponent { Span::styled(summary, Style::default().fg(color)), ])); } else { + // Show hint about why pods are unavailable + let hint = match &self.k8s_source { + Some(KubeconfigSource::Unavailable(_)) => "set KUBECONFIG", + None => "loading...", + _ => "K8s API error", + }; lines.push(Line::from(vec![ Span::raw(" "), Span::styled("?", Style::default().fg(Color::DarkGray)), Span::raw(" Pods: "), - Span::styled("unavailable", Style::default().fg(Color::DarkGray)), + Span::styled( + format!("unavailable ({})", hint), + Style::default().fg(Color::DarkGray), + ), ])); } @@ -824,11 +885,20 @@ impl LifecycleComponent { Span::styled(pdbs.summary(), Style::default().fg(color)), ])); } else { + // Show hint about why PDBs are unavailable + let hint = match &self.k8s_source { + Some(KubeconfigSource::Unavailable(_)) => "set KUBECONFIG", + None => "loading...", + _ => "K8s API error", + }; lines.push(Line::from(vec![ Span::raw(" "), Span::styled("?", Style::default().fg(Color::DarkGray)), Span::raw(" PDBs: "), - Span::styled("unavailable", Style::default().fg(Color::DarkGray)), + Span::styled( + format!("unavailable ({})", hint), + Style::default().fg(Color::DarkGray), + ), ])); } diff --git a/crates/talos-rs/src/client.rs b/crates/talos-rs/src/client.rs index 2fbab70..6f0813d 100644 --- a/crates/talos-rs/src/client.rs +++ b/crates/talos-rs/src/client.rs @@ -2882,7 +2882,11 @@ mod tests { async fn test_metadata_multiple_nodes_uses_nodes_header() { // Multiple nodes should use "nodes" (plural) header let client = create_test_client( - vec!["node1".to_string(), "node2".to_string(), "node3".to_string()], + vec![ + "node1".to_string(), + "node2".to_string(), + "node3".to_string(), + ], vec!["vip.example.com:50000".to_string()], ); @@ -2908,7 +2912,11 @@ mod tests { // tonic's append() creates multiple values for the same key // This matches Talos Go client's md.Set("nodes", nodes...) behavior let client = create_test_client( - vec!["node1".to_string(), "node2".to_string(), "node3".to_string()], + vec![ + "node1".to_string(), + "node2".to_string(), + "node3".to_string(), + ], vec!["vip.example.com:50000".to_string()], );