test(sync): add delta buffering simulation tests for Invariant I6#1962
test(sync): add delta buffering simulation tests for Invariant I6#1962
Conversation
Add comprehensive simulation tests verifying delta buffering behavior during state-based sync (Invariant I6: deltas arriving during sync MUST be preserved and applied after sync completes). Changes: - Enhanced DeltaBuffer with FIFO eviction (oldest-first drop policy) - Added drops counter and configurable capacity - Integrated cancel_sync_session for proper error handling - Added 7 simulation test scenarios covering: - Deltas buffered during active sync - Buffered deltas replayed on completion - Immediate application when idle - Buffer cleared on crash - Multiple deltas preserved in FIFO order - Convergence blocked while buffer non-empty - Complex snapshot sync with concurrent writes All 176 sync_sim tests pass.
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 89% | Review time: 219.1s
🟡 2 warnings, 💡 2 suggestions, 📝 2 nitpicks. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-08ef8135
| peer: NodeId::new("peer"), | ||
| page: 0, | ||
| }; | ||
| } |
There was a problem hiding this comment.
🟡 Replayed buffered deltas not counted in metrics
When deltas are buffered during sync, no write metrics are recorded; when replayed in SyncComplete, record_write() is also not called, meaning buffered deltas are never counted in work metrics.
Suggested fix:
Add `self.metrics.work.record_write();` inside the inner loop after `n.apply_storage_op(op);`
| %context_id, | ||
| drops, | ||
| buffered_count, | ||
| "Sync session ended with {} dropped deltas (I6 partial violation)", |
There was a problem hiding this comment.
🟡 Missing required justification for #[allow(dead_code)]
AGENTS.md requires a comment explaining why dead code is allowed (e.g., FFI, test fixtures, future observability).
Suggested fix:
Add a comment above `#[allow(dead_code)]` explaining this is reserved for future metrics/observability integration, or remove the method if not needed (YAGNI).
| buffered_count: self.deltas.len(), | ||
| }); | ||
| /// If the buffer is full, the oldest delta is evicted (oldest-first policy) | ||
| /// and the `drops` counter is incremented. This ensures we never reject |
There was a problem hiding this comment.
💡 Edge case: capacity=0 behaves as capacity=1
When capacity is 0, the check len >= capacity passes on an empty buffer, but pop_front() returns None, so the buffer grows to size 1 and then evicts on each subsequent push, effectively acting as capacity=1.
Suggested fix:
Either document this behavior or add a guard: `if self.capacity == 0 { self.drops += 1; return false; }`
| /// Create a new delta buffer with specified capacity. | ||
| #[must_use] | ||
| pub fn new(max_size: usize, sync_start_hlc: u64) -> Self { | ||
| pub fn new(capacity: usize, sync_start_hlc: u64) -> Self { |
There was a problem hiding this comment.
💡 Initial capacity capped at 1000 despite configurable capacity up to 10,000
The with_capacity(capacity.min(1000)) limits pre-allocation to 1000 even when DEFAULT_BUFFER_CAPACITY is 10,000, causing unnecessary reallocations under normal load.
Suggested fix:
Consider `with_capacity(capacity.min(DEFAULT_BUFFER_CAPACITY))` or document why 1000 is the intentional pre-allocation limit.
| } | ||
|
|
||
| /// Drain all buffered operations for replay. | ||
| /// |
There was a problem hiding this comment.
📝 Nit: drain_buffered_operations does not drain delta_buffer
Method name suggests draining all buffered state, but only buffered_operations is cleared; delta_buffer requires a separate clear_buffer() call, which could lead to inconsistent state if forgotten.
Suggested fix:
Consider clearing `delta_buffer` inside this method or renaming to `take_buffered_operations`
| self.buffered_operations.insert(delta_id, operations); | ||
| } | ||
|
|
||
| /// Drain all buffered operations for replay. |
There was a problem hiding this comment.
📝 Nit: drain_buffered_operations clears orphans silently
The method clears orphaned operations without logging or returning them; this could mask bugs where operations are buffered without corresponding delta_buffer entries.
Suggested fix:
Consider debug-asserting that `buffered_operations.is_empty()` after the loop, or returning the orphan count for test verification.
| false | ||
| } else { | ||
| self.deltas.push_back(delta); | ||
| true |
There was a problem hiding this comment.
DeltaBuffer push falsely increments drops at zero capacity
Low Severity
When capacity is 0, the condition self.deltas.len() >= self.capacity (i.e. 0 >= 0) is immediately true on the first push, causing pop_front() on an empty VecDeque (returns None — nothing actually evicted) while still incrementing drops. This produces a false positive drop metric and allows the buffer to grow to size 1, exceeding its stated capacity of 0. The drops counter, which the delivery contract says "MUST be observable," becomes inaccurate.
| for op in operations { | ||
| n.apply_storage_op(op); | ||
| } | ||
| } |
There was a problem hiding this comment.
SyncComplete replay omits write metrics recording
Low Severity
The SyncComplete handler replays buffered operations via apply_storage_op but does not call self.metrics.work.record_write(), while the GossipDelta handler (idle path) and apply_actions both call record_write() for every storage op applied. This inconsistency means replayed writes are invisible to simulation metrics.
|
Bugbot Autofix prepared fixes for 2 of the 2 bugs found in the latest run.
Or push these changes by commenting: Preview (ec606d5fe9)diff --git a/crates/node/primitives/src/delta_buffer.rs b/crates/node/primitives/src/delta_buffer.rs
--- a/crates/node/primitives/src/delta_buffer.rs
+++ b/crates/node/primitives/src/delta_buffer.rs
@@ -92,8 +92,10 @@
pub fn push(&mut self, delta: BufferedDelta) -> bool {
if self.deltas.len() >= self.capacity {
// Evict oldest delta (front of queue)
- let _evicted = self.deltas.pop_front();
- self.drops += 1;
+ // Only increment drops if we actually evicted something
+ if self.deltas.pop_front().is_some() {
+ self.drops += 1;
+ }
self.deltas.push_back(delta);
false
} else {
diff --git a/crates/node/tests/sync_sim/sim_runtime.rs b/crates/node/tests/sync_sim/sim_runtime.rs
--- a/crates/node/tests/sync_sim/sim_runtime.rs
+++ b/crates/node/tests/sync_sim/sim_runtime.rs
@@ -622,6 +622,7 @@
for (_delta_id, operations) in buffered_ops {
for op in operations {
n.apply_storage_op(op);
+ self.metrics.work.record_write();
}
} |



Summary
Add comprehensive simulation tests verifying delta buffering behavior during state-based sync (Invariant I6: deltas arriving during sync MUST be preserved and applied after sync completes).
Changes:
DeltaBufferwith FIFO eviction (oldest-first drop policy)cancel_sync_sessionfor proper error handlingTest Plan
Note
Medium Risk
Changes snapshot-sync buffering semantics from fail-on-full to evict-on-full and adds new session lifecycle paths (
cancel_sync_session), which can affect sync correctness and data loss behavior under load.Overview
Implements and tests Invariant I6 delta buffering during snapshot sync.
DeltaBufferis reworked to be a bounded FIFO (VecDeque) with configurable capacity (default10_000), oldest-first eviction instead of erroring, and an observabledropscounter.Node sync session handling is updated to use the new buffer semantics: rate-limited warnings on eviction, explicit
cancel_sync_session()on snapshot-sync failure, and end-of-sync logging/metrics. The sync simulation gains new events (GossipDelta,SyncStart,SyncComplete) plus a newbufferingscenario suite that validates buffering, FIFO replay, crash clearing, and convergence behavior.Written by Cursor Bugbot for commit 2d3e04a. This will update automatically on new commits. Configure here.