From 0bcad4e813d88e4f0b30eb9c09dd60ae71dee129 Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Mon, 13 Apr 2026 21:46:52 -0600 Subject: [PATCH] fix(prefilter): spawn_blocking handlers + alive AND at substitute MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review findings from Gemini + GPT + Plan Reviewer on PR #207 surfaced two critical bugs plus a few design-doc deviations worth documenting. 1. Deleted-doc resurrection (Gemini, CRITICAL) The engine uses clean-delete: on delete, the doc's bits are cleared from every filter/sort bitmap AND from alive, so filter bitmaps stay clean and the query hot path has no `alive AND filter`. The cached prefilter bitmap is NOT maintained by clean-delete — it's a frozen snapshot. Between refreshes, deleted docs still have bits set in the cached prefilter. A query whose clauses match the prefilter (no extra clauses to narrow against clean filter bitmaps) would have acc = stale bitmap → deleted docs leak through. Fix: substitute() now takes an `alive: &RoaringBitmap` argument and intersects the cached prefilter bitmap with alive before wrapping it in a BucketBitmap clause. Cost: one bitmap AND (~20-50ms at 107M) per substituted query — still a large win vs the 300-900ms saved by eliding the original clauses. Verified by: - unit test substitute_excludes_dead_slots_from_stale_prefilter - integration test deleted_docs_excluded_from_stale_prefilter (registers prefilter, deletes 3/10 docs without refresh, confirms substituted query returns 7 IDs excluding the deleted slots) 2. Tokio reactor starvation (Gemini, CRITICAL) handle_register_prefilter and handle_refresh_prefilter called engine.register_prefilter / engine.refresh_prefilter directly on the Axum async worker thread. compute_filters over 100M-bit bitmaps is ~300-900ms of CPU work — running that on a reactor thread starves the runtime and tanks p99 for every other request. Fix: wrap both in tokio::task::spawn_blocking.
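The offload-and-map-errors shape can be sketched without a tokio runtime: std::thread's JoinHandle also surfaces panics as Err from join(), which is a dependency-free analog of awaiting a spawn_blocking JoinHandle and getting a JoinError. The register_prefilter function below is a hypothetical stand-in for illustration, not the engine's real method:

```rust
use std::thread;

// Hypothetical stand-in for engine.register_prefilter: CPU-heavy and fallible.
// (Illustration only — not the real engine API.)
fn register_prefilter(name: &str) -> Result<u64, String> {
    if name.is_empty() {
        return Err("name must be non-empty".into());
    }
    Ok(42) // pretend this is the computed cardinality
}

fn main() {
    // std::thread analog of tokio::task::spawn_blocking + .await:
    // join() returns Err if the closure panicked, mirroring tokio's JoinError.
    let handle = thread::spawn(|| register_prefilter("safe"));
    let response = match handle.join() {
        Ok(Ok(cardinality)) => format!("200 OK, cardinality={cardinality}"),
        Ok(Err(e)) => format!("400 Bad Request: {e}"),
        Err(_) => "500 Internal Server Error: task panicked".to_string(),
    };
    println!("{response}");
}
```

The three-arm match is the point: the handler result is now doubly wrapped, so success, application error, and task panic each get a distinct status.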
Errors propagate as JoinError → 500 Internal Server Error with message. No other behavioral change. 3. Or-children canonicalization (Plan Reviewer, low priority) Design open question #1: whether Or(A, B) and Or(B, A) should canonicalize identically. Current answer is "no" — we don't reorder Or children. Added unit test canonicalize_does_not_reorder_or_children as a regression backstop: if a future change enables Or sorting, the test must be updated deliberately. 4. Documented deviations + Phase 1 hazards in the design doc - Registry on ConcurrentEngine, not InnerEngine (reason: avoid snapshot publish on registration; clause sets are immutable so no coherence issue) - Phase 1b flag gate skipped (empty-registry early-return is the kill switch) - Cache/prefilter drift bifurcation: cache-hit path and cache-miss path may return slightly different results under drift; bounded by refresh interval; acceptable for Phase 1 per design's explicit "mild drift is acceptable"; Phase 2 includes prefilter version in cache key - Delete-drift / alive-intersection rationale Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/_in/design-prefilter-registry.md | 12 ++++ src/concurrent_engine.rs | 6 +- src/prefilter.rs | 91 ++++++++++++++++++++++++--- src/server.rs | 37 ++++++++--- tests/prefilter_integration.rs | 54 ++++++++++++++++ 5 files changed, 183 insertions(+), 17 deletions(-) diff --git a/docs/_in/design-prefilter-registry.md b/docs/_in/design-prefilter-registry.md index b7632bd..475dbdf 100644 --- a/docs/_in/design-prefilter-registry.md +++ b/docs/_in/design-prefilter-registry.md @@ -172,6 +172,18 @@ Both evaluate to the same set, assuming the prefilter bitmap is an accurate mate **Drift-free fallback (optional):** If drift is unacceptable, we can track ops that touch prefilter fields and apply them incrementally (same pattern as alive_bitmap). Phase 2 work. 
+**Deleted-doc resurrection (Phase 1 hazard + mitigation):** The engine uses clean-delete — on delete, the doc's bits are cleared from every filter/sort bitmap AND from alive, so filter bitmaps stay clean and the query hot path has no `alive AND filter`. The cached prefilter bitmap is NOT maintained by clean-delete — it's a frozen snapshot. Between refreshes a deleted doc's bit stays set in the cached prefilter. If a query's clauses match the prefilter and no other constraint filters against a clean bitmap (e.g. a "show me everything safe" pagination with no extra filter), the deleted doc would re-surface in results. + +*Mitigation:* `substitute()` intersects the cached prefilter with live `alive` at substitute time. Cost: one bitmap AND (~20–50 ms at 107 M) per substituted query — still a large win vs the 300–900 ms saved by eliding the original clauses. Verified by `substitute_excludes_dead_slots_from_stale_prefilter` unit test and the `deleted_docs_excluded_from_stale_prefilter` integration test. + +## Implementation deviations from this design + +- **Registry placement.** The design specified `InnerEngine` so the registry participated in the ArcSwap snapshot. The implementation puts it on `ConcurrentEngine` so registrations don't force a snapshot publish. Consequence: the clause list is fixed at register time from the live engine, not the caller's snapshot, but clause sets are immutable after registration so snapshot coherence isn't load-bearing. The cached bitmap lives in its own `ArcSwap` for per-entry atomic refresh. + +- **Phase 1b (flag-gated substitution) was skipped.** The registry's empty-by-default `is_empty()` early-return in `substitute()` is sufficient as a kill switch: no registrations → zero hot-path cost and no behavior change. The opt-in-per-query flag was belt-and-suspenders and was consciously dropped. + +- **Cache interaction with drift.** The unified cache is keyed off the pre-substitution clause list. 
On cache-hit the flush-thread-maintained result is returned (accurate). On cache-miss, substitution fires and the possibly-stale prefilter is intersected with live alive. Same user query can thus see slightly different result sets under cache churn — bounded by drift window × refresh interval. Acceptable for Phase 1 per the design's explicit "mild drift is acceptable" stance; Phase 2 would include a prefilter version in the cache key. + ## Testing plan ### Unit tests (src/prefilter.rs) diff --git a/src/concurrent_engine.rs b/src/concurrent_engine.rs index af3ab9c..25e4999 100644 --- a/src/concurrent_engine.rs +++ b/src/concurrent_engine.rs @@ -5611,7 +5611,8 @@ impl ConcurrentEngine { // Substitute registered prefilters: if the query's canonical clause // set contains a registered prefilter's clauses as a subset, replace // those clauses with a single BucketBitmap AND. See src/prefilter.rs. - let (post_prefilter, substituted) = crate::prefilter::substitute(&self.prefilters, effective_filters); + let alive = executor.slot_allocator().alive_fused_cow(); + let (post_prefilter, substituted) = crate::prefilter::substitute(&self.prefilters, effective_filters, &alive); if let Some(ref entry) = substituted { collector.prefilter_name = Some(entry.name.clone()); } @@ -5655,7 +5656,8 @@ impl ConcurrentEngine { // Substitute registered prefilters: if the query's canonical clause // set is a superset of a registered prefilter's clauses, replace // those clauses with a single BucketBitmap AND. See src/prefilter.rs. 
- let (post_prefilter, _entry) = crate::prefilter::substitute(&self.prefilters, effective_filters); + let alive = executor.slot_allocator().alive_fused_cow(); + let (post_prefilter, _entry) = crate::prefilter::substitute(&self.prefilters, effective_filters, &alive); let planner_ctx = planner::PlannerContext { string_maps: executor.string_maps(), dictionaries: executor.dictionaries(), diff --git a/src/prefilter.rs b/src/prefilter.rs index c01c646..1be4f57 100644 --- a/src/prefilter.rs +++ b/src/prefilter.rs @@ -10,9 +10,19 @@ //! the cached bitmap — turning ~900 ms of filter work into a single intersect. //! //! The registry lives on `ConcurrentEngine` (outside the `ArcSwap` -//! snapshot) so registrations don't force a snapshot publish. Each entry's -//! bitmap is an `ArcSwap` so a refresh thread can publish a -//! fresh version without disrupting in-flight queries. +//! snapshot) so registrations don't force a snapshot publish. This is a +//! deliberate deviation from the design doc — see +//! `docs/_in/design-prefilter-registry.md#implementation-deviations`. Each +//! entry's bitmap is an `ArcSwap` so a refresh thread can +//! publish a fresh version without disrupting in-flight queries. +//! +//! **Delete safety.** The engine uses clean-delete: on delete, a doc's bits +//! are cleared from every filter/sort bitmap and from `alive`. The cached +//! prefilter bitmap is NOT maintained by clean-delete — it's a frozen +//! snapshot. Between refreshes, deleted docs still have bits set in the +//! cached prefilter. `substitute()` therefore intersects the cached bitmap +//! with the live `alive` bitmap at substitute time to avoid resurrecting +//! deleted docs through a superset-only query. 
use arc_swap::ArcSwap; use dashmap::DashMap; @@ -336,6 +346,7 @@ fn now_unix_secs() -> i64 { pub fn substitute<'a>( registry: &PrefilterRegistry, query_clauses: &'a [FilterClause], + alive: &RoaringBitmap, ) -> (std::borrow::Cow<'a, [FilterClause]>, Option<Arc<PrefilterEntry>>) { use std::borrow::Cow; if registry.is_empty() { @@ -362,11 +373,28 @@ } } - let bitmap = entry.bitmap.load_full(); + // Intersect with alive at substitute time. + // + // The engine uses clean-delete: on delete, the doc's bits are cleared + // from every filter/sort bitmap AND from alive. Filter bitmaps are thus + // kept clean and there is no `alive AND filter` in the query hot path. + // + // The cached prefilter bitmap is NOT maintained by clean-delete — it's + // a frozen snapshot. Between refreshes, bits for deleted docs remain + // set. If a query's only constraint is the prefilter (no extra clauses + // to narrow against a clean filter bitmap), the deleted doc would + // re-surface in results. Guard against that by intersecting with the + // current alive bitmap. + // + // Cost: one AND against alive per substituted query. At 107M records + // this is ~20-50 ms — still a large win vs the 300-900 ms we save by + // eliding the original clauses, and cheap insurance against drift.
+ let stale = entry.bitmap.load_full(); + let live = Arc::new(stale.as_ref() & alive); let bucket_clause = FilterClause::BucketBitmap { field: "__prefilter".to_string(), bucket_name: entry.name.clone(), - bitmap, + bitmap: live, }; let mut out = Vec::with_capacity(remaining.len() + 1); out.push(bucket_clause); @@ -496,6 +524,12 @@ mod tests { assert_eq!(matched.len(), 2); } + fn alive_all() -> RoaringBitmap { + let mut bm = RoaringBitmap::new(); + bm.insert_range(0..100u32); + bm + } + #[test] fn substitute_replaces_matched_clauses_with_bucket_bitmap() { let reg = PrefilterRegistry::new(); let mut bm = RoaringBitmap::new(); bm.insert(10); bm.insert(20); reg.insert("safe".into(), vec![eq("a", 1), eq("b", 2)], bm, 300, 0).unwrap(); let query = vec![eq("c", 3), eq("a", 1), eq("b", 2)]; - let (out, entry) = substitute(&reg, &query); + let alive = alive_all(); + let (out, entry) = substitute(&reg, &query, &alive); let entry = entry.unwrap(); assert_eq!(entry.name, "safe"); assert_eq!(out.len(), 2); @@ -522,12 +557,39 @@ } } + #[test] + fn substitute_excludes_dead_slots_from_stale_prefilter() { + // Regression: cached prefilter has bits for slots 10, 20, 30, but + // slot 20 has since been deleted (not in alive). Substitution must + // NOT leak slot 20 through the returned BucketBitmap. + let reg = PrefilterRegistry::new(); + let mut stale = RoaringBitmap::new(); + stale.insert(10); stale.insert(20); stale.insert(30); + reg.insert("safe".into(), vec![eq("a", 1)], stale, 300, 0).unwrap(); + + let mut alive = RoaringBitmap::new(); + alive.insert(10); alive.insert(30); // 20 was deleted + + let query = vec![eq("a", 1)]; + let (out, _) = substitute(&reg, &query, &alive); + match &out[0] { + FilterClause::BucketBitmap { bitmap, ..
} => { + assert_eq!(bitmap.len(), 2, "dead slot 20 must not leak through"); + assert!(bitmap.contains(10)); + assert!(!bitmap.contains(20), "slot 20 was deleted, must not appear"); + assert!(bitmap.contains(30)); + } + other => panic!("expected BucketBitmap, got {other:?}"), + } + } + #[test] fn substitute_is_noop_when_no_match() { let reg = PrefilterRegistry::new(); reg.insert("safe".into(), vec![eq("a", 1)], RoaringBitmap::new(), 300, 0).unwrap(); let query = vec![eq("b", 2)]; - let (out, entry) = substitute(&reg, &query); + let alive = alive_all(); + let (out, entry) = substitute(&reg, &query, &alive); assert!(entry.is_none()); assert_eq!(out.len(), 1); } @@ -536,8 +598,21 @@ fn substitute_is_noop_on_empty_registry() { let reg = PrefilterRegistry::new(); let query = vec![eq("a", 1)]; - let (out, entry) = substitute(&reg, &query); + let alive = alive_all(); + let (out, entry) = substitute(&reg, &query, &alive); assert!(entry.is_none()); assert_eq!(out.len(), 1); } + + #[test] + fn canonicalize_does_not_reorder_or_children() { + // Design open question #1: Or(A, B) and Or(B, A) currently canonicalize + // distinctly. This regression test pins that behavior — a future + // change adding Or reordering must update it explicitly (and consider + // the test coverage implied by reordering). + let c1 = FilterClause::Or(vec![eq("a", 1), eq("b", 2)]); + let c2 = FilterClause::Or(vec![eq("b", 2), eq("a", 1)]); + assert_ne!(canonicalize_clause(c1), canonicalize_clause(c2), + "Or canonicalization changed — update substitution matching logic or this test"); + } } diff --git a/src/server.rs b/src/server.rs index 22b69f3..a889c48 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4427,9 +4437,19 @@ async fn handle_register_prefilter( ).into_response(); } + // compute_filters walks up to ~100M-bit bitmaps; at prod scale the + // register compute is ~300-900ms. Running that on the Axum async worker + // would starve the reactor and cause latency spikes for other requests.
+ // Offload to the blocking pool. let t0 = Instant::now(); - match engine.register_prefilter(req.name.clone(), req.clauses, req.refresh_interval_secs) { - Ok(entry) => { + let req_name = req.name.clone(); + let clauses = req.clauses; + let interval = req.refresh_interval_secs; + let result = tokio::task::spawn_blocking(move || { + engine.register_prefilter(req_name, clauses, interval) + }).await; + match result { + Ok(Ok(entry)) => { let compute_ms = t0.elapsed().as_millis() as u64; Json(serde_json::json!({ "name": entry.name, @@ -4438,10 +4448,14 @@ async fn handle_register_prefilter( "refresh_interval_secs": entry.refresh_interval_secs(), })).into_response() } - Err(e) => ( + Ok(Err(e)) => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": format!("register_prefilter: {e}")})), ).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": format!("register_prefilter task panicked: {e}")})), + ).into_response(), } } @@ -4513,21 +4527,30 @@ async fn handle_refresh_prefilter( } }; + // Offload to blocking pool — compute_filters runs ~300-900ms at prod scale. 
let t0 = Instant::now(); - match engine.refresh_prefilter(&prefilter_name) { - Ok(Some(entry)) => Json(serde_json::json!({ + let pname = prefilter_name.clone(); + let result = tokio::task::spawn_blocking(move || { + engine.refresh_prefilter(&pname) + }).await; + match result { + Ok(Ok(Some(entry))) => Json(serde_json::json!({ "name": entry.name, "cardinality": entry.cardinality(), "compute_time_ms": t0.elapsed().as_millis() as u64, })).into_response(), - Ok(None) => ( + Ok(Ok(None)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("prefilter '{}' not found", prefilter_name)})), ).into_response(), - Err(e) => ( + Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("refresh_prefilter: {e}")})), ).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": format!("refresh_prefilter task panicked: {e}")})), + ).into_response(), } } diff --git a/tests/prefilter_integration.rs b/tests/prefilter_integration.rs index 2d2483a..28baaaf 100644 --- a/tests/prefilter_integration.rs +++ b/tests/prefilter_integration.rs @@ -170,6 +170,60 @@ fn prefilter_substitution_preserves_results() { assert_eq!(after_ids, baseline_ids, "results unchanged after unregister"); } +#[test] +fn deleted_docs_excluded_from_stale_prefilter() { + // Engine uses clean-delete: filter bitmaps are kept clean and there is no + // `alive AND filter` in the query hot path. Our cached prefilter bitmap is + // NOT maintained by clean-delete — it's a frozen snapshot. Between + // refreshes a deleted doc's bit stays set in the cached prefilter. Without + // a guard, a query whose clause set matches the prefilter would resurface + // deleted docs. + // + // substitute() intersects the prefilter with live alive at substitute + // time. Verify end-to-end. 
+ let dir = tempfile::tempdir().unwrap(); + let engine = ConcurrentEngine::new_with_path( + test_config(&dir.path().join("bitmaps")), + &dir.path().join("docs"), + ).unwrap(); + + for i in 1..=10u32 { + engine.put(i, &doc(1, true, 500, i as i64 * 10)).unwrap(); + } + wait_for_alive(&engine, 10, 2000); + + let entry = engine.register_prefilter( + "nsfw1".into(), + vec![FilterClause::Eq("nsfwLevel".into(), Value::Integer(1))], + 300, + ).unwrap(); + assert_eq!(entry.cardinality(), 10); + + // Delete 3/5/7 WITHOUT refreshing the prefilter. + engine.delete(3).unwrap(); + engine.delete(5).unwrap(); + engine.delete(7).unwrap(); + wait_for_alive(&engine, 7, 2000); + + // Cached prefilter cardinality is still 10 — confirms the guard has to + // handle stale bitmaps. + assert_eq!(entry.cardinality(), 10, "prefilter bitmap NOT auto-refreshed"); + + let q = BitdexQuery { + filters: vec![FilterClause::Eq("nsfwLevel".into(), Value::Integer(1))], + sort: None, + limit: 100, + cursor: None, + offset: None, + skip_cache: true, + }; + let (result, trace) = engine.execute_query_traced(&q, "test").unwrap(); + assert_eq!(trace.prefilter.as_deref(), Some("nsfw1"), "substitution should have fired"); + let mut ids = result.ids; + ids.sort_unstable(); + assert_eq!(ids, vec![1, 2, 4, 6, 8, 9, 10], "deleted slots 3/5/7 must not leak through"); +} + #[test] fn refresh_updates_cardinality() { let dir = tempfile::tempdir().unwrap();