From 1ae59734c62aa10c98bca70249c5d127bb591d18 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Mon, 24 Nov 2025 22:33:48 +0200 Subject: [PATCH] timely-util: specialize reclock compaction to total orders Since the reclock operator stores the reclock bindings in time order and we restrict `IntoTime` to be a total order we can perform compaction in a much cheaper way and only when necessary. Signed-off-by: Petros Angelatos --- src/timely-util/src/reclock.rs | 53 +++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/src/timely-util/src/reclock.rs b/src/timely-util/src/reclock.rs index 7543335ad10cb..e8cfbc68fe26c 100644 --- a/src/timely-util/src/reclock.rs +++ b/src/timely-util/src/reclock.rs @@ -242,7 +242,7 @@ where // maintenance implemented in this operator becomes problematic. let mut remap_upper = Antichain::from_elem(IntoTime::minimum()); let mut remap_since = as_of.clone(); - let mut remap_trace = Vec::new(); + let mut remap_trace = VecDeque::new(); // A stash of source updates for which we don't know the corresponding binding yet. let mut deferred_source_updates: Vec> = Vec::new(); @@ -296,7 +296,7 @@ where while let Some(update) = pending_remap.peek_mut() { if !remap_upper.less_equal(&update.0.0) { let Reverse((into, from, diff)) = PeekMut::pop(update); - remap_trace.push((from, into, diff)); + remap_trace.push_back((from, into, diff)); } else { break; } @@ -426,21 +426,40 @@ where } } - // STEP 5. Downgrade capability and compact remap trace - capset.downgrade(&reclocked_source_frontier.borrow()); - remap_since = reclocked_source_frontier; - for (_, t, _) in remap_trace.iter_mut() { - t.advance_by(remap_since.borrow()); - } - consolidation::consolidate_updates(&mut remap_trace); - remap_trace - .sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| t1.cmp(t2)); - - // If using less than a quarter of the capacity, shrink the container. To avoid having - // to resize the container on a subsequent push, shrink to 2x the length, which is - // what push would grow it to. - if remap_trace.len() < remap_trace.capacity() / 4 { - remap_trace.shrink_to(remap_trace.len() * 2); + // STEP 5. Downgrade capability and compact remap trace if our since frontier + // advanced + if PartialOrder::less_than(&remap_since, &reclocked_source_frontier) { + capset.downgrade(&reclocked_source_frontier.borrow()); + remap_since = reclocked_source_frontier; + + // The remap trace is stored in time order and T is a total order. The updates + // that can have their time advanced will form a prefix, which we extract here. + let mut advanced = vec![]; + while !remap_trace.is_empty() && !remap_since.less_equal(&remap_trace[0].1) { + let (d, mut t, r) = remap_trace.pop_front().unwrap(); + t.advance_by(remap_since.borrow()); + advanced.push((d, t, r)); + } + if !advanced.is_empty() { + // If we have updates whose time was advanced, further peel the prefix of + // updates that sit *at* the time of the since frontier. + while !remap_trace.is_empty() && remap_since.contains(&remap_trace[0].1) { + advanced.push(remap_trace.pop_front().unwrap()); + } + consolidation::consolidate_updates(&mut advanced); + advanced.sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| { + t1.cmp(t2) + }); + for u in advanced.into_iter().rev() { + remap_trace.push_front(u); + } + // If using less than a quarter of the capacity, shrink the container. To avoid having + // to resize the container on a subsequent push, shrink to 2x the length, which is + // what push would grow it to. + if remap_trace.len() < remap_trace.capacity() / 4 { + remap_trace.shrink_to(remap_trace.len() * 2); + } + } } }