diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index cc6be490d..bba9de687 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -312,11 +312,23 @@ impl Module { // This allows the storage layer to call merge functions even while // the main WASM execution is in progress. if let Some(merge_callback) = self.create_merge_callback() { - let callback_rc = + // Wrap the callback in Arc so we can share it between the two closures + let merge_callback = std::sync::Arc::new(merge_callback); + + // Custom type merge callback + let custom_callback = std::sync::Arc::clone(&merge_callback); + let custom_rc = std::rc::Rc::new(move |local: &[u8], remote: &[u8], type_name: &str| { - merge_callback.merge_custom(local, remote, type_name) + custom_callback.merge_custom(local, remote, type_name) }); - logic.with_merge_callback(callback_rc); + logic.with_merge_callback(custom_rc); + + // Root state merge callback + let root_callback = merge_callback; + let root_rc = std::rc::Rc::new(move |local: &[u8], remote: &[u8]| { + root_callback.merge_root_state(local, remote) + }); + logic.with_root_merge_callback(root_rc); } let mut store = Store::new(self.engine.clone()); diff --git a/crates/runtime/src/logic.rs b/crates/runtime/src/logic.rs index df74c4cff..35f32762b 100644 --- a/crates/runtime/src/logic.rs +++ b/crates/runtime/src/logic.rs @@ -272,6 +272,19 @@ pub struct VMLogic<'a> { -> Result, calimero_storage::collections::crdt_meta::MergeError>, >, >, + + /// Optional WASM merge callback for root state merging during sync. + /// This is called by the storage layer when merging root state and no + /// in-process merge function is registered. + root_merge_callback: Option< + std::rc::Rc< + dyn Fn( + &[u8], + &[u8], + ) + -> Result, calimero_storage::collections::crdt_meta::MergeError>, + >, + >, } impl<'a> VMLogic<'a> { @@ -324,8 +337,9 @@ impl<'a> VMLogic<'a> { blob_handles: HashMap::new(), next_blob_fd: 1, - // Merge callback (set separately via with_merge_callback) + // Merge callbacks (set separately via with_merge_callback / with_root_merge_callback) merge_callback: None, + root_merge_callback: None, } } @@ -364,6 +378,38 @@ impl<'a> VMLogic<'a> { self.merge_callback.clone() } + /// Sets the WASM merge callback for root state merging. + /// + /// This callback is used by the storage layer when merging root state + /// and no in-process merge function is registered. + pub fn with_root_merge_callback( + &mut self, + callback: std::rc::Rc< + dyn Fn( + &[u8], + &[u8], + ) + -> Result, calimero_storage::collections::crdt_meta::MergeError>, + >, + ) { + self.root_merge_callback = Some(callback); + } + + /// Returns the root merge callback if set. + pub fn root_merge_callback( + &self, + ) -> Option< + std::rc::Rc< + dyn Fn( + &[u8], + &[u8], + ) + -> Result, calimero_storage::collections::crdt_meta::MergeError>, + >, + > { + self.root_merge_callback.clone() + } + /// Associates a Wasmer memory instance with this `VMLogic`. /// /// This method should be called after the guest module is instantiated but before diff --git a/crates/runtime/src/logic/host_functions/js_collections.rs b/crates/runtime/src/logic/host_functions/js_collections.rs index e86623204..4f5e029b6 100644 --- a/crates/runtime/src/logic/host_functions/js_collections.rs +++ b/crates/runtime/src/logic/host_functions/js_collections.rs @@ -29,13 +29,15 @@ const COLLECTION_ID_LEN: usize = 32; impl VMHostFunctions<'_> { fn make_runtime_env(&mut self) -> VMLogicResult { self.with_logic_mut(|logic| { - // Get callback before borrowing storage to avoid borrow conflicts + // Get callbacks before borrowing storage to avoid borrow conflicts let merge_callback = logic.merge_callback(); + let root_merge_callback = logic.root_merge_callback(); Ok(build_runtime_env_with_merge( logic.storage, logic.context.context_id, logic.context.executor_public_key, merge_callback, + root_merge_callback, )) }) } diff --git a/crates/runtime/src/logic/host_functions/system.rs b/crates/runtime/src/logic/host_functions/system.rs index 951cdbb28..96bc808bb 100644 --- a/crates/runtime/src/logic/host_functions/system.rs +++ b/crates/runtime/src/logic/host_functions/system.rs @@ -31,6 +31,7 @@ use std::rc::Rc; /// storage crate to resolve reads/writes against the live context storage. /// /// The optional `merge_callback` is used for custom type merging via WASM. +/// The optional `root_merge_callback` is used for root state merging via WASM. pub(super) fn build_runtime_env_with_merge( storage: &mut dyn RuntimeStorage, context_id: [u8; DIGEST_SIZE], @@ -45,6 +46,15 @@ pub(super) fn build_runtime_env_with_merge( -> Result, calimero_storage::collections::crdt_meta::MergeError>, >, >, + root_merge_callback: Option< + std::rc::Rc< + dyn Fn( + &[u8], + &[u8], + ) + -> Result, calimero_storage::collections::crdt_meta::MergeError>, + >, + >, ) -> RuntimeEnv { let raw_ptr: *mut dyn RuntimeStorage = storage; let (data_ptr, vtable_ptr): (*mut (), *mut ()) = unsafe { mem::transmute(raw_ptr) }; @@ -101,12 +111,14 @@ pub(super) fn build_runtime_env_with_merge( // the storage crate falls back to its default environment, so subsequent // calls that do not install an override will continue to use the mock / // WASM backends. - let env = RuntimeEnv::new(reader, writer, remover, context_id, executor_id); + let mut env = RuntimeEnv::new(reader, writer, remover, context_id, executor_id); if let Some(callback) = merge_callback { - env.with_merge_callback(callback) - } else { - env + env = env.with_merge_callback(callback); + } + if let Some(callback) = root_merge_callback { + env = env.with_root_merge_callback(callback); } + env } thread_local! { @@ -836,13 +848,15 @@ impl VMHostFunctions<'_> { target: "runtime::host::system", "apply_storage_delta using context id" ); - // Get callback before borrowing storage to avoid borrow conflicts + // Get callbacks before borrowing storage to avoid borrow conflicts let merge_callback = logic.merge_callback(); + let root_merge_callback = logic.root_merge_callback(); let env = build_runtime_env_with_merge( logic.storage, logic.context.context_id, logic.context.executor_public_key, merge_callback, + root_merge_callback, ); let payload = payload_opt @@ -881,13 +895,15 @@ impl VMHostFunctions<'_> { .iter() .map(|byte| format!("{byte:02x}")) .collect(); - // Get callback before borrowing storage to avoid borrow conflicts + // Get callbacks before borrowing storage to avoid borrow conflicts let merge_callback = logic.merge_callback(); + let root_merge_callback = logic.root_merge_callback(); let env = build_runtime_env_with_merge( logic.storage, logic.context.context_id, logic.context.executor_public_key, merge_callback, + root_merge_callback, ); let maybe_bytes = @@ -944,13 +960,15 @@ impl VMHostFunctions<'_> { "apply_storage_delta start" ); - // Get callback before borrowing storage to avoid borrow conflicts + // Get callbacks before borrowing storage to avoid borrow conflicts let merge_callback = logic.merge_callback(); + let root_merge_callback = logic.root_merge_callback(); let env = build_runtime_env_with_merge( logic.storage, logic.context.context_id, logic.context.executor_public_key, merge_callback, + root_merge_callback, ); with_runtime_env(env.clone(), || { @@ -1002,13 +1020,15 @@ impl VMHostFunctions<'_> { /// Returns `1` if a delta was emitted, `0` if there was nothing to commit. pub fn flush_delta(&mut self) -> VMLogicResult { self.with_logic_mut(|logic| -> VMLogicResult { - // Get callback before borrowing storage to avoid borrow conflicts + // Get callbacks before borrowing storage to avoid borrow conflicts let merge_callback = logic.merge_callback(); + let root_merge_callback = logic.root_merge_callback(); let env = build_runtime_env_with_merge( logic.storage, logic.context.context_id, logic.context.executor_public_key, merge_callback, + root_merge_callback, ); let root_hash = with_runtime_env(env.clone(), || { diff --git a/crates/storage/src/env.rs b/crates/storage/src/env.rs index fbae22d94..3ae876f3e 100644 --- a/crates/storage/src/env.rs +++ b/crates/storage/src/env.rs @@ -72,6 +72,13 @@ pub struct RuntimeEnv { ) -> Result, crate::collections::crdt_meta::MergeError>, >, >, + /// Optional WASM merge callback for root state merging. + /// This is used during sync when no in-process merge function is registered. + wasm_root_merge_callback: Option< + std::rc::Rc< + dyn Fn(&[u8], &[u8]) -> Result, crate::collections::crdt_meta::MergeError>, + >, + >, } #[cfg(not(target_arch = "wasm32"))] @@ -96,6 +103,7 @@ impl RuntimeEnv { context_id, executor_id, wasm_merge_callback: None, + wasm_root_merge_callback: None, } } @@ -118,6 +126,22 @@ impl RuntimeEnv { self } + /// Sets the WASM merge callback for root state merging. + /// + /// The callback receives (local_bytes, remote_bytes) and returns + /// the merged bytes or an error. This is used when no in-process + /// merge function is registered for the root entity type. + #[must_use] + pub fn with_root_merge_callback( + mut self, + callback: std::rc::Rc< + dyn Fn(&[u8], &[u8]) -> Result, crate::collections::crdt_meta::MergeError>, + >, + ) -> Self { + self.wasm_root_merge_callback = Some(callback); + self + } + /// Returns the WASM merge callback if set. #[must_use] pub fn wasm_merge_callback( @@ -134,6 +158,18 @@ impl RuntimeEnv { self.wasm_merge_callback.clone() } + /// Returns the WASM root merge callback if set. + #[must_use] + pub fn wasm_root_merge_callback( + &self, + ) -> Option< + std::rc::Rc< + dyn Fn(&[u8], &[u8]) -> Result, crate::collections::crdt_meta::MergeError>, + >, + > { + self.wasm_root_merge_callback.clone() + } + #[must_use] /// Returns the storage read callback. pub fn storage_read(&self) -> std::rc::Rc Option>> { @@ -184,6 +220,18 @@ pub fn get_wasm_merge_callback() -> Option< mocked::get_wasm_merge_callback() } +#[cfg(not(target_arch = "wasm32"))] +/// Returns the current WASM root merge callback from the runtime environment, if any. +/// +/// This is called by storage merge functions to merge root state via WASM +/// when no in-process merge function is registered. +#[must_use] +pub fn get_wasm_root_merge_callback() -> Option< + std::rc::Rc Result, crate::collections::crdt_meta::MergeError>>, +> { + mocked::get_wasm_root_merge_callback() +} + /// Commits the root hash to the runtime. /// #[expect(clippy::missing_const_for_fn, reason = "Cannot be const here")] @@ -595,6 +643,19 @@ mod mocked { RUNTIME_ENV.with(|env| env.borrow().as_ref().and_then(|e| e.wasm_merge_callback())) } + /// Returns the WASM root merge callback from the current runtime environment. + pub(super) fn get_wasm_root_merge_callback() -> Option< + std::rc::Rc< + dyn Fn(&[u8], &[u8]) -> Result, crate::collections::crdt_meta::MergeError>, + >, + > { + RUNTIME_ENV.with(|env| { + env.borrow() + .as_ref() + .and_then(|e| e.wasm_root_merge_callback()) + }) + } + /// Resets the environment state for testing. /// /// Clears the thread-local ROOT_HASH, HLC, and STORAGE, allowing multiple tests diff --git a/crates/storage/src/merge.rs b/crates/storage/src/merge.rs index 6d25dec5e..94e3c13c6 100644 --- a/crates/storage/src/merge.rs +++ b/crates/storage/src/merge.rs @@ -106,6 +106,17 @@ pub fn merge_root_state( match try_merge_registered(existing, incoming, existing_ts, incoming_ts) { MergeRegistryResult::Success(merged) => Ok(merged), MergeRegistryResult::NoFunctionsRegistered => { + // No in-process merge function registered. + // Try WASM callback if available (RuntimeEnv provides this). + #[cfg(not(target_arch = "wasm32"))] + if let Some(wasm_callback) = crate::env::get_wasm_root_merge_callback() { + tracing::debug!( + target: "calimero_storage::merge", + "No in-process merge function registered, trying WASM callback" + ); + return wasm_callback(existing, incoming); + } + // I5 Enforcement: No silent data loss // // If the root entity contains CRDTs (Counter, etc.) and no merge function @@ -122,6 +133,25 @@ pub fn merge_root_state( // - The data type doesn't match any registered type (test contamination) // - Deserialization failed (corrupt data) // + // Try WASM callback as a fallback before LWW. + #[cfg(not(target_arch = "wasm32"))] + if let Some(wasm_callback) = crate::env::get_wasm_root_merge_callback() { + tracing::debug!( + target: "calimero_storage::merge", + "All registered merge functions failed, trying WASM callback" + ); + match wasm_callback(existing, incoming) { + Ok(merged) => return Ok(merged), + Err(e) => { + tracing::warn!( + target: "calimero_storage::merge", + error = ?e, + "WASM callback also failed, falling back to LWW" + ); + } + } + } + // Fall back to LWW to maintain backwards compatibility. // The incoming value wins if timestamps are equal or incoming is newer. //