diff --git a/Cargo.toml b/Cargo.toml index 83e0755f..0759c0ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "vectorless" -version = "0.1.18" +version = "0.1.19" edition = "2024" authors = ["zTgx "] description = "Hierarchical, reasoning-native document intelligence engine" @@ -62,6 +62,13 @@ lru = "0.12" # Checksum sha2 = "0.10" +# BLAKE2b hashing for fingerprints +blake2 = "0.10" +base64 = "0.22" + +# Synchronization primitives (for memo store) +parking_lot = "0.12" + # Compression flate2 = "1.0" diff --git a/docs/design/memo.md b/docs/design/memo.md new file mode 100644 index 00000000..99d6f851 --- /dev/null +++ b/docs/design/memo.md @@ -0,0 +1,314 @@ +# LLM Memoization System + +## Overview + +The memoization system provides intelligent caching for expensive LLM operations, reducing API costs and latency while maintaining semantic correctness. + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ Memoization Layer │ +├─────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ Engine │───▶│ Retriever │───▶│ LlmPilot │ │ +│ │ Builder │ │ Pipeline │ │ │ │ +│ └──────────────┘ └──────────────┘ └──────────────┘ │ +│ │ │ │ │ +│ └───────────────────┴───────────────────┘ │ +│ │ │ +│ ┌────────▼────────┐ │ +│ │ MemoStore │ │ +│ │ │ │ +│ │ ┌───────────┐ │ │ +│ │ │ LRU Cache │ │ │ +│ │ └───────────┘ │ │ +│ │ ┌───────────┐ │ │ +│ │ │ Stats │ │ │ +│ │ └───────────┘ │ │ +│ │ ┌───────────┐ │ │ +│ │ │ TTL │ │ │ +│ │ └───────────┘ │ │ +│ └─────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +## Key Components + +### MemoKey + +Content-addressed cache key that ensures cache hits only occur when inputs are semantically identical. + +```rust +pub struct MemoKey { + /// Type of operation (Summary, PilotDecision, QueryAnalysis, etc.) 
+    pub op_type: MemoOpType,
+
+    /// Fingerprint of the input content (BLAKE2b-128)
+    pub input_fp: Fingerprint,
+
+    /// Model identifier for cache invalidation when model changes
+    pub model_id: Option<String>,
+
+    /// Version for cache invalidation when algorithm changes
+    pub version: u32,
+
+    /// Additional context fingerprint (e.g., navigation context for pilot)
+    pub context_fp: Fingerprint,
+}
+```
+
+### MemoStore
+
+Thread-safe LRU cache with TTL expiration and optional disk persistence.
+
+```rust
+pub struct MemoStore {
+    cache: Arc<parking_lot::RwLock<LruCache<MemoKey, MemoValue>>>,
+    stats: Arc<tokio::sync::RwLock<MemoStats>>,
+    ttl: Duration,
+    model_id: Option<String>,
+    version: u32,
+}
+```
+
+**Features:**
+- LRU eviction policy (default: 10,000 entries)
+- TTL-based expiration (default: 7 days)
+- Optional disk persistence (JSON format)
+- Thread-safe access via `parking_lot::RwLock`
+
+### Integration Points
+
+| Component | Operation Type | Description |
+|-----------|---------------|-------------|
+| `LlmSummaryGenerator` | `Summary` | Node summary generation |
+| `LlmPilot` | `PilotDecision` | Navigation decision caching |
+| Query Analyzer | `QueryAnalysis` | Query complexity/intent analysis |
+| Content Extractor | `Extraction` | Structured data extraction |
+
+## Design Principles
+
+### 1. Layered Architecture
+
+Each layer can be independently configured and tested:
+
+```
+Engine → PipelineRetriever → LlmPilot → MemoStore
+```
+
+Benefits:
+- `MemoStore` can be reused by multiple components
+- Each layer has single responsibility
+- Easy to mock for testing
+
+### 2. Non-Intrusive Integration
+
+Memoization is optional and doesn't break existing APIs:
+
+```rust
+// Without memoization (works as before)
+let pilot = LlmPilot::new(client, config);
+
+// With memoization (opt-in)
+let pilot = LlmPilot::new(client, config)
+    .with_memo_store(store);
+```
+
+### 3.
Smart Cache Key Design + +Cache keys include semantic context for precise invalidation: + +```rust +// Key automatically invalidates when: +// - Model changes (model_id field) +// - Algorithm version changes (version field) +// - Input content changes (input_fp field) +// - Navigation context changes (context_fp field) +``` + +### 4. Cost Tracking + +The system tracks savings to quantify the value of caching: + +```rust +pub struct MemoStats { + pub entries: usize, + pub hits: u64, + pub misses: u64, + pub tokens_saved: u64, + pub cost_saved: f64, +} + +impl MemoStats { + pub fn hit_rate(&self) -> f64 { + let total = self.hits + self.misses; + if total == 0 { 0.0 } else { self.hits as f64 / total as f64 } + } +} +``` + +### 5. Flexible Invalidation Strategies + +```rust +// Time-based (automatic) +store.with_ttl(Duration::days(7)) + +// By operation type +store.invalidate_by_op_type(MemoOpType::PilotDecision) + +// By model prefix +store.invalidate_by_model_prefix("gpt-4") + +// Manual +store.remove(&key) +store.clear() +``` + +## Usage Examples + +### Basic Setup + +```rust +use vectorless::memo::MemoStore; +use chrono::Duration; + +// Create with custom settings +let store = MemoStore::new() + .with_ttl(Duration::days(7)) + .with_model("gpt-4o") + .with_version(1); +``` + +### With Engine Builder + +```rust +use vectorless::client::EngineBuilder; + +// Option 1: Custom memo store +let memo_store = MemoStore::new() + .with_ttl(Duration::days(7)) + .with_model("gpt-4o"); + +let engine = EngineBuilder::new() + .with_workspace("./data") + .with_memo_store(memo_store) + .with_openai(api_key) + .build() + .await?; + +// Option 2: Default (auto-created with config model) +let engine = EngineBuilder::new() + .with_workspace("./data") + .with_openai(api_key) + .build() + .await?; +``` + +### Monitoring Cache Performance + +```rust +// Async stats (includes all metrics) +let stats = store.stats().await; +println!("Hit rate: {:.2}%", stats.hit_rate() * 100.0); 
+println!("Tokens saved: {}", stats.tokens_saved); + +// Sync snapshot (for monitoring without async) +let stats = store.stats_snapshot(); +println!("Cache entries: {}", stats.entries); +``` + +### Cache Invalidation + +```rust +// When switching models +store.invalidate_by_model_prefix("gpt-3.5"); + +// When algorithm changes +store.invalidate_by_op_type(MemoOpType::PilotDecision); + +// Manual pruning of expired entries +let removed = store.prune_expired(); +``` + +### Persistence + +```rust +// Save to disk +store.save(Path::new("./cache/memo.json")).await?; + +// Load from disk (on startup) +store.load(Path::new("./cache/memo.json")).await?; +``` + +## Performance Characteristics + +### Concurrency + +| Component | Lock Type | Rationale | +|-----------|-----------|-----------| +| LRU Cache | `parking_lot::RwLock` | High-performance, allows concurrent reads | +| Statistics | `tokio::sync::RwLock` | Async-compatible for integration | +| Atomic Stats | `AtomicU64` | Lock-free for hot paths | + +### Memory + +- Default capacity: 10,000 entries +- Per-entry overhead: ~200-500 bytes (depending on cached value size) +- Estimated memory: 2-5 MB at full capacity + +### Latency + +| Operation | Typical Latency | +|-----------|-----------------| +| Cache hit | < 1 µs | +| Cache miss (no compute) | < 5 µs | +| Cache miss (with LLM) | 100-2000 ms | + +## Cost Savings Estimation + +### Typical Document Retrieval Scenario + +| Scenario | Without Cache | With Cache | Savings | +|----------|---------------|------------|---------| +| First query | 5-10 LLM calls | 5-10 LLM calls | 0% | +| Repeated query | 5-10 LLM calls | 0-1 LLM calls | **80-100%** | +| Similar query | 5-10 LLM calls | 2-3 LLM calls | **50-70%** | + +### Token Savings Example + +```rust +// Assuming GPT-4 pricing: $0.03 / 1K input tokens, $0.06 / 1K output tokens +// Average Pilot decision: 500 input tokens, 100 output tokens + +// Without cache (100 queries): +// Cost = 100 * (500 * 0.03/1000 + 100 * 
0.06/1000) = $2.10 + +// With 80% hit rate: +// Cost = 20 * $0.021 = $0.42 +// Savings = $1.68 (80%) +``` + +## Future Improvements + +### Potential Enhancements + +1. **Semantic Cache Keys**: Use embedding similarity for fuzzy matching +2. **Distributed Cache**: Share cache across multiple instances via Redis +3. **Compression**: Compress cached values for large responses +4. **Warm-up**: Pre-populate cache with common patterns +5. **Analytics Dashboard**: Real-time visualization of cache performance + +### Implementation Notes + +- Consider using `AtomicU64` for all stats to eliminate async lock overhead +- Cache `MemoKey::fingerprint()` result for frequently used keys +- Add automatic periodic persistence with configurable interval + +## Related Documentation + +- [Fingerprint System](./fingerprint.md) - Content-addressed hashing +- [Incremental Indexing](./incremental.md) - Change detection for reindexing +- [Pilot Architecture](./pilot.md) - LLM-based navigation intelligence diff --git a/src/client/builder.rs b/src/client/builder.rs index 12db36bb..61621f23 100644 --- a/src/client/builder.rs +++ b/src/client/builder.rs @@ -36,6 +36,7 @@ use std::path::PathBuf; use crate::config::{Config, ConfigLoader, RetrievalConfig}; +use crate::memo::MemoStore; use crate::retrieval::PipelineRetriever; use crate::storage::Workspace; @@ -108,6 +109,9 @@ pub struct EngineBuilder { /// Precise mode flag. precise_mode: bool, + + /// Memo store for caching LLM decisions. + memo_store: Option, } impl EngineBuilder { @@ -126,6 +130,7 @@ impl EngineBuilder { top_k: None, fast_mode: false, precise_mode: false, + memo_store: None, } } @@ -192,6 +197,39 @@ impl EngineBuilder { self } + /// Set a memo store for caching LLM decisions. + /// + /// When enabled, the pilot will cache navigation decisions based on + /// context fingerprints, avoiding redundant API calls for similar + /// navigation scenarios. 
+ /// + /// # Example + /// + /// ```rust,no_run + /// use vectorless::client::EngineBuilder; + /// use vectorless::memo::MemoStore; + /// use chrono::Duration; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<(), vectorless::BuildError> { + /// let memo_store = MemoStore::new() + /// .with_ttl(Duration::days(7)) + /// .with_model("gpt-4o"); + /// + /// let engine = EngineBuilder::new() + /// .with_workspace("./data") + /// .with_memo_store(memo_store) + /// .build() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + #[must_use] + pub fn with_memo_store(mut self, store: MemoStore) -> Self { + self.memo_store = Some(store); + self + } + // ============================================================ // LLM Configuration // ============================================================ @@ -469,6 +507,17 @@ impl EngineBuilder { retriever.with_content_config(retrieval_config.content.to_aggregator_config()); } + // Add memo store if provided or create default + if let Some(memo_store) = self.memo_store { + retriever = retriever.with_memo_store(memo_store); + } else { + // Create default memo store with model from config + let memo_store = MemoStore::new() + .with_model(&retrieval_config.model) + .with_version(1); + retriever = retriever.with_memo_store(memo_store); + } + // Build engine Engine::with_components(config, workspace, retriever, executor) .await diff --git a/src/index/incremental/detector.rs b/src/index/incremental/detector.rs index 1db0d4fc..748d0f2b 100644 --- a/src/index/incremental/detector.rs +++ b/src/index/incremental/detector.rs @@ -2,16 +2,22 @@ // SPDX-License-Identifier: Apache-2.0 //! Change detection for incremental updates. +//! +//! This module provides fine-grained change detection using subtree fingerprints, +//! enabling precise identification of changed nodes without full reprocessing. 
 use std::collections::HashMap;
 use std::hash::{Hash, Hasher};
 use std::path::Path;
 use std::time::SystemTime;
 
-use crate::document::DocumentTree;
+use serde::{Deserialize, Serialize};
+
+use crate::document::{DocumentTree, NodeId};
+use crate::utils::fingerprint::{Fingerprint, Fingerprinter, NodeFingerprint};
 
 /// Type of change detected.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 pub enum ChangeType {
     /// Node was added.
     Added,
@@ -23,27 +29,59 @@
     Restructured,
 }
 
+impl std::fmt::Display for ChangeType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ChangeType::Added => write!(f, "added"),
+            ChangeType::Removed => write!(f, "removed"),
+            ChangeType::Modified => write!(f, "modified"),
+            ChangeType::Restructured => write!(f, "restructured"),
+        }
+    }
+}
+
 /// A single change in the document.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct NodeChange {
     /// Node ID (from old tree).
     pub node_id: Option<String>,
-    /// Node title.
+    /// Node title (for human-readable output).
     pub title: String,
     /// Type of change.
     pub change_type: ChangeType,
