Transactional Outbox Pattern for Atomic Audit Logging & Event Streaming #682

@markusahlstrand

Type: Feature / Architecture
Priority: High
Affects: adapter-interfaces, kysely, authhero


Problem Statement

AuthHero currently has no mechanism to guarantee atomic writes across business data and audit logs. When a user is created, a role assigned, or a session started, the actual entity write and the corresponding audit log write (logs.create()) are two independent operations. If the application crashes between them, we lose audit records silently.

The problem is compounded by two architectural gaps:

  1. Non-transactional destinations. Some log destinations (Cloudflare Analytics Engine, R2, webhooks) are inherently fire-and-forget. They can never participate in a database transaction, so we need to separate "capture" from "delivery."

  2. Lost context. The current LogInsert type is an Auth0-compatible presentation format designed for the management API GET /logs endpoint. It doesn't capture what actually happened — there's no before/after entity state, no actor identity separate from the affected user, no request payload, no authorization context. Route handlers read the "before" state (e.g., userToPatch in the PATCH /users handler) but this context is thrown away before logging. This makes the audit log nearly useless for compliance forensics, database sync, or replaying state.

Proposed Solution

Implement the transactional outbox pattern with an event-first design:

  1. Define a rich AuditEvent type that captures "what happened" — the actor, the entity before and after, the request context, the authorization context.
  2. Write AuditEvent records to an outbox table atomically within the same database transaction as the entity mutation.
  3. A background relay drains the outbox and transforms events into whatever shape each destination needs — LogInsert for the logs adapter, a different projection for Analytics Engine, full events for database sync or R2 archival.
  4. The outbox is opt-in via config. When disabled, the current waitUntil → logs.create() fire-and-forget behavior is preserved.

The key insight: the outbox stores raw events, not log entries. The transformation to LogInsert (or any other format) happens in the relay, not at capture time. This means the outbox can serve use cases that LogInsert never could — syncing to another database, replaying entity state, computing diffs, feeding ML pipelines, etc.

Architecture Overview

With outbox enabled

┌──────────────────────────────────────────────────────────────────┐
│  Request Handler                                                 │
│                                                                  │
│   ┌────────────────────────────────────────────────────────┐     │
│   │  DB Transaction (request-scoped)                       │     │
│   │                                                        │     │
│   │  1. SELECT user WHERE id = X          ← before state   │     │
│   │  2. UPDATE user SET ...               ← entity write   │     │
   │  3. INSERT INTO outbox {              ← audit event    │
│   │       actor, before, after,                            │     │
│   │       request context, ...                             │     │
│   │     }                                                  │     │
│   └────────────────────────────────────────────────────────┘     │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
                          │
                          ▼  waitUntil (async, after response)
┌──────────────────────────────────────────────────────────────────┐
│  Outbox Relay                                                    │
│                                                                  │
│  SELECT * FROM outbox WHERE processed_at IS NULL                 │
│  ORDER BY sequence ASC LIMIT batch_size                          │
│                                                                  │
│  For each event, TRANSFORM and deliver:                          │
│                                                                  │
│    → LogsDestination:     AuditEvent → LogInsert → logs.create() │
│    → AnalyticsEngine:     AuditEvent → blobs/doubles projection  │
│    → R2Destination:       AuditEvent → NDJSON (full event)       │
│    → DatabaseSync:        AuditEvent → replay to target adapter  │
│    → WebhookDestination:  AuditEvent → filtered tenant payload   │
│                                                                  │
│  Mark rows as processed, retry failures with backoff             │
└──────────────────────────────────────────────────────────────────┘

With outbox disabled (default, current behavior)

┌──────────────────────────────────────────────────────────────────┐
│  Request Handler                                                 │
│                                                                  │
│   1. UPDATE user SET ...                                         │
│   2. waitUntil → logs.create(...)   ← fire-and-forget            │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

The AuditEvent Type

This is the core of the design. Instead of storing LogInsert, the outbox stores a rich event that captures everything about what happened:

interface AuditEvent {
  // ── Identity ──────────────────────────────────────────────
  id: string;                    // nanoid, generated at capture time
  tenant_id: string;

  // ── What happened ─────────────────────────────────────────
  event_type: string;            // e.g. 'user.updated', 'session.created'
  log_type: LogType;             // Auth0-compatible code ('sapi', 'f', etc.)
  description?: string;          // Human-readable summary
  category: AuditCategory;       // 'user_action' | 'admin_action' | 'system' | 'api'

  // ── Who did it ────────────────────────────────────────────
  actor: {
    type: 'user' | 'admin' | 'system' | 'api_key' | 'client_credentials';
    id?: string;                 // sub claim from JWT, or system identifier
    email?: string;
    org_id?: string;             // organization context
    org_name?: string;
    scopes?: string[];           // permissions/scopes used for this action
    client_id?: string;          // OAuth client that issued the token
  };

  // ── What was affected ─────────────────────────────────────
  target: {
    type: string;                // 'user' | 'session' | 'role' | 'client' | etc.
    id: string;                  // entity ID
    before?: Record<string, unknown>;  // entity state before the operation
    after?: Record<string, unknown>;   // entity state after the operation
    diff?: Record<string, { old: unknown; new: unknown }>;  // computed changes
  };

  // ── Request context ───────────────────────────────────────
  request: {
    method: string;              // GET, POST, PATCH, DELETE
    path: string;                // /api/v2/users/auth0|123
    query?: Record<string, string>;
    body?: unknown;              // request payload (sanitized — no passwords)
    ip: string;
    user_agent?: string;
    correlation_id?: string;     // for tracing across services
  };

  // ── Response context ──────────────────────────────────────
  response?: {
    status_code: number;
    body?: unknown;              // truncated if large
  };

  // ── Auth context ──────────────────────────────────────────
  connection?: string;           // auth connection used
  strategy?: string;             // auth strategy
  strategy_type?: string;

  // ── Environment ───────────────────────────────────────────
  location?: {
    country_code?: string;
    city_name?: string;
    latitude?: string;
    longitude?: string;
    time_zone?: string;
    continent_code?: string;
  };
  auth0_client?: {               // SDK client info
    name: string;
    version: string;
    env?: Record<string, string>;
  };
  hostname: string;
  is_mobile?: boolean;

  // ── Metadata ──────────────────────────────────────────────
  timestamp: string;             // ISO 8601
}

type AuditCategory = 'user_action' | 'admin_action' | 'system' | 'api';

What this captures that LogInsert doesn't

| Context | LogInsert today | AuditEvent |
| --- | --- | --- |
| Who performed the action | user_id (ambiguous — is this the actor or the target?) | Separate actor and target objects |
| Entity state before change | Not captured (thrown away in route handlers) | target.before |
| Entity state after change | Partially in details.response.body | target.after |
| What fields changed | Not captured | target.diff (computed from before/after) |
| Request payload | Buried in details.request.body (auto-generated, inconsistent) | request.body (explicit, always present) |
| HTTP method and path | Buried in details.request | Top-level request.method and request.path |
| Authorization scopes used | scope field (the OAuth scope, not what was checked) | actor.scopes (actual permissions used) |
| Actor type | Not captured | actor.type distinguishes admin vs user vs system |
| Audit category | Not captured | category field for filtering/routing |
| Correlation ID | Not captured | request.correlation_id for distributed tracing |
| Organization context | Not captured | actor.org_id and actor.org_name |
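
The target.diff field is derivable mechanically from before/after at capture time. A minimal sketch, assuming shallow field-level comparison (nested-object semantics are not specified in this issue):

```typescript
type Diff = Record<string, { old: unknown; new: unknown }>;

// Shallow field-level diff between two entity snapshots. Keys present in
// either object are compared by JSON equality; unchanged keys are omitted.
function computeDiff(
  before: Record<string, unknown> = {},
  after: Record<string, unknown> = {}
): Diff {
  const diff: Diff = {};
  const keys = new Set([...Object.keys(before), ...Object.keys(after)]);
  for (const key of keys) {
    if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
      diff[key] = { old: before[key], new: after[key] };
    }
  }
  return diff;
}
```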

Why this matters for new use cases

Database sync: With target.before and target.after, the relay can replay state changes to a secondary database. The outbox becomes a replication log. This is impossible with LogInsert since it doesn't contain entity state.

Compliance forensics: "Show me every change to this user's permissions in the last 90 days, who made each change, and what the permissions looked like before and after." Currently impossible; with AuditEvent, it's a simple query.

Anomaly detection: "Alert when an admin modifies more than 10 users in 5 minutes" requires knowing the actor type and action — both absent from LogInsert.

Webhook filtering: Tenants can subscribe to specific event types and receive the before/after state. Auth0's Log Streaming feature works this way.

Outbox Table Schema

The outbox stores the AuditEvent as a JSON payload, plus indexed columns for relay queries:

CREATE TABLE outbox_events (
  id              TEXT PRIMARY KEY,
  tenant_id       TEXT NOT NULL,
  sequence        INTEGER NOT NULL,       -- monotonic (AUTOINCREMENT in SQLite, AUTO_INCREMENT key in MySQL)
  event_type      TEXT NOT NULL,          -- indexed for filtering
  log_type        TEXT NOT NULL,          -- Auth0 LogType code
  aggregate_type  TEXT NOT NULL,          -- 'user', 'session', etc.
  aggregate_id    TEXT NOT NULL,          -- entity ID
  payload         TEXT NOT NULL,          -- JSON: full AuditEvent
  created_at      TEXT NOT NULL,
  processed_at    TEXT,
  retry_count     INTEGER DEFAULT 0,
  next_retry_at   TEXT,
  error           TEXT
);

CREATE INDEX idx_outbox_unprocessed
  ON outbox_events (processed_at, sequence)
  WHERE processed_at IS NULL;

CREATE INDEX idx_outbox_tenant_type
  ON outbox_events (tenant_id, event_type, created_at);

The top-level columns (event_type, log_type, aggregate_type, aggregate_id) are denormalized from the JSON payload for indexing and relay query efficiency. The full AuditEvent lives in payload. Note that the partial (WHERE) index applies to SQLite only; MySQL/PlanetScale does not support filtered indexes, so the relay query there falls back to a plain composite index on (processed_at, sequence).
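
The capture-time mapping from event to row is then a thin projection. A sketch, assuming an OutboxRow shape mirroring the schema above (sequence and processed_at are left to the database):

```typescript
interface OutboxRow {
  id: string;
  tenant_id: string;
  event_type: string;
  log_type: string;
  aggregate_type: string;
  aggregate_id: string;
  payload: string;      // JSON-serialized AuditEvent
  created_at: string;
  retry_count: number;
}

// Minimal structural subset of AuditEvent needed for denormalization.
interface AuditEventLike {
  id: string;
  tenant_id: string;
  event_type: string;
  log_type: string;
  target: { type: string; id: string };
  timestamp: string;
}

// Lift the indexed columns out of the event; the full event stays in `payload`.
function buildOutboxRow(event: AuditEventLike): OutboxRow {
  return {
    id: event.id,
    tenant_id: event.tenant_id,
    event_type: event.event_type,
    log_type: event.log_type,
    aggregate_type: event.target.type,
    aggregate_id: event.target.id,
    payload: JSON.stringify(event),
    created_at: event.timestamp,
    retry_count: 0,
  };
}
```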

Relay Transformations

Each destination gets a transformer that converts AuditEvent into the format it needs:

interface EventDestination {
  name: string;
  transform(event: AuditEvent): unknown;  // destination-specific shape
  deliver(events: unknown[]): Promise<void>;
}
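
As a concrete instance of the interface, a hypothetical in-memory destination usable in unit tests — the projection shape here is illustrative only, not part of the proposal:

```typescript
// Hypothetical in-memory destination: `transform` projects the event to a
// compact record, `deliver` appends the transformed batch to a buffer.
class MemoryDestination {
  name = "memory";
  delivered: unknown[] = [];

  transform(event: { event_type: string; target: { id: string } }): unknown {
    return { type: event.event_type, entity: event.target.id };
  }

  async deliver(events: unknown[]): Promise<void> {
    this.delivered.push(...events);
  }
}
```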

AuditEvent → LogInsert (for LogsDataAdapter)

function toLogInsert(event: AuditEvent): LogInsert {
  return {
    type: event.log_type,
    date: event.timestamp,
    description: event.description,
    ip: event.request.ip,
    user_agent: event.request.user_agent,
    user_id: event.target.type === 'user' ? event.target.id : event.actor.id,
    user_name: (event.target.after?.name ?? event.target.after?.email) as string | undefined,
    client_id: event.actor.client_id,
    client_name: '',
    connection: event.connection,
    strategy: event.strategy,
    strategy_type: event.strategy_type,
    audience: event.request.query?.audience,
    scope: event.actor.scopes?.join(' '),
    hostname: event.hostname,
    auth0_client: event.auth0_client,
    isMobile: event.is_mobile || false,
    location_info: event.location,
    details: {
      request: {
        method: event.request.method,
        path: event.request.path,
        qs: event.request.query,
        body: event.request.body,
      },
      response: event.response ? {
        statusCode: event.response.status_code,
        body: event.response.body,
      } : undefined,
    },
  };
}

AuditEvent → Database Sync

function toDatabaseSync(event: AuditEvent): SyncOperation {
  // Replay the mutation to a target adapter
  return {
    operation: event.event_type.split('.')[1],  // 'created' | 'updated' | 'deleted'
    entity_type: event.target.type,
    entity_id: event.target.id,
    tenant_id: event.tenant_id,
    state: event.target.after,  // full entity state to upsert
  };
}

AuditEvent → R2/S3

No transformation needed — store the full AuditEvent as NDJSON. This is your compliance archive.
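
"No transformation" reduces to NDJSON framing, i.e. one JSON document per line. A sketch (the object-key layout in the comment is an assumption, not part of the proposal):

```typescript
// Serialize a batch of events as NDJSON: one JSON document per line.
// A natural R2/S3 object key would be date-partitioned, e.g.
// audit/<tenant>/<YYYY-MM-DD>/<firstEventId>.ndjson (naming is hypothetical).
function toNDJSON(events: object[]): string {
  return events.map((e) => JSON.stringify(e)).join("\n") + "\n";
}
```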

Detailed Design

Phase 1: Transaction Support in Adapter Interfaces

Add a transaction method to DataAdapters:

interface DataAdapters {
  // ... existing adapters ...

  transaction<T>(
    fn: (trxAdapters: DataAdapters) => Promise<T>
  ): Promise<T>;
}

Kysely implementation passes a trx handle to create scoped adapters:

transaction: async <T>(fn: (trxAdapters: DataAdapters) => Promise<T>): Promise<T> => {
  return db.transaction().execute(async (trx) => {
    const trxAdapters = createAdapters(trx, databaseOptions);
    return fn(trxAdapters);
  });
},

This is additive — no existing code changes. The transaction() method is new on the interface. Existing non-transactional code continues to work unchanged.

Phase 2: Outbox Table, AuditEvent Type & Adapter

Add the outbox_events table (schema above), the AuditEvent type to adapter-interfaces, and the OutboxAdapter:

interface OutboxAdapter {
  create(tenantId: string, event: AuditEventInsert): Promise<OutboxEvent>;
  getUnprocessed(limit: number): Promise<OutboxEvent[]>;
  markProcessed(ids: string[]): Promise<void>;
  markRetry(id: string, error: string, nextRetryAt: string): Promise<void>;
  cleanup(olderThan: string): Promise<number>;
}

Phase 3: Config Toggle & Capture Refactor

Add config:

interface AuthHeroConfig {
  dataAdapter: DataAdapters;
  outbox?: {
    enabled: boolean;
    destinations: EventDestination[];
    captureEntityState?: boolean;  // capture before/after (slight perf cost)
  };
}

Refactor logMessage() to build an AuditEvent from the available context and either write to outbox (when enabled) or pipe to logs.create() via waitUntil (when disabled):

export async function logMessage(ctx: Context, tenantId: string, params: LogParams) {
  if (ctx.env.outbox?.enabled) {
    const event = buildAuditEvent(ctx, tenantId, params);
    await ctx.env.data.outbox.create(tenantId, event);
  } else {
    // Current behavior: build LogInsert, fire-and-forget
    const log = buildLogInsert(ctx, tenantId, params);
    const promise = ctx.env.data.logs.create(tenantId, log);
    if (params.waitForCompletion) await promise;
    else ctx.executionCtx.waitUntil(promise);
  }
}

The LogParams type gains optional fields for entity state:

type LogParams = {
  // ... existing fields ...
  beforeState?: Record<string, unknown>;  // entity before mutation
  afterState?: Record<string, unknown>;   // entity after mutation
};

Route handlers that already read the before state (like PATCH /users which fetches userToPatch) just pass it through:

// In PATCH /users handler (already reads the user before patching)
const userToPatch = await data.users.get(tenant_id, user_id);
// ... apply patch ...
const updatedUser = await data.users.update(tenant_id, user_id, fields);

await logMessage(ctx, tenant_id, {
  type: LogTypes.SUCCESS_API_OPERATION,
  description: "Update a User",
  beforeState: userToPatch,   // ← already available, just not passed today
  afterState: updatedUser,    // ← already available, just not passed today
  body,
});

Phase 4: Request-Scoped Transactions

When outbox is enabled, middleware wraps each request in a single transaction:

app.use('*', async (ctx, next) => {
  if (ctx.env.outbox?.enabled) {
    await ctx.env.data.transaction(async (trxAdapters) => {
      ctx.env.data = trxAdapters;
      await next();
    });
  } else {
    await next();
  }
});

Not a major refactor — all route handlers already access adapters via ctx.env.data.*, so they work unchanged. The transaction commits at the middleware boundary.

Phase 5: Outbox Relay

Runs via waitUntil after each request (or Cron Trigger for bulk):

app.use('*', async (ctx, next) => {
  await next();
  if (ctx.env.outbox?.enabled) {
    ctx.executionCtx.waitUntil(
      drainOutbox(ctx.env.data, ctx.env.outbox.destinations)
    );
  }
});

Each destination transforms and delivers independently. Failures in one destination don't block others. Exponential backoff (1s → 2s → 4s → 8s → 16s, max 5 retries).
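
A sketch of the relay loop under those rules. The 1s base delay, 5-retry cap, and independent per-destination delivery come from this issue; the types and the all-or-retry marking are assumptions:

```typescript
const BASE_DELAY_MS = 1_000;
const MAX_RETRIES = 5;

// Exponential backoff: 1s → 2s → 4s → 8s → 16s.
function nextRetryAt(retryCount: number, now = Date.now()): string {
  return new Date(now + BASE_DELAY_MS * 2 ** retryCount).toISOString();
}

interface OutboxEventRow {
  id: string;
  payload: string; // JSON-serialized AuditEvent
  retry_count: number;
}

interface Destination {
  name: string;
  transform(event: unknown): unknown;
  deliver(events: unknown[]): Promise<void>;
}

// Subset of OutboxAdapter used by the relay.
interface OutboxPort {
  getUnprocessed(limit: number): Promise<OutboxEventRow[]>;
  markProcessed(ids: string[]): Promise<void>;
  markRetry(id: string, error: string, nextRetryAt: string): Promise<void>;
}

async function drainOutbox(
  outbox: OutboxPort,
  destinations: Destination[],
  batchSize = 100
): Promise<void> {
  const rows = await outbox.getUnprocessed(batchSize);
  if (rows.length === 0) return;
  const events = rows.map((r) => JSON.parse(r.payload));

  // Deliver to every destination independently; one failure must not block others.
  const results = await Promise.allSettled(
    destinations.map((d) => d.deliver(events.map((e) => d.transform(e))))
  );

  if (results.every((r) => r.status === "fulfilled")) {
    await outbox.markProcessed(rows.map((r) => r.id));
  } else {
    const error = results
      .filter((r): r is PromiseRejectedResult => r.status === "rejected")
      .map((r) => String(r.reason))
      .join("; ");
    for (const row of rows) {
      if (row.retry_count < MAX_RETRIES) {
        await outbox.markRetry(row.id, error, nextRetryAt(row.retry_count));
      }
    }
  }
}
```

Note that a single processed_at forces all-or-retry marking when any destination fails — exactly the tension raised in Open Question 5.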

Phase 6: Cleanup & Retention

Processed events retained for configurable period (default: 7 days). Integrates with existing sessionCleanup pattern.
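
The retention sweep reduces to computing a cutoff timestamp and delegating to OutboxAdapter.cleanup. A sketch, using the 7-day default from this issue:

```typescript
const DEFAULT_RETENTION_DAYS = 7;

// ISO 8601 cutoff: processed outbox rows older than this should be deleted.
function retentionCutoff(
  retentionDays = DEFAULT_RETENTION_DAYS,
  now = Date.now()
): string {
  return new Date(now - retentionDays * 24 * 60 * 60 * 1000).toISOString();
}

// Usage (adapter as defined in Phase 2):
//   const deleted = await data.outbox.cleanup(retentionCutoff());
```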

Database-Specific Considerations

SQLite (local development)

Works natively. WAL mode for concurrent reads. No special considerations.

PlanetScale / MySQL (production)

Standard MySQL transactions work. PlanetScale supports foreign keys (optional) but the outbox intentionally avoids them — aggregate rows may be deleted before their outbox events are processed. Index (processed_at, sequence) for relay performance.

How Auth Systems Handle This

Keycloak — Event Listener SPI, not transactional with entity writes. Events can be lost on crashes. No before/after state capture.

FusionAuth — Audit log events via webhooks. Best-effort delivery. Limited context (no entity state diffs).

Supabase Auth — Writes audit logs transactionally in Postgres. Closest to our approach but no outbox relay, no entity state capture, no pluggable destinations.

Our approach combines Supabase's transactional writes with a rich event model (before/after state, actor context) and a pluggable relay for non-transactional destinations.

Migration Path

Each phase is independently shippable:

  1. Phase 1 — DataAdapters.transaction(). Additive, no existing code changes.
  2. Phase 2 — AuditEvent type, outbox table, OutboxAdapter. Schema only, no behavioral changes.
  3. Phase 3 — Config toggle, logMessage() refactor, LogParams gains beforeState/afterState. Gradually update route handlers to pass entity state. When outbox disabled, identical to today.
  4. Phase 4 — Request-scoped transaction middleware. Activated only when outbox enabled.
  5. Phase 5 — Relay processor. Start with LogsDestination (writes to existing logs table). Add Analytics Engine, R2, database sync, webhooks over time.
  6. Phase 6 — Cleanup job.

The existing LogsDataAdapter is preserved. When outbox is enabled, it becomes a read-optimized query store populated asynchronously by the LogsDestination transformer. The management API GET /logs endpoints continue to read from it in both modes.

Acceptance Criteria

  • AuditEvent type defined in adapter-interfaces with actor, target (before/after), request, response, and auth context
  • DataAdapters.transaction() available in adapter interface and implemented in Kysely
  • outbox_events table exists with proper indexes
  • OutboxAdapter interface defined and implemented in Kysely
  • AuthHeroConfig.outbox.enabled boolean controls behavior
  • When outbox disabled: behavior identical to today
  • When outbox enabled: logMessage() builds AuditEvent and writes to outbox within request transaction
  • Request-scoped transaction middleware active when outbox enabled
  • Relay with at least LogsDestination transformer: AuditEvent → LogInsert → logs.create()
  • At least one route handler passes beforeState/afterState (e.g., PATCH /users)
  • Failed deliveries retried with exponential backoff
  • Processed events cleaned up after retention period
  • Works with SQLite (dev) and PlanetScale/MySQL (production)
  • Management API GET /logs works in both modes
  • Unit test: transaction rollback (outbox event not persisted if entity write fails)
  • Unit test: AuditEvent → LogInsert transformer produces valid LogInsert
  • Integration test: entity write → outbox → relay → logs table populated

Open Questions

  1. Before state capture cost — Fetching the entity before every update adds a read per write. The captureEntityState config option makes this opt-in. Is it worth the overhead for all entities, or only for high-value ones (users, roles, clients)?
  2. Payload size — Full before/after entity state can be large. Should we cap the payload size or store only changed fields (the diff)?
  3. Sensitive field redaction — Passwords are already excluded, but should we redact other fields (tokens, secrets) from target.before/target.after?
  4. Per-tenant destination configuration — Should tenants configure their own webhook endpoints and R2 buckets, or is this system-level?
  5. Destination-level tracking — Should the outbox track delivery status per-destination (e.g., "delivered to logs but failed for R2"), or is a single processed_at sufficient?
  6. Cloudflare Durable Objects — For production, should the relay be a Durable Object (single-instance per tenant, natural ordering) rather than waitUntil?
  7. Event schema versioning — Should AuditEvent carry a schema_version field for forward compatibility?
