Merged
2 changes: 1 addition & 1 deletion README.md
@@ -227,7 +227,7 @@ See [docs/local-talos-setup.md](docs/local-talos-setup.md) for setting up a loca
### Current Stats

- **Core library**: ~1,760 lines across 8 modules
- **Tests**: 88 total (47 core + 8 TUI + 22 talos-rs + 11 doc)
- **Tests**: 98 total (47 core + 8 TUI + 32 talos-rs + 11 doc)
- **Components**: 12 TUI components
- **Build warnings**: 0

9 changes: 6 additions & 3 deletions crates/talos-pilot-tui/src/app.rs
@@ -922,20 +922,23 @@ impl App {

// Create multi-logs component with all services, marking active ones
let mut multi_logs = MultiLogsComponent::new(
node_ip,
node_ip.clone(),
node_role,
active_services.clone(),
all_services,
);

// Fetch logs from active services and set up client for streaming
if let Some(client) = self.cluster.client() {
// Use node targeting through VIP
let node_client = client.with_node(&node_ip);

// Set the client for streaming capability
multi_logs.set_client(client.clone(), self.tail_lines);
multi_logs.set_client(node_client.clone(), self.tail_lines);

let service_refs: Vec<&str> =
active_services.iter().map(|s| s.as_str()).collect();
match client.logs_multi(&service_refs, self.tail_lines).await {
match node_client.logs_multi(&service_refs, self.tail_lines).await {
Ok(logs) => {
multi_logs.set_logs(logs);
// Auto-start streaming for live updates
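
The app.rs change routes both the streaming client and the initial multi-service fetch through a client pinned to the selected node instead of the cluster VIP. Below is a minimal sketch of that pattern; `with_node` and `logs_multi` are taken from the diff, while the helper name, its parameters, and the log-result handling are illustrative assumptions.

```rust
// Illustrative helper (not part of this PR) showing the node-targeting pattern.
async fn fetch_logs_for_node(
    client: &TalosClient,   // cluster client, normally connected to the VIP
    node_ip: &str,          // IP of the node selected in the TUI
    services: &[String],    // service IDs whose logs should be fetched
    tail_lines: usize,
) {
    // Pin all log requests to the selected node rather than the VIP.
    let node_client = client.with_node(node_ip);
    let service_refs: Vec<&str> = services.iter().map(|s| s.as_str()).collect();
    match node_client.logs_multi(&service_refs, tail_lines).await {
        Ok(_logs) => tracing::debug!("initial logs fetched from {}", node_ip),
        Err(e) => tracing::warn!("log fetch from {} failed: {}", node_ip, e),
    }
}
```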
35 changes: 28 additions & 7 deletions crates/talos-pilot-tui/src/components/cluster.rs
@@ -573,12 +573,13 @@ impl ClusterComponent {

// Fetch etcd status for header summary (target all control planes)
if let Some(client) = &cluster.client {
let cp_hostnames: Vec<String> = cluster
// Use IPs instead of hostnames (hostnames may not be resolvable)
let cp_ips: Vec<String> = cluster
.etcd_members
.iter()
.map(|m| m.hostname.clone())
.filter_map(|m| m.ip_address())
.collect();
if let Ok(statuses) = client.etcd_status_for_nodes(&cp_hostnames).await {
if let Ok(statuses) = client.etcd_status_for_nodes(&cp_ips).await {
let total = cluster.etcd_members.len();
let healthy = statuses.len();
let quorum_needed = total / 2 + 1;
@@ -830,8 +831,13 @@ impl ClusterComponent {
let service_ids = self.current_service_ids();
if !service_ids.is_empty() {
let node_role = self.current_node_role();
let node_ip = self
.node_ips()
.get(&node_name)
.cloned()
.unwrap_or(node_name.clone());
Ok(Some(Action::ShowMultiLogs(
node_name,
node_ip,
node_role,
service_ids.clone(),
service_ids,
@@ -1009,8 +1015,13 @@ impl Component for ClusterComponent {
let service_ids = self.current_service_ids();
if !service_ids.is_empty() {
let node_role = self.current_node_role();
let node_ip = self
.node_ips()
.get(&node_name)
.cloned()
.unwrap_or(node_name.clone());
Ok(Some(Action::ShowMultiLogs(
node_name,
node_ip,
node_role,
service_ids.clone(),
service_ids,
@@ -1034,8 +1045,13 @@ impl Component for ClusterComponent {
if let Some(service_id) = self.selected_service_id() {
let node_role = self.current_node_role();
let all_services = self.current_service_ids();
let node_ip = self
.node_ips()
.get(&node_name)
.cloned()
.unwrap_or(node_name.clone());
Ok(Some(Action::ShowMultiLogs(
node_name,
node_ip,
node_role,
vec![service_id],
all_services,
@@ -1056,8 +1072,13 @@ impl Component for ClusterComponent {
let service_ids = self.current_service_ids();
if !service_ids.is_empty() {
let node_role = self.current_node_role();
let node_ip = self
.node_ips()
.get(&node_name)
.cloned()
.unwrap_or(node_name.clone());
Ok(Some(Action::ShowMultiLogs(
node_name,
node_ip,
node_role,
service_ids.clone(),
service_ids,
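
All four cluster.rs hunks share two ideas: collect etcd control-plane targets by IP (hostnames may not resolve from the operator's machine), and map the selected node name to an IP before emitting `ShowMultiLogs`, falling back to the name itself. Here is a condensed sketch of those helpers; `EtcdMemberInfo` as the member type and `node_ips()` yielding a `HashMap` are assumptions inferred from the calls visible in the diff.

```rust
use std::collections::HashMap;

// Collect control-plane IPs, silently skipping members without a known IP.
fn control_plane_ips(members: &[EtcdMemberInfo]) -> Vec<String> {
    members.iter().filter_map(|m| m.ip_address()).collect()
}

// Quorum math used for the header summary: a majority of all members.
fn quorum_needed(total: usize) -> usize {
    total / 2 + 1
}

// Resolve a display name to a routable IP, keeping the name as a fallback.
fn node_target(node_ips: &HashMap<String, String>, node_name: &str) -> String {
    node_ips
        .get(node_name)
        .cloned()
        .unwrap_or_else(|| node_name.to_string())
}
```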
100 changes: 85 additions & 15 deletions crates/talos-pilot-tui/src/components/diagnostics/k8s.rs
@@ -24,42 +24,99 @@ pub enum K8sError {
ApiError(String),
}

/// Source of the kubeconfig used to create the K8s client
#[derive(Debug, Clone)]
pub enum KubeconfigSource {
/// From KUBECONFIG environment variable or default path (~/.kube/config)
Environment,
/// Fetched from a specific Talos control plane node
TalosNode(String),
/// Source unknown or unavailable
Unavailable(String),
}

/// Create a Kubernetes client from Talos-provided kubeconfig
///
/// If `kubeconfig_client` is provided, it will be used to fetch the kubeconfig.
/// This is useful when diagnosing worker nodes that don't have the kubeconfig endpoint.
pub async fn create_k8s_client(talos_client: &TalosClient) -> Result<Client, K8sError> {
create_k8s_client_with_kubeconfig_source(talos_client, None).await
let (client, _source) = create_k8s_client_with_source(talos_client, None, None).await?;
Ok(client)
}

/// Create a Kubernetes client, optionally using a different client to fetch kubeconfig
/// Create a Kubernetes client and return the source of the kubeconfig
///
/// This allows fetching kubeconfig from a control plane node while diagnosing a worker node.
///
/// The function tries sources in this order:
/// This function tries sources in this order:
/// 1. KUBECONFIG environment variable (via Config::infer())
/// 2. Fetching kubeconfig from Talos API (fallback)
pub async fn create_k8s_client_with_kubeconfig_source(
_talos_client: &TalosClient,
/// 2. Fetching kubeconfig from a specific control plane node (if cp_node_ip provided)
/// 3. Fetching kubeconfig from Talos API via VIP (fallback, may fail with multiple nodes)
///
/// # Arguments
/// * `talos_client` - The main Talos client (connected to VIP or endpoint)
/// * `cp_node_ip` - Optional control plane node IP to target for kubeconfig fetch
/// * `kubeconfig_client` - Optional pre-configured client targeting a control plane node
///
/// # Returns
/// A tuple of (Client, KubeconfigSource) on success
pub async fn create_k8s_client_with_source(
talos_client: &TalosClient,
cp_node_ip: Option<&str>,
kubeconfig_client: Option<&TalosClient>,
) -> Result<Client, K8sError> {
) -> Result<(Client, KubeconfigSource), K8sError> {
// Try KUBECONFIG environment variable first (via Config::infer())
// This respects standard K8s tooling conventions
if let Ok(config) = Config::infer().await
&& let Ok(client) = Client::try_from(config)
{
tracing::debug!("Using kubeconfig from environment (KUBECONFIG or default path)");
return Ok(client);
return Ok((client, KubeconfigSource::Environment));
}

tracing::debug!("KUBECONFIG not available, falling back to Talos API");

// If a specific control plane node IP is provided, target that node
if let Some(node_ip) = cp_node_ip {
tracing::debug!("Targeting control plane node {} for kubeconfig", node_ip);
let node_client = talos_client.with_node(node_ip);
match fetch_kubeconfig_from_client(&node_client).await {
Ok(client) => {
return Ok((client, KubeconfigSource::TalosNode(node_ip.to_string())));
}
Err(e) => {
tracing::warn!("Failed to fetch kubeconfig from node {}: {}", node_ip, e);
// Fall through to try other methods
}
}
}

// Fall back to fetching kubeconfig from Talos API
tracing::debug!("Falling back to fetching kubeconfig from Talos API");
// Try the provided kubeconfig_client if available
if let Some(kc_client) = kubeconfig_client {
tracing::debug!("Trying provided kubeconfig client");
match fetch_kubeconfig_from_client(kc_client).await {
Ok(client) => {
return Ok((
client,
KubeconfigSource::TalosNode("control-plane".to_string()),
));
}
Err(e) => {
tracing::warn!("Failed to fetch kubeconfig from provided client: {}", e);
}
}
}

// Use the provided kubeconfig_client if available, otherwise use the main client
let client_for_kubeconfig = kubeconfig_client.unwrap_or(_talos_client);
// Last resort: try the main client (may fail with multiple nodes configured)
tracing::debug!("Trying main Talos client for kubeconfig (may fail with multiple nodes)");
match fetch_kubeconfig_from_client(talos_client).await {
Ok(client) => Ok((client, KubeconfigSource::TalosNode("vip".to_string()))),
Err(e) => Err(e),
}
}

/// Fetch kubeconfig from a specific Talos client and create a K8s client
async fn fetch_kubeconfig_from_client(client: &TalosClient) -> Result<Client, K8sError> {
// Get kubeconfig from Talos
let kubeconfig_yaml = client_for_kubeconfig
let kubeconfig_yaml = client
.kubeconfig()
.await
.map_err(|e| K8sError::KubeconfigFetch(e.to_string()))?;
Expand All @@ -77,6 +134,19 @@ pub async fn create_k8s_client_with_kubeconfig_source(
Client::try_from(config).map_err(|e| K8sError::ClientCreate(e.to_string()))
}

/// Create a Kubernetes client, optionally using a different client to fetch kubeconfig
///
/// This allows fetching kubeconfig from a control plane node while diagnosing a worker node.
/// Legacy function - prefer create_k8s_client_with_source for new code.
pub async fn create_k8s_client_with_kubeconfig_source(
talos_client: &TalosClient,
kubeconfig_client: Option<&TalosClient>,
) -> Result<Client, K8sError> {
let (client, _source) =
create_k8s_client_with_source(talos_client, None, kubeconfig_client).await?;
Ok(client)
}

/// Detected CNI information from K8s
#[derive(Debug, Clone, Default)]
pub struct CniInfo {
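
With `create_k8s_client_with_source`, callers also learn where the kubeconfig came from, which a diagnostics view can surface. A hedged usage sketch follows; the signature, the fallback order, and the `KubeconfigSource` variants come from the diff, while the wrapper function itself and the assumption that `Client` is `kube::Client` are illustrative.

```rust
use kube::Client;

// Illustrative wrapper (not in the PR): build a K8s client, preferring an
// explicit control-plane IP for the kubeconfig fetch, and log the source used.
async fn k8s_client_for_diagnostics(
    talos_client: &TalosClient,
    cp_node_ip: Option<&str>,
) -> Result<Client, K8sError> {
    let (client, source) =
        create_k8s_client_with_source(talos_client, cp_node_ip, None).await?;
    match &source {
        KubeconfigSource::Environment => {
            tracing::debug!("kubeconfig: KUBECONFIG or default path");
        }
        KubeconfigSource::TalosNode(node) => {
            tracing::debug!("kubeconfig fetched from Talos node {}", node);
        }
        KubeconfigSource::Unavailable(reason) => {
            tracing::warn!("kubeconfig source unavailable: {}", reason);
        }
    }
    Ok(client)
}
```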
26 changes: 12 additions & 14 deletions crates/talos-pilot-tui/src/components/etcd.rs
@@ -127,20 +127,14 @@ impl EtcdComponent {
}
};

// Step 2: Extract control plane hostnames from members
let cp_hostnames: Vec<String> = member_infos.iter().map(|m| m.hostname.clone()).collect();
// Step 2: Extract control plane IPs from members (hostnames may not be resolvable)
let cp_ips: Vec<String> = member_infos.iter().filter_map(|m| m.ip_address()).collect();

tracing::debug!(
"Fetching etcd status from control planes: {:?}",
cp_hostnames
);
tracing::debug!("Fetching etcd status from control planes: {:?}", cp_ips);

// Step 3: Fetch status (targeting all CPs) and alarms in parallel
// Step 3: Fetch status (targeting all CPs by IP) and alarms in parallel
let fetch_result = tokio::time::timeout(timeout, async {
tokio::join!(
client.etcd_status_for_nodes(&cp_hostnames),
client.etcd_alarms()
)
tokio::join!(client.etcd_status_for_nodes(&cp_ips), client.etcd_alarms())
})
.await;

@@ -599,13 +593,13 @@ impl Component for EtcdComponent {
if data.members.is_empty() {
return Ok(None);
}
// Use first member's hostname as the "node" but show all etcd services
// Use first member's IP as the "node" (hostnames may not be resolvable)
// In practice, with the current API this shows etcd logs from all connected nodes
let node = data
.members
.items()
.first()
.map(|m| m.info.hostname.clone())
.and_then(|m| m.info.ip_address())
.unwrap_or_else(|| "controlplane".to_string());
let etcd_vec = vec!["etcd".to_string()];
Ok(Some(Action::ShowMultiLogs(
@@ -617,10 +611,14 @@
}
KeyCode::Enter => {
// View etcd logs for selected member
// Use IP address since hostnames may not be resolvable
if let Some(data) = self.data()
&& let Some(member) = data.members.selected()
{
let node = member.info.hostname.clone();
let node = member
.info
.ip_address()
.unwrap_or_else(|| member.info.hostname.clone());
let etcd_vec = vec!["etcd".to_string()];
return Ok(Some(Action::ShowMultiLogs(
node,
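
The etcd.rs hunks keep the same IP-first convention and keep the status/alarm fetch bounded by a timeout wrapped around two concurrent calls. A sketch of that fetch shape; `etcd_status_for_nodes`, `etcd_alarms`, and the `tokio::time::timeout` + `tokio::join!` combination mirror the diff, while the five-second deadline and the logging are placeholders.

```rust
use std::time::Duration;

// Illustrative shape of the bounded parallel fetch: both requests run
// concurrently, and the pair is abandoned together if the deadline passes.
async fn fetch_etcd_overview(client: &TalosClient, cp_ips: &[String]) {
    let deadline = Duration::from_secs(5); // placeholder timeout
    let fetched = tokio::time::timeout(deadline, async {
        tokio::join!(client.etcd_status_for_nodes(cp_ips), client.etcd_alarms())
    })
    .await;

    match fetched {
        Ok((statuses, alarms)) => tracing::debug!(
            "etcd status ok: {}, alarms ok: {}",
            statuses.is_ok(),
            alarms.is_ok()
        ),
        Err(_) => tracing::warn!("etcd status/alarm fetch timed out"),
    }
}
```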