Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/cortexadb-core/src/core/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};

use super::memory_entry::MemoryEntry;
use crate::core::memory_entry::MemoryId;
use serde::{Deserialize, Serialize};

/// State-mutating commands for the state machine
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
3 changes: 2 additions & 1 deletion crates/cortexadb-core/src/core/memory_entry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

/// Unique identifier for a memory entry
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct MemoryId(pub u64);
Expand Down
9 changes: 6 additions & 3 deletions crates/cortexadb-core/src/core/state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};

use serde::{Deserialize, Serialize};
use thiserror::Error;

use super::command::Command;
use super::memory_entry::{MemoryEntry, MemoryId};
use super::{
command::Command,
memory_entry::{MemoryEntry, MemoryId},
};

#[derive(Error, Debug)]
pub enum StateMachineError {
Expand Down
22 changes: 16 additions & 6 deletions crates/cortexadb-core/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use std::path::Path;

use thiserror::Error;

use crate::core::command::Command;
use crate::core::memory_entry::{MemoryEntry, MemoryId};
use crate::core::state_machine::StateMachine;
use crate::storage::segment::SegmentStorage;
use crate::storage::wal::{CommandId, WriteAheadLog};
use crate::{
core::{
command::Command,
memory_entry::{MemoryEntry, MemoryId},
state_machine::StateMachine,
},
storage::{
segment::SegmentStorage,
wal::{CommandId, WriteAheadLog},
},
};

#[derive(Error, Debug)]
pub enum EngineError {
Expand All @@ -27,6 +34,8 @@ pub enum EngineError {
CheckpointWalGap { checkpoint_last_applied: u64, wal_highest: Option<u64> },
#[error("Engine not recovered properly")]
NotRecovered,
#[error("Lock was poisoned during {0}")]
LockPoisoned(&'static str),
}

pub type Result<T> = std::result::Result<T, EngineError>;
Expand Down Expand Up @@ -481,9 +490,10 @@ impl Engine {

#[cfg(test)]
mod tests {
use tempfile::TempDir;

use super::*;
use crate::core::memory_entry::{MemoryEntry, MemoryId};
use tempfile::TempDir;

#[test]
fn test_engine_creation() {
Expand Down
49 changes: 28 additions & 21 deletions crates/cortexadb-core/src/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@
//! It wraps [`CortexaDBStore`] and hides planner/engine/index details behind
//! five core operations: `open`, `add`, `search`, `connect`, `compact`.

use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::core::memory_entry::{MemoryEntry, MemoryId};
use crate::core::state_machine::StateMachineError;
use crate::engine::{CapacityPolicy, SyncPolicy};
use crate::index::IndexMode;
use crate::query::hybrid::{QueryEmbedder, QueryOptions};
use crate::store::{CheckpointPolicy, CortexaDBStore, CortexaDBStoreError};
use std::{
collections::HashMap,
path::PathBuf,
time::{SystemTime, UNIX_EPOCH},
};

use crate::{
core::{
memory_entry::{MemoryEntry, MemoryId},
state_machine::StateMachineError,
},
engine::{CapacityPolicy, SyncPolicy},
index::IndexMode,
query::hybrid::{QueryEmbedder, QueryOptions},
store::{CheckpointPolicy, CortexaDBStore, CortexaDBStoreError},
};

// ---------------------------------------------------------------------------
// Public types
Expand Down Expand Up @@ -490,14 +496,14 @@ impl CortexaDB {
}

/// Get database statistics.
pub fn stats(&self) -> Stats {
Stats {
pub fn stats(&self) -> Result<Stats> {
Ok(Stats {
entries: self.inner.state_machine().len(),
indexed_embeddings: self.inner.indexed_embeddings(),
wal_length: self.inner.wal_len(),
wal_length: self.inner.wal_len()?,
vector_dimension: self.inner.vector_dimension(),
storage_version: 1,
}
})
Comment on lines 498 to +506
}

/// Access the underlying `CortexaDBStore` for advanced operations.
Expand All @@ -512,9 +518,10 @@ impl CortexaDB {

#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;

use super::*;

#[test]
fn test_open_add_search() {
let temp = TempDir::new().unwrap();
Expand All @@ -540,7 +547,7 @@ mod tests {
let id2 = db.add(vec![0.0, 1.0, 0.0], None).unwrap();
db.connect(id1, id2, "related").unwrap();

let stats = db.stats();
let stats = db.stats().unwrap();
assert_eq!(stats.entries, 2);
assert_eq!(stats.indexed_embeddings, 2);
assert_eq!(stats.vector_dimension, 3);
Expand All @@ -560,7 +567,7 @@ mod tests {
// Reopen — should recover from WAL.
let db = CortexaDB::open(path.to_str().unwrap(), 3).unwrap();
let stats = db.stats();
assert_eq!(stats.entries, 2);
assert_eq!(stats.unwrap().entries, 2);

let hits = db.search(vec![1.0, 0.0, 0.0], 5, None).unwrap();
assert!(!hits.is_empty());
Expand Down Expand Up @@ -591,7 +598,7 @@ mod tests {
}

let db = CortexaDB::open_with_config(path.to_str().unwrap(), config).unwrap();
let stats = db.stats();
let stats = db.stats().unwrap();
assert_eq!(stats.entries, 3);
}

Expand Down Expand Up @@ -632,7 +639,7 @@ mod tests {
let _id2 = db.add_in_collection("agent_c", vec![0.0, 0.0, 1.0], None).unwrap();

let stats = db.stats();
assert_eq!(stats.entries, 2);
assert_eq!(stats.unwrap().entries, 2);

let m1 = db.get_memory(id1).unwrap();
assert_eq!(m1.collection, "agent_b");
Expand All @@ -645,10 +652,10 @@ mod tests {
let db = CortexaDB::open(path.to_str().unwrap(), 3).unwrap();

let id = db.add(vec![1.0, 0.0, 0.0], None).unwrap();
assert_eq!(db.stats().entries, 1);
assert_eq!(db.stats().unwrap().entries, 1);

db.delete(id).unwrap();
assert_eq!(db.stats().entries, 0, "entry count should be 0 after delete");
assert_eq!(db.stats().unwrap().entries, 0, "entry count should be 0 after delete");
}

#[test]
Expand Down
16 changes: 10 additions & 6 deletions crates/cortexadb-core/src/index/combined.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::collections::{HashMap, HashSet};

use thiserror::Error;

use crate::core::memory_entry::MemoryId;
use crate::core::state_machine::StateMachine;
use crate::index::graph::GraphIndex;
use crate::index::hnsw::{HnswBackend, HnswConfig};
use crate::index::temporal::TemporalIndex;
use crate::index::vector::{VectorBackendMode, VectorIndex};
use crate::{
core::{memory_entry::MemoryId, state_machine::StateMachine},
index::{
graph::GraphIndex,
hnsw::{HnswBackend, HnswConfig},
temporal::TemporalIndex,
vector::{VectorBackendMode, VectorIndex},
},
};

#[derive(Error, Debug)]
pub enum CombinedError {
Expand Down
7 changes: 3 additions & 4 deletions crates/cortexadb-core/src/index/graph.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::collections::{hash_map::Entry, HashMap, VecDeque};

use thiserror::Error;

use crate::core::memory_entry::MemoryId;
use crate::core::state_machine::StateMachine;
use crate::core::{memory_entry::MemoryId, state_machine::StateMachine};

#[derive(Error, Debug)]
pub enum GraphError {
Expand Down
7 changes: 5 additions & 2 deletions crates/cortexadb-core/src/index/hnsw.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::{
path::Path,
sync::{Arc, RwLock},
};

use thiserror::Error;

use crate::core::memory_entry::MemoryId;
Expand Down
3 changes: 1 addition & 2 deletions crates/cortexadb-core/src/index/temporal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use thiserror::Error;

use crate::core::memory_entry::MemoryId;
use crate::core::state_machine::StateMachine;
use crate::core::{memory_entry::MemoryId, state_machine::StateMachine};

#[derive(Error, Debug)]
pub enum TemporalError {
Expand Down
13 changes: 9 additions & 4 deletions crates/cortexadb-core/src/index/vector.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use rayon::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use thiserror::Error;

use crate::core::memory_entry::MemoryId;
use crate::index::hnsw::{HnswBackend, HnswConfig};
use crate::{
core::memory_entry::MemoryId,
index::hnsw::{HnswBackend, HnswConfig},
};

#[derive(Error, Debug)]
pub enum VectorError {
Expand Down
53 changes: 33 additions & 20 deletions crates/cortexadb-core/src/query/executor.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Mutex, OnceLock};
use std::time::Instant;

use crate::core::memory_entry::MemoryId;
use crate::core::state_machine::StateMachine;
use crate::index::combined::IndexLayer;
use crate::index::graph::GraphIndex;
use crate::query::hybrid::{HybridQueryError, QueryEmbedder, QueryHit};
use crate::query::intent::get_intent_policy;
use crate::query::planner::{ExecutionPath, QueryPlan, QueryPlanner};
use std::{
collections::{HashMap, HashSet},
sync::{Mutex, OnceLock},
time::Instant,
};

use crate::{
core::{memory_entry::MemoryId, state_machine::StateMachine},
index::{combined::IndexLayer, graph::GraphIndex},
query::{
hybrid::{HybridQueryError, QueryEmbedder, QueryHit},
intent::get_intent_policy,
planner::{ExecutionPath, QueryPlan, QueryPlanner},
},
};

pub type Result<T> = std::result::Result<T, HybridQueryError>;

Expand Down Expand Up @@ -53,20 +57,24 @@ fn intent_anchor_cache() -> &'static Mutex<HashMap<usize, IntentAnchors>> {

pub(crate) fn clear_intent_anchor_cache() {
let cache = intent_anchor_cache();
let mut guard = cache.lock().expect("intent anchor cache lock poisoned");
guard.clear();
if let Ok(mut guard) = cache.lock() {
guard.clear();
} else {
log::warn!("intent anchor cache lock poisoned, unable to clear");
}
}

fn load_or_build_intent_anchors(
embedder: &dyn QueryEmbedder,
dim: usize,
) -> std::result::Result<IntentAnchors, String> {
let cache = intent_anchor_cache();
{
let guard = cache.lock().expect("intent anchor cache lock poisoned");
if let Ok(guard) = cache.lock() {
if let Some(found) = guard.get(&dim) {
return Ok(found.clone());
}
} else {
log::warn!("intent anchor cache lock poisoned, rebuilding anchor from scratch");
}

let policy = get_intent_policy();
Expand All @@ -77,8 +85,9 @@ fn load_or_build_intent_anchors(
return Err("intent anchor embedding dimension mismatch".to_string());
}
let anchors = IntentAnchors { semantic, recency, graph };
let mut guard = cache.lock().expect("intent anchor cache lock poisoned");
guard.insert(dim, anchors.clone());
if let Ok(mut guard) = cache.lock() {
guard.insert(dim, anchors.clone());
}
Ok(anchors)
}

Expand Down Expand Up @@ -354,9 +363,13 @@ fn build_ranked_hits(
#[cfg(test)]
mod tests {
use super::*;
use crate::core::memory_entry::{MemoryEntry, MemoryId};
use crate::query::hybrid::{GraphExpansionOptions, QueryOptions};
use crate::query::planner::{ExecutionPath, QueryPlanner};
use crate::{
core::memory_entry::{MemoryEntry, MemoryId},
query::{
hybrid::{GraphExpansionOptions, QueryOptions},
planner::{ExecutionPath, QueryPlanner},
},
};

struct TestEmbedder;
impl QueryEmbedder for TestEmbedder {
Expand Down
8 changes: 4 additions & 4 deletions crates/cortexadb-core/src/query/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::collections::{HashMap, HashSet};

use thiserror::Error;

use crate::core::memory_entry::MemoryId;
use crate::core::state_machine::StateMachine;
use crate::index::combined::IndexLayer;
use crate::index::graph::GraphIndex;
use crate::{
core::{memory_entry::MemoryId, state_machine::StateMachine},
index::{combined::IndexLayer, graph::GraphIndex},
};

#[derive(Error, Debug)]
pub enum HybridQueryError {
Expand Down
Loading
Loading