Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Nightly consolidation (configurable):
- Compresses old episodes into summaries
- Promotes repeated patterns from episodic to procedural
- Prunes contradicted facts
- Forgets stale, low-signal episodes and long-expired superseded facts
- Keeps storage bounded

## Configuration
Expand Down
24 changes: 23 additions & 1 deletion src/memory/__tests__/consolidation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ function createMockMemory(): {
memory: MemorySystem;
storedEpisodes: Array<Record<string, unknown>>;
storedFacts: Array<Record<string, unknown>>;
pruneStaleEpisodesCalls: Array<[string, string]>;
pruneExpiredFactsCalls: string[];
} {
const storedEpisodes: Array<Record<string, unknown>> = [];
const storedFacts: Array<Record<string, unknown>> = [];
const pruneStaleEpisodesCalls: Array<[string, string]> = [];
const pruneExpiredFactsCalls: string[] = [];

const memory = {
storeEpisode: mock((episode: Record<string, unknown>) => {
Expand All @@ -36,9 +40,17 @@ function createMockMemory(): {
storedFacts.push(fact);
return Promise.resolve(fact.id as string);
}),
pruneStaleEpisodes: mock((userId: string, endedAt: string) => {
pruneStaleEpisodesCalls.push([userId, endedAt]);
return Promise.resolve(0);
}),
pruneExpiredFacts: mock((endedAt: string) => {
pruneExpiredFactsCalls.push(endedAt);
return Promise.resolve(0);
}),
} as unknown as MemorySystem;

return { memory, storedEpisodes, storedFacts };
return { memory, storedEpisodes, storedFacts, pruneStaleEpisodesCalls, pruneExpiredFactsCalls };
}

describe("consolidateSession", () => {
Expand Down Expand Up @@ -156,4 +168,14 @@ describe("consolidateSession", () => {
expect(result.durationMs).toBeGreaterThanOrEqual(0);
expect(typeof result.durationMs).toBe("number");
});

// Verifies that consolidateSession calls both retention hooks on the memory
// mock exactly once, passing the session's userId/endedAt through unchanged.
test("runs retention cleanup after consolidation", async () => {
  const mocks = createMockMemory();
  const session = makeTestSessionData();

  await consolidateSession(mocks.memory, session);

  const { userId, endedAt } = session;
  expect(mocks.pruneStaleEpisodesCalls).toEqual([[userId, endedAt]]);
  expect(mocks.pruneExpiredFactsCalls).toEqual([endedAt]);
});
});
69 changes: 69 additions & 0 deletions src/memory/__tests__/episodic-retention.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { afterAll, describe, expect, mock, test } from "bun:test";
import type { MemoryConfig } from "../../config/types.ts";
import { EmbeddingClient } from "../embeddings.ts";
import { EpisodicStore } from "../episodic.ts";
import { QdrantClient } from "../qdrant-client.ts";

// Fixture MemoryConfig handed to QdrantClient, EmbeddingClient and EpisodicStore
// in this file. The URLs point at localhost; the test in this file replaces
// globalThis.fetch with a mock before calling the store.
const TEST_CONFIG: MemoryConfig = {
qdrant: { url: "http://localhost:6333" },
ollama: { url: "http://localhost:11434", model: "nomic-embed-text" },
collections: { episodes: "episodes", semantic_facts: "semantic_facts", procedures: "procedures" },
embedding: { dimensions: 768, batch_size: 32 },
context: { max_tokens: 50000, episode_limit: 10, fact_limit: 20, procedure_limit: 5 },
};

describe("EpisodicStore retention", () => {
  // Remember the real fetch so the stub installed by the test can be undone.
  const realFetch = globalThis.fetch;

  afterAll(() => {
    globalThis.fetch = realFetch;
  });

  // Drives pruneStaleEpisodes against a stubbed fetch: the scroll endpoint
  // answers with two points, and every delete request records the first id
  // it was asked to remove.
  test("pruneStaleEpisodes deletes low-signal episodes past the retention window", async () => {
    const removedIds: string[] = [];
    let capturedScroll: Record<string, unknown> | null = null;

    // Canned scroll reply, serialised once and reused for every scroll call.
    const scrollReply = JSON.stringify({
      result: {
        points: [
          { id: "stale-1", score: 0, payload: {} },
          { id: "stale-2", score: 0, payload: {} },
        ],
      },
    });

    const fetchStub = (url: string | Request, init?: RequestInit) => {
      const target = typeof url === "string" ? url : url.url;

      if (target.includes("/points/scroll")) {
        capturedScroll = JSON.parse(init?.body as string);
        const reply = new Response(scrollReply, {
          status: 200,
          headers: { "Content-Type": "application/json" },
        });
        return Promise.resolve(reply);
      }

      if (target.includes("/points/delete")) {
        const { points } = JSON.parse(init?.body as string) as { points: string[] };
        removedIds.push(points[0]);
      }

      // Anything that is not a scroll (including deletes) gets a generic OK.
      return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 }));
    };
    globalThis.fetch = mock(fetchStub) as unknown as typeof fetch;

    const store = new EpisodicStore(new QdrantClient(TEST_CONFIG), new EmbeddingClient(TEST_CONFIG), TEST_CONFIG);

    const referenceTime = new Date("2026-03-31T00:00:00.000Z").toISOString();
    const prunedCount = await store.pruneStaleEpisodes("user-1", referenceTime);

    expect(prunedCount).toBe(2);
    expect(removedIds).toEqual(["stale-1", "stale-2"]);

    // The closure assignment above is invisible to control-flow analysis,
    // hence the double cast before reading the captured request body.
    const sentBody = capturedScroll as unknown as Record<string, unknown>;
    const sentFilter = (sentBody.filter ?? {}) as {
      must: Array<Record<string, unknown>>;
    };
    expect(sentFilter.must).toHaveLength(4);
    expect(sentFilter.must[0]).toEqual({ key: "user_id", match: { value: "user-1" } });
  });
});
35 changes: 35 additions & 0 deletions src/memory/__tests__/qdrant-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,39 @@ describe("QdrantClient", () => {
expect(deleteBody).not.toBeNull();
expect(deleteBody.points).toEqual(["point-123"]);
});

