Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/query_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub struct QueryTrace {
pub lazy_load_us: u64,
pub docs_us: u64,
pub docs_count: u64,
/// Doc cache hits (slots served from doc_cache without disk read)
#[serde(default)]
pub docs_cache_hits: u64,
/// Doc cache misses (slots that required spawn_blocking disk read)
#[serde(default)]
pub docs_cache_misses: u64,
/// Time (µs) the query thread spent blocked waiting for unified_cache
/// Mutex, contended by the flush thread's Phase A/C lock-held work.
pub cache_lock_wait_us: u64,
Expand Down Expand Up @@ -122,6 +128,8 @@ impl QueryTraceCollector {
lazy_load_us: self.lazy_load_us,
docs_us: 0,
docs_count: 0,
docs_cache_hits: 0,
docs_cache_misses: 0,
cache_lock_wait_us: self.cache_lock_wait_us,
cache_hold_us: self.cache_hold_us,
pre_cache_us: self.pre_cache_us,
Expand Down Expand Up @@ -374,6 +382,8 @@ mod tests {
result_count: n,
docs_us: 0,
docs_count: 0,
docs_cache_hits: 0,
docs_cache_misses: 0,
cache_hit: false,
clauses: vec![],
sort: None,
Expand Down
56 changes: 53 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ use crate::mutation::FieldValue;

use crate::query::{BitdexQuery, Value};

/// Default number of doc disk read permits.
///
/// Shared by both statics below so the initial semaphore size and the
/// recorded permit count cannot drift apart.
const DEFAULT_DOC_DISK_READ_PERMITS: usize = 64;

/// Doc disk read semaphore — limits concurrent spawn_blocking doc reads
/// to control both I/O contention and memory pressure from mmap page faults.
/// Runtime-tunable via PATCH /indexes/{name}/config with doc_disk_read_permits.
///
/// Default 64 permits — balance between convoy (too low) and OOM (too high).
/// Only gates the cache-miss path; full-cache-hit batches skip the semaphore
/// via the inline fast path.
///
/// Held behind an `ArcSwap` so a config patch can replace the whole semaphore
/// with one of a different size without blocking readers.
static DOC_DISK_READ_SEMAPHORE: std::sync::LazyLock<arc_swap::ArcSwap<tokio::sync::Semaphore>> =
    std::sync::LazyLock::new(|| {
        arc_swap::ArcSwap::from_pointee(tokio::sync::Semaphore::new(
            DEFAULT_DOC_DISK_READ_PERMITS,
        ))
    });

/// Permit count most recently applied to `DOC_DISK_READ_SEMAPHORE`.
///
/// Written alongside the semaphore swap when a config patch changes
/// doc_disk_read_permits; starts at the same default as the semaphore.
static DOC_DISK_READ_PERMITS: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(DEFAULT_DOC_DISK_READ_PERMITS);

// ---------------------------------------------------------------------------
// Server state
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -933,6 +945,13 @@ struct ConfigPatch {
/// Takes effect on the next generation rotation.
#[serde(default)]
doc_cache_max_generations: Option<usize>,
/// Apr 14 2026: Doc disk read semaphore permits — the number of concurrent
/// cache-miss reads allowed. Too low → convoy (14.5s at 16 permits, cold cache).
/// Too high → OOM (unlimited grew RSS past 32 GiB). Default 64.
/// Only applies to cache-miss batches; the full-cache-hit inline path bypasses it.
/// Not persisted — experiment lever. Pass 0 (or omit the field) for no change.
#[serde(default)]
doc_disk_read_permits: Option<usize>,
/// Toggle expensive metric groups at runtime. Array of group names to enable.
/// Groups: "bitmap_memory", "eviction_stats", "boundstore_disk"
/// DEPRECATED: Use disabled_metrics instead.
Expand Down Expand Up @@ -2288,6 +2307,18 @@ async fn handle_patch_config(
eprintln!("Config patch: doc_cache_max_generations set to {v} (0 = revert to config default, not persisted)");
}

// Apr 14 2026: doc disk read semaphore permit count.
// Swap in a new semaphore atomically — permits already held on the old
// semaphore drain naturally, while new acquires go to the new one, so
// concurrency can temporarily exceed the new limit during the transition.
// Not persisted — experiment lever.
if let Some(v) = patch.doc_disk_read_permits {
if v > 0 {
DOC_DISK_READ_PERMITS.store(v, std::sync::atomic::Ordering::Relaxed);
DOC_DISK_READ_SEMAPHORE.store(std::sync::Arc::new(tokio::sync::Semaphore::new(v)));
eprintln!("Config patch: doc_disk_read_permits set to {v} (not persisted)");
}
}

// Persist updated config
let index_dir = state.data_dir.join("indexes").join(&name);
if let Err(e) = idx.definition.save_yaml(&index_dir) {
Expand Down Expand Up @@ -2675,6 +2706,9 @@ async fn handle_query(
// Doc reads can hit disk (cache miss) — sync I/O on the async
// runtime causes 4s+ response times under load.
let doc_start = Instant::now();
// Track per-query doc cache hits/misses for trace correlation.
let docs_cache_hits_cell = Arc::new(std::sync::atomic::AtomicU64::new(0));
let docs_cache_misses_cell = Arc::new(std::sync::atomic::AtomicU64::new(0));
let documents = if !include_docs.is_none() {
let engine_docs = Arc::clone(&engine);
let _ids = result.ids.clone();
Expand Down Expand Up @@ -2713,6 +2747,8 @@ async fn handle_query(
}
if all_hit && cached.len() == slot_ids.len() {
// All cache hits — format inline, skip spawn_blocking.
docs_cache_hits_cell.store(cached.len() as u64, std::sync::atomic::Ordering::Relaxed);
docs_cache_misses_cell.store(0, std::sync::atomic::Ordering::Relaxed);
let format_start = Instant::now();
let docs: Vec<serde_json::Value> = cached.into_iter().enumerate().map(|(i, opt)| {
match opt {
Expand Down Expand Up @@ -2765,6 +2801,9 @@ async fn handle_query(
let format_hist = m.query_doc_format_seconds.clone();
let read_path_total = m.docstore_read_path_total.clone();
let batch_miss_hist = m.query_docs_batch_miss_count.clone();
let docs_cache_hits_cell2 = Arc::clone(&docs_cache_hits_cell);
let docs_cache_misses_cell2 = Arc::clone(&docs_cache_misses_cell);
let total_slots = result.ids.len() as u64;
let docstore_hist_clone = docstore_hist.clone();
let name_docs_clone = name_docs.clone();
let ids = result.ids.clone();
Expand All @@ -2776,9 +2815,14 @@ async fn handle_query(
m.query_docs_path_total
.with_label_values(&[&name_docs, "spawn_blocking"])
.inc();
// Semaphore removed: 16 permits caused 14.5s convoy effect under
// cold page cache (47 QPS × 1-5s holds = instant queue buildup).
// Offset-sorted reads handle I/O contention via sequential access.
// Acquire doc disk read permit. Runtime-tunable via PATCH
// (doc_disk_read_permits). Limits concurrent miss-path reads
// to control memory pressure from page faults without the
// 14.5s convoy seen at 16 permits. Default 64.
// Only gates cache-miss path; inline fast path bypasses entirely.
let semaphore = DOC_DISK_READ_SEMAPHORE.load_full();
let _disk_permit = semaphore.acquire_owned().await
.expect("doc disk read semaphore closed");
// C1 isolation: capture wall time immediately before the
// spawn_blocking call so we can measure dispatch gap.
let dispatch_start = Instant::now();
Expand All @@ -2797,6 +2841,10 @@ async fn handle_query(

let docs: Vec<serde_json::Value> = match batch_result {
Ok((stored_opts, stats, unique_shards, cache_probe_ns, disk_fetch_ns, miss_count)) => {
// Per-query doc cache hits/misses for trace correlation.
let hits = total_slots.saturating_sub(miss_count as u64);
docs_cache_hits_cell2.store(hits, std::sync::atomic::Ordering::Relaxed);
docs_cache_misses_cell2.store(miss_count as u64, std::sync::atomic::Ordering::Relaxed);
// Apr 11 2026: observe per-batch miss count
// distribution. Measures "all or nothing"
// amplification — one miss forces the whole
Expand Down Expand Up @@ -2917,6 +2965,8 @@ async fn handle_query(
let mut trace = trace;
trace.docs_us = docs_us;
trace.docs_count = docs_count;
trace.docs_cache_hits = docs_cache_hits_cell.load(std::sync::atomic::Ordering::Relaxed);
trace.docs_cache_misses = docs_cache_misses_cell.load(std::sync::atomic::Ordering::Relaxed);

// Observe query phase histograms
m.query_filter_seconds
Expand Down
Loading