Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/node/primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ publish = true
[dependencies]
actix.workspace = true
async-stream.workspace = true
async-trait.workspace = true
borsh.workspace = true
camino.workspace = true
clap = { workspace = true, features = ["derive"] }
Expand Down
12 changes: 12 additions & 0 deletions crates/node/primitives/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ pub mod handshake;
pub mod hash_comparison;
pub mod levelwise;
pub mod protocol;
pub mod protocol_trait;
pub mod snapshot;
pub mod state_machine;
pub mod storage_bridge;
pub mod subtree;
pub mod transport;
pub mod wire;

// =============================================================================
Expand Down Expand Up @@ -107,3 +110,12 @@ pub use state_machine::{
build_handshake, build_handshake_from_raw, estimate_entity_count, estimate_max_depth,
LocalSyncState,
};

// Transport abstraction (for production streams and simulation)
pub use transport::{EncryptionState, SyncTransport};

// Protocol trait (common interface for all sync protocols)
pub use protocol_trait::SyncProtocolExecutor;

// Storage bridge (RuntimeEnv creation for sync protocols)
pub use storage_bridge::create_runtime_env;
124 changes: 124 additions & 0 deletions crates/node/primitives/src/sync/protocol_trait.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//! Common trait for sync protocol implementations.
//!
//! This module defines the [`SyncProtocolExecutor`] trait that all sync protocols
//! implement. This enables:
//!
//! - Protocol implementation details contained within each protocol module
//! - Common interface for `SyncManager` to invoke any protocol
//! - Same code path for production and simulation (only `Store` backend differs)
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │ SyncProtocolExecutor trait │
//! │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
//! │ │ HashComparison │ │ Snapshot │ │ BloomFilter │ │
//! │ │ Protocol │ │ Protocol │ │ Protocol │ │
//! │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
//! │ │ │ │ │
//! │ └────────────────────┼────────────────────┘ │
//! │ │ │
//! │ ┌───────────┴───────────┐ │
//! │ │ SyncTransport │ │
//! │ │ (Stream or SimStream) │ │
//! │ └───────────────────────┘ │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Example
//!
//! ```ignore
//! use calimero_node_primitives::sync::{SyncProtocolExecutor, HashComparisonProtocol};
//!
//! // Production
//! let mut transport = StreamTransport::new(&mut stream);
//! let stats = HashComparisonProtocol::run_initiator(
//! &mut transport,
//! &store,
//! context_id,
//! identity,
//! HashComparisonConfig { remote_root_hash },
//! ).await?;
//!
//! // Simulation (exact same call, different transport/store)
//! let mut transport = SimStream::new();
//! let stats = HashComparisonProtocol::run_initiator(
//! &mut transport,
//! &store, // Store<InMemoryDB>
//! context_id,
//! identity,
//! HashComparisonConfig { remote_root_hash },
//! ).await?;
//! ```

use async_trait::async_trait;
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use calimero_store::Store;
use eyre::Result;

use super::SyncTransport;

/// Trait for sync protocol implementations.
///
/// Each sync protocol (HashComparison, Snapshot, BloomFilter, etc.) implements
/// this trait. The protocol logic is generic over:
///
/// - `T: SyncTransport` - the transport layer (production streams or simulation channels)
/// - `Store` - the storage backend (RocksDB or InMemoryDB)
///
/// This enables the same protocol code to run in both production and simulation.
///
/// Note: Uses `?Send` because `RuntimeEnv` (used for storage access) contains `Rc`
/// which is not `Send`. Callers must not spawn these futures across threads.
#[async_trait(?Send)]
pub trait SyncProtocolExecutor {
/// Protocol-specific configuration for the initiator.
///
/// For example, HashComparison needs the remote root hash.
type Config: Send;

/// Protocol-specific statistics/results.
type Stats: Send + Default;

/// Run the initiator (pulling) side of the protocol.
///
/// The initiator requests data from the responder and applies it locally.
///
/// # Arguments
///
/// * `transport` - The transport for sending/receiving messages
/// * `store` - The local storage (works with both RocksDB and InMemoryDB)
/// * `context_id` - The context being synced
/// * `identity` - Our identity for this context
/// * `config` - Protocol-specific configuration
///
/// # Returns
///
/// Protocol-specific statistics on success.
async fn run_initiator<T: SyncTransport>(
transport: &mut T,
store: &Store,
context_id: ContextId,
identity: PublicKey,
config: Self::Config,
) -> Result<Self::Stats>;

/// Run the responder side of the protocol.
///
/// The responder answers requests from the initiator.
///
/// # Arguments
///
/// * `transport` - The transport for sending/receiving messages
/// * `store` - The local storage
/// * `context_id` - The context being synced
/// * `identity` - Our identity for this context
async fn run_responder<T: SyncTransport>(
transport: &mut T,
store: &Store,
context_id: ContextId,
identity: PublicKey,
) -> Result<()>;
}
184 changes: 184 additions & 0 deletions crates/node/primitives/src/sync/storage_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//! Storage bridge utilities for sync protocols.
//!
//! This module provides helpers to bridge `calimero-storage` APIs (which use
//! the `RuntimeEnv` thread-local) to the underlying `calimero-store` backend.
//!
//! # Why This Exists
//!
//! The `calimero-storage` crate provides high-level storage APIs (`Index`, `Interface`)
//! that operate through a thread-local `RuntimeEnv`. This `RuntimeEnv` contains
//! callbacks that route read/write/remove operations to the actual database.
//!
//! This module provides a single, reusable way to create these callbacks from
//! a `Store`, regardless of the backend (RocksDB or InMemoryDB).
//!
//! # Usage
//!
//! ```ignore
//! use calimero_node_primitives::sync::storage_bridge::create_runtime_env;
//!
//! // Works with any Store backend (RocksDB or InMemoryDB)
//! let runtime_env = create_runtime_env(&store, context_id, identity);
//!
//! // Use with storage APIs
//! with_runtime_env(runtime_env, || {
//! let index = Index::<MainStorage>::get_index(entity_id)?;
//! // ...
//! });
//! ```

