We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent d0cb3a7 commit a28dcdcCopy full SHA for a28dcdc
src/dataflow/src/source/mod.rs
@@ -1259,6 +1259,8 @@ where
1259
timestamp_histories.downgrade(cap, &partition_cursors);
1260
bindings_cap.downgrade(cap.time());
1261
source_metrics.capability.set(*cap.time());
1262
+ // Downgrade compaction frontier to track the current time.
1263
+ timestamp_histories.set_compaction_frontier(Antichain::from_elem(*cap.time()).borrow());
1264
1265
let (source_status, processing_status) = source_state;
1266
// Schedule our next activation
0 commit comments