/*
 * Patch summary: adds a retention ("forgetting") step to memory consolidation.
 *
 * What the patch contains:
 *  - docs/memory.md: one new bullet describing the forgetting behaviour.
 *  - src/memory/retention.ts (new): the retention constants and two filter
 *    builders. The episode filter requires a matching user_id, ended_at at
 *    least 30 days before the reference time, importance <= 0.35 and
 *    access_count <= 1. The fact filter requires valid_until at least
 *    30 days before the reference time.
 *  - QdrantClient.scroll (new): POSTs to /collections/{collection}/points/scroll
 *    with limit defaulting to 50, with_payload defaulting to true and
 *    with_vectors false. The returned points go through the same mapper that
 *    extractResults now delegates to (mapPoints).
 *  - EpisodicStore.pruneStaleEpisodes / SemanticStore.pruneExpiredFacts (new):
 *    return 0 when the reference time does not parse to a finite timestamp
 *    (the episodic variant also returns 0 for an empty userId). Otherwise they
 *    scroll for up to RETENTION_BATCH_LIMIT matches, delete them one at a time
 *    with deletePoint, and return the number of candidates scrolled.
 *  - MemorySystem: thin wrappers that return 0 while the system is not
 *    initialized.
 *  - consolidation.ts: applyRetentionPolicy runs both prunes through
 *    Promise.allSettled, with endedAt as the reference time, at the end of
 *    consolidateSession and consolidateSessionWithLLM.
 *  - Tests: cover the consolidation wiring, the scroll request, and both prune
 *    methods against a mocked fetch.
 *
 * NOTE(review): this copy of the patch has lost its line breaks, and generic
 *   type arguments look stripped (e.g. "Array>", "Record | null",
 *   "Promise {"). Presumably an extraction artefact; restore from the
 *   original patch before applying.
 * NOTE(review): applyRetentionPolicy never inspects the allSettled results,
 *   so a rejected prune leaves no trace. Confirm that silent best-effort is
 *   intended, or log the rejection reasons.
 * NOTE(review): QdrantScrollResponse declares next_page_offset but scroll()
 *   never reads it, so each run prunes at most one batch of
 *   RETENTION_BATCH_LIMIT points.
 * NOTE(review): the expired-fact filter has no user condition, so every
 *   session's consolidation prunes facts across all users. Confirm this is
 *   intended.
 * NOTE(review): both range filters compare against epoch-millisecond numbers.
 *   Confirm ended_at / valid_until payloads are stored numerically rather
 *   than as ISO-8601 strings.
 * NOTE(review): scroll points are mapped with score: p.score. The mocks
 *   supply score: 0; confirm the real scroll endpoint returns a score field.
 */
