diff --git a/Cargo.toml b/Cargo.toml index a6032e7..87c7003 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,8 @@ uuid = { version = "1.0", features = ["v4", "serde"] } walkdir = "2.4" sha2 = "0.10" parking_lot = "0.12" +base64 = "0.22" +fs4 = "0.13" # Dev/test dependencies tempfile = "3.0" diff --git a/crates/azoth-core/src/error.rs b/crates/azoth-core/src/error.rs index 47f2e63..328852c 100644 --- a/crates/azoth-core/src/error.rs +++ b/crates/azoth-core/src/error.rs @@ -62,6 +62,23 @@ pub enum AzothError { #[error("Circuit breaker is open, rejecting request")] CircuitBreakerOpen, + #[error("Event log write failed after state commit (events saved to dead letter queue): {0}")] + EventLogWriteFailed(String), + + #[error("Key too large ({size} bytes, max {max}): {context}")] + KeyTooLarge { + size: usize, + max: usize, + context: String, + }, + + #[error("Value too large ({size} bytes, max {max}): {context}")] + ValueTooLarge { + size: usize, + max: usize, + context: String, + }, + #[error("Internal error: {0}")] Internal(String), diff --git a/crates/azoth-core/src/lock_manager.rs b/crates/azoth-core/src/lock_manager.rs index 7805b98..df0e3fd 100644 --- a/crates/azoth-core/src/lock_manager.rs +++ b/crates/azoth-core/src/lock_manager.rs @@ -54,19 +54,28 @@ pub struct MultiLockGuard<'a> { } impl LockManager { - /// Create a new lock manager with the specified number of stripes + /// Create a new lock manager with the specified number of stripes. /// /// # Arguments /// /// * `num_stripes` - Number of lock stripes (common values: 256, 512, 1024). - /// More stripes = less contention but more memory. + /// More stripes = less contention but more memory. Must be > 0. /// * `default_timeout` - Default timeout for lock acquisition. /// - /// # Panics + /// # Note /// - /// Panics if `num_stripes` is 0. + /// If `num_stripes` is 0, it is clamped to 1 and a warning is logged. 
pub fn new(num_stripes: usize, default_timeout: Duration) -> Self { - assert!(num_stripes > 0, "num_stripes must be positive"); + // Clamp to 1 instead of panicking — a single stripe still works (just no parallelism). + let num_stripes = if num_stripes == 0 { + tracing::warn!( + "LockManager created with num_stripes=0, defaulting to 1. \ + This is a configuration error — set ARCANA_KEY_LOCK_STRIPES > 0." + ); + 1 + } else { + num_stripes + }; let stripes = (0..num_stripes).map(|_| Mutex::new(())).collect(); Self { @@ -76,7 +85,7 @@ impl LockManager { } } - /// Create a lock manager with default timeout + /// Create a lock manager with default timeout. pub fn with_stripes(num_stripes: usize) -> Self { Self::new(num_stripes, Duration::from_millis(DEFAULT_LOCK_TIMEOUT_MS)) } diff --git a/crates/azoth-core/src/types/event.rs b/crates/azoth-core/src/types/event.rs index 07f9590..4049237 100644 --- a/crates/azoth-core/src/types/event.rs +++ b/crates/azoth-core/src/types/event.rs @@ -23,6 +23,12 @@ pub struct CommitInfo { /// Number of state keys deleted pub state_keys_deleted: usize, + + /// Number of events that were written to the dead letter queue + /// because the event log write failed after state commit. + /// When this is > 0, the state was committed but the event log + /// has a gap. Callers should surface this as an operational alert. 
+ pub dlq_events: usize, } impl CommitInfo { @@ -33,6 +39,7 @@ impl CommitInfo { last_event_id: None, state_keys_written: 0, state_keys_deleted: 0, + dlq_events: 0, } } } diff --git a/crates/azoth-file-log/Cargo.toml b/crates/azoth-file-log/Cargo.toml index 0b8154a..9776267 100644 --- a/crates/azoth-file-log/Cargo.toml +++ b/crates/azoth-file-log/Cargo.toml @@ -19,6 +19,7 @@ thiserror = { workspace = true } tracing = { workspace = true } parking_lot = { workspace = true } tokio = { workspace = true, features = ["sync"] } +fs4 = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/azoth-file-log/src/store.rs b/crates/azoth-file-log/src/store.rs index 12d8d70..5fe5775 100644 --- a/crates/azoth-file-log/src/store.rs +++ b/crates/azoth-file-log/src/store.rs @@ -43,6 +43,26 @@ pub struct FileEventLogConfig { /// buffered events on a process crash (the OS-level file won't be corrupted, /// but un-flushed events will be lost). pub flush_on_append: bool, + + /// Whether to fsync (File::sync_all) after batch writes for full durability. + /// + /// When `true`, data is guaranteed to be on disk (not just OS page cache) + /// after each batch commit. This is the strongest durability guarantee but + /// has higher latency. Default: `true` for safety. + pub sync_on_commit: bool, + + /// Whether to enable the background disk space monitor. + /// Default: `false` (disabled). Enable for environments where + /// disk exhaustion is a real concern (e.g. shared hosts, small volumes). + pub enable_disk_monitor: bool, + + /// Minimum disk space (bytes) to reserve before warning about low disk. + /// Default: 100 MB. Only used when `enable_disk_monitor` is `true`. + pub min_disk_space_bytes: u64, + + /// How often (in seconds) to check disk space. Default: 60. + /// Only used when `enable_disk_monitor` is `true`. 
+ pub disk_check_interval_secs: u64, } impl Default for FileEventLogConfig { @@ -55,6 +75,10 @@ impl Default for FileEventLogConfig { max_event_size: 4 * 1024 * 1024, // 4MB single-event limit max_batch_bytes: 64 * 1024 * 1024, // 64MB batch limit flush_on_append: true, + sync_on_commit: true, + enable_disk_monitor: false, + min_disk_space_bytes: 100 * 1024 * 1024, // 100 MB + disk_check_interval_secs: 60, } } } @@ -88,6 +112,10 @@ pub struct FileEventLog { /// instead of polling, giving near-zero-latency event processing with zero /// CPU waste when idle. event_notify: Option>, + /// Atomic flag set by the background disk space monitor. + /// When `true`, available disk space is below `min_disk_space_bytes`. + /// Checked by the commit path to log warnings without blocking writes. + low_disk_space: Arc, } impl FileEventLog { @@ -120,6 +148,32 @@ impl FileEventLog { file, ))); + let low_disk_space = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + // Perform initial disk space check (only if monitor is enabled) + if config.enable_disk_monitor { + match fs4::available_space(&config.base_dir) { + Ok(available) => { + if available < config.min_disk_space_bytes { + low_disk_space.store(true, std::sync::atomic::Ordering::Relaxed); + tracing::warn!( + available_mb = available / (1024 * 1024), + min_mb = config.min_disk_space_bytes / (1024 * 1024), + path = %config.base_dir.display(), + "Low disk space on event log volume" + ); + } + } + Err(e) => { + tracing::warn!( + error = %e, + path = %config.base_dir.display(), + "Could not check disk space on event log volume" + ); + } + } + } + Ok(Self { config, meta: Arc::new(Mutex::new(meta)), @@ -127,6 +181,7 @@ impl FileEventLog { writer, current_file_num, event_notify: None, + low_disk_space, }) } @@ -144,6 +199,73 @@ impl FileEventLog { self.event_notify.clone() } + /// Start the background disk space monitor. 
+ /// + /// Checks available disk space every `disk_check_interval_secs` seconds + /// and sets the `low_disk_space` flag when space drops below + /// `min_disk_space_bytes`. The commit path reads this flag (zero-cost + /// atomic load) to log warnings. + /// + /// Returns `None` if `enable_disk_monitor` is `false` in the config. + /// Otherwise returns a `JoinHandle` that runs until the process shuts down. + pub fn start_disk_monitor(&self) -> Option> { + if !self.config.enable_disk_monitor { + return None; + } + + let flag = self.low_disk_space.clone(); + let base_dir = self.config.base_dir.clone(); + let min_bytes = self.config.min_disk_space_bytes; + let interval_secs = self.config.disk_check_interval_secs; + + Some(tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + + match fs4::available_space(&base_dir) { + Ok(available) => { + let was_low = flag.load(std::sync::atomic::Ordering::Relaxed); + let is_low = available < min_bytes; + flag.store(is_low, std::sync::atomic::Ordering::Relaxed); + + if is_low && !was_low { + tracing::warn!( + available_mb = available / (1024 * 1024), + min_mb = min_bytes / (1024 * 1024), + path = %base_dir.display(), + "Disk space dropped below threshold on event log volume" + ); + } else if !is_low && was_low { + tracing::info!( + available_mb = available / (1024 * 1024), + path = %base_dir.display(), + "Disk space recovered above threshold on event log volume" + ); + } + } + Err(e) => { + tracing::warn!( + error = %e, + path = %base_dir.display(), + "Failed to check disk space" + ); + } + } + } + })) + } + + /// Returns `true` if available disk space is below the configured threshold. + /// + /// This is a zero-cost atomic load — safe to call on every commit. 
+ pub fn is_low_disk_space(&self) -> bool { + self.low_disk_space + .load(std::sync::atomic::Ordering::Relaxed) + } + /// Get path to a log file by number fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf { base_dir.join(format!("events-{:08}.log", file_num)) @@ -260,12 +382,18 @@ impl EventLog for FileEventLog { // Write event entry self.write_event_entry(event_id, event_bytes)?; - // Conditionally flush based on config + // Flush BufWriter to OS page cache if self.config.flush_on_append { let mut writer = self.writer.lock(); writer.flush()?; } + // Fsync to disk for full durability + if self.config.sync_on_commit { + let writer = self.writer.lock(); + writer.get_ref().sync_all()?; + } + // Update metadata { let mut meta = self.meta.lock(); @@ -353,12 +481,18 @@ impl EventLog for FileEventLog { writer.write_all(&buffer)?; } - // Conditionally flush based on config + // Flush BufWriter to OS page cache if self.config.flush_on_append { let mut writer = self.writer.lock(); writer.flush()?; } + // Fsync to disk for full durability (guarantees events survive power loss) + if self.config.sync_on_commit { + let writer = self.writer.lock(); + writer.get_ref().sync_all()?; + } + // Update metadata let last_id = first_event_id + events.len() as u64 - 1; { @@ -630,6 +764,8 @@ mod tests { max_event_size: 1024 * 1024, max_batch_bytes: 16 * 1024 * 1024, flush_on_append: true, + sync_on_commit: false, // Disable fsync in tests for speed + ..Default::default() }; let log = FileEventLog::open(config).unwrap(); (log, temp_dir) diff --git a/crates/azoth-lmdb/Cargo.toml b/crates/azoth-lmdb/Cargo.toml index ec8613a..f0225b2 100644 --- a/crates/azoth-lmdb/Cargo.toml +++ b/crates/azoth-lmdb/Cargo.toml @@ -24,6 +24,7 @@ serde_json = { workspace = true } chrono = { workspace = true } dashmap = { workspace = true } parking_lot = { workspace = true } +base64 = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git 
a/crates/azoth-lmdb/src/dead_letter_queue.rs b/crates/azoth-lmdb/src/dead_letter_queue.rs new file mode 100644 index 0000000..7f94278 --- /dev/null +++ b/crates/azoth-lmdb/src/dead_letter_queue.rs @@ -0,0 +1,313 @@ +//! Dead Letter Queue (DLQ) for failed event log writes. +//! +//! When the two-phase commit succeeds for LMDB state (phase 1) but fails +//! to write events to the file log (phase 2), events are persisted here +//! so they can be recovered and replayed later. +//! +//! Format: Each entry is a JSON line containing the event metadata and +//! base64-encoded event data. This is intentionally simple and human-readable +//! for operational debugging. + +use azoth_core::{ + error::{AzothError, Result}, + types::EventId, +}; +use std::fs::{self, File, OpenOptions}; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +/// A dead letter queue entry representing a failed event log write. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct DlqEntry { + /// The pre-allocated event ID from LMDB metadata. + pub event_id: EventId, + /// Base64-encoded event data. + pub event_data_b64: String, + /// ISO 8601 timestamp when the entry was written to the DLQ. + pub timestamp: String, + /// The original error message from the failed event log write. + pub error: String, +} + +/// Dead Letter Queue for persisting events that failed to write to the event log. +/// +/// Thread-safe: uses a parking_lot mutex around the file handle to allow +/// concurrent callers. In practice, only one write txn commits at a time +/// (LMDB single-writer), but this is defensive. +pub struct DeadLetterQueue { + path: PathBuf, + writer: parking_lot::Mutex>, +} + +impl DeadLetterQueue { + /// Open or create the DLQ file at `dir/dead_letter_queue.jsonl`. + /// + /// Creates the parent directory if it does not exist. 
+ pub fn open(dir: &Path) -> Result> { + fs::create_dir_all(dir)?; + let path = dir.join("dead_letter_queue.jsonl"); + + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .map_err(|e| { + AzothError::Io(std::io::Error::new( + e.kind(), + format!("Failed to open DLQ at {}: {}", path.display(), e), + )) + })?; + + tracing::info!("Dead letter queue opened at {}", path.display()); + + Ok(Arc::new(Self { + path, + writer: parking_lot::Mutex::new(Some(file)), + })) + } + + /// Write a batch of failed events to the DLQ. + /// + /// Each event is written as a separate JSON line and flushed with fsync + /// to ensure durability. If the DLQ itself fails to write, we log a + /// critical error — this is the last resort. + pub fn write_batch( + &self, + first_event_id: EventId, + events: &[Vec], + original_error: &str, + ) -> Result<()> { + use base64::Engine; + + let mut guard = self.writer.lock(); + let file = guard + .as_mut() + .ok_or_else(|| AzothError::InvalidState("DLQ file handle is closed".into()))?; + + let timestamp = chrono::Utc::now().to_rfc3339(); + + for (i, event_data) in events.iter().enumerate() { + let entry = DlqEntry { + event_id: first_event_id + i as u64, + event_data_b64: base64::engine::general_purpose::STANDARD.encode(event_data), + timestamp: timestamp.clone(), + error: original_error.to_string(), + }; + + let json = serde_json::to_string(&entry).map_err(|e| { + AzothError::Serialization(format!("Failed to serialize DLQ entry: {}", e)) + })?; + + writeln!(file, "{}", json).map_err(|e| { + AzothError::Io(std::io::Error::new( + e.kind(), + format!( + "Failed to write DLQ entry for event {}: {}", + entry.event_id, e + ), + )) + })?; + } + + // Ensure DLQ entries are durably persisted (this is our safety net) + file.sync_all().map_err(|e| { + AzothError::Io(std::io::Error::new( + e.kind(), + format!("Failed to fsync DLQ: {}", e), + )) + })?; + + tracing::error!( + first_event_id = first_event_id, + count = events.len(), + 
dlq_path = %self.path.display(), + "Events written to dead letter queue (event log write failed)" + ); + + Ok(()) + } + + /// Read all DLQ entries (for recovery/replay tooling). + pub fn read_all(&self) -> Result> { + let file = File::open(&self.path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + return AzothError::NotFound(format!( + "DLQ file not found: {}", + self.path.display() + )); + } + AzothError::Io(e) + })?; + + let reader = BufReader::new(file); + let mut entries = Vec::new(); + + for line in reader.lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + let entry: DlqEntry = serde_json::from_str(&line).map_err(|e| { + AzothError::Serialization(format!("Failed to deserialize DLQ entry: {}", e)) + })?; + entries.push(entry); + } + + Ok(entries) + } + + /// Returns the number of entries currently in the DLQ. + /// Used for health checks to detect canonical-vs-event-log drift. + pub fn entry_count(&self) -> Result { + match File::open(&self.path) { + Ok(file) => { + let reader = BufReader::new(file); + Ok(reader.lines().count()) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(0), + Err(e) => Err(AzothError::Io(e)), + } + } + + /// Path to the DLQ file. 
+ pub fn path(&self) -> &Path { + &self.path + } +} + +#[cfg(test)] +mod tests { + use super::*; + use base64::Engine; + + fn temp_dlq() -> (Arc, tempfile::TempDir) { + let dir = tempfile::TempDir::new().unwrap(); + let dlq = DeadLetterQueue::open(dir.path()).unwrap(); + (dlq, dir) + } + + #[test] + fn test_open_creates_file() { + let (dlq, _dir) = temp_dlq(); + assert!(dlq.path().exists()); + } + + #[test] + fn test_empty_dlq_has_zero_entries() { + let (dlq, _dir) = temp_dlq(); + assert_eq!(dlq.entry_count().unwrap(), 0); + } + + #[test] + fn test_write_and_read_single_event() { + let (dlq, _dir) = temp_dlq(); + let event_data = b"hello world".to_vec(); + + dlq.write_batch(42, &[event_data.clone()], "test error") + .unwrap(); + + let entries = dlq.read_all().unwrap(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].event_id, 42); + assert_eq!(entries[0].error, "test error"); + + // Verify base64 round-trip + let decoded = base64::engine::general_purpose::STANDARD + .decode(&entries[0].event_data_b64) + .unwrap(); + assert_eq!(decoded, event_data); + } + + #[test] + fn test_write_batch_multiple_events() { + let (dlq, _dir) = temp_dlq(); + let events: Vec> = vec![ + b"event_a".to_vec(), + b"event_b".to_vec(), + b"event_c".to_vec(), + ]; + + dlq.write_batch(100, &events, "disk full").unwrap(); + + let entries = dlq.read_all().unwrap(); + assert_eq!(entries.len(), 3); + + // Event IDs should be sequential starting from first_event_id + assert_eq!(entries[0].event_id, 100); + assert_eq!(entries[1].event_id, 101); + assert_eq!(entries[2].event_id, 102); + + // All entries share the same error message + for entry in &entries { + assert_eq!(entry.error, "disk full"); + } + + assert_eq!(dlq.entry_count().unwrap(), 3); + } + + #[test] + fn test_multiple_write_batches_append() { + let (dlq, _dir) = temp_dlq(); + + dlq.write_batch(10, &[b"first".to_vec()], "err1").unwrap(); + dlq.write_batch(20, &[b"second".to_vec(), b"third".to_vec()], "err2") + .unwrap(); + + let 
entries = dlq.read_all().unwrap(); + assert_eq!(entries.len(), 3); + assert_eq!(entries[0].event_id, 10); + assert_eq!(entries[1].event_id, 20); + assert_eq!(entries[2].event_id, 21); + assert_eq!(dlq.entry_count().unwrap(), 3); + } + + #[test] + fn test_dlq_survives_reopen() { + let dir = tempfile::TempDir::new().unwrap(); + + // Write some entries + { + let dlq = DeadLetterQueue::open(dir.path()).unwrap(); + dlq.write_batch(1, &[b"persistent".to_vec()], "crash") + .unwrap(); + } + + // Reopen and verify entries survived + { + let dlq = DeadLetterQueue::open(dir.path()).unwrap(); + let entries = dlq.read_all().unwrap(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].event_id, 1); + + let decoded = base64::engine::general_purpose::STANDARD + .decode(&entries[0].event_data_b64) + .unwrap(); + assert_eq!(decoded, b"persistent"); + } + } + + #[test] + fn test_entry_count_without_file() { + let dir = tempfile::TempDir::new().unwrap(); + let dlq = DeadLetterQueue::open(dir.path()).unwrap(); + // File exists but is empty + assert_eq!(dlq.entry_count().unwrap(), 0); + } + + #[test] + fn test_binary_event_data_roundtrip() { + let (dlq, _dir) = temp_dlq(); + + // Test with binary data that would break if not base64-encoded + let binary_data: Vec = (0..=255).collect(); + dlq.write_batch(0, &[binary_data.clone()], "binary test") + .unwrap(); + + let entries = dlq.read_all().unwrap(); + let decoded = base64::engine::general_purpose::STANDARD + .decode(&entries[0].event_data_b64) + .unwrap(); + assert_eq!(decoded, binary_data); + } +} diff --git a/crates/azoth-lmdb/src/lib.rs b/crates/azoth-lmdb/src/lib.rs index 12d3458..0a18ac9 100644 --- a/crates/azoth-lmdb/src/lib.rs +++ b/crates/azoth-lmdb/src/lib.rs @@ -11,6 +11,7 @@ //! 
- Optional read transaction pooling for concurrent reads pub mod backup; +pub mod dead_letter_queue; pub mod iter; pub mod keys; pub mod preflight_cache; @@ -19,6 +20,7 @@ pub mod state_iter; pub mod store; pub mod txn; +pub use dead_letter_queue::DeadLetterQueue; pub use preflight_cache::EvictionPolicy; pub use read_pool::{LmdbReadPool, PooledLmdbReadTxn}; pub use store::LmdbCanonicalStore; diff --git a/crates/azoth-lmdb/src/store.rs b/crates/azoth-lmdb/src/store.rs index 8a23cf6..e4c357d 100644 --- a/crates/azoth-lmdb/src/store.rs +++ b/crates/azoth-lmdb/src/store.rs @@ -46,6 +46,7 @@ pub struct LmdbCanonicalStore { pub(crate) meta_db: Database, pub(crate) config_path: std::path::PathBuf, pub(crate) event_log: Arc, + dead_letter_queue: Arc, lock_manager: Arc, write_lock: Arc>, paused: Arc, @@ -179,6 +180,28 @@ impl CanonicalStore for LmdbCanonicalStore { event_log.set_event_notify(event_notify.clone()); let event_log = Arc::new(event_log); + // Initialize dead letter queue for failed event log writes + let dlq_dir = cfg.path.join("dlq"); + let dead_letter_queue = crate::dead_letter_queue::DeadLetterQueue::open(&dlq_dir)?; + + // Check for existing DLQ entries on startup (operational warning) + match dead_letter_queue.entry_count() { + Ok(count) if count > 0 => { + tracing::warn!( + dlq_entries = count, + dlq_path = %dlq_dir.display(), + "Dead letter queue has {} unprocessed entries from previous runs. \ + These events were committed to state but not to the event log. 
\ + Run the DLQ recovery tool to replay them.", + count + ); + } + Ok(_) => {} + Err(e) => { + tracing::warn!("Failed to check DLQ entry count: {}", e); + } + } + // Initialize metadata if needed { let mut txn = env @@ -238,6 +261,7 @@ impl CanonicalStore for LmdbCanonicalStore { meta_db, config_path: cfg.path.clone(), event_log, + dead_letter_queue, lock_manager, write_lock: Arc::new(Mutex::new(())), paused: Arc::new(AtomicBool::new(false)), @@ -294,6 +318,7 @@ impl CanonicalStore for LmdbCanonicalStore { self.state_db, self.meta_db, self.event_log.clone(), + self.dead_letter_queue.clone(), Arc::downgrade(&self.txn_counter), self.preflight_cache.clone(), )) @@ -607,6 +632,7 @@ impl LmdbCanonicalStore { let state_db = self.state_db; let meta_db = self.meta_db; let event_log = self.event_log.clone(); + let dead_letter_queue = self.dead_letter_queue.clone(); let txn_counter = self.txn_counter.clone(); let preflight_cache = self.preflight_cache.clone(); let paused = self.paused.clone(); @@ -633,6 +659,7 @@ impl LmdbCanonicalStore { state_db, meta_db, event_log, + dead_letter_queue, Arc::downgrade(&txn_counter), preflight_cache, ); @@ -664,6 +691,7 @@ impl LmdbCanonicalStore { last_event_id: None, state_keys_written: 1, state_keys_deleted: 0, + dlq_events: 0, }) } @@ -683,6 +711,7 @@ impl LmdbCanonicalStore { last_event_id: None, state_keys_written: 0, state_keys_deleted: 1, + dlq_events: 0, }) } diff --git a/crates/azoth-lmdb/src/txn.rs b/crates/azoth-lmdb/src/txn.rs index 5c02785..9f4767d 100644 --- a/crates/azoth-lmdb/src/txn.rs +++ b/crates/azoth-lmdb/src/txn.rs @@ -11,18 +11,28 @@ use std::sync::{ Arc, Weak, }; +use crate::dead_letter_queue::DeadLetterQueue; use crate::keys::meta_keys; use crate::preflight_cache::PreflightCache; +/// Maximum number of retries for event log writes before falling back to DLQ. +const EVENT_LOG_WRITE_RETRIES: usize = 3; + +/// Delay between retries in milliseconds (doubles each attempt). 
+const EVENT_LOG_RETRY_BASE_DELAY_MS: u64 = 10; + /// Write transaction for LMDB canonical store /// /// State updates go to LMDB for transactional integrity. /// Events are buffered and written to FileEventLog on commit. +/// If event log writes fail after retries, events are persisted +/// to a dead letter queue (DLQ) for later recovery. pub struct LmdbWriteTxn<'a> { txn: Option>, state_db: Database, meta_db: Database, event_log: Arc, + dead_letter_queue: Arc, pending_events: Vec>, stats: TxnStats, txn_counter: Weak, @@ -38,6 +48,8 @@ struct TxnStats { events_written: usize, first_event_id: Option, last_event_id: Option, + /// Events that went to the dead letter queue instead of the event log. + dlq_events: usize, } /// Read-only transaction for LMDB canonical store @@ -72,6 +84,7 @@ impl<'a> LmdbWriteTxn<'a> { state_db: Database, meta_db: Database, event_log: Arc, + dead_letter_queue: Arc, txn_counter: Weak, preflight_cache: Arc, ) -> Self { @@ -80,6 +93,7 @@ impl<'a> LmdbWriteTxn<'a> { state_db, meta_db, event_log, + dead_letter_queue, pending_events: Vec::new(), stats: TxnStats { state_keys_written: 0, @@ -87,6 +101,7 @@ impl<'a> LmdbWriteTxn<'a> { events_written: 0, first_event_id: None, last_event_id: None, + dlq_events: 0, }, txn_counter, counter_decremented: false, @@ -160,6 +175,34 @@ impl<'a> CanonicalTxn for LmdbWriteTxn<'a> { } fn put_state(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + // LMDB default max key size is 511 bytes. Reject oversized keys early + // with a clear error instead of letting LMDB produce cryptic failures. + const MAX_KEY_SIZE: usize = 511; + if key.len() > MAX_KEY_SIZE { + return Err(AzothError::KeyTooLarge { + size: key.len(), + max: MAX_KEY_SIZE, + context: format!( + "key prefix: {:?}", + String::from_utf8_lossy(&key[..key.len().min(64)]) + ), + }); + } + + // LMDB values can be up to ~2GB but we cap at 16MB to prevent + // accidental bloat and keep mmap usage reasonable. 
+ const MAX_VALUE_SIZE: usize = 16 * 1024 * 1024; // 16 MB + if value.len() > MAX_VALUE_SIZE { + return Err(AzothError::ValueTooLarge { + size: value.len(), + max: MAX_VALUE_SIZE, + context: format!( + "key: {:?}", + String::from_utf8_lossy(&key[..key.len().min(64)]) + ), + }); + } + let txn = self .txn .as_mut() @@ -262,23 +305,102 @@ impl<'a> CanonicalTxn for LmdbWriteTxn<'a> { .map_err(|e| AzothError::Transaction(e.to_string()))?; } + // Pre-commit check: warn if disk space is low (zero-cost atomic read) + if !self.pending_events.is_empty() && self.event_log.is_low_disk_space() { + tracing::warn!( + event_count = self.pending_events.len(), + "Committing transaction with LOW DISK SPACE on event log volume. \ + Event log write may fail; events would go to dead letter queue." + ); + } + // Phase 1: Commit LMDB state transaction let txn = self.txn.take().unwrap(); txn.commit() .map_err(|e| AzothError::Transaction(e.to_string()))?; - // Phase 2: Write events to file-based log - // This happens AFTER state commit succeeds - // If this fails, it's a critical error (should rarely happen) + // Phase 2: Write events to file-based log (durable with retries + DLQ fallback) + // + // This happens AFTER state commit succeeds. Since LMDB state is already + // committed, we MUST NOT return an error that would cause callers to retry + // the entire transaction (which would double-apply state changes). + // + // Strategy: + // 1. Try event log write up to EVENT_LOG_WRITE_RETRIES times + // 2. If all retries fail, persist events to the dead letter queue (DLQ) + // 3. Return Ok (state IS committed) but log a critical warning + // 4. 
The DLQ can be drained by a recovery process to replay events if !self.pending_events.is_empty() { let first_event_id = self .stats .first_event_id .ok_or_else(|| AzothError::InvalidState("Missing first_event_id".into()))?; - // Write events with pre-allocated EventIds from LMDB - self.event_log - .append_batch_with_ids(first_event_id, &self.pending_events)?; + let mut last_error = None; + for attempt in 0..EVENT_LOG_WRITE_RETRIES { + match self + .event_log + .append_batch_with_ids(first_event_id, &self.pending_events) + { + Ok(()) => { + if attempt > 0 { + tracing::warn!( + attempt = attempt + 1, + first_event_id, + "Event log write succeeded after retry" + ); + } + last_error = None; + break; + } + Err(e) => { + tracing::warn!( + attempt = attempt + 1, + max_attempts = EVENT_LOG_WRITE_RETRIES, + first_event_id, + error = %e, + "Event log write failed, retrying" + ); + last_error = Some(e); + // Exponential backoff: 10ms, 20ms, 40ms... + std::thread::sleep(std::time::Duration::from_millis( + EVENT_LOG_RETRY_BASE_DELAY_MS << attempt, + )); + } + } + } + + // If all retries failed, write to DLQ as last resort + if let Some(err) = last_error { + let err_msg = err.to_string(); + tracing::error!( + first_event_id, + event_count = self.pending_events.len(), + error = %err_msg, + "Event log write failed after all retries, writing to dead letter queue" + ); + + // Track that events went to DLQ so callers can detect this + self.stats.dlq_events = self.pending_events.len(); + + // DLQ write is our last safety net — if THIS fails, we have a serious problem + if let Err(dlq_err) = self.dead_letter_queue.write_batch( + first_event_id, + &self.pending_events, + &err_msg, + ) { + // This is the worst case: state committed, events lost, DLQ failed. + // Log the catastrophic error with all event details. 
+ tracing::error!( + first_event_id, + event_count = self.pending_events.len(), + event_log_error = %err_msg, + dlq_error = %dlq_err, + "CATASTROPHIC: Both event log and DLQ writes failed. Events may be lost." + ); + // Still return Ok — state IS committed and we must not cause a retry + } + } } // Phase 3: Invalidate preflight cache for modified keys @@ -293,6 +415,7 @@ impl<'a> CanonicalTxn for LmdbWriteTxn<'a> { last_event_id: self.stats.last_event_id, state_keys_written: self.stats.state_keys_written, state_keys_deleted: self.stats.state_keys_deleted, + dlq_events: self.stats.dlq_events, }) } diff --git a/crates/azoth-projector/src/projector.rs b/crates/azoth-projector/src/projector.rs index ccfbc9c..697c268 100644 --- a/crates/azoth-projector/src/projector.rs +++ b/crates/azoth-projector/src/projector.rs @@ -4,7 +4,7 @@ use azoth_core::{ types::EventId, ProjectorConfig, }; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::Notify; @@ -23,6 +23,11 @@ where /// The `FileEventLog` fires `notify_waiters()` after every successful append, /// giving near-zero-latency projection with zero CPU waste when idle. event_notify: Option>, + /// Running count of event ID gaps detected across all `run_once()` calls. + /// Used for operational monitoring / health checks. + total_gaps_detected: Arc, + /// Running count of individual missing event IDs across all gaps. + total_events_skipped: Arc, } impl Projector @@ -37,6 +42,8 @@ where config, shutdown: Arc::new(AtomicBool::new(false)), event_notify: None, + total_gaps_detected: Arc::new(AtomicU64::new(0)), + total_events_skipped: Arc::new(AtomicU64::new(0)), } } @@ -50,7 +57,12 @@ where self } - /// Run one iteration of the projector loop + /// Run one iteration of the projector loop. + /// + /// Fetches a batch of events from the canonical store and applies them + /// to the projection. 
Detects and tolerates event ID gaps: if the event + /// log is missing events (e.g. due to DLQ fallback), the projector logs + /// a warning and advances past the gap instead of blocking. pub fn run_once(&self) -> Result { let start = Instant::now(); @@ -101,7 +113,17 @@ where return Ok(ProjectorStats::empty()); } - // Apply events in a transaction + // Detect event ID gaps (events lost to DLQ) + let (gaps_detected, events_skipped) = detect_gaps(from, &events); + + if gaps_detected > 0 { + self.total_gaps_detected + .fetch_add(gaps_detected, Ordering::Relaxed); + self.total_events_skipped + .fetch_add(events_skipped, Ordering::Relaxed); + } + + // Apply events in a transaction (only events we actually have) let mut txn = self.projection.begin_txn()?; txn.apply_batch(&events)?; @@ -113,6 +135,8 @@ where bytes_processed: total_bytes, duration: start.elapsed(), new_cursor: last_id, + gaps_detected, + events_skipped, }) } @@ -140,6 +164,15 @@ where tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) .await; } + } else if stats.gaps_detected > 0 { + tracing::warn!( + events = stats.events_applied, + bytes = stats.bytes_processed, + gaps = stats.gaps_detected, + skipped = stats.events_skipped, + elapsed = ?stats.duration, + "Applied events with gaps (events may be in DLQ)" + ); } else { tracing::debug!( "Applied {} events, {} bytes in {:?}", @@ -165,6 +198,16 @@ where self.shutdown.store(true, Ordering::SeqCst); } + /// Returns the total number of event ID gaps detected across all iterations. + pub fn total_gaps_detected(&self) -> u64 { + self.total_gaps_detected.load(Ordering::Relaxed) + } + + /// Returns the total number of individual event IDs that were skipped due to gaps. 
+ pub fn total_events_skipped(&self) -> u64 {
+ self.total_events_skipped.load(Ordering::Relaxed)
+ }
+
 /// Check lag (events behind)
 pub fn get_lag(&self) -> Result<u64> {
 let cursor = self.projection.get_cursor()?;
@@ -193,6 +236,10 @@ pub struct ProjectorStats {
 pub bytes_processed: usize,
 pub duration: Duration,
 pub new_cursor: EventId,
+ /// Number of event ID gaps detected in this batch.
+ pub gaps_detected: u64,
+ /// Number of individual event IDs that were skipped due to gaps.
+ pub events_skipped: u64,
 }
 
 impl ProjectorStats {
@@ -202,6 +249,140 @@
 bytes_processed: 0,
 duration: Duration::from_secs(0),
 new_cursor: 0,
+ gaps_detected: 0,
+ events_skipped: 0,
+ }
+ }
+}
+
+/// Detect event ID gaps in a batch of events.
+///
+/// Checks two types of gaps:
+/// 1. Between the expected first event ID and the actual first event
+/// 2. Between consecutive events within the batch
+///
+/// Returns `(gaps_detected, events_skipped)`.
+fn detect_gaps(expected_first: EventId, events: &[(EventId, Vec<u8>)]) -> (u64, u64) {
+ if events.is_empty() {
+ return (0, 0);
+ }
+
+ let mut gaps_detected: u64 = 0;
+ let mut events_skipped: u64 = 0;
+
+ // Check gap between cursor and first event
+ let actual_first = events[0].0;
+ if actual_first > expected_first {
+ let missing = actual_first - expected_first;
+ gaps_detected += 1;
+ events_skipped += missing;
+ tracing::warn!(
+ expected_id = expected_first,
+ actual_id = actual_first,
+ missing_count = missing,
+ "Projector: event gap detected between cursor and first event \
+ (events {}-{} missing, likely in DLQ). 
Advancing past gap.",
+ expected_first,
+ actual_first - 1,
+ );
+ }
+
+ // Check gaps within the batch
+ for window in events.windows(2) {
+ let prev_id = window[0].0;
+ let curr_id = window[1].0;
+ let expected = prev_id + 1;
+ if curr_id > expected {
+ let missing = curr_id - expected;
+ gaps_detected += 1;
+ events_skipped += missing;
+ tracing::warn!(
+ expected_id = expected,
+ actual_id = curr_id,
+ missing_count = missing,
+ "Projector: event gap detected within batch \
+ (events {}-{} missing, likely in DLQ). Skipping gap.",
+ expected,
+ curr_id - 1,
+ );
+ }
+ }
+
+ (gaps_detected, events_skipped)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn event(id: EventId) -> (EventId, Vec<u8>) {
+ (id, vec![id as u8])
+ }
+
+ #[test]
+ fn test_no_gaps_sequential() {
+ let events = vec![event(5), event(6), event(7)];
+ let (gaps, skipped) = detect_gaps(5, &events);
+ assert_eq!(gaps, 0);
+ assert_eq!(skipped, 0);
+ }
+
+ #[test]
+ fn test_gap_before_first_event() {
+ // Expected 5, but first available event is 8 (5,6,7 missing)
+ let events = vec![event(8), event(9), event(10)];
+ let (gaps, skipped) = detect_gaps(5, &events);
+ assert_eq!(gaps, 1);
+ assert_eq!(skipped, 3); // events 5, 6, 7
+ }
+
+ #[test]
+ fn test_gap_within_batch() {
+ // 5,6 present, 7,8 missing, 9 present
+ let events = vec![event(5), event(6), event(9)];
+ let (gaps, skipped) = detect_gaps(5, &events);
+ assert_eq!(gaps, 1);
+ assert_eq!(skipped, 2); // events 7, 8
+ }
+
+ #[test]
+ fn test_multiple_gaps() {
+ // Gap before first (0,1 missing), gap mid-batch (4 missing), gap again (6,7 missing)
+ let events = vec![event(2), event(3), event(5), event(8)];
+ let (gaps, skipped) = detect_gaps(0, &events);
+ assert_eq!(gaps, 3); // before first, between 3-5, between 5-8
+ assert_eq!(skipped, 2 + 1 + 2); // 0-1 + 4 + 6-7
+ }
+
+ #[test]
+ fn test_single_event_no_gap() {
+ let events = vec![event(42)];
+ let (gaps, skipped) = detect_gaps(42, &events);
+ assert_eq!(gaps, 0);
+ assert_eq!(skipped, 
0);
+ }
+
+ #[test]
+ fn test_single_event_with_gap() {
+ let events = vec![event(45)];
+ let (gaps, skipped) = detect_gaps(42, &events);
+ assert_eq!(gaps, 1);
+ assert_eq!(skipped, 3); // events 42, 43, 44
+ }
+
+ #[test]
+ fn test_empty_events() {
+ let events: Vec<(EventId, Vec<u8>)> = vec![];
+ let (gaps, skipped) = detect_gaps(0, &events);
+ assert_eq!(gaps, 0);
+ assert_eq!(skipped, 0);
+ }
+
+ #[test]
+ fn test_first_event_matches_exactly() {
+ let events = vec![event(0), event(1), event(2)];
+ let (gaps, skipped) = detect_gaps(0, &events);
+ assert_eq!(gaps, 0);
+ assert_eq!(skipped, 0);
+ }
+}