From 0777b5e8054cbf09bed763143521a69a4bf458bc Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Mon, 13 Apr 2026 00:50:29 -0600 Subject: [PATCH] =?UTF-8?q?perf(p99):=20skip=20ForcePublish=20for=20filter?= =?UTF-8?q?=20lazy=20loads=20=E2=80=94=20direct=20apply=20via=20internal?= =?UTF-8?q?=20RwLock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous attempt (PR #197, reverted) tried to clone + publish the entire InnerEngine from the query thread, which was O(all fields) and caused a 1.85s flush regression. New approach: FilterField::load_values() and load_field_complete() take &self and use internal RwLock — they can be called directly on the published snapshot's fields without cloning the engine or publishing. Loaded bitmaps become visible to all readers immediately through the shared Arc. Key changes: - Filter loads (load_field_complete, load_values): apply directly to the current snapshot, skip ForcePublish entirely. Fire-and-forget send to flush thread for its staging copy. - Sort loads (load_layers needs &mut self): still use ForcePublish round-trip to flush thread. This is safe because: - FilterField::bitmaps is RwLock — internal mutation is sound - The flush thread also applies the same data from lazy_tx (idempotent) - No engine clone, no ArcSwap::store race Expected: filter lazy_load_us drops from 1-5s (ForcePublish block) to 10-100ms (actual disk read time). Sort lazy loads still block but are much rarer than filter loads. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/concurrent_engine.rs | 77 ++++++++++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 27 deletions(-) diff --git a/src/concurrent_engine.rs b/src/concurrent_engine.rs index 0ad31e4..160982b 100644 --- a/src/concurrent_engine.rs +++ b/src/concurrent_engine.rs @@ -3985,37 +3985,60 @@ impl ConcurrentEngine { } } } - // Sequential phase: send LazyLoad messages to flush thread and update pending sets. - for (name, bitmaps) in &loaded_filters { - let _ = self.lazy_tx.send(LazyLoad::FilterField { - name: name.clone(), - bitmaps: bitmaps.clone(), - }); - self.pending_filter_loads.lock().remove(name); - } - for (field_name, loaded_vals, _missing) in &loaded_values { - let _ = self.lazy_tx.send(LazyLoad::FilterValues { - field: field_name.clone(), - values: loaded_vals.clone(), - }); + // Apr 13 2026: Apply filter data directly to the published snapshot. + // + // FilterField::load_values/load_field_complete take &self (internal RwLock), + // so we can call them on the current snapshot's fields without cloning the + // engine or blocking on the flush thread. The loaded bitmaps become visible + // to all readers immediately through the shared Arc. + // + // Sort fields need &mut self (load_layers), so we still use ForcePublish + // for sort-only loads. But filter loads (the overwhelmingly common case + // for per_value_lazy fields like postId, modelVersionIds) skip it entirely. + // + // This eliminates the ~431ms ForcePublish block on filter lazy loads. + let has_filter_loads = !loaded_filters.is_empty() || !loaded_values.is_empty(); + let has_sort_load = loaded_sort.is_some(); + if has_filter_loads { + // Direct apply to published snapshot's fields — no engine clone, no ForcePublish. + let snap = self.inner.load(); + for (name, bitmaps) in &loaded_filters { + if let Some(field) = snap.filters.get_field(name) { + field.load_field_complete(bitmaps.clone()); + } + self.pending_filter_loads.lock().remove(name); + } + for (field_name, loaded_vals, requested_keys) in &loaded_values { + if let Some(field) = snap.filters.get_field(field_name) { + field.load_values(loaded_vals.clone(), requested_keys); + } + } + // Also send to flush thread for its staging copy (fire-and-forget). + for (name, bitmaps) in &loaded_filters { + let _ = self.lazy_tx.send(LazyLoad::FilterField { + name: name.clone(), + bitmaps: bitmaps.clone(), + }); + } + for (field_name, loaded_vals, _missing) in &loaded_values { + let _ = self.lazy_tx.send(LazyLoad::FilterValues { + field: field_name.clone(), + values: loaded_vals.clone(), + }); + } } - if let Some((ref sort_name, ref layers)) = loaded_sort { - let _ = self.lazy_tx.send(LazyLoad::SortField { - name: sort_name.clone(), - layers: layers.clone(), - }); - self.pending_sort_loads.lock().remove(sort_name); - } - let any_loaded = !loaded_filters.is_empty() || !loaded_values.is_empty() || loaded_sort.is_some(); - if any_loaded { - // Single-writer publish: data was already sent to the flush thread - // via lazy_tx. Ask the flush thread to drain it and publish a new - // snapshot. This avoids the old rcu() CAS loop which could race - // with the flush thread's own store() calls. + // Sort loads still need ForcePublish (load_layers requires &mut self). + if has_sort_load { + if let Some((ref sort_name, ref layers)) = loaded_sort { + let _ = self.lazy_tx.send(LazyLoad::SortField { + name: sort_name.clone(), + layers: layers.clone(), + }); + self.pending_sort_loads.lock().remove(sort_name); + } let (done_tx, done_rx) = crossbeam_channel::bounded(1); let flush_alive = self.cmd_tx.send(FlushCommand::ForcePublish { done: done_tx }).is_ok(); if flush_alive { - // Block until flush thread publishes (typically <1ms). let _ = done_rx.recv_timeout(Duration::from_secs(5)); } else { // Flush thread is dead (shutdown called). Publish directly —