Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,14 @@ lazy-regex = "3.5.1"
# Atomic file operations
tempfile = "3.24.0"

[dev-dependencies]
bytes = "1.11.0"
http = "1.3.1"
http-body-util = "0.1.3"
tower = { version = "0.5.2", features = ["util"] }

[profile.release]
strip = true
lto = "thin"
codegen-units = 1
opt-level = "z" # Optimize for size

177 changes: 160 additions & 17 deletions src/kubernetes/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use kube::config::{KubeConfigOptions, Kubeconfig};
use kube::{Api, Client, Config, api::ListParams};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex as StdMutex};

use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, trace, warn};

use super::ApiFilters;
Expand Down Expand Up @@ -145,15 +145,23 @@ impl CachedRegistry {
/// Pool of lazily-created Kubernetes clients, one per kubeconfig context,
/// plus per-context cached discovery state.
pub struct K8sClientPool {
    /// Parsed kubeconfig used to build a client for each context.
    kubeconfig: Kubeconfig,
    /// Lazily-built clients, keyed by context name.
    clients: Arc<RwLock<HashMap<String, Client>>>,
    /// Per-context async mutexes so concurrent callers construct a given
    /// context's client only once (see `client_creation_lock`).
    /// NOTE(review): entries are never evicted from this map.
    client_creation_locks: Arc<StdMutex<HashMap<String, Arc<Mutex<()>>>>>,
    /// Per-context cached registries (see `CachedRegistry`).
    registries: Arc<RwLock<HashMap<String, CachedRegistry>>>,
    /// Current active contexts (supports multi-USE)
    current_contexts: Arc<RwLock<Vec<String>>>,
    /// Progress reporter for query status updates
    progress: ProgressHandle,
    /// Local disk cache for CRD discovery results
    resource_cache: ResourceCache,
    /// Test-only hook: when set, `build_client` delegates to this factory
    /// instead of constructing a real client from the kubeconfig.
    #[cfg(test)]
    client_factory: Arc<RwLock<Option<Arc<TestClientFactory>>>>,
}

/// Test-only factory signature: maps a context name to a boxed future
/// yielding a `Client`, letting tests stub out client construction
/// without touching a real cluster.
#[cfg(test)]
type TestClientFactory = dyn Fn(String) -> Pin<Box<dyn std::future::Future<Output = Result<Client>> + Send>>
    + Send
    + Sync;

/// Extract (group, version, kind) tuples from CRD list
/// Only includes the storage version (the canonical/preferred version)
/// This matches kubectl's behavior of showing one version per CRD
Expand Down Expand Up @@ -181,13 +189,28 @@ impl K8sClientPool {
/// Create a minimal client pool for unit tests (no kubeconfig required).
///
/// The pool carries a synthetic kubeconfig containing a single
/// `test-context` entry so that context-existence validation in
/// `get_or_create_client` succeeds during tests.
#[cfg(test)]
pub fn new_for_test(progress: crate::progress::ProgressHandle) -> Self {
    let test_context = "test-context".to_string();
    Self {
        // Minimal kubeconfig with exactly one named context so lookups
        // against `kubeconfig.contexts` find it.
        kubeconfig: Kubeconfig {
            contexts: vec![kube::config::NamedContext {
                name: test_context.clone(),
                context: Some(kube::config::Context {
                    cluster: "test-cluster".to_string(),
                    user: None,
                    namespace: None,
                    extensions: None,
                }),
            }],
            current_context: Some(test_context.clone()),
            ..Kubeconfig::default()
        },
        clients: Arc::new(RwLock::new(HashMap::new())),
        client_creation_locks: Arc::new(StdMutex::new(HashMap::new())),
        registries: Arc::new(RwLock::new(HashMap::new())),
        current_contexts: Arc::new(RwLock::new(vec![test_context])),
        progress,
        resource_cache: ResourceCache::new().expect("Failed to create test cache"),
        client_factory: Arc::new(RwLock::new(None)),
    }
}

Expand All @@ -212,10 +235,13 @@ impl K8sClientPool {
Ok(Self {
kubeconfig,
clients: Arc::new(RwLock::new(HashMap::new())),
client_creation_locks: Arc::new(StdMutex::new(HashMap::new())),
registries: Arc::new(RwLock::new(HashMap::new())),
current_contexts: Arc::new(RwLock::new(vec![context_name])),
progress: crate::progress::create_progress_handle(),
resource_cache: ResourceCache::new()?,
#[cfg(test)]
client_factory: Arc::new(RwLock::new(None)),
})
}

Expand Down Expand Up @@ -627,11 +653,56 @@ impl K8sClientPool {
}
}

let client_creation_lock = self.client_creation_lock(context);
let _client_creation_guard = client_creation_lock.lock().await;

{
let clients = self.clients.read().await;
if let Some(client) = clients.get(context) {
self.progress
.connected(context, start.elapsed().as_millis() as u64);
return Ok(client.clone());
}
}

// Verify context exists
if !self.kubeconfig.contexts.iter().any(|c| c.name == context) {
return Err(anyhow!("Context '{}' not found in kubeconfig", context));
}

let client = self.build_client(context).await?;

// Report connected
self.progress
.connected(context, start.elapsed().as_millis() as u64);

// Cache it
{
let mut clients = self.clients.write().await;
clients.insert(context.to_string(), client.clone());
}

Ok(client)
}

