diff --git a/rust-native/Cargo.lock b/rust-native/Cargo.lock index 548e3bf..3290c4b 100644 --- a/rust-native/Cargo.lock +++ b/rust-native/Cargo.lock @@ -137,6 +137,12 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crypto-common" version = "0.1.7" @@ -163,6 +169,20 @@ version = "0.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52560adf09603e58c9a7ee1fe1dcb95a16927b17c127f0ac02d6e768a0e25bc1" +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "digest" version = "0.10.7" @@ -371,6 +391,12 @@ dependencies = [ "wasip2", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hermit-abi" version = "0.5.2" @@ -393,6 +419,8 @@ dependencies = [ "anyhow", "base64", "bytes", + "dashmap", + "flume", "getrandom 0.2.17", "hmac", "httparse", @@ -410,6 +438,7 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", + "sha1", "sha2", "socket2", "url", @@ -1043,6 +1072,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" diff --git a/rust-native/Cargo.toml b/rust-native/Cargo.toml index d320e70..91e1854 100644 --- a/rust-native/Cargo.toml +++ b/rust-native/Cargo.toml @@ -15,7 +15,10 @@ itoa = "1.0" json5 = "0.4" memchr = "2.7" rustc-hash = "2" +dashmap = "6" +flume = "0.11" getrandom = "0.2" +sha1 = "0.10" hmac = "0.12" monoio = { version = "0.2", features = ["sync", "legacy"] } monoio-rustls = "0.4" diff --git a/rust-native/src/lib.rs b/rust-native/src/lib.rs index e8509d4..5efdb54 100644 --- a/rust-native/src/lib.rs +++ b/rust-native/src/lib.rs @@ -2,6 +2,7 @@ mod analyzer; mod manifest; mod router; pub mod session; +mod websocket; use anyhow::{anyhow, Context, Result}; use memchr::memmem; @@ -18,7 +19,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::io::BufReader; use std::net::{SocketAddr, ToSocketAddrs}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::rc::Rc; use std::sync::{mpsc, Arc, Mutex}; use url::form_urlencoded; @@ -247,6 +248,57 @@ impl NativeServerHandle { static GLOBAL_SESSION_STORE: std::sync::OnceLock> = std::sync::OnceLock::new(); +// ─── Global Stream Registry ────────── +// +// Streams are created on the monoio thread when a stream-start sentinel is +// detected in the JS dispatch response. The Sender lives in the DashMap so +// that NAPI calls from JS (`stream_write`, `stream_end`) can push chunks. +// The Receiver is held locally on the monoio thread that drives the chunked +// transfer loop. + +static NEXT_STREAM_ID: AtomicU64 = AtomicU64::new(1); +static STREAM_CHANNELS: std::sync::OnceLock>> = + std::sync::OnceLock::new(); + +fn stream_registry() -> &'static dashmap::DashMap> { + STREAM_CHANNELS.get_or_init(dashmap::DashMap::new) +} + +enum StreamMessage { + Chunk(Vec), + End, +} + +/// Allocate the next stream ID. Called from JS before dispatching so the +/// stream-start envelope can embed the ID. +#[napi] +pub fn stream_create() -> i64 { + NEXT_STREAM_ID.fetch_add(1, Ordering::Relaxed) as i64 +} + +/// Write a chunk to an active stream. Called from JS. +#[napi] +pub fn stream_write(stream_id: i64, chunk: Buffer) -> napi::Result<()> { + let registry = stream_registry(); + let sender = registry + .get(&(stream_id as u64)) + .ok_or_else(|| napi::Error::from_reason(format!("Stream {} not found", stream_id)))?; + sender + .send(StreamMessage::Chunk(chunk.to_vec())) + .map_err(|_| napi::Error::from_reason("Stream channel closed"))?; + Ok(()) +} + +/// End an active stream. Called from JS. +#[napi] +pub fn stream_end(stream_id: i64) -> napi::Result<()> { + let registry = stream_registry(); + if let Some((_, sender)) = registry.remove(&(stream_id as u64)) { + let _ = sender.send(StreamMessage::End); + } + Ok(()) +} + /// Get a session value by key. Returns JSON string or null. #[napi] pub fn session_get(session_id_hex: String, key: String) -> Option { @@ -631,6 +683,10 @@ struct ParsedRequest<'a> { headers: Vec<(&'a str, &'a str)>, /// Raw cookie header value for session extraction cookie_header: Option<&'a str>, + /// True when the request contains an Upgrade: websocket header + is_websocket_upgrade: bool, + /// The Sec-WebSocket-Key header value, if present + ws_key: Option<&'a str>, } use monoio::time::timeout; @@ -789,6 +845,25 @@ where } } + // ── WebSocket upgrade check ── + if parsed.is_websocket_upgrade { + if let Some(ws_key) = parsed.ws_key { + if let Some(ws_handler_id) = router.match_ws_route(std::str::from_utf8(parsed.path).unwrap_or("/")) { + let accept_key = crate::websocket::compute_accept_key(ws_key); + let upgrade_response = crate::websocket::build_upgrade_response(&accept_key); + drop(parsed); + drain_consumed_bytes(buffer, header_bytes); + + let (write_result, _) = stream.write_all(upgrade_response).await; + write_result?; + + // Enter WebSocket frame loop + handle_websocket_connection(stream, buffer, ws_handler_id, dispatcher).await?; + return Ok(()); + } + } + } + // ── Zero-copy path: non-body requests ── // Build dispatch envelope directly from borrowed parse data, avoiding // String/Vec allocations for method, target, path, and headers. @@ -957,6 +1032,8 @@ fn parse_request_httparse(bytes: &[u8]) -> Option> { let mut content_length: Option = None; let mut has_chunked_te = false; let mut cookie_header: Option<&str> = None; + let mut is_websocket_upgrade = false; + let mut ws_key: Option<&str> = None; let mut headers = Vec::with_capacity(req.headers.len()); for header in req.headers.iter() { @@ -1010,6 +1087,16 @@ fn parse_request_httparse(bytes: &[u8]) -> Option> { cookie_header = Some(value); } + // WebSocket: detect upgrade request + if name.eq_ignore_ascii_case("upgrade") { + if value.eq_ignore_ascii_case("websocket") { + is_websocket_upgrade = true; + } + } + if name.eq_ignore_ascii_case("sec-websocket-key") { + ws_key = Some(value); + } + headers.push((name, value)); } @@ -1024,6 +1111,8 @@ fn parse_request_httparse(bytes: &[u8]) -> Option> { has_chunked_te, headers, cookie_header, + is_websocket_upgrade, + ws_key, }) } @@ -1102,6 +1191,8 @@ fn parse_hot_root_request( has_chunked_te: false, headers: Vec::new(), cookie_header: None, // Hot path doesn't parse cookies + is_websocket_upgrade: false, + ws_key: None, }) } @@ -1259,6 +1350,8 @@ fn build_dispatch_decision_owned( has_chunked_te: false, headers: header_refs.clone(), cookie_header: None, + is_websocket_upgrade: false, + ws_key: None, }; let key = crate::router::interpolate_cache_key(cfg, &mock_parsed, url_str, matched_route.param_names, &matched_route.param_values); cache_insertion = Some((matched_route.handler_id, key, cfg.max_entries, cfg.ttl_secs)); @@ -2041,6 +2134,114 @@ where { match dispatcher.dispatch(request).await { Ok(response) => { + // ── Stream-start sentinel: 0xFF 0x53 ── + // If JS returns a stream-start envelope instead of a normal response, + // we enter chunked transfer mode and pipe chunks from JS via the + // global stream registry. + if response.len() >= 14 && response[0] == 0xFF && response[1] == 0x53 { + let mut off = 2usize; + let stream_id = u64::from_le_bytes( + response[off..off + 8] + .try_into() + .map_err(|_| anyhow!("stream envelope truncated"))?, + ); + off += 8; + let status = + u16::from_le_bytes([response[off], response[off + 1]]); + off += 2; + let header_count = + u16::from_le_bytes([response[off], response[off + 1]]) as usize; + off += 2; + + // Create the channel — Sender goes into the registry so JS can push + // chunks; we keep the Receiver locally for the write loop. + let (tx, rx) = flume::bounded::(16); + stream_registry().insert(stream_id, tx); + + // Build HTTP/1.1 response headers with chunked transfer-encoding + let reason = status_reason(status); + let connection = if keep_alive { "keep-alive" } else { "close" }; + let mut output = Vec::with_capacity(256); + output.extend_from_slice(b"HTTP/1.1 "); + write_u16(&mut output, status); + output.push(b' '); + output.extend_from_slice(reason.as_bytes()); + output.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\nconnection: "); + output.extend_from_slice(connection.as_bytes()); + output.extend_from_slice(b"\r\n"); + + // Parse user headers from the envelope + for _ in 0..header_count { + if off >= response.len() { + break; + } + let name_len = response[off] as usize; + off += 1; + if off + 2 > response.len() { + break; + } + let value_len = u16::from_le_bytes([response[off], response[off + 1]]) as usize; + off += 2; + if off + name_len + value_len > response.len() { + break; + } + let name = &response[off..off + name_len]; + off += name_len; + let value = &response[off..off + value_len]; + off += value_len; + + // Skip headers we manage ourselves + if name.eq_ignore_ascii_case(b"transfer-encoding") + || name.eq_ignore_ascii_case(b"content-length") + || name.eq_ignore_ascii_case(b"connection") + { + continue; + } + + output.extend_from_slice(name); + output.extend_from_slice(b": "); + output.extend_from_slice(value); + output.extend_from_slice(b"\r\n"); + } + output.extend_from_slice(b"\r\n"); + + // Write the header block to the TCP stream + let (write_result, _) = stream.write_all(output).await; + write_result?; + + // Chunked transfer loop — read from channel, write to TCP + loop { + match rx.recv_async().await { + Ok(StreamMessage::Chunk(data)) => { + if data.is_empty() { + continue; + } + // HTTP/1.1 chunked format: {hex_len}\r\n{data}\r\n + let hex_len = format!("{:x}", data.len()); + let mut chunk_buf = Vec::with_capacity(hex_len.len() + 2 + data.len() + 2); + chunk_buf.extend_from_slice(hex_len.as_bytes()); + chunk_buf.extend_from_slice(b"\r\n"); + chunk_buf.extend_from_slice(&data); + chunk_buf.extend_from_slice(b"\r\n"); + let (wr, _) = stream.write_all(chunk_buf).await; + if wr.is_err() { + stream_registry().remove(&stream_id); + break; + } + } + Ok(StreamMessage::End) | Err(_) => { + // Final chunk: 0\r\n\r\n + let (wr, _) = stream.write_all(b"0\r\n\r\n".to_vec()).await; + let _ = wr; + stream_registry().remove(&stream_id); + break; + } + } + } + + return Ok(()); + } + match build_http_response_from_dispatch(response.as_ref(), keep_alive) { Ok(mut http_response) => { if let Some((handler_id, cache_key, max_entries, ttl_secs)) = cache_insertion { @@ -2293,6 +2494,114 @@ fn resolve_session( (Some(new_id), true) } +// ─── WebSocket Connection Handler ────── + +async fn handle_websocket_connection( + stream: &mut S, + buffer: &mut Vec, + handler_id: u32, + dispatcher: &JsDispatcher, +) -> Result<()> +where + S: monoio::io::AsyncReadRent + monoio::io::AsyncWriteRent + monoio::io::AsyncWriteRentExt, +{ + use crate::websocket::*; + + let ws_id = NEXT_STREAM_ID.fetch_add(1, Ordering::Relaxed); + + // Create channel for outbound messages (JS → Rust → client) + let (tx, rx) = flume::bounded::(64); + stream_registry().insert(ws_id, tx); + + // Dispatch "open" event to JS + let open_envelope = build_ws_event_envelope(0x01, ws_id, handler_id, &[]); + let _ = dispatcher.dispatch(Buffer::from(open_envelope)).await; + + buffer.clear(); + + loop { + // Try to parse a frame from the buffer + if let Some((frame, consumed)) = parse_frame(buffer) { + buffer.drain(..consumed); + + match frame.opcode { + OPCODE_TEXT | OPCODE_BINARY => { + let msg_envelope = + build_ws_event_envelope(0x02, ws_id, handler_id, &frame.payload); + let _ = dispatcher.dispatch(Buffer::from(msg_envelope)).await; + } + OPCODE_PING => { + let pong = encode_frame(OPCODE_PONG, &frame.payload); + let (res, _) = stream.write_all(pong).await; + if res.is_err() { + break; + } + } + OPCODE_CLOSE => { + let close = encode_close_frame(1000, ""); + let (res, _) = stream.write_all(close).await; + let _ = res; + break; + } + _ => {} + } + continue; + } + + // Check for outbound messages from JS + match rx.try_recv() { + Ok(StreamMessage::Chunk(data)) => { + let frame = encode_frame(OPCODE_TEXT, &data); + let (res, _) = stream.write_all(frame).await; + if res.is_err() { + break; + } + continue; + } + Ok(StreamMessage::End) => { + let close = encode_close_frame(1000, ""); + let (res, _) = stream.write_all(close).await; + let _ = res; + break; + } + Err(flume::TryRecvError::Empty) => {} + Err(flume::TryRecvError::Disconnected) => break, + } + + // Read more data from the client + let owned_buf = std::mem::take(buffer); + let (read_result, returned_buf) = stream.read(owned_buf).await; + *buffer = returned_buf; + match read_result { + Ok(0) => break, + Ok(_) => {} + Err(_) => break, + } + } + + // Cleanup + stream_registry().remove(&ws_id); + + // Dispatch "close" event to JS + let close_envelope = build_ws_event_envelope(0x03, ws_id, handler_id, &[]); + let _ = dispatcher.dispatch(Buffer::from(close_envelope)).await; + + Ok(()) +} + +/// Build a WebSocket event envelope for dispatching to JS. +/// Layout: 0xFE | eventType(1) | wsId(8 LE) | handlerId(4 LE) | dataLen(4 LE) | data +fn build_ws_event_envelope(event_type: u8, ws_id: u64, handler_id: u32, data: &[u8]) -> Vec { + let mut buf = Vec::with_capacity(18 + data.len()); + buf.push(0xFE); // WS event sentinel + buf.push(event_type); + buf.extend_from_slice(&ws_id.to_le_bytes()); + buf.extend_from_slice(&handler_id.to_le_bytes()); + buf.extend_from_slice(&(data.len() as u32).to_le_bytes()); + buf.extend_from_slice(data); + buf +} + /// Inject a Set-Cookie header into an already-built HTTP response. /// Inserts the header just before the final \r\n\r\n (end of headers). fn inject_set_cookie_header(response: &mut Vec, cookie_value: &str) { diff --git a/rust-native/src/manifest.rs b/rust-native/src/manifest.rs index 900304b..8417400 100644 --- a/rust-native/src/manifest.rs +++ b/rust-native/src/manifest.rs @@ -10,6 +10,8 @@ pub struct ManifestInput { pub middlewares: Vec, pub routes: Vec, #[serde(default)] + pub ws_routes: Vec, + #[serde(default)] pub session: Option, } @@ -114,3 +116,10 @@ pub struct CacheVaryInput { pub source: String, pub name: String, } + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WsRouteInput { + pub path: String, + pub handler_id: u32, +} diff --git a/rust-native/src/router.rs b/rust-native/src/router.rs index 98d22a4..d960390 100644 --- a/rust-native/src/router.rs +++ b/rust-native/src/router.rs @@ -22,8 +22,10 @@ pub struct Router { dynamic_exact_routes: HashMap, DynamicRouteSpec>>, /// O(1) static-response routes exact_static_routes: HashMap, ExactStaticRoute>>, - /// O(M) radix-tree routes per method (M = path length) + /// O(M) radix-tree routes per method (M = path length) radix_trees: HashMap, + /// WebSocket routes: path → handler_id + ws_routes: HashMap, } #[derive(Clone)] @@ -268,11 +270,18 @@ impl Router { } } + let mut ws_routes = HashMap::new(); + for ws_route in &manifest.ws_routes { + let path = normalize_path(ws_route.path.as_str()); + ws_routes.insert(path, ws_route.handler_id); + } + Ok(Self { exact_get_root, dynamic_exact_routes, exact_static_routes, radix_trees, + ws_routes, }) } @@ -343,6 +352,10 @@ impl Router { pub fn exact_get_root(&self) -> Option<&ExactStaticRoute> { self.exact_get_root.as_ref() } + + pub fn match_ws_route(&self, path: &str) -> Option { + self.ws_routes.get(path).copied() + } } // ─── MethodKey ────────────────────────── diff --git a/rust-native/src/websocket.rs b/rust-native/src/websocket.rs new file mode 100644 index 0000000..ac1c838 --- /dev/null +++ b/rust-native/src/websocket.rs @@ -0,0 +1,131 @@ +//! WebSocket frame protocol implementation (RFC 6455). +//! Handles frame parsing, encoding, masking, and the upgrade handshake. + +use sha1::{Sha1, Digest}; +use base64::Engine; + +const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + +/// WebSocket opcodes +#[allow(dead_code)] +pub const OPCODE_CONTINUATION: u8 = 0x0; +pub const OPCODE_TEXT: u8 = 0x1; +pub const OPCODE_BINARY: u8 = 0x2; +pub const OPCODE_CLOSE: u8 = 0x8; +pub const OPCODE_PING: u8 = 0x9; +pub const OPCODE_PONG: u8 = 0xA; + +/// Max frame payload (16 MB) +const MAX_PAYLOAD_SIZE: usize = 16 * 1024 * 1024; + +#[allow(dead_code)] +pub struct WsFrame { + pub fin: bool, + pub opcode: u8, + pub payload: Vec, +} + +/// Compute the Sec-WebSocket-Accept value from the client's key. +pub fn compute_accept_key(client_key: &str) -> String { + let mut hasher = Sha1::new(); + hasher.update(client_key.trim().as_bytes()); + hasher.update(WS_GUID); + let hash = hasher.finalize(); + base64::engine::general_purpose::STANDARD.encode(hash) +} + +/// Build the 101 Switching Protocols response. +pub fn build_upgrade_response(accept_key: &str) -> Vec { + format!( + "HTTP/1.1 101 Switching Protocols\r\n\ + Upgrade: websocket\r\n\ + Connection: Upgrade\r\n\ + Sec-WebSocket-Accept: {}\r\n\ + \r\n", + accept_key + ).into_bytes() +} + +/// Try to parse a WebSocket frame from the buffer. +/// Returns Some((frame, bytes_consumed)) if a complete frame is available. +pub fn parse_frame(buf: &[u8]) -> Option<(WsFrame, usize)> { + if buf.len() < 2 { + return None; + } + + let fin = (buf[0] & 0x80) != 0; + let opcode = buf[0] & 0x0F; + let masked = (buf[1] & 0x80) != 0; + let mut payload_len = (buf[1] & 0x7F) as usize; + let mut offset = 2; + + if payload_len == 126 { + if buf.len() < 4 { return None; } + payload_len = ((buf[2] as usize) << 8) | (buf[3] as usize); + offset = 4; + } else if payload_len == 127 { + if buf.len() < 10 { return None; } + payload_len = u64::from_be_bytes(buf[2..10].try_into().ok()?) as usize; + offset = 10; + } + + if payload_len > MAX_PAYLOAD_SIZE { + return None; // Reject oversized frames + } + + let mask_key = if masked { + if buf.len() < offset + 4 { return None; } + let key = [buf[offset], buf[offset+1], buf[offset+2], buf[offset+3]]; + offset += 4; + Some(key) + } else { + None + }; + + if buf.len() < offset + payload_len { + return None; // Incomplete frame + } + + let mut payload = buf[offset..offset + payload_len].to_vec(); + + // Unmask if needed (client-to-server frames are always masked) + if let Some(key) = mask_key { + for (i, byte) in payload.iter_mut().enumerate() { + *byte ^= key[i % 4]; + } + } + + Some((WsFrame { fin, opcode, payload }, offset + payload_len)) +} + +/// Encode a WebSocket frame for sending (server-to-client, unmasked). +pub fn encode_frame(opcode: u8, payload: &[u8]) -> Vec { + let mut frame = Vec::with_capacity(10 + payload.len()); + + // FIN + opcode + frame.push(0x80 | opcode); + + // Payload length (server frames are NOT masked) + if payload.len() < 126 { + frame.push(payload.len() as u8); + } else if payload.len() <= 65535 { + frame.push(126); + frame.push((payload.len() >> 8) as u8); + frame.push((payload.len() & 0xFF) as u8); + } else { + frame.push(127); + frame.extend_from_slice(&(payload.len() as u64).to_be_bytes()); + } + + frame.extend_from_slice(payload); + frame +} + +/// Encode a close frame with optional status code and reason. +pub fn encode_close_frame(code: u16, reason: &str) -> Vec { + let mut payload = Vec::with_capacity(2 + reason.len()); + payload.push((code >> 8) as u8); + payload.push((code & 0xFF) as u8); + payload.extend_from_slice(reason.as_bytes()); + encode_frame(OPCODE_CLOSE, &payload) +} diff --git a/src/bridge.js b/src/bridge.js index c5e0ea4..9bcb69d 100644 --- a/src/bridge.js +++ b/src/bridge.js @@ -686,6 +686,70 @@ export function encodeResponseEnvelope(snapshot) { return output; } +/** + * Encode a stream-start envelope for chunked streaming responses. + * Layout: sentinel(2) 0xFF 0x53 | streamId(8) | status(2) | headerCount(2) | headers... + * + * @param {number} streamId + * @param {{ status: number, headers: Object }} snapshot + * @returns {Buffer} + */ +export function encodeStreamStartEnvelope(streamId, snapshot) { + const rawHeaders = snapshot.headers; + const headerKeys = rawHeaders ? Object.keys(rawHeaders) : []; + const headerCount = headerKeys.length; + + // Encode headers + const encodedHeaders = new Array(headerCount); + let headersSize = 0; + for (let i = 0; i < headerCount; i++) { + const name = headerKeys[i]; + const nameBytes = getCachedHeaderNameBytes(name); + const valueBytes = textEncoder.encode(String(rawHeaders[name])); + encodedHeaders[i] = [nameBytes, valueBytes]; + headersSize += 3 + nameBytes.length + valueBytes.length; + } + + // sentinel(2) + streamId(8) + status(2) + headerCount(2) + headers + const totalLength = 2 + 8 + 2 + 2 + headersSize; + const output = Buffer.allocUnsafe(totalLength); + let offset = 0; + + // Stream sentinel + output[offset++] = 0xff; + output[offset++] = 0x53; + + // Stream ID (u64 LE) — use two u32 writes since JS doesn't have u64 + const low = streamId & 0xffffffff; + const high = (streamId / 0x100000000) >>> 0; + writeU32(output, offset, low); + offset += 4; + writeU32(output, offset, high); + offset += 4; + + // Status + writeU16(output, offset, Number(snapshot.status ?? 200)); + offset += 2; + + // Header count + writeU16(output, offset, headerCount); + offset += 2; + + // Headers (same format as regular response) + for (let i = 0; i < headerCount; i++) { + const [nameBytes, valueBytes] = encodedHeaders[i]; + output[offset++] = nameBytes.length; + writeU16(output, offset, valueBytes.length); + offset += 2; + output.set(nameBytes, offset); + offset += nameBytes.length; + output.set(valueBytes, offset); + offset += valueBytes.length; + } + + return output; +} + // ─── Object Materialization (Security-Hardened) ─────────────────────────────── // // All user-facing objects use Object.create(null) to prevent prototype pollution. diff --git a/src/index.js b/src/index.js index fd1e150..b0375c0 100644 --- a/src/index.js +++ b/src/index.js @@ -10,6 +10,7 @@ import { createRequestFactory, decodeRequestEnvelope, encodeResponseEnvelope, + encodeStreamStartEnvelope, mergeRequestAccessPlans, releaseRequestObject, } from "./bridge.js"; @@ -161,6 +162,8 @@ function acquireResponseState() { pooled.finished = false; pooled.locals = Object.create(null); pooled.ncache = null; + pooled.streaming = false; + pooled.streamId = null; return pooled; } @@ -171,6 +174,8 @@ function acquireResponseState() { finished: false, locals: Object.create(null), ncache: null, + streaming: false, + streamId: null, }; } @@ -312,6 +317,58 @@ const RESPONSE_PROTO = { return this; }, + + stream(options = {}) { + const state = this._state; + if (state.finished) { + return null; + } + + // Load native module for stream NAPI calls + const native = loadNativeModule(); + const streamId = native.streamCreate(); + + // Set default content-type if not set + if (!state.headers["content-type"]) { + state.headers["content-type"] = options.contentType || "application/octet-stream"; + } + + state.finished = true; + state.streaming = true; + state.streamId = streamId; + + const encoder = new TextEncoder(); + + return { + /** Write a chunk to the stream */ + write(data) { + let chunk; + if (typeof data === "string") { + chunk = Buffer.from(data, "utf8"); + } else if (Buffer.isBuffer(data)) { + chunk = data; + } else if (data instanceof Uint8Array) { + chunk = Buffer.from(data); + } else if (typeof data === "object") { + chunk = Buffer.from(JSON.stringify(data), "utf8"); + } else { + chunk = Buffer.from(String(data), "utf8"); + } + return native.streamWrite(streamId, chunk); + }, + + /** End the stream */ + end(finalChunk) { + if (finalChunk !== undefined) { + this.write(finalChunk); + } + return native.streamEnd(streamId); + }, + + /** The stream ID (for internal use) */ + id: streamId, + }; + }, }; function createResponseEnvelope(jsonSerializer = DEFAULT_JSON_SERIALIZER) { @@ -480,8 +537,10 @@ function createDispatcher( runtimeOptimizer, errorHandlers = [], devRouteCommentWriter = null, + wsRoutes = [], ) { const routesById = new Map(compiledRoutes.map((route) => [route.handlerId, route])); + const wsRoutesById = new Map(wsRoutes.map((r) => [r.handlerId, r])); const errorRequestFactory = createRequestFactory(ERROR_REQUEST_PLAN, [], null); const trackDispatchTiming = runtimeOptimizer?.shouldCaptureDispatchTiming?.() === true; @@ -516,6 +575,46 @@ function createDispatcher( } return async function dispatch(requestBuffer) { + // WebSocket event dispatch (sentinel 0xFE) + if (requestBuffer[0] === 0xFE) { + const eventType = requestBuffer[1]; + const wsId = Number(requestBuffer.readBigUInt64LE(2)); + const handlerId = requestBuffer.readUInt32LE(10); + const dataLen = requestBuffer.readUInt32LE(14); + const data = dataLen > 0 ? requestBuffer.subarray(18, 18 + dataLen) : null; + + const route = wsRoutesById.get(handlerId); + if (!route) return EMPTY_BUFFER; + + const native = loadNativeModule(); + const ws = { + send(msg) { + const chunk = typeof msg === "string" ? Buffer.from(msg, "utf8") : Buffer.from(msg); + native.streamWrite(Number(wsId), chunk); + }, + close(code = 1000, reason = "") { + native.streamEnd(Number(wsId)); + }, + id: wsId, + }; + + try { + switch (eventType) { + case 0x01: await route.handlers.open?.(ws); break; + case 0x02: { + const textData = data ? new TextDecoder().decode(data) : ""; + await route.handlers.message?.(ws, textData); + break; + } + case 0x03: await route.handlers.close?.(ws); break; + } + } catch (err) { + console.error("[http-native] WebSocket handler error:", err); + } + + return EMPTY_BUFFER; + } + let decoded; try { @@ -572,6 +671,15 @@ function createDispatcher( } const responseSnapshot = snapshot(); + + // Handle streaming responses — return stream-start envelope instead of normal response + if (res._state?.streaming) { + const streamEnvelope = encodeStreamStartEnvelope(res._state.streamId, responseSnapshot); + releaseRequestObject(req); + // Don't release response state — stream is still active + return streamEnvelope; + } + const dispatchDurationMs = trackDispatchTiming ? performance.now() - dispatchStartMs : undefined; @@ -956,6 +1064,7 @@ export function createApp() { _routes: [], _middlewares: [], _errorHandlers: [], + _wsRoutes: [], _groupPrefix: "/", use(pathOrMiddleware, maybeMiddleware) { @@ -1126,6 +1235,10 @@ export function createApp() { : null, needsSession: /\breq\.session\b|\breq\.sessionId\b/.test(route.handlerSource), })), + wsRoutes: this._wsRoutes.map((ws) => ({ + path: ws.path, + handlerId: ws.handlerId, + })), }; if (normalizedOptions.tls) { @@ -1185,6 +1298,7 @@ export function createApp() { runtimeOptimizer, this._errorHandlers, devRouteCommentWriter, + this._wsRoutes, ); // Hot reload: override port/host if the hot reloader is active const hotCtx = globalThis.__HTTP_NATIVE_HOT__; @@ -1362,6 +1476,18 @@ export function createApp() { }, }; + app.ws = (path, handlers) => { + if (typeof handlers !== "object") { + throw new TypeError("WebSocket handlers must be an object with open/message/close"); + } + app._wsRoutes.push({ + path: normalizeRoutePath("GET", path), + handlers, + handlerId: nextHandlerId++, + }); + return app; + }; + app.get = createMethodRegistrar(app, "GET"); app.post = createMethodRegistrar(app, "POST"); app.put = createMethodRegistrar(app, "PUT");