Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 156 additions & 20 deletions rust-native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,8 @@ async fn handle_connection_inner(
drain_consumed_bytes(buffer, header_bytes);

match dispatch_decision {
DispatchDecision::BridgeRequest(request, cache_insertion) => {
write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion)
DispatchDecision::BridgeRequest(request, cache_insertion, handler_id, url_bytes) => {
write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion, handler_id, &url_bytes)
.await?;
}
DispatchDecision::SpecializedResponse(response) => {
Expand Down Expand Up @@ -743,8 +743,8 @@ async fn handle_connection_inner(
)?;

match dispatch_decision_owned {
DispatchDecision::BridgeRequest(request, cache_insertion) => {
write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion).await?;
DispatchDecision::BridgeRequest(request, cache_insertion, handler_id, url_bytes) => {
write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion, handler_id, &url_bytes).await?;
}
DispatchDecision::SpecializedResponse(response) => {
let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await;
Expand Down Expand Up @@ -950,7 +950,8 @@ fn parse_hot_root_request(
/// avoiding all String/Vec allocations for method, target, path, and headers.
/// Used for non-body requests (GET, DELETE without body, etc.).
enum DispatchDecision {
    /// Forward the request over the JS bridge: (dispatch envelope,
    /// route-level cache insertion `(handler_id, key, max_entries, ttl_secs)`,
    /// handler_id, url bytes used to derive the ncache key).
    BridgeRequest(Buffer, Option<(u32, u64, usize, u64)>, u32, Vec<u8>),
    /// Fully rendered HTTP response bytes written directly to the socket.
    SpecializedResponse(Vec<u8>),
    /// A previously cached response served without crossing the JS bridge.
    CachedResponse(bytes::Bytes),
}
Expand All @@ -975,7 +976,7 @@ fn build_dispatch_decision_zero_copy(
&parsed.headers,
body,
)
.map(|envelope| DispatchDecision::BridgeRequest(envelope, None));
.map(|envelope| DispatchDecision::BridgeRequest(envelope, None, NOT_FOUND_HANDLER_ID, Vec::new()));
}

let matched_route = if method_code == UNKNOWN_METHOD_CODE {
Expand All @@ -992,16 +993,22 @@ fn build_dispatch_decision_zero_copy(
&parsed.headers,
body,
)
.map(|envelope| DispatchDecision::BridgeRequest(envelope, None));
.map(|envelope| DispatchDecision::BridgeRequest(envelope, None, NOT_FOUND_HANDLER_ID, Vec::new()));
};

let mut cache_insertion = None;
if let Some(cfg) = matched_route.cache_config {
let key = crate::router::interpolate_cache_key(cfg, parsed, url_str, matched_route.param_names, &matched_route.param_values);
if let Some(cached_response) = crate::router::get_cached_response(matched_route.handler_id, key, parsed.keep_alive) {
return Ok(DispatchDecision::CachedResponse(cached_response));
}
cache_insertion = Some((matched_route.handler_id, key, cfg.max_entries, cfg.ttl_secs));
} else {
// ncache lookup: check if a previous res.ncache() call cached this response
let ncache_key = compute_ncache_key(matched_route.handler_id, parsed.target);
if let Some(cached_response) = crate::router::get_cached_response(matched_route.handler_id, ncache_key, parsed.keep_alive) {
return Ok(DispatchDecision::CachedResponse(cached_response));
}
}

if let Some(response) =
Expand All @@ -1010,6 +1017,9 @@ fn build_dispatch_decision_zero_copy(
return Ok(DispatchDecision::SpecializedResponse(response));
};

let handler_id = matched_route.handler_id;
let url_bytes_owned = parsed.target.to_vec();

build_dispatch_envelope(
&matched_route,
method_code,
Expand All @@ -1018,7 +1028,7 @@ fn build_dispatch_decision_zero_copy(
&parsed.headers,
body,
)
.map(|envelope| DispatchDecision::BridgeRequest(envelope, cache_insertion))
.map(|envelope| DispatchDecision::BridgeRequest(envelope, cache_insertion, handler_id, url_bytes_owned))
}

fn build_dispatch_decision_owned(
Expand Down Expand Up @@ -1051,7 +1061,7 @@ fn build_dispatch_decision_owned(
&header_refs,
body,
)
.map(|envelope| DispatchDecision::BridgeRequest(envelope, None));
.map(|envelope| DispatchDecision::BridgeRequest(envelope, None, NOT_FOUND_HANDLER_ID, Vec::new()));
}

let matched_route = if method_code == UNKNOWN_METHOD_CODE {
Expand All @@ -1068,12 +1078,11 @@ fn build_dispatch_decision_owned(
&header_refs,
body,
)
.map(|envelope| DispatchDecision::BridgeRequest(envelope, None));
.map(|envelope| DispatchDecision::BridgeRequest(envelope, None, NOT_FOUND_HANDLER_ID, Vec::new()));
};

let mut cache_insertion = None;
if let Some(cfg) = matched_route.cache_config {
// Create a mock ParsedRequest for interpolate_cache_key
let mock_parsed = ParsedRequest {
method,
target,
Expand All @@ -1086,13 +1095,18 @@ fn build_dispatch_decision_owned(
headers: header_refs.clone(),
};
let key = crate::router::interpolate_cache_key(cfg, &mock_parsed, url_str, matched_route.param_names, &matched_route.param_values);

// keep_alive isn't threaded into this owned path, so instead of serving a
// route-level cached response here we surface cache_insertion and let
// handle_connection_inner (which knows keep_alive) handle the cache write.
cache_insertion = Some((matched_route.handler_id, key, cfg.max_entries, cfg.ttl_secs));
} else {
// ncache lookup: check if a previous res.ncache() call cached this response
let ncache_key = compute_ncache_key(matched_route.handler_id, target);
if let Some(cached_response) = crate::router::get_cached_response(matched_route.handler_id, ncache_key, false) {
return Ok(DispatchDecision::CachedResponse(cached_response));
}
}

let handler_id = matched_route.handler_id;
let url_bytes_owned = target.to_vec();

build_dispatch_envelope(
&matched_route,
method_code,
Expand All @@ -1101,7 +1115,7 @@ fn build_dispatch_decision_owned(
&header_refs,
body,
)
.map(|envelope| DispatchDecision::BridgeRequest(envelope, cache_insertion))
.map(|envelope| DispatchDecision::BridgeRequest(envelope, cache_insertion, handler_id, url_bytes_owned))
}

fn build_not_found_dispatch_envelope(
Expand Down Expand Up @@ -1642,18 +1656,112 @@ async fn write_exact_static_response(
Ok(())
}

// ─── ncache Support ────────────────────
//
// Extracts a cache trailer appended by JS `res.ncache()` after the response
// body in the dispatch envelope. Layout: magic(2) 0xCA 0xCE | ttl_secs(4) | max_entries(4)

/// Maximum ncache TTL (24 hours) to prevent accidental permanent caching.
const NCACHE_MAX_TTL_SECS: u64 = 86400;
/// Maximum response body size eligible for ncache (1 MB).
const NCACHE_MAX_BODY_SIZE: usize = 1024 * 1024;

/// Walk the response envelope to find where the body ends, then check for
/// the 10-byte ncache trailer. Returns `(ttl_secs, max_entries)` if present.
///
/// Envelope layout (little-endian):
/// `status(2) | header_count(2) | body_len(4)` followed by
/// `header_count` entries of `name_len(1) | value_len(2) | name | value`,
/// then the body, then the optional trailer:
/// `magic 0xCA 0xCE | ttl_secs(4) | max_entries(4)`.
///
/// All offset arithmetic is checked: malformed length fields from the JS
/// side must never wrap `offset` (a wrap could alias a bogus in-bounds
/// trailer on 32-bit targets) or panic on a bad slice range.
fn extract_ncache_trailer(dispatch_bytes: &[u8]) -> Option<(u64, usize)> {
    // Need at least the fixed 8-byte envelope header.
    if dispatch_bytes.len() < 8 {
        return None;
    }

    // status (bytes 0..2) is not needed here; decode count and body length.
    let header_count = u16::from_le_bytes([dispatch_bytes[2], dispatch_bytes[3]]);
    let body_length = u32::from_le_bytes([
        dispatch_bytes[4],
        dispatch_bytes[5],
        dispatch_bytes[6],
        dispatch_bytes[7],
    ]) as usize;

    // Guard: don't cache oversized responses.
    if body_length > NCACHE_MAX_BODY_SIZE {
        return None;
    }

    let mut offset = 8usize;

    // Skip headers, validating every advance against the buffer length.
    for _ in 0..header_count {
        let lengths_end = offset.checked_add(3)?;
        let len_bytes = dispatch_bytes.get(offset..lengths_end)?;
        let name_len = len_bytes[0] as usize;
        let value_len = u16::from_le_bytes([len_bytes[1], len_bytes[2]]) as usize;
        offset = lengths_end.checked_add(name_len)?.checked_add(value_len)?;
    }

    // Skip the body; `get` returns None if the computed start is out of range.
    let trailer = dispatch_bytes.get(offset.checked_add(body_length)?..)?;

    // Trailer must be present in full and start with the magic bytes.
    if trailer.len() < 10 || trailer[0] != 0xCA || trailer[1] != 0xCE {
        return None;
    }

    let ttl_secs =
        u32::from_le_bytes([trailer[2], trailer[3], trailer[4], trailer[5]]) as u64;
    let max_entries =
        u32::from_le_bytes([trailer[6], trailer[7], trailer[8], trailer[9]]) as usize;

    // Cap TTL to the safety limit.
    Some((ttl_secs.min(NCACHE_MAX_TTL_SECS), max_entries))
}

/// Compute an ncache key from handler_id + full request URL (including query
/// string). Different URLs naturally produce different cache keys, so
/// /data?page=1 and /data?page=2 are cached separately.
fn compute_ncache_key(handler_id: u32, url_bytes: &[u8]) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Feed the handler id first, then the raw URL bytes: the same URL under
    // two different handlers therefore hashes from distinct input streams.
    let mut state = DefaultHasher::new();
    handler_id.hash(&mut state);
    url_bytes.hash(&mut state);
    state.finish()
}

async fn write_dynamic_dispatch_response(
stream: &mut TcpStream,
dispatcher: &JsDispatcher,
request: Buffer,
keep_alive: bool,
cache_insertion: Option<(u32, u64, usize, u64)>,
handler_id: u32,
url_bytes: &[u8],
) -> Result<()> {
match dispatcher.dispatch(request).await {
Ok(response) => {
match build_http_response_from_dispatch(response.as_ref(), keep_alive) {
Ok(http_response) => {
if let Some((handler_id, cache_key, max_entries, ttl_secs)) = cache_insertion {
// Route-level cache insertion (takes precedence over ncache)
let response_bytes_close: bytes::Bytes = if !keep_alive {
http_response.clone().into()
} else {
Expand All @@ -1668,14 +1776,42 @@ async fn write_dynamic_dispatch_response(
.unwrap_or_default()
.into()
};

crate::router::insert_cached_response(handler_id, cache_key, crate::router::CacheEntry {
response_bytes: response_ka,
response_bytes_close,
expires_at: std::time::Instant::now() + std::time::Duration::from_secs(ttl_secs),
}, max_entries);
} else if handler_id != NOT_FOUND_HANDLER_ID {
// Check for ncache trailer from JS response envelope
if let Some((ncache_ttl, ncache_max_entries)) = extract_ncache_trailer(response.as_ref()) {
if ncache_ttl > 0 {
let ncache_key = compute_ncache_key(handler_id, url_bytes);

let response_bytes_close: bytes::Bytes = if !keep_alive {
http_response.clone().into()
} else {
build_http_response_from_dispatch(response.as_ref(), false)
.unwrap_or_default()
.into()
};
let response_ka: bytes::Bytes = if keep_alive {
http_response.clone().into()
} else {
build_http_response_from_dispatch(response.as_ref(), true)
.unwrap_or_default()
.into()
};

crate::router::insert_cached_response(handler_id, ncache_key, crate::router::CacheEntry {
response_bytes: response_ka,
response_bytes_close,
expires_at: std::time::Instant::now() + std::time::Duration::from_secs(ncache_ttl),
}, ncache_max_entries);
}
}
}

let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(http_response)).await;
if let Ok((write_result, _)) = timeout_result {
write_result?;
Expand Down
16 changes: 15 additions & 1 deletion src/bridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,12 @@ export function encodeResponseEnvelope(snapshot) {
: Buffer.alloc(0);

// Encode headers inline — avoids Object.entries().map() intermediate arrays
const ncache = snapshot.ncache || null;
const headerKeys = rawHeaders ? Object.keys(rawHeaders) : EMPTY_ARRAY;
const headerCount = headerKeys.length;
const encodedHeaders = new Array(headerCount);
let totalLength = 8 + body.length; // status(2) + count(2) + bodylen(4) + body
// status(2) + count(2) + bodylen(4) + body + optional ncache trailer(10)
let totalLength = 8 + body.length + (ncache ? 10 : 0);

for (let i = 0; i < headerCount; i++) {
const name = headerKeys[i];
Expand Down Expand Up @@ -650,6 +652,18 @@ export function encodeResponseEnvelope(snapshot) {
}

output.set(body, offset);
offset += body.length;

// Append ncache trailer: magic(2) | ttlSecs(4) | maxEntries(4)
if (ncache) {
output[offset] = 0xca;
output[offset + 1] = 0xce;
offset += 2;
writeU32(output, offset, ncache.ttl);
offset += 4;
writeU32(output, offset, ncache.maxEntries);
}

return output;
}

Expand Down
10 changes: 10 additions & 0 deletions src/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ export interface Response {

/** Set status and send status code as text body */
sendStatus(code: number): Response;

/**
* Send a JSON response and cache it in the Rust native layer.
* Subsequent requests are served directly from Rust without crossing the JS bridge.
*
* @param data - JSON-serializable response data
* @param ttl - Cache TTL in seconds
* @param options.maxEntries - Max LRU entries per route (default 256)
*/
ncache(data: unknown, ttl: number, options?: { maxEntries?: number }): Response;
}

export type NextFunction = () => Promise<void>;
Expand Down
Loading
Loading