diff --git a/rust-native/src/lib.rs b/rust-native/src/lib.rs index 4600f55..cc91864 100644 --- a/rust-native/src/lib.rs +++ b/rust-native/src/lib.rs @@ -634,8 +634,8 @@ async fn handle_connection_inner( drain_consumed_bytes(buffer, header_bytes); match dispatch_decision { - DispatchDecision::BridgeRequest(request, cache_insertion) => { - write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion) + DispatchDecision::BridgeRequest(request, cache_insertion, handler_id, url_bytes) => { + write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion, handler_id, &url_bytes) .await?; } DispatchDecision::SpecializedResponse(response) => { @@ -743,8 +743,8 @@ async fn handle_connection_inner( )?; match dispatch_decision_owned { - DispatchDecision::BridgeRequest(request, cache_insertion) => { - write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion).await?; + DispatchDecision::BridgeRequest(request, cache_insertion, handler_id, url_bytes) => { + write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion, handler_id, &url_bytes).await?; } DispatchDecision::SpecializedResponse(response) => { let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; @@ -950,7 +950,8 @@ fn parse_hot_root_request( /// avoiding all String/Vec allocations for method, target, path, and headers. /// Used for non-body requests (GET, DELETE without body, etc.). enum DispatchDecision { - BridgeRequest(Buffer, Option<(u32, u64, usize, u64)>), + /// (envelope, route-level cache insertion, handler_id, url_bytes for ncache key) + BridgeRequest(Buffer, Option<(u32, u64, usize, u64)>, u32, Vec), SpecializedResponse(Vec), CachedResponse(bytes::Bytes), } @@ -975,7 +976,7 @@ fn build_dispatch_decision_zero_copy( &parsed.headers, body, ) - .map(|envelope| DispatchDecision::BridgeRequest(envelope, None)); + .map(|envelope| DispatchDecision::BridgeRequest(envelope, None, NOT_FOUND_HANDLER_ID, Vec::new())); } let matched_route = if method_code == UNKNOWN_METHOD_CODE { @@ -992,9 +993,9 @@ fn build_dispatch_decision_zero_copy( &parsed.headers, body, ) - .map(|envelope| DispatchDecision::BridgeRequest(envelope, None)); + .map(|envelope| DispatchDecision::BridgeRequest(envelope, None, NOT_FOUND_HANDLER_ID, Vec::new())); }; - + let mut cache_insertion = None; if let Some(cfg) = matched_route.cache_config { let key = crate::router::interpolate_cache_key(cfg, parsed, url_str, matched_route.param_names, &matched_route.param_values); @@ -1002,6 +1003,12 @@ fn build_dispatch_decision_zero_copy( return Ok(DispatchDecision::CachedResponse(cached_response)); } cache_insertion = Some((matched_route.handler_id, key, cfg.max_entries, cfg.ttl_secs)); + } else { + // ncache lookup: check if a previous res.ncache() call cached this response + let ncache_key = compute_ncache_key(matched_route.handler_id, parsed.target); + if let Some(cached_response) = crate::router::get_cached_response(matched_route.handler_id, ncache_key, parsed.keep_alive) { + return Ok(DispatchDecision::CachedResponse(cached_response)); + } } if let Some(response) = @@ -1010,6 +1017,9 @@ fn build_dispatch_decision_zero_copy( return Ok(DispatchDecision::SpecializedResponse(response)); }; + let handler_id = matched_route.handler_id; + let url_bytes_owned = parsed.target.to_vec(); + build_dispatch_envelope( &matched_route, method_code, @@ -1018,7 +1028,7 @@ fn build_dispatch_decision_zero_copy( &parsed.headers, body, ) - .map(|envelope| DispatchDecision::BridgeRequest(envelope, cache_insertion)) + .map(|envelope| DispatchDecision::BridgeRequest(envelope, cache_insertion, handler_id, url_bytes_owned)) } fn build_dispatch_decision_owned( @@ -1051,7 +1061,7 @@ fn build_dispatch_decision_owned( &header_refs, body, ) - .map(|envelope| DispatchDecision::BridgeRequest(envelope, None)); + .map(|envelope| DispatchDecision::BridgeRequest(envelope, None, NOT_FOUND_HANDLER_ID, Vec::new())); } let matched_route = if method_code == UNKNOWN_METHOD_CODE { @@ -1068,12 +1078,11 @@ fn build_dispatch_decision_owned( &header_refs, body, ) - .map(|envelope| DispatchDecision::BridgeRequest(envelope, None)); + .map(|envelope| DispatchDecision::BridgeRequest(envelope, None, NOT_FOUND_HANDLER_ID, Vec::new())); }; - + let mut cache_insertion = None; if let Some(cfg) = matched_route.cache_config { - // Create a mock ParsedRequest for interpolate_cache_key let mock_parsed = ParsedRequest { method, target, @@ -1086,13 +1095,18 @@ fn build_dispatch_decision_owned( headers: header_refs.clone(), }; let key = crate::router::interpolate_cache_key(cfg, &mock_parsed, url_str, matched_route.param_names, &matched_route.param_values); - - // Keep alive lookup is fine, we don't know it since keep_alive wasn't passed, wait! - // Wait, we can pass keep_alive into this fun. For now assume we don't know keep_alive. Actually, we do! - // Let's defer cached returned to handle_connection_inner instead and just pass out cache_insertion. cache_insertion = Some((matched_route.handler_id, key, cfg.max_entries, cfg.ttl_secs)); + } else { + // ncache lookup: check if a previous res.ncache() call cached this response + let ncache_key = compute_ncache_key(matched_route.handler_id, target); + if let Some(cached_response) = crate::router::get_cached_response(matched_route.handler_id, ncache_key, false) { + return Ok(DispatchDecision::CachedResponse(cached_response)); + } } + let handler_id = matched_route.handler_id; + let url_bytes_owned = target.to_vec(); + build_dispatch_envelope( &matched_route, method_code, @@ -1101,7 +1115,7 @@ fn build_dispatch_decision_owned( &header_refs, body, ) - .map(|envelope| DispatchDecision::BridgeRequest(envelope, cache_insertion)) + .map(|envelope| DispatchDecision::BridgeRequest(envelope, cache_insertion, handler_id, url_bytes_owned)) } fn build_not_found_dispatch_envelope( @@ -1642,18 +1656,112 @@ async fn write_exact_static_response( Ok(()) } +// ─── ncache Support ──────────────────── +// +// Extracts a cache trailer appended by JS `res.ncache()` after the response +// body in the dispatch envelope. Layout: magic(2) 0xCA 0xCE | ttl_secs(4) | max_entries(4) + +/// Maximum ncache TTL (24 hours) to prevent accidental permanent caching. +const NCACHE_MAX_TTL_SECS: u64 = 86400; +/// Maximum response body size eligible for ncache (1 MB). +const NCACHE_MAX_BODY_SIZE: usize = 1024 * 1024; + +/// Walk the response envelope to find where the body ends, then check for +/// the 10-byte ncache trailer. Returns (ttl_secs, max_entries) if present. +fn extract_ncache_trailer(dispatch_bytes: &[u8]) -> Option<(u64, usize)> { + if dispatch_bytes.len() < 8 { + return None; + } + + // Parse the envelope header to find body end offset + let mut offset = 0usize; + let _status = (dispatch_bytes[offset] as u16) | ((dispatch_bytes[offset + 1] as u16) << 8); + offset += 2; + let header_count = (dispatch_bytes[offset] as u16) | ((dispatch_bytes[offset + 1] as u16) << 8); + offset += 2; + let body_length = (dispatch_bytes[offset] as u32) + | ((dispatch_bytes[offset + 1] as u32) << 8) + | ((dispatch_bytes[offset + 2] as u32) << 16) + | ((dispatch_bytes[offset + 3] as u32) << 24); + offset += 4; + + let body_length = body_length as usize; + + // Guard: don't cache oversized responses + if body_length > NCACHE_MAX_BODY_SIZE { + return None; + } + + // Skip headers + for _ in 0..header_count { + if offset + 3 > dispatch_bytes.len() { + return None; + } + let name_len = dispatch_bytes[offset] as usize; + offset += 1; + let value_len = (dispatch_bytes[offset] as u16) | ((dispatch_bytes[offset + 1] as u16) << 8); + offset += 2; + offset += name_len + value_len as usize; + } + + // Skip body + offset += body_length; + + // Check for 10-byte trailer: magic(2) + ttl(4) + max_entries(4) + if offset + 10 > dispatch_bytes.len() { + return None; + } + + // Check magic bytes + if dispatch_bytes[offset] != 0xCA || dispatch_bytes[offset + 1] != 0xCE { + return None; + } + offset += 2; + + let ttl_secs = (dispatch_bytes[offset] as u64) + | ((dispatch_bytes[offset + 1] as u64) << 8) + | ((dispatch_bytes[offset + 2] as u64) << 16) + | ((dispatch_bytes[offset + 3] as u64) << 24); + offset += 4; + + let max_entries = (dispatch_bytes[offset] as u32) + | ((dispatch_bytes[offset + 1] as u32) << 8) + | ((dispatch_bytes[offset + 2] as u32) << 16) + | ((dispatch_bytes[offset + 3] as u32) << 24); + + // Cap TTL to safety limit + let ttl_secs = ttl_secs.min(NCACHE_MAX_TTL_SECS); + + Some((ttl_secs, max_entries as usize)) +} + +/// Compute an ncache key from handler_id + full request URL (including query string). +/// Different URLs naturally produce different cache keys, so /data?page=1 and +/// /data?page=2 are cached separately. +fn compute_ncache_key(handler_id: u32, url_bytes: &[u8]) -> u64 { + use std::hash::{Hash, Hasher}; + use std::collections::hash_map::DefaultHasher; + let mut hasher = DefaultHasher::new(); + handler_id.hash(&mut hasher); + url_bytes.hash(&mut hasher); + hasher.finish() +} + async fn write_dynamic_dispatch_response( stream: &mut TcpStream, dispatcher: &JsDispatcher, request: Buffer, keep_alive: bool, cache_insertion: Option<(u32, u64, usize, u64)>, + handler_id: u32, + url_bytes: &[u8], ) -> Result<()> { match dispatcher.dispatch(request).await { Ok(response) => { match build_http_response_from_dispatch(response.as_ref(), keep_alive) { Ok(http_response) => { if let Some((handler_id, cache_key, max_entries, ttl_secs)) = cache_insertion { + // Route-level cache insertion (takes precedence over ncache) let response_bytes_close: bytes::Bytes = if !keep_alive { http_response.clone().into() } else { @@ -1668,14 +1776,42 @@ async fn write_dynamic_dispatch_response( .unwrap_or_default() .into() }; - + crate::router::insert_cached_response(handler_id, cache_key, crate::router::CacheEntry { response_bytes: response_ka, response_bytes_close, expires_at: std::time::Instant::now() + std::time::Duration::from_secs(ttl_secs), }, max_entries); + } else if handler_id != NOT_FOUND_HANDLER_ID { + // Check for ncache trailer from JS response envelope + if let Some((ncache_ttl, ncache_max_entries)) = extract_ncache_trailer(response.as_ref()) { + if ncache_ttl > 0 { + let ncache_key = compute_ncache_key(handler_id, url_bytes); + + let response_bytes_close: bytes::Bytes = if !keep_alive { + http_response.clone().into() + } else { + build_http_response_from_dispatch(response.as_ref(), false) + .unwrap_or_default() + .into() + }; + let response_ka: bytes::Bytes = if keep_alive { + http_response.clone().into() + } else { + build_http_response_from_dispatch(response.as_ref(), true) + .unwrap_or_default() + .into() + }; + + crate::router::insert_cached_response(handler_id, ncache_key, crate::router::CacheEntry { + response_bytes: response_ka, + response_bytes_close, + expires_at: std::time::Instant::now() + std::time::Duration::from_secs(ncache_ttl), + }, ncache_max_entries); + } + } } - + let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(http_response)).await; if let Ok((write_result, _)) = timeout_result { write_result?; diff --git a/src/bridge.js b/src/bridge.js index 487dc76..0d63356 100644 --- a/src/bridge.js +++ b/src/bridge.js @@ -609,10 +609,12 @@ export function encodeResponseEnvelope(snapshot) { : Buffer.alloc(0); // Encode headers inline — avoids Object.entries().map() intermediate arrays + const ncache = snapshot.ncache || null; const headerKeys = rawHeaders ? Object.keys(rawHeaders) : EMPTY_ARRAY; const headerCount = headerKeys.length; const encodedHeaders = new Array(headerCount); - let totalLength = 8 + body.length; // status(2) + count(2) + bodylen(4) + body + // status(2) + count(2) + bodylen(4) + body + optional ncache trailer(10) + let totalLength = 8 + body.length + (ncache ? 10 : 0); for (let i = 0; i < headerCount; i++) { const name = headerKeys[i]; @@ -650,6 +652,18 @@ export function encodeResponseEnvelope(snapshot) { } output.set(body, offset); + offset += body.length; + + // Append ncache trailer: magic(2) | ttlSecs(4) | maxEntries(4) + if (ncache) { + output[offset] = 0xca; + output[offset + 1] = 0xce; + offset += 2; + writeU32(output, offset, ncache.ttl); + offset += 4; + writeU32(output, offset, ncache.maxEntries); + } + return output; } diff --git a/src/index.d.ts b/src/index.d.ts index 9f04e0a..b09ca16 100644 --- a/src/index.d.ts +++ b/src/index.d.ts @@ -70,6 +70,16 @@ export interface Response { /** Set status and send status code as text body */ sendStatus(code: number): Response; + + /** + * Send a JSON response and cache it in the Rust native layer. + * Subsequent requests are served directly from Rust without crossing the JS bridge. + * + * @param data - JSON-serializable response data + * @param ttl - Cache TTL in seconds + * @param options.maxEntries - Max LRU entries per route (default 256) + */ + ncache(data: unknown, ttl: number, options?: { maxEntries?: number }): Response; } export type NextFunction = () => Promise; diff --git a/src/index.js b/src/index.js index 654e0fa..1e3efcf 100644 --- a/src/index.js +++ b/src/index.js @@ -128,6 +128,7 @@ function acquireResponseState() { pooled.body = EMPTY_BUFFER; pooled.finished = false; pooled.locals = Object.create(null); + pooled.ncache = null; return pooled; } @@ -137,6 +138,7 @@ function acquireResponseState() { body: EMPTY_BUFFER, finished: false, locals: Object.create(null), + ncache: null, }; } @@ -247,6 +249,37 @@ const RESPONSE_PROTO = { } return this.send(String(code)); }, + + /** + * Send a JSON response and cache it in the Rust native layer so that + * subsequent requests are served directly from Rust without crossing + * the JS bridge. + * + * @param {*} data - JSON-serializable response data + * @param {number} ttl - Cache TTL in seconds + * @param {Object} [options] + * @param {number} [options.maxEntries] - Max LRU entries per route (default 256) + * @returns {this} + */ + ncache(data, ttl, options = {}) { + const state = this._state; + if (state.finished) { + return this; + } + + const ttlSecs = Math.max(1, Math.floor(Number(ttl) || 60)); + const maxEntries = Math.max(1, Math.floor(Number(options.maxEntries) || 256)); + + if (!state.headers["content-type"]) { + state.headers["content-type"] = "application/json; charset=utf-8"; + } + + state.body = this._jsonSerializer(data); + state.finished = true; + state.ncache = { ttl: ttlSecs, maxEntries }; + + return this; + }, }; function createResponseEnvelope(jsonSerializer = DEFAULT_JSON_SERIALIZER) { @@ -263,6 +296,7 @@ function createResponseEnvelope(jsonSerializer = DEFAULT_JSON_SERIALIZER) { status: state.status, headers: state.headers, body: state.body, + ncache: state.ncache, }; }, release() {