Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 158 additions & 90 deletions crates/runtime/src/merge_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@
//! # Timeout
//!
//! WASM merge calls have a configurable timeout (default: 5 seconds) to prevent
//! infinite loops or malicious code from blocking sync.
//! infinite loops or malicious code from blocking sync. When a timeout occurs, the
//! caller is unblocked and `MergeError::WasmTimeout` is returned. Note that due to
//! Wasmer limitations, the underlying WASM execution may continue in a background
//! thread until completion.

use std::sync::Mutex;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use calimero_storage::collections::crdt_meta::{MergeError, WasmMergeCallback};
use tracing::{debug, warn};
Expand Down Expand Up @@ -135,17 +140,17 @@ impl WasmMergeResult {
/// This struct holds a reference to the WASM instance and provides
/// the merge callback interface for custom CRDT types.
///
/// Uses interior mutability (`Mutex`) for the store because the
/// Uses interior mutability (`Arc<Mutex>`) for the store because the
/// `WasmMergeCallback` trait requires `&self` but Wasmer's `Store`
/// needs mutable access for function calls. `Mutex` is used instead
/// of `RefCell` to satisfy the `Send + Sync` bounds on the trait.
/// needs mutable access for function calls. `Arc<Mutex>` is used to
/// satisfy the `Send + Sync` bounds on the trait and to enable
/// timeout enforcement via background threads.
pub struct RuntimeMergeCallback {
/// The WASM store (interior mutability for trait compatibility).
store: Mutex<Store>,
store: Arc<Mutex<Store>>,
/// The WASM instance with the application module.
instance: Instance,
/// Timeout for WASM merge operations.
#[allow(dead_code, reason = "Will be used for timeout handling in Issue #1780")]
instance: Arc<Instance>,
/// Timeout for WASM merge operations in milliseconds.
timeout_ms: u64,
}

Expand All @@ -172,8 +177,8 @@ impl RuntimeMergeCallback {
"WASM module exports merge functions, creating callback"
);
Some(Self {
store: Mutex::new(store),
instance,
store: Arc::new(Mutex::new(store)),
instance: Arc::new(instance),
timeout_ms: DEFAULT_MERGE_TIMEOUT_MS,
})
} else {
Expand All @@ -192,115 +197,172 @@ impl RuntimeMergeCallback {
self
}

/// Get the WASM memory from the instance.
fn get_memory(&self) -> Result<&Memory, MergeError> {
self.instance
.exports
.get_memory("memory")
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to get WASM memory: {e}"),
})
}

/// Write data to WASM memory and return the pointer.
/// Call a merge function by export name with timeout enforcement.
///
/// This allocates memory in the WASM instance using the `__calimero_alloc` export
/// and writes the data there.
fn write_to_wasm(&self, store: &mut Store, data: &[u8]) -> Result<(u64, u64), MergeError> {
// Get the allocator function
let alloc_fn: TypedFunction<u64, u64> = self
.instance
.exports
.get_typed_function(store, "__calimero_alloc")
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!(
"WASM module does not export __calimero_alloc function: {e}. \
Ensure your app uses #[app::state] macro."
),
})?;
/// The WASM call is executed in a background thread with a timeout. If the
/// timeout expires, the caller is unblocked and `MergeError::WasmTimeout` is
/// returned. Note that due to Wasmer limitations, the underlying WASM
/// execution may continue in the background thread until completion.
fn call_merge_function(
&self,
export_name: &str,
local: &[u8],
remote: &[u8],
) -> Result<Vec<u8>, MergeError> {
let store = Arc::clone(&self.store);
let instance = Arc::clone(&self.instance);
let export_name_owned = export_name.to_string();
let local_owned = local.to_vec();
let remote_owned = remote.to_vec();
let timeout_ms = self.timeout_ms;

// Allocate memory for the data
let len = data.len() as u64;
let ptr = alloc_fn
.call(store, len)
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to allocate WASM memory: {e}"),
})?;
debug!(
target: "calimero_runtime::merge",
export_name,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Thread resource leak on timeout - spawned threads never terminate

After timeout, the spawned thread keeps running indefinitely if WASM is in an infinite loop; repeated timeouts will accumulate orphaned threads consuming CPU and memory.

Suggested fix:

Document this limitation clearly; consider tracking active threads with a counter/metric; investigate Wasmer's interrupt mechanisms for future improvement.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Thread spawning overhead on every merge call

Creating a new OS thread for each merge operation adds overhead; for high-frequency merges this could become a bottleneck.

Suggested fix:

Consider using a thread pool or async task spawning if merge frequency is expected to be high.

local_len = local.len(),
remote_len = remote.len(),
timeout_ms,
"Calling WASM merge function with timeout"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Mutex held indefinitely after timeout causes subsequent calls to block

When timeout occurs, the background thread still holds store.lock() and continues execution; subsequent call_merge_function calls will block on the mutex until the WASM completes, defeating the timeout purpose.

Suggested fix:

Consider using a try_lock with retry, or create a new WASM instance after timeout, or use a per-call store clone if Wasmer supports it.

);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Unbounded thread spawning enables resource exhaustion DoS

