diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e81235..b6e816e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ elfmem uses [Semantic Versioning](https://semver.org/). - **`MemorySystem.learn_document(text, chunk_size, chunker, skip_llm)`:** Ingest a document in one call — chunks text, learns each chunk, auto-consolidates via `dream()` at `inbox_threshold` intervals. Accepts an optional `chunker` callback (e.g. `nltk.sent_tokenize`); default splits at sentence boundaries. Returns `LearnDocumentResult` with chunk and consolidation counts. - **`LearnDocumentResult` type:** New result type with `chunks_total`, `chunks_created`, `chunks_duplicate`, `consolidations`, `blocks_promoted`. Exported from `elfmem`. - **BM25 keyword search in retrieval pipeline (stage 2b):** `hybrid_retrieve()` now runs BM25 in parallel with vector search, discovering blocks with strong keyword overlap that embedding similarity misses. Soft dependency on `rank_bm25` — when not installed, the stage is silently skipped (zero regression). Install via `pip install elfmem[bm25]`. +- **Reciprocal Rank Fusion (stage 2c):** When both vector and BM25 produce results, `hybrid_retrieve()` merges their ranked lists via RRF (k=60, Cormack et al. 2009). Blocks found by both rankers score higher; BM25-only blocks receive proportional relevance scores instead of the previous `similarity=0.0`. Falls back to raw cosine when BM25 is absent. - **`dream(skip_llm, skip_contradictions)` parameters:** `dream()` now forwards `skip_llm` and `skip_contradictions` to `consolidate()`, enabling fast-path consolidation without bypassing policy tracking or threshold persistence. - **`MABenchConfig.context_window_tokens`:** New config field (default 4096) representing the LM Studio model's context window. All answer-context truncation derives from this value; set to 2048 for smaller models. 
diff --git a/docs/plans/learnedmembench_adapter.md b/docs/plans/learnedmembench_adapter.md new file mode 100644 index 0000000..63c912e --- /dev/null +++ b/docs/plans/learnedmembench_adapter.md @@ -0,0 +1,366 @@ +# LearnedMemBench Adapter for elfmem + +## Overview + +The LMB adapter lives at `benchmarks/learnedmembench/adapter.py`, following the +same pattern as the existing LoCoMo and MABench adapters. It implements the +`LearnedMemoryAdapter` protocol defined by the external `learnedmembench` +package, wiring each protocol method to elfmem's public API. + +The adapter is thin — elfmem already has direct API equivalents for every +protocol method. The main work is mapping between LMB's protocol types and +elfmem's types, plus exposing state introspection (which elfmem supports +internally but doesn't yet expose publicly). + +## File Structure + +``` +benchmarks/learnedmembench/ +├── adapter.py # LearnedMemoryAdapter implementation +├── config.py # LMBenchConfig (embedding model, LLM model, etc) +└── __init__.py +``` + +## Protocol → elfmem API Mapping + +Every protocol method maps directly to an existing elfmem operation: + +``` +Protocol method elfmem API Notes +───────────────────────────────────────────────────────────────── +learn(content, tags) system.learn(content, Direct. Returns block ID + tags=tags) from LearnResult.block_id + +recall(query, top_k) system.recall(query=, Direct. Map ScoredBlock → + top_k=) protocol RetrievedBlock + +consolidate() system.consolidate() Direct. No return needed + +curate() system.curate() Direct. 
No return needed + +begin_session() system.begin_session() Direct + +end_session() system.end_session() Direct + +outcome(ids, signal) system.outcome(ids, Direct + signal=signal) + +setup_identity(values) system.setup(values= Direct + values) + +capabilities() Return fixed set See §Capabilities below + +get_block_state(id) queries.get_block(conn, Needs new public API + id) — see §State Introspection + +get_contradictions() queries.get_contra- Needs new public API + dictions_for_blocks() + +get_edges() queries.get_edges_for_ Needs new public API + block() / get_all_edges +``` + +## Capabilities Declaration + +elfmem supports all LMB capabilities: + +```python +def capabilities(self) -> set[str]: + return { + "consolidation", + "decay", + "contradiction", + "graph", + "sessions", + "outcome", + "identity", + "curate", + "state_introspection", + } +``` + +## Implementation + +### adapter.py + +```python +"""elfmem adapter for LearnedMemBench.""" + +from __future__ import annotations + +import tempfile +from pathlib import Path + +from elfmem import ElfmemConfig, MemorySystem + +from benchmarks.learnedmembench.config import LMBenchConfig + +# Import the protocol types from the external learnedmembench package. +# These will be defined by learnedmembench — shown here as reference. +# +# from learnedmembench.protocol import ( +# LearnedMemoryAdapter, +# RetrievedBlock, +# BlockState, +# ContradictionRecord, +# EdgeRecord, +# ) + + +class ElfmemLMBAdapter: + """LearnedMemoryAdapter implementation for elfmem. + + Lifecycle: create via `ElfmemLMBAdapter.create()`, use, then `close()`. + Or use as async context manager. 
+ """ + + def __init__(self, system: MemorySystem, db_path: str) -> None: + self._system = system + self._db_path = db_path + + @classmethod + async def create(cls, config: LMBenchConfig) -> ElfmemLMBAdapter: + elfmem_cfg = build_elfmem_config(config) + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: + db_path = tmp.name + system = await MemorySystem.from_config(db_path, config=elfmem_cfg) + return cls(system, db_path) + + async def close(self) -> None: + await self._system.close() + for suffix in ["", "-wal", "-shm"]: + p = Path(self._db_path + suffix) + if p.exists(): + p.unlink() + + # ── Required operations ──────────────────────────────────── + + async def learn(self, content: str, tags: list[str]) -> str: + result = await self._system.learn(content=content, tags=tags) + return result.block_id + + async def recall(self, query: str, top_k: int) -> list[dict]: + blocks = await self._system.recall(query=query, top_k=top_k) + return [ + { + "id": b.id, + "content": b.content, + "score": b.score, + "similarity": b.similarity, + "confidence": b.confidence, + "recency": b.recency, + "tags": b.tags, + } + for b in blocks + ] + + # ── Optional lifecycle operations ────────────────────────── + + async def consolidate(self) -> None: + await self._system.consolidate() + + async def curate(self) -> None: + await self._system.curate() + + async def begin_session(self) -> None: + await self._system.begin_session() + + async def end_session(self) -> None: + await self._system.end_session() + + async def outcome(self, block_ids: list[str], signal: float) -> None: + await self._system.outcome(block_ids, signal=signal) + + async def setup_identity(self, values: list[str]) -> None: + await self._system.setup(values=values) + + # ── Capability declaration ───────────────────────────────── + + def capabilities(self) -> set[str]: + return { + "consolidation", "decay", "contradiction", "graph", + "sessions", "outcome", "identity", "curate", + 
"state_introspection", + } + + # ── State introspection ──────────────────────────────────── + # These methods require new public API on MemorySystem. + # See §What elfmem Needs below. + + async def get_block_state(self, block_id: str) -> dict | None: + return await self._system.get_block(block_id) + + async def get_contradictions(self) -> list[dict]: + return await self._system.get_contradictions() + + async def get_edges(self) -> list[dict]: + return await self._system.get_edges() + + +def build_elfmem_config(config: LMBenchConfig) -> ElfmemConfig: + return ElfmemConfig.model_validate({ + "llm": { + "model": config.llm_model, + "base_url": config.base_url, + }, + "embeddings": { + "model": config.embedding_model, + "base_url": config.base_url, + "dimensions": config.embedding_dimensions, + }, + "memory": { + "inbox_threshold": config.inbox_threshold, + "top_k": config.top_k, + "search_window_hours": 100000.0, + "curate_interval_hours": 100000.0, + }, + }) +``` + +### config.py + +```python +"""Configuration for LearnedMemBench adapter.""" + +from dataclasses import dataclass + + +@dataclass +class LMBenchConfig: + base_url: str = "http://localhost:1234/v1" + llm_model: str = "google/gemma-4-26b-a4b" + embedding_model: str = "text-embedding-nomic-embed-text-v1.5" + embedding_dimensions: int = 768 + top_k: int = 10 + inbox_threshold: int = 50 +``` + +## What elfmem Needs + +The adapter is almost entirely passthrough. Three state introspection methods +need new public API on `MemorySystem`. The internal `queries.py` functions +already exist — they just need thin public wrappers. + +### 1. `get_block(block_id) -> dict | None` + +**Internal function**: `queries.get_block(conn, block_id)` at +`src/elfmem/db/queries.py:85` + +**What to add to `api.py`**: + +```python +async def get_block(self, block_id: str) -> dict[str, Any] | None: + """Return raw block state for introspection. + + USE WHEN: Benchmarks or debugging need internal block state. 
+ DON'T USE WHEN: You want retrieval results — use recall(). + COST: Single DB query. + RETURNS: Block dict with id, content, status, confidence, + decay_lambda, reinforcement_count, last_reinforced_at, + embedding. None if not found. + NEXT: Interpret state for verification. + """ + async with self._engine.connect() as conn: + return await queries.get_block(conn, block_id) +``` + +### 2. `get_contradictions() -> list[dict]` + +**Internal function**: `queries.get_contradictions_for_blocks(conn, block_ids)` +at `src/elfmem/db/queries.py:742`. Currently requires block IDs — need a +variant that returns all contradictions. + +**What to add**: + +First, add `get_all_contradictions()` to `queries.py`: + +```python +async def get_all_contradictions(conn: AsyncConnection) -> list[dict[str, Any]]: + """Return all contradiction records.""" + result = await conn.execute( + text("SELECT * FROM contradictions") + ) + return [dict(row._mapping) for row in result] +``` + +Then expose on `api.py`: + +```python +async def get_contradictions(self) -> list[dict[str, Any]]: + """Return all detected contradiction records. + + USE WHEN: Benchmarks need to verify contradiction detection. + DON'T USE WHEN: You want retrieval — contradictions are already + suppressed in recall(). + COST: Single DB query. + RETURNS: List of dicts with block_a_id, block_b_id, score, resolved. + NEXT: Compare against expected contradictions. + """ + async with self._engine.connect() as conn: + return await queries.get_all_contradictions(conn) +``` + +### 3. `get_edges() -> list[dict]` + +**Internal function**: `queries.get_edges_for_block(conn, block_id)` at +`src/elfmem/db/queries.py:468`. Currently per-block — need a variant that +returns all edges. 
+ +**What to add**: + +First, add `get_all_edges()` to `queries.py`: + +```python +async def get_all_edges(conn: AsyncConnection) -> list[dict[str, Any]]: + """Return all edges in the knowledge graph.""" + result = await conn.execute( + text("SELECT * FROM edges") + ) + return [dict(row._mapping) for row in result] +``` + +Then expose on `api.py`: + +```python +async def get_edges(self) -> list[dict[str, Any]]: + """Return all edges in the knowledge graph. + + USE WHEN: Benchmarks need to verify graph formation. + DON'T USE WHEN: You want retrieval — graph expansion is already + part of recall(). + COST: Single DB query. O(edges) result size. + RETURNS: List of dicts with from_id, to_id, weight, relation_type, + origin, reinforcement_count, last_active_hours. + NEXT: Analyse graph topology or compare against expected edges. + """ + async with self._engine.connect() as conn: + return await queries.get_all_edges(conn) +``` + +## Implementation Order + +1. **Add 3 public API methods** to `MemorySystem` (`get_block`, `get_contradictions`, + `get_edges`) — wrapping existing internal queries +2. **Add 2 query functions** to `queries.py` (`get_all_contradictions`, + `get_all_edges`) +3. **Create** `benchmarks/learnedmembench/config.py` +4. **Create** `benchmarks/learnedmembench/adapter.py` +5. **Tests**: Test the 3 new public API methods through the public API + (e.g., learn facts, consolidate, verify `get_block` returns expected state) + +Steps 1-2 are the only changes to elfmem's core. Steps 3-4 are benchmark +adapter code. Step 5 follows testing_principles.md — test through the public +API only. + +## What NOT to Do + +- **Don't add LMB as a dependency of elfmem.** The adapter imports from + `learnedmembench` (the external package) but elfmem itself has no knowledge + of LMB. Same pattern as the LoCoMo adapter importing `sentence_transformers`. +- **Don't change existing API semantics.** The 3 new methods are additive. 
+   `recall()`, `consolidate()`, etc remain unchanged. +- **Don't expose internal state beyond what the protocol needs.** `get_block` +  returns the raw dict — the adapter maps it to the protocol's `BlockState`. +  elfmem doesn't need to know about LMB's types. +- **Don't duplicate logic.** The adapter is pure delegation. No scoring, no +  filtering, no BM25 — elfmem's pipeline handles all of that via `recall()`. diff --git a/src/elfmem/memory/retrieval.py b/src/elfmem/memory/retrieval.py index f266e81..03bf897 100644 --- a/src/elfmem/memory/retrieval.py +++ b/src/elfmem/memory/retrieval.py @@ -1,8 +1,13 @@ -"""6-stage hybrid retrieval pipeline — pure (no side effects). +"""7-stage hybrid retrieval pipeline — pure (no side effects). -Stages: pre-filter → vector → BM25 → graph expand → composite score → MMR. +Stages: pre-filter → vector → BM25 → RRF fusion → graph expand → composite score → MMR. BM25 (stage 2b) requires the optional ``rank_bm25`` package. When not installed, the stage is silently skipped and retrieval works as a 5-stage vector-only pipeline. + +When both vector and BM25 produce results, Reciprocal Rank Fusion (RRF) merges +the two ranked lists into a single relevance score per block. Blocks found by +both rankers score higher than blocks found by one. When BM25 is absent, the +pipeline falls back to raw cosine similarity — zero behavioral change. """ from __future__ import annotations @@ -40,6 +45,9 @@ DEFAULT_SEARCH_WINDOW_HOURS = 200.0 # MMR diversity coefficient: 1.0 = pure relevance, 0.0 = pure diversity MMR_DIVERSITY_LAMBDA = 0.7 +# RRF damping constant (Cormack et al. 2009). Higher k dampens the influence +# of top-ranked items, producing smoother score distributions. +RRF_K = 60 async def hybrid_retrieve( @@ -53,11 +61,12 @@ async def hybrid_retrieve( tag_filter: str | None = None, search_window_hours: float = DEFAULT_SEARCH_WINDOW_HOURS, ) -> list[ScoredBlock]: - """Execute the 6-stage hybrid retrieval pipeline. + """Execute the 7-stage hybrid retrieval pipeline. 
Stage 1 — Pre-filter: active blocks within search window. Stage 2 — Vector search: cosine similarity → top N_seeds. (Skipped if no query.) Stage 2b — BM25 keyword search: term overlap → top N_bm25. (Requires rank_bm25.) + Stage 2c — RRF fusion: merge vector + BM25 into unified relevance scores. Stage 3 — Graph expand: 1-hop neighbours of seeds. (Skipped if no query.) Stage 4 — Composite score: rank all candidates. Stage 5 — MMR diversity: reorder for relevance + diversity. (Query-aware only.) @@ -82,16 +91,16 @@ async def hybrid_retrieve( n_seeds=top_k * N_SEEDS_MULTIPLIER, ) - # Stage 2b: BM25 keyword candidates (additive — discovers blocks - # that vector search missed due to vocabulary mismatch). - seed_ids_set = {b["id"] for b, _ in seed_pairs} + # Stage 2b: BM25 keyword candidates (discovers blocks that vector + # search missed due to vocabulary mismatch). bm25_pairs = _stage_2b_bm25_search( candidates, query, n_seeds=top_k * N_SEEDS_MULTIPLIER, ) - for block, _bm25_score in bm25_pairs: - if block["id"] not in seed_ids_set: - seed_pairs.append((block, 0.0)) - seed_ids_set.add(block["id"]) + + # Stage 2c: Fuse vector + BM25 via Reciprocal Rank Fusion. + # When BM25 has signal, RRF produces the similarity score. + # When BM25 is empty, raw cosine similarity is preserved. + seed_pairs, seed_ids_set = _fuse_candidates(seed_pairs, bm25_pairs) seed_ids = [b["id"] for b, _ in seed_pairs] expanded = await _stage_3_graph_expand( @@ -204,6 +213,58 @@ def _stage_2b_bm25_search( return ranked[:n_seeds] +def _fuse_candidates( + vector_ranked: list[tuple[dict[str, Any], float]], + bm25_ranked: list[tuple[dict[str, Any], float]], + k: int = RRF_K, +) -> tuple[list[tuple[dict[str, Any], float]], set[str]]: + """Fuse vector and BM25 candidate lists via Reciprocal Rank Fusion. + + When BM25 produces results with positive scores, both ranked lists are + merged using RRF (Cormack et al. 2009) to produce a single relevance + score per block in [0.0, 1.0]. 
Blocks found by both rankers receive + contributions from both, scoring higher than blocks found by one. + + When BM25 is empty or all scores are zero, returns vector results with + their original cosine similarity — zero behavioral change for users + without ``rank_bm25`` installed. + + Returns ``(seed_pairs, seed_id_set)`` ready for graph expansion. + """ + bm25_with_signal = [(b, s) for b, s in bm25_ranked if s > 0] + + if not bm25_with_signal: + return list(vector_ranked), {b["id"] for b, _ in vector_ranked} + + # Compute RRF: rrf(block) = Σ_ranker 1/(k + rank) + rrf: dict[str, float] = {} + for rank, (block, _cosine) in enumerate(vector_ranked): + rrf[block["id"]] = rrf.get(block["id"], 0.0) + 1.0 / (k + rank + 1) + for rank, (block, _score) in enumerate(bm25_with_signal): + rrf[block["id"]] = rrf.get(block["id"], 0.0) + 1.0 / (k + rank + 1) + + # Normalize so the top-ranked block gets 1.0 + max_rrf = max(rrf.values()) if rrf else 1.0 + rrf_norm = {bid: s / max_rrf for bid, s in rrf.items()} + + # Build deduplicated union with RRF scores as similarity + fused: list[tuple[dict[str, Any], float]] = [] + seen: set[str] = set() + + for block, _cosine in vector_ranked: + bid = block["id"] + fused.append((block, rrf_norm.get(bid, 0.0))) + seen.add(bid) + + for block, _score in bm25_with_signal: + bid = block["id"] + if bid not in seen: + fused.append((block, rrf_norm.get(bid, 0.0))) + seen.add(bid) + + return fused, seen + + async def _stage_3_graph_expand( conn: AsyncConnection, seed_ids: list[str],