From 814d4bdc9d30e1d6472292bab3422fe808177c73 Mon Sep 17 00:00:00 2001 From: johnny Date: Fri, 13 Feb 2026 21:53:06 -0500 Subject: [PATCH] feat: add observability, error context, LRU eviction, alloy interop, and circuit breaker enhancements - Add optional `observe` feature with metrics module (counters, histograms, gauges) - Add `ResultExt` trait and `context()` method on `AzothError` for richer error reporting - Refactor `PreflightCache` to support configurable FIFO and LRU eviction policies - Add `From` impls for `azoth::U256` <-> `alloy_primitives::U256` under `alloy` feature - Add public `record_success()` / `record_failure()` methods on `CircuitBreaker` - Deprecate `projection_connection()` in favor of read pool `query()` / `query_async()` - Re-export `EvictionPolicy` and `ResultExt` from top-level crate Co-authored-by: Cursor --- crates/azoth-core/Cargo.toml | 7 + crates/azoth-core/src/error.rs | 38 ++ crates/azoth-core/src/lib.rs | 1 + crates/azoth-core/src/observe.rs | 128 +++++++ crates/azoth-lmdb/src/lib.rs | 1 + crates/azoth-lmdb/src/preflight_cache.rs | 330 ++++++++++++++++-- crates/azoth-vector/tests/integration_test.rs | 9 +- crates/azoth/Cargo.toml | 4 +- crates/azoth/benches/basic_benchmark.rs | 1 + .../benches/preflight_cache_benchmark.rs | 30 +- crates/azoth/src/circuit_breaker.rs | 28 +- crates/azoth/src/db.rs | 4 + crates/azoth/src/lib.rs | 5 +- crates/azoth/src/typed_values.rs | 69 ++++ 14 files changed, 618 insertions(+), 37 deletions(-) create mode 100644 crates/azoth-core/src/observe.rs diff --git a/crates/azoth-core/Cargo.toml b/crates/azoth-core/Cargo.toml index d328768..99dd47d 100644 --- a/crates/azoth-core/Cargo.toml +++ b/crates/azoth-core/Cargo.toml @@ -24,6 +24,13 @@ chrono = { workspace = true } rusqlite = { workspace = true } parking_lot = { workspace = true } +# Optional metrics support +metrics = { version = "0.24", optional = true } + +[features] +default = [] +observe = ["metrics"] + [dev-dependencies] tempfile = { workspace = 
true } tracing-subscriber = { workspace = true } diff --git a/crates/azoth-core/src/error.rs b/crates/azoth-core/src/error.rs index ed2e16f..47f2e63 100644 --- a/crates/azoth-core/src/error.rs +++ b/crates/azoth-core/src/error.rs @@ -71,6 +71,44 @@ pub enum AzothError { pub type Result = std::result::Result; +impl AzothError { + /// Wrap this error with additional context. + /// + /// The context string is prepended to the error message, producing a + /// chain like `"during balance update: Transaction error: ..."`. + /// + /// # Example + /// ```ignore + /// db.write_txn() + /// .map_err(|e| e.context("during balance update"))?; + /// ``` + pub fn context(self, msg: impl Into) -> Self { + let ctx = msg.into(); + AzothError::Internal(format!("{}: {}", ctx, self)) + } +} + +/// Extension trait to add `.context()` on `Result`. +/// +/// Mirrors the ergonomics of `anyhow::Context`. +pub trait ResultExt { + /// If the result is `Err`, wrap the error with additional context. + fn context(self, msg: impl Into) -> Result; + + /// If the result is `Err`, wrap the error with a lazily-evaluated context. + fn with_context String>(self, f: F) -> Result; +} + +impl ResultExt for Result { + fn context(self, msg: impl Into) -> Result { + self.map_err(|e| e.context(msg)) + } + + fn with_context String>(self, f: F) -> Result { + self.map_err(|e| e.context(f())) + } +} + // Custom Error Types: // // Azoth supports custom error types through the `#[from] anyhow::Error` variant. diff --git a/crates/azoth-core/src/lib.rs b/crates/azoth-core/src/lib.rs index 0b97f84..c47cbfb 100644 --- a/crates/azoth-core/src/lib.rs +++ b/crates/azoth-core/src/lib.rs @@ -17,6 +17,7 @@ pub mod config; pub mod error; pub mod event_log; pub mod lock_manager; +pub mod observe; pub mod traits; pub mod types; diff --git a/crates/azoth-core/src/observe.rs b/crates/azoth-core/src/observe.rs new file mode 100644 index 0000000..4693194 --- /dev/null +++ b/crates/azoth-core/src/observe.rs @@ -0,0 +1,128 @@ +//! 
Optional metrics instrumentation for Azoth. +//! +//! When the `observe` feature is enabled, key operations emit counters, +//! histograms, and gauges via the [`metrics`] crate. A downstream +//! application must install a metrics recorder (e.g. `metrics-exporter-prometheus`) +//! to collect the data. +//! +//! When the feature is **not** enabled every function in this module is a +//! zero-cost no-op. + +/// Record a transaction commit (counter + latency histogram). +/// +/// - `azoth.transaction.commits_total` – incremented on every commit +/// - `azoth.transaction.commit_duration_seconds` – histogram of commit latency +#[inline] +pub fn record_commit(duration: std::time::Duration) { + #[cfg(feature = "observe")] + { + metrics::counter!("azoth.transaction.commits_total").increment(1); + metrics::histogram!("azoth.transaction.commit_duration_seconds") + .record(duration.as_secs_f64()); + } + #[cfg(not(feature = "observe"))] + { + let _ = duration; + } +} + +/// Record a preflight validation (counter + duration). +/// +/// - `azoth.preflight.total` – counter +/// - `azoth.preflight.duration_seconds` – histogram +#[inline] +pub fn record_preflight(duration: std::time::Duration, success: bool) { + #[cfg(feature = "observe")] + { + let outcome = if success { "ok" } else { "fail" }; + metrics::counter!("azoth.preflight.total", "outcome" => outcome).increment(1); + metrics::histogram!("azoth.preflight.duration_seconds").record(duration.as_secs_f64()); + } + #[cfg(not(feature = "observe"))] + { + let _ = (duration, success); + } +} + +/// Record a preflight cache hit or miss. 
+/// +/// - `azoth.preflight_cache.lookups_total` – counter with `result` label (`hit` / `miss`) +#[inline] +pub fn record_cache_lookup(hit: bool) { + #[cfg(feature = "observe")] + { + let result = if hit { "hit" } else { "miss" }; + metrics::counter!("azoth.preflight_cache.lookups_total", "result" => result).increment(1); + } + #[cfg(not(feature = "observe"))] + { + let _ = hit; + } +} + +/// Set the current preflight cache size gauge. +/// +/// - `azoth.preflight_cache.size` – gauge +#[inline] +pub fn set_cache_size(size: usize) { + #[cfg(feature = "observe")] + { + metrics::gauge!("azoth.preflight_cache.size").set(size as f64); + } + #[cfg(not(feature = "observe"))] + { + let _ = size; + } +} + +/// Record a projector run (counter + duration + events processed). +/// +/// - `azoth.projector.runs_total` – counter +/// - `azoth.projector.run_duration_seconds` – histogram +/// - `azoth.projector.events_processed_total` – counter +#[inline] +pub fn record_projector_run(duration: std::time::Duration, events_processed: u64) { + #[cfg(feature = "observe")] + { + metrics::counter!("azoth.projector.runs_total").increment(1); + metrics::histogram!("azoth.projector.run_duration_seconds").record(duration.as_secs_f64()); + metrics::counter!("azoth.projector.events_processed_total").increment(events_processed); + } + #[cfg(not(feature = "observe"))] + { + let _ = (duration, events_processed); + } +} + +/// Record a lock acquisition wait time. +/// +/// - `azoth.lock.wait_duration_seconds` – histogram +#[inline] +pub fn record_lock_wait(duration: std::time::Duration) { + #[cfg(feature = "observe")] + { + metrics::histogram!("azoth.lock.wait_duration_seconds").record(duration.as_secs_f64()); + } + #[cfg(not(feature = "observe"))] + { + let _ = duration; + } +} + +/// Record a backup operation. 
+/// +/// - `azoth.backup.total` – counter with `outcome` label +/// - `azoth.backup.duration_seconds` – histogram +#[inline] +pub fn record_backup(duration: std::time::Duration, success: bool) { + #[cfg(feature = "observe")] + { + let outcome = if success { "ok" } else { "fail" }; + metrics::counter!("azoth.backup.total", "outcome" => outcome).increment(1); + metrics::histogram!("azoth.backup.duration_seconds").record(duration.as_secs_f64()); + } + #[cfg(not(feature = "observe"))] + { + let _ = (duration, success); + } +} diff --git a/crates/azoth-lmdb/src/lib.rs b/crates/azoth-lmdb/src/lib.rs index 37b14bd..12d3458 100644 --- a/crates/azoth-lmdb/src/lib.rs +++ b/crates/azoth-lmdb/src/lib.rs @@ -19,6 +19,7 @@ pub mod state_iter; pub mod store; pub mod txn; +pub use preflight_cache::EvictionPolicy; pub use read_pool::{LmdbReadPool, PooledLmdbReadTxn}; pub use store::LmdbCanonicalStore; pub use txn::{LmdbReadTxn, LmdbWriteTxn}; diff --git a/crates/azoth-lmdb/src/preflight_cache.rs b/crates/azoth-lmdb/src/preflight_cache.rs index 76635b8..771eaf1 100644 --- a/crates/azoth-lmdb/src/preflight_cache.rs +++ b/crates/azoth-lmdb/src/preflight_cache.rs @@ -2,21 +2,34 @@ //! //! This module provides an optional cache to speed up preflight validation by caching //! frequently accessed state keys. The cache is global, thread-safe, and supports: -//! - **FIFO eviction** when capacity is reached (oldest insertion is evicted first) +//! - **FIFO eviction** (default) – oldest insertion is evicted first +//! - **LRU eviction** – least-recently-*read* key is evicted first //! - TTL-based expiration for stale entries //! - Invalidation on transaction commit for modified keys //! -//! **Note**: The eviction policy is FIFO (First In, First Out), not LRU. This means -//! a frequently-read key can still be evicted if it was inserted earliest. For most -//! workloads this is acceptable because the TTL ensures freshness, and the cache's -//! 
primary goal is to reduce LMDB reads for "just written" keys during preflight. +//! ## Choosing an eviction policy +//! +//! | Policy | Best for | Trade-off | +//! |--------|----------|-----------| +//! | `Fifo` | Write-heavy, short-lived keys (e.g. event processing) | Simpler, lower overhead | +//! | `Lru` | Read-heavy, hot-key workloads (e.g. bank balances) | Slightly more overhead per `get` | use dashmap::DashMap; use parking_lot::Mutex; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; +/// Eviction policy for the preflight cache. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum EvictionPolicy { + /// First-In, First-Out: evict the oldest inserted key. + #[default] + Fifo, + /// Least-Recently-Used: evict the key that hasn't been read the longest. + Lru, +} + /// A cached value, representing either a found value or a non-existent key. #[derive(Clone, Debug)] pub enum CachedValue { @@ -46,33 +59,193 @@ impl CacheEntry { } } +// --------------------------------------------------------------------------- +// LRU tracking with a doubly-linked list (intrusive, index-based) +// --------------------------------------------------------------------------- + +/// Index into the LRU node slab. +type NodeIdx = usize; + +/// A node in the LRU doubly-linked list. +struct LruNode { + key: Vec, + prev: Option, + next: Option, +} + +/// Minimal doubly-linked list for O(1) promote / evict-tail. +/// +/// Stored behind a single `Mutex` alongside a key→index map so that +/// `promote` and `evict` are O(1) amortised. +struct LruList { + nodes: Vec, + /// Free-list of recycled node slots. + free: Vec, + head: Option, + tail: Option, + /// Reverse map: key → node index (for O(1) promote on read). 
+ index: HashMap, NodeIdx>, +} + +impl LruList { + fn with_capacity(cap: usize) -> Self { + Self { + nodes: Vec::with_capacity(cap), + free: Vec::new(), + head: None, + tail: None, + index: HashMap::with_capacity(cap), + } + } + + /// Push a key to the front (most-recently-used). Returns the node index. + fn push_front(&mut self, key: Vec) -> NodeIdx { + let idx = if let Some(free_idx) = self.free.pop() { + self.nodes[free_idx] = LruNode { + key: key.clone(), + prev: None, + next: self.head, + }; + free_idx + } else { + let idx = self.nodes.len(); + self.nodes.push(LruNode { + key: key.clone(), + prev: None, + next: self.head, + }); + idx + }; + + if let Some(old_head) = self.head { + self.nodes[old_head].prev = Some(idx); + } + self.head = Some(idx); + if self.tail.is_none() { + self.tail = Some(idx); + } + self.index.insert(key, idx); + idx + } + + /// Move an existing node to the front (most-recently-used). + fn promote(&mut self, key: &[u8]) { + let Some(&idx) = self.index.get(key) else { + return; + }; + if self.head == Some(idx) { + return; // already at front + } + self.detach(idx); + // Re-attach at head + self.nodes[idx].prev = None; + self.nodes[idx].next = self.head; + if let Some(old_head) = self.head { + self.nodes[old_head].prev = Some(idx); + } + self.head = Some(idx); + if self.tail.is_none() { + self.tail = Some(idx); + } + } + + /// Evict the tail (least-recently-used). Returns the evicted key. + fn evict_tail(&mut self) -> Option> { + let tail_idx = self.tail?; + let key = self.nodes[tail_idx].key.clone(); + self.detach(tail_idx); + self.index.remove(&key); + self.free.push(tail_idx); + Some(key) + } + + /// Remove a specific key from the list. + fn remove(&mut self, key: &[u8]) { + if let Some(idx) = self.index.remove(key) { + self.detach(idx); + self.free.push(idx); + } + } + + /// Clear the entire list. 
+ fn clear(&mut self) { + self.nodes.clear(); + self.free.clear(); + self.head = None; + self.tail = None; + self.index.clear(); + } + + /// Detach a node from the linked list (does NOT remove from index/free-list). + fn detach(&mut self, idx: NodeIdx) { + let prev = self.nodes[idx].prev; + let next = self.nodes[idx].next; + + if let Some(p) = prev { + self.nodes[p].next = next; + } else { + self.head = next; + } + if let Some(n) = next { + self.nodes[n].prev = prev; + } else { + self.tail = prev; + } + self.nodes[idx].prev = None; + self.nodes[idx].next = None; + } +} + /// Thread-safe in-memory cache for preflight reads. /// -/// Uses DashMap for lock-free concurrent access and tracks insertion order -/// with an O(1) bounded eviction queue. +/// Uses `DashMap` for lock-free concurrent value access and a secondary +/// structure (FIFO queue or LRU list) for bounded eviction. pub struct PreflightCache { cache: Arc, CacheEntry>>, capacity: usize, ttl: Duration, enabled: bool, - /// FIFO queue used to bound cache size without O(n) scans. - eviction_queue: Arc>>>, + policy: EvictionPolicy, + /// FIFO queue (used when policy == Fifo). + fifo_queue: Arc>>>, + /// LRU list (used when policy == Lru). + lru_list: Arc>, } impl PreflightCache { /// Create a new preflight cache with the given configuration. /// + /// Uses the default FIFO eviction policy. + /// /// # Arguments /// * `capacity` - Maximum number of entries (default: 10,000) /// * `ttl_secs` - Time-to-live for entries in seconds (default: 60) /// * `enabled` - Whether the cache is enabled (default: true) pub fn new(capacity: usize, ttl_secs: u64, enabled: bool) -> Self { + Self::with_policy(capacity, ttl_secs, enabled, EvictionPolicy::Fifo) + } + + /// Create a new preflight cache with an explicit eviction policy. 
+ /// + /// # Arguments + /// * `capacity` - Maximum number of entries + /// * `ttl_secs` - Time-to-live for entries in seconds + /// * `enabled` - Whether the cache is enabled + /// * `policy` - Eviction policy (`Fifo` or `Lru`) + pub fn with_policy( + capacity: usize, + ttl_secs: u64, + enabled: bool, + policy: EvictionPolicy, + ) -> Self { Self { cache: Arc::new(DashMap::with_capacity(capacity)), capacity, ttl: Duration::from_secs(ttl_secs), enabled, - eviction_queue: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))), + policy, + fifo_queue: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))), + lru_list: Arc::new(Mutex::new(LruList::with_capacity(capacity))), } } @@ -83,6 +256,9 @@ impl PreflightCache { /// Get a value from the cache if it exists and hasn't expired. /// + /// When LRU eviction is active, this promotes the key to + /// most-recently-used. + /// /// Returns `None` if the cache is disabled, the key is not in the cache, /// or the entry has expired. pub fn get(&self, key: &[u8]) -> Option { @@ -99,24 +275,47 @@ impl PreflightCache { return None; } - Some(entry.value.clone()) + let value = entry.value.clone(); + drop(entry); + + // LRU: promote on read + if self.policy == EvictionPolicy::Lru { + self.lru_list.lock().promote(key); + } + + Some(value) } /// Insert a value into the cache. /// - /// If the cache is at capacity, evicts the oldest entry (FIFO order). + /// If the cache is at capacity, evicts according to the active policy. /// This is a no-op if the cache is disabled. 
pub fn insert(&self, key: Vec, value: CachedValue) { if !self.enabled { return; } - if self.cache.len() >= self.capacity && !self.cache.contains_key(&key) { + let is_new = !self.cache.contains_key(&key); + + if self.cache.len() >= self.capacity && is_new { self.evict_one(); } self.cache.insert(key.clone(), CacheEntry::new(value)); - self.eviction_queue.lock().push_back(key); + + match self.policy { + EvictionPolicy::Fifo => { + self.fifo_queue.lock().push_back(key); + } + EvictionPolicy::Lru => { + let mut lru = self.lru_list.lock(); + // Remove old entry if updating, then push to front. + if !is_new { + lru.remove(&key); + } + lru.push_front(key); + } + } } /// Invalidate (remove) specific keys from the cache. @@ -129,15 +328,23 @@ impl PreflightCache { for key in keys { self.cache.remove(key); + if self.policy == EvictionPolicy::Lru { + self.lru_list.lock().remove(key); + } } } - /// Evict one key from the cache in amortized O(1) time. - /// - /// The queue may contain stale duplicate keys; those are skipped until - /// a currently present key is removed. + /// Evict one key from the cache. fn evict_one(&self) { - let mut queue = self.eviction_queue.lock(); + match self.policy { + EvictionPolicy::Fifo => self.evict_fifo(), + EvictionPolicy::Lru => self.evict_lru(), + } + } + + /// FIFO eviction: pop from front of queue, skip stale duplicates. + fn evict_fifo(&self) { + let mut queue = self.fifo_queue.lock(); while let Some(key) = queue.pop_front() { if self.cache.remove(&key).is_some() { break; @@ -145,6 +352,14 @@ impl PreflightCache { } } + /// LRU eviction: evict the tail (least-recently-used). + fn evict_lru(&self) { + let mut lru = self.lru_list.lock(); + if let Some(key) = lru.evict_tail() { + self.cache.remove(&key); + } + } + /// Clear all entries from the cache. 
pub fn clear(&self) { if !self.enabled { @@ -152,7 +367,8 @@ impl PreflightCache { } self.cache.clear(); - self.eviction_queue.lock().clear(); + self.fifo_queue.lock().clear(); + self.lru_list.lock().clear(); } /// Get cache statistics. @@ -161,6 +377,7 @@ impl PreflightCache { size: self.cache.len(), capacity: self.capacity, enabled: self.enabled, + policy: self.policy, } } } @@ -174,6 +391,8 @@ pub struct CacheStats { pub capacity: usize, /// Whether the cache is enabled pub enabled: bool, + /// Active eviction policy + pub policy: EvictionPolicy, } #[cfg(test)] @@ -324,4 +543,73 @@ mod tests { assert!(stats.size > 0); assert!(stats.size <= stats.capacity); } + + // ----------------------------------------------------------------------- + // LRU-specific tests + // ----------------------------------------------------------------------- + + #[test] + fn test_lru_eviction_keeps_hot_keys() { + let cache = PreflightCache::with_policy(3, 60, true, EvictionPolicy::Lru); + + cache.insert(b"a".to_vec(), CachedValue::Some(b"1".to_vec())); + cache.insert(b"b".to_vec(), CachedValue::Some(b"2".to_vec())); + cache.insert(b"c".to_vec(), CachedValue::Some(b"3".to_vec())); + + // Read "a" to make it most-recently-used + assert!(cache.get(b"a").is_some()); + + // Insert "d" — should evict "b" (least-recently-used), NOT "a" + cache.insert(b"d".to_vec(), CachedValue::Some(b"4".to_vec())); + + assert!(cache.get(b"a").is_some(), "Hot key 'a' should survive"); + assert!(cache.get(b"b").is_none(), "'b' should be evicted (LRU)"); + assert!(cache.get(b"c").is_some()); + assert!(cache.get(b"d").is_some()); + } + + #[test] + fn test_lru_eviction_order() { + let cache = PreflightCache::with_policy(2, 60, true, EvictionPolicy::Lru); + + cache.insert(b"x".to_vec(), CachedValue::Some(b"1".to_vec())); + cache.insert(b"y".to_vec(), CachedValue::Some(b"2".to_vec())); + + // Both present + assert!(cache.get(b"x").is_some()); + assert!(cache.get(b"y").is_some()); + + // Insert "z" — "x" was read 
before "y", so after both reads + // the recency order is MRU → y → x → LRU. + // Inserting "z" at capacity must therefore evict "x", + // the least-recently-used key. + cache.insert(b"z".to_vec(), CachedValue::Some(b"3".to_vec())); + + assert!(cache.get(b"x").is_none(), "'x' should be evicted"); + assert!(cache.get(b"y").is_some()); + assert!(cache.get(b"z").is_some()); + } + + #[test] + fn test_lru_invalidation() { + let cache = PreflightCache::with_policy(100, 60, true, EvictionPolicy::Lru); + + cache.insert(b"a".to_vec(), CachedValue::Some(b"1".to_vec())); + cache.insert(b"b".to_vec(), CachedValue::Some(b"2".to_vec())); + + cache.invalidate_keys(&[b"a".to_vec()]); + + assert!(cache.get(b"a").is_none()); + assert!(cache.get(b"b").is_some()); + assert_eq!(cache.stats().size, 1); + } + + #[test] + fn test_lru_stats_show_policy() { + let fifo = PreflightCache::new(10, 60, true); + assert_eq!(fifo.stats().policy, EvictionPolicy::Fifo); + + let lru = PreflightCache::with_policy(10, 60, true, EvictionPolicy::Lru); + assert_eq!(lru.stats().policy, EvictionPolicy::Lru); + } } diff --git a/crates/azoth-vector/tests/integration_test.rs b/crates/azoth-vector/tests/integration_test.rs index a09d386..a0a867c 100644 --- a/crates/azoth-vector/tests/integration_test.rs +++ b/crates/azoth-vector/tests/integration_test.rs @@ -82,7 +82,8 @@ fn download_extension_macos() -> Option { let _ = fs::create_dir_all(&out_dir); let resp = ureq::get(ZIP_URL).call().ok()?; - let zip_bytes: Vec = resp.into_reader().bytes().filter_map(|b| b.ok()).collect(); + let mut zip_bytes = Vec::new(); + std::io::Read::read_to_end(&mut resp.into_reader(), &mut zip_bytes).ok()?; let mut archive = zip::ZipArchive::new(std::io::Cursor::new(zip_bytes)).ok()?; for i in 0..archive.len() { @@ -242,7 +243,7 @@ mod with_extension { ) .unwrap(); - let vectors = vec![ + let vectors = [ (Vector::new(vec![1.0, 0.0, 0.0]), "x-axis"), (Vector::new(vec![0.0,
1.0, 0.0]), "y-axis"), (Vector::new(vec![0.0, 0.0, 1.0]), "z-axis"), @@ -319,7 +320,7 @@ mod with_extension { ) .unwrap(); - let items = vec![ + let items = [ (vec![1.0, 0.0, 0.0], "electronics", 1), (vec![0.9, 0.1, 0.0], "electronics", 1), (vec![0.0, 1.0, 0.0], "books", 1), @@ -387,7 +388,7 @@ mod with_extension { ) .unwrap(); - let vectors = vec![ + let vectors = [ vec![1.0, 0.0, 0.0], vec![0.5, 0.5, 0.0], vec![0.0, 1.0, 0.0], diff --git a/crates/azoth/Cargo.toml b/crates/azoth/Cargo.toml index af5e40b..d95e7b5 100644 --- a/crates/azoth/Cargo.toml +++ b/crates/azoth/Cargo.toml @@ -58,7 +58,9 @@ alloy-transport-http = { version = "0.6", optional = true } [features] default = [] -onchain = ["alloy-primitives", "alloy-provider", "alloy-network", "alloy-rpc-types", "alloy-signer", "alloy-signer-local", "alloy-sol-types", "alloy-transport-http"] +alloy = ["alloy-primitives"] +observe = ["azoth-core/observe"] +onchain = ["alloy", "alloy-provider", "alloy-network", "alloy-rpc-types", "alloy-signer", "alloy-signer-local", "alloy-sol-types", "alloy-transport-http"] [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/azoth/benches/basic_benchmark.rs b/crates/azoth/benches/basic_benchmark.rs index 7578775..f29d51c 100644 --- a/crates/azoth/benches/basic_benchmark.rs +++ b/crates/azoth/benches/basic_benchmark.rs @@ -82,6 +82,7 @@ fn main() { let count = 10_000; for i in 0..count { Transaction::new(&db) + .keys(vec![b"counter".to_vec()]) .execute(|ctx| { ctx.set(b"counter", &TypedValue::I64(i))?; ctx.log_bytes(b"increment")?; diff --git a/crates/azoth/benches/preflight_cache_benchmark.rs b/crates/azoth/benches/preflight_cache_benchmark.rs index c83a698..c7153bf 100644 --- a/crates/azoth/benches/preflight_cache_benchmark.rs +++ b/crates/azoth/benches/preflight_cache_benchmark.rs @@ -18,6 +18,7 @@ fn create_db_with_cache(path: &std::path::Path, cache_enabled: bool) -> AzothDb fn benchmark_hot_key_reads(db: &AzothDb, iterations: usize) -> std::time::Duration { 
// Initialize a hot key Transaction::new(db) + .keys(vec![b"hot_key".to_vec()]) .execute(|ctx| { ctx.set(b"hot_key", &TypedValue::I64(1000))?; Ok(()) @@ -27,16 +28,15 @@ fn benchmark_hot_key_reads(db: &AzothDb, iterations: usize) -> std::time::Durati // Benchmark reading the same key repeatedly during preflight let start = Instant::now(); for i in 0..iterations { + let counter_key = format!("counter_{}", i); Transaction::new(db) - .keys(vec![b"hot_key".to_vec()]) + .keys(vec![b"hot_key".to_vec(), counter_key.as_bytes().to_vec()]) .preflight(|ctx| { let _value = ctx.get(b"hot_key")?; Ok(()) }) .execute(|ctx| { - // Write a dummy key to make it a real transaction - let key = format!("counter_{}", i); - ctx.set(key.as_bytes(), &TypedValue::I64(i as i64))?; + ctx.set(counter_key.as_bytes(), &TypedValue::I64(i as i64))?; Ok(()) }) .unwrap(); @@ -46,7 +46,11 @@ fn benchmark_hot_key_reads(db: &AzothDb, iterations: usize) -> std::time::Durati fn benchmark_mixed_workload(db: &AzothDb, iterations: usize) -> std::time::Duration { // Initialize some keys + let init_keys: Vec> = (0..10) + .map(|i| format!("mixed_key_{}", i).into_bytes()) + .collect(); Transaction::new(db) + .keys(init_keys) .execute(|ctx| { for i in 0..10 { let key = format!("mixed_key_{}", i); @@ -61,15 +65,18 @@ fn benchmark_mixed_workload(db: &AzothDb, iterations: usize) -> std::time::Durat for i in 0..iterations { let key_idx = i % 10; // Read keys in round-robin fashion let key = format!("mixed_key_{}", key_idx); + let counter_key = format!("counter_{}", i); Transaction::new(db) - .keys(vec![key.as_bytes().to_vec()]) + .keys(vec![ + key.as_bytes().to_vec(), + counter_key.as_bytes().to_vec(), + ]) .preflight(|ctx| { let _value = ctx.get(key.as_bytes())?; Ok(()) }) .execute(|ctx| { - let counter_key = format!("counter_{}", i); ctx.set(counter_key.as_bytes(), &TypedValue::I64(i as i64))?; Ok(()) }) @@ -105,7 +112,11 @@ fn benchmark_concurrent_hot_keys( iterations_per_thread: usize, ) -> std::time::Duration { 
// Initialize hot keys + let init_keys: Vec> = (0..num_threads) + .map(|i| format!("thread_key_{}", i).into_bytes()) + .collect(); Transaction::new(&db) + .keys(init_keys) .execute(|ctx| { for i in 0..num_threads { let key = format!("thread_key_{}", i); @@ -124,14 +135,17 @@ fn benchmark_concurrent_hot_keys( let handle = thread::spawn(move || { let key = format!("thread_key_{}", thread_id); for i in 0..iterations_per_thread { + let counter_key = format!("counter_{}_{}", thread_id, i); Transaction::new(&db_clone) - .keys(vec![key.as_bytes().to_vec()]) + .keys(vec![ + key.as_bytes().to_vec(), + counter_key.as_bytes().to_vec(), + ]) .preflight(|ctx| { let _value = ctx.get(key.as_bytes())?; Ok(()) }) .execute(|ctx| { - let counter_key = format!("counter_{}_{}", thread_id, i); ctx.set(counter_key.as_bytes(), &TypedValue::I64(i as i64))?; Ok(()) }) diff --git a/crates/azoth/src/circuit_breaker.rs b/crates/azoth/src/circuit_breaker.rs index 8760167..5714383 100644 --- a/crates/azoth/src/circuit_breaker.rs +++ b/crates/azoth/src/circuit_breaker.rs @@ -317,11 +317,37 @@ impl CircuitBreaker { } } - async fn on_success(&self) { + /// Record a successful operation. + /// + /// Use this when the protected operation is async and can't be wrapped + /// inside [`Self::call`]. For sync operations prefer [`Self::call`] which handles + /// recording automatically. + pub async fn record_success(&self) { + self.on_success_inner().await; + } + + /// Record a failed operation. + /// + /// Use this when the protected operation is async and can't be wrapped + /// inside [`Self::call`]. For sync operations prefer [`Self::call`] which handles + /// recording automatically. 
+ pub async fn record_failure(&self) { + self.on_failure_inner().await; + } + + async fn on_success_inner(&self) { self.metrics.record_success(); } + async fn on_success(&self) { + self.on_success_inner().await; + } + async fn on_failure(&self) { + self.on_failure_inner().await; + } + + async fn on_failure_inner(&self) { self.metrics.record_failure(); let mut state = self.state.lock().await; diff --git a/crates/azoth/src/db.rs b/crates/azoth/src/db.rs index d4f0f7e..4197320 100644 --- a/crates/azoth/src/db.rs +++ b/crates/azoth/src/db.rs @@ -119,6 +119,10 @@ impl AzothDb { /// **Deprecated**: prefer `projection_write_conn()` for writes and /// `query()` / `query_async()` for reads. This method returns the /// write connection, which contends with the projector. + #[deprecated( + since = "0.3.0", + note = "Use query()/query_async() for reads, projection_write_conn() for writes" + )] pub fn projection_connection(&self) -> &Arc> { self.projection.conn() } diff --git a/crates/azoth/src/lib.rs b/crates/azoth/src/lib.rs index 777ec42..2d0b7a3 100644 --- a/crates/azoth/src/lib.rs +++ b/crates/azoth/src/lib.rs @@ -55,7 +55,8 @@ pub use azoth_core::{ CanonicalConfig, ProjectionConfig, ProjectorConfig, ReadPoolConfig, SyncMode, SynchronousMode, }, - error::{AzothError, Result}, + error::{AzothError, Result, ResultExt}, + observe, traits::{ CanonicalReadTxn, CanonicalStore, CanonicalTxn, DecodedEvent, EventApplier, EventDecoder, EventIter, PreflightResult, ProjectionStore, ProjectionTxn, StateIter, @@ -66,7 +67,7 @@ pub use azoth_core::{ // Re-export implementations pub use azoth_lmdb::{ - LmdbCanonicalStore, LmdbReadPool, LmdbReadTxn, LmdbWriteTxn, PooledLmdbReadTxn, + EvictionPolicy, LmdbCanonicalStore, LmdbReadPool, LmdbReadTxn, LmdbWriteTxn, PooledLmdbReadTxn, }; pub use azoth_projector::{Projector, ProjectorStats}; pub use azoth_sqlite::{PooledSqliteConnection, SqliteProjectionStore, SqliteReadPool}; diff --git a/crates/azoth/src/typed_values.rs 
b/crates/azoth/src/typed_values.rs index a112f6b..27ca445 100644 --- a/crates/azoth/src/typed_values.rs +++ b/crates/azoth/src/typed_values.rs @@ -164,6 +164,45 @@ impl From for U256 { } } +/// Access the raw 4×u64 little-endian limbs. +impl U256 { + /// Return a reference to the inner 4×u64 words (little-endian order). + pub fn as_limbs(&self) -> &[u64; 4] { + &self.0 + } + + /// Construct from raw 4×u64 words in little-endian order. + pub fn from_limbs(limbs: [u64; 4]) -> Self { + Self(limbs) + } +} + +// --------------------------------------------------------------------------- +// alloy-primitives interop (behind "alloy" feature) +// --------------------------------------------------------------------------- +#[cfg(feature = "alloy")] +mod alloy_interop { + use super::U256; + + impl From for U256 { + /// Convert an `alloy_primitives::U256` into an azoth `U256`. + /// + /// Both representations use 4×u64 little-endian limbs, so this is a + /// zero-cost reinterpretation. + fn from(v: alloy_primitives::U256) -> Self { + let limbs: [u64; 4] = v.into_limbs(); + U256::from_limbs(limbs) + } + } + + impl From for alloy_primitives::U256 { + /// Convert an azoth `U256` back into an `alloy_primitives::U256`. + fn from(v: U256) -> Self { + alloy_primitives::U256::from_limbs(*v.as_limbs()) + } + } +} + /// 256-bit signed integer /// /// Note: Currently aliased to U256. 
This means signed operations are not properly @@ -922,4 +961,34 @@ mod tests { let result: Result = value.to_json(); assert!(result.is_err()); } + + #[cfg(feature = "alloy")] + mod alloy_tests { + use super::*; + + #[test] + fn test_alloy_u256_roundtrip() { + let alloy_val = alloy_primitives::U256::from(1_000_000u64); + let azoth_val: U256 = alloy_val.into(); + let back: alloy_primitives::U256 = azoth_val.into(); + assert_eq!(alloy_val, back); + } + + #[test] + fn test_alloy_u256_large_value() { + let alloy_val = alloy_primitives::U256::MAX; + let azoth_val: U256 = alloy_val.into(); + let back: alloy_primitives::U256 = azoth_val.into(); + assert_eq!(alloy_val, back); + } + + #[test] + fn test_alloy_u256_zero() { + let alloy_val = alloy_primitives::U256::ZERO; + let azoth_val: U256 = alloy_val.into(); + assert_eq!(azoth_val, U256::zero()); + let back: alloy_primitives::U256 = azoth_val.into(); + assert_eq!(alloy_val, back); + } + } }