Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ impl Test {
.await
.map(|result| {
match result {
(resp, None) => resp,
(_, Some(err)) => {
(resp, None, _) => resp,
(_, Some(err), _) => {
// Splat the string representation of the runtime error into a synthetic
// 500. This is a bit of a hack, but good enough to check for expected error
// strings.
Expand Down
1 change: 1 addition & 0 deletions cli/tests/integration/common/backends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl TestBackends {
grpc: false,
client_cert: None,
ca_certs: vec![],
handler: None,
};
backends.insert(name.to_string(), Arc::new(backend_config));
}
Expand Down
39 changes: 38 additions & 1 deletion src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::str;
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{collections::HashSet, fmt, sync::Arc, time::Duration};

use bytes::Bytes;
#[cfg(test)]
Expand Down Expand Up @@ -129,6 +129,20 @@ impl TryFrom<&str> for CacheKey {
}
}

impl fmt::Display for CacheKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match str::from_utf8(&self.0) {
Ok(s) => write!(f, "{s}"),
Err(_) => {
for byte in &self.0 {
write!(f, "{byte:02x}")?;
}
Ok(())
}
}
}
}

/// The result of a lookup: the object (if found), and/or an obligation to fetch.
#[derive(Debug)]
pub struct CacheEntry {
Expand Down Expand Up @@ -285,6 +299,7 @@ impl Found {
// Explain some about how this works:
// - Request collapsing
// - Stale-while-revalidate
#[derive(Debug)]
pub struct Cache {
inner: moka::future::Cache<CacheKey, Arc<CacheKeyObjects>>,
}
Expand Down Expand Up @@ -370,6 +385,28 @@ impl Cache {
.map(|(_, entry)| entry.purge(&key, soft_purge))
.sum()
}

/// Purge/soft-purge all cache entries corresponding to the given surrogate key.
/// Returns the number of entries (variants) purged.
///
/// Note: this does not block concurrent reads _or inserts_; an insertion can race with the
/// purge.
pub fn purge_all(&self, keys: Vec<SurrogateKey>, soft_purge: bool) -> usize {
keys.into_iter()
.map(|key| self.purge(key, soft_purge))
.sum()
}
}

impl fmt::Display for Cache {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Cache {{")?;
for (key, objects) in self.inner.iter() {
writeln!(f, " key: {key}")?;
write!(f, "{:4}", &*objects)?;
}
write!(f, "}}")
}
}

/// Options that can be applied to a write, e.g. insert or transaction_insert.
Expand Down
90 changes: 90 additions & 0 deletions src/cache/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::cache::{variance::VaryRule, Error};
use bytes::Bytes;
use std::{
collections::{HashMap, VecDeque},
fmt,
future::Future,
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
Expand Down Expand Up @@ -123,6 +124,44 @@ impl ObjectMeta {
}
}

impl fmt::Display for ObjectMeta {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let indent = f.width().unwrap_or(0);
let i = " ".repeat(indent);

writeln!(f, "{i}ObjectMeta:")?;
writeln!(f, "{i} age: {:?}", self.age())?;
writeln!(f, "{i} max_age: {:?}", self.max_age)?;
writeln!(f, "{i} stale_while_revalidate: {:?}", self.stale_while_revalidate)?;
writeln!(f, "{i} fresh: {}", self.is_fresh())?;
writeln!(f, "{i} usable: {}", self.is_usable())?;
writeln!(f, "{i} vary_rule: {}", self.vary_rule)?;
writeln!(f, "{i} surrogate_keys: {}", self.surrogate_keys)?;
writeln!(f, "{i} length: {:?}", self.length)?;
writeln!(
f,
"{i} soft_purge: {}",
self.soft_purge.load(std::sync::atomic::Ordering::SeqCst)
)?;

// Show request headers
writeln!(f, "{i} request_headers:")?;
for (name, value) in self.request_headers.iter() {
writeln!(f, "{i} {}: {}", name, value.to_str().unwrap_or("<binary>"))?;
}

// Show user_metadata as UTF-8 if possible
if !self.user_metadata.is_empty() {
match std::str::from_utf8(&self.user_metadata) {
Ok(s) => writeln!(f, "{i} user_metadata: {s}")?,
Err(_) => writeln!(f, "{i} user_metadata: ({} bytes)", self.user_metadata.len())?,
}
}

Ok(())
}
}

/// Object(s) indexed by a CacheKey.
#[derive(Debug, Default)]
pub struct CacheKeyObjects(watch::Sender<CacheKeyObjectsInner>);
Expand Down Expand Up @@ -481,6 +520,57 @@ struct CacheValue {
obligated: bool,
}

impl fmt::Display for CacheKeyObjects {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let indent = f.width().unwrap_or(0);
let inner = self.0.borrow();
fmt::Display::fmt(&IndentWrapper(&*inner, indent), f)
}
}

/// Helper to pass indent through Display since CacheKeyObjectsInner is private.
struct IndentWrapper<'a>(&'a CacheKeyObjectsInner, usize);

impl fmt::Display for IndentWrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let inner = self.0;
let indent = self.1;
let i = " ".repeat(indent);

// Vary rules
writeln!(f, "{i}vary_rules:")?;
for rule in inner.vary_rules.iter() {
writeln!(f, "{i} - {rule}")?;
}

// Objects (variants)
writeln!(f, "{i}variants:")?;
for (variant, value) in inner.objects.iter() {
writeln!(f, "{i} {variant}:")?;
if let Some(data) = &value.present {
write!(f, "{:indent$}", data, indent = indent + 4)?;
} else {
writeln!(f, "{i} (empty)")?;
}
writeln!(f, "{i} obligated: {}", value.obligated)?;
}

Ok(())
}
}