Each merge call spawns an unbounded thread that continues running after timeout, holding Arc references to Store and Instance; an attacker triggering many slow WASM merge operations could exhaust thread/memory resources.

Suggested fix:

Consider using a bounded thread pool, adding a semaphore to limit concurrent merge calls, or exploring WASM metering/fuel-based cancellation in future iterations.

// Write data to the allocated memory
let memory = self.get_memory()?;
let view = memory.view(store);
view.write(ptr, data)
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to write data to WASM memory: {e}"),
})?;
// Channel to receive the result from the worker thread
let (tx, rx) = mpsc::sync_channel(1);

Ok((ptr, len))
}
// Spawn a thread to execute the WASM call
std::thread::spawn(move || {
let result = Self::execute_merge_call(
&store,
&instance,
&export_name_owned,
&local_owned,
&remote_owned,
);
// Send result; ignore error if receiver dropped (timeout occurred)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 WASM instance state undefined after timeout - subsequent merges may corrupt data

After timeout, the WASM instance continues modifying its memory; subsequent merge calls on the same instance may read partially-written or corrupted state.

Suggested fix:

Mark the RuntimeMergeCallback as poisoned after timeout and return an error on subsequent calls, or create a fresh instance.

let _ = tx.send(result);
});

/// Read data from WASM memory at the given pointer and length.
fn read_from_wasm(&self, store: &Store, ptr: u64, len: u64) -> Result<Vec<u8>, MergeError> {
let memory = self.get_memory()?;
let view = memory.view(store);
let mut buf = vec![0u8; len as usize];
view.read(ptr, &mut buf)
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to read data from WASM memory: {e}"),
})?;
Ok(buf)
// Wait for result with timeout
match rx.recv_timeout(Duration::from_millis(timeout_ms)) {
Ok(result) => result,
Err(RecvTimeoutError::Timeout) => {
warn!(
target: "calimero_runtime::merge",
export_name,
timeout_ms,
"WASM merge operation timed out"
);
Err(MergeError::WasmTimeout { timeout_ms })
}
Err(RecvTimeoutError::Disconnected) => {
warn!(
target: "calimero_runtime::merge",
export_name,
"WASM merge thread panicked or disconnected"
);
Err(MergeError::WasmCallbackFailed {
message: "WASM merge thread panicked".to_string(),
})
}
}
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Long function could be decomposed for readability

The execute_merge_call function is ~110 lines; extracting helper closures or local functions for memory write/read operations would improve readability.

Suggested fix:

Extract local helper closures like `write_to_memory` and `read_from_memory` within the function to reduce repetition and clarify intent.

/// Call a merge function by export name.
fn call_merge_function(
&self,
/// Execute the actual merge call within the worker thread.
///
/// This is a static helper method that performs the WASM call with the
/// provided store and instance references.
fn execute_merge_call(
store: &Arc<Mutex<Store>>,
instance: &Arc<Instance>,
export_name: &str,
local: &[u8],
remote: &[u8],
) -> Result<Vec<u8>, MergeError> {
// Lock the store for the entire operation
let mut store = self
.store
.lock()
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to lock WASM store: {e}"),
})?;
let mut store_guard = store.lock().map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to lock WASM store: {e}"),
})?;

