diff --git a/crates/cortexadb-core/src/store.rs b/crates/cortexadb-core/src/store.rs index e82c474..177d961 100644 --- a/crates/cortexadb-core/src/store.rs +++ b/crates/cortexadb-core/src/store.rs @@ -300,6 +300,9 @@ impl CortexaDBStore { indexes.clone(), ))); + // Capture wal_path before `engine` moves into WriteState — used by the + // checkpoint thread without needing to re-acquire the writer lock each time. + let wal_path_for_ckpt = engine.wal_path().to_path_buf(); let writer = Arc::new(Mutex::new(WriteState { engine, indexes })); let sync_control = Arc::new(( Mutex::new(SyncRuntime { pending_ops: 0, dirty_since: None, shutdown: false }), @@ -323,9 +326,9 @@ impl CortexaDBStore { CheckpointPolicy::Disabled => None, CheckpointPolicy::Periodic { .. } => Some(Self::spawn_checkpoint_thread( Arc::clone(&writer), - Arc::clone(&snapshot), Arc::clone(&checkpoint_control), checkpoint_path.clone(), + wal_path_for_ckpt, checkpoint_policy, )), }; @@ -484,9 +487,9 @@ impl CortexaDBStore { fn spawn_checkpoint_thread( writer: Arc>, - snapshot: Arc>, checkpoint_control: Arc<(Mutex, Condvar)>, checkpoint_path: std::path::PathBuf, + wal_path: std::path::PathBuf, checkpoint_policy: CheckpointPolicy, ) -> JoinHandle<()> { std::thread::spawn(move || { @@ -531,33 +534,33 @@ impl CortexaDBStore { runtime.dirty_since = None; drop(runtime); - let read_snapshot = snapshot.load_full(); - let last_applied_id = match writer.lock() { - Ok(guard) => guard.engine.last_applied_id().0, + // ── Atomic snapshot capture ─────────────────────────────────────── + // We MUST read both `state_machine` and `last_applied_id` under the + // SAME writer lock acquisition. If we read them separately there is a + // window where the main thread advances `last_applied_id` beyond the + // state captured in the snapshot — the checkpoint would then record an + // ID that is ahead of its actual state, causing entries to be silently + // skipped on crash recovery (TOCTOU). + let (checkpoint_state, last_applied_id) = match writer.lock() { + Ok(guard) => { + let id = guard.engine.last_applied_id().0; + let state = guard.engine.get_state_machine().clone(); + (state, id) + } Err(e) => { log::error!("cortexadb checkpoint error (lock poisoned): {e}"); continue; } }; + // Lock released — all I/O happens outside it. - if let Err(err) = save_checkpoint( - &checkpoint_path, - read_snapshot.state_machine(), - last_applied_id, - ) { + if let Err(err) = + save_checkpoint(&checkpoint_path, &checkpoint_state, last_applied_id) + { log::error!("cortexadb checkpoint write error: {err}"); } else { // Truncate WAL prefix after successful checkpoint. - let write_guard = match writer.lock() { - Ok(g) => g, - Err(e) => { - log::error!("cortexadb checkpoint error while getting WAL path (lock poisoned): {e}"); - continue; - } - }; - let wal_path = write_guard.engine.wal_path().to_path_buf(); - // Drop lock early just in case writing to disk is slow - drop(write_guard); + // `wal_path` was captured at thread-spawn time — no lock needed. if let Err(err) = WriteAheadLog::truncate_prefix(&wal_path, CommandId(last_applied_id)) {