fn client_creation_lock(&self, context: &str) -> Arc<Mutex<()>> {
let mut locks = self
.client_creation_locks
.lock()
.expect("client creation locks mutex poisoned");

locks
.entry(context.to_string())
.or_insert_with(|| Arc::new(Mutex::new(())))
Comment on lines +695 to +696

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Validate context before caching creation lock

get_or_create_client now calls client_creation_lock(context) before checking whether the context exists, and client_creation_lock inserts a new mutex entry for every unique context string. Because these entries are never removed, repeated requests with invalid context names (e.g., from daemon/SQL callers passing arbitrary contexts) cause unbounded growth of client_creation_locks and permanent memory usage increase; move validation ahead of lock creation or avoid storing locks for unknown contexts.

Useful? React with 👍 / 👎.

.clone()
}

async fn build_client(&self, context: &str) -> Result<Client> {
#[cfg(test)]
if let Some(factory) = self.client_factory.read().await.clone() {
return factory(context.to_string()).await;
}

// Create new client with timeouts
let mut config = Config::from_custom_kubeconfig(
self.kubeconfig.clone(),
Expand All @@ -647,20 +718,13 @@ impl K8sClientPool {
config.connect_timeout = Some(CONNECT_TIMEOUT);
config.read_timeout = Some(READ_TIMEOUT);

let client = Client::try_from(config)
.with_context(|| format!("Failed to create client for context '{}'", context))?;

// Report connected
self.progress
.connected(context, start.elapsed().as_millis() as u64);

// Cache it
{
let mut clients = self.clients.write().await;
clients.insert(context.to_string(), client.clone());
}
Client::try_from(config)
.with_context(|| format!("Failed to create client for context '{}'", context))
}

Ok(client)
/// Test-only: install a stub factory that `build_client` will use
/// instead of constructing a real client from the kubeconfig.
#[cfg(test)]
async fn set_client_factory_for_test(&self, factory: Arc<TestClientFactory>) {
    let mut slot = self.client_factory.write().await;
    *slot = Some(factory);
}

/// Get client for a specific context, or current context if None
Expand Down Expand Up @@ -1046,6 +1110,19 @@ impl K8sClientPool {

#[cfg(test)]
mod tests {
use super::K8sClientPool;
use crate::progress::create_progress_handle;
use bytes::Bytes;
use futures::future::join_all;
use http::{Request, Response, StatusCode};
use http_body_util::Full;
use std::convert::Infallible;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::sync::Barrier;
use tower::service_fn;

/// Helper function to test alias building logic in isolation
/// This replicates the logic from process_discovered_crds
fn build_aliases(
Expand Down Expand Up @@ -1151,4 +1228,70 @@ mod tests {
assert!(aliases.contains(&"cert".to_string()));
assert!(aliases.contains(&"certificate".to_string()));
}

/// Build a `kube::Client` backed by an in-process stub service that
/// answers every request with `200 OK` and an empty JSON object.
fn make_test_client() -> kube::Client {
    let stub_service = service_fn(|_request: Request<kube::client::Body>| async move {
        let response = Response::builder()
            .status(StatusCode::OK)
            .body(Full::new(Bytes::from_static(b"{}")))
            .expect("test response");
        Ok::<_, Infallible>(response)
    });
    kube::Client::new(stub_service, "default")
}

// Regression test: many tasks racing on the same context must result in
// exactly ONE client construction (the per-context creation lock), with
// every caller receiving a usable client.
#[tokio::test]
async fn get_or_create_client_creates_once_per_context_under_concurrency() {
    let pool = Arc::new(K8sClientPool::new_for_test(create_progress_handle()));
    // Counts how many times the stub factory actually runs.
    let create_count = Arc::new(AtomicUsize::new(0));

    // Install a slow stub factory: the sleep widens the race window so
    // that, without the creation lock, multiple callers would each
    // construct a client and bump the counter.
    pool.set_client_factory_for_test(Arc::new({
        let create_count = create_count.clone();
        move |_context| {
            let create_count = create_count.clone();
            Box::pin(async move {
                create_count.fetch_add(1, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(50)).await;
                Ok(make_test_client())
            })
        }
    }))
    .await;

    let callers = 16;
    // Barrier of callers + 1: the test body itself is the extra party,
    // releasing all spawned tasks at once to maximize contention.
    let start_barrier = Arc::new(Barrier::new(callers + 1));

    let tasks = (0..callers)
        .map(|_| {
            let pool = pool.clone();
            let start_barrier = start_barrier.clone();
            tokio::spawn(async move {
                start_barrier.wait().await;
                pool.get_client(Some("test-context")).await
            })
        })
        .collect::<Vec<_>>();

    // Release every task simultaneously.
    start_barrier.wait().await;

    let results = join_all(tasks).await;
    for result in results {
        // Outer: task did not panic. Inner: client creation succeeded.
        result
            .expect("task join should succeed")
            .expect("client creation should succeed");
    }

    assert_eq!(
        create_count.load(Ordering::SeqCst),
        1,
        "parallel callers should share a single client construction"
    );

    // The cache must hold exactly the one client, keyed by the context.
    let clients = pool.clients.read().await;
    assert_eq!(clients.len(), 1);
    assert!(clients.contains_key("test-context"));
}
}