diff --git a/Cargo.lock b/Cargo.lock index d36a0c9b4..cc5439ecd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2247,6 +2247,7 @@ version = "0.0.0" dependencies = [ "actix", "async-stream", + "async-trait", "base64 0.22.1", "borsh", "bs58 0.5.1", diff --git a/crates/node/primitives/Cargo.toml b/crates/node/primitives/Cargo.toml index 8c295cf6f..77c991e39 100644 --- a/crates/node/primitives/Cargo.toml +++ b/crates/node/primitives/Cargo.toml @@ -11,6 +11,7 @@ publish = true [dependencies] actix.workspace = true async-stream.workspace = true +async-trait.workspace = true borsh.workspace = true camino.workspace = true clap = { workspace = true, features = ["derive"] } diff --git a/crates/node/primitives/src/sync.rs b/crates/node/primitives/src/sync.rs index 2b99f0157..65ff271f7 100644 --- a/crates/node/primitives/src/sync.rs +++ b/crates/node/primitives/src/sync.rs @@ -39,9 +39,12 @@ pub mod handshake; pub mod hash_comparison; pub mod levelwise; pub mod protocol; +pub mod protocol_trait; pub mod snapshot; pub mod state_machine; +pub mod storage_bridge; pub mod subtree; +pub mod transport; pub mod wire; // ============================================================================= @@ -107,3 +110,12 @@ pub use state_machine::{ build_handshake, build_handshake_from_raw, estimate_entity_count, estimate_max_depth, LocalSyncState, }; + +// Transport abstraction (for production streams and simulation) +pub use transport::{EncryptionState, SyncTransport}; + +// Protocol trait (common interface for all sync protocols) +pub use protocol_trait::SyncProtocolExecutor; + +// Storage bridge (RuntimeEnv creation for sync protocols) +pub use storage_bridge::create_runtime_env; diff --git a/crates/node/primitives/src/sync/protocol_trait.rs b/crates/node/primitives/src/sync/protocol_trait.rs new file mode 100644 index 000000000..0a9fc93ed --- /dev/null +++ b/crates/node/primitives/src/sync/protocol_trait.rs @@ -0,0 +1,124 @@ +//! Common trait for sync protocol implementations. +//! +//! This module defines the [`SyncProtocolExecutor`] trait that all sync protocols +//! implement. This enables: +//! +//! - Protocol implementation details contained within each protocol module +//! - Common interface for `SyncManager` to invoke any protocol +//! - Same code path for production and simulation (only `Store` backend differs) +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────────────┐ +//! │ SyncProtocolExecutor trait │ +//! │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ +//! │ │ HashComparison │ │ Snapshot │ │ BloomFilter │ │ +//! │ │ Protocol │ │ Protocol │ │ Protocol │ │ +//! │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │ +//! │ │ │ │ │ +//! │ └────────────────────┼────────────────────┘ │ +//! │ │ │ +//! │ ┌───────────┴───────────┐ │ +//! │ │ SyncTransport │ │ +//! │ │ (Stream or SimStream) │ │ +//! │ └───────────────────────┘ │ +//! └─────────────────────────────────────────────────────────────────┘ +//! ``` +//! +//! # Example +//! +//! ```ignore +//! use calimero_node_primitives::sync::{SyncProtocolExecutor, HashComparisonProtocol}; +//! +//! // Production +//! let mut transport = StreamTransport::new(&mut stream); +//! let stats = HashComparisonProtocol::run_initiator( +//! &mut transport, +//! &store, +//! context_id, +//! identity, +//! HashComparisonConfig { remote_root_hash }, +//! ).await?; +//! +//! // Simulation (exact same call, different transport/store) +//! let mut transport = SimStream::new(); +//! let stats = HashComparisonProtocol::run_initiator( +//! &mut transport, +//! &store, // Store +//! context_id, +//! identity, +//! HashComparisonConfig { remote_root_hash }, +//! ).await?; +//! ``` + +use async_trait::async_trait; +use calimero_primitives::context::ContextId; +use calimero_primitives::identity::PublicKey; +use calimero_store::Store; +use eyre::Result; + +use super::SyncTransport; + +/// Trait for sync protocol implementations. +/// +/// Each sync protocol (HashComparison, Snapshot, BloomFilter, etc.) implements +/// this trait. The protocol logic is generic over: +/// +/// - `T: SyncTransport` - the transport layer (production streams or simulation channels) +/// - `Store` - the storage backend (RocksDB or InMemoryDB) +/// +/// This enables the same protocol code to run in both production and simulation. +/// +/// Note: Uses `?Send` because `RuntimeEnv` (used for storage access) contains `Rc` +/// which is not `Send`. Callers must not spawn these futures across threads. +#[async_trait(?Send)] +pub trait SyncProtocolExecutor { + /// Protocol-specific configuration for the initiator. + /// + /// For example, HashComparison needs the remote root hash. + type Config: Send; + + /// Protocol-specific statistics/results. + type Stats: Send + Default; + + /// Run the initiator (pulling) side of the protocol. + /// + /// The initiator requests data from the responder and applies it locally. + /// + /// # Arguments + /// + /// * `transport` - The transport for sending/receiving messages + /// * `store` - The local storage (works with both RocksDB and InMemoryDB) + /// * `context_id` - The context being synced + /// * `identity` - Our identity for this context + /// * `config` - Protocol-specific configuration + /// + /// # Returns + /// + /// Protocol-specific statistics on success. + async fn run_initiator( + transport: &mut T, + store: &Store, + context_id: ContextId, + identity: PublicKey, + config: Self::Config, + ) -> Result; + + /// Run the responder side of the protocol. + /// + /// The responder answers requests from the initiator. + /// + /// # Arguments + /// + /// * `transport` - The transport for sending/receiving messages + /// * `store` - The local storage + /// * `context_id` - The context being synced + /// * `identity` - Our identity for this context + async fn run_responder( + transport: &mut T, + store: &Store, + context_id: ContextId, + identity: PublicKey, + ) -> Result<()>; +} diff --git a/crates/node/primitives/src/sync/storage_bridge.rs b/crates/node/primitives/src/sync/storage_bridge.rs new file mode 100644 index 000000000..894859dcd --- /dev/null +++ b/crates/node/primitives/src/sync/storage_bridge.rs @@ -0,0 +1,184 @@ +//! Storage bridge utilities for sync protocols. +//! +//! This module provides helpers to bridge `calimero-storage` APIs (which use +//! the `RuntimeEnv` thread-local) to the underlying `calimero-store` backend. +//! +//! # Why This Exists +//! +//! The `calimero-storage` crate provides high-level storage APIs (`Index`, `Interface`) +//! that operate through a thread-local `RuntimeEnv`. This `RuntimeEnv` contains +//! callbacks that route read/write/remove operations to the actual database. +//! +//! This module provides a single, reusable way to create these callbacks from +//! a `Store`, regardless of the backend (RocksDB or InMemoryDB). +//! +//! # Usage +//! +//! ```ignore +//! use calimero_node_primitives::sync::storage_bridge::create_runtime_env; +//! +//! // Works with any Store backend (RocksDB or InMemoryDB) +//! let runtime_env = create_runtime_env(&store, context_id, identity); +//! +//! // Use with storage APIs +//! with_runtime_env(runtime_env, || { +//! let index = Index::::get_index(entity_id)?; +//! // ... +//! }); +//! ``` + +use std::cell::RefCell; +use std::rc::Rc; + +use calimero_primitives::context::ContextId; +use calimero_primitives::identity::PublicKey; +use calimero_storage::env::RuntimeEnv; +use calimero_storage::store::Key; +use calimero_store::{key, types, Store}; +use tracing::warn; + +/// Create a `RuntimeEnv` that bridges `calimero-storage` to a `Store`. +/// +/// This is the canonical way to set up storage access for sync protocols. +/// The returned `RuntimeEnv` can be used with `with_runtime_env()` to enable +/// `Index` and `Interface` operations. +/// +/// # Arguments +/// +/// * `store` - The underlying store (works with both RocksDB and InMemoryDB) +/// * `context_id` - The context being accessed +/// * `executor_id` - The identity executing operations +/// +/// # Example +/// +/// ```ignore +/// let env = create_runtime_env(&store, context_id, identity); +/// let result = with_runtime_env(env, || { +/// Index::::get_index(entity_id) +/// }); +/// ``` +pub fn create_runtime_env( + store: &Store, + context_id: ContextId, + executor_id: PublicKey, +) -> RuntimeEnv { + let callbacks = create_storage_callbacks(store, context_id); + RuntimeEnv::new( + callbacks.read, + callbacks.write, + callbacks.remove, + *context_id.as_ref(), + *executor_id.as_ref(), + ) +} + +/// Storage callback closures that bridge `calimero-storage` Key API to the Store. +/// +/// These closures translate `calimero-storage::Key` (Index/Entry) to +/// `calimero-store::ContextStateKey` for access to the actual database. +#[expect( + clippy::type_complexity, + reason = "Matches RuntimeEnv callback signatures" +)] +struct StorageCallbacks { + read: Rc Option>>, + write: Rc bool>, + remove: Rc bool>, +} + +/// Create storage callbacks for a context. +/// +/// These bridge the `calimero-storage` Key-based API to the underlying +/// `calimero-store` ContextStateKey-based storage. +#[expect( + clippy::type_complexity, + reason = "Matches RuntimeEnv callback signatures" +)] +fn create_storage_callbacks(store: &Store, context_id: ContextId) -> StorageCallbacks { + let read: Rc Option>> = { + let handle = store.handle(); + let ctx_id = context_id; + Rc::new(move |key: &Key| { + let storage_key = key.to_bytes(); + let state_key = key::ContextState::new(ctx_id, storage_key); + match handle.get(&state_key) { + Ok(Some(state)) => Some(state.value.into_boxed().into_vec()), + Ok(None) => None, + Err(e) => { + warn!( + %ctx_id, + storage_key = %hex::encode(storage_key), + error = ?e, + "Storage read failed" + ); + None + } + } + }) + }; + + let write: Rc bool> = { + let handle_cell: Rc> = Rc::new(RefCell::new(store.handle())); + let ctx_id = context_id; + Rc::new(move |key: Key, value: &[u8]| { + let storage_key = key.to_bytes(); + let state_key = key::ContextState::new(ctx_id, storage_key); + let slice: calimero_store::slice::Slice<'_> = value.to_vec().into(); + let state_value = types::ContextState::from(slice); + handle_cell + .borrow_mut() + .put(&state_key, &state_value) + .is_ok() + }) + }; + + let remove: Rc bool> = { + let handle_cell: Rc> = Rc::new(RefCell::new(store.handle())); + let ctx_id = context_id; + Rc::new(move |key: &Key| { + let storage_key = key.to_bytes(); + let state_key = key::ContextState::new(ctx_id, storage_key); + handle_cell.borrow_mut().delete(&state_key).is_ok() + }) + }; + + StorageCallbacks { + read, + write, + remove, + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use calimero_storage::env::with_runtime_env; + use calimero_storage::index::Index; + use calimero_storage::store::MainStorage; + use calimero_store::db::InMemoryDB; + + #[test] + fn test_create_runtime_env_with_inmemory() { + // Create an in-memory store + let db = InMemoryDB::owned(); + let store = Store::new(Arc::new(db)); + + // Create a context ID and identity + let context_id = ContextId::from([1u8; 32]); + let identity = PublicKey::from([2u8; 32]); + + // Create RuntimeEnv - should not panic + let env = create_runtime_env(&store, context_id, identity); + + // Use it with storage APIs + let result = with_runtime_env(env, || { + // Try to get a non-existent index - should return None, not panic + Index::::get_index(calimero_storage::address::Id::root()) + }); + + // Root index doesn't exist yet, should be Ok(None) + assert!(result.is_ok()); + } +} diff --git a/crates/node/primitives/src/sync/transport.rs b/crates/node/primitives/src/sync/transport.rs new file mode 100644 index 000000000..2a5d2c9f6 --- /dev/null +++ b/crates/node/primitives/src/sync/transport.rs @@ -0,0 +1,174 @@ +//! Transport abstraction for sync protocols. +//! +//! This module provides the [`SyncTransport`] trait that abstracts the underlying +//! network transport, enabling: +//! +//! - Production code to use libp2p streams +//! - Simulation tests to use in-memory channels +//! - Same protocol code to run in both environments +//! +//! # Design Rationale +//! +//! The sync protocol code needs to send and receive [`StreamMessage`] payloads. +//! By abstracting this behind a trait, we can: +//! +//! 1. Test the actual production protocol logic in simulation +//! 2. Verify invariants (I4, I5, I6) with real message flow +//! 3. Inject network faults (latency, loss, reorder) in tests +//! +//! # Example +//! +//! ```ignore +//! async fn hash_comparison_sync( +//! transport: &mut T, +//! context_id: ContextId, +//! // ... +//! ) -> Result { +//! transport.send(&request_msg).await?; +//! let response = transport.recv().await?; +//! // ... +//! } +//! ``` + +use std::time::Duration; + +use async_trait::async_trait; +use calimero_crypto::{Nonce, SharedKey}; +use eyre::Result; + +use super::wire::StreamMessage; + +/// Transport abstraction for sync protocol message exchange. +/// +/// Implementations handle serialization, optional encryption, and the +/// underlying transport mechanism (network streams or in-memory channels). +/// +/// # Encryption +/// +/// Transport implementations may support optional encryption. Use +/// [`set_encryption`](SyncTransport::set_encryption) to configure the +/// shared key and nonce for encrypted communication. +#[async_trait] +pub trait SyncTransport: Send { + /// Send a message to the peer. + /// + /// The implementation handles serialization and optional encryption. + /// + /// # Errors + /// + /// Returns error if serialization, encryption, or send fails. + async fn send(&mut self, message: &StreamMessage<'_>) -> Result<()>; + + /// Receive a message from the peer. + /// + /// The implementation handles deserialization and optional decryption. + /// + /// # Errors + /// + /// Returns error if receive, decryption, or deserialization fails. + /// Returns `Ok(None)` if the stream is closed. + async fn recv(&mut self) -> Result>>; + + /// Receive a message with a timeout. + /// + /// # Errors + /// + /// Returns error if timeout expires or receive fails. + async fn recv_timeout(&mut self, timeout: Duration) -> Result>>; + + /// Set encryption parameters for subsequent send/recv operations. + /// + /// Pass `None` to disable encryption. + fn set_encryption(&mut self, encryption: Option<(SharedKey, Nonce)>); + + /// Get the current encryption parameters. + fn encryption(&self) -> Option<(SharedKey, Nonce)>; + + /// Close the transport. + /// + /// After closing, further send/recv calls will fail. + async fn close(&mut self) -> Result<()>; +} + +// ============================================================================= +// Encryption Helper +// ============================================================================= + +/// Common encryption state that implementations can embed. +#[derive(Debug, Clone, Default)] +pub struct EncryptionState { + /// Current encryption key and nonce. + pub key_nonce: Option<(SharedKey, Nonce)>, +} + +impl EncryptionState { + /// Create new encryption state (no encryption). + #[must_use] + pub fn new() -> Self { + Self { key_nonce: None } + } + + /// Set encryption parameters. + pub fn set(&mut self, encryption: Option<(SharedKey, Nonce)>) { + self.key_nonce = encryption; + } + + /// Get current encryption parameters. + #[must_use] + pub fn get(&self) -> Option<(SharedKey, Nonce)> { + self.key_nonce.clone() + } + + /// Encrypt data if encryption is configured. + /// + /// # Errors + /// + /// Returns error if encryption fails. + pub fn encrypt(&self, data: Vec) -> Result> { + match &self.key_nonce { + Some((key, nonce)) => key + .encrypt(data, *nonce) + .ok_or_else(|| eyre::eyre!("encryption failed")), + None => Ok(data), + } + } + + /// Decrypt data if encryption is configured. + /// + /// # Errors + /// + /// Returns error if decryption fails. + pub fn decrypt(&self, data: Vec) -> Result> { + match &self.key_nonce { + Some((key, nonce)) => key + .decrypt(data, *nonce) + .ok_or_else(|| eyre::eyre!("decryption failed")), + None => Ok(data), + } + } +} + +// ============================================================================= +// Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_encryption_state_default() { + let state = EncryptionState::new(); + assert!(state.get().is_none()); + } + + #[test] + fn test_encryption_state_passthrough() { + let state = EncryptionState::new(); + let data = b"hello world".to_vec(); + let encrypted = state.encrypt(data.clone()).unwrap(); + assert_eq!(encrypted, data); // No encryption = passthrough + let decrypted = state.decrypt(encrypted).unwrap(); + assert_eq!(decrypted, data); + } +} diff --git a/crates/node/primitives/src/sync/wire.rs b/crates/node/primitives/src/sync/wire.rs index 6d43d687a..354de958f 100644 --- a/crates/node/primitives/src/sync/wire.rs +++ b/crates/node/primitives/src/sync/wire.rs @@ -71,7 +71,13 @@ pub enum StreamMessage<'a> { /// Follow-up message in an ongoing sync operation. Message { /// Sequence number for ordering. - sequence_id: usize, + /// + /// # Wire Format Change + /// + /// Changed from `usize` to `u64` for cross-platform portability. + /// This is a breaking wire format change - nodes must be upgraded + /// together to avoid deserialization failures. + sequence_id: u64, /// The message payload. payload: MessagePayload<'a>, /// Nonce for the next message. diff --git a/crates/node/src/handlers/state_delta.rs b/crates/node/src/handlers/state_delta.rs index c0589a1b4..7fb880763 100644 --- a/crates/node/src/handlers/state_delta.rs +++ b/crates/node/src/handlers/state_delta.rs @@ -1104,8 +1104,8 @@ async fn request_key_share_with_peer( .get_identity(context_id, &their_identity)? .ok_or_eyre("expected peer identity to exist")?; - let mut sequence_id_out: usize = 0; - let mut sequence_id_in: usize = 0; + let mut sequence_id_out: u64 = 0; + let mut sequence_id_in: u64 = 0; // Step 4: Challenge/Response authentication // Protocol must match sync/key.rs exactly: diff --git a/crates/node/src/sync/hash_comparison.rs b/crates/node/src/sync/hash_comparison.rs index 2e6020cea..ce24e816d 100644 --- a/crates/node/src/sync/hash_comparison.rs +++ b/crates/node/src/sync/hash_comparison.rs @@ -1,705 +1,65 @@ -//! HashComparison sync protocol handler (CIP §2.3 Rules 3, 7). +//! HashComparison sync protocol responder (CIP §2.3 Rules 3, 7). //! -//! Traverses Merkle tree comparing hashes, transfers differing entities. +//! This module contains the responder side of the HashComparison protocol. +//! The initiator logic is in `hash_comparison_protocol.rs`. //! -//! # Protocol Overview +//! # Responder Flow //! //! ```text -//! Initiator Responder +//! Initiator Responder (this module) //! │ │ //! │ ── TreeNodeRequest (root) ───────────────► │ -//! │ │ Lookup node +//! │ │ handle_tree_node_request //! │ ◄── TreeNodeResponse (children hashes) ─── │ //! │ │ -//! │ Compare with local tree │ -//! │ │ -//! │ For differing children: │ //! │ ── TreeNodeRequest (child) ──────────────► │ //! │ ◄── TreeNodeResponse ─────────────────────│ //! │ │ -//! │ ...recurse until leaves... │ -//! │ │ -//! │ At leaf: CRDT MERGE (Invariant I5) │ +//! │ ...repeat until initiator closes stream... │ //! └────────────────────────────────────────────┘ //! ``` -//! -//! # Critical Invariants -//! -//! - **I5 - No Silent Data Loss**: ALWAYS uses CRDT merge for leaf entities. -//! LWW overwrite is NEVER permitted for initialized nodes. -//! - **I4 - Strategy Equivalence**: Final state must match other sync strategies -//! (Snapshot, BloomFilter, etc.) given identical inputs. -//! -//! # Implementation Notes -//! -//! - Uses the real Merkle tree via `Index` with RuntimeEnv bridge -//! - Uses iterative DFS with explicit stack (avoids stack overflow on deep trees) -//! - Validates all responses (DoS protection via `is_valid()`) -//! - Integrates with delta buffer for concurrent delta handling (I6) - -use std::cell::RefCell; -use std::rc::Rc; use calimero_crypto::Nonce; -use calimero_network_primitives::stream::Stream; use calimero_node_primitives::sync::{ - compare_tree_nodes, InitPayload, LeafMetadata, MessagePayload, StreamMessage, - TreeCompareResult, TreeLeafData, TreeNode, TreeNodeResponse, MAX_NODES_PER_RESPONSE, + create_runtime_env, InitPayload, MessagePayload, StreamMessage, SyncTransport, + TreeNodeResponse, MAX_NODES_PER_RESPONSE, }; use calimero_primitives::context::ContextId; -use calimero_primitives::crdt::CrdtType; -use calimero_primitives::identity::PublicKey; -use calimero_storage::address::Id; use calimero_storage::env::{with_runtime_env, RuntimeEnv}; -use calimero_storage::index::Index; -use calimero_storage::interface::Interface; -use calimero_storage::store::{Key, MainStorage}; -use calimero_store::key::ContextState as ContextStateKey; -use calimero_store::{key, types}; -use eyre::{bail, Result, WrapErr}; -use libp2p::PeerId; +use eyre::Result; use tracing::{debug, info, trace, warn}; +use super::hash_comparison_protocol::{get_local_tree_node, MAX_REQUEST_DEPTH}; use super::manager::SyncManager; -/// Maximum number of pending node requests (DFS stack depth limit). -/// -/// Prevents unbounded memory growth during traversal. -/// If exceeded, the sync session fails rather than silently dropping nodes. -const MAX_PENDING_NODES: usize = 10_000; - -/// Maximum depth allowed in TreeNodeRequest. -/// -/// Prevents malicious peers from requesting expensive deep traversals. -pub const MAX_REQUEST_DEPTH: u8 = 16; - -/// Statistics from a HashComparison sync session. -#[derive(Debug, Default)] -pub struct HashComparisonStats { - /// Number of tree nodes compared. - pub nodes_compared: u64, - /// Number of leaf entities merged via CRDT. - pub entities_merged: u64, - /// Number of nodes skipped (hashes matched). - pub nodes_skipped: u64, - /// Number of requests sent to peer. - pub requests_sent: u64, -} - // ============================================================================= -// Storage Bridge -// ============================================================================= - -/// Storage callback closures that bridge `calimero-storage` Key API to the Store. -/// -/// These closures translate `calimero-storage::Key` (Index/Entry) to -/// `calimero-store::ContextStateKey` for access to the actual RocksDB data. -#[expect( - clippy::type_complexity, - reason = "Matches RuntimeEnv callback signatures" -)] -struct StorageCallbacks { - read: Rc Option>>, - write: Rc bool>, - remove: Rc bool>, -} - -/// Create storage callbacks for a context. -/// -/// These bridge the `calimero-storage` Key-based API to the underlying -/// `calimero-store` ContextStateKey-based storage. -#[expect( - clippy::type_complexity, - reason = "Matches RuntimeEnv callback signatures" -)] -fn create_storage_callbacks( - datastore: &calimero_store::Store, - context_id: ContextId, -) -> StorageCallbacks { - let read: Rc Option>> = { - let handle = datastore.handle(); - let ctx_id = context_id; - Rc::new(move |key: &Key| { - let storage_key = key.to_bytes(); - let state_key = key::ContextState::new(ctx_id, storage_key); - match handle.get(&state_key) { - Ok(Some(state)) => Some(state.value.into_boxed().into_vec()), - Ok(None) => None, - Err(e) => { - warn!( - %ctx_id, - storage_key = %hex::encode(storage_key), - error = ?e, - "Storage read failed during HashComparison" - ); - None - } - } - }) - }; - - let write: Rc bool> = { - let handle_cell: Rc> = Rc::new(RefCell::new(datastore.handle())); - let ctx_id = context_id; - Rc::new(move |key: Key, value: &[u8]| { - let storage_key = key.to_bytes(); - let state_key = key::ContextState::new(ctx_id, storage_key); - let slice: calimero_store::slice::Slice<'_> = value.to_vec().into(); - let state_value = types::ContextState::from(slice); - handle_cell - .borrow_mut() - .put(&state_key, &state_value) - .is_ok() - }) - }; - - let remove: Rc bool> = { - let handle_cell: Rc> = Rc::new(RefCell::new(datastore.handle())); - let ctx_id = context_id; - Rc::new(move |key: &Key| { - let storage_key = key.to_bytes(); - let state_key = key::ContextState::new(ctx_id, storage_key); - handle_cell.borrow_mut().delete(&state_key).is_ok() - }) - }; - - StorageCallbacks { - read, - write, - remove, - } -} - -/// Create a RuntimeEnv for accessing the Merkle tree Index. -fn create_runtime_env( - callbacks: &StorageCallbacks, - context_id: ContextId, - executor_id: PublicKey, -) -> RuntimeEnv { - RuntimeEnv::new( - callbacks.read.clone(), - callbacks.write.clone(), - callbacks.remove.clone(), - *context_id.as_ref(), - *executor_id.as_ref(), - ) -} - -// ============================================================================= -// SyncManager Implementation +// SyncManager Responder Implementation // ============================================================================= impl SyncManager { - /// Execute HashComparison sync protocol as initiator. - /// - /// # Wire Protocol - /// - /// 1. Request root node from peer - /// 2. Compare children with local tree - /// 3. For differing children: recurse (request subtree) - /// 4. At leaves: CRDT merge (Invariant I5) - /// - /// # Invariant I5 - No Silent Data Loss - /// - /// Uses CRDT merge for all entities. NEVER uses last-write-wins for - /// initialized nodes. This is enforced by storing the merged values - /// through the normal storage path which handles CRDT semantics. - /// - /// # Arguments - /// - /// * `context_id` - Context being synchronized - /// * `peer_id` - Peer to sync from - /// * `our_identity` - Our identity for authentication - /// * `stream` - Network stream for communication - /// * `remote_root_hash` - Peer's root hash from handshake - /// - /// # Returns - /// - /// Statistics about the sync session, or error if sync failed. - pub(crate) async fn hash_comparison_sync( - &self, - context_id: ContextId, - peer_id: PeerId, - our_identity: PublicKey, - stream: &mut Stream, - remote_root_hash: [u8; 32], - ) -> Result { - info!(%context_id, %peer_id, "Starting HashComparison sync"); - - let mut stats = HashComparisonStats::default(); - - // Set up storage bridge for accessing the Merkle tree - let datastore = self.context_client.datastore_handle().into_inner(); - let callbacks = create_storage_callbacks(&datastore, context_id); - let runtime_env = create_runtime_env(&callbacks, context_id, our_identity); - - // Stack for DFS traversal (iterative to avoid deep recursion) - // Each entry is (node_id, is_root_request) - // For root: node_id is the root_hash, we look up Id::root() - // For children: node_id is the entity Id bytes - let mut to_compare: Vec<([u8; 32], bool)> = vec![(remote_root_hash, true)]; - - while let Some((node_id, is_root_request)) = to_compare.pop() { - // Limit stack size to prevent memory exhaustion (DoS protection) - // Fail the sync session rather than silently dropping nodes, - // which would cause incomplete sync and break convergence guarantees. - if to_compare.len() > MAX_PENDING_NODES { - bail!( - "HashComparison sync aborted: pending nodes ({}) exceeds limit ({}). \ - Tree may be too large for HashComparison; consider using Snapshot sync.", - to_compare.len(), - MAX_PENDING_NODES - ); - } - - // Request node from peer - let request_msg = StreamMessage::Init { - context_id, - party_id: our_identity, - payload: InitPayload::TreeNodeRequest { - context_id, - node_id, - max_depth: Some(1), // Request immediate children only - }, - next_nonce: super::helpers::generate_nonce(), - }; - - self.send(stream, &request_msg, None).await?; - stats.requests_sent += 1; - - // Receive response - let response = self.recv(stream, None).await?; - - let Some(StreamMessage::Message { payload, .. }) = response else { - bail!("Unexpected response type during HashComparison sync"); - }; - - let (nodes, not_found) = match payload { - MessagePayload::TreeNodeResponse { nodes, not_found } => (nodes, not_found), - MessagePayload::SnapshotError { error } => { - warn!(%context_id, ?error, "Peer returned error during HashComparison"); - bail!("Peer error: {:?}", error); - } - _ => { - bail!("Unexpected payload type during HashComparison sync"); - } - }; - - // Validate response (DoS protection) - if nodes.len() > MAX_NODES_PER_RESPONSE { - warn!( - %context_id, - count = nodes.len(), - max = MAX_NODES_PER_RESPONSE, - "TreeNodeResponse exceeds max nodes, skipping" - ); - continue; - } - - if not_found { - debug!(%context_id, node_id = %hex::encode(node_id), "Node not found on peer"); - continue; - } - - // Process each node in response - for remote_node in nodes { - // Validate node structure - if !remote_node.is_valid() { - warn!(%context_id, "Invalid TreeNode structure, skipping"); - continue; - } - - stats.nodes_compared += 1; - - if remote_node.is_leaf() { - // Leaf node: apply CRDT merge (Invariant I5) - if let Some(ref leaf_data) = remote_node.leaf_data { - trace!( - %context_id, - key = %hex::encode(leaf_data.key), - crdt_type = ?leaf_data.metadata.crdt_type, - "Merging leaf entity via CRDT" - ); - - self.apply_leaf_with_crdt_merge(context_id, leaf_data) - .await - .wrap_err("failed to merge leaf entity")?; - - stats.entities_merged += 1; - } - } else { - // Internal node: look up LOCAL version of THIS node (not the parent!) - // BUG FIX: Previously we compared all nodes against local_node (the parent), - // but when max_depth > 0, the response includes children which must be - // compared against their own local versions. - // - // Determine if this specific node is the root: - // - If remote_node.id matches the originally requested node_id AND is_root_request is true - let is_this_node_root = is_root_request && remote_node.id == node_id; - - let local_version = with_runtime_env(runtime_env.clone(), || { - self.get_local_tree_node_from_index( - context_id, - &remote_node.id, - is_this_node_root, - ) - })?; - - let compare_result = - compare_tree_nodes(local_version.as_ref(), Some(&remote_node)); - - match compare_result { - TreeCompareResult::Equal => { - // Subtree matches, skip - stats.nodes_skipped += 1; - trace!( - %context_id, - node_id = %hex::encode(remote_node.id), - "Subtree hashes match, skipping" - ); - } - TreeCompareResult::LocalMissing => { - // We don't have this node locally - fetch all children - trace!( - %context_id, - node_id = %hex::encode(remote_node.id), - children = remote_node.children.len(), - "Local missing, fetching all children" - ); - // Children are entity IDs, not root requests - for child_id in &remote_node.children { - to_compare.push((*child_id, false)); - } - } - TreeCompareResult::Different { - remote_only_children, - common_children, - local_only_children: _, - } => { - // Queue remote-only and common children for comparison - trace!( - %context_id, - remote_only = remote_only_children.len(), - common = common_children.len(), - "Subtrees differ, queuing children" - ); - for child_id in remote_only_children { - to_compare.push((child_id, false)); - } - for child_id in common_children { - to_compare.push((child_id, false)); - } - // Note: local_only_children are for bidirectional sync (future work) - } - TreeCompareResult::RemoteMissing => { - // We have data that peer doesn't - // For unidirectional pull, nothing to do - // For bidirectional sync, we would push here (future work) - trace!( - %context_id, - node_id = %hex::encode(remote_node.id), - "Remote missing (bidirectional push not implemented)" - ); - } - } - } - } - } - - info!( - %context_id, - %peer_id, - nodes_compared = stats.nodes_compared, - entities_merged = stats.entities_merged, - nodes_skipped = stats.nodes_skipped, - requests_sent = stats.requests_sent, - "HashComparison sync complete" - ); - - Ok(stats) - } - - /// Get local tree node from the real Merkle tree Index. + /// Handle incoming TreeNodeRequest from a peer. /// - /// Must be called within `with_runtime_env` context. + /// This is the responder side of HashComparison sync. + /// Handles the first request (already parsed) and then loops to handle + /// subsequent requests until the stream closes. /// /// # Arguments /// /// * `context_id` - Context being synchronized - /// * `node_id` - Either root_hash (for root request) or entity ID - /// * `is_root_request` - True if this is a request for the root node - fn get_local_tree_node_from_index( - &self, - context_id: ContextId, - node_id: &[u8; 32], - is_root_request: bool, - ) -> Result> { - // Determine the entity ID to look up - let entity_id = if is_root_request { - // For root request, look up Id::root() (which equals context_id) - Id::new(*context_id.as_ref()) - } else { - // For child requests, node_id IS the entity ID - Id::new(*node_id) - }; - - // Get the entity's index from the Merkle tree - let index = match Index::::get_index(entity_id) { - Ok(Some(idx)) => idx, - Ok(None) => return Ok(None), - Err(e) => { - warn!( - %context_id, - entity_id = %entity_id, - error = %e, - "Failed to get index for entity" - ); - return Ok(None); - } - }; - - // Get hashes from the index - let full_hash = index.full_hash(); - - // Get children from the index - let children_ids: Vec<[u8; 32]> = index - .children() - .map(|children| { - children - .iter() - .map(|child| *child.id().as_bytes()) - .collect() - }) - .unwrap_or_default(); - - // Determine if this is a leaf or internal node - if children_ids.is_empty() { - // Leaf node - try to get entity data - if let Some(entry_data) = Interface::::find_by_id_raw(entity_id) { - let metadata = LeafMetadata::new( - // Get CRDT type from index metadata if available - index - .metadata - .crdt_type - .clone() - .unwrap_or(CrdtType::LwwRegister), - index.metadata.updated_at(), - // Collection ID - use parent if available - [0u8; 32], - ); - - let leaf_data = TreeLeafData::new(*entity_id.as_bytes(), entry_data, metadata); - - Ok(Some(TreeNode::leaf( - *entity_id.as_bytes(), - full_hash, - leaf_data, - ))) - } else { - // Index exists but no entry data - treat as internal node with no children - // This can happen for collection containers - Ok(Some(TreeNode::internal( - *entity_id.as_bytes(), - full_hash, - vec![], - ))) - } - } else { - // Internal node with children - Ok(Some(TreeNode::internal( - *entity_id.as_bytes(), - full_hash, - children_ids, - ))) - } - } - - /// Apply leaf data using CRDT merge (Invariant I5). - /// - /// This stores the entity through the normal storage path, which handles - /// CRDT merge semantics based on the entity's crdt_type. - /// - /// # Invariant I5 - /// - /// The storage layer's `put` operation with CRDT metadata will: - /// - For LwwRegister: Use timestamp comparison - /// - For GCounter/PnCounter: Union of contributions - /// - For UnorderedMap/Set: Per-key/element merge - /// - For Rga/Vector: Ordered merge - /// - /// We NEVER use raw overwrite for initialized entities. - /// - /// # Concurrency Note - /// - /// The read-merge-write pattern has a theoretical TOCTOU window, but this is - /// acceptable because: - /// 1. Sync operations are serialized per context (only one sync session active) - /// 2. CRDT merge is commutative - even if concurrent updates occur, the final - /// state will converge correctly - /// 3. Deltas during sync are buffered (Invariant I6) and replayed after - async fn apply_leaf_with_crdt_merge( - &self, - context_id: ContextId, - leaf: &TreeLeafData, - ) -> Result<()> { - // Get the datastore handle - let handle = self.context_client.datastore_handle(); - - // Create the storage key for this entity - let key = ContextStateKey::new(context_id, leaf.key); - - // Get existing value to perform merge (copy to owned value to release borrow) - let existing_bytes: Option> = handle.get(&key)?.map(|v| v.as_ref().to_vec()); - drop(handle); // Explicitly release the immutable borrow - - // Merge the values using CRDT semantics - // The merge strategy depends on the crdt_type in metadata - let merged_value = if let Some(existing_value) = existing_bytes { - // CRDT merge: combine existing and incoming values - // The storage layer handles this via merge_root_state for root entities, - // or via the entity's inherent merge for collection entries - self.merge_entity_values( - leaf.key, - existing_value, - &leaf.value, - leaf.metadata.hlc_timestamp, - &leaf.metadata, - )? - } else { - // No existing value - just use incoming (this is the only case where - // we can "overwrite" because there's nothing to overwrite) - leaf.value.clone() - }; - - // Store the merged value (get a new mutable handle) - let mut handle = self.context_client.datastore_handle(); - let slice: calimero_store::slice::Slice<'_> = merged_value.into(); - handle.put(&key, &calimero_store::types::ContextState::from(slice))?; - - Ok(()) - } - - /// Merge two entity values using CRDT semantics. - /// - /// For HashComparison, we receive raw entity bytes with metadata. - /// The merge strategy depends on the CRDT type. - fn merge_entity_values( - &self, - entity_key: [u8; 32], - existing: Vec, - incoming: &[u8], - incoming_timestamp: u64, - metadata: &LeafMetadata, - ) -> Result> { - use calimero_storage::merge::merge_root_state; - - match metadata.crdt_type { - CrdtType::LwwRegister => { - // Last-write-wins based on HLC timestamp - // This is still "merge" semantics - the newer value wins - // (Not raw overwrite - we're comparing timestamps) - - // Extract existing timestamp from stored metadata via Index - // The storage layer stores metadata separately from raw values - let existing_ts = Index::::get_index(Id::from(entity_key)) - .ok() - .flatten() - .map(|index| *index.metadata.updated_at) - .unwrap_or(0); - - match merge_root_state(&existing, incoming, existing_ts, incoming_timestamp) { - Ok(merged) => Ok(merged), - Err(_) => { - // Fallback: if merge fails, use LWW semantics manually - if incoming_timestamp > existing_ts { - Ok(incoming.to_vec()) - } else { - Ok(existing) - } - } - } - } - CrdtType::GCounter | CrdtType::PnCounter => { - // Counter merge: MUST succeed to preserve I5 (no silent data loss) - // Counters cannot fall back to incoming without losing contributions - merge_root_state(&existing, incoming, 0, incoming_timestamp).map_err(|e| { - eyre::eyre!( - "Counter merge failed for {:?}: {}. Cannot fall back without violating I5.", - metadata.crdt_type, - e - ) - }) - } - CrdtType::UnorderedMap | CrdtType::UnorderedSet | CrdtType::Rga | CrdtType::Vector => { - // Collection merge: MUST succeed to preserve I5 - merge_root_state(&existing, incoming, 0, incoming_timestamp).map_err(|e| { - eyre::eyre!( - "Collection merge failed for {:?}: {}. Cannot fall back without violating I5.", - metadata.crdt_type, - e - ) - }) - } - CrdtType::UserStorage | CrdtType::FrozenStorage => { - // Specialized storage types - use registered merge - // These are typically single-writer, so LWW fallback is acceptable - match merge_root_state(&existing, incoming, 0, incoming_timestamp) { - Ok(merged) => Ok(merged), - Err(_) => { - // Single-writer storage: LWW is safe - if incoming_timestamp > 0 { - Ok(incoming.to_vec()) - } else { - Ok(existing) - } - } - } - } - CrdtType::Custom(_) => { - // Custom CRDT - must have registered merge function - // Failing to merge a custom CRDT is a serious issue - return error - merge_root_state(&existing, incoming, 0, incoming_timestamp).map_err(|e| { - eyre::eyre!( - "Custom CRDT merge failed for {:?}: {}. Custom CRDTs must have registered merge functions.", - metadata.crdt_type, - e - ) - }) - } - } - } - - /// Handle incoming TreeNodeRequest from a peer. - /// - /// This is the responder side of HashComparison sync. - /// Look up the requested node in our Merkle tree and return it. - pub async fn handle_tree_node_request( + /// * `first_node_id` - Node ID from the first request (already parsed) + /// * `first_max_depth` - Max depth from the first request + /// * `transport` - Transport for sending/receiving messages + /// * `_nonce` - Reserved for future encrypted sync (currently unused as each + /// response generates its own nonce via `generate_nonce()`) + pub async fn handle_tree_node_request( &self, context_id: ContextId, - node_id: [u8; 32], - max_depth: Option, - stream: &mut Stream, + first_node_id: [u8; 32], + first_max_depth: Option, + transport: &mut T, _nonce: Nonce, ) -> Result<()> { - debug!( - %context_id, - node_id = %hex::encode(node_id), - ?max_depth, - "Handling TreeNodeRequest" - ); - - // Validate max_depth to prevent expensive deep traversals (DoS protection) - if let Some(depth) = max_depth { - if depth > MAX_REQUEST_DEPTH { - warn!( - %context_id, - requested_depth = depth, - max_allowed = MAX_REQUEST_DEPTH, - "TreeNodeRequest depth exceeds limit, clamping" - ); - } - } - let clamped_depth = max_depth.map(|d| d.min(MAX_REQUEST_DEPTH)); + info!(%context_id, "Starting HashComparison responder"); // Get our identity for RuntimeEnv - look up from context members let identities = self @@ -723,46 +83,103 @@ impl SyncManager { }, next_nonce: super::helpers::generate_nonce(), }; - super::stream::send(stream, &msg, None).await?; + transport.send(&msg).await?; return Ok(()); } }; - // Build response with requested node(s) - let response = self - .build_tree_node_response(context_id, &node_id, clamped_depth, our_identity) - .await?; - - // Send response let mut sqx = super::tracking::Sequencer::default(); - let msg = StreamMessage::Message { - sequence_id: sqx.next(), - payload: MessagePayload::TreeNodeResponse { - nodes: response.nodes, - not_found: response.not_found, - }, - next_nonce: super::helpers::generate_nonce(), - }; + let mut requests_handled = 0u64; + + // Create RuntimeEnv once for all requests (optimization: avoids per-request allocation) + let datastore = self.context_client.datastore_handle().into_inner(); + let runtime_env = create_runtime_env(&datastore, context_id, our_identity); + + // Handle the first request (already parsed by handle_sync_request) + { + let clamped_depth = first_max_depth.map(|d| d.min(MAX_REQUEST_DEPTH)); + let response = self + .build_tree_node_response(context_id, &first_node_id, clamped_depth, &runtime_env) + .await?; + + let msg = StreamMessage::Message { + sequence_id: sqx.next(), + payload: MessagePayload::TreeNodeResponse { + nodes: response.nodes, + not_found: response.not_found, + }, + next_nonce: super::helpers::generate_nonce(), + }; + transport.send(&msg).await?; + requests_handled += 1; + } + + // Loop to handle subsequent requests until stream closes + loop { + let Some(request) = transport.recv().await? else { + debug!(%context_id, requests_handled, "Stream closed, responder done"); + break; + }; + + // Expect Init messages with TreeNodeRequest + let StreamMessage::Init { payload, .. } = request else { + debug!(%context_id, "Received non-Init message, ending responder"); + break; + }; - super::stream::send(stream, &msg, None).await?; + let InitPayload::TreeNodeRequest { + node_id, max_depth, .. + } = payload + else { + debug!(%context_id, "Received non-TreeNodeRequest, ending responder"); + break; + }; + + trace!( + %context_id, + node_id = %hex::encode(node_id), + ?max_depth, + "Handling subsequent TreeNodeRequest" + ); + + let clamped_depth = max_depth.map(|d| d.min(MAX_REQUEST_DEPTH)); + let response = self + .build_tree_node_response(context_id, &node_id, clamped_depth, &runtime_env) + .await?; + + let msg = StreamMessage::Message { + sequence_id: sqx.next(), + payload: MessagePayload::TreeNodeResponse { + nodes: response.nodes, + not_found: response.not_found, + }, + next_nonce: super::helpers::generate_nonce(), + }; + transport.send(&msg).await?; + requests_handled += 1; + } + + info!(%context_id, requests_handled, "HashComparison responder complete"); Ok(()) } /// Build TreeNodeResponse for a requested node. /// /// Uses the real Merkle tree Index via RuntimeEnv bridge. + /// + /// # Arguments + /// + /// * `context_id` - Context being synchronized + /// * `node_id` - ID of the node to retrieve + /// * `max_depth` - Maximum depth to traverse (clamped externally) + /// * `runtime_env` - Pre-created RuntimeEnv (shared across requests for efficiency) async fn build_tree_node_response( &self, context_id: ContextId, node_id: &[u8; 32], max_depth: Option, - our_identity: PublicKey, + runtime_env: &RuntimeEnv, ) -> Result { - // Set up storage bridge - let datastore = self.context_client.datastore_handle().into_inner(); - let callbacks = create_storage_callbacks(&datastore, context_id); - let runtime_env = create_runtime_env(&callbacks, context_id, our_identity); - // Get context to check if this is a root request let context = self.context_client.get_context(&context_id)?; let Some(context) = context else { @@ -776,9 +193,9 @@ impl SyncManager { // Determine if this is a root request (node_id matches root_hash) let is_root_request = node_id == context.root_hash.as_ref(); - // Get the local node + // Get the local node using shared function from hash_comparison_protocol let local_node = with_runtime_env(runtime_env.clone(), || { - self.get_local_tree_node_from_index(context_id, node_id, is_root_request) + get_local_tree_node(context_id, node_id, is_root_request) })?; let Some(node) = local_node else { @@ -795,10 +212,10 @@ impl SyncManager { // If max_depth > 0 and this is an internal node, include children let depth = max_depth.unwrap_or(0); if depth > 0 && node.is_internal() { - // Include child nodes + // Include child nodes using shared function from hash_comparison_protocol for child_id in &node.children { let child_node = with_runtime_env(runtime_env.clone(), || { - self.get_local_tree_node_from_index(context_id, child_id, false) + get_local_tree_node(context_id, child_id, false) })?; if let Some(child) = child_node { @@ -822,17 +239,3 @@ impl SyncManager { Ok(TreeNodeResponse::new(nodes)) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_hash_comparison_stats_default() { - let stats = HashComparisonStats::default(); - assert_eq!(stats.nodes_compared, 0); - assert_eq!(stats.entities_merged, 0); - assert_eq!(stats.nodes_skipped, 0); - assert_eq!(stats.requests_sent, 0); - } -} diff --git a/crates/node/src/sync/hash_comparison_protocol.rs b/crates/node/src/sync/hash_comparison_protocol.rs new file mode 100644 index 000000000..1fcb84fe6 --- /dev/null +++ b/crates/node/src/sync/hash_comparison_protocol.rs @@ -0,0 +1,568 @@ +//! Standalone HashComparison protocol implementation. +//! +//! This module contains the protocol logic extracted from `SyncManager` methods +//! into standalone functions that work with any `Store` backend. +//! +//! # Design +//! +//! The protocol is implemented as `HashComparisonProtocol` which implements +//! `SyncProtocolExecutor`. This allows the same code to run in: +//! - Production (with `Store` and `StreamTransport`) +//! - Simulation (with `Store` and `SimStream`) +//! +//! # Usage +//! +//! ```ignore +//! use calimero_node::sync::hash_comparison_protocol::HashComparisonProtocol; +//! use calimero_node_primitives::sync::SyncProtocolExecutor; +//! +//! // Initiator side +//! let stats = HashComparisonProtocol::run_initiator( +//! &mut transport, +//! &store, +//! context_id, +//! identity, +//! HashComparisonConfig { remote_root_hash }, +//! ).await?; +//! +//! // Responder side (handles one request) +//! HashComparisonProtocol::run_responder( +//! &mut transport, +//! &store, +//! context_id, +//! identity, +//! ).await?; +//! ``` + +use crate::sync::helpers::generate_nonce; +use async_trait::async_trait; +use calimero_node_primitives::sync::{ + compare_tree_nodes, create_runtime_env, InitPayload, LeafMetadata, MessagePayload, + StreamMessage, SyncProtocolExecutor, SyncTransport, TreeCompareResult, TreeLeafData, TreeNode, + TreeNodeResponse, MAX_NODES_PER_RESPONSE, +}; +use calimero_primitives::context::ContextId; +use calimero_primitives::crdt::CrdtType; +use calimero_primitives::identity::PublicKey; +use calimero_storage::address::Id; +use calimero_storage::env::with_runtime_env; +use calimero_storage::index::Index; +use calimero_storage::interface::Interface; +use calimero_storage::store::MainStorage; +use calimero_store::Store; +use eyre::{bail, Result}; +use tracing::{debug, info, trace, warn}; + +/// Maximum number of pending node requests (DFS stack depth limit). +const MAX_PENDING_NODES: usize = 10_000; + +/// Maximum depth allowed in TreeNodeRequest. +pub const MAX_REQUEST_DEPTH: u8 = 16; + +/// Configuration for HashComparison initiator. +#[derive(Debug, Clone)] +pub struct HashComparisonConfig { + /// Remote peer's root hash (from handshake). + pub remote_root_hash: [u8; 32], +} + +/// Statistics from a HashComparison sync session. +#[derive(Debug, Default, Clone)] +pub struct HashComparisonStats { + /// Number of tree nodes compared. + pub nodes_compared: u64, + /// Number of leaf entities merged via CRDT. + pub entities_merged: u64, + /// Number of nodes skipped (hashes matched). + pub nodes_skipped: u64, + /// Number of requests sent to peer. + pub requests_sent: u64, +} + +/// HashComparison sync protocol. +/// +/// Implements the Merkle tree traversal protocol (CIP §2.3). +pub struct HashComparisonProtocol; + +#[async_trait(?Send)] +impl SyncProtocolExecutor for HashComparisonProtocol { + type Config = HashComparisonConfig; + type Stats = HashComparisonStats; + + async fn run_initiator( + transport: &mut T, + store: &Store, + context_id: ContextId, + identity: PublicKey, + config: Self::Config, + ) -> Result { + run_initiator_impl( + transport, + store, + context_id, + identity, + config.remote_root_hash, + ) + .await + } + + async fn run_responder( + transport: &mut T, + store: &Store, + context_id: ContextId, + identity: PublicKey, + ) -> Result<()> { + run_responder_impl(transport, store, context_id, identity).await + } +} + +// ============================================================================= +// Initiator Implementation +// ============================================================================= + +async fn run_initiator_impl( + transport: &mut T, + store: &Store, + context_id: ContextId, + identity: PublicKey, + remote_root_hash: [u8; 32], +) -> Result { + info!(%context_id, "Starting HashComparison sync (initiator)"); + + let mut stats = HashComparisonStats::default(); + + // Set up storage bridge + let runtime_env = create_runtime_env(store, context_id, identity); + + // Stack for DFS traversal + let mut to_compare: Vec<([u8; 32], bool)> = vec![(remote_root_hash, true)]; + + while let Some((node_id, is_root_request)) = to_compare.pop() { + // DoS protection: limit stack size + if to_compare.len() > MAX_PENDING_NODES { + bail!( + "HashComparison sync aborted: pending nodes ({}) exceeds limit ({})", + to_compare.len(), + MAX_PENDING_NODES + ); + } + + // Request node from peer + let request_msg = StreamMessage::Init { + context_id, + party_id: identity, + payload: InitPayload::TreeNodeRequest { + context_id, + node_id, + max_depth: Some(1), + }, + next_nonce: generate_nonce(), + }; + + transport.send(&request_msg).await?; + stats.requests_sent += 1; + + // Receive response + let response = transport + .recv() + .await? + .ok_or_else(|| eyre::eyre!("stream closed unexpectedly"))?; + + let StreamMessage::Message { payload, .. } = response else { + bail!("Unexpected response type during HashComparison sync"); + }; + + let (nodes, not_found) = match payload { + MessagePayload::TreeNodeResponse { nodes, not_found } => (nodes, not_found), + MessagePayload::SnapshotError { error } => { + warn!(%context_id, ?error, "Peer returned error"); + bail!("Peer error: {:?}", error); + } + _ => bail!("Unexpected payload type"), + }; + + // DoS protection: validate response size + if nodes.len() > MAX_NODES_PER_RESPONSE { + warn!(%context_id, count = nodes.len(), "Response too large, skipping"); + continue; + } + + if not_found { + debug!(%context_id, node_id = %hex::encode(node_id), "Node not found on peer"); + continue; + } + + // Process each node + for remote_node in nodes { + if !remote_node.is_valid() { + warn!(%context_id, "Invalid TreeNode, skipping"); + continue; + } + + stats.nodes_compared += 1; + + if remote_node.is_leaf() { + // Leaf: apply CRDT merge (Invariant I5) + if let Some(ref leaf_data) = remote_node.leaf_data { + trace!( + %context_id, + key = %hex::encode(leaf_data.key), + "Merging leaf entity" + ); + + with_runtime_env(runtime_env.clone(), || { + apply_leaf_with_crdt_merge(context_id, leaf_data) + })?; + stats.entities_merged += 1; + } + } else { + // Internal node: compare with local version + let is_this_node_root = is_root_request && remote_node.id == node_id; + + let local_version = with_runtime_env(runtime_env.clone(), || { + get_local_tree_node(context_id, &remote_node.id, is_this_node_root) + })?; + + match compare_tree_nodes(local_version.as_ref(), Some(&remote_node)) { + TreeCompareResult::Equal => { + stats.nodes_skipped += 1; + trace!(%context_id, "Subtree matches, skipping"); + } + TreeCompareResult::LocalMissing => { + for child_id in &remote_node.children { + to_compare.push((*child_id, false)); + } + } + TreeCompareResult::Different { + remote_only_children, + common_children, + .. + } => { + for child_id in remote_only_children { + to_compare.push((child_id, false)); + } + for child_id in common_children { + to_compare.push((child_id, false)); + } + } + TreeCompareResult::RemoteMissing => { + // Bidirectional sync: future work + } + } + } + } + } + + // Close the transport to signal completion to the responder + transport.close().await?; + + info!( + %context_id, + nodes_compared = stats.nodes_compared, + entities_merged = stats.entities_merged, + nodes_skipped = stats.nodes_skipped, + "HashComparison sync complete" + ); + + Ok(stats) +} + +// ============================================================================= +// Responder Implementation +// ============================================================================= + +async fn run_responder_impl( + transport: &mut T, + store: &Store, + context_id: ContextId, + identity: PublicKey, +) -> Result<()> { + info!(%context_id, "Starting HashComparison sync (responder)"); + + // Set up storage bridge (reused across all requests) + let runtime_env = create_runtime_env(store, context_id, identity); + + // Get our root hash to determine root requests + let local_root_hash = with_runtime_env(runtime_env.clone(), || { + Index::::get_hashes_for(Id::new(*context_id.as_ref())) + .ok() + .flatten() + .map(|(full, _)| full) + .unwrap_or([0; 32]) + }); + + let mut sequence_id = 0u64; + let mut requests_handled = 0u64; + + // Handle requests until stream closes + loop { + // Receive request (None means stream closed = sync complete) + let Some(request) = transport.recv().await? else { + debug!(%context_id, requests_handled, "Stream closed, responder done"); + break; + }; + + let StreamMessage::Init { payload, .. } = request else { + // Non-Init message might indicate end of sync or protocol error + debug!(%context_id, "Received non-Init message, ending responder"); + break; + }; + + let InitPayload::TreeNodeRequest { + node_id, max_depth, .. + } = payload + else { + // Different payload type - might be end of sync + debug!(%context_id, "Received non-TreeNodeRequest, ending responder"); + break; + }; + + trace!( + %context_id, + node_id = %hex::encode(node_id), + ?max_depth, + "Handling TreeNodeRequest" + ); + + // Clamp depth for DoS protection + let clamped_depth = max_depth.map(|d| d.min(MAX_REQUEST_DEPTH)); + + let is_root_request = node_id == local_root_hash; + + // Get the requested node + let local_node = with_runtime_env(runtime_env.clone(), || { + get_local_tree_node(context_id, &node_id, is_root_request) + })?; + + let response = if let Some(node) = local_node { + let mut nodes = vec![node.clone()]; + + // Include children if depth > 0 + let depth = clamped_depth.unwrap_or(0); + if depth > 0 && node.is_internal() { + for child_id in &node.children { + if let Some(child) = with_runtime_env(runtime_env.clone(), || { + get_local_tree_node(context_id, child_id, false) + })? { + nodes.push(child); + if nodes.len() >= MAX_NODES_PER_RESPONSE { + break; + } + } + } + } + + TreeNodeResponse::new(nodes) + } else { + TreeNodeResponse::not_found() + }; + + // Send response + let msg = StreamMessage::Message { + sequence_id, + payload: MessagePayload::TreeNodeResponse { + nodes: response.nodes, + not_found: response.not_found, + }, + next_nonce: generate_nonce(), + }; + + transport.send(&msg).await?; + sequence_id += 1; + requests_handled += 1; + } + + info!(%context_id, requests_handled, "HashComparison responder complete"); + Ok(()) +} + +// ============================================================================= +// Helper Functions +// ============================================================================= + +/// Get a tree node from the local Merkle tree Index. +/// +/// This function is shared between the standalone protocol implementation +/// and the SyncManager responder to avoid duplication. +pub(crate) fn get_local_tree_node( + context_id: ContextId, + node_id: &[u8; 32], + is_root_request: bool, +) -> Result> { + let entity_id = if is_root_request { + Id::new(*context_id.as_ref()) + } else { + Id::new(*node_id) + }; + + let index = match Index::::get_index(entity_id) { + Ok(Some(idx)) => idx, + Ok(None) => return Ok(None), + Err(e) => { + warn!(%context_id, %entity_id, error = %e, "Failed to get index"); + return Ok(None); + } + }; + + let full_hash = index.full_hash(); + let children_ids: Vec<[u8; 32]> = index + .children() + .map(|children| children.iter().map(|c| *c.id().as_bytes()).collect()) + .unwrap_or_default(); + + if children_ids.is_empty() { + // Leaf node + if let Some(entry_data) = Interface::::find_by_id_raw(entity_id) { + let metadata = LeafMetadata::new( + index + .metadata + .crdt_type + .clone() + .unwrap_or(CrdtType::LwwRegister), + index.metadata.updated_at(), + [0u8; 32], + ); + let leaf_data = TreeLeafData::new(*entity_id.as_bytes(), entry_data, metadata); + Ok(Some(TreeNode::leaf( + *entity_id.as_bytes(), + full_hash, + leaf_data, + ))) + } else { + Ok(Some(TreeNode::internal( + *entity_id.as_bytes(), + full_hash, + vec![], + ))) + } + } else { + Ok(Some(TreeNode::internal( + *entity_id.as_bytes(), + full_hash, + children_ids, + ))) + } +} + +/// Apply leaf data using CRDT merge (Invariant I5). +/// +/// This function must be called within a `with_runtime_env` scope. +/// Uses `Interface::apply_action` to properly update both the raw storage +/// and the Merkle tree Index. +/// +/// For CRDT types that require merging (counters, collections, custom types), +/// this function explicitly performs the CRDT merge to ensure no data is lost, +/// rather than relying on LWW semantics. +fn apply_leaf_with_crdt_merge(context_id: ContextId, leaf: &TreeLeafData) -> Result<()> { + use calimero_storage::entities::{ChildInfo, Metadata}; + use calimero_storage::interface::Action; + use calimero_storage::merge::merge_root_state; + + let entity_id = Id::new(leaf.key); + let root_id = Id::new(*context_id.as_ref()); + + // Check if entity already exists + let existing_index = Index::::get_index(entity_id).ok().flatten(); + + // Build metadata from leaf info + let mut metadata = Metadata::default(); + metadata.crdt_type = Some(leaf.metadata.crdt_type.clone()); + metadata.updated_at = leaf.metadata.hlc_timestamp.into(); + + // Determine if this CRDT type requires explicit merging (not just LWW) + let crdt_type = &leaf.metadata.crdt_type; + let needs_crdt_merge = + crdt_type.is_counter() || crdt_type.is_collection() || crdt_type.is_custom(); + + // For CRDT types that need merging, we must explicitly merge existing data + // with incoming data. We pass existing_ts=0 to force merge_root_state to + // always perform the merge (Invariant I5: No Silent Data Loss). + let final_data = if needs_crdt_merge { + if let Some(existing_data) = Interface::::find_by_id_raw(entity_id) { + // Perform CRDT merge with existing_ts=0 to force merge + match merge_root_state(&existing_data, &leaf.value, 0, leaf.metadata.hlc_timestamp) { + Ok(merged) => merged, + Err(_) => { + // Fallback to incoming data if merge fails + leaf.value.clone() + } + } + } else { + // No existing data, use incoming + leaf.value.clone() + } + } else { + // LWW types can just use the incoming value + leaf.value.clone() + }; + + let action = if existing_index.is_some() { + // Update existing entity + Action::Update { + id: entity_id, + data: final_data, + ancestors: vec![], // No ancestors needed for update + metadata, + } + } else { + // Add new entity as child of root + // First ensure root exists + if Index::::get_index(root_id) + .ok() + .flatten() + .is_none() + { + let root_action = Action::Update { + id: root_id, + data: vec![], + ancestors: vec![], + metadata: Metadata::default(), + }; + Interface::::apply_action(root_action)?; + } + + // Get root info for ancestor chain + let root_hash = Index::::get_hashes_for(root_id) + .ok() + .flatten() + .map(|(full, _)| full) + .unwrap_or([0; 32]); + let root_metadata = Index::::get_index(root_id) + .ok() + .flatten() + .map(|idx| idx.metadata.clone()) + .unwrap_or_default(); + + let ancestor = ChildInfo::new(root_id, root_hash, root_metadata); + + Action::Add { + id: entity_id, + data: final_data, + ancestors: vec![ancestor], + metadata, + } + }; + + Interface::::apply_action(action)?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_creation() { + let config = HashComparisonConfig { + remote_root_hash: [1u8; 32], + }; + assert_eq!(config.remote_root_hash, [1u8; 32]); + } + + #[test] + fn test_stats_default() { + let stats = HashComparisonStats::default(); + assert_eq!(stats.nodes_compared, 0); + assert_eq!(stats.entities_merged, 0); + } +} diff --git a/crates/node/src/sync/manager.rs b/crates/node/src/sync/manager.rs index e11310884..7508cf399 100644 --- a/crates/node/src/sync/manager.rs +++ b/crates/node/src/sync/manager.rs @@ -35,9 +35,10 @@ use super::tracking::SyncState; use super::tracking::SyncProtocol as TrackingSyncProtocol; // Full SyncProtocol from primitives for protocol selection (7 variants, CIP §2.3) // Uses shared state machine types for consistent behavior with simulation +use super::hash_comparison_protocol::{HashComparisonConfig, HashComparisonProtocol}; use calimero_node_primitives::sync::{ build_handshake_from_raw, estimate_entity_count, estimate_max_depth, select_protocol, - SyncHandshake, SyncProtocol, + SyncHandshake, SyncProtocol, SyncProtocolExecutor, }; /// Network synchronization manager. @@ -1039,15 +1040,23 @@ impl SyncManager { "Starting HashComparison sync" ); - match self - .hash_comparison_sync( - context_id, - chosen_peer, - our_identity, - stream, - root_hash, - ) - .await + // Wrap stream in transport abstraction + let mut transport = super::stream::StreamTransport::new(stream); + + // Get store for protocol execution + let store = self.context_client.datastore_handle().into_inner(); + let config = HashComparisonConfig { + remote_root_hash: root_hash, + }; + + match HashComparisonProtocol::run_initiator( + &mut transport, + &store, + context_id, + our_identity, + config, + ) + .await { Ok(stats) => { info!( @@ -1940,11 +1949,13 @@ impl SyncManager { max_depth, } => { // Handle tree node request from peer (HashComparison sync) + // Wrap stream in transport abstraction + let mut transport = super::stream::StreamTransport::new(stream); self.handle_tree_node_request( requested_context_id, node_id, max_depth, - stream, + &mut transport, nonce, ) .await? diff --git a/crates/node/src/sync/mod.rs b/crates/node/src/sync/mod.rs index df5ab6921..b9f7fe52b 100644 --- a/crates/node/src/sync/mod.rs +++ b/crates/node/src/sync/mod.rs @@ -26,6 +26,7 @@ mod blobs; mod config; mod delta_request; mod hash_comparison; +pub mod hash_comparison_protocol; mod helpers; mod key; mod manager; @@ -34,6 +35,9 @@ pub(crate) mod stream; mod tracking; pub use config::SyncConfig; +pub use hash_comparison_protocol::{ + HashComparisonConfig, HashComparisonProtocol, HashComparisonStats, +}; pub use manager::SyncManager; pub use key::CHALLENGE_DOMAIN; diff --git a/crates/node/src/sync/stream.rs b/crates/node/src/sync/stream.rs index ca9775687..ba9c62095 100644 --- a/crates/node/src/sync/stream.rs +++ b/crates/node/src/sync/stream.rs @@ -6,13 +6,21 @@ //! //! This module extracts the repeated send/recv logic that was duplicated across //! all protocol files. Every protocol needs to send/recv encrypted messages. +//! +//! ## Transport Abstraction +//! +//! [`StreamTransport`] implements [`SyncTransport`] for production libp2p streams, +//! enabling the same protocol code to work with both production and simulation. +use std::time::Duration; + +use async_trait::async_trait; use calimero_crypto::{Nonce, SharedKey}; use calimero_network_primitives::stream::{Message, Stream}; -use calimero_node_primitives::sync::StreamMessage; +use calimero_node_primitives::sync::{EncryptionState, StreamMessage, SyncTransport}; use eyre::{OptionExt, WrapErr}; use futures_util::{SinkExt, TryStreamExt}; -use tokio::time::{timeout, Duration}; +use tokio::time::timeout; /// Sends an encrypted message over a stream. /// @@ -82,3 +90,106 @@ pub async fn recv( Ok(Some(decoded)) } + +// ============================================================================= +// StreamTransport - SyncTransport implementation for libp2p Stream +// ============================================================================= + +/// Default timeout for receive operations. +const DEFAULT_RECV_TIMEOUT: Duration = Duration::from_secs(30); + +/// Transport wrapper for libp2p [`Stream`] implementing [`SyncTransport`]. +/// +/// This enables production sync protocols to be generic over transport, +/// allowing the same code to work with both libp2p streams and simulation. +/// +/// # Example +/// +/// ```ignore +/// let stream = open_stream(peer_id).await?; +/// let mut transport = StreamTransport::new(stream); +/// +/// // Set encryption after key exchange +/// transport.set_encryption(Some((shared_key, nonce))); +/// +/// // Use with protocol +/// hash_comparison_sync(&mut transport, context_id, ...).await?; +/// ``` +pub struct StreamTransport<'a> { + /// The underlying libp2p stream (mutable reference). + stream: &'a mut Stream, + /// Encryption state. + encryption: EncryptionState, + /// Default timeout for recv operations. + recv_timeout: Duration, +} + +impl<'a> StreamTransport<'a> { + /// Create a new transport wrapper around a libp2p stream. + #[must_use] + pub fn new(stream: &'a mut Stream) -> Self { + Self { + stream, + encryption: EncryptionState::new(), + recv_timeout: DEFAULT_RECV_TIMEOUT, + } + } + + /// Create with a custom default receive timeout. + #[must_use] + #[expect(dead_code, reason = "Future API for custom timeouts")] + pub fn with_timeout(stream: &'a mut Stream, timeout: Duration) -> Self { + Self { + stream, + encryption: EncryptionState::new(), + recv_timeout: timeout, + } + } +} + +#[async_trait] +impl SyncTransport for StreamTransport<'_> { + async fn send(&mut self, message: &StreamMessage<'_>) -> eyre::Result<()> { + let encoded = borsh::to_vec(message)?; + let encrypted = self.encryption.encrypt(encoded)?; + self.stream.send(Message::new(encrypted)).await?; + Ok(()) + } + + async fn recv(&mut self) -> eyre::Result>> { + self.recv_timeout(self.recv_timeout).await + } + + async fn recv_timeout( + &mut self, + budget: Duration, + ) -> eyre::Result>> { + let message = timeout(budget, self.stream.try_next()) + .await + .wrap_err("timeout receiving message from peer")? + .wrap_err("error receiving message from peer")?; + + let Some(message) = message else { + return Ok(None); + }; + + let message = message.data.into_owned(); + let decrypted = self.encryption.decrypt(message)?; + let decoded = borsh::from_slice::>(&decrypted)?; + + Ok(Some(decoded)) + } + + fn set_encryption(&mut self, encryption: Option<(SharedKey, Nonce)>) { + self.encryption.set(encryption); + } + + fn encryption(&self) -> Option<(SharedKey, Nonce)> { + self.encryption.get() + } + + async fn close(&mut self) -> eyre::Result<()> { + self.stream.close().await?; + Ok(()) + } +} diff --git a/crates/node/src/sync/tracking.rs b/crates/node/src/sync/tracking.rs index 98e5f5a34..50f343e52 100644 --- a/crates/node/src/sync/tracking.rs +++ b/crates/node/src/sync/tracking.rs @@ -136,12 +136,12 @@ impl Default for SyncState { /// Prevents message reordering attacks and protocol confusion. #[derive(Debug, Default)] pub(crate) struct Sequencer { - current: usize, + current: u64, } impl Sequencer { /// Get next sequence ID and advance counter. - pub(crate) fn next(&mut self) -> usize { + pub(crate) fn next(&mut self) -> u64 { let id = self.current; self.current += 1; id @@ -153,7 +153,7 @@ impl Sequencer { /// /// Returns error if the provided ID doesn't match the expected sequence. /// This indicates out-of-order messages or a protocol violation. - pub(crate) fn expect(&mut self, expected: usize) -> eyre::Result<()> { + pub(crate) fn expect(&mut self, expected: u64) -> eyre::Result<()> { if self.current != expected { bail!("sequence error: expected {}, at {}", expected, self.current); } diff --git a/crates/node/tests/sync_sim/mod.rs b/crates/node/tests/sync_sim/mod.rs index 89398c4c2..c18cd9333 100644 --- a/crates/node/tests/sync_sim/mod.rs +++ b/crates/node/tests/sync_sim/mod.rs @@ -88,10 +88,12 @@ pub mod digest; pub mod metrics; pub mod network; pub mod node; +pub mod protocol; pub mod runtime; pub mod scenarios; pub mod sim_runtime; pub mod storage; +pub mod transport; pub mod types; /// Prelude for convenient imports. @@ -115,10 +117,12 @@ pub mod prelude { FaultConfig, NetworkRouter, PartitionManager, PartitionSpec, SimEvent, }; pub use super::node::{SimNode, SyncState}; + pub use super::protocol::{execute_hash_comparison_sync, SimSyncStats}; pub use super::runtime::{EventQueue, EventSeq, SimClock, SimDuration, SimRng, SimTime}; pub use super::scenarios::{RandomScenario, Scenario}; pub use super::sim_runtime::{SimConfig, SimRuntime, StopCondition}; pub use super::storage::SimStorage; + pub use super::transport::{SimStream, SimStreamSender}; pub use super::types::{DeltaId, EntityId, MessageId, NodeId, StateDigest, TimerId, TimerKind}; // Note: assertion macros (assert_converged!, assert_entity_count!, etc.) are available diff --git a/crates/node/tests/sync_sim/node/state.rs b/crates/node/tests/sync_sim/node/state.rs index 34b78001b..e7f497095 100644 --- a/crates/node/tests/sync_sim/node/state.rs +++ b/crates/node/tests/sync_sim/node/state.rs @@ -164,6 +164,10 @@ pub struct SimNode { } impl SimNode { + /// Default shared context ID for simulation. + /// All nodes in a simulation typically share this context. + pub const DEFAULT_CONTEXT_ID: [u8; 32] = [0xCA; 32]; // "Calimero" + /// Create a new node with default buffer capacity. /// /// Creates unique `ContextId` and `PublicKey` derived from the node ID @@ -172,6 +176,14 @@ impl SimNode { Self::with_buffer_capacity(id, DEFAULT_SIM_BUFFER_CAPACITY) } + /// Create a new node in a shared context. + /// + /// Nodes that sync together should share the same context ID. + /// This is the correct way to create nodes for sync testing. + pub fn new_in_context(id: impl Into, context_id: ContextId) -> Self { + Self::with_context_and_buffer(id, context_id, DEFAULT_SIM_BUFFER_CAPACITY) + } + /// Create a new node with custom buffer capacity. /// /// Useful for testing buffer overflow behavior (Invariant I6). @@ -182,6 +194,19 @@ impl SimNode { // Create deterministic context/executor IDs from node name let context_id = Self::create_context_id(&node_id); + Self::with_context_and_buffer(node_id, context_id, buffer_capacity) + } + + /// Create a new node with specific context and buffer capacity. + /// + /// This is the most flexible constructor, allowing full control over + /// context ID (for shared contexts) and buffer capacity (for I6 testing). + pub fn with_context_and_buffer( + id: impl Into, + context_id: ContextId, + buffer_capacity: usize, + ) -> Self { + let node_id = id.into(); let executor_id = Self::create_executor_id(&node_id); Self { @@ -302,7 +327,10 @@ impl SimNode { /// - Intermediate nodes created by `insert_entity_hierarchical` don't have metadata /// - This matches what production would count as entities pub fn entity_count(&self) -> usize { - self.entity_metadata.len() + // Use actual storage leaf count (source of truth). + // This counts only leaf nodes (actual entities), excluding intermediate nodes. + // This ensures sync results are visible while hierarchical structures are counted correctly. + self.storage.leaf_count() } /// Get root hash (for handshake). @@ -310,6 +338,21 @@ impl SimNode { self.storage.root_hash() } + /// Get context ID. + pub fn context_id(&self) -> ContextId { + self.storage.context_id() + } + + /// Get reference to storage. + pub fn storage(&self) -> &SimStorage { + &self.storage + } + + /// Get mutable reference to storage. + pub fn storage_mut(&mut self) -> &mut SimStorage { + &mut self.storage + } + /// Check if node has any state (real entities). pub fn has_any_state(&self) -> bool { self.has_state || !self.entity_metadata.is_empty() diff --git a/crates/node/tests/sync_sim/protocol.rs b/crates/node/tests/sync_sim/protocol.rs new file mode 100644 index 000000000..6b4aee461 --- /dev/null +++ b/crates/node/tests/sync_sim/protocol.rs @@ -0,0 +1,568 @@ +//! Protocol execution for simulation testing. +//! +//! Runs the **production** sync protocol implementations using simulation +//! infrastructure (`SimStream`, `SimStorage`) for end-to-end testing. +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────────────┐ +//! │ execute_hash_comparison_sync │ +//! │ │ +//! │ ┌────────────────────┐ ┌────────────────────┐ │ +//! │ │ Initiator Task │ │ Responder Task │ │ +//! │ │ (alice) │◄───────►│ (bob) │ │ +//! │ │ │ SimStream│ │ │ +//! │ │ Store (InMemory) │ pair │ Store (InMemory) │ │ +//! │ └────────────────────┘ └────────────────────┘ │ +//! └─────────────────────────────────────────────────────────────────┘ +//! ``` +//! +//! # Key Design: Same Code, Different Backends +//! +//! This module calls the **exact same** `HashComparisonProtocol` that runs +//! in production. The only difference is the backends: +//! - Production: `StreamTransport` (network) + `Store` +//! - Simulation: `SimStream` (channels) + `Store` +//! +//! # Invariants Tested +//! +//! - **I4**: Strategy equivalence (same final state as other protocols) +//! - **I5**: No silent data loss (CRDT merge at leaves) +//! - **I6**: Delta buffering during sync + +use calimero_node::sync::{HashComparisonConfig, HashComparisonProtocol, HashComparisonStats}; +use calimero_node_primitives::sync::SyncProtocolExecutor; +use calimero_primitives::identity::PublicKey; +use eyre::{Result, WrapErr}; + +use super::node::SimNode; +use super::transport::SimStream; + +/// Statistics from a simulated HashComparison sync session. +/// +/// This is a thin wrapper around the production `HashComparisonStats` +/// with simulation-specific additions if needed. +#[derive(Debug, Default, Clone)] +pub struct SimSyncStats { + /// Number of tree nodes compared. + pub nodes_compared: u64, + /// Number of leaf entities transferred. + pub entities_transferred: u64, + /// Number of nodes skipped (hashes matched). + pub nodes_skipped: u64, + /// Number of request/response rounds. + pub rounds: u64, +} + +impl From for SimSyncStats { + fn from(stats: HashComparisonStats) -> Self { + Self { + nodes_compared: stats.nodes_compared, + entities_transferred: stats.entities_merged, + nodes_skipped: stats.nodes_skipped, + rounds: stats.requests_sent, + } + } +} + +/// Execute HashComparison sync between two SimNodes. +/// +/// This runs the **production** `HashComparisonProtocol`: +/// 1. Creates bidirectional SimStream +/// 2. Spawns initiator and responder tasks +/// 3. Returns when sync completes +/// +/// # Arguments +/// +/// * `initiator` - Node initiating sync (will pull from responder) +/// * `responder` - Node responding to sync requests +/// +/// # Returns +/// +/// Statistics about the sync session. +/// +/// # Example +/// +/// ```ignore +/// let mut alice = SimNode::new("alice"); +/// let mut bob = SimNode::new("bob"); +/// +/// // Set up state... +/// +/// let stats = execute_hash_comparison_sync(&mut alice, &mut bob).await?; +/// assert_eq!(alice.root_hash(), bob.root_hash()); // Converged! +/// ``` +pub async fn execute_hash_comparison_sync( + initiator: &mut SimNode, + responder: &SimNode, +) -> Result { + let (mut init_stream, mut resp_stream) = SimStream::pair(); + + // Get root hashes for comparison + let init_root = initiator.root_hash(); + let resp_root = responder.root_hash(); + + // If already in sync, no work needed + if init_root == resp_root { + return Ok(SimSyncStats::default()); + } + + // Get stores and context info + let initiator_store = initiator.storage().store(); + let responder_store = responder.storage().store(); + let initiator_context = initiator.context_id(); + let responder_context = responder.context_id(); + + // Dummy identity for simulation + let identity = PublicKey::from([0u8; 32]); + + // Config for initiator + let config = HashComparisonConfig { + remote_root_hash: resp_root, + }; + + // Run both sides concurrently using the PRODUCTION protocol + let initiator_fut = async { + HashComparisonProtocol::run_initiator( + &mut init_stream, + initiator_store, + initiator_context, + identity, + config, + ) + .await + }; + + let responder_fut = async { + HashComparisonProtocol::run_responder( + &mut resp_stream, + responder_store, + responder_context, + identity, + ) + .await + }; + + // Run both sides + let (init_result, resp_result) = tokio::join!(initiator_fut, responder_fut); + + // Check for errors + resp_result.wrap_err("responder failed")?; + let stats = init_result.wrap_err("initiator failed")?; + + Ok(stats.into()) +} + +// ============================================================================= +// Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + use crate::sync_sim::actions::EntityMetadata; + use crate::sync_sim::types::EntityId; + use calimero_primitives::context::ContextId; + + /// Create a shared context ID for testing. + fn shared_context() -> ContextId { + ContextId::from(SimNode::DEFAULT_CONTEXT_ID) + } + + #[tokio::test] + async fn test_sync_empty_to_populated() { + // Both nodes share the same context (as they would in production) + let ctx = shared_context(); + let mut alice = SimNode::new_in_context("alice", ctx); + let mut bob = SimNode::new_in_context("bob", ctx); + + // Bob has entities, Alice is empty + bob.insert_entity_with_metadata( + EntityId::from_u64(1), + b"hello".to_vec(), + EntityMetadata::default(), + ); + bob.insert_entity_with_metadata( + EntityId::from_u64(2), + b"world".to_vec(), + EntityMetadata::default(), + ); + + assert_ne!(alice.root_hash(), bob.root_hash()); + + // Sync + let stats = execute_hash_comparison_sync(&mut alice, &bob) + .await + .expect("sync should succeed"); + + // Verify convergence (Invariant I4) + assert_eq!( + alice.root_hash(), + bob.root_hash(), + "root hashes should match after sync" + ); + assert!( + stats.entities_transferred > 0, + "should have transferred entities" + ); + assert_eq!( + alice.entity_count(), + bob.entity_count(), + "entity counts should match after sync" + ); + } + + #[tokio::test] + async fn test_sync_already_in_sync() { + let ctx = shared_context(); + let mut alice = SimNode::new_in_context("alice", ctx); + let bob = SimNode::new_in_context("bob", ctx); + + // Both empty = already in sync + let stats = execute_hash_comparison_sync(&mut alice, &bob) + .await + .expect("sync should succeed"); + + assert_eq!(stats.rounds, 0, "no rounds needed when already in sync"); + } + + #[tokio::test] + async fn test_sync_partial_overlap() { + let ctx = shared_context(); + let mut alice = SimNode::new_in_context("alice", ctx); + let mut bob = SimNode::new_in_context("bob", ctx); + + // Shared entity + let shared_id = EntityId::from_u64(100); + alice.insert_entity_with_metadata(shared_id, b"shared".to_vec(), EntityMetadata::default()); + bob.insert_entity_with_metadata(shared_id, b"shared".to_vec(), EntityMetadata::default()); + + // Bob-only entity + bob.insert_entity_with_metadata( + EntityId::from_u64(200), + b"bob-only".to_vec(), + EntityMetadata::default(), + ); + + // Sync + let stats = execute_hash_comparison_sync(&mut alice, &bob) + .await + .expect("sync should succeed"); + + // Verify Alice got Bob's entity + assert!(stats.entities_transferred >= 1); + } + + // ========================================================================= + // 3-Node Sync Tests (Issue: reports of 3-node sync failing) + // ========================================================================= + + /// Test 3 nodes where each has unique data, syncing in a chain: A→B→C→A + /// + /// This tests the scenario where: + /// - A has entities 1-3 + /// - B has entities 4-6 + /// - C has entities 7-9 + /// + /// After chain sync, all should have entities 1-9. + #[tokio::test] + async fn test_three_node_chain_sync() { + let ctx = shared_context(); + let mut alice = SimNode::new_in_context("alice", ctx); + let mut bob = SimNode::new_in_context("bob", ctx); + let mut charlie = SimNode::new_in_context("charlie", ctx); + + // Alice has entities 1-3 + for i in 1..=3 { + alice.insert_entity_with_metadata( + EntityId::from_u64(i), + format!("alice-{i}").into_bytes(), + EntityMetadata::default(), + ); + } + + // Bob has entities 4-6 + for i in 4..=6 { + bob.insert_entity_with_metadata( + EntityId::from_u64(i), + format!("bob-{i}").into_bytes(), + EntityMetadata::default(), + ); + } + + // Charlie has entities 7-9 + for i in 7..=9 { + charlie.insert_entity_with_metadata( + EntityId::from_u64(i), + format!("charlie-{i}").into_bytes(), + EntityMetadata::default(), + ); + } + + // Initial state: all different + assert_ne!(alice.root_hash(), bob.root_hash()); + assert_ne!(bob.root_hash(), charlie.root_hash()); + assert_ne!(alice.root_hash(), charlie.root_hash()); + + println!("Initial state:"); + println!( + " Alice: {} entities, hash {:?}", + alice.entity_count(), + &alice.root_hash()[..4] + ); + println!( + " Bob: {} entities, hash {:?}", + bob.entity_count(), + &bob.root_hash()[..4] + ); + println!( + " Charlie: {} entities, hash {:?}", + charlie.entity_count(), + &charlie.root_hash()[..4] + ); + + // Step 1: Alice syncs FROM Bob (Alice pulls Bob's data) + let stats1 = execute_hash_comparison_sync(&mut alice, &bob) + .await + .expect("alice <- bob sync should succeed"); + println!("\nAfter Alice <- Bob:"); + println!( + " Alice: {} entities (transferred {})", + alice.entity_count(), + stats1.entities_transferred + ); + + // Step 2: Alice syncs FROM Charlie (Alice pulls Charlie's data) + let stats2 = execute_hash_comparison_sync(&mut alice, &charlie) + .await + .expect("alice <- charlie sync should succeed"); + println!("\nAfter Alice <- Charlie:"); + println!( + " Alice: {} entities (transferred {})", + alice.entity_count(), + stats2.entities_transferred + ); + + // Now Alice has all 9 entities + assert_eq!(alice.entity_count(), 9, "Alice should have all 9 entities"); + + // Step 3: Bob syncs FROM Alice (Bob pulls Alice's data, which now includes Charlie's) + let stats3 = execute_hash_comparison_sync(&mut bob, &alice) + .await + .expect("bob <- alice sync should succeed"); + println!("\nAfter Bob <- Alice:"); + println!( + " Bob: {} entities (transferred {})", + bob.entity_count(), + stats3.entities_transferred + ); + + // Step 4: Charlie syncs FROM Alice + let stats4 = execute_hash_comparison_sync(&mut charlie, &alice) + .await + .expect("charlie <- alice sync should succeed"); + println!("\nAfter Charlie <- Alice:"); + println!( + " Charlie: {} entities (transferred {})", + charlie.entity_count(), + stats4.entities_transferred + ); + + // Final state: ALL should be converged + println!("\nFinal state:"); + println!( + " Alice: {} entities, hash {:?}", + alice.entity_count(), + &alice.root_hash()[..4] + ); + println!( + " Bob: {} entities, hash {:?}", + bob.entity_count(), + &bob.root_hash()[..4] + ); + println!( + " Charlie: {} entities, hash {:?}", + charlie.entity_count(), + &charlie.root_hash()[..4] + ); + + // Verify convergence (Invariant I4) + assert_eq!( + alice.root_hash(), + bob.root_hash(), + "Alice and Bob should converge" + ); + assert_eq!( + bob.root_hash(), + charlie.root_hash(), + "Bob and Charlie should converge" + ); + assert_eq!(alice.entity_count(), 9); + assert_eq!(bob.entity_count(), 9); + assert_eq!(charlie.entity_count(), 9); + } + + /// Test 3 nodes with mesh sync (all pairs sync bidirectionally) + /// + /// This is a more thorough test: every node syncs with every other node. + #[tokio::test] + async fn test_three_node_mesh_sync() { + let ctx = shared_context(); + let mut alice = SimNode::new_in_context("alice", ctx); + let mut bob = SimNode::new_in_context("bob", ctx); + let mut charlie = SimNode::new_in_context("charlie", ctx); + + // Each node has 5 unique entities + for i in 0..5 { + alice.insert_entity_with_metadata( + EntityId::from_u64(100 + i), + format!("a-{i}").into_bytes(), + EntityMetadata::default(), + ); + bob.insert_entity_with_metadata( + EntityId::from_u64(200 + i), + format!("b-{i}").into_bytes(), + EntityMetadata::default(), + ); + charlie.insert_entity_with_metadata( + EntityId::from_u64(300 + i), + format!("c-{i}").into_bytes(), + EntityMetadata::default(), + ); + } + + // Full mesh sync: each node pulls from both others + // Round 1: Everyone pulls from everyone else + execute_hash_comparison_sync(&mut alice, &bob) + .await + .expect("a<-b"); + execute_hash_comparison_sync(&mut alice, &charlie) + .await + .expect("a<-c"); + execute_hash_comparison_sync(&mut bob, &alice) + .await + .expect("b<-a"); + execute_hash_comparison_sync(&mut bob, &charlie) + .await + .expect("b<-c"); + execute_hash_comparison_sync(&mut charlie, &alice) + .await + .expect("c<-a"); + execute_hash_comparison_sync(&mut charlie, &bob) + .await + .expect("c<-b"); + + // All should have 15 entities and same hash + assert_eq!(alice.entity_count(), 15, "Alice should have 15 entities"); + assert_eq!(bob.entity_count(), 15, "Bob should have 15 entities"); + assert_eq!( + charlie.entity_count(), + 15, + "Charlie should have 15 entities" + ); + + assert_eq!( + alice.root_hash(), + bob.root_hash(), + "Alice and Bob should match" + ); + assert_eq!( + bob.root_hash(), + charlie.root_hash(), + "Bob and Charlie should match" + ); + } + + /// Test 3 nodes where one starts empty (fresh join scenario) + #[tokio::test] + async fn test_three_node_fresh_join() { + let ctx = shared_context(); + let mut alice = SimNode::new_in_context("alice", ctx); + let mut bob = SimNode::new_in_context("bob", ctx); + let mut charlie = SimNode::new_in_context("charlie", ctx); // Fresh, empty + + // Alice and Bob have shared state + for i in 1..=5 { + alice.insert_entity_with_metadata( + EntityId::from_u64(i), + format!("shared-{i}").into_bytes(), + EntityMetadata::default(), + ); + bob.insert_entity_with_metadata( + EntityId::from_u64(i), + format!("shared-{i}").into_bytes(), + EntityMetadata::default(), + ); + } + + // Alice and Bob are synced + assert_eq!(alice.root_hash(), bob.root_hash()); + // Charlie is empty + assert_eq!(charlie.entity_count(), 0); + + // Charlie joins by syncing from Alice + execute_hash_comparison_sync(&mut charlie, &alice) + .await + .expect("charlie <- alice sync should succeed"); + + // Charlie should now match Alice and Bob + assert_eq!(charlie.root_hash(), alice.root_hash()); + assert_eq!(charlie.entity_count(), 5); + } + + /// Test 3 nodes with conflicting updates to same entity (CRDT merge) + #[tokio::test] + async fn test_three_node_crdt_conflict() { + use calimero_primitives::crdt::CrdtType; + + let ctx = shared_context(); + let mut alice = SimNode::new_in_context("alice", ctx); + let mut bob = SimNode::new_in_context("bob", ctx); + let mut charlie = SimNode::new_in_context("charlie", ctx); + + // All three modify the same entity with different timestamps + let conflict_id = EntityId::from_u64(999); + + alice.insert_entity_with_metadata( + conflict_id, + b"alice-version".to_vec(), + EntityMetadata::new(CrdtType::LwwRegister, 100), // oldest + ); + bob.insert_entity_with_metadata( + conflict_id, + b"bob-version".to_vec(), + EntityMetadata::new(CrdtType::LwwRegister, 200), // middle + ); + charlie.insert_entity_with_metadata( + conflict_id, + b"charlie-version".to_vec(), + EntityMetadata::new(CrdtType::LwwRegister, 300), // newest - should win + ); + + // Sync all to Alice (Alice pulls from both) + execute_hash_comparison_sync(&mut alice, &bob) + .await + .expect("a<-b"); + execute_hash_comparison_sync(&mut alice, &charlie) + .await + .expect("a<-c"); + + // Sync others from Alice + execute_hash_comparison_sync(&mut bob, &alice) + .await + .expect("b<-a"); + execute_hash_comparison_sync(&mut charlie, &alice) + .await + .expect("c<-a"); + + // All should converge to same hash (winner is charlie's version with ts=300) + assert_eq!(alice.root_hash(), bob.root_hash(), "A and B should match"); + assert_eq!(bob.root_hash(), charlie.root_hash(), "B and C should match"); + + // All should have exactly 1 entity + assert_eq!(alice.entity_count(), 1); + assert_eq!(bob.entity_count(), 1); + assert_eq!(charlie.entity_count(), 1); + } +} diff --git a/crates/node/tests/sync_sim/scenarios/buffering.rs b/crates/node/tests/sync_sim/scenarios/buffering.rs index b709e5af1..0de77bb30 100644 --- a/crates/node/tests/sync_sim/scenarios/buffering.rs +++ b/crates/node/tests/sync_sim/scenarios/buffering.rs @@ -11,6 +11,7 @@ //! 3. `test_deltas_applied_immediately_when_idle` - Deltas applied immediately when not syncing //! 4. `test_buffered_deltas_cleared_on_crash` - Buffer cleared on node crash //! 5. `test_multiple_deltas_preserved_fifo` - Multiple deltas replayed in FIFO order +//! 6. `test_three_node_gossip_propagation` - Simulates merobox 3-node scenario use crate::sync_sim::actions::{EntityMetadata, StorageOp}; use crate::sync_sim::node::SimNode; @@ -682,4 +683,179 @@ mod tests { "No entities should exist (all deltas were dropped)" ); } + + // ========================================================================= + // Merobox Scenario: 3-Node Gossip Delta Propagation + // ========================================================================= + + /// Test: Simulates the merobox 3-node scenario with gossip delta propagation. + /// + /// This test mimics what should happen in production: + /// 1. 3 nodes start idle (already synced) + /// 2. Node-1 makes a local change (insert entity) + /// 3. Node-1 "broadcasts" the delta via gossip (Node-2 and Node-3 receive it) + /// 4. All nodes should converge to the same state + /// + /// This validates that gossip delta application works correctly. + /// If this passes but merobox fails, the issue is in gossipsub mesh formation, + /// not in the delta application logic. + #[test] + fn test_three_node_gossip_propagation() { + let mut rt = SimRuntime::new(42); + + // Step 1: Create 3 idle nodes with same initial state + let node1 = rt.add_node("node-1"); + let node2 = rt.add_node("node-2"); + let node3 = rt.add_node("node-3"); + + // Give all nodes the same initial entity (simulating already-synced state) + let initial_ops = vec![make_insert_op(1, "initial_state")]; + + // Apply initial state to all nodes via gossip (with different delta IDs since + // in simulation each delivery is tracked separately) + rt.schedule_gossip_delta( + node1.clone(), + delta_id_from_u64(0), + initial_ops.clone(), + SimDuration::from_millis(1), + ); + rt.schedule_gossip_delta( + node2.clone(), + delta_id_from_u64(1), + initial_ops.clone(), + SimDuration::from_millis(2), + ); + rt.schedule_gossip_delta( + node3.clone(), + delta_id_from_u64(2), + initial_ops, + SimDuration::from_millis(3), + ); + // Process all 3 scheduled events + rt.step(); // node1 + rt.step(); // node2 + rt.step(); // node3 + + // Verify all nodes have the same initial state + assert_eq!( + rt.node(&node1).unwrap().entity_count(), + 1, + "node1 should have 1 entity" + ); + assert_eq!( + rt.node(&node2).unwrap().entity_count(), + 1, + "node2 should have 1 entity" + ); + assert_eq!( + rt.node(&node3).unwrap().entity_count(), + 1, + "node3 should have 1 entity" + ); + println!("Initial state: all 3 nodes have 1 entity"); + + // Step 2: Node-1 makes a local change (simulated by receiving its own delta) + // In production, node-1 would execute a mutation and then broadcast + let new_ops = vec![make_insert_op(999, "demo_value")]; // This is like set(demo_key, demo_value) + + // Node-1 applies locally first + rt.schedule_gossip_delta( + node1.clone(), + delta_id_from_u64(10), + new_ops.clone(), + SimDuration::from_millis(10), + ); + rt.step(); + + // Node-1 now has 2 entities, others still have 1 + assert_eq!( + rt.node(&node1).unwrap().entity_count(), + 2, + "Node-1 should have new entity" + ); + assert_eq!( + rt.node(&node2).unwrap().entity_count(), + 1, + "Node-2 not yet updated" + ); + assert_eq!( + rt.node(&node3).unwrap().entity_count(), + 1, + "Node-3 not yet updated" + ); + println!("After Node-1 local change: Node-1 has 2 entities, others have 1"); + + // Step 3: Gossip propagates to Node-2 and Node-3 + // This is what should happen via gossipsub BroadcastMessage::StateDelta + rt.schedule_gossip_delta( + node2.clone(), + delta_id_from_u64(11), + new_ops.clone(), + SimDuration::from_millis(20), + ); + rt.schedule_gossip_delta( + node3.clone(), + delta_id_from_u64(12), + new_ops, + SimDuration::from_millis(20), + ); + rt.step(); // node2 + rt.step(); // node3 + + // Step 4: Verify convergence + assert_eq!( + rt.node(&node1).unwrap().entity_count(), + 2, + "Node-1 should have 2 entities" + ); + assert_eq!( + rt.node(&node2).unwrap().entity_count(), + 2, + "Node-2 should have 2 entities after gossip" + ); + assert_eq!( + rt.node(&node3).unwrap().entity_count(), + 2, + "Node-3 should have 2 entities after gossip" + ); + + println!("SUCCESS: All 3 nodes converged via gossip delta propagation"); + } + + /// Test: Verifies that gossip deltas are idempotent (safe to receive twice). + /// + /// In gossipsub, a node might receive the same delta multiple times + /// (e.g., from different peers, or echo back). This should be handled safely. + #[test] + fn test_gossip_delta_idempotent() { + let mut rt = SimRuntime::new(42); + + let node1 = rt.add_node("node-1"); + + let delta = delta_id_from_u64(1); + let ops = vec![make_insert_op(100, "value")]; + + // Receive the same delta twice (simulating gossip echo or multi-peer delivery) + rt.schedule_gossip_delta( + node1.clone(), + delta, + ops.clone(), + SimDuration::from_millis(10), + ); + rt.step(); + + // First delivery + assert_eq!(rt.node(&node1).unwrap().entity_count(), 1); + + // Second delivery of same delta + rt.schedule_gossip_delta(node1.clone(), delta, ops, SimDuration::from_millis(20)); + rt.step(); + + // Should still be 1 entity (idempotent) + assert_eq!( + rt.node(&node1).unwrap().entity_count(), + 1, + "Delta should be idempotent - duplicate delivery should not create duplicate entity" + ); + } } diff --git a/crates/node/tests/sync_sim/storage.rs b/crates/node/tests/sync_sim/storage.rs index 20de22d7e..421531ef9 100644 --- a/crates/node/tests/sync_sim/storage.rs +++ b/crates/node/tests/sync_sim/storage.rs @@ -103,6 +103,19 @@ impl SimStorage { self.context_id } + /// Get a reference to the underlying Store. + /// + /// This allows using `HashComparisonProtocol` and other protocol + /// implementations that operate on `&Store` directly. + pub fn store(&self) -> &Store { + &self.store + } + + /// Get the executor ID. + pub fn executor_id(&self) -> PublicKey { + self.executor_id + } + /// Get the root entity ID for this storage. /// /// The root ID is derived from the context ID and must be used @@ -228,6 +241,41 @@ impl SimStorage { count } + /// Count only leaf nodes (actual entities, excluding intermediate nodes). + /// + /// This is useful for getting the "real" entity count after sync, + /// where intermediate nodes shouldn't be counted. + pub fn leaf_count(&self) -> usize { + let root_id = self.root_id(); + self.with_index(|| self.count_leaves_recursive(root_id, true)) + } + + /// Recursively count leaf nodes. + fn count_leaves_recursive(&self, id: Id, is_root: bool) -> usize { + let index = match Index::::get_index(id).ok().flatten() { + Some(idx) => idx, + None => return 0, + }; + + let children = index.children(); + let has_children = children.as_ref().map_or(false, |c| !c.is_empty()); + + if has_children { + // Internal node: count children recursively + children + .unwrap() + .iter() + .map(|child| self.count_leaves_recursive(child.id(), false)) + .sum() + } else if is_root { + // Empty root doesn't count + 0 + } else { + // Leaf node + 1 + } + } + /// Check if the tree is empty (no root or root has no data). pub fn is_empty(&self) -> bool { let root_id = self.root_id(); @@ -342,6 +390,54 @@ impl SimStorage { }) } + /// Update entity data by ID. + /// + /// This updates an existing entity's data or creates it if it doesn't exist. + /// For new entities, creates them as direct children of root. + pub fn update_entity_data(&self, id: Id, data: &[u8]) { + self.with_index(|| { + // Check if entity exists by trying to get its index + let exists = Index::::get_index(id).ok().flatten().is_some(); + + if exists { + // Update existing entity - no ancestors needed for update + let action = Action::Update { + id, + data: data.to_vec(), + ancestors: vec![], + metadata: Metadata::default(), + }; + let _ = Interface::::apply_action(action); + } else { + // Create new entity as child of root + // First ensure root exists + let root_id = self.root_id(); + if Index::::get_index(root_id) + .ok() + .flatten() + .is_none() + { + let root_action = Action::Update { + id: root_id, + data: vec![], + ancestors: vec![], + metadata: Metadata::default(), + }; + let _ = Interface::::apply_action(root_action); + } + + // Add new entity under root + let action = Action::Add { + id, + data: data.to_vec(), + ancestors: vec![ChildInfo::new(root_id, [0; 32], Metadata::default())], + metadata: Metadata::default(), + }; + let _ = Interface::::apply_action(action); + } + }); + } + /// Remove an entity by marking it as deleted (creates tombstone). pub fn remove_entity(&self, id: Id) { self.with_index(|| { @@ -597,4 +693,130 @@ mod tests { // Entity index still exists (tombstone) but is marked deleted assert!(storage.is_deleted(id)); } + + #[test] + fn test_update_entity_data_creates_entity() { + let storage = SimStorage::new(test_context_id(), test_executor_id()); + + // Initially empty + assert_eq!(storage.entity_count(), 0, "should start empty"); + + // Update entity (doesn't exist, should create it) + let entity_id = Id::new([42u8; 32]); + storage.update_entity_data(entity_id, b"hello"); + + // Verify entity was created + let count = storage.entity_count(); + eprintln!("entity_count after update_entity_data: {}", count); + + // Should have at least 2: root + the new entity + assert!(count >= 2, "should have root + entity, got {}", count); + + // Verify data is readable + let data = storage.get_entity_data(entity_id); + assert_eq!(data, Some(b"hello".to_vec()), "data should be stored"); + } + + #[test] + fn test_update_entity_data_vs_add_entity() { + // Compare update_entity_data with add_entity + let storage1 = SimStorage::new(test_context_id(), test_executor_id()); + let storage2 = SimStorage::new(test_context_id(), test_executor_id()); + + let entity_id = Id::new([42u8; 32]); + + // Method 1: add_entity (the production-like way) + storage1.add_entity(entity_id, b"hello", Metadata::default()); + + // Method 2: update_entity_data (used by sync) + storage2.update_entity_data(entity_id, b"hello"); + + eprintln!("add_entity count: {}", storage1.entity_count()); + eprintln!("update_entity_data count: {}", storage2.entity_count()); + + // Both should have same structure + assert_eq!( + storage1.entity_count(), + storage2.entity_count(), + "both methods should result in same entity count" + ); + + // Both should be able to read the data + assert_eq!(storage1.get_entity_data(entity_id), Some(b"hello".to_vec())); + assert_eq!(storage2.get_entity_data(entity_id), Some(b"hello".to_vec())); + } + + #[test] + fn test_clone_shares_underlying_db() { + // This tests if cloning SimStorage shares the underlying database + let storage = SimStorage::new(test_context_id(), test_executor_id()); + let cloned = storage.clone(); + + // Initially both empty + assert_eq!(storage.entity_count(), 0); + assert_eq!(cloned.entity_count(), 0); + + // Write via clone + let entity_id = Id::new([42u8; 32]); + cloned.update_entity_data(entity_id, b"hello"); + + // Both should see the data + eprintln!( + "original count after clone write: {}", + storage.entity_count() + ); + eprintln!("cloned count after clone write: {}", cloned.entity_count()); + + assert_eq!( + cloned.entity_count(), + storage.entity_count(), + "clone and original should see same entity count" + ); + + // Original should be able to read the data written via clone + let original_data = storage.get_entity_data(entity_id); + eprintln!("original sees data: {:?}", original_data); + assert_eq!( + original_data, + Some(b"hello".to_vec()), + "original should see data written via clone" + ); + } + + #[tokio::test] + async fn test_clone_shares_db_across_async_tasks() { + // Simulate what the protocol does: clone storage, spawn async task, write, check original + use crate::sync_sim::node::SimNode; + + let ctx = ContextId::from([0xCA; 32]); + let node = SimNode::new_in_context("test", ctx); + + // Clone the storage (like protocol does) + let cloned_storage = node.storage().clone(); + + // Spawn async task that writes to cloned storage + let write_task = async move { + let entity_id = Id::new([42u8; 32]); + cloned_storage.update_entity_data(entity_id, b"async-write"); + cloned_storage.leaf_count() // Count only leaves + }; + + // Run the task + let count_in_task = write_task.await; + + // The data should be visible in original (1 leaf entity) + assert_eq!(count_in_task, 1, "should have 1 leaf entity"); + assert_eq!( + count_in_task, + node.storage().leaf_count(), + "original should see same leaf count as task" + ); + + // Node entity_count uses leaf_count, so should match + assert_eq!( + node.entity_count(), + count_in_task, + "node.entity_count should match leaf count" + ); + } } diff --git a/crates/node/tests/sync_sim/transport.rs b/crates/node/tests/sync_sim/transport.rs new file mode 100644 index 000000000..9de785238 --- /dev/null +++ b/crates/node/tests/sync_sim/transport.rs @@ -0,0 +1,417 @@ +//! Simulated transport for testing sync protocols. +//! +//! Provides [`SimStream`], an in-memory implementation of [`SyncTransport`] +//! that enables running the production sync protocol code in simulation. +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────┐ ┌─────────────────┐ +//! │ SimNode A │ │ SimNode B │ +//! │ │ │ │ +//! │ SimStream │ │ SimStream │ +//! │ ┌───────────┐ │ │ ┌───────────┐ │ +//! │ │ tx_a ────┼──┼────────────────────┼──┼─► rx_b │ │ +//! │ │ rx_a ◄───┼──┼────────────────────┼──┼── tx_b │ │ +//! │ └───────────┘ │ │ └───────────┘ │ +//! └─────────────────┘ └─────────────────┘ +//! ``` +//! +//! # Usage +//! +//! ```ignore +//! // Create bidirectional channel pair +//! let (stream_a, stream_b) = SimStream::pair(); +//! +//! // Run protocol on both ends +//! let initiator = async { hash_comparison_sync(&mut stream_a, ...).await }; +//! let responder = async { handle_tree_node_request(&mut stream_b, ...).await }; +//! +//! // Execute concurrently +//! tokio::join!(initiator, responder); +//! ``` + +use std::collections::VecDeque; +use std::time::Duration; + +use async_trait::async_trait; +use calimero_crypto::{Nonce, SharedKey}; +use calimero_node_primitives::sync::{EncryptionState, StreamMessage, SyncTransport}; +use eyre::{bail, Result}; +use tokio::sync::mpsc; +use tokio::time::timeout; + +/// Default channel buffer size. +const DEFAULT_BUFFER_SIZE: usize = 64; + +/// Default timeout for receive operations in simulation. +const DEFAULT_SIM_TIMEOUT: Duration = Duration::from_secs(5); + +/// In-memory transport for simulation testing. +/// +/// Implements [`SyncTransport`] using tokio mpsc channels, enabling +/// the production sync protocol code to run in simulation. +/// +/// # Features +/// +/// - Bidirectional communication via channel pairs +/// - Optional message buffering/queueing for testing +/// - Configurable timeouts +/// - Optional encryption (for testing encrypted flows) +pub struct SimStream { + /// Sender channel (outgoing messages). + /// Option so we can drop it to signal closure. + tx: Option>>, + /// Receiver channel (incoming messages). + rx: mpsc::Receiver>, + /// Buffer for received messages (allows peek/reorder testing). + buffer: VecDeque>, + /// Encryption state. + encryption: EncryptionState, + /// Default timeout for receive operations. + recv_timeout: Duration, + /// Whether the stream is closed. + closed: bool, +} + +impl SimStream { + /// Create a bidirectional stream pair for two-party communication. + /// + /// Returns `(stream_a, stream_b)` where: + /// - Messages sent on `stream_a` are received on `stream_b` + /// - Messages sent on `stream_b` are received on `stream_a` + /// + /// # Example + /// + /// ```ignore + /// let (mut alice_stream, mut bob_stream) = SimStream::pair(); + /// + /// // Alice sends, Bob receives + /// alice_stream.send(&msg).await?; + /// let received = bob_stream.recv().await?; + /// ``` + #[must_use] + pub fn pair() -> (Self, Self) { + Self::pair_with_buffer(DEFAULT_BUFFER_SIZE) + } + + /// Create a stream pair with custom buffer size. + /// + /// Larger buffers allow more in-flight messages before backpressure. + #[must_use] + pub fn pair_with_buffer(buffer_size: usize) -> (Self, Self) { + let (tx_a, rx_b) = mpsc::channel(buffer_size); + let (tx_b, rx_a) = mpsc::channel(buffer_size); + + let stream_a = Self { + tx: Some(tx_a), + rx: rx_a, + buffer: VecDeque::new(), + encryption: EncryptionState::new(), + recv_timeout: DEFAULT_SIM_TIMEOUT, + closed: false, + }; + + let stream_b = Self { + tx: Some(tx_b), + rx: rx_b, + buffer: VecDeque::new(), + encryption: EncryptionState::new(), + recv_timeout: DEFAULT_SIM_TIMEOUT, + closed: false, + }; + + (stream_a, stream_b) + } + + /// Create a one-way stream (for testing responder-only scenarios). + /// + /// Returns `(sender, receiver)` where sender can only send and receiver can only receive. + #[must_use] + pub fn one_way() -> (SimStreamSender, Self) { + Self::one_way_with_buffer(DEFAULT_BUFFER_SIZE) + } + + /// Create a one-way stream with custom buffer size. + #[must_use] + pub fn one_way_with_buffer(buffer_size: usize) -> (SimStreamSender, Self) { + let (tx, rx) = mpsc::channel(buffer_size); + + let sender = SimStreamSender { + tx, + encryption: EncryptionState::new(), + }; + + let receiver = Self { + tx: None, // No sender for one-way receiver + rx, + buffer: VecDeque::new(), + encryption: EncryptionState::new(), + recv_timeout: DEFAULT_SIM_TIMEOUT, + closed: false, + }; + + (sender, receiver) + } + + /// Set the default receive timeout. + pub fn set_timeout(&mut self, timeout: Duration) { + self.recv_timeout = timeout; + } + + /// Check if there are buffered messages. + #[must_use] + pub fn has_buffered(&self) -> bool { + !self.buffer.is_empty() + } + + /// Get count of buffered messages. + #[must_use] + pub fn buffered_count(&self) -> usize { + self.buffer.len() + } + + /// Internal: receive raw bytes with timeout. + async fn recv_raw_timeout(&mut self, budget: Duration) -> Result>> { + // First check buffer + if let Some(data) = self.buffer.pop_front() { + return Ok(Some(data)); + } + + if self.closed { + return Ok(None); + } + + // Then try channel with timeout + match timeout(budget, self.rx.recv()).await { + Ok(Some(data)) => Ok(Some(data)), + Ok(None) => { + self.closed = true; + Ok(None) + } + Err(_) => bail!("timeout receiving message"), + } + } +} + +#[async_trait] +impl SyncTransport for SimStream { + async fn send(&mut self, message: &StreamMessage<'_>) -> Result<()> { + if self.closed { + bail!("stream is closed"); + } + + let tx = self + .tx + .as_ref() + .ok_or_else(|| eyre::eyre!("no sender available"))?; + + let encoded = borsh::to_vec(message)?; + let encrypted = self.encryption.encrypt(encoded)?; + + tx.send(encrypted) + .await + .map_err(|_| eyre::eyre!("channel closed"))?; + + Ok(()) + } + + async fn recv(&mut self) -> Result>> { + self.recv_timeout(self.recv_timeout).await + } + + async fn recv_timeout(&mut self, budget: Duration) -> Result>> { + let Some(data) = self.recv_raw_timeout(budget).await? else { + return Ok(None); + }; + + let decrypted = self.encryption.decrypt(data)?; + let decoded = borsh::from_slice::>(&decrypted)?; + + Ok(Some(decoded)) + } + + fn set_encryption(&mut self, encryption: Option<(SharedKey, Nonce)>) { + self.encryption.set(encryption); + } + + fn encryption(&self) -> Option<(SharedKey, Nonce)> { + self.encryption.get() + } + + async fn close(&mut self) -> Result<()> { + self.closed = true; + // Drop the sender to signal closure to the other end + // When all senders are dropped, the receiver will see None + self.tx = None; + Ok(()) + } +} + +impl std::fmt::Debug for SimStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SimStream") + .field("buffered", &self.buffer.len()) + .field("closed", &self.closed) + .field("has_encryption", &self.encryption.get().is_some()) + .finish() + } +} + +// ============================================================================= +// One-way sender (for testing) +// ============================================================================= + +/// One-way sender for testing scenarios. +/// +/// Can only send messages, not receive. +pub struct SimStreamSender { + tx: mpsc::Sender>, + encryption: EncryptionState, +} + +impl SimStreamSender { + /// Send a message. + /// + /// # Errors + /// + /// Returns error if serialization, encryption, or send fails. + pub async fn send(&self, message: &StreamMessage<'_>) -> Result<()> { + let encoded = borsh::to_vec(message)?; + let encrypted = self.encryption.encrypt(encoded)?; + + self.tx + .send(encrypted) + .await + .map_err(|_| eyre::eyre!("channel closed"))?; + + Ok(()) + } + + /// Set encryption parameters. + pub fn set_encryption(&mut self, encryption: Option<(SharedKey, Nonce)>) { + self.encryption.set(encryption); + } +} + +// ============================================================================= +// Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + use calimero_crypto::NONCE_LEN; + use calimero_node_primitives::sync::wire::{InitPayload, MessagePayload}; + use calimero_primitives::context::ContextId; + use calimero_primitives::identity::PublicKey; + + fn test_context_id() -> ContextId { + ContextId::from([1u8; 32]) + } + + fn test_public_key() -> PublicKey { + PublicKey::from([2u8; 32]) + } + + #[tokio::test] + async fn test_pair_send_recv() { + let (mut alice, mut bob) = SimStream::pair(); + + let msg = StreamMessage::Init { + context_id: test_context_id(), + party_id: test_public_key(), + payload: InitPayload::DagHeadsRequest { + context_id: test_context_id(), + }, + next_nonce: [0; NONCE_LEN], + }; + + // Alice sends + alice.send(&msg).await.expect("send should succeed"); + + // Bob receives + let received = bob.recv().await.expect("recv should succeed"); + assert!(received.is_some()); + + // Verify message matches (compare serialized form since StreamMessage doesn't impl Eq) + let original_bytes = borsh::to_vec(&msg).unwrap(); + let received_bytes = borsh::to_vec(&received.unwrap()).unwrap(); + assert_eq!(original_bytes, received_bytes); + } + + #[tokio::test] + async fn test_bidirectional() { + let (mut alice, mut bob) = SimStream::pair(); + + let msg_from_alice = StreamMessage::Init { + context_id: test_context_id(), + party_id: test_public_key(), + payload: InitPayload::DagHeadsRequest { + context_id: test_context_id(), + }, + next_nonce: [1; NONCE_LEN], + }; + + let msg_from_bob = StreamMessage::Message { + sequence_id: 1, + payload: MessagePayload::DagHeadsResponse { + dag_heads: vec![], + root_hash: [0u8; 32].into(), + }, + next_nonce: [2; NONCE_LEN], + }; + + // Send both directions + alice.send(&msg_from_alice).await.unwrap(); + bob.send(&msg_from_bob).await.unwrap(); + + // Receive both directions + let from_alice = bob.recv().await.unwrap().unwrap(); + let from_bob = alice.recv().await.unwrap().unwrap(); + + // Verify + assert_eq!( + borsh::to_vec(&msg_from_alice).unwrap(), + borsh::to_vec(&from_alice).unwrap() + ); + assert_eq!( + borsh::to_vec(&msg_from_bob).unwrap(), + borsh::to_vec(&from_bob).unwrap() + ); + } + + #[tokio::test] + async fn test_timeout() { + let (mut _alice, mut bob) = SimStream::pair(); + + // Bob tries to receive with short timeout, should fail + let result = bob.recv_timeout(Duration::from_millis(10)).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("timeout")); + } + + #[tokio::test] + async fn test_close() { + let (mut alice, mut bob) = SimStream::pair(); + + // Close Alice's stream + alice.close().await.unwrap(); + + // Sending should fail + let msg = StreamMessage::Init { + context_id: test_context_id(), + party_id: test_public_key(), + payload: InitPayload::DagHeadsRequest { + context_id: test_context_id(), + }, + next_nonce: [0; NONCE_LEN], + }; + let result = alice.send(&msg).await; + assert!(result.is_err()); + + // Bob should eventually see closed channel (after timeout) + bob.set_timeout(Duration::from_millis(50)); + // Note: recv returns None when channel is closed after draining + } +}