diff --git a/blockless/src/lib.rs b/blockless/src/lib.rs index f7acb8ec..6330e675 100644 --- a/blockless/src/lib.rs +++ b/blockless/src/lib.rs @@ -657,6 +657,7 @@ impl BlocklessRunner { add_to_linker!(blockless_env::add_memory_to_linker); add_to_linker!(blockless_env::add_cgi_to_linker); add_to_linker!(blockless_env::add_socket_to_linker); + add_to_linker!(blockless_env::add_bless_to_linker); wasi_common::sync::add_to_linker(linker, |host| host.preview1_ctx.as_mut().unwrap()) .unwrap(); } diff --git a/crates/blockless-drivers/Cargo.toml b/crates/blockless-drivers/Cargo.toml index 8d914539..8073cb11 100644 --- a/crates/blockless-drivers/Cargo.toml +++ b/crates/blockless-drivers/Cargo.toml @@ -26,7 +26,7 @@ async-trait = { workspace = true } dlopen = { workspace = true } json = { workspace = true } lazy_static = { workspace = true} -reqwest = { version = "0.11", features = ["stream", "rustls-tls", "json"], default-features = false } +reqwest = { version = "0.11", features = ["stream", "rustls-tls", "json", "multipart"], default-features = false } serde_urlencoded = "0.7" bytes = { workspace = true } httparse = "1" diff --git a/crates/blockless-drivers/src/error.rs b/crates/blockless-drivers/src/error.rs index b0ebdecf..8db27884 100644 --- a/crates/blockless-drivers/src/error.rs +++ b/crates/blockless-drivers/src/error.rs @@ -217,3 +217,26 @@ pub enum LlmErrorKind { MCPFunctionCallError, // 9 PermissionDeny, // 10 } + +#[derive(Debug)] +pub enum BlocklessRpcErrorKind { + InvalidJson, + MethodNotFound, + InvalidParams, + InternalError, + BufferTooSmall, +} + +impl std::error::Error for BlocklessRpcErrorKind {} + +impl std::fmt::Display for BlocklessRpcErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + Self::InvalidJson => write!(f, "Invalid JSON format"), + Self::MethodNotFound => write!(f, "Method not found"), + Self::InvalidParams => write!(f, "Invalid parameters"), + Self::InternalError => write!(f, "Internal error"), + Self::BufferTooSmall => write!(f, "Buffer too small"), + } + } +} diff --git a/crates/blockless-drivers/src/handlers/http.rs b/crates/blockless-drivers/src/handlers/http.rs new file mode 100644 index 00000000..1e66ba70 --- /dev/null +++ b/crates/blockless-drivers/src/handlers/http.rs @@ -0,0 +1,253 @@ +use reqwest::{Client, Method}; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, str::FromStr, time::Duration}; + +// Import RPC types from parent module +use crate::wasi::rpc::{JsonRpcError, JsonRpcErrorCode, JsonRpcResponse, RPC_VERSION}; + +// HTTP request structures matching the SDK +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpRpcRequest { + pub url: String, + pub options: HttpOptions, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpOptions { + #[serde(skip_serializing_if = "Option::is_none")] + pub method: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub headers: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub body: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub timeout: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub query_params: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum HttpBody { + Text(String), + Binary(Vec), + Form(HashMap), + Multipart(Vec), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultipartField { + pub name: String, + pub value: MultipartValue, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MultipartValue { + Text(String), + Binary { + data: Vec, + filename: Option, + content_type: Option, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpResponse { + pub status: u16, + pub headers: HashMap, + pub body: Vec, + pub url: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpResult { + pub success: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +pub async fn handle_http_request(params: Option, id: u32) -> JsonRpcResponse { + // Parse the HTTP request parameters + let http_request: HttpRpcRequest = match params { + Some(p) => match serde_json::from_value(p) { + Ok(req) => req, + Err(e) => { + return JsonRpcResponse { + jsonrpc: RPC_VERSION.to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Invalid params".to_string(), + data: Some(serde_json::json!({ + "error": format!("Failed to parse HTTP request: {}", e) + })), + }), + id, + }; + } + }, + None => { + return JsonRpcResponse { + jsonrpc: RPC_VERSION.to_string(), + result: None, + error: Some(JsonRpcError { + code: JsonRpcErrorCode::InvalidParams as i32, + message: "Invalid params".to_string(), + data: Some(serde_json::json!({ + "error": "Missing HTTP request parameters" + })), + }), + id, + }; + } + }; + + // Execute the HTTP request using the http_v2 driver + let result = execute_http_request(http_request).await; + JsonRpcResponse { + jsonrpc: RPC_VERSION.to_string(), + result: match serde_json::to_value(result) { + Ok(value) => Some(value), + Err(e) => { + return JsonRpcResponse { + jsonrpc: RPC_VERSION.to_string(), + result: None, + error: Some(JsonRpcError { + code: JsonRpcErrorCode::InternalError as i32, + message: "Internal error".to_string(), + data: Some(serde_json::json!({ + "error": format!("Failed to serialize result: {}", e) + })), + }), + id, + }; + } + }, + error: None, + id, + } +} + +pub async fn execute_http_request(request: HttpRpcRequest) -> HttpResult { + log::trace!("=== HTTP Request via RPC ==="); + log::trace!("URL: {}", request.url); + log::trace!("Method: {:?}", request.options.method); + log::trace!("Headers: {:?}", request.options.headers); + log::trace!("Body: {:?}", request.options.body); + log::trace!("Timeout: {:?}", request.options.timeout); + log::trace!("Query Params: {:?}", request.options.query_params); + log::trace!("============================"); + + let result = async { + // Create HTTP client with timeout + let timeout = Duration::from_millis(request.options.timeout.unwrap_or(30000) as u64); + let client = Client::builder().timeout(timeout).build()?; + + // Parse HTTP method + let method = request.options.method.as_deref().unwrap_or("GET"); + let http_method = Method::from_str(method)?; + + // Build URL with query parameters + let mut url = reqwest::Url::parse(&request.url)?; + // Only add query parameters if the URL doesn't already have a query string + if url.query().is_none() { + if let Some(query_params) = &request.options.query_params { + for (key, value) in query_params { + url.query_pairs_mut().append_pair(key, value); + } + } + } + + // Create request builder + let mut req_builder = client.request(http_method, url.clone()); + + // Add headers + if let Some(headers) = &request.options.headers { + for (key, value) in headers { + req_builder = req_builder.header(key, value); + } + } + + // Add body based on type + if let Some(body) = &request.options.body { + req_builder = match body { + HttpBody::Text(text) => req_builder.body(text.clone()), + HttpBody::Binary(data) => req_builder.body(data.clone()), + HttpBody::Form(form_data) => { + let mut form = reqwest::multipart::Form::new(); + for (key, value) in form_data { + form = form.text(key.clone(), value.clone()); + } + req_builder.multipart(form) + } + HttpBody::Multipart(fields) => { + let mut form = reqwest::multipart::Form::new(); + for field in fields { + match &field.value { + MultipartValue::Text(text) => { + form = form.text(field.name.clone(), text.clone()); + } + MultipartValue::Binary { + data, + filename, + content_type, + } => { + let mut part = reqwest::multipart::Part::bytes(data.clone()); + if let Some(filename) = filename { + part = part.file_name(filename.clone()); + } + if let Some(content_type) = content_type { + part = part.mime_str(content_type)?; + } + form = form.part(field.name.clone(), part); + } + } + } + req_builder.multipart(form) + } + }; + } + + // Execute the request + let response = req_builder.send().await?; + let status = response.status().as_u16(); + let final_url = response.url().to_string(); + + // Extract headers + let mut headers = HashMap::new(); + for (name, value) in response.headers() { + if let Ok(value_str) = value.to_str() { + headers.insert(name.to_string(), value_str.to_string()); + } + } + + // Get response body + let body = response.bytes().await?.to_vec(); + + Ok::>(HttpResponse { + status, + headers, + body, + url: final_url, + }) + } + .await; + + match result { + Ok(response) => HttpResult { + success: true, + data: Some(response), + error: None, + }, + Err(e) => { + eprintln!("HTTP request failed: {}", e); + HttpResult { + success: false, + data: None, + error: Some(e.to_string()), + } + } + } +} diff --git a/crates/blockless-drivers/src/handlers/mod.rs b/crates/blockless-drivers/src/handlers/mod.rs new file mode 100644 index 00000000..3883215f --- /dev/null +++ b/crates/blockless-drivers/src/handlers/mod.rs @@ -0,0 +1 @@ +pub mod http; diff --git a/crates/blockless-drivers/src/lib.rs b/crates/blockless-drivers/src/lib.rs index 17aa7b2f..7d1c9213 100644 --- a/crates/blockless-drivers/src/lib.rs +++ b/crates/blockless-drivers/src/lib.rs @@ -2,6 +2,7 @@ mod cdylib_driver; pub mod cgi_driver; pub mod error; +pub mod handlers; pub mod http_driver; pub mod ipfs_driver; pub mod llm_driver; diff --git a/crates/blockless-drivers/src/wasi/mod.rs b/crates/blockless-drivers/src/wasi/mod.rs index b0d00506..1175fe6f 100644 --- a/crates/blockless-drivers/src/wasi/mod.rs +++ b/crates/blockless-drivers/src/wasi/mod.rs @@ -5,6 +5,7 @@ pub mod http; pub mod ipfs; pub mod llm; pub mod memory; +pub mod rpc; pub mod s3; pub mod socket; use crate::ErrorKind; diff --git a/crates/blockless-drivers/src/wasi/rpc.rs b/crates/blockless-drivers/src/wasi/rpc.rs new file mode 100644 index 00000000..489183a9 --- /dev/null +++ b/crates/blockless-drivers/src/wasi/rpc.rs @@ -0,0 +1,166 @@ +#![allow(non_upper_case_globals)] +use crate::BlocklessRpcErrorKind; +use serde::{Deserialize, Serialize}; +use wasi_common::WasiCtx; +use wiggle::{GuestMemory, GuestPtr}; + +wiggle::from_witx!({ + witx: ["$BLOCKLESS_DRIVERS_ROOT/witx/blockless_rpc.witx"], + errors: { blockless_rpc_error => BlocklessRpcErrorKind }, + async: *, +}); + +pub const RPC_VERSION: &str = "2.0"; + +// JSON-RPC 2.0 structures +#[derive(Serialize, Deserialize, Debug)] +pub struct JsonRpcRequest { + pub jsonrpc: String, + pub method: String, + pub params: Option, + pub id: u32, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct JsonRpcResponse { + pub jsonrpc: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + pub id: u32, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct JsonRpcError { + pub code: i32, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +// https://www.jsonrpc.org/specification#error_object +#[derive(Debug, Clone, Copy)] +#[repr(i32)] +pub enum JsonRpcErrorCode { + ParseError = -32700, + InvalidRequest = -32600, + MethodNotFound = -32601, + InvalidParams = -32602, + InternalError = -32603, +} + +impl types::UserErrorConversion for WasiCtx { + fn blockless_rpc_error_from_blockless_rpc_error_kind( + &mut self, + e: self::BlocklessRpcErrorKind, + ) -> wiggle::anyhow::Result { + Ok(e.into()) + } +} + +impl From for types::BlocklessRpcError { + fn from(e: BlocklessRpcErrorKind) -> types::BlocklessRpcError { + use types::BlocklessRpcError; + match e { + BlocklessRpcErrorKind::InvalidJson => BlocklessRpcError::InvalidJson, + BlocklessRpcErrorKind::MethodNotFound => BlocklessRpcError::MethodNotFound, + BlocklessRpcErrorKind::InvalidParams => BlocklessRpcError::InvalidParams, + BlocklessRpcErrorKind::InternalError => BlocklessRpcError::InternalError, + BlocklessRpcErrorKind::BufferTooSmall => BlocklessRpcError::BufferTooSmall, + } + } +} + +impl wiggle::GuestErrorType for types::BlocklessRpcError { + fn success() -> Self { + Self::Success + } +} + +#[wiggle::async_trait] +impl bless::Bless for WasiCtx { + async fn rpc_call( + &mut self, + memory: &mut GuestMemory<'_>, + request_buf: GuestPtr, + request_len: u32, + response_buf: GuestPtr, + response_max_len: u32, + ) -> Result { + // Read the JSON-RPC request from WASM memory + let request_bytes = memory + .as_slice(request_buf.as_array(request_len)) + .map_err(|_| BlocklessRpcErrorKind::InternalError)? + .unwrap(); + + // Parse JSON-RPC request directly from bytes + let request: JsonRpcRequest = serde_json::from_slice(request_bytes) + .map_err(|_| BlocklessRpcErrorKind::InvalidJson)?; + + // Handle the request + let response = handle_rpc_request(request).await; + + // Serialize response directly to bytes + let response_bytes = + serde_json::to_vec(&response).map_err(|_| BlocklessRpcErrorKind::InternalError)?; + + // Check if response fits in buffer + let response_len = response_bytes.len() as u32; + if response_len > response_max_len { + return Err(BlocklessRpcErrorKind::BufferTooSmall); + } + + // Write response to WASM memory + memory + .copy_from_slice(&response_bytes, response_buf.as_array(response_len)) + .map_err(|_| BlocklessRpcErrorKind::InternalError)?; + + Ok(response_len) + } +} + +async fn handle_rpc_request(request: JsonRpcRequest) -> JsonRpcResponse { + let id = request.id; + + match request.method.as_str() { + "ping" => JsonRpcResponse { + jsonrpc: RPC_VERSION.to_string(), + result: Some(serde_json::json!("pong")), + error: None, + id, + }, + "echo" => { + let params = request.params.unwrap_or(serde_json::Value::Null); + JsonRpcResponse { + jsonrpc: RPC_VERSION.to_string(), + result: Some(params), + error: None, + id, + } + } + "version" => JsonRpcResponse { + jsonrpc: RPC_VERSION.to_string(), + result: Some(serde_json::json!({ + "runtime": "bls-runtime", + "version": env!("CARGO_PKG_VERSION"), + "rpc_version": "2.0" + })), + error: None, + id, + }, + "http.request" => crate::handlers::http::handle_http_request(request.params, id).await, + _ => JsonRpcResponse { + jsonrpc: RPC_VERSION.to_string(), + result: None, + error: Some(JsonRpcError { + code: JsonRpcErrorCode::MethodNotFound as i32, + message: "Method not found".to_string(), + data: Some(serde_json::json!({ + "method": request.method + })), + }), + id, + }, + } +} diff --git a/crates/blockless-drivers/witx/blockless_rpc.witx b/crates/blockless-drivers/witx/blockless_rpc.witx new file mode 100644 index 00000000..95b21a01 --- /dev/null +++ b/crates/blockless-drivers/witx/blockless_rpc.witx @@ -0,0 +1,30 @@ +(typename $blockless_rpc_error + (enum (@witx tag u16) + ;;; Success + $success + ;;; Invalid JSON format + $invalid_json + ;;; Method not found + $method_not_found + ;;; Invalid parameters + $invalid_params + ;;; Internal error + $internal_error + ;;; Buffer too small + $buffer_too_small + ) +) + +;;; Number of bytes +(typename $num_bytes u32) + +(module $bless + + (@interface func (export "rpc_call") + (param $request_buf (@witx pointer u8)) + (param $request_len u32) + (param $response_buf (@witx pointer u8)) + (param $response_max_len u32) + (result $error (expected $num_bytes (error $blockless_rpc_error))) + ) +) \ No newline at end of file diff --git a/crates/blockless-env/src/lib.rs b/crates/blockless-env/src/lib.rs index d99d7ef3..7bf40254 100644 --- a/crates/blockless-env/src/lib.rs +++ b/crates/blockless-env/src/lib.rs @@ -48,3 +48,9 @@ linker_integration!({ target: blockless_drivers::wasi::llm, link_method: "add_llm_to_linker", }); + +linker_integration!({ + witx: ["$BLOCKLESS_DRIVERS_ROOT/witx/blockless_rpc.witx"], + target: blockless_drivers::wasi::rpc, + link_method: "add_bless_to_linker", +});