feat(sync): implement WASM merge callback for custom CRDT types#1995
feat(sync): implement WASM merge callback for custom CRDT types#1995
Conversation
- Add WasmMergeCallback trait with merge_custom() and merge_root_state()
- Implement RuntimeMergeCallback for calling WASM exports
- Add #[app::mergeable] macro for generating __calimero_merge_{TypeName}
- Update merge_by_crdt_type_with_callback() for CrdtType::Custom dispatch
- Add comprehensive tests for WASM callback dispatch
- Update team-metrics-custom app as reference implementation
- Update storage/sdk/apps AGENTS.md documentation
Closes #1768
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 92% | Review time: 221.8s
🔴 1 critical, 🟡 4 warnings, 💡 3 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-e974fbaf
| let view = memory.view(&self.store); | ||
| view.write(ptr, data) | ||
| .map_err(|e| MergeError::WasmCallbackFailed { | ||
| message: format!("Failed to write data to WASM memory: {e}"), |
There was a problem hiding this comment.
🔴 Memory exhaustion via untrusted WASM length values
The len parameter in read_from_wasm comes from WASM-controlled memory (via result.data_len/result.error_len); a malicious module could return u64::MAX causing the host to allocate unbounded memory.
Suggested fix:
Add a maximum length check (e.g., `if len > MAX_MERGE_RESULT_SIZE { return Err(...) }`) before allocating the buffer, where MAX_MERGE_RESULT_SIZE is a reasonable limit like 64MB.
| })?; | ||
| Ok(buf) | ||
| } | ||
|
|
There was a problem hiding this comment.
🟡 Timeout mechanism declared but never enforced
The timeout_ms field and with_timeout() builder exist, but the TODO on line 214 shows the timeout is never actually applied to the WASM call; a malicious or buggy merge function could block sync indefinitely.
Suggested fix:
Wrap the `merge_fn.call()` in a timeout using `tokio::time::timeout` or Wasmer's metering/fuel mechanism.
|
|
||
| /// Get the WASM memory from the instance. | ||
| fn get_memory(&self) -> Result<&Memory, MergeError> { | ||
| self.instance |
There was a problem hiding this comment.
🟡 Memory leak: WASM allocations are never freed
Every call to write_to_wasm allocates memory via __calimero_alloc for inputs, and the WASM side allocates memory for the result struct and output data, but nothing is ever deallocated; over many sync operations this will exhaust WASM linear memory.
Suggested fix:
Add a `__calimero_free` export to the SDK macros and call it from the runtime after reading results to reclaim memory.
crates/runtime/src/merge_callback.rs
Outdated
| let alloc_fn: TypedFunction<u64, u64> = self | ||
| .instance | ||
| .exports | ||
| .get_typed_function(&self.store, "__calimero_alloc") |
There was a problem hiding this comment.
🟡 No validation of WASM-returned allocation pointer
The pointer returned by __calimero_alloc is used directly for writes without checking if it's within valid WASM memory bounds; a malicious allocator could return arbitrary values.
Suggested fix:
Verify that `ptr + len` is within the WASM memory size before calling `view.write()`.
| let error_str = String::from_utf8_lossy(&error_msg).to_string(); | ||
| warn!( | ||
| target: "calimero_runtime::merge", | ||
| error = %error_str, |
There was a problem hiding this comment.
🟡 Trait implementation returns errors instead of working
The WasmMergeCallback trait methods merge_custom and merge_root_state always return errors and suggest using _mut variants, violating Interface Segregation; if the trait cannot be implemented correctly, consider redesigning the trait to take &mut self or using interior mutability.
Suggested fix:
Refactor `WasmMergeCallback` trait to use `&mut self`, or wrap `Store` in `RefCell`/`Mutex` to enable interior mutability.
| pub struct RuntimeMergeCallback { | ||
| /// The WASM store. | ||
| store: Store, | ||
| /// The WASM instance with the application module. |
There was a problem hiding this comment.
💡 Unused timeout_ms field stored but never used
The timeout_ms field is set via with_timeout() but never checked during WASM calls, which violates YAGNI; consider removing until timeout is actually implemented.
Suggested fix:
Remove `timeout_ms` field and `with_timeout()` method until Issue #1780 timeout handling is implemented, or implement the timeout check now.
| let mod_name = syn::Ident::new( | ||
| &format!("__calimero_merge_{}_impl", type_name_str.to_lowercase()), | ||
| type_name.span(), | ||
| ); |
There was a problem hiding this comment.
💡 DRY violation: duplicated helper functions
The alloc, make_success, and make_error helper functions in the mergeable macro are nearly identical to __calimero_alloc, __calimero_make_merge_success, and __calimero_make_merge_error in state.rs; consider extracting to a shared internal module.
Suggested fix:
Extract these helpers into a shared `calimero_sdk::__internal::merge_helpers` module that both macros can reference.
| #[cfg(target_arch = "wasm32")] | ||
| #[no_mangle] | ||
| pub extern "C" fn __calimero_merge_root_state( | ||
| local_ptr: u64, |
There was a problem hiding this comment.
💡 No input length validation before creating slice from raw pointer
The generated __calimero_merge_root_state creates slices via from_raw_parts trusting the runtime-provided lengths; if local_len or remote_len exceeds actual allocated memory, this causes undefined behavior.
Suggested fix:
Add a maximum length check (e.g., reject lengths > 100MB) before creating slices to guard against corrupted or malicious inputs.
This comment has been minimized.
This comment has been minimized.
- Add merge_callback field to VMLogic for custom type merging - Create separate WASM instance for merge callbacks in Module::run() - Pass callback through RuntimeEnv to storage layer - Update host functions to use build_runtime_env_with_merge() - Storage try_merge_non_root_internal now retrieves callback from env - Update AGENTS.md documentation for runtime and storage Performance optimization tracked in issue #1997
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 97% | Review time: 239.3s
🔴 1 critical, 🟡 5 warnings, 💡 4 suggestions, 📝 1 nitpicks. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-778076c8
| let view = memory.view(store); | ||
| view.write(ptr, data) | ||
| .map_err(|e| MergeError::WasmCallbackFailed { | ||
| message: format!("Failed to write data to WASM memory: {e}"), |
There was a problem hiding this comment.
🔴 Unbounded memory allocation from untrusted WASM return value
The data_len and error_len fields are read directly from WASM memory and used to allocate host memory via read_from_wasm() without any size limits, allowing a malicious WASM module to cause denial-of-service through memory exhaustion.
Suggested fix:
Add a maximum allowed length constant (e.g., MAX_MERGE_RESULT_SIZE) and validate `result.data_len` and `result.error_len` against it before calling `read_from_wasm()`.
| "WASM module does not export __calimero_alloc function: {e}. \ | ||
| Ensure your app uses #[app::state] macro." | ||
| ), | ||
| })?; |
There was a problem hiding this comment.
🟡 Missing timeout enforcement allows infinite loop DoS
The TODO comment indicates timeout handling is not implemented; a malicious WASM merge function could run indefinitely and block sync operations, causing denial-of-service.
Suggested fix:
Implement the timeout mechanism using Wasmer's metering or async timeout wrappers before this code reaches production.
crates/runtime/src/lib.rs
Outdated
| /// Returns `None` if the WASM module doesn't export the required merge functions. | ||
| fn create_merge_callback(&self) -> Option<merge_callback::RuntimeMergeCallback> { | ||
| let mut store = Store::new(self.engine.clone()); | ||
| let instance = Instance::new(&mut store, &self.module, &wasmer::Imports::default()).ok()?; |
There was a problem hiding this comment.
🟡 Silent failure when creating merge WASM instance
Using .ok()? on Instance::new() silently discards errors; if the module requires imports, this fails without any logging, making debugging difficult.
Suggested fix:
Log at debug level when instance creation fails, e.g., `Instance::new(...).map_err(|e| debug!("Merge callback instance failed: {e}")).ok()?`
| /// Get the WASM memory from the instance. | ||
| fn get_memory(&self) -> Result<&Memory, MergeError> { | ||
| self.instance | ||
| .exports |
There was a problem hiding this comment.
🟡 No validation of WASM allocation return value
The pointer returned by __calimero_alloc is used directly without checking if allocation failed (ptr == 0 or other error indicator), which could lead to writing data to unintended WASM memory locations.
Suggested fix:
Check that the returned pointer is non-zero and within expected bounds before writing data.
| /// This allocates memory in the WASM instance using the `__calimero_alloc` export | ||
| /// and writes the data there. | ||
| fn write_to_wasm(&self, store: &mut Store, data: &[u8]) -> Result<(u64, u64), MergeError> { | ||
| // Get the allocator function |
There was a problem hiding this comment.
🟡 WASM memory leak: allocations never freed
Each call to write_to_wasm allocates memory via __calimero_alloc but there's no corresponding deallocation; over many merges, WASM linear memory will grow unboundedly until the instance is dropped.
Suggested fix:
Add a `__calimero_dealloc` export and call it after reading results, or document that memory is only reclaimed when the merge instance is dropped (which happens per Module::run).
| let output = quote! { | ||
| #impl_block | ||
|
|
||
| // ============================================================================ |
There was a problem hiding this comment.
🟡 Generated alloc function leaks memory
The generated alloc function in make_success and make_error allocates memory that is never freed; each merge call accumulates allocations in WASM linear memory.
Suggested fix:
Since the merge callback creates a fresh WASM instance per Module::run(), this is bounded but wasteful; consider documenting this trade-off or adding a dealloc export for future optimization.
| /// WASM merge callback implementation for the Calimero runtime. | ||
| /// | ||
| /// This struct holds a reference to the WASM instance and provides | ||
| /// the merge callback interface for custom CRDT types. |
There was a problem hiding this comment.
💡 from_instance only checks root state export, may miss custom-only modules
The check only verifies __calimero_merge_root_state exists; modules with only custom type merges (no root state merge) will return None and won't get a callback.
Suggested fix:
Consider also checking for any `__calimero_merge_` prefixed exports, or document this limitation clearly.
| ); | ||
|
|
||
| let output = quote! { | ||
| #impl_block |
There was a problem hiding this comment.
💡 Generated alloc function panics on invalid size
The generated alloc function uses expect() which will panic on invalid layout; while sandboxed in WASM, this could cause unexpected merge failures for edge-case sizes.
Suggested fix:
Return 0 on allocation failure instead of panicking, and handle the 0 return value on the host side.
crates/runtime/src/lib.rs
Outdated
| /// Returns `None` if the WASM module doesn't export the required merge functions. | ||
| fn create_merge_callback(&self) -> Option<merge_callback::RuntimeMergeCallback> { | ||
| let mut store = Store::new(self.engine.clone()); | ||
| let instance = Instance::new(&mut store, &self.module, &wasmer::Imports::default()).ok()?; |
There was a problem hiding this comment.
💡 New WASM instance created per Module::run()
Creating a fresh WASM instance for each merge callback adds instantiation overhead; for high-frequency sync scenarios this could become a bottleneck.
Suggested fix:
Already tracked in issue #1997 for future optimization via instance caching/pooling.
| blob_handles: HashMap<u64, BlobHandle>, | ||
| /// The next available file descriptor for a new blob handle. | ||
| next_blob_fd: u64, | ||
|
|
There was a problem hiding this comment.
💡 DRY: Repeated complex closure type should be a type alias
The type std::rc::Rc<dyn Fn(&[u8], &[u8], &str) -> Result<Vec<u8>, MergeError>> is duplicated 4+ times across logic.rs and system.rs; extract to a type alias for maintainability.
Suggested fix:
Add `pub type MergeCallback = std::rc::Rc<dyn Fn(&[u8], &[u8], &str) -> Result<Vec<u8>, MergeError>>;` in merge_callback.rs and use it throughout.
This comment has been minimized.
This comment has been minimized.
- Fix test failures by converting NoWasmCallback to WasmRequired in merge_by_crdt_type() - the simple API signals 'type requires WASM' rather than 'callback wasn't provided' - Add stub imports for merge callback WASM instance to satisfy module import requirements (host functions)
The error is self-explanatory: NoWasmCallback means a WASM callback was needed but not provided. No conversion needed.
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 89% | Review time: 319.6s
🔴 2 critical, 🟡 4 warnings, 💡 3 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-a0507fd1
| let view = memory.view(store); | ||
| let mut buf = vec![0u8; len as usize]; | ||
| view.read(ptr, &mut buf) | ||
| .map_err(|e| MergeError::WasmCallbackFailed { |
There was a problem hiding this comment.
🟡 Timeout not enforced allowing infinite loops in WASM
The timeout_ms field exists but the call_merge_function method doesn't implement timeout enforcement. A malicious or buggy WASM merge function can block indefinitely, causing denial-of-service during sync operations.
Suggested fix:
Wrap the `merge_fn.call()` invocation with a timeout mechanism (e.g., using `tokio::time::timeout` or Wasmer's metering/fuel features) before merging to master.
| .store | ||
| .lock() | ||
| .map_err(|e| MergeError::WasmCallbackFailed { | ||
| message: format!("Failed to lock WASM store: {e}"), |
There was a problem hiding this comment.
🟡 WASM memory allocations never freed during merge operations
Each write_to_wasm call allocates memory via __calimero_alloc but never frees it; during sync with many entity merges, this could exhaust the WASM instance's linear memory within a single Module::run() call.
Suggested fix:
Consider implementing a free export or reusing a single pre-allocated buffer for merge operations to bound memory growth.
| @@ -254,6 +298,17 @@ impl Module { | |||
| context_host, | |||
There was a problem hiding this comment.
💡 New WASM instance created for every Module::run() call
A separate WASM instance with full module compilation is created for merge callbacks on every run; for high-frequency calls this adds latency.
Suggested fix:
As noted in Issue #1997, consider caching or pooling merge callback instances per Module.
| return syn::Error::new_spanned(&impl_block.self_ty, "Expected a type name") | ||
| .to_compile_error() | ||
| .into(); | ||
| } |
There was a problem hiding this comment.
💡 Helper functions duplicated per #[app::mergeable] invocation
Each use of #[app::mergeable] generates its own alloc, make_success, and make_error functions; when multiple types use this macro, the WASM binary will contain duplicate code.
Suggested fix:
Consider extracting these helpers to a shared module in calimero-sdk (e.g., `calimero_sdk::merge_helpers`) that apps can import, reducing generated code size.
| } | ||
| result_ptr | ||
| } | ||
|
|
There was a problem hiding this comment.
💡 Generated WASM allocator uses expect() which panics on failure
The generated alloc function uses .expect() which will panic on invalid allocation sizes. While WASM panics are sandboxed, they will abort the merge operation without a clean error message to the host.
Suggested fix:
Return an error result pointer (with success=0) instead of panicking to provide better error diagnostics to the runtime.
| @@ -259,6 +259,19 @@ pub struct VMLogic<'a> { | |||
| blob_handles: HashMap<u64, BlobHandle>, | |||
There was a problem hiding this comment.
💡 Extract type alias for complex callback signature
The verbose closure type Option<Rc<dyn Fn(&[u8], &[u8], &str) -> Result<Vec<u8>, MergeError>>> is repeated three times (field, setter, getter); a type alias would improve readability and maintainability.
Suggested fix:
Add `type MergeCallbackFn = Rc<dyn Fn(&[u8], &[u8], &str) -> Result<Vec<u8>, MergeError>>;` and use `Option<MergeCallbackFn>` in all three locations.
| /// We create stub imports that satisfy the module's requirements | ||
| /// but trap if actually called. | ||
| fn create_stub_imports(&self, store: &mut Store) -> wasmer::Imports { | ||
| let mut imports = wasmer::Imports::new(); |
There was a problem hiding this comment.
💡 Document potential panic on poisoned mutex in merge callback
The callback wraps RuntimeMergeCallback which uses Mutex internally; if the WASM merge panics, subsequent calls could encounter a poisoned mutex leading to Err from lock().
Suggested fix:
Document this edge case behavior in the callback creation or consider using `parking_lot::Mutex` which doesn't poison on panic.
This comment has been minimized.
This comment has been minimized.
- Fix zero-size allocation UB in SDK macros (#22, #23) Guard against size==0 which is undefined behavior per GlobalAlloc contract - Add MAX_MERGE_RESULT_SIZE (64MB) check in merge_callback.rs (#2,11,21,24) Prevents DoS via malicious WASM returning huge length values - Add debug logging for merge callback instance creation failure (#13) Helps debugging when WASM instantiation fails
Review Comments Addressed✅ Fixed in this push (7d5256e)
🔄 Previously Fixed
📋 Known Limitations (by design)
💡 Deferred for Follow-up
|
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 100% | Review time: 356.7s
🟡 2 warnings, 💡 3 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-16204642
| // Lock the store for the entire operation | ||
| let mut store = self | ||
| .store | ||
| .lock() |
There was a problem hiding this comment.
🟡 Missing timeout enforcement allows DoS via infinite loop in WASM merge
The timeout_ms field exists but is marked dead_code and the TODO shows timeout is not enforced; a malicious WASM module could run an infinite loop during merge, blocking sync indefinitely.
Suggested fix:
Implement timeout enforcement before merging to master, or document that this PR requires Issue #1780 to be complete for production use.
|
|
||
| fn alloc(size: u64) -> u64 { | ||
| // Guard against zero-size allocation (UB per GlobalAlloc contract) | ||
| if size == 0 { |
There was a problem hiding this comment.
🟡 Null pointer not checked after allocation failure
The generated alloc function returns std::alloc::alloc(layout) as u64 without checking for null; on OOM, subsequent copy_nonoverlapping to address 0 is undefined behavior.
Suggested fix:
Check if the returned pointer is null and call `std::alloc::handle_alloc_error(layout)` or return an error result before using the pointer.
| merged_len = merged.len(), | ||
| "WASM merge function succeeded" | ||
| ); | ||
| Ok(merged) |
There was a problem hiding this comment.
💡 Minimal unit test coverage for merge callback module
The module only has one trivial test for struct size; functions like call_merge_function, read_from_wasm, and result parsing lack unit tests despite being core to merge correctness.
Suggested fix:
Add tests for `WasmMergeResult::from_memory` with valid/invalid data, and for `read_from_wasm` with size exceeding MAX_MERGE_RESULT_SIZE.
| merge_callback: Option< | ||
| std::rc::Rc< | ||
| dyn Fn( | ||
| &[u8], |
There was a problem hiding this comment.
💡 DRY: Merge callback type repeated three times
The verbose Rc<dyn Fn(&[u8], &[u8], &str) -> Result<Vec<u8>, MergeError>> type appears identically in the field, setter, and getter; a type alias would improve maintainability.
Suggested fix:
Define `type MergeCallbackFn = Rc<dyn Fn(&[u8], &[u8], &str) -> Result<Vec<u8>, MergeError>>;` and use it in all three locations.
| @@ -819,10 +836,13 @@ impl VMHostFunctions<'_> { | |||
| target: "runtime::host::system", | |||
| "apply_storage_delta using context id" | |||
| ); | |||
There was a problem hiding this comment.
💡 DRY: Repeated pattern for building RuntimeEnv with merge callback
The sequence let merge_callback = logic.merge_callback(); let env = build_runtime_env_with_merge(...) is duplicated 5 times; extracting a helper method would reduce repetition and risk of inconsistency.
Suggested fix:
Add a method like `fn make_runtime_env(&mut self) -> RuntimeEnv` on VMLogic that encapsulates this pattern.
E2E Blockchain Proposals FailedThe following proposal workflow(s) failed:
Please check the workflow logs for more details. |
E2E Rust Apps FailedOne or more E2E workflows (e2e-kv-store, xcall-example) failed after retries. Please check the workflow logs for more details. |
NonNull::dangling() can't be cast to u64 in WASM context (E0641). Use simple non-null sentinel value (1) instead.
There was a problem hiding this comment.
🤖 AI Code Reviewer
Reviewed by 3 agents | Quality score: 89% | Review time: 263.6s
🟡 3 warnings, 💡 3 suggestions. See inline comments.
🤖 Generated by AI Code Reviewer | Review ID: review-8fcfc4d9
| Ok(buf) | ||
| } | ||
|
|
||
| /// Call a merge function by export name. |
There was a problem hiding this comment.
🟡 Missing timeout enforcement allows DoS via infinite loops
The timeout_ms field is declared but unused; malicious WASM merge functions can infinite loop and block sync indefinitely.
Suggested fix:
Implement actual timeout enforcement using the existing `timeout_ms` field before merging, or document this as a known limitation with a tracking issue.
| std::rc::Rc::new(move |local: &[u8], remote: &[u8], type_name: &str| { | ||
| merge_callback.merge_custom(local, remote, type_name) | ||
| }); | ||
| logic.with_merge_callback(callback_rc); |
There was a problem hiding this comment.
Root WASM merge path remains disconnected
High Severity
The new callback plumbing only forwards merge_custom(local, remote, type_name). The WasmMergeCallback::merge_root_state path is never wired into RuntimeEnv, so root conflicts still cannot dispatch to __calimero_merge_root_state and can fail with host-side root-merge errors instead of using the new WASM merge flow.
Additional Locations (2)
| } | ||
|
|
||
| /// Create stub imports for the merge callback instance. | ||
| /// |
There was a problem hiding this comment.
🟡 Callback closure only exposes merge_custom, merge_root_state is inaccessible
The Rc closure wraps only merge_callback.merge_custom(), but RuntimeMergeCallback also implements merge_root_state() which becomes unreachable through the stored callback interface.
Suggested fix:
If root state merging via WASM is needed, either change the callback signature to support both operations or store the full RuntimeMergeCallback.
| } | ||
|
|
||
| /// Set the timeout for WASM merge operations. | ||
| #[must_use] |
There was a problem hiding this comment.
💡 Asymmetric size validation between read and write paths
read_from_wasm validates against MAX_MERGE_RESULT_SIZE but write_to_wasm has no size limit; a malicious or buggy caller could attempt to write unbounded data.
Suggested fix:
Add a size check in `write_to_wasm` similar to `read_from_wasm`, or document why it's safe to omit (e.g., if input sizes are bounded upstream).
| ), | ||
| })?; | ||
|
|
||
| // Allocate memory for the data |
There was a problem hiding this comment.
💡 No input size validation before writing to WASM memory
The write_to_wasm function writes caller-provided data without checking against a maximum size, potentially exhausting WASM memory if storage passes huge buffers.
Suggested fix:
Add a size check similar to `MAX_MERGE_RESULT_SIZE` before writing data to WASM memory.
| .exports | ||
| .get_memory("memory") | ||
| .map_err(|e| MergeError::WasmCallbackFailed { | ||
| message: format!("Failed to get WASM memory: {e}"), |
There was a problem hiding this comment.
🟡 WASM memory allocated but never freed during merge operations
Each call to write_to_wasm allocates memory via __calimero_alloc for local and remote data, but there's no corresponding deallocation after the merge completes, causing memory growth in the WASM instance during sync.
Suggested fix:
Either export and call a `__calimero_dealloc` function after reading results, or document that the WASM instance is short-lived per Module::run() so leaks are bounded.
| @@ -819,10 +836,13 @@ impl VMHostFunctions<'_> { | |||
| target: "runtime::host::system", | |||
There was a problem hiding this comment.
💡 DRY: Repeated pattern for building RuntimeEnv with merge callback
The pattern of calling logic.merge_callback() then build_runtime_env_with_merge(storage, context_id, executor_key, callback) is repeated 5 times; consider extracting a helper method on VMLogic like fn make_runtime_env(&mut self) -> RuntimeEnv.
Suggested fix:
Add `fn make_runtime_env(&mut self) -> RuntimeEnv` to VMLogic that encapsulates this pattern, reducing the 5 call sites to single method calls.
|
Bugbot Autofix prepared fixes for 1 of the 1 bugs found in the latest run.
Or push these changes by commenting: Preview (8220195cdb)diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs
--- a/crates/runtime/src/lib.rs
+++ b/crates/runtime/src/lib.rs
@@ -312,11 +312,23 @@
// This allows the storage layer to call merge functions even while
// the main WASM execution is in progress.
if let Some(merge_callback) = self.create_merge_callback() {
- let callback_rc =
+ // Wrap the callback in Arc so we can share it between the two closures
+ let merge_callback = std::sync::Arc::new(merge_callback);
+
+ // Custom type merge callback
+ let custom_callback = std::sync::Arc::clone(&merge_callback);
+ let custom_rc =
std::rc::Rc::new(move |local: &[u8], remote: &[u8], type_name: &str| {
- merge_callback.merge_custom(local, remote, type_name)
+ custom_callback.merge_custom(local, remote, type_name)
});
- logic.with_merge_callback(callback_rc);
+ logic.with_merge_callback(custom_rc);
+
+ // Root state merge callback
+ let root_callback = merge_callback;
+ let root_rc = std::rc::Rc::new(move |local: &[u8], remote: &[u8]| {
+ root_callback.merge_root_state(local, remote)
+ });
+ logic.with_root_merge_callback(root_rc);
}
let mut store = Store::new(self.engine.clone());
diff --git a/crates/runtime/src/logic.rs b/crates/runtime/src/logic.rs
--- a/crates/runtime/src/logic.rs
+++ b/crates/runtime/src/logic.rs
@@ -272,6 +272,19 @@
-> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
>,
>,
+
+ /// Optional WASM merge callback for root state merging during sync.
+ /// This is called by the storage layer when merging root state and no
+ /// in-process merge function is registered.
+ root_merge_callback: Option<
+ std::rc::Rc<
+ dyn Fn(
+ &[u8],
+ &[u8],
+ )
+ -> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
+ >,
+ >,
}
impl<'a> VMLogic<'a> {
@@ -324,8 +337,9 @@
blob_handles: HashMap::new(),
next_blob_fd: 1,
- // Merge callback (set separately via with_merge_callback)
+ // Merge callbacks (set separately via with_merge_callback / with_root_merge_callback)
merge_callback: None,
+ root_merge_callback: None,
}
}
@@ -364,6 +378,38 @@
self.merge_callback.clone()
}
+ /// Sets the WASM merge callback for root state merging.
+ ///
+ /// This callback is used by the storage layer when merging root state
+ /// and no in-process merge function is registered.
+ pub fn with_root_merge_callback(
+ &mut self,
+ callback: std::rc::Rc<
+ dyn Fn(
+ &[u8],
+ &[u8],
+ )
+ -> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
+ >,
+ ) {
+ self.root_merge_callback = Some(callback);
+ }
+
+ /// Returns the root merge callback if set.
+ pub fn root_merge_callback(
+ &self,
+ ) -> Option<
+ std::rc::Rc<
+ dyn Fn(
+ &[u8],
+ &[u8],
+ )
+ -> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
+ >,
+ > {
+ self.root_merge_callback.clone()
+ }
+
/// Associates a Wasmer memory instance with this `VMLogic`.
///
/// This method should be called after the guest module is instantiated but before
diff --git a/crates/runtime/src/logic/host_functions/js_collections.rs b/crates/runtime/src/logic/host_functions/js_collections.rs
--- a/crates/runtime/src/logic/host_functions/js_collections.rs
+++ b/crates/runtime/src/logic/host_functions/js_collections.rs
@@ -29,13 +29,15 @@
impl VMHostFunctions<'_> {
fn make_runtime_env(&mut self) -> VMLogicResult<RuntimeEnv> {
self.with_logic_mut(|logic| {
- // Get callback before borrowing storage to avoid borrow conflicts
+ // Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
+ let root_merge_callback = logic.root_merge_callback();
Ok(build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
+ root_merge_callback,
))
})
}
diff --git a/crates/runtime/src/logic/host_functions/system.rs b/crates/runtime/src/logic/host_functions/system.rs
--- a/crates/runtime/src/logic/host_functions/system.rs
+++ b/crates/runtime/src/logic/host_functions/system.rs
@@ -31,6 +31,7 @@
/// storage crate to resolve reads/writes against the live context storage.
///
/// The optional `merge_callback` is used for custom type merging via WASM.
+/// The optional `root_merge_callback` is used for root state merging via WASM.
pub(super) fn build_runtime_env_with_merge(
storage: &mut dyn RuntimeStorage,
context_id: [u8; DIGEST_SIZE],
@@ -45,6 +46,15 @@
-> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
>,
>,
+ root_merge_callback: Option<
+ std::rc::Rc<
+ dyn Fn(
+ &[u8],
+ &[u8],
+ )
+ -> Result<Vec<u8>, calimero_storage::collections::crdt_meta::MergeError>,
+ >,
+ >,
) -> RuntimeEnv {
let raw_ptr: *mut dyn RuntimeStorage = storage;
let (data_ptr, vtable_ptr): (*mut (), *mut ()) = unsafe { mem::transmute(raw_ptr) };
@@ -101,12 +111,14 @@
// the storage crate falls back to its default environment, so subsequent
// calls that do not install an override will continue to use the mock /
// WASM backends.
- let env = RuntimeEnv::new(reader, writer, remover, context_id, executor_id);
+ let mut env = RuntimeEnv::new(reader, writer, remover, context_id, executor_id);
if let Some(callback) = merge_callback {
- env.with_merge_callback(callback)
- } else {
- env
+ env = env.with_merge_callback(callback);
}
+ if let Some(callback) = root_merge_callback {
+ env = env.with_root_merge_callback(callback);
+ }
+ env
}
thread_local! {
@@ -836,13 +848,15 @@
target: "runtime::host::system",
"apply_storage_delta using context id"
);
- // Get callback before borrowing storage to avoid borrow conflicts
+ // Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
+ let root_merge_callback = logic.root_merge_callback();
let env = build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
+ root_merge_callback,
);
let payload = payload_opt
@@ -881,13 +895,15 @@
.iter()
.map(|byte| format!("{byte:02x}"))
.collect();
- // Get callback before borrowing storage to avoid borrow conflicts
+ // Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
+ let root_merge_callback = logic.root_merge_callback();
let env = build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
+ root_merge_callback,
);
let maybe_bytes =
@@ -944,13 +960,15 @@
"apply_storage_delta start"
);
- // Get callback before borrowing storage to avoid borrow conflicts
+ // Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
+ let root_merge_callback = logic.root_merge_callback();
let env = build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
+ root_merge_callback,
);
with_runtime_env(env.clone(), || {
@@ -1002,13 +1020,15 @@
/// Returns `1` if a delta was emitted, `0` if there was nothing to commit.
pub fn flush_delta(&mut self) -> VMLogicResult<i32> {
self.with_logic_mut(|logic| -> VMLogicResult<i32> {
- // Get callback before borrowing storage to avoid borrow conflicts
+ // Get callbacks before borrowing storage to avoid borrow conflicts
let merge_callback = logic.merge_callback();
+ let root_merge_callback = logic.root_merge_callback();
let env = build_runtime_env_with_merge(
logic.storage,
logic.context.context_id,
logic.context.executor_public_key,
merge_callback,
+ root_merge_callback,
);
let root_hash = with_runtime_env(env.clone(), || {
diff --git a/crates/storage/src/env.rs b/crates/storage/src/env.rs
--- a/crates/storage/src/env.rs
+++ b/crates/storage/src/env.rs
@@ -72,6 +72,13 @@
) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
>,
>,
+ /// Optional WASM merge callback for root state merging.
+ /// This is used during sync when no in-process merge function is registered.
+ wasm_root_merge_callback: Option<
+ std::rc::Rc<
+ dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
+ >,
+ >,
}
#[cfg(not(target_arch = "wasm32"))]
@@ -96,6 +103,7 @@
context_id,
executor_id,
wasm_merge_callback: None,
+ wasm_root_merge_callback: None,
}
}
@@ -118,6 +126,22 @@
self
}
+ /// Sets the WASM merge callback for root state merging.
+ ///
+ /// The callback receives (local_bytes, remote_bytes) and returns
+ /// the merged bytes or an error. This is used when no in-process
+ /// merge function is registered for the root entity type.
+ #[must_use]
+ pub fn with_root_merge_callback(
+ mut self,
+ callback: std::rc::Rc<
+ dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
+ >,
+ ) -> Self {
+ self.wasm_root_merge_callback = Some(callback);
+ self
+ }
+
/// Returns the WASM merge callback if set.
#[must_use]
pub fn wasm_merge_callback(
@@ -134,7 +158,19 @@
self.wasm_merge_callback.clone()
}
+ /// Returns the WASM root merge callback if set.
#[must_use]
+ pub fn wasm_root_merge_callback(
+ &self,
+ ) -> Option<
+ std::rc::Rc<
+ dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
+ >,
+ > {
+ self.wasm_root_merge_callback.clone()
+ }
+
+ #[must_use]
/// Returns the storage read callback.
pub fn storage_read(&self) -> std::rc::Rc<dyn Fn(&Key) -> Option<Vec<u8>>> {
self.storage_read.clone()
@@ -184,6 +220,18 @@
mocked::get_wasm_merge_callback()
}
+#[cfg(not(target_arch = "wasm32"))]
+/// Returns the current WASM root merge callback from the runtime environment, if any.
+///
+/// This is called by storage merge functions to merge root state via WASM
+/// when no in-process merge function is registered.
+#[must_use]
+pub fn get_wasm_root_merge_callback() -> Option<
+ std::rc::Rc<dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>>,
+> {
+ mocked::get_wasm_root_merge_callback()
+}
+
/// Commits the root hash to the runtime.
///
#[expect(clippy::missing_const_for_fn, reason = "Cannot be const here")]
@@ -595,6 +643,19 @@
RUNTIME_ENV.with(|env| env.borrow().as_ref().and_then(|e| e.wasm_merge_callback()))
}
+ /// Returns the WASM root merge callback from the current runtime environment.
+ pub(super) fn get_wasm_root_merge_callback() -> Option<
+ std::rc::Rc<
+ dyn Fn(&[u8], &[u8]) -> Result<Vec<u8>, crate::collections::crdt_meta::MergeError>,
+ >,
+ > {
+ RUNTIME_ENV.with(|env| {
+ env.borrow()
+ .as_ref()
+ .and_then(|e| e.wasm_root_merge_callback())
+ })
+ }
+
/// Resets the environment state for testing.
///
/// Clears the thread-local ROOT_HASH, HLC, and STORAGE, allowing multiple tests
diff --git a/crates/storage/src/merge.rs b/crates/storage/src/merge.rs
--- a/crates/storage/src/merge.rs
+++ b/crates/storage/src/merge.rs
@@ -106,6 +106,17 @@
match try_merge_registered(existing, incoming, existing_ts, incoming_ts) {
MergeRegistryResult::Success(merged) => Ok(merged),
MergeRegistryResult::NoFunctionsRegistered => {
+ // No in-process merge function registered.
+ // Try WASM callback if available (RuntimeEnv provides this).
+ #[cfg(not(target_arch = "wasm32"))]
+ if let Some(wasm_callback) = crate::env::get_wasm_root_merge_callback() {
+ tracing::debug!(
+ target: "calimero_storage::merge",
+ "No in-process merge function registered, trying WASM callback"
+ );
+ return wasm_callback(existing, incoming);
+ }
+
// I5 Enforcement: No silent data loss
//
// If the root entity contains CRDTs (Counter, etc.) and no merge function
@@ -122,6 +133,25 @@
// - The data type doesn't match any registered type (test contamination)
// - Deserialization failed (corrupt data)
//
+ // Try WASM callback as a fallback before LWW.
+ #[cfg(not(target_arch = "wasm32"))]
+ if let Some(wasm_callback) = crate::env::get_wasm_root_merge_callback() {
+ tracing::debug!(
+ target: "calimero_storage::merge",
+ "All registered merge functions failed, trying WASM callback"
+ );
+ match wasm_callback(existing, incoming) {
+ Ok(merged) => return Ok(merged),
+ Err(e) => {
+ tracing::warn!(
+ target: "calimero_storage::merge",
+ error = ?e,
+ "WASM callback also failed, falling back to LWW"
+ );
+ }
+ }
+ }
+
// Fall back to LWW to maintain backwards compatibility.
// The incoming value wins if timestamps are equal or incoming is newer.
// |
|
This pull request has been automatically marked as stale. If this pull request is still relevant, please leave any comment (for example, "bump"), and we'll keep it open. We are sorry that we haven't been able to prioritize reviewing it yet. Your contribution is very much appreciated. |



Summary
Implements the WASM merge callback infrastructure for custom CRDT types, enabling applications to define their own merge logic via WASM exports.
Changes
Storage Crate
WasmMergeCallbacktrait incollections/crdt_meta.rs:merge_custom(local, remote, type_name)- for entity-level custom typesmerge_root_state(local, remote)- for root state mergesMergeErrorvariants:NoWasmCallback,WasmCallbackFailed,WasmMergeNotExported,WasmTimeoutmerge_by_crdt_type_with_callback()- dispatches to WASM callback forCrdtType::CustomRuntime Crate
RuntimeMergeCallbackstruct implementingWasmMergeCallback:merge_root_state_mut()- calls__calimero_merge_root_stateWASM exportmerge_custom_mut()- calls__calimero_merge_{TypeName}WASM export__calimero_allocexportWasmMergeResultstruct for deserializing WASM return valuesSDK Macros
#[app::state]macro now generates:__calimero_alloc- memory allocator for WASM__calimero_merge_root_state- root state merge export#[app::mergeable]macro (NEW) generates:__calimero_merge_{TypeName}- type-specific merge export for custom typesExample App
team-metrics-customto use#[app::mergeable]forTeamStatsDocumentation
crates/storage/AGENTS.mdwith merge architecturecrates/storage/readme/merging.mdwith WASM callback flowcrates/sdk/AGENTS.mdwith new macro documentationapps/AGENTS.mdwith custom CRDT examplesTesting
team-metrics-customcompiles to WASM with all expected exportsWASM Exports Generated
Merge Decision Tree
Closes #1768
Supersedes #1940 (closed as outdated)
Note
High Risk
Touches core sync/merge behavior across
storage,runtime, and SDK macros, and introduces WASM execution during conflict resolution; failures can now hard-error merges for custom types, impacting data convergence and availability.Overview
Enables custom CRDT entity merges during sync by introducing a runtime-provided WASM callback path for
CrdtType::Custom, instead of falling back to LWW or returning a generic error.Storage now defines a
WasmMergeCallbackinterface plus newMergeErrorvariants, routesCrdtType::Customthroughmerge_by_crdt_type_with_callback(), and threads an optional callback viaRuntimeEnv/Interface(treating missing/failed callbacks for custom types as I5 hard errors, while built-ins can still LWW-fallback on deserialization issues).Runtime creates a separate Wasmer instance per
Module::run()to avoid reentrancy and implementsRuntimeMergeCallbackthat calls__calimero_alloc,__calimero_merge_root_state, and__calimero_merge_{TypeName}exports. SDK adds#[app::mergeable](generates__calimero_merge_{TypeName}) and extends#[app::state]to export allocator/root merge; theteam-metrics-customapp and tests/docs are updated accordingly (including renaming expected error fromWasmRequiredtoNoWasmCallback).Written by Cursor Bugbot for commit d3c3b1b. This will update automatically on new commits. Configure here.