Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/concurrent_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6870,6 +6870,57 @@ impl ConcurrentEngine {
pub fn unregister_prefilter(&self, name: &str) -> bool {
self.prefilters.remove(name)
}

/// Spawn the prefilter stale-while-revalidate refresh thread.
///
/// Ticks every `tick_secs` and, for each registered prefilter past its
/// `refresh_interval_secs`, recomputes the bitmap and atomically swaps it
/// via ArcSwap. In-flight queries holding the old Arc continue reading
/// the old bitmap; the next query after publish sees the new one.
///
/// Holds a `Weak<ConcurrentEngine>` so engine drop breaks the cycle.
/// Respects `self.shutdown` for cooperative exit.
///
/// Call this *after* constructing the engine Arc (e.g. from the server
/// during the post-listen phase).
pub fn start_prefilter_refresh_thread(
self: &Arc<Self>,
tick_secs: u64,
) -> std::thread::JoinHandle<()> {
let weak = Arc::downgrade(self);
let shutdown = Arc::clone(&self.shutdown);
let tick = std::time::Duration::from_secs(tick_secs.max(1));
std::thread::Builder::new()
.name("bitdex-prefilter-swr".into())
.spawn(move || {
while !shutdown.load(Ordering::Relaxed) {
let engine = match weak.upgrade() {
Some(e) => e,
None => return, // engine dropped
};
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
for entry in engine.prefilters.entries() {
if shutdown.load(Ordering::Relaxed) {
return;
}
if entry.is_stale(now_unix) {
if let Err(e) = engine.refresh_prefilter(&entry.name) {
tracing::warn!(
"prefilter SWR refresh failed for '{}': {}",
entry.name, e
);
}
}
}
drop(engine);
std::thread::sleep(tick);
}
})
.expect("Failed to spawn prefilter SWR thread")
}
/// Purge the entire BoundStore: disk first, then memory.
/// Order matters: wipe disk before clearing RAM to prevent stale shard loads.
/// Safe to call while the server is running — the merge thread will simply
Expand Down
6 changes: 6 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,12 @@ impl BitdexServer {
state.metrics.boot_phase_seconds
.with_label_values(&["bound_cache"])
.set(phase6_elapsed.as_secs() as i64);

// Phase 7: prefilter stale-while-revalidate refresh thread.
// Ticks every 10s; refreshes entries past their interval via
// ArcSwap so in-flight queries never block on compute.
let _handle = engine.start_prefilter_refresh_thread(10);
eprintln!(" Boot phase: prefilter SWR thread spawned");
}
}

Expand Down
60 changes: 60 additions & 0 deletions tests/prefilter_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,66 @@ fn deleted_docs_excluded_from_stale_prefilter() {
assert_eq!(ids, vec![1, 2, 4, 6, 8, 9, 10], "deleted slots 3/5/7 must not leak through");
}

#[test]
fn swr_thread_refreshes_stale_prefilters() {
// Spawn the SWR thread with a 1s tick and a prefilter whose refresh
// interval clamps to the 30s minimum — that's too long for a fast
// test, so drive staleness manually by rewinding last_refreshed.
//
// Flow:
// 1. Register prefilter covering 10 initial docs.
// 2. Insert 5 more matching docs (cached cardinality stays at 10).
// 3. Rewind last_refreshed so is_stale() returns true.
// 4. SWR thread picks it up on its next tick → cardinality flips to 15.
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;

let dir = tempfile::tempdir().unwrap();
let engine = Arc::new(ConcurrentEngine::new_with_path(
test_config(&dir.path().join("bitmaps")),
&dir.path().join("docs"),
).unwrap());

for i in 1..=10u32 {
engine.put(i, &doc(1, true, 100, i as i64)).unwrap();
}
wait_for_alive(&engine, 10, 2000);

let entry = engine.register_prefilter(
"nsfw1".into(),
vec![FilterClause::Eq("nsfwLevel".into(), Value::Integer(1))],
30, // clamped to MIN_REFRESH_INTERVAL_SECS
).unwrap();
assert_eq!(entry.cardinality(), 10);

// Boot SWR with a 1-second tick.
let _handle = engine.start_prefilter_refresh_thread(1);

// Add 5 more matching docs.
for i in 11..=15u32 {
engine.put(i, &doc(1, true, 100, i as i64)).unwrap();
}
wait_for_alive(&engine, 15, 2000);

// Cardinality still reflects the pre-refresh snapshot.
assert_eq!(entry.cardinality(), 10, "still stale before rewind");

// Force the entry past its refresh interval: rewind last_refreshed to
// the epoch. SWR thread should see this on its next tick.
entry.last_refreshed.store(0, Ordering::Relaxed);

// Wait up to 5s for the SWR thread to pick it up and refresh.
let deadline = std::time::Instant::now() + Duration::from_secs(5);
while std::time::Instant::now() < deadline {
if entry.cardinality() == 15 {
break;
}
std::thread::sleep(Duration::from_millis(50));
}
assert_eq!(entry.cardinality(), 15, "SWR thread did not refresh within 5s");
}

#[test]
fn refresh_updates_cardinality() {
let dir = tempfile::tempdir().unwrap();
Expand Down
Loading