Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,23 @@ impl Module {
// This allows the storage layer to call merge functions even while
// the main WASM execution is in progress.
if let Some(merge_callback) = self.create_merge_callback() {
let callback_rc =
// Wrap the callback in Arc so we can share it between the two closures
let merge_callback = std::sync::Arc::new(merge_callback);

// Custom type merge callback
let custom_callback = std::sync::Arc::clone(&merge_callback);
let custom_rc =
std::rc::Rc::new(move |local: &[u8], remote: &[u8], type_name: &str| {
merge_callback.merge_custom(local, remote, type_name)
custom_callback.merge_custom(local, remote, type_name)
});
logic.with_merge_callback(callback_rc);
logic.with_merge_callback(custom_rc);

// Root state merge callback
let root_callback = merge_callback;
let root_rc = std::rc::Rc::new(move |local: &[u8], remote: &[u8]| {
root_callback.merge_root_state(local, remote)
});
logic.with_root_merge_callback(root_rc);
}

let mut store = Store::new(self.engine.clone());
Expand Down
48 changes: 47 additions & 1 deletion crates/runtime/src/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,19 @@ pub struct VMLogic<'a> {
-> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
>,
>,

/// Optional WASM merge callback for root state merging during sync.
/// This is called by the storage layer when merging root state and no
/// in-process merge function is registered.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: Verbose type signature could use a type alias

The Rc<dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, MergeError>> type is repeated across multiple files; a type alias would improve readability (DRY).

Suggested fix:

Consider adding `pub type RootMergeCallback = Rc<dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, MergeError>>;` in a shared location.

root_merge_callback: Option<
std::rc::Rc<
dyn Fn(
&[u8],
&[u8],
)
-> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
>,
>,
}

impl<'a> VMLogic<'a> {
Expand Down Expand Up @@ -324,8 +337,9 @@ impl<'a> VMLogic<'a> {
blob_handles: HashMap::new(),
next_blob_fd: 1,

// Merge callback (set separately via with_merge_callback)
// Merge callbacks (set separately via with_merge_callback / with_root_merge_callback)
merge_callback: None,
root_merge_callback: None,
}
}

Expand Down Expand Up @@ -364,6 +378,38 @@ impl<'a> VMLogic<'a> {
self.merge_callback.clone()
}

/// Sets the WASM merge callback for root state merging.
///
/// This callback is used by the storage layer when merging root state
/// and no in-process merge function is registered.
pub fn with_root_merge_callback(
&mut self,
callback: std::rc::Rc<
dyn Fn(
&[u8],
&[u8],
)
-> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
>,
) {
self.root_merge_callback = Some(callback);
}

/// Returns the root merge callback if set.
pub fn root_merge_callback(
&self,
) -> Option<
std::rc::Rc<
dyn Fn(
&[u8],
&[u8],
)
-> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
>,
> {
self.root_merge_callback.clone()
}

