diff --git a/src/query_metrics.rs b/src/query_metrics.rs index 181ed1c..34694b3 100644 --- a/src/query_metrics.rs +++ b/src/query_metrics.rs @@ -30,6 +30,12 @@ pub struct QueryTrace { pub lazy_load_us: u64, pub docs_us: u64, pub docs_count: u64, + /// Doc cache hits (slots served from doc_cache without disk read) + #[serde(default)] + pub docs_cache_hits: u64, + /// Doc cache misses (slots that required spawn_blocking disk read) + #[serde(default)] + pub docs_cache_misses: u64, /// Time (µs) the query thread spent blocked waiting for unified_cache /// Mutex, contended by the flush thread's Phase A/C lock-held work. pub cache_lock_wait_us: u64, @@ -122,6 +128,8 @@ impl QueryTraceCollector { lazy_load_us: self.lazy_load_us, docs_us: 0, docs_count: 0, + docs_cache_hits: 0, + docs_cache_misses: 0, cache_lock_wait_us: self.cache_lock_wait_us, cache_hold_us: self.cache_hold_us, pre_cache_us: self.pre_cache_us, @@ -374,6 +382,8 @@ mod tests { result_count: n, docs_us: 0, docs_count: 0, + docs_cache_hits: 0, + docs_cache_misses: 0, cache_hit: false, clauses: vec![], sort: None, diff --git a/src/server.rs b/src/server.rs index 251af0d..89f4579 100644 --- a/src/server.rs +++ b/src/server.rs @@ -31,6 +31,18 @@ use crate::mutation::FieldValue; use crate::query::{BitdexQuery, Value}; +/// Doc disk read semaphore — limits concurrent spawn_blocking doc reads +/// to control both I/O contention and memory pressure from mmap page faults. +/// Runtime-tunable via PATCH /indexes/{name}/config with doc_disk_read_permits. +/// +/// Default 64 permits — balance between convoy (too low) and OOM (too high). +/// Only gates the cache-miss path; full-cache-hit batches skip the semaphore +/// via the inline fast path. 
+static DOC_DISK_READ_SEMAPHORE: std::sync::LazyLock<arc_swap::ArcSwap<tokio::sync::Semaphore>> = + std::sync::LazyLock::new(|| arc_swap::ArcSwap::from_pointee(tokio::sync::Semaphore::new(64))); +static DOC_DISK_READ_PERMITS: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(64); + // --------------------------------------------------------------------------- // Server state // --------------------------------------------------------------------------- @@ -933,6 +945,13 @@ struct ConfigPatch { /// Takes effect on the next generation rotation. #[serde(default)] doc_cache_max_generations: Option<usize>, + /// Apr 14 2026: Doc disk read semaphore permits — concurrent cache-miss + /// reads allowed. Too low → convoy (14.5s at 16 perms, cold cache). + /// Too high → OOM (unlimited grew RSS past 32 GiB). Default 64. + /// Only applies to cache-miss batches; full-cache-hit inline path bypasses. + /// Not persisted — experiment lever. Pass 0 for no change. + #[serde(default)] + doc_disk_read_permits: Option<usize>, /// Toggle expensive metric groups at runtime. Array of group names to enable. /// Groups: "bitmap_memory", "eviction_stats", "boundstore_disk" /// DEPRECATED: Use disabled_metrics instead. @@ -2288,6 +2307,18 @@ async fn handle_patch_config( eprintln!("Config patch: doc_cache_max_generations set to {v} (0 = revert to config default, not persisted)"); } + // Apr 14 2026: doc disk read semaphore permit count. // Swap in a new semaphore atomically — in-flight permits on // the old semaphore drain naturally; new acquires hit the new. // Not persisted — experiment lever. 
+ if let Some(v) = patch.doc_disk_read_permits { + if v > 0 { + DOC_DISK_READ_PERMITS.store(v, std::sync::atomic::Ordering::Relaxed); + DOC_DISK_READ_SEMAPHORE.store(std::sync::Arc::new(tokio::sync::Semaphore::new(v))); + eprintln!("Config patch: doc_disk_read_permits set to {v} (not persisted)"); + } + } + // Persist updated config let index_dir = state.data_dir.join("indexes").join(&name); if let Err(e) = idx.definition.save_yaml(&index_dir) { @@ -2675,6 +2706,9 @@ async fn handle_query( // Doc reads can hit disk (cache miss) — sync I/O on the async // runtime causes 4s+ response times under load. let doc_start = Instant::now(); + // Track per-query doc cache hits/misses for trace correlation. + let docs_cache_hits_cell = Arc::new(std::sync::atomic::AtomicU64::new(0)); + let docs_cache_misses_cell = Arc::new(std::sync::atomic::AtomicU64::new(0)); let documents = if !include_docs.is_none() { let engine_docs = Arc::clone(&engine); let _ids = result.ids.clone(); @@ -2713,6 +2747,8 @@ async fn handle_query( } if all_hit && cached.len() == slot_ids.len() { // All cache hits — format inline, skip spawn_blocking. 
+ docs_cache_hits_cell.store(cached.len() as u64, std::sync::atomic::Ordering::Relaxed); + docs_cache_misses_cell.store(0, std::sync::atomic::Ordering::Relaxed); let format_start = Instant::now(); let docs: Vec = cached.into_iter().enumerate().map(|(i, opt)| { match opt { @@ -2765,6 +2801,9 @@ async fn handle_query( let format_hist = m.query_doc_format_seconds.clone(); let read_path_total = m.docstore_read_path_total.clone(); let batch_miss_hist = m.query_docs_batch_miss_count.clone(); + let docs_cache_hits_cell2 = Arc::clone(&docs_cache_hits_cell); + let docs_cache_misses_cell2 = Arc::clone(&docs_cache_misses_cell); + let total_slots = result.ids.len() as u64; let docstore_hist_clone = docstore_hist.clone(); let name_docs_clone = name_docs.clone(); let ids = result.ids.clone(); @@ -2776,9 +2815,14 @@ async fn handle_query( m.query_docs_path_total .with_label_values(&[&name_docs, "spawn_blocking"]) .inc(); - // Semaphore removed: 16 permits caused 14.5s convoy effect under - // cold page cache (47 QPS × 1-5s holds = instant queue buildup). - // Offset-sorted reads handle I/O contention via sequential access. + // Acquire doc disk read permit. Runtime-tunable via PATCH + // (doc_disk_read_permits). Limits concurrent miss-path reads + // to control memory pressure from page faults without the + // 14.5s convoy seen at 16 permits. Default 64. + // Only gates cache-miss path; inline fast path bypasses entirely. + let semaphore = DOC_DISK_READ_SEMAPHORE.load_full(); + let _disk_permit = semaphore.acquire_owned().await + .expect("doc disk read semaphore closed"); // C1 isolation: capture wall time immediately before the // spawn_blocking call so we can measure dispatch gap. let dispatch_start = Instant::now(); @@ -2797,6 +2841,10 @@ async fn handle_query( let docs: Vec = match batch_result { Ok((stored_opts, stats, unique_shards, cache_probe_ns, disk_fetch_ns, miss_count)) => { + // Per-query doc cache hits/misses for trace correlation. 
+ let hits = total_slots.saturating_sub(miss_count as u64); + docs_cache_hits_cell2.store(hits, std::sync::atomic::Ordering::Relaxed); + docs_cache_misses_cell2.store(miss_count as u64, std::sync::atomic::Ordering::Relaxed); // Apr 11 2026: observe per-batch miss count // distribution. Measures "all or nothing" // amplification — one miss forces the whole @@ -2917,6 +2965,8 @@ async fn handle_query( let mut trace = trace; trace.docs_us = docs_us; trace.docs_count = docs_count; + trace.docs_cache_hits = docs_cache_hits_cell.load(std::sync::atomic::Ordering::Relaxed); + trace.docs_cache_misses = docs_cache_misses_cell.load(std::sync::atomic::Ordering::Relaxed); // Observe query phase histograms m.query_filter_seconds