diff --git a/AGENTS.md b/AGENTS.md index f12e111..f731231 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -672,6 +672,49 @@ Single-threaded benchmarks (release mode): Use batching for maximum throughput. +## Performance Tuning + +### Read Pools + +Both LMDB and SQLite stores support optional read connection pools. +Enable them when your workload has significant concurrent read traffic: + +```rust +use azoth_core::config::ReadPoolConfig; + +// LMDB read pool (concurrent read transactions) +let config = CanonicalConfig::new(path) + .with_read_pool_size(4); // 4 concurrent read slots + +// SQLite read pool (multiple read-only connections) +let config = ProjectionConfig { + read_pool: ReadPoolConfig::enabled(4), + ..Default::default() +}; +``` + +### Event Processing Batch Sizes + +| Batch size | Use case | +|---|---| +| `100` (default) | Low-latency, real-time processing | +| `1,000` | Balanced throughput for steady-state workloads | +| `10,000+` | Bulk catch-up / replaying large backlogs | + +### SyncMode (LMDB Durability) + +| Mode | Durability | Performance | +|---|---|---| +| `Full` | Survives power loss | Slowest (fsync every commit) | +| `NoMetaSync` (default) | Survives process crash; may lose 1 txn on power loss | Good balance | +| `NoSync` | No guarantees beyond process lifetime | Fastest; **test/ephemeral only** | + +### File Event Log – `flush_on_append` + +By default the file event log flushes after every append. Set `flush_on_append: false` +in `FileEventLogConfig` to defer flushing to the OS, gaining significant write throughput +at the cost of losing un-flushed events on a process crash. 
+ ## Getting Help - Crates.io: https://crates.io/crates/azoth diff --git a/crates/azoth-core/src/config/canonical.rs b/crates/azoth-core/src/config/canonical.rs index f85a9f1..541243f 100644 --- a/crates/azoth-core/src/config/canonical.rs +++ b/crates/azoth-core/src/config/canonical.rs @@ -145,12 +145,28 @@ pub struct CanonicalConfig { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] pub enum SyncMode { - /// Full durability (fsync on every commit) + /// Full durability – calls `fsync()` on every commit. + /// + /// Guarantees that committed data survives power loss and OS crashes. + /// This is the safest option but has the highest write latency (~2-5x slower + /// than `NoMetaSync`). Full, - /// No metadata sync (faster, usually safe) + + /// Skips syncing the LMDB meta-page on each commit (default). + /// + /// Data pages are still synced, so committed data is durable against process + /// crashes. In the rare event of an OS crash or power failure, the last + /// transaction _may_ be lost, but the database will remain consistent. + /// This is a good balance of durability and performance for most workloads. #[default] NoMetaSync, - /// No sync at all (fastest, least safe) + + /// Disables `fsync()` entirely – the OS page cache decides when to flush. + /// + /// **WARNING**: This is the fastest mode but offers no durability guarantees + /// beyond normal process lifetime. A power failure or OS crash can lose an + /// unbounded number of recent transactions or, in the worst case, corrupt the + /// database file. Only use this for ephemeral, reproducible, or test workloads. 
NoSync, } diff --git a/crates/azoth-file-log/src/store.rs b/crates/azoth-file-log/src/store.rs index f098601..2d481d6 100644 --- a/crates/azoth-file-log/src/store.rs +++ b/crates/azoth-file-log/src/store.rs @@ -32,6 +32,15 @@ pub struct FileEventLogConfig { /// Maximum total size for a single append batch (bytes) pub max_batch_bytes: usize, + + /// Whether to flush the write buffer after each append (default: true). + /// + /// When `true`, every `append_with_id` / `append_events_batch` call flushes + /// the `BufWriter`, ensuring data reaches the OS page cache immediately. + /// Set to `false` for maximum throughput at the cost of losing in-flight + /// buffered events on a process crash (the OS-level file won't be corrupted, + /// but un-flushed events will be lost). + pub flush_on_append: bool, } impl Default for FileEventLogConfig { @@ -43,6 +52,7 @@ impl Default for FileEventLogConfig { batch_buffer_size: 1024 * 1024, // 1MB for batch writes max_event_size: 4 * 1024 * 1024, // 4MB single-event limit max_batch_bytes: 64 * 1024 * 1024, // 64MB batch limit + flush_on_append: true, } } } @@ -227,6 +237,12 @@ impl EventLog for FileEventLog { // Write event entry self.write_event_entry(event_id, event_bytes)?; + // Conditionally flush based on config + if self.config.flush_on_append { + let mut writer = self.writer.lock().unwrap(); + writer.flush()?; + } + // Update metadata { let mut meta = self.meta.lock().unwrap(); @@ -309,8 +325,8 @@ impl EventLog for FileEventLog { writer.write_all(&buffer)?; } - // Flush after batch - { + // Conditionally flush based on config + if self.config.flush_on_append { let mut writer = self.writer.lock().unwrap(); writer.flush()?; } @@ -580,6 +596,7 @@ mod tests { batch_buffer_size: 4096, max_event_size: 1024 * 1024, max_batch_bytes: 16 * 1024 * 1024, + flush_on_append: true, }; let log = FileEventLog::open(config).unwrap(); (log, temp_dir) diff --git a/crates/azoth-lmdb/src/preflight_cache.rs 
b/crates/azoth-lmdb/src/preflight_cache.rs index 47d9ebb..76635b8 100644 --- a/crates/azoth-lmdb/src/preflight_cache.rs +++ b/crates/azoth-lmdb/src/preflight_cache.rs @@ -2,9 +2,14 @@ //! //! This module provides an optional cache to speed up preflight validation by caching //! frequently accessed state keys. The cache is global, thread-safe, and supports: -//! - LRU eviction when capacity is reached +//! - **FIFO eviction** when capacity is reached (oldest insertion is evicted first) //! - TTL-based expiration for stale entries //! - Invalidation on transaction commit for modified keys +//! +//! **Note**: The eviction policy is FIFO (First In, First Out), not LRU. This means +//! a frequently-read key can still be evicted if it was inserted earliest. For most +//! workloads this is acceptable because the TTL ensures freshness, and the cache's +//! primary goal is to reduce LMDB reads for "just written" keys during preflight. use dashmap::DashMap; use parking_lot::Mutex; @@ -99,7 +104,7 @@ impl PreflightCache { /// Insert a value into the cache. /// - /// If the cache is at capacity, evicts the least recently used entry. + /// If the cache is at capacity, evicts the oldest entry (FIFO order). /// This is a no-op if the cache is disabled. pub fn insert(&self, key: Vec, value: CachedValue) { if !self.enabled { diff --git a/crates/azoth-lmdb/src/read_pool.rs b/crates/azoth-lmdb/src/read_pool.rs index 35e4a73..12c0cd8 100644 --- a/crates/azoth-lmdb/src/read_pool.rs +++ b/crates/azoth-lmdb/src/read_pool.rs @@ -125,20 +125,31 @@ impl LmdbReadPool { /// Acquire a pooled read-only transaction (blocking) /// - /// This is a synchronous version that blocks the current thread. + /// This is a synchronous version that blocks the current thread using + /// exponential backoff (1ms, 2ms, 4ms, ... capped at 32ms) up to the + /// configured `acquire_timeout`. + /// /// Prefer `acquire()` in async contexts. 
pub fn acquire_blocking(&self) -> Result> { - let permit = self - .semaphore - .try_acquire() - .or_else(|_| { - // If immediate acquire fails, do a blocking wait with timeout - std::thread::sleep(Duration::from_millis(1)); - self.semaphore.try_acquire() - }) - .map_err(|_| { - AzothError::Internal("Read pool exhausted - use async acquire for waiting".into()) - })?; + let deadline = std::time::Instant::now() + self.acquire_timeout; + let mut backoff_ms = 1u64; + const MAX_BACKOFF_MS: u64 = 32; + + let permit = loop { + match self.semaphore.try_acquire() { + Ok(permit) => break permit, + Err(_) => { + if std::time::Instant::now() >= deadline { + return Err(AzothError::Timeout(format!( + "LMDB read pool acquire timeout after {:?}", + self.acquire_timeout + ))); + } + std::thread::sleep(Duration::from_millis(backoff_ms)); + backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS); + } + } + }; let txn = self .env diff --git a/crates/azoth-scheduler/src/projection.rs b/crates/azoth-scheduler/src/projection.rs index ba01626..233413d 100644 --- a/crates/azoth-scheduler/src/projection.rs +++ b/crates/azoth-scheduler/src/projection.rs @@ -149,6 +149,8 @@ impl ScheduleProjection { } /// List all tasks with optional filtering. + /// + /// All filter values are passed as bound parameters to prevent SQL injection. 
pub fn list_tasks(&self, filter: &TaskFilter) -> Result> { let mut query = String::from( r#" @@ -160,19 +162,24 @@ impl ScheduleProjection { "#, ); + // Collect bound parameters to prevent SQL injection + let mut bound_params: Vec> = Vec::new(); + if let Some(enabled) = filter.enabled { - query.push_str(&format!(" AND enabled = {}", if enabled { 1 } else { 0 })); + query.push_str(" AND enabled = ?"); + bound_params.push(Box::new(if enabled { 1_i32 } else { 0_i32 })); } if let Some(task_type) = &filter.task_type { - query.push_str(&format!(" AND task_type = '{}'", task_type)); + query.push_str(" AND task_type = ?"); + bound_params.push(Box::new(task_type.clone())); } query.push_str(" ORDER BY created_at DESC"); let mut stmt = self.conn.prepare(&query)?; let tasks = stmt - .query_map([], |row| { + .query_map(rusqlite::params_from_iter(bound_params.iter()), |row| { Ok(ScheduledTask { task_id: row.get(0)?, task_type: row.get(1)?, diff --git a/crates/azoth-sqlite/src/read_pool.rs b/crates/azoth-sqlite/src/read_pool.rs index 0a8d6af..b60634d 100644 --- a/crates/azoth-sqlite/src/read_pool.rs +++ b/crates/azoth-sqlite/src/read_pool.rs @@ -9,6 +9,7 @@ use azoth_core::{ }; use rusqlite::{Connection, OpenFlags}; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; use std::time::{Duration, Instant}; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -76,6 +77,8 @@ pub struct SqliteReadPool { acquire_timeout: Duration, enabled: bool, db_path: PathBuf, + /// Round-robin index for distributing connection acquisition attempts + next_idx: AtomicUsize, } impl SqliteReadPool { @@ -102,6 +105,7 @@ impl SqliteReadPool { acquire_timeout: Duration::from_millis(config.acquire_timeout_ms), enabled: config.enabled, db_path: db_path.to_path_buf(), + next_idx: AtomicUsize::new(0), }) } @@ -120,9 +124,11 @@ impl SqliteReadPool { })? 
.map_err(|e| AzothError::Internal(format!("Semaphore closed: {}", e)))?; - // Find an available connection (the permit ensures one is available) - for conn in &self.connections { - if let Ok(guard) = conn.try_lock() { + // Round-robin: start from next_idx to distribute lock contention + let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len(); + for i in 0..self.connections.len() { + let idx = (start + i) % self.connections.len(); + if let Ok(guard) = self.connections[idx].try_lock() { return Ok(PooledSqliteConnection { conn: guard, _permit: permit, @@ -142,8 +148,11 @@ impl SqliteReadPool { pub fn try_acquire(&self) -> Result>> { match self.semaphore.try_acquire() { Ok(permit) => { - for conn in &self.connections { - if let Ok(guard) = conn.try_lock() { + // Round-robin: start from next_idx to distribute lock contention + let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len(); + for i in 0..self.connections.len() { + let idx = (start + i) % self.connections.len(); + if let Ok(guard) = self.connections[idx].try_lock() { return Ok(Some(PooledSqliteConnection { conn: guard, _permit: permit, diff --git a/crates/azoth-vector/src/extension.rs b/crates/azoth-vector/src/extension.rs index 7f4cafe..6ab8bbc 100644 --- a/crates/azoth-vector/src/extension.rs +++ b/crates/azoth-vector/src/extension.rs @@ -20,8 +20,14 @@ use std::path::Path; /// /// # Safety /// -/// This function uses unsafe code to load the extension. The extension must be -/// a valid SQLite extension and should be from a trusted source. +/// This function uses `unsafe` because `rusqlite::Connection::load_extension` +/// calls `sqlite3_load_extension`, which loads a native shared library (`.so` / `.dylib` / `.dll`) +/// into the current process. Loading an untrusted library can execute arbitrary code. +/// +/// **Requirements for safe usage:** +/// - The extension binary **must** come from a trusted, verified source (e.g., official releases). 
+/// - The `path` argument should **never** be derived from user-supplied input. +/// - Consider validating file checksums before loading in production deployments. pub fn load_vector_extension(conn: &Connection, path: Option<&Path>) -> Result<()> { let ext_path = path.unwrap_or_else(|| { #[cfg(target_os = "linux")] diff --git a/crates/azoth-vector/src/lib.rs b/crates/azoth-vector/src/lib.rs index 56a4ff6..c8d93b3 100644 --- a/crates/azoth-vector/src/lib.rs +++ b/crates/azoth-vector/src/lib.rs @@ -42,7 +42,7 @@ //! //! // Search for similar vectors //! let query = Vector::new(vec![0.15, 0.25, 0.35]); -//! let search = VectorSearch::new(db.projection().clone(), "embeddings", "vector"); +//! let search = VectorSearch::new(db.projection().clone(), "embeddings", "vector")?; //! let results = search.knn(&query, 10).await?; //! # Ok(()) //! # } diff --git a/crates/azoth-vector/src/search.rs b/crates/azoth-vector/src/search.rs index 34aa774..e224588 100644 --- a/crates/azoth-vector/src/search.rs +++ b/crates/azoth-vector/src/search.rs @@ -6,6 +6,44 @@ use azoth_sqlite::SqliteProjectionStore; use rusqlite::params; use std::sync::Arc; +/// Validate that a SQL identifier (table or column name) is safe. +/// +/// Only allows `[a-zA-Z_][a-zA-Z0-9_]*` to prevent SQL injection via +/// identifier manipulation. Returns an error if the identifier is invalid. 
+fn validate_sql_identifier(name: &str, kind: &str) -> Result<()> { + if name.is_empty() { + return Err(azoth_core::error::AzothError::Config(format!( + "{} name must not be empty", + kind + ))); + } + if name.len() > 128 { + return Err(azoth_core::error::AzothError::Config(format!( + "{} name must be 128 characters or fewer, got {}", + kind, + name.len() + ))); + } + let mut chars = name.chars(); + let first = chars.next().unwrap(); // safe: name is non-empty + if !first.is_ascii_alphabetic() && first != '_' { + return Err(azoth_core::error::AzothError::Config(format!( + "{} name '{}' must start with a letter or underscore", + kind, name + ))); + } + for c in chars { + if !c.is_ascii_alphanumeric() && c != '_' { + return Err(azoth_core::error::AzothError::Config(format!( + "{} name '{}' contains invalid character '{}'. \ + Only ASCII alphanumeric and underscore are allowed.", + kind, name, c + ))); + } + } + Ok(()) +} + /// Vector search builder /// /// Provides k-NN search with optional filtering and custom distance metrics. @@ -20,7 +58,7 @@ use std::sync::Arc; /// let db = AzothDb::open("./data")?; /// /// let query = Vector::new(vec![0.1, 0.2, 0.3]); -/// let search = VectorSearch::new(db.projection().clone(), "embeddings", "vector") +/// let search = VectorSearch::new(db.projection().clone(), "embeddings", "vector")? 
/// .distance_metric(DistanceMetric::Cosine); /// /// let results = search.knn(&query, 10).await?; @@ -40,19 +78,28 @@ impl VectorSearch { /// # Arguments /// /// * `projection` - The SQLite projection store - /// * `table` - Table name containing the vector column - /// * `column` - Vector column name (must be initialized with vector_init) + /// * `table` - Table name containing the vector column (must be a valid SQL identifier) + /// * `column` - Vector column name (must be a valid SQL identifier, initialized with vector_init) + /// + /// # Errors + /// + /// Returns an error if `table` or `column` contain characters other than + /// ASCII alphanumeric and underscore, or don't start with a letter/underscore. pub fn new( projection: Arc, table: impl Into, column: impl Into, - ) -> Self { - Self { + ) -> Result { + let table = table.into(); + let column = column.into(); + validate_sql_identifier(&table, "Table")?; + validate_sql_identifier(&column, "Column")?; + Ok(Self { projection, - table: table.into(), - column: column.into(), + table, + column, distance_metric: DistanceMetric::Cosine, - } + }) } /// Set the distance metric @@ -82,6 +129,7 @@ impl VectorSearch { /// # } /// ``` pub async fn knn(&self, query: &Vector, k: usize) -> Result> { + // Table and column are validated at construction time via validate_sql_identifier let table = self.table.clone(); let column = self.column.clone(); let query_json = query.to_json(); @@ -91,10 +139,8 @@ impl VectorSearch { .query_async(move |conn| { let sql = format!( "SELECT rowid, distance - FROM vector_quantize_scan('{}', '{}', ?, ?) + FROM vector_quantize_scan('{table}', '{column}', ?, ?) ORDER BY distance ASC", - table.replace('\'', "''"), - column.replace('\'', "''") ); let mut stmt = conn @@ -149,6 +195,15 @@ impl VectorSearch { /// /// Allows filtering results by additional columns in the table. /// + /// # Safety (SQL Injection) + /// + /// The `filter` string is interpolated into the WHERE clause of the query. 
+ /// **Always use `?` placeholders** for values and pass them via `filter_params`. + /// Never interpolate user input directly into the filter string. + /// + /// Table and column identifiers are validated at `VectorSearch::new()` time + /// to prevent identifier injection. + /// /// # Example /// /// ```no_run @@ -156,10 +211,15 @@ impl VectorSearch { /// # async fn example(search: VectorSearch) -> Result<(), Box> { /// let query = Vector::new(vec![0.1, 0.2, 0.3]); /// - /// // Only search within a specific category + /// // GOOD: parameterized filter /// let results = search - /// .knn_filtered(&query, 10, "category = ?", vec!["tech".to_string()]) + /// .knn_filtered(&query, 10, "t.category = ?", vec!["tech".to_string()]) /// .await?; + /// + /// // BAD (DO NOT DO THIS): interpolating user input + /// // let results = search + /// // .knn_filtered(&query, 10, &format!("t.category = '{}'", user_input), vec![]) + /// // .await?; /// # Ok(()) /// # } /// ``` @@ -170,6 +230,7 @@ impl VectorSearch { filter: &str, filter_params: Vec, ) -> Result> { + // Table and column are validated at construction time via validate_sql_identifier let table = self.table.clone(); let column = self.column.clone(); let query_json = query.to_json(); @@ -180,14 +241,10 @@ impl VectorSearch { .query_async(move |conn| { let sql = format!( "SELECT v.rowid, v.distance - FROM vector_quantize_scan('{}', '{}', ?, ?) AS v - JOIN {} AS t ON v.rowid = t.rowid - WHERE {} + FROM vector_quantize_scan('{table}', '{column}', ?, ?) 
AS v + JOIN {table} AS t ON v.rowid = t.rowid + WHERE {filter} ORDER BY v.distance ASC", - table.replace('\'', "''"), - column.replace('\'', "''"), - table.replace('\'', "''"), - filter ); let mut params_vec: Vec> = @@ -269,8 +326,7 @@ mod tests { use super::*; use azoth_core::traits::ProjectionStore; - #[test] - fn test_search_builder() { + fn make_store() -> Arc { use tempfile::tempdir; let dir = tempdir().unwrap(); let db_path = dir.path().join("test.db"); @@ -284,15 +340,51 @@ mod tests { read_pool: azoth_core::config::ReadPoolConfig::default(), }; - let store = Arc::new(azoth_sqlite::SqliteProjectionStore::open(config).unwrap()); + // Leak the tempdir so it lives long enough for the test + std::mem::forget(dir); + Arc::new(azoth_sqlite::SqliteProjectionStore::open(config).unwrap()) + } + + #[test] + fn test_search_builder() { + let store = make_store(); - let search = - VectorSearch::new(store.clone(), "test", "vector").distance_metric(DistanceMetric::L2); + let search = VectorSearch::new(store.clone(), "test", "vector") + .unwrap() + .distance_metric(DistanceMetric::L2); assert_eq!(search.table(), "test"); assert_eq!(search.column(), "vector"); assert_eq!(search.distance_metric_value(), DistanceMetric::L2); } + #[test] + fn test_identifier_validation_rejects_injection() { + let store = make_store(); + + // SQL injection in table name should be rejected + let result = VectorSearch::new(store.clone(), "x; DROP TABLE y; --", "vector"); + assert!(result.is_err()); + + // SQL injection in column name should be rejected + let result = VectorSearch::new(store.clone(), "test", "v'; DROP TABLE y; --"); + assert!(result.is_err()); + + // Empty names should be rejected + let result = VectorSearch::new(store.clone(), "", "vector"); + assert!(result.is_err()); + + // Names starting with digits should be rejected + let result = VectorSearch::new(store.clone(), "123table", "vector"); + assert!(result.is_err()); + + // Valid identifiers should work + let result = 
VectorSearch::new(store.clone(), "my_table", "embedding_col"); + assert!(result.is_ok()); + + let result = VectorSearch::new(store.clone(), "_private", "_col"); + assert!(result.is_ok()); + } + // Full integration tests with vector extension in tests/ directory } diff --git a/crates/azoth-vector/tests/integration_test.rs b/crates/azoth-vector/tests/integration_test.rs index 06fc1a3..1e426f2 100644 --- a/crates/azoth-vector/tests/integration_test.rs +++ b/crates/azoth-vector/tests/integration_test.rs @@ -263,7 +263,7 @@ mod with_extension { run_vector_quantize(&store, "embeddings", "vector"); let query = Vector::new(vec![0.9, 0.1, 0.0]); - let search = VectorSearch::new(store.clone(), "embeddings", "vector"); + let search = VectorSearch::new(store.clone(), "embeddings", "vector").unwrap(); let results = search.knn(&query, 2).await.unwrap(); assert_eq!(results.len(), 2); @@ -341,7 +341,7 @@ mod with_extension { run_vector_quantize(&store, "items", "vector"); let query = Vector::new(vec![0.95, 0.05, 0.0]); - let search = VectorSearch::new(store.clone(), "items", "vector"); + let search = VectorSearch::new(store.clone(), "items", "vector").unwrap(); let results = search .knn_filtered( @@ -412,7 +412,7 @@ mod with_extension { run_vector_quantize(&store, "docs", "vector"); let query = Vector::new(vec![0.9, 0.1, 0.0]); - let search = VectorSearch::new(store.clone(), "docs", "vector"); + let search = VectorSearch::new(store.clone(), "docs", "vector").unwrap(); let results = search.threshold(&query, 0.4, 10).await.unwrap(); diff --git a/crates/azoth/benches/basic_benchmark.rs b/crates/azoth/benches/basic_benchmark.rs index f008ef0..7578775 100644 --- a/crates/azoth/benches/basic_benchmark.rs +++ b/crates/azoth/benches/basic_benchmark.rs @@ -94,12 +94,12 @@ fn main() { println!(" {} transactions in {:?}", count, duration); println!(" {:.2} tx/sec\n", tps); - // Benchmark 6: State reads + // Benchmark 6: State reads (using read-only transactions to avoid blocking writers) 
println!("6. State Read Performance"); let start = Instant::now(); let count = 10_000; for i in 0..count { - let txn = db.canonical().write_txn().unwrap(); + let txn = db.canonical().read_txn().unwrap(); let key = format!("key_{}", i % 100); // Read from first 100 keys let _ = txn.get_state(key.as_bytes()).unwrap(); } diff --git a/crates/azoth/src/dlq_replayer.rs b/crates/azoth/src/dlq_replayer.rs index a4b6d24..eb1b92d 100644 --- a/crates/azoth/src/dlq_replayer.rs +++ b/crates/azoth/src/dlq_replayer.rs @@ -162,20 +162,44 @@ pub enum ReplayPriority { } impl ReplayPriority { - fn order_by_clause(&self) -> String { + /// Generate the ORDER BY clause for this priority. + /// + /// For `ByErrorType`, error type strings are validated against a strict + /// allowlist (alphanumeric plus underscore, dash, dot, and space) to prevent SQL injection via CASE/LIKE expressions. + fn order_by_clause(&self) -> Result { match self { - ReplayPriority::FIFO => "failed_at ASC".to_string(), - ReplayPriority::LIFO => "failed_at DESC".to_string(), - ReplayPriority::ByRetryCount => "retry_count ASC, failed_at ASC".to_string(), + ReplayPriority::FIFO => Ok("failed_at ASC".to_string()), + ReplayPriority::LIFO => Ok("failed_at DESC".to_string()), + ReplayPriority::ByRetryCount => Ok("retry_count ASC, failed_at ASC".to_string()), ReplayPriority::ByErrorType(types) => { - // Order by CASE expression matching error types + // Validate each error type to prevent SQL injection. + // Only alphanumeric, underscore, dash, dot, and space are allowed. + for t in types { + if t.is_empty() || t.len() > 128 { + return Err(AzothError::Config(format!( + "ByErrorType string must be 1-128 characters, got length {}", + t.len() + ))); + } + if !t.chars().all(|c| { + c.is_alphanumeric() || c == '_' || c == '-' || c == '.' || c == ' ' + }) { + return Err(AzothError::Config(format!( + "ByErrorType string '{}' contains disallowed characters. 
\ + Only alphanumeric, underscore, dash, dot, and space are permitted.", + t + ))); + } + } + + // Safe to interpolate after validation let cases = types .iter() .enumerate() .map(|(i, t)| format!("WHEN error_message LIKE '%{}%' THEN {}", t, i)) .collect::>() .join(" "); - format!("CASE {} ELSE 999 END, failed_at ASC", cases) + Ok(format!("CASE {} ELSE 999 END, failed_at ASC", cases)) } } } @@ -390,7 +414,7 @@ impl DlqReplayer { } fn get_eligible_events(&self) -> Result> { - let order_by = self.config.priority.order_by_clause(); + let order_by = self.config.priority.order_by_clause()?; let min_age_secs = self.config.min_age.as_secs(); let query = format!( @@ -585,13 +609,35 @@ mod tests { #[test] fn test_replay_priority_order_by() { let priority = ReplayPriority::FIFO; - assert_eq!(priority.order_by_clause(), "failed_at ASC"); + assert_eq!(priority.order_by_clause().unwrap(), "failed_at ASC"); let priority = ReplayPriority::LIFO; - assert_eq!(priority.order_by_clause(), "failed_at DESC"); + assert_eq!(priority.order_by_clause().unwrap(), "failed_at DESC"); let priority = ReplayPriority::ByRetryCount; - assert_eq!(priority.order_by_clause(), "retry_count ASC, failed_at ASC"); + assert_eq!( + priority.order_by_clause().unwrap(), + "retry_count ASC, failed_at ASC" + ); + } + + #[test] + fn test_replay_priority_by_error_type_validation() { + // Valid error types should work + let priority = ReplayPriority::ByErrorType(vec![ + "timeout".to_string(), + "connection_error".to_string(), + ]); + assert!(priority.order_by_clause().is_ok()); + + // SQL injection attempt should be rejected + let priority = + ReplayPriority::ByErrorType(vec!["'; DROP TABLE dead_letter_queue; --".to_string()]); + assert!(priority.order_by_clause().is_err()); + + // Empty string should be rejected + let priority = ReplayPriority::ByErrorType(vec!["".to_string()]); + assert!(priority.order_by_clause().is_err()); } #[test] diff --git a/crates/azoth/src/event_processor.rs 
b/crates/azoth/src/event_processor.rs index 843a2c5..36c4bcd 100644 --- a/crates/azoth/src/event_processor.rs +++ b/crates/azoth/src/event_processor.rs @@ -91,7 +91,15 @@ impl EventProcessorBuilder { self } - /// Set the batch size (how many events to process at once) + /// Set the batch size (how many events to process per `process_batch` call). + /// + /// Larger batches increase throughput by amortizing per-batch overhead, but + /// require more memory and increase latency to first processed event. + /// + /// **Recommended values:** + /// - `100` (default) – good starting point for low-latency workloads + /// - `1,000` – balanced throughput for steady-state processing + /// - `10,000+` – maximum throughput for bulk catch-up (e.g. replaying a large backlog) pub fn with_batch_size(mut self, size: usize) -> Self { self.batch_size = size; self diff --git a/crates/azoth/src/ipfs.rs b/crates/azoth/src/ipfs.rs index 1f573f9..5f97c79 100644 --- a/crates/azoth/src/ipfs.rs +++ b/crates/azoth/src/ipfs.rs @@ -34,7 +34,9 @@ use serde::{Deserialize, Serialize}; use std::path::Path; /// IPFS provider configuration -#[derive(Debug, Clone, Serialize, Deserialize)] +/// +/// The `Debug` impl redacts API credentials to prevent accidental exposure in logs. +#[derive(Clone, Serialize, Deserialize)] pub enum IpfsProvider { /// Gateway-only provider (read-only) Gateway { url: String }, @@ -46,6 +48,20 @@ pub enum IpfsProvider { }, } +impl std::fmt::Debug for IpfsProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IpfsProvider::Gateway { url } => f.debug_struct("Gateway").field("url", url).finish(), + IpfsProvider::Pinata { gateway_url, .. } => f + .debug_struct("Pinata") + .field("api_key", &"[REDACTED]") + .field("secret_key", &"[REDACTED]") + .field("gateway_url", gateway_url) + .finish(), + } + } +} + impl IpfsProvider { /// Create provider from environment variables ///