/// Associates a Wasmer memory instance with this `VMLogic`.
///
/// This method should be called after the guest module is instantiated but before
Expand Down
4 changes: 3 additions & 1 deletion crates/runtime/src/logic/host_functions/js_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ const COLLECTION_ID_LEN: usize = 32;
impl VMHostFunctions<'_> {
fn make_runtime_env(&mut self) -> VMLogicResult<RuntimeEnv> {
self.with_logic_mut(|logic| {
// Get callback before borrowing storage to avoid borrow conflicts
// Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
let root_merge_callback = logic.root_merge_callback();
Ok(build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
root_merge_callback,
))
})
}
Expand Down
36 changes: 28 additions & 8 deletions crates/runtime/src/logic/host_functions/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::rc::Rc;
/// storage crate to resolve reads/writes against the live context storage.
///
/// The optional `merge_callback` is used for custom type merging via WASM.
/// The optional `root_merge_callback` is used for root state merging via WASM.
pub(super) fn build_runtime_env_with_merge(
storage: &mut dyn RuntimeStorage,
context_id: [u8; DIGEST_SIZE],
Expand All @@ -45,6 +46,15 @@ pub(super) fn build_runtime_env_with_merge(
-> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
>,
>,
root_merge_callback: Option<
std::rc::Rc<
dyn Fn(
&[u8],
&[u8],
)
-> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
>,
>,
) -> RuntimeEnv {
let raw_ptr: *mut dyn RuntimeStorage = storage;
let (data_ptr, vtable_ptr): (*mut (), *mut ()) = unsafe { mem::transmute(raw_ptr) };
Expand Down Expand Up @@ -101,12 +111,14 @@ pub(super) fn build_runtime_env_with_merge(
// the storage crate falls back to its default environment, so subsequent
// calls that do not install an override will continue to use the mock /
// WASM backends.
let env = RuntimeEnv::new(reader, writer, remover, context_id, executor_id);
let mut env = RuntimeEnv::new(reader, writer, remover, context_id, executor_id);
if let Some(callback) = merge_callback {
env.with_merge_callback(callback)
} else {
env
env = env.with_merge_callback(callback);
}
if let Some(callback) = root_merge_callback {
env = env.with_root_merge_callback(callback);
}
env
}

thread_local! {
Expand Down Expand Up @@ -836,13 +848,15 @@ impl VMHostFunctions<'_> {
target: "runtime::host::system",
"apply_storage_delta using context id"
);
// Get callback before borrowing storage to avoid borrow conflicts
// Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
let root_merge_callback = logic.root_merge_callback();
let env = build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
root_merge_callback,
);

let payload = payload_opt
Expand Down Expand Up @@ -881,13 +895,15 @@ impl VMHostFunctions<'_> {
.iter()
.map(|byte| format!("{byte:02x}"))
.collect();
// Get callback before borrowing storage to avoid borrow conflicts
// Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
let root_merge_callback = logic.root_merge_callback();
let env = build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
root_merge_callback,
);

let maybe_bytes =
Expand Down Expand Up @@ -944,13 +960,15 @@ impl VMHostFunctions<'_> {
"apply_storage_delta start"
);

// Get callback before borrowing storage to avoid borrow conflicts
// Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
let root_merge_callback = logic.root_merge_callback();
let env = build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
root_merge_callback,
);

