diff --git a/crates/runtime/src/merge_callback.rs b/crates/runtime/src/merge_callback.rs index ed2f1b90f..98337bbf1 100644 --- a/crates/runtime/src/merge_callback.rs +++ b/crates/runtime/src/merge_callback.rs @@ -56,9 +56,14 @@ //! # Timeout //! //! WASM merge calls have a configurable timeout (default: 5 seconds) to prevent -//! infinite loops or malicious code from blocking sync. +//! infinite loops or malicious code from blocking sync. When a timeout occurs, the +//! caller is unblocked and `MergeError::WasmTimeout` is returned. Note that due to +//! Wasmer limitations, the underlying WASM execution may continue in a background +//! thread until completion. -use std::sync::Mutex; +use std::sync::mpsc::{self, RecvTimeoutError}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use calimero_storage::collections::crdt_meta::{MergeError, WasmMergeCallback}; use tracing::{debug, warn}; @@ -135,17 +140,17 @@ impl WasmMergeResult { /// This struct holds a reference to the WASM instance and provides /// the merge callback interface for custom CRDT types. /// -/// Uses interior mutability (`Mutex`) for the store because the +/// Uses interior mutability (`Arc`) for the store because the /// `WasmMergeCallback` trait requires `&self` but Wasmer's `Store` -/// needs mutable access for function calls. `Mutex` is used instead -/// of `RefCell` to satisfy the `Send + Sync` bounds on the trait. +/// needs mutable access for function calls. `Arc` is used to +/// satisfy the `Send + Sync` bounds on the trait and to enable +/// timeout enforcement via background threads. pub struct RuntimeMergeCallback { /// The WASM store (interior mutability for trait compatibility). - store: Mutex, + store: Arc>, /// The WASM instance with the application module. - instance: Instance, - /// Timeout for WASM merge operations. - #[allow(dead_code, reason = "Will be used for timeout handling in Issue #1780")] + instance: Arc, + /// Timeout for WASM merge operations in milliseconds. timeout_ms: u64, } @@ -172,8 +177,8 @@ impl RuntimeMergeCallback { "WASM module exports merge functions, creating callback" ); Some(Self { - store: Mutex::new(store), - instance, + store: Arc::new(Mutex::new(store)), + instance: Arc::new(instance), timeout_ms: DEFAULT_MERGE_TIMEOUT_MS, }) } else { @@ -192,115 +197,172 @@ impl RuntimeMergeCallback { self } - /// Get the WASM memory from the instance. - fn get_memory(&self) -> Result<&Memory, MergeError> { - self.instance - .exports - .get_memory("memory") - .map_err(|e| MergeError::WasmCallbackFailed { - message: format!("Failed to get WASM memory: {e}"), - }) - } - - /// Write data to WASM memory and return the pointer. + /// Call a merge function by export name with timeout enforcement. /// - /// This allocates memory in the WASM instance using the `__calimero_alloc` export - /// and writes the data there. - fn write_to_wasm(&self, store: &mut Store, data: &[u8]) -> Result<(u64, u64), MergeError> { - // Get the allocator function - let alloc_fn: TypedFunction = self - .instance - .exports - .get_typed_function(store, "__calimero_alloc") - .map_err(|e| MergeError::WasmCallbackFailed { - message: format!( - "WASM module does not export __calimero_alloc function: {e}. \ - Ensure your app uses #[app::state] macro." - ), - })?; + /// The WASM call is executed in a background thread with a timeout. If the + /// timeout expires, the caller is unblocked and `MergeError::WasmTimeout` is + /// returned. Note that due to Wasmer limitations, the underlying WASM + /// execution may continue in the background thread until completion. + fn call_merge_function( + &self, + export_name: &str, + local: &[u8], + remote: &[u8], + ) -> Result, MergeError> { + let store = Arc::clone(&self.store); + let instance = Arc::clone(&self.instance); + let export_name_owned = export_name.to_string(); + let local_owned = local.to_vec(); + let remote_owned = remote.to_vec(); + let timeout_ms = self.timeout_ms; - // Allocate memory for the data - let len = data.len() as u64; - let ptr = alloc_fn - .call(store, len) - .map_err(|e| MergeError::WasmCallbackFailed { - message: format!("Failed to allocate WASM memory: {e}"), - })?; + debug!( + target: "calimero_runtime::merge", + export_name, + local_len = local.len(), + remote_len = remote.len(), + timeout_ms, + "Calling WASM merge function with timeout" + ); - // Write data to the allocated memory - let memory = self.get_memory()?; - let view = memory.view(store); - view.write(ptr, data) - .map_err(|e| MergeError::WasmCallbackFailed { - message: format!("Failed to write data to WASM memory: {e}"), - })?; + // Channel to receive the result from the worker thread + let (tx, rx) = mpsc::sync_channel(1); - Ok((ptr, len)) - } + // Spawn a thread to execute the WASM call + std::thread::spawn(move || { + let result = Self::execute_merge_call( + &store, + &instance, + &export_name_owned, + &local_owned, + &remote_owned, + ); + // Send result; ignore error if receiver dropped (timeout occurred) + let _ = tx.send(result); + }); - /// Read data from WASM memory at the given pointer and length. - fn read_from_wasm(&self, store: &Store, ptr: u64, len: u64) -> Result, MergeError> { - let memory = self.get_memory()?; - let view = memory.view(store); - let mut buf = vec![0u8; len as usize]; - view.read(ptr, &mut buf) - .map_err(|e| MergeError::WasmCallbackFailed { - message: format!("Failed to read data from WASM memory: {e}"), - })?; - Ok(buf) + // Wait for result with timeout + match rx.recv_timeout(Duration::from_millis(timeout_ms)) { + Ok(result) => result, + Err(RecvTimeoutError::Timeout) => { + warn!( + target: "calimero_runtime::merge", + export_name, + timeout_ms, + "WASM merge operation timed out" + ); + Err(MergeError::WasmTimeout { timeout_ms }) + } + Err(RecvTimeoutError::Disconnected) => { + warn!( + target: "calimero_runtime::merge", + export_name, + "WASM merge thread panicked or disconnected" + ); + Err(MergeError::WasmCallbackFailed { + message: "WASM merge thread panicked".to_string(), + }) + } + } } - /// Call a merge function by export name. - fn call_merge_function( - &self, + /// Execute the actual merge call within the worker thread. + /// + /// This is a static helper method that performs the WASM call with the + /// provided store and instance references. + fn execute_merge_call( + store: &Arc>, + instance: &Arc, export_name: &str, local: &[u8], remote: &[u8], ) -> Result, MergeError> { // Lock the store for the entire operation - let mut store = self - .store - .lock() - .map_err(|e| MergeError::WasmCallbackFailed { - message: format!("Failed to lock WASM store: {e}"), - })?; + let mut store_guard = store.lock().map_err(|e| MergeError::WasmCallbackFailed { + message: format!("Failed to lock WASM store: {e}"), + })?; // Get the merge function - let merge_fn: TypedFunction<(u64, u64, u64, u64), u64> = self - .instance + let merge_fn: TypedFunction<(u64, u64, u64, u64), u64> = instance .exports - .get_typed_function(&*store, export_name) + .get_typed_function(&*store_guard, export_name) .map_err(|e| MergeError::WasmMergeNotExported { export_name: format!("{export_name}: {e}"), })?; - // Write inputs to WASM memory - let (local_ptr, local_len) = self.write_to_wasm(&mut store, local)?; - let (remote_ptr, remote_len) = self.write_to_wasm(&mut store, remote)?; + // Get the allocator function + let alloc_fn: TypedFunction = instance + .exports + .get_typed_function(&*store_guard, "__calimero_alloc") + .map_err(|e| MergeError::WasmCallbackFailed { + message: format!( + "WASM module does not export __calimero_alloc function: {e}. \ + Ensure your app uses #[app::state] macro." + ), + })?; + + // Get memory + let memory = + instance + .exports + .get_memory("memory") + .map_err(|e| MergeError::WasmCallbackFailed { + message: format!("Failed to get WASM memory: {e}"), + })?; - debug!( - target: "calimero_runtime::merge", - export_name, - local_len, - remote_len, - "Calling WASM merge function" - ); + // Write local data to WASM memory + let local_len = local.len() as u64; + let local_ptr = alloc_fn.call(&mut *store_guard, local_len).map_err(|e| { + MergeError::WasmCallbackFailed { + message: format!("Failed to allocate WASM memory for local data: {e}"), + } + })?; + memory + .view(&*store_guard) + .write(local_ptr, local) + .map_err(|e| MergeError::WasmCallbackFailed { + message: format!("Failed to write local data to WASM memory: {e}"), + })?; + + // Write remote data to WASM memory + let remote_len = remote.len() as u64; + let remote_ptr = alloc_fn.call(&mut *store_guard, remote_len).map_err(|e| { + MergeError::WasmCallbackFailed { + message: format!("Failed to allocate WASM memory for remote data: {e}"), + } + })?; + memory + .view(&*store_guard) + .write(remote_ptr, remote) + .map_err(|e| MergeError::WasmCallbackFailed { + message: format!("Failed to write remote data to WASM memory: {e}"), + })?; // Call the merge function - // TODO: Add timeout handling (Issue #1780 acceptance criteria) let result_ptr = merge_fn - .call(&mut *store, local_ptr, local_len, remote_ptr, remote_len) + .call( + &mut *store_guard, + local_ptr, + local_len, + remote_ptr, + remote_len, + ) .map_err(|e| MergeError::WasmCallbackFailed { message: format!("WASM {export_name} call failed: {e}"), })?; // Read the result - let memory = self.get_memory()?; - let result = WasmMergeResult::from_memory(memory, &*store, result_ptr)?; + let result = WasmMergeResult::from_memory(memory, &*store_guard, result_ptr)?; if result.success != 0 { // Success - read the merged data - let merged = self.read_from_wasm(&store, result.data_ptr, result.data_len)?; + let view = memory.view(&*store_guard); + let mut merged = vec![0u8; result.data_len as usize]; + view.read(result.data_ptr, &mut merged).map_err(|e| { + MergeError::WasmCallbackFailed { + message: format!("Failed to read merged data from WASM memory: {e}"), + } + })?; debug!( target: "calimero_runtime::merge", export_name, @@ -310,7 +372,13 @@ impl RuntimeMergeCallback { Ok(merged) } else { // Failure - read the error message - let error_msg = self.read_from_wasm(&store, result.error_ptr, result.error_len)?; + let view = memory.view(&*store_guard); + let mut error_msg = vec![0u8; result.error_len as usize]; + view.read(result.error_ptr, &mut error_msg).map_err(|e| { + MergeError::WasmCallbackFailed { + message: format!("Failed to read error message from WASM memory: {e}"), + } + })?; let error_str = String::from_utf8_lossy(&error_msg).to_string(); warn!( target: "calimero_runtime::merge",