diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e81235..b6e816e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ elfmem uses [Semantic Versioning](https://semver.org/). - **`MemorySystem.learn_document(text, chunk_size, chunker, skip_llm)`:** Ingest a document in one call — chunks text, learns each chunk, auto-consolidates via `dream()` at `inbox_threshold` intervals. Accepts an optional `chunker` callback (e.g. `nltk.sent_tokenize`); default splits at sentence boundaries. Returns `LearnDocumentResult` with chunk and consolidation counts. - **`LearnDocumentResult` type:** New result type with `chunks_total`, `chunks_created`, `chunks_duplicate`, `consolidations`, `blocks_promoted`. Exported from `elfmem`. - **BM25 keyword search in retrieval pipeline (stage 2b):** `hybrid_retrieve()` now runs BM25 in parallel with vector search, discovering blocks with strong keyword overlap that embedding similarity misses. Soft dependency on `rank_bm25` — when not installed, the stage is silently skipped (zero regression). Install via `pip install elfmem[bm25]`. +- **Reciprocal Rank Fusion (stage 2c):** When both vector and BM25 produce results, `hybrid_retrieve()` merges their ranked lists via RRF (k=60, Cormack et al. 2009). Blocks found by both rankers score higher; BM25-only blocks receive proportional relevance scores instead of the previous `similarity=0.0`. Falls back to raw cosine when BM25 is absent. - **`dream(skip_llm, skip_contradictions)` parameters:** `dream()` now forwards `skip_llm` and `skip_contradictions` to `consolidate()`, enabling fast-path consolidation without bypassing policy tracking or threshold persistence. - **`MABenchConfig.context_window_tokens`:** New config field (default 4096) representing the LM Studio model's context window. All answer-context truncation derives from this value; set to 2048 for smaller models. 
diff --git a/src/elfmem/memory/retrieval.py b/src/elfmem/memory/retrieval.py index f266e81..03bf897 100644 --- a/src/elfmem/memory/retrieval.py +++ b/src/elfmem/memory/retrieval.py @@ -1,8 +1,13 @@ -"""6-stage hybrid retrieval pipeline — pure (no side effects). +"""7-stage hybrid retrieval pipeline — pure (no side effects). -Stages: pre-filter → vector → BM25 → graph expand → composite score → MMR. +Stages: pre-filter → vector → BM25 → RRF fusion → graph expand → composite score → MMR. BM25 (stage 2b) requires the optional ``rank_bm25`` package. When not installed, the stage is silently skipped and retrieval works as a 5-stage vector-only pipeline. + +When both vector and BM25 produce results, Reciprocal Rank Fusion (RRF) merges +the two ranked lists into a single relevance score per block. Blocks found by +both rankers score higher than blocks found by one. When BM25 is absent, the +pipeline falls back to raw cosine similarity — zero behavioral change. """ from __future__ import annotations @@ -40,6 +45,9 @@ DEFAULT_SEARCH_WINDOW_HOURS = 200.0 # MMR diversity coefficient: 1.0 = pure relevance, 0.0 = pure diversity MMR_DIVERSITY_LAMBDA = 0.7 +# RRF damping constant (Cormack et al. 2009). Higher k dampens the influence +# of top-ranked items, producing smoother score distributions. +RRF_K = 60 async def hybrid_retrieve( @@ -53,11 +61,12 @@ async def hybrid_retrieve( tag_filter: str | None = None, search_window_hours: float = DEFAULT_SEARCH_WINDOW_HOURS, ) -> list[ScoredBlock]: - """Execute the 6-stage hybrid retrieval pipeline. + """Execute the 7-stage hybrid retrieval pipeline. Stage 1 — Pre-filter: active blocks within search window. Stage 2 — Vector search: cosine similarity → top N_seeds. (Skipped if no query.) Stage 2b — BM25 keyword search: term overlap → top N_bm25. (Requires rank_bm25.) + Stage 2c — RRF fusion: merge vector + BM25 into unified relevance scores. Stage 3 — Graph expand: 1-hop neighbours of seeds. (Skipped if no query.) Stage 4 — Composite score: rank all candidates. 
Stage 5 — MMR diversity: reorder for relevance + diversity. (Query-aware only.) @@ -82,16 +91,16 @@ async def hybrid_retrieve( n_seeds=top_k * N_SEEDS_MULTIPLIER, ) - # Stage 2b: BM25 keyword candidates (additive — discovers blocks - # that vector search missed due to vocabulary mismatch). - seed_ids_set = {b["id"] for b, _ in seed_pairs} + # Stage 2b: BM25 keyword candidates (discovers blocks that vector + # search missed due to vocabulary mismatch). bm25_pairs = _stage_2b_bm25_search( candidates, query, n_seeds=top_k * N_SEEDS_MULTIPLIER, ) - for block, _bm25_score in bm25_pairs: - if block["id"] not in seed_ids_set: - seed_pairs.append((block, 0.0)) - seed_ids_set.add(block["id"]) + + # Stage 2c: Fuse vector + BM25 via Reciprocal Rank Fusion. + # When BM25 has signal, RRF produces the similarity score. + # When BM25 is empty, raw cosine similarity is preserved. + seed_pairs, seed_ids_set = _fuse_candidates(seed_pairs, bm25_pairs) seed_ids = [b["id"] for b, _ in seed_pairs] expanded = await _stage_3_graph_expand( @@ -204,6 +213,58 @@ def _stage_2b_bm25_search( return ranked[:n_seeds] +def _fuse_candidates( + vector_ranked: list[tuple[dict[str, Any], float]], + bm25_ranked: list[tuple[dict[str, Any], float]], + k: int = RRF_K, +) -> tuple[list[tuple[dict[str, Any], float]], set[str]]: + """Fuse vector and BM25 candidate lists via Reciprocal Rank Fusion. + + When BM25 produces results with positive scores, both ranked lists are + merged using RRF (Cormack et al. 2009) to produce a single relevance + score per block in [0.0, 1.0]. Blocks found by both rankers receive + contributions from both, scoring higher than blocks found by one. + + When BM25 is empty or all scores are zero, returns vector results with + their original cosine similarity — zero behavioral change for users + without ``rank_bm25`` installed. + + Returns ``(seed_pairs, seed_id_set)`` ready for graph expansion. 
+ """ + bm25_with_signal = [(b, s) for b, s in bm25_ranked if s > 0] + + if not bm25_with_signal: + return list(vector_ranked), {b["id"] for b, _ in vector_ranked} + + # Compute RRF: rrf(block) = Σ_ranker 1/(k + rank) + rrf: dict[str, float] = {} + for rank, (block, _cosine) in enumerate(vector_ranked): + rrf[block["id"]] = rrf.get(block["id"], 0.0) + 1.0 / (k + rank + 1) + for rank, (block, _score) in enumerate(bm25_with_signal): + rrf[block["id"]] = rrf.get(block["id"], 0.0) + 1.0 / (k + rank + 1) + + # Normalize so the top-ranked block gets 1.0 + max_rrf = max(rrf.values()) if rrf else 1.0 + rrf_norm = {bid: s / max_rrf for bid, s in rrf.items()} + + # Build deduplicated union with RRF scores as similarity + fused: list[tuple[dict[str, Any], float]] = [] + seen: set[str] = set() + + for block, _cosine in vector_ranked: + bid = block["id"] + fused.append((block, rrf_norm.get(bid, 0.0))) + seen.add(bid) + + for block, _score in bm25_with_signal: + bid = block["id"] + if bid not in seen: + fused.append((block, rrf_norm.get(bid, 0.0))) + seen.add(bid) + + return fused, seen + + async def _stage_3_graph_expand( conn: AsyncConnection, seed_ids: list[str],