Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions crates/azoth-core/src/config/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ pub struct ProjectionConfig {
#[serde(default = "default_cache_size")]
pub cache_size: i32,

/// Read pool configuration (optional, disabled by default)
/// Read pool configuration (enabled by default with 4 connections).
///
/// When enabled, maintains a pool of read-only connections for
/// concurrent read access without blocking writes.
#[serde(default)]
/// Maintains a pool of read-only connections for concurrent read access
/// without blocking writes. Disable with `ReadPoolConfig::default()` if
/// you need single-connection behavior.
#[serde(default = "default_read_pool")]
pub read_pool: ReadPoolConfig,
}

Expand All @@ -58,6 +59,10 @@ fn default_cache_size() -> i32 {
-64000 // 64MB
}

fn default_read_pool() -> ReadPoolConfig {
ReadPoolConfig::enabled(4)
}

impl ProjectionConfig {
pub fn new(path: PathBuf) -> Self {
Self {
Expand All @@ -66,7 +71,7 @@ impl ProjectionConfig {
synchronous: SynchronousMode::default(),
schema_version: default_schema_version(),
cache_size: default_cache_size(),
read_pool: ReadPoolConfig::default(),
read_pool: default_read_pool(),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/azoth-file-log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["sync"] }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
32 changes: 32 additions & 0 deletions crates/azoth-file-log/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use tokio::sync::Notify;

/// Configuration for file-based event log
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -81,6 +82,12 @@ pub struct FileEventLog {
next_event_id: Arc<AtomicU64>,
writer: Arc<Mutex<BufWriter<File>>>,
current_file_num: Arc<AtomicU64>,
/// Optional notifier that fires after each successful event append.
///
/// Enables push-based projection: the projector can `notify.notified().await`
/// instead of polling, giving near-zero-latency event processing with zero
/// CPU waste when idle.
event_notify: Option<Arc<Notify>>,
}

impl FileEventLog {
Expand Down Expand Up @@ -119,9 +126,24 @@ impl FileEventLog {
next_event_id,
writer,
current_file_num,
event_notify: None,
})
}

/// Set the event notification handle.
///
/// When set, `notify_waiters()` is called after every successful
/// `append_with_id` / `append_batch_with_ids`, waking any task that
/// is awaiting `notified()`.
pub fn set_event_notify(&mut self, notify: Arc<Notify>) {
self.event_notify = Some(notify);
}

/// Get a clone of the event notification handle (if set).
pub fn event_notify(&self) -> Option<Arc<Notify>> {
self.event_notify.clone()
}

/// Get path to a log file by number
fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf {
base_dir.join(format!("events-{:08}.log", file_num))
Expand Down Expand Up @@ -257,6 +279,11 @@ impl EventLog for FileEventLog {
// Check for rotation
self.check_rotation()?;

// Wake any waiting projectors / event processors
if let Some(notify) = &self.event_notify {
notify.notify_waiters();
}

Ok(())
}

Expand Down Expand Up @@ -346,6 +373,11 @@ impl EventLog for FileEventLog {
// Check for rotation
self.check_rotation()?;

// Wake any waiting projectors / event processors
if let Some(notify) = &self.event_notify {
notify.notify_waiters();
}

Ok(())
}

Expand Down
136 changes: 132 additions & 4 deletions crates/azoth-lmdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use azoth_core::{
error::{AzothError, Result},
event_log::{EventLog, EventLogIterator},
lock_manager::LockManager,
traits::{self, CanonicalStore, EventIter, StateIter},
types::{BackupInfo, CanonicalMeta, EventId},
traits::{self, CanonicalStore, CanonicalTxn, EventIter, StateIter},
types::{BackupInfo, CanonicalMeta, CommitInfo, EventId},
CanonicalConfig,
};
use azoth_file_log::{FileEventLog, FileEventLogConfig};
Expand All @@ -15,6 +15,7 @@ use std::sync::{
Arc,
};
use std::time::Duration;
use tokio::sync::Notify;

use crate::keys::meta_keys;
use crate::preflight_cache::PreflightCache;
Expand Down Expand Up @@ -52,6 +53,12 @@ pub struct LmdbCanonicalStore {
txn_counter: Arc<AtomicUsize>,
preflight_cache: Arc<PreflightCache>,
read_pool: Option<Arc<LmdbReadPool>>,
/// Shared notifier that fires after each successful event append.
///
/// Created during `open()` and injected into the `FileEventLog`.
/// Consumers (projector, event processor) can clone this to await
/// new events without polling.
event_notify: Arc<Notify>,
}

impl LmdbCanonicalStore {
Expand Down Expand Up @@ -160,14 +167,17 @@ impl CanonicalStore for LmdbCanonicalStore {
cfg.preflight_cache_enabled,
));

// Initialize file-based event log
// Initialize file-based event log with push-based notification
let event_notify = Arc::new(Notify::new());
let event_log_config = FileEventLogConfig {
base_dir: cfg.path.join("event-log"),
max_event_size: cfg.event_max_size_bytes,
max_batch_bytes: cfg.event_batch_max_bytes,
..Default::default()
};
let event_log = Arc::new(FileEventLog::open(event_log_config)?);
let mut event_log = FileEventLog::open(event_log_config)?;
event_log.set_event_notify(event_notify.clone());
let event_log = Arc::new(event_log);

// Initialize metadata if needed
{
Expand Down Expand Up @@ -235,6 +245,7 @@ impl CanonicalStore for LmdbCanonicalStore {
txn_counter: Arc::new(AtomicUsize::new(0)),
preflight_cache,
read_pool,
event_notify,
})
}

Expand Down Expand Up @@ -544,6 +555,15 @@ impl LmdbCanonicalStore {
&self.event_log
}

/// Get the shared event notification handle.
///
/// This `Notify` fires after every successful event append. Consumers
/// (projectors, event processors) can call `notify.notified().await` to
/// wake immediately when new events are available, eliminating polling.
pub fn event_notify(&self) -> Arc<Notify> {
self.event_notify.clone()
}

/// Get reference to the preflight cache
///
/// This allows access to cache statistics and manual cache operations.
Expand All @@ -558,6 +578,114 @@ impl LmdbCanonicalStore {
self.read_pool.as_ref()
}

// ---- Native async write API ----

/// Execute a write operation asynchronously.
///
/// Opens a write transaction, passes it to the closure, and commits
/// atomically -- all inside `spawn_blocking`. This centralizes the
/// `spawn_blocking` boilerplate that otherwise appears at every call site.
///
/// The closure receives a mutable reference to an `LmdbWriteTxn` and
/// can perform any combination of `put_state`, `del_state`,
/// `append_event`, etc. The return value `R` is forwarded to the caller.
///
/// # Example
/// ```ignore
/// let info = store.submit_write(|txn| {
/// txn.put_state(b"key", b"value")?;
/// txn.append_event(b"{\"type\":\"put\"}")?;
/// Ok(())
/// }).await?;
/// ```
pub async fn submit_write<F, R>(&self, f: F) -> Result<R>
where
F: for<'a> FnOnce(&mut LmdbWriteTxn<'a>) -> Result<R> + Send + 'static,
R: Send + 'static,
{
let env = self.env.clone();
let state_db = self.state_db;
let meta_db = self.meta_db;
let event_log = self.event_log.clone();
let txn_counter = self.txn_counter.clone();
let preflight_cache = self.preflight_cache.clone();
let paused = self.paused.clone();

tokio::task::spawn_blocking(move || {
// Check if paused
if paused.load(Ordering::SeqCst) {
return Err(AzothError::Paused);
}

// Increment transaction counter
txn_counter.fetch_add(1, Ordering::SeqCst);

let rw_txn = match env.begin_rw_txn() {
Ok(t) => t,
Err(e) => {
txn_counter.fetch_sub(1, Ordering::SeqCst);
return Err(AzothError::Transaction(e.to_string()));
}
};

let mut write_txn = LmdbWriteTxn::new(
rw_txn,
state_db,
meta_db,
event_log,
Arc::downgrade(&txn_counter),
preflight_cache,
);

let result = f(&mut write_txn)?;
write_txn.commit()?;
Ok(result)
})
.await
.map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
}

/// Put a single key-value pair asynchronously and commit.
///
/// Convenience wrapper around [`Self::submit_write`] for the common case of
/// writing a single key without an event.
pub async fn async_put_state(&self, key: &[u8], value: &[u8]) -> Result<CommitInfo> {
let key = key.to_vec();
let value = value.to_vec();
self.submit_write(move |txn| {
txn.put_state(&key, &value)?;
Ok(())
})
.await?;
// submit_write commits; return a minimal CommitInfo
Ok(CommitInfo {
events_written: 0,
first_event_id: None,
last_event_id: None,
state_keys_written: 1,
state_keys_deleted: 0,
})
}

/// Delete a single key asynchronously and commit.
///
/// Convenience wrapper around [`Self::submit_write`].
pub async fn async_del_state(&self, key: &[u8]) -> Result<CommitInfo> {
let key = key.to_vec();
self.submit_write(move |txn| {
txn.del_state(&key)?;
Ok(())
})
.await?;
Ok(CommitInfo {
events_written: 0,
first_event_id: None,
last_event_id: None,
state_keys_written: 0,
state_keys_deleted: 1,
})
}

/// Check if read pooling is enabled
pub fn has_read_pool(&self) -> bool {
self.read_pool.is_some()
Expand Down
2 changes: 1 addition & 1 deletion crates/azoth-projector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ serde = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
anyhow = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tokio = { workspace = true, features = ["time", "sync"] }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
39 changes: 35 additions & 4 deletions crates/azoth-projector/src/projector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use azoth_core::{
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;

/// Projector: consumes events from canonical store and applies to projection
pub struct Projector<C, P>
Expand All @@ -18,6 +19,10 @@ where
projection: Arc<P>,
config: ProjectorConfig,
shutdown: Arc<AtomicBool>,
/// When set, the projector awaits this notification instead of polling.
/// The `FileEventLog` fires `notify_waiters()` after every successful append,
/// giving near-zero-latency projection with zero CPU waste when idle.
event_notify: Option<Arc<Notify>>,
}

impl<C, P> Projector<C, P>
Expand All @@ -31,9 +36,20 @@ where
projection,
config,
shutdown: Arc::new(AtomicBool::new(false)),
event_notify: None,
}
}

