Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ uuid = { version = "1.0", features = ["v4", "serde"] }
walkdir = "2.4"
sha2 = "0.10"
parking_lot = "0.12"
base64 = "0.22"
fs4 = "0.13"

# Dev/test dependencies
tempfile = "3.0"
Expand Down
17 changes: 17 additions & 0 deletions crates/azoth-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ pub enum AzothError {
#[error("Circuit breaker is open, rejecting request")]
CircuitBreakerOpen,

#[error("Event log write failed after state commit (events saved to dead letter queue): {0}")]
EventLogWriteFailed(String),

#[error("Key too large ({size} bytes, max {max}): {context}")]
KeyTooLarge {
size: usize,
max: usize,
context: String,
},

#[error("Value too large ({size} bytes, max {max}): {context}")]
ValueTooLarge {
size: usize,
max: usize,
context: String,
},

#[error("Internal error: {0}")]
Internal(String),

Expand Down
21 changes: 15 additions & 6 deletions crates/azoth-core/src/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,28 @@ pub struct MultiLockGuard<'a> {
}

impl LockManager {
/// Create a new lock manager with the specified number of stripes
/// Create a new lock manager with the specified number of stripes.
///
/// # Arguments
///
/// * `num_stripes` - Number of lock stripes (common values: 256, 512, 1024).
/// More stripes = less contention but more memory.
/// More stripes = less contention but more memory. Must be > 0.
/// * `default_timeout` - Default timeout for lock acquisition.
///
/// # Panics
/// # Returns
///
/// Panics if `num_stripes` is 0.
/// `Err(AzothError::Config)` if `num_stripes` is 0.
pub fn new(num_stripes: usize, default_timeout: Duration) -> Self {
assert!(num_stripes > 0, "num_stripes must be positive");
// Clamp to 1 instead of panicking — a single stripe still works (just no parallelism).
let num_stripes = if num_stripes == 0 {
tracing::warn!(
"LockManager created with num_stripes=0, defaulting to 1. \
This is a configuration error — set ARCANA_KEY_LOCK_STRIPES > 0."
);
1
} else {
num_stripes
};
let stripes = (0..num_stripes).map(|_| Mutex::new(())).collect();

Self {
Expand All @@ -76,7 +85,7 @@ impl LockManager {
}
}

/// Create a lock manager with default timeout
/// Create a lock manager with default timeout.
pub fn with_stripes(num_stripes: usize) -> Self {
Self::new(num_stripes, Duration::from_millis(DEFAULT_LOCK_TIMEOUT_MS))
}
Expand Down
7 changes: 7 additions & 0 deletions crates/azoth-core/src/types/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ pub struct CommitInfo {

/// Number of state keys deleted
pub state_keys_deleted: usize,

/// Number of events that were written to the dead letter queue
/// because the event log write failed after state commit.
/// When this is > 0, the state was committed but the event log
/// has a gap. Callers should surface this as an operational alert.
pub dlq_events: usize,
}

impl CommitInfo {
Expand All @@ -33,6 +39,7 @@ impl CommitInfo {
last_event_id: None,
state_keys_written: 0,
state_keys_deleted: 0,
dlq_events: 0,
}
}
}
1 change: 1 addition & 0 deletions crates/azoth-file-log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ thiserror = { workspace = true }
tracing = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
fs4 = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
140 changes: 138 additions & 2 deletions crates/azoth-file-log/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,26 @@ pub struct FileEventLogConfig {
/// buffered events on a process crash (the OS-level file won't be corrupted,
/// but un-flushed events will be lost).
pub flush_on_append: bool,

/// Whether to fsync (File::sync_all) after batch writes for full durability.
///
/// When `true`, data is guaranteed to be on disk (not just OS page cache)
/// after each batch commit. This is the strongest durability guarantee but
/// has higher latency. Default: `true` for safety.
pub sync_on_commit: bool,

/// Whether to enable the background disk space monitor.
/// Default: `false` (disabled). Enable for environments where
/// disk exhaustion is a real concern (e.g. shared hosts, small volumes).
pub enable_disk_monitor: bool,

/// Minimum disk space (bytes) to reserve before warning about low disk.
/// Default: 100 MB. Only used when `enable_disk_monitor` is `true`.
pub min_disk_space_bytes: u64,

/// How often (in seconds) to check disk space. Default: 60.
/// Only used when `enable_disk_monitor` is `true`.
pub disk_check_interval_secs: u64,
}

impl Default for FileEventLogConfig {
Expand All @@ -55,6 +75,10 @@ impl Default for FileEventLogConfig {
max_event_size: 4 * 1024 * 1024, // 4MB single-event limit
max_batch_bytes: 64 * 1024 * 1024, // 64MB batch limit
flush_on_append: true,
sync_on_commit: true,
enable_disk_monitor: false,
min_disk_space_bytes: 100 * 1024 * 1024, // 100 MB
disk_check_interval_secs: 60,
}
}
}
Expand Down Expand Up @@ -88,6 +112,10 @@ pub struct FileEventLog {
/// instead of polling, giving near-zero-latency event processing with zero
/// CPU waste when idle.
event_notify: Option<Arc<Notify>>,
/// Atomic flag set by the background disk space monitor.
/// When `true`, available disk space is below `min_disk_space_bytes`.
/// Checked by the commit path to log warnings without blocking writes.
low_disk_space: Arc<std::sync::atomic::AtomicBool>,
}