// Checks that QdrantClient.scroll posts to the collection's scroll endpoint,
// forwards the requested limit, and returns the points from the response.
test("scroll sends filter and returns matching points", async () => {
  let requestedUrl = "";
  let requestBody: Record<string, unknown> | null = null;

  // Single-point reply served for every request made through the stub.
  const scrollReply = {
    result: {
      points: [{ id: "stale-ep", score: 0, payload: { summary: "old memory" } }],
    },
  };

  const fetchStub = (url: string | Request, init?: RequestInit) => {
    requestedUrl = typeof url === "string" ? url : url.url;
    if (init?.body) {
      requestBody = JSON.parse(init.body as string);
    }

    const reply = new Response(JSON.stringify(scrollReply), {
      status: 200,
      headers: { "Content-Type": "application/json" },
    });
    return Promise.resolve(reply);
  };
  globalThis.fetch = mock(fetchStub) as unknown as typeof fetch;

  const client = new QdrantClient(TEST_CONFIG);
  const scrollOptions = {
    filter: { must: [{ key: "user_id", match: { value: "user-1" } }] },
    limit: 5,
    withPayload: true,
  };
  const points = await client.scroll("episodes", scrollOptions);

  expect(requestedUrl).toContain("/collections/episodes/points/scroll");
  expect(points).toHaveLength(1);
  expect(points[0].id).toBe("stale-ep");
  // requestBody is assigned inside the stub, so it has to be cast before use.
  expect((requestBody as unknown as Record<string, unknown>).limit).toBe(5);
});
});
65 changes: 65 additions & 0 deletions src/memory/__tests__/semantic-retention.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { afterAll, describe, expect, mock, test } from "bun:test";
import type { MemoryConfig } from "../../config/types.ts";
import { EmbeddingClient } from "../embeddings.ts";
import { QdrantClient } from "../qdrant-client.ts";
import { SemanticStore } from "../semantic.ts";

// Fixture MemoryConfig handed to QdrantClient, EmbeddingClient and SemanticStore
// in this file. The URLs point at localhost; the test in this file replaces
// globalThis.fetch with a mock before calling the store.
const TEST_CONFIG: MemoryConfig = {
qdrant: { url: "http://localhost:6333" },
ollama: { url: "http://localhost:11434", model: "nomic-embed-text" },
collections: { episodes: "episodes", semantic_facts: "semantic_facts", procedures: "procedures" },
embedding: { dimensions: 768, batch_size: 32 },
context: { max_tokens: 50000, episode_limit: 10, fact_limit: 20, procedure_limit: 5 },
};

