Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 144 additions & 49 deletions src/ingest/ingest-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ import {
} from './brief-debrief-detector.js';
import { detectTeamTopology, groupTeammateFiles, type TeamTopology } from './team-detector.js';
import { detectTeamEdges } from './team-edge-detector.js';
import { getCheckpoint, saveCheckpoint } from '../storage/checkpoint-store.js';
import {
getCheckpoint,
saveCheckpoint,
type IngestionCheckpoint,
} from '../storage/checkpoint-store.js';
import {
computeContentHash,
getCachedEmbeddingsBatch,
cacheEmbeddingsBatch,
} from '../storage/embedding-cache.js';
import type { ChunkInput } from '../storage/types.js';
import type { Chunk, Turn } from '../parser/types.js';
import type { Chunk, Turn, SessionInfo } from '../parser/types.js';
import { createLogger } from '../utils/logger.js';
import { resolveCanonicalProjectPath } from '../utils/project-path.js';
import { generateIndexEntriesForChunks } from './index-entry-hook.js';
Expand Down Expand Up @@ -149,26 +153,27 @@ function createSkipResult(
};
}

/** Result of validation and setup phase. */
interface SetupResult {
  /** Session metadata returned by `getSessionInfo` for the session file. */
  info: SessionInfo;
  /** Slug identifying the project this session belongs to. */
  projectSlug: string;
  /** Resolved (canonical) filesystem path of the project. */
  projectPath: string;
  /** Session id (mirrors `info.sessionId`, extracted for convenience). */
  sessionId: string;
  /** Prior ingestion checkpoint, or null when the session has never been (partially) ingested. */
  checkpoint: IngestionCheckpoint | null;
  /** Modification time of the session file, used to detect unchanged files on re-ingest. */
  fileMtime: string;
  /** True when the file exceeds STREAMING_THRESHOLD_BYTES and should be parsed via streaming. */
  useStreaming: boolean;
}

/**
* Ingest a single session file.
* Validate session and gather setup data.
* Returns `{ skip }` for early exit or `{ setup }` to continue processing.
*/
export async function ingestSession(
async function validateAndSetup(
sessionPath: string,
options: IngestOptions = {},
): Promise<IngestResult> {
const startTime = Date.now();

const {
maxTokensPerChunk = 4096,
includeThinking = true,
embeddingModel = 'jina-small',
skipIfExists = true,
linkCrossSessions = true,
processSubAgents = true,
useIncrementalIngestion = true,
useEmbeddingCache = true,
embeddingDevice,
} = options;
options: IngestOptions,
startTime: number,
): Promise<{ skip: IngestResult } | { setup: SetupResult }> {
const { skipIfExists = true, useIncrementalIngestion = true } = options;

// Get session info
const info = await getSessionInfo(sessionPath);
Expand All @@ -186,22 +191,52 @@ export async function ingestSession(
if (checkpoint && checkpoint.fileMtime) {
const lastIngestMtime = new Date(checkpoint.fileMtime);
if (fileStats.mtime <= lastIngestMtime) {
return createSkipResult(info.sessionId, projectSlug, startTime, {
skipReason: 'unchanged_file',
});
return {
skip: createSkipResult(info.sessionId, projectSlug, startTime, {
skipReason: 'unchanged_file',
}),
};
}
}

// Check if already ingested (only if no checkpoint - checkpoint means we might resume)
if (skipIfExists && !checkpoint && isSessionIngested(info.sessionId)) {
return createSkipResult(info.sessionId, projectSlug, startTime, {
skipReason: 'already_ingested',
});
return {
skip: createSkipResult(info.sessionId, projectSlug, startTime, {
skipReason: 'already_ingested',
}),
};
}

// Use streaming for large files
const useStreaming = fileStats.size > STREAMING_THRESHOLD_BYTES;

return {
setup: {
info,
projectSlug,
projectPath,
sessionId: info.sessionId,
checkpoint,
fileMtime,
useStreaming,
},
};
}

/**
 * Parse session turns and apply incremental filtering.
 * Returns `{ skip }` when there are no new turns to process (after updating the
 * checkpoint mtime), otherwise the full and to-be-processed turn lists.
 */
async function assembleSessionTurns(
sessionPath: string,
setup: SetupResult,
startTime: number,
): Promise<
{ allTurns: Turn[]; turnsToProcess: Turn[]; startTurnIndex: number } | { skip: IngestResult }
> {
const { checkpoint, fileMtime, useStreaming, projectSlug, sessionId } = setup;

// Parse main session (sidechains filtered at message level)
// Include noise (progress messages) to get agent_progress for brief/debrief detection
let turns: Turn[];
Expand Down Expand Up @@ -231,12 +266,35 @@ export async function ingestSession(
fileMtime,
});
}
return createSkipResult(info.sessionId, projectSlug, startTime, {
skipped: startTurnIndex > 0,
skipReason: startTurnIndex > 0 ? 'no_new_turns' : undefined,
});
return {
skip: createSkipResult(sessionId, projectSlug, startTime, {
skipped: startTurnIndex > 0,
skipReason: startTurnIndex > 0 ? 'no_new_turns' : undefined,
}),
};
}