use std::cell::RefCell;
use std::rc::Rc;

use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use calimero_storage::env::RuntimeEnv;
use calimero_storage::store::Key;
use calimero_store::{key, types, Store};
use tracing::warn;

/// Create a `RuntimeEnv` that bridges `calimero-storage` to a `Store`.
///
/// This is the canonical way to set up storage access for sync protocols.
/// The returned `RuntimeEnv` can be used with `with_runtime_env()` to enable
/// `Index<MainStorage>` and `Interface<MainStorage>` operations.
///
/// # Arguments
///
/// * `store` - The underlying store (works with both RocksDB and InMemoryDB)
/// * `context_id` - The context being accessed
/// * `executor_id` - The identity executing operations
///
/// # Example
///
/// ```ignore
/// let env = create_runtime_env(&store, context_id, identity);
/// let result = with_runtime_env(env, || {
/// Index::<MainStorage>::get_index(entity_id)
/// });
/// ```
pub fn create_runtime_env(
store: &Store,
context_id: ContextId,
executor_id: PublicKey,
) -> RuntimeEnv {
let callbacks = create_storage_callbacks(store, context_id);
RuntimeEnv::new(
callbacks.read,
callbacks.write,
callbacks.remove,
*context_id.as_ref(),
*executor_id.as_ref(),
)
}

/// Storage callback closures that bridge `calimero-storage` Key API to the Store.
///
/// These closures translate `calimero-storage::Key` (Index/Entry) to
/// `calimero-store::ContextStateKey` for access to the actual database.
#[expect(
clippy::type_complexity,
reason = "Matches RuntimeEnv callback signatures"
)]
struct StorageCallbacks {
read: Rc<dyn Fn(&Key) -> Option<Vec<u8>>>,
write: Rc<dyn Fn(Key, &[u8]) -> bool>,
remove: Rc<dyn Fn(&Key) -> bool>,
}

/// Create storage callbacks for a context.
///
/// These bridge the `calimero-storage` Key-based API to the underlying
/// `calimero-store` ContextStateKey-based storage.
#[expect(
clippy::type_complexity,
reason = "Matches RuntimeEnv callback signatures"
)]
fn create_storage_callbacks(store: &Store, context_id: ContextId) -> StorageCallbacks {
let read: Rc<dyn Fn(&Key) -> Option<Vec<u8>>> = {
let handle = store.handle();
let ctx_id = context_id;
Rc::new(move |key: &Key| {
let storage_key = key.to_bytes();
let state_key = key::ContextState::new(ctx_id, storage_key);
match handle.get(&state_key) {
Ok(Some(state)) => Some(state.value.into_boxed().into_vec()),
Ok(None) => None,
Err(e) => {
warn!(
%ctx_id,
storage_key = %hex::encode(storage_key),
error = ?e,
"Storage read failed"
);
None
}
}
})
};

let write: Rc<dyn Fn(Key, &[u8]) -> bool> = {
let handle_cell: Rc<RefCell<_>> = Rc::new(RefCell::new(store.handle()));
let ctx_id = context_id;
Rc::new(move |key: Key, value: &[u8]| {
let storage_key = key.to_bytes();
let state_key = key::ContextState::new(ctx_id, storage_key);
let slice: calimero_store::slice::Slice<'_> = value.to_vec().into();
let state_value = types::ContextState::from(slice);
handle_cell
.borrow_mut()
.put(&state_key, &state_value)
.is_ok()
})
};

let remove: Rc<dyn Fn(&Key) -> bool> = {
let handle_cell: Rc<RefCell<_>> = Rc::new(RefCell::new(store.handle()));
let ctx_id = context_id;
Rc::new(move |key: &Key| {
let storage_key = key.to_bytes();
let state_key = key::ContextState::new(ctx_id, storage_key);
handle_cell.borrow_mut().delete(&state_key).is_ok()
})
};

StorageCallbacks {
read,
write,
remove,
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;
use calimero_storage::env::with_runtime_env;
use calimero_storage::index::Index;
use calimero_storage::store::MainStorage;
use calimero_store::db::InMemoryDB;

#[test]
fn test_create_runtime_env_with_inmemory() {
// Create an in-memory store
let db = InMemoryDB::owned();
let store = Store::new(Arc::new(db));

// Create a context ID and identity
let context_id = ContextId::from([1u8; 32]);
let identity = PublicKey::from([2u8; 32]);

// Create RuntimeEnv - should not panic
let env = create_runtime_env(&store, context_id, identity);

// Use it with storage APIs
let result = with_runtime_env(env, || {
// Try to get a non-existent index - should return None, not panic
Index::<MainStorage>::get_index(calimero_storage::address::Id::root())
});

// Root index doesn't exist yet, should be Ok(None)
assert!(result.is_ok());
}
}
Loading
Loading