From 6397353db58d5c41ca38bf1d8de7a9ebed82f8cd Mon Sep 17 00:00:00 2001 From: johnny Date: Fri, 13 Feb 2026 17:56:19 -0500 Subject: [PATCH 1/4] feat: add event-driven projector, connection pooling, async writes, handler registration, and WriteBatch API - Event-driven projector: FileEventLog fires tokio::sync::Notify after each append; Projector and EventProcessor await instead of polling, giving near-zero latency with zero idle CPU. - Projection read pool enabled by default (4 connections); add projection_write_conn() for explicit write access. - Native async write API: LmdbCanonicalStore::submit_write() centralizes spawn_blocking + txn lifecycle; convenience wrappers async_put_state and async_del_state. - EventProcessorBuilder::build_with_projection() opens a dedicated connection to the projection DB, removing the need to pass one in. - WriteBatch API for explicit multi-op atomic commits with commit_async support. Co-authored-by: Cursor --- crates/azoth-core/src/config/projection.rs | 15 ++- crates/azoth-file-log/Cargo.toml | 1 + crates/azoth-file-log/src/store.rs | 32 +++++ crates/azoth-lmdb/src/store.rs | 140 +++++++++++++++++++- crates/azoth-projector/Cargo.toml | 2 +- crates/azoth-projector/src/projector.rs | 39 +++++- crates/azoth-sqlite/src/store.rs | 15 ++- crates/azoth/src/db.rs | 91 +++++++++++-- crates/azoth/src/event_processor.rs | 69 +++++++++- crates/azoth/src/lib.rs | 4 +- crates/azoth/src/write_batch.rs | 144 +++++++++++++++++++++ 11 files changed, 515 insertions(+), 37 deletions(-) create mode 100644 crates/azoth/src/write_batch.rs diff --git a/crates/azoth-core/src/config/projection.rs b/crates/azoth-core/src/config/projection.rs index 389fa69..6754d62 100644 --- a/crates/azoth-core/src/config/projection.rs +++ b/crates/azoth-core/src/config/projection.rs @@ -27,11 +27,12 @@ pub struct ProjectionConfig { #[serde(default = "default_cache_size")] pub cache_size: i32, - /// Read pool configuration (optional, disabled by default) + /// Read pool 
configuration (enabled by default with 4 connections). /// - /// When enabled, maintains a pool of read-only connections for - /// concurrent read access without blocking writes. - #[serde(default)] + /// Maintains a pool of read-only connections for concurrent read access + /// without blocking writes. Disable with `ReadPoolConfig::default()` if + /// you need single-connection behavior. + #[serde(default = "default_read_pool")] pub read_pool: ReadPoolConfig, } @@ -58,6 +59,10 @@ fn default_cache_size() -> i32 { -64000 // 64MB } +fn default_read_pool() -> ReadPoolConfig { + ReadPoolConfig::enabled(4) +} + impl ProjectionConfig { pub fn new(path: PathBuf) -> Self { Self { @@ -66,7 +71,7 @@ impl ProjectionConfig { synchronous: SynchronousMode::default(), schema_version: default_schema_version(), cache_size: default_cache_size(), - read_pool: ReadPoolConfig::default(), + read_pool: default_read_pool(), } } diff --git a/crates/azoth-file-log/Cargo.toml b/crates/azoth-file-log/Cargo.toml index 88f7041..0b8154a 100644 --- a/crates/azoth-file-log/Cargo.toml +++ b/crates/azoth-file-log/Cargo.toml @@ -18,6 +18,7 @@ serde_json = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } parking_lot = { workspace = true } +tokio = { workspace = true, features = ["sync"] } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/azoth-file-log/src/store.rs b/crates/azoth-file-log/src/store.rs index ba899fc..12d8d70 100644 --- a/crates/azoth-file-log/src/store.rs +++ b/crates/azoth-file-log/src/store.rs @@ -12,6 +12,7 @@ use std::sync::{ atomic::{AtomicU64, Ordering}, Arc, }; +use tokio::sync::Notify; /// Configuration for file-based event log #[derive(Debug, Clone)] @@ -81,6 +82,12 @@ pub struct FileEventLog { next_event_id: Arc, writer: Arc>>, current_file_num: Arc, + /// Optional notifier that fires after each successful event append. 
+ /// + /// Enables push-based projection: the projector can `notify.notified().await` + /// instead of polling, giving near-zero-latency event processing with zero + /// CPU waste when idle. + event_notify: Option>, } impl FileEventLog { @@ -119,9 +126,24 @@ impl FileEventLog { next_event_id, writer, current_file_num, + event_notify: None, }) } + /// Set the event notification handle. + /// + /// When set, `notify_waiters()` is called after every successful + /// `append_with_id` / `append_batch_with_ids`, waking any task that + /// is awaiting `notified()`. + pub fn set_event_notify(&mut self, notify: Arc) { + self.event_notify = Some(notify); + } + + /// Get a clone of the event notification handle (if set). + pub fn event_notify(&self) -> Option> { + self.event_notify.clone() + } + /// Get path to a log file by number fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf { base_dir.join(format!("events-{:08}.log", file_num)) @@ -257,6 +279,11 @@ impl EventLog for FileEventLog { // Check for rotation self.check_rotation()?; + // Wake any waiting projectors / event processors + if let Some(notify) = &self.event_notify { + notify.notify_waiters(); + } + Ok(()) } @@ -346,6 +373,11 @@ impl EventLog for FileEventLog { // Check for rotation self.check_rotation()?; + // Wake any waiting projectors / event processors + if let Some(notify) = &self.event_notify { + notify.notify_waiters(); + } + Ok(()) } diff --git a/crates/azoth-lmdb/src/store.rs b/crates/azoth-lmdb/src/store.rs index f4c4707..322cc3c 100644 --- a/crates/azoth-lmdb/src/store.rs +++ b/crates/azoth-lmdb/src/store.rs @@ -2,8 +2,8 @@ use azoth_core::{ error::{AzothError, Result}, event_log::{EventLog, EventLogIterator}, lock_manager::LockManager, - traits::{self, CanonicalStore, EventIter, StateIter}, - types::{BackupInfo, CanonicalMeta, EventId}, + traits::{self, CanonicalStore, CanonicalTxn, EventIter, StateIter}, + types::{BackupInfo, CanonicalMeta, CommitInfo, EventId}, CanonicalConfig, }; use 
azoth_file_log::{FileEventLog, FileEventLogConfig}; @@ -15,6 +15,7 @@ use std::sync::{ Arc, }; use std::time::Duration; +use tokio::sync::Notify; use crate::keys::meta_keys; use crate::preflight_cache::PreflightCache; @@ -52,6 +53,12 @@ pub struct LmdbCanonicalStore { txn_counter: Arc, preflight_cache: Arc, read_pool: Option>, + /// Shared notifier that fires after each successful event append. + /// + /// Created during `open()` and injected into the `FileEventLog`. + /// Consumers (projector, event processor) can clone this to await + /// new events without polling. + event_notify: Arc, } impl LmdbCanonicalStore { @@ -160,14 +167,17 @@ impl CanonicalStore for LmdbCanonicalStore { cfg.preflight_cache_enabled, )); - // Initialize file-based event log + // Initialize file-based event log with push-based notification + let event_notify = Arc::new(Notify::new()); let event_log_config = FileEventLogConfig { base_dir: cfg.path.join("event-log"), max_event_size: cfg.event_max_size_bytes, max_batch_bytes: cfg.event_batch_max_bytes, ..Default::default() }; - let event_log = Arc::new(FileEventLog::open(event_log_config)?); + let mut event_log = FileEventLog::open(event_log_config)?; + event_log.set_event_notify(event_notify.clone()); + let event_log = Arc::new(event_log); // Initialize metadata if needed { @@ -235,6 +245,7 @@ impl CanonicalStore for LmdbCanonicalStore { txn_counter: Arc::new(AtomicUsize::new(0)), preflight_cache, read_pool, + event_notify, }) } @@ -544,6 +555,15 @@ impl LmdbCanonicalStore { &self.event_log } + /// Get the shared event notification handle. + /// + /// This `Notify` fires after every successful event append. Consumers + /// (projectors, event processors) can call `notify.notified().await` to + /// wake immediately when new events are available, eliminating polling. 
+ pub fn event_notify(&self) -> Arc { + self.event_notify.clone() + } + /// Get reference to the preflight cache /// /// This allows access to cache statistics and manual cache operations. @@ -558,6 +578,118 @@ impl LmdbCanonicalStore { self.read_pool.as_ref() } + // ---- Native async write API ---- + + /// Execute a write operation asynchronously. + /// + /// Opens a write transaction, passes it to the closure, and commits + /// atomically -- all inside `spawn_blocking`. This centralizes the + /// `spawn_blocking` boilerplate that otherwise appears at every call site. + /// + /// The closure receives a mutable reference to an `LmdbWriteTxn` and + /// can perform any combination of `put_state`, `del_state`, + /// `append_event`, etc. The return value `R` is forwarded to the caller. + /// + /// # Example + /// ```ignore + /// let info = store.submit_write(|txn| { + /// txn.put_state(b"key", b"value")?; + /// txn.append_event(b"{\"type\":\"put\"}")?; + /// Ok(()) + /// }).await?; + /// ``` + pub async fn submit_write(&self, f: F) -> Result + where + F: for<'a> FnOnce(&mut LmdbWriteTxn<'a>) -> Result + Send + 'static, + R: Send + 'static, + { + let env = self.env.clone(); + let state_db = self.state_db; + let meta_db = self.meta_db; + let event_log = self.event_log.clone(); + let txn_counter = self.txn_counter.clone(); + let preflight_cache = self.preflight_cache.clone(); + let paused = self.paused.clone(); + + tokio::task::spawn_blocking(move || { + // Check if paused + if paused.load(Ordering::SeqCst) { + return Err(AzothError::Paused); + } + + // Increment transaction counter + txn_counter.fetch_add(1, Ordering::SeqCst); + + let rw_txn = match env.begin_rw_txn() { + Ok(t) => t, + Err(e) => { + txn_counter.fetch_sub(1, Ordering::SeqCst); + return Err(AzothError::Transaction(e.to_string())); + } + }; + + let mut write_txn = LmdbWriteTxn::new( + rw_txn, + state_db, + meta_db, + event_log, + Arc::downgrade(&txn_counter), + preflight_cache, + ); + + let result = f(&mut 
write_txn)?; + write_txn.commit()?; + Ok(result) + }) + .await + .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))? + } + + /// Put a single key-value pair asynchronously and commit. + /// + /// Convenience wrapper around [`submit_write`] for the common case of + /// writing a single key without an event. + pub async fn async_put_state( + &self, + key: &[u8], + value: &[u8], + ) -> Result { + let key = key.to_vec(); + let value = value.to_vec(); + self.submit_write(move |txn| { + txn.put_state(&key, &value)?; + Ok(()) + }) + .await?; + // submit_write commits; return a minimal CommitInfo + Ok(CommitInfo { + events_written: 0, + first_event_id: None, + last_event_id: None, + state_keys_written: 1, + state_keys_deleted: 0, + }) + } + + /// Delete a single key asynchronously and commit. + /// + /// Convenience wrapper around [`submit_write`]. + pub async fn async_del_state(&self, key: &[u8]) -> Result { + let key = key.to_vec(); + self.submit_write(move |txn| { + txn.del_state(&key)?; + Ok(()) + }) + .await?; + Ok(CommitInfo { + events_written: 0, + first_event_id: None, + last_event_id: None, + state_keys_written: 0, + state_keys_deleted: 1, + }) + } + /// Check if read pooling is enabled pub fn has_read_pool(&self) -> bool { self.read_pool.is_some() diff --git a/crates/azoth-projector/Cargo.toml b/crates/azoth-projector/Cargo.toml index 9787dbe..14ad14a 100644 --- a/crates/azoth-projector/Cargo.toml +++ b/crates/azoth-projector/Cargo.toml @@ -17,7 +17,7 @@ serde = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } anyhow = { workspace = true } -tokio = { workspace = true, features = ["time"] } +tokio = { workspace = true, features = ["time", "sync"] } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/azoth-projector/src/projector.rs b/crates/azoth-projector/src/projector.rs index dc04a6e..f788e4e 100644 --- a/crates/azoth-projector/src/projector.rs +++ 
b/crates/azoth-projector/src/projector.rs @@ -7,6 +7,7 @@ use azoth_core::{ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; +use tokio::sync::Notify; /// Projector: consumes events from canonical store and applies to projection pub struct Projector @@ -18,6 +19,10 @@ where projection: Arc

, config: ProjectorConfig, shutdown: Arc, + /// When set, the projector awaits this notification instead of polling. + /// The `FileEventLog` fires `notify_waiters()` after every successful append, + /// giving near-zero-latency projection with zero CPU waste when idle. + event_notify: Option>, } impl Projector @@ -31,9 +36,20 @@ where projection, config, shutdown: Arc::new(AtomicBool::new(false)), + event_notify: None, } } + /// Attach an event notification handle for push-based projection. + /// + /// When set, `run_continuous()` awaits this notification instead of + /// sleeping for `poll_interval_ms` when caught up, giving near-zero + /// latency event processing with zero idle CPU usage. + pub fn with_event_notify(mut self, notify: Arc) -> Self { + self.event_notify = Some(notify); + self + } + /// Run one iteration of the projector loop pub fn run_once(&self) -> Result { let start = Instant::now(); @@ -100,15 +116,30 @@ where }) } - /// Run the projector continuously until shutdown + /// Run the projector continuously until shutdown. + /// + /// When an `event_notify` handle is set (via [`with_event_notify`]), + /// the projector awaits the notification instead of polling, giving + /// near-zero-latency projection with zero CPU waste when idle. + /// Falls back to `poll_interval_ms` sleep if no notifier is present. pub async fn run_continuous(&self) -> Result<()> { while !self.shutdown.load(Ordering::SeqCst) { match self.run_once() { Ok(stats) => { if stats.events_applied == 0 { - // Caught up, sleep briefly - tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) - .await; + // Caught up -- wait for new events + if let Some(notify) = &self.event_notify { + // Push-based: await notification from event log + // Use tokio::select! so we also wake on shutdown + tokio::select! 
{ + _ = notify.notified() => {} + _ = tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) => {} + } + } else { + // Legacy polling fallback + tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) + .await; + } } else { tracing::debug!( "Applied {} events, {} bytes in {:?}", diff --git a/crates/azoth-sqlite/src/store.rs b/crates/azoth-sqlite/src/store.rs index 9b06d49..30d9eba 100644 --- a/crates/azoth-sqlite/src/store.rs +++ b/crates/azoth-sqlite/src/store.rs @@ -33,14 +33,23 @@ pub struct SqliteProjectionStore { } impl SqliteProjectionStore { - /// Get the underlying write connection (for migrations and custom queries) + /// Get the underlying write connection (for migrations and custom queries). /// - /// Returns an Arc to the Mutex-protected SQLite connection. - /// Users should lock the mutex to access the connection. + /// **Prefer `write_conn()`** for new code -- this method exists for + /// backwards compatibility. pub fn conn(&self) -> &Arc> { &self.write_conn } + /// Get the write connection explicitly. + /// + /// Use this for migrations, DDL, projector event application, and any + /// other operations that mutate the projection database. Reads should + /// go through `read_pool()`, `query()`, or `query_async()` instead. 
+ pub fn write_conn(&self) -> &Arc> { + &self.write_conn + } + /// Open a read-only connection to the same database fn open_read_connection(path: &Path, cfg: &ProjectionConfig) -> Result { let conn = Connection::open_with_flags( diff --git a/crates/azoth/src/db.rs b/crates/azoth/src/db.rs index d3c37fd..5eef083 100644 --- a/crates/azoth/src/db.rs +++ b/crates/azoth/src/db.rs @@ -51,9 +51,11 @@ impl AzothDb { let canonical = Arc::new(LmdbCanonicalStore::open(canonical_config)?); let projection = Arc::new(SqliteProjectionStore::open(projection_config)?); - // Create projector + // Create projector with push-based notification from the event log + let event_notify = canonical.event_notify(); let projector_config = ProjectorConfig::default(); - let projector = Projector::new(canonical.clone(), projection.clone(), projector_config); + let projector = Projector::new(canonical.clone(), projection.clone(), projector_config) + .with_event_notify(event_notify); Ok(Self { canonical, @@ -72,7 +74,11 @@ impl AzothDb { ) -> Result { let canonical = Arc::new(LmdbCanonicalStore::open(canonical_config)?); let projection = Arc::new(SqliteProjectionStore::open(projection_config)?); - let projector = Projector::new(canonical.clone(), projection.clone(), projector_config); + + // Create projector with push-based notification from the event log + let event_notify = canonical.event_notify(); + let projector = Projector::new(canonical.clone(), projection.clone(), projector_config) + .with_event_notify(event_notify); Ok(Self { canonical, @@ -92,21 +98,27 @@ impl AzothDb { &self.projection } - /// Get the underlying SQLite connection for the projection store - /// - /// This returns a shared reference to the Mutex-protected connection, - /// allowing direct SQL access when needed (e.g., for legacy code that - /// expects `Arc>`). + /// Get the projection **write** connection. 
/// - /// For new code, prefer using `query()`, `execute()`, or `transaction()` - /// methods which provide a safer closure-based API. + /// Use this for migrations, DDL, and projector event application. + /// For read-only queries, prefer `query()` / `query_async()` which + /// automatically use the read pool for concurrent access. /// /// # Example /// ```ignore - /// let conn = db.projection_connection(); + /// let conn = db.projection_write_conn(); /// let guard = conn.lock(); /// guard.execute("INSERT INTO ...", params![])?; /// ``` + pub fn projection_write_conn(&self) -> &Arc> { + self.projection.write_conn() + } + + /// Get the underlying SQLite connection for the projection store. + /// + /// **Deprecated**: prefer `projection_write_conn()` for writes and + /// `query()` / `query_async()` for reads. This method returns the + /// write connection, which contends with the projector. pub fn projection_connection(&self) -> &Arc> { self.projection.conn() } @@ -116,6 +128,15 @@ impl AzothDb { &self.projector } + /// Get the shared event notification handle. + /// + /// This `Notify` fires after every successful event append. Consumers + /// (event processors, custom projectors) can call `notify.notified().await` + /// to wake immediately when new events are available, eliminating polling. + pub fn event_notify(&self) -> std::sync::Arc { + self.canonical.event_notify() + } + /// Get the base path pub fn base_path(&self) -> &Path { &self.base_path @@ -233,12 +254,14 @@ impl AzothDb { projection_config, )?); - // Create projector + // Create projector with push-based notification + let event_notify = canonical.event_notify(); let projector = Projector::new( canonical.clone(), projection.clone(), ProjectorConfig::default(), - ); + ) + .with_event_notify(event_notify); Ok(Self { canonical, @@ -338,6 +361,48 @@ impl AzothDb { self.projection.transaction_async(f).await } + /// Execute a write operation on the canonical store asynchronously. 
+ /// + /// Opens a write transaction, passes it to the closure, and commits + /// atomically -- all inside `spawn_blocking`. This replaces ad-hoc + /// `spawn_blocking` wrappers at each call site. + /// + /// # Example + /// ```ignore + /// use azoth_core::traits::CanonicalTxn; + /// + /// db.submit_write(|txn| { + /// txn.put_state(b"balance", b"100")?; + /// txn.append_event(b"{\"type\":\"deposit\",\"amount\":100}")?; + /// Ok(()) + /// }).await?; + /// ``` + pub async fn submit_write(&self, f: F) -> Result + where + F: for<'a> FnOnce( + &mut azoth_lmdb::txn::LmdbWriteTxn<'a>, + ) -> Result + + Send + + 'static, + R: Send + 'static, + { + self.canonical.submit_write(f).await + } + + /// Create a new write batch for atomic multi-operation commits. + /// + /// # Example + /// ```ignore + /// let mut batch = db.write_batch(); + /// batch.put(b"key1", b"value1"); + /// batch.put(b"key2", b"value2"); + /// batch.append_event(b"batch_update:{\"keys\":2}"); + /// let info = batch.commit()?; + /// ``` + pub fn write_batch(&self) -> crate::WriteBatch<'_> { + crate::WriteBatch::new(self) + } + /// Scan state keys with a prefix /// /// Returns an iterator over (key, value) pairs with keys starting with the prefix. diff --git a/crates/azoth/src/event_processor.rs b/crates/azoth/src/event_processor.rs index 36c4bcd..58a7e39 100644 --- a/crates/azoth/src/event_processor.rs +++ b/crates/azoth/src/event_processor.rs @@ -11,6 +11,7 @@ use rusqlite::Connection; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Notify; /// Error handling strategy for failed event processing #[allow(clippy::type_complexity)] @@ -117,8 +118,13 @@ impl EventProcessorBuilder { self } - /// Build the event processor + /// Build the event processor with an externally-provided connection. 
+ /// + /// Automatically wires event-driven notification from the database, + /// so the processor wakes immediately when new events arrive instead + /// of blind-polling. pub fn build(self, conn: Arc) -> EventProcessor { + let event_notify = self.db.event_notify(); EventProcessor { db: self.db, conn, @@ -129,8 +135,46 @@ impl EventProcessorBuilder { dlq: self.dlq, shutdown: Arc::new(AtomicBool::new(false)), cursor: 0, + event_notify: Some(event_notify), } } + + /// Build the event processor using a dedicated connection to the + /// projection database. + /// + /// Opens a **new** read-write SQLite connection to the same projection + /// database file, so handlers can INSERT/UPDATE projection tables without + /// contending with the projector's own write connection or the read pool. + /// + /// This is the recommended builder when you don't need a custom or + /// external connection. + /// + /// # Example + /// ```ignore + /// let mut processor = EventProcessor::builder(db) + /// .with_handler(Box::new(MyHandler)) + /// .build_with_projection()?; + /// + /// processor.run().await?; + /// ``` + pub fn build_with_projection(self) -> Result { + let path = self.db.projection().db_path().to_path_buf(); + let conn = Connection::open(&path).map_err(|e| { + crate::AzothError::Projection(format!( + "Failed to open handler connection to {}: {}", + path.display(), + e + )) + })?; + + // Match WAL mode and pragmas for consistency + conn.pragma_update(None, "journal_mode", "WAL") + .map_err(|e| crate::AzothError::Projection(e.to_string()))?; + conn.pragma_update(None, "foreign_keys", "ON") + .map_err(|e| crate::AzothError::Projection(e.to_string()))?; + + Ok(self.build(Arc::new(conn))) + } } /// Continuous event processor @@ -147,6 +191,9 @@ pub struct EventProcessor { dlq: Option>, shutdown: Arc, cursor: u64, + /// Push-based notification from the event log. + /// When set, `run()` awaits this instead of polling when caught up. 
+ event_notify: Option>, } impl EventProcessor { @@ -155,10 +202,13 @@ impl EventProcessor { EventProcessorBuilder::new(db) } - /// Run the processor until shutdown is signaled + /// Run the processor until shutdown is signaled. /// - /// This is an async function that runs continuously, polling for new - /// events and processing them through registered handlers. + /// When the event-driven notifier is active (default when built via + /// [`EventProcessorBuilder::build`]), the processor awaits new-event + /// notifications instead of polling, giving near-zero latency with + /// zero CPU waste when idle. Falls back to `poll_interval` sleep + /// if no notifier is present. pub async fn run(&mut self) -> Result<()> { tracing::info!("Event processor started"); @@ -166,8 +216,15 @@ impl EventProcessor { let processed = self.process_batch()?; if processed == 0 { - // No events, sleep before polling again - tokio::time::sleep(self.poll_interval).await; + // Caught up -- wait for new events + if let Some(notify) = &self.event_notify { + tokio::select! 
{ + _ = notify.notified() => {} + _ = tokio::time::sleep(self.poll_interval) => {} + } + } else { + tokio::time::sleep(self.poll_interval).await; + } } } diff --git a/crates/azoth/src/lib.rs b/crates/azoth/src/lib.rs index 1cf2afc..7787693 100644 --- a/crates/azoth/src/lib.rs +++ b/crates/azoth/src/lib.rs @@ -47,6 +47,7 @@ pub mod prelude; pub mod recovery_file; pub mod transaction; pub mod typed_values; +pub mod write_batch; // Re-export core types pub use azoth_core::{ @@ -64,7 +65,7 @@ pub use azoth_core::{ }; // Re-export implementations -pub use azoth_lmdb::{LmdbCanonicalStore, LmdbReadPool, LmdbReadTxn, PooledLmdbReadTxn}; +pub use azoth_lmdb::{LmdbCanonicalStore, LmdbReadPool, LmdbReadTxn, LmdbWriteTxn, PooledLmdbReadTxn}; pub use azoth_projector::{Projector, ProjectorStats}; pub use azoth_sqlite::{PooledSqliteConnection, SqliteProjectionStore, SqliteReadPool}; @@ -100,3 +101,4 @@ pub use transaction::{ MAX_DECLARED_KEYS, }; pub use typed_values::{Array, Set, TypedValue, I256, U256}; +pub use write_batch::WriteBatch; diff --git a/crates/azoth/src/write_batch.rs b/crates/azoth/src/write_batch.rs new file mode 100644 index 0000000..fce226d --- /dev/null +++ b/crates/azoth/src/write_batch.rs @@ -0,0 +1,144 @@ +//! First-class write batch API for atomic multi-operation commits. +//! +//! `WriteBatch` collects state mutations and event appends, then applies +//! them in a single LMDB transaction on [`commit`](WriteBatch::commit). +//! This makes batch intent explicit and enables future Azoth-level +//! optimizations (e.g. deferred commit coalescing). +//! +//! # Example +//! +//! ```ignore +//! use azoth::WriteBatch; +//! +//! let mut batch = WriteBatch::new(&db); +//! batch.put(b"user:1:name", b"Alice"); +//! batch.put(b"user:1:balance", b"100"); +//! batch.append_event(b"user_created:{\"id\":1}"); +//! let info = batch.commit()?; +//! +//! // Or commit asynchronously (wraps spawn_blocking): +//! let info = batch.commit_async().await?; +//! 
``` + +use crate::{AzothDb, Result}; +use azoth_core::traits::{CanonicalStore, CanonicalTxn}; +use azoth_core::types::CommitInfo; + +/// A buffered operation in the batch. +enum BatchOp { + Put { key: Vec, value: Vec }, + Delete { key: Vec }, + AppendEvent { data: Vec }, +} + +/// Collects state mutations and event appends, then commits them +/// atomically in a single LMDB transaction. +/// +/// Operations are buffered in memory until [`commit`](Self::commit) or +/// [`commit_async`](Self::commit_async) is called. If the batch is +/// dropped without committing, no changes are applied. +pub struct WriteBatch<'a> { + db: &'a AzothDb, + ops: Vec, +} + +impl<'a> WriteBatch<'a> { + /// Create a new empty write batch. + pub fn new(db: &'a AzothDb) -> Self { + Self { + db, + ops: Vec::new(), + } + } + + /// Buffer a key-value put operation. + pub fn put(&mut self, key: &[u8], value: &[u8]) -> &mut Self { + self.ops.push(BatchOp::Put { + key: key.to_vec(), + value: value.to_vec(), + }); + self + } + + /// Buffer a key deletion. + pub fn delete(&mut self, key: &[u8]) -> &mut Self { + self.ops.push(BatchOp::Delete { + key: key.to_vec(), + }); + self + } + + /// Buffer an event append. + pub fn append_event(&mut self, data: &[u8]) -> &mut Self { + self.ops.push(BatchOp::AppendEvent { + data: data.to_vec(), + }); + self + } + + /// Return the number of buffered operations. + pub fn len(&self) -> usize { + self.ops.len() + } + + /// Return `true` if the batch contains no operations. + pub fn is_empty(&self) -> bool { + self.ops.is_empty() + } + + /// Apply all buffered operations in a single LMDB transaction. + /// + /// Returns [`CommitInfo`] with statistics about the committed + /// transaction. If any operation fails, the entire batch is rolled + /// back (LMDB's standard atomic commit guarantee). 
+ pub fn commit(self) -> Result { + let mut txn = self.db.canonical().write_txn()?; + + for op in &self.ops { + match op { + BatchOp::Put { key, value } => { + txn.put_state(key, value)?; + } + BatchOp::Delete { key } => { + txn.del_state(key)?; + } + BatchOp::AppendEvent { data } => { + txn.append_event(data)?; + } + } + } + + txn.commit() + } + + /// Apply all buffered operations asynchronously. + /// + /// Wraps [`commit`](Self::commit) inside `spawn_blocking` so it is + /// safe to call from an async context without blocking the runtime. + pub async fn commit_async(self) -> Result { + let ops = self.ops; + let canonical = self.db.canonical().clone(); + + tokio::task::spawn_blocking(move || { + let mut txn = canonical.write_txn()?; + + for op in &ops { + match op { + BatchOp::Put { key, value } => { + txn.put_state(key, value)?; + } + BatchOp::Delete { key } => { + txn.del_state(key)?; + } + BatchOp::AppendEvent { data } => { + txn.append_event(data)?; + } + } + } + + txn.commit() + }) + .await + .map_err(|e| azoth_core::error::AzothError::Internal(format!("Task join error: {}", e)))? + } +} From 50bd968b394d69b53e2374372e31b0ff7010fc47 Mon Sep 17 00:00:00 2001 From: johnny Date: Fri, 13 Feb 2026 17:57:31 -0500 Subject: [PATCH 2/4] format --- crates/azoth-lmdb/src/store.rs | 6 +----- crates/azoth/src/db.rs | 6 +----- crates/azoth/src/lib.rs | 4 +++- crates/azoth/src/write_batch.rs | 4 +--- 4 files changed, 6 insertions(+), 14 deletions(-) diff --git a/crates/azoth-lmdb/src/store.rs b/crates/azoth-lmdb/src/store.rs index 322cc3c..46dd90f 100644 --- a/crates/azoth-lmdb/src/store.rs +++ b/crates/azoth-lmdb/src/store.rs @@ -649,11 +649,7 @@ impl LmdbCanonicalStore { /// /// Convenience wrapper around [`submit_write`] for the common case of /// writing a single key without an event. 
- pub async fn async_put_state( - &self, - key: &[u8], - value: &[u8], - ) -> Result { + pub async fn async_put_state(&self, key: &[u8], value: &[u8]) -> Result { let key = key.to_vec(); let value = value.to_vec(); self.submit_write(move |txn| { diff --git a/crates/azoth/src/db.rs b/crates/azoth/src/db.rs index 5eef083..d4f0f7e 100644 --- a/crates/azoth/src/db.rs +++ b/crates/azoth/src/db.rs @@ -379,11 +379,7 @@ impl AzothDb { /// ``` pub async fn submit_write(&self, f: F) -> Result where - F: for<'a> FnOnce( - &mut azoth_lmdb::txn::LmdbWriteTxn<'a>, - ) -> Result - + Send - + 'static, + F: for<'a> FnOnce(&mut azoth_lmdb::txn::LmdbWriteTxn<'a>) -> Result + Send + 'static, R: Send + 'static, { self.canonical.submit_write(f).await diff --git a/crates/azoth/src/lib.rs b/crates/azoth/src/lib.rs index 7787693..777ec42 100644 --- a/crates/azoth/src/lib.rs +++ b/crates/azoth/src/lib.rs @@ -65,7 +65,9 @@ pub use azoth_core::{ }; // Re-export implementations -pub use azoth_lmdb::{LmdbCanonicalStore, LmdbReadPool, LmdbReadTxn, LmdbWriteTxn, PooledLmdbReadTxn}; +pub use azoth_lmdb::{ + LmdbCanonicalStore, LmdbReadPool, LmdbReadTxn, LmdbWriteTxn, PooledLmdbReadTxn, +}; pub use azoth_projector::{Projector, ProjectorStats}; pub use azoth_sqlite::{PooledSqliteConnection, SqliteProjectionStore, SqliteReadPool}; diff --git a/crates/azoth/src/write_batch.rs b/crates/azoth/src/write_batch.rs index fce226d..ecd99df 100644 --- a/crates/azoth/src/write_batch.rs +++ b/crates/azoth/src/write_batch.rs @@ -62,9 +62,7 @@ impl<'a> WriteBatch<'a> { /// Buffer a key deletion. 
pub fn delete(&mut self, key: &[u8]) -> &mut Self { - self.ops.push(BatchOp::Delete { - key: key.to_vec(), - }); + self.ops.push(BatchOp::Delete { key: key.to_vec() }); self } From 8eecab0cfa76356c0392e84b3a3f8f53edfbc594 Mon Sep 17 00:00:00 2001 From: johnny Date: Fri, 13 Feb 2026 18:15:47 -0500 Subject: [PATCH 3/4] update tests --- crates/azoth/src/event_processor.rs | 52 ++++---- crates/azoth/tests/event_handler_test.rs | 149 ++++++++++++++++++++++- crates/azoth/tests/integration_test.rs | 135 ++++++++++++++++++++ crates/azoth/tests/read_pool_test.rs | 101 +++++++++++++++ 4 files changed, 414 insertions(+), 23 deletions(-) diff --git a/crates/azoth/src/event_processor.rs b/crates/azoth/src/event_processor.rs index 58a7e39..ee75bbd 100644 --- a/crates/azoth/src/event_processor.rs +++ b/crates/azoth/src/event_processor.rs @@ -134,7 +134,7 @@ impl EventProcessorBuilder { error_strategy: self.error_strategy, dlq: self.dlq, shutdown: Arc::new(AtomicBool::new(false)), - cursor: 0, + cursor: None, event_notify: Some(event_notify), } } @@ -173,6 +173,10 @@ impl EventProcessorBuilder { conn.pragma_update(None, "foreign_keys", "ON") .map_err(|e| crate::AzothError::Projection(e.to_string()))?; + // EventProcessor is intentionally !Send (single-threaded owner of + // the Connection), so Arc is used only for shared ownership within + // that thread, not for cross-thread sharing. + #[allow(clippy::arc_with_non_send_sync)] Ok(self.build(Arc::new(conn))) } } @@ -190,7 +194,9 @@ pub struct EventProcessor { error_strategy: ErrorStrategy, dlq: Option>, shutdown: Arc, - cursor: u64, + /// Last successfully processed event ID, or `None` if no events + /// have been processed yet (so event 0 is not accidentally skipped). + cursor: Option, /// Push-based notification from the event log. /// When set, `run()` awaits this instead of polling when caught up. 
event_notify: Option<Arc<Notify>>, @@ -257,24 +263,30 @@ impl EventProcessor { pub fn process_batch(&mut self) -> Result<usize> { let canonical = self.db.as_ref().canonical(); let meta = canonical.as_ref().meta()?; - let tip = if meta.next_event_id > 0 { - meta.next_event_id - 1 - } else { - 0 - }; - if self.cursor >= tip { + // Nothing to do if no events exist at all + if meta.next_event_id == 0 { return Ok(0); } - let to = std::cmp::min(tip + 1, self.cursor + self.batch_size as u64 + 1); - let mut iter = canonical.as_ref().iter_events(self.cursor + 1, Some(to))?; + let tip = meta.next_event_id - 1; + + // Already caught up? + if let Some(c) = self.cursor { + if c >= tip { + return Ok(0); + } + } + + let start = self.cursor.map(|c| c + 1).unwrap_or(0); + let to = std::cmp::min(tip + 1, start + self.batch_size as u64); + let mut iter = canonical.as_ref().iter_events(start, Some(to))?; let mut processed = 0; while let Some((id, bytes)) = iter.next()? { match self.registry.process(self.conn.as_ref(), id, &bytes) { Ok(_) => { - self.cursor = id; + self.cursor = Some(id); processed += 1; } Err(e) => { @@ -283,7 +295,7 @@ ErrorAction::Stop => return Err(e), ErrorAction::Skip => { tracing::warn!("Skipping event {} due to error: {}", id, e); - self.cursor = id; + self.cursor = Some(id); } ErrorAction::Retry => { tracing::info!("Will retry event {} on next batch", id); @@ -299,7 +311,7 @@ dlq_id, e ); - self.cursor = id; // Move past the failed event + self.cursor = Some(id); } Err(dlq_err) => { tracing::error!( @@ -323,7 +335,7 @@ } if processed > 0 { - tracing::debug!("Processed {} events (cursor: {})", processed, self.cursor); + tracing::debug!("Processed {} events (cursor: {:?})", processed, self.cursor); } Ok(processed) @@ -376,8 +388,8 @@ } } - /// Get the current cursor position - pub fn cursor(&self) -> u64 { + /// Get the current cursor position (`None` if no events processed yet) + pub fn 
cursor(&self) -> Option<u64> { self.cursor } @@ -385,12 +397,8 @@ pub fn lag(&self) -> Result<u64> { let canonical = self.db.as_ref().canonical(); let meta = canonical.as_ref().meta()?; - let tip = if meta.next_event_id > 0 { - meta.next_event_id - 1 - } else { - 0 - }; - Ok(tip.saturating_sub(self.cursor)) + let consumed = self.cursor.map(|c| c + 1).unwrap_or(0); + Ok(meta.next_event_id.saturating_sub(consumed)) } } diff --git a/crates/azoth/tests/event_handler_test.rs b/crates/azoth/tests/event_handler_test.rs index e530589..614838b 100644 --- a/crates/azoth/tests/event_handler_test.rs +++ b/crates/azoth/tests/event_handler_test.rs @@ -1,8 +1,9 @@ //! Tests for event handler system use azoth::prelude::*; -use azoth::{EventHandler, EventHandlerRegistry}; +use azoth::{EventHandler, EventHandlerRegistry, EventProcessor}; use rusqlite::Connection; +use std::sync::Arc; use tempfile::TempDir; fn create_test_db() -> (AzothDb, TempDir) { @@ -189,3 +190,149 @@ fn test_multiple_handlers() { .unwrap(); assert_eq!(balance, 120); // 100 + 50 - 30 } + +#[test] +fn test_build_with_projection_opens_connection() { + let (db, _temp) = create_test_db(); + let db = Arc::new(db); + + // build_with_projection should succeed and return a functional processor + let mut processor = EventProcessor::builder(db.clone()) + .with_handler(Box::new(DepositHandler)) + .build_with_projection() + .expect("build_with_projection should succeed"); + + // Cursor should start as None (no events processed yet) + assert_eq!(processor.cursor(), None); + + // With no events, process_batch returns 0 + assert_eq!(processor.process_batch().unwrap(), 0); +} + +#[test] +fn test_build_with_projection_processes_events() { + let (db, _temp) = create_test_db(); + let db = Arc::new(db); + + // Write some events to the canonical log + { + let mut txn = db.canonical().write_txn().unwrap(); + txn.append_event(b"deposit:100").unwrap(); + txn.append_event(b"deposit:50").unwrap(); + txn.commit().unwrap(); + } 
+ + // Build processor using the projection shortcut + let mut processor = EventProcessor::builder(db.clone()) + .with_handler(Box::new(DepositHandler)) + .build_with_projection() + .expect("build_with_projection should succeed"); + + // Process all pending events + let processed = processor.process_batch().unwrap(); + assert_eq!(processed, 2); + + // Cursor should have advanced past both events (last processed = event 1) + assert_eq!(processor.cursor(), Some(1)); + + // No lag remaining + assert_eq!(processor.lag().unwrap(), 0); +} + +#[test] +fn test_build_with_projection_with_multiple_handlers() { + let (db, _temp) = create_test_db(); + let db = Arc::new(db); + + // Write deposit and withdraw events + { + let mut txn = db.canonical().write_txn().unwrap(); + txn.append_event(b"deposit:200").unwrap(); + txn.append_event(b"deposit:50").unwrap(); + txn.append_event(b"withdraw:75").unwrap(); + txn.commit().unwrap(); + } + + // Build with multiple handlers + let mut processor = EventProcessor::builder(db.clone()) + .with_handler(Box::new(DepositHandler)) + .with_handler(Box::new(WithdrawHandler)) + .build_with_projection() + .expect("build_with_projection should succeed"); + + let processed = processor.process_batch().unwrap(); + assert_eq!(processed, 3); + + // Query the projection database to verify handlers wrote correctly. + // build_with_projection opens its own connection to the projection DB, + // so we open a separate one to read from it. 
+ let proj_path = db.projection().db_path().to_path_buf(); + let verify_conn = Connection::open(&proj_path).unwrap(); + let balance: i64 = verify_conn + .query_row("SELECT balance FROM accounts WHERE id = 1", [], |row| { + row.get(0) + }) + .unwrap(); + assert_eq!(balance, 175); // 200 + 50 - 75 +} + +#[test] +fn test_build_with_projection_blocking_run() { + let temp_dir = tempfile::tempdir().unwrap(); + let db = Arc::new(AzothDb::open(temp_dir.path()).unwrap()); + + // Write events + { + let mut txn = db.canonical().write_txn().unwrap(); + txn.append_event(b"deposit:500").unwrap(); + txn.commit().unwrap(); + } + + // Build inside the dedicated thread because EventProcessor holds + // Arc which is !Send (rusqlite::Connection is !Sync). + let db2 = db.clone(); + let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel::<()>(); + let handle = std::thread::spawn(move || { + let mut processor = EventProcessor::builder(db2) + .with_handler(Box::new(DepositHandler)) + .build_with_projection() + .expect("build_with_projection should succeed"); + + let shutdown = processor.shutdown_handle(); + + // Signal the main thread that shutdown handle is ready + // by simply running; main thread uses the channel to stop us. 
+ std::thread::spawn(move || { + let _ = shutdown_rx.recv(); + shutdown.shutdown(); + }); + + processor.run_blocking() + }); + + // Wait until the processor catches up + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + let proj_path = db.projection().db_path().to_path_buf(); + let check_conn = Connection::open(&proj_path).unwrap(); + let result: std::result::Result<i64, rusqlite::Error> = + check_conn.query_row("SELECT balance FROM accounts WHERE id = 1", [], |row| { + row.get(0) + }); + if let Ok(balance) = result { + if balance == 500 { + break; + } + } + if std::time::Instant::now() > deadline { + panic!("EventProcessor did not process events within 2s"); + } + std::thread::sleep(std::time::Duration::from_millis(50)); + } + + let _ = shutdown_tx.send(()); + handle + .join() + .expect("processor thread should not panic") + .unwrap(); +} diff --git a/crates/azoth/tests/integration_test.rs b/crates/azoth/tests/integration_test.rs index a437afd..0892a47 100644 --- a/crates/azoth/tests/integration_test.rs +++ b/crates/azoth/tests/integration_test.rs @@ -1,6 +1,7 @@ //! Integration tests for axiom database use azoth::prelude::*; +use std::sync::Arc; use tempfile::TempDir; /// Helper to create a test database @@ -350,3 +351,137 @@ fn test_schema_version() { db.projection().migrate(1).unwrap(); assert_eq!(db.projection().schema_version().unwrap(), 1); } + +#[tokio::test] +async fn test_projector_wakes_on_event_notification() { + let temp_dir = tempfile::tempdir().unwrap(); + let db = Arc::new(AzothDb::open(temp_dir.path()).unwrap()); + + // Spawn the projector with a very long poll interval (10s). + // If notification works, it will wake up far sooner. 
+ let db2 = Arc::clone(&db); + let projector_handle = tokio::spawn(async move { db2.projector().run_continuous().await }); + + // Give the projector task time to start and enter its await + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Write events + { + let mut txn = db.canonical().write_txn().unwrap(); + txn.append_event(b"test:event1").unwrap(); + txn.append_event(b"test:event2").unwrap(); + txn.commit().unwrap(); + } + + // The notification should wake the projector almost immediately. + // Wait up to 500ms -- if we had to rely on the 10s poll this would time out. + let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(500); + loop { + if db.projector().get_lag().unwrap() == 0 { + break; + } + if tokio::time::Instant::now() > deadline { + panic!("Projector did not catch up within 500ms -- notification may not be working"); + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + assert_eq!(db.projection().get_cursor().unwrap(), 1); + + // Shut down + db.projector().shutdown(); + let _ = tokio::time::timeout(std::time::Duration::from_secs(2), projector_handle).await; +} + +#[test] +fn test_write_batch_atomic_commit() { + let (db, _temp) = create_test_db(); + + // Build a batch with multiple puts, a delete, and an event + let mut batch = db.write_batch(); + batch.put(b"wb:key1", b"value1"); + batch.put(b"wb:key2", b"value2"); + batch.put(b"wb:key3", b"value3"); + batch.delete(b"wb:key3"); // delete within same batch + batch.append_event(b"batch_write:3_keys"); + + assert_eq!(batch.len(), 5); + + let info = batch.commit().unwrap(); + assert_eq!(info.state_keys_written, 3); + assert_eq!(info.state_keys_deleted, 1); + assert_eq!(info.events_written, 1); + + // Verify state (scope the read txn so LMDB's single-reader-per-thread slot is freed) + { + let txn = db.canonical().read_txn().unwrap(); + assert_eq!(txn.get_state(b"wb:key1").unwrap(), Some(b"value1".to_vec())); + 
assert_eq!(txn.get_state(b"wb:key2").unwrap(), Some(b"value2".to_vec())); + assert_eq!(txn.get_state(b"wb:key3").unwrap(), None); // deleted + } + + // Verify event + let meta = db.canonical().meta().unwrap(); + assert_eq!(meta.next_event_id, 1); +} + +#[test] +fn test_write_batch_empty_is_noop() { + let (db, _temp) = create_test_db(); + + let batch = db.write_batch(); + assert!(batch.is_empty()); + + // Committing an empty batch should succeed with zero-stat info + let info = batch.commit().unwrap(); + assert_eq!(info.events_written, 0); + assert_eq!(info.state_keys_written, 0); +} + +#[tokio::test] +async fn test_write_batch_commit_async() { + let temp_dir = tempfile::tempdir().unwrap(); + let db = AzothDb::open(temp_dir.path()).unwrap(); + + let mut batch = db.write_batch(); + batch.put(b"async:key", b"async_value"); + batch.append_event(b"async_write:1"); + + let info = batch.commit_async().await.unwrap(); + assert_eq!(info.state_keys_written, 1); + assert_eq!(info.events_written, 1); + + // Verify + let txn = db.canonical().read_txn().unwrap(); + assert_eq!( + txn.get_state(b"async:key").unwrap(), + Some(b"async_value".to_vec()) + ); +} + +#[tokio::test] +async fn test_submit_write_async() { + let temp_dir = tempfile::tempdir().unwrap(); + let db = AzothDb::open(temp_dir.path()).unwrap(); + + // Use submit_write to do a multi-op transaction asynchronously + db.submit_write(|txn| { + txn.put_state(b"sw:key1", b"val1")?; + txn.put_state(b"sw:key2", b"val2")?; + txn.append_event(b"submit_write:2")?; + Ok(()) + }) + .await + .unwrap(); + + // Verify state was committed (scope the read txn to free the reader slot) + { + let read = db.canonical().read_txn().unwrap(); + assert_eq!(read.get_state(b"sw:key1").unwrap(), Some(b"val1".to_vec())); + assert_eq!(read.get_state(b"sw:key2").unwrap(), Some(b"val2".to_vec())); + } + + // Verify event + let meta = db.canonical().meta().unwrap(); + assert_eq!(meta.next_event_id, 1); +} diff --git 
a/crates/azoth/tests/read_pool_test.rs b/crates/azoth/tests/read_pool_test.rs index ef55ced..ea56f54 100644 --- a/crates/azoth/tests/read_pool_test.rs +++ b/crates/azoth/tests/read_pool_test.rs @@ -342,3 +342,104 @@ async fn test_sqlite_read_pool_query() { .unwrap(); assert_eq!(count, 2); } + +#[test] +fn test_projection_read_pool_enabled_by_default() { + // AzothDb::open() should create a projection store with a 4-connection + // read pool without any explicit configuration. + let temp_dir = tempfile::tempdir().unwrap(); + let db = AzothDb::open(temp_dir.path()).unwrap(); + + assert!(db.projection().has_read_pool()); + let pool = db.projection().read_pool().unwrap(); + assert_eq!(pool.pool_size(), 4); +} + +#[test] +fn test_projection_read_pool_explicitly_disabled() { + // Consumers that relied on disabled pool should still be able + // to opt out via config. + let temp_dir = tempfile::tempdir().unwrap(); + let canonical_config = CanonicalConfig::new(temp_dir.path().join("canonical")); + let projection_config = ProjectionConfig::new(temp_dir.path().join("projection.db")) + .with_read_pool(azoth_core::ReadPoolConfig::default()); // default = disabled + + let db = AzothDb::open_with_config( + temp_dir.path().to_path_buf(), + canonical_config, + projection_config, + ) + .unwrap(); + + assert!(!db.projection().has_read_pool()); + assert!(db.projection().read_pool().is_none()); +} + +#[tokio::test] +async fn test_projection_default_pool_serves_queries() { + // Verify that the default projection pool can actually serve + // read queries opened via the standard AzothDb::open() path. 
+ let temp_dir = tempfile::tempdir().unwrap(); + let db = AzothDb::open(temp_dir.path()).unwrap(); + + // Write data through the write connection + db.execute(|conn| { + conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)", []) + .map_err(|e| azoth_core::error::AzothError::Projection(e.to_string()))?; + conn.execute("INSERT INTO items (id, name) VALUES (1, 'alpha')", []) + .map_err(|e| azoth_core::error::AzothError::Projection(e.to_string()))?; + Ok(()) + }) + .unwrap(); + + // Read through the pool + let pool = db.projection().read_pool().unwrap().clone(); + let conn = pool.acquire().await.unwrap(); + let name: String = conn + .query_row("SELECT name FROM items WHERE id = 1", [], |row| row.get(0)) + .unwrap(); + assert_eq!(name, "alpha"); + + // Also verify the closure-based query() path uses the read side + let name2: String = db + .query(|conn| { + conn.query_row("SELECT name FROM items WHERE id = 1", [], |row| row.get(0)) + .map_err(|e| azoth_core::error::AzothError::Projection(e.to_string())) + }) + .unwrap(); + assert_eq!(name2, "alpha"); +} + +#[test] +fn test_projection_write_conn_separate_from_read() { + // projection_write_conn() should return the write connection, + // distinct from the read pool path. 
+ let temp_dir = tempfile::tempdir().unwrap(); + let db = AzothDb::open(temp_dir.path()).unwrap(); + + // Write via write_conn + { + let conn = db.projection_write_conn(); + let guard = conn.lock(); + guard + .execute( + "CREATE TABLE write_test (id INTEGER PRIMARY KEY, val TEXT)", + [], + ) + .unwrap(); + guard + .execute("INSERT INTO write_test (id, val) VALUES (1, 'written')", []) + .unwrap(); + } + + // Read back via the closure API (uses read connection / pool) + let val: String = db + .query(|conn| { + conn.query_row("SELECT val FROM write_test WHERE id = 1", [], |row| { + row.get(0) + }) + .map_err(|e| azoth_core::error::AzothError::Projection(e.to_string())) + }) + .unwrap(); + assert_eq!(val, "written"); +} From 079e1dd25b6616ef7edb073df4e925df38f3005d Mon Sep 17 00:00:00 2001 From: johnny Date: Fri, 13 Feb 2026 18:25:54 -0500 Subject: [PATCH 4/4] fix doctest --- crates/azoth-lmdb/src/store.rs | 4 ++-- crates/azoth-projector/src/projector.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/azoth-lmdb/src/store.rs b/crates/azoth-lmdb/src/store.rs index 46dd90f..8a23cf6 100644 --- a/crates/azoth-lmdb/src/store.rs +++ b/crates/azoth-lmdb/src/store.rs @@ -647,7 +647,7 @@ impl LmdbCanonicalStore { /// Put a single key-value pair asynchronously and commit. /// - /// Convenience wrapper around [`submit_write`] for the common case of + /// Convenience wrapper around [`Self::submit_write`] for the common case of /// writing a single key without an event. pub async fn async_put_state(&self, key: &[u8], value: &[u8]) -> Result { let key = key.to_vec(); @@ -669,7 +669,7 @@ impl LmdbCanonicalStore { /// Delete a single key asynchronously and commit. /// - /// Convenience wrapper around [`submit_write`]. + /// Convenience wrapper around [`Self::submit_write`]. 
pub async fn async_del_state(&self, key: &[u8]) -> Result { let key = key.to_vec(); self.submit_write(move |txn| { diff --git a/crates/azoth-projector/src/projector.rs b/crates/azoth-projector/src/projector.rs index f788e4e..ccfbc9c 100644 --- a/crates/azoth-projector/src/projector.rs +++ b/crates/azoth-projector/src/projector.rs @@ -118,7 +118,7 @@ where /// Run the projector continuously until shutdown. /// - /// When an `event_notify` handle is set (via [`with_event_notify`]), + /// When an `event_notify` handle is set (via [`Self::with_event_notify`]), /// the projector awaits the notification instead of polling, giving /// near-zero-latency projection with zero CPU waste when idle. /// Falls back to `poll_interval_ms` sleep if no notifier is present.