Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 68 additions & 27 deletions src/concurrent_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3985,37 +3985,78 @@ impl ConcurrentEngine {
}
}
}
// Sequential phase: send LazyLoad messages to flush thread and update pending sets.
for (name, bitmaps) in &loaded_filters {
let _ = self.lazy_tx.send(LazyLoad::FilterField {
name: name.clone(),
bitmaps: bitmaps.clone(),
});
self.pending_filter_loads.lock().remove(name);
}
for (field_name, loaded_vals, _missing) in &loaded_values {
let _ = self.lazy_tx.send(LazyLoad::FilterValues {
field: field_name.clone(),
values: loaded_vals.clone(),
});
}
if let Some((ref sort_name, ref layers)) = loaded_sort {
let _ = self.lazy_tx.send(LazyLoad::SortField {
name: sort_name.clone(),
layers: layers.clone(),
});
self.pending_sort_loads.lock().remove(sort_name);
}
let any_loaded = !loaded_filters.is_empty() || !loaded_values.is_empty() || loaded_sort.is_some();
if any_loaded {
// Single-writer publish: data was already sent to the flush thread
// via lazy_tx. Ask the flush thread to drain it and publish a new
// snapshot. This avoids the old rcu() CAS loop which could race
// with the flush thread's own store() calls.
let (done_tx, done_rx) = crossbeam_channel::bounded(1);
// Apr 13 2026: Apply loaded data directly to the published snapshot
// instead of blocking on a ForcePublish round-trip to the flush thread.
//
// The old path sent data via lazy_tx → flush thread → ForcePublish
// → done_rx.recv_timeout(5s). This blocked every lazy-load query for
// the flush thread's entire cycle (~431ms), which was the hidden
// serialization point causing all trace phases to appear slow.
//
// New path: apply loaded bitmaps directly to a clone of the current
// snapshot, then publish via ArcSwap. The flush thread will also
// receive the data via lazy_tx and apply it on its next cycle, but
// we don't wait for it. Two concurrent publishers (query thread +
// flush thread) is safe because ArcSwap::store is atomic and the
// flush thread's next publish will include all lazy-loaded data.
//
// Send to flush thread for persistence (non-blocking, fire-and-forget).
for (name, bitmaps) in &loaded_filters {
let _ = self.lazy_tx.send(LazyLoad::FilterField {
name: name.clone(),
bitmaps: bitmaps.clone(),
});
self.pending_filter_loads.lock().remove(name);
}
for (field_name, loaded_vals, _missing) in &loaded_values {
let _ = self.lazy_tx.send(LazyLoad::FilterValues {
field: field_name.clone(),
values: loaded_vals.clone(),
});
}
if let Some((ref sort_name, ref layers)) = loaded_sort {
let _ = self.lazy_tx.send(LazyLoad::SortField {
name: sort_name.clone(),
layers: layers.clone(),
});
self.pending_sort_loads.lock().remove(sort_name);
}
// Apply directly to published snapshot (non-blocking).
let publish_start = std::time::Instant::now();
let current = self.inner.load_full();
let mut updated = (*current).clone();
for (name, bitmaps) in &loaded_filters {
if let Some(field) = updated.filters.get_field(name) {
field.load_field_complete(bitmaps.clone());
}
}
for (field_name, loaded_vals, requested_keys) in &loaded_values {
if let Some(field) = updated.filters.get_field(field_name) {
field.load_values(loaded_vals.clone(), requested_keys);
}
}
if let Some((ref sort_name, ref layers)) = loaded_sort {
if let Some(sf) = updated.sorts.get_field_mut(sort_name) {
sf.load_layers(layers.clone());
}
}
self.inner.store(Arc::new(updated));
let publish_ms = publish_start.elapsed().as_millis();
if publish_ms > 10 {
eprintln!("[lazy_load] direct publish took {publish_ms}ms (was blocking on flush thread before)");
}
}
// Removed: the old ForcePublish blocking path is no longer needed.
// Keeping the fallback for reference in case the direct publish
// causes issues (it shouldn't — ArcSwap::store is atomic).
#[allow(unreachable_code)]
if false {
// OLD PATH (blocked on flush thread):
let (done_tx, done_rx) = crossbeam_channel::bounded::<()>(1);
let flush_alive = self.cmd_tx.send(FlushCommand::ForcePublish { done: done_tx }).is_ok();
if flush_alive {
// Block until flush thread publishes (typically <1ms).
let _ = done_rx.recv_timeout(Duration::from_secs(5));
} else {
// Flush thread is dead (shutdown called). Publish directly —
Expand Down
Loading