diff --git a/crates/cortexadb-core/src/core/command.rs b/crates/cortexadb-core/src/core/command.rs index a10dec7..1ebeaff 100644 --- a/crates/cortexadb-core/src/core/command.rs +++ b/crates/cortexadb-core/src/core/command.rs @@ -1,6 +1,7 @@ +use serde::{Deserialize, Serialize}; + use super::memory_entry::MemoryEntry; use crate::core::memory_entry::MemoryId; -use serde::{Deserialize, Serialize}; /// State-mutating commands for the state machine #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/cortexadb-core/src/core/memory_entry.rs b/crates/cortexadb-core/src/core/memory_entry.rs index 8fe5c24..0903cc9 100644 --- a/crates/cortexadb-core/src/core/memory_entry.rs +++ b/crates/cortexadb-core/src/core/memory_entry.rs @@ -1,6 +1,7 @@ -use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use serde::{Deserialize, Serialize}; + /// Unique identifier for a memory entry #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] pub struct MemoryId(pub u64); diff --git a/crates/cortexadb-core/src/core/state_machine.rs b/crates/cortexadb-core/src/core/state_machine.rs index 9345eb6..eb67282 100644 --- a/crates/cortexadb-core/src/core/state_machine.rs +++ b/crates/cortexadb-core/src/core/state_machine.rs @@ -1,9 +1,12 @@ -use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap}; + +use serde::{Deserialize, Serialize}; use thiserror::Error; -use super::command::Command; -use super::memory_entry::{MemoryEntry, MemoryId}; +use super::{ + command::Command, + memory_entry::{MemoryEntry, MemoryId}, +}; #[derive(Error, Debug)] pub enum StateMachineError { diff --git a/crates/cortexadb-core/src/engine.rs b/crates/cortexadb-core/src/engine.rs index 6fe78d1..4041c3a 100644 --- a/crates/cortexadb-core/src/engine.rs +++ b/crates/cortexadb-core/src/engine.rs @@ -1,11 +1,18 @@ use std::path::Path; + use thiserror::Error; -use crate::core::command::Command; -use crate::core::memory_entry::{MemoryEntry, MemoryId}; -use crate::core::state_machine::StateMachine; -use crate::storage::segment::SegmentStorage; -use crate::storage::wal::{CommandId, WriteAheadLog}; +use crate::{ + core::{ + command::Command, + memory_entry::{MemoryEntry, MemoryId}, + state_machine::StateMachine, + }, + storage::{ + segment::SegmentStorage, + wal::{CommandId, WriteAheadLog}, + }, +}; #[derive(Error, Debug)] pub enum EngineError { @@ -27,6 +34,8 @@ pub enum EngineError { CheckpointWalGap { checkpoint_last_applied: u64, wal_highest: Option }, #[error("Engine not recovered properly")] NotRecovered, + #[error("Lock was poisoned during {0}")] + LockPoisoned(&'static str), } pub type Result = std::result::Result; @@ -481,9 +490,10 @@ impl Engine { #[cfg(test)] mod tests { + use tempfile::TempDir; + use super::*; use crate::core::memory_entry::{MemoryEntry, MemoryId}; - use tempfile::TempDir; #[test] fn test_engine_creation() { diff --git a/crates/cortexadb-core/src/facade.rs b/crates/cortexadb-core/src/facade.rs index f8ac138..a8fc42e 100644 --- a/crates/cortexadb-core/src/facade.rs +++ b/crates/cortexadb-core/src/facade.rs @@ -4,16 +4,22 @@ //! It wraps [`CortexaDBStore`] and hides planner/engine/index details behind //! five core operations: `open`, `add`, `search`, `connect`, `compact`. -use std::collections::HashMap; -use std::path::PathBuf; -use std::time::{SystemTime, UNIX_EPOCH}; - -use crate::core::memory_entry::{MemoryEntry, MemoryId}; -use crate::core::state_machine::StateMachineError; -use crate::engine::{CapacityPolicy, SyncPolicy}; -use crate::index::IndexMode; -use crate::query::hybrid::{QueryEmbedder, QueryOptions}; -use crate::store::{CheckpointPolicy, CortexaDBStore, CortexaDBStoreError}; +use std::{ + collections::HashMap, + path::PathBuf, + time::{SystemTime, UNIX_EPOCH}, +}; + +use crate::{ + core::{ + memory_entry::{MemoryEntry, MemoryId}, + state_machine::StateMachineError, + }, + engine::{CapacityPolicy, SyncPolicy}, + index::IndexMode, + query::hybrid::{QueryEmbedder, QueryOptions}, + store::{CheckpointPolicy, CortexaDBStore, CortexaDBStoreError}, +}; // --------------------------------------------------------------------------- // Public types @@ -490,14 +496,14 @@ impl CortexaDB { } /// Get database statistics. - pub fn stats(&self) -> Stats { - Stats { + pub fn stats(&self) -> Result { + Ok(Stats { entries: self.inner.state_machine().len(), indexed_embeddings: self.inner.indexed_embeddings(), - wal_length: self.inner.wal_len(), + wal_length: self.inner.wal_len()?, vector_dimension: self.inner.vector_dimension(), storage_version: 1, - } + }) } /// Access the underlying `CortexaDBStore` for advanced operations. @@ -512,9 +518,10 @@ impl CortexaDB { #[cfg(test)] mod tests { - use super::*; use tempfile::TempDir; + use super::*; + #[test] fn test_open_add_search() { let temp = TempDir::new().unwrap(); @@ -540,7 +547,7 @@ mod tests { let id2 = db.add(vec![0.0, 1.0, 0.0], None).unwrap(); db.connect(id1, id2, "related").unwrap(); - let stats = db.stats(); + let stats = db.stats().unwrap(); assert_eq!(stats.entries, 2); assert_eq!(stats.indexed_embeddings, 2); assert_eq!(stats.vector_dimension, 3); @@ -560,7 +567,7 @@ mod tests { // Reopen — should recover from WAL. let db = CortexaDB::open(path.to_str().unwrap(), 3).unwrap(); let stats = db.stats(); - assert_eq!(stats.entries, 2); + assert_eq!(stats.unwrap().entries, 2); let hits = db.search(vec![1.0, 0.0, 0.0], 5, None).unwrap(); assert!(!hits.is_empty()); @@ -591,7 +598,7 @@ mod tests { } let db = CortexaDB::open_with_config(path.to_str().unwrap(), config).unwrap(); - let stats = db.stats(); + let stats = db.stats().unwrap(); assert_eq!(stats.entries, 3); } @@ -632,7 +639,7 @@ mod tests { let _id2 = db.add_in_collection("agent_c", vec![0.0, 0.0, 1.0], None).unwrap(); let stats = db.stats(); - assert_eq!(stats.entries, 2); + assert_eq!(stats.unwrap().entries, 2); let m1 = db.get_memory(id1).unwrap(); assert_eq!(m1.collection, "agent_b"); @@ -645,10 +652,10 @@ mod tests { let db = CortexaDB::open(path.to_str().unwrap(), 3).unwrap(); let id = db.add(vec![1.0, 0.0, 0.0], None).unwrap(); - assert_eq!(db.stats().entries, 1); + assert_eq!(db.stats().unwrap().entries, 1); db.delete(id).unwrap(); - assert_eq!(db.stats().entries, 0, "entry count should be 0 after delete"); + assert_eq!(db.stats().unwrap().entries, 0, "entry count should be 0 after delete"); } #[test] diff --git a/crates/cortexadb-core/src/index/combined.rs b/crates/cortexadb-core/src/index/combined.rs index f5999d9..4efa66d 100644 --- a/crates/cortexadb-core/src/index/combined.rs +++ b/crates/cortexadb-core/src/index/combined.rs @@ -1,12 +1,16 @@ use std::collections::{HashMap, HashSet}; + use thiserror::Error; -use crate::core::memory_entry::MemoryId; -use crate::core::state_machine::StateMachine; -use crate::index::graph::GraphIndex; -use crate::index::hnsw::{HnswBackend, HnswConfig}; -use crate::index::temporal::TemporalIndex; -use crate::index::vector::{VectorBackendMode, VectorIndex}; +use crate::{ + core::{memory_entry::MemoryId, state_machine::StateMachine}, + index::{ + graph::GraphIndex, + hnsw::{HnswBackend, HnswConfig}, + temporal::TemporalIndex, + vector::{VectorBackendMode, VectorIndex}, + }, +}; #[derive(Error, Debug)] pub enum CombinedError { diff --git a/crates/cortexadb-core/src/index/graph.rs b/crates/cortexadb-core/src/index/graph.rs index f96af83..3ef0ca3 100644 --- a/crates/cortexadb-core/src/index/graph.rs +++ b/crates/cortexadb-core/src/index/graph.rs @@ -1,9 +1,8 @@ -use std::collections::hash_map::Entry; -use std::collections::{HashMap, VecDeque}; +use std::collections::{hash_map::Entry, HashMap, VecDeque}; + use thiserror::Error; -use crate::core::memory_entry::MemoryId; -use crate::core::state_machine::StateMachine; +use crate::core::{memory_entry::MemoryId, state_machine::StateMachine}; #[derive(Error, Debug)] pub enum GraphError { diff --git a/crates/cortexadb-core/src/index/hnsw.rs b/crates/cortexadb-core/src/index/hnsw.rs index 90ef1b7..03b9025 100644 --- a/crates/cortexadb-core/src/index/hnsw.rs +++ b/crates/cortexadb-core/src/index/hnsw.rs @@ -1,5 +1,8 @@ -use std::path::Path; -use std::sync::{Arc, RwLock}; +use std::{ + path::Path, + sync::{Arc, RwLock}, +}; + use thiserror::Error; use crate::core::memory_entry::MemoryId; diff --git a/crates/cortexadb-core/src/index/temporal.rs b/crates/cortexadb-core/src/index/temporal.rs index d897b32..bb1510b 100644 --- a/crates/cortexadb-core/src/index/temporal.rs +++ b/crates/cortexadb-core/src/index/temporal.rs @@ -1,7 +1,6 @@ use thiserror::Error; -use crate::core::memory_entry::MemoryId; -use crate::core::state_machine::StateMachine; +use crate::core::{memory_entry::MemoryId, state_machine::StateMachine}; #[derive(Error, Debug)] pub enum TemporalError { diff --git a/crates/cortexadb-core/src/index/vector.rs b/crates/cortexadb-core/src/index/vector.rs index 0338abc..d1a2396 100644 --- a/crates/cortexadb-core/src/index/vector.rs +++ b/crates/cortexadb-core/src/index/vector.rs @@ -1,10 +1,15 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + use rayon::prelude::*; -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; use thiserror::Error; -use crate::core::memory_entry::MemoryId; -use crate::index::hnsw::{HnswBackend, HnswConfig}; +use crate::{ + core::memory_entry::MemoryId, + index::hnsw::{HnswBackend, HnswConfig}, +}; #[derive(Error, Debug)] pub enum VectorError { diff --git a/crates/cortexadb-core/src/query/executor.rs b/crates/cortexadb-core/src/query/executor.rs index 1941602..a05d91b 100644 --- a/crates/cortexadb-core/src/query/executor.rs +++ b/crates/cortexadb-core/src/query/executor.rs @@ -1,14 +1,18 @@ -use std::collections::{HashMap, HashSet}; -use std::sync::{Mutex, OnceLock}; -use std::time::Instant; - -use crate::core::memory_entry::MemoryId; -use crate::core::state_machine::StateMachine; -use crate::index::combined::IndexLayer; -use crate::index::graph::GraphIndex; -use crate::query::hybrid::{HybridQueryError, QueryEmbedder, QueryHit}; -use crate::query::intent::get_intent_policy; -use crate::query::planner::{ExecutionPath, QueryPlan, QueryPlanner}; +use std::{ + collections::{HashMap, HashSet}, + sync::{Mutex, OnceLock}, + time::Instant, +}; + +use crate::{ + core::{memory_entry::MemoryId, state_machine::StateMachine}, + index::{combined::IndexLayer, graph::GraphIndex}, + query::{ + hybrid::{HybridQueryError, QueryEmbedder, QueryHit}, + intent::get_intent_policy, + planner::{ExecutionPath, QueryPlan, QueryPlanner}, + }, +}; pub type Result = std::result::Result; @@ -53,8 +57,11 @@ fn intent_anchor_cache() -> &'static Mutex> { pub(crate) fn clear_intent_anchor_cache() { let cache = intent_anchor_cache(); - let mut guard = cache.lock().expect("intent anchor cache lock poisoned"); - guard.clear(); + if let Ok(mut guard) = cache.lock() { + guard.clear(); + } else { + log::warn!("intent anchor cache lock poisoned, unable to clear"); + } } fn load_or_build_intent_anchors( @@ -62,11 +69,12 @@ fn load_or_build_intent_anchors( dim: usize, ) -> std::result::Result { let cache = intent_anchor_cache(); - { - let guard = cache.lock().expect("intent anchor cache lock poisoned"); + if let Ok(guard) = cache.lock() { if let Some(found) = guard.get(&dim) { return Ok(found.clone()); } + } else { + log::warn!("intent anchor cache lock poisoned, rebuilding anchor from scratch"); } let policy = get_intent_policy(); @@ -77,8 +85,9 @@ fn load_or_build_intent_anchors( return Err("intent anchor embedding dimension mismatch".to_string()); } let anchors = IntentAnchors { semantic, recency, graph }; - let mut guard = cache.lock().expect("intent anchor cache lock poisoned"); - guard.insert(dim, anchors.clone()); + if let Ok(mut guard) = cache.lock() { + guard.insert(dim, anchors.clone()); + } Ok(anchors) } @@ -354,9 +363,13 @@ fn build_ranked_hits( #[cfg(test)] mod tests { use super::*; - use crate::core::memory_entry::{MemoryEntry, MemoryId}; - use crate::query::hybrid::{GraphExpansionOptions, QueryOptions}; - use crate::query::planner::{ExecutionPath, QueryPlanner}; + use crate::{ + core::memory_entry::{MemoryEntry, MemoryId}, + query::{ + hybrid::{GraphExpansionOptions, QueryOptions}, + planner::{ExecutionPath, QueryPlanner}, + }, + }; struct TestEmbedder; impl QueryEmbedder for TestEmbedder { diff --git a/crates/cortexadb-core/src/query/hybrid.rs b/crates/cortexadb-core/src/query/hybrid.rs index 5e87aa0..b37cdfa 100644 --- a/crates/cortexadb-core/src/query/hybrid.rs +++ b/crates/cortexadb-core/src/query/hybrid.rs @@ -2,10 +2,10 @@ use std::collections::{HashMap, HashSet}; use thiserror::Error; -use crate::core::memory_entry::MemoryId; -use crate::core::state_machine::StateMachine; -use crate::index::combined::IndexLayer; -use crate::index::graph::GraphIndex; +use crate::{ + core::{memory_entry::MemoryId, state_machine::StateMachine}, + index::{combined::IndexLayer, graph::GraphIndex}, +}; #[derive(Error, Debug)] pub enum HybridQueryError { diff --git a/crates/cortexadb-core/src/query/intent.rs b/crates/cortexadb-core/src/query/intent.rs index 57b57f9..16ae612 100644 --- a/crates/cortexadb-core/src/query/intent.rs +++ b/crates/cortexadb-core/src/query/intent.rs @@ -1,7 +1,5 @@ use std::sync::{OnceLock, RwLock}; -use crate::query::executor; - #[derive(Debug, Clone)] pub struct IntentPolicy { pub semantic_anchor_text: String, @@ -33,12 +31,19 @@ fn policy_cell() -> &'static RwLock { } pub fn set_intent_policy(policy: IntentPolicy) { - let mut guard = policy_cell().write().expect("intent policy write lock poisoned"); - *guard = policy; - drop(guard); - executor::clear_intent_anchor_cache(); + if let Ok(mut guard) = policy_cell().write() { + *guard = policy; + crate::query::executor::clear_intent_anchor_cache(); + } else { + log::warn!("intent policy write lock poisoned"); + } } pub fn get_intent_policy() -> IntentPolicy { - policy_cell().read().expect("intent policy read lock poisoned").clone() + if let Ok(guard) = policy_cell().read() { + guard.clone() + } else { + log::warn!("intent policy read lock poisoned, falling back to default"); + IntentPolicy::default() + } } diff --git a/crates/cortexadb-core/src/storage/checkpoint.rs b/crates/cortexadb-core/src/storage/checkpoint.rs index c790afd..15190c7 100644 --- a/crates/cortexadb-core/src/storage/checkpoint.rs +++ b/crates/cortexadb-core/src/storage/checkpoint.rs @@ -1,6 +1,8 @@ -use std::fs::OpenOptions; -use std::io::Write; -use std::path::{Path, PathBuf}; +use std::{ + fs::OpenOptions, + io::Write, + path::{Path, PathBuf}, +}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -95,9 +97,10 @@ pub fn load_checkpoint>(path: P) -> Result (PathBuf, PathBuf) { #[cfg(test)] mod tests { + use tempfile::TempDir; + use super::*; use crate::core::memory_entry::{MemoryEntry, MemoryId}; - use tempfile::TempDir; #[test] fn test_compact_segment_dir_noop_when_not_compactable() { diff --git a/crates/cortexadb-core/src/storage/segment.rs b/crates/cortexadb-core/src/storage/segment.rs index 34f5d07..b501204 100644 --- a/crates/cortexadb-core/src/storage/segment.rs +++ b/crates/cortexadb-core/src/storage/segment.rs @@ -1,12 +1,17 @@ +use std::{ + collections::HashMap, + fs::{File, OpenOptions}, + io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, +}; + use crc::Crc; -use std::collections::HashMap; -use std::fs::{File, OpenOptions}; -use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; use thiserror::Error; -use crate::core::memory_entry::{MemoryEntry, MemoryId}; -use crate::storage::serialization::{deserialize_versioned, serialize_versioned}; +use crate::{ + core::memory_entry::{MemoryEntry, MemoryId}, + storage::serialization::{deserialize_versioned, serialize_versioned}, +}; #[derive(Error, Debug)] pub enum SegmentError { @@ -442,9 +447,10 @@ impl SegmentStorage { #[cfg(test)] mod tests { - use super::*; use tempfile::TempDir; + use super::*; + fn create_test_entry(id: u64) -> MemoryEntry { MemoryEntry::new( MemoryId(id), diff --git a/crates/cortexadb-core/src/storage/wal.rs b/crates/cortexadb-core/src/storage/wal.rs index fab2952..d683ff6 100644 --- a/crates/cortexadb-core/src/storage/wal.rs +++ b/crates/cortexadb-core/src/storage/wal.rs @@ -1,11 +1,16 @@ +use std::{ + fs::{File, OpenOptions}, + io::{BufReader, BufWriter, Read, Write}, + path::{Path, PathBuf}, +}; + use crc::Crc; -use std::fs::{File, OpenOptions}; -use std::io::{BufReader, BufWriter, Read, Write}; -use std::path::{Path, PathBuf}; use thiserror::Error; -use crate::core::command::Command; -use crate::storage::serialization::{deserialize_versioned, serialize_versioned}; +use crate::{ + core::command::Command, + storage::serialization::{deserialize_versioned, serialize_versioned}, +}; #[derive(Error, Debug)] pub enum WalError { @@ -298,11 +303,13 @@ impl WriteAheadLog { #[cfg(test)] mod tests { - use super::*; - use crate::core::memory_entry::MemoryEntry; use std::io::{Seek, SeekFrom}; + use tempfile::TempDir; + use super::*; + use crate::core::memory_entry::MemoryEntry; + #[test] fn test_wal_append_and_read() { let temp_dir = TempDir::new().unwrap(); diff --git a/crates/cortexadb-core/src/store.rs b/crates/cortexadb-core/src/store.rs index 3f0f742..e82c474 100644 --- a/crates/cortexadb-core/src/store.rs +++ b/crates/cortexadb-core/src/store.rs @@ -1,27 +1,34 @@ -use arc_swap::ArcSwap; -use std::collections::HashSet; -use std::path::Path; -use std::sync::{Arc, Condvar, Mutex}; -use std::thread::JoinHandle; -use std::time::{Duration, Instant}; +use std::{ + collections::HashSet, + path::Path, + sync::{Arc, Condvar, Mutex}, + thread::JoinHandle, + time::{Duration, Instant}, +}; +use arc_swap::ArcSwap; use thiserror::Error; -use crate::core::command::Command; -use crate::core::memory_entry::{MemoryEntry, MemoryId}; -use crate::core::state_machine::StateMachine; -use crate::engine::{CapacityPolicy, Engine, EvictionReport, SyncPolicy}; -use crate::index::vector::VectorBackendMode; -use crate::index::IndexLayer; -use crate::query::{ - IntentAnchors, QueryEmbedder, QueryExecution, QueryExecutor, QueryOptions, QueryPlan, - QueryPlanner, StageTrace, +use crate::{ + core::{ + command::Command, + memory_entry::{MemoryEntry, MemoryId}, + state_machine::StateMachine, + }, + engine::{CapacityPolicy, Engine, EvictionReport, SyncPolicy}, + index::{vector::VectorBackendMode, IndexLayer}, + query::{ + IntentAnchors, QueryEmbedder, QueryExecution, QueryExecutor, QueryOptions, QueryPlan, + QueryPlanner, StageTrace, + }, + storage::{ + checkpoint::{ + checkpoint_path_from_wal, load_checkpoint, save_checkpoint, LoadedCheckpoint, + }, + compaction::CompactionReport, + wal::{CommandId, WriteAheadLog}, + }, }; -use crate::storage::checkpoint::{ - checkpoint_path_from_wal, load_checkpoint, save_checkpoint, LoadedCheckpoint, -}; -use crate::storage::compaction::CompactionReport; -use crate::storage::wal::{CommandId, WriteAheadLog}; #[derive(Error, Debug)] pub enum CortexaDBStoreError { @@ -39,6 +46,8 @@ pub enum CortexaDBStoreError { InvariantViolation(String), #[error("Embedding required when content changes for memory id {0:?}")] MissingEmbeddingOnContentChange(MemoryId), + #[error("Lock was poisoned during {0}")] + LockPoisoned(&'static str), } pub type Result = std::result::Result; @@ -110,6 +119,12 @@ pub struct CortexaDBStore { capacity_policy: CapacityPolicy, } +fn writer_lock<'a>( + writer: &'a std::sync::Mutex, +) -> Result> { + writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock")) +} + impl CortexaDBStore { pub fn new>( wal_path: P, @@ -335,7 +350,8 @@ impl CortexaDBStore { } pub fn flush(&self) -> Result<()> { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; writer.engine.flush()?; self.clear_pending_sync_state(); Ok(()) @@ -343,7 +359,8 @@ impl CortexaDBStore { pub fn checkpoint_now(&self) -> Result<()> { let snapshot = self.snapshot(); - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; let last_applied_id = writer.engine.last_applied_id().0; save_checkpoint(&self.checkpoint_path, snapshot.state_machine(), last_applied_id)?; @@ -425,7 +442,13 @@ impl CortexaDBStore { drop(runtime); if should_flush { - let mut write_state = writer.lock().expect("writer lock poisoned"); + let mut write_state = match writer.lock() { + Ok(guard) => guard, + Err(e) => { + log::error!("cortexadb sync manager flush error (lock poisoned): {e}"); + continue; + } + }; if let Err(err) = write_state.engine.flush_buffers() { log::error!("cortexadb sync manager flush_buffers error: {err}"); continue; @@ -509,8 +532,13 @@ impl CortexaDBStore { drop(runtime); let read_snapshot = snapshot.load_full(); - let last_applied_id = - writer.lock().expect("writer lock poisoned").engine.last_applied_id().0; + let last_applied_id = match writer.lock() { + Ok(guard) => guard.engine.last_applied_id().0, + Err(e) => { + log::error!("cortexadb checkpoint error (lock poisoned): {e}"); + continue; + } + }; if let Err(err) = save_checkpoint( &checkpoint_path, @@ -520,20 +548,31 @@ impl CortexaDBStore { log::error!("cortexadb checkpoint write error: {err}"); } else { // Truncate WAL prefix after successful checkpoint. - let wal_path = writer - .lock() - .expect("writer lock poisoned") - .engine - .wal_path() - .to_path_buf(); + let write_guard = match writer.lock() { + Ok(g) => g, + Err(e) => { + log::error!("cortexadb checkpoint error while getting WAL path (lock poisoned): {e}"); + continue; + } + }; + let wal_path = write_guard.engine.wal_path().to_path_buf(); + // Drop lock early just in case writing to disk is slow + drop(write_guard); if let Err(err) = WriteAheadLog::truncate_prefix(&wal_path, CommandId(last_applied_id)) { log::error!("cortexadb WAL truncation error: {err}"); - } else if let Err(err) = - writer.lock().expect("writer lock poisoned").engine.reopen_wal() - { - log::error!("cortexadb WAL reopen error: {err}"); + } else { + let mut write_guard = match writer.lock() { + Ok(g) => g, + Err(e) => { + log::error!("cortexadb checkpoint error while reopening WAL (lock poisoned): {e}"); + continue; + } + }; + if let Err(err) = write_guard.engine.reopen_wal() { + log::error!("cortexadb WAL reopen error: {err}"); + } } } } @@ -541,7 +580,8 @@ impl CortexaDBStore { } pub fn add(&self, entry: MemoryEntry) -> Result { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; let mut effective = entry; if let Ok(prev) = writer.engine.get_state_machine().get_memory(effective.id) { @@ -560,7 +600,8 @@ impl CortexaDBStore { } pub fn add_batch(&self, entries: Vec) -> Result { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; let sync_now = matches!(self.sync_policy, SyncPolicy::Strict); let mut last_cmd_id = CommandId(0); @@ -619,23 +660,27 @@ impl CortexaDBStore { } pub fn delete(&self, id: MemoryId) -> Result { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; self.execute_write_transaction_locked(&mut writer, WriteOp::Delete(id)) } pub fn connect(&self, from: MemoryId, to: MemoryId, relation: String) -> Result { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; self.execute_write_transaction_locked(&mut writer, WriteOp::Connect { from, to, relation }) } pub fn disconnect(&self, from: MemoryId, to: MemoryId) -> Result { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; self.execute_write_transaction_locked(&mut writer, WriteOp::Disconnect { from, to }) } /// Rebuild in-memory vector index from current state machine entries. pub fn rebuild_vector_index(&self) -> Result { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; writer.indexes = Self::build_vector_index( writer.engine.get_state_machine(), writer.indexes.vector.dimension(), @@ -652,10 +697,11 @@ impl CortexaDBStore { Ok(indexed) } - pub fn set_vector_backend_mode(&self, mode: VectorBackendMode) { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + pub fn set_vector_backend_mode(&self, mode: VectorBackendMode) -> Result<()> { + let mut writer = writer_lock(&self.writer)?; writer.indexes.set_vector_backend_mode(mode); self.publish_snapshot_from_write_state(&writer); + Ok(()) } pub fn query( @@ -750,7 +796,8 @@ impl CortexaDBStore { } pub fn enforce_capacity(&self, policy: CapacityPolicy) -> Result { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; let sync_now = matches!(self.sync_policy, SyncPolicy::Strict); let report = Self::enforce_capacity_locked(&mut writer, policy, sync_now)?; @@ -786,7 +833,8 @@ impl CortexaDBStore { } pub fn compact_segments(&self) -> Result { - let mut writer = self.writer.lock().expect("writer lock poisoned"); + let mut writer = + self.writer.lock().map_err(|_| CortexaDBStoreError::LockPoisoned("writer lock"))?; let report = writer.engine.compact_segments()?; writer.indexes.vector_index_mut().compact()?; self.publish_snapshot_from_write_state(&writer); @@ -805,8 +853,8 @@ impl CortexaDBStore { self.snapshot().indexes().vector.len() } - pub fn wal_len(&self) -> u64 { - self.writer.lock().expect("writer lock poisoned").engine.wal_len() + pub fn wal_len(&self) -> Result { + Ok(writer_lock(&self.writer)?.engine.wal_len()) } fn mark_pending_write(&self, ops: usize) { @@ -1062,12 +1110,12 @@ enum WriteOp { #[cfg(test)] mod tests { - use super::*; - use std::sync::Arc; - use std::thread; - use std::time::Duration; + use std::{sync::Arc, thread, time::Duration}; + use tempfile::TempDir; + use super::*; + struct TestEmbedder; impl QueryEmbedder for TestEmbedder { fn embed(&self, _query: &str) -> std::result::Result, String> { diff --git a/crates/cortexadb-core/tests/integration.rs b/crates/cortexadb-core/tests/integration.rs index b612137..686ffa2 100644 --- a/crates/cortexadb-core/tests/integration.rs +++ b/crates/cortexadb-core/tests/integration.rs @@ -61,8 +61,8 @@ fn test_recover_after_drop_restores_entries() { // Reopen: should recover from WAL let db = open_db(&path); - assert_eq!(db.stats().entries, 3, "all entries must survive reopen"); - assert_eq!(db.stats().indexed_embeddings, 3); + assert_eq!(db.stats().unwrap().entries, 3, "all entries must survive reopen"); + assert_eq!(db.stats().unwrap().indexed_embeddings, 3); for id in &expected_ids { db.get_memory(*id).unwrap_or_else(|_| panic!("memory {} must survive recovery", id)); @@ -110,7 +110,7 @@ fn test_checkpoint_recovery_preserves_all_entries() { } let db = open_db(&path); - assert_eq!(db.stats().entries, 3, "all 3 entries must survive checkpoint+recovery"); + assert_eq!(db.stats().unwrap().entries, 3, "all 3 entries must survive checkpoint+recovery"); for id in &all_ids { db.get_memory(*id) .unwrap_or_else(|_| panic!("memory {} missing after checkpoint recovery", id)); @@ -135,7 +135,7 @@ fn test_double_checkpoint_recovery() { } let db = open_db(&path); - assert_eq!(db.stats().entries, 3, "all entries must survive double checkpoint"); + assert_eq!(db.stats().unwrap().entries, 3, "all entries must survive double checkpoint"); } // --------------------------------------------------------------------------- @@ -154,11 +154,11 @@ fn test_delete_persists_across_recovery() { deleted_id = db.add(vec![1.0, 0.0, 0.0], None).unwrap(); kept_id = db.add(vec![0.0, 1.0, 0.0], None).unwrap(); db.delete(deleted_id).unwrap(); - assert_eq!(db.stats().entries, 1); + assert_eq!(db.stats().unwrap().entries, 1); } let db = open_db(&path); - assert_eq!(db.stats().entries, 1, "deletion must persist across recovery"); + assert_eq!(db.stats().unwrap().entries, 1, "deletion must persist across recovery"); assert!(db.get_memory(deleted_id).is_err(), "deleted entry must not be recoverable"); assert!(db.get_memory(kept_id).is_ok(), "non-deleted entry must survive"); } @@ -180,7 +180,7 @@ fn test_delete_then_checkpoint_recovery() { } let db = open_db(&path); - assert_eq!(db.stats().entries, 1); + assert_eq!(db.stats().unwrap().entries, 1); assert!(db.get_memory(deleted_id).is_err(), "deleted entry must not survive checkpoint"); } @@ -260,9 +260,11 @@ fn test_metadata_persists_across_recovery() { #[test] fn test_capacity_eviction_keeps_max_entries() { - use cortexadb_core::engine::{CapacityPolicy, SyncPolicy}; - use cortexadb_core::index::IndexMode; - use cortexadb_core::store::CheckpointPolicy; + use cortexadb_core::{ + engine::{CapacityPolicy, SyncPolicy}, + index::IndexMode, + store::CheckpointPolicy, + }; let dir = TempDir::new().unwrap(); let config = CortexaDBConfig { @@ -279,7 +281,7 @@ fn test_capacity_eviction_keeps_max_entries() { db.add(vec![0.0, 0.0, 1.0], None).unwrap(); // After inserting 3 entries with max_entries=2, one should have been evicted. - assert_eq!(db.stats().entries, 2, "max_entries=2 must evict oldest entry"); + assert_eq!(db.stats().unwrap().entries, 2, "max_entries=2 must evict oldest entry"); } // --------------------------------------------------------------------------- @@ -288,10 +290,14 @@ fn test_capacity_eviction_keeps_max_entries() { #[test] fn test_hnsw_recovery_sync() { - use cortexadb_core::engine::SyncPolicy; - use cortexadb_core::index::hnsw::{HnswConfig, MetricKind}; - use cortexadb_core::index::IndexMode; - use cortexadb_core::store::CheckpointPolicy; + use cortexadb_core::{ + engine::SyncPolicy, + index::{ + hnsw::{HnswConfig, MetricKind}, + IndexMode, + }, + store::CheckpointPolicy, + }; let dir = TempDir::new().unwrap(); let config = CortexaDBConfig { diff --git a/crates/cortexadb-py/src/lib.rs b/crates/cortexadb-py/src/lib.rs index bab911e..c1b222d 100644 --- a/crates/cortexadb-py/src/lib.rs +++ b/crates/cortexadb-py/src/lib.rs @@ -4,15 +4,18 @@ use std::collections::HashMap; -use pyo3::create_exception; -use pyo3::exceptions::PyException; -use pyo3::prelude::*; -use pyo3::types::PyDict; - -use cortexadb_core::chunker; -use cortexadb_core::engine::{CapacityPolicy, SyncPolicy}; -use cortexadb_core::facade; -use cortexadb_core::store::CheckpointPolicy; +use cortexadb_core::{ + chunker, + engine::{CapacityPolicy, SyncPolicy}, + facade, + store::CheckpointPolicy, +}; +use pyo3::{ + create_exception, + exceptions::{PyException, PyRuntimeError, PyValueError}, + prelude::*, + types::PyDict, +}; // --------------------------------------------------------------------------- // Custom exception @@ -355,10 +358,10 @@ impl PyCortexaDB { let db = facade::CortexaDB::open_with_config(path, config).map_err(map_cortexadb_err)?; // Validate dimension matches existing data. - let stats = db.stats(); + let stats = db.stats().map_err(|e| PyRuntimeError::new_err(e.to_string()))?; if stats.entries > 0 && stats.vector_dimension != dimension { - return Err(CortexaDBConfigError::new_err(format!( - "dimension mismatch: database has dimension={}, but open() was called with dimension={}", + return Err(PyValueError::new_err(format!( + "Database initialized with dimension {} but opened with dimension {}", stats.vector_dimension, dimension, ))); } @@ -625,27 +628,27 @@ impl PyCortexaDB { /// Returns: /// Stats: Current database statistics. #[pyo3(text_signature = "(self)")] - fn stats(&self) -> PyStats { - let s = self.inner.stats(); - PyStats { + fn stats(&self) -> PyResult { + let s = self.inner.stats().map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + Ok(PyStats { entries: s.entries, indexed_embeddings: s.indexed_embeddings, wal_length: s.wal_length, vector_dimension: s.vector_dimension, storage_version: s.storage_version, - } + }) } - fn __repr__(&self) -> String { - let s = self.inner.stats(); - format!( - "CortexaDB(entries={}, dimension={}, indexed={})", + fn __repr__(&self) -> PyResult { + let s = self.inner.stats().map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + Ok(format!( + "CortexaDB(entries={}, dim={}, indexed={})", s.entries, self.dimension, s.indexed_embeddings, - ) + )) } fn __len__(&self) -> usize { - self.inner.stats().entries + self.inner.stats().map(|s| s.entries).unwrap_or(0) } fn __enter__(slf: Py) -> Py { diff --git a/rustfmt.toml b/rustfmt.toml index 8c114a0..798fc31 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -2,5 +2,5 @@ edition = "2024" newline_style = "Unix" use_small_heuristics = "Max" max_width = 100 -merge_imports = true +imports_granularity = "Crate" group_imports = "StdExternalCrate"