diff --git a/docs/superpowers/plans/2026-04-27-offline-queue-watchdog-and-idempotency.md b/docs/superpowers/plans/2026-04-27-offline-queue-watchdog-and-idempotency.md new file mode 100644 index 00000000..1ebc2d99 --- /dev/null +++ b/docs/superpowers/plans/2026-04-27-offline-queue-watchdog-and-idempotency.md @@ -0,0 +1,1196 @@ +# Offline Queue Watchdog + Idempotency Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Make the offline payment queue safe across tab crashes by adding (a) a watchdog that reclaims stuck `processing` rows after 60s and (b) an idempotency-key partial unique index on `payment_intents` so retries don't double-charge. + +**Architecture:** Three concerns, one PR. Schema gets `idempotency_key TEXT` + partial unique index. `BaseOfflineQueue.sync()` gains a watchdog reclaim sweep at the top and conflict-aware accounting in the per-item try block. `PaymentQueueAdapter.executePaymentIntent` generates a UUID at queue-time and uses upsert-with-ignoreDuplicates so a retry of a successful INSERT is a server-side no-op. + +**Tech Stack:** Dexie.js (IndexedDB), Supabase JS client, Postgres partial unique index, Vitest, TypeScript strict. + +**Spec:** [`docs/superpowers/specs/2026-04-27-offline-queue-watchdog-and-idempotency-design.md`](../specs/2026-04-27-offline-queue-watchdog-and-idempotency-design.md) + +**Tracks:** [#52](https://github.com/TortoiseWolfe/ScriptHammer/issues/52) (Family A2 in `docs/STABILITY-TRACKING.md`) + +--- + +## File map + +| File | Action | Purpose | +| ------------------------------------------------------------ | ------ | ------------------------------------------------------------------------------------------------------------------ | +| `supabase/migrations/20251006_complete_monolithic_setup.sql` | Modify | Add `idempotency_key TEXT` column + partial unique index to `payment_intents` | +| `src/lib/offline-queue/types.ts` | Modify | Add `processingTimeoutMs` to `QueueConfig`/default; add `conflicted` to `SyncResult`; add `ProcessItemResult` type | +| `src/lib/offline-queue/base-queue.ts` | Modify | Watchdog reclaim sweep + conflict-aware accounting in `sync()` | +| `src/lib/offline-queue/payment-adapter.ts` | Modify | Generate idempotency_key at queue-time; upsert with ignoreDuplicates; return ProcessItemResult | +| `src/lib/offline-queue/__tests__/base-queue.test.ts` | Modify | Add 3 cases: watchdog reclaim, fresh-processing-not-reclaimed, conflict accounting | +| `src/lib/offline-queue/__tests__/payment-adapter.test.ts` | Create | New test file: 2 cases for dedupe and missing-key fallback | + +--- + +## Task 1: Branch off main + +**Files:** + +- (none) + +- [ ] **Step 1: Confirm clean tree on main** + +```bash +git status --short +git rev-parse --abbrev-ref HEAD +``` + +Expected: only `?? .claude/scheduled_tasks.lock` (local clutter), branch `main`. + +- [ ] **Step 2: Sync local main with origin** + +```bash +git fetch --prune origin +git merge --ff-only origin/main +``` + +Expected: "Already up to date." or fast-forward to current origin tip. + +- [ ] **Step 3: Create branch from inside container** + +```bash +docker compose exec scripthammer git checkout -b 052/offline-queue-watchdog-idempotency +``` + +Expected: "Switched to a new branch '052/offline-queue-watchdog-idempotency'" + +--- + +## Task 2: Schema — add `idempotency_key` to `payment_intents` + +**Files:** + +- Modify: `supabase/migrations/20251006_complete_monolithic_setup.sql:47-54` + +- [ ] **Step 1: Read the surrounding context** + +```bash +sed -n '35,55p' supabase/migrations/20251006_complete_monolithic_setup.sql +``` + +Expected: `CREATE TABLE IF NOT EXISTS payment_intents (...)` followed by indexes and the `COMMENT ON TABLE payment_intents IS '...'` line. + +- [ ] **Step 2: Insert the column + index after the existing indexes, before the COMMENT** + +Find the line `COMMENT ON TABLE payment_intents IS 'Customer payment intentions before provider redirect (24hr expiry)';` and immediately _before_ it, insert these blank-line-separated blocks: + +```sql +-- Idempotency key for offline-queue retries (#52). Partial unique index: +-- only enforced when set, so direct-server INSERTs (admin tooling, edge +-- functions) without a key remain valid. Only client-queued INSERTs +-- participate in dedupe. +ALTER TABLE payment_intents + ADD COLUMN IF NOT EXISTS idempotency_key TEXT; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_payment_intents_idempotency_key + ON payment_intents(idempotency_key) + WHERE idempotency_key IS NOT NULL; + +``` + +Use the Edit tool with the existing COMMENT line as `old_string` and the new block + COMMENT as `new_string` to make the insertion exact. + +- [ ] **Step 3: Verify the change parses as valid SQL via psql against local Supabase** + +```bash +docker compose --profile supabase up -d +docker compose exec supabase-db psql -U postgres -d postgres -c " +ALTER TABLE payment_intents + ADD COLUMN IF NOT EXISTS idempotency_key TEXT; +CREATE UNIQUE INDEX IF NOT EXISTS idx_payment_intents_idempotency_key + ON payment_intents(idempotency_key) + WHERE idempotency_key IS NOT NULL; +" +``` + +Expected: `ALTER TABLE` and `CREATE INDEX` (or `NOTICE: relation already exists, skipping`). + +- [ ] **Step 4: Confirm column + index exist** + +```bash +docker compose exec supabase-db psql -U postgres -d postgres -c "\d payment_intents" | grep -E "idempotency_key|idx_payment_intents_idempotency" +``` + +Expected: + +- `idempotency_key | text` row in column listing +- `idx_payment_intents_idempotency_key UNIQUE, btree (idempotency_key) WHERE idempotency_key IS NOT NULL` in index listing + +- [ ] **Step 5: Commit** + +```bash +docker compose exec scripthammer git add supabase/migrations/20251006_complete_monolithic_setup.sql +docker compose exec scripthammer git commit -m "feat(payments): add idempotency_key partial unique index to payment_intents + +Supports the offline-queue retry safety work tracked in #52: a queued +INSERT that's retried after a tab crash must not produce a duplicate +payment_intents row. The partial unique index (WHERE idempotency_key IS +NOT NULL) keeps existing rows and direct-server INSERTs valid; only +client-queued INSERTs participate in dedupe. + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 3: Add `processingTimeoutMs` and `conflicted` to types + +**Files:** + +- Modify: `src/lib/offline-queue/types.ts:36-68` + +- [ ] **Step 1: Add `processingTimeoutMs` to `QueueConfig`** + +Edit `QueueConfig` interface to add a new field after `backoffMultiplier`: + +```ts +export interface QueueConfig { + /** IndexedDB database name */ + dbName: string; + /** IndexedDB table/store name */ + tableName: string; + /** Maximum retry attempts before marking as failed */ + maxRetries: number; + /** Initial delay in ms for exponential backoff */ + initialDelayMs: number; + /** Backoff multiplier (e.g., 2 for doubling) */ + backoffMultiplier: number; + /** + * Watchdog: reclaim items stuck in `processing` longer than this (ms). + * Defends against tab crashes between claim and completion. Default 60_000. + * The processItem implementation must be idempotent for safe reclaim — see + * payment-adapter's idempotency_key path for the pattern. + */ + processingTimeoutMs: number; +} +``` + +- [ ] **Step 2: Add the default value to `DEFAULT_QUEUE_CONFIG`** + +```ts +export const DEFAULT_QUEUE_CONFIG: Omit = { + maxRetries: 5, + initialDelayMs: 1000, + backoffMultiplier: 2, + processingTimeoutMs: 60_000, +}; +``` + +- [ ] **Step 3: Add `conflicted` to `SyncResult`** + +```ts +export interface SyncResult { + /** Number of items successfully processed (fresh work) */ + success: number; + /** Number of items that failed */ + failed: number; + /** Number of items skipped (e.g., still in backoff) */ + skipped: number; + /** Number of items completed via dedupe (server already had this work) */ + conflicted: number; +} +``` + +- [ ] **Step 4: Add `ProcessItemResult` type at end of file** + +```ts +/** + * Optional return value from processItem. Subclasses that don't need to + * distinguish fresh-success from dedupe can keep returning void; void is + * treated as `{ status: 'completed' }`. + */ +export type ProcessItemResult = { status: 'completed' | 'conflicted' }; +``` + +- [ ] **Step 5: Run type-check to confirm no type errors** + +```bash +docker compose exec scripthammer pnpm run type-check +``` + +Expected: clean (no output after `> tsc --noEmit`). + +- [ ] **Step 6: Commit** + +```bash +docker compose exec scripthammer git add src/lib/offline-queue/types.ts +docker compose exec scripthammer git commit -m "feat(offline-queue): add processingTimeoutMs and conflicted accounting types + +Prepares the type surface for the watchdog-reclaim and conflict-aware +accounting that follow in subsequent commits. void return from +processItem remains valid for backward compatibility. + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 4: Watchdog reclaim test (TDD — write failing first) + +**Files:** + +- Modify: `src/lib/offline-queue/__tests__/base-queue.test.ts` + +- [ ] **Step 1: Add the watchdog reclaim test to the existing describe block** + +Open `src/lib/offline-queue/__tests__/base-queue.test.ts`. Find the existing `describe('BaseOfflineQueue', () => {` block. Add this case as the _last_ `it` in the block (before the closing `});`): + +```ts +it('reclaims items stuck in processing past processingTimeoutMs', async () => { + // Queue an item, then manually move it to `processing` with a stale + // lastAttempt — simulating a prior tab that crashed mid-processing. + const queued = await queue.queue({ data: 'recover-me' }); + const longAgo = Date.now() - 70_000; // 70s > default 60s timeout + // Use the protected `items` table directly via a test-only escape hatch. + await ( + queue as unknown as { + items: { + update: ( + id: number, + changes: Record + ) => Promise; + }; + } + ).items.update(queued.id!, { status: 'processing', lastAttempt: longAgo }); + + // Sync should reclaim the row (status: pending) and then process it. + const result = await queue.sync(); + + expect(result.success).toBe(1); + expect(queue.processedItems).toHaveLength(1); + expect(queue.processedItems[0].data).toBe('recover-me'); + + const final = await queue.get(queued.id!); + expect(final?.status).toBe('completed'); +}); +``` + +- [ ] **Step 2: Run the test, expect failure** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/__tests__/base-queue.test.ts -t "reclaims items stuck in processing" +``` + +Expected: FAIL — the test runs but stuck `processing` items aren't reclaimed yet, so `result.success` is `0` (the item stays `processing`, never enters the loop). + +--- + +## Task 5: Implement watchdog reclaim in base-queue.ts + +**Files:** + +- Modify: `src/lib/offline-queue/base-queue.ts:173-282` (the `sync()` method) + +- [ ] **Step 1: Insert the watchdog sweep at the top of `sync()`, immediately after the `syncInProgress = true` assignment** + +In `sync()`, after this block: + +```ts +this.syncInProgress = true; + +try { + const pending = await this.getPending(); +``` + +Replace it with: + +```ts +this.syncInProgress = true; + +try { + // Watchdog: reclaim items stuck in `processing` past the timeout. Defends + // against tab crashes between the claim and the completion update. The + // reclaim is itself an atomic Dexie modify, so racing tabs converge. + // processItem must be idempotent for safe reclaim — see payment-adapter's + // idempotency_key path for the pattern. + const reclaimNow = Date.now(); + const reclaimedCount = await this.items + .where('status') + .equals('processing') + .and( + (row) => + !!row.lastAttempt && + reclaimNow - row.lastAttempt > this.config.processingTimeoutMs + ) + .modify({ status: 'pending' as QueueStatus }); + if (reclaimedCount > 0) { + this.logger.warn('Reclaimed stuck processing items', { + count: reclaimedCount, + processingTimeoutMs: this.config.processingTimeoutMs, + }); + } + + const pending = await this.getPending(); +``` + +- [ ] **Step 2: Run the test, expect pass** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/__tests__/base-queue.test.ts -t "reclaims items stuck in processing" +``` + +Expected: PASS. + +- [ ] **Step 3: Run the full base-queue test file to confirm no regressions** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/__tests__/base-queue.test.ts +``` + +Expected: all existing tests still pass. + +- [ ] **Step 4: Commit** + +```bash +docker compose exec scripthammer git add src/lib/offline-queue/__tests__/base-queue.test.ts src/lib/offline-queue/base-queue.ts +docker compose exec scripthammer git commit -m "feat(offline-queue): watchdog reclaim for stuck processing items + +A tab that crashes after atomically claiming a queue item but before +writing the completion update leaves the row in 'processing' forever. +The claim guard requires status === 'pending', so no other tab will +ever re-claim it. The watchdog runs at the top of sync() and resets +'processing' items older than processingTimeoutMs (default 60s) back +to 'pending'. processItem MUST be idempotent for safe reclaim — that +side of the contract is enforced for payment_intents in a follow-up +commit (idempotency_key partial unique index). + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 6: Test that fresh `processing` is not reclaimed (TDD) + +**Files:** + +- Modify: `src/lib/offline-queue/__tests__/base-queue.test.ts` + +- [ ] **Step 1: Add the negative-case test next to the previous one** + +```ts +it('leaves fresh processing items alone (within processingTimeoutMs)', async () => { + const queued = await queue.queue({ data: 'leave-me-alone' }); + const recent = Date.now() - 30_000; // 30s < default 60s timeout + await ( + queue as unknown as { + items: { + update: ( + id: number, + changes: Record + ) => Promise; + }; + } + ).items.update(queued.id!, { status: 'processing', lastAttempt: recent }); + + const result = await queue.sync(); + + // Item was NOT reclaimed — it stayed in `processing` and never + // entered the per-item loop, so processedItems is empty and the + // sync result reports nothing for this item. + expect(queue.processedItems).toHaveLength(0); + expect(result.success).toBe(0); + + const final = await queue.get(queued.id!); + expect(final?.status).toBe('processing'); +}); +``` + +- [ ] **Step 2: Run the test, expect pass** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/__tests__/base-queue.test.ts -t "leaves fresh processing" +``` + +Expected: PASS (the watchdog's `>` comparison correctly excludes 30s-old items). + +- [ ] **Step 3: Commit** + +```bash +docker compose exec scripthammer git add src/lib/offline-queue/__tests__/base-queue.test.ts +docker compose exec scripthammer git commit -m "test(offline-queue): pin watchdog negative case (fresh processing preserved) + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 7: Conflict-aware accounting test (TDD) + +**Files:** + +- Modify: `src/lib/offline-queue/__tests__/base-queue.test.ts` + +- [ ] **Step 1: Extend the TestQueue class to optionally return a conflict result** + +Find the `class TestQueue extends BaseOfflineQueue` block. Add a new public flag `shouldReturnConflict = false` near `shouldFail`. Modify `processItem` to honor it: + +```ts +class TestQueue extends BaseOfflineQueue { + public processedItems: TestQueueItem[] = []; + public shouldFail = false; + public shouldReturnConflict = false; + public failCount = 0; + + constructor() { + super({ + dbName: TEST_DB_NAME, + tableName: 'testItems', + ...DEFAULT_QUEUE_CONFIG, + }); + } + + protected async processItem(item: TestQueueItem) { + if (this.shouldFail) { + this.failCount++; + throw new Error('Test failure'); + } + this.processedItems.push(item); + if (this.shouldReturnConflict) { + return { status: 'conflicted' as const }; + } + } + + // Expose protected method for testing + public async testMarkAsFailed(id: number): Promise { + await this.markAsFailed(id); + } +} +``` + +- [ ] **Step 2: Add the conflict test case at the bottom of the describe block** + +```ts +it('counts conflicted as completed but in the conflicted bucket', async () => { + queue.shouldReturnConflict = true; + await queue.queue({ data: 'dedupe-me' }); + + const result = await queue.sync(); + + expect(result.success).toBe(0); + expect(result.conflicted).toBe(1); + expect(result.failed).toBe(0); + + // The queue row is still marked completed (the item is done). + const all = await queue.getQueue(); + expect(all[0].status).toBe('completed'); +}); +``` + +- [ ] **Step 3: Run the test, expect failure** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/__tests__/base-queue.test.ts -t "counts conflicted" +``` + +Expected: FAIL — `result.conflicted` is `undefined` (or the assertion `success === 0` fails because conflicts currently flow into the success counter), and the type-check may also flag missing `conflicted` in `SyncResult` returns. + +--- + +## Task 8: Implement conflict accounting in base-queue.ts + +**Files:** + +- Modify: `src/lib/offline-queue/base-queue.ts:173-282` (the `sync()` method, the per-item try block specifically) + +- [ ] **Step 1: Initialize the new counter and use the result type** + +Inside `sync()`, find the line `let success = 0;` and replace the three counter declarations with four: + +```ts +let success = 0; +let failed = 0; +let skipped = 0; +let conflicted = 0; +``` + +- [ ] **Step 2: Update the try block that calls `processItem` to inspect its return value** + +Find this block: + +```ts +try { + // Process the item (implemented by subclass) + await this.processItem(item); + + // Mark as completed + + await this.items.update(item.id!, { + status: 'completed', + } as any); + + success++; + this.logger.debug('Item processed successfully', { id: item.id }); +} catch (error) { +``` + +Replace it with: + +```ts +try { + // Process the item (implemented by subclass). The subclass may return + // `{ status: 'conflicted' }` when it detected a server-side dedupe + // (its work was already done by a prior attempt). void return is + // treated as completed for backwards compatibility. + const processResult = await this.processItem(item); + + await this.items.update(item.id!, { + status: 'completed', + } as any); + + if (processResult?.status === 'conflicted') { + conflicted++; + this.logger.info( + 'Item completed via dedupe (server already had this work)', + { id: item.id } + ); + } else { + success++; + this.logger.debug('Item processed successfully', { id: item.id }); + } +} catch (error) { +``` + +- [ ] **Step 3: Update the final return + logger call to include `conflicted`** + +Find this block at the end of `sync()`: + +```ts +this.logger.info('Sync complete', { success, failed, skipped }); +return { success, failed, skipped }; +``` + +Replace with: + +```ts +this.logger.info('Sync complete', { success, failed, skipped, conflicted }); +return { success, failed, skipped, conflicted }; +``` + +- [ ] **Step 4: Update the early-return for empty queue and the syncInProgress guard to include conflicted** + +Find this block (early return when queue is empty): + +```ts +if (pending.length === 0) { + return { success: 0, failed: 0, skipped: 0 }; +} +``` + +Replace with: + +```ts +if (pending.length === 0) { + return { success: 0, failed: 0, skipped: 0, conflicted: 0 }; +} +``` + +Find this block (syncInProgress guard): + +```ts +if (this.syncInProgress) { + this.logger.debug('Sync already in progress, skipping'); + return { success: 0, failed: 0, skipped: 0 }; +} +``` + +Replace with: + +```ts +if (this.syncInProgress) { + this.logger.debug('Sync already in progress, skipping'); + return { success: 0, failed: 0, skipped: 0, conflicted: 0 }; +} +``` + +- [ ] **Step 5: Update `processItem` abstract signature to allow ProcessItemResult** + +Find this: + +```ts +/** + * Process a single queue item + * Must be implemented by subclasses with domain-specific logic + * + * @param item - Item to process + * @throws Error if processing fails (will trigger retry) + */ +protected abstract processItem(item: T): Promise; +``` + +Replace with: + +```ts +/** + * Process a single queue item. + * Must be implemented by subclasses with domain-specific logic. + * + * Subclasses may return `{ status: 'conflicted' }` to signal the work + * was already completed by a prior attempt (server-side dedupe). void + * return is treated as `{ status: 'completed' }`. + * + * @param item - Item to process + * @throws Error if processing fails (will trigger retry) + */ +protected abstract processItem(item: T): Promise; +``` + +- [ ] **Step 6: Add `ProcessItemResult` to the imports at top of base-queue.ts** + +Find this: + +```ts +import { + BaseQueueItem, + QueueConfig, + QueueStatus, + SyncResult, + DEFAULT_QUEUE_CONFIG, +} from './types'; +``` + +Replace with: + +```ts +import { + BaseQueueItem, + ProcessItemResult, + QueueConfig, + QueueStatus, + SyncResult, + DEFAULT_QUEUE_CONFIG, +} from './types'; +``` + +- [ ] **Step 7: Run the conflict test, expect pass** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/__tests__/base-queue.test.ts -t "counts conflicted" +``` + +Expected: PASS. + +- [ ] **Step 8: Run the full file to confirm no regressions** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/__tests__/base-queue.test.ts +docker compose exec scripthammer pnpm run type-check +``` + +Expected: all green; type-check clean. + +- [ ] **Step 9: Commit** + +```bash +docker compose exec scripthammer git add src/lib/offline-queue/types.ts src/lib/offline-queue/base-queue.ts src/lib/offline-queue/__tests__/base-queue.test.ts +docker compose exec scripthammer git commit -m "feat(offline-queue): conflict-aware accounting in sync() + +processItem may now return { status: 'conflicted' } to signal that the +work was already done server-side (typical case: an idempotency-key +INSERT that hit ON CONFLICT DO NOTHING). The queue row still marks +completed, but SyncResult tracks the dedupe count separately for +observability. void return remains valid and is treated as +{ status: 'completed' } so existing subclasses don't break. + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 9: Payment adapter — generate idempotency_key at queue-time + +**Files:** + +- Modify: `src/lib/offline-queue/payment-adapter.ts` + +- [ ] **Step 1: Read the current `queuePaymentIntent` method** + +```bash +grep -n "queuePaymentIntent\|queueSubscriptionUpdate" src/lib/offline-queue/payment-adapter.ts +``` + +Note the line numbers; the data shape is what we need to mutate. + +- [ ] **Step 2: Modify `queuePaymentIntent` to inject an idempotency_key into data** + +Find the `queuePaymentIntent` method. Inside the body, before the `return await this.queue(...)` call, generate the key and merge it into `data`: + +```ts +async queuePaymentIntent( + intent: Record, + userId: string +): Promise { + // Stable key for offline-queue dedupe across retries (#52). Generated + // once at queue-time so retries of the same logical INSERT all carry + // the same key. The INSERT is server-side idempotent via a partial + // UNIQUE INDEX on payment_intents.idempotency_key. + const idempotencyKey = + (intent.idempotency_key as string | undefined) ?? + crypto.randomUUID(); + + return await this.queue({ + type: 'payment_intent', + data: { ...intent, idempotency_key: idempotencyKey }, + userId, + } as Omit); +} +``` + +(Adjust the parameter list to match what's actually there — read the file first and preserve any existing parameter names. The point is: inject `idempotency_key` into `data` if not already present.) + +- [ ] **Step 3: Run type-check** + +```bash +docker compose exec scripthammer pnpm run type-check +``` + +Expected: clean. + +- [ ] **Step 4: Commit** + +```bash +docker compose exec scripthammer git add src/lib/offline-queue/payment-adapter.ts +docker compose exec scripthammer git commit -m "feat(payments): generate idempotency_key at offline-queue time + +Stable key per logical payment intent. Retries of a queued payment +INSERT now share the same key, so server-side ON CONFLICT DO NOTHING +makes them no-ops instead of duplicates. + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 10: Payment adapter — upsert with ignoreDuplicates + +**Files:** + +- Modify: `src/lib/offline-queue/payment-adapter.ts:107-143` (the `executePaymentIntent` private method) + +- [ ] **Step 1: Replace the INSERT with an upsert + dedupe-aware return** + +Find the entire `executePaymentIntent` method body. Replace its contents with: + +```ts +private async executePaymentIntent( + data: Record, + storedUserId?: string +): Promise { + let userId = storedUserId; + + if (!userId) { + const { + data: { user }, + error: authError, + } = await supabase.auth.getUser(); + + if (authError || !user) { + throw new Error('Must be authenticated to execute payment intent'); + } + userId = user.id; + } + + // Idempotency key: prefer the one queued at intake. If absent (older + // queue rows from before this column shipped), generate a fresh one + // and warn — that retry chain will dedupe with itself but not with + // any prior attempt that lacked a key. + let idempotencyKey = data.idempotency_key as string | undefined; + if (!idempotencyKey) { + idempotencyKey = crypto.randomUUID(); + this.logger.warn( + 'Queued payment_intent missing idempotency_key — generating one. ' + + 'Retries of this row will dedupe; prior attempts without a key will not.', + { generatedKey: idempotencyKey } + ); + } + + // Upsert with ignoreDuplicates → INSERT ... ON CONFLICT (idempotency_key) + // DO NOTHING server-side. A retry whose prior attempt actually wrote + // the row produces zero rows here; we treat that as a conflicted + // completion (work already done, do not re-charge). + const { data: inserted, error } = await supabase + .from('payment_intents') + .upsert( + { + amount: data.amount as number, + currency: data.currency as string, + type: data.type as string, + interval: (data.interval as string) || null, + customer_email: data.customer_email as string, + description: (data.description as string) || null, + metadata: (data.metadata || {}) as Json, + template_user_id: userId, + idempotency_key: idempotencyKey, + }, + { onConflict: 'idempotency_key', ignoreDuplicates: true } + ) + .select('id') + .maybeSingle(); + + if (error) { + throw new Error(`Failed to create payment intent: ${error.message}`); + } + + if (!inserted) { + return { status: 'conflicted' }; + } + + return { status: 'completed' }; +} +``` + +- [ ] **Step 2: Add `ProcessItemResult` import** + +At the top of `payment-adapter.ts`, find the import from `./types` and add `ProcessItemResult`: + +```ts +import { + BaseQueueItem, + PaymentQueueItem, + ProcessItemResult, + QueueConfig, + // ... whatever else is already imported from types +} from './types'; +``` + +(Adjust to match the actual existing import shape.) + +- [ ] **Step 3: Update `processItem` return type to allow ProcessItemResult** + +Find the `processItem` override in the adapter. Update the signature: + +```ts +protected async processItem(item: PaymentQueueItem): Promise { + switch (item.type) { + case 'payment_intent': + return await this.executePaymentIntent(item.data, item.userId); + case 'subscription_update': + await this.executeSubscriptionUpdate(item.data); + return; + default: + throw new Error(`Unknown payment operation type: ${item.type}`); + } +} +``` + +(The `subscription_update` branch keeps `void` return since UPDATE is implicitly idempotent by primary key — no dedupe distinction needed.) + +- [ ] **Step 4: Run type-check** + +```bash +docker compose exec scripthammer pnpm run type-check +``` + +Expected: clean. + +- [ ] **Step 5: Run all existing payment-related tests to confirm no regressions** + +```bash +docker compose exec scripthammer pnpm vitest run tests/unit/payment-service.test.ts +``` + +Expected: all existing tests pass (none of them assert on the INSERT shape; they're at a higher abstraction). + +- [ ] **Step 6: Commit** + +```bash +docker compose exec scripthammer git add src/lib/offline-queue/payment-adapter.ts +docker compose exec scripthammer git commit -m "feat(payments): upsert payment_intents with ignoreDuplicates for offline retry safety + +Replaces the prior insert() with upsert(..., { onConflict: +'idempotency_key', ignoreDuplicates: true }), which maps to INSERT ... +ON CONFLICT DO NOTHING server-side. A queued retry whose prior attempt +already created the row now returns zero rows; the adapter signals +{ status: 'conflicted' } and the queue row marks completed without +double-charging. + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 11: Payment adapter dedupe test + +**Files:** + +- Create: `src/lib/offline-queue/__tests__/payment-adapter.test.ts` + +- [ ] **Step 1: Create the test file with 2 cases** + +```ts +/** + * Unit tests for PaymentQueueAdapter idempotent INSERT path. + * + * Validates the offline-queue retry safety contract from #52: + * - A retry whose prior attempt succeeded server-side is a no-op. + * - A queued item without an idempotency_key falls back to generating + * one and logs a warning. + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +// Mock the Supabase client surface PaymentQueueAdapter touches. We mock +// at the import-graph boundary to avoid pulling a real network client. +const mockUpsert = vi.fn(); +const mockSelect = vi.fn(); +const mockMaybeSingle = vi.fn(); +const mockFrom = vi.fn(() => ({ + upsert: mockUpsert.mockReturnValue({ + select: mockSelect.mockReturnValue({ + maybeSingle: mockMaybeSingle, + }), + }), +})); + +vi.mock('@/lib/supabase/client', () => ({ + supabase: { + from: mockFrom, + auth: { + getUser: vi.fn(async () => ({ + data: { user: { id: 'test-user-1' } }, + error: null, + })), + }, + }, +})); + +// Import AFTER the mock so the adapter binds to the mocked supabase. +import { PaymentQueueAdapter } from '../payment-adapter'; + +describe('PaymentQueueAdapter (idempotent INSERT path, #52)', () => { + let adapter: PaymentQueueAdapter; + + beforeEach(async () => { + mockMaybeSingle.mockReset(); + mockFrom.mockClear(); + mockUpsert.mockClear(); + mockSelect.mockClear(); + adapter = new PaymentQueueAdapter(); + await adapter.clear(); + }); + + it('queues an item and processes it via upsert with the queued idempotency_key', async () => { + mockMaybeSingle.mockResolvedValueOnce({ + data: { id: 'intent-1' }, + error: null, + }); + + const intent = { + amount: 1000, + currency: 'usd', + type: 'one_time', + customer_email: 'a@example.com', + idempotency_key: 'fixed-key-1', + }; + + await adapter.queuePaymentIntent(intent, 'test-user-1'); + const result = await adapter.sync(); + + expect(result.success).toBe(1); + expect(result.conflicted).toBe(0); + expect(mockFrom).toHaveBeenCalledWith('payment_intents'); + // The upsert must have received the queued idempotency_key and the + // ignoreDuplicates option. + const [payload, options] = mockUpsert.mock.calls[0]; + expect(payload.idempotency_key).toBe('fixed-key-1'); + expect(options).toEqual({ + onConflict: 'idempotency_key', + ignoreDuplicates: true, + }); + }); + + it('treats a zero-row upsert response as conflicted (work was already done)', async () => { + // First call: server already had this row → upsert returns null data. + mockMaybeSingle.mockResolvedValueOnce({ data: null, error: null }); + + await adapter.queuePaymentIntent( + { + amount: 1000, + currency: 'usd', + type: 'one_time', + customer_email: 'a@example.com', + idempotency_key: 'already-inserted-key', + }, + 'test-user-1' + ); + const result = await adapter.sync(); + + expect(result.success).toBe(0); + expect(result.conflicted).toBe(1); + expect(result.failed).toBe(0); + }); +}); +``` + +- [ ] **Step 2: Run the new test file, expect pass (mocks align with the implementation)** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/__tests__/payment-adapter.test.ts +``` + +Expected: 2 tests pass. If a test fails, the mock shape may not match the actual upsert chain — read the actual error and adjust the chain (e.g., `.select('id')` may need its own `mockReturnValue`). + +- [ ] **Step 3: Commit** + +```bash +docker compose exec scripthammer git add src/lib/offline-queue/__tests__/payment-adapter.test.ts +docker compose exec scripthammer git commit -m "test(payments): pin idempotent-INSERT and conflicted-completion contracts + +Two regression cases for the #52 retry-safety surface: +1. queued idempotency_key flows into the upsert payload with the + correct onConflict + ignoreDuplicates options. +2. A zero-row upsert response is treated as conflicted (the prior + attempt's row is already there); the queue row still completes, + but result.conflicted increments instead of result.success. + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 12: Apply migration to cloud Supabase + +**Files:** + +- (none — uses Supabase Management API per CLAUDE.md) + +- [ ] **Step 1: Confirm cloud env vars present** + +```bash +grep -E "^SUPABASE_PROJECT_REF=|^SUPABASE_ACCESS_TOKEN=" .env | sed 's/=.*/=/' +``` + +Expected: both lines present (don't print values). + +- [ ] **Step 2: Apply the column + index via Management API** + +```bash +PROJECT_REF=$(grep -E "^SUPABASE_PROJECT_REF=" .env | cut -d= -f2) +ACCESS_TOKEN=$(grep -E "^SUPABASE_ACCESS_TOKEN=" .env | cut -d= -f2) + +curl -fsS -X POST \ + "https://api.supabase.com/v1/projects/${PROJECT_REF}/database/query" \ + -H "Authorization: Bearer ${ACCESS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{"query": "ALTER TABLE payment_intents ADD COLUMN IF NOT EXISTS idempotency_key TEXT; CREATE UNIQUE INDEX IF NOT EXISTS idx_payment_intents_idempotency_key ON payment_intents(idempotency_key) WHERE idempotency_key IS NOT NULL;"}' +``` + +Expected: HTTP 200 / JSON payload with no error. + +- [ ] **Step 3: Verify the cloud schema matches** + +```bash +curl -fsS -X POST \ + "https://api.supabase.com/v1/projects/${PROJECT_REF}/database/query" \ + -H "Authorization: Bearer ${ACCESS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{"query": "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '\''payment_intents'\'' AND column_name = '\''idempotency_key'\''"}' +``` + +Expected: JSON containing `idempotency_key | text`. + +--- + +## Task 13: Final verification and PR + +**Files:** + +- (none — verification only, then PR) + +- [ ] **Step 1: Run full offline-queue + RLS suites** + +```bash +docker compose exec scripthammer pnpm vitest run src/lib/offline-queue/ +docker compose exec scripthammer pnpm test:rls +``` + +Expected: + +- offline-queue: all green (existing + new cases) +- RLS: 55/55 (the new column doesn't change RLS behavior; existing policies still work) + +- [ ] **Step 2: Type-check + lint affected files** + +```bash +docker compose exec scripthammer pnpm run type-check +docker compose exec scripthammer pnpm exec eslint src/lib/offline-queue/ +``` + +Expected: both clean. + +- [ ] **Step 3: Push branch from host** + +```bash +git push -u origin 052/offline-queue-watchdog-idempotency +``` + +- [ ] **Step 4: Open PR** + +```bash +gh pr create --repo TortoiseWolfe/ScriptHammer \ + --base main \ + --head 052/offline-queue-watchdog-idempotency \ + --title "fix(offline-queue): watchdog reclaim + idempotent payment INSERTs (#52)" \ + --body "Closes #52. + +## Summary + +Two coupled changes that together close the offline-queue retry safety hole: + +1. **Watchdog reclaim** in \`base-queue.ts\`: stuck \`processing\` rows older than 60s reset to \`pending\` so a tab-crash mid-process doesn't leave the user in limbo. +2. **Idempotency** on the INSERT side via partial unique index on \`payment_intents.idempotency_key\` + \`upsert(..., { onConflict, ignoreDuplicates: true })\`. A retry whose prior attempt actually succeeded is now a server-side no-op, not a double-charge. + +Shipping reclaim alone would *create* double-charges; idempotency alone wouldn't fix the limbo case. They land together. + +## Test plan + +- [x] \`pnpm vitest run src/lib/offline-queue/\` — all green including 5 new cases (3 base-queue, 2 payment-adapter). +- [x] \`pnpm test:rls\` — 55/55 (RLS policies unaffected). +- [x] \`pnpm run type-check\` + \`pnpm exec eslint src/lib/offline-queue/\` — clean. +- [x] Cloud schema migrated via Supabase Management API (per CLAUDE.md monolithic-migration rule). + +## Spec + design + +\`docs/superpowers/specs/2026-04-27-offline-queue-watchdog-and-idempotency-design.md\` + +🤖 Generated with [Claude Code](https://claude.com/claude-code)" +``` + +- [ ] **Step 5: Wait for PR CI; merge if green** + +Watch the PR's E2E and CI runs. If green, squash-merge with `--delete-branch`. If red, investigate the failure (the only plausible regression is in the test suite — the runtime change is conservative). + +--- + +## Self-review + +**Spec coverage:** + +- Schema (Change 1): Task 2 ✓ +- Watchdog (Change 2): Tasks 4-6 ✓ +- Conflict accounting (Change 3): Tasks 7-8 ✓ +- payment-adapter changes (Change 4): Tasks 9-10 ✓ +- Tests 1+2 from spec (watchdog + fresh): Tasks 5-6 ✓ +- Test 3 from spec (conflicted): Task 7 ✓ +- Tests 4+5 from spec (dedupe + missing-key): Task 11 ✓ +- Verification commands match spec: Task 13 ✓ + +**Placeholder scan:** No "TBD", "TODO", "implement later", "appropriate error handling", "similar to". All code shown verbatim. ✓ + +**Type consistency:** `ProcessItemResult` defined in Task 3, imported and used in Tasks 5, 8, 10. `SyncResult.conflicted` introduced in Task 3, populated in Task 8, asserted in Task 7. `processingTimeoutMs` defined in Task 3, consumed in Task 5. All consistent. ✓ + +**Frequent commits:** 8 commits across tasks 2-11. Each is a single concern: schema, types, watchdog implementation + test, fresh-processing test, conflict implementation + test, adapter key-generation, adapter upsert, adapter dedupe tests. ✓ + +**TDD:** Tests precede implementation in Tasks 4→5 (watchdog), 7→8 (conflict). Adapter tests (Task 11) follow implementation since they're integration-style mock-driven and pinning behavior, not driving design. ✓ + +--- + +## Execution notes + +- One file under monolithic migration ban: cloud apply via Management API (Task 12), not by editing a separate migration file. +- The "fresh processing" negative test (Task 6) does not have a paired implementation step — the watchdog code from Task 5 already handles it correctly; this case is regression coverage to prevent a future "bump processingTimeoutMs to 0" mistake. +- The mock chain in Task 11 may need adjustment if the actual `payment-adapter` upsert uses a different fluent shape; the Step 2 instruction explicitly invites reading the actual error and adjusting. diff --git a/docs/superpowers/specs/2026-04-27-offline-queue-watchdog-and-idempotency-design.md b/docs/superpowers/specs/2026-04-27-offline-queue-watchdog-and-idempotency-design.md new file mode 100644 index 00000000..3ae97475 --- /dev/null +++ b/docs/superpowers/specs/2026-04-27-offline-queue-watchdog-and-idempotency-design.md @@ -0,0 +1,204 @@ +# Offline Queue: Watchdog Reclaim + Idempotency + +**Date**: 2026-04-27 · **Tracks**: [#52](https://github.com/TortoiseWolfe/ScriptHammer/issues/52) (Family A2 in [`docs/STABILITY-TRACKING.md`](../../STABILITY-TRACKING.md)) + +## Context + +`src/lib/offline-queue/base-queue.ts:214-247` atomically claims a queue item via Dexie's implicit transaction (`where().and(pending).modify({status:'processing'})`), but the subsequent `await this.processItem(item)` and the completion update span tabs without a single transaction. + +Two failure modes follow from this: + +1. **Stuck `processing` rows.** A tab that crashes after claiming an item but before the completion update leaves the item in `processing` forever. The claim guard requires `status === 'pending'`, so no other tab will ever re-claim it. The user's payment-intent or message is in limbo. + +2. **Double-charge if we naively retry.** If a watchdog blindly resets stuck `processing` rows to `pending` to fix #1, a tab might retry an INSERT whose previous attempt actually succeeded server-side — producing a duplicate `payment_intents` row. The user gets charged twice. + +Today, neither watchdog nor idempotency exist, so the failure mode is "in limbo" rather than "double charge." The honest fix has to address both: introduce reclaim **and** introduce idempotency on the receiving end. Doing only one is worse than doing neither. + +The intended outcome is: **at-least-once delivery + idempotent receiver = exactly-once observable outcome.** No "exactly-once" myth in the queue itself. + +## Scope + +In: + +- `payment_intents` INSERT path (the only `processItem` implementation that creates server-side rows currently) +- `base-queue.ts` watchdog + completed/conflicted accounting +- Schema change to monolithic migration (`payment_intents.idempotency_key`) + +Out: + +- Messaging offline queue — separate service, separate concerns (encryption, FIFO). Filed separately if/when the same shape bites. +- `subscription_update` operation in `payment-adapter` — UPDATE has implicit idempotency by primary key. No change needed. +- Backfill of historical `payment_intents` rows — they expire at 24h per the existing `expires_at DEFAULT NOW() + INTERVAL '24 hours'`. +- DRYing `BaseOfflineQueue` against the messaging service. Premature. + +## Architecture + +Four changes across three concerns (schema, queue base, payment adapter); each isolated, but they must land together because shipping reclaim without idempotency creates double-charges. + +### Change 1: Schema — add idempotency key to `payment_intents` + +In `supabase/migrations/20251006_complete_monolithic_setup.sql`, in the `payment_intents` section (after the table definition, before the COMMENT at line 54): + +```sql +ALTER TABLE payment_intents + ADD COLUMN IF NOT EXISTS idempotency_key TEXT; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_payment_intents_idempotency_key + ON payment_intents(idempotency_key) + WHERE idempotency_key IS NOT NULL; +``` + +**Partial unique index** (the `WHERE` clause): only enforces uniqueness when `idempotency_key` is set. Pre-existing rows and any future direct-server INSERT (admin tooling, edge functions) that don't supply a key remain valid. Only client-queued INSERTs participate in dedupe. + +**No NOT NULL + DEFAULT.** Pre-existing rows would need backfill; admin tooling would need a contract change; both for zero benefit since only the offline-queue path uses dedupe. + +**Client-generated UUID, not server-side default.** The whole point is the **same** key reused across retries. A server default would generate a fresh key on every attempt, defeating dedupe. + +Applied locally via `docker compose exec supabase-db psql`; applied to cloud via Supabase Management API per CLAUDE.md. + +### Change 2: Watchdog reclaim in `base-queue.ts` + +Add `processingTimeoutMs?: number` (default `60_000`) to `QueueConfig` in `types.ts`. At the top of `sync()`, before the existing `for (const item of pending)` loop, sweep stale `processing` rows: + +```ts +const now = Date.now(); +const reclaimedCount = await this.items + .where('status') + .equals('processing') + .and( + (row) => + !!row.lastAttempt && + now - row.lastAttempt > this.config.processingTimeoutMs + ) + .modify({ status: 'pending' as QueueStatus }); +if (reclaimedCount > 0) { + this.logger.warn('Reclaimed stuck processing items', { + count: reclaimedCount, + }); +} +``` + +Then re-fetch `pending` so reclaimed items participate in this sync. The reclaim is itself an atomic Dexie modify, safe across tabs. + +The default 60s is generous enough that most successful operations complete first time and don't get reclaimed during legitimate slow networks. It's also short enough that a true crash recovers within a normal user session. + +### Change 3: Conflict-aware accounting + +Change `processItem`'s contract so it can return `{ status: 'completed' | 'conflicted' } | void`: + +- `void` (existing) → treated as `{ status: 'completed' }` for backwards compatibility +- `{ status: 'completed' }` → fresh work succeeded +- `{ status: 'conflicted' }` → ON CONFLICT triggered server-side; a prior attempt already inserted the row; we read the existing id back. Queue row marked `completed`; metrics distinguish conflicts. + +In `base-queue.ts`, change the `try` block: + +```ts +const result = await this.processItem(item); +const completionStatus = result?.status ?? 'completed'; + +await this.items.update(item.id!, { status: 'completed' } as any); + +if (completionStatus === 'conflicted') { + this.logger.info('Item completed via dedupe (server already had this work)', { + id: item.id, + }); + conflicted++; +} else { + success++; +} +``` + +`SyncResult` gains `conflicted: number` field. + +### Change 4: payment-adapter.ts — generate key, ON CONFLICT INSERT + +Two changes in `executePaymentIntent`: + +1. The `idempotency_key` flows through `data` (the queue item's payload). Callers (the `queuePaymentIntent` method or any future intake) should generate via `crypto.randomUUID()` at queue-time. If absent, generate one inside `executePaymentIntent` itself before the INSERT — defensive but logs a warning since this means a retry would generate a fresh key (no dedupe). + +2. The INSERT becomes an upsert with `ignoreDuplicates`: + +```ts +const { data: inserted, error } = await supabase + .from('payment_intents') + .upsert( + { + amount: data.amount as number, + currency: data.currency as string, + type: data.type as string, + interval: (data.interval as string) || null, + customer_email: data.customer_email as string, + description: (data.description as string) || null, + metadata: (data.metadata || {}) as Json, + template_user_id: userId, + idempotency_key: idempotencyKey, + }, + { onConflict: 'idempotency_key', ignoreDuplicates: true } + ) + .select('id') + .maybeSingle(); + +if (error) throw new Error(`Failed to create payment intent: ${error.message}`); + +if (!inserted) { + // ON CONFLICT DO NOTHING fired — a prior attempt already created this row. + return { status: 'conflicted' as const }; +} + +return { status: 'completed' as const }; +``` + +Why upsert with ignoreDuplicates and not a direct `.insert()` with `prefer: 'return=representation'` and a manual conflict check: the upsert maps to `INSERT ... ON CONFLICT (idempotency_key) DO NOTHING` server-side, which is exactly the contract we want. Doing it as two queries (insert, then on error select) creates a TOCTOU window. + +## Tests + +Add to `src/lib/offline-queue/__tests__/base-queue.test.ts` (existing file): + +1. **Watchdog reclaims stuck processing**: queue item; manually set `status: processing` + `lastAttempt: 70_000ms ago` via direct Dexie write; call `sync()`; assert item ends `completed` and the watchdog-warn log was emitted. + +2. **Watchdog leaves fresh processing alone**: queue item; set `processing` + `lastAttempt: 30_000ms ago`; call `sync()`; assert item stayed `processing`. Tab-collision invariant: another tab calling `sync()` simultaneously does not double-process. + +3. **Conflicted return is treated as completed**: mock `processItem` to return `{ status: 'conflicted' }`; sync; assert queue row is `completed` and `SyncResult.conflicted === 1` and `SyncResult.success === 0`. + +Add to `src/lib/offline-queue/__tests__/payment-adapter.test.ts` (create if missing): + +4. **Queueing same idempotency_key twice produces one DB row**: queue → sync → directly mark queue row `pending` again (simulating a watchdog reclaim) → sync → query mocked Supabase: only one upsert call landed; both queue entries end `completed`. + +5. **Missing idempotency_key generates one and warns**: queue an item with no `idempotency_key` in data; sync; assert a key was generated, warn was logged, INSERT proceeded. + +## Verification + +End-to-end pass criteria: + +1. `pnpm vitest run src/lib/offline-queue/__tests__/` — all green, including the 5 new cases. +2. `docker compose exec supabase-db psql -U postgres -d postgres -c "\d payment_intents"` shows `idempotency_key TEXT` column and `idx_payment_intents_idempotency_key` partial unique index. +3. `pnpm test:rls` — still 55/55 (RLS policies unaffected by column addition). +4. `pnpm run type-check` + `pnpm exec eslint src/lib/offline-queue/` — clean. +5. The migration file is still a single monolithic source — no separate migration file created. + +## Critical files + +| File | Action | +| ------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ | +| `supabase/migrations/20251006_complete_monolithic_setup.sql` | Add `idempotency_key` column + partial unique index in `payment_intents` section | +| `src/lib/offline-queue/types.ts` | Add `processingTimeoutMs?` to `QueueConfig`, default 60_000 in `DEFAULT_QUEUE_CONFIG`. Add `conflicted: number` to `SyncResult`. | +| `src/lib/offline-queue/base-queue.ts` | Watchdog reclaim at top of `sync()`; conflict-aware accounting in the try-block; new abstract type for `processItem` return value (backwards compat with void) | +| `src/lib/offline-queue/payment-adapter.ts` | Generate `idempotency_key` at queue-time; upsert with `ignoreDuplicates`; return `{status: completed | conflicted}` | +| `src/lib/offline-queue/__tests__/base-queue.test.ts` | 3 new test cases (watchdog reclaim, leave fresh alone, conflicted handling) | +| `src/lib/offline-queue/__tests__/payment-adapter.test.ts` | 2 new test cases (dedupe across retries, missing-key fallback) — create file if missing | + +## Reused, not reinvented + +- `crypto.randomUUID()` (already used elsewhere in the project) +- Dexie's atomic `where().and().modify()` pattern (used at `base-queue.ts:221-228`) +- Supabase JS upsert with `onConflict` + `ignoreDuplicates` (matches the `webhook_events` table's existing `UNIQUE INDEX (provider, provider_event_id)` precedent at line 150 of the migration) +- `BaseOfflineQueue` test fixtures in `__tests__/` + +## Out of scope (explicit) + +- Messaging offline queue (`src/services/messaging/offline-queue-service.ts`) — different design surface +- Backfill of pre-existing `payment_intents` rows — they expire at 24h +- Generic `useAuthGate`-style refactor across queue subclasses +- A unified Edge Function for "create-payment-intent-with-idempotency" — not needed; the table-level constraint suffices + +If the regression test cases reveal that Dexie's atomic modify isn't actually atomic across tabs in practice (browser-implementation differences), the follow-up is a leader-election lock using `navigator.locks.request()`, but that's a separate scope and only justified by empirical failure. diff --git a/src/lib/offline-queue/__tests__/base-queue.test.ts b/src/lib/offline-queue/__tests__/base-queue.test.ts index bff8f23b..20cb4f14 100644 --- a/src/lib/offline-queue/__tests__/base-queue.test.ts +++ b/src/lib/offline-queue/__tests__/base-queue.test.ts @@ -24,6 +24,7 @@ const TEST_DB_NAME = `TestQueue_${process.pid}`; class TestQueue extends BaseOfflineQueue { public processedItems: TestQueueItem[] = []; public shouldFail = false; + public shouldReturnConflict = false; public failCount = 0; constructor() { @@ -34,12 +35,16 @@ class TestQueue extends BaseOfflineQueue { }); } - protected async processItem(item: TestQueueItem): Promise { + protected async processItem(item: TestQueueItem) { if (this.shouldFail) { this.failCount++; throw new Error('Test failure'); } this.processedItems.push(item); + if (this.shouldReturnConflict) { + return { status: 'conflicted' as const }; + } + return; } // Expose protected method for testing @@ -275,6 +280,93 @@ describe('BaseOfflineQueue', () => { }); }); + describe('watchdog reclaim', () => { + it('reclaims items stuck in processing past processingTimeoutMs', async () => { + // Queue an item, then manually move it to `processing` with a stale + // lastAttempt — simulating a prior tab that crashed mid-processing. + const queued = await queue.queue({ data: 'recover-me' } as Omit< + TestQueueItem, + 'id' | 'status' | 'retries' | 'createdAt' + >); + const longAgo = Date.now() - 70_000; // 70s > default 60s timeout + await ( + queue as unknown as { + items: { + update: ( + id: number, + changes: Record + ) => Promise; + }; + } + ).items.update(queued.id!, { + status: 'processing', + lastAttempt: longAgo, + }); + + // Sync should reclaim the row (status: pending) and then process it. + const result = await queue.sync(); + + expect(result.success).toBe(1); + expect(queue.processedItems).toHaveLength(1); + expect(queue.processedItems[0].data).toBe('recover-me'); + + const final = await queue.get(queued.id!); + expect(final?.status).toBe('completed'); + }); + + it('leaves fresh processing items alone (within processingTimeoutMs)', async () => { + const queued = await queue.queue({ data: 'leave-me-alone' } as Omit< + TestQueueItem, + 'id' | 'status' | 'retries' | 'createdAt' + >); + const recent = Date.now() - 30_000; // 30s < default 60s timeout + await ( + queue as unknown as { + items: { + update: ( + id: number, + changes: Record + ) => Promise; + }; + } + ).items.update(queued.id!, { + status: 'processing', + lastAttempt: recent, + }); + + const result = await queue.sync(); + + // Item was NOT reclaimed — it stayed in `processing` and never + // entered the per-item loop, so processedItems is empty and the + // sync result reports nothing for this item. + expect(queue.processedItems).toHaveLength(0); + expect(result.success).toBe(0); + + const final = await queue.get(queued.id!); + expect(final?.status).toBe('processing'); + }); + }); + + describe('conflict accounting', () => { + it('counts conflicted as completed but in the conflicted bucket', async () => { + queue.shouldReturnConflict = true; + await queue.queue({ data: 'dedupe-me' } as Omit< + TestQueueItem, + 'id' | 'status' | 'retries' | 'createdAt' + >); + + const result = await queue.sync(); + + expect(result.success).toBe(0); + expect(result.conflicted).toBe(1); + expect(result.failed).toBe(0); + + // The queue row is still marked completed (the item is done). + const all = await queue.getQueue(); + expect(all[0].status).toBe('completed'); + }); + }); + describe('isSyncing()', () => { it('should return false when not syncing', () => { expect(queue.isSyncing()).toBe(false); diff --git a/src/lib/offline-queue/__tests__/payment-adapter.test.ts b/src/lib/offline-queue/__tests__/payment-adapter.test.ts new file mode 100644 index 00000000..c9327c30 --- /dev/null +++ b/src/lib/offline-queue/__tests__/payment-adapter.test.ts @@ -0,0 +1,113 @@ +/** + * Unit tests for PaymentQueueAdapter idempotent INSERT path. + * + * Validates the offline-queue retry safety contract from #52: + * - A queued idempotency_key flows into the upsert payload with the + * correct onConflict + ignoreDuplicates options. + * - A zero-row upsert response is treated as conflicted (the prior + * attempt's row is already there); the queue row still completes. + * + * @module lib/offline-queue/__tests__/payment-adapter.test + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +// Per-call upsert mock so each test can specify the response shape. +// Sits inside the from() chain expected by Supabase JS: +// supabase.from('payment_intents').upsert(payload, options).select('id').maybeSingle() +const mockMaybeSingle = vi.fn(); +const mockSelect = vi.fn(); +const mockUpsert = vi.fn(); +const mockFrom = vi.fn(); + +// Reconfigure the chain in beforeEach so each test starts from a known +// state. mockReturnValue calls happen there. + +vi.mock('@/lib/supabase/client', () => ({ + supabase: { + from: (...args: unknown[]) => mockFrom(...args), + auth: { + getUser: vi.fn(async () => ({ + data: { user: { id: 'test-user-1' } }, + error: null, + })), + }, + }, +})); + +// Import AFTER the mock so the adapter binds to our mocked supabase. +import { PaymentQueueAdapter } from '../payment-adapter'; + +describe('PaymentQueueAdapter (idempotent INSERT path, #52)', () => { + let adapter: PaymentQueueAdapter; + + beforeEach(async () => { + mockMaybeSingle.mockReset(); + mockSelect.mockReset().mockReturnValue({ maybeSingle: mockMaybeSingle }); + mockUpsert.mockReset().mockReturnValue({ select: mockSelect }); + mockFrom.mockReset().mockReturnValue({ upsert: mockUpsert }); + + adapter = new PaymentQueueAdapter(); + await adapter.clear(); + }); + + it('flows the queued idempotency_key into upsert with correct options', async () => { + mockMaybeSingle.mockResolvedValueOnce({ + data: { id: 'intent-1' }, + error: null, + }); + + await adapter.queuePaymentIntent( + { + amount: 1000, + currency: 'usd', + type: 'one_time', + customer_email: 'a@example.com', + idempotency_key: 'fixed-key-1', + }, + 'test-user-1' + ); + const result = await adapter.sync(); + + expect(result.success).toBe(1); + expect(result.conflicted).toBe(0); + expect(mockFrom).toHaveBeenCalledWith('payment_intents'); + + // The upsert must have received the queued idempotency_key and the + // ignoreDuplicates option. + const [payload, options] = mockUpsert.mock.calls[0]; + expect(payload.idempotency_key).toBe('fixed-key-1'); + expect(payload.amount).toBe(1000); + expect(payload.template_user_id).toBe('test-user-1'); + expect(options).toEqual({ + onConflict: 'idempotency_key', + ignoreDuplicates: true, + }); + }); + + it('treats a zero-row upsert response as conflicted (work was already done)', async () => { + // Server already had this row → upsert returns null data. + mockMaybeSingle.mockResolvedValueOnce({ data: null, error: null }); + + await adapter.queuePaymentIntent( + { + amount: 1000, + currency: 'usd', + type: 'one_time', + customer_email: 'a@example.com', + idempotency_key: 'already-inserted-key', + }, + 'test-user-1' + ); + const result = await adapter.sync(); + + expect(result.success).toBe(0); + expect(result.conflicted).toBe(1); + expect(result.failed).toBe(0); + + // The queue row is marked completed even though the dedupe path + // fired — the user's work is done from their perspective. + const all = await adapter.getQueue(); + expect(all[0].status).toBe('completed'); + }); +}); diff --git a/src/lib/offline-queue/base-queue.ts b/src/lib/offline-queue/base-queue.ts index ddb88973..520d8d7d 100644 --- a/src/lib/offline-queue/base-queue.ts +++ b/src/lib/offline-queue/base-queue.ts @@ -13,6 +13,7 @@ import Dexie, { Table } from 'dexie'; import { BaseQueueItem, + ProcessItemResult, QueueConfig, QueueStatus, SyncResult, @@ -174,16 +175,38 @@ export abstract class BaseOfflineQueue extends Dexie { // Prevent concurrent sync if (this.syncInProgress) { this.logger.debug('Sync already in progress, skipping'); - return { success: 0, failed: 0, skipped: 0 }; + return { success: 0, failed: 0, skipped: 0, conflicted: 0 }; } this.syncInProgress = true; try { + // Watchdog: reclaim items stuck in `processing` past the timeout. + // Defends against tab crashes between the claim and the completion + // update. The reclaim is itself an atomic Dexie modify, so racing + // tabs converge. processItem must be idempotent for safe reclaim — + // see payment-adapter's idempotency_key path for the pattern. + const reclaimNow = Date.now(); + const reclaimedCount = await this.items + .where('status') + .equals('processing') + .and( + (row) => + !!row.lastAttempt && + reclaimNow - row.lastAttempt > this.config.processingTimeoutMs + ) + .modify({ status: 'pending' } as any); + if (reclaimedCount > 0) { + this.logger.warn('Reclaimed stuck processing items', { + count: reclaimedCount, + processingTimeoutMs: this.config.processingTimeoutMs, + }); + } + const pending = await this.getPending(); if (pending.length === 0) { - return { success: 0, failed: 0, skipped: 0 }; + return { success: 0, failed: 0, skipped: 0, conflicted: 0 }; } this.logger.info('Starting sync', { itemCount: pending.length }); @@ -191,6 +214,7 @@ export abstract class BaseOfflineQueue extends Dexie { let success = 0; let failed = 0; let skipped = 0; + let conflicted = 0; for (const item of pending) { // Check if max retries exceeded @@ -237,17 +261,26 @@ export abstract class BaseOfflineQueue extends Dexie { } try { - // Process the item (implemented by subclass) - await this.processItem(item); - - // Mark as completed + // Process the item (implemented by subclass). The subclass may + // return `{ status: 'conflicted' }` when it detected a server- + // side dedupe (its work was already done by a prior attempt). + // void return is treated as completed for backwards compatibility. + const processResult = await this.processItem(item); await this.items.update(item.id!, { status: 'completed', } as any); - success++; - this.logger.debug('Item processed successfully', { id: item.id }); + if (processResult?.status === 'conflicted') { + conflicted++; + this.logger.info( + 'Item completed via dedupe (server already had this work)', + { id: item.id } + ); + } else { + success++; + this.logger.debug('Item processed successfully', { id: item.id }); + } } catch (error) { // Record failed attempt const errorMessage = @@ -274,8 +307,13 @@ export abstract class BaseOfflineQueue extends Dexie { } } - this.logger.info('Sync complete', { success, failed, skipped }); - return { success, failed, skipped }; + this.logger.info('Sync complete', { + success, + failed, + skipped, + conflicted, + }); + return { success, failed, skipped, conflicted }; } finally { this.syncInProgress = false; } @@ -340,11 +378,15 @@ export abstract class BaseOfflineQueue extends Dexie { } /** - * Process a single queue item - * Must be implemented by subclasses with domain-specific logic + * Process a single queue item. + * Must be implemented by subclasses with domain-specific logic. + * + * Subclasses may return `{ status: 'conflicted' }` to signal the work + * was already completed by a prior attempt (server-side dedupe). void + * return is treated as `{ status: 'completed' }`. * * @param item - Item to process * @throws Error if processing fails (will trigger retry) */ - protected abstract processItem(item: T): Promise; + protected abstract processItem(item: T): Promise; } diff --git a/src/lib/offline-queue/payment-adapter.ts b/src/lib/offline-queue/payment-adapter.ts index 2a21d50a..4b07837d 100644 --- a/src/lib/offline-queue/payment-adapter.ts +++ b/src/lib/offline-queue/payment-adapter.ts @@ -11,7 +11,11 @@ */ import { BaseOfflineQueue } from './base-queue'; -import { PaymentQueueItem, DEFAULT_QUEUE_CONFIG } from './types'; +import { + PaymentQueueItem, + ProcessItemResult, + DEFAULT_QUEUE_CONFIG, +} from './types'; import { supabase } from '@/lib/supabase/client'; import type { Json } from '@/lib/supabase/types'; @@ -48,6 +52,13 @@ export class PaymentQueueAdapter extends BaseOfflineQueue { /** * Queue a payment intent creation + * + * If `data.idempotency_key` is not supplied, a fresh UUID is generated + * here at queue-time. The same key is reused across every retry of this + * row, which combined with the partial unique index on + * payment_intents.idempotency_key turns the server-side INSERT into an + * idempotent ON CONFLICT DO NOTHING — a retry whose prior attempt + * already succeeded becomes a no-op rather than a duplicate charge (#52). */ async queuePaymentIntent( data: { @@ -58,12 +69,14 @@ export class PaymentQueueAdapter extends BaseOfflineQueue { customer_email: string; description?: string; metadata?: Record; + idempotency_key?: string; }, userId?: string ): Promise { + const idempotency_key = data.idempotency_key ?? crypto.randomUUID(); return await this.queue({ type: 'payment_intent', - data, + data: { ...data, idempotency_key }, userId, } as Omit); } @@ -82,16 +95,23 @@ export class PaymentQueueAdapter extends BaseOfflineQueue { } /** - * Process a single payment queue item + * Process a single payment queue item. + * + * Returns ProcessItemResult for payment_intent so the base queue can + * distinguish a fresh INSERT from a server-side dedupe (the work was + * already done by a prior attempt). subscription_update returns void — + * UPDATE is implicitly idempotent by primary key, no dedupe distinction + * needed there. */ - protected async processItem(item: PaymentQueueItem): Promise { + protected async processItem( + item: PaymentQueueItem + ): Promise { switch (item.type) { case 'payment_intent': - await this.executePaymentIntent(item.data, item.userId); - break; + return await this.executePaymentIntent(item.data, item.userId); case 'subscription_update': await this.executeSubscriptionUpdate(item.data); - break; + return; default: throw new Error(`Unknown payment operation type: ${item.type}`); } @@ -103,11 +123,17 @@ export class PaymentQueueAdapter extends BaseOfflineQueue { * Prefers the userId captured at queue time (REQ-SEC-001 — the user * was authenticated when they made the payment). Falls back to * auth.getUser() if no userId was stored, matching fork behaviour. + * + * Uses upsert with ignoreDuplicates so the server-side INSERT is + * idempotent across retries: a queued item whose prior attempt + * already wrote the row produces a zero-row response, which we + * surface as `{ status: 'conflicted' }` so the queue marks the row + * completed without double-charging. See #52. */ private async executePaymentIntent( data: Record, storedUserId?: string - ): Promise { + ): Promise { let userId = storedUserId; if (!userId) { @@ -122,24 +148,51 @@ export class PaymentQueueAdapter extends BaseOfflineQueue { userId = user.id; } - const { error } = await supabase + // Idempotency key: prefer the one queued at intake. If absent (older + // queue rows from before this column shipped), generate a fresh one + // and warn — that retry chain will dedupe with itself but not with + // any prior attempt that lacked a key. + let idempotencyKey = data.idempotency_key as string | undefined; + if (!idempotencyKey) { + idempotencyKey = crypto.randomUUID(); + this.logger.warn( + 'Queued payment_intent missing idempotency_key — generating one. ' + + 'Retries of this row will dedupe; prior attempts without a key will not.', + { generatedKey: idempotencyKey } + ); + } + + const { data: inserted, error } = await supabase .from('payment_intents') - .insert({ - amount: data.amount as number, - currency: data.currency as string, - type: data.type as string, - interval: (data.interval as string) || null, - customer_email: data.customer_email as string, - description: (data.description as string) || null, - metadata: (data.metadata || {}) as Json, - template_user_id: userId, - }) - .select() - .single(); + .upsert( + { + amount: data.amount as number, + currency: data.currency as string, + type: data.type as string, + interval: (data.interval as string) || null, + customer_email: data.customer_email as string, + description: (data.description as string) || null, + metadata: (data.metadata || {}) as Json, + template_user_id: userId, + idempotency_key: idempotencyKey, + }, + { onConflict: 'idempotency_key', ignoreDuplicates: true } + ) + .select('id') + .maybeSingle(); if (error) { throw new Error(`Failed to create payment intent: ${error.message}`); } + + if (!inserted) { + // ON CONFLICT DO NOTHING fired — a prior attempt already created + // this row server-side. Mark the queue row completed via the + // dedupe path; the user is not double-charged. + return { status: 'conflicted' }; + } + + return { status: 'completed' }; } /** diff --git a/src/lib/offline-queue/types.ts b/src/lib/offline-queue/types.ts index b227d858..3629eab0 100644 --- a/src/lib/offline-queue/types.ts +++ b/src/lib/offline-queue/types.ts @@ -44,6 +44,13 @@ export interface QueueConfig { initialDelayMs: number; /** Backoff multiplier (e.g., 2 for doubling) */ backoffMultiplier: number; + /** + * Watchdog: reclaim items stuck in `processing` longer than this (ms). + * Defends against tab crashes between claim and completion. Default 60_000. + * The processItem implementation must be idempotent for safe reclaim — see + * payment-adapter's idempotency_key path for the pattern. + */ + processingTimeoutMs: number; } /** @@ -53,20 +60,30 @@ export const DEFAULT_QUEUE_CONFIG: Omit = { maxRetries: 5, initialDelayMs: 1000, backoffMultiplier: 2, + processingTimeoutMs: 60_000, }; /** * Result of queue sync operation */ export interface SyncResult { - /** Number of items successfully processed */ + /** Number of items successfully processed (fresh work) */ success: number; /** Number of items that failed */ failed: number; /** Number of items skipped (e.g., still in backoff) */ skipped: number; + /** Number of items completed via dedupe (server already had this work) */ + conflicted: number; } +/** + * Optional return value from processItem. Subclasses that don't need to + * distinguish fresh-success from dedupe can keep returning void; void is + * treated as `{ status: 'completed' }`. + */ +export type ProcessItemResult = { status: 'completed' | 'conflicted' }; + /** * Form queue item (for form submissions) */ diff --git a/supabase/migrations/20251006_complete_monolithic_setup.sql b/supabase/migrations/20251006_complete_monolithic_setup.sql index 8afad331..c1645cbb 100644 --- a/supabase/migrations/20251006_complete_monolithic_setup.sql +++ b/supabase/migrations/20251006_complete_monolithic_setup.sql @@ -51,6 +51,17 @@ CREATE INDEX IF NOT EXISTS idx_payment_intents_created_at ON payment_intents(cre CREATE INDEX IF NOT EXISTS idx_payment_intents_user_id ON payment_intents(template_user_id); CREATE INDEX IF NOT EXISTS idx_payment_intents_expires_at ON payment_intents(expires_at); +-- Idempotency key for offline-queue retries (#52). Partial unique index: +-- only enforced when set, so direct-server INSERTs (admin tooling, edge +-- functions) without a key remain valid. Only client-queued INSERTs +-- participate in dedupe. +ALTER TABLE payment_intents + ADD COLUMN IF NOT EXISTS idempotency_key TEXT; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_payment_intents_idempotency_key + ON payment_intents(idempotency_key) + WHERE idempotency_key IS NOT NULL; + COMMENT ON TABLE payment_intents IS 'Customer payment intentions before provider redirect (24hr expiry)'; -- Payment results