test(sync): add delta buffering simulation tests for Invariant I6#1959
test(sync): add delta buffering simulation tests for Invariant I6#1959
Conversation
Add comprehensive simulation tests verifying delta buffering behavior during state-based sync (Invariant I6: deltas arriving during sync MUST be preserved and applied after sync completes). Changes: - Enhanced DeltaBuffer with FIFO eviction (oldest-first drop policy) - Added drops counter and configurable capacity - Integrated cancel_sync_session for proper error handling - Added 7 simulation test scenarios covering: - Deltas buffered during active sync - Buffered deltas replayed on completion - Immediate application when idle - Buffer cleared on crash - Multiple deltas preserved in FIFO order - Convergence blocked while buffer non-empty - Complex snapshot sync with concurrent writes All 176 sync_sim tests pass.
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 94% | Review time: 171.8s
💡 4 suggestions, 📝 2 nitpicks. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-4c16169e
This comment has been minimized.
This comment has been minimized.
Addresses all review comments from PR #1959: **Bug Fixes:** - Fix zero capacity edge case in DeltaBuffer::push - now correctly drops incoming delta and returns its ID instead of phantom drops - Change push() return type to Option<[u8; 32]> to return the evicted delta's ID (fixes misleading log message issue) - Add record_write() metrics in SyncComplete replay loop for consistent observability **Code Quality:** - Remove dead code get_buffer_metrics() with #[allow(dead_code)] - Add finish_sync() method on SimNode to atomically drain buffered ops, clear buffer, and reset sync state (prevents partial cleanup) - Extract rate-limit duration as DELTA_BUFFER_DROP_WARNING_RATE_LIMIT_S constant **Test Coverage:** - Add test_zero_capacity_drops_immediately unit test - Add test_buffer_overflow_fifo_eviction simulation test - Add test_buffer_drops_accumulated simulation test - Add buffer_drops metric to EffectMetrics All 178 sync_sim tests pass, 7 delta_buffer unit tests pass.
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 95% | Review time: 281.4s
🟡 2 warnings, 💡 5 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-9a4999e1
| /// incoming deltas but may lose old ones under extreme load. | ||
| /// | ||
| /// Returns `None` if the delta was added without eviction, or `Some(evicted_id)` | ||
| /// containing the ID of the evicted delta if one was dropped. |
There was a problem hiding this comment.
🟡 Inconsistent return semantics between zero-capacity and overflow cases
When capacity is 0, push returns the incoming delta's ID; when evicting due to overflow, it returns the evicted delta's ID. This inconsistency can confuse callers about what the returned ID represents.
Suggested fix:
Consider always returning the dropped/evicted delta's ID, or use a richer return type like `enum PushResult { Added, Evicted([u8;32]), DroppedIncoming([u8;32]) }` to make semantics explicit.
| } | ||
|
|
||
| /// Add a delta to the buffer. | ||
| /// |
There was a problem hiding this comment.
🟡 No deduplication allows buffer exhaustion via replay attacks
The push() method doesn't check for duplicate delta IDs before adding, allowing an attacker to repeatedly send the same delta during sync to fill the buffer and evict legitimate deltas (simulation code has this check, production doesn't).
Suggested fix:
Add deduplication check: return early if `delta.id` already exists in `deltas` to prevent replay-based buffer exhaustion.
| pub(crate) fn buffer_delta( | ||
| &self, | ||
| context_id: &ContextId, | ||
| delta: calimero_node_primitives::delta_buffer::BufferedDelta, |
There was a problem hiding this comment.
💡 Return type no longer indicates data loss to callers
The buffer_delta method now always returns true when a session exists, even if eviction occurred; callers cannot distinguish successful buffering from buffering-with-data-loss without checking logs.
Suggested fix:
Consider returning an enum or struct indicating whether eviction happened, allowing callers to react appropriately (e.g., trigger early sync completion).
| last.elapsed() | ||
| > Duration::from_secs(constants::DELTA_BUFFER_DROP_WARNING_RATE_LIMIT_S) | ||
| }); | ||
|
|
There was a problem hiding this comment.
💡 Rate-limited warnings could mask sustained attack patterns
The 5-second rate limit on drop warnings could hide sustained delta-flooding attacks; consider also exposing a total drops metric endpoint for monitoring.
Suggested fix:
Ensure the `drops` counter is exported to a metrics system (not just logged) so monitoring can detect sustained eviction patterns.
| /// | ||
| /// Implements FIFO eviction when buffer is full. Returns `true` if added without | ||
| /// eviction, `false` if oldest delta was evicted. | ||
| pub fn buffer_delta(&mut self, delta_id: DeltaId) -> bool { |
There was a problem hiding this comment.
💡 SimNode uses O(n) Vec operations instead of VecDeque
The simulation uses Vec::contains() (O(n)) and Vec::remove(0) (O(n)) for buffer operations, while production DeltaBuffer uses VecDeque with O(1) operations; this inconsistency could mask performance issues.
Suggested fix:
Consider using VecDeque for delta_buffer in SimNode to match production behavior, or document this as intentional simplification for test code.
| let source_id = rt.add_existing_node(source); | ||
|
|
||
| // Fresh node (needs snapshot sync) | ||
| let fresh_id = rt.add_node("fresh"); |
There was a problem hiding this comment.
💡 Test relies on insertion order but doesn't verify FIFO replay order
The test test_multiple_deltas_preserved_fifo verifies counts but doesn't assert that the entities were created in the expected FIFO order; a timestamp or sequence check would strengthen the invariant.
Suggested fix:
Store and verify a sequence number or timestamp in each entity's data to confirm FIFO replay order.
| /// incoming deltas but may lose old ones under extreme load. | ||
| /// | ||
| /// Returns `None` if the delta was added without eviction, or `Some(evicted_id)` | ||
| /// containing the ID of the evicted delta if one was dropped. |
There was a problem hiding this comment.
💡 Zero capacity configuration silently drops all deltas
A capacity of 0 drops every incoming delta immediately; while documented, this could cause complete data loss during sync if misconfigured.
Suggested fix:
Consider adding a minimum capacity check (e.g., >= 100) or logging a startup warning when capacity is suspiciously low.
This comment has been minimized.
This comment has been minimized.
Bug Fixes: - #8: Fix SimNode buffer_delta zero capacity handling (was not incrementing drops) - #1: Add PushResult enum with clear semantics (Added, Duplicate, Evicted, DroppedZeroCapacity) - #2: Add deduplication via HashSet to prevent replay attacks - #3: Change buffer_delta return type to Option<PushResult> for data loss indication Improvements: - #4: Add TODO for Prometheus metrics export (drops counter already observable) - #5: Change SimNode delta_buffer from Vec to VecDeque for O(1) FIFO eviction - #6: Add test_fifo_order_last_writer_wins to verify actual replay order - #7: Add MIN_RECOMMENDED_CAPACITY (100) with warning at session start Test Coverage: - test_zero_capacity_drops_all (SimNode unit test) - test_zero_capacity_drops_all_deltas (simulation scenario) - test_fifo_order_last_writer_wins (FIFO order verification) - test_deduplication_* (3 tests for dedup behavior) - test_capacity_below_recommended All 182 sync_sim tests pass.
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 83% | Review time: 319.7s
💡 4 suggestions, 📝 2 nitpicks. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-9ae2eea1
| } | ||
|
|
||
| // Don't buffer duplicates (#2: deduplication prevents replay attacks) | ||
| if self.delta_buffer.contains(&delta_id) { |
There was a problem hiding this comment.
💡 Return value semantics differ from production PushResult
SimNode's buffer_delta() returns false when eviction occurs, but production's PushResult::Evicted has was_added() -> true; this semantic mismatch could cause simulation tests to miss edge cases where eviction behavior matters.
Suggested fix:
Consider returning a similar enum or struct to `PushResult` to maintain semantic parity with production code.
| /// | ||
| /// # Performance (#5) | ||
| /// | ||
| /// Uses `VecDeque::pop_front()` for O(1) FIFO eviction instead of O(n) `Vec::remove(0)`. |
There was a problem hiding this comment.
💡 O(n) deduplication check where production uses O(1)
SimNode uses VecDeque::contains() which is O(n), while production DeltaBuffer uses a HashSet for O(1) deduplication; this divergence could mask performance issues in tests with large buffers.
Suggested fix:
Add a `HashSet<DeltaId>` for deduplication in SimNode to match production behavior, similar to how `DeltaBuffer` tracks `seen_ids`.
| @@ -90,11 +90,21 @@ pub async fn handle_state_delta( | |||
| source_peer: source, | |||
| }; | |||
|
|
|||
There was a problem hiding this comment.
💡 DRY: Duplicated buffer-check pattern
The pattern if let Some(result) = node_state.buffer_delta(...) { if result.was_added() || matches!(...Duplicate) { return Ok(()); } } is repeated twice; consider extracting a helper function.
Suggested fix:
Extract a helper like `fn try_buffer_and_return_if_handled(node_state, context_id, buffered) -> Option<Result<()>>`
|
|
||
| if node_state.buffer_delta(&context_id, buffered) { | ||
| return Ok(()); // Successfully buffered, will be replayed after sync | ||
| if let Some(result) = node_state.buffer_delta(&context_id, buffered) { |
There was a problem hiding this comment.
💡 Simplify pattern match for non-drop case
The check result.was_added() || matches!(result, PushResult::Duplicate) could be simplified to !matches!(result, PushResult::DroppedZeroCapacity(_)) since that's the only case that should fall through.
Suggested fix:
Replace with `if !matches!(result, calimero_node_primitives::delta_buffer::PushResult::DroppedZeroCapacity(_))`
| /// If capacity is 0, the incoming delta is immediately dropped (not added) | ||
| /// and this method returns `false`. This matches production `DeltaBuffer::push`. | ||
| /// | ||
| /// # Performance (#5) |
There was a problem hiding this comment.
📝 Nit: SimNode.buffer_delta returns bool while production returns PushResult
The simulation's buffer_delta returns a simple bool whereas production returns PushResult with rich semantics; tests may miss edge cases around Duplicate vs Added distinction.
Suggested fix:
Consider returning an enum similar to PushResult for closer fidelity to production behavior
| /// # Deduplication | ||
| /// | ||
| /// If a delta with the same ID is already in the buffer, it is not added | ||
| /// again and `PushResult::Duplicate` is returned. This prevents replay attacks. |
There was a problem hiding this comment.
📝 Nit: Zero-capacity buffer counts duplicates as drops
When capacity is 0, the drop check happens before deduplication, so pushing the same delta twice counts as 2 drops even though it's the same delta; this may be intentional but could inflate metrics.
Suggested fix:
Document this behavior explicitly or consider checking deduplication first even for zero-capacity buffers.
|
Bugbot Autofix prepared fixes for 1 of the 1 bugs found in the latest run.
Or push these changes by commenting: Preview (99afa7e1d6)diff --git a/crates/node/tests/sync_sim/node/state.rs b/crates/node/tests/sync_sim/node/state.rs
--- a/crates/node/tests/sync_sim/node/state.rs
+++ b/crates/node/tests/sync_sim/node/state.rs
@@ -8,7 +8,7 @@
use std::collections::{HashMap, HashSet, VecDeque};
use calimero_node_primitives::sync::handshake::SyncHandshake;
-use calimero_node_primitives::sync::state_machine::{build_handshake, LocalSyncState};
+use calimero_node_primitives::sync::state_machine::{LocalSyncState, build_handshake};
use calimero_primitives::crdt::CrdtType;
use crate::sync_sim::actions::{EntityMetadata, StorageOp};
@@ -330,6 +330,17 @@
self.delta_buffer.clear();
}
+ /// Reset buffer state for a new sync session.
+ ///
+ /// Clears both delta_buffer and buffered_operations to match production
+ /// behavior where `start_sync_session` creates a fresh `DeltaBuffer`.
+ /// This ensures stale buffered deltas from a previous interrupted sync
+ /// are discarded when a new sync session starts.
+ pub fn reset_buffer_state(&mut self) {
+ self.delta_buffer.clear();
+ self.buffered_operations.clear();
+ }
+
/// Get buffer drops count.
pub fn buffer_drops(&self) -> u64 {
self.buffer_drops
@@ -509,7 +520,7 @@
fn root_hash(&self) -> [u8; 32] {
// Compute digest without using cache (trait requires &self, cache needs &mut self)
// This is acceptable for protocol negotiation which is infrequent
- use crate::sync_sim::digest::{compute_state_digest, DigestEntity};
+ use crate::sync_sim::digest::{DigestEntity, compute_state_digest};
let entities: Vec<DigestEntity> = self.storage.iter().cloned().collect();
compute_state_digest(&entities).0
}
diff --git a/crates/node/tests/sync_sim/sim_runtime.rs b/crates/node/tests/sync_sim/sim_runtime.rs
--- a/crates/node/tests/sync_sim/sim_runtime.rs
+++ b/crates/node/tests/sync_sim/sim_runtime.rs
@@ -6,7 +6,7 @@
use crate::sync_sim::actions::{SyncActions, SyncMessage, TimerOp};
use crate::sync_sim::convergence::{
- check_convergence, is_deadlocked, ConvergenceInput, ConvergenceResult, NodeConvergenceState,
+ ConvergenceInput, ConvergenceResult, NodeConvergenceState, check_convergence, is_deadlocked,
};
use crate::sync_sim::metrics::SimMetrics;
use crate::sync_sim::network::{FaultConfig, NetworkRouter, SimEvent};
@@ -630,6 +630,9 @@
use crate::sync_sim::node::SyncState;
if let Some(n) = self.nodes.get_mut(&node) {
if !n.is_crashed {
+ // Reset buffer state to match production behavior where
+ // start_sync_session creates a fresh DeltaBuffer
+ n.reset_buffer_state();
// Transition to syncing state (simulates snapshot sync starting)
n.sync_state = SyncState::SnapshotTransfer {
peer: NodeId::new("peer"), |
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 100% | Review time: 196.0s
💡 3 suggestions, 📝 2 nitpicks. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-42b183ce
| /// eviction, `false` if oldest delta was evicted or dropped. | ||
| /// | ||
| /// # Edge case: zero capacity | ||
| /// |
There was a problem hiding this comment.
💡 SimNode::buffer_delta return semantics differ from production PushResult
SimNode returns true for duplicates and false for eviction, but production PushResult::was_added() returns false for Duplicate and true for Evicted; this semantic mismatch may cause simulation tests to not accurately reflect production behavior.
Suggested fix:
Consider returning a simplified enum matching production semantics, or document the intentional divergence.
| /// | ||
| /// Uses `VecDeque::pop_front()` for O(1) FIFO eviction instead of O(n) `Vec::remove(0)`. | ||
| pub fn buffer_delta(&mut self, delta_id: DeltaId) -> bool { | ||
| // Handle zero capacity: drop incoming delta immediately (matches production) |
There was a problem hiding this comment.
💡 O(n) deduplication check on VecDeque
VecDeque::contains() is O(n) for each buffer_delta call, while production DeltaBuffer uses O(1) HashSet lookup; with large buffers and many pushes this becomes O(n²).
Suggested fix:
Add a `HashSet<DeltaId>` alongside the VecDeque to track seen IDs, mirroring the production implementation.
|
|
||
| if node_state.buffer_delta(&context_id, buffered) { | ||
| return Ok(()); // Successfully buffered, will be replayed after sync | ||
| if let Some(result) = node_state.buffer_delta(&context_id, buffered) { |
There was a problem hiding this comment.
💡 Repeated pattern for checking buffered result should be extracted
The condition result.was_added() || matches!(result, PushResult::Duplicate) appears twice in this file (lines 93-102 and 125-134), violating DRY; consider adding a helper method like should_skip_processing() on PushResult.
Suggested fix:
Add `pub fn should_skip_processing(&self) -> bool { matches!(self, Self::Added | Self::Evicted(_) | Self::Duplicate) }` to `PushResult` and use it in both places.
| /// | ||
| /// If capacity is 0, the incoming delta is immediately dropped (not added) | ||
| /// and this method returns `false`. This matches production `DeltaBuffer::push`. | ||
| /// |
There was a problem hiding this comment.
📝 Nit: Simulation buffer_delta returns bool while production returns PushResult
The SimNode::buffer_delta returns a simple bool whereas production DeltaBuffer::push returns a rich PushResult enum; this divergence may limit the ability to test all edge cases faithfully.
Suggested fix:
Consider returning a similar enum from SimNode to enable richer assertions in simulation tests.
| /// | ||
| /// If capacity is 0, the incoming delta is immediately dropped (not added) | ||
| /// and `PushResult::DroppedZeroCapacity` is returned with the dropped delta's ID. | ||
| pub fn push(&mut self, delta: BufferedDelta) -> PushResult { |
There was a problem hiding this comment.
📝 Nit: Unreachable else branch in push eviction logic
The else branch after pop_front() is logically unreachable since capacity > 0 and len >= capacity guarantees at least one element exists.
Suggested fix:
Consider using `expect` with a safety comment or removing the else branch with an `unreachable!()` macro for clarity.
| pub fn reset_buffer_state(&mut self) { | ||
| self.delta_buffer.clear(); | ||
| self.buffered_operations.clear(); | ||
| } |
There was a problem hiding this comment.
Simulation reset_buffer_state omits buffer_drops counter reset
Medium Severity
reset_buffer_state clears delta_buffer and buffered_operations but does not reset buffer_drops to zero. Its own doc comment says it matches production behavior where start_sync_session creates a fresh DeltaBuffer, but production's DeltaBuffer::new() initializes drops: 0. This means buffer_drops() accumulates across sync sessions in the sim, unlike production. Any test verifying per-session drop counts across multiple sync sessions will observe incorrect values.



Summary
DeltaBufferwith FIFO eviction, drops counter, and configurable capacitycancel_sync_sessionfor proper error handling inSyncManagerCIP Sections
Implements CIP Invariant I6 - Liveness Guarantee:
Changes
DeltaBuffer(crates/node/primitives/src/delta_buffer.rs)dropscounter for observabilitywith_capacity()SyncManager(crates/node/src/sync/manager.rs)cancel_sync_session()call on sync errorsSimulation Framework
SimEventwithGossipDelta,SyncStart,SyncCompletevariantsSimNodewithbuffered_operationstrackingSimRuntimefor buffering behaviorTest Scenarios (
crates/node/tests/sync_sim/scenarios/buffering.rs)7 new test cases:
test_deltas_buffered_during_sync- deltas buffered when sync activetest_buffered_deltas_replayed_on_completion- replay on sync finishtest_deltas_applied_immediately_when_idle- immediate apply when idletest_buffered_deltas_cleared_on_crash- buffer cleared on crashtest_multiple_deltas_preserved_fifo- FIFO ordering preservedtest_convergence_blocked_with_buffered_deltas- convergence requires empty buffertest_snapshot_sync_with_concurrent_writes- complex real-world scenarioTest Plan
cargo build -p calimero-node --testspassescargo test --test sync_simpasses (176 tests)Invariants
Note
Medium Risk
Touches core sync buffering behavior and introduces eviction/drop handling that can affect delta delivery and post-snapshot replay ordering. While heavily test-covered (unit + simulation), incorrect integration could still cause missed deltas or unexpected fall-through processing under load/zero-capacity configurations.
Overview
Strengthens snapshot-sync delta buffering (Invariant I6) by making
DeltaBuffercapacity-driven with FIFO eviction, O(1) ID deduplication, and explicitPushResultoutcomes (including zero-capacity drops) plus a per-bufferdropscounter for observability.Integrates the new buffering contract into runtime sync flow:
NodeState::buffer_deltanow returnsPushResult, logs rate-limited data-loss warnings, supports custom capacities on session start, addscancel_sync_sessionfor failure cleanup, and ensures snapshot-sync failures discard buffered deltas.handle_state_deltais updated to only short-circuit when deltas were actually buffered (or duplicates), otherwise falling back to normal processing.Expands the sync simulator to model buffering explicitly (
GossipDelta/SyncStart/SyncCompleteevents, FIFO buffer + buffered operations replay, drop metrics) and adds a comprehensivescenarios/buffering.rssuite covering buffering during sync, FIFO replay, crash/cleanup, overflow/zero-capacity behavior, convergence gating, and concurrent writes during snapshot sync.Written by Cursor Bugbot for commit 7899b12. This will update automatically on new commits. Configure here.