// Get the merge function
let merge_fn: TypedFunction<(u64, u64, u64, u64), u64> = self
.instance
let merge_fn: TypedFunction<(u64, u64, u64, u64), u64> = instance
.exports
.get_typed_function(&*store, export_name)
.get_typed_function(&*store_guard, export_name)
.map_err(|e| MergeError::WasmMergeNotExported {
export_name: format!("{export_name}: {e}"),
})?;

// Write inputs to WASM memory
let (local_ptr, local_len) = self.write_to_wasm(&mut store, local)?;
let (remote_ptr, remote_len) = self.write_to_wasm(&mut store, remote)?;
// Get the allocator function
let alloc_fn: TypedFunction<u64, u64> = instance
.exports
.get_typed_function(&*store_guard, "__calimero_alloc")
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!(
"WASM module does not export __calimero_alloc function: {e}. \
Ensure your app uses #[app::state] macro."
),
})?;

// Get memory
let memory =
instance
.exports
.get_memory("memory")
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to get WASM memory: {e}"),
})?;

debug!(
target: "calimero_runtime::merge",
export_name,
local_len,
remote_len,
"Calling WASM merge function"
);
// Write local data to WASM memory
let local_len = local.len() as u64;
let local_ptr = alloc_fn.call(&mut *store_guard, local_len).map_err(|e| {
MergeError::WasmCallbackFailed {
message: format!("Failed to allocate WASM memory for local data: {e}"),
}
})?;
memory
.view(&*store_guard)
.write(local_ptr, local)
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to write local data to WASM memory: {e}"),
})?;

// Write remote data to WASM memory
let remote_len = remote.len() as u64;
let remote_ptr = alloc_fn.call(&mut *store_guard, remote_len).map_err(|e| {
MergeError::WasmCallbackFailed {
message: format!("Failed to allocate WASM memory for remote data: {e}"),
}
})?;
memory
.view(&*store_guard)
.write(remote_ptr, remote)
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("Failed to write remote data to WASM memory: {e}"),
})?;

// Call the merge function
// TODO: Add timeout handling (Issue #1780 acceptance criteria)
let result_ptr = merge_fn
.call(&mut *store, local_ptr, local_len, remote_ptr, remote_len)
.call(
&mut *store_guard,
local_ptr,
local_len,
remote_ptr,
remote_len,
)
.map_err(|e| MergeError::WasmCallbackFailed {
message: format!("WASM {export_name} call failed: {e}"),
})?;

// Read the result
let memory = self.get_memory()?;
let result = WasmMergeResult::from_memory(memory, &*store, result_ptr)?;
let result = WasmMergeResult::from_memory(memory, &*store_guard, result_ptr)?;

if result.success != 0 {
// Success - read the merged data
let merged = self.read_from_wasm(&store, result.data_ptr, result.data_len)?;
let view = memory.view(&*store_guard);
let mut merged = vec![0u8; result.data_len as usize];
view.read(result.data_ptr, &mut merged).map_err(|e| {
MergeError::WasmCallbackFailed {
message: format!("Failed to read merged data from WASM memory: {e}"),
}
})?;
debug!(
target: "calimero_runtime::merge",
export_name,
Expand All @@ -310,7 +372,13 @@ impl RuntimeMergeCallback {
Ok(merged)
} else {
// Failure - read the error message
let error_msg = self.read_from_wasm(&store, result.error_ptr, result.error_len)?;
let view = memory.view(&*store_guard);
let mut error_msg = vec![0u8; result.error_len as usize];
view.read(result.error_ptr, &mut error_msg).map_err(|e| {
MergeError::WasmCallbackFailed {
message: format!("Failed to read error message from WASM memory: {e}"),
}
})?;
let error_str = String::from_utf8_lossy(&error_msg).to_string();
warn!(
target: "calimero_runtime::merge",
Expand Down
Loading