diff --git a/crates/datasilo/src/lib.rs b/crates/datasilo/src/lib.rs index 3c4c538..9328a48 100644 --- a/crates/datasilo/src/lib.rs +++ b/crates/datasilo/src/lib.rs @@ -536,14 +536,32 @@ impl<S> DataSilo<S> { } // Phase 1: seed snapshots from data.bin (no locks). + // Sort keys by their offset in data.bin so mmap reads are sequential. + // This turns random page faults into sequential readahead — the kernel + // prefetches adjacent pages, and the I/O scheduler merges requests. let mmap_start = Instant::now(); - let mut out: Vec<Option<S>> = keys + let mut out: Vec<Option<S>> = vec![None; keys.len()]; + + // Build (original_index, offset) pairs, sort by offset, read in order. + let mut order: Vec<(usize, u64)> = keys .iter() - .map(|&key| match self.data_bytes_for(key) { - Some(bytes) => S::decode(bytes).ok(), - None => None, + .enumerate() + .map(|(i, &key)| { + let offset = self.index.as_ref() + .and_then(|idx| idx.get(key)) + .map_or(u64::MAX, |e| e.offset); + (i, offset) }) .collect(); + order.sort_unstable_by_key(|&(_, offset)| offset); + + for &(orig_idx, _) in &order { + let key = keys[orig_idx]; + out[orig_idx] = match self.data_bytes_for(key) { + Some(bytes) => S::decode(bytes).ok(), + None => None, + }; + } let mmap_nanos = mmap_start.elapsed().as_nanos() as u64; // Phase 2: scan each ops log once, applying every op to any key that diff --git a/src/server.rs b/src/server.rs index 368c9bc..ce92f59 100644 --- a/src/server.rs +++ b/src/server.rs @@ -28,6 +28,13 @@ use crate::executor::{CaseSensitiveFields, StringMaps}; use crate::loader; use crate::metrics::Metrics; use crate::mutation::FieldValue; + +/// Limit concurrent disk doc reads to prevent mmap page-fault storms. +/// At 109M records with multi-GB data.bin, 78 concurrent readers doing +/// random page faults thrash the I/O scheduler — P99 doc reads hit 5-10s. +/// 16 permits keeps disk utilization high without contention collapse.
+static DOC_DISK_READ_SEMAPHORE: std::sync::LazyLock<tokio::sync::Semaphore> = + std::sync::LazyLock::new(|| tokio::sync::Semaphore::new(16)); use crate::query::{BitdexQuery, Value}; // --------------------------------------------------------------------------- @@ -2775,6 +2782,11 @@ async fn handle_query( m.query_docs_path_total .with_label_values(&[&name_docs, "spawn_blocking"]) .inc(); + // Acquire disk read permit before spawn_blocking. This queues + // excess readers in async context (no thread consumed) rather + // than spawning 78 blocking threads that all page-fault at once. + let _disk_permit = DOC_DISK_READ_SEMAPHORE.acquire().await + .expect("doc disk read semaphore closed"); // C1 isolation: capture wall time immediately before the // spawn_blocking call so we can measure dispatch gap. let dispatch_start = Instant::now();