+    /// Node fingerprint (for modified nodes).
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub fingerprint: Option<NodeFingerprint>,
+}
+
+impl NodeChange {
+    /// Create a new node change.
+    pub fn new(node_id: Option<String>, title: String, change_type: ChangeType) -> Self {
+        Self {
+            node_id,
+            title,
+            change_type,
+            fingerprint: None,
+        }
+    }
+
+    /// Add fingerprint information.
+    pub fn with_fingerprint(mut self, fp: NodeFingerprint) -> Self {
+        self.fingerprint = Some(fp);
+        self
+    }
 }
 
 /// Set of changes between two document versions.
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct ChangeSet {
     /// Added nodes.
     pub added: Vec<NodeChange>,
     /// Removed nodes.
     pub removed: Vec<NodeChange>,
-    /// Modified nodes.
+ /// Modified nodes (content changed). pub modified: Vec, - /// Restructured nodes. + /// Restructured nodes (children changed). pub restructured: Vec, } @@ -73,27 +111,104 @@ impl ChangeSet { self.modified.extend(other.modified); self.restructured.extend(other.restructured); } + + /// Get all changed node IDs. + pub fn changed_node_ids(&self) -> Vec<&str> { + let mut ids: Vec<&str> = Vec::new(); + for change in &self.added { + if let Some(ref id) = change.node_id { + ids.push(id.as_str()); + } + } + for change in &self.modified { + if let Some(ref id) = change.node_id { + ids.push(id.as_str()); + } + } + for change in &self.restructured { + if let Some(ref id) = change.node_id { + ids.push(id.as_str()); + } + } + ids + } +} + +/// Document-level change detection result. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DocumentChangeInfo { + /// Document ID. + pub doc_id: String, + /// Overall content fingerprint. + pub content_fp: Fingerprint, + /// Node-level fingerprints. + pub node_fingerprints: HashMap, + /// Last modification time. + pub modified_at: chrono::DateTime, + /// Processing version (incremented when processing algorithm changes). + pub processing_version: u32, +} + +impl DocumentChangeInfo { + /// Create a new document change info. + pub fn new(doc_id: &str) -> Self { + Self { + doc_id: doc_id.to_string(), + content_fp: Fingerprint::zero(), + node_fingerprints: HashMap::new(), + modified_at: chrono::Utc::now(), + processing_version: 1, + } + } + + /// Update from a tree. + pub fn update_from_tree(&mut self, tree: &DocumentTree) { + self.content_fp = compute_tree_fingerprint(tree); + self.node_fingerprints = compute_all_node_fingerprints(tree); + self.modified_at = chrono::Utc::now(); + } } /// Change detector for incremental updates. +/// +/// Supports both simple hash-based detection and fine-grained +/// subtree fingerprint-based detection. pub struct ChangeDetector { - /// Content hashes by document ID. 
-    hashes: HashMap<String, u64>,
+    /// Content fingerprints by document ID.
+    content_fps: HashMap<String, Fingerprint>,
+
+    /// Node-level fingerprints by document ID.
+    node_fps: HashMap<String, HashMap<String, NodeFingerprint>>,
 
     /// File modification times by document ID.
     mtimes: HashMap<String, SystemTime>,
+
+    /// Processing versions by document ID.
+    processing_versions: HashMap<String, u32>,
+
+    /// Current processing version (for algorithm upgrades).
+    current_processing_version: u32,
 }
 
 impl ChangeDetector {
     /// Create a new change detector.
     pub fn new() -> Self {
         Self {
-            hashes: HashMap::new(),
+            content_fps: HashMap::new(),
+            node_fps: HashMap::new(),
             mtimes: HashMap::new(),
+            processing_versions: HashMap::new(),
+            current_processing_version: 1,
         }
     }
 
-    /// Compute hash of content.
+    /// Set the current processing version.
+    pub fn with_processing_version(mut self, version: u32) -> Self {
+        self.current_processing_version = version;
+        self
+    }
+
+    /// Compute hash of content (simple u64 hash).
     fn hash_content(content: &str) -> u64 {
         let mut hasher = std::collections::hash_map::DefaultHasher::new();
         content.hash(&mut hasher);
@@ -102,12 +217,10 @@ impl ChangeDetector {
     /// Check if a file needs reindexing based on mtime.
     pub fn needs_reindex_by_mtime(&self, doc_id: &str, path: &Path) -> bool {
-        // Check if we have recorded mtime
         let Some(recorded_mtime) = self.mtimes.get(doc_id) else {
             return true; // Never indexed
         };
 
-        // Get current mtime
         let Ok(metadata) = std::fs::metadata(path) else {
             return true; // Can't read file
         };
@@ -119,21 +232,60 @@ impl ChangeDetector {
         current_mtime > *recorded_mtime
     }
 
-    /// Check if content needs reindexing based on hash.
+    /// Check if content needs reindexing based on simple hash.
pub fn needs_reindex_by_hash(&self, doc_id: &str, content: &str) -> bool { let current_hash = Self::hash_content(content); - match self.hashes.get(doc_id) { - Some(recorded_hash) => *recorded_hash != current_hash, + match self.content_fps.get(doc_id) { + Some(recorded_fp) => { + // Compare first 8 bytes of fingerprint to hash + let recorded_hash = u64::from_le_bytes( + recorded_fp.as_bytes()[..8].try_into().unwrap_or([0u8; 8]), + ); + recorded_hash != current_hash + } None => true, } } - /// Record content hash and mtime for a document. + /// Check if document needs reindexing based on fingerprint. + pub fn needs_reindex_by_fingerprint(&self, doc_id: &str, new_fp: &Fingerprint) -> bool { + match self.content_fps.get(doc_id) { + Some(recorded_fp) => recorded_fp != new_fp, + None => true, + } + } + + /// Check if processing version has changed. + pub fn needs_reindex_by_version(&self, doc_id: &str) -> bool { + match self.processing_versions.get(doc_id) { + Some(recorded_version) => *recorded_version < self.current_processing_version, + None => true, + } + } + + /// Record document state after indexing. pub fn record(&mut self, doc_id: &str, content: &str, path: Option<&Path>) { - // Record hash - let hash = Self::hash_content(content); - self.hashes.insert(doc_id.to_string(), hash); + self.record_with_tree(doc_id, content, None, path); + } + + /// Record document state with tree (for fine-grained detection). 
+ pub fn record_with_tree( + &mut self, + doc_id: &str, + content: &str, + tree: Option<&DocumentTree>, + path: Option<&Path>, + ) { + // Record content fingerprint + let content_fp = Fingerprint::from_str(content); + self.content_fps.insert(doc_id.to_string(), content_fp); + + // Record node fingerprints if tree provided + if let Some(tree) = tree { + let node_fps = compute_all_node_fingerprints(tree); + self.node_fps.insert(doc_id.to_string(), node_fps); + } // Record mtime if path provided if let Some(path) = path { @@ -143,47 +295,117 @@ impl ChangeDetector { } } } + + // Record processing version + self.processing_versions.insert( + doc_id.to_string(), + self.current_processing_version, + ); } - /// Compare two trees and detect changes. - pub fn detect_changes(&self, old_tree: &DocumentTree, new_tree: &DocumentTree) -> ChangeSet { + /// Record document from ChangeInfo. + pub fn record_change_info(&mut self, info: &DocumentChangeInfo, path: Option<&Path>) { + self.content_fps + .insert(info.doc_id.clone(), info.content_fp); + self.node_fps + .insert(info.doc_id.clone(), info.node_fingerprints.clone()); + self.processing_versions + .insert(info.doc_id.clone(), info.processing_version); + + if let Some(path) = path { + if let Ok(metadata) = std::fs::metadata(path) { + if let Ok(mtime) = metadata.modified() { + self.mtimes.insert(info.doc_id.clone(), mtime); + } + } + } + } + + /// Detect changes between two trees using fingerprints. 
+ pub fn detect_changes( + &self, + old_tree: &DocumentTree, + new_tree: &DocumentTree, + ) -> ChangeSet { let mut changes = ChangeSet::new(); - // Collect nodes from both trees - let old_nodes = self.collect_node_info(old_tree); - let new_nodes = self.collect_node_info(new_tree); + // Collect fingerprints from both trees + let old_fps = compute_all_node_fingerprints(old_tree); + let new_fps = compute_all_node_fingerprints(new_tree); + + // Build title -> (string_key, Fingerprint) maps by traversing trees + // We store owned Strings to avoid lifetime issues + let old_by_title: HashMap = { + let mut map = HashMap::new(); + for node_id in old_tree.traverse() { + if let Some(node) = old_tree.get(node_id) { + let key = node.node_id.clone().unwrap_or_else(|| format!("node_{:?}", node_id.0)); + if let Some(fp) = old_fps.get(&key) { + map.insert(node.title.clone(), (key, fp.clone())); + } + } + } + map + }; + + let new_by_title: HashMap = { + let mut map = HashMap::new(); + for node_id in new_tree.traverse() { + if let Some(node) = new_tree.get(node_id) { + let key = node.node_id.clone().unwrap_or_else(|| format!("node_{:?}", node_id.0)); + if let Some(fp) = new_fps.get(&key) { + map.insert(node.title.clone(), (key, fp.clone())); + } + } + } + map + }; // Find added nodes - for (title, info) in &new_nodes { - if !old_nodes.contains_key(title) { - changes.added.push(NodeChange { - node_id: info.node_id.clone(), - title: title.clone(), - change_type: ChangeType::Added, - }); + for (title, (node_key, fp)) in &new_by_title { + if !old_by_title.contains_key(title) { + changes.added.push( + NodeChange::new(Some(node_key.clone()), title.clone(), ChangeType::Added) + .with_fingerprint(fp.clone()), + ); } } // Find removed nodes - for (title, info) in &old_nodes { - if !new_nodes.contains_key(title) { - changes.removed.push(NodeChange { - node_id: info.node_id.clone(), - title: title.clone(), - change_type: ChangeType::Removed, - }); + for (title, (node_key, fp)) in &old_by_title 
{ + if !new_by_title.contains_key(title) { + changes.removed.push( + NodeChange::new( + Some(node_key.clone()), + title.clone(), + ChangeType::Removed, + ) + .with_fingerprint(fp.clone()), + ); } } // Find modified nodes - for (title, new_info) in &new_nodes { - if let Some(old_info) = old_nodes.get(title) { - if old_info.content_hash != new_info.content_hash { - changes.modified.push(NodeChange { - node_id: new_info.node_id.clone(), - title: title.clone(), - change_type: ChangeType::Modified, - }); + for (title, (new_key, new_fp)) in &new_by_title { + if let Some((_old_key, old_fp)) = old_by_title.get(title) { + if new_fp.content_changed(old_fp) { + changes.modified.push( + NodeChange::new( + Some(new_key.clone()), + title.clone(), + ChangeType::Modified, + ) + .with_fingerprint(new_fp.clone()), + ); + } else if new_fp.subtree_changed(old_fp) { + changes.restructured.push( + NodeChange::new( + Some(new_key.clone()), + title.clone(), + ChangeType::Restructured, + ) + .with_fingerprint(new_fp.clone()), + ); } } } @@ -191,35 +413,78 @@ impl ChangeDetector { changes } - /// Collect node information from a tree. - fn collect_node_info(&self, tree: &DocumentTree) -> HashMap { - let mut info = HashMap::new(); + /// Get nodes that need reprocessing (summary regeneration). 
+ /// + /// This returns nodes where either: + /// - Content changed (summary may need update) + /// - Processing version changed (all summaries need update) + pub fn get_nodes_needing_reprocess( + &self, + doc_id: &str, + new_tree: &DocumentTree, + ) -> Option> { + let old_fps = self.node_fps.get(doc_id)?; + let new_fps = compute_all_node_fingerprints(new_tree); + + let mut needs_reprocess = Vec::new(); + + // If processing version changed, all nodes need reprocessing + if self.needs_reindex_by_version(doc_id) { + return Some(new_fps.keys().cloned().collect()); + } - for node_id in tree.traverse() { - if let Some(node) = tree.get(node_id) { - // Skip root node - if node.depth == 0 { - continue; + // Otherwise, only changed nodes need reprocessing + for (node_key, new_fp) in &new_fps { + if let Some(old_fp) = old_fps.get(node_key) { + // Content changed or subtree structure changed + if new_fp.content_changed(old_fp) || new_fp.subtree_changed(old_fp) { + needs_reprocess.push(node_key.clone()); } - - info.insert( - node.title.clone(), - NodeInfo { - node_id: node.node_id.clone(), - content_hash: Self::hash_content(&node.content), - child_count: tree.children(node_id).len(), - }, - ); + } else { + // New node + needs_reprocess.push(node_key.clone()); } } - info + Some(needs_reprocess) } /// Clear stored data for a document. pub fn clear(&mut self, doc_id: &str) { - self.hashes.remove(doc_id); + self.content_fps.remove(doc_id); + self.node_fps.remove(doc_id); self.mtimes.remove(doc_id); + self.processing_versions.remove(doc_id); + } + + /// Get the current content fingerprint for a document. + pub fn get_content_fingerprint(&self, doc_id: &str) -> Option<&Fingerprint> { + self.content_fps.get(doc_id) + } + + /// Get all node fingerprints for a document. + pub fn get_node_fingerprints(&self, doc_id: &str) -> Option<&HashMap> { + self.node_fps.get(doc_id) + } + + /// Serialize state for persistence. 
+ pub fn to_state(&self) -> ChangeDetectorState { + ChangeDetectorState { + content_fps: self.content_fps.clone(), + node_fps: self.node_fps.clone(), + processing_versions: self.processing_versions.clone(), + } + } + + /// Restore state from persistence. + pub fn from_state(state: ChangeDetectorState) -> Self { + Self { + content_fps: state.content_fps, + node_fps: state.node_fps, + mtimes: HashMap::new(), + processing_versions: state.processing_versions, + current_processing_version: 1, + } } } @@ -229,12 +494,193 @@ impl Default for ChangeDetector { } } -/// Internal node information for change detection. -struct NodeInfo { - /// Node ID (if assigned). - node_id: Option, - /// Hash of node content. - content_hash: u64, - /// Number of direct children. - child_count: usize, +/// Serializable state for change detector. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChangeDetectorState { + /// Content fingerprints by document ID. + pub content_fps: HashMap, + /// Node fingerprints by document ID. + pub node_fps: HashMap>, + /// Processing versions by document ID. + pub processing_versions: HashMap, +} + +// ============================================================================= +// Helper Functions +// ============================================================================= + +/// Compute the overall fingerprint for a tree. +pub fn compute_tree_fingerprint(tree: &DocumentTree) -> Fingerprint { + let root_fp = compute_node_fingerprint(tree, tree.root()); + root_fp.subtree +} + +/// Compute content fingerprint for a single node. +fn compute_node_content_fp(tree: &DocumentTree, node_id: NodeId) -> Fingerprint { + let node = match tree.get(node_id) { + Some(n) => n, + None => return Fingerprint::zero(), + }; + + Fingerprinter::new() + .with_str(&node.title) + .with_str(&node.content) + .with_option_str(node.node_id.as_deref()) + .into_fingerprint() +} + +/// Compute fingerprint for a node and its subtree. 
+fn compute_node_fingerprint(tree: &DocumentTree, node_id: NodeId) -> NodeFingerprint { + let node = match tree.get(node_id) { + Some(n) => n, + None => return NodeFingerprint::zero(), + }; + + // Content fingerprint + let content_fp = Fingerprinter::new() + .with_str(&node.title) + .with_str(&node.content) + .with_option_str(node.node_id.as_deref()) + .into_fingerprint(); + + // Check if leaf node + let children = tree.children(node_id); + if children.is_empty() { + return NodeFingerprint::leaf(content_fp); + } + + // Compute subtree fingerprint from children + let mut subtree_fp = Fingerprinter::new(); + subtree_fp.write_fingerprint(&content_fp); + + for child_id in children { + let child_fp = compute_node_fingerprint(tree, child_id); + subtree_fp.write_fingerprint(&child_fp.subtree); + } + + NodeFingerprint::new(content_fp, subtree_fp.into_fingerprint()) +} + +/// Compute fingerprints for all nodes in a tree. +/// Returns a map from string key (for persistence) to NodeFingerprint. +pub fn compute_all_node_fingerprints(tree: &DocumentTree) -> HashMap { + let mut fingerprints = HashMap::new(); + + for node_id in tree.traverse() { + if let Some(node) = tree.get(node_id) { + let key = node.node_id.clone().unwrap_or_else(|| format!("node_{:?}", node_id.0)); + let fp = compute_node_fingerprint(tree, node_id); + fingerprints.insert(key, fp); + } + } + + fingerprints +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::document::DocumentTree; + + #[test] + fn test_change_detector_new() { + let detector = ChangeDetector::new(); + assert!(detector.content_fps.is_empty()); + } + + #[test] + fn test_needs_reindex_by_hash() { + let mut detector = ChangeDetector::new(); + + // First time: always needs reindex + assert!(detector.needs_reindex_by_hash("doc1", "content")); + + // Record the content + detector.record("doc1", "content", None); + + // Same content: no reindex needed + assert!(!detector.needs_reindex_by_hash("doc1", "content")); + + // Different content: 
needs reindex + assert!(detector.needs_reindex_by_hash("doc1", "new content")); + } + + #[test] + fn test_detect_changes() { + let detector = ChangeDetector::new(); + + // Create two simple trees + let mut tree1 = DocumentTree::new("Root", ""); + let child1 = tree1.add_child(tree1.root(), "Section 1", "Content 1"); + tree1.add_child(tree1.root(), "Section 2", "Content 2"); + + let mut tree2 = DocumentTree::new("Root", ""); + tree2.add_child(tree2.root(), "Section 1", "Content 1"); // Same + tree2.add_child(tree2.root(), "Section 2", "Content modified"); // Changed + tree2.add_child(tree2.root(), "Section 3", "Content 3"); // New + + let changes = detector.detect_changes(&tree1, &tree2); + + assert!(!changes.is_empty()); + assert!(!changes.added.is_empty()); // Section 3 added + assert!(!changes.modified.is_empty()); // Section 2 modified + } + + #[test] + fn test_change_set() { + let mut changes = ChangeSet::new(); + assert!(changes.is_empty()); + + changes.added.push(NodeChange::new( + Some("node1".to_string()), + "Title".to_string(), + ChangeType::Added, + )); + + assert!(!changes.is_empty()); + assert_eq!(changes.total_changes(), 1); + } + + #[test] + fn test_processing_version() { + let mut detector = ChangeDetector::new().with_processing_version(2); + detector.record("doc1", "content", None); + + // Version matches, no reindex needed + assert!(!detector.needs_reindex_by_version("doc1")); + + // Create new detector with higher version + let detector2 = ChangeDetector::new().with_processing_version(3); + assert!(detector2.needs_reindex_by_version("doc1")); + } + + #[test] + fn test_node_fingerprint() { + let mut tree = DocumentTree::new("Root", "root content"); + let child = tree.add_child(tree.root(), "Child", "child content"); + + let root_fp = compute_node_fingerprint(&tree, tree.root()); + let child_fp = compute_node_fingerprint(&tree, child); + + // Child is a leaf, content == subtree + assert_eq!(child_fp.content, child_fp.subtree); + + // Root is not a 
leaf + assert_ne!(root_fp.content, root_fp.subtree); + } + + #[test] + fn test_fingerprint_serialization() { + let mut detector = ChangeDetector::new(); + let mut tree = DocumentTree::new("Root", "content"); + tree.add_child(tree.root(), "Section", "section content"); + + detector.record_with_tree("doc1", "content", Some(&tree), None); + + let state = detector.to_state(); + let json = serde_json::to_string(&state).unwrap(); + let restored: ChangeDetectorState = serde_json::from_str(&json).unwrap(); + + assert_eq!(state.content_fps, restored.content_fps); + } } diff --git a/src/index/incremental/mod.rs b/src/index/incremental/mod.rs index 51bf2352..741f4ae1 100644 --- a/src/index/incremental/mod.rs +++ b/src/index/incremental/mod.rs @@ -5,9 +5,20 @@ //! //! This module provides functionality to incrementally update //! an existing document index when the source document changes. +//! +//! # Features +//! +//! - **Fine-grained change detection**: Uses subtree fingerprints to identify +//! exactly which nodes changed +//! - **Processing version tracking**: Automatically reprocesses when algorithm +//! versions change +//! 
- **Partial updates**: Only reprocess changed nodes mod detector; mod updater; -pub use detector::{ChangeDetector, ChangeSet, ChangeType}; +pub use detector::{ + ChangeDetector, ChangeDetectorState, ChangeSet, ChangeType, DocumentChangeInfo, NodeChange, + compute_all_node_fingerprints, compute_tree_fingerprint, +}; pub use updater::PartialUpdater; diff --git a/src/index/stages/build.rs b/src/index/stages/build.rs index 1ab16d26..389bf25e 100644 --- a/src/index/stages/build.rs +++ b/src/index/stages/build.rs @@ -10,7 +10,7 @@ use tracing::info; use crate::document::{DocumentTree, NodeId}; use crate::error::Result; use crate::parser::RawNode; -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; use super::{IndexStage, StageResult}; use crate::index::ThinningConfig; diff --git a/src/index/stages/enhance.rs b/src/index/stages/enhance.rs index 278b572c..4fb29310 100644 --- a/src/index/stages/enhance.rs +++ b/src/index/stages/enhance.rs @@ -5,12 +5,14 @@ use super::async_trait; use std::sync::Arc; -use std::time::Instant; -use tracing::{info, warn}; +use std::time::{Duration, Instant}; +use tracing::{debug, info, warn}; -use crate::document::{DocumentTree, NodeId}; +use crate::document::{DocumentTree, NodeId, TreeNode}; use crate::error::Result; +use crate::utils::fingerprint::Fingerprint; use crate::llm::LlmClient; +use crate::memo::{MemoKey, MemoStore, MemoValue}; use super::{IndexStage, StageResult}; use crate::index::pipeline::{FailurePolicy, IndexContext, StageRetryConfig}; @@ -20,78 +22,48 @@ use crate::index::summary::{LlmSummaryGenerator, SummaryGenerator, SummaryStrate pub struct EnhanceStage { /// LLM client for summary generation. llm_client: Option>, + /// Memo store for caching LLM results. + memo_store: Option>, } impl EnhanceStage { /// Create a new enhance stage. pub fn new() -> Self { - Self { llm_client: None } + Self { + llm_client: None, + memo_store: None, + } } /// Create with LLM client. 
pub fn with_llm_client(client: LlmClient) -> Self { Self { llm_client: Some(Arc::new(client)), + memo_store: None, } } - /// Check if summary generation is needed. - fn needs_summaries(&self, ctx: &IndexContext) -> bool { - match &ctx.options.summary_strategy { - SummaryStrategy::None => false, - SummaryStrategy::Lazy { .. } => false, - _ => true, + /// Create with LLM client and memo store. + pub fn with_llm_and_memo(client: LlmClient, memo_store: MemoStore) -> Self { + Self { + llm_client: Some(Arc::new(client)), + memo_store: Some(Arc::new(memo_store)), } } - /// Generate summary for a single node. - async fn generate_node_summary( - tree: &mut DocumentTree, - node_id: NodeId, - generator: &LlmSummaryGenerator, - strategy: &SummaryStrategy, - metrics: &mut crate::index::IndexMetrics, - ) -> Result<()> { - let node = match tree.get(node_id) { - Some(n) => n.clone(), - None => return Ok(()), - }; - - // Skip if no content - if node.content.is_empty() { - return Ok(()); - } - - // Get token count - let token_count = node.token_count.unwrap_or(0); - let should_gen = strategy.should_generate(tree, node_id, token_count); - - // Check if we should generate - if !should_gen { - return Ok(()); - } + /// Set memo store for caching. + pub fn with_memo_store(mut self, store: MemoStore) -> Self { + self.memo_store = Some(Arc::new(store)); + self + } - // Generate summary - match generator.generate(&node.title, &node.content).await { - Ok(summary) => { - if summary.is_empty() { - warn!("Empty summary returned for node '{}'", node.title); - } else { - tree.set_summary(node_id, &summary); - info!( - "Generated summary for node: {} ({} chars)", - node.title, - summary.len() - ); - metrics.increment_summaries(); - } - } - Err(e) => { - warn!("Failed to generate summary for {}: {}", node.title, e); - } + /// Check if summary generation is needed based on strategy. 
+ fn needs_summaries(&self, ctx: &IndexContext) -> bool { + match &ctx.options.summary_strategy { + SummaryStrategy::None => false, + SummaryStrategy::Lazy { .. } => false, // Generated on-demand at query time + SummaryStrategy::Full { .. } | SummaryStrategy::Selective { .. } => true, } - - Ok(()) } } @@ -120,7 +92,7 @@ impl IndexStage for EnhanceStage { FailurePolicy::retry_with( StageRetryConfig::new() .with_max_attempts(2) - .with_initial_delay(std::time::Duration::from_millis(500)), + .with_initial_delay(Duration::from_millis(500)), ) } @@ -156,37 +128,91 @@ impl IndexStage for EnhanceStage { info!("Using summary strategy: {:?}", ctx.options.summary_strategy); - // Create summary generator - let generator = LlmSummaryGenerator::new((*llm_client).as_ref().clone()) + // Create summary generator with optional memo store + let mut generator = LlmSummaryGenerator::new((*llm_client).as_ref().clone()) .with_max_tokens(ctx.options.indexer.max_summary_tokens); + // Attach memo store to generator if available + if let Some(store) = &self.memo_store { + generator = generator.with_memo_store((**store).clone()); + } + // Get all nodes to process let node_ids: Vec = tree.traverse(); let total_nodes = node_ids.len(); info!("Processing {} nodes for summary generation", total_nodes); - // Process nodes (with concurrency control) + // Process nodes let mut generated = 0; let mut failed = 0; let strategy = ctx.options.summary_strategy.clone(); for node_id in node_ids { - match Self::generate_node_summary( - tree, - node_id, - &generator, - &strategy, - &mut ctx.metrics, - ) - .await - { - Ok(()) => { + // Get node data (need to clone to avoid borrow issues) + let node = match tree.get(node_id) { + Some(n) => n.clone(), + None => continue, + }; + + // Skip if no content + if node.content.is_empty() { + continue; + } + + // Get token count and check if we should generate + let token_count = node.token_count.unwrap_or(0); + if !strategy.should_generate(tree, node_id, token_count) { 
+ continue; + } + + // Check memo store first (additional check beyond generator) + let cached_summary = if let Some(store) = self.memo_store.as_deref() { + let content_fp = + Fingerprint::from_str(&format!("{}|{}", node.title, node.content)); + let memo_key = MemoKey::summary(&content_fp); + + store + .get(&memo_key) + .and_then(|cached| cached.as_summary().map(|s| s.to_string())) + } else { + None + }; + + if let Some(summary) = cached_summary { + if !summary.is_empty() { + tree.set_summary(node_id, &summary); + debug!( + "Using cached summary for node: {} ({} chars)", + node.title, + summary.len() + ); + ctx.metrics.increment_summaries(); generated += 1; + continue; + } + } + + // Generate summary (generator also has memoization built-in) + match generator.generate(&node.title, &node.content).await { + Ok(summary) => { + if summary.is_empty() { + warn!("Empty summary returned for node '{}'", node.title); + failed += 1; + } else { + tree.set_summary(node_id, &summary); + debug!( + "Generated summary for node: {} ({} chars)", + node.title, + summary.len() + ); + ctx.metrics.increment_summaries(); + generated += 1; + } } Err(e) => { + warn!("Failed to generate summary for {}: {}", node.title, e); failed += 1; - warn!("Failed to generate summary: {}", e); } } diff --git a/src/index/summary/strategy.rs b/src/index/summary/strategy.rs index eac0055c..91c0ff29 100644 --- a/src/index/summary/strategy.rs +++ b/src/index/summary/strategy.rs @@ -6,7 +6,9 @@ use async_trait::async_trait; use crate::document::{DocumentTree, NodeId}; +use crate::utils::fingerprint::Fingerprint; use crate::llm::{LlmClient, LlmResult}; +use crate::memo::{MemoKey, MemoStore, MemoValue}; /// Configuration for summary strategies. #[derive(Debug, Clone)] @@ -160,6 +162,8 @@ pub trait SummaryGenerator: Send + Sync { pub struct LlmSummaryGenerator { client: LlmClient, max_tokens: usize, + /// Optional memo store for caching results. 
+ memo_store: Option, } impl LlmSummaryGenerator { @@ -168,6 +172,7 @@ impl LlmSummaryGenerator { Self { client, max_tokens: 200, + memo_store: None, } } @@ -176,11 +181,32 @@ impl LlmSummaryGenerator { self.max_tokens = max_tokens; self } + + /// Set memo store for caching. + pub fn with_memo_store(mut self, store: MemoStore) -> Self { + self.memo_store = Some(store); + self + } } #[async_trait] impl SummaryGenerator for LlmSummaryGenerator { async fn generate(&self, title: &str, content: &str) -> LlmResult { + // Compute content fingerprint for cache key + let content_fp = Fingerprint::from_str(&format!("{}|{}", title, content)); + let memo_key = MemoKey::summary(&content_fp); + + // Check memo store first + if let Some(ref store) = self.memo_store { + if let Some(cached) = store.get(&memo_key) { + if let Some(summary) = cached.as_summary() { + tracing::debug!("Memo cache hit for summary: {}", title); + return Ok(summary.to_string()); + } + } + } + + // Generate with LLM let system_prompt = "You are a document summarization assistant. \ Generate a concise summary (2-3 sentences) of the given section. \ Focus on the main topics and key information. \ @@ -188,8 +214,22 @@ impl SummaryGenerator for LlmSummaryGenerator { let user_prompt = format!("Title: {}\n\nContent:\n{}", title, content); - self.client + let summary = self.client .complete_with_max_tokens(&system_prompt, &user_prompt, self.max_tokens as u16) - .await + .await?; + + // Cache the result + if let Some(ref store) = self.memo_store { + // Estimate tokens saved (roughly: input + output tokens) + let tokens_saved = (title.len() + content.len() + summary.len()) / 4; + store.put_with_tokens( + memo_key, + MemoValue::Summary(summary.clone()), + tokens_saved as u64, + ); + tracing::debug!("Memo cache stored for summary: {}", title); + } + + Ok(summary) } } diff --git a/src/lib.rs b/src/lib.rs index 49ffaf99..642afb03 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,30 +28,114 @@ //! ## Architecture //! //! 
```text -//! ┌─────────────────────────────────────────────────────────────────┐ -//! │ client │ -//! │ (Engine, EngineBuilder) │ -//! └────────────────────────────┬────────────────────────────────────┘ -//! │ -//! ┌──────────────────┼──────────────────┐ -//! ▼ ▼ ▼ -//! ┌──────────┐ ┌───────────┐ ┌──────────┐ -//! │ index │ │ retrieval │ │ storage │ -//! │ (write) │ │ (read) │ │ (persist)│ -//! └────┬─────┘ └─────┬─────┘ └────┬─────┘ -//! │ │ │ -//! └───────────┬───────┘ │ -//! ▼ │ -//! ┌───────────┐ │ -//! │ domain │ │ -//! │(Tree/Node)│ │ -//! └─────┬─────┘ │ -//! │ │ -//! ┌──────────────┼──────────────┐ │ -//! ▼ ▼ ▼ │ -//! ┌────────┐ ┌──────────┐ ┌────────┐ │ -//! │ parser │ │ llm │ │ config │◄─────┘ -//! └────────┘ └──────────┘ └────────┘ +//! ┌─────────────────────────────────────────────────┐ +//! │ USER │ +//! │ (Query / Index) │ +//! └────────────────────────┬────────────────────────┘ +//! │ +//! ▼ +//! ┌─────────────────────────────────────────────────────────────────────────────────┐ +//! │ CLIENT LAYER │ +//! │ ┌───────────────────────────────────────────────────────────────────────────┐ │ +//! │ │ Engine / EngineBuilder │ │ +//! │ │ (Unified API for Index + Query) │ │ +//! │ └───────────────────────────────────────────────────────────────────────────┘ │ +//! └─────────────────────────────────────────────────────────────────────────────────┘ +//! │ +//! ┌────────────────────────┴────────────────────────┐ +//! │ │ +//! ▼ ▼ +//! ┌──────────────────────────────────────────────┐ ┌──────────────────────────────────────────────┐ +//! │ INDEX PIPELINE │ │ RETRIEVAL ENGINE │ +//! │ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │ │ ┌─────────────────────────────────────┐ │ +//! │ │ Parse │─▶│ Build │─▶│ Enhance │ │ │ │ Pilot (LLM) │ │ +//! │ │ (Doc) │ │ (Tree) │ │ (Summaries) │ │ │ │ ┌───────────────────────┐ │ │ +//! │ └─────────┘ └────┬────┘ └──────┬──────┘ │ │ │ │ Navigation Agent │ │ │ +//! │ │ │ │ │ │ │ │ ┌─────┐ ┌─────────┐ │ │ │ +//! 
│ ▼ ▼ ▼ │ │ │ │ │Decide│▶│Traverse │ │ │ │ +//! │ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │ │ │ │ │Path │ │ Tree │ │ │ │ +//! │ │ Enrich │─▶│ Optimize│─▶│ Persist │ │ │ │ │ └─────┘ └─────────┘ │ │ │ +//! │ │(Meta) │ │ (Tree) │ │ (Storage) │ │ │ │ └───────────────────────┘ │ │ +//! │ └─────────┘ └─────────┘ └─────────────┘ │ │ └─────────────────────────────────────┘ │ +//! │ │ │ │ │ │ +//! │ │ ┌──────────────────────┐ │ │ ▼ │ +//! │ └────────▶│ Change Detector │◀─────┼───┤ ┌─────────────────────────────────────┐ │ +//! │ │ (Fingerprint-based) │ │ │ │ Context Assembler │ │ +//! │ └──────────────────────┘ │ │ │ ┌─────────┐ ┌─────────────────┐ │ │ +//! │ │ │ │ │ Pruning │ │ Token Budget │ │ │ +//! └──────────────────────────────────────────────┘ │ │ │Strategy │ │ Management │ │ │ +//! │ │ │ └─────────┘ └─────────────────┘ │ │ +//! │ │ └─────────────────────────────────────┘ │ +//! │ │ │ │ +//! ▼ │ ▼ │ +//! ┌──────────────────────────────────────────────────────────────────────────────────────────────┐ +//! │ DOMAIN LAYER (Core) │ +//! │ │ +//! │ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │ +//! │ │ DocumentTree │ │ TreeNode │ │ NodeId │ │ +//! │ │ (Arena-based) │────▶│ - title │ │ (indextree) │ │ +//! │ │ │ │ - content │ │ │ │ +//! │ └───────────────────┘ │ - summary │ └───────────────────┘ │ +//! │ │ │ - depth │ │ │ +//! │ ▼ │ - token_count │ │ │ +//! │ ┌───────────────────┐ └───────────────────┘ │ │ +//! │ │ TocView │ │ │ │ +//! │ │ (Table of │ │ │ │ +//! │ │ Contents) │ │ │ │ +//! │ └───────────────────┘ │ │ │ +//! └──────────────────────────────────────────────────────────────────────────────────────────────┘ +//! │ │ +//! ┌─────────────────┴─────────────────────────┴─────────────────┐ +//! │ │ +//! ▼ ▼ +//! ┌─────────────────────────────────────────┐ ┌─────────────────────────────────────────────────┐ +//! │ SUPPORT LAYER │ │ STORAGE LAYER │ +//! │ │ │ │ +//! 
│ ┌─────────────┐ ┌──────────────────┐ │ │ ┌────────────────┐ ┌─────────────────────┐ │ +//! │ │ LLM │ │ Parser │ │ │ │ Workspace │ │ MemoStore │ │ +//! │ │ (OpenAI) │ │ - Markdown │ │ │ │ (Persistence) │ │ (LLM Cache) │ │ +//! │ │ │ │ - PDF │ │ │ │ │ │ - LRU Eviction │ │ +//! │ │ ┌─────────┐ │ │ - DOCX │ │ │ │ ┌────────────┐ │ │ - TTL Expiration │ │ +//! │ │ │ Pool │ │ │ │ │ │ │ │ LRU │ │ │ - Disk Persist │ │ +//! │ │ │ Retry │ │ └──────────────────┘ │ │ │ │ Cache │ │ │ │ │ +//! │ │ │ Fallback│ │ │ │ │ └────────────┘ │ └─────────────────────┘ │ +//! │ │ └─────────┘ │ ┌──────────────────┐ │ │ │ │ │ +//! │ └─────────────┘ │ Fingerprint │ │ │ │ ┌────────────┐ │ ┌─────────────────────┐ │ +//! │ │ (BLAKE2b) │ │ │ │ │ Atomic │ │ │ ChangeDetector │ │ +//! │ ┌─────────────┐ │ │ │ │ │ │ Writes │ │ │ (Incremental) │ │ +//! │ │ Config │ │ ┌──────────────┐ │ │ │ │ └────────────┘ │ │ │ │ +//! │ │ Loader │ │ │ Content FP │ │ │ │ │ │ │ ┌─────────────────┐ │ │ +//! │ │ │ │ │ Subtree FP │ │ │ │ └────────────────┘ │ │ Processing Ver │ │ │ +//! │ └─────────────┘ │ │ Node FP │ │ │ │ │ └─────────────────┘ │ │ +//! │ │ └──────────────┘ │ │ │ └─────────────────────┘ │ +//! │ ┌─────────────┐ └──────────────────┘ │ │ │ +//! │ │ Throttle │ │ │ ┌────────────────────────────────────────────┐ │ +//! │ │ (Rate Limit)│ ┌──────────────────┐ │ │ │ DocumentMeta │ │ +//! │ └─────────────┘ │ Throttle │ │ │ │ - content_fingerprint │ │ +//! │ │ (Concurrency) │ │ │ │ - processing_version │ │ +//! │ └──────────────────┘ │ │ │ - node_count, total_summary_tokens │ │ +//! │ │ │ └────────────────────────────────────────────┘ │ +//! └─────────────────────────────────────────┘ └─────────────────────────────────────────────────┘ +//! ``` +//! +//! ## Data Flow +//! +//! ### Indexing Flow +//! ```text +//! Document ──▶ Parse ──▶ Build Tree ──▶ Generate Summaries ──▶ Detect Changes ──▶ Persist +//! │ │ │ +//! │ └──▶ MemoStore ◀───────┘ +//! │ (Cache) +//! └──▶ Fingerprint ──▶ ChangeDetector +//! ``` +//! +//! 
### Query Flow +//! ```text +//! Query ──▶ Pilot Agent ──▶ Navigate Tree ──▶ Assemble Context ──▶ Return Result +//! │ │ │ +//! └──▶ LLM ◀────────┘ │ +//! (Decide) │ +//! └──▶ MemoStore (Cached Summaries) //! ``` //! //! ## Features @@ -62,6 +146,8 @@ //! - 📄 **Multi-Format** — Markdown, PDF, DOCX support //! - 💾 **Persistent Workspace** — LRU-cached storage with lazy loading //! - 🔄 **Retry & Fallback** — Resilient LLM calls with automatic recovery +//! - 🔍 **Incremental Updates** — Fingerprint-based change detection +//! - ⚡ **LLM Memoization** — Cache summaries and decisions to reduce costs //! //! ## Quick Start //! @@ -93,14 +179,16 @@ //! | Module | Description | //! |--------|-------------| //! | [`client`] | High-level API (`Engine`, `EngineBuilder`) | -//! | [`domain`] | Core domain types (`DocumentTree`, `TreeNode`, `NodeId`) | -//! | [`index`] | Document indexing pipeline | -//! | [`retrieval`] | Retrieval strategies and search algorithms | +//! | [`document`] | Core domain types (`DocumentTree`, `TreeNode`, `NodeId`) | +//! | [`index`] | Document indexing pipeline with incremental updates | +//! | [`retrieval`] | Retrieval strategies and LLM-based navigation | //! | [`config`] | Configuration management | //! | [`llm`] | LLM client with retry & fallback | //! | [`parser`] | Document parsers (Markdown, PDF, DOCX) | -//! | [`storage`] | Workspace persistence | -//! | [`throttle`] | Rate limiting | +//! | [`storage`] | Workspace persistence with LRU caching | +//! | [`throttle`] | Rate limiting and concurrency control | +//! | [`fingerprint`] | Content and subtree fingerprinting | +//! 
| [`memo`] | LLM result memoization and caching | // ============================================================================= // Modules @@ -112,12 +200,13 @@ pub mod document; pub mod error; pub mod index; pub mod llm; +pub mod memo; pub mod metrics; pub mod parser; pub mod retrieval; pub mod storage; pub mod throttle; -pub mod util; +pub mod utils; // ============================================================================= // Re-exports (Convenience API) @@ -139,7 +228,7 @@ pub use document::{ }; // Utility functions -pub use util::{estimate_tokens, estimate_tokens_fast}; +pub use utils::{estimate_tokens, estimate_tokens_fast}; // Configuration pub use config::{Config, ConfigLoader, RetrievalConfig, SummaryConfig}; @@ -174,3 +263,6 @@ pub use storage::{DocumentMeta as StorageDocumentMeta, PersistedDocument, Worksp // Throttle pub use throttle::{ConcurrencyConfig, ConcurrencyController, RateLimiter}; + +// Memo +pub use memo::{MemoEntry, MemoKey, MemoOpType, MemoStats, MemoStore, MemoValue}; diff --git a/src/memo/mod.rs b/src/memo/mod.rs new file mode 100644 index 00000000..75d754c2 --- /dev/null +++ b/src/memo/mod.rs @@ -0,0 +1,35 @@ +// Copyright (c) 2026 vectorless developers +// SPDX-License-Identifier: Apache-2.0 + +//! LLM Memoization system for caching expensive LLM calls. +//! +//! This module provides a caching layer for LLM-generated content, +//! enabling significant cost savings by avoiding redundant API calls. +//! +//! # Key Features +//! +//! - **Operation-based caching**: Cache summaries, pilot decisions, query results +//! - **Content-addressed**: Keys are based on content fingerprints +//! - **TTL support**: Optional time-to-live for cache entries +//! - **Persistence**: Save/load cache to disk for cross-session reuse +//! +//! # Usage +//! +//! ```rust,ignore +//! use vectorless::memo::{MemoStore, MemoKey, MemoOpType}; +//! +//! // Create a memo store +//! let mut store = MemoStore::new(1000); +//! +//! // Get or compute a summary +//! 
let key = MemoKey::summary(&node_fingerprint); +//! let summary = store.get_or_compute(key, || async { +//! llm_client.generate_summary(node).await +//! }).await?; +//! ``` + +mod store; +mod types; + +pub use store::MemoStore; +pub use types::{MemoEntry, MemoKey, MemoOpType, MemoStats, MemoValue, PilotDecisionValue}; diff --git a/src/memo/store.rs b/src/memo/store.rs new file mode 100644 index 00000000..913392fa --- /dev/null +++ b/src/memo/store.rs @@ -0,0 +1,715 @@ +// Copyright (c) 2026 vectorless developers +// SPDX-License-Identifier: Apache-2.0 + +//! Memoization store implementation. +//! +//! Provides an in-memory LRU cache with optional disk persistence. + +use std::collections::HashMap; +use std::future::Future; +use std::num::NonZeroUsize; +use std::path::Path; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use chrono::{Duration, Utc}; +use lru::LruCache; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock as AsyncRwLock; +use tracing::{debug, info, warn}; + +use super::types::{MemoEntry, MemoKey, MemoOpType, MemoStats, MemoValue}; +use crate::error::Result; +use crate::utils::fingerprint::Fingerprint; + +/// Default TTL for cache entries (7 days). +const DEFAULT_TTL: Duration = Duration::days(7); + +/// Default maximum cache size. +const DEFAULT_MAX_SIZE: usize = 10_000; + +/// Serializable format for memo store persistence. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MemoStoreData { + /// Format version. + version: u32, + + /// Cache entries. + entries: HashMap, + + /// Statistics. + stats: MemoStats, +} + +/// Atomic statistics for lock-free access. 
+#[derive(Debug, Default)] +struct AtomicStats { + hits: AtomicU64, + misses: AtomicU64, + tokens_saved: AtomicU64, +} + +impl AtomicStats { + fn new() -> Self { + Self { + hits: AtomicU64::new(0), + misses: AtomicU64::new(0), + tokens_saved: AtomicU64::new(0), + } + } + + fn record_hit(&self) { + self.hits.fetch_add(1, Ordering::Relaxed); + } + + fn record_miss(&self) { + self.misses.fetch_add(1, Ordering::Relaxed); + } + + fn add_tokens_saved(&self, tokens: u64) { + self.tokens_saved.fetch_add(tokens, Ordering::Relaxed); + } + + fn snapshot(&self) -> (u64, u64, u64) { + ( + self.hits.load(Ordering::Relaxed), + self.misses.load(Ordering::Relaxed), + self.tokens_saved.load(Ordering::Relaxed), + ) + } +} + +/// LLM Memoization store. +/// +/// Provides caching for expensive LLM operations with: +/// - LRU eviction policy +/// - TTL-based expiration +/// - Optional disk persistence +/// - Thread-safe access +/// +/// # Example +/// +/// ```rust,ignore +/// let store = MemoStore::new(1000); +/// +/// let summary = store.get_or_compute( +/// MemoKey::summary(&content_fp), +/// || async { +/// llm.generate_summary(content).await +/// } +/// ).await?; +/// ``` +pub struct MemoStore { + /// LRU cache for entries. + cache: Arc>>, + + /// Statistics (async for safe updates). + stats: Arc>, + + /// TTL for entries. + ttl: Duration, + + /// Model identifier for cache keys. + model_id: Option, + + /// Version for cache invalidation. 
+ version: u32, +} + +impl std::fmt::Debug for MemoStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MemoStore") + .field("ttl", &self.ttl) + .field("model_id", &self.model_id) + .field("version", &self.version) + .field("cache_len", &self.cache.read().len()) + .finish() + } +} + +impl Clone for MemoStore { + fn clone(&self) -> Self { + Self { + cache: Arc::clone(&self.cache), + stats: Arc::clone(&self.stats), + ttl: self.ttl, + model_id: self.model_id.clone(), + version: self.version, + } + } +} + +impl MemoStore { + /// Create a new memo store with default size. + pub fn new() -> Self { + Self::with_capacity(DEFAULT_MAX_SIZE) + } + + /// Create a new memo store with specified capacity. + pub fn with_capacity(capacity: usize) -> Self { + Self { + cache: Arc::new(RwLock::new(LruCache::new( + std::num::NonZeroUsize::new(capacity).unwrap_or(std::num::NonZeroUsize::new(1000).unwrap()), + ))), + stats: Arc::new(AsyncRwLock::new(MemoStats::default())), + ttl: DEFAULT_TTL, + model_id: None, + version: 1, + } + } + + /// Set the TTL for cache entries. + pub fn with_ttl(mut self, ttl: Duration) -> Self { + self.ttl = ttl; + self + } + + /// Set the model identifier. + pub fn with_model(mut self, model_id: &str) -> Self { + self.model_id = Some(model_id.to_string()); + self + } + + /// Set the version. + pub fn with_version(mut self, version: u32) -> Self { + self.version = version; + self + } + + /// Get a cached value if present and not expired. + pub fn get(&self, key: &MemoKey) -> Option { + let full_key = self.make_key(key); + let mut cache = self.cache.write(); + + if let Some(entry) = cache.get_mut(&full_key) { + // Check TTL + if entry.is_expired(self.ttl) { + cache.pop(&full_key); + return None; + } + + // Record hit + entry.record_hit(); + debug!("Memo cache hit for {:?}", key.op_type); + return Some(entry.value.clone()); + } + + None + } + + /// Put a value in the cache. 
+ pub fn put(&self, key: MemoKey, value: MemoValue) { + self.put_with_tokens(key, value, 0); + } + + /// Put a value in the cache with token count. + pub fn put_with_tokens(&self, key: MemoKey, value: MemoValue, tokens_saved: u64) { + let full_key = self.make_key(&key); + let entry = MemoEntry::with_tokens(value, tokens_saved); + + let mut cache = self.cache.write(); + cache.put(full_key, entry); + + debug!("Memo cache put for {:?}", key.op_type); + } + + /// Get a value or compute it if not present. + /// + /// This is the primary method for using the memo store. + /// It will return the cached value if present, or call the + /// provided compute function and cache the result. + pub async fn get_or_compute( + &self, + key: MemoKey, + compute: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: Future>, // (value, tokens) + { + // Check cache first (synchronous) + if let Some(value) = self.get(&key) { + // Update stats + let mut stats = self.stats.write().await; + stats.hits += 1; + return Ok(value); + } + + // Record miss + { + let mut stats = self.stats.write().await; + stats.misses += 1; + } + + // Compute + let (value, tokens) = compute().await?; + + // Cache result + self.put_with_tokens(key.clone(), value.clone(), tokens); + + // Update stats + { + let mut stats = self.stats.write().await; + stats.tokens_saved += tokens; + } + + Ok(value) + } + + /// Check if a key exists in the cache. + pub fn contains(&self, key: &MemoKey) -> bool { + let full_key = self.make_key(key); + let cache = self.cache.read(); + cache.contains(&full_key) + } + + /// Remove a key from the cache. + pub fn remove(&self, key: &MemoKey) -> Option { + let full_key = self.make_key(key); + let mut cache = self.cache.write(); + cache.pop(&full_key).map(|e| e.value) + } + + /// Clear all entries from the cache. + pub fn clear(&self) { + let mut cache = self.cache.write(); + cache.clear(); + debug!("Memo cache cleared"); + } + + /// Get the number of entries in the cache. 
+ pub fn len(&self) -> usize { + let cache = self.cache.read(); + cache.len() + } + + /// Check if the cache is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Get cache statistics. + pub async fn stats(&self) -> MemoStats { + let stats = self.stats.read().await; + let mut result = stats.clone(); + result.entries = self.len(); + result + } + + /// Get cache statistics synchronously. + /// + /// This acquires a read lock on the stats, which is generally fast. + /// Use this when you need stats without async context. + pub fn stats_snapshot(&self) -> MemoStats { + // Use try_read to avoid blocking; fall back to defaults if locked + match self.stats.try_read() { + Ok(stats) => { + let mut result = stats.clone(); + result.entries = self.len(); + result + } + Err(_) => MemoStats { + entries: self.len(), + ..Default::default() + }, + } + } + + /// Invalidate all entries of a specific operation type. + /// + /// This is useful for batch invalidation when the algorithm for + /// a specific operation type changes. 
+ /// + /// # Example + /// + /// ```rust,ignore + /// // Invalidate all pilot decision caches + /// let removed = store.invalidate_by_op_type(MemoOpType::PilotDecision); + /// println!("Removed {} cached pilot decisions", removed); + /// ``` + pub fn invalidate_by_op_type(&self, op_type: MemoOpType) -> usize { + let mut cache = self.cache.write(); + let before = cache.len(); + + // Collect keys to remove based on entry value type + let keys_to_remove: Vec = cache + .iter() + .filter_map(|(key, entry)| { + let matches = match (&entry.value, op_type) { + (MemoValue::Summary(_), MemoOpType::Summary) => true, + (MemoValue::PilotDecision(_), MemoOpType::PilotDecision) => true, + (MemoValue::QueryAnalysis(_), MemoOpType::QueryAnalysis) => true, + (MemoValue::Extraction(_), MemoOpType::Extraction) => true, + _ => false, + }; + if matches { Some(key.clone()) } else { None } + }) + .collect(); + + // Remove entries + for key in keys_to_remove { + cache.pop(&key); + } + + let removed = before - cache.len(); + if removed > 0 { + debug!("Invalidated {} entries for op_type {:?}", removed, op_type); + } + removed + } + + /// Invalidate all entries matching a model ID prefix. + /// + /// This is useful when switching models or when a model's behavior changes. 
+ /// + /// # Example + /// + /// ```rust,ignore + /// // Invalidate all GPT-4 caches + /// let removed = store.invalidate_by_model_prefix("gpt-4"); + /// ``` + pub fn invalidate_by_model_prefix(&self, prefix: &str) -> usize { + let mut cache = self.cache.write(); + let before = cache.len(); + + // Since the key is a fingerprint, we need to check model_id from entries + // For now, we'll clear all entries if prefix matches our model_id + // A better approach would be to store model_id in entry metadata + let should_clear = self.model_id.as_ref() + .map(|m| m.starts_with(prefix)) + .unwrap_or(false); + + if should_clear { + cache.clear(); + let removed = before; + debug!("Invalidated all {} entries (model prefix '{}')", removed, prefix); + return removed; + } + + 0 + } + + /// Remove expired entries. + pub fn prune_expired(&self) -> usize { + let mut cache = self.cache.write(); + let before = cache.len(); + + // Collect expired keys + let expired: Vec = cache + .iter() + .filter(|(_, entry)| entry.is_expired(self.ttl)) + .map(|(k, _)| k.clone()) + .collect(); + + // Remove expired entries + for key in expired { + cache.pop(&key); + } + + let removed = before - cache.len(); + if removed > 0 { + debug!("Pruned {} expired memo entries", removed); + } + removed + } + + /// Save the cache to disk. 
+ pub async fn save(&self, path: &Path) -> Result<()> { + let cache = self.cache.read(); + let stats = self.stats.read().await; + + let entries: HashMap = cache + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + let data = MemoStoreData { + version: 1, + entries, + stats: stats.clone(), + }; + + let parent = path.parent().ok_or_else(|| { + crate::Error::Parse("Invalid path for memo store".to_string()) + })?; + tokio::fs::create_dir_all(parent).await?; + + let temp_path = path.with_extension("tmp"); + let json = serde_json::to_vec_pretty(&data) + .map_err(|e| crate::Error::Parse(format!("Failed to serialize memo store: {}", e)))?; + tokio::fs::write(&temp_path, &json).await?; + tokio::fs::rename(&temp_path, path).await?; + + info!("Saved memo store with {} entries to {:?}", data.entries.len(), path); + Ok(()) + } + + /// Load the cache from disk. + pub async fn load(&self, path: &Path) -> Result<()> { + if !path.exists() { + return Ok(()); + } + + let bytes = tokio::fs::read(path).await?; + let data: MemoStoreData = serde_json::from_slice(&bytes) + .map_err(|e| crate::Error::Parse(format!("Failed to deserialize memo store: {}", e)))?; + + let mut cache = self.cache.write(); + let mut stats = self.stats.write().await; + + for (key, entry) in data.entries { + // Skip expired entries + if !entry.is_expired(self.ttl) { + cache.put(key, entry); + } + } + + stats.entries = cache.len(); + stats.hits = data.stats.hits; + stats.misses = data.stats.misses; + stats.tokens_saved = data.stats.tokens_saved; + stats.cost_saved = data.stats.cost_saved; + + info!("Loaded memo store with {} entries from {:?}", cache.len(), path); + Ok(()) + } + + /// Make a full cache key from a MemoKey. 
+ fn make_key(&self, key: &MemoKey) -> String { + // Include model_id and version in the key + let mut key_with_context = key.clone(); + if key_with_context.model_id.is_none() { + key_with_context.model_id = self.model_id.clone(); + } + if key_with_context.version == 0 { + key_with_context.version = self.version; + } + key_with_context.fingerprint().to_string() + } +} + +impl Default for MemoStore { + fn default() -> Self { + Self::new() + } +} + +/// A helper for building memo keys with context. +pub struct MemoKeyBuilder { + model_id: Option, + version: u32, +} + +impl MemoKeyBuilder { + /// Create a new key builder. + pub fn new() -> Self { + Self { + model_id: None, + version: 1, + } + } + + /// Set the model identifier. + pub fn with_model(mut self, model_id: &str) -> Self { + self.model_id = Some(model_id.to_string()); + self + } + + /// Set the version. + pub fn with_version(mut self, version: u32) -> Self { + self.version = version; + self + } + + /// Build a summary key. + pub fn summary_key(&self, content_fp: &Fingerprint) -> MemoKey { + MemoKey { + op_type: super::types::MemoOpType::Summary, + input_fp: *content_fp, + model_id: self.model_id.clone(), + version: self.version, + context_fp: Fingerprint::zero(), + } + } + + /// Build a pilot decision key. + pub fn pilot_key(&self, context_fp: &Fingerprint, query_fp: &Fingerprint) -> MemoKey { + MemoKey { + op_type: super::types::MemoOpType::PilotDecision, + input_fp: *query_fp, + model_id: self.model_id.clone(), + version: self.version, + context_fp: *context_fp, + } + } + + /// Build a query analysis key. + pub fn query_analysis_key(&self, query_fp: &Fingerprint) -> MemoKey { + MemoKey { + op_type: super::types::MemoOpType::QueryAnalysis, + input_fp: *query_fp, + model_id: self.model_id.clone(), + version: self.version, + context_fp: Fingerprint::zero(), + } + } + + /// Build an extraction key. 
+ pub fn extraction_key(&self, content_fp: &Fingerprint) -> MemoKey { + MemoKey { + op_type: super::types::MemoOpType::Extraction, + input_fp: *content_fp, + model_id: self.model_id.clone(), + version: self.version, + context_fp: Fingerprint::zero(), + } + } +} + +impl Default for MemoKeyBuilder { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn make_test_key() -> MemoKey { + let fp = Fingerprint::from_str("test content"); + MemoKey::summary(&fp) + } + + #[test] + fn test_memo_store_basic() { + let store = MemoStore::new(); + let key = make_test_key(); + + assert!(!store.contains(&key)); + + store.put(key.clone(), MemoValue::Summary("Test summary".to_string())); + + assert!(store.contains(&key)); + + let value = store.get(&key); + assert!(value.is_some()); + assert_eq!(value.unwrap().as_summary(), Some("Test summary")); + } + + #[test] + fn test_memo_store_lru_eviction() { + let store = MemoStore::with_capacity(3); + + for i in 0..5 { + let fp = Fingerprint::from_str(&format!("content {}", i)); + let key = MemoKey::summary(&fp); + store.put(key, MemoValue::Summary(format!("Summary {}", i))); + } + + // Only 3 entries should remain + assert_eq!(store.len(), 3); + } + + #[tokio::test] + async fn test_memo_store_get_or_compute() { + let store = MemoStore::new(); + let key = make_test_key(); + + let call_count = Arc::new(std::sync::atomic::AtomicU64::new(0)); + let count_clone = call_count.clone(); + + // First call should compute + let result = store + .get_or_compute(key.clone(), || { + let c = count_clone.clone(); + async move { + c.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok((MemoValue::Summary("Computed".to_string()), 100)) + } + }) + .await + .unwrap(); + + assert_eq!(result.as_summary(), Some("Computed")); + assert_eq!(call_count.load(std::sync::atomic::Ordering::SeqCst), 1); + + // Second call should use cache + let result2 = store + .get_or_compute(key.clone(), || { + let c = 
count_clone.clone();
                async move {
                    c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    Ok((MemoValue::Summary("Should not be called".to_string()), 100))
                }
            })
            .await
            .unwrap();

        // The cached value is returned and the closure did not run again.
        assert_eq!(result2.as_summary(), Some("Computed"));
        assert_eq!(call_count.load(std::sync::atomic::Ordering::SeqCst), 1); // Still 1
    }

    #[tokio::test]
    async fn test_memo_store_persistence() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("memo.json");

        // Populate a store and flush it to disk.
        let store = MemoStore::new();
        let key = make_test_key();
        store.put_with_tokens(key.clone(), MemoValue::Summary("Test summary".to_string()), 100);

        store.save(&path).await.unwrap();
        assert!(path.exists());

        // A brand-new store picks the entry back up from the file.
        let store2 = MemoStore::new();
        store2.load(&path).await.unwrap();

        assert!(store2.contains(&key));
        let value = store2.get(&key);
        assert_eq!(value.unwrap().as_summary(), Some("Test summary"));
    }

    #[tokio::test]
    async fn test_memo_store_stats() {
        let store = MemoStore::new();
        let key = make_test_key();

        // Miss
        store
            .get_or_compute(key.clone(), || async {
                Ok((MemoValue::Summary("Test".to_string()), 100))
            })
            .await
            .unwrap();

        // Hit via get_or_compute (this updates global stats)
        store
            .get_or_compute(key.clone(), || async {
                Ok((MemoValue::Summary("Should not be called".to_string()), 0))
            })
            .await
            .unwrap();

        let stats = store.stats().await;
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.tokens_saved, 100);
    }

    #[test]
    fn test_memo_key_builder() {
        let builder = MemoKeyBuilder::new().with_model("gpt-4").with_version(2);

        // Builder defaults are stamped into every key it produces.
        let key = builder.summary_key(&Fingerprint::from_str("content"));

        assert_eq!(key.model_id, Some("gpt-4".to_string()));
        assert_eq!(key.version, 2);
    }
}
diff --git a/src/memo/types.rs b/src/memo/types.rs
new file mode 100644
index 00000000..0ed92400
--- /dev/null
+++ b/src/memo/types.rs
@@ -0,0 +1,398 @@
// 
Copyright (c) 2026 vectorless developers +// SPDX-License-Identifier: Apache-2.0 + +//! Types for the memoization system. + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use crate::utils::fingerprint::Fingerprint; + +/// Types of operations that can be memoized. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum MemoOpType { + /// Node summary generation. + Summary, + + /// Pilot navigation decision. + PilotDecision, + + /// Query analysis result. + QueryAnalysis, + + /// Content extraction result. + Extraction, + + /// Custom operation type. + Custom(u8), +} + +impl MemoOpType { + /// Get a unique byte identifier for this operation type. + pub fn as_byte(&self) -> u8 { + match self { + MemoOpType::Summary => 0, + MemoOpType::PilotDecision => 1, + MemoOpType::QueryAnalysis => 2, + MemoOpType::Extraction => 3, + MemoOpType::Custom(n) => 100 + n, + } + } +} + +/// Key for memoization lookup. +/// +/// Keys are content-addressed using fingerprints, ensuring that +/// cache hits only occur when the input is semantically identical. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct MemoKey { + /// Type of operation being memoized. + pub op_type: MemoOpType, + + /// Fingerprint of the input content. + pub input_fp: Fingerprint, + + /// Optional model identifier for cache invalidation when model changes. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub model_id: Option, + + /// Optional version for cache invalidation when algorithm changes. + #[serde(default)] + pub version: u32, + + /// Additional context fingerprint (e.g., query context for pilot decisions). + #[serde(default, skip_serializing_if = "Fingerprint::is_zero")] + pub context_fp: Fingerprint, +} + +impl MemoKey { + /// Create a key for summary generation. 
+ pub fn summary(content_fp: &Fingerprint) -> Self { + Self { + op_type: MemoOpType::Summary, + input_fp: *content_fp, + model_id: None, + version: 1, + context_fp: Fingerprint::zero(), + } + } + + /// Create a key for summary generation with model and version. + pub fn summary_with_model(content_fp: &Fingerprint, model_id: &str, version: u32) -> Self { + Self { + op_type: MemoOpType::Summary, + input_fp: *content_fp, + model_id: Some(model_id.to_string()), + version, + context_fp: Fingerprint::zero(), + } + } + + /// Create a key for pilot decision. + pub fn pilot_decision(context_fp: &Fingerprint, query_fp: &Fingerprint) -> Self { + Self { + op_type: MemoOpType::PilotDecision, + input_fp: *query_fp, + model_id: None, + version: 1, + context_fp: *context_fp, + } + } + + /// Create a key for query analysis. + pub fn query_analysis(query_fp: &Fingerprint) -> Self { + Self { + op_type: MemoOpType::QueryAnalysis, + input_fp: *query_fp, + model_id: None, + version: 1, + context_fp: Fingerprint::zero(), + } + } + + /// Create a key for content extraction. + pub fn extraction(content_fp: &Fingerprint) -> Self { + Self { + op_type: MemoOpType::Extraction, + input_fp: *content_fp, + model_id: None, + version: 1, + context_fp: Fingerprint::zero(), + } + } + + /// Set the model identifier. + pub fn with_model(mut self, model_id: &str) -> Self { + self.model_id = Some(model_id.to_string()); + self + } + + /// Set the version. + pub fn with_version(mut self, version: u32) -> Self { + self.version = version; + self + } + + /// Set the context fingerprint. + pub fn with_context(mut self, context_fp: &Fingerprint) -> Self { + self.context_fp = *context_fp; + self + } + + /// Compute a fingerprint of this key for storage. 
+ pub fn fingerprint(&self) -> Fingerprint { + use crate::utils::fingerprint::Fingerprinter; + + let mut fp = Fingerprinter::new(); + fp.write_u64(self.op_type.as_byte() as u64); + fp.write_fingerprint(&self.input_fp); + fp.write_option_str(self.model_id.as_deref()); + fp.write_u64(self.version as u64); + if !self.context_fp.is_zero() { + fp.write_fingerprint(&self.context_fp); + } + fp.into_fingerprint() + } +} + +/// Cached value from an LLM operation. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MemoValue { + /// Generated summary text. + Summary(String), + + /// Pilot navigation decision. + PilotDecision(PilotDecisionValue), + + /// Query analysis result. + QueryAnalysis(QueryAnalysisValue), + + /// Extracted content. + Extraction(serde_json::Value), + + /// Raw text (for custom operations). + Text(String), + + /// JSON value (for structured outputs). + Json(serde_json::Value), +} + +/// Serializable pilot decision value. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PilotDecisionValue { + /// Selected candidate index. + pub selected_idx: usize, + + /// Confidence score (0.0 to 1.0). + pub confidence: f32, + + /// Reasoning text. + pub reasoning: String, +} + +/// Serializable query analysis value. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct QueryAnalysisValue { + /// Query complexity score. + pub complexity: f32, + + /// Detected intent. + pub intent: String, + + /// Suggested strategy. + pub strategy: String, +} + +impl MemoValue { + /// Get the value as a string summary. + pub fn as_summary(&self) -> Option<&str> { + match self { + MemoValue::Summary(s) => Some(s), + _ => None, + } + } + + /// Get the value as text. + pub fn as_text(&self) -> Option<&str> { + match self { + MemoValue::Text(s) => Some(s), + MemoValue::Summary(s) => Some(s), + _ => None, + } + } + + /// Check if this is a summary value. 
+ pub fn is_summary(&self) -> bool { + matches!(self, MemoValue::Summary(_)) + } +} + +/// A cached entry in the memo store. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MemoEntry { + /// The cached value. + pub value: MemoValue, + + /// When this entry was created. + pub created_at: DateTime, + + /// When this entry was last accessed. + pub last_accessed: DateTime, + + /// Number of cache hits. + pub hits: u64, + + /// Token cost saved by this cache entry. + pub tokens_saved: u64, +} + +impl MemoEntry { + /// Create a new entry. + pub fn new(value: MemoValue) -> Self { + let now = Utc::now(); + Self { + value, + created_at: now, + last_accessed: now, + hits: 0, + tokens_saved: 0, + } + } + + /// Create a new entry with token count. + pub fn with_tokens(value: MemoValue, tokens_saved: u64) -> Self { + Self { + tokens_saved, + ..Self::new(value) + } + } + + /// Record a cache hit. + pub fn record_hit(&mut self) { + self.hits += 1; + self.last_accessed = Utc::now(); + } + + /// Check if this entry has expired. + pub fn is_expired(&self, ttl: chrono::Duration) -> bool { + let now = Utc::now(); + now - self.created_at > ttl + } + + /// Get the age of this entry. + pub fn age(&self) -> chrono::Duration { + Utc::now() - self.created_at + } +} + +/// Statistics for the memo store. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct MemoStats { + /// Total number of cache entries. + pub entries: usize, + + /// Total cache hits. + pub hits: u64, + + /// Total cache misses. + pub misses: u64, + + /// Total tokens saved by cache hits. + pub tokens_saved: u64, + + /// Estimated cost saved (in USD). + pub cost_saved: f64, +} + +impl MemoStats { + /// Calculate the cache hit rate. 
+ pub fn hit_rate(&self) -> f64 { + let total = self.hits + self.misses; + if total == 0 { + 0.0 + } else { + self.hits as f64 / total as f64 + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_memo_key_summary() { + let fp = Fingerprint::from_str("test content"); + let key = MemoKey::summary(&fp); + + assert_eq!(key.op_type, MemoOpType::Summary); + assert_eq!(key.input_fp, fp); + assert!(key.model_id.is_none()); + } + + #[test] + fn test_memo_key_with_model() { + let fp = Fingerprint::from_str("test content"); + let key = MemoKey::summary(&fp).with_model("gpt-4").with_version(2); + + assert_eq!(key.model_id, Some("gpt-4".to_string())); + assert_eq!(key.version, 2); + } + + #[test] + fn test_memo_key_fingerprint() { + let fp = Fingerprint::from_str("test content"); + let key1 = MemoKey::summary(&fp); + let key2 = MemoKey::summary(&fp); + + assert_eq!(key1.fingerprint(), key2.fingerprint()); + + let key3 = MemoKey::summary_with_model(&fp, "gpt-4", 1); + assert_ne!(key1.fingerprint(), key3.fingerprint()); + } + + #[test] + fn test_memo_entry() { + let entry = MemoEntry::new(MemoValue::Summary("Test summary".to_string())); + + assert_eq!(entry.hits, 0); + assert!(entry.value.as_summary().is_some()); + } + + #[test] + fn test_memo_entry_hit() { + let mut entry = MemoEntry::new(MemoValue::Summary("Test summary".to_string())); + entry.record_hit(); + entry.record_hit(); + + assert_eq!(entry.hits, 2); + } + + #[test] + fn test_memo_stats_hit_rate() { + let mut stats = MemoStats::default(); + stats.hits = 80; + stats.misses = 20; + + assert!((stats.hit_rate() - 0.8).abs() < 0.001); + } + + #[test] + fn test_memo_key_serialization() { + let fp = Fingerprint::from_str("test content"); + let key = MemoKey::summary_with_model(&fp, "gpt-4", 1); + + let json = serde_json::to_string(&key).unwrap(); + let decoded: MemoKey = serde_json::from_str(&json).unwrap(); + + assert_eq!(key, decoded); + } + + #[test] + fn test_memo_value_serialization() { + let 
value = MemoValue::Summary("Test summary".to_string()); + let json = serde_json::to_string(&value).unwrap(); + let decoded: MemoValue = serde_json::from_str(&json).unwrap(); + assert_eq!(value.as_summary(), decoded.as_summary()); + } +} diff --git a/src/parser/html/parser.rs b/src/parser/html/parser.rs index 331a2b1b..a0c81e5c 100644 --- a/src/parser/html/parser.rs +++ b/src/parser/html/parser.rs @@ -9,7 +9,7 @@ use std::path::Path; use crate::error::Result; use crate::parser::{DocumentFormat, DocumentMeta, DocumentParser, ParseResult, RawNode}; -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; use super::config::HtmlConfig; diff --git a/src/parser/markdown/parser.rs b/src/parser/markdown/parser.rs index 7d0041a0..bb0fe3cb 100644 --- a/src/parser/markdown/parser.rs +++ b/src/parser/markdown/parser.rs @@ -9,7 +9,7 @@ use std::path::Path; use crate::error::Result; use crate::parser::{DocumentFormat, DocumentMeta, DocumentParser, ParseResult, RawNode}; -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; use super::config::MarkdownConfig; use super::frontmatter; diff --git a/src/parser/pdf/types.rs b/src/parser/pdf/types.rs index 1c2ac9fc..3b978836 100644 --- a/src/parser/pdf/types.rs +++ b/src/parser/pdf/types.rs @@ -3,7 +3,7 @@ //! PDF document types. -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; use serde::{Deserialize, Serialize}; /// A single page from a PDF document. 
diff --git a/src/retrieval/content/aggregator.rs b/src/retrieval/content/aggregator.rs index 2fa74435..04464930 100644 --- a/src/retrieval/content/aggregator.rs +++ b/src/retrieval/content/aggregator.rs @@ -11,7 +11,7 @@ use std::collections::HashMap; use tracing::{debug, info}; use crate::document::{DocumentTree, NodeId}; -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; use super::budget::{AllocationResult, AllocationStrategy, BudgetAllocator, SelectedContent}; use super::builder::{ContentMetadata, StructureBuilder, StructuredContent}; diff --git a/src/retrieval/content/budget.rs b/src/retrieval/content/budget.rs index 622712c6..830a7685 100644 --- a/src/retrieval/content/budget.rs +++ b/src/retrieval/content/budget.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; use crate::document::NodeId; -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; use super::scorer::ContentRelevance; diff --git a/src/retrieval/content/scorer.rs b/src/retrieval/content/scorer.rs index d9d0dcbe..6e3c3abb 100644 --- a/src/retrieval/content/scorer.rs +++ b/src/retrieval/content/scorer.rs @@ -10,7 +10,7 @@ use std::collections::HashMap; use crate::document::NodeId; use crate::retrieval::search::{extract_keywords, Bm25Params, STOPWORDS}; -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; use super::config::ScoringStrategyConfig; diff --git a/src/retrieval/context.rs b/src/retrieval/context.rs index c4f278b9..0c9ecc4b 100644 --- a/src/retrieval/context.rs +++ b/src/retrieval/context.rs @@ -29,7 +29,7 @@ use super::types::RetrievalResult; use crate::document::{DocumentTree, NodeId}; -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; use std::collections::HashSet; /// Pruning strategy for context building. 
diff --git a/src/retrieval/pilot/llm_pilot.rs b/src/retrieval/pilot/llm_pilot.rs index 40aa945e..af0fd602 100644 --- a/src/retrieval/pilot/llm_pilot.rs +++ b/src/retrieval/pilot/llm_pilot.rs @@ -12,7 +12,9 @@ use tracing::{debug, info, warn}; use crate::document::DocumentTree; use crate::llm::{LlmClient, LlmExecutor}; +use crate::memo::{MemoKey, MemoOpType, MemoStore, MemoValue}; use crate::throttle::ConcurrencyController; +use crate::utils::fingerprint::Fingerprint; use super::budget::BudgetController; use super::builder::ContextBuilder; @@ -43,6 +45,11 @@ use super::r#trait::{Pilot, SearchState}; /// │ │ Budget │ │ LlmExecutor │ │ /// │ │ Controller │ │ (throttle+retry+fall) │ │ /// │ └─────────────┘ └───────────────────────┘ │ +/// │ │ +/// │ ┌─────────────┐ ┌───────────────────────┐ │ +/// │ │ Memo │ │ (cache LLM decisions) │ │ +/// │ │ Store │ │ │ │ +/// │ └─────────────┘ └───────────────────────┘ │ /// └─────────────────────────────────────────────────────────────┘ /// ``` /// @@ -81,6 +88,8 @@ pub struct LlmPilot { response_parser: ResponseParser, /// Feedback learner for improving decisions (optional). learner: Option>, + /// Memo store for caching decisions (optional). + memo_store: Option, } impl std::fmt::Debug for LlmPilot { @@ -107,6 +116,7 @@ impl LlmPilot { prompt_builder: PromptBuilder::new(), response_parser: ResponseParser::new(), learner: None, + memo_store: None, } } @@ -126,6 +136,7 @@ impl LlmPilot { prompt_builder: PromptBuilder::new(), response_parser: ResponseParser::new(), learner: None, + memo_store: None, } } @@ -144,6 +155,7 @@ impl LlmPilot { prompt_builder: PromptBuilder::new(), response_parser: ResponseParser::new(), learner: None, + memo_store: None, } } @@ -165,6 +177,7 @@ impl LlmPilot { prompt_builder, response_parser: ResponseParser::new(), learner: None, + memo_store: None, } } @@ -186,6 +199,16 @@ impl LlmPilot { self } + /// Add a memo store for caching decisions. 
+ /// + /// When enabled, the pilot will cache LLM decisions based on + /// context fingerprints, avoiding redundant API calls for + /// similar navigation scenarios. + pub fn with_memo_store(mut self, store: MemoStore) -> Self { + self.memo_store = Some(store); + self + } + /// Check if using LlmExecutor (unified throttle/retry/fallback). pub fn has_executor(&self) -> bool { self.executor.is_some() @@ -196,11 +219,21 @@ impl LlmPilot { self.learner.is_some() } + /// Check if using memo store. + pub fn has_memo_store(&self) -> bool { + self.memo_store.is_some() + } + /// Get the feedback learner (if any). pub fn learner(&self) -> Option<&PilotLearner> { self.learner.as_deref() } + /// Get the memo store (if any). + pub fn memo_store(&self) -> Option<&MemoStore> { + self.memo_store.as_ref() + } + /// Record feedback for a decision. pub fn record_feedback(&self, record: FeedbackRecord) { if let Some(ref learner) = self.learner { @@ -210,6 +243,18 @@ impl LlmPilot { } } + /// Compute a cache key for a pilot decision. + fn compute_cache_key(&self, context: &super::builder::PilotContext, point: InterventionPoint) -> Option { + let store = self.memo_store.as_ref()?; + + // Build a fingerprint from the context using available methods + let context_str = context.to_string(); + let context_fp = Fingerprint::from_str(&context_str); + let query_fp = Fingerprint::from_str(&context.query_section); + + Some(MemoKey::pilot_decision(&context_fp, &query_fp)) + } + /// Check if budget allows LLM calls. 
fn has_budget(&self) -> bool { self.budget.can_call() @@ -243,6 +288,20 @@ impl LlmPilot { context: &super::builder::PilotContext, candidates: &[crate::document::NodeId], ) -> PilotDecision { + // Check memo cache first + if let Some(ref store) = self.memo_store { + if let Some(cache_key) = self.compute_cache_key(context, point) { + if let Some(cached) = store.get(&cache_key) { + if let MemoValue::PilotDecision(decision_value) = cached { + debug!("Memo cache hit for pilot decision at {:?}", point); + // Convert cached value back to PilotDecision + let decision = self.cached_value_to_decision(decision_value, candidates, point); + return decision; + } + } + } + } + // Build prompt let prompt = self.prompt_builder.build(point, context); @@ -313,6 +372,16 @@ impl LlmPilot { decision.ranked_candidates.len() ); + // Cache the decision + if let Some(ref store) = self.memo_store { + if let Some(cache_key) = self.compute_cache_key(context, point) { + let decision_value = self.decision_to_cached_value(&decision); + let tokens_saved = prompt.estimated_tokens as u64 + output_tokens as u64; + store.put_with_tokens(cache_key, MemoValue::PilotDecision(decision_value), tokens_saved); + debug!("Memo cache stored for pilot decision at {:?}", point); + } + } + decision } Err(e) => { @@ -322,6 +391,45 @@ impl LlmPilot { } } + /// Convert a PilotDecision to a cacheable value. + fn decision_to_cached_value(&self, decision: &PilotDecision) -> crate::memo::PilotDecisionValue { + crate::memo::PilotDecisionValue { + selected_idx: decision.ranked_candidates.first() + .map(|c| c.node_id.0.into()) + .unwrap_or(0), + confidence: decision.confidence, + reasoning: decision.reasoning.clone(), + } + } + + /// Convert a cached value back to a PilotDecision. 
+ fn cached_value_to_decision( + &self, + value: crate::memo::PilotDecisionValue, + candidates: &[crate::document::NodeId], + point: InterventionPoint, + ) -> PilotDecision { + let ranked = candidates + .iter() + .enumerate() + .map(|(i, &node_id)| super::decision::RankedCandidate { + node_id, + score: if i == value.selected_idx { 1.0 } else { 0.5 / (i + 1) as f32 }, + reason: None, + }) + .collect(); + + PilotDecision { + ranked_candidates: ranked, + direction: super::decision::SearchDirection::GoDeeper { + reason: "Cached decision".to_string(), + }, + confidence: value.confidence, + reasoning: value.reasoning, + intervention_point: point, + } + } + /// Create a default decision when LLM fails. fn default_decision( &self, diff --git a/src/retrieval/pipeline_retriever.rs b/src/retrieval/pipeline_retriever.rs index 2947704a..222b3a9b 100644 --- a/src/retrieval/pipeline_retriever.rs +++ b/src/retrieval/pipeline_retriever.rs @@ -18,6 +18,7 @@ use super::types::{RetrieveOptions, RetrieveResponse}; use crate::document::DocumentTree; use crate::error::Result; use crate::llm::LlmClient; +use crate::memo::MemoStore; use crate::retrieval::pilot::{LlmPilot, PilotConfig}; /// Pipeline-based retriever using the stage architecture. @@ -42,6 +43,8 @@ pub struct PipelineRetriever { max_iterations: usize, /// Content aggregator configuration. content_config: Option, + /// Memo store for caching LLM decisions. + memo_store: Option, } impl Default for PipelineRetriever { @@ -58,6 +61,7 @@ impl PipelineRetriever { max_backtracks: 5, max_iterations: 10, content_config: None, + memo_store: None, } } @@ -88,6 +92,16 @@ impl PipelineRetriever { self } + /// Add a memo store for caching LLM decisions. + /// + /// When enabled, the pilot will cache navigation decisions based on + /// context fingerprints, avoiding redundant API calls for similar + /// navigation scenarios. 
+ pub fn with_memo_store(mut self, store: MemoStore) -> Self { + self.memo_store = Some(store); + self + } + /// Build the orchestrator with all stages. fn build_orchestrator(&self) -> RetrievalOrchestrator { let mut orchestrator = RetrievalOrchestrator::new() @@ -108,7 +122,13 @@ impl PipelineRetriever { let mut search_stage = SearchStage::new(); if let Some(ref client) = self.llm_client { // Create LLM-based Pilot for semantic navigation guidance - let pilot = LlmPilot::new(client.clone(), PilotConfig::default()); + let mut pilot = LlmPilot::new(client.clone(), PilotConfig::default()); + + // Add memo store if available + if let Some(ref store) = self.memo_store { + pilot = pilot.with_memo_store(store.clone()); + } + search_stage = search_stage.with_pilot(Arc::new(pilot)); } orchestrator = orchestrator.stage(search_stage); @@ -180,6 +200,7 @@ impl Clone for PipelineRetriever { max_backtracks: self.max_backtracks, max_iterations: self.max_iterations, content_config: self.content_config.clone(), + memo_store: self.memo_store.clone(), } } } diff --git a/src/retrieval/stages/evaluate.rs b/src/retrieval/stages/evaluate.rs index d6d8a213..31e7f173 100644 --- a/src/retrieval/stages/evaluate.rs +++ b/src/retrieval/stages/evaluate.rs @@ -15,7 +15,7 @@ use crate::retrieval::content::{ContentAggregator, ContentAggregatorConfig}; use crate::retrieval::pipeline::{FailurePolicy, PipelineContext, RetrievalStage, StageOutcome}; use crate::retrieval::sufficiency::{LlmJudge, SufficiencyChecker, ThresholdChecker}; use crate::retrieval::types::{RetrievalResult, RetrieveResponse, SufficiencyLevel}; -use crate::util::estimate_tokens; +use crate::utils::estimate_tokens; /// Evaluate Stage - evaluates retrieval sufficiency. /// diff --git a/src/storage/persistence.rs b/src/storage/persistence.rs index 1d63cd47..3e0e8281 100644 --- a/src/storage/persistence.rs +++ b/src/storage/persistence.rs @@ -51,6 +51,32 @@ pub struct DocumentMeta { /// Last modified timestamp. 
pub modified_at: chrono::DateTime, + + // === Processing State (for incremental updates) === + + /// Content fingerprint for change detection. + #[serde(default, skip_serializing_if = "crate::utils::fingerprint::Fingerprint::is_zero")] + pub content_fingerprint: crate::utils::fingerprint::Fingerprint, + + /// Processing version (incremented when algorithm changes). + #[serde(default)] + pub processing_version: u32, + + /// Node count in the tree. + #[serde(default)] + pub node_count: usize, + + /// Total tokens in summaries. + #[serde(default)] + pub total_summary_tokens: usize, + + /// LLM model used for processing. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub processing_model: Option, + + /// Last processing duration in milliseconds. + #[serde(default)] + pub processing_duration_ms: u64, } impl DocumentMeta { @@ -67,6 +93,12 @@ impl DocumentMeta { line_count: None, created_at: now, modified_at: now, + content_fingerprint: crate::utils::fingerprint::Fingerprint::zero(), + processing_version: 0, + node_count: 0, + total_summary_tokens: 0, + processing_model: None, + processing_duration_ms: 0, } } @@ -81,6 +113,70 @@ impl DocumentMeta { self.description = Some(desc.into()); self } + + /// Set the content fingerprint. + pub fn with_fingerprint(mut self, fp: crate::utils::fingerprint::Fingerprint) -> Self { + self.content_fingerprint = fp; + self + } + + /// Set the processing version. + pub fn with_processing_version(mut self, version: u32) -> Self { + self.processing_version = version; + self + } + + /// Set the processing model. + pub fn with_processing_model(mut self, model: impl Into) -> Self { + self.processing_model = Some(model.into()); + self + } + + /// Update processing statistics. 
+ pub fn update_processing_stats( + &mut self, + node_count: usize, + summary_tokens: usize, + duration_ms: u64, + ) { + self.node_count = node_count; + self.total_summary_tokens = summary_tokens; + self.processing_duration_ms = duration_ms; + self.modified_at = chrono::Utc::now(); + } + + /// Mark as processed with given fingerprint and version. + pub fn mark_processed( + &mut self, + fp: crate::utils::fingerprint::Fingerprint, + version: u32, + model: Option<&str>, + ) { + self.content_fingerprint = fp; + self.processing_version = version; + self.processing_model = model.map(|s| s.to_string()); + self.modified_at = chrono::Utc::now(); + } + + /// Check if the document needs reprocessing. + pub fn needs_reprocessing(&self, current_fp: &crate::utils::fingerprint::Fingerprint, current_version: u32) -> bool { + // Never processed + if self.processing_version == 0 { + return true; + } + + // Algorithm version changed + if self.processing_version < current_version { + return true; + } + + // Content changed + if &self.content_fingerprint != current_fp { + return true; + } + + false + } } /// A persisted document index containing tree and metadata. diff --git a/src/utils/fingerprint.rs b/src/utils/fingerprint.rs new file mode 100644 index 00000000..d7b8a988 --- /dev/null +++ b/src/utils/fingerprint.rs @@ -0,0 +1,496 @@ +// Copyright (c) 2026 vectorless developers +// SPDX-License-Identifier: Apache-2.0 + +//! Fingerprint system for content and subtree identification. +//! +//! This module provides a robust fingerprinting system for content identification, +//! enabling precise change detection at both content and subtree levels. +//! +//! # Key Features +//! +//! - **Content Fingerprint**: Hash of node content (title + text) +//! - **Subtree Fingerprint**: Recursive hash including all descendants +//! - **Stable Serialization**: Type-tagged hashing for consistent results +//! +//! # Usage +//! +//! ```rust,ignore +//! 
use vectorless::fingerprint::{Fingerprint, Fingerprinter}; +//! +//! // Create a fingerprint from content +//! let fp = Fingerprinter::new() +//! .with_str("Hello, world!") +//! .into_fingerprint(); +//! +//! // Compare fingerprints +//! if old_fp == new_fp { +//! // Content unchanged +//! } +//! ``` + +use base64::prelude::*; +use blake2::digest::typenum; +use blake2::{Blake2b, Digest}; +use serde::{Deserialize, Serialize}; +use std::hash::{Hash, Hasher}; + +/// A 128-bit fingerprint for content identification. +/// +/// Uses BLAKE2b-128 for fast, collision-resistant hashing. +/// Displayed as base64 for compact representation. +#[derive(Clone, Copy, PartialEq, Eq)] +pub struct Fingerprint(pub [u8; 16]); + +impl Fingerprint { + /// Create a fingerprint from raw bytes. + pub fn new(bytes: [u8; 16]) -> Self { + Self(bytes) + } + + /// Create a fingerprint from a byte slice (hashes the slice). + pub fn from_bytes(data: &[u8]) -> Self { + let mut hasher = Blake2b::::default(); + hasher.update(data); + Self(hasher.finalize().into()) + } + + /// Create a fingerprint from a string. + pub fn from_str(s: &str) -> Self { + Self::from_bytes(s.as_bytes()) + } + + /// Encode fingerprint to base64 string. + pub fn to_base64(self) -> String { + BASE64_STANDARD.encode(self.0) + } + + /// Decode fingerprint from base64 string. + pub fn from_base64(s: &str) -> Result { + let bytes = BASE64_STANDARD + .decode(s) + .map_err(|e| FingerprintError::InvalidBase64(e.to_string()))?; + let bytes: [u8; 16] = bytes + .try_into() + .map_err(|e: Vec| FingerprintError::InvalidLength(e.len()))?; + Ok(Self(bytes)) + } + + /// Get the raw bytes. + pub fn as_bytes(&self) -> &[u8] { + &self.0 + } + + /// Check if this is a zero/null fingerprint. + pub fn is_zero(&self) -> bool { + self.0 == [0u8; 16] + } + + /// Create a zero/null fingerprint (for uninitialized state). 
+ pub fn zero() -> Self { + Self([0u8; 16]) + } +} + +impl std::fmt::Display for Fingerprint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for byte in &self.0 { + write!(f, "{:02x}", byte)?; + } + Ok(()) + } +} + +impl std::fmt::Debug for Fingerprint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Fingerprint({})", self) + } +} + +impl Hash for Fingerprint { + fn hash(&self, state: &mut H) { + // Fingerprint is already evenly distributed, use first 8 bytes + state.write(&self.0[..8]); + } +} + +impl Serialize for Fingerprint { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_base64()) + } +} + +impl<'de> Deserialize<'de> for Fingerprint { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Self::from_base64(&s).map_err(serde::de::Error::custom) + } +} + +impl Default for Fingerprint { + fn default() -> Self { + Self::zero() + } +} + +/// Error type for fingerprint operations. +#[derive(Debug, thiserror::Error)] +pub enum FingerprintError { + /// Invalid base64 encoding. + #[error("Invalid base64: {0}")] + InvalidBase64(String), + + /// Invalid fingerprint length. + #[error("Invalid fingerprint length: {0}")] + InvalidLength(usize), + + /// Serialization error. + #[error("Serialization error: {0}")] + Serialization(String), +} + +/// Builder for creating fingerprints. +/// +/// Provides a fluent API for incrementally building fingerprints +/// from multiple values. +/// +/// # Example +/// +/// ```rust,ignore +/// let fp = Fingerprinter::new() +/// .with_str("title") +/// .with_str("content") +/// .with_usize(42) +/// .into_fingerprint(); +/// ``` +#[derive(Clone)] +pub struct Fingerprinter { + hasher: Blake2b, +} + +impl Fingerprinter { + /// Create a new fingerprinter. 
+ pub fn new() -> Self { + Self { + hasher: Blake2b::::default(), + } + } + + /// Finalize and produce the fingerprint. + pub fn into_fingerprint(self) -> Fingerprint { + Fingerprint(self.hasher.finalize().into()) + } + + /// Add a string to the hash. + pub fn with_str(mut self, s: &str) -> Self { + self.write_str(s); + self + } + + /// Add a string to the hash (mutable). + pub fn write_str(&mut self, s: &str) { + self.write_type_tag("s"); + self.write_varlen_bytes(s.as_bytes()); + } + + /// Add bytes to the hash. + pub fn with_bytes(mut self, bytes: &[u8]) -> Self { + self.write_bytes(bytes); + self + } + + /// Add bytes to the hash (mutable). + pub fn write_bytes(&mut self, bytes: &[u8]) { + self.write_type_tag("b"); + self.write_varlen_bytes(bytes); + } + + /// Add a usize to the hash. + pub fn with_usize(mut self, n: usize) -> Self { + self.write_usize(n); + self + } + + /// Add a usize to the hash (mutable). + pub fn write_usize(&mut self, n: usize) { + self.write_type_tag("u"); + self.hasher.update((n as u64).to_le_bytes()); + } + + /// Add a u64 to the hash. + pub fn with_u64(mut self, n: u64) -> Self { + self.write_u64(n); + self + } + + /// Add a u64 to the hash (mutable). + pub fn write_u64(&mut self, n: u64) { + self.write_type_tag("u8"); + self.hasher.update(n.to_le_bytes()); + } + + /// Add an i64 to the hash. + pub fn with_i64(mut self, n: i64) -> Self { + self.write_i64(n); + self + } + + /// Add an i64 to the hash (mutable). + pub fn write_i64(&mut self, n: i64) { + self.write_type_tag("i8"); + self.hasher.update(n.to_le_bytes()); + } + + /// Add a bool to the hash. + pub fn with_bool(mut self, b: bool) -> Self { + self.write_bool(b); + self + } + + /// Add a bool to the hash (mutable). + pub fn write_bool(&mut self, b: bool) { + self.write_type_tag(if b { "t" } else { "f" }); + } + + /// Add an optional string to the hash. 
+ pub fn with_option_str(mut self, opt: Option<&str>) -> Self { + self.write_option_str(opt); + self + } + + /// Add an optional string to the hash (mutable). + pub fn write_option_str(&mut self, opt: Option<&str>) { + match opt { + Some(s) => { + self.write_type_tag("some"); + self.write_str(s); + } + None => { + self.write_type_tag("none"); + } + } + } + + /// Add another fingerprint to the hash. + pub fn with_fingerprint(mut self, fp: &Fingerprint) -> Self { + self.write_fingerprint(fp); + self + } + + /// Add another fingerprint to the hash (mutable). + pub fn write_fingerprint(&mut self, fp: &Fingerprint) { + self.write_type_tag("fp"); + self.hasher.update(&fp.0); + } + + /// Add raw bytes directly (no type tag). + pub fn write_raw(&mut self, bytes: &[u8]) { + self.hasher.update(bytes); + } + + // Internal helpers + + fn write_type_tag(&mut self, tag: &str) { + self.hasher.update(tag.as_bytes()); + self.hasher.update(b";"); + } + + fn write_varlen_bytes(&mut self, bytes: &[u8]) { + self.hasher.update((bytes.len() as u32).to_le_bytes()); + self.hasher.update(bytes); + } +} + +/// Node fingerprint containing both content and subtree fingerprints. +/// +/// This enables precise change detection: +/// - If `content_fp` changes, the node's content was modified +/// - If `subtree_fp` changes, the node or its descendants were modified +/// - If `content_fp` is same but `subtree_fp` changed, only descendants changed +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub struct NodeFingerprint { + /// Fingerprint of this node's content (title + text). + pub content: Fingerprint, + + /// Fingerprint of the entire subtree (including this node). + /// Computed recursively from all descendants. + pub subtree: Fingerprint, +} + +impl NodeFingerprint { + /// Create a new node fingerprint. + pub fn new(content: Fingerprint, subtree: Fingerprint) -> Self { + Self { content, subtree } + } + + /// Create a fingerprint for a leaf node (content == subtree). 
+ pub fn leaf(content: Fingerprint) -> Self { + Self { + content, + subtree: content, + } + } + + /// Create a zero/null fingerprint. + pub fn zero() -> Self { + Self { + content: Fingerprint::zero(), + subtree: Fingerprint::zero(), + } + } + + /// Check if this is a zero fingerprint. + pub fn is_zero(&self) -> bool { + self.content.is_zero() && self.subtree.is_zero() + } + + /// Check if content changed compared to another fingerprint. + pub fn content_changed(&self, other: &Self) -> bool { + self.content != other.content + } + + /// Check if subtree changed compared to another fingerprint. + pub fn subtree_changed(&self, other: &Self) -> bool { + self.subtree != other.subtree + } + + /// Check if only descendants changed (content same, subtree different). + pub fn only_descendants_changed(&self, other: &Self) -> bool { + self.content == other.content && self.subtree != other.subtree + } +} + +impl Default for NodeFingerprint { + fn default() -> Self { + Self::zero() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fingerprint_from_str() { + let fp1 = Fingerprint::from_str("hello"); + let fp2 = Fingerprint::from_str("hello"); + let fp3 = Fingerprint::from_str("world"); + + assert_eq!(fp1, fp2); + assert_ne!(fp1, fp3); + } + + #[test] + fn test_fingerprint_base64_roundtrip() { + let fp = Fingerprint::from_str("test content"); + let encoded = fp.to_base64(); + let decoded = Fingerprint::from_base64(&encoded).unwrap(); + assert_eq!(fp, decoded); + } + + #[test] + fn test_fingerprinter_chaining() { + let fp1 = Fingerprinter::new() + .with_str("title") + .with_str("content") + .into_fingerprint(); + + let fp2 = Fingerprinter::new() + .with_str("title") + .with_str("content") + .into_fingerprint(); + + let fp3 = Fingerprinter::new() + .with_str("title") + .with_str("different") + .into_fingerprint(); + + assert_eq!(fp1, fp2); + assert_ne!(fp1, fp3); + } + + #[test] + fn test_fingerprinter_types() { + let fp1 = Fingerprinter::new() + 
.with_str("test") + .with_usize(42) + .with_bool(true) + .into_fingerprint(); + + let fp2 = Fingerprinter::new() + .with_str("test") + .with_usize(42) + .with_bool(true) + .into_fingerprint(); + + let fp3 = Fingerprinter::new() + .with_str("test") + .with_usize(43) // different number + .with_bool(true) + .into_fingerprint(); + + assert_eq!(fp1, fp2); + assert_ne!(fp1, fp3); + } + + #[test] + fn test_node_fingerprint() { + let content = Fingerprint::from_str("content"); + let subtree = Fingerprint::from_str("subtree"); + + let fp = NodeFingerprint::new(content, subtree); + + assert!(!fp.is_zero()); + assert_eq!(fp.content, content); + assert_eq!(fp.subtree, subtree); + } + + #[test] + fn test_node_fingerprint_change_detection() { + let old = NodeFingerprint::new( + Fingerprint::from_str("content"), + Fingerprint::from_str("subtree"), + ); + + // Same content, different subtree + let new1 = NodeFingerprint::new( + Fingerprint::from_str("content"), + Fingerprint::from_str("different"), + ); + assert!(new1.only_descendants_changed(&old)); + assert!(!new1.content_changed(&old)); + assert!(new1.subtree_changed(&old)); + + // Different content + let new2 = NodeFingerprint::new( + Fingerprint::from_str("different"), + Fingerprint::from_str("subtree"), + ); + assert!(!new2.only_descendants_changed(&old)); + assert!(new2.content_changed(&old)); + } + + #[test] + fn test_fingerprint_serialization() { + let fp = Fingerprint::from_str("test serialization"); + let json = serde_json::to_string(&fp).unwrap(); + let decoded: Fingerprint = serde_json::from_str(&json).unwrap(); + assert_eq!(fp, decoded); + } + + #[test] + fn test_node_fingerprint_serialization() { + let fp = NodeFingerprint::new( + Fingerprint::from_str("content"), + Fingerprint::from_str("subtree"), + ); + let json = serde_json::to_string(&fp).unwrap(); + let decoded: NodeFingerprint = serde_json::from_str(&json).unwrap(); + assert_eq!(fp, decoded); + } +} diff --git a/src/util/format.rs b/src/utils/format.rs 
similarity index 97% rename from src/util/format.rs rename to src/utils/format.rs index 99e821bf..95ceea07 100644 --- a/src/util/format.rs +++ b/src/utils/format.rs @@ -8,7 +8,7 @@ /// # Example /// /// ``` -/// use vectorless::util::truncate; +/// use vectorless::utils::truncate; /// /// assert_eq!(truncate("hello world", 8), "hello..."); /// assert_eq!(truncate("hi", 10), "hi"); @@ -53,7 +53,7 @@ pub fn truncate_words(text: &str, max_len: usize) -> String { /// # Example /// /// ``` -/// use vectorless::util::format_number; +/// use vectorless::utils::format_number; /// /// assert_eq!(format_number(1000), "1,000"); /// assert_eq!(format_number(1234567), "1,234,567"); @@ -78,7 +78,7 @@ pub fn format_number(n: usize) -> String { /// # Example /// /// ``` -/// use vectorless::util::format_bytes; +/// use vectorless::utils::format_bytes; /// /// assert_eq!(format_bytes(500), "500 B"); /// assert_eq!(format_bytes(1024), "1.0 KB"); @@ -106,7 +106,7 @@ pub fn format_bytes(bytes: usize) -> String { /// # Example /// /// ``` -/// use vectorless::util::format_percent; +/// use vectorless::utils::format_percent; /// /// assert_eq!(format_percent(0.5), "50.0%"); /// assert_eq!(format_percent(0.123), "12.3%"); diff --git a/src/util/mod.rs b/src/utils/mod.rs similarity index 87% rename from src/util/mod.rs rename to src/utils/mod.rs index 6af20f59..2a7f4198 100644 --- a/src/util/mod.rs +++ b/src/utils/mod.rs @@ -12,6 +12,7 @@ mod format; mod timing; mod token; +pub mod fingerprint; pub use format::{ clean_whitespace, format_bytes, format_number, format_percent, indent, line_count, truncate, @@ -19,3 +20,5 @@ pub use format::{ }; pub use timing::{Timer, format_duration, format_duration_compact}; pub use token::{estimate_tokens, estimate_tokens_batch, estimate_tokens_fast}; +// Fingerprint +pub use fingerprint::{Fingerprint, Fingerprinter, NodeFingerprint}; \ No newline at end of file diff --git a/src/util/timing.rs b/src/utils/timing.rs similarity index 99% rename from 
src/util/timing.rs rename to src/utils/timing.rs index b8858497..f133f484 100644 --- a/src/util/timing.rs +++ b/src/utils/timing.rs @@ -10,7 +10,7 @@ use std::time::{Duration, Instant}; /// # Example /// /// ```rust -/// use vectorless::util::Timer; +/// use vectorless::utils::Timer; /// /// let timer = Timer::start("indexing"); /// // ... do work ... diff --git a/src/util/token.rs b/src/utils/token.rs similarity index 100% rename from src/util/token.rs rename to src/utils/token.rs