describe("SemanticStore retention", () => {
  // Remember the real fetch so the stub installed by the test can be undone.
  const realFetch = globalThis.fetch;

  afterAll(() => {
    globalThis.fetch = realFetch;
  });

  // Drives pruneExpiredFacts against a stubbed fetch: the scroll endpoint
  // answers with one point, and every delete request records the first id
  // it was asked to remove.
  test("pruneExpiredFacts deletes superseded facts past the retention window", async () => {
    const removedIds: string[] = [];
    let capturedScroll: Record<string, unknown> | null = null;

    // Canned scroll reply, serialised once and reused for every scroll call.
    const scrollReply = JSON.stringify({
      result: {
        points: [{ id: "expired-fact", score: 0, payload: {} }],
      },
    });

    const fetchStub = (url: string | Request, init?: RequestInit) => {
      const target = typeof url === "string" ? url : url.url;

      if (target.includes("/points/scroll")) {
        capturedScroll = JSON.parse(init?.body as string);
        const reply = new Response(scrollReply, {
          status: 200,
          headers: { "Content-Type": "application/json" },
        });
        return Promise.resolve(reply);
      }

      if (target.includes("/points/delete")) {
        const { points } = JSON.parse(init?.body as string) as { points: string[] };
        removedIds.push(points[0]);
      }

      // Anything that is not a scroll (including deletes) gets a generic OK.
      return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 }));
    };
    globalThis.fetch = mock(fetchStub) as unknown as typeof fetch;

    const store = new SemanticStore(new QdrantClient(TEST_CONFIG), new EmbeddingClient(TEST_CONFIG), TEST_CONFIG);

    const referenceTime = new Date("2026-03-31T00:00:00.000Z").toISOString();
    const prunedCount = await store.pruneExpiredFacts(referenceTime);

    expect(prunedCount).toBe(1);
    expect(removedIds).toEqual(["expired-fact"]);

    // The closure assignment above is invisible to control-flow analysis,
    // hence the double cast before reading the captured request body.
    const sentBody = capturedScroll as unknown as Record<string, unknown>;
    const sentFilter = (sentBody.filter ?? {}) as {
      must: Array<Record<string, unknown>>;
    };
    expect(sentFilter.must).toEqual([{ key: "valid_until", range: expect.any(Object) }]);
  });
});
11 changes: 11 additions & 0 deletions src/memory/consolidation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export async function consolidateSessionWithLLM(
factsExtracted++;
}

await applyRetentionPolicy(memory, sessionData);

