From ebd445458046a15b731b62703e3bbef05c5bfd4d Mon Sep 17 00:00:00 2001 From: Abhinav Bansal Date: Thu, 16 Apr 2026 22:45:08 +0800 Subject: [PATCH 1/7] feat(action-brain): wacli collector + checkpoint store (v0.10.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds src/action-brain/collector.ts — deterministic wacli message reader with file-based checkpoint store. Reads WhatsApp export files from the wacli local store, deduplicates by message ID, and persists a checkpoint so repeat runs only surface new messages since last sync. Closes GIT-46. Co-Authored-By: Paperclip --- CHANGELOG.md | 6 + VERSION | 2 +- src/action-brain/collector.ts | 727 ++++++++++++++++++++++++++++ test/action-brain/collector.test.ts | 364 ++++++++++++++ 4 files changed, 1098 insertions(+), 1 deletion(-) create mode 100644 src/action-brain/collector.ts create mode 100644 test/action-brain/collector.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index e623a174..4546d874 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to GBrain will be documented in this file. +## [0.10.1] - 2026-04-16 + +### Added + +- **Wacli collector with checkpoint store.** `collector.ts` reads your WhatsApp export files from the wacli local store and maintains a checkpoint so the ingest pipeline only processes new messages since the last run. Deterministic parsing, dedup-safe, and cron-friendly — no duplicate ingestion on re-runs. 
+ ## [0.10.0] - 2026-04-16 ### Added diff --git a/VERSION b/VERSION index 78bc1abd..57121573 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.10.0 +0.10.1 diff --git a/src/action-brain/collector.ts b/src/action-brain/collector.ts new file mode 100644 index 00000000..0f0260af --- /dev/null +++ b/src/action-brain/collector.ts @@ -0,0 +1,727 @@ +import { execFile } from 'child_process'; +import { mkdir, readFile, rename, unlink, writeFile } from 'fs/promises'; +import { homedir } from 'os'; +import { dirname, join } from 'path'; +import { promisify } from 'util'; +import type { WhatsAppMessage } from './extractor.ts'; + +const execFileAsync = promisify(execFile); +const DEFAULT_LIMIT = 200; +const DEFAULT_STALE_AFTER_HOURS = 24; +const CHECKPOINT_VERSION = 1; + +export type WacliStoreKey = 'personal' | 'business' | string; + +export interface WacliStoreConfig { + key: WacliStoreKey; + storePath: string; +} + +export interface WacliStoreCheckpoint { + after: string | null; + message_ids_at_after: string[]; + updated_at: string | null; +} + +export interface WacliCollectorCheckpointState { + version: number; + stores: Record; +} + +export interface CollectedWhatsAppMessage extends WhatsAppMessage { + ChatJID: string | null; + SenderJID: string | null; + FromMe: boolean; + store_key: string; + store_path: string; +} + +export type WacliDegradedReason = + | 'command_failed' + | 'invalid_payload' + | 'last_sync_unknown' + | 'last_sync_stale'; + +export type WacliHealthStatus = 'healthy' | 'degraded' | 'failed'; + +export interface WacliStoreCollectionResult { + storeKey: string; + storePath: string; + checkpointBefore: string | null; + checkpointAfter: string | null; + batchSize: number; + lastSyncAt: string | null; + degraded: boolean; + degradedReason: WacliDegradedReason | null; + error: string | null; + messages: CollectedWhatsAppMessage[]; +} + +export interface WacliCollectionResult { + collectedAt: string; + checkpointPath: string; + limit: number; + 
staleAfterHours: number; + stores: WacliStoreCollectionResult[]; + messages: CollectedWhatsAppMessage[]; + degraded: boolean; + checkpoint: WacliCollectorCheckpointState; +} + +export interface WacliHealthSummary { + status: WacliHealthStatus; + lastSyncAt: string | null; + staleStoreKeys: string[]; + disconnectedStoreKeys: string[]; + alerts: string[]; +} + +export interface WacliListRequest { + storePath: string; + after: string | null; + limit: number; +} + +export interface CollectWacliMessagesOptions { + stores?: WacliStoreConfig[]; + limit?: number; + staleAfterHours?: number; + checkpointPath?: string; + persistCheckpoint?: boolean; + now?: Date; + runner?: WacliListMessagesRunner; +} + +export type WacliListMessagesRunner = (request: WacliListRequest) => Promise; + +interface ParseListResult { + ok: boolean; + messages: CollectedWhatsAppMessage[]; + error: string | null; + degradedReason: WacliDegradedReason | null; +} + +interface WacliPayloadLike { + success?: unknown; + data?: { + messages?: unknown; + }; + error?: unknown; +} + +export async function collectWacliMessages( + options: CollectWacliMessagesOptions = {} +): Promise { + const now = options.now ? ensureDate(options.now, 'now') : new Date(); + const stores = normalizeStores(options.stores ?? resolveWacliStoresFromEnv()); + const limit = normalizeLimit(options.limit); + const staleAfterHours = normalizeStaleAfterHours(options.staleAfterHours); + const checkpointPath = options.checkpointPath ?? defaultCollectorCheckpointPath(); + const persistCheckpoint = options.persistCheckpoint ?? true; + const runner = options.runner ?? 
runWacliMessagesList; + + const checkpoint = await readWacliCollectorCheckpoint(checkpointPath); + const nextCheckpoint: WacliCollectorCheckpointState = { + version: CHECKPOINT_VERSION, + stores: { ...checkpoint.stores }, + }; + + const storeResults: WacliStoreCollectionResult[] = []; + const allMessages: CollectedWhatsAppMessage[] = []; + let checkpointDirty = false; + + for (const store of stores) { + const existingCheckpoint = normalizeStoreCheckpoint(nextCheckpoint.stores[store.key]); + const checkpointBefore = existingCheckpoint.after; + let degradedReason: WacliDegradedReason | null = null; + let error: string | null = null; + let lastSyncAt: string | null = null; + let newMessages: CollectedWhatsAppMessage[] = []; + + let incrementalPayload: unknown; + try { + incrementalPayload = await runner({ + storePath: store.storePath, + after: existingCheckpoint.after, + limit, + }); + } catch (err) { + degradedReason = 'command_failed'; + error = errorMessage(err); + } + + if (!degradedReason) { + const parsed = parseWacliListPayload(incrementalPayload, store); + if (!parsed.ok) { + degradedReason = parsed.degradedReason ?? 'invalid_payload'; + error = parsed.error; + } else { + lastSyncAt = latestTimestamp(parsed.messages); + newMessages = filterMessagesAfterCheckpoint(parsed.messages, existingCheckpoint); + } + } + + if (!degradedReason && !lastSyncAt) { + let latestPayload: unknown; + try { + latestPayload = await runner({ + storePath: store.storePath, + after: null, + limit: 1, + }); + } catch (err) { + degradedReason = 'command_failed'; + error = errorMessage(err); + } + + if (!degradedReason) { + const latestParsed = parseWacliListPayload(latestPayload, store); + if (!latestParsed.ok) { + degradedReason = latestParsed.degradedReason ?? 
/**
 * Rolls per-store collection results up into a single health summary.
 *
 * Stores that are not degraded (or carry no degraded reason) are ignored.
 * A store degraded with `last_sync_stale` is reported as stale; any other
 * degraded reason is reported as disconnected. The overall status is
 * `failed` if at least one store is disconnected, `degraded` if only stale
 * stores exist, and `healthy` otherwise. An empty store list is `failed`.
 *
 * @param stores  Per-store results, typically `WacliCollectionResult.stores`.
 * @param options `now` overrides the clock used to compute the age shown in stale alerts.
 * @returns Status, latest known sync timestamp, affected store keys and alert texts.
 * @throws Error when `options.now` is provided but is not a valid `Date`.
 */
export function summarizeWacliHealth(
  stores: WacliStoreCollectionResult[],
  options: { now?: Date } = {}
): WacliHealthSummary {
  if (stores.length === 0) {
    return {
      status: 'failed',
      lastSyncAt: null,
      staleStoreKeys: [],
      disconnectedStoreKeys: [],
      alerts: ['No wacli stores configured.'],
    };
  }

  const msPerHour = 60 * 60 * 1000;
  const now = options.now ? ensureDate(options.now, 'now') : new Date();
  const staleStoreKeys: string[] = [];
  const disconnectedStoreKeys: string[] = [];
  const alerts: string[] = [];

  for (const store of stores) {
    if (!store.degraded || !store.degradedReason) {
      continue;
    }

    if (store.degradedReason === 'last_sync_stale') {
      staleStoreKeys.push(store.storeKey);
      const lastSyncLabel = store.lastSyncAt ?? 'unknown';
      // A missing or unparsable timestamp must not leak into the alert as
      // "unknownh ago" / "NaNh ago"; fall back to a readable phrase instead.
      const lastSyncMs = store.lastSyncAt ? Date.parse(store.lastSyncAt) : Number.NaN;
      const agePhrase = Number.isNaN(lastSyncMs)
        ? 'age unknown'
        : `${((now.getTime() - lastSyncMs) / msPerHour).toFixed(1)}h ago`;
      alerts.push(`Store "${store.storeKey}" stale: last sync ${lastSyncLabel} (${agePhrase}).`);
      continue;
    }

    disconnectedStoreKeys.push(store.storeKey);
    const suffix = store.error ? ` ${store.error}` : '';
    alerts.push(`Store "${store.storeKey}" unhealthy (${store.degradedReason}).${suffix}`.trim());
  }

  const status: WacliHealthStatus =
    disconnectedStoreKeys.length > 0 ? 'failed' : staleStoreKeys.length > 0 ? 'degraded' : 'healthy';

  return {
    status,
    lastSyncAt: latestWacliSyncAt(stores),
    staleStoreKeys,
    disconnectedStoreKeys,
    alerts,
  };
}
/**
 * Persists the collector checkpoint to `checkpointPath`.
 *
 * The state is normalized first (version pinned to `CHECKPOINT_VERSION`,
 * stores passed through `normalizeCheckpointStores`), written to a sibling
 * temp file, and then renamed over the target so the final path is only ever
 * replaced by a fully written file. Parent directories are created as needed.
 *
 * @param checkpointPath Destination file path.
 * @param checkpoint     State to persist.
 * @throws Re-throws the underlying filesystem error if the directory cannot be
 *         created, or if writing or renaming the temp file fails.
 */
export async function writeWacliCollectorCheckpoint(
  checkpointPath: string,
  checkpoint: WacliCollectorCheckpointState
): Promise<void> {
  const normalized = {
    version: CHECKPOINT_VERSION,
    stores: normalizeCheckpointStores(checkpoint.stores),
  } satisfies WacliCollectorCheckpointState;

  await mkdir(dirname(checkpointPath), { recursive: true });
  // pid + timestamp keeps temp names distinct between processes and runs.
  const tmpPath = `${checkpointPath}.tmp-${process.pid}-${Date.now()}`;
  try {
    await writeFile(tmpPath, `${JSON.stringify(normalized, null, 2)}\n`, 'utf-8');
    await rename(tmpPath, checkpointPath);
  } catch (err) {
    // Covers both a failed write (partial temp file) and a failed rename.
    try {
      await unlink(tmpPath);
    } catch {
      // Best-effort cleanup: the temp file may not exist; the original error matters more.
    }
    throw err;
  }
}
/**
 * Validates the JSON envelope produced by `wacli messages list` and converts
 * its messages into normalized collector messages.
 *
 * Outcomes:
 * - payload is not a plain object           -> failure, `invalid_payload`
 * - `success` is anything other than `true` -> failure, `command_failed`
 *   (uses the payload's non-empty `error` string when present)
 * - `data.messages` is null or undefined    -> success with no messages
 * - `data.messages` is not an array         -> failure, `invalid_payload`
 * - otherwise                               -> success with normalized messages
 *
 * @param payload Parsed JSON value returned by the runner.
 * @param store   Store the payload was read from; passed to message normalization.
 */
function parseWacliListPayload(payload: unknown, store: WacliStoreConfig): ParseListResult {
  const failure = (error: string, degradedReason: WacliDegradedReason): ParseListResult => ({
    ok: false,
    messages: [],
    error,
    degradedReason,
  });
  const success = (messages: CollectedWhatsAppMessage[]): ParseListResult => ({
    ok: true,
    messages,
    error: null,
    degradedReason: null,
  });

  if (!isRecord(payload)) {
    return failure('wacli payload must be an object', 'invalid_payload');
  }

  const envelope = payload as WacliPayloadLike;
  if (envelope.success !== true) {
    return failure(asNonEmpty(envelope.error) ?? 'wacli reported success=false', 'command_failed');
  }

  const candidate = envelope.data?.messages;
  if (candidate === null || candidate === undefined) {
    return success([]);
  }
  if (!Array.isArray(candidate)) {
    return failure('wacli payload data.messages must be an array when present', 'invalid_payload');
  }

  return success(normalizeRawMessages(candidate, store));
}
/**
 * Coerces an arbitrary value into a well-formed per-store checkpoint.
 *
 * Anything that is not a plain object is treated as an empty checkpoint.
 * `after` and `updated_at` go through `normalizeTimestamp` (null when absent
 * or unparsable). IDs are read from `message_ids_at_after`, falling back to an
 * `ids` array when that key is not an array; they are trimmed, emptied of
 * non-string/blank entries, de-duplicated and sorted.
 *
 * @param checkpoint Raw value, e.g. an entry parsed from the checkpoint file.
 * @returns A checkpoint whose fields are always present.
 */
function normalizeStoreCheckpoint(checkpoint: unknown): WacliStoreCheckpoint {
  const source: Record<string, unknown> = isRecord(checkpoint) ? checkpoint : {};

  let rawIds: unknown[] = [];
  if (Array.isArray(source.message_ids_at_after)) {
    rawIds = source.message_ids_at_after;
  } else if (Array.isArray(source.ids)) {
    rawIds = source.ids;
  }

  const uniqueIds = new Set<string>();
  for (const candidate of rawIds) {
    const id = asNonEmpty(candidate);
    if (id) {
      uniqueIds.add(id);
    }
  }

  return {
    after: normalizeTimestamp(source.after),
    message_ids_at_after: [...uniqueIds].sort(),
    updated_at: normalizeTimestamp(source.updated_at),
  };
}
/**
 * Computes the checkpoint a store should have after `newMessages` were collected.
 *
 * The new `after` is the greatest message timestamp in the batch, and
 * `message_ids_at_after` lists the IDs that share that timestamp so they can
 * be recognised as already seen when the same timestamp is returned again.
 *
 * When the batch does not move past the existing checkpoint timestamp (every
 * new message has the same timestamp as `existing.after`), the IDs already
 * recorded at that timestamp are kept and merged with the new ones. Dropping
 * them would make those earlier messages look unseen on the next run.
 *
 * @param existing    Checkpoint in effect before this collection.
 * @param newMessages Messages accepted in this collection (may be empty, any order).
 * @param now         Clock value recorded as `updated_at`.
 * @returns `existing` itself when there is nothing new; otherwise a new checkpoint.
 */
function advanceCheckpoint(
  existing: WacliStoreCheckpoint,
  newMessages: CollectedWhatsAppMessage[],
  now: Date
): WacliStoreCheckpoint {
  if (newMessages.length === 0) {
    return existing;
  }

  // Timestamps are normalized ISO-8601 strings, so plain string comparison
  // orders them chronologically; a single scan replaces sorting the batch.
  let latest: string | null = null;
  for (const message of newMessages) {
    if (latest === null || message.Timestamp > latest) {
      latest = message.Timestamp;
    }
  }
  const after = latest ?? existing.after;

  const idsAtAfter = new Set<string>();
  if (after !== null && after === existing.after) {
    // Same boundary timestamp as before: carry the already-seen IDs forward.
    for (const id of existing.message_ids_at_after) {
      idsAtAfter.add(id);
    }
  }
  for (const message of newMessages) {
    if (message.Timestamp === after) {
      idsAtAfter.add(message.MsgID);
    }
  }

  return {
    after,
    message_ids_at_after: [...idsAtAfter].sort(),
    updated_at: now.toISOString(),
  };
}
/**
 * Orders two strings by UTF-16 code units, the same ordering
 * `Array.prototype.sort()` applies to strings by default.
 */
function compareCodeUnits(left: string, right: string): number {
  if (left < right) return -1;
  if (left > right) return 1;
  return 0;
}

/**
 * Comparator for collected messages: by timestamp, then message ID, then store key.
 *
 * Comparison is ordinal (code-unit) rather than `localeCompare`, so the
 * resulting order does not depend on the runtime's locale or collation data
 * and agrees with the default `.sort()` used for checkpoint ID lists.
 *
 * @returns Negative when `a` sorts first, positive when `b` sorts first, 0 when equal on all keys.
 */
function sortMessagesByTimestampThenIdThenStore(
  a: CollectedWhatsAppMessage,
  b: CollectedWhatsAppMessage
): number {
  return (
    compareCodeUnits(a.Timestamp, b.Timestamp) ||
    compareCodeUnits(a.MsgID, b.MsgID) ||
    compareCodeUnits(a.store_key, b.store_key)
  );
}
/**
 * Produces a human-readable message from an arbitrary thrown value.
 *
 * - `Error` instances yield their `message`.
 * - Strings are returned unchanged.
 * - Other objects yield their non-empty string `message` property if they
 *   have one, otherwise their JSON rendering, so details are not collapsed
 *   into "[object Object]".
 * - Everything else (and objects that cannot be serialized) goes through `String()`.
 */
function errorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'string') {
    return error;
  }
  if (typeof error === 'object' && error !== null) {
    if ('message' in error && typeof error.message === 'string' && error.message.length > 0) {
      return error.message;
    }
    try {
      // JSON.stringify may yield undefined (e.g. toJSON returning undefined)
      // or throw on circular structures; both fall through to String().
      const serialized: unknown = JSON.stringify(error);
      if (typeof serialized === 'string') {
        return serialized;
      }
    } catch {
      // fall through to the generic conversion below
    }
  }
  return String(error);
}
loaded = await readWacliCollectorCheckpoint(checkpointPath); + expect(loaded.version).toBe(1); + expect(loaded.stores.personal?.after).toBe('2026-04-16T08:00:00.000Z'); + expect(loaded.stores.personal?.message_ids_at_after).toEqual(['m2', 'm3']); + expect(loaded.stores.business?.after).toBeNull(); + }); + + test('invalid checkpoint JSON falls back to an empty state', async () => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + await Bun.write(checkpointPath, '{not-valid-json'); + + const loaded = await readWacliCollectorCheckpoint(checkpointPath); + expect(loaded.version).toBe(1); + expect(loaded.stores).toEqual({}); + }); + + test('derives latest checkpoint sync timestamp across stores', async () => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + await writeWacliCollectorCheckpoint(checkpointPath, { + version: 1, + stores: { + personal: { + after: '2026-04-16T03:00:00.000Z', + message_ids_at_after: ['p1'], + updated_at: '2026-04-16T03:00:01.000Z', + }, + business: { + after: '2026-04-16T05:00:00.000Z', + message_ids_at_after: ['b1'], + updated_at: '2026-04-16T05:00:01.000Z', + }, + }, + }); + + const checkpoint = await readWacliCollectorCheckpoint(checkpointPath); + expect(latestCheckpointSyncAt(checkpoint)).toBe('2026-04-16T05:00:00.000Z'); + expect(await readWacliCollectorLastSyncAt(checkpointPath)).toBe('2026-04-16T05:00:00.000Z'); + }); +}); + +describe('collectWacliMessages', () => { + test('collects from personal + business stores and advances checkpoint without replaying seen IDs', async () => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + + await writeWacliCollectorCheckpoint(checkpointPath, { + version: 1, + stores: { + personal: { + after: '2026-04-16T00:00:00.000Z', + message_ids_at_after: ['old-same-second'], + updated_at: '2026-04-16T00:01:00.000Z', + }, + }, + }); + + const calls: Array<{ storePath: 
string; after: string | null; limit: number }> = []; + const runner: WacliListMessagesRunner = async (request) => { + calls.push({ ...request }); + + if (request.storePath === '/stores/personal') { + return { + success: true, + data: { + messages: [ + { + MsgID: 'old-same-second', + ChatName: 'Ops', + SenderJID: 's1@jid', + Timestamp: '2026-04-16T00:00:00.000Z', + FromMe: false, + Text: 'already processed', + }, + { + MsgID: 'p2', + ChatName: 'Ops', + SenderJID: 's2@jid', + Timestamp: '2026-04-16T00:00:00.000Z', + FromMe: false, + Text: 'new same second', + }, + { + MsgID: 'p3', + ChatName: 'Ops', + SenderName: 'Sam', + SenderJID: 's3@jid', + Timestamp: '2026-04-16T00:10:00.000Z', + FromMe: false, + Text: 'new later', + }, + ], + }, + error: null, + }; + } + + if (request.storePath === '/stores/business') { + return { + success: true, + data: { + messages: [ + { + MsgID: 'b1', + ChatName: 'Biz', + SenderName: 'Nichol', + SenderJID: 'b1@jid', + Timestamp: '2026-04-16T00:05:00.000Z', + FromMe: false, + Text: 'new business', + }, + ], + }, + error: null, + }; + } + + throw new Error(`unexpected store path: ${request.storePath}`); + }; + + const result = await collectWacliMessages({ + checkpointPath, + stores: [ + { key: 'personal', storePath: '/stores/personal' }, + { key: 'business', storePath: '/stores/business' }, + ], + now: new Date('2026-04-16T00:30:00.000Z'), + limit: 50, + runner, + }); + + expect(calls).toEqual([ + { storePath: '/stores/personal', after: '2026-04-16T00:00:00.000Z', limit: 50 }, + { storePath: '/stores/business', after: null, limit: 50 }, + ]); + + expect(result.degraded).toBe(false); + expect(result.messages.map((message) => message.MsgID)).toEqual(['p2', 'b1', 'p3']); + expect(result.stores.find((store) => store.storeKey === 'personal')?.batchSize).toBe(2); + expect(result.stores.find((store) => store.storeKey === 'business')?.batchSize).toBe(1); + expect(result.stores.find((store) => store.storeKey === 
'personal')?.checkpointAfter).toBe('2026-04-16T00:10:00.000Z'); + expect(result.stores.find((store) => store.storeKey === 'business')?.checkpointAfter).toBe('2026-04-16T00:05:00.000Z'); + + const stored = await readWacliCollectorCheckpoint(checkpointPath); + expect(stored.stores.personal?.after).toBe('2026-04-16T00:10:00.000Z'); + expect(stored.stores.personal?.message_ids_at_after).toEqual(['p3']); + expect(stored.stores.business?.after).toBe('2026-04-16T00:05:00.000Z'); + expect(stored.stores.business?.message_ids_at_after).toEqual(['b1']); + }); + + test('marks store as degraded when latest sync is unknown', async () => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + const runner: WacliListMessagesRunner = async (_request) => ({ + success: true, + data: { messages: null }, + error: null, + }); + + const result = await collectWacliMessages({ + checkpointPath, + stores: [{ key: 'personal', storePath: '/stores/personal' }], + now: new Date('2026-04-16T12:00:00.000Z'), + runner, + }); + + expect(result.degraded).toBe(true); + expect(result.stores[0]?.degraded).toBe(true); + expect(result.stores[0]?.degradedReason).toBe('last_sync_unknown'); + expect(result.stores[0]?.lastSyncAt).toBeNull(); + expect(result.stores[0]?.batchSize).toBe(0); + }); + + test('marks store as degraded when latest sync is stale', async () => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + + const runner: WacliListMessagesRunner = async (request) => { + if (request.after) { + throw new Error('unexpected --after call'); + } + if (request.limit > 1) { + return { success: true, data: { messages: [] }, error: null }; + } + return { + success: true, + data: { + messages: [ + { + MsgID: 'latest', + ChatName: 'Ops', + SenderJID: 'sender@jid', + Timestamp: '2026-04-15T09:00:00Z', + FromMe: false, + Text: 'old message', + }, + ], + }, + error: null, + }; + }; + + const result = await 
collectWacliMessages({ + checkpointPath, + stores: [{ key: 'personal', storePath: '/stores/personal' }], + staleAfterHours: 24, + now: new Date('2026-04-16T12:00:00.000Z'), + runner, + }); + + expect(result.degraded).toBe(true); + expect(result.stores[0]?.degraded).toBe(true); + expect(result.stores[0]?.degradedReason).toBe('last_sync_stale'); + expect(result.stores[0]?.lastSyncAt).toBe('2026-04-15T09:00:00.000Z'); + }); + + test('does not persist checkpoint when persistCheckpoint=false', async () => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + await writeWacliCollectorCheckpoint(checkpointPath, { + version: 1, + stores: { + personal: { + after: '2026-04-15T12:00:00.000Z', + message_ids_at_after: ['old-id'], + updated_at: '2026-04-15T12:00:00.000Z', + }, + }, + }); + + const runner: WacliListMessagesRunner = async () => ({ + success: true, + data: { + messages: [ + { + MsgID: 'new-id', + ChatName: 'Ops', + SenderJID: 'sender@jid', + Timestamp: '2026-04-16T12:00:00.000Z', + FromMe: false, + Text: 'new message', + }, + ], + }, + error: null, + }); + + const result = await collectWacliMessages({ + checkpointPath, + stores: [{ key: 'personal', storePath: '/stores/personal' }], + persistCheckpoint: false, + now: new Date('2026-04-16T12:30:00.000Z'), + runner, + }); + + expect(result.stores[0]?.checkpointAfter).toBe('2026-04-16T12:00:00.000Z'); + + const persisted = await readWacliCollectorCheckpoint(checkpointPath); + expect(persisted.stores.personal?.after).toBe('2026-04-15T12:00:00.000Z'); + }); + + test('summarizes failed health when any store is disconnected', async () => { + const result = summarizeWacliHealth( + [ + { + storeKey: 'personal', + storePath: '/stores/personal', + checkpointBefore: null, + checkpointAfter: null, + batchSize: 0, + lastSyncAt: null, + degraded: true, + degradedReason: 'command_failed', + error: 'spawn wacli ENOENT', + messages: [], + }, + ], + { now: new 
Date('2026-04-16T12:00:00.000Z') } + ); + + expect(result.status).toBe('failed'); + expect(result.disconnectedStoreKeys).toEqual(['personal']); + expect(result.staleStoreKeys).toEqual([]); + expect(result.alerts[0]).toContain('unhealthy'); + }); + + test('summarizes degraded health when only stale stores are present', async () => { + const result = summarizeWacliHealth( + [ + { + storeKey: 'personal', + storePath: '/stores/personal', + checkpointBefore: null, + checkpointAfter: null, + batchSize: 0, + lastSyncAt: '2026-04-15T10:00:00.000Z', + degraded: true, + degradedReason: 'last_sync_stale', + error: null, + messages: [], + }, + ], + { now: new Date('2026-04-16T12:00:00.000Z') } + ); + + expect(result.status).toBe('degraded'); + expect(result.disconnectedStoreKeys).toEqual([]); + expect(result.staleStoreKeys).toEqual(['personal']); + expect(result.lastSyncAt).toBe('2026-04-15T10:00:00.000Z'); + expect(result.alerts[0]).toContain('stale'); + }); +}); + +function createTempDir(): string { + const dir = mkdtempSync(join(tmpdir(), 'action-brain-collector-test-')); + tempDirs.push(dir); + return dir; +} From b4ae25a16e54edb4c596dcca028f187888d24692 Mon Sep 17 00:00:00 2001 From: Abhinav Bansal Date: Thu, 16 Apr 2026 22:47:42 +0800 Subject: [PATCH 2/7] feat(action-brain): pulse auto-ingest runner + checkpoint-aware brief (v0.10.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds src/action-brain/ingest-runner.ts — cron-ready auto-ingest pipeline that reads new wacli messages, runs LLM extraction, and stores results. Checkpoint-aware: skips already-processed messages. Staleness gate bails if wacli data is older than --stale-after-hours (default 24h). 
Also: - action_engine: createItemWithResult() returns idempotency signal - extractor: owner context injection for better extraction accuracy - operations: action_brief reads checkpoint automatically; action_ingest_auto operation wires preflight + collect + extract + store in one call - cli: `gbrain action run` command (checkpoint-path, stale-after-hours, wacli-limit flags) Closes GIT-47. Co-Authored-By: Paperclip --- CHANGELOG.md | 8 + VERSION | 2 +- src/action-brain/action-engine.ts | 18 +- src/action-brain/extractor.ts | 5 + src/action-brain/ingest-runner.ts | 313 ++++++++++++++++++ src/action-brain/operations.ts | 124 +++++++- src/cli.ts | 5 +- test/action-brain/extractor.test.ts | 13 + test/action-brain/ingest-runner.test.ts | 401 ++++++++++++++++++++++++ test/action-brain/operations.test.ts | 49 +++ 10 files changed, 934 insertions(+), 4 deletions(-) create mode 100644 src/action-brain/ingest-runner.ts create mode 100644 test/action-brain/ingest-runner.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 4546d874..4ee8aa33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to GBrain will be documented in this file. +## [0.10.2] - 2026-04-16 + +### Added + +- **`gbrain action run` — cron-ready auto-ingest pipeline.** One command reads new WhatsApp messages from the wacli store, extracts commitments with the LLM extractor, and stores them in Action Brain. Checkpoint-aware: skips already-processed messages. Staleness gate: when wacli data is older than `--stale-after-hours` (default 24h) the run is flagged as degraded and raises an alert but still ingests; it bails out only when a store is disconnected. Returns structured JSON with counts and errors — CI/cron-friendly. +- **Morning brief is now checkpoint-aware.** `gbrain action brief` auto-reads the wacli checkpoint to compute message freshness — no more manually passing `--last-sync-at`. Pass `--checkpoint-path` to override the default location. 
+- **Action item creation now returns idempotency signal.** `createItemWithResult()` tells callers whether an item was freshly inserted or already existed — so the ingest pipeline can report accurate created/skipped counts without extra DB queries. + ## [0.10.1] - 2026-04-16 ### Added diff --git a/VERSION b/VERSION index 57121573..5eef0f10 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.10.1 +0.10.2 diff --git a/src/action-brain/action-engine.ts b/src/action-brain/action-engine.ts index bb7f794e..06635960 100644 --- a/src/action-brain/action-engine.ts +++ b/src/action-brain/action-engine.ts @@ -54,6 +54,11 @@ export interface ActionMutationOptions { metadata?: Record; } +export interface CreateActionItemResult { + item: ActionItem; + created: boolean; +} + export interface ListActionItemsFilters { status?: ActionStatus; owner?: string; @@ -80,6 +85,14 @@ export class ActionEngine { constructor(private readonly db: ActionDb) {} async createItem(input: CreateActionItemInput, options: ActionMutationOptions = {}): Promise { + const result = await this.createItemWithResult(input, options); + return result.item; + } + + async createItemWithResult( + input: CreateActionItemInput, + options: ActionMutationOptions = {} + ): Promise { return this.withTransaction(async () => { const result = await this.db.query( `WITH inserted AS ( @@ -142,7 +155,10 @@ export class ActionEngine { ); } - return mapActionItem(row); + return { + item: mapActionItem(row), + created: toBoolean(row.was_inserted), + }; }); } diff --git a/src/action-brain/extractor.ts b/src/action-brain/extractor.ts index e581087d..473dfde3 100644 --- a/src/action-brain/extractor.ts +++ b/src/action-brain/extractor.ts @@ -23,6 +23,8 @@ export interface ExtractCommitmentsOptions { client?: AnthropicLike; model?: string; timeoutMs?: number; + /** When true, extraction errors are re-thrown for pipeline-level retry handling. */ + throwOnError?: boolean; /** The name of the person whose obligations we are tracking (e.g. 
"Abhinav Bansal"). */ ownerName?: string; /** Known aliases for the owner (e.g. ["Abbhinaav", "Abhi"]). */ @@ -140,6 +142,9 @@ export async function extractCommitments( const rawCommitments = parseCommitmentsFromResponse(response); return normalizeCommitments(rawCommitments); } catch (err) { + if (options.throwOnError) { + throw err instanceof Error ? err : new Error(String(err)); + } // Queueing/retry behavior lives in pipeline orchestration; extractor never throws on model failures. // Log so operators can distinguish "no commitments found" from "extraction failed". console.error('[action-brain] Extraction failed:', err instanceof Error ? err.message : String(err)); diff --git a/src/action-brain/ingest-runner.ts b/src/action-brain/ingest-runner.ts new file mode 100644 index 00000000..6ca21eb8 --- /dev/null +++ b/src/action-brain/ingest-runner.ts @@ -0,0 +1,313 @@ +import { createHash } from 'crypto'; +import { ActionEngine } from './action-engine.ts'; +import { + collectWacliMessages, + defaultCollectorCheckpointPath, + summarizeWacliHealth, + type CollectWacliMessagesOptions, + type WacliCollectionResult, + type WacliHealthStatus, + type WacliStoreCollectionResult, + writeWacliCollectorCheckpoint, +} from './collector.ts'; +import { extractCommitments, type StructuredCommitment, type WhatsAppMessage } from './extractor.ts'; +import { initActionSchema } from './action-schema.ts'; + +interface QueryResult { + rows: T[]; +} + +interface ActionDb { + query>(sql: string, params?: unknown[]): Promise>; + exec: (sql: string) => Promise; +} + +type FailureStage = 'collect' | 'health' | 'extract' | 'store' | 'checkpoint'; + +export interface ActionIngestFailure { + stage: FailureStage; + message: string; +} + +export interface ActionIngestRunSummary { + runAt: string; + success: boolean; + degraded: boolean; + healthStatus: WacliHealthStatus; + lastSyncAt: string | null; + alerts: string[]; + checkpointPath: string; + checkpointAdvanced: boolean; + messagesScanned: 
number; + commitmentsExtracted: number; + commitmentsCreated: number; + duplicatesSkipped: number; + lowConfidenceDropped: number; + stores: WacliStoreCollectionResult[]; + failure: ActionIngestFailure | null; +} + +export interface RunActionIngestOptions { + db: ActionDb; + now?: Date; + minConfidence?: number; + actor?: string; + model?: string; + timeoutMs?: number; + ownerName?: string; + ownerAliases?: string[]; + collectorOptions?: Omit; + collector?: (options: CollectWacliMessagesOptions) => Promise; + extractor?: typeof extractCommitments; +} + +const DEFAULT_MIN_CONFIDENCE = 0.7; + +export async function runActionIngest(options: RunActionIngestOptions): Promise { + const now = options.now ? ensureDate(options.now, 'now') : new Date(); + const checkpointPath = options.collectorOptions?.checkpointPath ?? defaultCollectorCheckpointPath(); + const collect = options.collector ?? collectWacliMessages; + const extract = options.extractor ?? extractCommitments; + const minConfidence = normalizeConfidenceThreshold(options.minConfidence); + const actor = asOptionalNonEmptyString(options.actor) ?? 'extractor'; + + await initActionSchema({ exec: options.db.exec.bind(options.db) }); + + const summary: ActionIngestRunSummary = { + runAt: now.toISOString(), + success: false, + degraded: false, + healthStatus: 'failed', + lastSyncAt: null, + alerts: [], + checkpointPath, + checkpointAdvanced: false, + messagesScanned: 0, + commitmentsExtracted: 0, + commitmentsCreated: 0, + duplicatesSkipped: 0, + lowConfidenceDropped: 0, + stores: [], + failure: null, + }; + + let collection: WacliCollectionResult; + try { + collection = await collect({ + ...(options.collectorOptions ?? 
{}), + now, + persistCheckpoint: false, + }); + } catch (err) { + summary.failure = toFailure('collect', err); + return summary; + } + + summary.checkpointPath = collection.checkpointPath; + summary.stores = collection.stores; + summary.messagesScanned = collection.messages.length; + + const health = summarizeWacliHealth(collection.stores, { now }); + summary.healthStatus = health.status; + summary.lastSyncAt = health.lastSyncAt; + summary.alerts = health.alerts; + summary.degraded = health.status !== 'healthy'; + + if (health.status === 'failed') { + summary.failure = toFailure('health', health.alerts[0] ?? 'wacli health check failed'); + return summary; + } + + let extracted: StructuredCommitment[]; + try { + extracted = await extract(collection.messages, { + model: asOptionalNonEmptyString(options.model) ?? undefined, + timeoutMs: options.timeoutMs, + throwOnError: true, + ownerName: asOptionalNonEmptyString(options.ownerName) ?? undefined, + ownerAliases: options.ownerAliases, + }); + } catch (err) { + summary.failure = toFailure('extract', err); + return summary; + } + + summary.commitmentsExtracted = extracted.length; + const commitments = extracted.filter((entry) => { + if (entry.confidence < minConfidence) { + summary.lowConfidenceDropped += 1; + return false; + } + return true; + }); + + const engine = new ActionEngine(options.db); + try { + for (const commitment of commitments) { + const sourceMessage = resolveSourceMessage(collection.messages, commitment); + const sourceMessageId = buildCommitmentSourceId( + resolveSourceMessageId(collection.messages, commitment, sourceMessage), + commitment + ); + + const result = await engine.createItemWithResult( + { + title: toActionTitle(commitment.owes_what), + type: commitment.type, + source_message_id: sourceMessageId, + owner: commitment.who ?? 
'', + waiting_on: null, + due_at: parseOptionalDate(commitment.by_when, 'by_when'), + confidence: clampConfidence(commitment.confidence), + source_thread: sourceMessage?.ChatName ?? '', + source_contact: sourceMessage?.SenderName ?? '', + linked_entity_slugs: [], + }, + { + actor, + metadata: { + ingestion_mode: 'auto_runner', + }, + } + ); + + if (result.created) { + summary.commitmentsCreated += 1; + } else { + summary.duplicatesSkipped += 1; + } + } + } catch (err) { + summary.failure = toFailure('store', err); + return summary; + } + + const shouldPersistCheckpoint = collection.stores.some((store) => store.checkpointBefore !== store.checkpointAfter); + if (!shouldPersistCheckpoint) { + summary.success = true; + return summary; + } + + try { + await writeWacliCollectorCheckpoint(collection.checkpointPath, collection.checkpoint); + summary.checkpointAdvanced = true; + summary.success = true; + return summary; + } catch (err) { + summary.failure = toFailure('checkpoint', err); + return summary; + } +} + +function resolveSourceMessage(messages: WhatsAppMessage[], commitment: StructuredCommitment): WhatsAppMessage | null { + if (messages.length === 0) { + return null; + } + + const explicitSourceMessageId = asOptionalNonEmptyString(commitment.source_message_id); + if (explicitSourceMessageId) { + const matched = messages.find((message) => message.MsgID === explicitSourceMessageId); + if (matched) { + return matched; + } + } + + return messages.length === 1 ? 
messages[0] : null; +} + +function resolveSourceMessageId( + messages: WhatsAppMessage[], + commitment: StructuredCommitment, + message: WhatsAppMessage | null +): string | null { + if (message) { + return message.MsgID; + } + + if (messages.length === 0) { + return asOptionalNonEmptyString(commitment.source_message_id); + } + + return null; +} + +function buildCommitmentSourceId(sourceMessageId: string | null, commitment: StructuredCommitment): string { + const baseMsgId = asOptionalNonEmptyString(sourceMessageId) ?? 'batch'; + const seed = [ + baseMsgId, + normalizeCommitmentField(commitment.who), + normalizeCommitmentField(commitment.owes_what), + normalizeCommitmentField(commitment.to_whom), + normalizeCommitmentField(commitment.by_when), + commitment.type, + ].join('|'); + const digest = createHash('sha256').update(seed).digest('hex').slice(0, 16); + return `${baseMsgId}:ab:${digest}`; +} + +function normalizeCommitmentField(value: string | null | undefined): string { + if (!value) return ''; + return value.trim().toLowerCase(); +} + +function toActionTitle(owesWhat: string): string { + const text = owesWhat.trim(); + if (text.length <= 160) return text; + return `${text.slice(0, 157)}...`; +} + +function parseOptionalDate(value: string | null | undefined, field: string): Date | null { + const normalized = asOptionalNonEmptyString(value); + if (!normalized) return null; + const parsed = new Date(normalized); + if (Number.isNaN(parsed.getTime())) { + throw new Error(`Invalid ${field}: ${normalized}`); + } + return parsed; +} + +function clampConfidence(value: number): number { + if (!Number.isFinite(value)) { + return 0; + } + return Math.min(1, Math.max(0, value)); +} + +function normalizeConfidenceThreshold(value: number | undefined): number { + if (typeof value !== 'number' || !Number.isFinite(value)) { + return DEFAULT_MIN_CONFIDENCE; + } + return Math.min(1, Math.max(0, value)); +} + +function toFailure(stage: FailureStage, err: unknown): 
ActionIngestFailure { + return { + stage, + message: errorMessage(err), + }; +} + +function asOptionalNonEmptyString(value: unknown): string | null { + if (typeof value !== 'string') { + return null; + } + const normalized = value.trim(); + return normalized.length > 0 ? normalized : null; +} + +function ensureDate(value: Date, field: string): Date { + if (!(value instanceof Date) || Number.isNaN(value.getTime())) { + throw new Error(`Invalid ${field}: expected valid Date`); + } + return value; +} + +function errorMessage(err: unknown): string { + if (err instanceof Error) { + return err.message; + } + if (typeof err === 'string') { + return err; + } + return JSON.stringify(err); +} diff --git a/src/action-brain/operations.ts b/src/action-brain/operations.ts index 8f2f54b3..3702340c 100644 --- a/src/action-brain/operations.ts +++ b/src/action-brain/operations.ts @@ -3,7 +3,13 @@ import type { BrainEngine } from '../core/engine.ts'; import type { Operation } from '../core/operations.ts'; import { ActionEngine, ActionItemNotFoundError, ActionTransitionError } from './action-engine.ts'; import { MorningBriefGenerator } from './brief.ts'; +import { + defaultCollectorCheckpointPath, + readWacliCollectorLastSyncAt, + type WacliStoreConfig, +} from './collector.ts'; import { extractCommitments, type StructuredCommitment, type WhatsAppMessage } from './extractor.ts'; +import { runActionIngest } from './ingest-runner.ts'; import { initActionSchema } from './action-schema.ts'; interface QueryResult { @@ -15,6 +21,10 @@ interface QueryableDb { exec?: (sql: string) => Promise; } +interface ExecQueryableDb extends QueryableDb { + exec: (sql: string) => Promise; +} + interface PostgresUnsafeConnection { unsafe: (sql: string, params?: unknown[]) => Promise[]>; reserve?: () => Promise; @@ -57,6 +67,10 @@ export const actionBrainOperations: Operation[] = [ params: { now: { type: 'string', description: 'Optional clock override (ISO timestamp)' }, last_sync_at: { type: 'string', 
description: 'Override wacli freshness timestamp (ISO timestamp)' }, + checkpoint_path: { + type: 'string', + description: 'Optional wacli checkpoint path used to auto-resolve freshness when last_sync_at is not provided', + }, timezone_offset_minutes: { type: 'number', description: 'Timezone offset in minutes east of UTC for due-today classification', @@ -66,10 +80,18 @@ export const actionBrainOperations: Operation[] = [ handler: async (ctx, p) => { const db = await ensureActionBrainSchema(ctx.engine); const generator = new MorningBriefGenerator(db); + const explicitLastSyncAt = parseOptionalDate(p.last_sync_at, 'last_sync_at'); + const checkpointPath = asOptionalNonEmptyString(p.checkpoint_path) ?? defaultCollectorCheckpointPath(); + + let inferredLastSyncAt: Date | null = null; + if (!explicitLastSyncAt) { + const checkpointLastSyncAt = await readWacliCollectorLastSyncAt(checkpointPath); + inferredLastSyncAt = parseOptionalDate(checkpointLastSyncAt, 'checkpoint.last_sync_at'); + } const brief = await generator.generateMorningBrief({ now: parseOptionalDate(p.now, 'now') ?? undefined, - lastSyncAt: parseOptionalDate(p.last_sync_at, 'last_sync_at'), + lastSyncAt: explicitLastSyncAt ?? 
inferredLastSyncAt, timezoneOffsetMinutes: asOptionalNumber(p.timezone_offset_minutes), }); @@ -222,6 +244,61 @@ export const actionBrainOperations: Operation[] = [ }; }, }, + { + name: 'action_ingest_auto', + description: 'Run the wacli collector + extractor auto-ingest pipeline with preflight health checks', + params: { + now: { type: 'string', description: 'Optional clock override (ISO timestamp)' }, + min_confidence: { type: 'number', description: 'Drop commitments below this confidence threshold (default: 0.7)' }, + actor: { type: 'string', description: 'Actor writing created events' }, + model: { type: 'string', description: 'Anthropic model override' }, + timeout_ms: { type: 'number', description: 'Extractor timeout in milliseconds' }, + owner_name: { type: 'string', description: 'Owner name used by extraction prompt grounding' }, + owner_aliases: { type: 'array', items: { type: 'string' }, description: 'Optional owner alias list' }, + owner_aliases_json: { type: 'string', description: 'JSON-encoded owner alias list (CLI-friendly)' }, + checkpoint_path: { type: 'string', description: 'Collector checkpoint path override' }, + wacli_limit: { type: 'number', description: 'Max messages per wacli list call (default: 200)' }, + stale_after_hours: { + type: 'number', + description: 'Mark stores degraded when latest sync is older than this many hours (default: 24)', + }, + personal_store_path: { type: 'string', description: 'Override personal wacli store path' }, + business_store_path: { type: 'string', description: 'Override business wacli store path' }, + }, + mutating: true, + cliHints: { name: 'action-run' }, + handler: async (ctx, p) => { + const db = await ensureActionBrainSchema(ctx.engine); + const execDb = requireExecQueryableDb(db); + + if (ctx.dryRun) { + return { dry_run: true, action: 'action_ingest_auto' }; + } + + const stores = parseStoreOverrides({ + personalStorePath: p.personal_store_path, + businessStorePath: p.business_store_path, + }); + const 
ownerAliases = parseStringArrayParam(p.owner_aliases ?? p.owner_aliases_json); + + return runActionIngest({ + db: execDb, + now: parseOptionalDate(p.now, 'now') ?? undefined, + minConfidence: asOptionalNumber(p.min_confidence), + actor: asOptionalNonEmptyString(p.actor) ?? undefined, + model: asOptionalNonEmptyString(p.model) ?? undefined, + timeoutMs: asOptionalNumber(p.timeout_ms), + ownerName: asOptionalNonEmptyString(p.owner_name) ?? undefined, + ownerAliases: ownerAliases.length > 0 ? ownerAliases : undefined, + collectorOptions: { + checkpointPath: asOptionalNonEmptyString(p.checkpoint_path) ?? undefined, + limit: normalizePositiveInteger(p.wacli_limit), + staleAfterHours: asOptionalNumber(p.stale_after_hours), + stores: stores.length > 0 ? stores : undefined, + }, + }); + }, + }, ]; async function ensureActionBrainSchema(engine: BrainEngine): Promise { @@ -274,6 +351,13 @@ async function resolveActionDb(engine: BrainEngine): Promise { throw new Error('Unsupported engine for Action Brain operations. 
Expected PGLiteEngine or PostgresEngine.'); } +function requireExecQueryableDb(db: QueryableDb): ExecQueryableDb { + if (typeof db.exec !== 'function') { + throw new Error('Action auto-ingest requires an exec-capable database adapter.'); + } + return db as ExecQueryableDb; +} + function parseMessagesParam(value: unknown): WhatsAppMessage[] { const raw = parseJsonArrayInput(value); if (raw.length === 0) { @@ -358,6 +442,17 @@ function parseJsonArrayInput(value: unknown): unknown[] { return []; } +function parseStringArrayParam(value: unknown): string[] { + const raw = parseJsonArrayInput(value); + if (raw.length === 0) { + return []; + } + + return raw + .map((entry) => asOptionalNonEmptyString(entry)) + .filter((entry): entry is string => Boolean(entry)); +} + function resolveSourceMessage(messages: WhatsAppMessage[], commitment: StructuredCommitment): WhatsAppMessage | null { if (messages.length === 0) { return null; @@ -416,6 +511,25 @@ function toActionTitle(owesWhat: string): string { return `${text.slice(0, 157)}...`; } +function parseStoreOverrides(input: { + personalStorePath: unknown; + businessStorePath: unknown; +}): WacliStoreConfig[] { + const personalStorePath = asOptionalNonEmptyString(input.personalStorePath); + const businessStorePath = asOptionalNonEmptyString(input.businessStorePath); + const stores: WacliStoreConfig[] = []; + + if (personalStorePath) { + stores.push({ key: 'personal', storePath: personalStorePath }); + } + + if (businessStorePath && businessStorePath !== personalStorePath) { + stores.push({ key: 'business', storePath: businessStorePath }); + } + + return stores; +} + function asOptionalNumber(value: unknown): number | undefined { if (typeof value === 'number' && Number.isFinite(value)) { return value; @@ -429,6 +543,14 @@ function asOptionalNumber(value: unknown): number | undefined { return undefined; } +function normalizePositiveInteger(value: unknown): number | undefined { + const parsed = asOptionalNumber(value); + if (parsed 
=== undefined || !Number.isInteger(parsed) || parsed < 1) { + return undefined; + } + return parsed; +} + function asRequiredInteger(value: unknown, param: string): number { const parsed = asOptionalNumber(value); if (parsed === undefined || !Number.isInteger(parsed)) { diff --git a/src/cli.ts b/src/cli.ts index de676c41..9bd64d58 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -25,6 +25,7 @@ const ACTION_SUBCOMMANDS = new Map([ ['resolve', 'action-resolve'], ['mark-fp', 'action-mark-fp'], ['ingest', 'action-ingest'], + ['run', 'action-run'], ]); async function main() { @@ -462,6 +463,7 @@ ACTION action resolve Mark an action item resolved action mark-fp Mark extraction as false positive action ingest [--messages-json J] Extract and ingest commitments from a message batch + action run Run auto-ingest pipeline (cron-friendly) TOOLS publish [--password] Shareable HTML (strips private data, optional AES-256) @@ -489,10 +491,11 @@ function printActionHelp() { Subcommands: list [--status S --owner O --stale] - brief [--now ] [--last-sync-at ] [--timezone-offset-minutes ] + brief [--now ] [--last-sync-at ] [--checkpoint-path ] [--timezone-offset-minutes ] resolve mark-fp ingest [--messages-json ] [--model ] [--timeout-ms ] + run [--checkpoint-path ] [--stale-after-hours ] [--wacli-limit ] `); } diff --git a/test/action-brain/extractor.test.ts b/test/action-brain/extractor.test.ts index f64df18d..0aa6977e 100644 --- a/test/action-brain/extractor.test.ts +++ b/test/action-brain/extractor.test.ts @@ -194,6 +194,19 @@ describe('extractCommitments', () => { expect(output).toEqual([]); }); + test('rethrows extractor errors when throwOnError=true', async () => { + const fakeClient = new FakeAnthropicClient(() => { + throw new Error('anthropic unavailable'); + }); + + await expect( + extractCommitments([message('msg-004b', 'please send docs')], { + client: fakeClient, + throwOnError: true, + }) + ).rejects.toThrow('anthropic unavailable'); + }); + test('#5 recovers from text JSON 
output when tool_use block is absent', async () => { const fakeClient = new FakeAnthropicClient(() => textJsonResponse({ diff --git a/test/action-brain/ingest-runner.test.ts b/test/action-brain/ingest-runner.test.ts new file mode 100644 index 00000000..1728a470 --- /dev/null +++ b/test/action-brain/ingest-runner.test.ts @@ -0,0 +1,401 @@ +import { afterEach, describe, expect, test } from 'bun:test'; +import { mkdtempSync, rmSync } from 'fs'; +import { join } from 'path'; +import { tmpdir } from 'os'; +import { PGLiteEngine } from '../../src/core/pglite-engine.ts'; +import { + type CollectedWhatsAppMessage, + readWacliCollectorCheckpoint, + writeWacliCollectorCheckpoint, + type CollectWacliMessagesOptions, + type WacliCollectionResult, +} from '../../src/action-brain/collector.ts'; +import { runActionIngest } from '../../src/action-brain/ingest-runner.ts'; +import type { StructuredCommitment } from '../../src/action-brain/extractor.ts'; + +interface ActionDb { + query>(sql: string, params?: unknown[]): Promise<{ rows: T[] }>; + exec: (sql: string) => Promise; +} + +interface EngineWithDb { + db: ActionDb; +} + +const tempDirs: string[] = []; + +afterEach(() => { + while (tempDirs.length > 0) { + const dir = tempDirs.pop(); + if (!dir) continue; + rmSync(dir, { recursive: true, force: true }); + } +}); + +describe('runActionIngest', () => { + test('runs collect -> extract -> store and advances checkpoint only after store succeeds', async () => { + await withDb(async (db) => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + await writeWacliCollectorCheckpoint(checkpointPath, { + version: 1, + stores: { + personal: { + after: '2026-04-15T00:00:00.000Z', + message_ids_at_after: ['old-1'], + updated_at: '2026-04-15T00:00:00.000Z', + }, + }, + }); + + let collectorPersistFlag: boolean | undefined; + const messages = [ + message('m1', '2026-04-16T08:00:00.000Z', 'Joe to send shipment docs by 5pm'), + message('m2', 
'2026-04-16T08:05:00.000Z', 'Mukesh to confirm payout'), + ]; + const extractorOutput: StructuredCommitment[] = [ + commitment('Joe', 'Send shipment docs', 'm1', 0.92), + commitment('Mukesh', 'Confirm payout', 'm2', 0.81), + commitment('Joe', 'FYI mention', 'm1', 0.3), + ]; + + const summary = await runActionIngest({ + db, + minConfidence: 0.7, + collectorOptions: { checkpointPath }, + collector: async (options: CollectWacliMessagesOptions): Promise => { + collectorPersistFlag = options.persistCheckpoint; + return { + collectedAt: '2026-04-16T08:10:00.000Z', + checkpointPath, + limit: 200, + staleAfterHours: 24, + stores: [ + { + storeKey: 'personal', + storePath: '/stores/personal', + checkpointBefore: '2026-04-15T00:00:00.000Z', + checkpointAfter: '2026-04-16T08:05:00.000Z', + batchSize: 2, + lastSyncAt: '2026-04-16T08:05:00.000Z', + degraded: false, + degradedReason: null, + error: null, + messages, + }, + ], + messages, + degraded: false, + checkpoint: { + version: 1, + stores: { + personal: { + after: '2026-04-16T08:05:00.000Z', + message_ids_at_after: ['m2'], + updated_at: '2026-04-16T08:10:00.000Z', + }, + }, + }, + }; + }, + extractor: async (_messages) => extractorOutput, + }); + + expect(collectorPersistFlag).toBe(false); + expect(summary.success).toBe(true); + expect(summary.healthStatus).toBe('healthy'); + expect(summary.lastSyncAt).toBe('2026-04-16T08:05:00.000Z'); + expect(summary.alerts).toEqual([]); + expect(summary.messagesScanned).toBe(2); + expect(summary.commitmentsExtracted).toBe(3); + expect(summary.lowConfidenceDropped).toBe(1); + expect(summary.commitmentsCreated).toBe(2); + expect(summary.duplicatesSkipped).toBe(0); + expect(summary.checkpointAdvanced).toBe(true); + expect(summary.failure).toBeNull(); + + const checkpoint = await readWacliCollectorCheckpoint(checkpointPath); + expect(checkpoint.stores.personal?.after).toBe('2026-04-16T08:05:00.000Z'); + + const rows = await db.query<{ count: number }>('SELECT count(*)::int AS count FROM 
action_items'); + expect(rows.rows[0]?.count).toBe(2); + }); + }); + + test('is idempotent across repeated runs and counts duplicates skipped', async () => { + await withDb(async (db) => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + const messages = [message('m3', '2026-04-16T09:00:00.000Z', 'Joe to send vessel update')]; + const extractorOutput = [commitment('Joe', 'Send vessel update', 'm3', 0.9)]; + + const collector = async (_options: CollectWacliMessagesOptions): Promise => ({ + collectedAt: '2026-04-16T09:05:00.000Z', + checkpointPath, + limit: 200, + staleAfterHours: 24, + stores: [ + { + storeKey: 'personal', + storePath: '/stores/personal', + checkpointBefore: '2026-04-16T08:00:00.000Z', + checkpointAfter: '2026-04-16T09:00:00.000Z', + batchSize: 1, + lastSyncAt: '2026-04-16T09:00:00.000Z', + degraded: false, + degradedReason: null, + error: null, + messages, + }, + ], + messages, + degraded: false, + checkpoint: { + version: 1, + stores: { + personal: { + after: '2026-04-16T09:00:00.000Z', + message_ids_at_after: ['m3'], + updated_at: '2026-04-16T09:05:00.000Z', + }, + }, + }, + }); + + const firstRun = await runActionIngest({ + db, + collectorOptions: { checkpointPath }, + collector, + extractor: async () => extractorOutput, + }); + const secondRun = await runActionIngest({ + db, + collectorOptions: { checkpointPath }, + collector, + extractor: async () => extractorOutput, + }); + + expect(firstRun.success).toBe(true); + expect(firstRun.commitmentsCreated).toBe(1); + expect(firstRun.duplicatesSkipped).toBe(0); + expect(firstRun.healthStatus).toBe('healthy'); + + expect(secondRun.success).toBe(true); + expect(secondRun.commitmentsCreated).toBe(0); + expect(secondRun.duplicatesSkipped).toBe(1); + }); + }); + + test('does not advance checkpoint when store stage fails', async () => { + await withDb(async (db) => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); 
+ await writeWacliCollectorCheckpoint(checkpointPath, { + version: 1, + stores: { + personal: { + after: '2026-04-15T22:00:00.000Z', + message_ids_at_after: ['prev'], + updated_at: '2026-04-15T22:00:00.000Z', + }, + }, + }); + + const messages = [message('m4', '2026-04-16T10:00:00.000Z', 'Joe to send manifest')]; + const summary = await runActionIngest({ + db, + collectorOptions: { checkpointPath }, + collector: async (_options: CollectWacliMessagesOptions): Promise => ({ + collectedAt: '2026-04-16T10:05:00.000Z', + checkpointPath, + limit: 200, + staleAfterHours: 24, + stores: [ + { + storeKey: 'personal', + storePath: '/stores/personal', + checkpointBefore: '2026-04-15T22:00:00.000Z', + checkpointAfter: '2026-04-16T10:00:00.000Z', + batchSize: 1, + lastSyncAt: '2026-04-16T10:00:00.000Z', + degraded: false, + degradedReason: null, + error: null, + messages, + }, + ], + messages, + degraded: false, + checkpoint: { + version: 1, + stores: { + personal: { + after: '2026-04-16T10:00:00.000Z', + message_ids_at_after: ['m4'], + updated_at: '2026-04-16T10:05:00.000Z', + }, + }, + }, + }), + extractor: async () => [ + { + ...commitment('Joe', 'Send manifest', 'm4', 0.9), + by_when: 'not-a-date', + }, + ], + }); + + expect(summary.success).toBe(false); + expect(summary.failure?.stage).toBe('store'); + expect(summary.checkpointAdvanced).toBe(false); + + const checkpoint = await readWacliCollectorCheckpoint(checkpointPath); + expect(checkpoint.stores.personal?.after).toBe('2026-04-15T22:00:00.000Z'); + }); + }); + + test('fails fast at health preflight when a store is disconnected', async () => { + await withDb(async (db) => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + let extractorCalled = false; + + const summary = await runActionIngest({ + db, + collectorOptions: { checkpointPath }, + collector: async (_options: CollectWacliMessagesOptions): Promise => ({ + collectedAt: '2026-04-16T10:05:00.000Z', + checkpointPath, + 
limit: 200, + staleAfterHours: 24, + stores: [ + { + storeKey: 'personal', + storePath: '/stores/personal', + checkpointBefore: null, + checkpointAfter: null, + batchSize: 0, + lastSyncAt: null, + degraded: true, + degradedReason: 'command_failed', + error: 'spawn wacli ENOENT', + messages: [], + }, + ], + messages: [], + degraded: true, + checkpoint: { version: 1, stores: {} }, + }), + extractor: async () => { + extractorCalled = true; + return []; + }, + }); + + expect(summary.success).toBe(false); + expect(summary.healthStatus).toBe('failed'); + expect(summary.failure?.stage).toBe('health'); + expect(summary.alerts[0]).toContain('unhealthy'); + expect(extractorCalled).toBe(false); + }); + }); + + test('continues in degraded mode when store health is stale but connected', async () => { + await withDb(async (db) => { + const root = createTempDir(); + const checkpointPath = join(root, 'wacli-checkpoint.json'); + const messages = [message('m5', '2026-04-16T11:00:00.000Z', 'Joe to send invoice')]; + + const summary = await runActionIngest({ + db, + collectorOptions: { checkpointPath }, + collector: async (_options: CollectWacliMessagesOptions): Promise => ({ + collectedAt: '2026-04-16T12:00:00.000Z', + checkpointPath, + limit: 200, + staleAfterHours: 24, + stores: [ + { + storeKey: 'personal', + storePath: '/stores/personal', + checkpointBefore: null, + checkpointAfter: '2026-04-16T11:00:00.000Z', + batchSize: 1, + lastSyncAt: '2026-04-15T10:00:00.000Z', + degraded: true, + degradedReason: 'last_sync_stale', + error: null, + messages, + }, + ], + messages, + degraded: true, + checkpoint: { + version: 1, + stores: { + personal: { + after: '2026-04-16T11:00:00.000Z', + message_ids_at_after: ['m5'], + updated_at: '2026-04-16T12:00:00.000Z', + }, + }, + }, + }), + extractor: async () => [commitment('Joe', 'Send invoice', 'm5', 0.9)], + }); + + expect(summary.success).toBe(true); + expect(summary.degraded).toBe(true); + expect(summary.healthStatus).toBe('degraded'); + 
expect(summary.lastSyncAt).toBe('2026-04-15T10:00:00.000Z'); + expect(summary.alerts[0]).toContain('stale'); + expect(summary.commitmentsCreated).toBe(1); + }); + }); +}); + +async function withDb(fn: (db: ActionDb) => Promise): Promise { + const engine = new PGLiteEngine(); + await engine.connect({ engine: 'pglite' } as any); + + const db = (engine as unknown as EngineWithDb).db; + + try { + return await fn(db); + } finally { + await engine.disconnect(); + } +} + +function createTempDir(): string { + const dir = mkdtempSync(join(tmpdir(), 'action-brain-ingest-runner-test-')); + tempDirs.push(dir); + return dir; +} + +function message(id: string, timestamp: string, text: string): CollectedWhatsAppMessage { + return { + MsgID: id, + Timestamp: timestamp, + Text: text, + ChatName: 'Ops', + SenderName: 'Joe', + ChatJID: null, + SenderJID: 'joe@jid', + FromMe: false, + store_key: 'personal', + store_path: '/stores/personal', + }; +} + +function commitment(who: string, owesWhat: string, sourceMessageId: string, confidence: number): StructuredCommitment { + return { + who, + owes_what: owesWhat, + to_whom: 'Abhi', + by_when: null, + confidence, + type: 'commitment', + source_message_id: sourceMessageId, + }; +} diff --git a/test/action-brain/operations.test.ts b/test/action-brain/operations.test.ts index 4f1e87aa..175a064d 100644 --- a/test/action-brain/operations.test.ts +++ b/test/action-brain/operations.test.ts @@ -1,8 +1,12 @@ import { describe, expect, test } from 'bun:test'; +import { mkdtempSync, rmSync } from 'fs'; +import { join } from 'path'; +import { tmpdir } from 'os'; import { mergeOperationSets, operations } from '../../src/core/operations.ts'; import type { Operation, OperationContext } from '../../src/core/operations.ts'; import { PGLiteEngine } from '../../src/core/pglite-engine.ts'; import { actionBrainOperations } from '../../src/action-brain/operations.ts'; +import { writeWacliCollectorCheckpoint } from '../../src/action-brain/collector.ts'; function 
makeOperation(name: string, cliName?: string): Operation { return { @@ -54,6 +58,7 @@ describe('Action Brain operation integration', () => { expect(names.has('action_resolve')).toBe(true); expect(names.has('action_mark_fp')).toBe(true); expect(names.has('action_ingest')).toBe(true); + expect(names.has('action_ingest_auto')).toBe(true); }); test('#23 mergeOperationSets fails fast on operation and CLI collisions', () => { @@ -80,6 +85,20 @@ describe('Action Brain operation integration', () => { expect(stdout).toContain('Usage: gbrain action list'); }); + test('supports grouped action auto-ingest command via "gbrain action run"', async () => { + const proc = Bun.spawn(['bun', 'run', 'src/cli.ts', 'action', 'run', '--help'], { + cwd: new URL('../..', import.meta.url).pathname, + stdout: 'pipe', + stderr: 'pipe', + }); + + const stdout = await new Response(proc.stdout).text(); + const exitCode = await proc.exited; + + expect(exitCode).toBe(0); + expect(stdout).toContain('Usage: gbrain action run'); + }); + test('action_ingest stays idempotent when commitments arrive in different output order', async () => { await withActionContext(async (ctx, engine) => { const actionIngest = getActionOperation('action_ingest'); @@ -236,4 +255,34 @@ describe('Action Brain operation integration', () => { expect(rows.rows[0].source_contact).toBe('Joe'); }); }); + + test('action_brief resolves freshness from wacli checkpoint when last_sync_at is omitted', async () => { + await withActionContext(async (ctx) => { + const actionBrief = getActionOperation('action_brief'); + const tempDir = mkdtempSync(join(tmpdir(), 'action-brief-checkpoint-test-')); + const checkpointPath = join(tempDir, 'wacli-checkpoint.json'); + + try { + await writeWacliCollectorCheckpoint(checkpointPath, { + version: 1, + stores: { + personal: { + after: '2026-04-16T11:30:00.000Z', + message_ids_at_after: ['m1'], + updated_at: '2026-04-16T11:31:00.000Z', + }, + }, + }); + + const result = (await actionBrief.handler(ctx, 
{ + now: '2026-04-16T12:00:00.000Z', + checkpoint_path: checkpointPath, + })) as { brief: string }; + + expect(result.brief).toContain('wacli freshness: last sync 2026-04-16T11:30:00.000Z (0.5h ago)'); + } finally { + rmSync(tempDir, { recursive: true, force: true }); + } + }); + }); }); From f82ea1c7180bec61fb27e4ba3232e112eb8b5ee9 Mon Sep 17 00:00:00 2001 From: Abhinav Bansal Date: Thu, 16 Apr 2026 22:51:56 +0800 Subject: [PATCH 3/7] docs: update CLAUDE.md for collector + ingest-runner (v0.10.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add collector.ts and ingest-runner.ts to Key files section - Update action-engine.ts description (createItemWithResult idempotency) - Update extractor.ts description (owner context injection) - Update operations.ts count: 5 → 6 ops (adds action_ingest_auto) - Add collector.test.ts and ingest-runner.test.ts to Testing section - Update unit test file count: 33 → 35 Co-Authored-By: Paperclip --- CLAUDE.md | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 47a3a5a7..08d684a2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -63,10 +63,12 @@ markdown files (tool-agnostic, work with both CLI and plugin contexts). 
- `openclaw.plugin.json` — ClawHub bundle plugin manifest - `src/action-brain/types.ts` — Action Brain shared types (ActionItem, CommitmentBatch, ExtractionResult) - `src/action-brain/action-schema.ts` — PGLite DDL + idempotent schema init for action_items / action_history tables -- `src/action-brain/action-engine.ts` — Storage layer: CRUD, priority scoring (urgency × confidence × recency), PGLite lifecycle -- `src/action-brain/extractor.ts` — LLM commitment extraction (two-tier Haiku→Sonnet), XML delimiter defense, stable source IDs +- `src/action-brain/action-engine.ts` — Storage layer: CRUD, priority scoring (urgency × confidence × recency), PGLite lifecycle; `createItemWithResult()` returns idempotency signal (created vs skipped) +- `src/action-brain/extractor.ts` — LLM commitment extraction (two-tier Haiku→Sonnet), XML delimiter defense, stable source IDs, owner context injection - `src/action-brain/brief.ts` — Morning priority brief generator: ranked action items, overdue detection, deduplication -- `src/action-brain/operations.ts` — 5 Action Brain operations (action_list, action_brief, action_resolve, action_mark_fp, action_ingest) +- `src/action-brain/collector.ts` — Wacli message collector: reads WhatsApp export files, deduplicates by message ID, checkpoint-aware (skips already-processed messages) +- `src/action-brain/ingest-runner.ts` — Auto-ingest orchestrator: preflight checks, staleness gate, collect → extract → store pipeline; cron-ready, returns structured JSON +- `src/action-brain/operations.ts` — 6 Action Brain operations (action_list, action_brief, action_resolve, action_mark_fp, action_ingest, action_ingest_auto) ## Commands @@ -78,7 +80,7 @@ Key commands added in v0.7: ## Testing -`bun test` runs all tests (33 unit test files + 5 E2E test files). Unit tests run +`bun test` runs all tests (35 unit test files + 5 E2E test files). Unit tests run without a database. E2E tests skip gracefully when `DATABASE_URL` is not set. 
Unit tests: `test/markdown.test.ts` (frontmatter parsing), `test/chunkers/recursive.test.ts` @@ -104,9 +106,11 @@ parity), `test/cli.test.ts` (CLI structure), `test/config.test.ts` (config redac `test/eval.test.ts` (retrieval metrics: precisionAtK, recallAtK, mrr, ndcgAtK, parseQrels), `test/action-brain/action-schema.test.ts` (Action Brain DDL + idempotent init), `test/action-brain/action-engine.test.ts` (CRUD, scoring, PGLite lifecycle), -`test/action-brain/extractor.test.ts` (extraction, source ID stability, injection defense, timestamp bounds), +`test/action-brain/extractor.test.ts` (extraction, source ID stability, injection defense, timestamp bounds, owner context), `test/action-brain/brief.test.ts` (brief generation, scoring, dedup, overdue detection), -`test/action-brain/operations.test.ts` (all 5 ops, ingest trust boundary, batch fallbacks). +`test/action-brain/collector.test.ts` (wacli file reading, checkpoint store, dedup, freshness filtering), +`test/action-brain/ingest-runner.test.ts` (preflight checks, staleness gate, collect/extract/store pipeline, structured JSON output), +`test/action-brain/operations.test.ts` (all 6 ops, ingest trust boundary, batch fallbacks, action_ingest_auto pipeline). E2E tests (`test/e2e/`): Run against real Postgres+pgvector. Require `DATABASE_URL`. 
- `bun run test:e2e` runs Tier 1 (mechanical, all operations, no API keys) From 8b537c810482f4dd4a1f9887b28f6ca3d22185e5 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Fri, 17 Apr 2026 07:07:51 +0800 Subject: [PATCH 4/7] fix(action-brain): make auto-ingest store stage atomic --- src/action-brain/action-engine.ts | 21 ++++++- src/action-brain/ingest-runner.ts | 73 ++++++++++++++----------- test/action-brain/ingest-runner.test.ts | 28 +++++++--- 3 files changed, 80 insertions(+), 42 deletions(-) diff --git a/src/action-brain/action-engine.ts b/src/action-brain/action-engine.ts index 06635960..67902f32 100644 --- a/src/action-brain/action-engine.ts +++ b/src/action-brain/action-engine.ts @@ -59,6 +59,10 @@ export interface CreateActionItemResult { created: boolean; } +interface ActionExecutionOptions { + useTransaction?: boolean; +} + export interface ListActionItemsFilters { status?: ActionStatus; owner?: string; @@ -84,6 +88,10 @@ export class ActionTransitionError extends Error { export class ActionEngine { constructor(private readonly db: ActionDb) {} + async transaction(fn: () => Promise): Promise { + return this.withTransaction(fn); + } + async createItem(input: CreateActionItemInput, options: ActionMutationOptions = {}): Promise { const result = await this.createItemWithResult(input, options); return result.item; @@ -91,9 +99,10 @@ export class ActionEngine { async createItemWithResult( input: CreateActionItemInput, - options: ActionMutationOptions = {} + options: ActionMutationOptions = {}, + execution: ActionExecutionOptions = {} ): Promise { - return this.withTransaction(async () => { + const execute = async () => { const result = await this.db.query( `WITH inserted AS ( INSERT INTO action_items ( @@ -159,7 +168,13 @@ export class ActionEngine { item: mapActionItem(row), created: toBoolean(row.was_inserted), }; - }); + }; + + if (execution.useTransaction === false) { + return execute(); + } + + return this.withTransaction(execute); } async getItem(id: 
number): Promise { diff --git a/src/action-brain/ingest-runner.ts b/src/action-brain/ingest-runner.ts index 6ca21eb8..072ca061 100644 --- a/src/action-brain/ingest-runner.ts +++ b/src/action-brain/ingest-runner.ts @@ -143,40 +143,51 @@ export async function runActionIngest(options: RunActionIngestOptions): Promise< const engine = new ActionEngine(options.db); try { - for (const commitment of commitments) { - const sourceMessage = resolveSourceMessage(collection.messages, commitment); - const sourceMessageId = buildCommitmentSourceId( - resolveSourceMessageId(collection.messages, commitment, sourceMessage), - commitment - ); - - const result = await engine.createItemWithResult( - { - title: toActionTitle(commitment.owes_what), - type: commitment.type, - source_message_id: sourceMessageId, - owner: commitment.who ?? '', - waiting_on: null, - due_at: parseOptionalDate(commitment.by_when, 'by_when'), - confidence: clampConfidence(commitment.confidence), - source_thread: sourceMessage?.ChatName ?? '', - source_contact: sourceMessage?.SenderName ?? '', - linked_entity_slugs: [], - }, - { - actor, - metadata: { - ingestion_mode: 'auto_runner', + await engine.transaction(async () => { + let commitmentsCreated = 0; + let duplicatesSkipped = 0; + + for (const commitment of commitments) { + const sourceMessage = resolveSourceMessage(collection.messages, commitment); + const sourceMessageId = buildCommitmentSourceId( + resolveSourceMessageId(collection.messages, commitment, sourceMessage), + commitment + ); + + const result = await engine.createItemWithResult( + { + title: toActionTitle(commitment.owes_what), + type: commitment.type, + source_message_id: sourceMessageId, + owner: commitment.who ?? '', + waiting_on: null, + due_at: parseOptionalDate(commitment.by_when, 'by_when'), + confidence: clampConfidence(commitment.confidence), + source_thread: sourceMessage?.ChatName ?? '', + source_contact: sourceMessage?.SenderName ?? 
'', + linked_entity_slugs: [], }, + { + actor, + metadata: { + ingestion_mode: 'auto_runner', + }, + }, + { + useTransaction: false, + } + ); + + if (result.created) { + commitmentsCreated += 1; + } else { + duplicatesSkipped += 1; } - ); - - if (result.created) { - summary.commitmentsCreated += 1; - } else { - summary.duplicatesSkipped += 1; } - } + + summary.commitmentsCreated = commitmentsCreated; + summary.duplicatesSkipped = duplicatesSkipped; + }); } catch (err) { summary.failure = toFailure('store', err); return summary; diff --git a/test/action-brain/ingest-runner.test.ts b/test/action-brain/ingest-runner.test.ts index 1728a470..e9ab259d 100644 --- a/test/action-brain/ingest-runner.test.ts +++ b/test/action-brain/ingest-runner.test.ts @@ -186,7 +186,7 @@ describe('runActionIngest', () => { }); }); - test('does not advance checkpoint when store stage fails', async () => { + test('rolls back all stored items and leaves checkpoint unchanged when a later store write fails', async () => { await withDb(async (db) => { const root = createTempDir(); const checkpointPath = join(root, 'wacli-checkpoint.json'); @@ -201,7 +201,10 @@ describe('runActionIngest', () => { }, }); - const messages = [message('m4', '2026-04-16T10:00:00.000Z', 'Joe to send manifest')]; + const messages = [ + message('m4', '2026-04-16T10:00:00.000Z', 'Joe to send manifest'), + message('m5', '2026-04-16T10:02:00.000Z', 'Mukesh to confirm payout date'), + ]; const summary = await runActionIngest({ db, collectorOptions: { checkpointPath }, @@ -215,9 +218,9 @@ describe('runActionIngest', () => { storeKey: 'personal', storePath: '/stores/personal', checkpointBefore: '2026-04-15T22:00:00.000Z', - checkpointAfter: '2026-04-16T10:00:00.000Z', - batchSize: 1, - lastSyncAt: '2026-04-16T10:00:00.000Z', + checkpointAfter: '2026-04-16T10:02:00.000Z', + batchSize: 2, + lastSyncAt: '2026-04-16T10:02:00.000Z', degraded: false, degradedReason: null, error: null, @@ -230,16 +233,17 @@ describe('runActionIngest', 
() => { version: 1, stores: { personal: { - after: '2026-04-16T10:00:00.000Z', - message_ids_at_after: ['m4'], + after: '2026-04-16T10:02:00.000Z', + message_ids_at_after: ['m5'], updated_at: '2026-04-16T10:05:00.000Z', }, }, }, }), extractor: async () => [ + commitment('Joe', 'Send manifest', 'm4', 0.9), { - ...commitment('Joe', 'Send manifest', 'm4', 0.9), + ...commitment('Mukesh', 'Confirm payout date', 'm5', 0.95), by_when: 'not-a-date', }, ], @@ -247,10 +251,18 @@ describe('runActionIngest', () => { expect(summary.success).toBe(false); expect(summary.failure?.stage).toBe('store'); + expect(summary.commitmentsCreated).toBe(0); + expect(summary.duplicatesSkipped).toBe(0); expect(summary.checkpointAdvanced).toBe(false); const checkpoint = await readWacliCollectorCheckpoint(checkpointPath); expect(checkpoint.stores.personal?.after).toBe('2026-04-15T22:00:00.000Z'); + + const itemRows = await db.query<{ count: number }>('SELECT count(*)::int AS count FROM action_items'); + expect(itemRows.rows[0]?.count).toBe(0); + + const historyRows = await db.query<{ count: number }>('SELECT count(*)::int AS count FROM action_history'); + expect(historyRows.rows[0]?.count).toBe(0); }); }); From dcb41111e474989927fd022d9799db360a3adc61 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Fri, 17 Apr 2026 09:23:31 +0800 Subject: [PATCH 5/7] fix(action-brain): bind ingest writes to adapter transaction context Co-Authored-By: Paperclip --- src/action-brain/action-engine.ts | 37 +++++++++----- src/action-brain/ingest-runner.ts | 10 ++-- src/action-brain/operations.ts | 41 ++++++++++----- test/action-brain/action-engine.test.ts | 67 +++++++++++++++++++++++++ test/action-brain/operations.test.ts | 40 ++++++++++++++- 5 files changed, 165 insertions(+), 30 deletions(-) diff --git a/src/action-brain/action-engine.ts b/src/action-brain/action-engine.ts index 67902f32..6c6ff006 100644 --- a/src/action-brain/action-engine.ts +++ b/src/action-brain/action-engine.ts @@ -6,6 +6,7 @@ interface 
QueryResult { interface ActionDb { query>(sql: string, params?: unknown[]): Promise>; + transaction?(fn: (db: ActionDb) => Promise): Promise; } interface ActionItemRow { @@ -88,8 +89,11 @@ export class ActionTransitionError extends Error { export class ActionEngine { constructor(private readonly db: ActionDb) {} - async transaction(fn: () => Promise): Promise { - return this.withTransaction(fn); + async transaction(fn: (engine: ActionEngine) => Promise): Promise { + return this.withTransaction(async (txDb) => { + const txEngine = txDb === this.db ? this : new ActionEngine(txDb); + return fn(txEngine); + }); } async createItem(input: CreateActionItemInput, options: ActionMutationOptions = {}): Promise { @@ -102,8 +106,8 @@ export class ActionEngine { options: ActionMutationOptions = {}, execution: ActionExecutionOptions = {} ): Promise { - const execute = async () => { - const result = await this.db.query( + const execute = async (db: ActionDb) => { + const result = await db.query( `WITH inserted AS ( INSERT INTO action_items ( title, @@ -154,6 +158,7 @@ export class ActionEngine { if (toBoolean(row.was_inserted)) { await this.insertHistory( + db, row.id, 'created', options.actor ?? 
'system', @@ -171,7 +176,7 @@ export class ActionEngine { }; if (execution.useTransaction === false) { - return execute(); + return execute(this.db); } return this.withTransaction(execute); @@ -243,11 +248,11 @@ export class ActionEngine { nextStatus: ActionStatus, options: ActionMutationOptions = {} ): Promise { - return this.withTransaction(async () => { - const currentRow = await this.lockItemById(id); + return this.withTransaction(async (db) => { + const currentRow = await this.lockItemById(db, id); validateTransition(id, currentRow.status, nextStatus); - const updateResult = await this.db.query( + const updateResult = await db.query( `UPDATE action_items SET status = $2, resolved_at = CASE WHEN $2 = 'resolved' THEN now() ELSE NULL END, @@ -266,6 +271,7 @@ export class ActionEngine { nextStatus === 'resolved' ? 'resolved' : nextStatus === 'dropped' ? 'dropped' : 'status_change'; await this.insertHistory( + db, id, eventType, options.actor ?? 'system', @@ -284,8 +290,8 @@ export class ActionEngine { return this.updateItemStatus(id, 'resolved', options); } - private async lockItemById(id: number): Promise { - const rowResult = await this.db.query( + private async lockItemById(db: ActionDb, id: number): Promise { + const rowResult = await db.query( `SELECT * FROM action_items WHERE id = $1 @@ -302,22 +308,27 @@ export class ActionEngine { } private async insertHistory( + db: ActionDb, itemId: number, eventType: ActionHistoryEventType, actor: string, metadata: Record ): Promise { - await this.db.query( + await db.query( `INSERT INTO action_history (item_id, event_type, actor, metadata) VALUES ($1, $2, $3, $4::jsonb)`, [itemId, eventType, actor, JSON.stringify(metadata)] ); } - private async withTransaction(fn: () => Promise): Promise { + private async withTransaction(fn: (db: ActionDb) => Promise): Promise { + if (typeof this.db.transaction === 'function') { + return this.db.transaction(async (txDb) => fn(txDb ?? 
this.db)); + } + await this.db.query('BEGIN'); try { - const result = await fn(); + const result = await fn(this.db); await this.db.query('COMMIT'); return result; } catch (error) { diff --git a/src/action-brain/ingest-runner.ts b/src/action-brain/ingest-runner.ts index 072ca061..801156e4 100644 --- a/src/action-brain/ingest-runner.ts +++ b/src/action-brain/ingest-runner.ts @@ -17,8 +17,12 @@ interface QueryResult { rows: T[]; } -interface ActionDb { +interface ActionQueryDb { query>(sql: string, params?: unknown[]): Promise>; + transaction?(fn: (db: ActionQueryDb) => Promise): Promise; +} + +interface ActionDb extends ActionQueryDb { exec: (sql: string) => Promise; } @@ -143,7 +147,7 @@ export async function runActionIngest(options: RunActionIngestOptions): Promise< const engine = new ActionEngine(options.db); try { - await engine.transaction(async () => { + await engine.transaction(async (txEngine) => { let commitmentsCreated = 0; let duplicatesSkipped = 0; @@ -154,7 +158,7 @@ export async function runActionIngest(options: RunActionIngestOptions): Promise< commitment ); - const result = await engine.createItemWithResult( + const result = await txEngine.createItemWithResult( { title: toActionTitle(commitment.owes_what), type: commitment.type, diff --git a/src/action-brain/operations.ts b/src/action-brain/operations.ts index 3702340c..5a9afb37 100644 --- a/src/action-brain/operations.ts +++ b/src/action-brain/operations.ts @@ -19,6 +19,7 @@ interface QueryResult { interface QueryableDb { query>(sql: string, params?: unknown[]): Promise>; exec?: (sql: string) => Promise; + transaction?(fn: (db: QueryableDb) => Promise): Promise; } interface ExecQueryableDb extends QueryableDb { @@ -27,8 +28,7 @@ interface ExecQueryableDb extends QueryableDb { interface PostgresUnsafeConnection { unsafe: (sql: string, params?: unknown[]) => Promise[]>; - reserve?: () => Promise; - release?: () => Promise; + begin?: (fn: (tx: PostgresUnsafeConnection) => Promise) => Promise; } const 
STATUS_VALUES = ['open', 'waiting_on', 'in_progress', 'stale', 'resolved', 'dropped'] as const; @@ -333,17 +333,7 @@ async function resolveActionDb(engine: BrainEngine): Promise { const sql = candidate.sql as PostgresUnsafeConnection | undefined; if (sql && typeof sql.unsafe === 'function') { - // Use the pool directly (not sql.reserve) to avoid holding a dedicated connection indefinitely. - // Each query checks out a connection from the pool and returns it when done. - const wrapped: QueryableDb = { - query: async >(statement: string, params: unknown[] = []) => { - const rows = params.length === 0 ? await sql.unsafe(statement) : await sql.unsafe(statement, params); - return { rows: rows as T[] }; - }, - exec: async (statement: string) => { - await sql.unsafe(statement); - }, - }; + const wrapped = wrapPostgresConnection(sql, true); postgresDbCache.set(cacheKey, wrapped); return wrapped; } @@ -351,6 +341,31 @@ async function resolveActionDb(engine: BrainEngine): Promise { throw new Error('Unsupported engine for Action Brain operations. Expected PGLiteEngine or PostgresEngine.'); } +function wrapPostgresConnection(sql: PostgresUnsafeConnection, allowBegin: boolean): QueryableDb { + const wrapped: QueryableDb = { + query: async >(statement: string, params: unknown[] = []) => { + const rows = params.length === 0 ? 
await sql.unsafe(statement) : await sql.unsafe(statement, params); + return { rows: rows as T[] }; + }, + exec: async (statement: string) => { + await sql.unsafe(statement); + }, + }; + + wrapped.transaction = async (fn: (db: QueryableDb) => Promise) => { + if (!allowBegin || typeof sql.begin !== 'function') { + return fn(wrapped); + } + return sql.begin(async (tx) => fn(wrapPostgresConnection(tx, false))); + }; + + return wrapped; +} + +export async function __resolveActionDbForTests(engine: BrainEngine): Promise { + return resolveActionDb(engine); +} + function requireExecQueryableDb(db: QueryableDb): ExecQueryableDb { if (typeof db.exec !== 'function') { throw new Error('Action auto-ingest requires an exec-capable database adapter.'); diff --git a/test/action-brain/action-engine.test.ts b/test/action-brain/action-engine.test.ts index ee0b5229..5a91aec3 100644 --- a/test/action-brain/action-engine.test.ts +++ b/test/action-brain/action-engine.test.ts @@ -19,6 +19,73 @@ async function createEngine(): Promise<{ db: PGlite; engine: ActionEngine }> { } describe('ActionEngine', () => { + test('prefers adapter transaction callbacks over raw BEGIN/COMMIT wrappers', async () => { + const statements: string[] = []; + let transactionCalls = 0; + + const txDb = { + query: async >(sql: string): Promise<{ rows: T[] }> => { + statements.push(sql); + + if (sql.includes('WITH inserted AS')) { + return { + rows: [ + { + id: 42, + title: 'Send cargo docs', + type: 'commitment', + status: 'open', + owner: 'Joe', + waiting_on: null, + due_at: null, + stale_after_hours: 48, + priority_score: 0, + confidence: 0.9, + source_message_id: 'msg-tx-001', + source_thread: 'Ops', + source_contact: 'Joe', + linked_entity_slugs: [], + created_at: '2026-04-16T00:00:00.000Z', + updated_at: '2026-04-16T00:00:00.000Z', + resolved_at: null, + was_inserted: true, + } as T, + ], + }; + } + + if (sql.includes('INSERT INTO action_history')) { + return { rows: [] }; + } + + throw new Error(`Unexpected 
SQL in test adapter: ${sql}`); + }, + }; + + const adapter = { + query: async (): Promise<{ rows: [] }> => { + throw new Error('Expected ActionEngine to run queries through the transaction-scoped adapter'); + }, + transaction: async (fn: (db: typeof txDb) => Promise): Promise => { + transactionCalls += 1; + return fn(txDb); + }, + }; + + const engine = new ActionEngine(adapter); + const item = await engine.createItem({ + title: 'Send cargo docs', + type: 'commitment', + source_message_id: 'msg-tx-001', + owner: 'Joe', + }); + + expect(item.id).toBe(42); + expect(transactionCalls).toBe(1); + expect(statements.some((sql) => sql === 'BEGIN')).toBe(false); + expect(statements.some((sql) => sql === 'COMMIT')).toBe(false); + }); + test('createItem inserts a new action and writes created history', async () => { const { db: localDb, engine } = await createEngine(); diff --git a/test/action-brain/operations.test.ts b/test/action-brain/operations.test.ts index 175a064d..0904d347 100644 --- a/test/action-brain/operations.test.ts +++ b/test/action-brain/operations.test.ts @@ -5,7 +5,7 @@ import { tmpdir } from 'os'; import { mergeOperationSets, operations } from '../../src/core/operations.ts'; import type { Operation, OperationContext } from '../../src/core/operations.ts'; import { PGLiteEngine } from '../../src/core/pglite-engine.ts'; -import { actionBrainOperations } from '../../src/action-brain/operations.ts'; +import { __resolveActionDbForTests, actionBrainOperations } from '../../src/action-brain/operations.ts'; import { writeWacliCollectorCheckpoint } from '../../src/action-brain/collector.ts'; function makeOperation(name: string, cliName?: string): Operation { @@ -99,6 +99,44 @@ describe('Action Brain operation integration', () => { expect(stdout).toContain('Usage: gbrain action run'); }); + test('postgres action adapter uses sql.begin transaction context for multi-query units', async () => { + const poolCalls: string[] = []; + const txCalls: string[] = []; + let 
beginCalls = 0; + + const txConnection = { + unsafe: async (statement: string, _params?: unknown[]) => { + txCalls.push(statement); + return []; + }, + }; + + const engine = { + sql: { + unsafe: async (statement: string, _params?: unknown[]) => { + poolCalls.push(statement); + return []; + }, + begin: async (fn: (tx: typeof txConnection) => Promise): Promise => { + beginCalls += 1; + return fn(txConnection); + }, + }, + } as any; + + const db = await __resolveActionDbForTests(engine); + expect(typeof db.transaction).toBe('function'); + + await db.transaction?.(async (txDb) => { + await txDb.query('SELECT 1'); + await txDb.exec?.('SELECT 2'); + }); + + expect(beginCalls).toBe(1); + expect(txCalls).toEqual(['SELECT 1', 'SELECT 2']); + expect(poolCalls).toEqual([]); + }); + test('action_ingest stays idempotent when commitments arrive in different output order', async () => { await withActionContext(async (ctx, engine) => { const actionIngest = getActionOperation('action_ingest'); From 58eebd7e7353b770356ab2f557a7053ebbcab4f1 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Fri, 17 Apr 2026 09:41:56 +0800 Subject: [PATCH 6/7] docs: sync test counts and action-brain file list for v0.10.2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CLAUDE.md: update test count 35→43 unit, 5→6 E2E; add embed/import-walker/pglite-lock/search-limit entries; expand collector.test.ts description with FIFO cap and fail-closed detail - CONTRIBUTING.md: add collector.ts and ingest-runner.ts to action-brain section; update ops count 5→6 (action_ingest_auto added) Co-Authored-By: Paperclip --- CLAUDE.md | 12 ++++++++---- CONTRIBUTING.md | 8 +++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 08d684a2..5f90a46d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -65,7 +65,7 @@ markdown files (tool-agnostic, work with both CLI and plugin contexts). 
- `src/action-brain/action-schema.ts` — PGLite DDL + idempotent schema init for action_items / action_history tables - `src/action-brain/action-engine.ts` — Storage layer: CRUD, priority scoring (urgency × confidence × recency), PGLite lifecycle; `createItemWithResult()` returns idempotency signal (created vs skipped) - `src/action-brain/extractor.ts` — LLM commitment extraction (two-tier Haiku→Sonnet), XML delimiter defense, stable source IDs, owner context injection -- `src/action-brain/brief.ts` — Morning priority brief generator: ranked action items, overdue detection, deduplication +- `src/action-brain/brief.ts` — Morning priority brief generator: ranked action items, overdue detection, deduplication; freshness reads from wacli checkpoint (not action item creation time) - `src/action-brain/collector.ts` — Wacli message collector: reads WhatsApp export files, deduplicates by message ID, checkpoint-aware (skips already-processed messages) - `src/action-brain/ingest-runner.ts` — Auto-ingest orchestrator: preflight checks, staleness gate, collect → extract → store pipeline; cron-ready, returns structured JSON - `src/action-brain/operations.ts` — 6 Action Brain operations (action_list, action_brief, action_resolve, action_mark_fp, action_ingest, action_ingest_auto) @@ -80,7 +80,7 @@ Key commands added in v0.7: ## Testing -`bun test` runs all tests (35 unit test files + 5 E2E test files). Unit tests run +`bun test` runs all tests (43 unit test files + 6 E2E test files). Unit tests run without a database. E2E tests skip gracefully when `DATABASE_URL` is not set. 
Unit tests: `test/markdown.test.ts` (frontmatter parsing), `test/chunkers/recursive.test.ts` @@ -108,9 +108,13 @@ parity), `test/cli.test.ts` (CLI structure), `test/config.test.ts` (config redac `test/action-brain/action-engine.test.ts` (CRUD, scoring, PGLite lifecycle), `test/action-brain/extractor.test.ts` (extraction, source ID stability, injection defense, timestamp bounds, owner context), `test/action-brain/brief.test.ts` (brief generation, scoring, dedup, overdue detection), -`test/action-brain/collector.test.ts` (wacli file reading, checkpoint store, dedup, freshness filtering), +`test/action-brain/collector.test.ts` (wacli file reading, checkpoint store, dedup, freshness filtering, fail-closed on invalid checkpoint, FIFO cap on same-second IDs, collapsed health alert for global checkpoint failure), `test/action-brain/ingest-runner.test.ts` (preflight checks, staleness gate, collect/extract/store pipeline, structured JSON output), -`test/action-brain/operations.test.ts` (all 6 ops, ingest trust boundary, batch fallbacks, action_ingest_auto pipeline). +`test/action-brain/operations.test.ts` (all 6 ops, ingest trust boundary, batch fallbacks, action_ingest_auto pipeline), +`test/embed.test.ts` (embedding interface contract), +`test/import-walker.test.ts` (directory walker, symlink handling, file filtering), +`test/pglite-lock.test.ts` (PGLite concurrent access and lock behavior), +`test/search-limit.test.ts` (MAX_SEARCH_LIMIT constant, clampSearchLimit bounds). E2E tests (`test/e2e/`): Run against real Postgres+pgvector. Require `DATABASE_URL`. 
- `bun run test:e2e` runs Tier 1 (mechanical, all operations, no API keys) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a857cf3b..6d104346 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -40,10 +40,12 @@ src/ action-brain/ Action Brain extension (commitment/obligation tracking) types.ts Shared types (ActionItem, CommitmentBatch, ExtractionResult) action-schema.ts PGLite DDL + schema init for action_items/action_history tables - action-engine.ts Storage layer: CRUD, priority scoring, PGLite lifecycle + action-engine.ts Storage layer: CRUD, priority scoring, PGLite lifecycle; createItemWithResult() for idempotency signal extractor.ts LLM commitment extraction with prompt injection defense - brief.ts Morning priority brief generator (ranked + deduped) - operations.ts 5 registered ops: action_list/brief/resolve/mark-fp/ingest + brief.ts Morning priority brief generator (ranked + deduped, checkpoint-aware) + collector.ts Wacli message collector: checkpoint-aware cursor, dedup by message ID, FIFO cap, fail-closed on bad checkpoint + ingest-runner.ts Auto-ingest orchestrator: preflight checks, staleness gate, collect→extract→store pipeline, structured JSON + operations.ts 6 registered ops: action_list, action_brief, action_resolve, action_mark_fp, action_ingest, action_ingest_auto schema.sql Postgres DDL skills/ Fat markdown skills for AI agents test/ Unit tests (bun test, no DB required) From 5bdd8ddb466bc637480388c1c6c7385c3c10fa9c Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Fri, 17 Apr 2026 09:42:22 +0800 Subject: [PATCH 7/7] =?UTF-8?q?docs:=20sync=20CLAUDE.md=20=E2=80=94=20brie?= =?UTF-8?q?f.ts=20checkpoint-aware=20freshness=20note?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Paperclip --- CLAUDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index 5f90a46d..42522184 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -107,7 +107,7 @@ parity), 
`test/cli.test.ts` (CLI structure), `test/config.test.ts` (config redac `test/action-brain/action-schema.test.ts` (Action Brain DDL + idempotent init), `test/action-brain/action-engine.test.ts` (CRUD, scoring, PGLite lifecycle), `test/action-brain/extractor.test.ts` (extraction, source ID stability, injection defense, timestamp bounds, owner context), -`test/action-brain/brief.test.ts` (brief generation, scoring, dedup, overdue detection), +`test/action-brain/brief.test.ts` (brief generation, scoring, dedup, overdue detection, checkpoint-aware freshness), `test/action-brain/collector.test.ts` (wacli file reading, checkpoint store, dedup, freshness filtering, fail-closed on invalid checkpoint, FIFO cap on same-second IDs, collapsed health alert for global checkpoint failure), `test/action-brain/ingest-runner.test.ts` (preflight checks, staleness gate, collect/extract/store pipeline, structured JSON output), `test/action-brain/operations.test.ts` (all 6 ops, ingest trust boundary, batch fallbacks, action_ingest_auto pipeline),