return { allTurns: turns, turnsToProcess, startTurnIndex };
}

/**
 * Batch-embed function: maps input texts to embedding vectors.
 * Returns one empty vector per text when embedding is disabled (no embedder).
 */
type EmbedAllFn = (texts: string[]) => Promise<number[][]>;

/** Context for embedding operations. */
interface EmbeddingContext {
  /** Active embedder instance, or null when eager embedding is disabled by config. */
  embedder: Embedder | null;
  /** Embeds a batch of texts; yields empty vectors when `embedder` is null. */
  embedAllFn: EmbedAllFn;
  /** Whether chunks should be embedded during ingestion (from runtime config `embeddingEager`). */
  shouldEmbed: boolean;
  /** True when this module created the embedder itself (caller did not supply one) and must dispose it. */
  needsDispose: boolean;
}

/**
* Initialize embedding context based on configuration.
* Non-fatal — logs and disables embedding on error.
*/
async function initializeEmbedding(options: IngestOptions): Promise<EmbeddingContext> {
const { embeddingModel = 'jina-small', embeddingDevice } = options;

// Determine whether to embed chunks during ingestion
const runtimeConfig = toRuntimeConfig(loadConfig());
const shouldEmbed = runtimeConfig.embeddingEager;
Expand All @@ -245,7 +303,7 @@ export async function ingestSession(
const embedder = shouldEmbed ? (options.embedder ?? new Embedder()) : null;
const needsDispose = shouldEmbed && !options.embedder;

const embedAllFn = async (texts: string[]): Promise<number[][]> => {
const embedAllFn: EmbedAllFn = async (texts: string[]): Promise<number[][]> => {
if (!embedder) return texts.map(() => []);
const results: number[][] = [];
for (const t of texts) {
Expand All @@ -255,23 +313,60 @@ export async function ingestSession(
return results;
};

// Load model if using a local embedder (pool workers load their own)
if (embedder && (!embedder.currentModel || embedder.currentModel.id !== embeddingModel)) {
await embedder.load(getModel(embeddingModel), { device: embeddingDevice });
}

return { embedder, embedAllFn, shouldEmbed, needsDispose };
}

/**
* Ingest a single session file.
*/
export async function ingestSession(
sessionPath: string,
options: IngestOptions = {},
): Promise<IngestResult> {
const startTime = Date.now();

const {
maxTokensPerChunk = 4096,
includeThinking = true,
embeddingModel = 'jina-small',
linkCrossSessions = true,
processSubAgents = true,
useIncrementalIngestion = true,
useEmbeddingCache = true,
} = options;

// Stage 1: Validate and setup
const validated = await validateAndSetup(sessionPath, options, startTime);
if ('skip' in validated) return validated.skip;
const { setup } = validated;
const { projectSlug, projectPath, sessionId, fileMtime } = setup;

// Stage 2: Parse and filter turns
const turnResult = await assembleSessionTurns(sessionPath, setup, startTime);
if ('skip' in turnResult) return turnResult.skip;
const { allTurns: turns, turnsToProcess, startTurnIndex } = turnResult;

// Stage 3: Initialize embedding
const embedding = await initializeEmbedding(options);
const { embedAllFn, shouldEmbed } = embedding;

// Track cache stats
let totalCacheHits = 0;
let totalCacheMisses = 0;

try {
// Load model if using a local embedder (pool workers load their own)
if (embedder && (!embedder.currentModel || embedder.currentModel.id !== embeddingModel)) {
await embedder.load(getModel(embeddingModel), { device: embeddingDevice });
}

// Track all chunks and sub-agent data
let totalChunkCount = 0;
let totalEdgeCount = 0;
let deadEndFilesSkipped = 0;
const subAgentData = new Map<string, { turns: Turn[]; chunks: Chunk[] }>();

// 1. Discover sub-agents (if enabled)
// Stage 4: Discover and process sub-agents (if enabled)
let subAgentInfos: SubAgentInfo[] = [];
if (processSubAgents) {
subAgentInfos = await discoverSubAgents(sessionPath);
Expand Down Expand Up @@ -315,7 +410,7 @@ export async function ingestSession(
for (const subAgent of regularSubAgents) {
const result = await processSubAgent(
subAgent,
info.sessionId,
sessionId,
projectSlug,
projectPath,
maxTokensPerChunk,
Expand Down Expand Up @@ -349,7 +444,7 @@ export async function ingestSession(
const subChunks = chunkTurns(subTurns, {
maxTokens: maxTokensPerChunk,
includeThinking,
sessionId: info.sessionId,
sessionId,
sessionSlug: projectSlug,
});
if (subChunks.length === 0) continue;
Expand Down Expand Up @@ -405,11 +500,11 @@ export async function ingestSession(
}
}

// 2. Process main session (shared pipeline)
// Process main session (shared pipeline)
const mainResult = await processMainSession({
turnsToProcess,
allTurns: turns,
sessionId: info.sessionId,
sessionId,
projectSlug,
projectPath,
maxTokensPerChunk,
Expand All @@ -432,10 +527,10 @@ export async function ingestSession(
totalCacheMisses += mainResult?.cacheMisses ?? 0;

// Extract and store session state
saveSessionState(turns, info.sessionId, projectSlug, projectPath);
saveSessionState(turns, sessionId, projectSlug, projectPath);

return {
sessionId: info.sessionId,
sessionId,
sessionSlug: projectSlug,
chunkCount: totalChunkCount,
edgeCount: totalEdgeCount,
Expand All @@ -456,7 +551,7 @@ export async function ingestSession(
for (const subAgent of activeSubAgents) {
const result = await processSubAgent(
subAgent,
info.sessionId,
sessionId,
projectSlug,
projectPath,
maxTokensPerChunk,
Expand All @@ -476,11 +571,11 @@ export async function ingestSession(
}
}

// 2. Process main session (shared pipeline)
// Stage 5: Process main session (shared pipeline)
const mainResult = await processMainSession({
turnsToProcess,
allTurns: turns,
sessionId: info.sessionId,
sessionId,
projectSlug,
projectPath,
maxTokensPerChunk,
Expand All @@ -503,10 +598,10 @@ export async function ingestSession(
totalCacheMisses += mainResult?.cacheMisses ?? 0;

// Extract and store session state
saveSessionState(turns, info.sessionId, projectSlug, projectPath);
saveSessionState(turns, sessionId, projectSlug, projectPath);

return {
sessionId: info.sessionId,
sessionId,
sessionSlug: projectSlug,
chunkCount: totalChunkCount,
edgeCount: totalEdgeCount,
Expand All @@ -522,8 +617,8 @@ export async function ingestSession(
isTeamSession: false,
};
} finally {
if (needsDispose && embedder) {
await embedder.dispose();
if (embedding.needsDispose && embedding.embedder) {
await embedding.embedder.dispose();
}
}
}
Expand Down
Loading