diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md new file mode 100644 index 0000000..8e93194 --- /dev/null +++ b/.claude/CLAUDE.md @@ -0,0 +1 @@ +@../CLAUDE.md diff --git a/src/postgres/migrations/001_create_outbox.sql b/src/postgres/migrations/001_create_outbox.sql deleted file mode 100644 index dfc1c48..0000000 --- a/src/postgres/migrations/001_create_outbox.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE TABLE IF NOT EXISTS messaging_outbox ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - event_type TEXT NOT NULL, - routing_key TEXT NOT NULL, - payload JSONB NOT NULL, - headers JSONB NOT NULL DEFAULT '{}', - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_messaging_outbox_created_at ON messaging_outbox (created_at, id); diff --git a/src/postgres/store.ts b/src/postgres/store.ts index 8d22d5c..fa8de1c 100644 --- a/src/postgres/store.ts +++ b/src/postgres/store.ts @@ -1,18 +1,21 @@ // MIT License // Copyright (c) 2026 sparetimecoders -import { readFileSync } from "node:fs"; -import { join, dirname } from "node:path"; -import { fileURLToPath } from "node:url"; import type { Pool, PoolClient } from "pg"; import type { OutboxInserter, OutboxProcessor, OutboxRecord } from "../types.js"; -const __dirname = dirname(fileURLToPath(import.meta.url)); -const migrationSQL = readFileSync( - join(__dirname, "migrations", "001_create_outbox.sql"), - "utf-8", +const migrationSQL = `\ +CREATE TABLE IF NOT EXISTS messaging_outbox ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + event_type TEXT NOT NULL, + routing_key TEXT NOT NULL, + payload JSONB NOT NULL, + headers JSONB NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); +CREATE INDEX IF NOT EXISTS idx_messaging_outbox_created_at ON messaging_outbox (created_at, id);`; + export interface PostgresStoreOptions { /** Skip running the embedded migration on creation. Default: false. */ skipMigrations?: boolean; diff --git a/src/relay.test.ts b/src/relay.test.ts index 98ebc61..2e1546a 100644 --- a/src/relay.test.ts +++ b/src/relay.test.ts @@ -1,7 +1,7 @@ // MIT License // Copyright (c) 2026 sparetimecoders -import { describe, it, expect, mock, beforeEach, afterEach } from "bun:test"; +import { describe, it, expect, mock } from "bun:test"; import { createRelay } from "./relay.js"; import type { OutboxProcessor, @@ -29,20 +29,27 @@ function makeRecord(id: string, routingKey: string): OutboxRecord { } describe("createRelay", () => { - beforeEach(() => { - mock.module("timers", () => ({})); - }); - it("publishes events and deletes them", async () => { const records = [ makeRecord("1", "user.created"), makeRecord("2", "user.updated"), ]; - const processFn = mock(async (_batchSize: number, fn: (records: OutboxRecord[]) => Promise) => { - const published = await fn(records); - return published.length; + let processDone: () => void; + const processed = new Promise((r) => { + processDone = r; }); + + const processFn = mock( + async ( + _batchSize: number, + fn: (records: OutboxRecord[]) => Promise, + ) => { + const published = await fn(records); + processDone(); + return published.length; + }, + ); const store: OutboxProcessor = { process: processFn, }; @@ -61,9 +68,8 @@ describe("createRelay", () => { mockLogger(), ); - relay.start(); - // Allow the initial poll to execute - await new Promise((resolve) => setTimeout(resolve, 50)); + await relay.start(); + await processed; await relay.stop(); expect(published).toHaveLength(2); @@ -73,7 +79,15 @@ describe("createRelay", () => { }); it("stops when stop() is called", async () => { - const processFn = mock(async () => 0); + let processDone: () => void; + const processed = new Promise((r) => { + processDone = r; + }); + + const processFn = mock(async () => { + processDone(); + return 0; + }); const store: OutboxProcessor = { process: processFn, }; @@ -88,14 +102,149 @@ describe("createRelay", () => { mockLogger(), ); - relay.start(); - await new Promise((resolve) => setTimeout(resolve, 50)); + await relay.start(); + await processed; await relay.stop(); const callCount = processFn.mock.calls.length; - await new Promise((resolve) => setTimeout(resolve, 200)); // No more calls after stop expect(processFn).toHaveBeenCalledTimes(callCount); }); + + it("rejects batchSize <= 0", () => { + expect(() => + createRelay( + { process: mock() } as OutboxProcessor, + { publishRaw: mock() } as RawPublisher, + { batchSize: 0 }, + mockLogger(), + ), + ).toThrow("batchSize must be positive"); + }); + + it("rejects pollIntervalMs <= 0", () => { + expect(() => + createRelay( + { process: mock() } as OutboxProcessor, + { publishRaw: mock() } as RawPublisher, + { pollIntervalMs: -1 }, + mockLogger(), + ), + ).toThrow("pollIntervalMs must be positive"); + }); + + it("handles publish error by breaking and logging", async () => { + const records = [ + makeRecord("1", "user.created"), + makeRecord("2", "user.updated"), + ]; + + let processDone: () => void; + const processed = new Promise((r) => { + processDone = r; + }); + + const processFn = mock( + async ( + _batchSize: number, + fn: (records: OutboxRecord[]) => Promise, + ) => { + const published = await fn(records); + processDone(); + return published.length; + }, + ); + const store: OutboxProcessor = { process: processFn }; + + const publishRawMock = mock(async (routingKey: string) => { + if (routingKey === "user.updated") { + throw new Error("broker down"); + } + }); + const publisher: RawPublisher = { publishRaw: publishRawMock }; + + const log = mockLogger(); + const relay = createRelay( + store, + publisher, + { pollIntervalMs: 1000, batchSize: 100 }, + log, + ); + + await relay.start(); + await processed; + await relay.stop(); + + expect(publishRawMock).toHaveBeenCalledTimes(2); + expect(log.error).toHaveBeenCalled(); + }); + + it("applies exponential backoff on store.process failure", async () => { + let callCount = 0; + let processDone: () => void; + const processed = new Promise((r) => { + processDone = r; + }); + + const processFn = mock(async () => { + callCount++; + if (callCount === 1) { + throw new Error("db connection failed"); + } + processDone(); + return 0; + }); + const store: OutboxProcessor = { process: processFn }; + const publisher: RawPublisher = { publishRaw: mock() }; + + const log = mockLogger(); + const relay = createRelay( + store, + publisher, + { pollIntervalMs: 10, batchSize: 10 }, + log, + ); + + await relay.start(); + await processed; + await relay.stop(); + + expect(log.error).toHaveBeenCalled(); + expect(callCount).toBeGreaterThanOrEqual(2); + }); + + it("start() awaits pending stop before restarting", async () => { + let processDone: () => void; + const processed = new Promise((r) => { + processDone = r; + }); + + const processFn = mock(async () => { + processDone(); + return 0; + }); + const store: OutboxProcessor = { process: processFn }; + const publisher: RawPublisher = { publishRaw: mock() }; + + const relay = createRelay( + store, + publisher, + { pollIntervalMs: 100, batchSize: 10 }, + mockLogger(), + ); + + await relay.start(); + await processed; + + const stopPromise = relay.stop(); + // Start while stop is in progress + const startPromise = relay.start(); + await stopPromise; + await startPromise; + + // Should be running again - stop cleanly + await relay.stop(); + }); + }); diff --git a/src/relay.ts b/src/relay.ts index 0378ce4..81001cb 100644 --- a/src/relay.ts +++ b/src/relay.ts @@ -11,6 +11,7 @@ import type { } from "./types.js"; const tracer = trace.getTracer("outbox-relay"); +const MAX_BACKOFF_MS = 30_000; /** * Creates a relay that polls the outbox store and publishes events to the broker. @@ -25,9 +26,14 @@ export function createRelay( const pollIntervalMs = config.pollIntervalMs ?? 1000; const batchSize = config.batchSize ?? 100; + if (batchSize <= 0) throw new Error("batchSize must be positive"); + if (pollIntervalMs <= 0) throw new Error("pollIntervalMs must be positive"); + let running = false; let timeoutId: ReturnType | null = null; let inflight: Promise | null = null; + let stopPromise: Promise | null = null; + let consecutiveErrors = 0; async function processEvents(): Promise { if (!running) return; @@ -42,8 +48,6 @@ export function createRelay( const publishedIDs: string[] = []; for (const record of records) { - if (!running) break; - try { await publisher.publishRaw( record.routing_key, @@ -64,8 +68,14 @@ export function createRelay( }, ); + // When the batch was full there may be more events waiting, + // so we poll again immediately rather than waiting for the + // next interval. This is a conservative heuristic: it avoids + // an extra SELECT to check whether more rows exist. batchWasFull = published >= batchSize; + consecutiveErrors = 0; } catch (err) { + consecutiveErrors += 1; span.recordException( err instanceof Error ? err : new Error(String(err)), ); @@ -80,15 +90,27 @@ export function createRelay( }); if (running) { - const delay = batchWasFull ? 0 : pollIntervalMs; + const backoffDelay = + consecutiveErrors > 0 + ? Math.min( + pollIntervalMs * Math.pow(2, consecutiveErrors), + MAX_BACKOFF_MS, + ) + Math.random() * 1000 + : 0; + const delay = batchWasFull ? 0 : Math.max(pollIntervalMs, backoffDelay); timeoutId = setTimeout(processEvents, delay); } } return { - start() { + async start() { + if (stopPromise) { + await stopPromise; + stopPromise = null; + } if (running) return; running = true; + consecutiveErrors = 0; log.info( { pollIntervalMs, batchSize }, "Outbox relay started", @@ -104,10 +126,14 @@ export function createRelay( clearTimeout(timeoutId); timeoutId = null; } - if (inflight) { - await inflight; - } - log.info("Outbox relay stopped"); + const doStop = async (): Promise => { + if (inflight) { + await inflight; + } + log.info({}, "Outbox relay stopped"); + }; + stopPromise = doStop(); + await stopPromise; }, }; } diff --git a/src/types.ts b/src/types.ts index 19da996..2e72995 100644 --- a/src/types.ts +++ b/src/types.ts @@ -6,7 +6,7 @@ export interface OutboxRecord { id: string; event_type: string; routing_key: string; - /** JSON-serialized payload (stored as string to match Go's []byte). */ + /** JSON-serialized payload (stored as string to match Go []byte). */ payload: string; headers: Record; created_at: Date; @@ -19,7 +19,13 @@ export interface OutboxEvent { headers?: Record; } -/** Write path: inserts outbox records within a caller-managed transaction. */ +/** + * Write path: inserts outbox records within a caller-managed transaction. + * + * The Writer supplies `id` and `created_at` so that the CE `ce-id` header + * and the database row share the same identity and timestamp, guaranteeing + * consistency between the stored record and the published CloudEvent. + */ export interface OutboxInserter { insert(record: OutboxRecord): Promise; } @@ -57,9 +63,7 @@ export interface RawPublisher { /** Pino-compatible logger subset. */ export interface Logger { info(obj: Record, msg?: string): void; - info(msg: string): void; error(obj: Record, msg?: string): void; - error(msg: string): void; } /** Relay polling configuration. */ @@ -72,6 +76,6 @@ export interface RelayConfig { /** Controls the relay lifecycle. */ export interface RelayHandle { - start: () => void; + start: () => Promise; stop: () => Promise; } diff --git a/src/writer.test.ts b/src/writer.test.ts index a1d7462..c9222aa 100644 --- a/src/writer.test.ts +++ b/src/writer.test.ts @@ -51,4 +51,19 @@ describe("Writer", () => { expect(inserter.inserted[0].headers["ce-subject"]).toBe("user/123"); }); + + it("does not allow caller to override required CE headers", async () => { + const inserter = mockInserter(); + const writer = new Writer("test-service"); + + await writer.write(inserter, { + routingKey: "user.created", + payload: { name: "alice" }, + headers: { "ce-source": "attacker", "ce-type": "evil.event" }, + }); + + const headers = inserter.inserted[0].headers; + expect(headers["ce-source"]).toBe("test-service"); + expect(headers["ce-type"]).toBe("user.created"); + }); }); diff --git a/src/writer.ts b/src/writer.ts index d2e8aa8..a5b33ef 100644 --- a/src/writer.ts +++ b/src/writer.ts @@ -32,13 +32,13 @@ export class Writer { async write(inserter: OutboxInserter, event: OutboxEvent): Promise { const now = new Date(); const ceHeaders: Record = { + ...event.headers, [CESpecVersion]: CESpecVersionValue, [CEType]: event.routingKey, [CESource]: this.serviceName, [CEID]: randomUUID(), [CETime]: now.toISOString(), [CEDataContentType]: "application/json", - ...event.headers, }; const record: OutboxRecord = {