From f45435e6b97ff8d4b10080f51ef0b66f042c9274 Mon Sep 17 00:00:00 2001 From: Greg von Nessi Date: Fri, 13 Mar 2026 18:07:10 +0000 Subject: [PATCH 1/2] =?UTF-8?q?Break=20searchContext=20into=20pipeline=20s?= =?UTF-8?q?tages=20(440=20=E2=86=92=20120=20lines)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract 6 functions from the monolithic searchContext: - filterByAgent: eliminates 4x duplicated agent post-filtering - applyEntityBoost: shared entity boost RRF fusion - keywordPrimarySearch: keyword-first retrieval path - indexBasedSearch: hybrid retrieval via semantic index - chunkBasedSearch: hybrid fallback via chunk vectors - postProcessResults: shared post-processing pipeline searchContext is now a clear orchestrator: pick path → fuse → post-process. --- src/retrieval/search-assembler.ts | 757 +++++++++++++++++------------- 1 file changed, 443 insertions(+), 314 deletions(-) diff --git a/src/retrieval/search-assembler.ts b/src/retrieval/search-assembler.ts index b0a6f51..d5fb08f 100644 --- a/src/retrieval/search-assembler.ts +++ b/src/retrieval/search-assembler.ts @@ -27,6 +27,7 @@ import { extractEntities } from '../utils/entity-extractor.js'; import { findEntitiesByAlias, getChunkIdsForEntity } from '../storage/entity-store.js'; import { createLogger } from '../utils/logger.js'; import { formatSearchChunk } from './formatting.js'; +import type { MemoryConfig } from '../config/memory-config.js'; const log = createLogger('search-assembler'); @@ -113,6 +114,24 @@ function getKeywordStore(): KeywordStore { return sharedKeywordStore; } +// ── Extracted pipeline stages ──────────────────────────────────────────────── + +/** + * Filter items by agent when agent filtering is active but project filtering is not. + * + * When projectFilter is set, agent filtering is handled by the storage layer. + * This function handles the post-filter case where no project scope was provided. + */ +function filterByAgent( + items: T[], + agentFilter: string | undefined, + projectFilter: string | string[] | undefined, + getAgent: (id: string) => string | null | undefined, +): T[] { + if (!agentFilter || projectFilter) return items; + return items.filter((item) => getAgent(item.id) === agentFilter); +} + /** * Extract entity mentions from the query and find matching chunks. * Returns ranked items suitable for RRF fusion. @@ -142,360 +161,335 @@ function getEntityResults(query: string, projectFilter?: string | string[]): Ran } /** - * Run the search pipeline. - * - * Keyword-primary mode: keyword → [optional vector enrichment] → recency → MMR → budget - * Hybrid mode: embed → [vector, keyword] → RRF → cluster expand → recency → MMR → budget + * Merge entity-boosted results into fused results via RRF. */ -export async function searchContext(request: SearchRequest): Promise { - const startTime = Date.now(); - const externalConfig = loadConfig(); - const config = toRuntimeConfig(externalConfig); - - const { - query, - currentSessionId, - projectFilter, - maxTokens = config.mcpMaxResponseTokens, - vectorSearchLimit = 20, - skipClusters = false, - agentFilter, - } = request; +function applyEntityBoost( + fusedResults: RankedItem[], + query: string, + projectFilter: string | string[] | undefined, + rrfK: number, +): RankedItem[] { + try { + const entityItems = getEntityResults(query, projectFilter); + if (entityItems.length > 0) { + return fuseRRF( + [ + { items: fusedResults, weight: 1.0 }, + { items: entityItems, weight: ENTITY_RRF_BOOST }, + ], + rrfK, + ); + } + } catch (error) { + log.warn('Entity search failed', { error: (error as Error).message }); + } + return fusedResults; +} - const { hybridSearch, clusterExpansion, mmrReranking, embeddingModel } = config; - const retrievalMode = config.retrievalPrimary; +/** Return type for retrieval path functions. null signals empty results (early return). */ +interface RetrievalResult { + fusedResults: RankedItem[]; + queryEmbedding: number[]; + useIndexSearch: boolean; +} - let fusedResults: RankedItem[]; +/** + * Keyword-primary retrieval path. + * + * keyword → [optional vector enrichment] → entity boost + * No cluster expansion. + */ +async function keywordPrimarySearch( + query: string, + projectFilter: string | string[] | undefined, + agentFilter: string | undefined, + vectorSearchLimit: number, + config: MemoryConfig, +): Promise { + const { hybridSearch, embeddingModel } = config; let queryEmbedding: number[] = []; - let useIndexSearch = false; - if (retrievalMode === 'keyword') { - // ── Keyword-primary search path ────────────────────────────────────── - // No embedding needed unless vector enrichment is enabled. + let keywordResults: Array<{ id: string; score: number }> = []; + try { + const keywordStore = getKeywordStore(); + keywordResults = projectFilter + ? keywordStore.searchByProject( + query, + projectFilter, + hybridSearch.keywordSearchLimit, + agentFilter, + ) + : keywordStore.search(query, hybridSearch.keywordSearchLimit); + } catch (error) { + log.warn('Keyword search failed', { error: (error as Error).message }); + } + + // Post-filter by agent when no project filter was used + keywordResults = filterByAgent(keywordResults, agentFilter, projectFilter, (id) => { + const chunk = getChunkById(id); + return chunk?.agentId; + }); + + let fusedResults: RankedItem[] = keywordResults.map((r) => ({ + chunkId: r.id, + score: r.score, + source: 'keyword' as const, + })); - let keywordResults: Array<{ id: string; score: number }> = []; + // Optional vector enrichment: merge vector results via RRF + if (config.vectorEnrichment) { try { - const keywordStore = getKeywordStore(); - keywordResults = projectFilter - ? keywordStore.searchByProject( - query, + vectorStore.setModelId(embeddingModel); + const embedder = await getEmbedder(embeddingModel); + const queryResult = await embedder.embed(query, true); + queryEmbedding = queryResult.embedding; + + let vectorResults = await (projectFilter + ? vectorStore.searchByProject( + queryResult.embedding, projectFilter, - hybridSearch.keywordSearchLimit, + vectorSearchLimit, agentFilter, ) - : keywordStore.search(query, hybridSearch.keywordSearchLimit); - } catch (error) { - log.warn('Keyword search failed', { error: (error as Error).message }); - } + : vectorStore.search(queryResult.embedding, vectorSearchLimit)); - // Post-filter by agent when no project filter was used - if (agentFilter && !projectFilter) { - keywordResults = keywordResults.filter((r) => { - const chunk = getChunkById(r.id); - return chunk?.agentId === agentFilter; + vectorResults = filterByAgent(vectorResults, agentFilter, projectFilter, (id) => { + const chunk = getChunkById(id); + return chunk?.agentId; }); - } - fusedResults = keywordResults.map((r) => ({ - chunkId: r.id, - score: r.score, - source: 'keyword' as const, - })); - - // Optional vector enrichment: merge vector results via RRF - if (config.vectorEnrichment) { - try { - vectorStore.setModelId(embeddingModel); - const embedder = await getEmbedder(embeddingModel); - const queryResult = await embedder.embed(query, true); - queryEmbedding = queryResult.embedding; - - let vectorResults = await (projectFilter - ? vectorStore.searchByProject( - queryResult.embedding, - projectFilter, - vectorSearchLimit, - agentFilter, - ) - : vectorStore.search(queryResult.embedding, vectorSearchLimit)); - - if (agentFilter && !projectFilter) { - vectorResults = vectorResults.filter((s) => { - const chunk = getChunkById(s.id); - return chunk?.agentId === agentFilter; - }); - } - - if (vectorResults.length > 0) { - const vectorItems: RankedItem[] = vectorResults.map((s) => ({ - chunkId: s.id, - score: Math.max(0, 1 - s.distance), - source: 'vector' as const, - })); - - fusedResults = fuseRRF( - [ - { items: fusedResults, weight: hybridSearch.keywordWeight }, - { items: vectorItems, weight: hybridSearch.vectorWeight }, - ], - hybridSearch.rrfK, - ); - } - } catch (error) { - log.warn('Vector enrichment failed, using keyword results only', { - error: (error as Error).message, - }); - } - } + if (vectorResults.length > 0) { + const vectorItems: RankedItem[] = vectorResults.map((s) => ({ + chunkId: s.id, + score: Math.max(0, 1 - s.distance), + source: 'vector' as const, + })); - // Entity boost: merge entity-matched chunks via RRF - try { - const entityItems = getEntityResults(query, projectFilter); - if (entityItems.length > 0) { fusedResults = fuseRRF( [ - { items: fusedResults, weight: 1.0 }, - { items: entityItems, weight: ENTITY_RRF_BOOST }, + { items: fusedResults, weight: hybridSearch.keywordWeight }, + { items: vectorItems, weight: hybridSearch.vectorWeight }, ], hybridSearch.rrfK, ); } } catch (error) { - log.warn('Entity search failed', { error: (error as Error).message }); - } - - if (fusedResults.length === 0) { - return { - text: '', - tokenCount: 0, - chunks: [], - totalConsidered: 0, - durationMs: Date.now() - startTime, - queryEmbedding, - seedIds: [], - }; + log.warn('Vector enrichment failed, using keyword results only', { + error: (error as Error).message, + }); } + } - // Skip cluster expansion for keyword-primary mode - } else { - // ── Hybrid/vector search path ──────────────────────────────────────── - // Configure vector store for current model - vectorStore.setModelId(embeddingModel); - - // 1. Embed query - const embedder = await getEmbedder(embeddingModel); - const queryResult = await embedder.embed(query, true); - queryEmbedding = queryResult.embedding; - - // Determine whether to use index-based search - useIndexSearch = config.semanticIndex.useForSearch && getIndexEntryCount() > 0; - - if (useIndexSearch) { - // ── Index-based search path ────────────────────────────────────── - indexVectorStore.setModelId(embeddingModel); - - const entryCount = getIndexEntryCount(); - const indexedChunks = getIndexedChunkCount(); - const entriesPerChunk = indexedChunks > 0 ? entryCount / indexedChunks : 1; - const indexSearchLimit = Math.ceil(vectorSearchLimit * entriesPerChunk); + // Entity boost + fusedResults = applyEntityBoost(fusedResults, query, projectFilter, hybridSearch.rrfK); - const indexVectorPromise = projectFilter - ? indexVectorStore.searchByProject( - queryResult.embedding, - projectFilter, - indexSearchLimit, - agentFilter, - ) - : indexVectorStore.search(queryResult.embedding, indexSearchLimit); + if (fusedResults.length === 0) { + return null; + } - let indexKeywordResults: Array<{ id: string; score: number }> = []; - try { - indexKeywordResults = searchIndexEntriesByKeyword( - query, - hybridSearch.keywordSearchLimit, - projectFilter, - agentFilter, - ); - } catch (error) { - log.warn('Index keyword search unavailable', { - error: (error as Error).message, - }); - } + return { fusedResults, queryEmbedding, useIndexSearch: false }; +} - let indexSimilar = await indexVectorPromise; +/** + * Index-based hybrid retrieval path. + * + * Uses semantic index entries (vector + keyword) → RRF → dereference to chunks. + */ +async function indexBasedSearch( + queryEmbedding: number[], + query: string, + projectFilter: string | string[] | undefined, + agentFilter: string | undefined, + vectorSearchLimit: number, + config: MemoryConfig, +): Promise { + const { hybridSearch, embeddingModel } = config; + + indexVectorStore.setModelId(embeddingModel); + + const entryCount = getIndexEntryCount(); + const indexedChunks = getIndexedChunkCount(); + const entriesPerChunk = indexedChunks > 0 ? entryCount / indexedChunks : 1; + const indexSearchLimit = Math.ceil(vectorSearchLimit * entriesPerChunk); + + const indexVectorPromise = projectFilter + ? indexVectorStore.searchByProject(queryEmbedding, projectFilter, indexSearchLimit, agentFilter) + : indexVectorStore.search(queryEmbedding, indexSearchLimit); + + let indexKeywordResults: Array<{ id: string; score: number }> = []; + try { + indexKeywordResults = searchIndexEntriesByKeyword( + query, + hybridSearch.keywordSearchLimit, + projectFilter, + agentFilter, + ); + } catch (error) { + log.warn('Index keyword search unavailable', { + error: (error as Error).message, + }); + } - if (agentFilter && !projectFilter) { - indexSimilar = indexSimilar.filter((s) => { - const agent = indexVectorStore.getChunkAgent(s.id); - return agent === agentFilter; - }); - indexKeywordResults = indexKeywordResults.filter((r) => { - const agent = indexVectorStore.getChunkAgent(r.id); - return agent === agentFilter; - }); - } + let indexSimilar = await indexVectorPromise; - if (indexSimilar.length === 0 && indexKeywordResults.length === 0) { - return { - text: '', - tokenCount: 0, - chunks: [], - totalConsidered: 0, - durationMs: Date.now() - startTime, - queryEmbedding, - seedIds: [], - }; - } + indexSimilar = filterByAgent(indexSimilar, agentFilter, projectFilter, (id) => + indexVectorStore.getChunkAgent(id), + ); + indexKeywordResults = filterByAgent(indexKeywordResults, agentFilter, projectFilter, (id) => + indexVectorStore.getChunkAgent(id), + ); - const indexVectorItems: RankedItem[] = indexSimilar.map((s) => ({ - chunkId: s.id, - score: Math.max(0, 1 - s.distance), - source: 'vector' as const, - })); + if (indexSimilar.length === 0 && indexKeywordResults.length === 0) { + return null; + } - const indexKeywordItems: RankedItem[] = indexKeywordResults.map((r) => ({ - chunkId: r.id, - score: r.score, - source: 'keyword' as const, - })); + const indexVectorItems: RankedItem[] = indexSimilar.map((s) => ({ + chunkId: s.id, + score: Math.max(0, 1 - s.distance), + source: 'vector' as const, + })); - const indexFused = fuseRRF( - [ - { items: indexVectorItems, weight: hybridSearch.vectorWeight }, - ...(indexKeywordItems.length > 0 - ? [{ items: indexKeywordItems, weight: hybridSearch.keywordWeight }] - : []), - ], - hybridSearch.rrfK, - ); + const indexKeywordItems: RankedItem[] = indexKeywordResults.map((r) => ({ + chunkId: r.id, + score: r.score, + source: 'keyword' as const, + })); - const indexEntryIds = indexFused.map((r) => r.chunkId); - const chunkIds = dereferenceToChunkIds(indexEntryIds); - - const chunkScoreMap = new Map(); - for (const item of indexFused) { - const entryChunkIds = dereferenceToChunkIds([item.chunkId]); - for (const cid of entryChunkIds) { - const existing = chunkScoreMap.get(cid); - if (!existing || item.score > existing.score) { - chunkScoreMap.set(cid, { score: item.score, source: item.source }); - } - } + const indexFused = fuseRRF( + [ + { items: indexVectorItems, weight: hybridSearch.vectorWeight }, + ...(indexKeywordItems.length > 0 + ? [{ items: indexKeywordItems, weight: hybridSearch.keywordWeight }] + : []), + ], + hybridSearch.rrfK, + ); + + const indexEntryIds = indexFused.map((r) => r.chunkId); + const chunkIds = dereferenceToChunkIds(indexEntryIds); + + const chunkScoreMap = new Map(); + for (const item of indexFused) { + const entryChunkIds = dereferenceToChunkIds([item.chunkId]); + for (const cid of entryChunkIds) { + const existing = chunkScoreMap.get(cid); + if (!existing || item.score > existing.score) { + chunkScoreMap.set(cid, { score: item.score, source: item.source }); } + } + } - fusedResults = chunkIds.map((cid) => { - const entry = chunkScoreMap.get(cid); - return { - chunkId: cid, - score: entry?.score ?? 0, - source: entry?.source, - }; - }); - } else { - // ── Chunk-based search path (fallback) ───────────────────────────── - const vectorSearchPromise = projectFilter - ? vectorStore.searchByProject( - queryResult.embedding, - projectFilter, - vectorSearchLimit, - agentFilter, - ) - : vectorStore.search(queryResult.embedding, vectorSearchLimit); - - let keywordResults: Array<{ id: string; score: number }> = []; - try { - const keywordStore = getKeywordStore(); - keywordResults = projectFilter - ? keywordStore.searchByProject( - query, - projectFilter, - hybridSearch.keywordSearchLimit, - agentFilter, - ) - : keywordStore.search(query, hybridSearch.keywordSearchLimit); - } catch (error) { - log.warn('Keyword search unavailable, falling back to vector-only', { - error: (error as Error).message, - }); - } + const fusedResults: RankedItem[] = chunkIds.map((cid) => { + const entry = chunkScoreMap.get(cid); + return { + chunkId: cid, + score: entry?.score ?? 0, + source: entry?.source, + }; + }); - let similar = await vectorSearchPromise; + return { fusedResults, queryEmbedding, useIndexSearch: true }; +} - if (agentFilter && !projectFilter) { - similar = similar.filter((s) => { - const chunk = getChunkById(s.id); - return chunk?.agentId === agentFilter; - }); - keywordResults = keywordResults.filter((r) => { - const chunk = getChunkById(r.id); - return chunk?.agentId === agentFilter; - }); - } +/** + * Chunk-based hybrid retrieval path (fallback when no semantic index). + * + * vector + keyword → RRF + */ +async function chunkBasedSearch( + queryEmbedding: number[], + query: string, + projectFilter: string | string[] | undefined, + agentFilter: string | undefined, + vectorSearchLimit: number, + config: MemoryConfig, +): Promise { + const { hybridSearch } = config; + + const vectorSearchPromise = projectFilter + ? vectorStore.searchByProject(queryEmbedding, projectFilter, vectorSearchLimit, agentFilter) + : vectorStore.search(queryEmbedding, vectorSearchLimit); + + let keywordResults: Array<{ id: string; score: number }> = []; + try { + const keywordStore = getKeywordStore(); + keywordResults = projectFilter + ? keywordStore.searchByProject( + query, + projectFilter, + hybridSearch.keywordSearchLimit, + agentFilter, + ) + : keywordStore.search(query, hybridSearch.keywordSearchLimit); + } catch (error) { + log.warn('Keyword search unavailable, falling back to vector-only', { + error: (error as Error).message, + }); + } - if (similar.length === 0 && keywordResults.length === 0) { - return { - text: '', - tokenCount: 0, - chunks: [], - totalConsidered: 0, - durationMs: Date.now() - startTime, - queryEmbedding, - seedIds: [], - }; - } + let similar = await vectorSearchPromise; - const vectorItems: RankedItem[] = similar.map((s) => ({ - chunkId: s.id, - score: Math.max(0, 1 - s.distance), - source: 'vector' as const, - })); + similar = filterByAgent(similar, agentFilter, projectFilter, (id) => { + const chunk = getChunkById(id); + return chunk?.agentId; + }); + keywordResults = filterByAgent(keywordResults, agentFilter, projectFilter, (id) => { + const chunk = getChunkById(id); + return chunk?.agentId; + }); - const keywordItems: RankedItem[] = keywordResults.map((r) => ({ - chunkId: r.id, - score: r.score, - source: 'keyword' as const, - })); + if (similar.length === 0 && keywordResults.length === 0) { + return null; + } - fusedResults = fuseRRF( - [ - { items: vectorItems, weight: hybridSearch.vectorWeight }, - ...(keywordItems.length > 0 - ? [{ items: keywordItems, weight: hybridSearch.keywordWeight }] - : []), - ], - hybridSearch.rrfK, - ); - } + const vectorItems: RankedItem[] = similar.map((s) => ({ + chunkId: s.id, + score: Math.max(0, 1 - s.distance), + source: 'vector' as const, + })); - // Entity boost (hybrid/vector path) - try { - const entityItems = getEntityResults(query, projectFilter); - if (entityItems.length > 0) { - fusedResults = fuseRRF( - [ - { items: fusedResults, weight: 1.0 }, - { items: entityItems, weight: ENTITY_RRF_BOOST }, - ], - hybridSearch.rrfK, - ); - } - } catch (error) { - log.warn('Entity search failed', { error: (error as Error).message }); - } + const keywordItems: RankedItem[] = keywordResults.map((r) => ({ + chunkId: r.id, + score: r.score, + source: 'keyword' as const, + })); - // Cluster expansion (hybrid/vector path only) - if (!skipClusters) { - fusedResults = expandViaClusters( - fusedResults, - clusterExpansion, - projectFilter, - agentFilter, - config.feedbackWeight, - ); - } - } + const fusedResults = fuseRRF( + [ + { items: vectorItems, weight: hybridSearch.vectorWeight }, + ...(keywordItems.length > 0 + ? [{ items: keywordItems, weight: hybridSearch.keywordWeight }] + : []), + ], + hybridSearch.rrfK, + ); + + return { fusedResults, queryEmbedding, useIndexSearch: false }; +} - // ── Shared post-processing ─────────────────────────────────────────── +/** + * Shared post-processing pipeline that all retrieval paths converge on. + * + * source tracking → seed extraction → dedupe → recency boost + length penalty → + * size bounding → MMR reranking → score normalization → budget assembly + */ +async function postProcessResults( + fusedResults: RankedItem[], + opts: { + queryEmbedding: number[]; + maxTokens: number; + currentSessionId?: string; + config: MemoryConfig; + useIndexSearch: boolean; + }, +): Promise<{ + text: string; + tokenCount: number; + chunks: SearchResponse['chunks']; + totalConsidered: number; + seedIds: string[]; +}> { + const { queryEmbedding, maxTokens, currentSessionId, config, useIndexSearch } = opts; // Track sources type ChunkSource = 'vector' | 'keyword' | 'cluster' | 'entity'; @@ -557,7 +551,7 @@ export async function searchContext(request: SearchRequest): Promise { + const startTime = Date.now(); + const externalConfig = loadConfig(); + const config = toRuntimeConfig(externalConfig); + + const { + query, + currentSessionId, + projectFilter, + maxTokens = config.mcpMaxResponseTokens, + vectorSearchLimit = 20, + skipClusters = false, + agentFilter, + } = request; + + const { embeddingModel } = config; + const retrievalMode = config.retrievalPrimary; + + const emptyResponse: SearchResponse = { + text: '', + tokenCount: 0, + chunks: [], + totalConsidered: 0, + durationMs: Date.now() - startTime, + queryEmbedding: [], + seedIds: [], + }; + + let result: RetrievalResult | null; + + if (retrievalMode === 'keyword') { + // ── Keyword-primary search path ────────────────────────────────────── + result = await keywordPrimarySearch( + query, + projectFilter, + agentFilter, + vectorSearchLimit, + config, + ); + + if (!result) { + emptyResponse.durationMs = Date.now() - startTime; + return emptyResponse; + } + + // Skip cluster expansion for keyword-primary mode + } else { + // ── Hybrid/vector search path ──────────────────────────────────────── + // Configure vector store for current model + vectorStore.setModelId(embeddingModel); + + // 1. Embed query + const embedder = await getEmbedder(embeddingModel); + const queryResult = await embedder.embed(query, true); + const queryEmbedding = queryResult.embedding; + + // Determine whether to use index-based search + const useIndexSearch = config.semanticIndex.useForSearch && getIndexEntryCount() > 0; + + if (useIndexSearch) { + result = await indexBasedSearch( + queryEmbedding, + query, + projectFilter, + agentFilter, + vectorSearchLimit, + config, + ); + } else { + result = await chunkBasedSearch( + queryEmbedding, + query, + projectFilter, + agentFilter, + vectorSearchLimit, + config, + ); + } + + if (!result) { + emptyResponse.queryEmbedding = queryEmbedding; + emptyResponse.durationMs = Date.now() - startTime; + return emptyResponse; + } + + // Entity boost (hybrid/vector path) + result.fusedResults = applyEntityBoost( + result.fusedResults, + query, + projectFilter, + config.hybridSearch.rrfK, + ); + + // Cluster expansion (hybrid/vector path only) + if (!skipClusters) { + result.fusedResults = expandViaClusters( + result.fusedResults, + config.clusterExpansion, + projectFilter, + agentFilter, + config.feedbackWeight, + ); + } + } + + // ── Shared post-processing ─────────────────────────────────────────── + const processed = await postProcessResults(result.fusedResults, { + queryEmbedding: result.queryEmbedding, + maxTokens, + currentSessionId, + config, + useIndexSearch: result.useIndexSearch, + }); + + return { + text: processed.text, + tokenCount: processed.tokenCount, + chunks: processed.chunks, + totalConsidered: processed.totalConsidered, + durationMs: Date.now() - startTime, + queryEmbedding: result.queryEmbedding, + seedIds: processed.seedIds, + }; +} + +// ── Budget assembly ────────────────────────────────────────────────────────── + /** * Formatting overhead constants. * @@ -680,6 +807,8 @@ function truncateChunk(content: string, maxTokens: number): string { return truncated + '\n...[truncated]'; } +// ── Additional exports ─────────────────────────────────────────────────────── + /** * Result from similarity search for semantic deletion. */ From d61974d8a41916686e9b04b4d227ca03a0f6fe73 Mon Sep 17 00:00:00 2001 From: Greg von Nessi Date: Fri, 13 Mar 2026 18:07:10 +0000 Subject: [PATCH 2/2] =?UTF-8?q?Break=20ingestSession=20into=20composable?= =?UTF-8?q?=20steps=20(370=20=E2=86=92=20~200=20lines)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract 3 stage functions from the monolithic ingestSession: - validateAndSetup: session info, checkpoints, mtime, skip checks - assembleSessionTurns: stream/file parsing, incremental filtering - initializeEmbedding: embedder setup with error recovery Sub-agent processing kept inline (complex team topology branching). --- src/ingest/ingest-session.ts | 193 ++++++++++++++++++++++++++--------- 1 file changed, 144 insertions(+), 49 deletions(-) diff --git a/src/ingest/ingest-session.ts b/src/ingest/ingest-session.ts index 0ca31a3..c81ce26 100644 --- a/src/ingest/ingest-session.ts +++ b/src/ingest/ingest-session.ts @@ -39,14 +39,18 @@ import { } from './brief-debrief-detector.js'; import { detectTeamTopology, groupTeammateFiles, type TeamTopology } from './team-detector.js'; import { detectTeamEdges } from './team-edge-detector.js'; -import { getCheckpoint, saveCheckpoint } from '../storage/checkpoint-store.js'; +import { + getCheckpoint, + saveCheckpoint, + type IngestionCheckpoint, +} from '../storage/checkpoint-store.js'; import { computeContentHash, getCachedEmbeddingsBatch, cacheEmbeddingsBatch, } from '../storage/embedding-cache.js'; import type { ChunkInput } from '../storage/types.js'; -import type { Chunk, Turn } from '../parser/types.js'; +import type { Chunk, Turn, SessionInfo } from '../parser/types.js'; import { createLogger } from '../utils/logger.js'; import { resolveCanonicalProjectPath } from '../utils/project-path.js'; import { generateIndexEntriesForChunks } from './index-entry-hook.js'; @@ -149,26 +153,27 @@ function createSkipResult( }; } +/** Result of validation and setup phase. */ +interface SetupResult { + info: SessionInfo; + projectSlug: string; + projectPath: string; + sessionId: string; + checkpoint: IngestionCheckpoint | null; + fileMtime: string; + useStreaming: boolean; +} + /** - * Ingest a single session file. + * Validate session and gather setup data. + * Returns `{ skip }` for early exit or `{ setup }` to continue processing. */ -export async function ingestSession( +async function validateAndSetup( sessionPath: string, - options: IngestOptions = {}, -): Promise { - const startTime = Date.now(); - - const { - maxTokensPerChunk = 4096, - includeThinking = true, - embeddingModel = 'jina-small', - skipIfExists = true, - linkCrossSessions = true, - processSubAgents = true, - useIncrementalIngestion = true, - useEmbeddingCache = true, - embeddingDevice, - } = options; + options: IngestOptions, + startTime: number, +): Promise<{ skip: IngestResult } | { setup: SetupResult }> { + const { skipIfExists = true, useIncrementalIngestion = true } = options; // Get session info const info = await getSessionInfo(sessionPath); @@ -186,22 +191,52 @@ export async function ingestSession( if (checkpoint && checkpoint.fileMtime) { const lastIngestMtime = new Date(checkpoint.fileMtime); if (fileStats.mtime <= lastIngestMtime) { - return createSkipResult(info.sessionId, projectSlug, startTime, { - skipReason: 'unchanged_file', - }); + return { + skip: createSkipResult(info.sessionId, projectSlug, startTime, { + skipReason: 'unchanged_file', + }), + }; } } // Check if already ingested (only if no checkpoint - checkpoint means we might resume) if (skipIfExists && !checkpoint && isSessionIngested(info.sessionId)) { - return createSkipResult(info.sessionId, projectSlug, startTime, { - skipReason: 'already_ingested', - }); + return { + skip: createSkipResult(info.sessionId, projectSlug, startTime, { + skipReason: 'already_ingested', + }), + }; } // Use streaming for large files const useStreaming = fileStats.size > STREAMING_THRESHOLD_BYTES; + return { + setup: { + info, + projectSlug, + projectPath, + sessionId: info.sessionId, + checkpoint, + fileMtime, + useStreaming, + }, + }; +} + +/** + * Parse session turns and apply incremental filtering. + * Returns null if there are no new turns to process (with checkpoint mtime updated). + */ +async function assembleSessionTurns( + sessionPath: string, + setup: SetupResult, + startTime: number, +): Promise< + { allTurns: Turn[]; turnsToProcess: Turn[]; startTurnIndex: number } | { skip: IngestResult } +> { + const { checkpoint, fileMtime, useStreaming, projectSlug, sessionId } = setup; + // Parse main session (sidechains filtered at message level) // Include noise (progress messages) to get agent_progress for brief/debrief detection let turns: Turn[]; @@ -231,12 +266,35 @@ export async function ingestSession( fileMtime, }); } - return createSkipResult(info.sessionId, projectSlug, startTime, { - skipped: startTurnIndex > 0, - skipReason: startTurnIndex > 0 ? 'no_new_turns' : undefined, - }); + return { + skip: createSkipResult(sessionId, projectSlug, startTime, { + skipped: startTurnIndex > 0, + skipReason: startTurnIndex > 0 ? 'no_new_turns' : undefined, + }), + }; } + return { allTurns: turns, turnsToProcess, startTurnIndex }; +} + +/** Type for the embed-all function. */ +type EmbedAllFn = (texts: string[]) => Promise; + +/** Context for embedding operations. */ +interface EmbeddingContext { + embedder: Embedder | null; + embedAllFn: EmbedAllFn; + shouldEmbed: boolean; + needsDispose: boolean; +} + +/** + * Initialize embedding context based on configuration. + * Non-fatal — logs and disables embedding on error. + */ +async function initializeEmbedding(options: IngestOptions): Promise { + const { embeddingModel = 'jina-small', embeddingDevice } = options; + // Determine whether to embed chunks during ingestion const runtimeConfig = toRuntimeConfig(loadConfig()); const shouldEmbed = runtimeConfig.embeddingEager; @@ -245,7 +303,7 @@ export async function ingestSession( const embedder = shouldEmbed ? (options.embedder ?? new Embedder()) : null; const needsDispose = shouldEmbed && !options.embedder; - const embedAllFn = async (texts: string[]): Promise => { + const embedAllFn: EmbedAllFn = async (texts: string[]): Promise => { if (!embedder) return texts.map(() => []); const results: number[][] = []; for (const t of texts) { @@ -255,23 +313,60 @@ export async function ingestSession( return results; }; + // Load model if using a local embedder (pool workers load their own) + if (embedder && (!embedder.currentModel || embedder.currentModel.id !== embeddingModel)) { + await embedder.load(getModel(embeddingModel), { device: embeddingDevice }); + } + + return { embedder, embedAllFn, shouldEmbed, needsDispose }; +} + +/** + * Ingest a single session file. + */ +export async function ingestSession( + sessionPath: string, + options: IngestOptions = {}, +): Promise { + const startTime = Date.now(); + + const { + maxTokensPerChunk = 4096, + includeThinking = true, + embeddingModel = 'jina-small', + linkCrossSessions = true, + processSubAgents = true, + useIncrementalIngestion = true, + useEmbeddingCache = true, + } = options; + + // Stage 1: Validate and setup + const validated = await validateAndSetup(sessionPath, options, startTime); + if ('skip' in validated) return validated.skip; + const { setup } = validated; + const { projectSlug, projectPath, sessionId, fileMtime } = setup; + + // Stage 2: Parse and filter turns + const turnResult = await assembleSessionTurns(sessionPath, setup, startTime); + if ('skip' in turnResult) return turnResult.skip; + const { allTurns: turns, turnsToProcess, startTurnIndex } = turnResult; + + // Stage 3: Initialize embedding + const embedding = await initializeEmbedding(options); + const { embedAllFn, shouldEmbed } = embedding; + // Track cache stats let totalCacheHits = 0; let totalCacheMisses = 0; try { - // Load model if using a local embedder (pool workers load their own) - if (embedder && (!embedder.currentModel || embedder.currentModel.id !== embeddingModel)) { - await embedder.load(getModel(embeddingModel), { device: embeddingDevice }); - } - // Track all chunks and sub-agent data let totalChunkCount = 0; let totalEdgeCount = 0; let deadEndFilesSkipped = 0; const subAgentData = new Map(); - // 1. Discover sub-agents (if enabled) + // Stage 4: Discover and process sub-agents (if enabled) let subAgentInfos: SubAgentInfo[] = []; if (processSubAgents) { subAgentInfos = await discoverSubAgents(sessionPath); @@ -315,7 +410,7 @@ export async function ingestSession( for (const subAgent of regularSubAgents) { const result = await processSubAgent( subAgent, - info.sessionId, + sessionId, projectSlug, projectPath, maxTokensPerChunk, @@ -349,7 +444,7 @@ export async function ingestSession( const subChunks = chunkTurns(subTurns, { maxTokens: maxTokensPerChunk, includeThinking, - sessionId: info.sessionId, + sessionId, sessionSlug: projectSlug, }); if (subChunks.length === 0) continue; @@ -405,11 +500,11 @@ export async function ingestSession( } } - // 2. Process main session (shared pipeline) + // Process main session (shared pipeline) const mainResult = await processMainSession({ turnsToProcess, allTurns: turns, - sessionId: info.sessionId, + sessionId, projectSlug, projectPath, maxTokensPerChunk, @@ -432,10 +527,10 @@ export async function ingestSession( totalCacheMisses += mainResult?.cacheMisses ?? 0; // Extract and store session state - saveSessionState(turns, info.sessionId, projectSlug, projectPath); + saveSessionState(turns, sessionId, projectSlug, projectPath); return { - sessionId: info.sessionId, + sessionId, sessionSlug: projectSlug, chunkCount: totalChunkCount, edgeCount: totalEdgeCount, @@ -456,7 +551,7 @@ export async function ingestSession( for (const subAgent of activeSubAgents) { const result = await processSubAgent( subAgent, - info.sessionId, + sessionId, projectSlug, projectPath, maxTokensPerChunk, @@ -476,11 +571,11 @@ export async function ingestSession( } } - // 2. Process main session (shared pipeline) + // Stage 5: Process main session (shared pipeline) const mainResult = await processMainSession({ turnsToProcess, allTurns: turns, - sessionId: info.sessionId, + sessionId, projectSlug, projectPath, maxTokensPerChunk, @@ -503,10 +598,10 @@ export async function ingestSession( totalCacheMisses += mainResult?.cacheMisses ?? 0; // Extract and store session state - saveSessionState(turns, info.sessionId, projectSlug, projectPath); + saveSessionState(turns, sessionId, projectSlug, projectPath); return { - sessionId: info.sessionId, + sessionId, sessionSlug: projectSlug, chunkCount: totalChunkCount, edgeCount: totalEdgeCount, @@ -522,8 +617,8 @@ export async function ingestSession( isTeamSession: false, }; } finally { - if (needsDispose && embedder) { - await embedder.dispose(); + if (embedding.needsDispose && embedding.embedder) { + await embedding.embedder.dispose(); } } }