12 changes: 12 additions & 0 deletions docs/_in/design-prefilter-registry.md
@@ -172,6 +172,18 @@ Both evaluate to the same set, assuming the prefilter bitmap is an accurate mate

**Drift-free fallback (optional):** If drift is unacceptable, we can track ops that touch prefilter fields and apply them incrementally (same pattern as alive_bitmap). Phase 2 work.

**Deleted-doc resurrection (Phase 1 hazard + mitigation):** The engine uses clean-delete — on delete, the doc's bits are cleared from every filter/sort bitmap AND from alive, so filter bitmaps stay clean and the query hot path has no `alive AND filter`. The cached prefilter bitmap is NOT maintained by clean-delete — it's a frozen snapshot. Between refreshes a deleted doc's bit stays set in the cached prefilter. If a query's clauses match the prefilter and no other constraint filters against a clean bitmap (e.g. a "show me everything safe" pagination with no extra filter), the deleted doc would re-surface in results.

*Mitigation:* `substitute()` intersects the cached prefilter with live `alive` at substitute time. Cost: one bitmap AND (~20–50 ms at 107 M) per substituted query — still a large win vs the 300–900 ms saved by eliding the original clauses. Verified by `substitute_excludes_dead_slots_from_stale_prefilter` unit test and the `deleted_docs_excluded_from_stale_prefilter` integration test.
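
The guard described above amounts to a single set intersection. Below is a minimal sketch of that idea, not the engine's code: `SlotSet` and `guard_with_alive` are hypothetical names, and a `BTreeSet<u32>` stands in for the `RoaringBitmap` the real implementation uses.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for a bitmap of slot ids
/// (the real code uses `RoaringBitmap`; this is an assumption for illustration).
type SlotSet = BTreeSet<u32>;

/// Intersect a possibly stale cached prefilter with the live `alive` set.
/// Any slot deleted since the last refresh is absent from `alive`,
/// so it is dropped from the result.
fn guard_with_alive(stale_prefilter: &SlotSet, alive: &SlotSet) -> SlotSet {
    stale_prefilter.intersection(alive).copied().collect()
}
```

With a stale prefilter holding slots 10, 20 and 30 and an `alive` set that no longer contains 20, the result holds only 10 and 30. Slots that are alive but were never in the prefilter stay excluded, so the guard can only shrink the cached set.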

## Implementation deviations from this design

- **Registry placement.** The design specified `InnerEngine` so the registry participated in the ArcSwap snapshot. The implementation puts it on `ConcurrentEngine` so registrations don't force a snapshot publish. Consequence: the clause list is fixed at register time from the live engine, not the caller's snapshot, but clause sets are immutable after registration so snapshot coherence isn't load-bearing. The cached bitmap lives in its own `ArcSwap` for per-entry atomic refresh.

- **Phase 1b (flag-gated substitution) was skipped.** The registry's empty-by-default `is_empty()` early-return in `substitute()` is sufficient as a kill switch: no registrations → zero hot-path cost and no behavior change. The opt-in-per-query flag was belt-and-suspenders and was consciously dropped.

- **Cache interaction with drift.** The unified cache is keyed off the pre-substitution clause list. On a cache hit, the flush-thread-maintained result is returned (accurate). On a cache miss, substitution fires and the possibly stale prefilter is intersected with live `alive`. The same user query can thus see slightly different result sets under cache churn (bounded by drift window × refresh interval). This is acceptable for Phase 1 per the design's explicit "mild drift is acceptable" stance; Phase 2 would include a prefilter version in the cache key.
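
The kill-switch behavior in the second bullet can be illustrated with a simplified sketch. This is not the real `substitute()`: clauses are plain strings, the registry is a slice, and `Entry` and `substitute_sketch` are hypothetical names. It only shows how an empty registry can return the caller's clauses as a borrowed `Cow` without doing any other work.

```rust
use std::borrow::Cow;

/// Hypothetical, simplified registry entry: a name plus the clauses it replaces.
struct Entry {
    name: String,
    clauses: Vec<String>,
}

/// Returns the (possibly rewritten) clause list and the name of the entry that fired.
fn substitute_sketch<'a>(
    registry: &[Entry],
    query: &'a [String],
) -> (Cow<'a, [String]>, Option<String>) {
    // Kill switch: with no registrations, borrow the input and return at once.
    if registry.is_empty() {
        return (Cow::Borrowed(query), None);
    }
    for entry in registry {
        // Fire only when every registered clause appears in the query.
        if entry.clauses.iter().all(|c| query.contains(c)) {
            // One placeholder clause replaces the matched ones; the rest are kept in order.
            let mut out = vec![format!("bucket:{}", entry.name)];
            out.extend(query.iter().filter(|c| !entry.clauses.contains(*c)).cloned());
            return (Cow::Owned(out), Some(entry.name.clone()));
        }
    }
    // No entry matched: still no allocation.
    (Cow::Borrowed(query), None)
}
```

Only the matching path allocates a new clause list, which is why the empty-registry case leaves the hot path unchanged.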

## Testing plan

### Unit tests (src/prefilter.rs)
6 changes: 4 additions & 2 deletions src/concurrent_engine.rs
@@ -5611,7 +5611,8 @@ impl ConcurrentEngine {
// Substitute registered prefilters: if the query's canonical clause
// set contains a registered prefilter's clauses as a subset, replace
// those clauses with a single BucketBitmap AND. See src/prefilter.rs.
let (post_prefilter, substituted) = crate::prefilter::substitute(&self.prefilters, effective_filters);
let alive = executor.slot_allocator().alive_fused_cow();
let (post_prefilter, substituted) = crate::prefilter::substitute(&self.prefilters, effective_filters, &alive);
if let Some(ref entry) = substituted {
collector.prefilter_name = Some(entry.name.clone());
}
@@ -5655,7 +5656,8 @@ impl ConcurrentEngine {
// Substitute registered prefilters: if the query's canonical clause
// set is a superset of a registered prefilter's clauses, replace
// those clauses with a single BucketBitmap AND. See src/prefilter.rs.
let (post_prefilter, _entry) = crate::prefilter::substitute(&self.prefilters, effective_filters);
let alive = executor.slot_allocator().alive_fused_cow();
let (post_prefilter, _entry) = crate::prefilter::substitute(&self.prefilters, effective_filters, &alive);
let planner_ctx = planner::PlannerContext {
string_maps: executor.string_maps(),
dictionaries: executor.dictionaries(),
91 changes: 83 additions & 8 deletions src/prefilter.rs
@@ -10,9 +10,19 @@
//! the cached bitmap — turning ~900 ms of filter work into a single intersect.
//!
//! The registry lives on `ConcurrentEngine` (outside the `ArcSwap<InnerEngine>`
//! snapshot) so registrations don't force a snapshot publish. Each entry's
//! bitmap is an `ArcSwap<RoaringBitmap>` so a refresh thread can publish a
//! fresh version without disrupting in-flight queries.
//! snapshot) so registrations don't force a snapshot publish. This is a
//! deliberate deviation from the design doc — see
//! `docs/_in/design-prefilter-registry.md#implementation-deviations`. Each
//! entry's bitmap is an `ArcSwap<RoaringBitmap>` so a refresh thread can
//! publish a fresh version without disrupting in-flight queries.
//!
//! **Delete safety.** The engine uses clean-delete: on delete, a doc's bits
//! are cleared from every filter/sort bitmap and from `alive`. The cached
//! prefilter bitmap is NOT maintained by clean-delete — it's a frozen
//! snapshot. Between refreshes, deleted docs still have bits set in the
//! cached prefilter. `substitute()` therefore intersects the cached bitmap
//! with the live `alive` bitmap at substitute time, so a query constrained
//! only by the prefilter's clauses cannot resurrect deleted docs.

use arc_swap::ArcSwap;
use dashmap::DashMap;
@@ -336,6 +346,7 @@ fn now_unix_secs() -> i64 {
pub fn substitute<'a>(
registry: &PrefilterRegistry,
query_clauses: &'a [FilterClause],
alive: &RoaringBitmap,
) -> (std::borrow::Cow<'a, [FilterClause]>, Option<Arc<PrefilterEntry>>) {
use std::borrow::Cow;
if registry.is_empty() {
@@ -362,11 +373,28 @@ pub fn substitute<'a>(
}
}

let bitmap = entry.bitmap.load_full();
// Intersect with alive at substitute time.
//
// The engine uses clean-delete: on delete, the doc's bits are cleared
// from every filter/sort bitmap AND from alive. Filter bitmaps are thus
// kept clean and there is no `alive AND filter` in the query hot path.
//
// The cached prefilter bitmap is NOT maintained by clean-delete — it's
// a frozen snapshot. Between refreshes, bits for deleted docs remain
// set. If a query's only constraint is the prefilter (no extra clauses
// to narrow against a clean filter bitmap), the deleted doc would
// re-surface in results. Guard against that by intersecting with the
// current alive bitmap.
//
// Cost: one AND against alive per substituted query. At 107M records
// this is ~20-50 ms — still a large win vs the 300-900 ms we save by
// eliding the original clauses, and cheap insurance against drift.
let stale = entry.bitmap.load_full();
let live = Arc::new(stale.as_ref() & alive);
let bucket_clause = FilterClause::BucketBitmap {
field: "__prefilter".to_string(),
bucket_name: entry.name.clone(),
bitmap,
bitmap: live,
};
let mut out = Vec::with_capacity(remaining.len() + 1);
out.push(bucket_clause);
@@ -496,14 +524,21 @@ mod tests {
assert_eq!(matched.len(), 2);
}

fn alive_all() -> RoaringBitmap {
let mut bm = RoaringBitmap::new();
bm.insert_range(0..100u32);
bm
}

#[test]
fn substitute_replaces_matched_clauses_with_bucket_bitmap() {
let reg = PrefilterRegistry::new();
let mut bm = RoaringBitmap::new();
bm.insert(10); bm.insert(20);
reg.insert("safe".into(), vec![eq("a", 1), eq("b", 2)], bm, 300, 0).unwrap();
let query = vec![eq("c", 3), eq("a", 1), eq("b", 2)];
let (out, entry) = substitute(&reg, &query);
let alive = alive_all();
let (out, entry) = substitute(&reg, &query, &alive);
let entry = entry.unwrap();
assert_eq!(entry.name, "safe");
assert_eq!(out.len(), 2);
@@ -522,12 +557,39 @@ mod tests {
}
}

#[test]
fn substitute_excludes_dead_slots_from_stale_prefilter() {
// Regression: cached prefilter has bits for slots 10, 20, 30, but
// slot 20 has since been deleted (not in alive). Substitution must
// NOT leak slot 20 through the returned BucketBitmap.
let reg = PrefilterRegistry::new();
let mut stale = RoaringBitmap::new();
stale.insert(10); stale.insert(20); stale.insert(30);
reg.insert("safe".into(), vec![eq("a", 1)], stale, 300, 0).unwrap();

let mut alive = RoaringBitmap::new();
alive.insert(10); alive.insert(30); // 20 was deleted

let query = vec![eq("a", 1)];
let (out, _) = substitute(&reg, &query, &alive);
match &out[0] {
FilterClause::BucketBitmap { bitmap, .. } => {
assert_eq!(bitmap.len(), 2, "dead slot 20 must not leak through");
assert!(bitmap.contains(10));
assert!(!bitmap.contains(20), "slot 20 was deleted, must not appear");
assert!(bitmap.contains(30));
}
other => panic!("expected BucketBitmap, got {other:?}"),
}
}

#[test]
fn substitute_is_noop_when_no_match() {
let reg = PrefilterRegistry::new();
reg.insert("safe".into(), vec![eq("a", 1)], RoaringBitmap::new(), 300, 0).unwrap();
let query = vec![eq("b", 2)];
let (out, entry) = substitute(&reg, &query);
let alive = alive_all();
let (out, entry) = substitute(&reg, &query, &alive);
assert!(entry.is_none());
assert_eq!(out.len(), 1);
}
@@ -536,8 +598,21 @@ mod tests {
fn substitute_is_noop_on_empty_registry() {
let reg = PrefilterRegistry::new();
let query = vec![eq("a", 1)];
let (out, entry) = substitute(&reg, &query);
let alive = alive_all();
let (out, entry) = substitute(&reg, &query, &alive);
assert!(entry.is_none());
assert_eq!(out.len(), 1);
}

#[test]
fn canonicalize_does_not_reorder_or_children() {
// Design open question #1: Or(A, B) and Or(B, A) currently canonicalize
// distinctly. This regression test pins that behavior — a future
// change adding Or reordering must update it explicitly (and consider
// the test coverage implied by reordering).
let c1 = FilterClause::Or(vec![eq("a", 1), eq("b", 2)]);
let c2 = FilterClause::Or(vec![eq("b", 2), eq("a", 1)]);
assert_ne!(canonicalize_clause(c1), canonicalize_clause(c2),
"Or canonicalization changed — update substitution matching logic or this test");
}
}
37 changes: 30 additions & 7 deletions src/server.rs
@@ -4427,9 +4427,19 @@ async fn handle_register_prefilter(
).into_response();
}

// compute_filters walks up to ~100M-bit bitmaps; at prod scale the
// register compute is ~300-900ms. Running that on the Axum async worker
// would starve the reactor and cause latency spikes for other requests.
// Offload to the blocking pool.
let t0 = Instant::now();
match engine.register_prefilter(req.name.clone(), req.clauses, req.refresh_interval_secs) {
Ok(entry) => {
let req_name = req.name.clone();
let clauses = req.clauses;
let interval = req.refresh_interval_secs;
let result = tokio::task::spawn_blocking(move || {
engine.register_prefilter(req_name, clauses, interval)
}).await;
match result {
Ok(Ok(entry)) => {
let compute_ms = t0.elapsed().as_millis() as u64;
Json(serde_json::json!({
"name": entry.name,
@@ -4438,10 +4448,14 @@ async fn handle_register_prefilter(
"refresh_interval_secs": entry.refresh_interval_secs(),
})).into_response()
}
Err(e) => (
Ok(Err(e)) => (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": format!("register_prefilter: {e}")})),
).into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("register_prefilter task panicked: {e}")})),
).into_response(),
}
}

@@ -4513,21 +4527,30 @@ async fn handle_refresh_prefilter(
}
};

// Offload to blocking pool — compute_filters runs ~300-900ms at prod scale.
let t0 = Instant::now();
match engine.refresh_prefilter(&prefilter_name) {
Ok(Some(entry)) => Json(serde_json::json!({
let pname = prefilter_name.clone();
let result = tokio::task::spawn_blocking(move || {
engine.refresh_prefilter(&pname)
}).await;
match result {
Ok(Ok(Some(entry))) => Json(serde_json::json!({
"name": entry.name,
"cardinality": entry.cardinality(),
"compute_time_ms": t0.elapsed().as_millis() as u64,
})).into_response(),
Ok(None) => (
Ok(Ok(None)) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": format!("prefilter '{}' not found", prefilter_name)})),
).into_response(),
Err(e) => (
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("refresh_prefilter: {e}")})),
).into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("refresh_prefilter task panicked: {e}")})),
).into_response(),
}
}
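
Both handlers above match on a nested result: awaiting a `tokio::task::spawn_blocking` handle yields a `Result` whose `Err` is a `JoinError` (panic or cancellation), wrapped around whatever the closure itself returned. The sketch below shows the same three-way split using `std::thread` as a standard-library analog, not tokio. `Outcome` and `run_offloaded` are hypothetical names introduced for illustration.

```rust
use std::thread;

/// Hypothetical outcome classes mirroring the three kinds of match arm.
#[derive(Debug, PartialEq)]
enum Outcome {
    Done(u64),        // Ok(Ok(v)): the work finished
    Rejected(String), // Ok(Err(e)): the work returned its own error
    Panicked,         // Err(_): the worker thread panicked
}

/// Run `work` on another thread and classify the nested result.
/// `JoinHandle::join` returns `Err` only when the thread panicked.
fn run_offloaded<F>(work: F) -> Outcome
where
    F: FnOnce() -> Result<u64, String> + Send + 'static,
{
    match thread::spawn(work).join() {
        Ok(Ok(v)) => Outcome::Done(v),
        Ok(Err(e)) => Outcome::Rejected(e),
        Err(_) => Outcome::Panicked,
    }
}
```

Keeping the outer and inner errors in separate arms is what lets the handlers map a failure of the work itself and a failure of the offloaded task to different responses.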

54 changes: 54 additions & 0 deletions tests/prefilter_integration.rs
@@ -170,6 +170,60 @@ fn prefilter_substitution_preserves_results() {
assert_eq!(after_ids, baseline_ids, "results unchanged after unregister");
}

#[test]
fn deleted_docs_excluded_from_stale_prefilter() {
// Engine uses clean-delete: filter bitmaps are kept clean and there is no
// `alive AND filter` in the query hot path. Our cached prefilter bitmap is
// NOT maintained by clean-delete — it's a frozen snapshot. Between
// refreshes a deleted doc's bit stays set in the cached prefilter. Without
// a guard, a query whose clause set matches the prefilter would resurface
// deleted docs.
//
// substitute() intersects the prefilter with live alive at substitute
// time. Verify end-to-end.
let dir = tempfile::tempdir().unwrap();
let engine = ConcurrentEngine::new_with_path(
test_config(&dir.path().join("bitmaps")),
&dir.path().join("docs"),
).unwrap();

for i in 1..=10u32 {
engine.put(i, &doc(1, true, 500, i as i64 * 10)).unwrap();
}
wait_for_alive(&engine, 10, 2000);

let entry = engine.register_prefilter(
"nsfw1".into(),
vec![FilterClause::Eq("nsfwLevel".into(), Value::Integer(1))],
300,
).unwrap();
assert_eq!(entry.cardinality(), 10);

// Delete 3/5/7 WITHOUT refreshing the prefilter.
engine.delete(3).unwrap();
engine.delete(5).unwrap();
engine.delete(7).unwrap();
wait_for_alive(&engine, 7, 2000);

// Cached prefilter cardinality is still 10 — confirms the guard has to
// handle stale bitmaps.
assert_eq!(entry.cardinality(), 10, "prefilter bitmap NOT auto-refreshed");

let q = BitdexQuery {
filters: vec![FilterClause::Eq("nsfwLevel".into(), Value::Integer(1))],
sort: None,
limit: 100,
cursor: None,
offset: None,
skip_cache: true,
};
let (result, trace) = engine.execute_query_traced(&q, "test").unwrap();
assert_eq!(trace.prefilter.as_deref(), Some("nsfw1"), "substitution should have fired");
let mut ids = result.ids;
ids.sort_unstable();
assert_eq!(ids, vec![1, 2, 4, 6, 8, 9, 10], "deleted slots 3/5/7 must not leak through");
}

#[test]
fn refresh_updates_cardinality() {
let dir = tempfile::tempdir().unwrap();