From 2ce85dc368bbcf11d81cb036f3cd1ef79467a253 Mon Sep 17 00:00:00 2001 From: Dominic Petrick Date: Sun, 1 Mar 2026 20:15:13 +0100 Subject: [PATCH 1/6] Reapply changes to support in-memory backend handlers and endpoint monitoring for integration testing. --- cli/tests/integration/common.rs | 4 +- cli/tests/integration/common/backends.rs | 1 + src/cache.rs | 31 +++++ src/component/backend.rs | 1 + src/component/shielding.rs | 1 + src/config.rs | 5 +- src/config/backends.rs | 42 +++++- src/execute.rs | 162 ++++++++++++++++++++--- src/lib.rs | 14 +- src/logging.rs | 14 ++ src/session.rs | 19 ++- src/upstream.rs | 56 ++++---- src/wiggle_abi/req_impl.rs | 1 + src/wiggle_abi/shielding.rs | 1 + 14 files changed, 303 insertions(+), 49 deletions(-) diff --git a/cli/tests/integration/common.rs b/cli/tests/integration/common.rs index c367ef92..a54b446f 100644 --- a/cli/tests/integration/common.rs +++ b/cli/tests/integration/common.rs @@ -418,8 +418,8 @@ impl Test { .await .map(|result| { match result { - (resp, None) => resp, - (_, Some(err)) => { + (resp, None, _) => resp, + (_, Some(err), _) => { // Splat the string representation of the runtime error into a synthetic // 500. This is a bit of a hack, but good enough to check for expected error // strings. diff --git a/cli/tests/integration/common/backends.rs b/cli/tests/integration/common/backends.rs index 309620d4..e0f7f01c 100644 --- a/cli/tests/integration/common/backends.rs +++ b/cli/tests/integration/common/backends.rs @@ -76,6 +76,7 @@ impl TestBackends { grpc: false, client_cert: None, ca_certs: vec![], + handler: None, }; backends.insert(name.to_string(), Arc::new(backend_config)); } diff --git a/src/cache.rs b/src/cache.rs index 9cd96933..f5c624f5 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -557,6 +557,37 @@ impl std::str::FromStr for SurrogateKey { } } +/// A thin public wrapper around `Cache` that provides a convenient API for integration testing. 
+#[derive(Clone)] +pub struct InMemoryCache(pub(crate) Arc); + +impl Default for InMemoryCache { + fn default() -> Self { + Self(Arc::new(Cache::default())) + } +} + +impl InMemoryCache { + pub fn new() -> Self { + Self::default() + } + + /// Purge cache entries matching the given surrogate key strings. + pub fn purge(&self, surrogates: Vec) { + for s in surrogates { + if let Ok(key) = s.parse::() { + self.0.purge(key, false); + } + } + } +} + +impl std::fmt::Display for InMemoryCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "InMemoryCache") + } +} + #[cfg(test)] mod tests { use std::rc::Rc; diff --git a/src/component/backend.rs b/src/component/backend.rs index 137490c9..50238125 100644 --- a/src/component/backend.rs +++ b/src/component/backend.rs @@ -144,6 +144,7 @@ pub(crate) async fn register_dynamic_backend( grpc, client_cert, ca_certs, + handler: None, }; if !session.add_backend(name, new_backend) { diff --git a/src/component/shielding.rs b/src/component/shielding.rs index 9773c582..01a17cd0 100644 --- a/src/component/shielding.rs +++ b/src/component/shielding.rs @@ -28,6 +28,7 @@ pub(crate) fn backend_for_shield( grpc: false, client_cert: None, ca_certs: Vec::new(), + handler: None, }; if !session.add_backend(&new_name, new_backend) { diff --git a/src/config.rs b/src/config.rs index 360f7fb1..0b259d94 100644 --- a/src/config.rs +++ b/src/config.rs @@ -32,7 +32,10 @@ pub use crate::acl::Acls; /// Types and deserializers for backend configuration settings. 
mod backends; -pub use self::backends::{Backend, ClientCertError, ClientCertInfo}; +pub use self::backends::{ + Backend, ClientCertError, ClientCertInfo, DynamicBackendRegistrationInterceptor, Handler, + InMemoryBackendHandler, +}; pub type Backends = HashMap>; diff --git a/src/config/backends.rs b/src/config/backends.rs index 3a621036..a28dc8cd 100644 --- a/src/config/backends.rs +++ b/src/config/backends.rs @@ -1,12 +1,48 @@ mod client_cert_info; use { - hyper::{header::HeaderValue, Uri}, - std::{collections::HashMap, sync::Arc}, + crate::body::Body, + async_trait::async_trait, + hyper::{header::HeaderValue, Request, Response, Uri}, + std::{collections::HashMap, fmt, ops::Deref, sync::Arc}, }; pub use self::client_cert_info::{ClientCertError, ClientCertInfo}; +/// A trait for handling backend requests in-memory, without making real HTTP calls. +#[async_trait] +pub trait InMemoryBackendHandler: Send + Sync + 'static { + async fn handle(&self, req: Request) -> Response; +} + +/// A wrapper around an `InMemoryBackendHandler` trait object. +#[derive(Clone)] +pub struct Handler(Arc>); + +impl Handler { + pub fn new(handler: Box) -> Self { + Handler(Arc::new(handler)) + } +} + +impl Deref for Handler { + type Target = dyn InMemoryBackendHandler; + fn deref(&self) -> &Self::Target { + self.0.as_ref().as_ref() + } +} + +impl fmt::Debug for Handler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Handler").finish_non_exhaustive() + } +} + +/// A trait for intercepting dynamic backend registration. +pub trait DynamicBackendRegistrationInterceptor: Send + Sync + 'static { + fn register(&self, backend: Backend) -> Backend; +} + /// A single backend definition. #[derive(Clone, Debug)] pub struct Backend { @@ -17,6 +53,7 @@ pub struct Backend { pub grpc: bool, pub client_cert: Option, pub ca_certs: Vec, + pub handler: Option, } /// A map of [`Backend`] definitions, keyed by their name. 
@@ -159,6 +196,7 @@ mod deserialization { client_cert, grpc, ca_certs, + handler: None, }) } } diff --git a/src/execute.rs b/src/execute.rs index 44ba1bf7..6e603b16 100644 --- a/src/execute.rs +++ b/src/execute.rs @@ -6,7 +6,7 @@ use { adapt, body::Body, body_tee::tee, - cache::Cache, + cache::{Cache, InMemoryCache}, component as compute, config::{ Backends, DeviceDetection, Dictionaries, ExperimentalModule, Geolocation, @@ -54,6 +54,9 @@ use { wasmtime_wasi::I32Exit, }; +use std::collections::BTreeMap; +use tokio::sync::RwLock; + pub const DEFAULT_EPOCH_INTERRUPTION_PERIOD: Duration = Duration::from_micros(50); const NEXT_REQ_PENDING_MAX: usize = 5; @@ -83,6 +86,55 @@ pub struct GuestProfileConfig { pub sample_period: Duration, } +/// A handle to a running guest computation. +pub struct GuestHandle { + pub(crate) handle: tokio::task::JoinHandle>, +} + +/// Wait for a guest to complete and return any error. +pub async fn run_to_completion(guest: GuestHandle) -> Option { + match guest.handle.await { + Ok(Ok(())) => None, + Ok(Err(ExecutionError::WasmTrap(e))) => Some(e), + Ok(Err(e)) => Some(anyhow::anyhow!("guest execution failed: {}", e)), + Err(e) => Some(anyhow::anyhow!("guest task panicked: {}", e)), + } +} + +/// Monitors logging endpoints by name, routing messages to registered listeners. +#[derive(Clone, Default)] +pub struct EndpointsMonitor { + pub endpoints: Arc, tokio::sync::mpsc::Sender>>>>, +} + +impl EndpointsMonitor { + /// Register a listener for the given endpoint name. + pub fn register_listener(&self, name: impl Into>) -> EndpointListener { + let (tx, rx) = tokio::sync::mpsc::channel(1024); + let name = name.into(); + let endpoints = self.endpoints.clone(); + // Use blocking_write since this is typically called from sync context during setup + endpoints.blocking_write().insert(name, tx); + EndpointListener { receiver: rx } + } +} + +/// Receives messages from a monitored logging endpoint. 
+pub struct EndpointListener { + receiver: tokio::sync::mpsc::Receiver>, +} + +impl EndpointListener { + /// Drain all currently available messages without blocking. + pub fn messages(&mut self) -> Vec> { + let mut msgs = Vec::new(); + while let Ok(msg) = self.receiver.try_recv() { + msgs.push(msg); + } + msgs + } +} + pub struct NextRequest(Option<(DownstreamRequest, Arc)>); impl NextRequest { @@ -151,7 +203,7 @@ pub struct ExecuteCtx { /// The shielding sites for this execution. shielding_sites: ShieldingSites, /// The cache for this service. - cache: Arc, + cache: InMemoryCache, /// Senders waiting for new requests for reusable sessions. pending_reuse: Arc>>>, epoch_increment_thread: Option>, @@ -159,6 +211,11 @@ pub struct ExecuteCtx { epoch_increment_stop: Arc, /// Configuration for guest profiling if enabled guest_profile_config: Option>, + /// Monitor for logging endpoints. + endpoints_monitor: EndpointsMonitor, + /// Optional interceptor for dynamic backend registration. + dynamic_backend_interceptor: + Option>, } impl ExecuteCtx { @@ -304,8 +361,10 @@ impl ExecuteCtx { epoch_increment_thread, epoch_increment_stop, guest_profile_config: guest_profile_config.map(|c| Arc::new(c)), - cache: Arc::new(Cache::default()), + cache: InMemoryCache::default(), pending_reuse: Arc::new(AsyncMutex::new(vec![])), + endpoints_monitor: EndpointsMonitor::default(), + dynamic_backend_interceptor: None, }; Ok(ExecuteCtxBuilder { inner }) @@ -425,7 +484,7 @@ impl ExecuteCtx { mut incoming_req: Request, local: SocketAddr, remote: SocketAddr, - ) -> Result<(Response, Option), Error> { + ) -> Result<(Response, Option, Option), Error> { let orig_req_on_upgrade = hyper::upgrade::on(&mut incoming_req); let (incoming_req_parts, incoming_req_body) = incoming_req.into_parts(); let local_pushpin_proxy_port = self.local_pushpin_proxy_port; @@ -449,7 +508,7 @@ impl ExecuteCtx { original_headers, }; - let (resp, mut err) = self.reuse_or_spawn_guest(req, metadata).await; + let (resp, mut 
err, guest_handle) = self.reuse_or_spawn_guest(req, metadata).await; let span = info_span!("request", id = req_id); let _span = span.enter(); @@ -472,7 +531,7 @@ impl ExecuteCtx { let resp = Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::from(hyper::Body::from(err.to_string())))?; - return Ok((resp, Some(err))); + return Ok((resp, Some(err), None)); } Some(port) => port, }; @@ -488,7 +547,7 @@ impl ExecuteCtx { .await; let (p, hyper_body) = proxy_resp.into_parts(); - return Ok((Response::from_parts(p, Body::from(hyper_body)), None)); + return Ok((Response::from_parts(p, Body::from(hyper_body)), None, None)); } Err(e) => { err = Some(e); @@ -496,7 +555,7 @@ impl ExecuteCtx { } } - Ok((resp, err)) + Ok((resp, err, guest_handle)) } /// Spawn a new guest to process a request whose processing was never attempted by @@ -509,7 +568,7 @@ impl ExecuteCtx { tokio::task::spawn(async move { let (sender, receiver) = oneshot::channel(); let original = std::mem::replace(&mut downstream.sender, sender); - let (resp, err) = self.spawn_guest(downstream, receiver).await; + let (resp, err, _guest_handle) = self.spawn_guest(downstream, receiver).await; let resp = guest_result_to_response(resp, err); let _ = original.send(DownstreamResponse::Http(resp)); }); @@ -531,7 +590,7 @@ impl ExecuteCtx { self: Arc, req: Request, metadata: DownstreamMetadata, - ) -> (Response, Option) { + ) -> (Response, Option, Option) { let (sender, receiver) = oneshot::channel(); let downstream = DownstreamRequest { req, @@ -549,9 +608,9 @@ impl ExecuteCtx { drop(reusable); if let Some(response) = Self::maybe_receive_response(receiver).await { - return response; + return (response.0, response.1, None); } - return (Response::default(), None); + return (Response::default(), None, None); } Err(nr) => next_req = nr, } @@ -569,7 +628,7 @@ impl ExecuteCtx { self: Arc, downstream: DownstreamRequest, receiver: oneshot::Receiver, - ) -> (Response, Option) { + ) -> (Response, Option, Option) 
{ let active_cpu_time_us = Arc::new(AtomicU64::new(0)); // Spawn a separate task to run the guest code. That allows _this_ method to return a response early @@ -582,21 +641,21 @@ impl ExecuteCtx { )); if let Some(response) = Self::maybe_receive_response(receiver).await { - return response; + return (response.0, response.1, Some(GuestHandle { handle: guest_handle })); } match guest_handle .await .expect("guest worker finished without panicking") { - Ok(_) => (Response::new(Body::empty()), None), + Ok(_) => (Response::new(Body::empty()), None, None), Err(ExecutionError::WasmTrap(e)) => { event!( Level::ERROR, "There was an error handling the request {}", e.to_string() ); - (anyhow_response(&e), Some(e)) + (anyhow_response(&e), Some(e), None) } Err(e) => panic!("failed to run guest: {}", e), } @@ -850,10 +909,60 @@ impl ExecuteCtx { result } + /// Create a fork of this execution context, sharing the engine and compiled module + /// but with fresh per-test state (cache, request IDs, pending reuse). 
+ pub fn fork(self: &Arc) -> ExecuteCtxBuilder { + ExecuteCtxBuilder { + inner: ExecuteCtx { + engine: self.engine.clone(), + instance_pre: self.instance_pre.clone(), + acls: self.acls.clone(), + backends: self.backends.clone(), + device_detection: self.device_detection.clone(), + geolocation: self.geolocation.clone(), + tls_config: self.tls_config.clone(), + dictionaries: self.dictionaries.clone(), + config_path: self.config_path.clone(), + capture_logs: self.capture_logs.clone(), + log_stdout: self.log_stdout, + log_stderr: self.log_stderr, + local_pushpin_proxy_port: self.local_pushpin_proxy_port, + object_store: self.object_store.clone(), + secret_stores: self.secret_stores.clone(), + shielding_sites: self.shielding_sites.clone(), + guest_profile_config: self.guest_profile_config.clone(), + // Fresh per-test state: + cache: InMemoryCache::new(), + next_req_id: Arc::new(AtomicU64::new(0)), + pending_reuse: Arc::new(AsyncMutex::new(vec![])), + // Don't own the epoch thread (parent owns it) + epoch_increment_thread: None, + epoch_increment_stop: self.epoch_increment_stop.clone(), + // New fields default: + dynamic_backend_interceptor: None, + endpoints_monitor: EndpointsMonitor::default(), + }, + } + } + pub fn cache(&self) -> &Arc { + &self.cache.0 + } + + pub fn in_memory_cache(&self) -> &InMemoryCache { &self.cache } + pub fn endpoints_monitor(&self) -> &EndpointsMonitor { + &self.endpoints_monitor + } + + pub fn dynamic_backend_interceptor( + &self, + ) -> Option<&dyn crate::config::DynamicBackendRegistrationInterceptor> { + self.dynamic_backend_interceptor.as_deref() + } + pub fn config_path(&self) -> Option<&Path> { self.config_path.as_deref() } @@ -944,6 +1053,27 @@ impl ExecuteCtxBuilder { self } + /// Set the dynamic backend registration interceptor. 
+ pub fn with_dynamic_backend_interceptor( + mut self, + interceptor: Arc, + ) -> Self { + self.inner.dynamic_backend_interceptor = Some(interceptor); + self + } + + /// Set the endpoints monitor for this execution context. + pub fn with_endpoints(mut self, endpoints_monitor: EndpointsMonitor) -> Self { + self.inner.endpoints_monitor = endpoints_monitor; + self + } + + /// Set the cache for this execution context. + pub fn with_cache(mut self, cache: InMemoryCache) -> Self { + self.inner.cache = cache; + self + } + /// Set the path to the config for this execution context. pub fn with_config_path(mut self, config_path: PathBuf) -> Self { self.inner.config_path = Some(config_path); diff --git a/src/lib.rs b/src/lib.rs index 1c987b4d..d5075030 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,16 @@ mod upstream; pub mod wiggle_abi; pub use { - error::Error, execute::ExecuteCtx, execute::GuestProfileConfig, service::ViceroyService, - upstream::BackendConnector, wasmtime::ProfilingStrategy, + cache::InMemoryCache, + error::Error, + execute::{ + run_to_completion, EndpointListener, EndpointsMonitor, ExecuteCtx, ExecuteCtxBuilder, + GuestHandle, GuestProfileConfig, + }, + service::ViceroyService, + upstream::BackendConnector, + wasmtime::ProfilingStrategy, }; +pub use async_trait; +pub use http; +pub use hyper; diff --git a/src/logging.rs b/src/logging.rs index 21f0baf4..665e135a 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -11,6 +11,7 @@ use tokio::io::AsyncWrite; pub struct LogEndpoint { name: Vec, writer: Arc>, + channel_sender: Option>>, } impl LogEndpoint { @@ -20,9 +21,16 @@ impl LogEndpoint { LogEndpoint { name: name.to_owned(), writer, + channel_sender: None, } } + /// Attach a channel sender to this endpoint for monitoring. + pub fn with_channel(mut self, sender: tokio::sync::mpsc::Sender>) -> Self { + self.channel_sender = Some(sender); + self + } + /// Write a log entry to this endpoint. 
/// /// Log entries are prefixed with the endpoint name and terminated with a newline. @@ -42,6 +50,12 @@ impl LogEndpoint { return Ok(()); } + // If a channel sender is present, send the raw message bytes via the channel + if let Some(sender) = &self.channel_sender { + let _ = sender.try_send(msg.to_vec()); + return Ok(()); + } + // Accumulate log entry into a buffer before writing, while escaping newlines let mut to_write = Vec::with_capacity(msg.len() + self.name.len() + LOG_ENDPOINT_DELIM.len() + 1); diff --git a/src/session.rs b/src/session.rs index edded562..f7ba41ac 100644 --- a/src/session.rs +++ b/src/session.rs @@ -583,7 +583,18 @@ impl Session { if let Some(handle) = self.log_endpoints_by_name.get(name).copied() { return handle; } - let endpoint = LogEndpoint::new(name, self.capture_logs.clone()); + let mut endpoint = LogEndpoint::new(name, self.capture_logs.clone()); + // Check if the endpoints monitor has a registered sender for this name + if let Some(sender) = self + .ctx + .endpoints_monitor() + .endpoints + .blocking_read() + .get(name) + .cloned() + { + endpoint = endpoint.with_channel(sender); + } let handle = self.log_endpoints.push(endpoint); self.log_endpoints_by_name.insert(name.to_owned(), handle); handle @@ -649,6 +660,12 @@ impl Session { return false; } + let info = if let Some(interceptor) = self.ctx.dynamic_backend_interceptor() { + interceptor.register(info) + } else { + info + }; + self.dynamic_backends .insert(name.to_string(), Arc::new(info)); diff --git a/src/upstream.rs b/src/upstream.rs index e796483c..e1a65ff6 100644 --- a/src/upstream.rs +++ b/src/upstream.rs @@ -336,34 +336,40 @@ pub fn send_request( *req.uri_mut() = uri; let h2only = backend.grpc; + let handler = backend.handler.clone(); async move { - let mut builder = Client::builder(); + let basic_response = if let Some(handler) = &handler { + handler.handle(req).await + } else { + let mut builder = Client::builder(); - if req.version() == Version::HTTP_2 { - 
builder.http2_only(true); - } + if req.version() == Version::HTTP_2 { + builder.http2_only(true); + } - let is_pass = req - .extensions() - .get::() - .map(CacheOverride::is_pass) - .unwrap_or_default(); - - let mut basic_response = builder - .set_host(false) - .http2_only(h2only) - .build(connector) - .request(req) - .await - .map_err(|e| { - eprintln!("Error: {:?}", e); - e - })?; - - if let Some(md) = basic_response.extensions_mut().get_mut::() { - // This is used later to create similar behaviour between Compute and Viceroy. - md.direct_pass = is_pass; - } + let is_pass = req + .extensions() + .get::() + .map(CacheOverride::is_pass) + .unwrap_or_default(); + + let mut resp = builder + .set_host(false) + .http2_only(h2only) + .build(connector) + .request(req) + .await + .map_err(|e| { + eprintln!("Error: {:?}", e); + e + })?; + + if let Some(md) = resp.extensions_mut().get_mut::() { + md.direct_pass = is_pass; + } + + resp + }; if try_decompression && basic_response diff --git a/src/wiggle_abi/req_impl.rs b/src/wiggle_abi/req_impl.rs index 4f010e68..e3cfe980 100644 --- a/src/wiggle_abi/req_impl.rs +++ b/src/wiggle_abi/req_impl.rs @@ -564,6 +564,7 @@ impl FastlyHttpReq for Session { grpc, client_cert, ca_certs, + handler: None, }; if !self.add_backend(&name, new_backend) { diff --git a/src/wiggle_abi/shielding.rs b/src/wiggle_abi/shielding.rs index a68a822e..cc5ab9d9 100644 --- a/src/wiggle_abi/shielding.rs +++ b/src/wiggle_abi/shielding.rs @@ -95,6 +95,7 @@ impl fastly_shielding::FastlyShielding for Session { grpc: false, client_cert: None, ca_certs: Vec::new(), + handler: None, }; if !self.add_backend(&new_name, new_backend) { From 898b4f527e4e5bfd547acf3e25cb012bc9edd2a1 Mon Sep 17 00:00:00 2001 From: Dominic Petrick Date: Sun, 8 Mar 2026 16:09:11 +0100 Subject: [PATCH 2/6] =?UTF-8?q?src/execute.rs=20=E2=80=94=20New=20to=5Fbui?= =?UTF-8?q?lder()=20method=20on=20ExecuteCtx=20that=20creates=20a=20fresh?= 
=?UTF-8?q?=20ExecuteCtxBuilder=20reusing=20the=20already-compiled=20Wasm?= =?UTF-8?q?=20engine=20and=20instance=5Fpre.=20Unlike=20fork()=20(which=20?= =?UTF-8?q?copies=20existing=20config),=20this=20starts=20with=20all-defau?= =?UTF-8?q?lt=20configuration=20(empty=20backends,=20dictionaries,=20etc.)?= =?UTF-8?q?=20so=20the=20caller=20can=20set=20everything=20up=20from=20scr?= =?UTF-8?q?atch.=20It=20also=20spawns=20its=20own=20epoch=20increment=20th?= =?UTF-8?q?read.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit src/lib.rs — Re-exports ObjectKey and ObjectStoreKey from the object_store module, making them part of the public API. src/session.rs — Changes endpoint monitor lookup from blocking_read() to try_read() on the RwLock, avoiding potential deadlocks if the lock is already held. Falls through silently if the lock can't be acquired. --- src/execute.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/session.rs | 13 ++++--------- 3 files changed, 56 insertions(+), 9 deletions(-) diff --git a/src/execute.rs b/src/execute.rs index 6e603b16..7d083fbf 100644 --- a/src/execute.rs +++ b/src/execute.rs @@ -390,6 +390,57 @@ impl ExecuteCtx { .finish() } + /// Create a new builder that reuses the compiled wasm module from this context. + /// + /// This is much cheaper than calling `build()` again, as it skips wasm compilation. + /// All configuration (backends, dictionaries, object stores, etc.) starts at defaults + /// and must be set on the returned builder. 
+ pub fn to_builder(&self) -> Result { + let epoch_increment_stop = Arc::new(AtomicBool::new(false)); + let engine_clone = self.engine.clone(); + let epoch_increment_stop_clone = epoch_increment_stop.clone(); + let sample_period = self + .guest_profile_config + .as_ref() + .map(|c| c.sample_period) + .unwrap_or(DEFAULT_EPOCH_INTERRUPTION_PERIOD); + let epoch_increment_thread = Some(thread::spawn(move || { + while !epoch_increment_stop_clone.load(Ordering::Relaxed) { + thread::sleep(sample_period); + engine_clone.increment_epoch(); + } + })); + + Ok(ExecuteCtxBuilder { + inner: Self { + engine: self.engine.clone(), + instance_pre: self.instance_pre.clone(), + acls: Acls::new(), + backends: Backends::default(), + device_detection: DeviceDetection::default(), + geolocation: Geolocation::default(), + tls_config: TlsConfig::new()?, + dictionaries: Dictionaries::default(), + config_path: None, + capture_logs: Arc::new(Mutex::new(std::io::stdout())), + log_stdout: false, + log_stderr: false, + local_pushpin_proxy_port: None, + next_req_id: Arc::new(AtomicU64::new(0)), + object_store: ObjectStores::new(), + secret_stores: SecretStores::new(), + shielding_sites: ShieldingSites::new(), + epoch_increment_thread, + epoch_increment_stop, + guest_profile_config: self.guest_profile_config.clone(), + cache: InMemoryCache::default(), + pending_reuse: Arc::new(AsyncMutex::new(vec![])), + endpoints_monitor: EndpointsMonitor::default(), + dynamic_backend_interceptor: None, + }, + }) + } + /// Get the engine for this execution context. 
pub fn engine(&self) -> &Engine { &self.engine diff --git a/src/lib.rs b/src/lib.rs index d5075030..ee8079b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ pub use { run_to_completion, EndpointListener, EndpointsMonitor, ExecuteCtx, ExecuteCtxBuilder, GuestHandle, GuestProfileConfig, }, + object_store::{ObjectKey, ObjectStoreKey}, service::ViceroyService, upstream::BackendConnector, wasmtime::ProfilingStrategy, diff --git a/src/session.rs b/src/session.rs index f7ba41ac..d9c1c61e 100644 --- a/src/session.rs +++ b/src/session.rs @@ -585,15 +585,10 @@ impl Session { } let mut endpoint = LogEndpoint::new(name, self.capture_logs.clone()); // Check if the endpoints monitor has a registered sender for this name - if let Some(sender) = self - .ctx - .endpoints_monitor() - .endpoints - .blocking_read() - .get(name) - .cloned() - { - endpoint = endpoint.with_channel(sender); + if let Ok(endpoints) = self.ctx.endpoints_monitor().endpoints.try_read() { + if let Some(sender) = endpoints.get(name).cloned() { + endpoint = endpoint.with_channel(sender); + } } let handle = self.log_endpoints.push(endpoint); self.log_endpoints_by_name.insert(name.to_owned(), handle); From b60044cf0c4248a50c4cc425d7085df103cef066 Mon Sep 17 00:00:00 2001 From: Dominic Petrick Date: Sun, 22 Mar 2026 11:11:50 +0100 Subject: [PATCH 3/6] Remove obsolete code. --- src/cache.rs | 31 -------------------- src/execute.rs | 76 ++++++++++---------------------------------------- src/lib.rs | 8 +++--- 3 files changed, 18 insertions(+), 97 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index f5c624f5..9cd96933 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -557,37 +557,6 @@ impl std::str::FromStr for SurrogateKey { } } -/// A thin public wrapper around `Cache` that provides a convenient API for integration testing. 
-#[derive(Clone)] -pub struct InMemoryCache(pub(crate) Arc); - -impl Default for InMemoryCache { - fn default() -> Self { - Self(Arc::new(Cache::default())) - } -} - -impl InMemoryCache { - pub fn new() -> Self { - Self::default() - } - - /// Purge cache entries matching the given surrogate key strings. - pub fn purge(&self, surrogates: Vec) { - for s in surrogates { - if let Ok(key) = s.parse::() { - self.0.purge(key, false); - } - } - } -} - -impl std::fmt::Display for InMemoryCache { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "InMemoryCache") - } -} - #[cfg(test)] mod tests { use std::rc::Rc; diff --git a/src/execute.rs b/src/execute.rs index 7d083fbf..70e45df5 100644 --- a/src/execute.rs +++ b/src/execute.rs @@ -6,7 +6,7 @@ use { adapt, body::Body, body_tee::tee, - cache::{Cache, InMemoryCache}, + cache::Cache, component as compute, config::{ Backends, DeviceDetection, Dictionaries, ExperimentalModule, Geolocation, @@ -203,7 +203,7 @@ pub struct ExecuteCtx { /// The shielding sites for this execution. shielding_sites: ShieldingSites, /// The cache for this service. - cache: InMemoryCache, + cache: Arc, /// Senders waiting for new requests for reusable sessions. pending_reuse: Arc>>>, epoch_increment_thread: Option>, @@ -361,7 +361,7 @@ impl ExecuteCtx { epoch_increment_thread, epoch_increment_stop, guest_profile_config: guest_profile_config.map(|c| Arc::new(c)), - cache: InMemoryCache::default(), + cache: Arc::new(Cache::default()), pending_reuse: Arc::new(AsyncMutex::new(vec![])), endpoints_monitor: EndpointsMonitor::default(), dynamic_backend_interceptor: None, @@ -390,57 +390,6 @@ impl ExecuteCtx { .finish() } - /// Create a new builder that reuses the compiled wasm module from this context. - /// - /// This is much cheaper than calling `build()` again, as it skips wasm compilation. - /// All configuration (backends, dictionaries, object stores, etc.) starts at defaults - /// and must be set on the returned builder. 
- pub fn to_builder(&self) -> Result { - let epoch_increment_stop = Arc::new(AtomicBool::new(false)); - let engine_clone = self.engine.clone(); - let epoch_increment_stop_clone = epoch_increment_stop.clone(); - let sample_period = self - .guest_profile_config - .as_ref() - .map(|c| c.sample_period) - .unwrap_or(DEFAULT_EPOCH_INTERRUPTION_PERIOD); - let epoch_increment_thread = Some(thread::spawn(move || { - while !epoch_increment_stop_clone.load(Ordering::Relaxed) { - thread::sleep(sample_period); - engine_clone.increment_epoch(); - } - })); - - Ok(ExecuteCtxBuilder { - inner: Self { - engine: self.engine.clone(), - instance_pre: self.instance_pre.clone(), - acls: Acls::new(), - backends: Backends::default(), - device_detection: DeviceDetection::default(), - geolocation: Geolocation::default(), - tls_config: TlsConfig::new()?, - dictionaries: Dictionaries::default(), - config_path: None, - capture_logs: Arc::new(Mutex::new(std::io::stdout())), - log_stdout: false, - log_stderr: false, - local_pushpin_proxy_port: None, - next_req_id: Arc::new(AtomicU64::new(0)), - object_store: ObjectStores::new(), - secret_stores: SecretStores::new(), - shielding_sites: ShieldingSites::new(), - epoch_increment_thread, - epoch_increment_stop, - guest_profile_config: self.guest_profile_config.clone(), - cache: InMemoryCache::default(), - pending_reuse: Arc::new(AsyncMutex::new(vec![])), - endpoints_monitor: EndpointsMonitor::default(), - dynamic_backend_interceptor: None, - }, - }) - } - /// Get the engine for this execution context. 
pub fn engine(&self) -> &Engine { &self.engine @@ -692,7 +641,13 @@ impl ExecuteCtx { )); if let Some(response) = Self::maybe_receive_response(receiver).await { - return (response.0, response.1, Some(GuestHandle { handle: guest_handle })); + return ( + response.0, + response.1, + Some(GuestHandle { + handle: guest_handle, + }), + ); } match guest_handle @@ -983,24 +938,21 @@ impl ExecuteCtx { shielding_sites: self.shielding_sites.clone(), guest_profile_config: self.guest_profile_config.clone(), // Fresh per-test state: - cache: InMemoryCache::new(), + cache: Arc::new(Cache::default()), next_req_id: Arc::new(AtomicU64::new(0)), pending_reuse: Arc::new(AsyncMutex::new(vec![])), // Don't own the epoch thread (parent owns it) epoch_increment_thread: None, epoch_increment_stop: self.epoch_increment_stop.clone(), - // New fields default: + // New interceptor. dynamic_backend_interceptor: None, + // New endpoints monitor. endpoints_monitor: EndpointsMonitor::default(), }, } } pub fn cache(&self) -> &Arc { - &self.cache.0 - } - - pub fn in_memory_cache(&self) -> &InMemoryCache { &self.cache } @@ -1120,7 +1072,7 @@ impl ExecuteCtxBuilder { } /// Set the cache for this execution context. 
-    pub fn with_cache(mut self, cache: InMemoryCache) -> Self {
+    pub fn with_cache(mut self, cache: Arc<Cache>) -> Self {
         self.inner.cache = cache;
         self
     }
diff --git a/src/lib.rs b/src/lib.rs
index ee8079b2..c4a3ef0b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -41,8 +41,11 @@ mod streaming_body;
 mod upstream;
 pub mod wiggle_abi;
 
+pub use async_trait;
+pub use http;
+pub use hyper;
 pub use {
-    cache::InMemoryCache,
+    cache::Cache,
     error::Error,
     execute::{
         run_to_completion, EndpointListener, EndpointsMonitor, ExecuteCtx, ExecuteCtxBuilder,
         GuestHandle, GuestProfileConfig,
     },
@@ -53,6 +56,3 @@ pub use {
     upstream::BackendConnector,
     wasmtime::ProfilingStrategy,
 };
-pub use async_trait;
-pub use http;
-pub use hyper;

From 0535cbf19228d9c2fe2f4dc87fe2aa63aa066edc Mon Sep 17 00:00:00 2001
From: Dominic Petrick
Date: Sun, 22 Mar 2026 11:28:56 +0100
Subject: [PATCH 4/6] Add purge_all convenience method to Cache

---
 src/cache.rs | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/cache.rs b/src/cache.rs
index 9cd96933..cc8cce49 100644
--- a/src/cache.rs
+++ b/src/cache.rs
@@ -370,6 +370,17 @@ impl Cache {
             .map(|(_, entry)| entry.purge(&key, soft_purge))
             .sum()
     }
+
+    /// Purge/soft-purge all cache entries corresponding to the given surrogate keys.
+    /// Returns the number of entries (variants) purged.
+    ///
+    /// Note: this does not block concurrent reads _or inserts_; an insertion can race with the
+    /// purge.
+    pub fn purge_all(&self, keys: Vec<SurrogateKey>, soft_purge: bool) -> usize {
+        keys.into_iter()
+            .map(|key| self.purge(key, soft_purge))
+            .sum()
+    }
 }
 
 /// Options that can be applied to a write, e.g. insert or transaction_insert.
From 182bfedcda2965a9aea5c324173947f44bbc13d5 Mon Sep 17 00:00:00 2001 From: Dominic Petrick Date: Sun, 22 Mar 2026 11:42:01 +0100 Subject: [PATCH 5/6] Add debug implementation to Cache struct --- src/cache.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cache.rs b/src/cache.rs index cc8cce49..1c755d74 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -285,6 +285,7 @@ impl Found { // Explain some about how this works: // - Request collapsing // - Stale-while-revalidate +#[derive(Debug)] pub struct Cache { inner: moka::future::Cache>, } From 2c004ba787a57eb8c576bf7000fc714f46dc7ac8 Mon Sep 17 00:00:00 2001 From: Dominic Petrick Date: Sun, 22 Mar 2026 12:36:37 +0100 Subject: [PATCH 6/6] Display for cache --- src/cache.rs | 27 ++++++++++++- src/cache/store.rs | 90 +++++++++++++++++++++++++++++++++++++++++++ src/cache/variance.rs | 22 ++++++++++- 3 files changed, 137 insertions(+), 2 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index 1c755d74..8ee37627 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,5 +1,5 @@ use core::str; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, fmt, sync::Arc, time::Duration}; use bytes::Bytes; #[cfg(test)] @@ -129,6 +129,20 @@ impl TryFrom<&str> for CacheKey { } } +impl fmt::Display for CacheKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match str::from_utf8(&self.0) { + Ok(s) => write!(f, "{s}"), + Err(_) => { + for byte in &self.0 { + write!(f, "{byte:02x}")?; + } + Ok(()) + } + } + } +} + /// The result of a lookup: the object (if found), and/or an obligation to fetch. #[derive(Debug)] pub struct CacheEntry { @@ -384,6 +398,17 @@ impl Cache { } } +impl fmt::Display for Cache { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "Cache {{")?; + for (key, objects) in self.inner.iter() { + writeln!(f, " key: {key}")?; + write!(f, "{:4}", &*objects)?; + } + write!(f, "}}") + } +} + /// Options that can be applied to a write, e.g. 
insert or transaction_insert. #[derive(Default, Clone)] pub struct WriteOptions { diff --git a/src/cache/store.rs b/src/cache/store.rs index 5a2b0864..a6ded721 100644 --- a/src/cache/store.rs +++ b/src/cache/store.rs @@ -4,6 +4,7 @@ use crate::cache::{variance::VaryRule, Error}; use bytes::Bytes; use std::{ collections::{HashMap, VecDeque}, + fmt, future::Future, sync::{atomic::AtomicBool, Arc}, time::{Duration, Instant}, @@ -123,6 +124,44 @@ impl ObjectMeta { } } +impl fmt::Display for ObjectMeta { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let indent = f.width().unwrap_or(0); + let i = " ".repeat(indent); + + writeln!(f, "{i}ObjectMeta:")?; + writeln!(f, "{i} age: {:?}", self.age())?; + writeln!(f, "{i} max_age: {:?}", self.max_age)?; + writeln!(f, "{i} stale_while_revalidate: {:?}", self.stale_while_revalidate)?; + writeln!(f, "{i} fresh: {}", self.is_fresh())?; + writeln!(f, "{i} usable: {}", self.is_usable())?; + writeln!(f, "{i} vary_rule: {}", self.vary_rule)?; + writeln!(f, "{i} surrogate_keys: {}", self.surrogate_keys)?; + writeln!(f, "{i} length: {:?}", self.length)?; + writeln!( + f, + "{i} soft_purge: {}", + self.soft_purge.load(std::sync::atomic::Ordering::SeqCst) + )?; + + // Show request headers + writeln!(f, "{i} request_headers:")?; + for (name, value) in self.request_headers.iter() { + writeln!(f, "{i} {}: {}", name, value.to_str().unwrap_or(""))?; + } + + // Show user_metadata as UTF-8 if possible + if !self.user_metadata.is_empty() { + match std::str::from_utf8(&self.user_metadata) { + Ok(s) => writeln!(f, "{i} user_metadata: {s}")?, + Err(_) => writeln!(f, "{i} user_metadata: ({} bytes)", self.user_metadata.len())?, + } + } + + Ok(()) + } +} + /// Object(s) indexed by a CacheKey. 
#[derive(Debug, Default)] pub struct CacheKeyObjects(watch::Sender<CacheKeyObjectsInner>); @@ -481,6 +520,57 @@ struct CacheValue { obligated: bool, } +impl fmt::Display for CacheKeyObjects { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let indent = f.width().unwrap_or(0); + let inner = self.0.borrow(); + fmt::Display::fmt(&IndentWrapper(&*inner, indent), f) + } +} + +/// Helper to pass indent through Display since CacheKeyObjectsInner is private. +struct IndentWrapper<'a>(&'a CacheKeyObjectsInner, usize); + +impl fmt::Display for IndentWrapper<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let inner = self.0; + let indent = self.1; + let i = " ".repeat(indent); + + // Vary rules + writeln!(f, "{i}vary_rules:")?; + for rule in inner.vary_rules.iter() { + writeln!(f, "{i} - {rule}")?; + } + + // Objects (variants) + writeln!(f, "{i}variants:")?; + for (variant, value) in inner.objects.iter() { + writeln!(f, "{i} {variant}:")?; + if let Some(data) = &value.present { + write!(f, "{:indent$}", data, indent = indent + 4)?; + } else { + writeln!(f, "{i} (empty)")?; + } + writeln!(f, "{i} obligated: {}", value.obligated)?; + } + + Ok(()) + } +} + +impl fmt::Display for CacheData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let indent = f.width().unwrap_or(0); + write!(f, "{:indent$}", self.meta, indent = indent)?; + let i = " ".repeat(indent); + if let Some(len) = self.length() { + writeln!(f, "{i}body_length: {len}")?; + } + Ok(()) + } +} + /// An obligation to fetch & update the cache. #[derive(Debug)] pub struct Obligation { diff --git a/src/cache/variance.rs b/src/cache/variance.rs index 3184e156..cbb3f1b7 100644 --- a/src/cache/variance.rs +++ b/src/cache/variance.rs @@ -9,7 +9,7 @@ //! The core cache API provides the bones of this. //! 
-use std::{collections::HashSet, str::FromStr}; +use std::{collections::HashSet, fmt, str::FromStr}; use bytes::{Bytes, BytesMut}; pub use http::HeaderName; @@ -84,6 +84,26 @@ pub struct Variant { signature: Bytes, } +impl fmt::Display for VaryRule { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.headers.is_empty() { + write!(f, "VaryRule([])") + } else { + let headers: Vec<&str> = self.headers.iter().map(|h| h.as_str()).collect(); + write!(f, "VaryRule([{}])", headers.join(", ")) + } + } +} + +impl fmt::Display for Variant { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match std::str::from_utf8(&self.signature) { + Ok(s) => write!(f, "Variant({s})"), + Err(_) => write!(f, "Variant({:?})", self.signature), + } + } +} + #[cfg(test)] mod tests { use super::VaryRule;