Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions crates/cortexadb-core/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ impl CortexaDBStore {
indexes.clone(),
)));

// Capture wal_path before `engine` moves into WriteState — used by the
// checkpoint thread without needing to re-acquire the writer lock each time.
let wal_path_for_ckpt = engine.wal_path().to_path_buf();
let writer = Arc::new(Mutex::new(WriteState { engine, indexes }));
let sync_control = Arc::new((
Mutex::new(SyncRuntime { pending_ops: 0, dirty_since: None, shutdown: false }),
Expand All @@ -323,9 +326,9 @@ impl CortexaDBStore {
CheckpointPolicy::Disabled => None,
CheckpointPolicy::Periodic { .. } => Some(Self::spawn_checkpoint_thread(
Arc::clone(&writer),
Arc::clone(&snapshot),
Arc::clone(&checkpoint_control),
checkpoint_path.clone(),
wal_path_for_ckpt,
checkpoint_policy,
)),
};
Expand Down Expand Up @@ -484,9 +487,9 @@ impl CortexaDBStore {

fn spawn_checkpoint_thread(
writer: Arc<Mutex<WriteState>>,
snapshot: Arc<ArcSwap<ReadSnapshot>>,
checkpoint_control: Arc<(Mutex<CheckpointRuntime>, Condvar)>,
checkpoint_path: std::path::PathBuf,
wal_path: std::path::PathBuf,
checkpoint_policy: CheckpointPolicy,
) -> JoinHandle<()> {
std::thread::spawn(move || {
Expand Down Expand Up @@ -531,33 +534,33 @@ impl CortexaDBStore {
runtime.dirty_since = None;
drop(runtime);

let read_snapshot = snapshot.load_full();
let last_applied_id = match writer.lock() {
Ok(guard) => guard.engine.last_applied_id().0,
// ── Atomic snapshot capture ───────────────────────────────────────
// We MUST read both `state_machine` and `last_applied_id` under the
// SAME writer lock acquisition. If we read them separately there is a
// window where the main thread advances `last_applied_id` beyond the
// state captured in the snapshot — the checkpoint would then record an
// ID that is ahead of its actual state, causing entries to be silently
// skipped on crash recovery (TOCTOU).
let (checkpoint_state, last_applied_id) = match writer.lock() {
Ok(guard) => {
let id = guard.engine.last_applied_id().0;
let state = guard.engine.get_state_machine().clone();
(state, id)
}
Err(e) => {
log::error!("cortexadb checkpoint error (lock poisoned): {e}");
continue;
}
};
// Lock released — all I/O happens outside it.

if let Err(err) = save_checkpoint(
&checkpoint_path,
read_snapshot.state_machine(),
last_applied_id,
) {
if let Err(err) =
save_checkpoint(&checkpoint_path, &checkpoint_state, last_applied_id)
{
Comment on lines +544 to +559
log::error!("cortexadb checkpoint write error: {err}");
} else {
// Truncate WAL prefix after successful checkpoint.
let write_guard = match writer.lock() {
Ok(g) => g,
Err(e) => {
log::error!("cortexadb checkpoint error while getting WAL path (lock poisoned): {e}");
continue;
}
};
let wal_path = write_guard.engine.wal_path().to_path_buf();
// Drop lock early just in case writing to disk is slow
drop(write_guard);
// `wal_path` was captured at thread-spawn time — no lock needed.
if let Err(err) =
WriteAheadLog::truncate_prefix(&wal_path, CommandId(last_applied_id))
{
Expand Down
Loading