feat(sync): SyncProtocolExecutor trait for unified protocol testing #1972
Add infrastructure to run sync protocols through in-memory channels, enabling testing of actual message flow and state convergence.

Key changes:
- Add SyncTransport trait abstracting network operations (send/recv/close)
- Add StreamTransport for production Stream wrapper
- Add SimStream for in-memory channel-based transport
- Add protocol.rs with execute_hash_comparison_sync for simulation
- Add SimNode::new_in_context for shared context testing
- Fix entity_count to use storage leaf_count (source of truth)

The simulation now uses the exact same storage code path as production (Index<MainStorage>, Interface<MainStorage>, RuntimeEnv callbacks), with only the Database implementation differing (InMemoryDB vs RocksDB).

Phase 1 of sim-transport-abstraction plan.
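The transport abstraction described above can be illustrated with a minimal sketch. It is synchronous and built on `std::sync::mpsc`, whereas the PR's `SyncTransport` is async and carries `StreamMessage` values; the names `Transport`, `ChannelTransport`, and `channel_pair` are hypothetical and not taken from the PR.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::time::Duration;

/// Hypothetical, synchronous stand-in for the PR's async `SyncTransport`.
trait Transport {
    fn send(&mut self, message: Vec<u8>) -> Result<(), String>;
    /// `Ok(None)` means the peer closed the connection.
    fn recv_timeout(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, String>;
    fn close(&mut self);
}

/// In-memory endpoint, loosely analogous to `SimStream`.
struct ChannelTransport {
    tx: Option<Sender<Vec<u8>>>,
    rx: Receiver<Vec<u8>>,
}

/// Build two connected endpoints: what one sends, the other receives.
fn channel_pair() -> (ChannelTransport, ChannelTransport) {
    let (a_tx, b_rx) = channel();
    let (b_tx, a_rx) = channel();
    (
        ChannelTransport { tx: Some(a_tx), rx: a_rx },
        ChannelTransport { tx: Some(b_tx), rx: b_rx },
    )
}

impl Transport for ChannelTransport {
    fn send(&mut self, message: Vec<u8>) -> Result<(), String> {
        let tx = self.tx.as_ref().ok_or("transport closed")?;
        tx.send(message).map_err(|_| "peer disconnected".to_string())
    }

    fn recv_timeout(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, String> {
        match self.rx.recv_timeout(timeout) {
            Ok(message) => Ok(Some(message)),
            Err(RecvTimeoutError::Disconnected) => Ok(None),
            Err(RecvTimeoutError::Timeout) => Err("receive timed out".to_string()),
        }
    }

    fn close(&mut self) {
        // Dropping the sender signals end-of-stream to the peer.
        self.tx = None;
    }
}
```

Protocol code written against the trait never learns whether the bytes cross a network or an in-memory channel, which is what lets one implementation serve both production and simulation.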
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 96% | Review time: 262.5s
🔴 2 critical, 🟡 2 warnings, 💡 2 suggestions, 📝 2 nitpicks. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-fe149d58
| /// # Errors | ||
| /// | ||
| /// Returns error if timeout expires or receive fails. | ||
| async fn recv_timeout(&mut self, timeout: Duration) -> Result<Option<StreamMessage<'static>>>; |
🔴 Nonce reuse vulnerability in SyncTransport design
The set_encryption method stores a single nonce that is reused for all subsequent encrypt/decrypt calls; reusing a nonce with the same key completely breaks the security of AEAD encryption (e.g., AES-GCM).
Suggested fix:
Either increment/rotate the nonce after each encryption operation, or require callers to pass a fresh nonce per message (matching the existing `send`/`recv` function signatures).
Acknowledged, out of scope: This is pre-existing infrastructure for future encrypted sync. The HashComparison protocol currently doesn't use encryption (nonce is just for message framing). The nonce reuse concern will be addressed when encryption is actually integrated.
Acknowledged - Deferred: Encryption is not currently used in the sync protocol (infrastructure for future encrypted sync). When encryption is enabled, nonce rotation will be implemented. The wire protocol already includes next_nonce fields to support proper rotation at the application layer.
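One way the rotation discussed in this thread could be done is a per-session message counter. The sketch below is an assumption, not the PR's design: `NonceCounter` and its prefix-plus-counter layout are hypothetical, and the 12-byte length is taken from the nonce size mentioned elsewhere in this conversation. No actual encryption is performed here.

```rust
const NONCE_LEN: usize = 12;

/// Hypothetical per-session nonce source: a fixed 4-byte prefix plus a
/// 64-bit big-endian message counter, so no value is handed out twice.
struct NonceCounter {
    prefix: [u8; 4],
    counter: u64,
}

impl NonceCounter {
    fn new(prefix: [u8; 4]) -> Self {
        Self { prefix, counter: 0 }
    }

    /// Return the next unused nonce, or `None` once the counter is exhausted
    /// (at which point the session would have to be re-keyed).
    fn next_nonce(&mut self) -> Option<[u8; NONCE_LEN]> {
        let current = self.counter;
        self.counter = current.checked_add(1)?;
        let mut nonce = [0u8; NONCE_LEN];
        nonce[..4].copy_from_slice(&self.prefix);
        nonce[4..].copy_from_slice(&current.to_be_bytes());
        Some(nonce)
    }
}
```

Because the counter advances inside `next_nonce`, a caller cannot encrypt twice with the same value by forgetting to update state between messages.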
|
|
||
| #[async_trait] | ||
| impl SyncTransport for StreamTransport { | ||
| async fn send(&mut self, message: &StreamMessage<'_>) -> eyre::Result<()> { |
🔴 StreamTransport reuses encryption nonce across messages
The send implementation encrypts every message using the same stored nonce from self.encryption, which leads to nonce reuse when multiple messages are sent—catastrophic for AEAD ciphers.
Suggested fix:
Implement nonce rotation: either increment the nonce after each encryption or derive it from a message counter/sequence ID.
Acknowledged, out of scope: Same as above - this is pre-existing encryption infrastructure. The HashComparison protocol currently doesn't use encryption. The nonce handling will be properly implemented when encrypted sync is integrated.
Acknowledged - Deferred: Same as above - encryption is not currently used. The EncryptionState is infrastructure for future encrypted sync. Nonce rotation will be addressed when encryption is enabled.
| id, | ||
| data: data.to_vec(), | ||
| ancestors: vec![], | ||
| metadata: Metadata::default(), |
🟡 Silent error suppression in update_entity_data may violate I5
The let _ = Interface::<MainStorage>::apply_action(action); calls discard errors, which could lead to silent data loss violating invariant I5 (no silent data loss).
Suggested fix:
Propagate errors or at minimum log them with a warning; consider returning Result from update_entity_data.
Acknowledged, low priority: This is test helper code. The let _ = pattern is intentional to allow tests to pass even if entity doesn't exist (for testing sync to empty nodes). Production code uses proper error propagation via Interface::apply_action.
Acknowledged - Test code: This is simulation test infrastructure, not production code. Using .expect() would be appropriate here, but silent failures in test setup are acceptable since test assertions will catch any issues. The production code path uses proper error handling via Interface::apply_action.
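A middle ground between `let _ =` and failing every test is to tolerate only the expected failure and surface the rest. The sketch below assumes a hypothetical in-memory `Storage` with a `NotFound` error; it does not reproduce the real `Interface::apply_action` signature.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum StorageError {
    NotFound(u32),
}

/// Hypothetical in-memory stand-in for the storage behind `apply_action`.
struct Storage {
    entities: HashMap<u32, Vec<u8>>,
}

impl Storage {
    /// Update an existing entity; fails if the id is unknown.
    fn apply_update(&mut self, id: u32, data: &[u8]) -> Result<(), StorageError> {
        let slot = self.entities.get_mut(&id).ok_or(StorageError::NotFound(id))?;
        *slot = data.to_vec();
        Ok(())
    }

    /// Test helper that tolerates a missing entity by creating it, but still
    /// returns any other failure to the caller instead of discarding it.
    fn update_entity_data(&mut self, id: u32, data: &[u8]) -> Result<(), StorageError> {
        match self.apply_update(id, data) {
            Err(StorageError::NotFound(_)) => {
                self.entities.insert(id, data.to_vec());
                Ok(())
            }
            other => other,
        }
    }
}
```

A test can then call `update_entity_data(..).unwrap()` and still sync to empty nodes, while an unexpected storage failure stops the test at the point where it happened.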
|
|
||
| // Run both sides | ||
| let (init_result, resp_result) = tokio::join!(initiator_fut, responder_fut); | ||
|
|
🟡 All entities buffered in memory before applying
entities_to_merge collects all leaf entities before applying them, which could cause memory pressure on large syncs with many entities.
Suggested fix:
Apply entities incrementally as they're received rather than buffering all in memory, or use a bounded buffer with periodic flushing.
Superseded: This code was removed. The simulation now calls the production HashComparisonProtocol::run_initiator which applies entities incrementally via Interface::apply_action.
| is_root: bool, | ||
| ) -> Option<TreeNode> { | ||
| let id = if is_root { | ||
| storage.root_id() |
💡 Nit: Comment says BFS but implementation is DFS
Using queue.pop() gives LIFO (stack/DFS) behavior, not FIFO (queue/BFS) as the comment states; doesn't affect correctness but misleads readers.
Suggested fix:
Either change comment to 'DFS' or use `VecDeque` with `pop_front()` for actual BFS.
Superseded: The old simulation run_initiator function was removed. The simulation now calls the production HashComparisonProtocol::run_initiator directly.
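For reference, the BFS/DFS distinction raised in this thread comes down to which end of the frontier is popped. The sketch below uses a hypothetical `children` map rather than the removed simulation code.

```rust
use std::collections::{HashMap, VecDeque};

/// Visit order when the frontier is a `Vec` used as a stack (depth-first).
fn dfs_order(children: &HashMap<u32, Vec<u32>>, root: u32) -> Vec<u32> {
    let mut frontier = vec![root];
    let mut order = Vec::new();
    while let Some(node) = frontier.pop() {
        order.push(node);
        if let Some(kids) = children.get(&node) {
            frontier.extend(kids.iter().copied());
        }
    }
    order
}

/// Visit order when the frontier is a `VecDeque` used as a queue (breadth-first).
fn bfs_order(children: &HashMap<u32, Vec<u32>>, root: u32) -> Vec<u32> {
    let mut frontier = VecDeque::from([root]);
    let mut order = Vec::new();
    while let Some(node) = frontier.pop_front() {
        order.push(node);
        if let Some(kids) = children.get(&node) {
            frontier.extend(kids.iter().copied());
        }
    }
    order
}
```

For a tree where node 1 has children 2 and 3, and node 2 has child 4, `dfs_order` yields `[1, 3, 2, 4]` and `bfs_order` yields `[1, 2, 3, 4]`.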
| // 1. All zeros (empty root marker) | ||
| // 2. The root ID (direct request) | ||
| // 3. The root HASH (most common case from handshake) | ||
| let root_hash = storage.root_hash(); |
💡 CRDT merge comment doesn't match implementation
Comment claims 'last-write-wins based on timestamp' but the code unconditionally writes without any timestamp comparison, making the match arm for Some(_) misleading.
Suggested fix:
Either implement actual timestamp comparison or update the comment to accurately describe the always-write behavior.
Superseded: The old apply_leaf_to_storage was removed. The simulation now uses the production apply_leaf_with_crdt_merge which correctly implements CRDT merge via Interface::apply_action.
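For reference, the timestamp comparison that the original comment expected could look roughly like the sketch below. `Versioned` and `merge_lww` are hypothetical names, and this is not the production `apply_leaf_with_crdt_merge`; a real CRDT merge would also need a deterministic tie-breaker for equal timestamps.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Versioned {
    timestamp: u64,
    data: Vec<u8>,
}

/// Write `incoming` only if the slot is empty or `incoming` is strictly newer.
/// Returns `true` when the store was modified.
fn merge_lww(store: &mut HashMap<u32, Versioned>, id: u32, incoming: Versioned) -> bool {
    let is_stale = store
        .get(&id)
        .map_or(false, |existing| existing.timestamp >= incoming.timestamp);
    if is_stale {
        return false;
    }
    store.insert(id, incoming);
    true
}
```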
| use futures_util::SinkExt as _; | ||
| self.stream.close().await?; | ||
| Ok(()) | ||
| } |
📝 Nit: Redundant import of SinkExt
SinkExt is already imported at the top of the file (line 15); the inner use futures_util::SinkExt as _; is redundant.
Suggested fix:
Remove the redundant import inside the `close` method.
Fixed: The redundant SinkExt import inside the close method was removed in commit 6a7d56a. The top-level import is sufficient.
| None | ||
| }; | ||
|
|
||
| Some(TreeNode { |
📝 Nit: Unnecessary read of existing data
storage.get_entity_data(id) is called but the result is always ignored since should_write is always true.
Suggested fix:
Remove the dead read or add a TODO comment if merge logic is planned for later.
Superseded: The old simulation protocol code was removed. The simulation now uses the production HashComparisonProtocol directly, which handles entity comparison correctly.
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 75% | Review time: 357.1s
🟡 3 warnings, 💡 4 suggestions, 📝 1 nitpick. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-c0c3ad44
|
|
||
| /// Set encryption parameters. | ||
| pub fn set(&mut self, encryption: Option<(SharedKey, Nonce)>) { | ||
| self.key_nonce = encryption; |
🟡 Nonce reuse vulnerability in encryption helper
EncryptionState uses the same nonce for all encrypt operations without incrementing; AES-GCM nonce reuse with the same key completely breaks confidentiality and authenticity guarantees.
Suggested fix:
Add nonce increment after each encrypt operation, or document that callers must update the nonce between messages via set_encryption().
Acknowledged, out of scope: Same as above - the EncryptionState is pre-existing infrastructure for encrypted sync. Proper nonce incrementing will be implemented when encryption is actually used.
Acknowledged - Deferred: Same as comment 2799530476 - encryption infrastructure for future use. Nonce rotation will be implemented when encryption is enabled.
| /// | ||
| /// This updates an existing entity's data or creates it if it doesn't exist. | ||
| /// For new entities, creates them as direct children of root. | ||
| pub fn update_entity_data(&self, id: Id, data: &[u8]) { |
🟡 Silent error handling in update_entity_data may violate I5
Multiple let _ = Interface::<MainStorage>::apply_action(action) calls silently discard the Result, so storage write failures go undetected—this could violate invariant I5 (no silent data loss).
Suggested fix:
Propagate or log the Result from apply_action, e.g., `Interface::<MainStorage>::apply_action(action)?;` or at minimum log on error.
Acknowledged, low priority: Same as above - this is test helper code with intentional permissive error handling. Not a concern for production code.
Acknowledged - Test code: Same as comment 2799531036 - this is test infrastructure. Silent failures are acceptable here since test assertions will catch any issues.
| /// Encryption state. | ||
| encryption: EncryptionState, | ||
| /// Default timeout for receive operations. | ||
| recv_timeout: Duration, |
💡 YAGNI: buffer field declared but never populated
The buffer: VecDeque<Vec<u8>> is initialized and checked in recv_raw_timeout, but nothing ever pushes to it, making the peek/reorder testing capability mentioned in the comment unreachable.
Suggested fix:
Remove the buffer field and related checks until the reorder testing feature is actually needed, or implement the buffering functionality now.
Acknowledged: The buffer field is infrastructure for potential future use (e.g., message buffering during encryption key rotation). Removing it is low priority since it has no runtime cost when empty.
Acknowledged - Test infrastructure: The buffer is scaffolding for future fault injection testing (message reordering, delay simulation). Leaving it in place as it's harmless and will be useful for testing I6 (delta buffering) edge cases.
| @@ -302,14 +327,32 @@ impl SimNode { | |||
| /// - Intermediate nodes created by `insert_entity_hierarchical` don't have metadata | |||
💡 Semantic change to entity_count may affect existing tests
Switching from entity_metadata.len() to storage.leaf_count() changes what entity_count means; ensure all 241 tests pass with these new semantics and document the change.
Suggested fix:
Add a doc comment explaining that entity_count now returns storage leaf count (source of truth) rather than metadata cache size.
Acknowledged: This is an intentional change. The new entity_count() uses the proper Merkle tree iterator, which correctly counts entities in the tree structure. The previous implementation counted raw storage entries which could be inconsistent with the tree. All tests have been updated.
Fixed: Doc comment updated in commit 78cd972 to accurately reflect that entity_count() uses storage.leaf_count() as the source of truth. All 247 tests pass.
| identity, | ||
| config, | ||
| ) | ||
| .await |
💡 Hardcoded zero nonce in protocol messages
Using next_nonce: [0; NONCE_LEN] for all messages is acceptable for unencrypted simulation but could mask nonce-handling bugs if encryption is later enabled.
Suggested fix:
Consider using unique/incrementing nonces even in simulation to catch nonce-handling issues early, or add a comment clarifying this is intentionally test-only.
Superseded: The old simulation protocol code was removed. The simulation now uses the production HashComparisonProtocol directly, which uses proper nonce generation via generate_nonce().
| initiator_context, | ||
| identity, | ||
| config, | ||
| ) |
💡 Entities collected in memory before applying
All transferred entities accumulate in entities_to_merge (O(n) memory) before being applied after the loop; for very large syncs this could cause high memory usage.
Suggested fix:
Consider applying entities incrementally in batches rather than collecting all first, or document this as acceptable for simulation scale.
Superseded: The old simulation protocol code was removed. The simulation now uses the production HashComparisonProtocol directly, which applies entities incrementally via Interface::apply_action.
Superseded: The old simulation run_initiator with entities_to_merge buffer was removed. The simulation now calls production HashComparisonProtocol::run_initiator which applies entities incrementally via Interface::apply_action.
|
|
||
| /// Set encryption parameters. | ||
| pub fn set(&mut self, encryption: Option<(SharedKey, Nonce)>) { | ||
| self.key_nonce = encryption; |
📝 Nit: get() clones key and nonce on every call
Returning self.key_nonce.clone() allocates; consider returning a reference or Option<&(SharedKey, Nonce)> if callers don't need ownership.
Suggested fix:
Change to `pub fn get(&self) -> Option<&(SharedKey, Nonce)>` if API allows.
Acknowledged, acceptable: The clone is intentional for API safety - returning an owned value prevents callers from holding references into the internal state. For small fixed-size arrays (key+nonce), the clone cost is negligible.
Acknowledged - Minor optimization: Valid point, but the clone is cheap (32-byte key + 12-byte nonce) and only called during encryption setup, not per-message. Can optimize if profiling shows it matters.
Introduces a trait-based architecture for sync protocols that enables:
- Same protocol code to run in production and simulation
- Shared storage bridge (create_runtime_env) for both backends
- Standalone HashComparisonProtocol implementation

Changes:
- Add SyncProtocolExecutor trait in node-primitives
- Add create_runtime_env shared helper in storage_bridge.rs
- Extract HashComparisonProtocol to standalone module
- Clean up hash_comparison.rs (responder only, ~875 → ~285 lines)
- Fix wire protocol to use u64 for sequence_id (portability)
- Update simulation tests to use production protocol directly

This removes ~730 lines of duplicated code and ensures the simulation tests exercise the exact same protocol logic as production.
0bebce1 to 6a7d56a
PR Comment Review
Several comments have been addressed or superseded by the latest changes:
Superseded (old simulation protocol code removed)
The following comments were on the old reimplemented protocol code in
Fixed
Pre-existing / Out of Scope
|
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 90% | Review time: 313.9s
🟡 3 warnings, 💡 4 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-960edfa9
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns error if encryption fails. |
🟡 EncryptionState does not enforce nonce rotation between messages
The encrypt/decrypt methods reuse the same nonce stored in key_nonce until explicitly updated via set(). For AEAD ciphers like AES-GCM, nonce reuse with the same key completely breaks confidentiality and authenticity. While the wire protocol includes next_nonce fields suggesting proper rotation is expected at a higher level, this helper provides no safeguard against misuse.
Suggested fix:
Consider either: (1) incrementing/consuming the nonce after each encrypt call, (2) accepting nonce as a parameter to encrypt/decrypt rather than storing it, or (3) adding documentation clearly stating the caller MUST update the nonce between operations.
Acknowledged - Deferred: Same as other encryption comments - infrastructure for future encrypted sync. The wire protocol's next_nonce fields support application-level rotation when encryption is enabled.
| Message { | ||
| /// Sequence number for ordering. | ||
| - sequence_id: usize, | ||
| + sequence_id: u64, | ||
🟡 Breaking wire format change from usize to u64
Changing sequence_id from usize to u64 is a wire protocol breaking change; nodes with mismatched versions may fail to deserialize messages during rolling deployments.
Suggested fix:
Document this as a breaking change and consider protocol versioning or ensure all nodes upgrade atomically.
Fixed in 0d73005: Added doc comment documenting this as a breaking wire format change that requires coordinated upgrades.
Documented: Added explicit doc comment to sequence_id field in commit 0d73005 explaining this is a breaking wire format change requiring coordinated node upgrades.
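The portability argument for `u64` is that its encoded width does not depend on the target, while `usize` is 4 bytes on 32-bit targets and 8 bytes on 64-bit ones. The sketch below shows a fixed-width little-endian encoding as an illustration only; the helper names are hypothetical and the serialization format actually used for `StreamMessage` is not shown in this conversation.

```rust
use std::convert::TryInto;

/// Encode a sequence id as exactly eight little-endian bytes.
fn encode_sequence_id(sequence_id: u64) -> [u8; 8] {
    sequence_id.to_le_bytes()
}

/// Decode a sequence id, rejecting input that is not exactly eight bytes.
fn decode_sequence_id(bytes: &[u8]) -> Option<u64> {
    let fixed: [u8; 8] = bytes.try_into().ok()?;
    Some(u64::from_le_bytes(fixed))
}
```

A peer that still encodes the field with a different width fails the length check here instead of silently decoding a wrong value, which is the mismatch the coordinated-upgrade note guards against.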
| /// * `transport` - Transport for sending/receiving messages | ||
| /// * `nonce` - Initial nonce (unused; each response generates its own nonce) | ||
| pub async fn handle_tree_node_request<T: SyncTransport>( | ||
| &self, |
💡 Responder loop has no upper bound on total requests
The handle_tree_node_request loop processes requests indefinitely until the stream closes. While each response is bounded by MAX_NODES_PER_RESPONSE, a malicious peer could send unlimited requests to exhaust CPU/IO resources.
Suggested fix:
Consider adding a maximum request count or a deadline after which the responder terminates, similar to how the removed initiator code had `MAX_PENDING_NODES` for DoS protection.
Acknowledged - Deferred: Valid DoS concern. The responder is bounded per-response by MAX_NODES_PER_RESPONSE and the stream will timeout/close naturally. Adding request limits is future hardening work.
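The deferred hardening could take the form of a per-session request cap. The sketch below is an assumption about one possible shape: `MAX_REQUESTS_PER_SESSION`, `SessionEnd`, and `serve_requests` are hypothetical, and an iterator stands in for the stream of incoming requests.

```rust
/// Hypothetical cap on requests served per session.
const MAX_REQUESTS_PER_SESSION: usize = 3;

#[derive(Debug, PartialEq)]
enum SessionEnd {
    StreamClosed,
    RequestLimitReached,
}

/// Serve requests until the stream ends or the cap is hit.
/// `handle` is called once per served request.
fn serve_requests<I, F>(requests: I, mut handle: F) -> SessionEnd
where
    I: IntoIterator<Item = u32>,
    F: FnMut(u32),
{
    let mut served = 0;
    for request in requests {
        if served == MAX_REQUESTS_PER_SESSION {
            return SessionEnd::RequestLimitReached;
        }
        handle(request);
        served += 1;
    }
    SessionEnd::StreamClosed
}
```

Returning a distinct `RequestLimitReached` value lets the caller log or penalize the peer rather than treating the cutoff as a normal close.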
| mod tests { | ||
| use super::*; | ||
|
|
||
| #[test] |
💡 Unit tests only cover passthrough mode, not actual encryption
The test test_encryption_state_passthrough only verifies behavior when no encryption is configured. There are no tests verifying that encryption/decryption actually work correctly or that nonce misuse is detectable.
Suggested fix:
Add tests that configure encryption with a real key/nonce and verify encrypt followed by decrypt returns the original data, and that different nonces produce different ciphertexts.
Acknowledged - Future work: Valid point. Encryption unit tests will be added when encryption is actually integrated into the sync protocol. Currently the infrastructure is in place but unused.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 96% | Review time: 303.4s
🔴 1 critical, 🟡 3 warnings, 💡 4 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-cf09d9ba
| /// Encrypt data if encryption is configured. | ||
| /// | ||
| /// # Errors | ||
| /// |
🔴 Nonce reuse vulnerability in EncryptionState
The encrypt/decrypt methods reuse the same nonce for every call until set_encryption() is called again; reusing a nonce with the same key in AES-GCM completely breaks encryption security, allowing plaintext recovery.
Suggested fix:
Either increment/rotate the nonce after each encrypt/decrypt operation within EncryptionState, or clearly document that callers MUST call set_encryption with a fresh nonce before each operation and add debug assertions to detect reuse.
| let write: Rc<dyn Fn(Key, &[u8]) -> bool> = { | ||
| let handle_cell: Rc<RefCell<_>> = Rc::new(RefCell::new(store.handle())); | ||
| let ctx_id = context_id; | ||
| Rc::new(move |key: Key, value: &[u8]| { |
🟡 Silent write failures may cause data loss
The write callback returns false on error without logging, unlike the read callback which logs errors; this inconsistency makes debugging storage failures difficult and could mask data corruption.
Suggested fix:
Add warn! logging for write failures similar to the read callback's error handling.
| .put(&state_key, &state_value) | ||
| .is_ok() | ||
| }) | ||
| }; |
🟡 Silent remove failures may cause data inconsistency
The remove callback returns false on error without logging, which could mask failed deletions and lead to stale data persisting unnoticed.
Suggested fix:
Add warn! logging for remove failures to match read callback's error handling pattern.
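The logging suggested for the write and remove callbacks can keep the `bool` return type that the callback signature requires. In the sketch below, `Store` is a hypothetical stand-in for the real store handle, keys are plain byte slices rather than `Key`, and `eprintln!` stands in for `warn!` so the example needs no logging crate.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

/// Hypothetical store whose `put` can fail.
struct Store {
    data: HashMap<Vec<u8>, Vec<u8>>,
    read_only: bool,
}

impl Store {
    fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
        if self.read_only {
            return Err("store is read-only".to_string());
        }
        self.data.insert(key.to_vec(), value.to_vec());
        Ok(())
    }
}

/// Build a write callback that still returns `bool`, but reports the error
/// before discarding it.
fn make_write(store: Rc<RefCell<Store>>) -> Rc<dyn Fn(&[u8], &[u8]) -> bool> {
    Rc::new(move |key: &[u8], value: &[u8]| match store.borrow_mut().put(key, value) {
        Ok(()) => true,
        Err(err) => {
            // Stand-in for `warn!` from a logging crate.
            eprintln!("storage write failed: key_len={} error={}", key.len(), err);
            false
        }
    })
}
```

The same `match` shape would apply to the remove callback, so all three callbacks report failures in a consistent way.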
| }, | ||
| next_nonce: super::helpers::generate_nonce(), | ||
| }; | ||
| transport.send(&msg).await?; |
💡 Unnecessary async function with no await points
The build_tree_node_response method is marked async but contains no .await calls, making the async unnecessary.
Suggested fix:
Remove the `async` keyword and return `Result<TreeNodeResponse>` directly, or wrap the result in `async {}` block at call sites if needed.
| "Handling subsequent TreeNodeRequest" | ||
| ); | ||
|
|
||
| let clamped_depth = max_depth.map(|d| d.min(MAX_REQUEST_DEPTH)); |
💡 N database lookups for N children in build_tree_node_response
Each child node requires a separate with_runtime_env + DB lookup call; for nodes with many children this creates N sequential database operations that could potentially be batched.
Suggested fix:
Consider adding a batch lookup API to retrieve multiple child nodes in a single database call.
| /// This is the responder side of HashComparison sync. | ||
| /// Handles the first request (already parsed) and then loops to handle | ||
| /// subsequent requests until the stream closes. | ||
| /// |
🟡 YAGNI violation: unused parameter reserved for future use
The _nonce parameter is documented as 'Reserved for future encrypted sync' but is unused; this adds unnecessary API surface.
Suggested fix:
Remove the `_nonce` parameter and add it when actually needed for encryption.
| use calimero_storage::index::Index; | ||
| use calimero_storage::store::MainStorage; | ||
| use calimero_store::db::InMemoryDB; | ||
|
|
💡 Test only verifies no panic, not actual functionality
The test test_create_runtime_env_with_inmemory only checks that calling Index::get_index doesn't panic; it would be more valuable to verify that write/read round-trips work correctly.
Suggested fix:
Add a test that writes via the RuntimeEnv callbacks and verifies the data can be read back.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 85% | Review time: 264.1s
💡 7 suggestions, 📝 2 nitpicks. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-4b2b56e3
| match handle.get(&state_key) { | ||
| Ok(Some(state)) => Some(state.value.into_boxed().into_vec()), | ||
| Ok(None) => None, | ||
| Err(e) => { |
💡 Storage read errors are silently masked
When storage read fails, the error is logged but None is returned, potentially masking storage corruption or integrity issues from higher-level code.
Suggested fix:
Consider propagating the error or providing a mechanism for callers to detect masked failures (e.g., a metric counter).
| /// `calimero-store` ContextStateKey-based storage. | ||
| #[expect( | ||
| clippy::type_complexity, | ||
| reason = "Matches RuntimeEnv callback signatures" |
💡 Three separate store handles created for read/write/remove
Each callback creates its own store.handle(), meaning read, write, and remove use different handles; if handles maintain transaction-level caches, read-after-write within one with_runtime_env block may not see uncommitted writes.
Suggested fix:
Consider sharing a single `RefCell<Handle>` across all three callbacks to ensure consistent view within a session.
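A sketch of the shared-handle arrangement follows, under the assumption that a handle can live behind `Rc<RefCell<_>>`. `Handle`, `Callbacks`, and `make_callbacks` are hypothetical; the real callbacks take a `Key` and go through `calimero-store`, which is not reproduced here.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

/// Hypothetical handle standing in for `store.handle()`.
#[derive(Default)]
struct Handle {
    data: HashMap<Vec<u8>, Vec<u8>>,
}

struct Callbacks {
    read: Rc<dyn Fn(&[u8]) -> Option<Vec<u8>>>,
    write: Rc<dyn Fn(&[u8], &[u8]) -> bool>,
    remove: Rc<dyn Fn(&[u8]) -> bool>,
}

/// Build all three callbacks over one shared handle, so a read issued after
/// a write in the same session observes that write.
fn make_callbacks() -> Callbacks {
    let handle = Rc::new(RefCell::new(Handle::default()));
    let (r, w, d) = (Rc::clone(&handle), Rc::clone(&handle), handle);
    Callbacks {
        read: Rc::new(move |key: &[u8]| r.borrow().data.get(key).cloned()),
        write: Rc::new(move |key: &[u8], value: &[u8]| {
            w.borrow_mut().data.insert(key.to_vec(), value.to_vec());
            true
        }),
        remove: Rc::new(move |key: &[u8]| d.borrow_mut().data.remove(key).is_some()),
    }
}
```

Calling `(callbacks.write)(b"k", b"v")` followed by `(callbacks.read)(b"k")` returns the written value, which is also the round trip a test of the callbacks would assert.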
| use super::*; | ||
| use calimero_storage::env::with_runtime_env; | ||
| use calimero_storage::index::Index; | ||
| use calimero_storage::store::MainStorage; |
💡 Test only verifies non-panic, not callback behavior
The test test_create_runtime_env_with_inmemory only checks that create_runtime_env doesn't panic; it doesn't verify that write/read callbacks actually persist and retrieve data correctly.
Suggested fix:
Add a test that writes data via the callbacks and reads it back to verify round-trip correctness.
| /// | ||
| /// Returns error if encryption fails. | ||
| pub fn encrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> { | ||
| match &self.key_nonce { |
💡 Generic error messages lack context
The encrypt and decrypt methods return only 'encryption failed' / 'decryption failed' without context about what operation or data size was involved.
Suggested fix:
Consider including the data length or a hint in the error message, e.g., `eyre::eyre!("encryption failed for {} bytes", data.len())`.
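An alternative to formatting the context into an `eyre` string is a small typed error that carries it. The sketch below uses only the standard library; `CryptoError` and `Op` are hypothetical names, not types from the PR.

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
enum Op {
    Encrypt,
    Decrypt,
}

/// Hypothetical error type recording which operation failed and on how much data.
#[derive(Debug, PartialEq)]
struct CryptoError {
    op: Op,
    data_len: usize,
}

impl fmt::Display for CryptoError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let verb = match self.op {
            Op::Encrypt => "encryption",
            Op::Decrypt => "decryption",
        };
        write!(f, "{} failed for {} bytes", verb, self.data_len)
    }
}

impl std::error::Error for CryptoError {}
```

Only the length is recorded, never the payload, so the message stays safe to log.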
| /// This enables the same protocol code to run in both production and simulation. | ||
| /// | ||
| /// Note: Uses `?Send` because `RuntimeEnv` (used for storage access) contains `Rc` | ||
| /// which is not `Send`. Callers must not spawn these futures across threads. |
📝 Nit: Associated type bounds may be overly restrictive
Both Config: Send and Stats: Send + Default require Send, but the trait itself is ?Send - this asymmetry may cause confusion or limit flexibility for non-Send configs.
Suggested fix:
Consider whether `Send` bounds are truly necessary given the `?Send` trait bound, or document why they're required despite the trait being `?Send`.
| let response = self.recv(stream, None).await?; | ||
| // Expect Init messages with TreeNodeRequest | ||
| let StreamMessage::Init { payload, .. } = request else { | ||
| debug!(%context_id, "Received non-Init message, ending responder"); |
📝 Nit: Async function without await
The build_tree_node_response method is marked async but contains no .await points; all operations (with_runtime_env, get_local_tree_node_from_index) are synchronous.
Suggested fix:
Remove `async` from the function signature and return `Result<TreeNodeResponse>` directly, unless async is required for trait consistency.
| /// This is the responder side of HashComparison sync. | ||
| /// Handles the first request (already parsed) and then loops to handle | ||
| /// subsequent requests until the stream closes. | ||
| /// |
💡 YAGNI: Unused nonce parameter reserved for future use
The _nonce parameter is explicitly unused and documented as 'Reserved for future encrypted sync', which violates YAGNI - add it when actually needed.
Suggested fix:
Remove the `_nonce` parameter and add it back when encryption support is implemented; update callers accordingly.
| /// through the normal storage path which handles CRDT semantics. | ||
| /// This is the responder side of HashComparison sync. | ||
| /// Handles the first request (already parsed) and then loops to handle | ||
| /// subsequent requests until the stream closes. |
💡 Unused nonce parameter suggests incomplete encryption integration
The _nonce parameter is documented as 'reserved for future encrypted sync' but the transport abstraction already supports encryption; this could lead to confusion about whether messages are actually encrypted.
Suggested fix:
Either use the nonce to configure transport encryption at the start of the session, or remove the parameter and document that encryption is handled at a different layer.
| /// Get current encryption parameters. | ||
| #[must_use] | ||
| pub fn get(&self) -> Option<(SharedKey, Nonce)> { | ||
| self.key_nonce.clone() |
💡 Key material cloning in get() method
The get() method clones cryptographic key material (SharedKey), which could leave copies in memory that aren't zeroed on drop.
Suggested fix:
Consider returning a reference instead of cloning, or ensure SharedKey implements zeroize-on-drop semantics.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 100% | Review time: 366.0s
🟡 3 warnings, 💡 3 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-5172edc8
| /// # Errors | ||
| /// | ||
| /// Returns error if encryption fails. | ||
| pub fn encrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> { |
🟡 Encryption uses same nonce for all messages in session
The encrypt() method reuses the same nonce stored in EncryptionState for every call; reusing a nonce with the same key makes ciphertext deterministic and, for AEAD ciphers such as AES-GCM, breaks both confidentiality and authenticity rather than merely leaking plaintext relationships.
Suggested fix:
Increment or regenerate the nonce after each encryption operation, or document that callers must call `set_encryption()` with a fresh nonce before each message.
| /// initialized nodes. This is enforced by storing the merged values | ||
| /// through the normal storage path which handles CRDT semantics. | ||
| /// This is the responder side of HashComparison sync. | ||
| /// Handles the first request (already parsed) and then loops to handle |
🟡 Arbitrary identity selection for RuntimeEnv
Using random selection (choose_stream) to pick an owned identity is fragile; if any owned identity works for storage access, deterministically pick the first one to avoid non-deterministic behavior.
Suggested fix:
Replace `choose_stream(identities, &mut rand::thread_rng())` with taking the first identity from the stream, or document why random selection is intentional.
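A variation on "take the first identity" is to pick the smallest one, which stays deterministic even if the stream's order is not guaranteed. The sketch below is an assumption: identities are modeled as 4-byte arrays and `pick_identity` is a hypothetical helper, not the PR's code.

```rust
/// Pick the smallest identity so every run selects the same one,
/// regardless of the order in which identities are listed.
fn pick_identity(identities: &[[u8; 4]]) -> Option<[u8; 4]> {
    identities.iter().copied().min()
}
```

The trade-off is that this needs the full list before choosing, whereas taking the first item can stop after one element of the stream.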
| .await | ||
| .transpose()? | ||
| { | ||
| Some((identity, _)) => identity, |
💡 Error condition conflates 'no identity' with 'node not found'
When no owned identity exists, the responder sends not_found: true, which the initiator may misinterpret as the requested tree node being absent rather than an authorization failure.
Suggested fix:
Consider returning a distinct error payload (e.g., `SnapshotError::Unauthorized`) so the initiator can distinguish between missing data and missing permissions.
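The distinction could be modeled with a separate reply variant, as in the sketch below. `TreeNodeReply`, `InitiatorAction`, and both functions are hypothetical; they are not the PR's wire types, and `SnapshotError::Unauthorized` from the suggestion is not reproduced here.

```rust
#[derive(Debug, PartialEq)]
enum TreeNodeReply {
    Found { child_count: usize },
    NotFound,
    /// Hypothetical variant: the responder has no owned identity for the context.
    Unauthorized,
}

/// Responder side: report a missing identity separately from a missing node.
fn build_reply(has_identity: bool, node_children: Option<usize>) -> TreeNodeReply {
    if !has_identity {
        return TreeNodeReply::Unauthorized;
    }
    match node_children {
        Some(child_count) => TreeNodeReply::Found { child_count },
        None => TreeNodeReply::NotFound,
    }
}

#[derive(Debug, PartialEq)]
enum InitiatorAction {
    Descend,
    TreatAsMissing,
    AbortSync,
}

/// Initiator side: only a genuine `NotFound` is treated as absent data.
fn next_action(reply: &TreeNodeReply) -> InitiatorAction {
    match reply {
        TreeNodeReply::Found { .. } => InitiatorAction::Descend,
        TreeNodeReply::NotFound => InitiatorAction::TreatAsMissing,
        TreeNodeReply::Unauthorized => InitiatorAction::AbortSync,
    }
}
```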
| } | ||
| }) | ||
| }; | ||
|
|
💡 Silent failure on storage write operations
The write callback returns is_ok() which silently discards the specific error; while not a direct vulnerability, silent write failures during sync could lead to data inconsistency or incomplete state that violates invariant I5.
Suggested fix:
Consider logging write failures or propagating errors rather than returning a boolean.
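If the callback signature must keep returning a boolean, one option is to log the failure and remember the first error so the sync loop can surface it afterwards. This is a sketch under assumptions: `WriteError` and `recording_write` are invented for illustration, and `Rc<RefCell<..>>` would need to become a thread-safe equivalent if the real callbacks must be `Send`.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for the storage error type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteError(pub String);

/// Wraps a fallible write so the callback still returns `bool`,
/// while the first failure is logged and kept for the caller to inspect.
pub fn recording_write<F>(
    mut write: F,
    first_error: Rc<RefCell<Option<WriteError>>>,
) -> impl FnMut(&[u8], &[u8]) -> bool
where
    F: FnMut(&[u8], &[u8]) -> Result<(), WriteError>,
{
    move |key, value| match write(key, value) {
        Ok(()) => true,
        Err(err) => {
            eprintln!("storage write failed: {err:?}");
            // Keep only the first error; later ones are still logged above.
            let mut slot = first_error.borrow_mut();
            if slot.is_none() {
                *slot = Some(err);
            }
            false
        }
    }
}
```

After the sync pass, the caller can check the shared slot and turn a recorded error into a failed sync result instead of reporting success.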
| /// Statistics about the sync session, or error if sync failed. | ||
| pub(crate) async fn hash_comparison_sync( | ||
| /// * `first_node_id` - Node ID from the first request (already parsed) | ||
| /// * `first_max_depth` - Max depth from the first request |
🟡 Unused nonce parameter bypasses expected encryption setup
The _nonce parameter is documented as 'reserved for future encrypted sync' but is never used, meaning the transport's encryption state is never configured from this value; if callers expect encryption to be applied based on this nonce, it won't be.
Suggested fix:
Either remove the parameter if encryption is handled elsewhere, or implement `transport.set_encryption()` using the provided nonce if encryption is expected.
| //! └─────────────────────────────────────────────────────────────────┘ | ||
| //! ``` | ||
| //! | ||
| //! # Example |
💡 Doc example references non-existent type
The example uses HashComparisonProtocol but no struct implementing SyncProtocolExecutor is introduced in this PR, making the example misleading.
Suggested fix:
Either add a placeholder implementation or update the example to show the trait definition pattern without referencing unimplemented types.
| full_hash, | ||
| children_ids, | ||
| ))) | ||
| } |
Duplicated get_local_tree_node function across two files
Medium Severity
The new standalone get_local_tree_node in hash_comparison_protocol.rs is a near-exact copy of SyncManager::get_local_tree_node_from_index in hash_comparison.rs. Both perform the same Index lookup, hash retrieval, children extraction, and leaf/internal node classification. The only difference is that the latter takes &self (which it doesn't use). A bug fix applied to one copy risks being missed in the other, leading to divergent behavior between production and simulation paths.
| info!(%context_id, requests_handled, "HashComparison responder complete"); | ||
| Ok(()) | ||
| } |
Production and simulation responders use different code paths
Medium Severity
The PR's goal is "same code, different backends," but production uses SyncManager::handle_tree_node_request while simulation calls HashComparisonProtocol::run_responder — two independent implementations. They differ in how is_root_request is determined: production queries context.root_hash from context_client per request, while the standalone responder caches Index::get_hashes_for once at startup. Simulation tests therefore don't exercise the production responder code, undermining the unified-testing architecture.
| const MAX_PENDING_NODES: usize = 10_000; | ||
|
|
||
| /// Maximum depth allowed in TreeNodeRequest. | ||
| pub const MAX_REQUEST_DEPTH: u8 = 16; |
Duplicated MAX_REQUEST_DEPTH constant in two files
Low Severity
MAX_REQUEST_DEPTH is defined as pub const in both hash_comparison.rs and hash_comparison_protocol.rs. If the DoS protection threshold is changed in one file but not the other, the production responder and simulation responder would enforce different depth limits, causing subtle divergence.
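One possible shape for a fix is to define the limit once and re-export it from both modules. The module layout below is illustrative, not the crate's real structure, and `depth_allowed` is a hypothetical helper whose `<=` comparison is an assumption about how the limit is applied.

```rust
/// Single definition of the limit (module name is an assumption).
pub mod limits {
    /// Maximum depth allowed in TreeNodeRequest.
    pub const MAX_REQUEST_DEPTH: u8 = 16;
}

pub mod hash_comparison {
    // Re-export instead of redefining, so both paths share one value.
    pub use super::limits::MAX_REQUEST_DEPTH;

    /// Hypothetical depth check on the production responder path.
    pub fn depth_allowed(depth: u8) -> bool {
        depth <= MAX_REQUEST_DEPTH
    }
}

pub mod hash_comparison_protocol {
    pub use super::limits::MAX_REQUEST_DEPTH;

    /// Hypothetical depth check on the standalone protocol path.
    pub fn depth_allowed(depth: u8) -> bool {
        depth <= MAX_REQUEST_DEPTH
    }
}
```

Existing imports of either `MAX_REQUEST_DEPTH` path keep compiling, while a change to the threshold happens in one place.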
Summary
Adds a trait-based architecture for sync protocols enabling the same protocol code to run in production and simulation.
Phase 1 (previous commits)
- `SyncTransport` trait abstracting network operations (send/recv/close)
- `StreamTransport` for production `Stream` wrapper
- `SimStream` for in-memory channel-based transport
Phase 2 (this PR)
- `SyncProtocolExecutor` trait as common interface for all sync protocols
- `create_runtime_env` shared helper to bridge `calimero-storage` with `calimero-store`
- `HashComparisonProtocol` to standalone module implementing the trait
- `u64` for sequence_id (portability)
Architecture
Same code, different backends:
- Production: `StreamTransport` + `Store<RocksDB>`
- Simulation: `SimStream` + `Store<InMemoryDB>`
Changes
- `protocol_trait.rs`: `SyncProtocolExecutor` trait
- `storage_bridge.rs`: `create_runtime_env()` helper
- `hash_comparison_protocol.rs`
- `hash_comparison.rs`
- `wire.rs`: `sequence_id` changed from `usize` to `u64`
- `protocol.rs` (test)
Test plan
- `cargo fmt` clean
- `cargo clippy` no errors in changed modules
Note
High Risk
Includes a breaking on-the-wire format change (`sequence_id` type), requiring coordinated upgrades; also refactors core sync protocol execution paths and storage/env bridging, which can affect correctness and interoperability.
Overview
Introduces a trait-based sync protocol interface: adds `SyncTransport` (with shared `EncryptionState`), `SyncProtocolExecutor`, and a shared `create_runtime_env()` bridge so protocols can run against either production streams/RocksDB or simulation channels/InMemoryDB.
Extracts HashComparison into a standalone `HashComparisonProtocol` implementing the trait, updates `SyncManager` to execute it via `StreamTransport`, and trims `hash_comparison.rs` to responder-only stream handling. Simulation is updated to run the production HashComparison protocol end-to-end via new `SimStream`, plus new/expanded multi-node and gossip buffering tests.
Makes a breaking wire change by switching `StreamMessage::Message.sequence_id` (and related tracking/state code) from `usize` to `u64` for cross-platform portability.
Written by Cursor Bugbot for commit a7eacd0. This will update automatically on new commits.