with_runtime_env(env.clone(), || {
Expand Down Expand Up @@ -1002,13 +1020,15 @@ impl VMHostFunctions<'_> {
/// Returns `1` if a delta was emitted, `0` if there was nothing to commit.
pub fn flush_delta(&mut self) -> VMLogicResult<i32> {
self.with_logic_mut(|logic| -> VMLogicResult<i32> {
// Get callback before borrowing storage to avoid borrow conflicts
// Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
let root_merge_callback = logic.root_merge_callback();
let env = build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
root_merge_callback,
);

let root_hash = with_runtime_env(env.clone(), || {
Expand Down
61 changes: 61 additions & 0 deletions crates/storage/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ pub struct RuntimeEnv {
) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
>,
>,
/// Optional WASM merge callback for root state merging.
/// This is used during sync when no in-process merge function is registered.
wasm_root_merge_callback: Option<
std::rc::Rc<
dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
>,
>,
}

#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -96,6 +103,7 @@ impl RuntimeEnv {
context_id,
executor_id,
wasm_merge_callback: None,
wasm_root_merge_callback: None,
}
}

Expand All @@ -118,6 +126,22 @@ impl RuntimeEnv {
self
}

/// Sets the WASM merge callback for root state merging.
///
/// The callback receives (local_bytes, remote_bytes) and returns
/// the merged bytes or an error. This is used when no in-process
/// merge function is registered for the root entity type.
#[must_use]
pub fn with_root_merge_callback(
mut self,
callback: std::rc::Rc<
dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
>,
) -> Self {
self.wasm_root_merge_callback = Some(callback);
self
}

/// Returns the WASM merge callback if set.
#[must_use]
pub fn wasm_merge_callback(
Expand All @@ -134,6 +158,18 @@ impl RuntimeEnv {
self.wasm_merge_callback.clone()
}

/// Returns the WASM root merge callback if set.
#[must_use]
pub fn wasm_root_merge_callback(
&self,
) -> Option<
std::rc::Rc<
dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
>,
> {
self.wasm_root_merge_callback.clone()
}

#[must_use]
/// Returns the storage read callback.
pub fn storage_read(&self) -> std::rc::Rc<dyn Fn(&Key) -> Option<Vec<u8>>> {
Expand Down Expand Up @@ -184,6 +220,18 @@ pub fn get_wasm_merge_callback() -> Option<
mocked::get_wasm_merge_callback()
}

#[cfg(not(target_arch = "wasm32"))]
/// Returns the current WASM root merge callback from the runtime environment, if any.
///
/// This is called by storage merge functions to merge root state via WASM
/// when no in-process merge function is registered.
#[must_use]
pub fn get_wasm_root_merge_callback() -> Option<
std::rc::Rc<dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>>,
> {
mocked::get_wasm_root_merge_callback()
}

/// Commits the root hash to the runtime.
///
#[expect(clippy::missing_const_for_fn, reason = "Cannot be const here")]
Expand Down Expand Up @@ -595,6 +643,19 @@ mod mocked {
RUNTIME_ENV.with(|env| env.borrow().as_ref().and_then(|e| e.wasm_merge_callback()))
}

/// Returns the WASM root merge callback from the current runtime environment.
pub(super) fn get_wasm_root_merge_callback() -> Option<
std::rc::Rc<
dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
>,
> {
RUNTIME_ENV.with(|env| {
env.borrow()
.as_ref()
.and_then(|e| e.wasm_root_merge_callback())
})
}

/// Resets the environment state for testing.
///
/// Clears the thread-local ROOT_HASH, HLC, and STORAGE, allowing multiple tests
Expand Down
30 changes: 30 additions & 0 deletions crates/storage/src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ pub fn merge_root_state(
match try_merge_registered(existing, incoming, existing_ts, incoming_ts) {
MergeRegistryResult::Success(merged) => Ok(merged),
MergeRegistryResult::NoFunctionsRegistered => {
// No in-process merge function registered.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Duplicate WASM callback check creates dead code and incorrect error handling

The first WASM callback check at line 109 returns immediately on both success AND failure, making the second check at line 136 unreachable dead code; if the callback returns an error, it propagates up instead of falling back to LWW as the second block's comment suggests.

Suggested fix:

Remove the first early-return WASM check and keep only the second one that properly handles errors with LWW fallback, or change the first check to only return on Ok and fall through on Err.

// Try WASM callback if available (RuntimeEnv provides this).
#[cfg(not(target_arch = "wasm32"))]
if let Some(wasm_callback) = crate::env::get_wasm_root_merge_callback() {
tracing::debug!(
target: "calimero_storage::merge",
"No in-process merge function registered, trying WASM callback"
);
return wasm_callback(existing, incoming);
}

// I5 Enforcement: No silent data loss
//
// If the root entity contains CRDTs (Counter, etc.) and no merge function
Expand All @@ -122,6 +133,25 @@ pub fn merge_root_state(
// - The data type doesn't match any registered type (test contamination)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Duplicate WASM callback check is dead code

The second get_wasm_root_merge_callback() call (lines 136-156) is unreachable: if the first check (lines 109-119) finds a callback it returns, otherwise there's no callback and this second check will also find None.

Suggested fix:

Remove the second WASM callback block entirely; the first check already handles this case.

// - Deserialization failed (corrupt data)
//
// Try WASM callback as a fallback before LWW.
#[cfg(not(target_arch = "wasm32"))]
if let Some(wasm_callback) = crate::env::get_wasm_root_merge_callback() {
tracing::debug!(
target: "calimero_storage::merge",
"All registered merge functions failed, trying WASM callback"
);
match wasm_callback(existing, incoming) {
Ok(merged) => return Ok(merged),
Err(e) => {
tracing::warn!(
target: "calimero_storage::merge",
error = ?e,
"WASM callback also failed, falling back to LWW"
);
}
}
}

// Fall back to LWW to maintain backwards compatibility.
// The incoming value wins if timestamps are equal or incoming is newer.
//
Expand Down
Loading