Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions src/timely-util/src/reclock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ where
// maintenance implemented in this operator becomes problematic.
let mut remap_upper = Antichain::from_elem(IntoTime::minimum());
let mut remap_since = as_of.clone();
let mut remap_trace = Vec::new();
let mut remap_trace = VecDeque::new();

// A stash of source updates for which we don't know the corresponding binding yet.
let mut deferred_source_updates: Vec<ChainBatch<_, _, _>> = Vec::new();
Expand Down Expand Up @@ -296,7 +296,7 @@ where
while let Some(update) = pending_remap.peek_mut() {
if !remap_upper.less_equal(&update.0.0) {
let Reverse((into, from, diff)) = PeekMut::pop(update);
remap_trace.push((from, into, diff));
remap_trace.push_back((from, into, diff));
} else {
break;
}
Expand Down Expand Up @@ -426,21 +426,40 @@ where
}
}

// STEP 5. Downgrade capability and compact remap trace
capset.downgrade(&reclocked_source_frontier.borrow());
remap_since = reclocked_source_frontier;
for (_, t, _) in remap_trace.iter_mut() {
t.advance_by(remap_since.borrow());
}
consolidation::consolidate_updates(&mut remap_trace);
remap_trace
.sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| t1.cmp(t2));

// If using less than a quarter of the capacity, shrink the container. To avoid having
// to resize the container on a subsequent push, shrink to 2x the length, which is
// what push would grow it to.
if remap_trace.len() < remap_trace.capacity() / 4 {
remap_trace.shrink_to(remap_trace.len() * 2);
// STEP 5. Downgrade capability and compact remap trace if our since frontier
// advanced
if PartialOrder::less_than(&remap_since, &reclocked_source_frontier) {
capset.downgrade(&reclocked_source_frontier.borrow());
remap_since = reclocked_source_frontier;

// The remap trace is stored in time order and T is a total order. The updates
// that can have their time advanced will form a prefix, which we extract here.
let mut advanced = vec![];
while !remap_trace.is_empty() && !remap_since.less_equal(&remap_trace[0].1) {
let (d, mut t, r) = remap_trace.pop_front().unwrap();
t.advance_by(remap_since.borrow());
advanced.push((d, t, r));
}
if !advanced.is_empty() {
// If we have updates whose time was advanced, further peel the prefix of
// updates that sit *at* the time of the since frontier.
while !remap_trace.is_empty() && remap_since.contains(&remap_trace[0].1) {
advanced.push(remap_trace.pop_front().unwrap());
}
consolidation::consolidate_updates(&mut advanced);
advanced.sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| {
t1.cmp(t2)
});
for u in advanced.into_iter().rev() {
remap_trace.push_front(u);
}
// If using less than a quarter of the capacity, shrink the container. To avoid having
// to resize the container on a subsequent push, shrink to 2x the length, which is
// what push would grow it to.
if remap_trace.len() < remap_trace.capacity() / 4 {
remap_trace.shrink_to(remap_trace.len() * 2);
}
}
}
}

Expand Down