diff --git a/docs/memory.md b/docs/memory.md index 70e42e8..9d93dff 100644 --- a/docs/memory.md +++ b/docs/memory.md @@ -67,6 +67,7 @@ Nightly consolidation (configurable): - Compresses old episodes into summaries - Promotes repeated patterns from episodic to procedural - Prunes contradicted facts +- Forgets stale, low-signal episodes and long-expired superseded facts - Keeps storage bounded ## Configuration diff --git a/src/memory/__tests__/consolidation.test.ts b/src/memory/__tests__/consolidation.test.ts index ed088ef..181e82d 100644 --- a/src/memory/__tests__/consolidation.test.ts +++ b/src/memory/__tests__/consolidation.test.ts @@ -23,9 +23,13 @@ function createMockMemory(): { memory: MemorySystem; storedEpisodes: Array>; storedFacts: Array>; + pruneStaleEpisodesCalls: Array<[string, string]>; + pruneExpiredFactsCalls: string[]; } { const storedEpisodes: Array> = []; const storedFacts: Array> = []; + const pruneStaleEpisodesCalls: Array<[string, string]> = []; + const pruneExpiredFactsCalls: string[] = []; const memory = { storeEpisode: mock((episode: Record) => { @@ -36,9 +40,17 @@ function createMockMemory(): { storedFacts.push(fact); return Promise.resolve(fact.id as string); }), + pruneStaleEpisodes: mock((userId: string, endedAt: string) => { + pruneStaleEpisodesCalls.push([userId, endedAt]); + return Promise.resolve(0); + }), + pruneExpiredFacts: mock((endedAt: string) => { + pruneExpiredFactsCalls.push(endedAt); + return Promise.resolve(0); + }), } as unknown as MemorySystem; - return { memory, storedEpisodes, storedFacts }; + return { memory, storedEpisodes, storedFacts, pruneStaleEpisodesCalls, pruneExpiredFactsCalls }; } describe("consolidateSession", () => { @@ -156,4 +168,14 @@ describe("consolidateSession", () => { expect(result.durationMs).toBeGreaterThanOrEqual(0); expect(typeof result.durationMs).toBe("number"); }); + + test("runs retention cleanup after consolidation", async () => { + const { memory, pruneStaleEpisodesCalls, 
pruneExpiredFactsCalls } = createMockMemory(); + const data = makeTestSessionData(); + + await consolidateSession(memory, data); + + expect(pruneStaleEpisodesCalls).toEqual([[data.userId, data.endedAt]]); + expect(pruneExpiredFactsCalls).toEqual([data.endedAt]); + }); }); diff --git a/src/memory/__tests__/episodic-retention.test.ts b/src/memory/__tests__/episodic-retention.test.ts new file mode 100644 index 0000000..2bc351b --- /dev/null +++ b/src/memory/__tests__/episodic-retention.test.ts @@ -0,0 +1,69 @@ +import { afterAll, describe, expect, mock, test } from "bun:test"; +import type { MemoryConfig } from "../../config/types.ts"; +import { EmbeddingClient } from "../embeddings.ts"; +import { EpisodicStore } from "../episodic.ts"; +import { QdrantClient } from "../qdrant-client.ts"; + +const TEST_CONFIG: MemoryConfig = { + qdrant: { url: "http://localhost:6333" }, + ollama: { url: "http://localhost:11434", model: "nomic-embed-text" }, + collections: { episodes: "episodes", semantic_facts: "semantic_facts", procedures: "procedures" }, + embedding: { dimensions: 768, batch_size: 32 }, + context: { max_tokens: 50000, episode_limit: 10, fact_limit: 20, procedure_limit: 5 }, +}; + +describe("EpisodicStore retention", () => { + const originalFetch = globalThis.fetch; + + afterAll(() => { + globalThis.fetch = originalFetch; + }); + + test("pruneStaleEpisodes deletes low-signal episodes past the retention window", async () => { + const deletedIds: string[] = []; + let scrollBody: Record | null = null; + + globalThis.fetch = mock((url: string | Request, init?: RequestInit) => { + const urlStr = typeof url === "string" ? 
url : url.url; + + if (urlStr.includes("/points/scroll")) { + scrollBody = JSON.parse(init?.body as string); + return Promise.resolve( + new Response( + JSON.stringify({ + result: { + points: [ + { id: "stale-1", score: 0, payload: {} }, + { id: "stale-2", score: 0, payload: {} }, + ], + }, + }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ), + ); + } + + if (urlStr.includes("/points/delete")) { + const body = JSON.parse(init?.body as string) as { points: string[] }; + deletedIds.push(body.points[0]); + } + + return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 })); + }) as unknown as typeof fetch; + + const qdrant = new QdrantClient(TEST_CONFIG); + const embedder = new EmbeddingClient(TEST_CONFIG); + const store = new EpisodicStore(qdrant, embedder, TEST_CONFIG); + + const pruned = await store.pruneStaleEpisodes("user-1", new Date("2026-03-31T00:00:00.000Z").toISOString()); + + expect(pruned).toBe(2); + expect(deletedIds).toEqual(["stale-1", "stale-2"]); + + const filter = ((scrollBody as unknown as Record).filter ?? {}) as { + must: Array>; + }; + expect(filter.must).toHaveLength(4); + expect(filter.must[0]).toEqual({ key: "user_id", match: { value: "user-1" } }); + }); +}); diff --git a/src/memory/__tests__/qdrant-client.test.ts b/src/memory/__tests__/qdrant-client.test.ts index 3ae90fb..d5c2793 100644 --- a/src/memory/__tests__/qdrant-client.test.ts +++ b/src/memory/__tests__/qdrant-client.test.ts @@ -238,4 +238,39 @@ describe("QdrantClient", () => { expect(deleteBody).not.toBeNull(); expect(deleteBody.points).toEqual(["point-123"]); }); + + test("scroll sends filter and returns matching points", async () => { + let capturedUrl = ""; + let capturedBody: Record | null = null; + + globalThis.fetch = mock((url: string | Request, init?: RequestInit) => { + capturedUrl = typeof url === "string" ? 
url : url.url; + if (init?.body) { + capturedBody = JSON.parse(init.body as string); + } + + return Promise.resolve( + new Response( + JSON.stringify({ + result: { + points: [{ id: "stale-ep", score: 0, payload: { summary: "old memory" } }], + }, + }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ), + ); + }) as unknown as typeof fetch; + + const client = new QdrantClient(TEST_CONFIG); + const results = await client.scroll("episodes", { + filter: { must: [{ key: "user_id", match: { value: "user-1" } }] }, + limit: 5, + withPayload: true, + }); + + expect(capturedUrl).toContain("/collections/episodes/points/scroll"); + expect(results).toHaveLength(1); + expect(results[0].id).toBe("stale-ep"); + expect((capturedBody as unknown as Record).limit).toBe(5); + }); }); diff --git a/src/memory/__tests__/semantic-retention.test.ts b/src/memory/__tests__/semantic-retention.test.ts new file mode 100644 index 0000000..6d6d402 --- /dev/null +++ b/src/memory/__tests__/semantic-retention.test.ts @@ -0,0 +1,65 @@ +import { afterAll, describe, expect, mock, test } from "bun:test"; +import type { MemoryConfig } from "../../config/types.ts"; +import { EmbeddingClient } from "../embeddings.ts"; +import { QdrantClient } from "../qdrant-client.ts"; +import { SemanticStore } from "../semantic.ts"; + +const TEST_CONFIG: MemoryConfig = { + qdrant: { url: "http://localhost:6333" }, + ollama: { url: "http://localhost:11434", model: "nomic-embed-text" }, + collections: { episodes: "episodes", semantic_facts: "semantic_facts", procedures: "procedures" }, + embedding: { dimensions: 768, batch_size: 32 }, + context: { max_tokens: 50000, episode_limit: 10, fact_limit: 20, procedure_limit: 5 }, +}; + +describe("SemanticStore retention", () => { + const originalFetch = globalThis.fetch; + + afterAll(() => { + globalThis.fetch = originalFetch; + }); + + test("pruneExpiredFacts deletes superseded facts past the retention window", async () => { + const deletedIds: string[] = 
[]; + let scrollBody: Record | null = null; + + globalThis.fetch = mock((url: string | Request, init?: RequestInit) => { + const urlStr = typeof url === "string" ? url : url.url; + + if (urlStr.includes("/points/scroll")) { + scrollBody = JSON.parse(init?.body as string); + return Promise.resolve( + new Response( + JSON.stringify({ + result: { + points: [{ id: "expired-fact", score: 0, payload: {} }], + }, + }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ), + ); + } + + if (urlStr.includes("/points/delete")) { + const body = JSON.parse(init?.body as string) as { points: string[] }; + deletedIds.push(body.points[0]); + } + + return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 })); + }) as unknown as typeof fetch; + + const qdrant = new QdrantClient(TEST_CONFIG); + const embedder = new EmbeddingClient(TEST_CONFIG); + const store = new SemanticStore(qdrant, embedder, TEST_CONFIG); + + const pruned = await store.pruneExpiredFacts(new Date("2026-03-31T00:00:00.000Z").toISOString()); + + expect(pruned).toBe(1); + expect(deletedIds).toEqual(["expired-fact"]); + + const filter = ((scrollBody as unknown as Record).filter ?? 
{}) as { + must: Array>; + }; + expect(filter.must).toEqual([{ key: "valid_until", range: expect.any(Object) }]); + }); +}); diff --git a/src/memory/consolidation.ts b/src/memory/consolidation.ts index 35868d7..548c697 100644 --- a/src/memory/consolidation.ts +++ b/src/memory/consolidation.ts @@ -48,6 +48,8 @@ export async function consolidateSessionWithLLM( factsExtracted++; } + await applyRetentionPolicy(memory, sessionData); + return { result: { episodesCreated: 1, @@ -107,6 +109,8 @@ export async function consolidateSession(memory: MemorySystem, sessionData: Sess factsExtracted++; } + await applyRetentionPolicy(memory, sessionData); + return { episodesCreated, factsExtracted, @@ -241,3 +245,10 @@ function extractFactsFromSession(data: SessionData, episodeId: string): Semantic return facts; } + +async function applyRetentionPolicy(memory: MemorySystem, data: SessionData): Promise { + await Promise.allSettled([ + memory.pruneStaleEpisodes(data.userId, data.endedAt), + memory.pruneExpiredFacts(data.endedAt), + ]); +} diff --git a/src/memory/episodic.ts b/src/memory/episodic.ts index 64c0674..09743a8 100644 --- a/src/memory/episodic.ts +++ b/src/memory/episodic.ts @@ -2,6 +2,7 @@ import type { MemoryConfig } from "../config/types.ts"; import { type EmbeddingClient, textToSparseVector } from "./embeddings.ts"; import type { QdrantClient } from "./qdrant-client.ts"; import { calculateEpisodeRecallScore } from "./ranking.ts"; +import { RETENTION_BATCH_LIMIT, buildStaleEpisodeFilter } from "./retention.ts"; import type { Episode, QdrantSearchResult, RecallOptions } from "./types.ts"; const COLLECTION_SCHEMA = { @@ -125,6 +126,25 @@ export class EpisodicStore { }); } + async pruneStaleEpisodes(userId: string, referenceTime = new Date().toISOString()): Promise { + if (!userId) return 0; + + const referenceTimeMs = new Date(referenceTime).getTime(); + if (!Number.isFinite(referenceTimeMs)) return 0; + + const candidates = await this.qdrant.scroll(this.collectionName, { + 
filter: buildStaleEpisodeFilter(userId, referenceTimeMs), + limit: RETENTION_BATCH_LIMIT, + withPayload: false, + }); + + for (const episode of candidates) { + await this.qdrant.deletePoint(this.collectionName, episode.id); + } + + return candidates.length; + } + private async updateAccessCounts(ids: string[]): Promise { for (const id of ids) { try { diff --git a/src/memory/qdrant-client.ts b/src/memory/qdrant-client.ts index ecf599f..619a5c9 100644 --- a/src/memory/qdrant-client.ts +++ b/src/memory/qdrant-client.ts @@ -9,16 +9,16 @@ type CollectionSchema = { sparse_vectors?: SparseVectorConfig; }; -type QdrantResponse = { - status?: string; - result?: unknown; - time?: number; -}; +type QdrantResponse = { status?: string; result?: unknown; time?: number }; type QdrantQueryResponse = { result?: { points?: QdrantScoredPoint[] }; }; +type QdrantScrollResponse = { + result?: { points?: QdrantScoredPoint[]; next_page_offset?: string | number | null }; +}; + type QdrantScoredPoint = { id: string | number; score: number; @@ -146,6 +146,24 @@ export class QdrantClient { }); } + async scroll( + collection: string, + options: { + filter?: Record; + limit?: number; + withPayload?: boolean; + }, + ): Promise { + const response = (await this.request("POST", `/collections/${collection}/points/scroll`, { + filter: options.filter, + limit: options.limit ?? 50, + with_payload: options.withPayload ?? true, + with_vectors: false, + })) as QdrantScrollResponse; + + return this.mapPoints(response.result?.points ?? []); + } + async createPayloadIndex( collection: string, fieldName: string, @@ -246,7 +264,10 @@ export class QdrantClient { } private extractResults(response: QdrantQueryResponse): QdrantSearchResult[] { - const points = response.result?.points ?? []; + return this.mapPoints(response.result?.points ?? 
[]); + } + + private mapPoints(points: QdrantScoredPoint[]): QdrantSearchResult[] { return points.map((p) => ({ id: String(p.id), score: p.score, diff --git a/src/memory/retention.ts b/src/memory/retention.ts new file mode 100644 index 0000000..65c2d8d --- /dev/null +++ b/src/memory/retention.ts @@ -0,0 +1,24 @@ +const DAY_MS = 24 * 60 * 60 * 1000; + +export const STALE_EPISODE_RETENTION_DAYS = 30; +export const STALE_EPISODE_MAX_IMPORTANCE = 0.35; +export const STALE_EPISODE_MAX_ACCESS_COUNT = 1; +export const EXPIRED_FACT_RETENTION_DAYS = 30; +export const RETENTION_BATCH_LIMIT = 50; + +export function buildStaleEpisodeFilter(userId: string, referenceTimeMs: number): Record { + return { + must: [ + { key: "user_id", match: { value: userId } }, + { key: "ended_at", range: { lte: referenceTimeMs - STALE_EPISODE_RETENTION_DAYS * DAY_MS } }, + { key: "importance", range: { lte: STALE_EPISODE_MAX_IMPORTANCE } }, + { key: "access_count", range: { lte: STALE_EPISODE_MAX_ACCESS_COUNT } }, + ], + }; +} + +export function buildExpiredFactFilter(referenceTimeMs: number): Record { + return { + must: [{ key: "valid_until", range: { lte: referenceTimeMs - EXPIRED_FACT_RETENTION_DAYS * DAY_MS } }], + }; +} diff --git a/src/memory/semantic.ts b/src/memory/semantic.ts index 619d8e4..0f6d473 100644 --- a/src/memory/semantic.ts +++ b/src/memory/semantic.ts @@ -1,6 +1,7 @@ import type { MemoryConfig } from "../config/types.ts"; import { type EmbeddingClient, textToSparseVector } from "./embeddings.ts"; import type { QdrantClient } from "./qdrant-client.ts"; +import { RETENTION_BATCH_LIMIT, buildExpiredFactFilter } from "./retention.ts"; import type { QdrantSearchResult, RecallOptions, SemanticFact } from "./types.ts"; const COLLECTION_SCHEMA = { @@ -142,6 +143,23 @@ export class SemanticStore { } } + async pruneExpiredFacts(referenceTime = new Date().toISOString()): Promise { + const referenceTimeMs = new Date(referenceTime).getTime(); + if (!Number.isFinite(referenceTimeMs)) return 
0; + + const expiredFacts = await this.qdrant.scroll(this.collectionName, { + filter: buildExpiredFactFilter(referenceTimeMs), + limit: RETENTION_BATCH_LIMIT, + withPayload: false, + }); + + for (const fact of expiredFacts) { + await this.qdrant.deletePoint(this.collectionName, fact.id); + } + + return expiredFacts.length; + } + private buildFilter(options?: RecallOptions): Record | undefined { const must: Record[] = []; diff --git a/src/memory/system.ts b/src/memory/system.ts index 8800674..c1e090b 100644 --- a/src/memory/system.ts +++ b/src/memory/system.ts @@ -71,6 +71,11 @@ export class MemorySystem { return this.episodic.recall(query, options); } + async pruneStaleEpisodes(userId: string, referenceTime?: string): Promise { + if (!this.initialized) return 0; + return this.episodic.pruneStaleEpisodes(userId, referenceTime); + } + // Semantic memory async storeFact(fact: SemanticFact): Promise { if (!this.initialized) return fact.id; @@ -92,6 +97,11 @@ export class MemorySystem { return this.semantic.resolveContradiction(newFact, existingFact); } + async pruneExpiredFacts(referenceTime?: string): Promise { + if (!this.initialized) return 0; + return this.semantic.pruneExpiredFacts(referenceTime); + } + // Procedural memory async storeProcedure(procedure: Procedure): Promise { if (!this.initialized) return procedure.id;