Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ elfmem uses [Semantic Versioning](https://semver.org/).
- **`MemorySystem.learn_document(text, chunk_size, chunker, skip_llm)`:** Ingest a document in one call — chunks text, learns each chunk, auto-consolidates via `dream()` at `inbox_threshold` intervals. Accepts an optional `chunker` callback (e.g. `nltk.sent_tokenize`); default splits at sentence boundaries. Returns `LearnDocumentResult` with chunk and consolidation counts.
- **`LearnDocumentResult` type:** New result type with `chunks_total`, `chunks_created`, `chunks_duplicate`, `consolidations`, `blocks_promoted`. Exported from `elfmem`.
- **BM25 keyword search in retrieval pipeline (stage 2b):** `hybrid_retrieve()` now runs BM25 in parallel with vector search, discovering blocks with strong keyword overlap that embedding similarity misses. Soft dependency on `rank_bm25` — when not installed, the stage is silently skipped (zero regression). Install via `pip install elfmem[bm25]`.
- **Reciprocal Rank Fusion (stage 2c):** When both vector and BM25 produce results, `hybrid_retrieve()` merges their ranked lists via RRF (k=60, Cormack et al. 2009). Blocks found by both rankers score higher; BM25-only blocks receive proportional relevance scores instead of the previous `similarity=0.0`. Falls back to raw cosine when BM25 is absent.
- **`dream(skip_llm, skip_contradictions)` parameters:** `dream()` now forwards `skip_llm` and `skip_contradictions` to `consolidate()`, enabling fast-path consolidation without bypassing policy tracking or threshold persistence.
- **`MABenchConfig.context_window_tokens`:** New config field (default 4096) representing the LM Studio model's context window. All answer-context truncation derives from this value; set to 2048 for smaller models.

Expand Down
79 changes: 70 additions & 9 deletions src/elfmem/memory/retrieval.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"""6-stage hybrid retrieval pipeline — pure (no side effects).

Stages: pre-filter → vector → BM25 → graph expand → composite score → MMR.
Stages: pre-filter → vector → BM25 → RRF fusion → graph expand → composite score → MMR.
BM25 (stage 2b) requires the optional ``rank_bm25`` package. When not installed,
the stage is silently skipped and retrieval works as a 5-stage vector-only pipeline.

When both vector and BM25 produce results, Reciprocal Rank Fusion (RRF) merges
the two ranked lists into a single relevance score per block. Blocks found by
both rankers score higher than blocks found by one. When BM25 is absent, the
pipeline falls back to raw cosine similarity — zero behavioral change.
"""

from __future__ import annotations
Expand Down Expand Up @@ -40,6 +45,9 @@
DEFAULT_SEARCH_WINDOW_HOURS = 200.0
# MMR diversity coefficient: 1.0 = pure relevance, 0.0 = pure diversity
MMR_DIVERSITY_LAMBDA = 0.7
# RRF damping constant (Cormack et al. 2009). Higher k dampens the influence
# of top-ranked items, producing smoother score distributions.
RRF_K = 60


async def hybrid_retrieve(
Expand All @@ -53,11 +61,12 @@ async def hybrid_retrieve(
tag_filter: str | None = None,
search_window_hours: float = DEFAULT_SEARCH_WINDOW_HOURS,
) -> list[ScoredBlock]:
"""Execute the 6-stage hybrid retrieval pipeline.
"""Execute the 7-stage hybrid retrieval pipeline.

Stage 1 — Pre-filter: active blocks within search window.
Stage 2 — Vector search: cosine similarity → top N_seeds. (Skipped if no query.)
Stage 2b — BM25 keyword search: term overlap → top N_bm25. (Requires rank_bm25.)
Stage 2c — RRF fusion: merge vector + BM25 into unified relevance scores.
Stage 3 — Graph expand: 1-hop neighbours of seeds. (Skipped if no query.)
Stage 4 — Composite score: rank all candidates.
Stage 5 — MMR diversity: reorder for relevance + diversity. (Query-aware only.)
Expand All @@ -82,16 +91,16 @@ async def hybrid_retrieve(
n_seeds=top_k * N_SEEDS_MULTIPLIER,
)

# Stage 2b: BM25 keyword candidates (additive — discovers blocks
# that vector search missed due to vocabulary mismatch).
seed_ids_set = {b["id"] for b, _ in seed_pairs}
# Stage 2b: BM25 keyword candidates (discovers blocks that vector
# search missed due to vocabulary mismatch).
bm25_pairs = _stage_2b_bm25_search(
candidates, query, n_seeds=top_k * N_SEEDS_MULTIPLIER,
)
for block, _bm25_score in bm25_pairs:
if block["id"] not in seed_ids_set:
seed_pairs.append((block, 0.0))
seed_ids_set.add(block["id"])

# Stage 2c: Fuse vector + BM25 via Reciprocal Rank Fusion.
# When BM25 has signal, RRF produces the similarity score.
# When BM25 is empty, raw cosine similarity is preserved.
seed_pairs, seed_ids_set = _fuse_candidates(seed_pairs, bm25_pairs)

seed_ids = [b["id"] for b, _ in seed_pairs]
expanded = await _stage_3_graph_expand(
Expand Down Expand Up @@ -204,6 +213,58 @@ def _stage_2b_bm25_search(
return ranked[:n_seeds]


def _fuse_candidates(
    vector_ranked: list[tuple[dict[str, Any], float]],
    bm25_ranked: list[tuple[dict[str, Any], float]],
    k: int = RRF_K,
) -> tuple[list[tuple[dict[str, Any], float]], set[str]]:
    """Merge vector and BM25 candidate lists into one scored seed list.

    If any BM25 candidate carries a positive score, both rankings are
    combined with Reciprocal Rank Fusion (Cormack et al. 2009): each list
    contributes ``1 / (k + rank)`` per block, so blocks surfaced by both
    rankers accumulate more mass. Scores are then rescaled into [0.0, 1.0]
    with the best block pinned at 1.0.

    If BM25 contributed nothing (not installed, or all scores zero), the
    vector candidates are returned untouched with their original cosine
    similarities — identical to the pre-BM25 pipeline.

    Returns ``(seed_pairs, seed_id_set)`` ready for graph expansion.
    """
    keyword_hits = [pair for pair in bm25_ranked if pair[1] > 0]

    # No keyword signal: hand back the vector results with cosine scores intact.
    if not keyword_hits:
        return list(vector_ranked), {blk["id"] for blk, _ in vector_ranked}

    # Accumulate reciprocal-rank mass from each ranking:
    # rrf(block) = Σ_ranker 1 / (k + rank), rank counted from 1.
    mass: dict[str, float] = {}
    for ranking in (vector_ranked, keyword_hits):
        for position, (blk, _ignored) in enumerate(ranking, start=1):
            bid = blk["id"]
            mass[bid] = mass.get(bid, 0.0) + 1.0 / (k + position)

    # Rescale so the highest-mass block scores exactly 1.0.
    top = max(mass.values()) if mass else 1.0
    scaled = {bid: value / top for bid, value in mass.items()}

    # Union of both lists: vector candidates first (order preserved),
    # then any keyword-only blocks appended once each.
    merged: list[tuple[dict[str, Any], float]] = []
    included: set[str] = set()

    for blk, _ignored in vector_ranked:
        bid = blk["id"]
        merged.append((blk, scaled.get(bid, 0.0)))
        included.add(bid)

    for blk, _ignored in keyword_hits:
        bid = blk["id"]
        if bid not in included:
            merged.append((blk, scaled.get(bid, 0.0)))
            included.add(bid)

    return merged, included


async def _stage_3_graph_expand(
conn: AsyncConnection,
seed_ids: list[str],
Expand Down
Loading