impl fmt::Display for CacheData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let indent = f.width().unwrap_or(0);
write!(f, "{:indent$}", self.meta, indent = indent)?;
let i = " ".repeat(indent);
if let Some(len) = self.length() {
writeln!(f, "{i}body_length: {len}")?;
}
Ok(())
}
}

/// An obligation to fetch & update the cache.
#[derive(Debug)]
pub struct Obligation {
Expand Down
22 changes: 21 additions & 1 deletion src/cache/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! The core cache API provides the bones of this.
//!

use std::{collections::HashSet, str::FromStr};
use std::{collections::HashSet, fmt, str::FromStr};

use bytes::{Bytes, BytesMut};
pub use http::HeaderName;
Expand Down Expand Up @@ -84,6 +84,26 @@ pub struct Variant {
signature: Bytes,
}

impl fmt::Display for VaryRule {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.headers.is_empty() {
write!(f, "VaryRule([])")
} else {
let headers: Vec<&str> = self.headers.iter().map(|h| h.as_str()).collect();
write!(f, "VaryRule([{}])", headers.join(", "))
}
}
}

impl fmt::Display for Variant {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match std::str::from_utf8(&self.signature) {
Ok(s) => write!(f, "Variant({s})"),
Err(_) => write!(f, "Variant({:?})", self.signature),
}
}
}

#[cfg(test)]
mod tests {
use super::VaryRule;
Expand Down
1 change: 1 addition & 0 deletions src/component/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ pub(crate) async fn register_dynamic_backend(
grpc,
client_cert,
ca_certs,
handler: None,
};

if !session.add_backend(name, new_backend) {
Expand Down
1 change: 1 addition & 0 deletions src/component/shielding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(crate) fn backend_for_shield(
grpc: false,
client_cert: None,
ca_certs: Vec::new(),
handler: None,
};

if !session.add_backend(&new_name, new_backend) {
Expand Down
5 changes: 4 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ pub use crate::acl::Acls;
/// Types and deserializers for backend configuration settings.
mod backends;

pub use self::backends::{Backend, ClientCertError, ClientCertInfo};
pub use self::backends::{
Backend, ClientCertError, ClientCertInfo, DynamicBackendRegistrationInterceptor, Handler,
InMemoryBackendHandler,
};

pub type Backends = HashMap<String, Arc<Backend>>;

Expand Down
42 changes: 40 additions & 2 deletions src/config/backends.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,48 @@
mod client_cert_info;

use {
hyper::{header::HeaderValue, Uri},
std::{collections::HashMap, sync::Arc},
crate::body::Body,
async_trait::async_trait,
hyper::{header::HeaderValue, Request, Response, Uri},
std::{collections::HashMap, fmt, ops::Deref, sync::Arc},
};

pub use self::client_cert_info::{ClientCertError, ClientCertInfo};

/// A trait for handling backend requests in-memory, without making real HTTP calls.
#[async_trait]
pub trait InMemoryBackendHandler: Send + Sync + 'static {
async fn handle(&self, req: Request<Body>) -> Response<hyper::Body>;
}

/// A wrapper around an `InMemoryBackendHandler` trait object.
#[derive(Clone)]
pub struct Handler(Arc<Box<dyn InMemoryBackendHandler>>);

impl Handler {
pub fn new(handler: Box<dyn InMemoryBackendHandler>) -> Self {
Handler(Arc::new(handler))
}
}

impl Deref for Handler {
type Target = dyn InMemoryBackendHandler;
fn deref(&self) -> &Self::Target {
self.0.as_ref().as_ref()
}
}

impl fmt::Debug for Handler {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Handler").finish_non_exhaustive()
}
}

/// A trait for intercepting dynamic backend registration.
pub trait DynamicBackendRegistrationInterceptor: Send + Sync + 'static {
fn register(&self, backend: Backend) -> Backend;
}

/// A single backend definition.
#[derive(Clone, Debug)]
pub struct Backend {
Expand All @@ -17,6 +53,7 @@ pub struct Backend {
pub grpc: bool,
pub client_cert: Option<ClientCertInfo>,
pub ca_certs: Vec<rustls::Certificate>,
pub handler: Option<Handler>,
}

/// A map of [`Backend`] definitions, keyed by their name.
Expand Down Expand Up @@ -159,6 +196,7 @@ mod deserialization {
client_cert,
grpc,
ca_certs,
handler: None,
})
}
}
Expand Down
Loading