diff --git a/cli/tests/integration/common.rs b/cli/tests/integration/common.rs index c367ef92..a54b446f 100644 --- a/cli/tests/integration/common.rs +++ b/cli/tests/integration/common.rs @@ -418,8 +418,8 @@ impl Test { .await .map(|result| { match result { - (resp, None) => resp, - (_, Some(err)) => { + (resp, None, _) => resp, + (_, Some(err), _) => { // Splat the string representation of the runtime error into a synthetic // 500. This is a bit of a hack, but good enough to check for expected error // strings. diff --git a/cli/tests/integration/common/backends.rs b/cli/tests/integration/common/backends.rs index 309620d4..e0f7f01c 100644 --- a/cli/tests/integration/common/backends.rs +++ b/cli/tests/integration/common/backends.rs @@ -76,6 +76,7 @@ impl TestBackends { grpc: false, client_cert: None, ca_certs: vec![], + handler: None, }; backends.insert(name.to_string(), Arc::new(backend_config)); } diff --git a/src/cache.rs b/src/cache.rs index 9cd96933..8ee37627 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,5 +1,5 @@ use core::str; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, fmt, sync::Arc, time::Duration}; use bytes::Bytes; #[cfg(test)] @@ -129,6 +129,20 @@ impl TryFrom<&str> for CacheKey { } } +impl fmt::Display for CacheKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match str::from_utf8(&self.0) { + Ok(s) => write!(f, "{s}"), + Err(_) => { + for byte in &self.0 { + write!(f, "{byte:02x}")?; + } + Ok(()) + } + } + } +} + /// The result of a lookup: the object (if found), and/or an obligation to fetch. #[derive(Debug)] pub struct CacheEntry { @@ -285,6 +299,7 @@ impl Found { // Explain some about how this works: // - Request collapsing // - Stale-while-revalidate +#[derive(Debug)] pub struct Cache { inner: moka::future::Cache>, } @@ -370,6 +385,28 @@ impl Cache { .map(|(_, entry)| entry.purge(&key, soft_purge)) .sum() } + + /// Purge/soft-purge all cache entries corresponding to the given surrogate key. + /// Returns the number of entries (variants) purged. + /// + /// Note: this does not block concurrent reads _or inserts_; an insertion can race with the + /// purge. + pub fn purge_all(&self, keys: Vec, soft_purge: bool) -> usize { + keys.into_iter() + .map(|key| self.purge(key, soft_purge)) + .sum() + } +} + +impl fmt::Display for Cache { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "Cache {{")?; + for (key, objects) in self.inner.iter() { + writeln!(f, " key: {key}")?; + write!(f, "{:4}", &*objects)?; + } + write!(f, "}}") + } } /// Options that can be applied to a write, e.g. insert or transaction_insert. diff --git a/src/cache/store.rs b/src/cache/store.rs index 5a2b0864..a6ded721 100644 --- a/src/cache/store.rs +++ b/src/cache/store.rs @@ -4,6 +4,7 @@ use crate::cache::{variance::VaryRule, Error}; use bytes::Bytes; use std::{ collections::{HashMap, VecDeque}, + fmt, future::Future, sync::{atomic::AtomicBool, Arc}, time::{Duration, Instant}, @@ -123,6 +124,44 @@ impl ObjectMeta { } } +impl fmt::Display for ObjectMeta { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let indent = f.width().unwrap_or(0); + let i = " ".repeat(indent); + + writeln!(f, "{i}ObjectMeta:")?; + writeln!(f, "{i} age: {:?}", self.age())?; + writeln!(f, "{i} max_age: {:?}", self.max_age)?; + writeln!(f, "{i} stale_while_revalidate: {:?}", self.stale_while_revalidate)?; + writeln!(f, "{i} fresh: {}", self.is_fresh())?; + writeln!(f, "{i} usable: {}", self.is_usable())?; + writeln!(f, "{i} vary_rule: {}", self.vary_rule)?; + writeln!(f, "{i} surrogate_keys: {}", self.surrogate_keys)?; + writeln!(f, "{i} length: {:?}", self.length)?; + writeln!( + f, + "{i} soft_purge: {}", + self.soft_purge.load(std::sync::atomic::Ordering::SeqCst) + )?; + + // Show request headers + writeln!(f, "{i} request_headers:")?; + for (name, value) in self.request_headers.iter() { + writeln!(f, "{i} {}: {}", name, value.to_str().unwrap_or(""))?; + } + + // Show user_metadata as UTF-8 if possible + if !self.user_metadata.is_empty() { + match std::str::from_utf8(&self.user_metadata) { + Ok(s) => writeln!(f, "{i} user_metadata: {s}")?, + Err(_) => writeln!(f, "{i} user_metadata: ({} bytes)", self.user_metadata.len())?, + } + } + + Ok(()) + } +} + /// Object(s) indexed by a CacheKey. #[derive(Debug, Default)] pub struct CacheKeyObjects(watch::Sender); @@ -481,6 +520,57 @@ struct CacheValue { obligated: bool, } +impl fmt::Display for CacheKeyObjects { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let indent = f.width().unwrap_or(0); + let inner = self.0.borrow(); + fmt::Display::fmt(&IndentWrapper(&*inner, indent), f) + } +} + +/// Helper to pass indent through Display since CacheKeyObjectsInner is private. +struct IndentWrapper<'a>(&'a CacheKeyObjectsInner, usize); + +impl fmt::Display for IndentWrapper<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let inner = self.0; + let indent = self.1; + let i = " ".repeat(indent); + + // Vary rules + writeln!(f, "{i}vary_rules:")?; + for rule in inner.vary_rules.iter() { + writeln!(f, "{i} - {rule}")?; + } + + // Objects (variants) + writeln!(f, "{i}variants:")?; + for (variant, value) in inner.objects.iter() { + writeln!(f, "{i} {variant}:")?; + if let Some(data) = &value.present { + write!(f, "{:indent$}", data, indent = indent + 4)?; + } else { + writeln!(f, "{i} (empty)")?; + } + writeln!(f, "{i} obligated: {}", value.obligated)?; + } + + Ok(()) + } +} + +impl fmt::Display for CacheData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let indent = f.width().unwrap_or(0); + write!(f, "{:indent$}", self.meta, indent = indent)?; + let i = " ".repeat(indent); + if let Some(len) = self.length() { + writeln!(f, "{i}body_length: {len}")?; + } + Ok(()) + } +} + /// An obligation to fetch & update the cache. #[derive(Debug)] pub struct Obligation { diff --git a/src/cache/variance.rs b/src/cache/variance.rs index 3184e156..cbb3f1b7 100644 --- a/src/cache/variance.rs +++ b/src/cache/variance.rs @@ -9,7 +9,7 @@ //! The core cache API provides the bones of this. //! -use std::{collections::HashSet, str::FromStr}; +use std::{collections::HashSet, fmt, str::FromStr}; use bytes::{Bytes, BytesMut}; pub use http::HeaderName; @@ -84,6 +84,26 @@ pub struct Variant { signature: Bytes, } +impl fmt::Display for VaryRule { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.headers.is_empty() { + write!(f, "VaryRule([])") + } else { + let headers: Vec<&str> = self.headers.iter().map(|h| h.as_str()).collect(); + write!(f, "VaryRule([{}])", headers.join(", ")) + } + } +} + +impl fmt::Display for Variant { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match std::str::from_utf8(&self.signature) { + Ok(s) => write!(f, "Variant({s})"), + Err(_) => write!(f, "Variant({:?})", self.signature), + } + } +} + #[cfg(test)] mod tests { use super::VaryRule; diff --git a/src/component/backend.rs b/src/component/backend.rs index 137490c9..50238125 100644 --- a/src/component/backend.rs +++ b/src/component/backend.rs @@ -144,6 +144,7 @@ pub(crate) async fn register_dynamic_backend( grpc, client_cert, ca_certs, + handler: None, }; if !session.add_backend(name, new_backend) { diff --git a/src/component/shielding.rs b/src/component/shielding.rs index 9773c582..01a17cd0 100644 --- a/src/component/shielding.rs +++ b/src/component/shielding.rs @@ -28,6 +28,7 @@ pub(crate) fn backend_for_shield( grpc: false, client_cert: None, ca_certs: Vec::new(), + handler: None, }; if !session.add_backend(&new_name, new_backend) { diff --git a/src/config.rs b/src/config.rs index 360f7fb1..0b259d94 100644 --- a/src/config.rs +++ b/src/config.rs @@ -32,7 +32,10 @@ pub use crate::acl::Acls; /// Types and deserializers for backend configuration settings. mod backends; -pub use self::backends::{Backend, ClientCertError, ClientCertInfo}; +pub use self::backends::{ + Backend, ClientCertError, ClientCertInfo, DynamicBackendRegistrationInterceptor, Handler, + InMemoryBackendHandler, +}; pub type Backends = HashMap>; diff --git a/src/config/backends.rs b/src/config/backends.rs index 3a621036..a28dc8cd 100644 --- a/src/config/backends.rs +++ b/src/config/backends.rs @@ -1,12 +1,48 @@ mod client_cert_info; use { - hyper::{header::HeaderValue, Uri}, - std::{collections::HashMap, sync::Arc}, + crate::body::Body, + async_trait::async_trait, + hyper::{header::HeaderValue, Request, Response, Uri}, + std::{collections::HashMap, fmt, ops::Deref, sync::Arc}, }; pub use self::client_cert_info::{ClientCertError, ClientCertInfo}; +/// A trait for handling backend requests in-memory, without making real HTTP calls. +#[async_trait] +pub trait InMemoryBackendHandler: Send + Sync + 'static { + async fn handle(&self, req: Request) -> Response; +} + +/// A wrapper around an `InMemoryBackendHandler` trait object. +#[derive(Clone)] +pub struct Handler(Arc>); + +impl Handler { + pub fn new(handler: Box) -> Self { + Handler(Arc::new(handler)) + } +} + +impl Deref for Handler { + type Target = dyn InMemoryBackendHandler; + fn deref(&self) -> &Self::Target { + self.0.as_ref().as_ref() + } +} + +impl fmt::Debug for Handler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Handler").finish_non_exhaustive() + } +} + +/// A trait for intercepting dynamic backend registration. +pub trait DynamicBackendRegistrationInterceptor: Send + Sync + 'static { + fn register(&self, backend: Backend) -> Backend; +} + /// A single backend definition. #[derive(Clone, Debug)] pub struct Backend { @@ -17,6 +53,7 @@ pub struct Backend { pub grpc: bool, pub client_cert: Option, pub ca_certs: Vec, + pub handler: Option, } /// A map of [`Backend`] definitions, keyed by their name. @@ -159,6 +196,7 @@ mod deserialization { client_cert, grpc, ca_certs, + handler: None, }) } } diff --git a/src/execute.rs b/src/execute.rs index 44ba1bf7..70e45df5 100644 --- a/src/execute.rs +++ b/src/execute.rs @@ -54,6 +54,9 @@ use { wasmtime_wasi::I32Exit, }; +use std::collections::BTreeMap; +use tokio::sync::RwLock; + pub const DEFAULT_EPOCH_INTERRUPTION_PERIOD: Duration = Duration::from_micros(50); const NEXT_REQ_PENDING_MAX: usize = 5; @@ -83,6 +86,55 @@ pub struct GuestProfileConfig { pub sample_period: Duration, } +/// A handle to a running guest computation. +pub struct GuestHandle { + pub(crate) handle: tokio::task::JoinHandle>, +} + +/// Wait for a guest to complete and return any error. +pub async fn run_to_completion(guest: GuestHandle) -> Option { + match guest.handle.await { + Ok(Ok(())) => None, + Ok(Err(ExecutionError::WasmTrap(e))) => Some(e), + Ok(Err(e)) => Some(anyhow::anyhow!("guest execution failed: {}", e)), + Err(e) => Some(anyhow::anyhow!("guest task panicked: {}", e)), + } +} + +/// Monitors logging endpoints by name, routing messages to registered listeners. +#[derive(Clone, Default)] +pub struct EndpointsMonitor { + pub endpoints: Arc, tokio::sync::mpsc::Sender>>>>, +} + +impl EndpointsMonitor { + /// Register a listener for the given endpoint name. + pub fn register_listener(&self, name: impl Into>) -> EndpointListener { + let (tx, rx) = tokio::sync::mpsc::channel(1024); + let name = name.into(); + let endpoints = self.endpoints.clone(); + // Use blocking_write since this is typically called from sync context during setup + endpoints.blocking_write().insert(name, tx); + EndpointListener { receiver: rx } + } +} + +/// Receives messages from a monitored logging endpoint. +pub struct EndpointListener { + receiver: tokio::sync::mpsc::Receiver>, +} + +impl EndpointListener { + /// Drain all currently available messages without blocking. + pub fn messages(&mut self) -> Vec> { + let mut msgs = Vec::new(); + while let Ok(msg) = self.receiver.try_recv() { + msgs.push(msg); + } + msgs + } +} + pub struct NextRequest(Option<(DownstreamRequest, Arc)>); impl NextRequest { @@ -159,6 +211,11 @@ pub struct ExecuteCtx { epoch_increment_stop: Arc, /// Configuration for guest profiling if enabled guest_profile_config: Option>, + /// Monitor for logging endpoints. + endpoints_monitor: EndpointsMonitor, + /// Optional interceptor for dynamic backend registration. + dynamic_backend_interceptor: + Option>, } impl ExecuteCtx { @@ -306,6 +363,8 @@ impl ExecuteCtx { guest_profile_config: guest_profile_config.map(|c| Arc::new(c)), cache: Arc::new(Cache::default()), pending_reuse: Arc::new(AsyncMutex::new(vec![])), + endpoints_monitor: EndpointsMonitor::default(), + dynamic_backend_interceptor: None, }; Ok(ExecuteCtxBuilder { inner }) @@ -425,7 +484,7 @@ impl ExecuteCtx { mut incoming_req: Request, local: SocketAddr, remote: SocketAddr, - ) -> Result<(Response, Option), Error> { + ) -> Result<(Response, Option, Option), Error> { let orig_req_on_upgrade = hyper::upgrade::on(&mut incoming_req); let (incoming_req_parts, incoming_req_body) = incoming_req.into_parts(); let local_pushpin_proxy_port = self.local_pushpin_proxy_port; @@ -449,7 +508,7 @@ impl ExecuteCtx { original_headers, }; - let (resp, mut err) = self.reuse_or_spawn_guest(req, metadata).await; + let (resp, mut err, guest_handle) = self.reuse_or_spawn_guest(req, metadata).await; let span = info_span!("request", id = req_id); let _span = span.enter(); @@ -472,7 +531,7 @@ impl ExecuteCtx { let resp = Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::from(hyper::Body::from(err.to_string())))?; - return Ok((resp, Some(err))); + return Ok((resp, Some(err), None)); } Some(port) => port, }; @@ -488,7 +547,7 @@ impl ExecuteCtx { .await; let (p, hyper_body) = proxy_resp.into_parts(); - return Ok((Response::from_parts(p, Body::from(hyper_body)), None)); + return Ok((Response::from_parts(p, Body::from(hyper_body)), None, None)); } Err(e) => { err = Some(e); @@ -496,7 +555,7 @@ impl ExecuteCtx { } } - Ok((resp, err)) + Ok((resp, err, guest_handle)) } /// Spawn a new guest to process a request whose processing was never attempted by @@ -509,7 +568,7 @@ impl ExecuteCtx { tokio::task::spawn(async move { let (sender, receiver) = oneshot::channel(); let original = std::mem::replace(&mut downstream.sender, sender); - let (resp, err) = self.spawn_guest(downstream, receiver).await; + let (resp, err, _guest_handle) = self.spawn_guest(downstream, receiver).await; let resp = guest_result_to_response(resp, err); let _ = original.send(DownstreamResponse::Http(resp)); }); @@ -531,7 +590,7 @@ impl ExecuteCtx { self: Arc, req: Request, metadata: DownstreamMetadata, - ) -> (Response, Option) { + ) -> (Response, Option, Option) { let (sender, receiver) = oneshot::channel(); let downstream = DownstreamRequest { req, @@ -549,9 +608,9 @@ impl ExecuteCtx { drop(reusable); if let Some(response) = Self::maybe_receive_response(receiver).await { - return response; + return (response.0, response.1, None); } - return (Response::default(), None); + return (Response::default(), None, None); } Err(nr) => next_req = nr, } @@ -569,7 +628,7 @@ impl ExecuteCtx { self: Arc, downstream: DownstreamRequest, receiver: oneshot::Receiver, - ) -> (Response, Option) { + ) -> (Response, Option, Option) { let active_cpu_time_us = Arc::new(AtomicU64::new(0)); // Spawn a separate task to run the guest code. That allows _this_ method to return a response early @@ -582,21 +641,27 @@ impl ExecuteCtx { )); if let Some(response) = Self::maybe_receive_response(receiver).await { - return response; + return ( + response.0, + response.1, + Some(GuestHandle { + handle: guest_handle, + }), + ); } match guest_handle .await .expect("guest worker finished without panicking") { - Ok(_) => (Response::new(Body::empty()), None), + Ok(_) => (Response::new(Body::empty()), None, None), Err(ExecutionError::WasmTrap(e)) => { event!( Level::ERROR, "There was an error handling the request {}", e.to_string() ); - (anyhow_response(&e), Some(e)) + (anyhow_response(&e), Some(e), None) } Err(e) => panic!("failed to run guest: {}", e), } @@ -850,10 +915,57 @@ impl ExecuteCtx { result } + /// Create a fork of this execution context, sharing the engine and compiled module + /// but with fresh per-test state (cache, request IDs, pending reuse). + pub fn fork(self: &Arc) -> ExecuteCtxBuilder { + ExecuteCtxBuilder { + inner: ExecuteCtx { + engine: self.engine.clone(), + instance_pre: self.instance_pre.clone(), + acls: self.acls.clone(), + backends: self.backends.clone(), + device_detection: self.device_detection.clone(), + geolocation: self.geolocation.clone(), + tls_config: self.tls_config.clone(), + dictionaries: self.dictionaries.clone(), + config_path: self.config_path.clone(), + capture_logs: self.capture_logs.clone(), + log_stdout: self.log_stdout, + log_stderr: self.log_stderr, + local_pushpin_proxy_port: self.local_pushpin_proxy_port, + object_store: self.object_store.clone(), + secret_stores: self.secret_stores.clone(), + shielding_sites: self.shielding_sites.clone(), + guest_profile_config: self.guest_profile_config.clone(), + // Fresh per-test state: + cache: Arc::new(Cache::default()), + next_req_id: Arc::new(AtomicU64::new(0)), + pending_reuse: Arc::new(AsyncMutex::new(vec![])), + // Don't own the epoch thread (parent owns it) + epoch_increment_thread: None, + epoch_increment_stop: self.epoch_increment_stop.clone(), + // New interceptor. + dynamic_backend_interceptor: None, + // New endpoints monitor. + endpoints_monitor: EndpointsMonitor::default(), + }, + } + } + pub fn cache(&self) -> &Arc { &self.cache } + pub fn endpoints_monitor(&self) -> &EndpointsMonitor { + &self.endpoints_monitor + } + + pub fn dynamic_backend_interceptor( + &self, + ) -> Option<&dyn crate::config::DynamicBackendRegistrationInterceptor> { + self.dynamic_backend_interceptor.as_deref() + } + pub fn config_path(&self) -> Option<&Path> { self.config_path.as_deref() } @@ -944,6 +1056,27 @@ impl ExecuteCtxBuilder { self } + /// Set the dynamic backend registration interceptor. + pub fn with_dynamic_backend_interceptor( + mut self, + interceptor: Arc, + ) -> Self { + self.inner.dynamic_backend_interceptor = Some(interceptor); + self + } + + /// Set the endpoints monitor for this execution context. + pub fn with_endpoints(mut self, endpoints_monitor: EndpointsMonitor) -> Self { + self.inner.endpoints_monitor = endpoints_monitor; + self + } + + /// Set the cache for this execution context. + pub fn with_cache(mut self, cache: Arc) -> Self { + self.inner.cache = cache; + self + } + /// Set the path to the config for this execution context. pub fn with_config_path(mut self, config_path: PathBuf) -> Self { self.inner.config_path = Some(config_path); diff --git a/src/lib.rs b/src/lib.rs index 1c987b4d..c4a3ef0b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,7 +41,18 @@ mod streaming_body; mod upstream; pub mod wiggle_abi; +pub use async_trait; +pub use http; +pub use hyper; pub use { - error::Error, execute::ExecuteCtx, execute::GuestProfileConfig, service::ViceroyService, - upstream::BackendConnector, wasmtime::ProfilingStrategy, + cache::Cache, + error::Error, + execute::{ + run_to_completion, EndpointListener, EndpointsMonitor, ExecuteCtx, ExecuteCtxBuilder, + GuestHandle, GuestProfileConfig, + }, + object_store::{ObjectKey, ObjectStoreKey}, + service::ViceroyService, + upstream::BackendConnector, + wasmtime::ProfilingStrategy, }; diff --git a/src/logging.rs b/src/logging.rs index 21f0baf4..665e135a 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -11,6 +11,7 @@ use tokio::io::AsyncWrite; pub struct LogEndpoint { name: Vec, writer: Arc>, + channel_sender: Option>>, } impl LogEndpoint { @@ -20,9 +21,16 @@ impl LogEndpoint { LogEndpoint { name: name.to_owned(), writer, + channel_sender: None, } } + /// Attach a channel sender to this endpoint for monitoring. + pub fn with_channel(mut self, sender: tokio::sync::mpsc::Sender>) -> Self { + self.channel_sender = Some(sender); + self + } + /// Write a log entry to this endpoint. /// /// Log entries are prefixed with the endpoint name and terminated with a newline. @@ -42,6 +50,12 @@ impl LogEndpoint { return Ok(()); } + // If a channel sender is present, send the raw message bytes via the channel + if let Some(sender) = &self.channel_sender { + let _ = sender.try_send(msg.to_vec()); + return Ok(()); + } + // Accumulate log entry into a buffer before writing, while escaping newlines let mut to_write = Vec::with_capacity(msg.len() + self.name.len() + LOG_ENDPOINT_DELIM.len() + 1); diff --git a/src/session.rs b/src/session.rs index edded562..d9c1c61e 100644 --- a/src/session.rs +++ b/src/session.rs @@ -583,7 +583,13 @@ impl Session { if let Some(handle) = self.log_endpoints_by_name.get(name).copied() { return handle; } - let endpoint = LogEndpoint::new(name, self.capture_logs.clone()); + let mut endpoint = LogEndpoint::new(name, self.capture_logs.clone()); + // Check if the endpoints monitor has a registered sender for this name + if let Ok(endpoints) = self.ctx.endpoints_monitor().endpoints.try_read() { + if let Some(sender) = endpoints.get(name).cloned() { + endpoint = endpoint.with_channel(sender); + } + } let handle = self.log_endpoints.push(endpoint); self.log_endpoints_by_name.insert(name.to_owned(), handle); handle @@ -649,6 +655,12 @@ impl Session { return false; } + let info = if let Some(interceptor) = self.ctx.dynamic_backend_interceptor() { + interceptor.register(info) + } else { + info + }; + self.dynamic_backends .insert(name.to_string(), Arc::new(info)); diff --git a/src/upstream.rs b/src/upstream.rs index e796483c..e1a65ff6 100644 --- a/src/upstream.rs +++ b/src/upstream.rs @@ -336,34 +336,40 @@ pub fn send_request( *req.uri_mut() = uri; let h2only = backend.grpc; + let handler = backend.handler.clone(); async move { - let mut builder = Client::builder(); + let basic_response = if let Some(handler) = &handler { + handler.handle(req).await + } else { + let mut builder = Client::builder(); - if req.version() == Version::HTTP_2 { - builder.http2_only(true); - } + if req.version() == Version::HTTP_2 { + builder.http2_only(true); + } - let is_pass = req - .extensions() - .get::() - .map(CacheOverride::is_pass) - .unwrap_or_default(); - - let mut basic_response = builder - .set_host(false) - .http2_only(h2only) - .build(connector) - .request(req) - .await - .map_err(|e| { - eprintln!("Error: {:?}", e); - e - })?; - - if let Some(md) = basic_response.extensions_mut().get_mut::() { - // This is used later to create similar behaviour between Compute and Viceroy. - md.direct_pass = is_pass; - } + let is_pass = req + .extensions() + .get::() + .map(CacheOverride::is_pass) + .unwrap_or_default(); + + let mut resp = builder + .set_host(false) + .http2_only(h2only) + .build(connector) + .request(req) + .await + .map_err(|e| { + eprintln!("Error: {:?}", e); + e + })?; + + if let Some(md) = resp.extensions_mut().get_mut::() { + md.direct_pass = is_pass; + } + + resp + }; if try_decompression && basic_response diff --git a/src/wiggle_abi/req_impl.rs b/src/wiggle_abi/req_impl.rs index 4f010e68..e3cfe980 100644 --- a/src/wiggle_abi/req_impl.rs +++ b/src/wiggle_abi/req_impl.rs @@ -564,6 +564,7 @@ impl FastlyHttpReq for Session { grpc, client_cert, ca_certs, + handler: None, }; if !self.add_backend(&name, new_backend) { diff --git a/src/wiggle_abi/shielding.rs b/src/wiggle_abi/shielding.rs index a68a822e..cc5ab9d9 100644 --- a/src/wiggle_abi/shielding.rs +++ b/src/wiggle_abi/shielding.rs @@ -95,6 +95,7 @@ impl fastly_shielding::FastlyShielding for Session { grpc: false, client_cert: None, ca_certs: Vec::new(), + handler: None, }; if !self.add_backend(&new_name, new_backend) {