feat(sync): implement HashComparison protocol with CRDT merge #1970
Conversation
…tocol testing

Replaces the flat `DigestCache` HashMap in `SimNode` with `SimStorage`, which uses the real `calimero-storage::Index<MainStorage>` implementation backed by `InMemoryDB`. This enables accurate simulation of sync protocols that depend on tree structure (e.g., HashComparison).

Key changes:
- Add `SimStorage` with in-memory Merkle tree using `Store + InMemoryDB`
- Add `RuntimeEnv` bridge to connect storage Key operations
- Update `SimNode` to use hybrid storage: real tree + metadata cache
- Add `insert_entity_hierarchical()` for creating proper tree depth
- Make `Index::get_index()` and `get_children_of()` public for traversal
- Add tree structure verification tests for protocol selection
- Fix: prevent self-referencing cycle in hierarchical insertion

The entity counting now correctly excludes intermediate tree nodes, and `iter_entities()` returns only "real" entities (with metadata).

Tree depth now affects protocol selection:
- SubtreePrefetch scenarios have max_depth > 3
- LevelWise scenarios have max_depth <= 2

Spec reference: Simulation Framework Spec §5, §7, §11

Co-authored-by: cursor[bot] <cursor@calimero.network>
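The depth thresholds above can be sketched as a small selection function. This is a hedged illustration, not the real `SimNode` API: the `SyncProtocol` enum and `select_protocol` name are assumptions, and since the PR notes only state `> 3` and `<= 2`, a depth of exactly 3 is unspecified and grouped with LevelWise here.

```rust
/// Illustrative protocol choices described in the PR notes.
#[derive(Debug, PartialEq)]
enum SyncProtocol {
    LevelWise,
    SubtreePrefetch,
}

/// Hypothetical sketch of depth-based protocol selection.
/// `max_depth` here is the protocol-level (root-exclusive) depth.
fn select_protocol(max_depth: u32) -> SyncProtocol {
    if max_depth > 3 {
        // Deep trees benefit from fetching whole subtrees at once.
        SyncProtocol::SubtreePrefetch
    } else {
        // Shallow trees (max_depth <= 2 in the scenarios) sync level by level.
        // Depth 3 is not covered by the PR notes; grouped with LevelWise here.
        SyncProtocol::LevelWise
    }
}

fn main() {
    assert_eq!(select_protocol(2), SyncProtocol::LevelWise);
    assert_eq!(select_protocol(4), SyncProtocol::SubtreePrefetch);
    println!("ok");
}
```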
- Delegate apply_storage_op Insert/Update to insert_entity_with_metadata to avoid duplicating dual-write logic (cursor bugbot feedback)
- Extract magic number 24 to MAX_HIERARCHICAL_DEPTH constant with docs
- Add comprehensive documentation for max_depth() semantics explaining the difference between storage-level (root-inclusive) and protocol-level (root-exclusive) depth values
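The root-inclusive vs root-exclusive distinction can be captured in a few lines. This is a minimal sketch: the `protocol_depth` helper is an assumption for illustration, not the documented `max_depth()` API; only the constant name and value come from the PR notes.

```rust
/// Depth cap for hierarchical insertion in the simulator
/// (the former magic number 24, per the PR notes).
const MAX_HIERARCHICAL_DEPTH: usize = 24;

/// Storage-level depth counts the root; protocol-level depth does not.
/// Hypothetical helper converting between the two conventions.
fn protocol_depth(storage_depth: usize) -> usize {
    storage_depth.saturating_sub(1)
}

fn main() {
    // A root-only tree: storage-level depth 1, protocol-level depth 0.
    assert_eq!(protocol_depth(1), 0);
    // The SubtreePrefetch threshold (max_depth > 3) is protocol-level,
    // so a storage-level depth of 5 qualifies.
    assert_eq!(protocol_depth(5), 4);
    assert!(protocol_depth(5) <= MAX_HIERARCHICAL_DEPTH);
    println!("ok");
}
```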
Implements the HashComparison sync protocol (CIP §4) with proper integration into SyncManager and comprehensive test coverage.

Key changes:
- Add HashComparison protocol implementation with iterative DFS traversal
- Integrate with SyncManager for initiator and responder roles
- Add wire protocol types (TreeNodeRequest, TreeNodeResponse) in new wire.rs
- Implement CRDT merge at leaves with proper timestamp extraction (I5)
- Add force_protocol mechanism in SimNode for testing
- Add 10 HashComparison simulation tests
- Add compliance tests for I4 (convergence) and I5 (CRDT merge)

Invariants verified:
- I5: CRDT merge only, no overwrite for initialized nodes
- I4: Strategy equivalence verified via compliance tests

Test coverage: 230 tests passing
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 96% | Review time: 303.4s
🟡 5 warnings, 💡 3 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-4bd028af
Address meroreviewer feedback:
1. DFS stack overflow protection: fail the sync session instead of silently truncating (truncation would break convergence guarantees)
2. CRDT merge fallback: return an error for Counter/Collection/Custom types instead of falling back to the incoming value (which would violate I5)
3. max_depth validation: add a MAX_TREE_REQUEST_DEPTH constant (16) and clamp requests in the handler to prevent expensive deep traversals
4. TOCTOU documentation: add a note explaining why read-merge-write is acceptable (serialized sync, CRDT commutativity, I6 buffering)
5. UserStorage/FrozenStorage: keep the LWW fallback, since these are single-writer (safe for these types only)
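Item 3 of the feedback can be sketched in isolation. This is a hedged illustration: the constant name and value (16) are from the feedback, but the `TreeNodeRequest` shape and `effective_depth` helper are assumptions, not the real handler code.

```rust
/// Maximum traversal depth a responder will honor, per feedback item 3.
const MAX_TREE_REQUEST_DEPTH: u32 = 16;

/// Illustrative request type; the real wire struct has more fields.
struct TreeNodeRequest {
    max_depth: u32,
}

/// Clamp rather than reject: a well-behaved peer still gets a useful
/// response, while an adversarial peer cannot force arbitrarily deep
/// (and therefore expensive) tree walks.
fn effective_depth(req: &TreeNodeRequest) -> u32 {
    req.max_depth.min(MAX_TREE_REQUEST_DEPTH)
}

fn main() {
    assert_eq!(effective_depth(&TreeNodeRequest { max_depth: 3 }), 3);
    assert_eq!(effective_depth(&TreeNodeRequest { max_depth: 1000 }), 16);
    println!("ok");
}
```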
BUG FIX (High Severity): Child nodes were incorrectly compared against the parent's local version instead of their own local versions.

When requesting with max_depth > 0, the response includes the requested node AND its children. The previous code compared ALL nodes in the response against `local_node` (the parent), but child nodes need to be compared against their own local versions. This broke the core subtree-skipping optimization of HashComparison and could cause incorrect comparison results.

Fix: look up the local version of EACH remote_node inside the loop, properly handling the is_root_request flag for root vs child nodes.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 97% | Review time: 301.8s
🔴 1 critical, 🟡 6 warnings, 💡 3 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-802e4fc9
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 96% | Review time: 323.5s
🟡 5 warnings, 💡 4 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-0075f014
Bugbot Autofix prepared fixes for 3 of the 3 bugs found in the latest run.
Or push these changes by commenting: Preview (fce9a02616)

diff --git a/crates/node/src/sync/hash_comparison.rs b/crates/node/src/sync/hash_comparison.rs
--- a/crates/node/src/sync/hash_comparison.rs
+++ b/crates/node/src/sync/hash_comparison.rs
@@ -299,7 +299,10 @@
}
// Process each node in response
- for remote_node in nodes {
+ // nodes[0] is the requested node, nodes[1..] are children (when max_depth >= 1)
+ // Only compare nodes[0] with local_node; children will be processed when popped
+ // from the to_compare stack in future iterations.
+ for (idx, remote_node) in nodes.iter().enumerate() {
// Validate node structure
if !remote_node.is_valid() {
warn!(%context_id, "Invalid TreeNode structure, skipping");
@@ -310,6 +313,8 @@
if remote_node.is_leaf() {
// Leaf node: apply CRDT merge (Invariant I5)
+ // CRDT merge reads existing value directly from storage, so it's
+ // correct for any leaf regardless of which iteration we're in.
if let Some(ref leaf_data) = remote_node.leaf_data {
trace!(
%context_id,
@@ -318,16 +323,17 @@
"Merging leaf entity via CRDT"
);
- self.apply_leaf_with_crdt_merge(context_id, leaf_data)
+ self.apply_leaf_with_crdt_merge(context_id, leaf_data, &runtime_env)
.await
.wrap_err("failed to merge leaf entity")?;
stats.entities_merged += 1;
}
- } else {
- // Internal node: compare with local and queue differing children
- let compare_result =
- compare_tree_nodes(local_node.as_ref(), Some(&remote_node));
+ } else if idx == 0 {
+ // Internal node at index 0: this is the requested node, compare with local
+ // For children (idx > 0), skip comparison here - they will be processed
+ // when they are popped from the to_compare stack with their correct local versions.
+ let compare_result = compare_tree_nodes(local_node.as_ref(), Some(remote_node));
match compare_result {
TreeCompareResult::Equal => {
@@ -384,6 +390,9 @@
}
}
}
+ // Internal nodes at idx > 0 are children included in the response.
+ // They are NOT compared here because local_node is the parent's local version.
+ // They will be processed correctly when popped from to_compare stack.
}
}
@@ -513,12 +522,17 @@
&self,
context_id: ContextId,
leaf: &TreeLeafData,
+ runtime_env: &RuntimeEnv,
) -> Result<()> {
// Get the datastore handle
let handle = self.context_client.datastore_handle();
- // Create the storage key for this entity
- let key = ContextStateKey::new(context_id, leaf.key);
+ // Create the storage key for this entity using the same hashing as
+ // create_storage_callbacks: Key::Entry(id).to_bytes() applies SHA256 hashing
+ // to (discriminant || entity_id) to produce the 32-byte state key.
+ let entity_id = Id::new(leaf.key);
+ let hashed_key = Key::Entry(entity_id).to_bytes();
+ let key = ContextStateKey::new(context_id, hashed_key);
// Get existing value to perform merge (copy to owned value to release borrow)
let existing_bytes: Option<Vec<u8>> = handle.get(&key)?.map(|v| v.as_ref().to_vec());
@@ -536,6 +550,7 @@
&leaf.value,
leaf.metadata.hlc_timestamp,
&leaf.metadata,
+ runtime_env,
)?
} else {
// No existing value - just use incoming (this is the only case where
@@ -562,6 +577,7 @@
incoming: &[u8],
incoming_timestamp: u64,
metadata: &LeafMetadata,
+ runtime_env: &RuntimeEnv,
) -> Result<Vec<u8>> {
use calimero_storage::merge::merge_root_state;
@@ -573,11 +589,14 @@
// Extract existing timestamp from stored metadata via Index
// The storage layer stores metadata separately from raw values
- let existing_ts = Index::<MainStorage>::get_index(Id::from(entity_key))
- .ok()
- .flatten()
- .map(|index| *index.metadata.updated_at)
- .unwrap_or(0);
+ // Must use with_runtime_env to ensure Index::get_index can access storage
+ let existing_ts = with_runtime_env(runtime_env.clone(), || {
+ Index::<MainStorage>::get_index(Id::from(entity_key))
+ })
+ .ok()
+ .flatten()
+ .map(|index| *index.metadata.updated_at)
+ .unwrap_or(0);
match merge_root_state(&existing, incoming, existing_ts, incoming_timestamp) {
Ok(merged) => Ok(merged),

Summary
Implements the HashComparison sync protocol (CIP §4) with proper integration into SyncManager and comprehensive test coverage.
…(TreeNodeRequest, TreeNodeResponse) in new wire.rs; force_protocol mechanism in SimNode for testing

Invariants Verified
Test Plan
- cargo clippy: no warnings in new code
- cargo fmt: formatted

Files Changed
- sync/hash_comparison.rs (new), sync/mod.rs, sync/manager.rs
- primitives/sync/wire.rs (new), primitives/sync.rs, primitives/sync/snapshot.rs
- tests/sync_sim/scenarios/hash_comparison.rs (new), tests/sync_sim/node/state.rs
- tests/sync_compliance/crdt_merge.rs (new), tests/sync_compliance/convergence.rs (new)

Note
Medium Risk
Introduces a new sync protocol path and new on-the-wire message variants, plus direct storage writes during CRDT merge; bugs here could impact state convergence or resource usage under adversarial peers.
Overview
Adds a production HashComparison sync implementation that traverses the Merkle tree via iterative DFS, requests subtrees from peers, and CRDT-merges leaf entities (enforcing Invariant I5), with basic DoS guards (request depth, response size, and pending-node limits).

Refactors stream messaging by moving StreamMessage/InitPayload/MessagePayload out of snapshot into a new shared sync::wire module and extends the wire protocol with TreeNodeRequest/TreeNodeResponse (plus MAX_TREE_REQUEST_DEPTH).

Wires HashComparison into SyncManager (initiator + responder handling), including fallback to DAG catchup/snapshot on failure, and adds extensive new simulation and compliance tests (I4 convergence + I5 CRDT merge) plus a SimNode::force_protocol override to exercise specific protocol paths.

Written by Cursor Bugbot for commit 8462073. This will update automatically on new commits.
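The timestamped leaf merge that the diff wires through `merge_root_state` can be illustrated with a standalone last-writer-wins sketch. To be clear about the assumptions: `merge_leaf` here only mimics the timestamp comparison; the real `calimero-storage` merge dispatches by CRDT type, and per the review feedback, LWW is only a safe fallback for single-writer storage (UserStorage/FrozenStorage).

```rust
/// Illustrative LWW merge in the spirit of Invariant I5 (merge, never
/// blind overwrite). NOT the real merge_root_state: the real code
/// dispatches by CRDT type and errors on Counter/Collection/Custom.
fn merge_leaf(
    existing: &[u8],
    incoming: &[u8],
    existing_ts: u64,
    incoming_ts: u64,
) -> Vec<u8> {
    // Compare (timestamp, bytes) lexicographically: the byte tie-break
    // keeps replicas convergent when timestamps collide.
    if (incoming_ts, incoming) > (existing_ts, existing) {
        incoming.to_vec()
    } else {
        existing.to_vec()
    }
}

fn main() {
    // Newer incoming value wins...
    assert_eq!(merge_leaf(b"old", b"new", 1, 2), b"new".to_vec());
    // ...but a stale incoming value never overwrites newer local state.
    assert_eq!(merge_leaf(b"old", b"new", 2, 1), b"old".to_vec());
    println!("ok");
}
```

Note that both merge orders yield the same survivor, which is the commutativity property the TOCTOU note in the feedback relies on.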