Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 50 additions & 27 deletions src/concurrent_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3985,37 +3985,60 @@ impl ConcurrentEngine {
}
}
}
// Sequential phase: send LazyLoad messages to flush thread and update pending sets.
for (name, bitmaps) in &loaded_filters {
let _ = self.lazy_tx.send(LazyLoad::FilterField {
name: name.clone(),
bitmaps: bitmaps.clone(),
});
self.pending_filter_loads.lock().remove(name);
}
for (field_name, loaded_vals, _missing) in &loaded_values {
let _ = self.lazy_tx.send(LazyLoad::FilterValues {
field: field_name.clone(),
values: loaded_vals.clone(),
});
// Apr 13 2026: Apply filter data directly to the published snapshot.
//
// FilterField::load_values/load_field_complete take &self (internal RwLock),
// so we can call them on the current snapshot's fields without cloning the
// engine or blocking on the flush thread. The loaded bitmaps become visible
// to all readers immediately through the shared Arc<FilterField>.
//
// Sort fields need &mut self (load_layers), so we still use ForcePublish
// for sort-only loads. But filter loads (the overwhelmingly common case
// for per_value_lazy fields like postId, modelVersionIds) skip it entirely.
//
// This eliminates the ~431ms ForcePublish block on filter lazy loads.
let has_filter_loads = !loaded_filters.is_empty() || !loaded_values.is_empty();
let has_sort_load = loaded_sort.is_some();
if has_filter_loads {
// Direct apply to published snapshot's fields — no engine clone, no ForcePublish.
let snap = self.inner.load();
for (name, bitmaps) in &loaded_filters {
if let Some(field) = snap.filters.get_field(name) {
field.load_field_complete(bitmaps.clone());
}
self.pending_filter_loads.lock().remove(name);
}
for (field_name, loaded_vals, requested_keys) in &loaded_values {
if let Some(field) = snap.filters.get_field(field_name) {
field.load_values(loaded_vals.clone(), requested_keys);
}
}
// Also send to flush thread for its staging copy (fire-and-forget).
for (name, bitmaps) in &loaded_filters {
let _ = self.lazy_tx.send(LazyLoad::FilterField {
name: name.clone(),
bitmaps: bitmaps.clone(),
});
}
for (field_name, loaded_vals, _missing) in &loaded_values {
let _ = self.lazy_tx.send(LazyLoad::FilterValues {
field: field_name.clone(),
values: loaded_vals.clone(),
});
}
}
if let Some((ref sort_name, ref layers)) = loaded_sort {
let _ = self.lazy_tx.send(LazyLoad::SortField {
name: sort_name.clone(),
layers: layers.clone(),
});
self.pending_sort_loads.lock().remove(sort_name);
}
let any_loaded = !loaded_filters.is_empty() || !loaded_values.is_empty() || loaded_sort.is_some();
if any_loaded {
// Single-writer publish: data was already sent to the flush thread
// via lazy_tx. Ask the flush thread to drain it and publish a new
// snapshot. This avoids the old rcu() CAS loop which could race
// with the flush thread's own store() calls.
// Sort loads still need ForcePublish (load_layers requires &mut self).
if has_sort_load {
if let Some((ref sort_name, ref layers)) = loaded_sort {
let _ = self.lazy_tx.send(LazyLoad::SortField {
name: sort_name.clone(),
layers: layers.clone(),
});
self.pending_sort_loads.lock().remove(sort_name);
}
let (done_tx, done_rx) = crossbeam_channel::bounded(1);
let flush_alive = self.cmd_tx.send(FlushCommand::ForcePublish { done: done_tx }).is_ok();
if flush_alive {
// Block until flush thread publishes (typically <1ms).
let _ = done_rx.recv_timeout(Duration::from_secs(5));
} else {
// Flush thread is dead (shutdown called). Publish directly —
Expand Down
Loading