/// Attach an event notification handle for push-based projection.
///
/// When set, `run_continuous()` awaits this notification instead of
/// sleeping for `poll_interval_ms` when caught up, giving near-zero
/// latency event processing with zero idle CPU usage.
pub fn with_event_notify(mut self, notify: Arc<Notify>) -> Self {
self.event_notify = Some(notify);
self
}

/// Run one iteration of the projector loop
pub fn run_once(&self) -> Result<ProjectorStats> {
let start = Instant::now();
Expand Down Expand Up @@ -100,15 +116,30 @@ where
})
}

/// Run the projector continuously until shutdown
/// Run the projector continuously until shutdown.
///
/// When an `event_notify` handle is set (via [`Self::with_event_notify`]),
/// the projector awaits the notification instead of polling, giving
/// near-zero-latency projection with zero CPU waste when idle.
/// Falls back to `poll_interval_ms` sleep if no notifier is present.
pub async fn run_continuous(&self) -> Result<()> {
while !self.shutdown.load(Ordering::SeqCst) {
match self.run_once() {
Ok(stats) => {
if stats.events_applied == 0 {
// Caught up, sleep briefly
tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms))
.await;
// Caught up -- wait for new events
if let Some(notify) = &self.event_notify {
// Push-based: await notification from event log
// Use tokio::select! so we also wake on shutdown
tokio::select! {
_ = notify.notified() => {}
_ = tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) => {}
}
} else {
// Legacy polling fallback
tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms))
.await;
}
} else {
tracing::debug!(
"Applied {} events, {} bytes in {:?}",
Expand Down
Loading