diff --git a/src/concurrent_engine.rs b/src/concurrent_engine.rs index 25e4999..c235bd1 100644 --- a/src/concurrent_engine.rs +++ b/src/concurrent_engine.rs @@ -6870,6 +6870,57 @@ impl ConcurrentEngine { pub fn unregister_prefilter(&self, name: &str) -> bool { self.prefilters.remove(name) } + + /// Spawn the prefilter stale-while-revalidate refresh thread. + /// + /// Ticks every `tick_secs` and, for each registered prefilter past its + /// `refresh_interval_secs`, recomputes the bitmap and atomically swaps it + /// via ArcSwap. In-flight queries holding the old Arc continue reading + /// the old bitmap; the next query after publish sees the new one. + /// + /// Holds a `Weak` so engine drop breaks the cycle. + /// Respects `self.shutdown` for cooperative exit. + /// + /// Call this *after* constructing the engine Arc (e.g. from the server + /// during the post-listen phase). + pub fn start_prefilter_refresh_thread( + self: &Arc, + tick_secs: u64, + ) -> std::thread::JoinHandle<()> { + let weak = Arc::downgrade(self); + let shutdown = Arc::clone(&self.shutdown); + let tick = std::time::Duration::from_secs(tick_secs.max(1)); + std::thread::Builder::new() + .name("bitdex-prefilter-swr".into()) + .spawn(move || { + while !shutdown.load(Ordering::Relaxed) { + let engine = match weak.upgrade() { + Some(e) => e, + None => return, // engine dropped + }; + let now_unix = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0); + for entry in engine.prefilters.entries() { + if shutdown.load(Ordering::Relaxed) { + return; + } + if entry.is_stale(now_unix) { + if let Err(e) = engine.refresh_prefilter(&entry.name) { + tracing::warn!( + "prefilter SWR refresh failed for '{}': {}", + entry.name, e + ); + } + } + } + drop(engine); + std::thread::sleep(tick); + } + }) + .expect("Failed to spawn prefilter SWR thread") + } /// Purge the entire BoundStore: disk first, then memory. /// Order matters: wipe disk before clearing RAM to prevent stale shard loads. /// Safe to call while the server is running — the merge thread will simply diff --git a/src/server.rs b/src/server.rs index a889c48..6d4c7ab 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1498,6 +1498,12 @@ impl BitdexServer { state.metrics.boot_phase_seconds .with_label_values(&["bound_cache"]) .set(phase6_elapsed.as_secs() as i64); + + // Phase 7: prefilter stale-while-revalidate refresh thread. + // Ticks every 10s; refreshes entries past their interval via + // ArcSwap so in-flight queries never block on compute. + let _handle = engine.start_prefilter_refresh_thread(10); + eprintln!(" Boot phase: prefilter SWR thread spawned"); } } diff --git a/tests/prefilter_integration.rs b/tests/prefilter_integration.rs index 28baaaf..415069b 100644 --- a/tests/prefilter_integration.rs +++ b/tests/prefilter_integration.rs @@ -224,6 +224,66 @@ fn deleted_docs_excluded_from_stale_prefilter() { assert_eq!(ids, vec![1, 2, 4, 6, 8, 9, 10], "deleted slots 3/5/7 must not leak through"); } +#[test] +fn swr_thread_refreshes_stale_prefilters() { + // Spawn the SWR thread with a 1s tick and a prefilter whose refresh + // interval clamps to the 30s minimum — that's too long for a fast + // test, so drive staleness manually by rewinding last_refreshed. + // + // Flow: + // 1. Register prefilter covering 10 initial docs. + // 2. Insert 5 more matching docs (cached cardinality stays at 10). + // 3. Rewind last_refreshed so is_stale() returns true. + // 4. SWR thread picks it up on its next tick → cardinality flips to 15. + use std::sync::Arc; + use std::sync::atomic::Ordering; + use std::time::Duration; + + let dir = tempfile::tempdir().unwrap(); + let engine = Arc::new(ConcurrentEngine::new_with_path( + test_config(&dir.path().join("bitmaps")), + &dir.path().join("docs"), + ).unwrap()); + + for i in 1..=10u32 { + engine.put(i, &doc(1, true, 100, i as i64)).unwrap(); + } + wait_for_alive(&engine, 10, 2000); + + let entry = engine.register_prefilter( + "nsfw1".into(), + vec![FilterClause::Eq("nsfwLevel".into(), Value::Integer(1))], + 30, // clamped to MIN_REFRESH_INTERVAL_SECS + ).unwrap(); + assert_eq!(entry.cardinality(), 10); + + // Boot SWR with a 1-second tick. + let _handle = engine.start_prefilter_refresh_thread(1); + + // Add 5 more matching docs. + for i in 11..=15u32 { + engine.put(i, &doc(1, true, 100, i as i64)).unwrap(); + } + wait_for_alive(&engine, 15, 2000); + + // Cardinality still reflects the pre-refresh snapshot. + assert_eq!(entry.cardinality(), 10, "still stale before rewind"); + + // Force the entry past its refresh interval: rewind last_refreshed to + // the epoch. SWR thread should see this on its next tick. + entry.last_refreshed.store(0, Ordering::Relaxed); + + // Wait up to 5s for the SWR thread to pick it up and refresh. + let deadline = std::time::Instant::now() + Duration::from_secs(5); + while std::time::Instant::now() < deadline { + if entry.cardinality() == 15 { + break; + } + std::thread::sleep(Duration::from_millis(50)); + } + assert_eq!(entry.cardinality(), 15, "SWR thread did not refresh within 5s"); +} + #[test] fn refresh_updates_cardinality() { let dir = tempfile::tempdir().unwrap();