diff --git a/Cargo.lock b/Cargo.lock index afe1191..98ba513 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -219,7 +219,7 @@ checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" [[package]] name = "bitdex-v2" -version = "1.0.199" +version = "1.0.217" dependencies = [ "ahash", "arc-swap", diff --git a/Cargo.toml b/Cargo.toml index 45adeea..98869ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,9 @@ simd = ["roaring/simd"] dump-timing = [] heap-prof = ["dep:tikv-jemallocator", "dep:tikv-jemalloc-ctl"] serde_yaml = ["dep:serde_yaml"] +# Pre-existing utility binary (silo_copy) that references the removed +# DocStoreV3 type. Build it explicitly with `--features legacy-docstorev3`. +legacy-docstorev3 = [] [dependencies] # Bitmap indexes @@ -192,6 +195,10 @@ required-features = ["server"] [[bin]] name = "silo_copy" path = "src/bin/silo_copy.rs" +# Pre-existing broken: references removed DocStoreV3 type. Gated behind an +# opt-in feature so it doesn't block the default build. Task #132 tracks full +# DocStoreV3 deletion. +required-features = ["legacy-docstorev3"] [[example]] name = "load_from_csv" diff --git a/docs/_in/design-prefilter-registry.md b/docs/_in/design-prefilter-registry.md new file mode 100644 index 0000000..b7632bd --- /dev/null +++ b/docs/_in/design-prefilter-registry.md @@ -0,0 +1,230 @@ +# Prefilter Registry — Design Doc + +**Author:** Ivy +**Date:** 2026-04-14 +**Status:** Proposed +**Estimated effort:** 2-3 days + +## Problem + +At 107M records, every Civitai feed query starts with a ~7-clause safety prefix that returns ~98M slots: + +``` +NOT(availability = Private) +AND blockedFor NOT IN [tos, moderated, CSAM, AiNotVerified] +AND postId IS NOT NULL +AND NOT(poi = true) +AND NOT(minor = true) +AND NOT((nsfwLevel IN [4,8,16,32] AND baseModel IN [SD 3, SD 3.5, ...])) +AND isPublished = true +``` + +Every query re-evaluates this prefix against a 100M-bit accumulator. 
Trace analysis (v1.0.217, 2026-04-14): + +| Clause | Cost (1 query) | +|--------|----------------| +| 7 simple ANDs on ~72M acc | ~330ms | +| Compound NOT (nsfwLevel × baseModel) | ~588ms | +| **Total safety prefix** | **~900ms** | + +The gap between `filter_us` total and sum of clause costs is <1ms — it's real bitmap work, not lock contention. + +## Prior measurement (validates theory) + +Tested via prod API with `skip_cache=true`: +- Safety prefix alone (6 clauses, no nsfwLevel): 100M records, compute 140-300ms +- Compound NOT alone: 110M records, compute 90-300ms +- Combined ~98M records, ~500ms cold compute + +Expected per-feed-query savings with precomputed combined bitmap: **300-800ms** (1 clone + 1 AND replaces 7-8 clause evaluations). + +## Goals + +1. **Live-registerable** — orchestrator (or ops) POSTs a clause-set + name, server precomputes, caches, and auto-substitutes on matching queries. No code deploys. +2. **Stale-while-revalidate** — periodic refresh without blocking queries. Mild drift is acceptable for the theory test. +3. **Zero risk to existing queries** — queries that don't match a registered prefilter run exactly as before. +4. **Observable** — metrics for substitution rate, cardinality, refresh time. 
+ +## Non-goals (for this phase) + +- Auto-detection of common shapes (save for Phase 2 — requires fingerprint telemetry) +- Hot sync with ops pipeline (accept interval-based refresh + drift) +- Multi-level tree (save for Phase 2 — one flat registry per index is enough) +- Substitution when the query's clauses are only a *subset* of a registered prefilter's clauses (Phase 1 requires every prefilter clause to appear in the query as an exact clause match) + +## Architecture + +### Data structures + +```rust +// In src/prefilter.rs (new module) +pub struct PrefilterRegistry { + entries: DashMap<String, Arc<PrefilterEntry>>, +} + +pub struct PrefilterEntry { + pub name: String, + pub clauses: Vec<FilterClause>, // Canonical form (sorted, deduped) + pub bitmap: ArcSwap<RoaringBitmap>, // Atomic swap on refresh + pub refresh_interval_secs: u64, + pub last_refreshed: AtomicI64, // Unix seconds + pub compute_duration_nanos: AtomicU64, // Last refresh duration + pub cardinality: AtomicU64, // Cache len() for fast access +} +``` + +The registry lives on `InnerEngine` (not `ConcurrentEngine`) so it participates in the ArcSwap snapshot model. Queries access via `snap.prefilters.lookup_matching(...)`. (Implementation note: the module docs in `src/prefilter.rs` state the opposite choice — the registry is kept on `ConcurrentEngine`, outside the snapshot, so that registrations don't force a snapshot publish; queries reach it through `crate::prefilter::substitute`.) + +### HTTP API + +``` +POST /api/indexes/{index}/prefilters +Body: { "name": "civitai_safe", "clauses": [...FilterClause...], "refresh_interval_secs": 300 } +Response: { "cardinality": 98274619, "compute_time_ms": 487 } + +GET /api/indexes/{index}/prefilters +Response: { "prefilters": [ + { "name": "civitai_safe", "cardinality": 98274619, "last_refreshed": "2026-04-14T03:15:22Z", + "refresh_interval_secs": 300, "last_compute_ms": 487 } +]} + +DELETE /api/indexes/{index}/prefilters/{name} +Response: 204 No Content +``` + +### Background refresh thread (stale-while-revalidate) + +Single thread per index, spawned at boot. 
Loop: + +``` +every tick (e.g., 10s): + for entry in registry: + if now - entry.last_refreshed >= entry.refresh_interval_secs: + let new_bitmap = compute(entry.clauses) // ~300-500ms, off the query path + entry.bitmap.store(Arc::new(new_bitmap)) // Atomic swap + entry.last_refreshed.store(now) + entry.cardinality.store(new_bitmap.len()) +``` + +Queries in flight during the swap keep using the old bitmap (ArcSwap semantics). Next query gets the new one. + +**Compute path:** reuse `executor.compute_filters(&entry.clauses)` — same code the query planner uses. + +### Query planner substitution + +In `resolve_filters_traced` (src/concurrent_engine.rs:5581), before `plan_query_with_context`: + +```rust +let (effective_filters, substituted_prefilter) = if let Some(registry) = self.prefilters() { + registry.substitute(filters) // Returns (new_clauses, Some(prefilter_name)) or (filters, None) +} else { + (filters, None) +}; +``` + +**Substitution algorithm:** + +1. Canonicalize the query's clause set (sorted, deduped). +2. For each registered prefilter, check if its clause set is a subset of the query's clause set (exact clause matching — `FilterClause: PartialEq` already works). +3. Prefer the prefilter with the **largest** matched clause set (most work saved). +4. If match: remove the matched clauses from the query's list, prepend a `FilterClause::BucketBitmap { field: "__prefilter", bucket_name: prefilter.name, bitmap: prefilter.bitmap.load_full() }`. +5. Log + metric: `prefilter_substitutions_total{name}`. + +`BucketBitmap` evaluation is already just `acc &= bitmap` (executor.rs:466) — zero new code on the hot path. 
+ +### Canonical clause form (matching key) + +For exact-match detection, both registered and query clauses must be normalized: + +- `In`/`NotIn` value lists sorted +- Top-level `And` flattened into the clause list +- Preserve `Not(Not(X))` = X collapsing +- Hashable canonical form (for future fingerprinting) + +Implementation: new `canonicalize_clause(&FilterClause) -> FilterClause` in src/prefilter.rs. Uses `FilterClause::PartialEq` after canonicalization. + +### Race considerations + +**Register while query in flight:** New prefilter doesn't affect in-flight queries — they've already picked their snapshot. First query after publish sees the new registry. + +**Refresh during substitution:** Substitution calls `bitmap.load_full()` once, holds Arc until query completes. Refresh thread may swap in parallel — new snapshot won't affect the Arc already held. + +**Invalidation on schema change (index config PATCH):** If a filter field is removed or changed, registered prefilters with that field become invalid. For Phase 1, leave them stale; operator responsibility to re-register. Phase 2 can auto-invalidate. + +## Correctness + +**Exact clause match** means result equivalence is trivially guaranteed — we're substituting: +``` +acc = compute(A ∧ B ∧ C ∧ rest_of_query) +``` +with: +``` +acc = prefilter_bitmap(A ∧ B ∧ C) ∧ rest_of_query +``` + +Both evaluate to the same set, assuming the prefilter bitmap is an accurate materialization of A ∧ B ∧ C at refresh time. + +**Drift:** Between refreshes, data changes mean the prefilter bitmap diverges from a fresh evaluation. 
At 107M records with 5min refresh: +- ~100 ops/sec → ~30,000 ops per refresh interval +- Most ops don't touch the prefilter's fields; only those that do cause drift +- Worst case: a newly-published image shows up as "not in prefilter" until next refresh = user sees result ~5min late +- Not a correctness bug; same semantics as any eventually-consistent cache + +**Drift-free fallback (optional):** If drift is unacceptable, we can track ops that touch prefilter fields and apply them incrementally (same pattern as alive_bitmap). Phase 2 work. + +## Testing plan + +### Unit tests (src/prefilter.rs) +- Register + lookup round trip +- Substitute on matching query +- No substitute on non-matching query +- Prefer largest match when multiple prefilters apply +- Clause canonicalization (sorted In values, flattened And, Not(Not(X)) = X) + +### Integration test (crates/bitdex/tests/) +- Full server boot, register prefilter, run matching + non-matching queries, verify result equivalence +- Run 100 queries mid-refresh, verify correctness throughout swap + +### Local 109M test +- Register civitai_safe via POST +- Run feed queries with and without matching prefix +- Verify: result counts identical, filter_us drops 300-800ms +- Monitor RSS, refresh latency + +## Rollout plan + +1. **Phase 1a: Register + compute + GET/DELETE (no substitution)** — Deploy, orchestrator registers prefilters, no query-path change. Validates compute feasibility at prod scale without risk. +2. **Phase 1b: Enable substitution behind query flag** — New `apply_prefilter: Option` on `BitdexQuery`. Orchestrator opts in per-query. Zero impact on non-opted-in queries. +3. **Phase 1c: Auto-substitute by clause match** — Remove the flag requirement; substitute automatically when a registered prefilter's clauses are a subset of the query's. Defaults to on, has config kill switch. + +Each phase bakes for ~1 hour in prod before advancing. 
+ +## Metrics + +| Name | Type | Purpose | +|------|------|---------| +| `bitdex_prefilter_registered_total{index}` | gauge | Number of registered prefilters | +| `bitdex_prefilter_cardinality{index, name}` | gauge | Current bitmap cardinality | +| `bitdex_prefilter_substitutions_total{index, name}` | counter | Queries that matched this prefilter | +| `bitdex_prefilter_refresh_seconds{index, name}` | histogram | Time to recompute bitmap | +| `bitdex_prefilter_refresh_errors_total{index, name}` | counter | Refresh failures (compute error, etc) | + +Dashboard: substitution rate by prefilter, last-refresh age per prefilter, filter_seconds histogram grouped by `substituted` label. + +## Open questions + +1. **Clause ordering for canonicalization** — should `In([1,2,3])` and `Or(Eq(1), Eq(2), Eq(3))` canonicalize to the same form? Proposed: no, keep them distinct. Orchestrator can register both if they want coverage. +2. **Max registered prefilters** — bound the registry size to prevent memory explosion. Proposed: 32 per index, configurable. +3. **Persistence** — should the registry survive restart? Proposed: no for Phase 1; orchestrator re-registers on startup (health check triggers push). Phase 2 can add disk persistence. +4. **Compute during eager_load blocking boot** — don't compute prefilters until after the server is serving queries. First refresh happens asynchronously after boot. + +## Files changed (estimate) + +- `src/prefilter.rs` — new module, ~300 lines +- `src/concurrent_engine.rs` — hook into resolve_filters_traced, ~40 lines +- `src/server.rs` — 3 endpoints (POST/GET/DELETE), ~150 lines +- `src/query.rs` — canonicalize_clause helper, ~50 lines +- `src/metrics.rs` — 5 new metrics, ~30 lines +- `tests/prefilter_integration.rs` — new test, ~200 lines + +Total: ~770 lines of new/modified code. 
diff --git a/src/concurrent_engine.rs b/src/concurrent_engine.rs index 66dd759..af3ab9c 100644 --- a/src/concurrent_engine.rs +++ b/src/concurrent_engine.rs @@ -416,6 +416,11 @@ pub struct ConcurrentEngine { /// The WAL reader thread picks up ops and routes through apply_ops_batch. #[cfg(feature = "pg-sync")] wal_writer: Option>, + /// Registry of named precomputed filter bitmaps. Queries whose canonical + /// clause set is a superset of a registered prefilter's clauses will have + /// those clauses elided and replaced with a single AND against the cached + /// bitmap. See src/prefilter.rs and docs/_in/design-prefilter-registry.md. + prefilters: Arc, } impl ConcurrentEngine { /// Create a new concurrent engine with a temp docstore (for testing). @@ -1175,6 +1180,7 @@ impl ConcurrentEngine { doc_cache_eviction_handle: None, #[cfg(feature = "pg-sync")] wal_writer: None, + prefilters: Arc::new(crate::prefilter::PrefilterRegistry::new()), }); } let flush_handle = { @@ -3228,6 +3234,7 @@ impl ConcurrentEngine { doc_cache_eviction_handle, #[cfg(feature = "pg-sync")] wal_writer: None, + prefilters: Arc::new(crate::prefilter::PrefilterRegistry::new()), }) } /// Set the string maps for MappedString field query resolution. @@ -5601,11 +5608,18 @@ impl ConcurrentEngine { } else { filters }; + // Substitute registered prefilters: if the query's canonical clause + // set contains a registered prefilter's clauses as a subset, replace + // those clauses with a single BucketBitmap AND. See src/prefilter.rs. 
+ let (post_prefilter, substituted) = crate::prefilter::substitute(&self.prefilters, effective_filters); + if let Some(ref entry) = substituted { + collector.prefilter_name = Some(entry.name.clone()); + } let planner_ctx = planner::PlannerContext { string_maps: executor.string_maps(), dictionaries: executor.dictionaries(), }; - let plan = planner::plan_query_with_context(effective_filters, executor.filter_index(), executor.slot_allocator(), Some(&planner_ctx)); + let plan = planner::plan_query_with_context(&post_prefilter, executor.filter_index(), executor.slot_allocator(), Some(&planner_ctx)); let filter_bitmap = Arc::new(executor.compute_filters_traced(&plan.ordered_clauses, Some(collector))?); Ok((filter_bitmap, plan.use_simple_sort)) } @@ -5638,11 +5652,15 @@ impl ConcurrentEngine { } else { filters }; + // Substitute registered prefilters: if the query's canonical clause + // set is a superset of a registered prefilter's clauses, replace + // those clauses with a single BucketBitmap AND. See src/prefilter.rs. + let (post_prefilter, _entry) = crate::prefilter::substitute(&self.prefilters, effective_filters); let planner_ctx = planner::PlannerContext { string_maps: executor.string_maps(), dictionaries: executor.dictionaries(), }; - let plan = planner::plan_query_with_context(effective_filters, executor.filter_index(), executor.slot_allocator(), Some(&planner_ctx)); + let plan = planner::plan_query_with_context(&post_prefilter, executor.filter_index(), executor.slot_allocator(), Some(&planner_ctx)); let filter_bitmap = Arc::new(executor.compute_filters(&plan.ordered_clauses)?); Ok((filter_bitmap, plan.use_simple_sort)) } @@ -6715,6 +6733,141 @@ impl ConcurrentEngine { pub fn clear_unified_cache(&self) { self.unified_cache.lock().clear(); } + + /// Access the prefilter registry. Used by HTTP handlers and tests. + /// See `src/prefilter.rs`. 
+ pub fn prefilters(&self) -> &Arc { + &self.prefilters + } + + /// Register (or replace) a prefilter by name: compute its bitmap from the + /// current snapshot, then store it in the registry for later substitution. + /// + /// Returns the newly-registered entry on success. + pub fn register_prefilter( + &self, + name: String, + clauses: Vec, + refresh_interval_secs: u64, + ) -> Result> { + use crate::executor::QueryExecutor; + // Load any fields referenced by the clauses so compute_filters sees + // real bitmaps rather than empty lazy placeholders. + self.ensure_fields_loaded(&clauses, None)?; + + let snap = self.snapshot(); + let tb_guard = self.time_buckets.as_ref().map(|tb| tb.load_full()); + let now_unix = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let executor = { + let mut base = QueryExecutor::new( + &snap.slots, + &snap.filters, + &snap.sorts, + self.config.max_page_size, + ).with_not_null_bitmaps(&snap.not_null_bitmaps); + if let Some(ref maps) = self.string_maps { + base = base.with_string_maps(maps); + } + if let Some(ref cs) = self.case_sensitive_fields { + base = base.with_case_sensitive_fields(cs); + } + if !self.dictionaries.is_empty() { + base = base.with_dictionaries(&self.dictionaries); + } + if let Some(ref tb) = tb_guard { + base.with_time_buckets(tb, now_unix) + } else { + base + } + }; + + let canonical = crate::prefilter::canonicalize_clauses(&clauses); + let planner_ctx = planner::PlannerContext { + string_maps: executor.string_maps(), + dictionaries: executor.dictionaries(), + }; + let plan = planner::plan_query_with_context( + &canonical, executor.filter_index(), executor.slot_allocator(), Some(&planner_ctx), + ); + + let start = std::time::Instant::now(); + let bitmap = executor.compute_filters(&plan.ordered_clauses)?; + let compute_ns = start.elapsed().as_nanos() as u64; + + self.prefilters + .insert(name, clauses, bitmap, refresh_interval_secs, compute_ns) + 
.map_err(|e| crate::error::BitdexError::QueryParse(format!("prefilter: {e}"))) + } + + /// Recompute the bitmap for a registered prefilter and publish it via + /// ArcSwap. Called by the background refresh thread and manual refresh + /// endpoints. No-op if `name` is not registered. + pub fn refresh_prefilter(&self, name: &str) -> Result>> { + use crate::executor::QueryExecutor; + let entry = match self.prefilters.get(name) { + Some(e) => e, + None => return Ok(None), + }; + // Load any referenced fields (they may have been evicted since last refresh). + self.ensure_fields_loaded(&entry.clauses, None)?; + + let snap = self.snapshot(); + let tb_guard = self.time_buckets.as_ref().map(|tb| tb.load_full()); + let now_unix = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let executor = { + let mut base = QueryExecutor::new( + &snap.slots, + &snap.filters, + &snap.sorts, + self.config.max_page_size, + ).with_not_null_bitmaps(&snap.not_null_bitmaps); + if let Some(ref maps) = self.string_maps { + base = base.with_string_maps(maps); + } + if let Some(ref cs) = self.case_sensitive_fields { + base = base.with_case_sensitive_fields(cs); + } + if !self.dictionaries.is_empty() { + base = base.with_dictionaries(&self.dictionaries); + } + if let Some(ref tb) = tb_guard { + base.with_time_buckets(tb, now_unix) + } else { + base + } + }; + let planner_ctx = planner::PlannerContext { + string_maps: executor.string_maps(), + dictionaries: executor.dictionaries(), + }; + let plan = planner::plan_query_with_context( + &entry.clauses, executor.filter_index(), executor.slot_allocator(), Some(&planner_ctx), + ); + + let start = std::time::Instant::now(); + match executor.compute_filters(&plan.ordered_clauses) { + Ok(bm) => { + let compute_ns = start.elapsed().as_nanos() as u64; + entry.publish_refresh(bm, compute_ns); + Ok(Some(entry)) + } + Err(e) => { + entry.refresh_errors.fetch_add(1, std::sync::atomic::Ordering::Relaxed); 
+ Err(e) + } + } + } + + /// Remove a registered prefilter. Returns `true` if it existed. + pub fn unregister_prefilter(&self, name: &str) -> bool { + self.prefilters.remove(name) + } /// Purge the entire BoundStore: disk first, then memory. /// Order matters: wipe disk before clearing RAM to prevent stale shard loads. /// Safe to call while the server is running — the merge thread will simply diff --git a/src/lib.rs b/src/lib.rs index 63f48b9..387f3b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ pub mod meta_index; pub mod mutation; pub mod parser; pub mod planner; +pub mod prefilter; pub mod preset; pub mod query; pub mod query_metrics; diff --git a/src/prefilter.rs b/src/prefilter.rs new file mode 100644 index 0000000..c01c646 --- /dev/null +++ b/src/prefilter.rs @@ -0,0 +1,543 @@ +//! Prefilter Registry — named, precomputed filter bitmaps that the query +//! planner can substitute into matching queries. +//! +//! See `docs/_in/design-prefilter-registry.md` for the full design. +//! +//! A prefilter is a named clause set (e.g. the 7-clause Civitai safety prefix) +//! whose bitmap we compute once and cache. When a query's canonicalized clause +//! set is a superset of a registered prefilter's clauses, those clauses are +//! elided from the query and replaced with a single `BucketBitmap` AND against +//! the cached bitmap — turning ~900 ms of filter work into a single intersect. +//! +//! The registry lives on `ConcurrentEngine` (outside the `ArcSwap` +//! snapshot) so registrations don't force a snapshot publish. Each entry's +//! bitmap is an `ArcSwap` so a refresh thread can publish a +//! fresh version without disrupting in-flight queries. 
+ +use arc_swap::ArcSwap; +use dashmap::DashMap; +use roaring::RoaringBitmap; +use serde::{Deserialize, Serialize}; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::query::{FilterClause, Value}; + +pub const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 300; +pub const MIN_REFRESH_INTERVAL_SECS: u64 = 30; +pub const MAX_REFRESH_INTERVAL_SECS: u64 = 86_400; +pub const MAX_REGISTERED_PREFILTERS: usize = 32; + +/// A named precomputed filter result. Shared across query threads via `Arc`. +pub struct PrefilterEntry { + pub name: String, + /// Canonical clause set (sorted, deduped, And-flattened). Queries whose + /// canonical clause set is a superset of this will have these clauses + /// elided and replaced by a single AND against `bitmap`. + pub clauses: Vec, + /// The precomputed bitmap — the conjunction of `clauses`. + pub bitmap: ArcSwap, + pub refresh_interval_secs: AtomicU64, + /// Unix seconds of last successful refresh. + pub last_refreshed: AtomicI64, + /// Nanoseconds spent on the last compute. + pub compute_duration_nanos: AtomicU64, + /// Cached bitmap cardinality so listing doesn't pay `bitmap.len()`. + pub cardinality: AtomicU64, + pub refresh_errors: AtomicU64, + /// Cumulative number of queries that substituted this prefilter. 
+ pub substitutions: AtomicU64, +} + +impl PrefilterEntry { + pub fn cardinality(&self) -> u64 { self.cardinality.load(Ordering::Relaxed) } + pub fn last_refreshed(&self) -> i64 { self.last_refreshed.load(Ordering::Relaxed) } + pub fn refresh_interval_secs(&self) -> u64 { self.refresh_interval_secs.load(Ordering::Relaxed) } + pub fn compute_ms(&self) -> u64 { self.compute_duration_nanos.load(Ordering::Relaxed) / 1_000_000 } + pub fn refresh_errors(&self) -> u64 { self.refresh_errors.load(Ordering::Relaxed) } + pub fn substitutions(&self) -> u64 { self.substitutions.load(Ordering::Relaxed) } + + pub fn is_stale(&self, now_unix: i64) -> bool { + let interval = self.refresh_interval_secs() as i64; + now_unix.saturating_sub(self.last_refreshed()) >= interval + } + + /// Atomically publish a fresh bitmap. In-flight queries holding the old + /// `Arc` continue reading the old version. + pub fn publish_refresh(&self, bitmap: RoaringBitmap, compute_duration_nanos: u64) { + let card = bitmap.len(); + self.bitmap.store(Arc::new(bitmap)); + self.cardinality.store(card, Ordering::Relaxed); + self.compute_duration_nanos.store(compute_duration_nanos, Ordering::Relaxed); + self.last_refreshed.store(now_unix_secs(), Ordering::Relaxed); + } +} + +pub struct PrefilterRegistry { + entries: DashMap>, + max_entries: usize, +} + +impl PrefilterRegistry { + pub fn new() -> Self { + Self { + entries: DashMap::new(), + max_entries: MAX_REGISTERED_PREFILTERS, + } + } + + pub fn len(&self) -> usize { self.entries.len() } + pub fn is_empty(&self) -> bool { self.entries.is_empty() } + + pub fn get(&self, name: &str) -> Option> { + self.entries.get(name).map(|r| Arc::clone(r.value())) + } + + pub fn remove(&self, name: &str) -> bool { + self.entries.remove(name).is_some() + } + + /// Register a prefilter. If `name` already exists, the existing entry is + /// replaced. Clauses are canonicalized before storage so future matching + /// is a pure structural equality check. 
+    pub fn insert(
+        &self,
+        name: String,
+        clauses: Vec<FilterClause>,
+        bitmap: RoaringBitmap,
+        refresh_interval_secs: u64,
+        compute_duration_nanos: u64,
+    ) -> Result<Arc<PrefilterEntry>, RegistryError> {
+        // Validation, in order: raw list empty, canonical list empty, capacity.
+        if clauses.is_empty() {
+            return Err(RegistryError::InvalidClauses("clause list is empty".into()));
+        }
+        let canonical = canonicalize_clauses(&clauses);
+        if canonical.is_empty() {
+            return Err(RegistryError::InvalidClauses(
+                "clauses reduced to empty after canonicalization".into(),
+            ));
+        }
+        // Re-registering an existing name replaces it and so is exempt from
+        // the size cap.
+        let is_new_name = !self.entries.contains_key(&name);
+        if is_new_name && self.entries.len() >= self.max_entries {
+            return Err(RegistryError::RegistryFull(self.max_entries));
+        }
+
+        // Out-of-range intervals are clamped rather than rejected.
+        let interval_secs =
+            refresh_interval_secs.clamp(MIN_REFRESH_INTERVAL_SECS, MAX_REFRESH_INTERVAL_SECS);
+        let initial_cardinality = bitmap.len();
+        let entry = Arc::new(PrefilterEntry {
+            name: name.clone(),
+            clauses: canonical,
+            bitmap: ArcSwap::from(Arc::new(bitmap)),
+            refresh_interval_secs: AtomicU64::new(interval_secs),
+            last_refreshed: AtomicI64::new(now_unix_secs()),
+            compute_duration_nanos: AtomicU64::new(compute_duration_nanos),
+            cardinality: AtomicU64::new(initial_cardinality),
+            refresh_errors: AtomicU64::new(0),
+            substitutions: AtomicU64::new(0),
+        });
+
+        self.entries.insert(name, Arc::clone(&entry));
+        Ok(entry)
+    }
+
+    /// Snapshot of all registered entries as shared handles.
+    pub fn entries(&self) -> Vec<Arc<PrefilterEntry>> {
+        let mut handles = Vec::new();
+        for slot in self.entries.iter() {
+            handles.push(Arc::clone(slot.value()));
+        }
+        handles
+    }
+
+    /// Serializable summary of every registered prefilter.
+    pub fn list(&self) -> Vec<PrefilterInfo> {
+        let mut infos = Vec::new();
+        for entry in self.entries() {
+            infos.push(PrefilterInfo {
+                name: entry.name.clone(),
+                cardinality: entry.cardinality(),
+                last_refreshed: entry.last_refreshed(),
+                refresh_interval_secs: entry.refresh_interval_secs(),
+                last_compute_ms: entry.compute_ms(),
+                refresh_errors: entry.refresh_errors(),
+                substitutions: entry.substitutions(),
+                clauses: entry.clauses.clone(),
+            });
+        }
+        infos
+    }
+
+    /// Find a registered prefilter whose canonical clause set is a subset of
+    /// `canonical_query_clauses`. Prefers the prefilter with the most matched
+    /// clauses (largest work saved).
+ /// + /// Returns the entry and the sorted indices (in `canonical_query_clauses`) + /// that were matched — callers should remove those indices from the query + /// clause list and prepend a `BucketBitmap` clause. + pub fn find_substitution( + &self, + canonical_query_clauses: &[FilterClause], + ) -> Option<(Arc, Vec)> { + if canonical_query_clauses.is_empty() || self.entries.is_empty() { + return None; + } + let mut best: Option<(Arc, Vec)> = None; + for r in self.entries.iter() { + let entry = r.value(); + if let Some(matched) = subset_match(&entry.clauses, canonical_query_clauses) { + let better = match &best { + None => true, + Some((_, prev)) => matched.len() > prev.len(), + }; + if better { + best = Some((Arc::clone(entry), matched)); + } + } + } + best + } +} + +impl Default for PrefilterRegistry { + fn default() -> Self { Self::new() } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PrefilterInfo { + pub name: String, + pub cardinality: u64, + pub last_refreshed: i64, + pub refresh_interval_secs: u64, + pub last_compute_ms: u64, + pub refresh_errors: u64, + pub substitutions: u64, + pub clauses: Vec, +} + +#[derive(Debug, thiserror::Error)] +pub enum RegistryError { + #[error("prefilter registry full (max={0})")] + RegistryFull(usize), + #[error("invalid clause set: {0}")] + InvalidClauses(String), +} + +// ─── Canonicalization ────────────────────────────────────────────────────── + +/// Canonicalize a clause set for structural matching: +/// - Flatten nested `And` at the top level. +/// - Sort `In`/`NotIn` value lists. +/// - Collapse `Not(Not(X))` to `X`. +/// - Canonicalize children of `And`/`Or`/`Not` recursively. +/// - Sort top-level clauses into a deterministic order and dedupe. +/// +/// Does NOT try to prove `Or` ↔ `In` equivalence or reorder `Or` children +/// (handled at a higher level if needed). 
+pub fn canonicalize_clauses(clauses: &[FilterClause]) -> Vec<FilterClause> {
+    canonical_conjuncts(clauses.iter().cloned())
+}
+
+/// Canonicalize a list of AND-ed clauses: canonicalize each clause, splice any
+/// resulting `And` into the list, then sort by rendered text and dedupe.
+///
+/// Canonicalizing *before* flattening matters: collapsing `Not(Not(And(..)))`
+/// exposes an `And` that has to be flattened as well, otherwise two
+/// equivalent clause sets would end up in different canonical forms and
+/// fail to match.
+fn canonical_conjuncts(clauses: impl IntoIterator<Item = FilterClause>) -> Vec<FilterClause> {
+    let mut conjuncts: Vec<FilterClause> = Vec::new();
+    for clause in clauses {
+        flatten_and_into(canonicalize_clause(clause), &mut conjuncts);
+    }
+    // Cached keys: each clause is rendered once instead of twice per
+    // comparison. The sort is stable, so ordering matches the plain
+    // `sort_by` on the rendered strings.
+    conjuncts.sort_by_cached_key(|c| c.to_string());
+    conjuncts.dedup();
+    conjuncts
+}
+
+/// Push `clause` onto `out`, recursively splicing in the children of any
+/// `And` so that `out` never receives an `And` itself.
+fn flatten_and_into(clause: FilterClause, out: &mut Vec<FilterClause>) {
+    match clause {
+        FilterClause::And(children) => {
+            for c in children {
+                flatten_and_into(c, out);
+            }
+        }
+        other => out.push(other),
+    }
+}
+
+/// Canonicalize a single clause (see `canonicalize_clauses` for the rules).
+/// Variants not listed here are returned unchanged.
+fn canonicalize_clause(clause: FilterClause) -> FilterClause {
+    match clause {
+        FilterClause::In(field, mut values) => {
+            sort_values(&mut values);
+            values.dedup();
+            FilterClause::In(field, values)
+        }
+        FilterClause::NotIn(field, mut values) => {
+            sort_values(&mut values);
+            values.dedup();
+            FilterClause::NotIn(field, values)
+        }
+        FilterClause::Not(inner) => match *inner {
+            // Double negation cancels; keep canonicalizing what is left.
+            FilterClause::Not(innermost) => canonicalize_clause(*innermost),
+            other => FilterClause::Not(Box::new(canonicalize_clause(other))),
+        },
+        FilterClause::And(children) => FilterClause::And(canonical_conjuncts(children)),
+        FilterClause::Or(children) => {
+            // Child order is preserved on purpose: `Or` children are not
+            // reordered by canonicalization.
+            let canon: Vec<FilterClause> =
+                children.into_iter().map(canonicalize_clause).collect();
+            FilterClause::Or(canon)
+        }
+        other => other,
+    }
+}
+
+/// Sort values by their rendered text. The order is lexicographic on the
+/// string form (not numeric); it only needs to be deterministic.
+fn sort_values(values: &mut [Value]) {
+    values.sort_by_cached_key(|v| v.to_string());
+}
+
+/// Check whether every clause in `prefilter_clauses` appears in
+/// `query_clauses`. Returns the matched query-side indices (sorted) on
+/// success. Both inputs MUST already be canonicalized.
+fn subset_match( + prefilter_clauses: &[FilterClause], + query_clauses: &[FilterClause], +) -> Option> { + if prefilter_clauses.is_empty() || prefilter_clauses.len() > query_clauses.len() { + return None; + } + let mut used = vec![false; query_clauses.len()]; + let mut matched = Vec::with_capacity(prefilter_clauses.len()); + for pc in prefilter_clauses { + let mut found = None; + for (i, qc) in query_clauses.iter().enumerate() { + if !used[i] && qc == pc { + found = Some(i); + break; + } + } + match found { + Some(i) => { used[i] = true; matched.push(i); } + None => return None, + } + } + matched.sort_unstable(); + Some(matched) +} + +fn now_unix_secs() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0) +} + +/// Apply a registered prefilter to a query clause list. +/// +/// Input is the raw (post-bucket-snap) clause list. Returns a new clause list +/// where the matched prefilter clauses have been removed and a single +/// `BucketBitmap` clause prepended. If no prefilter matches, returns +/// `(original_clauses, None)`. +/// +/// This is the core substitution routine used by the query planner. +pub fn substitute<'a>( + registry: &PrefilterRegistry, + query_clauses: &'a [FilterClause], +) -> (std::borrow::Cow<'a, [FilterClause]>, Option>) { + use std::borrow::Cow; + if registry.is_empty() { + return (Cow::Borrowed(query_clauses), None); + } + let canonical_query = canonicalize_clauses(query_clauses); + let (entry, matched_in_canonical) = match registry.find_substitution(&canonical_query) { + Some(x) => x, + None => return (Cow::Borrowed(query_clauses), None), + }; + + // `matched_in_canonical` refers to indices in `canonical_query`, not in + // the input `query_clauses`. Map by structural equality: remove each + // matched canonical clause from a canonical copy of the query, then + // rebuild the resulting clause list. 
+ // + // Since we drop the matched clauses and evaluate against the prefilter + // bitmap instead, the remaining clause order doesn't need to match the + // caller's input — the planner will re-order anyway. + let mut remaining: Vec = Vec::with_capacity(canonical_query.len() + 1); + for (i, c) in canonical_query.into_iter().enumerate() { + if !matched_in_canonical.contains(&i) { + remaining.push(c); + } + } + + let bitmap = entry.bitmap.load_full(); + let bucket_clause = FilterClause::BucketBitmap { + field: "__prefilter".to_string(), + bucket_name: entry.name.clone(), + bitmap, + }; + let mut out = Vec::with_capacity(remaining.len() + 1); + out.push(bucket_clause); + out.extend(remaining); + + entry.substitutions.fetch_add(1, Ordering::Relaxed); + (Cow::Owned(out), Some(entry)) +} + +// ─── Tests ───────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::query::{FilterClause, Value}; + + fn eq(field: &str, val: i64) -> FilterClause { + FilterClause::Eq(field.into(), Value::Integer(val)) + } + fn in_(field: &str, vals: &[i64]) -> FilterClause { + FilterClause::In(field.into(), vals.iter().map(|v| Value::Integer(*v)).collect()) + } + + #[test] + fn canonicalize_sorts_in_values() { + let c = canonicalize_clause(in_("x", &[3, 1, 2])); + assert_eq!(c, in_("x", &[1, 2, 3])); + } + + #[test] + fn canonicalize_flattens_and() { + let c = vec![FilterClause::And(vec![ + eq("a", 1), + FilterClause::And(vec![eq("b", 2)]), + ])]; + let canon = canonicalize_clauses(&c); + assert_eq!(canon, vec![eq("a", 1), eq("b", 2)]); + } + + #[test] + fn canonicalize_collapses_double_not() { + let c = FilterClause::Not(Box::new(FilterClause::Not(Box::new(eq("a", 1))))); + assert_eq!(canonicalize_clause(c), eq("a", 1)); + } + + #[test] + fn canonicalize_sorts_top_level_and_dedupes() { + let c = vec![eq("b", 2), eq("a", 1), eq("a", 1)]; + let canon = canonicalize_clauses(&c); + assert_eq!(canon.len(), 2); + 
assert_eq!(canon[0].to_string(), "a = 1"); + assert_eq!(canon[1].to_string(), "b = 2"); + } + + #[test] + fn subset_match_detects_matching_subset() { + let prefilter = canonicalize_clauses(&[eq("a", 1), eq("b", 2)]); + let query = canonicalize_clauses(&[eq("a", 1), eq("b", 2), eq("c", 3)]); + let m = subset_match(&prefilter, &query).unwrap(); + assert_eq!(m.len(), 2); + } + + #[test] + fn subset_match_rejects_partial_overlap() { + let prefilter = canonicalize_clauses(&[eq("a", 1), eq("b", 2)]); + let query = canonicalize_clauses(&[eq("a", 1), eq("c", 3)]); + assert!(subset_match(&prefilter, &query).is_none()); + } + + #[test] + fn registry_insert_and_get() { + let reg = PrefilterRegistry::new(); + let mut bm = RoaringBitmap::new(); + bm.insert(42); + let entry = reg.insert("test".into(), vec![eq("a", 1)], bm, 300, 1_000_000).unwrap(); + assert_eq!(entry.cardinality(), 1); + let got = reg.get("test").unwrap(); + assert_eq!(got.cardinality(), 1); + assert_eq!(got.refresh_interval_secs(), 300); + } + + #[test] + fn registry_replaces_existing_name() { + let reg = PrefilterRegistry::new(); + let mut bm1 = RoaringBitmap::new(); bm1.insert(1); + let mut bm2 = RoaringBitmap::new(); bm2.insert(2); bm2.insert(3); + reg.insert("x".into(), vec![eq("a", 1)], bm1, 300, 0).unwrap(); + reg.insert("x".into(), vec![eq("a", 1)], bm2, 300, 0).unwrap(); + assert_eq!(reg.get("x").unwrap().cardinality(), 2); + assert_eq!(reg.len(), 1); + } + + #[test] + fn registry_enforces_max() { + let mut reg = PrefilterRegistry::new(); + reg.max_entries = 2; + reg.insert("a".into(), vec![eq("x", 1)], RoaringBitmap::new(), 300, 0).unwrap(); + reg.insert("b".into(), vec![eq("x", 2)], RoaringBitmap::new(), 300, 0).unwrap(); + let err = reg.insert("c".into(), vec![eq("x", 3)], RoaringBitmap::new(), 300, 0); + assert!(matches!(err, Err(RegistryError::RegistryFull(_)))); + } + + #[test] + fn registry_rejects_empty_clauses() { + let reg = PrefilterRegistry::new(); + let err = reg.insert("x".into(), vec![], 
RoaringBitmap::new(), 300, 0); + assert!(matches!(err, Err(RegistryError::InvalidClauses(_)))); + } + + #[test] + fn registry_clamps_interval() { + let reg = PrefilterRegistry::new(); + let e1 = reg.insert("a".into(), vec![eq("x", 1)], RoaringBitmap::new(), 5, 0).unwrap(); + assert_eq!(e1.refresh_interval_secs(), MIN_REFRESH_INTERVAL_SECS); + let e2 = reg.insert("b".into(), vec![eq("x", 1)], RoaringBitmap::new(), 999_999, 0).unwrap(); + assert_eq!(e2.refresh_interval_secs(), MAX_REFRESH_INTERVAL_SECS); + } + + #[test] + fn find_substitution_prefers_largest_match() { + let reg = PrefilterRegistry::new(); + reg.insert("small".into(), vec![eq("a", 1)], RoaringBitmap::new(), 300, 0).unwrap(); + reg.insert("big".into(), vec![eq("a", 1), eq("b", 2)], RoaringBitmap::new(), 300, 0).unwrap(); + let query = canonicalize_clauses(&[eq("a", 1), eq("b", 2), eq("c", 3)]); + let (entry, matched) = reg.find_substitution(&query).unwrap(); + assert_eq!(entry.name, "big"); + assert_eq!(matched.len(), 2); + } + + #[test] + fn substitute_replaces_matched_clauses_with_bucket_bitmap() { + let reg = PrefilterRegistry::new(); + let mut bm = RoaringBitmap::new(); + bm.insert(10); bm.insert(20); + reg.insert("safe".into(), vec![eq("a", 1), eq("b", 2)], bm, 300, 0).unwrap(); + let query = vec![eq("c", 3), eq("a", 1), eq("b", 2)]; + let (out, entry) = substitute(®, &query); + let entry = entry.unwrap(); + assert_eq!(entry.name, "safe"); + assert_eq!(out.len(), 2); + // First clause must be the BucketBitmap substitution. + match &out[0] { + FilterClause::BucketBitmap { bucket_name, bitmap, .. } => { + assert_eq!(bucket_name, "safe"); + assert_eq!(bitmap.len(), 2); + } + other => panic!("expected BucketBitmap first, got {other:?}"), + } + // Remaining clause should be `c = 3` (the un-matched clause). 
+ match &out[1] { + FilterClause::Eq(f, _) => assert_eq!(f, "c"), + other => panic!("expected Eq(c,..), got {other:?}"), + } + } + + #[test] + fn substitute_is_noop_when_no_match() { + let reg = PrefilterRegistry::new(); + reg.insert("safe".into(), vec![eq("a", 1)], RoaringBitmap::new(), 300, 0).unwrap(); + let query = vec![eq("b", 2)]; + let (out, entry) = substitute(®, &query); + assert!(entry.is_none()); + assert_eq!(out.len(), 1); + } + + #[test] + fn substitute_is_noop_on_empty_registry() { + let reg = PrefilterRegistry::new(); + let query = vec![eq("a", 1)]; + let (out, entry) = substitute(®, &query); + assert!(entry.is_none()); + assert_eq!(out.len(), 1); + } +} diff --git a/src/query_metrics.rs b/src/query_metrics.rs index 34694b3..ec710ab 100644 --- a/src/query_metrics.rs +++ b/src/query_metrics.rs @@ -43,6 +43,11 @@ pub struct QueryTrace { pub cache_hold_us: u64, /// Time from function entry to just before cache lookup. pub pre_cache_us: u64, + /// Name of the registered prefilter that was substituted into this query, + /// if any. When set, the first ClauseTrace's `op` will be "BucketBitmap" + /// and a chunk of the canonical clause set was elided. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub prefilter: Option, pub clauses: Vec, pub sort: Option, } @@ -84,6 +89,8 @@ pub struct QueryTraceCollector { pub cache_lock_wait_us: u64, pub cache_hold_us: u64, pub pre_cache_us: u64, + /// Name of a registered prefilter that was substituted into this query. 
+ pub prefilter_name: Option, pub clauses: Vec, pub sort: Option, } @@ -100,6 +107,7 @@ impl QueryTraceCollector { cache_lock_wait_us: 0, cache_hold_us: 0, pre_cache_us: 0, + prefilter_name: None, clauses: Vec::new(), sort: None, } @@ -133,6 +141,7 @@ impl QueryTraceCollector { cache_lock_wait_us: self.cache_lock_wait_us, cache_hold_us: self.cache_hold_us, pre_cache_us: self.pre_cache_us, + prefilter: self.prefilter_name, clauses: self.clauses, sort: self.sort, } diff --git a/src/server.rs b/src/server.rs index 89f4579..22b69f3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1399,6 +1399,9 @@ impl BitdexServer { .route("/api/indexes/{name}/time-buckets/rebuild", post(handle_rebuild_time_buckets)) .route("/api/indexes/{name}/snapshot", post(handle_save_snapshot)) .route("/api/indexes/{name}/cursors/{cursor_name}", put(handle_set_cursor)) + .route("/api/indexes/{name}/prefilters", post(handle_register_prefilter).get(handle_list_prefilters)) + .route("/api/indexes/{name}/prefilters/{prefilter_name}", delete(handle_delete_prefilter)) + .route("/api/indexes/{name}/prefilters/{prefilter_name}/refresh", post(handle_refresh_prefilter)) // Capture endpoints (Phase 2) .route("/debug/capture/start", post(handle_capture_start)) .route("/debug/capture/stop", post(handle_capture_stop)) @@ -4375,6 +4378,159 @@ async fn handle_save_snapshot( } } +// --------------------------------------------------------------------------- +// Handlers: Prefilter registry +// --------------------------------------------------------------------------- + +#[derive(serde::Deserialize)] +struct RegisterPrefilterRequest { + name: String, + clauses: Vec, + #[serde(default = "default_refresh_interval")] + refresh_interval_secs: u64, +} + +fn default_refresh_interval() -> u64 { + crate::prefilter::DEFAULT_REFRESH_INTERVAL_SECS +} + +/// POST /api/indexes/{name}/prefilters — register (or replace) a prefilter. +/// Computes the bitmap synchronously from the current snapshot. 
+async fn handle_register_prefilter( + State(state): State, + AxumPath(name): AxumPath, + Json(req): Json, +) -> impl IntoResponse { + let engine = { + let guard = state.index.lock(); + match guard.as_ref() { + Some(idx) if idx.definition.name == name => Arc::clone(&idx.engine), + _ => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("Index '{}' not found", name)})), + ).into_response(); + } + } + }; + + if req.name.trim().is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "name must not be empty"})), + ).into_response(); + } + if req.clauses.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "clauses must not be empty"})), + ).into_response(); + } + + let t0 = Instant::now(); + match engine.register_prefilter(req.name.clone(), req.clauses, req.refresh_interval_secs) { + Ok(entry) => { + let compute_ms = t0.elapsed().as_millis() as u64; + Json(serde_json::json!({ + "name": entry.name, + "cardinality": entry.cardinality(), + "compute_time_ms": compute_ms, + "refresh_interval_secs": entry.refresh_interval_secs(), + })).into_response() + } + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": format!("register_prefilter: {e}")})), + ).into_response(), + } +} + +/// GET /api/indexes/{name}/prefilters — list registered prefilters. +async fn handle_list_prefilters( + State(state): State, + AxumPath(name): AxumPath, +) -> impl IntoResponse { + let engine = { + let guard = state.index.lock(); + match guard.as_ref() { + Some(idx) if idx.definition.name == name => Arc::clone(&idx.engine), + _ => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("Index '{}' not found", name)})), + ).into_response(); + } + } + }; + + let prefilters = engine.prefilters().list(); + Json(serde_json::json!({ "prefilters": prefilters })).into_response() +} + +/// DELETE /api/indexes/{name}/prefilters/{prefilter_name} — unregister. 
+async fn handle_delete_prefilter( + State(state): State, + AxumPath((name, prefilter_name)): AxumPath<(String, String)>, +) -> impl IntoResponse { + let engine = { + let guard = state.index.lock(); + match guard.as_ref() { + Some(idx) if idx.definition.name == name => Arc::clone(&idx.engine), + _ => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("Index '{}' not found", name)})), + ).into_response(); + } + } + }; + + if engine.unregister_prefilter(&prefilter_name) { + StatusCode::NO_CONTENT.into_response() + } else { + ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("prefilter '{}' not found", prefilter_name)})), + ).into_response() + } +} + +/// POST /api/indexes/{name}/prefilters/{prefilter_name}/refresh — manual refresh. +async fn handle_refresh_prefilter( + State(state): State, + AxumPath((name, prefilter_name)): AxumPath<(String, String)>, +) -> impl IntoResponse { + let engine = { + let guard = state.index.lock(); + match guard.as_ref() { + Some(idx) if idx.definition.name == name => Arc::clone(&idx.engine), + _ => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("Index '{}' not found", name)})), + ).into_response(); + } + } + }; + + let t0 = Instant::now(); + match engine.refresh_prefilter(&prefilter_name) { + Ok(Some(entry)) => Json(serde_json::json!({ + "name": entry.name, + "cardinality": entry.cardinality(), + "compute_time_ms": t0.elapsed().as_millis() as u64, + })).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("prefilter '{}' not found", prefilter_name)})), + ).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": format!("refresh_prefilter: {e}")})), + ).into_response(), + } +} + // --------------------------------------------------------------------------- // Handlers: Capture (Phase 2 — snapshot capture system) // 
--------------------------------------------------------------------------- diff --git a/tests/prefilter_integration.rs b/tests/prefilter_integration.rs new file mode 100644 index 0000000..2d2483a --- /dev/null +++ b/tests/prefilter_integration.rs @@ -0,0 +1,206 @@ +//! Integration tests for the prefilter registry. +//! +//! Verifies the end-to-end path: +//! 1. Register a prefilter with a clause subset. +//! 2. Run a query whose clauses are a superset. +//! 3. Confirm substitution fires (trace has `prefilter` set) and results +//! are identical to the same query without the registered prefilter. + +use std::thread; +use std::time::Duration; + +use bitdex_v2::concurrent_engine::ConcurrentEngine; +use bitdex_v2::config::{Config, FilterFieldConfig, SortFieldConfig}; +use bitdex_v2::filter::FilterFieldType; +use bitdex_v2::mutation::{Document, FieldValue}; +use bitdex_v2::query::{BitdexQuery, FilterClause, Value}; + +fn test_config(bitmap_path: &std::path::Path) -> Config { + let mut config = Config { + filter_fields: vec![ + FilterFieldConfig { + name: "nsfwLevel".to_string(), + field_type: FilterFieldType::SingleValue, + behaviors: None, + eviction: None, + eager_load: true, + per_value_lazy: false, + }, + FilterFieldConfig { + name: "onSite".to_string(), + field_type: FilterFieldType::Boolean, + behaviors: None, + eviction: None, + eager_load: true, + per_value_lazy: false, + }, + FilterFieldConfig { + name: "userId".to_string(), + field_type: FilterFieldType::SingleValue, + behaviors: None, + eviction: None, + eager_load: true, + per_value_lazy: false, + }, + ], + sort_fields: vec![SortFieldConfig { + name: "reactionCount".to_string(), + source_type: "uint32".to_string(), + encoding: "linear".to_string(), + bits: 32, + eager_load: false, + computed: None, + }], + max_page_size: 1000, + flush_interval_us: 50, + merge_interval_ms: 100, + channel_capacity: 10_000, + ..Default::default() + }; + config.storage.bitmap_path = Some(bitmap_path.to_path_buf()); + config +} 
+ +fn doc(nsfw: i64, on_site: bool, user_id: i64, react: i64) -> Document { + let mut fields: ahash::AHashMap = ahash::AHashMap::new(); + fields.insert("nsfwLevel".into(), FieldValue::Single(Value::Integer(nsfw))); + fields.insert("onSite".into(), FieldValue::Single(Value::Bool(on_site))); + fields.insert("userId".into(), FieldValue::Single(Value::Integer(user_id))); + fields.insert("reactionCount".into(), FieldValue::Single(Value::Integer(react))); + Document { fields } +} + +fn wait_for_alive(engine: &ConcurrentEngine, expected: u64, max_ms: u64) { + let deadline = std::time::Instant::now() + Duration::from_millis(max_ms); + while std::time::Instant::now() < deadline { + if engine.alive_count() == expected { + thread::sleep(Duration::from_millis(5)); + return; + } + thread::sleep(Duration::from_millis(1)); + } + panic!("timed out waiting for flush (alive={}, expected={})", engine.alive_count(), expected); +} + +fn sorted_ids(q: BitdexQuery, engine: &ConcurrentEngine) -> Vec { + let result = engine.execute_query_traced(&q, "test").unwrap().0; + let mut ids = result.ids; + ids.sort_unstable(); + ids +} + +#[test] +fn prefilter_substitution_preserves_results() { + let dir = tempfile::tempdir().unwrap(); + let engine = ConcurrentEngine::new_with_path( + test_config(&dir.path().join("bitmaps")), + &dir.path().join("docs"), + ).unwrap(); + + // 100 docs: nsfw in [1..5], onSite alternates, userId in [100..110], reactions 100..10000. + for i in 1..=100u32 { + let d = doc( + (i as i64 % 5) + 1, + i % 2 == 0, + 100 + (i as i64 % 11), + i as i64 * 100, + ); + engine.put(i, &d).unwrap(); + } + wait_for_alive(&engine, 100, 2000); + + // Safety-prefix analog: nsfwLevel=1 AND onSite=true. Matches docs where + // (i % 5)+1 == 1 AND i % 2 == 0 -> (i % 5 == 0) AND (i % 2 == 0) + let prefix = vec![ + FilterClause::Eq("nsfwLevel".into(), Value::Integer(1)), + FilterClause::Eq("onSite".into(), Value::Bool(true)), + ]; + + // Baseline: query without any registered prefilter. 
+ let baseline_query = BitdexQuery { + filters: { + let mut f = prefix.clone(); + f.push(FilterClause::Eq("userId".into(), Value::Integer(105))); + f + }, + sort: None, + limit: 100, + cursor: None, + offset: None, + skip_cache: true, + }; + let baseline_ids = sorted_ids(baseline_query.clone(), &engine); + + // Register the prefilter. + let entry = engine.register_prefilter( + "safe".into(), + prefix.clone(), + 300, + ).unwrap(); + assert!(entry.cardinality() > 0, "safe prefilter should match at least one doc"); + + // Run the same query — should substitute and return identical results. + let (_result, trace) = engine.execute_query_traced(&baseline_query, "test").unwrap(); + assert_eq!(trace.prefilter.as_deref(), Some("safe"), "query should have substituted the safe prefilter"); + + let substituted_ids = sorted_ids(baseline_query.clone(), &engine); + assert_eq!(substituted_ids, baseline_ids, "substituted results must match baseline"); + + // A query that does NOT match the full prefilter should not substitute. + let partial_query = BitdexQuery { + filters: vec![ + FilterClause::Eq("nsfwLevel".into(), Value::Integer(1)), + // onSite missing — so prefix isn't fully present. + FilterClause::Eq("userId".into(), Value::Integer(105)), + ], + sort: None, + limit: 100, + cursor: None, + offset: None, + skip_cache: true, + }; + let (_partial_result, partial_trace) = engine.execute_query_traced(&partial_query, "test").unwrap(); + assert!(partial_trace.prefilter.is_none(), "partial match should not substitute"); + + // Unregister and re-run — substitution should no longer fire. 
+ assert!(engine.unregister_prefilter("safe")); + let (_after_result, after_trace) = engine.execute_query_traced(&baseline_query, "test").unwrap(); + assert!(after_trace.prefilter.is_none(), "after unregister, no substitution"); + let after_ids = sorted_ids(baseline_query, &engine); + assert_eq!(after_ids, baseline_ids, "results unchanged after unregister"); +} + +#[test] +fn refresh_updates_cardinality() { + let dir = tempfile::tempdir().unwrap(); + let engine = ConcurrentEngine::new_with_path( + test_config(&dir.path().join("bitmaps")), + &dir.path().join("docs"), + ).unwrap(); + + // Insert 10 docs matching nsfw=1. + for i in 1..=10u32 { + engine.put(i, &doc(1, true, 100, i as i64)).unwrap(); + } + wait_for_alive(&engine, 10, 2000); + + let entry = engine.register_prefilter( + "nsfw1".into(), + vec![FilterClause::Eq("nsfwLevel".into(), Value::Integer(1))], + 300, + ).unwrap(); + assert_eq!(entry.cardinality(), 10); + + // Add 5 more matching docs. + for i in 11..=15u32 { + engine.put(i, &doc(1, true, 100, i as i64)).unwrap(); + } + wait_for_alive(&engine, 15, 2000); + + // Cardinality on the existing entry is stale until refresh. + assert_eq!(entry.cardinality(), 10, "before refresh, cardinality is stale"); + + // Refresh should bring it current. + let refreshed = engine.refresh_prefilter("nsfw1").unwrap().unwrap(); + assert_eq!(refreshed.cardinality(), 15, "after refresh, cardinality reflects new state"); +}