Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions crates/datasilo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,14 +536,32 @@ impl<S: SnapshotCodec, O: OpCodec<Snapshot = S::Snapshot>> DataSilo<S, O> {
}

// Phase 1: seed snapshots from data.bin (no locks).
// Sort keys by their offset in data.bin so mmap reads are sequential.
// This turns random page faults into sequential readahead — the kernel
// prefetches adjacent pages, and the I/O scheduler merges requests.
let mmap_start = Instant::now();
let mut out: Vec<Option<S::Snapshot>> = keys
let mut out: Vec<Option<S::Snapshot>> = vec![None; keys.len()];

// Build (original_index, offset) pairs, sort by offset, read in order.
let mut order: Vec<(usize, u64)> = keys
.iter()
.map(|&key| match self.data_bytes_for(key) {
Some(bytes) => S::decode(bytes).ok(),
None => None,
.enumerate()
.map(|(i, &key)| {
let offset = self.index.as_ref()
.and_then(|idx| idx.get(key))
.map_or(u64::MAX, |e| e.offset);
(i, offset)
})
.collect();
order.sort_unstable_by_key(|&(_, offset)| offset);

for &(orig_idx, _) in &order {
let key = keys[orig_idx];
out[orig_idx] = match self.data_bytes_for(key) {
Some(bytes) => S::decode(bytes).ok(),
None => None,
};
}
let mmap_nanos = mmap_start.elapsed().as_nanos() as u64;

// Phase 2: scan each ops log once, applying every op to any key that
Expand Down
12 changes: 12 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ use crate::executor::{CaseSensitiveFields, StringMaps};
use crate::loader;
use crate::metrics::Metrics;
use crate::mutation::FieldValue;

/// Limit concurrent disk doc reads to prevent mmap page-fault storms.
/// At 109M records with multi-GB data.bin, 78 concurrent readers doing
/// random page faults thrash the I/O scheduler — P99 doc reads hit 5-10s.
/// 16 permits keep disk utilization high without contention collapse.
static DOC_DISK_READ_SEMAPHORE: std::sync::LazyLock<tokio::sync::Semaphore> =
std::sync::LazyLock::new(|| tokio::sync::Semaphore::new(16));
use crate::query::{BitdexQuery, Value};

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -2775,6 +2782,11 @@ async fn handle_query(
m.query_docs_path_total
.with_label_values(&[&name_docs, "spawn_blocking"])
.inc();
// Acquire disk read permit before spawn_blocking. This queues
// excess readers in async context (no thread consumed) rather
// than spawning 78 blocking threads that all page-fault at once.
let _disk_permit = DOC_DISK_READ_SEMAPHORE.acquire().await
.expect("doc disk read semaphore closed");
// C1 isolation: capture wall time immediately before the
// spawn_blocking call so we can measure dispatch gap.
let dispatch_start = Instant::now();
Expand Down
Loading