return {
result: {
episodesCreated: 1,
Expand Down Expand Up @@ -107,6 +109,8 @@ export async function consolidateSession(memory: MemorySystem, sessionData: Sess
factsExtracted++;
}

await applyRetentionPolicy(memory, sessionData);

return {
episodesCreated,
factsExtracted,
Expand Down Expand Up @@ -241,3 +245,10 @@ function extractFactsFromSession(data: SessionData, episodeId: string): Semantic

return facts;
}

/**
 * Runs the post-consolidation retention pass: prunes stale episodes for the
 * session's user and prunes expired facts, both relative to `data.endedAt`.
 *
 * Retention is best-effort: the returned promise always resolves, so a failed
 * cleanup never fails the consolidation that triggered it. A step that rejects
 * or throws synchronously is reported through `console.warn` rather than being
 * dropped silently, and does not prevent the other step from running.
 *
 * @param memory - Memory system exposing `pruneStaleEpisodes` and `pruneExpiredFacts`.
 * @param data - Session whose `userId` and `endedAt` are forwarded to the prune calls.
 */
async function applyRetentionPolicy(memory: MemorySystem, data: SessionData): Promise<void> {
  const steps: Array<{ name: string; run: () => Promise<unknown> }> = [
    { name: "pruneStaleEpisodes", run: () => memory.pruneStaleEpisodes(data.userId, data.endedAt) },
    { name: "pruneExpiredFacts", run: () => memory.pruneExpiredFacts(data.endedAt) },
  ];

  // Each step is invoked inside its own async wrapper so that a synchronous
  // throw is contained the same way as a rejected promise. Both steps are
  // started before either is awaited, so they still run concurrently.
  await Promise.all(
    steps.map(async ({ name, run }) => {
      try {
        await run();
      } catch (error: unknown) {
        console.warn(`[memory] retention step "${name}" failed:`, error);
      }
    }),
  );
}
20 changes: 20 additions & 0 deletions src/memory/episodic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { MemoryConfig } from "../config/types.ts";
import { type EmbeddingClient, textToSparseVector } from "./embeddings.ts";
import type { QdrantClient } from "./qdrant-client.ts";
import { calculateEpisodeRecallScore } from "./ranking.ts";
import { RETENTION_BATCH_LIMIT, buildStaleEpisodeFilter } from "./retention.ts";
import type { Episode, QdrantSearchResult, RecallOptions } from "./types.ts";

const COLLECTION_SCHEMA = {
Expand Down Expand Up @@ -125,6 +126,25 @@ export class EpisodicStore {
});
}

/**
 * Deletes one batch of a user's episodes that match the stale-episode filter.
 *
 * @param userId - Owner of the episodes; a falsy value short-circuits to 0.
 * @param referenceTime - Timestamp string parsed with `new Date(...)`; its
 *   millisecond value is handed to `buildStaleEpisodeFilter`. Defaults to the
 *   current time. An unparseable value short-circuits to 0.
 * @returns The number of candidates returned by the scroll, each of which was
 *   passed to `deletePoint`.
 */
async pruneStaleEpisodes(userId: string, referenceTime = new Date().toISOString()): Promise<number> {
  if (!userId) {
    return 0;
  }

  const cutoffBaseMs = new Date(referenceTime).getTime();
  if (!Number.isFinite(cutoffBaseMs)) {
    return 0;
  }

  // Only ids are needed for deletion, so payloads are not requested.
  const scrollOptions = {
    filter: buildStaleEpisodeFilter(userId, cutoffBaseMs),
    limit: RETENTION_BATCH_LIMIT,
    withPayload: false,
  };
  const stale = await this.qdrant.scroll(this.collectionName, scrollOptions);

  // Deletions are issued one at a time, in the order the scroll returned them.
  for (const { id } of stale) {
    await this.qdrant.deletePoint(this.collectionName, id);
  }

  return stale.length;
}

private async updateAccessCounts(ids: string[]): Promise<void> {
for (const id of ids) {
try {
Expand Down
33 changes: 27 additions & 6 deletions src/memory/qdrant-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ type CollectionSchema = {
sparse_vectors?: SparseVectorConfig;
};

type QdrantResponse = {
status?: string;
result?: unknown;
time?: number;
};
type QdrantResponse = { status?: string; result?: unknown; time?: number };

/**
 * Shape read from a query response: `extractResults` takes `result.points`
 * and treats a missing `result` or `points` as an empty list.
 */
type QdrantQueryResponse = {
result?: { points?: QdrantScoredPoint[] };
};

/**
 * Shape the `scroll` method casts the `/points/scroll` response to. `scroll`
 * reads only `result.points`; `next_page_offset` is declared here but is not
 * read by that method.
 */
type QdrantScrollResponse = {
result?: { points?: QdrantScoredPoint[]; next_page_offset?: string | number | null };
};

type QdrantScoredPoint = {
id: string | number;
score: number;
Expand Down Expand Up @@ -146,6 +146,24 @@ export class QdrantClient {
});
}

/**
 * Fetches a single page of points from a collection by POSTing to
 * `/collections/{collection}/points/scroll`.
 *
 * @param collection - Collection name, interpolated into the request path as-is.
 * @param options - `filter` is forwarded unchanged; `limit` falls back to 50
 *   and `withPayload` to true when nullish. Vectors are never requested.
 * @returns The response's `result.points` mapped through `mapPoints`, or an
 *   empty array when the response carries no points.
 */
async scroll(
  collection: string,
  options: {
    filter?: Record<string, unknown>;
    limit?: number;
    withPayload?: boolean;
  },
): Promise<QdrantSearchResult[]> {
  const path = `/collections/${collection}/points/scroll`;
  const body = {
    filter: options.filter,
    limit: options.limit ?? 50,
    with_payload: options.withPayload ?? true,
    with_vectors: false,
  };

  const raw = await this.request("POST", path, body);
  const page = (raw as QdrantScrollResponse).result;

  return this.mapPoints(page?.points ?? []);
}

async createPayloadIndex(
collection: string,
fieldName: string,
Expand Down Expand Up @@ -246,7 +264,10 @@ export class QdrantClient {
}

/**
 * Converts a query response into search results by handing `result.points`
 * to `mapPoints`; a missing `result` or `points` yields an empty array.
 */
private extractResults(response: QdrantQueryResponse): QdrantSearchResult[] {
  return this.mapPoints(response.result?.points ?? []);
}

private mapPoints(points: QdrantScoredPoint[]): QdrantSearchResult[] {
return points.map((p) => ({
id: String(p.id),
score: p.score,
Expand Down
24 changes: 24 additions & 0 deletions src/memory/retention.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/** Milliseconds in one day. */
const DAY_MS = 24 * 60 * 60 * 1000;

/** Days subtracted from the reference time to get the `ended_at` cutoff for stale episodes. */
export const STALE_EPISODE_RETENTION_DAYS = 30;
/** Upper bound (inclusive) on `importance` for an episode to match the stale filter. */
export const STALE_EPISODE_MAX_IMPORTANCE = 0.35;
/** Upper bound (inclusive) on `access_count` for an episode to match the stale filter. */
export const STALE_EPISODE_MAX_ACCESS_COUNT = 1;
/** Days subtracted from the reference time to get the `valid_until` cutoff for expired facts. */
export const EXPIRED_FACT_RETENTION_DAYS = 30;
/** Batch size exported for callers to use as the per-call candidate limit. */
export const RETENTION_BATCH_LIMIT = 50;

/**
 * Computes `referenceTimeMs - retentionDays * DAY_MS`.
 *
 * Rejects non-finite reference times: `NaN`/`Infinity` serialise to `null` in
 * JSON, which would remove the upper bound from a filter used to pick points
 * for deletion.
 *
 * @throws RangeError when `referenceTimeMs` is not a finite number.
 */
function retentionCutoffMs(referenceTimeMs: number, retentionDays: number): number {
  if (!Number.isFinite(referenceTimeMs)) {
    throw new RangeError(`referenceTimeMs must be a finite number, received ${referenceTimeMs}`);
  }
  return referenceTimeMs - retentionDays * DAY_MS;
}

/**
 * Builds the filter selecting a user's stale episodes. All four conditions
 * must hold: matching `user_id`, `ended_at` at or before the retention cutoff,
 * `importance` at or below STALE_EPISODE_MAX_IMPORTANCE, and `access_count`
 * at or below STALE_EPISODE_MAX_ACCESS_COUNT.
 *
 * NOTE(review): the cutoff is a millisecond number, so this assumes `ended_at`
 * is stored in the payload as epoch milliseconds — confirm against the writer.
 *
 * @param userId - Value matched against the `user_id` payload key.
 * @param referenceTimeMs - "Now" in epoch milliseconds; must be finite.
 * @returns A filter object with a `must` array of four conditions.
 * @throws RangeError when `referenceTimeMs` is not a finite number.
 */
export function buildStaleEpisodeFilter(userId: string, referenceTimeMs: number): Record<string, unknown> {
  const endedAtCutoff = retentionCutoffMs(referenceTimeMs, STALE_EPISODE_RETENTION_DAYS);

  return {
    must: [
      { key: "user_id", match: { value: userId } },
      { key: "ended_at", range: { lte: endedAtCutoff } },
      { key: "importance", range: { lte: STALE_EPISODE_MAX_IMPORTANCE } },
      { key: "access_count", range: { lte: STALE_EPISODE_MAX_ACCESS_COUNT } },
    ],
  };
}

/**
 * Builds the filter selecting facts whose `valid_until` is at or before the
 * retention cutoff (`referenceTimeMs` minus EXPIRED_FACT_RETENTION_DAYS days).
 *
 * NOTE(review): assumes `valid_until` is stored as epoch milliseconds — confirm.
 *
 * @param referenceTimeMs - "Now" in epoch milliseconds; must be finite.
 * @returns A filter object with a single `must` condition on `valid_until`.
 * @throws RangeError when `referenceTimeMs` is not a finite number.
 */
export function buildExpiredFactFilter(referenceTimeMs: number): Record<string, unknown> {
  const validUntilCutoff = retentionCutoffMs(referenceTimeMs, EXPIRED_FACT_RETENTION_DAYS);

  return {
    must: [{ key: "valid_until", range: { lte: validUntilCutoff } }],
  };
}
Loading
Loading