Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .claude/CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
@../CLAUDE.md
10 changes: 0 additions & 10 deletions src/postgres/migrations/001_create_outbox.sql

This file was deleted.

17 changes: 10 additions & 7 deletions src/postgres/store.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
// MIT License
// Copyright (c) 2026 sparetimecoders

import { readFileSync } from "node:fs";
import { join, dirname } from "node:path";
import { fileURLToPath } from "node:url";
import type { Pool, PoolClient } from "pg";
import type { OutboxInserter, OutboxProcessor, OutboxRecord } from "../types.js";

const __dirname = dirname(fileURLToPath(import.meta.url));
const migrationSQL = readFileSync(
join(__dirname, "migrations", "001_create_outbox.sql"),
"utf-8",
const migrationSQL = `\
CREATE TABLE IF NOT EXISTS messaging_outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL,
routing_key TEXT NOT NULL,
payload JSONB NOT NULL,
headers JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_messaging_outbox_created_at ON messaging_outbox (created_at, id);`;

export interface PostgresStoreOptions {
/** Skip running the embedded migration on creation. Default: false. */
skipMigrations?: boolean;
Expand Down
179 changes: 164 additions & 15 deletions src/relay.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// MIT License
// Copyright (c) 2026 sparetimecoders

import { describe, it, expect, mock, beforeEach, afterEach } from "bun:test";
import { describe, it, expect, mock } from "bun:test";
import { createRelay } from "./relay.js";
import type {
OutboxProcessor,
Expand Down Expand Up @@ -29,20 +29,27 @@ function makeRecord(id: string, routingKey: string): OutboxRecord {
}

describe("createRelay", () => {
beforeEach(() => {
mock.module("timers", () => ({}));
});

it("publishes events and deletes them", async () => {
const records = [
makeRecord("1", "user.created"),
makeRecord("2", "user.updated"),
];

const processFn = mock(async (_batchSize: number, fn: (records: OutboxRecord[]) => Promise<string[]>) => {
const published = await fn(records);
return published.length;
let processDone: () => void;
const processed = new Promise<void>((r) => {
processDone = r;
});

const processFn = mock(
async (
_batchSize: number,
fn: (records: OutboxRecord[]) => Promise<string[]>,
) => {
const published = await fn(records);
processDone();
return published.length;
},
);
const store: OutboxProcessor = {
process: processFn,
};
Expand All @@ -61,9 +68,8 @@ describe("createRelay", () => {
mockLogger(),
);

relay.start();
// Allow the initial poll to execute
await new Promise((resolve) => setTimeout(resolve, 50));
await relay.start();
await processed;
await relay.stop();

expect(published).toHaveLength(2);
Expand All @@ -73,7 +79,15 @@ describe("createRelay", () => {
});

it("stops when stop() is called", async () => {
const processFn = mock(async () => 0);
let processDone: () => void;
const processed = new Promise<void>((r) => {
processDone = r;
});

const processFn = mock(async () => {
processDone();
return 0;
});
const store: OutboxProcessor = {
process: processFn,
};
Expand All @@ -88,14 +102,149 @@ describe("createRelay", () => {
mockLogger(),
);

relay.start();
await new Promise((resolve) => setTimeout(resolve, 50));
await relay.start();
await processed;
await relay.stop();

const callCount = processFn.mock.calls.length;
await new Promise((resolve) => setTimeout(resolve, 200));

// No more calls after stop
expect(processFn).toHaveBeenCalledTimes(callCount);
});

it("rejects batchSize <= 0", () => {
expect(() =>
createRelay(
{ process: mock() } as OutboxProcessor,
{ publishRaw: mock() } as RawPublisher,
{ batchSize: 0 },
mockLogger(),
),
).toThrow("batchSize must be positive");
});

it("rejects pollIntervalMs <= 0", () => {
expect(() =>
createRelay(
{ process: mock() } as OutboxProcessor,
{ publishRaw: mock() } as RawPublisher,
{ pollIntervalMs: -1 },
mockLogger(),
),
).toThrow("pollIntervalMs must be positive");
});

it("handles publish error by breaking and logging", async () => {
const records = [
makeRecord("1", "user.created"),
makeRecord("2", "user.updated"),
];

let processDone: () => void;
const processed = new Promise<void>((r) => {
processDone = r;
});

const processFn = mock(
async (
_batchSize: number,
fn: (records: OutboxRecord[]) => Promise<string[]>,
) => {
const published = await fn(records);
processDone();
return published.length;
},
);
const store: OutboxProcessor = { process: processFn };

const publishRawMock = mock(async (routingKey: string) => {
if (routingKey === "user.updated") {
throw new Error("broker down");
}
});
const publisher: RawPublisher = { publishRaw: publishRawMock };

const log = mockLogger();
const relay = createRelay(
store,
publisher,
{ pollIntervalMs: 1000, batchSize: 100 },
log,
);

await relay.start();
await processed;
await relay.stop();

expect(publishRawMock).toHaveBeenCalledTimes(2);
expect(log.error).toHaveBeenCalled();
});

it("applies exponential backoff on store.process failure", async () => {
let callCount = 0;
let processDone: () => void;
const processed = new Promise<void>((r) => {
processDone = r;
});

const processFn = mock(async () => {
callCount++;
if (callCount === 1) {
throw new Error("db connection failed");
}
processDone();
return 0;
});
const store: OutboxProcessor = { process: processFn };
const publisher: RawPublisher = { publishRaw: mock() };

const log = mockLogger();
const relay = createRelay(
store,
publisher,
{ pollIntervalMs: 10, batchSize: 10 },
log,
);

await relay.start();
await processed;
await relay.stop();

expect(log.error).toHaveBeenCalled();
expect(callCount).toBeGreaterThanOrEqual(2);
});

it("start() awaits pending stop before restarting", async () => {
let processDone: () => void;
const processed = new Promise<void>((r) => {
processDone = r;
});

const processFn = mock(async () => {
processDone();
return 0;
});
const store: OutboxProcessor = { process: processFn };
const publisher: RawPublisher = { publishRaw: mock() };

const relay = createRelay(
store,
publisher,
{ pollIntervalMs: 100, batchSize: 10 },
mockLogger(),
);

await relay.start();
await processed;

const stopPromise = relay.stop();
// Start while stop is in progress
const startPromise = relay.start();
await stopPromise;
await startPromise;

// Should be running again - stop cleanly
await relay.stop();
});

});
42 changes: 34 additions & 8 deletions src/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
} from "./types.js";

const tracer = trace.getTracer("outbox-relay");
const MAX_BACKOFF_MS = 30_000;

/**
* Creates a relay that polls the outbox store and publishes events to the broker.
Expand All @@ -25,9 +26,14 @@ export function createRelay(
const pollIntervalMs = config.pollIntervalMs ?? 1000;
const batchSize = config.batchSize ?? 100;

if (batchSize <= 0) throw new Error("batchSize must be positive");
if (pollIntervalMs <= 0) throw new Error("pollIntervalMs must be positive");

let running = false;
let timeoutId: ReturnType<typeof setTimeout> | null = null;
let inflight: Promise<void> | null = null;
let stopPromise: Promise<void> | null = null;
let consecutiveErrors = 0;

async function processEvents(): Promise<void> {
if (!running) return;
Expand All @@ -42,8 +48,6 @@ export function createRelay(
const publishedIDs: string[] = [];

for (const record of records) {
if (!running) break;

try {
await publisher.publishRaw(
record.routing_key,
Expand All @@ -64,8 +68,14 @@ export function createRelay(
},
);

// When the batch was full there may be more events waiting,
// so we poll again immediately rather than waiting for the
// next interval. This is a conservative heuristic: it avoids
// an extra SELECT to check whether more rows exist.
batchWasFull = published >= batchSize;
consecutiveErrors = 0;
} catch (err) {
consecutiveErrors += 1;
span.recordException(
err instanceof Error ? err : new Error(String(err)),
);
Expand All @@ -80,15 +90,27 @@ export function createRelay(
});

if (running) {
const delay = batchWasFull ? 0 : pollIntervalMs;
const backoffDelay =
consecutiveErrors > 0
? Math.min(
pollIntervalMs * Math.pow(2, consecutiveErrors),
MAX_BACKOFF_MS,
) + Math.random() * 1000
: 0;
const delay = batchWasFull ? 0 : Math.max(pollIntervalMs, backoffDelay);
timeoutId = setTimeout(processEvents, delay);
}
}

return {
start() {
async start() {
if (stopPromise) {
await stopPromise;
stopPromise = null;
}
if (running) return;
running = true;
consecutiveErrors = 0;
log.info(
{ pollIntervalMs, batchSize },
"Outbox relay started",
Expand All @@ -104,10 +126,14 @@ export function createRelay(
clearTimeout(timeoutId);
timeoutId = null;
}
if (inflight) {
await inflight;
}
log.info("Outbox relay stopped");
const doStop = async (): Promise<void> => {
if (inflight) {
await inflight;
}
log.info({}, "Outbox relay stopped");
};
stopPromise = doStop();
await stopPromise;
},
};
}
Loading
Loading