diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..6fdc147e Binary files /dev/null and b/.DS_Store differ diff --git a/Cargo.lock b/Cargo.lock index 03aa88fc..b2d7f176 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2263,6 +2263,7 @@ name = "viceroy-lib" version = "0.9.4" dependencies = [ "anyhow", + "async-trait", "bytes", "bytesize", "cfg-if", diff --git a/Cargo.toml b/Cargo.toml index e5b15096..b6d09a92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,20 +1,15 @@ [workspace] -members = [ - "cli", - "lib", -] +members = ["cli", "lib"] resolver = "2" # Exclude our integration test fixtures, which need to be compiled to wasm # (managed by the Makefile) -exclude = [ - "test-fixtures", -] +exclude = ["test-fixtures"] # Specify `cli` as the default workspace member to operate on. This means that # commands like `cargo run` will run the CLI binary by default. # See: https://doc.rust-lang.org/cargo/reference/workspaces.html#package-selection -default-members = [ "cli" ] +default-members = ["cli"] [profile.dev] # Since some of the integration tests involve compiling Wasm, a little optimization goes a long way @@ -36,6 +31,7 @@ tracing = "0.1.37" tracing-futures = "0.2.5" futures = "0.3.24" url = "2.3.1" +async-trait = "0.1.74" # Wasmtime dependencies wasi-common = "13.0.0" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 98fdaebe..3bcdbf2e 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -15,14 +15,14 @@ categories = [ "development-tools", "network-programming", "simulation", - "wasm" + "wasm", ] include = [ - "../README.md", - "../CHANGELOG.md", - "../SECURITY.md", - "../doc/logo.png", - "src/**/*" + "../README.md", + "../CHANGELOG.md", + "../SECURITY.md", + "../doc/logo.png", + "src/**/*", ] [[bin]] @@ -38,7 +38,12 @@ serde_json = { workspace = true } clap = { workspace = true } rustls = { workspace = true } rustls-pemfile = { workspace = true } -tls-listener = { version = "^0.7.0", features = ["rustls", "hyper-h1", "tokio-net", "rt"] } +tls-listener = { version = "^0.7.0", features = [ + "rustls", + "hyper-h1", + "tokio-net", + "rt", +] } tokio = { workspace = true } tokio-rustls = { workspace = true } tracing = { workspace = true } diff --git a/cli/tests/integration/common.rs b/cli/tests/integration/common.rs index b1a4b498..e5a4275e 100644 --- a/cli/tests/integration/common.rs +++ b/cli/tests/integration/common.rs @@ -358,8 +358,8 @@ impl Test { .await .map(|result| { match result { - (resp, None) => resp, - (_, Some(err)) => { + (resp, None, _) => resp, + (_, Some(err), _) => { // Splat the string representation of the runtime error into a synthetic // 500. This is a bit of a hack, but good enough to check for expected error // strings. diff --git a/cli/tests/integration/common/backends.rs b/cli/tests/integration/common/backends.rs index e2e284d8..538433dd 100644 --- a/cli/tests/integration/common/backends.rs +++ b/cli/tests/integration/common/backends.rs @@ -75,6 +75,7 @@ impl TestBackends { use_sni: backend.use_sni, grpc: false, client_cert: None, + handler: None, }; backends.insert(name.to_string(), Arc::new(backend_config)); } diff --git a/lib/Cargo.toml b/lib/Cargo.toml index b98b4a76..d75be487 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -9,12 +9,7 @@ documentation = "https://docs.rs/viceroy-lib" homepage = "https://github.com/fastly/Viceroy" repository = "https://github.com/fastly/Viceroy" keywords = ["wasm", "fastly"] -categories = [ - "development-tools", - "network-programming", - "simulation", - "wasm" -] +categories = ["development-tools", "network-programming", "simulation", "wasm"] include = [ "../CHANGELOG.md", "../SECURITY.md", @@ -57,6 +52,7 @@ wasmtime = { workspace = true } wasmtime-wasi = { workspace = true } wasmtime-wasi-nn = { workspace = true } wiggle = { workspace = true } +async-trait.workspace = true [dev-dependencies] tempfile = "3.6.0" diff --git a/lib/src/config.rs b/lib/src/config.rs index 370c7a24..4e584692 100644 --- a/lib/src/config.rs +++ b/lib/src/config.rs @@ -29,7 +29,10 @@ pub type Dictionaries = HashMap; /// Types and deserializers for backend configuration settings. mod backends; -pub use self::backends::{Backend, ClientCertError, ClientCertInfo}; +pub use self::backends::{ + Backend, ClientCertError, ClientCertInfo, DynamicBackendRegistrationInterceptor, Handler, + InMemoryBackendHandler, +}; pub type Backends = HashMap>; @@ -46,7 +49,7 @@ pub use self::geolocation::Geolocation; /// Types and deserializers for object store configuration settings. mod object_store; -pub use crate::object_store::ObjectStores; +pub use crate::object_store::{ObjectKey, ObjectStoreKey, ObjectStores}; /// Types and deserializers for secret store configuration settings. mod secret_store; diff --git a/lib/src/config/backends.rs b/lib/src/config/backends.rs index 4c1bfd7a..86c83fe4 100644 --- a/lib/src/config/backends.rs +++ b/lib/src/config/backends.rs @@ -1,5 +1,8 @@ mod client_cert_info; +use async_trait::async_trait; +use http::{Request, Response}; +use hyper::Body; use { hyper::{header::HeaderValue, Uri}, std::{collections::HashMap, sync::Arc}, @@ -16,6 +19,47 @@ pub struct Backend { pub use_sni: bool, pub grpc: bool, pub client_cert: Option, + + /// Handler that will be called instead of making an HTTP call. + pub handler: Option, +} + +#[derive(Clone)] +pub struct Handler { + handler: Arc>, +} + +impl Handler { + pub fn new(handler: Box) -> Self { + Self { + handler: Arc::new(handler), + } + } +} + +impl std::ops::Deref for Handler { + type Target = dyn InMemoryBackendHandler; + + fn deref(&self) -> &Self::Target { + &**self.handler + } +} + +impl std::fmt::Debug for Handler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Handler") + .field("handler", &"opaque handler function".to_string()) + .finish() + } +} + +#[async_trait] +pub trait InMemoryBackendHandler: Send + Sync + 'static { + async fn handle(&self, req: Request) -> Response; +} + +pub trait DynamicBackendRegistrationInterceptor: Send + Sync + 'static { + fn register(&self, backend: Backend) -> Backend; } /// A map of [`Backend`] definitions, keyed by their name. @@ -149,6 +193,7 @@ mod deserialization { grpc, // NOTE: Update when we support client certs in static backends client_cert: None, + handler: None, }) } } diff --git a/lib/src/error.rs b/lib/src/error.rs index a7ed0f62..7b8fcb77 100644 --- a/lib/src/error.rs +++ b/lib/src/error.rs @@ -253,6 +253,10 @@ pub enum HandleError { #[error("Invalid body handle: {0}")] InvalidBodyHandle(crate::wiggle_abi::types::BodyHandle), + /// A cache handle was not valid. + #[error("Invalid cache handle: {0}")] + InvalidCacheHandle(crate::wiggle_abi::types::CacheHandle), + /// A logging endpoint handle was not valid. #[error("Invalid endpoint handle: {0}")] InvalidEndpointHandle(crate::wiggle_abi::types::EndpointHandle), diff --git a/lib/src/execute.rs b/lib/src/execute.rs index bfbec0fe..eed265f1 100644 --- a/lib/src/execute.rs +++ b/lib/src/execute.rs @@ -1,11 +1,15 @@ //! Guest code execution. -use std::time::SystemTime; - +use crate::{ + config::{DynamicBackendRegistrationInterceptor, UnknownImportBehavior}, + in_memory_cache::InMemoryCache, +}; +use std::{collections::BTreeMap, sync::RwLock, time::SystemTime}; +use tokio::{ + sync::mpsc::{Receiver, Sender as MspcSender}, + task::JoinHandle as TokioJoinHandle, +}; use wasmtime::GuestProfiler; - -use crate::config::UnknownImportBehavior; - use { crate::{ body::Body, @@ -36,6 +40,11 @@ use { }; pub const EPOCH_INTERRUPTION_PERIOD: Duration = Duration::from_micros(50); + +pub struct GuestHandle { + pub(crate) handle: TokioJoinHandle>, +} + /// Execution context used by a [`ViceroyService`](struct.ViceroyService.html). /// /// This is all of the state needed to instantiate a module, in order to respond to an HTTP @@ -51,6 +60,8 @@ pub struct ExecuteCtx { module: Module, /// The backends for this execution. backends: Arc, + /// If set, intercepts dynamic backend registrations. + dynamic_backend_interceptor: Option>>, /// The device detection mappings for this execution. device_detection: Arc, /// The geolocation mappings for this execution. @@ -78,6 +89,52 @@ pub struct ExecuteCtx { /// this must refer to a directory, while in run mode it names /// a file. guest_profile_path: Arc>, + /// Endpoints to monitor for messages. + endpoints_monitor: EndpointsMonitor, + /// Cache to use for this execution. + cache: InMemoryCache, +} + +/// A monitor for logging endpoints. Messages written to the contained endpoints will instead be +/// directed at the wrapped channel, allowing to programmatically access the logs on the receiver side. +#[derive(Clone, Default)] +pub struct EndpointsMonitor { + pub endpoints: Arc, MspcSender>>>>, +} + +impl EndpointsMonitor { + pub fn new() -> Self { + Self { + endpoints: Arc::new(RwLock::new(BTreeMap::new())), + } + } + + /// Registers a listener for the given endpoint name. + /// Any messages the invocation sends to this endpoint will be captured by this listener. + pub fn register_listener>>(&self, name: T) -> EndpointListener { + let (sender, receiver) = tokio::sync::mpsc::channel(1000); // todo arbitrary limit right now. + self.endpoints.write().unwrap().insert(name.into(), sender); + + EndpointListener { receiver } + } +} + +pub struct EndpointListener { + receiver: Receiver>, +} + +impl EndpointListener { + /// Drains and returns all (raw) messages from this listener. + /// Will _not_ block and wait for messages. + pub fn messages(&mut self) -> Vec> { + let mut messages = vec![]; + + while let Ok(msg) = self.receiver.try_recv() { + messages.push(msg); + } + + messages + } } impl ExecuteCtx { @@ -133,6 +190,9 @@ impl ExecuteCtx { epoch_increment_thread, epoch_increment_stop, guest_profile_path: Arc::new(guest_profile_path), + endpoints_monitor: EndpointsMonitor::new(), + dynamic_backend_interceptor: None, + cache: InMemoryCache::new(), }) } @@ -230,6 +290,25 @@ impl ExecuteCtx { &self.tls_config } + /// Set the endpoints for this execution context. + pub fn with_endpoints(mut self, endpoints: EndpointsMonitor) -> Self { + self.endpoints_monitor = endpoints; + self + } + + pub fn with_dynamic_backend_interceptor( + mut self, + interceptor: Box, + ) -> Self { + self.dynamic_backend_interceptor = Some(Arc::new(interceptor)); + self + } + + pub fn with_cache(mut self, cache: InMemoryCache) -> Self { + self.cache = cache; + self + } + /// Asynchronously handle a request. /// /// This method fully instantiates the wasm module housed within the `ExecuteCtx`, @@ -258,7 +337,14 @@ impl ExecuteCtx { self, incoming_req: Request, remote: IpAddr, - ) -> Result<(Response, Option), Error> { + ) -> Result< + ( + Response, + Option, + Option, // TODO: Just hiding the error for now, need this to work. + ), + Error, + > { let req = prepare_request(incoming_req)?; let (sender, receiver) = oneshot::channel(); @@ -274,12 +360,18 @@ impl ExecuteCtx { ); let resp = match receiver.await { - Ok(resp) => (resp, None), + Ok(resp) => ( + resp, + None, + Some(GuestHandle { + handle: guest_handle, + }), + ), Err(_) => match guest_handle .await .expect("guest worker finished without panicking") { - Ok(_) => (Response::new(Body::empty()), None), + Ok(_) => (Response::new(Body::empty()), None, None), Err(ExecutionError::WasmTrap(_e)) => { event!( Level::ERROR, @@ -291,7 +383,7 @@ impl ExecuteCtx { .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) .body(Body::empty()) .unwrap(); - (response, Some(_e)) + (response, Some(_e), None) } Err(e) => panic!("failed to run guest: {}", e), }, @@ -314,6 +406,7 @@ impl ExecuteCtx { .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) .body(Body::from(body.as_bytes())) .unwrap() + .into() } }; @@ -342,6 +435,9 @@ impl ExecuteCtx { self.config_path.clone(), self.object_store.clone(), self.secret_stores.clone(), + self.endpoints_monitor.clone(), + self.dynamic_backend_interceptor.clone(), + self.cache.clone(), ); let guest_profile_path = self.guest_profile_path.as_deref().map(|path| { @@ -437,6 +533,9 @@ impl ExecuteCtx { self.config_path.clone(), self.object_store.clone(), self.secret_stores.clone(), + self.endpoints_monitor.clone(), + self.dynamic_backend_interceptor.clone(), + self.cache.clone(), ); let profiler = self.guest_profile_path.is_some().then(|| { @@ -481,6 +580,29 @@ impl ExecuteCtx { result } + + /// Runs a guest handle to completion and returns any error that might have occurred. + /// This is useful when a guest returns a response early, but later processing errors, + /// which should be surfaced to the calling code. + pub async fn run_to_completion(handle: GuestHandle) -> Option { + match handle + .handle + .await + .expect("guest worker finished without panicking") + { + Ok(_) => None, + Err(ExecutionError::WasmTrap(e)) => { + event!( + Level::ERROR, + "There was an error running the guest to completion {}", + e.to_string() + ); + + Some(e) + } + Err(e) => panic!("failed to run guest to completion: {}", e), + } + } } fn write_profile(store: &mut wasmtime::Store, guest_profile_path: Option<&PathBuf>) { diff --git a/lib/src/in_memory_cache.rs b/lib/src/in_memory_cache.rs new file mode 100644 index 00000000..97778d55 --- /dev/null +++ b/lib/src/in_memory_cache.rs @@ -0,0 +1,424 @@ +use crate::{wiggle_abi::types, Error}; +use cranelift_entity::PrimaryMap; +use http::{request::Parts, HeaderMap}; +use std::{ + collections::{btree_map::Entry, BTreeMap, HashMap, HashSet}, + fmt::Display, + sync::{Arc, RwLock}, + time::Instant, +}; +use tracing::{event, Level}; + +/// Handle used for a non-existing cache entry. +pub fn not_found_handle() -> types::CacheHandle { + types::CacheHandle::from(u32::MAX) +} + +type PrimaryCacheKey = Vec; + +#[derive(Clone, Default, Debug)] +pub struct InMemoryCache { + /// Cache entries, indexable by handle. + /// `None` indicates a deleted entry OR a tx marker. + pub cache_entries: Arc>>>, + + /// Primary cache key to a list of variants. + pub key_candidates: Arc>>>, + + // CacheHandle markers for pending transactions. Since we need to retrieve the cache key for + // a tx insert, this map maps the pending handle the the key used to look it up. + pub pending_tx: Arc>>, +} + +impl InMemoryCache { + pub fn new() -> Self { + Default::default() + } + + pub fn purge(&self, surrogates: Vec) { + let mut cache_entries = self.cache_entries.write().unwrap(); + let surrogates_to_purge: HashSet = surrogates.into_iter().collect(); + + // Simplistic implementation: Just go over all cache entries and kick out matching ones. + for (_handle, handle_entry) in cache_entries.iter_mut() { + if let Some(entry) = handle_entry { + if entry + .surrogate_keys + .intersection(&surrogates_to_purge) + .count() + != 0 + { + // Drop the content of the cache entry. + handle_entry.take(); + } + } + } + } + + /// Attempts to retrieve a cache entry by primary key. + /// Matches vary rules against the given headers to retrieve the correct variant. + pub fn get_entry( + &self, + key: &PrimaryCacheKey, + headers: &HeaderMap, + ) -> Option { + let candidates_lock = self.key_candidates.read().unwrap(); + candidates_lock.get(key).and_then(|candidates| { + let entry_lock = self.cache_entries.write().unwrap(); + + candidates.iter().find_map(|candidate_handle| { + entry_lock + .get(*candidate_handle) + .and_then(|candidate_entry| { + candidate_entry.as_ref().and_then(|entry| { + entry.vary_matches(headers).then(|| *candidate_handle) + }) + }) + }) + }) + } + + pub fn insert<'a>( + &self, + key: PrimaryCacheKey, + options_mask: types::CacheWriteOptionsMask, + options: types::CacheWriteOptions, + request_parts: Option<&Parts>, + ) -> Result { + // Cache writes must contain max-age. + let max_age_ns = options.max_age_ns; + + // Example on how the bitmask works: The data the guest gives us via the ABI (here, `CacheWriteOptions`) + // doesn't have option semantics, so we can't distinguish between unset and zero values, + // We check the bitmask for if it's set, else we'd always get `Some(0)` here. + let swr_ns = options_mask + .contains(types::CacheWriteOptionsMask::STALE_WHILE_REVALIDATE_NS) + .then(|| options.stale_while_revalidate_ns); + + let surrogate_keys = if options_mask.contains(types::CacheWriteOptionsMask::SURROGATE_KEYS) + { + // Unclear if needed. + // if options.surrogate_keys_len == 0 { + // return Err(Error::InvalidArgument); + // } + + let byte_slice = options + .surrogate_keys_ptr + .as_array(options.surrogate_keys_len) + .to_vec()?; + + match String::from_utf8(byte_slice) { + Ok(s) => s + .split_whitespace() + .map(ToOwned::to_owned) + .collect::>(), + + Err(_) => return Err(Error::InvalidArgument), + } + } else { + vec![] + }; + + let user_metadata = if options_mask.contains(types::CacheWriteOptionsMask::USER_METADATA) { + if options.user_metadata_len == 0 { + return Err(Error::InvalidArgument); + } + + let byte_slice = options + .user_metadata_ptr + .as_array(options.user_metadata_len) + .to_vec()?; + + byte_slice + } else { + vec![] + }; + + let vary = if options_mask.contains(types::CacheWriteOptionsMask::VARY_RULE) { + if options.vary_rule_len == 0 { + return Err(Error::InvalidArgument); + } + + let byte_slice = options + .vary_rule_ptr + .as_array(options.vary_rule_len) + .to_vec()?; + + let vary_rules = match String::from_utf8(byte_slice) { + Ok(s) => s + .split_whitespace() + .map(ToOwned::to_owned) + .collect::>(), + + Err(_) => return Err(Error::InvalidArgument), + }; + + if let Some(req_parts) = request_parts { + let mut map = BTreeMap::new(); + + // Extract necessary vary headers. + for vary in vary_rules { + // Dear reader, if you think this sucks... then you'd be right. + // (Just supposed to work right now) + let value = req_parts + .headers + .get(&vary) + .map(|h| h.to_str().unwrap().to_string()); + + map.insert(vary, value); + } + + map + } else { + // Or invalid argument? + BTreeMap::new() + } + } else { + BTreeMap::new() + }; + + let initial_age_ns = options_mask + .contains(types::CacheWriteOptionsMask::INITIAL_AGE_NS) + .then(|| options.initial_age_ns); + + let entry = CacheEntry { + key: key.clone(), + body_bytes: vec![], + vary, + initial_age_ns, + max_age_ns: Some(max_age_ns), + swr_ns: swr_ns, + created_at: Instant::now(), + user_metadata, + surrogate_keys: surrogate_keys.into_iter().collect(), + }; + + // Check for and perform overwrites or write a new entry. + let existing_entry_handle = self.get_entry( + &key, + request_parts + .map(|p| &p.headers) + .unwrap_or(&HeaderMap::new()), + ); + + let entry_handle = match existing_entry_handle { + Some(handle) => { + // Overwrite entry. + event!(Level::TRACE, "Overwriting cache entry {}", handle); + + self.cache_entries + .write() + .unwrap() + .get_mut(handle) + .map(|old_entry| old_entry.replace(entry)); + + handle + } + + None => { + // Write new entry. + let new_entry_handle = self.cache_entries.write().unwrap().push(Some(entry)); + event!(Level::TRACE, "Wrote new cache entry {}", new_entry_handle); + + // Write handle key candidate mapping. + match self.key_candidates.write().unwrap().entry(key) { + Entry::Vacant(vacant) => { + vacant.insert(vec![new_entry_handle]); + } + Entry::Occupied(mut occupied) => { + occupied.get_mut().push(new_entry_handle); + } + } + + new_entry_handle + } + }; + + Ok(entry_handle) + } +} + +#[derive(Debug)] +pub struct CacheEntry { + /// Key the entry was created with. + /// Here for convenience, entries are retrieved via handles. + pub key: PrimaryCacheKey, + + /// The raw bytes of the cached entry. + pub body_bytes: Vec, + + // The cached bytes. + // pub body_handle: types::BodyHandle, + /// Vary used to create this entry. + pub vary: BTreeMap>, + + /// Surrogates attached to this cache entry. + pub surrogate_keys: HashSet, + + /// Initial age of the cache entry. + pub initial_age_ns: Option, + + /// Max-age this entry has been created with. + pub max_age_ns: Option, + + /// Stale-while-revalidate this entry has been created with. + pub swr_ns: Option, + + /// Instant this entry has been created. + pub created_at: Instant, + + /// The user metadata stored alongside the cache. + pub user_metadata: Vec, +} + +impl CacheEntry { + pub fn vary_matches(&self, headers: &HeaderMap) -> bool { + self.vary.iter().all(|(vary_key, vary_value)| { + headers.get(vary_key).and_then(|h| h.to_str().ok()) == vary_value.as_deref() + }) + } + + pub fn age_ns(&self) -> u64 { + self.created_at + .elapsed() + .as_nanos() + .try_into() + .ok() + .and_then(|age_ns: u64| age_ns.checked_add(self.initial_age_ns.unwrap_or(0))) + .unwrap_or(u64::MAX) + } + + /// Stale: Is within max-age + ttl, but only if there's an swr given. + pub fn is_stale(&self) -> bool { + let age = self.age_ns(); + let total_ttl = self.total_ttl_ns(); + + match (self.max_age_ns, self.swr_ns) { + (Some(max_age), Some(_)) => age > max_age && age < total_ttl, + _ => false, + } + } + + /// Usable: Age is smaller than max-age + swr. + pub fn is_usable(&self) -> bool { + let total_ttl = self.total_ttl_ns(); + let age = self.age_ns(); + + age < total_ttl + } + + /// Max-age + swr of the cache entry. + fn total_ttl_ns(&self) -> u64 { + let mut total_ttl = 0; + if let Some(max_age) = self.max_age_ns { + total_ttl += max_age; + }; + + if let Some(swr) = self.swr_ns { + total_ttl += swr; + }; + + total_ttl + } +} + +pub const NS_TO_S_FACTOR: u64 = 1_000_000_000; + +impl Display for InMemoryCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "Cache State:")?; + writeln!(f, "{}Entries:", Self::indent(1))?; + + for (handle, entry) in self.cache_entries.read().unwrap().iter() { + match entry { + Some(entry) => self.fmt_entry(f, handle, entry)?, + None => writeln!(f, "{}[{}]: Purged", Self::indent(2), handle,)?, + } + } + + Ok(()) + } +} + +impl InMemoryCache { + fn indent(level: usize) -> String { + " ".repeat(level) + } + + fn fmt_entry( + &self, + f: &mut std::fmt::Formatter<'_>, + handle: types::CacheHandle, + entry: &CacheEntry, + ) -> std::fmt::Result { + writeln!( + f, + "{}[{}]: {}", + Self::indent(2), + handle, + entry + .key + .iter() + .map(|x| format!("{x:X}")) + .collect::>() + .join("") + )?; + + writeln!(f, "{}TTLs (in seconds):", Self::indent(3),)?; + writeln!( + f, + "{}Age: {}", + Self::indent(4), + entry.age_ns() / NS_TO_S_FACTOR + )?; + writeln!( + f, + "{}Inital age: {:?}", + Self::indent(4), + entry.initial_age_ns.map(|ia| ia / NS_TO_S_FACTOR) + )?; + writeln!( + f, + "{}Max-age: {:?}", + Self::indent(4), + entry.max_age_ns.map(|x| x / NS_TO_S_FACTOR) + )?; + writeln!( + f, + "{}Swr: {:?}", + Self::indent(4), + entry.swr_ns.map(|x| x / NS_TO_S_FACTOR) + )?; + + writeln!(f, "{}Vary:", Self::indent(3))?; + for (key, value) in entry.vary.iter() { + writeln!(f, "{}{key}: {:?}", Self::indent(4), value)?; + } + + writeln!(f, "{}Surrogate keys:", Self::indent(3))?; + let mut surrogates: Vec<&String> = entry.surrogate_keys.iter().collect(); + surrogates.sort(); + + for surrogate in surrogates { + writeln!(f, "{}{surrogate}", Self::indent(4))?; + } + + writeln!(f, "{}User Metadata:", Self::indent(3))?; + writeln!( + f, + "{}{}", + Self::indent(4), + std::str::from_utf8(&entry.user_metadata).unwrap_or("Invalid UITF-8") + )?; + + writeln!(f, "{}Body:", Self::indent(3))?; + writeln!( + f, + "{}{}", + Self::indent(4), + std::str::from_utf8(&entry.body_bytes).unwrap_or("") + )?; + + Ok(()) + } +} diff --git a/lib/src/lib.rs b/lib/src/lib.rs index a2383aff..4e21d49d 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -23,6 +23,7 @@ mod async_io; mod downstream; mod execute; mod headers; +mod in_memory_cache; mod linking; mod object_store; mod secret_store; @@ -32,6 +33,12 @@ mod upstream; mod wiggle_abi; pub use { - error::Error, execute::ExecuteCtx, service::ViceroyService, upstream::BackendConnector, + async_trait, + error::Error, + execute::{EndpointListener, EndpointsMonitor, ExecuteCtx, GuestHandle}, + http, hyper, + in_memory_cache::InMemoryCache, + service::ViceroyService, + upstream::BackendConnector, wasmtime::ProfilingStrategy, }; diff --git a/lib/src/linking.rs b/lib/src/linking.rs index 24fd90a6..a4de2f3e 100644 --- a/lib/src/linking.rs +++ b/lib/src/linking.rs @@ -132,13 +132,13 @@ fn make_wasi_ctx(ctx: &ExecuteCtx, session: &Session) -> Result); +/// A logging endpoint. +pub struct LogEndpoint { + name: Vec, + sender: Option>>, +} lazy_static! { /// The underlying writer to use for all log messages. It defaults to `stdout`, @@ -18,9 +22,12 @@ lazy_static! { } impl LogEndpoint { - /// Allocate a new `LogEndpoint` with the given name. - pub fn new(name: &[u8]) -> LogEndpoint { - LogEndpoint(name.to_owned()) + /// Allocate a new `LogEndpoint` with the given name and optional sender. + pub fn new(name: &[u8], sender: Option>>) -> LogEndpoint { + LogEndpoint { + name: name.to_owned(), + sender, + } } /// Write a log entry to this endpoint. @@ -43,11 +50,18 @@ impl LogEndpoint { } // Accumulate log entry into a buffer before writing, while escaping newlines - let mut to_write = - Vec::with_capacity(msg.len() + self.0.len() + LOG_ENDPOINT_DELIM.len() + 1); + // A listener just takes the messages line by line, while the stdout format is with delimiters. + let mut to_write = if self.sender.is_some() { + Vec::with_capacity(msg.len()) + } else { + let buf_len = msg.len() + self.name.len() + LOG_ENDPOINT_DELIM.len() + 1; + let mut buf = Vec::with_capacity(buf_len); + + buf.extend_from_slice(&self.name); + buf.extend_from_slice(LOG_ENDPOINT_DELIM); + buf + }; - to_write.extend_from_slice(&self.0); - to_write.extend_from_slice(LOG_ENDPOINT_DELIM); for &byte in msg { if byte == b'\n' { to_write.extend_from_slice(br"\n"); @@ -57,7 +71,12 @@ impl LogEndpoint { } to_write.push(b'\n'); - LOG_WRITER.lock().unwrap().write_all(&to_write) + if let Some(ref sender) = self.sender { + sender.try_send(to_write).expect("todo"); + Ok(()) + } else { + LOG_WRITER.lock().unwrap().write_all(&to_write) + } } } diff --git a/lib/src/session.rs b/lib/src/session.rs index 41b9d866..e626a468 100644 --- a/lib/src/session.rs +++ b/lib/src/session.rs @@ -7,6 +7,10 @@ pub use async_item::{ AsyncItem, PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask, }; +use crate::{ + config::DynamicBackendRegistrationInterceptor, execute::EndpointsMonitor, + in_memory_cache::InMemoryCache, +}; use { self::downstream::DownstreamResponse, crate::{ @@ -85,9 +89,14 @@ pub struct Session { /// Populated prior to guest execution, and never modified. geolocation: Arc, /// The backends dynamically added by the program. This is separated from - /// `backends` because we do not want one session to effect the backends + /// `backends` because we do not want one session to affect the backends /// available to any other session. dynamic_backends: Backends, + /// If set, intercepts dynamic backend registrations and allows to modify them on the fly. + /// Useful in combination with in-memory backend handlers to allow handling dynamic backends + /// in memory as well. + pub(crate) dynamic_backend_interceptor: + Option>>, /// The TLS configuration for this execution. /// /// Populated prior to guest execution, and never modified. @@ -126,6 +135,8 @@ pub struct Session { config_path: Arc>, /// The ID for the client request being processed. req_id: u64, + /// The cache state to use. + pub(crate) cache: InMemoryCache, } impl Session { @@ -144,6 +155,9 @@ impl Session { config_path: Arc>, object_store: Arc, secret_stores: Arc, + endpoints: EndpointsMonitor, + dynamic_backend_interceptor: Option>>, + cache: InMemoryCache, ) -> Session { let (parts, body) = req.into_parts(); let downstream_req_original_headers = parts.headers.clone(); @@ -179,7 +193,21 @@ impl Session { secrets_by_name: PrimaryMap::new(), config_path, req_id, + dynamic_backend_interceptor, + cache, + } + .write_endpoints(endpoints) + } + + // Todo: Haxx, it's better to run this code in the constructor and initialize `log_endpoints_by_name` and `log_endpoints`. + fn write_endpoints(mut self, endpoints: EndpointsMonitor) -> Self { + for (name, sender) in endpoints.endpoints.read().unwrap().iter() { + let endpoint = LogEndpoint::new(name, Some(sender.clone())); + let handle = self.log_endpoints.push(endpoint); + self.log_endpoints_by_name.insert(name.to_owned(), handle); } + + self } // ----- Downstream Request API ----- @@ -244,6 +272,12 @@ impl Session { self.async_items.push(Some(AsyncItem::Body(body))).into() } + pub fn insert_cache_body(&mut self, cache_handle: types::CacheHandle) -> BodyHandle { + self.async_items + .push(Some(AsyncItem::CachingBody(cache_handle))) + .into() + } + /// Get a reference to a [`Body`][body], given its [`BodyHandle`][handle]. /// /// Returns a [`HandleError`][err] if the handle is not associated with a body in the session. @@ -327,6 +361,25 @@ impl Session { } } + pub fn is_caching_body(&self, handle: BodyHandle) -> bool { + if let Some(Some(body)) = self.async_items.get(handle.into()) { + body.is_caching() + } else { + false + } + } + + pub fn caching_body_handle( + &mut self, + handle: BodyHandle, + ) -> Result { + self.async_items + .get_mut(handle.into()) + .and_then(|item| item.as_ref().map(|item| item.resolve_cache_handle())) + .flatten() + .ok_or(HandleError::InvalidBodyHandle(handle)) + } + /// Get a mutable reference to the streaming body `Sender`, if and only if the provided /// `BodyHandle` is the downstream body being sent. /// @@ -528,7 +581,7 @@ impl Session { if let Some(handle) = self.log_endpoints_by_name.get(name).copied() { return handle; } - let endpoint = LogEndpoint::new(name); + let endpoint = LogEndpoint::new(name, None); let handle = self.log_endpoints.push(endpoint); self.log_endpoints_by_name.insert(name.to_owned(), handle); handle diff --git a/lib/src/session/async_item.rs b/lib/src/session/async_item.rs index c6600049..92e19e51 100644 --- a/lib/src/session/async_item.rs +++ b/lib/src/session/async_item.rs @@ -1,8 +1,9 @@ -use crate::object_store::ObjectStoreError; -use crate::{body::Body, error::Error, streaming_body::StreamingBody}; +use crate::{ + body::Body, error::Error, object_store::ObjectStoreError, streaming_body::StreamingBody, + wiggle_abi::types, +}; use anyhow::anyhow; -use futures::Future; -use futures::FutureExt; +use futures::{Future, FutureExt}; use http::Response; use tokio::sync::oneshot; @@ -45,6 +46,7 @@ impl PendingKvDeleteTask { /// body (writeable only). It is used within the body handle map in `Session`. #[derive(Debug)] pub enum AsyncItem { + CachingBody(types::CacheHandle), Body(Body), StreamingBody(StreamingBody), PendingReq(PeekableTask>), @@ -58,9 +60,21 @@ impl AsyncItem { matches!(self, Self::StreamingBody(_)) } + pub fn is_caching(&self) -> bool { + matches!(self, Self::CachingBody(_)) + } + + pub fn resolve_cache_handle(&self) -> Option { + match self { + Self::CachingBody(handle) => Some(*handle), + _ => None, + } + } + pub fn as_body(&self) -> Option<&Body> { match self { Self::Body(body) => Some(body), + Self::CachingBody(_) => panic!("as_body called"), _ => None, } } @@ -68,6 +82,7 @@ impl AsyncItem { pub fn as_body_mut(&mut self) -> Option<&mut Body> { match self { Self::Body(body) => Some(body), + Self::CachingBody(_) => panic!("as_body_mut called"), _ => None, } } @@ -75,6 +90,7 @@ impl AsyncItem { pub fn into_body(self) -> Option { match self { Self::Body(body) => Some(body), + Self::CachingBody(_) => panic!("into_body called"), _ => None, } } @@ -174,6 +190,7 @@ impl AsyncItem { match self { Self::StreamingBody(body) => body.await_ready().await, Self::Body(body) => body.await_ready().await, + Self::CachingBody(_) => (), Self::PendingReq(req) => req.await_ready().await, Self::PendingKvLookup(req) => req.0.await_ready().await, Self::PendingKvInsert(req) => req.0.await_ready().await, diff --git a/lib/src/upstream.rs b/lib/src/upstream.rs index 4bcbcb4f..fd36420f 100644 --- a/lib/src/upstream.rs +++ b/lib/src/upstream.rs @@ -265,24 +265,30 @@ pub fn send_request( req.headers_mut().insert(hyper::header::HOST, host); *req.uri_mut() = uri; + let handler = backend.handler.clone(); // Haxx let h2only = backend.grpc; + async move { - let mut builder = Client::builder(); + let basic_response = if let Some(handler) = handler { + handler.handle(req).await + } else { + let mut builder = Client::builder(); - if req.version() == Version::HTTP_2 { - builder.http2_only(true); - } + if req.version() == Version::HTTP_2 { + builder.http2_only(true); + } - let basic_response = builder - .set_host(false) - .http2_only(h2only) - .build(connector) - .request(req) - .await - .map_err(|e| { - eprintln!("Error: {:?}", e); - e - })?; + builder + .set_host(false) + .http2_only(h2only) + .build(connector) + .request(req) + .await + .map_err(|e| { + eprintln!("Error: {:?}", e); + e + })? + }; if try_decompression && basic_response diff --git a/lib/src/wiggle_abi/body_impl.rs b/lib/src/wiggle_abi/body_impl.rs index d2d85e16..2ecc69a7 100644 --- a/lib/src/wiggle_abi/body_impl.rs +++ b/lib/src/wiggle_abi/body_impl.rs @@ -1,9 +1,8 @@ //! fastly_body` hostcall implementations. -use http::{HeaderName, HeaderValue}; - use crate::wiggle_abi::headers::HttpHeaders; - +use http::{HeaderName, HeaderValue}; +use tracing::{event, Level}; use { crate::{ body::Body, @@ -33,11 +32,27 @@ impl FastlyHttpBody for Session { dest.send_chunk(chunk).await?; } dest.trailers.extend(trailers); + } else if self.is_caching_body(dest) { + // If we're writing to a caching body, we directly append to the buffer. + let cache_handle = self.caching_body_handle(dest).unwrap(); + let source_bytes = src.read_into_vec().await?; + + self.cache + .cache_entries + .write() + .unwrap() + .get_mut(cache_handle) + .map(|entry| { + entry + .as_mut() + .map(|entry| entry.body_bytes.extend(source_bytes)) + }); } else { let dest = self.body_mut(dest)?; dest.trailers.extend(trailers); dest.append(src); } + Ok(()) } @@ -90,6 +105,10 @@ impl FastlyHttpBody for Session { match end { BodyWriteEnd::Front => { // Only normal bodies can be front-written + if self.is_caching_body(body_handle) { + event!(Level::ERROR, "Attempted pushing front to a caching body"); + } + self.body_mut(body_handle)?.push_front(buf); } BodyWriteEnd::Back => { @@ -97,11 +116,20 @@ impl FastlyHttpBody for Session { self.streaming_body_mut(body_handle)? .send_chunk(buf) .await?; + } else if self.is_caching_body(body_handle) { + let cache_handle = self.caching_body_handle(body_handle).unwrap(); + self.cache + .cache_entries + .write() + .unwrap() + .get_mut(cache_handle) + .map(|entry| entry.as_mut().map(|entry| entry.body_bytes.extend(buf))); } else { self.body_mut(body_handle)?.push_back(buf); } } } + // Finally, return the number of bytes written, which is _always_ the full buffer Ok(buf .len() diff --git a/lib/src/wiggle_abi/cache.rs b/lib/src/wiggle_abi/cache.rs index f2972c13..d9f5b482 100644 --- a/lib/src/wiggle_abi/cache.rs +++ b/lib/src/wiggle_abi/cache.rs @@ -1,7 +1,10 @@ -use crate::session::Session; - -use super::fastly_cache::FastlyCache; -use super::{types, Error}; +use super::{ + fastly_cache::FastlyCache, + types::{self}, + Error, +}; +use crate::{body::Body, error::HandleError, in_memory_cache::not_found_handle, session::Session}; +use tracing::{event, Level}; #[allow(unused_variables)] impl FastlyCache for Session { @@ -11,9 +14,14 @@ impl FastlyCache for Session { options_mask: types::CacheLookupOptionsMask, options: &wiggle::GuestPtr<'a, types::CacheLookupOptions>, ) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + let primary_key: Vec = cache_key.as_slice().unwrap().unwrap().to_vec(); + let options: types::CacheLookupOptions = options.read().unwrap(); + let req_parts = self.request_parts(options.request_headers)?; + + Ok(self + .cache + .get_entry(&primary_key, &req_parts.headers) + .unwrap_or(not_found_handle())) } fn insert<'a>( @@ -22,31 +30,69 @@ impl FastlyCache for Session { options_mask: types::CacheWriteOptionsMask, options: &wiggle::GuestPtr<'a, types::CacheWriteOptions<'a>>, ) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + // TODO: Skipped over all the sanity checks usually done by similar code (see `req_impl`). + let options: types::CacheWriteOptions = options.read().unwrap(); + let key: Vec = cache_key.as_slice().unwrap().unwrap().to_vec(); + let parts = if options_mask.contains(types::CacheWriteOptionsMask::REQUEST_HEADERS) { + Some(self.request_parts(options.request_headers)?) + } else { + None + }; + + let cache_handle = self.cache.insert(key, options_mask, options, parts)?; + Ok(self.insert_cache_body(cache_handle)) } + /// Stub delegating to regular lookup. fn transaction_lookup<'a>( &mut self, cache_key: &wiggle::GuestPtr<'a, [u8]>, options_mask: types::CacheLookupOptionsMask, options: &wiggle::GuestPtr<'a, types::CacheLookupOptions>, ) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + // Check if the entry already exists. + let handle = self.lookup(cache_key, options_mask, options)?; + if handle == not_found_handle() { + let key: Vec = cache_key.as_slice().unwrap().unwrap().to_vec(); + let handle = self.cache.cache_entries.write().unwrap().push(None); + self.cache.pending_tx.write().unwrap().insert(handle, key); + Ok(handle) + } else { + Ok(handle) + } } + /// Stub delegating to regular insert. fn transaction_insert<'a>( &mut self, handle: types::CacheHandle, options_mask: types::CacheWriteOptionsMask, options: &wiggle::GuestPtr<'a, types::CacheWriteOptions<'a>>, ) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + let key = self + .cache + .pending_tx + .read() + .unwrap() + .get(&handle) + .map(ToOwned::to_owned); + + if let Some(pending_tx_key) = key { + let options: types::CacheWriteOptions = options.read().unwrap(); + let parts = if options_mask.contains(types::CacheWriteOptionsMask::REQUEST_HEADERS) { + Some(self.request_parts(options.request_headers)?) + } else { + None + }; + + let cache_handle = + self.cache + .insert(pending_tx_key.to_owned(), options_mask, options, parts)?; + + Ok(self.insert_cache_body(cache_handle)) + } else { + Err(HandleError::InvalidCacheHandle(handle).into()) + } } fn transaction_insert_and_stream_back<'a>( @@ -55,8 +101,9 @@ impl FastlyCache for Session { options_mask: types::CacheWriteOptionsMask, options: &wiggle::GuestPtr<'a, types::CacheWriteOptions<'a>>, ) -> Result<(types::BodyHandle, types::CacheHandle), Error> { + event!(Level::ERROR, "Tx insert and stream back not implemented"); Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", + msg: "Tx insert and stream back not implemented", }) } @@ -66,39 +113,80 @@ impl FastlyCache for Session { options_mask: types::CacheWriteOptionsMask, options: &wiggle::GuestPtr<'a, types::CacheWriteOptions<'a>>, ) -> Result<(), Error> { + event!(Level::ERROR, "Tx update not implemented"); Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", + msg: "Tx update not implemented", }) } fn transaction_cancel(&mut self, handle: types::CacheHandle) -> Result<(), Error> { + event!(Level::ERROR, "Tx cancel not implemented"); Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", + msg: "Tx cancel not implemented", }) } fn close(&mut self, handle: types::CacheHandle) -> Result<(), Error> { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + Ok(()) } fn get_state(&mut self, handle: types::CacheHandle) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + if let Some(Some(entry)) = self.cache.cache_entries.read().unwrap().get(handle) { + // Entry found. + let mut state = types::CacheLookupState::FOUND; + + if entry.is_stale() { + state |= types::CacheLookupState::STALE + } else { + // If stale, entry must be updated. + state |= types::CacheLookupState::MUST_INSERT_OR_UPDATE + } + + if entry.is_usable() { + state |= types::CacheLookupState::USABLE; + } else { + // If not usable, caller must insert / refresh the cache entry. + state |= types::CacheLookupState::MUST_INSERT_OR_UPDATE + } + + Ok(state) + } else { + // Entry not found, entry must be inserted. + Ok(types::CacheLookupState::MUST_INSERT_OR_UPDATE) + } } fn get_user_metadata<'a>( &mut self, handle: types::CacheHandle, user_metadata_out_ptr: &wiggle::GuestPtr<'a, u8>, - user_metadata_out_len: u32, + user_metadata_out_len: u32, // TODO: Is this the maximum allowed length? nwritten_out: &wiggle::GuestPtr<'a, u32>, ) -> Result<(), Error> { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + if let Some(Some(entry)) = self.cache.cache_entries.read().unwrap().get(handle) { + if entry.user_metadata.len() > user_metadata_out_len as usize { + nwritten_out.write(entry.user_metadata.len().try_into().unwrap_or(0))?; + return Err(Error::BufferLengthError { + buf: "user_metadata_out", + len: "user_metadata_out_len", + }); + } + + let user_metadata_len = u32::try_from(entry.user_metadata.len()) + .expect("smaller than user_metadata_out_len means it must fit"); + + let mut metadata_out = user_metadata_out_ptr + .as_array(user_metadata_len) + .as_slice_mut()? + .ok_or(Error::SharedMemory)?; + + metadata_out.copy_from_slice(&entry.user_metadata); + nwritten_out.write(user_metadata_len)?; + + Ok(()) + } else { + Err(HandleError::InvalidCacheHandle(handle).into()) + } } fn get_body( @@ -107,17 +195,30 @@ impl FastlyCache for Session { options_mask: types::CacheGetBodyOptionsMask, options: &types::CacheGetBodyOptions, ) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + let body = self + .cache + .cache_entries + .read() + .unwrap() + .get(handle) + .and_then(|entry| entry.as_ref().map(|entry| entry.body_bytes.clone())); + + if let Some(body) = body { + // Re-insert a body into the session to allow subsequent reads. + let body_handle = self.insert_body(Body::from(body)); + Ok(body_handle) + } else { + Err(HandleError::InvalidCacheHandle(handle).into()) + } } fn get_length( &mut self, handle: types::CacheHandle, ) -> Result { + event!(Level::ERROR, "Cache entry length get not implemented."); Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", + msg: "Cache entry length get not implemented.", }) } @@ -125,29 +226,36 @@ impl FastlyCache for Session { &mut self, handle: types::CacheHandle, ) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + if let Some(Some(entry)) = self.cache.cache_entries.read().unwrap().get(handle) { + Ok(types::CacheDurationNs::from(entry.max_age_ns.unwrap_or(0))) + } else { + Err(HandleError::InvalidCacheHandle(handle).into()) + } } fn get_stale_while_revalidate_ns( &mut self, handle: types::CacheHandle, ) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + if let Some(Some(entry)) = self.cache.cache_entries.read().unwrap().get(handle) { + Ok(types::CacheDurationNs::from(entry.swr_ns.unwrap_or(0))) + } else { + Err(HandleError::InvalidCacheHandle(handle).into()) + } } fn get_age_ns(&mut self, handle: types::CacheHandle) -> Result { - Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", - }) + if let Some(Some(entry)) = self.cache.cache_entries.read().unwrap().get(handle) { + Ok(types::CacheDurationNs::from(entry.age_ns())) + } else { + Err(HandleError::InvalidCacheHandle(handle).into()) + } } fn get_hits(&mut self, handle: types::CacheHandle) -> Result { + event!(Level::ERROR, "Cache entry get_hits not implemented."); Err(Error::Unsupported { - msg: "Cache API primitives not yet supported", + msg: "Cache entry get_hits not implemented.", }) } } diff --git a/lib/src/wiggle_abi/entity.rs b/lib/src/wiggle_abi/entity.rs index 1641ade8..d23f7ef6 100644 --- a/lib/src/wiggle_abi/entity.rs +++ b/lib/src/wiggle_abi/entity.rs @@ -3,7 +3,7 @@ //! [ref]: https://docs.rs/cranelift-entity/latest/cranelift_entity/trait.EntityRef.html use super::types::{ - AsyncItemHandle, BodyHandle, DictionaryHandle, EndpointHandle, ObjectStoreHandle, + AsyncItemHandle, BodyHandle, CacheHandle, DictionaryHandle, EndpointHandle, ObjectStoreHandle, PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle, }; @@ -49,3 +49,4 @@ wiggle_entity!(ObjectStoreHandle); wiggle_entity!(SecretStoreHandle); wiggle_entity!(SecretHandle); wiggle_entity!(AsyncItemHandle); +wiggle_entity!(CacheHandle); diff --git a/lib/src/wiggle_abi/req_impl.rs b/lib/src/wiggle_abi/req_impl.rs index 3d6737a7..f71c4609 100644 --- a/lib/src/wiggle_abi/req_impl.rs +++ b/lib/src/wiggle_abi/req_impl.rs @@ -2,12 +2,11 @@ use super::types::SendErrorDetail; use super::SecretStoreError; -use crate::config::ClientCertInfo; +use crate::config::{Backend, ClientCertInfo}; use crate::secret_store::SecretLookup; use { crate::{ - config::Backend, error::Error, session::{AsyncItem, PeekableTask, Session, ViceroyRequestMetadata}, upstream, @@ -389,6 +388,12 @@ impl FastlyHttpReq for Session { use_sni, grpc, client_cert, + handler: None, + }; + + let new_backend = match self.dynamic_backend_interceptor { + Some(ref interceptor) => interceptor.register(new_backend), + None => new_backend, }; if !self.add_backend(name, new_backend) {