43 changes: 43 additions & 0 deletions AGENTS.md
@@ -672,6 +672,49 @@ Single-threaded benchmarks (release mode):

Use batching for maximum throughput.

## Performance Tuning

### Read Pools

Both LMDB and SQLite stores support optional read connection pools.
Enable them when your workload has significant concurrent read traffic:

```rust
use azoth_core::config::ReadPoolConfig;

// LMDB read pool (concurrent read transactions)
let config = CanonicalConfig::new(path)
.with_read_pool_size(4); // 4 concurrent read slots

// SQLite read pool (multiple read-only connections)
let config = ProjectionConfig {
read_pool: ReadPoolConfig::enabled(4),
..Default::default()
};
```

### Event Processing Batch Sizes

| Batch size | Use case |
|---|---|
| `100` (default) | Low-latency, real-time processing |
| `1,000` | Balanced throughput for steady-state workloads |
| `10,000+` | Bulk catch-up / replaying large backlogs |
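As a rough sketch of how these sizes might be applied — note that the `EventProcessorConfig` type and `batch_size` field below are illustrative assumptions, not confirmed API:

```rust
// Hypothetical config names -- adjust to the actual processor API.
let realtime = EventProcessorConfig { batch_size: 100, ..Default::default() }; // low latency
let steady = EventProcessorConfig { batch_size: 1_000, ..Default::default() }; // balanced
let backfill = EventProcessorConfig { batch_size: 10_000, ..Default::default() }; // bulk catch-up
```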

### SyncMode (LMDB Durability)

| Mode | Durability | Performance |
|---|---|---|
| `Full` | Survives power loss | Slowest (fsync every commit) |
| `NoMetaSync` (default) | Survives process crash; may lose 1 txn on power loss | Good balance |
| `NoSync` | No guarantees beyond process lifetime | Fastest; **test/ephemeral only** |
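A minimal sketch of opting into a non-default mode (the `SyncMode` variants come from `azoth_core::config`; the public `sync_mode` field on `CanonicalConfig` is an assumption):

```rust
use azoth_core::config::SyncMode;

// Opt into full durability for workloads that must survive power loss;
// the default `NoMetaSync` is usually the right trade-off.
let mut config = CanonicalConfig::new(path);
config.sync_mode = SyncMode::Full;
```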

### File Event Log – `flush_on_append`

By default the file event log flushes after every append. Set `flush_on_append: false`
in `FileEventLogConfig` to defer flushing to the OS, gaining significant write throughput
at the cost of losing un-flushed events on a process crash.
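For example, a throughput-oriented configuration might look like this (the `azoth_file_log` import path is assumed from the crate name; `flush_on_append` and `FileEventLog::open` appear in the diff below):

```rust
use azoth_file_log::{FileEventLog, FileEventLogConfig};

// Defer flushing to the OS page cache: much faster appends, but any
// events still in the write buffer are lost if the process crashes.
let config = FileEventLogConfig {
    flush_on_append: false,
    ..Default::default()
};
let log = FileEventLog::open(config)?;
```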

## Getting Help

- Crates.io: https://crates.io/crates/azoth
22 changes: 19 additions & 3 deletions crates/azoth-core/src/config/canonical.rs
@@ -145,12 +145,28 @@ pub struct CanonicalConfig {

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum SyncMode {
/// Full durability (fsync on every commit)
/// Full durability – calls `fsync()` on every commit.
///
/// Guarantees that committed data survives power loss and OS crashes.
/// This is the safest option but has the highest write latency (~2-5x slower
/// than `NoMetaSync`).
Full,
/// No metadata sync (faster, usually safe)

/// Skips syncing the LMDB meta-page on each commit (default).
///
/// Data pages are still synced, so committed data is durable against process
/// crashes. In the rare event of an OS crash or power failure, the last
/// transaction _may_ be lost, but the database will remain consistent.
/// This is a good balance of durability and performance for most workloads.
#[default]
NoMetaSync,
/// No sync at all (fastest, least safe)

/// Disables `fsync()` entirely – the OS page cache decides when to flush.
///
/// **WARNING**: This is the fastest mode but offers no durability guarantees
/// beyond normal process lifetime. A power failure or OS crash can lose an
/// unbounded number of recent transactions or, in the worst case, corrupt the
/// database file. Only use this for ephemeral, reproducible, or test workloads.
NoSync,
}

21 changes: 19 additions & 2 deletions crates/azoth-file-log/src/store.rs
@@ -32,6 +32,15 @@ pub struct FileEventLogConfig {

/// Maximum total size for a single append batch (bytes)
pub max_batch_bytes: usize,

/// Whether to flush the write buffer after each append (default: true).
///
/// When `true`, every `append_with_id` / `append_events_batch` call flushes
/// the `BufWriter`, ensuring data reaches the OS page cache immediately.
/// Set to `false` for maximum throughput at the cost of losing any events
/// still in the write buffer if the process crashes (the log file itself
/// won't be corrupted; only un-flushed events are lost).
pub flush_on_append: bool,
}

impl Default for FileEventLogConfig {
@@ -43,6 +52,7 @@ impl Default for FileEventLogConfig {
batch_buffer_size: 1024 * 1024, // 1MB for batch writes
max_event_size: 4 * 1024 * 1024, // 4MB single-event limit
max_batch_bytes: 64 * 1024 * 1024, // 64MB batch limit
flush_on_append: true,
}
}
}
@@ -227,6 +237,12 @@ impl EventLog for FileEventLog {
// Write event entry
self.write_event_entry(event_id, event_bytes)?;

// Conditionally flush based on config
if self.config.flush_on_append {
let mut writer = self.writer.lock().unwrap();
writer.flush()?;
}

// Update metadata
{
let mut meta = self.meta.lock().unwrap();
@@ -309,8 +325,8 @@ impl EventLog for FileEventLog {
writer.write_all(&buffer)?;
}

// Flush after batch
{
// Conditionally flush based on config
if self.config.flush_on_append {
let mut writer = self.writer.lock().unwrap();
writer.flush()?;
}
@@ -580,6 +596,7 @@ mod tests {
batch_buffer_size: 4096,
max_event_size: 1024 * 1024,
max_batch_bytes: 16 * 1024 * 1024,
flush_on_append: true,
};
let log = FileEventLog::open(config).unwrap();
(log, temp_dir)
9 changes: 7 additions & 2 deletions crates/azoth-lmdb/src/preflight_cache.rs
@@ -2,9 +2,14 @@
//!
//! This module provides an optional cache to speed up preflight validation by caching
//! frequently accessed state keys. The cache is global, thread-safe, and supports:
//! - LRU eviction when capacity is reached
//! - **FIFO eviction** when capacity is reached (oldest insertion is evicted first)
//! - TTL-based expiration for stale entries
//! - Invalidation on transaction commit for modified keys
//!
//! **Note**: The eviction policy is FIFO (First In, First Out), not LRU. This means
//! a frequently-read key can still be evicted if it was inserted earliest. For most
//! workloads this is acceptable because the TTL ensures freshness, and the cache's
//! primary goal is to reduce LMDB reads for "just written" keys during preflight.

use dashmap::DashMap;
use parking_lot::Mutex;
@@ -99,7 +104,7 @@ impl PreflightCache {

/// Insert a value into the cache.
///
/// If the cache is at capacity, evicts the least recently used entry.
/// If the cache is at capacity, evicts the oldest entry (FIFO order).
/// This is a no-op if the cache is disabled.
pub fn insert(&self, key: Vec<u8>, value: CachedValue) {
if !self.enabled {
35 changes: 23 additions & 12 deletions crates/azoth-lmdb/src/read_pool.rs
@@ -125,20 +125,31 @@ impl LmdbReadPool {

/// Acquire a pooled read-only transaction (blocking)
///
/// This is a synchronous version that blocks the current thread.
/// This is a synchronous version that blocks the current thread using
/// exponential backoff (1ms, 2ms, 4ms, ... capped at 32ms) up to the
/// configured `acquire_timeout`.
///
/// Prefer `acquire()` in async contexts.
pub fn acquire_blocking(&self) -> Result<PooledLmdbReadTxn<'_>> {
let permit = self
.semaphore
.try_acquire()
.or_else(|_| {
// If immediate acquire fails, do a blocking wait with timeout
std::thread::sleep(Duration::from_millis(1));
self.semaphore.try_acquire()
})
.map_err(|_| {
AzothError::Internal("Read pool exhausted - use async acquire for waiting".into())
})?;
let deadline = std::time::Instant::now() + self.acquire_timeout;
let mut backoff_ms = 1u64;
const MAX_BACKOFF_MS: u64 = 32;

let permit = loop {
match self.semaphore.try_acquire() {
Ok(permit) => break permit,
Err(_) => {
if std::time::Instant::now() >= deadline {
return Err(AzothError::Timeout(format!(
"LMDB read pool acquire timeout after {:?}",
self.acquire_timeout
)));
}
std::thread::sleep(Duration::from_millis(backoff_ms));
backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
}
}
};

let txn = self
.env
13 changes: 10 additions & 3 deletions crates/azoth-scheduler/src/projection.rs
@@ -149,6 +149,8 @@ impl ScheduleProjection {
}

/// List all tasks with optional filtering.
///
/// All filter values are passed as bound parameters to prevent SQL injection.
pub fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<ScheduledTask>> {
let mut query = String::from(
r#"
@@ -160,19 +162,24 @@
"#,
);

// Collect bound parameters to prevent SQL injection
let mut bound_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();

if let Some(enabled) = filter.enabled {
query.push_str(&format!(" AND enabled = {}", if enabled { 1 } else { 0 }));
query.push_str(" AND enabled = ?");
bound_params.push(Box::new(if enabled { 1_i32 } else { 0_i32 }));
}

if let Some(task_type) = &filter.task_type {
query.push_str(&format!(" AND task_type = '{}'", task_type));
query.push_str(" AND task_type = ?");
bound_params.push(Box::new(task_type.clone()));
}

query.push_str(" ORDER BY created_at DESC");

let mut stmt = self.conn.prepare(&query)?;
let tasks = stmt
.query_map([], |row| {
.query_map(rusqlite::params_from_iter(bound_params.iter()), |row| {
Ok(ScheduledTask {
task_id: row.get(0)?,
task_type: row.get(1)?,
19 changes: 14 additions & 5 deletions crates/azoth-sqlite/src/read_pool.rs
@@ -9,6 +9,7 @@ use azoth_core::{
};
use rusqlite::{Connection, OpenFlags};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -76,6 +77,8 @@ pub struct SqliteReadPool {
acquire_timeout: Duration,
enabled: bool,
db_path: PathBuf,
/// Round-robin index for distributing connection acquisition attempts
next_idx: AtomicUsize,
}

impl SqliteReadPool {
@@ -102,6 +105,7 @@ impl SqliteReadPool {
acquire_timeout: Duration::from_millis(config.acquire_timeout_ms),
enabled: config.enabled,
db_path: db_path.to_path_buf(),
next_idx: AtomicUsize::new(0),
})
}

@@ -120,9 +124,11 @@
})?
.map_err(|e| AzothError::Internal(format!("Semaphore closed: {}", e)))?;

// Find an available connection (the permit ensures one is available)
for conn in &self.connections {
if let Ok(guard) = conn.try_lock() {
// Round-robin: start from next_idx to distribute lock contention
let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len();
for i in 0..self.connections.len() {
let idx = (start + i) % self.connections.len();
if let Ok(guard) = self.connections[idx].try_lock() {
return Ok(PooledSqliteConnection {
conn: guard,
_permit: permit,
@@ -142,8 +148,11 @@
pub fn try_acquire(&self) -> Result<Option<PooledSqliteConnection<'_>>> {
match self.semaphore.try_acquire() {
Ok(permit) => {
for conn in &self.connections {
if let Ok(guard) = conn.try_lock() {
// Round-robin: start from next_idx to distribute lock contention
let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len();
for i in 0..self.connections.len() {
let idx = (start + i) % self.connections.len();
if let Ok(guard) = self.connections[idx].try_lock() {
return Ok(Some(PooledSqliteConnection {
conn: guard,
_permit: permit,
10 changes: 8 additions & 2 deletions crates/azoth-vector/src/extension.rs
@@ -20,8 +20,14 @@ use std::path::Path;
///
/// # Safety
///
/// This function uses unsafe code to load the extension. The extension must be
/// a valid SQLite extension and should be from a trusted source.
/// This function uses `unsafe` because `rusqlite::Connection::load_extension`
/// calls `sqlite3_load_extension`, which loads a native shared library (`.so` / `.dylib` / `.dll`)
/// into the current process. Loading an untrusted library can execute arbitrary code.
///
/// **Requirements for safe usage:**
/// - The extension binary **must** come from a trusted, verified source (e.g., official releases).
/// - The `path` argument should **never** be derived from user-supplied input.
/// - Consider validating file checksums before loading in production deployments.
pub fn load_vector_extension(conn: &Connection, path: Option<&Path>) -> Result<()> {
let ext_path = path.unwrap_or_else(|| {
#[cfg(target_os = "linux")]
2 changes: 1 addition & 1 deletion crates/azoth-vector/src/lib.rs
@@ -42,7 +42,7 @@
//!
//! // Search for similar vectors
//! let query = Vector::new(vec![0.15, 0.25, 0.35]);
//! let search = VectorSearch::new(db.projection().clone(), "embeddings", "vector");
//! let search = VectorSearch::new(db.projection().clone(), "embeddings", "vector")?;
//! let results = search.knn(&query, 10).await?;
//! # Ok(())
//! # }