impl FileEventLog {
Expand Down Expand Up @@ -120,13 +148,40 @@ impl FileEventLog {
file,
)));

let low_disk_space = Arc::new(std::sync::atomic::AtomicBool::new(false));

// Perform initial disk space check (only if monitor is enabled)
if config.enable_disk_monitor {
match fs4::available_space(&config.base_dir) {
Ok(available) => {
if available < config.min_disk_space_bytes {
low_disk_space.store(true, std::sync::atomic::Ordering::Relaxed);
tracing::warn!(
available_mb = available / (1024 * 1024),
min_mb = config.min_disk_space_bytes / (1024 * 1024),
path = %config.base_dir.display(),
"Low disk space on event log volume"
);
}
}
Err(e) => {
tracing::warn!(
error = %e,
path = %config.base_dir.display(),
"Could not check disk space on event log volume"
);
}
}
}

Ok(Self {
config,
meta: Arc::new(Mutex::new(meta)),
next_event_id,
writer,
current_file_num,
event_notify: None,
low_disk_space,
})
}

Expand All @@ -144,6 +199,73 @@ impl FileEventLog {
self.event_notify.clone()
}

/// Start the background disk space monitor.
///
/// Checks available disk space every `disk_check_interval_secs` seconds
/// and sets the `low_disk_space` flag when space drops below
/// `min_disk_space_bytes`. The commit path reads this flag (zero-cost
/// atomic load) to log warnings.
///
/// Returns `None` if `enable_disk_monitor` is `false` in the config.
/// Otherwise returns a `JoinHandle` that runs until the process shuts down.
pub fn start_disk_monitor(&self) -> Option<tokio::task::JoinHandle<()>> {
if !self.config.enable_disk_monitor {
return None;
}

let flag = self.low_disk_space.clone();
let base_dir = self.config.base_dir.clone();
let min_bytes = self.config.min_disk_space_bytes;
let interval_secs = self.config.disk_check_interval_secs;

Some(tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
interval.tick().await;

match fs4::available_space(&base_dir) {
Ok(available) => {
let was_low = flag.load(std::sync::atomic::Ordering::Relaxed);
let is_low = available < min_bytes;
flag.store(is_low, std::sync::atomic::Ordering::Relaxed);

if is_low && !was_low {
tracing::warn!(
available_mb = available / (1024 * 1024),
min_mb = min_bytes / (1024 * 1024),
path = %base_dir.display(),
"Disk space dropped below threshold on event log volume"
);
} else if !is_low && was_low {
tracing::info!(
available_mb = available / (1024 * 1024),
path = %base_dir.display(),
"Disk space recovered above threshold on event log volume"
);
}
}
Err(e) => {
tracing::warn!(
error = %e,
path = %base_dir.display(),
"Failed to check disk space"
);
}
}
}
}))
}

/// Returns `true` if available disk space is below the configured threshold.
///
/// This is a zero-cost atomic load — safe to call on every commit.
pub fn is_low_disk_space(&self) -> bool {
self.low_disk_space
.load(std::sync::atomic::Ordering::Relaxed)
}

/// Get path to a log file by number
fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf {
base_dir.join(format!("events-{:08}.log", file_num))
Expand Down Expand Up @@ -260,12 +382,18 @@ impl EventLog for FileEventLog {
// Write event entry
self.write_event_entry(event_id, event_bytes)?;

// Conditionally flush based on config
// Flush BufWriter to OS page cache
if self.config.flush_on_append {
let mut writer = self.writer.lock();
writer.flush()?;
}

// Fsync to disk for full durability
if self.config.sync_on_commit {
let writer = self.writer.lock();
writer.get_ref().sync_all()?;
}

// Update metadata
{
let mut meta = self.meta.lock();
Expand Down Expand Up @@ -353,12 +481,18 @@ impl EventLog for FileEventLog {
writer.write_all(&buffer)?;
}

// Conditionally flush based on config
// Flush BufWriter to OS page cache
if self.config.flush_on_append {
let mut writer = self.writer.lock();
writer.flush()?;
}

// Fsync to disk for full durability (guarantees events survive power loss)
if self.config.sync_on_commit {
let writer = self.writer.lock();
writer.get_ref().sync_all()?;
}

// Update metadata
let last_id = first_event_id + events.len() as u64 - 1;
{
Expand Down Expand Up @@ -630,6 +764,8 @@ mod tests {
max_event_size: 1024 * 1024,
max_batch_bytes: 16 * 1024 * 1024,
flush_on_append: true,
sync_on_commit: false, // Disable fsync in tests for speed
..Default::default()
};
let log = FileEventLog::open(config).unwrap();
(log, temp_dir)
Expand Down
1 change: 1 addition & 0 deletions crates/azoth-lmdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ serde_json = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
parking_lot = { workspace = true }
base64 = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
Loading