diff --git a/src/concurrent_engine.rs b/src/concurrent_engine.rs index 0ad31e4..9113107 100644 --- a/src/concurrent_engine.rs +++ b/src/concurrent_engine.rs @@ -3985,37 +3985,78 @@ impl ConcurrentEngine { } } } - // Sequential phase: send LazyLoad messages to flush thread and update pending sets. - for (name, bitmaps) in &loaded_filters { - let _ = self.lazy_tx.send(LazyLoad::FilterField { - name: name.clone(), - bitmaps: bitmaps.clone(), - }); - self.pending_filter_loads.lock().remove(name); - } - for (field_name, loaded_vals, _missing) in &loaded_values { - let _ = self.lazy_tx.send(LazyLoad::FilterValues { - field: field_name.clone(), - values: loaded_vals.clone(), - }); - } - if let Some((ref sort_name, ref layers)) = loaded_sort { - let _ = self.lazy_tx.send(LazyLoad::SortField { - name: sort_name.clone(), - layers: layers.clone(), - }); - self.pending_sort_loads.lock().remove(sort_name); - } let any_loaded = !loaded_filters.is_empty() || !loaded_values.is_empty() || loaded_sort.is_some(); if any_loaded { - // Single-writer publish: data was already sent to the flush thread - // via lazy_tx. Ask the flush thread to drain it and publish a new - // snapshot. This avoids the old rcu() CAS loop which could race - // with the flush thread's own store() calls. - let (done_tx, done_rx) = crossbeam_channel::bounded(1); + // Apr 13 2026: Apply loaded data directly to the published snapshot + // instead of blocking on a ForcePublish round-trip to the flush thread. + // + // The old path sent data via lazy_tx → flush thread → ForcePublish + // → done_rx.recv_timeout(5s). This blocked every lazy-load query for + // the flush thread's entire cycle (~431ms), which was the hidden + // serialization point causing all trace phases to appear slow. + // + // New path: apply loaded bitmaps directly to a clone of the current + // snapshot, then publish via ArcSwap. 
The flush thread will also + receive the data via lazy_tx and apply it on its next cycle, but + we don't wait for it. Two concurrent publishers (query thread + + flush thread) can race on this load-clone-store: store() is atomic per call, but we may overwrite a snapshot the + flush thread publishes in between; it re-applies this data (sent via lazy_tx) next cycle, so only visibility is briefly delayed — confirm no other snapshot state can be lost. + // + // Send to flush thread for persistence (non-blocking, fire-and-forget). + for (name, bitmaps) in &loaded_filters { + let _ = self.lazy_tx.send(LazyLoad::FilterField { + name: name.clone(), + bitmaps: bitmaps.clone(), + }); + self.pending_filter_loads.lock().remove(name); + } + for (field_name, loaded_vals, _missing) in &loaded_values { + let _ = self.lazy_tx.send(LazyLoad::FilterValues { + field: field_name.clone(), + values: loaded_vals.clone(), + }); + } + if let Some((ref sort_name, ref layers)) = loaded_sort { + let _ = self.lazy_tx.send(LazyLoad::SortField { + name: sort_name.clone(), + layers: layers.clone(), + }); + self.pending_sort_loads.lock().remove(sort_name); + } + // Apply directly to published snapshot (non-blocking). + let publish_start = std::time::Instant::now(); + let current = self.inner.load_full(); + let mut updated = (*current).clone(); + for (name, bitmaps) in &loaded_filters { + if let Some(field) = updated.filters.get_field(name) { + field.load_field_complete(bitmaps.clone()); + } + } + for (field_name, loaded_vals, requested_keys) in &loaded_values { + if let Some(field) = updated.filters.get_field(field_name) { + field.load_values(loaded_vals.clone(), requested_keys); + } + } + if let Some((ref sort_name, ref layers)) = loaded_sort { + if let Some(sf) = updated.sorts.get_field_mut(sort_name) { + sf.load_layers(layers.clone()); + } + } + self.inner.store(Arc::new(updated)); + let publish_ms = publish_start.elapsed().as_millis(); + if publish_ms > 10 { + eprintln!("[lazy_load] direct publish took {publish_ms}ms (was blocking on flush thread before)"); + } + } + // Removed: the old ForcePublish blocking path is no longer needed. 
+ // Keeping the fallback for reference in case the direct publish + // causes issues (store() is atomic per call, but the load-clone-store above can overwrite a concurrent flush-thread publish — remove once that window is proven benign). + #[allow(unreachable_code)] + if false { + // OLD PATH (blocked on flush thread): + let (done_tx, done_rx) = crossbeam_channel::bounded::<()>(1); let flush_alive = self.cmd_tx.send(FlushCommand::ForcePublish { done: done_tx }).is_ok(); if flush_alive { - // Block until flush thread publishes (typically <1ms). let _ = done_rx.recv_timeout(Duration::from_secs(5)); } else { // Flush thread is dead (shutdown called). Publish directly —