1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions crates/node/primitives/Cargo.toml
@@ -11,6 +11,7 @@ publish = true
[dependencies]
actix.workspace = true
async-stream.workspace = true
async-trait.workspace = true
borsh.workspace = true
camino.workspace = true
clap = { workspace = true, features = ["derive"] }
12 changes: 12 additions & 0 deletions crates/node/primitives/src/sync.rs
@@ -39,9 +39,12 @@ pub mod handshake;
pub mod hash_comparison;
pub mod levelwise;
pub mod protocol;
pub mod protocol_trait;
pub mod snapshot;
pub mod state_machine;
pub mod storage_bridge;
pub mod subtree;
pub mod transport;
pub mod wire;

// =============================================================================
@@ -107,3 +110,12 @@ pub use state_machine::{
    build_handshake, build_handshake_from_raw, estimate_entity_count, estimate_max_depth,
    LocalSyncState,
};

// Transport abstraction (for production streams and simulation)
pub use transport::{EncryptionState, SyncTransport};

// Protocol trait (common interface for all sync protocols)
pub use protocol_trait::SyncProtocolExecutor;

// Storage bridge (RuntimeEnv creation for sync protocols)
pub use storage_bridge::create_runtime_env;
124 changes: 124 additions & 0 deletions crates/node/primitives/src/sync/protocol_trait.rs
@@ -0,0 +1,124 @@
//! Common trait for sync protocol implementations.
//!
//! This module defines the [`SyncProtocolExecutor`] trait that all sync protocols
//! implement. This enables:
//!
//! - Protocol implementation details stay contained within each protocol module
//! - A common interface lets `SyncManager` invoke any protocol
//! - Production and simulation share one code path (only the transport and `Store` backend differ)
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                   SyncProtocolExecutor trait                    │
//! │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
//! │  │ HashComparison  │  │    Snapshot     │  │   BloomFilter   │  │
//! │  │    Protocol     │  │    Protocol     │  │    Protocol     │  │
//! │  └────────┬────────┘  └────────┬────────┘  └────────┬────────┘  │
//! │           │                    │                    │           │
//! │           └────────────────────┼────────────────────┘           │
//! │                                │                                │
//! │                    ┌───────────┴───────────┐                    │
//! │                    │     SyncTransport     │                    │
//! │                    │ (Stream or SimStream) │                    │
//! │                    └───────────────────────┘                    │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Example

💡 Doc example references non-existent type

The example uses HashComparisonProtocol but no struct implementing SyncProtocolExecutor is introduced in this PR, making the example misleading.

Suggested fix:

Either add a placeholder implementation or update the example to show the trait definition pattern without referencing unimplemented types.
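
One way to act on this suggestion is to demonstrate the pattern with a placeholder type. The sketch below is a simplified, synchronous stand-in that uses only the standard library: `Transport`, `LoopbackTransport`, `ProtocolExecutor`, `EchoProtocol`, and `EchoStats` are hypothetical names, not types from this PR, and the real trait is async and additionally takes a `Store`, `ContextId`, and `PublicKey`.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `SyncTransport`: a blocking send/receive pair.
trait Transport {
    fn send(&mut self, msg: Vec<u8>);
    fn recv(&mut self) -> Option<Vec<u8>>;
}

/// In-memory transport that hands back whatever was sent (simulation-style).
#[derive(Default)]
struct LoopbackTransport {
    queue: VecDeque<Vec<u8>>,
}

impl Transport for LoopbackTransport {
    fn send(&mut self, msg: Vec<u8>) {
        self.queue.push_back(msg);
    }

    fn recv(&mut self) -> Option<Vec<u8>> {
        self.queue.pop_front()
    }
}

/// Simplified, synchronous analogue of the trait under review.
trait ProtocolExecutor {
    type Config;
    type Stats: Default;

    fn run_initiator<T: Transport>(
        transport: &mut T,
        config: Self::Config,
    ) -> Result<Self::Stats, String>;
}

/// Placeholder protocol: sends one payload and counts the bytes echoed back.
struct EchoProtocol;

#[derive(Debug, Default, PartialEq)]
struct EchoStats {
    bytes_received: usize,
}

impl ProtocolExecutor for EchoProtocol {
    type Config = Vec<u8>;
    type Stats = EchoStats;

    fn run_initiator<T: Transport>(
        transport: &mut T,
        config: Self::Config,
    ) -> Result<Self::Stats, String> {
        transport.send(config);
        let reply = transport.recv().ok_or("no reply from peer")?;
        Ok(EchoStats { bytes_received: reply.len() })
    }
}
```

A doc example built on a placeholder like this would compile on its own, so it would not need to reference `HashComparisonProtocol` before that type exists.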

//!
//! ```ignore
//! use calimero_node_primitives::sync::{SyncProtocolExecutor, HashComparisonProtocol};
//!
//! // Production
//! let mut transport = StreamTransport::new(&mut stream);
//! let stats = HashComparisonProtocol::run_initiator(
//!     &mut transport,
//!     &store,
//!     context_id,
//!     identity,
//!     HashComparisonConfig { remote_root_hash },
//! ).await?;
//!
//! // Simulation (exact same call, different transport/store)
//! let mut transport = SimStream::new();
//! let stats = HashComparisonProtocol::run_initiator(
//!     &mut transport,
//!     &store, // `Store` backed by InMemoryDB
//!     context_id,
//!     identity,
//!     HashComparisonConfig { remote_root_hash },
//! ).await?;
//! ```

use async_trait::async_trait;
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use calimero_store::Store;
use eyre::Result;

use super::SyncTransport;

/// Trait for sync protocol implementations.
///
/// Each sync protocol (HashComparison, Snapshot, BloomFilter, etc.) implements
/// this trait. The protocol logic is independent of:
///
/// - `T: SyncTransport` - the transport layer (production streams or simulation channels),
///   supplied as a generic parameter
/// - `Store` - the storage backend (RocksDB or InMemoryDB), chosen when the `Store` is built
///

💡 Trait bounds may be overly restrictive for Config type

The Config: Send bound requires all config types to be Send, but the trait already uses ?Send futures; if all usage is single-threaded, this constraint may be unnecessary.

Suggested fix:

Verify whether `Send` bound on `Config` is required, or document why it's needed despite the `?Send` future.

/// This enables the same protocol code to run in both production and simulation.
///
/// Note: Uses `?Send` because `RuntimeEnv` (used for storage access) contains `Rc`
/// which is not `Send`. Callers must not spawn these futures across threads.

📝 Nit: Associated type bounds may be overly restrictive

Both Config: Send and Stats: Send + Default require Send, but the trait itself is ?Send - this asymmetry may cause confusion or limit flexibility for non-Send configs.

Suggested fix:

Consider whether `Send` bounds are truly necessary given the `?Send` trait bound, or document why they're required despite the trait being `?Send`.

#[async_trait(?Send)]
pub trait SyncProtocolExecutor {
    /// Protocol-specific configuration for the initiator.
    ///
    /// For example, HashComparison needs the remote root hash.
    type Config: Send;

    /// Protocol-specific statistics/results.
    type Stats: Send + Default;

    /// Run the initiator (pulling) side of the protocol.
    ///
    /// The initiator requests data from the responder and applies it locally.
    ///
    /// # Arguments
    ///
    /// * `transport` - The transport for sending/receiving messages
    /// * `store` - The local storage (works with both RocksDB and InMemoryDB)
    /// * `context_id` - The context being synced
    /// * `identity` - Our identity for this context
    /// * `config` - Protocol-specific configuration
    ///
    /// # Returns
    ///
    /// Protocol-specific statistics on success.
    async fn run_initiator<T: SyncTransport>(
        transport: &mut T,
        store: &Store,
        context_id: ContextId,
        identity: PublicKey,
        config: Self::Config,
    ) -> Result<Self::Stats>;

    /// Run the responder side of the protocol.
    ///
    /// The responder answers requests from the initiator.
    ///
    /// # Arguments
    ///
    /// * `transport` - The transport for sending/receiving messages
    /// * `store` - The local storage
    /// * `context_id` - The context being synced
    /// * `identity` - Our identity for this context
    async fn run_responder<T: SyncTransport>(
        transport: &mut T,
        store: &Store,
        context_id: ContextId,
        identity: PublicKey,
    ) -> Result<()>;
}
184 changes: 184 additions & 0 deletions crates/node/primitives/src/sync/storage_bridge.rs
@@ -0,0 +1,184 @@
//! Storage bridge utilities for sync protocols.
//!
//! This module provides helpers to bridge `calimero-storage` APIs (which use
//! the `RuntimeEnv` thread-local) to the underlying `calimero-store` backend.
//!
//! # Why This Exists
//!
//! The `calimero-storage` crate provides high-level storage APIs (`Index`, `Interface`)
//! that operate through a thread-local `RuntimeEnv`. This `RuntimeEnv` contains
//! callbacks that route read/write/remove operations to the actual database.
//!
//! This module provides a single, reusable way to create these callbacks from
//! a `Store`, regardless of the backend (RocksDB or InMemoryDB).
//!
//! # Usage
//!
//! ```ignore
//! use calimero_node_primitives::sync::storage_bridge::create_runtime_env;
//!
//! // Works with any Store backend (RocksDB or InMemoryDB)
//! let runtime_env = create_runtime_env(&store, context_id, identity);
//!
//! // Use with storage APIs
//! with_runtime_env(runtime_env, || {
//!     let index = Index::<MainStorage>::get_index(entity_id)?;
//!     // ...
//! });
//! ```

use std::cell::RefCell;
use std::rc::Rc;

use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use calimero_storage::env::RuntimeEnv;
use calimero_storage::store::Key;
use calimero_store::{key, types, Store};
use tracing::warn;

/// Create a `RuntimeEnv` that bridges `calimero-storage` to a `Store`.
///
/// This is the canonical way to set up storage access for sync protocols.
/// The returned `RuntimeEnv` can be used with `with_runtime_env()` to enable
/// `Index<MainStorage>` and `Interface<MainStorage>` operations.
///
/// # Arguments
///
/// * `store` - The underlying store (works with both RocksDB and InMemoryDB)
/// * `context_id` - The context being accessed
/// * `executor_id` - The identity executing operations
///
/// # Example
///
/// ```ignore
/// let env = create_runtime_env(&store, context_id, identity);
/// let result = with_runtime_env(env, || {
///     Index::<MainStorage>::get_index(entity_id)
/// });
/// ```
pub fn create_runtime_env(
    store: &Store,
    context_id: ContextId,
    executor_id: PublicKey,
) -> RuntimeEnv {
    let callbacks = create_storage_callbacks(store, context_id);
    RuntimeEnv::new(
        callbacks.read,
        callbacks.write,
        callbacks.remove,
        *context_id.as_ref(),
        *executor_id.as_ref(),
    )
}

/// Storage callback closures that bridge `calimero-storage` Key API to the Store.
///
/// These closures translate `calimero-storage::Key` (Index/Entry) to
/// `calimero-store::ContextStateKey` for access to the actual database.
#[expect(
    clippy::type_complexity,
    reason = "Matches RuntimeEnv callback signatures"
)]
struct StorageCallbacks {
    read: Rc<dyn Fn(&Key) -> Option<Vec<u8>>>,
    write: Rc<dyn Fn(Key, &[u8]) -> bool>,
    remove: Rc<dyn Fn(&Key) -> bool>,
}

/// Create storage callbacks for a context.
///
/// These bridge the `calimero-storage` Key-based API to the underlying
/// `calimero-store` ContextStateKey-based storage.
#[expect(
    clippy::type_complexity,
    reason = "Matches RuntimeEnv callback signatures"

💡 Three separate store handles created for read/write/remove

Each callback creates its own store.handle(), meaning read, write, and remove use different handles; if handles maintain transaction-level caches, read-after-write within one with_runtime_env block may not see uncommitted writes.

Suggested fix:

Consider sharing a single `RefCell<Handle>` across all three callbacks to ensure consistent view within a session.
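
The shared-handle idea can be sketched with standard-library types only. `MockHandle` is a hypothetical stand-in for the store handle (a `HashMap`-backed buffer), and `make_callbacks` and the byte-slice key type are illustrative assumptions. Whether the real handle buffers writes this way is exactly the open question in this comment.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

/// Hypothetical stand-in for a store handle that buffers its own writes.
#[derive(Default)]
struct MockHandle {
    data: HashMap<Vec<u8>, Vec<u8>>,
}

type ReadFn = Rc<dyn Fn(&[u8]) -> Option<Vec<u8>>>;
type WriteFn = Rc<dyn Fn(&[u8], &[u8]) -> bool>;
type RemoveFn = Rc<dyn Fn(&[u8]) -> bool>;

/// Build all three callbacks over one shared handle, so a read issued after
/// a write in the same session observes that write.
fn make_callbacks() -> (ReadFn, WriteFn, RemoveFn) {
    let shared = Rc::new(RefCell::new(MockHandle::default()));

    let read: ReadFn = {
        let handle = Rc::clone(&shared);
        Rc::new(move |key: &[u8]| handle.borrow().data.get(key).cloned())
    };

    let write: WriteFn = {
        let handle = Rc::clone(&shared);
        Rc::new(move |key: &[u8], value: &[u8]| {
            handle.borrow_mut().data.insert(key.to_vec(), value.to_vec());
            true
        })
    };

    let remove: RemoveFn = {
        let handle = shared;
        // Reports whether an entry was actually deleted.
        Rc::new(move |key: &[u8]| handle.borrow_mut().data.remove(key).is_some())
    };

    (read, write, remove)
}
```

The same write, read back, remove sequence is also the shape a round-trip test for the real callbacks could take.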

)]

💡 Multiple separate store handles created when one could be shared

create_storage_callbacks calls store.handle() three times, creating separate handles for read, write, and remove callbacks; write and remove could share one Rc<RefCell<_>> since they're used in a single-threaded context.

Suggested fix:

Create one shared `Rc<RefCell<store.handle()>>` for both write and remove closures.

fn create_storage_callbacks(store: &Store, context_id: ContextId) -> StorageCallbacks {
    let read: Rc<dyn Fn(&Key) -> Option<Vec<u8>>> = {
        let handle = store.handle();
        let ctx_id = context_id;
        Rc::new(move |key: &Key| {
            let storage_key = key.to_bytes();
            let state_key = key::ContextState::new(ctx_id, storage_key);
            match handle.get(&state_key) {
                Ok(Some(state)) => Some(state.value.into_boxed().into_vec()),
                Ok(None) => None,
                Err(e) => {

💡 Storage read errors are silently masked

When storage read fails, the error is logged but None is returned, potentially masking storage corruption or integrity issues from higher-level code.

Suggested fix:

Consider propagating the error or providing a mechanism for callers to detect masked failures (e.g., a metric counter).
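
Because the callback signature returns `Option`, a failure counter is one way to keep that shape while still exposing masked errors. This is a minimal sketch under assumptions: `make_counting_read`, `BackendResult`, and the byte-slice key are hypothetical, and the counter stands in for whatever metric the project would actually use.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical backend result: `Err` models an I/O or corruption failure.
type BackendResult = Result<Option<Vec<u8>>, String>;

/// Wrap a fallible lookup in the `Option`-returning callback shape, counting
/// failures so callers can tell "missing" apart from "failed".
fn make_counting_read(
    backend: impl Fn(&[u8]) -> BackendResult + 'static,
) -> (Rc<dyn Fn(&[u8]) -> Option<Vec<u8>>>, Rc<Cell<u64>>) {
    let failures = Rc::new(Cell::new(0u64));
    let counter = Rc::clone(&failures);

    let read: Rc<dyn Fn(&[u8]) -> Option<Vec<u8>>> =
        Rc::new(move |key: &[u8]| match backend(key) {
            Ok(value) => value,
            Err(_err) => {
                // The real callback would also log the error at this point.
                counter.set(counter.get() + 1);
                None
            }
        });

    (read, failures)
}
```

A caller can compare the counter before and after a `with_runtime_env` block and treat any increase as a failed sync step rather than as missing data.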

                    warn!(
                        %ctx_id,
                        storage_key = %hex::encode(storage_key),
                        error = ?e,
                        "Storage read failed"
                    );
                    None
                }
            }
        })
    };


💡 Silent failure on storage write operations

The write callback returns is_ok() which silently discards the specific error; while not a direct vulnerability, silent write failures during sync could lead to data inconsistency or incomplete state that violates invariant I5.

Suggested fix:

Consider logging write failures or propagating errors rather than returning a boolean.

    let write: Rc<dyn Fn(Key, &[u8]) -> bool> = {

💡 Storage errors logged but operation appears successful to caller

The read callback returns None on error (same as not-found), making it impossible for callers to distinguish storage failures from missing data, potentially masking data corruption or access issues.

Suggested fix:

Consider propagating errors through a Result type or using a sentinel pattern so callers can detect and handle storage failures appropriately.

        let handle_cell: Rc<RefCell<_>> = Rc::new(RefCell::new(store.handle()));
        let ctx_id = context_id;
        Rc::new(move |key: Key, value: &[u8]| {

🟡 Silent write failures may cause data loss

The write callback returns false on error without logging, unlike the read callback which logs errors; this inconsistency makes debugging storage failures difficult and could mask data corruption.

Suggested fix:

Add warn! logging for write failures similar to the read callback's error handling.
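
A small helper could give write and remove the same reporting behaviour as the read path. In this sketch `report_and_flatten` is a hypothetical name, and the `log` sink is an assumption standing in for `tracing::warn!` so that the example needs only the standard library.

```rust
use std::fmt::Debug;

/// Convert a fallible store operation into the `bool` the callback returns,
/// reporting the error instead of discarding it with `is_ok()`.
fn report_and_flatten<E: Debug>(
    op: &str,
    result: Result<(), E>,
    log: &mut dyn FnMut(String),
) -> bool {
    match result {
        Ok(()) => true,
        Err(err) => {
            log(format!("Storage {op} failed: {err:?}"));
            false
        }
    }
}
```

Both the write and the remove closure could route their `put`/`delete` results through such a helper, which keeps the three callbacks consistent.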

            let storage_key = key.to_bytes();
            let state_key = key::ContextState::new(ctx_id, storage_key);
            let slice: calimero_store::slice::Slice<'_> = value.to_vec().into();
            let state_value = types::ContextState::from(slice);

💡 Write/remove callbacks could panic on recursive borrow

Using borrow_mut() on RefCell will panic if the callback is invoked recursively within the same thread; try_borrow_mut() with proper error handling would be safer.

Suggested fix:

Consider using `try_borrow_mut()` and returning `false` on borrow failure, or document the non-reentrant constraint.
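
The difference between the two borrow calls can be shown in isolation. The sketch uses a plain `Vec<u8>` in place of the store handle, and `guarded_push` is a hypothetical helper name.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Returns `false` instead of panicking when the cell is already borrowed.
fn guarded_push(cell: &Rc<RefCell<Vec<u8>>>, byte: u8) -> bool {
    match cell.try_borrow_mut() {
        Ok(mut guard) => {
            guard.push(byte);
            true
        }
        // A re-entrant call lands here; `borrow_mut()` would panic instead.
        Err(_already_borrowed) => false,
    }
}
```

Returning `false` on a failed borrow matches the callbacks' existing failure signal, though it has the same drawback as any other silent `false`: the caller cannot tell a borrow conflict from a storage error unless it is also logged.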

            handle_cell
                .borrow_mut()
                .put(&state_key, &state_value)
                .is_ok()
        })
    };

🟡 Silent remove failures may cause data inconsistency

The remove callback returns false on error without logging, which could mask failed deletions and lead to stale data persisting unnoticed.

Suggested fix:

Add warn! logging for remove failures to match read callback's error handling pattern.


    let remove: Rc<dyn Fn(&Key) -> bool> = {
        let handle_cell: Rc<RefCell<_>> = Rc::new(RefCell::new(store.handle()));
        let ctx_id = context_id;
        Rc::new(move |key: &Key| {
            let storage_key = key.to_bytes();
            let state_key = key::ContextState::new(ctx_id, storage_key);
            handle_cell.borrow_mut().delete(&state_key).is_ok()
        })
    };

    StorageCallbacks {
        read,
        write,
        remove,
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use calimero_storage::env::with_runtime_env;

💡 Test coverage only verifies read callback, not write or remove

The test only verifies that create_runtime_env doesn't panic and that reading a non-existent index returns Ok(None), but doesn't verify that the write and remove callbacks function correctly.

Suggested fix:

Add tests that write data via the RuntimeEnv and verify it persists, then remove it and verify deletion.

    use calimero_storage::index::Index;
    use calimero_storage::store::MainStorage;

💡 Test only verifies non-panic, not callback behavior

The test test_create_runtime_env_with_inmemory only checks that create_runtime_env doesn't panic; it doesn't verify that write/read callbacks actually persist and retrieve data correctly.

Suggested fix:

Add a test that writes data via the callbacks and reads it back to verify round-trip correctness.

    use calimero_store::db::InMemoryDB;


💡 Test only verifies no panic, not actual functionality

The test test_create_runtime_env_with_inmemory only checks that calling Index::get_index doesn't panic; it would be more valuable to verify that write/read round-trips work correctly.

Suggested fix:

Add a test that writes via the RuntimeEnv callbacks and verifies the data can be read back.

    #[test]
    fn test_create_runtime_env_with_inmemory() {

💡 Test only verifies read callback, not write/remove

The unit test creates a RuntimeEnv and calls get_index but doesn't exercise the write or remove callbacks, leaving those code paths untested.

Suggested fix:

Add test cases that perform write operations and verify persistence, and remove operations to ensure all three callbacks work correctly.

        // Create an in-memory store
        let db = InMemoryDB::owned();
        let store = Store::new(Arc::new(db));

💡 Test assertion doesn't verify expected return value

The comment states 'should be Ok(None)' but the assertion only checks is_ok(), not that the inner value is None.

Suggested fix:

Change assertion to `assert!(matches!(result, Ok(None)));` or use `assert_eq!(result.unwrap(), None);`
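
The gap between the two assertions is easy to see with a stand-in result type. `lookup_missing`, `lookup_buggy`, and `is_ok_none` are hypothetical helpers, and `u32` replaces the real index type for illustration only.

```rust
/// Hypothetical stand-in for an index lookup on an empty store.
fn lookup_missing() -> Result<Option<u32>, String> {
    Ok(None)
}

/// A buggy variant that wrongly reports a value for a missing entry.
fn lookup_buggy() -> Result<Option<u32>, String> {
    Ok(Some(7))
}

/// The stricter check: succeeds only for `Ok(None)`.
fn is_ok_none<T, E>(result: &Result<Option<T>, E>) -> bool {
    matches!(result, Ok(None))
}
```

Both lookups pass an `is_ok()` assertion, but only `lookup_missing` passes the `matches!` form, which is the behaviour the test comment describes.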


        // Create a context ID and identity
        let context_id = ContextId::from([1u8; 32]);
        let identity = PublicKey::from([2u8; 32]);

        // Create RuntimeEnv - should not panic
        let env = create_runtime_env(&store, context_id, identity);

        // Use it with storage APIs
        let result = with_runtime_env(env, || {
            // Try to get a non-existent index - should return None, not panic
            Index::<MainStorage>::get_index(calimero_storage::address::Id::root())
        });

        // Root index doesn't exist yet, should be Ok(None)
        assert!(result.is_ok());
    }
}