diff --git a/web/schema.sql b/web/schema.sql
index 050bb40..aafeec8 100755
--- a/web/schema.sql
+++ b/web/schema.sql
@@ -65,9 +65,37 @@ CREATE TABLE IF NOT EXISTS episodic_memory (
   reclassified INTEGER NOT NULL DEFAULT 0,
   thread_id TEXT, -- conversation thread for dreaming cycle
   executor TEXT, -- which executor handled this
+  complexity_tier TEXT, -- aegis#563: procedureKey complement (low|mid|high); NULL for non-dispatcher producers
   created_at TEXT NOT NULL DEFAULT (datetime('now'))
 );
 
+-- aegis#564 Phase 2: shadow-read drift log for cached-vs-derived stats on
+-- procedural_memory. Populated by getProcedureWithDerivedStats /
+-- getAllProceduresWithDerivedStats. Phase 3 gate: drift row p95 within
+-- tolerance across a 7-day window before dropping the cached aggregate
+-- columns from procedural_memory.
+CREATE TABLE IF NOT EXISTS shadow_read_drift (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  reader TEXT NOT NULL,
+  task_pattern TEXT NOT NULL,
+  cached_count INTEGER NOT NULL,
+  cached_success_count INTEGER NOT NULL,
+  cached_fail_count INTEGER NOT NULL,
+  cached_avg_latency_ms REAL NOT NULL,
+  cached_avg_cost REAL NOT NULL,
+  cached_last_used TEXT,
+  derived_count INTEGER NOT NULL,
+  derived_success_count INTEGER NOT NULL,
+  derived_fail_count INTEGER NOT NULL,
+  derived_avg_latency_ms REAL NOT NULL,
+  derived_avg_cost REAL NOT NULL,
+  derived_last_used TEXT,
+  pre_tier_count INTEGER NOT NULL DEFAULT 0,
+  sampled_at TEXT NOT NULL DEFAULT (datetime('now'))
+);
+CREATE INDEX IF NOT EXISTS idx_shadow_read_drift_reader_sampled
+  ON shadow_read_drift(reader, sampled_at);
+
 CREATE TABLE IF NOT EXISTS procedural_memory (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   task_pattern TEXT NOT NULL UNIQUE,
diff --git a/web/src/dashboard.ts b/web/src/dashboard.ts
index c4f1616..3f92f0d 100644
--- a/web/src/dashboard.ts
+++ b/web/src/dashboard.ts
@@ -1,7 +1,7 @@
 // Operator dashboard — server-rendered system health, memory, goals, cost tracking
 // Auth-gated via existing bearerAuth middleware
 
-import { getAllProcedures, getActiveAgendaItems, getActiveGoals } from './kernel/memory/index.js';
+import { getAllProceduresWithDerivedStats, getActiveAgendaItems, getActiveGoals } from './kernel/memory/index.js';
 import { getMemoryStats } from './kernel/memory-adapter.js';
 import type { MemoryServiceBinding } from './types.js';
 import type { ProceduralEntry } from './kernel/types.js';
@@ -115,7 +115,7 @@ export async function getDashboardData(db: D1Database, memoryBinding?: MemorySer
     taskCompletedRows,
     taskRecentRows,
   ] = await Promise.all([
-    getAllProcedures(db),
+    getAllProceduresWithDerivedStats(db, { reader: 'dashboard' }),
     getActiveAgendaItems(db),
     getActiveGoals(db),
     memoryBinding ? getMemoryStats(memoryBinding) : Promise.resolve({ total_active: 0, topics: [], recalled_last_24h: 0, strength_distribution: { low: 0, medium: 0, high: 0 } }),
diff --git a/web/src/kernel/memory/episodic.ts b/web/src/kernel/memory/episodic.ts
index 87e722c..479e7fd 100755
--- a/web/src/kernel/memory/episodic.ts
+++ b/web/src/kernel/memory/episodic.ts
@@ -71,6 +71,124 @@ export async function getEpisodeStats(db: D1Database, intentClass: string): Prom
   };
 }
 
+// ─── Stats by (intent_class, complexity_tier) — derived-stats path ──
+// aegis#563 + aegis#564: stats keyed by (intent_class, complexity_tier) —
+// matches procedural_memory's procedureKey shape so the projection-source
+// path can derive procedural aggregates at read time.
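+//
+// Key-shape sketch (illustrative only — 'summarize_text' is a made-up
+// intent class; assumes procedureKey joins class and tier with ':', the
+// format parseProcedureKey in procedural.ts splits on):
+//
+//   procedureKey('summarize_text', 'mid')  // -> 'summarize_text:mid'
+//   await getEpisodeStatsByComplexity(db, 'summarize_text', 'mid')
+//   // -> derives at read time the aggregate procedural_memory caches for that key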
+//
+// The helpers rely on episodic_memory.complexity_tier (added in the same
+// schema migration that adds these functions). Rows without a tier value
+// never match the `complexity_tier = ?` equality filter (NULL compares as
+// unknown in SQLite), so they are excluded from derived results; the bulk
+// variant surfaces them separately as preTierCount. Consumers that want
+// stricter protection against retroactive backfills can add their own
+// time-based filter in a wrapper.
+
+export async function getEpisodeStatsByComplexity(
+  db: D1Database,
+  intentClass: string,
+  complexityTier: string,
+): Promise<{
+  count: number;
+  successCount: number;
+  successRate: number;
+  avgCost: number;
+  avgLatency: number;
+  lastUsed: string | null;
+} | null> {
+  // SUM(CASE outcome='success' ...) returns an exact integer — don't
+  // reconstruct successCount from count * avgSuccessRate downstream (FP
+  // rounding on ugly rates would break strict-equality drift checks).
+  const row = await db.prepare(`
+    SELECT
+      COUNT(*) as count,
+      SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count,
+      AVG(cost) as avg_cost,
+      AVG(latency_ms) as avg_latency,
+      MAX(created_at) as last_used
+    FROM episodic_memory
+    WHERE intent_class = ?
+      AND complexity_tier = ?
+  `).bind(intentClass, complexityTier).first<{
+    count: number;
+    success_count: number;
+    avg_cost: number;
+    avg_latency: number;
+    last_used: string | null;
+  }>();
+
+  if (!row || row.count === 0) return null;
+  return {
+    count: row.count,
+    successCount: row.success_count,
+    successRate: row.count > 0 ? row.success_count / row.count : 0,
+    avgCost: row.avg_cost,
+    avgLatency: row.avg_latency,
+    lastUsed: row.last_used,
+  };
+}
+
+// aegis#564 Phase 2: bulk variant for dashboard / observability / decision-docs.
+// One GROUP BY intent_class, complexity_tier scan covers both the derived
+// slice (non-null tier) and the pre-tier ghost slice (NULL tier) so callers
+// avoid N+1 queries. Returns a Map keyed on intent_class with nested
+// derived-by-tier and the pre-tier count.
+export interface EpisodeStatsAggregate {
+  derived: Record<string, {
+    count: number;
+    successCount: number;
+    failCount: number;
+    avgCost: number;
+    avgLatency: number;
+    lastUsed: string | null;
+  }>;
+  preTierCount: number;
+}
+
+export async function getAllEpisodeStatsByComplexity(
+  db: D1Database,
+): Promise<Map<string, EpisodeStatsAggregate>> {
+  // Single scan: grouping on (intent_class, complexity_tier) folds both
+  // derived (non-null tier) and pre-tier (NULL tier) rows into one query.
+  const result = await db.prepare(`
+    SELECT
+      intent_class,
+      complexity_tier,
+      COUNT(*) as count,
+      SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count,
+      AVG(cost) as avg_cost,
+      AVG(latency_ms) as avg_latency,
+      MAX(created_at) as last_used
+    FROM episodic_memory
+    GROUP BY intent_class, complexity_tier
+  `).all<{
+    intent_class: string;
+    complexity_tier: string | null;
+    count: number;
+    success_count: number;
+    avg_cost: number;
+    avg_latency: number;
+    last_used: string | null;
+  }>();
+
+  const byClass = new Map<string, EpisodeStatsAggregate>();
+  for (const row of result.results) {
+    const entry = byClass.get(row.intent_class) ?? { derived: {}, preTierCount: 0 };
+    if (row.complexity_tier === null) {
+      entry.preTierCount = row.count;
+    } else {
+      entry.derived[row.complexity_tier] = {
+        count: row.count,
+        successCount: row.success_count,
+        failCount: row.count - row.success_count,
+        avgCost: row.avg_cost,
+        avgLatency: row.avg_latency,
+        lastUsed: row.last_used,
+      };
+    }
+    byClass.set(row.intent_class, entry);
+  }
+  return byClass;
+}
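+
+// Consumption sketch (hypothetical caller, not part of this module's exports):
+//
+//   const byClass = await getAllEpisodeStatsByComplexity(db);
+//   const agg = byClass.get('summarize_text');  // EpisodeStatsAggregate | undefined
+//   const mid = agg?.derived['mid'];            // derived stats for one tier
+//   const ghosts = agg?.preTierCount ?? 0;      // episodes predating complexity_tier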
+
 // ─── Conversation History ───────────────────────────────────
 
 export async function getConversationHistory(
diff --git a/web/src/kernel/memory/index.ts b/web/src/kernel/memory/index.ts
index 6d69d6f..15de553 100755
--- a/web/src/kernel/memory/index.ts
+++ b/web/src/kernel/memory/index.ts
@@ -1,5 +1,5 @@
-export { recordEpisode, retrogradeEpisode, sanitizeEpisodicOutcome, getRecentEpisodes, getEpisodeStats, getConversationHistory, estimateTokens, budgetConversationHistory } from './episodic.js';
-export { PROCEDURE_MIN_SUCCESSES, PROCEDURE_MIN_SUCCESS_RATE, complexityTier, procedureKey, getProcedure, getAllProcedures, findNearMiss, upsertProcedure, addRefinement, degradeProcedure, maintainProcedures } from './procedural.js';
+export { recordEpisode, retrogradeEpisode, sanitizeEpisodicOutcome, getRecentEpisodes, getEpisodeStats, getEpisodeStatsByComplexity, getAllEpisodeStatsByComplexity, type EpisodeStatsAggregate, getConversationHistory, estimateTokens, budgetConversationHistory } from './episodic.js';
+export { PROCEDURE_MIN_SUCCESSES, PROCEDURE_MIN_SUCCESS_RATE, complexityTier, procedureKey, getProcedure, getAllProcedures, getProcedureWithDerivedStats, getAllProceduresWithDerivedStats, type DriftLogOpts, findNearMiss, upsertProcedure, addRefinement, degradeProcedure, maintainProcedures } from './procedural.js';
 export { normalizeTopic, tokenize, jaccardSimilarity, recordMemory, searchMemoryByKeywords, getMemoryEntries, recallMemory, computeEwaScore, getAllMemoryForContext } from './semantic.js';
 export { pruneMemory } from './pruning.js';
 export { consolidateEpisodicToSemantic } from './consolidation.js';
diff --git a/web/src/kernel/memory/procedural.ts b/web/src/kernel/memory/procedural.ts
index fb33c20..1b657ea 100755
--- a/web/src/kernel/memory/procedural.ts
+++ b/web/src/kernel/memory/procedural.ts
@@ -1,4 +1,9 @@
 import type { ProceduralEntry, ProceduralStatus, Refinement } from '../types.js';
+import {
+  getEpisodeStatsByComplexity,
+  getAllEpisodeStatsByComplexity,
+  type EpisodeStatsAggregate,
+} from './episodic.js';
 
 // ─── Constants ──────────────────────────────────────────────
 
@@ -39,6 +44,211 @@ export async function getAllProcedures(db: D1Database): Promise<ProceduralEntry[]> {
+
+export interface DriftLogOpts {
+  reader: string;   // tag stored on each drift row ('dashboard', 'observability', ...)
+  sample?: number;  // 0..1 probability of logging a drift row; defaults to 1.0
+}
+
+export async function getProcedureWithDerivedStats(
+  db: D1Database,
+  taskPattern: string,
+  opts?: DriftLogOpts,
+): Promise<ProceduralEntry | null> {
+  const row = await getProcedure(db, taskPattern);
+  if (!row) return null;
+
+  if (opts && Math.random() < (opts.sample ?? 1.0)) {
+    // Drift log failure must never break the read. Awaited + try/catch so
+    // observability is synchronous for tests and ordered for ctx.waitUntil.
+    try {
+      await logDriftSingle(db, row, opts.reader);
+    } catch (err) {
+      console.warn('[shadow-read] drift log failed (non-fatal):',
+        err instanceof Error ? err.message : String(err));
+    }
+  }
+
+  return row;
+}
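+
+// Call-site sketch (hypothetical 'dispatcher' reader and sampling rate; not
+// part of this change): a hot path can keep drift logging cheap via `sample`
+// while the awaited-write semantics above stay identical for sampled reads.
+//
+//   const proc = await getProcedureWithDerivedStats(db, key, {
+//     reader: 'dispatcher',
+//     sample: 0.1, // log a drift row for roughly 10% of reads
+//   });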
+
+export async function getAllProceduresWithDerivedStats(
+  db: D1Database,
+  opts?: DriftLogOpts,
+): Promise<ProceduralEntry[]> {
+  const procedures = await getAllProcedures(db);
+
+  if (opts && Math.random() < (opts.sample ?? 1.0)) {
+    try {
+      await logDriftBulk(db, procedures, opts.reader);
+    } catch (err) {
+      console.warn('[shadow-read] bulk drift log failed (non-fatal):',
+        err instanceof Error ? err.message : String(err));
+    }
+  }
+
+  return procedures;
+}
+
+// ─── Shadow-read drift logging (Phase 2) ────────────────────────
+
+function reconstructDerivedFields(
+  stats: EpisodeStatsAggregate['derived'][string] | undefined,
+): {
+  count: number;
+  successCount: number;
+  failCount: number;
+  avgLatency: number;
+  avgCost: number;
+  lastUsed: string | null;
+} {
+  if (!stats) {
+    return { count: 0, successCount: 0, failCount: 0, avgLatency: 0, avgCost: 0, lastUsed: null };
+  }
+  return {
+    count: stats.count,
+    successCount: stats.successCount,
+    failCount: stats.failCount,
+    avgLatency: stats.avgLatency,
+    avgCost: stats.avgCost,
+    lastUsed: stats.lastUsed,
+  };
+}
+
+function parseProcedureKey(taskPattern: string): { intentClass: string; tier: string } | null {
+  const idx = taskPattern.lastIndexOf(':');
+  if (idx === -1) return null;
+  return {
+    intentClass: taskPattern.slice(0, idx),
+    tier: taskPattern.slice(idx + 1),
+  };
+}
+
+async function logDriftSingle(
+  db: D1Database,
+  cached: ProceduralEntry,
+  reader: string,
+): Promise<void> {
+  const parsed = parseProcedureKey(cached.task_pattern);
+  if (!parsed) return; // legacy non-conforming key — skip rather than pollute the log
+
+  const derivedStats = await getEpisodeStatsByComplexity(db, parsed.intentClass, parsed.tier);
+  const preTierRow = await db.prepare(
+    `SELECT COUNT(*) as c FROM episodic_memory
+     WHERE intent_class = ? AND complexity_tier IS NULL`
+  ).bind(parsed.intentClass).first<{ c: number }>();
+
+  // Use exact successCount from SUM(CASE ...) rather than reconstructing
+  // via Math.round(count * rate). Float rounding on ugly rates would
+  // break strict-equality drift checks.
+  const derived = derivedStats
+    ? {
+        count: derivedStats.count,
+        successCount: derivedStats.successCount,
+        failCount: derivedStats.count - derivedStats.successCount,
+        avgLatency: derivedStats.avgLatency,
+        avgCost: derivedStats.avgCost,
+        lastUsed: derivedStats.lastUsed,
+      }
+    : reconstructDerivedFields(undefined);
+
+  await db.prepare(
+    `INSERT INTO shadow_read_drift (
+      reader, task_pattern,
+      cached_count, cached_success_count, cached_fail_count,
+      cached_avg_latency_ms, cached_avg_cost, cached_last_used,
+      derived_count, derived_success_count, derived_fail_count,
+      derived_avg_latency_ms, derived_avg_cost, derived_last_used,
+      pre_tier_count
+    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
+  ).bind(
+    reader, cached.task_pattern,
+    cached.success_count + cached.fail_count,
+    cached.success_count,
+    cached.fail_count,
+    cached.avg_latency_ms,
+    cached.avg_cost,
+    cached.last_used ?? null,
+    derived.count,
+    derived.successCount,
+    derived.failCount,
+    derived.avgLatency,
+    derived.avgCost,
+    derived.lastUsed,
+    preTierRow?.c ?? 0,
+  ).run();
+}
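+
+// Gate-check sketch (illustrative only; the Phase 3 gate query is not part
+// of this change): pull a 7-day window of drift rows and compare cached vs
+// derived counts against the tolerance described in schema.sql.
+//
+//   const rows = await db.prepare(
+//     `SELECT cached_count, derived_count FROM shadow_read_drift
+//      WHERE sampled_at >= datetime('now', '-7 days')`,
+//   ).all<{ cached_count: number; derived_count: number }>();
+//   const drift = rows.results.map(r => Math.abs(r.derived_count - r.cached_count));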
+
+async function logDriftBulk(
+  db: D1Database,
+  procedures: ProceduralEntry[],
+  reader: string,
+): Promise<void> {
+  if (procedures.length === 0) return;
+
+  const aggregate = await getAllEpisodeStatsByComplexity(db);
+
+  const stmt = db.prepare(
+    `INSERT INTO shadow_read_drift (
+      reader, task_pattern,
+      cached_count, cached_success_count, cached_fail_count,
+      cached_avg_latency_ms, cached_avg_cost, cached_last_used,
+      derived_count, derived_success_count, derived_fail_count,
+      derived_avg_latency_ms, derived_avg_cost, derived_last_used,
+      pre_tier_count
+    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
+  );
+
+  const batch: D1PreparedStatement[] = [];
+  for (const cached of procedures) {
+    const parsed = parseProcedureKey(cached.task_pattern);
+    if (!parsed) continue;
+    const classStats = aggregate.get(parsed.intentClass);
+    const derived = reconstructDerivedFields(classStats?.derived[parsed.tier]);
+    const preTierCount = classStats?.preTierCount ?? 0;
+
+    batch.push(stmt.bind(
+      reader, cached.task_pattern,
+      cached.success_count + cached.fail_count,
+      cached.success_count,
+      cached.fail_count,
+      cached.avg_latency_ms,
+      cached.avg_cost,
+      cached.last_used ?? null,
+      derived.count,
+      derived.successCount,
+      derived.failCount,
+      derived.avgLatency,
+      derived.avgCost,
+      derived.lastUsed,
+      preTierCount,
+    ));
+  }
+
+  // D1 caps batch size (~100 statements in practice). Chunk so a large
+  // procedural_memory table doesn't trip the cap and get swallowed by
+  // the non-fatal catch upstream, silently blinding the drift dashboard.
+  const BATCH = 100;
+  for (let i = 0; i < batch.length; i += BATCH) {
+    await db.batch(batch.slice(i, i + BATCH));
+  }
+}
+
 export async function findNearMiss(db: D1Database, classification: string): Promise<ProceduralEntry | null> {
   const prefix = classification.split('_')[0];
   if (!prefix) return null;
diff --git a/web/src/routes/observability.ts b/web/src/routes/observability.ts
index a8010db..25bdd99 100644
--- a/web/src/routes/observability.ts
+++ b/web/src/routes/observability.ts
@@ -2,7 +2,7 @@
 import { Hono } from 'hono';
 import type { Env } from '../types.js';
 
-import { getAllProcedures, getActiveAgendaItems } from '../kernel/memory/index.js';
+import { getAllProceduresWithDerivedStats, getActiveAgendaItems } from '../kernel/memory/index.js';
 
 const observability = new Hono<{ Bindings: Env }>();
 
@@ -56,7 +56,7 @@ observability.get('/agenda', async (c) => {
 
 // ─── Procedures ─────────────────────────────────────────────
 
 observability.get('/procedures', async (c) => {
-  const raw = await getAllProcedures(c.env.DB);
+  const raw = await getAllProceduresWithDerivedStats(c.env.DB, { reader: 'observability' });
 
   const procedures = raw.map((p: any) => {
     const total = p.success_count + p.fail_count;
diff --git a/web/tests/observability.test.ts b/web/tests/observability.test.ts
index 3bf47ad..e64f07c 100755
--- a/web/tests/observability.test.ts
+++ b/web/tests/observability.test.ts
@@ -5,12 +5,12 @@
 import { describe, it, expect, vi, beforeEach } from 'vitest';
 import { Hono } from 'hono';
 
 vi.mock('../src/kernel/memory/index.js', () => ({
-  getAllProcedures: vi.fn().mockResolvedValue([]),
+  getAllProceduresWithDerivedStats: vi.fn().mockResolvedValue([]),
   getActiveAgendaItems: vi.fn().mockResolvedValue([]),
 }));
 
 import { observability } from '../src/routes/observability.js';
-import { getAllProcedures, getActiveAgendaItems } from '../src/kernel/memory/index.js';
+import { getAllProceduresWithDerivedStats, getActiveAgendaItems } from '../src/kernel/memory/index.js';
 import type { Env } from '../src/types.js';
 
 // ─── D1 Mock ──────────────────────────────────────────────────
@@ -167,7 +167,7 @@ describe('observability routes', () => {
 
   describe('GET /procedures', () => {
     it('returns formatted procedures', async () => {
-      vi.mocked(getAllProcedures).mockResolvedValueOnce([
+      vi.mocked(getAllProceduresWithDerivedStats).mockResolvedValueOnce([
        {
          task_pattern: 'greeting',
          executor: 'groq_8b',
@@ -200,7 +200,7 @@
     });
 
     it('handles zero total executions', async () => {
-      vi.mocked(getAllProcedures).mockResolvedValueOnce([
+      vi.mocked(getAllProceduresWithDerivedStats).mockResolvedValueOnce([
        {
          task_pattern: 'unknown',
          executor: 'claude',