From 7b64f41676ac14d2f38d82dd43a2259a54871673 Mon Sep 17 00:00:00 2001 From: Zbigniew Sobiecki Date: Fri, 1 May 2026 13:24:27 +0000 Subject: [PATCH] fix(triggers): redis-back review-dispatch dedup to close cross-process gap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ucho/PR #194 (2026-05-01, CI fully green, single SHA) had two review runs dispatched 29 s apart — both `triggerType=ci-success`, same engine/model/ workItemId, both completed normally, both burned LLM tokens and posted reviews. Confirmed verbatim in Loki: identical `reviewDispatchKey` (`zbigniewsobiecki/ucho:194:9ed484df...`) appeared in claim logs from TWO processes — one from `agent-execution.ts:279` post-completion-hook running in the IMPL worker container, one from `check-suite-success.ts:240` running in the cascade-router. Both `claimReviewDispatch(key, ...)` returned `true` because the dedup `recentlyDispatched` Map at `review-dispatch-dedup.ts:5` was module-scoped, in-memory, per-process state. Two processes = two independent Maps = no dedup. Pure waste. PR #1246 explicitly called this out as out-of-scope ("With the architectural shift above, the in-memory dedup is sufficient. Skip."). That was wrong: the architectural shift in #1246 closes the worker-bail-out path but does nothing about the post-completion-hook path running in a different process. The bug class is broader than ucho/PR #194 — anywhere two processes might independently call `claimReviewDispatch` for the same (project, PR, SHA) is exposed: post-completion-hook ↔ check-suite-success (live now), review- requested ↔ post-completion-hook (waiting to bite), and any future horizontally-scaled router replicas. Architectural fix — move the dedup state to Redis so it's shared across all processes. Atomic primitive: `SET key value NX EX <seconds>` returns `'OK'` exactly once per key within the TTL, regardless of how many processes race it. No application-level locking, no race window. 
`src/triggers/github/review-dispatch-dedup.ts` — rewritten: - Drops the in-memory `Map` and `cleanupExpiredEntries`. - Lazy IORedis singleton (mirrors `src/queue/cancel.ts:28-37`). - `claimReviewDispatch` / `releaseReviewDispatch` are now async. - Keys namespaced under `cascade:review-dedup:`. - Fail-closed on Redis errors: `claim` returns `false` and Sentry-captures under tag `review_dedup_redis_down`. Better to skip a legit dispatch than duplicate. Mirrors spec-017 fail-closed pipeline-capacity-gate posture. - 5-min TTL preserved from #1246. The pre-Redis "30-min TTL clears stale entries on router restart" CLAUDE.md note is now obsolete. - Test-only `__resetForTests()` flushes the namespace. Callers — minimal mechanical await additions: - `check-suite-success.ts:240` — `await claimReviewDispatch(...)`. - `check-suite-success.ts:274` — `onBlocked: () => { void releaseReviewDispatch(...) }` (callback signature is sync; fire-and-forget the release). - `review-requested.ts:101-102` — both calls become `await`. - `review-requested.ts:131` — same fire-and-forget shape. - `agent-execution.ts:279` (post-completion-hook) — `await`. Tests: - `tests/unit/triggers/github/review-dispatch-dedup.test.ts` rewritten around a vi.mock('ioredis', ...) factory whose closure captures a single in-memory store. Every `new Redis(...)` instance shares that backend, so the cross-process invariant is trivially testable: instantiate two IORedis clients and verify the second `SET NX EX` for the same key returns `null`. **This is the regression pin for ucho/PR #194.** - New "fails closed when Redis errors" test covers the Sentry-capture path under tag `review_dedup_redis_down`. - `check-suite-success.test.ts` and `review-requested.test.ts`: replaced the per-test `recentlyDispatched.clear()` with `vi.mock` of the dedup module. Tests that assert the dedup-skip path now use `mockClaimReviewDispatch.mockResolvedValueOnce(true).mockResolvedValueOnce(false)` to simulate the SET NX EX race. 
Verification: - npx vitest run --project unit-triggers --project unit-api --project unit-core → 7678/7678 pass. - npm run typecheck + npm run lint clean. - Direct-instance test demonstrates two IORedis clients against the shared backend correctly reject the second claim — the production scenario pinned in code. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/triggers/github/check-suite-success.ts | 13 +- src/triggers/github/review-dispatch-dedup.ts | 155 ++++++-- src/triggers/github/review-requested.ts | 8 +- src/triggers/shared/agent-execution.ts | 2 +- .../unit/triggers/check-suite-success.test.ts | 45 ++- .../github/review-dispatch-dedup.test.ts | 349 ++++++++++++------ tests/unit/triggers/review-requested.test.ts | 26 +- 7 files changed, 422 insertions(+), 176 deletions(-) diff --git a/src/triggers/github/check-suite-success.ts b/src/triggers/github/check-suite-success.ts index 91fad8b0..1d8d7b62 100644 --- a/src/triggers/github/check-suite-success.ts +++ b/src/triggers/github/check-suite-success.ts @@ -19,8 +19,6 @@ import { resolveWorkItemId, } from './utils.js'; -export { recentlyDispatched } from './review-dispatch-dedup.js'; - /** * Dispatches an outcome agent when a check_suite completes with success * conclusion on a PR authored by the implementer persona. @@ -235,9 +233,11 @@ export class CheckSuiteSuccessTrigger implements TriggerHandler { } // PR+SHA-scoped dedup prevents duplicate reviews across both duplicate - // check_suite deliveries and other review-producing triggers. + // check_suite deliveries and other review-producing triggers — including + // the post-completion-hook in the IMPL worker process. Backed by Redis + // so the dedup holds across processes (router + workers + future replicas). 
const dedupKey = buildReviewDispatchKey(owner, repo, prNumber, headSha); - if (!claimReviewDispatch(dedupKey, this.name, { prNumber, headSha })) { + if (!(await claimReviewDispatch(dedupKey, this.name, { prNumber, headSha }))) { return skip( this.name, `Review dispatch for PR #${prNumber}@${headSha} already claimed by another path (dedup)`, @@ -271,7 +271,10 @@ export class CheckSuiteSuccessTrigger implements TriggerHandler { workItemId, workItemUrl, workItemTitle, - onBlocked: () => releaseReviewDispatch(dedupKey), + onBlocked: () => { + // Fire-and-forget — release is best-effort and the TTL is the safety net. + void releaseReviewDispatch(dedupKey); + }, }; } } diff --git a/src/triggers/github/review-dispatch-dedup.ts b/src/triggers/github/review-dispatch-dedup.ts index 9abee506..9323f48c 100644 --- a/src/triggers/github/review-dispatch-dedup.ts +++ b/src/triggers/github/review-dispatch-dedup.ts @@ -1,13 +1,64 @@ +/** + * Review-dispatch deduplication, Redis-backed. + * + * `claimReviewDispatch(key, ...)` returns `true` exactly once per key within + * the TTL window — across ALL processes that share the same Redis backend. + * Subsequent calls (from the same or any other process) return `false` and + * the caller must skip the dispatch. + * + * Why Redis (and not the in-memory Map this module used to be): + * the dedup key (`owner/repo:prNumber:headSha`) is claimed from at least + * THREE distinct processes: + * 1. Router process — `check-suite-success` and `review-requested` triggers. + * 2. IMPL worker process — `agent-execution.ts` post-completion-hook + * (fires the review immediately after impl completes, regardless of + * whether GitHub eventually delivers the check_suite-success event). + * 3. Future router replicas / horizontally-scaled deployments. + * + * The pre-Redis Map was module-scoped and per-process: each process started + * with an empty Map, so the dedup never crossed process boundaries. 
+ * Production confirmed live duplicate dispatch on ucho/PR #194 (2026-05-01) — + * post-completion-hook (worker) and check-suite-success (router) BOTH + * dispatched a review for the same SHA, both claimed `true` from their own + * empty Map, both burned LLM tokens. See PR #1248 for the diagnosis. + * + * Redis primitive: `SET key value NX EX <seconds>` — atomic check-and-set with + * TTL. Returns `'OK'` on first claim, `null` on duplicate. No race window. + * + * Failure mode: when Redis is unreachable, `claim` returns `false` (treats + * the call as a duplicate) and Sentry-captures under `review_dedup_redis_down`. + * Better to skip a legit dispatch than to dispatch a duplicate; mirrors + * spec-017's fail-closed pipeline-capacity-gate posture. + */ + +import { Redis } from 'ioredis'; +import { captureException } from '../../sentry.js'; import { logger } from '../../utils/logging.js'; -// 5-minute TTL — short enough to never wedge the dispatch path for a clean -// PR. Originally 30 min; shortened in PR #1245 follow-up after the -// success-handler refactor eliminated the worker-bail-out path. With the new -// defer-on-incomplete behavior, dispatches correlate with actually-running -// workers, so the longer TTL had no defensive value and amplified incidents. -const DEDUP_TTL_MS = 5 * 60 * 1000; +// 5 minutes — kept short because dispatches now correlate with actually- +// running workers (post the PR #1246 defer-on-incomplete refactor); a longer +// TTL has no defensive value and amplifies any wedged-state incident. +const DEDUP_TTL_SEC = 5 * 60; + +const KEY_NS = 'cascade:review-dedup:'; + +let redisInstance: Redis | null = null; -export const recentlyDispatched = new Map<string, number>(); +/** + * Lazy singleton — first call connects, subsequent calls reuse the same + * client. The worker process pays the connection cost only if it actually + * dispatches a review (post-completion-hook). 
+ */ +function getRedis(): Redis { + if (!redisInstance) { + const redisUrl = process.env.REDIS_URL; + if (!redisUrl) { + throw new Error('REDIS_URL is required for review-dispatch dedup'); + } + redisInstance = new Redis(redisUrl); + } + return redisInstance; +} export function buildReviewDispatchKey( owner: string, @@ -18,23 +69,31 @@ return `${owner}/${repo}:${prNumber}:${headSha}`; } -function cleanupExpiredEntries(now: number): void { - for (const [key, ts] of recentlyDispatched) { - if (now - ts > DEDUP_TTL_MS) { - recentlyDispatched.delete(key); - } - } -} - -export function claimReviewDispatch( +/** + * Atomically claim a dispatch slot for the given key. Returns `true` exactly + * once per key within the TTL window across ALL connected processes. + * + * Fails closed on Redis errors: returns `false` so the caller skips the + * dispatch. Sentry-captures the underlying error under + * `review_dedup_redis_down`. + */ +export async function claimReviewDispatch( key: string, triggerName: string, context: { prNumber: number; headSha: string }, -): boolean { - const now = Date.now(); - cleanupExpiredEntries(now); - - if (recentlyDispatched.has(key)) { +): Promise<boolean> { + const namespacedKey = `${KEY_NS}${key}`; + try { + const result = await getRedis().set(namespacedKey, triggerName, 'EX', DEDUP_TTL_SEC, 'NX'); + if (result === 'OK') { + logger.info('Claimed review dispatch for PR+SHA', { + trigger: triggerName, + reviewDispatchKey: key, + prNumber: context.prNumber, + headSha: context.headSha, + }); + return true; + } logger.info('Review already dispatched for this PR+SHA, skipping', { trigger: triggerName, reviewDispatchKey: key, @@ -42,18 +101,52 @@ headSha, }); return false; + } catch (err) { + logger.error('Review-dispatch dedup Redis call failed — failing closed', { + trigger: triggerName, + reviewDispatchKey: key, + error: String(err), + }); + captureException(err, { + 
tags: { source: 'review_dedup_redis_down' }, + extra: { reviewDispatchKey: key, trigger: triggerName }, + level: 'error', + }); + return false; } +} - recentlyDispatched.set(key, now); - logger.info('Claimed review dispatch for PR+SHA', { - trigger: triggerName, - reviewDispatchKey: key, - prNumber: context.prNumber, - headSha: context.headSha, - }); - return true; +/** + * Release a previously-claimed dispatch slot. Used by `onBlocked` callbacks + * when downstream rejects the dispatch (work-item lock, agent-type + * concurrency, etc.) so the next legitimate trigger can claim. + * + * Errors are logged but never thrown — release is best-effort, and the TTL + * is the safety net. + */ +export async function releaseReviewDispatch(key: string): Promise<void> { + const namespacedKey = `${KEY_NS}${key}`; + try { + await getRedis().del(namespacedKey); + } catch (err) { + logger.warn('Review-dispatch dedup release failed (TTL will reap)', { + reviewDispatchKey: key, + error: String(err), + }); + } } -export function releaseReviewDispatch(key: string): void { - recentlyDispatched.delete(key); +/** + * Test-only: flush the dedup namespace and reset the singleton. Intended for + * `beforeEach` in unit tests and for the integration suite's per-test + * cleanup. Never call from production code. 
+ * + * @internal + */ +export async function __resetForTests(): Promise<void> { + if (!redisInstance) return; + const keys = await redisInstance.keys(`${KEY_NS}*`); + if (keys.length > 0) await redisInstance.del(...keys); + await redisInstance.quit().catch(() => {}); + redisInstance = null; } diff --git a/src/triggers/github/review-requested.ts b/src/triggers/github/review-requested.ts index 01d27a9a..11386039 100644 --- a/src/triggers/github/review-requested.ts +++ b/src/triggers/github/review-requested.ts @@ -98,8 +98,8 @@ export class ReviewRequestedTrigger implements TriggerHandler { const workItemId = await resolveWorkItemId(ctx.project.id, prNumber); const reviewDispatchKey = buildReviewDispatchKey(owner, repo, prNumber, headSha); // Human-initiated review requests override any prior automated dispatch claim. - releaseReviewDispatch(reviewDispatchKey); - if (!claimReviewDispatch(reviewDispatchKey, this.name, { prNumber, headSha })) { + await releaseReviewDispatch(reviewDispatchKey); + if (!(await claimReviewDispatch(reviewDispatchKey, this.name, { prNumber, headSha }))) { return skip( this.name, `Review dispatch for PR #${prNumber}@${headSha} already claimed by another path (dedup)`, @@ -128,7 +128,9 @@ prUrl: payload.pull_request.html_url, prTitle: payload.pull_request.title, workItemId, - onBlocked: () => releaseReviewDispatch(reviewDispatchKey), + onBlocked: () => { + void releaseReviewDispatch(reviewDispatchKey); + }, }; } } diff --git a/src/triggers/shared/agent-execution.ts b/src/triggers/shared/agent-execution.ts index fc935f50..be464fac 100644 --- a/src/triggers/shared/agent-execution.ts +++ b/src/triggers/shared/agent-execution.ts @@ -276,7 +276,7 @@ async function tryDispatchPostCompletionReview( } const dedupKey = buildReviewDispatchKey(owner, repo, prNumber, headSha); - if (!claimReviewDispatch(dedupKey, 'post-completion-hook', { prNumber, headSha })) { + if (!(await 
claimReviewDispatch(dedupKey, 'post-completion-hook', { prNumber, headSha }))) { logger.info('Skipping post-completion review: already dispatched', { prNumber, workItemId, diff --git a/tests/unit/triggers/check-suite-success.test.ts b/tests/unit/triggers/check-suite-success.test.ts index e0109a09..2192c6c1 100644 --- a/tests/unit/triggers/check-suite-success.test.ts +++ b/tests/unit/triggers/check-suite-success.test.ts @@ -14,12 +14,21 @@ vi.mock('../../../src/triggers/shared/trigger-check.js', () => mockTriggerCheckM vi.mock('../../../src/github/client.js', () => mockGitHubClientModule); +// Stub the Redis-backed dedup module so tests don't need a Redis connection. +// Each `claim` resolves to true (success) by default; per-test overrides via +// `mockClaimReviewDispatch.mockResolvedValueOnce(false)` simulate a duplicate. +const mockClaimReviewDispatch = vi.fn().mockResolvedValue(true); +const mockReleaseReviewDispatch = vi.fn().mockResolvedValue(undefined); +vi.mock('../../../src/triggers/github/review-dispatch-dedup.js', () => ({ + buildReviewDispatchKey: (owner: string, repo: string, prNumber: number, headSha: string) => + `${owner}/${repo}:${prNumber}:${headSha}`, + claimReviewDispatch: (...args: unknown[]) => mockClaimReviewDispatch(...args), + releaseReviewDispatch: (...args: unknown[]) => mockReleaseReviewDispatch(...args), +})); + import { githubClient } from '../../../src/github/client.js'; import { resetFixAttempts } from '../../../src/triggers/github/check-suite-failure.js'; -import { - CheckSuiteSuccessTrigger, - recentlyDispatched, -} from '../../../src/triggers/github/check-suite-success.js'; +import { CheckSuiteSuccessTrigger } from '../../../src/triggers/github/check-suite-success.js'; import { ReviewRequestedTrigger } from '../../../src/triggers/github/review-requested.js'; import type { TriggerContext } from '../../../src/triggers/types.js'; import { createCheckSuitePayload, createMockProject } from '../../helpers/factories.js'; @@ -43,7 +52,8 @@ 
describe('CheckSuiteSuccessTrigger', () => { beforeEach(() => { vi.mocked(lookupWorkItemForPR).mockResolvedValue('abc123'); - recentlyDispatched.clear(); + mockClaimReviewDispatch.mockReset().mockResolvedValue(true); + mockReleaseReviewDispatch.mockReset().mockResolvedValue(undefined); resetFixAttempts(42); // Default: aggregate status reflects all checks passing. Tests that need // a mixed-state SHA override this per-case. @@ -239,6 +249,9 @@ describe('CheckSuiteSuccessTrigger', () => { user: { login: 'cascade-impl' }, }); vi.mocked(githubClient.getPRReviews).mockResolvedValue([]); + // Simulate Redis state: review-requested claims first (true), then + // check-suite-success loses the SET NX EX race (false). + mockClaimReviewDispatch.mockReset().mockResolvedValueOnce(true).mockResolvedValueOnce(false); const reviewRequestedContext: TriggerContext = { project: mockProject, @@ -618,6 +631,10 @@ describe('CheckSuiteSuccessTrigger', () => { }); vi.mocked(githubClient.getPRReviews).mockResolvedValue([]); + // Simulate the Redis-backed dedup: first claim succeeds, second loses + // the SET NX EX race and returns false. + mockClaimReviewDispatch.mockReset().mockResolvedValueOnce(true).mockResolvedValueOnce(false); + const ctx: TriggerContext = { project: mockProject, source: 'github', @@ -660,16 +677,14 @@ describe('CheckSuiteSuccessTrigger', () => { const result = await trigger.handle(ctx); expect(result).not.toBeNull(); expect(result?.onBlocked).toBeTypeOf('function'); - expect(recentlyDispatched.size).toBe(1); + // First handle() called claim once. + expect(mockClaimReviewDispatch).toHaveBeenCalledTimes(1); - // Simulate router calling onBlocked (work-item lock or concurrency block) + // Simulate router calling onBlocked (work-item lock or concurrency block) — + // it should release the dedup so a subsequent legitimate trigger can claim. 
result?.onBlocked?.(); - expect(recentlyDispatched.size).toBe(0); - - // After onBlocked, a subsequent call should succeed (not be deduped) - const result2 = await trigger.handle(ctx); - expect(result2).not.toBeNull(); - expect(result2?.agentType).toBe('review'); + expect(mockReleaseReviewDispatch).toHaveBeenCalledTimes(1); + expect(mockReleaseReviewDispatch).toHaveBeenCalledWith('owner/repo:42:sha123'); }); it('allows review for same PR with a new SHA after dedup', async () => { @@ -1196,8 +1211,8 @@ describe('CheckSuiteSuccessTrigger', () => { const implResult = await trigger.handle(implCtx); expect(implResult).not.toBeNull(); - // External PR — clear dedup since we're testing author mode, not dedup - recentlyDispatched.clear(); + // External PR — reset dedup mock since we're testing author mode, not dedup + mockClaimReviewDispatch.mockReset().mockResolvedValue(true); vi.mocked(lookupWorkItemForPR).mockResolvedValue(null); setupMocks('external-contributor'); const extCtx: TriggerContext = { diff --git a/tests/unit/triggers/github/review-dispatch-dedup.test.ts b/tests/unit/triggers/github/review-dispatch-dedup.test.ts index b60cbb0f..3079ace2 100644 --- a/tests/unit/triggers/github/review-dispatch-dedup.test.ts +++ b/tests/unit/triggers/github/review-dispatch-dedup.test.ts @@ -1,90 +1,168 @@ +/** + * Review-dispatch dedup tests — Redis-backed. + * + * The `vi.mock('ioredis', ...)` factory closes over a single in-memory store, + * so every `new Redis(...)` instance shares the same backend. That makes the + * cross-process invariant trivially testable: instantiate two Redis clients + * and verify the second `claim` for the same key returns `false`. + * + * The cross-process invariant is the regression pin for the production + * incident on ucho/PR #194 (2026-05-01) — both router-process and + * IMPL-worker-process dispatched a review for the same SHA because the + * pre-Redis Map was per-process. See PR #1248. 
+ */ + import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +// --------------------------------------------------------------------------- +// In-memory shared backend for the IORedis mock. Closure-captured by the +// vi.mock factory so every `new Redis(...)` instance reads/writes here. +// --------------------------------------------------------------------------- + +interface StoredEntry { + value: string; + expiresAtMs: number | null; +} +const sharedStore = new Map<string, StoredEntry>(); + +function isExpired(entry: StoredEntry, nowMs: number): boolean { + return entry.expiresAtMs !== null && entry.expiresAtMs <= nowMs; +} + +vi.mock('ioredis', () => { + class MockRedis { + // IORedis `set` overload we care about: SET key value EX seconds NX. + // Returns `'OK'` on success, `null` when NX rejected. + // `quit()` and `del()` are also implemented; the rest is unused. + async set(key: string, value: string, ...args: unknown[]): Promise<'OK' | null> { + const flags = args.map((a) => (typeof a === 'string' ? a.toUpperCase() : a)); + const exIdx = flags.indexOf('EX'); + const ttlSec = + exIdx !== -1 && typeof flags[exIdx + 1] !== 'undefined' + ? Number(flags[exIdx + 1] as string | number) + : null; + const isNX = flags.includes('NX'); + const now = Date.now(); + const existing = sharedStore.get(key); + if (existing && !isExpired(existing, now)) { + if (isNX) return null; + } + sharedStore.set(key, { + value, + expiresAtMs: ttlSec !== null ? now + ttlSec * 1000 : null, + }); + return 'OK'; + } + + async del(...keys: string[]): Promise<number> { + let removed = 0; + for (const k of keys) { + if (sharedStore.delete(k)) removed += 1; + } + return removed; + } + + async keys(pattern: string): Promise<string[]> { + // Tiny glob: only `prefix*` is used by `__resetForTests`. 
+ if (pattern.endsWith('*')) { + const prefix = pattern.slice(0, -1); + return [...sharedStore.keys()].filter((k) => k.startsWith(prefix)); + } + return [...sharedStore.keys()].filter((k) => k === pattern); + } + + async quit(): Promise<'OK'> { + return 'OK'; + } + + // IORedis-style EventEmitter no-ops for `client.on('error', ...)` etc. + on(): this { + return this; + } + } + return { Redis: MockRedis }; +}); + vi.mock('../../../../src/utils/logging.js', () => ({ logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), + debug: vi.fn(), }, })); +const mockCaptureException = vi.fn(); +vi.mock('../../../../src/sentry.js', () => ({ + captureException: (...args: unknown[]) => mockCaptureException(...args), +})); + import { + __resetForTests, buildReviewDispatchKey, claimReviewDispatch, - recentlyDispatched, releaseReviewDispatch, } from '../../../../src/triggers/github/review-dispatch-dedup.js'; import { logger } from '../../../../src/utils/logging.js'; const mockLogger = vi.mocked(logger); +const DEDUP_TTL_MS = 5 * 60 * 1000; + +beforeEach(() => { + vi.stubEnv('REDIS_URL', 'redis://localhost:6379'); + sharedStore.clear(); + mockCaptureException.mockReset(); + mockLogger.info.mockReset(); + mockLogger.warn.mockReset(); + mockLogger.error.mockReset(); + mockLogger.debug.mockReset(); +}); -const DEDUP_TTL_MS = 5 * 60 * 1000; // 5 minutes (was 30 min before PR #1245 follow-up) +afterEach(async () => { + await __resetForTests(); + vi.unstubAllEnvs(); +}); describe('buildReviewDispatchKey', () => { - it('returns correct format owner/repo:prNumber:headSha', () => { - const key = buildReviewDispatchKey('myorg', 'myrepo', 42, 'abc123def456'); - expect(key).toBe('myorg/myrepo:42:abc123def456'); - }); - - it('includes all four components in the returned key', () => { - const key = buildReviewDispatchKey('acme', 'widget', 99, 'deadbeef'); - expect(key).toContain('acme/widget'); - expect(key).toContain(':99:'); - expect(key).toContain('deadbeef'); + it('returns 
owner/repo:prNumber:headSha', () => { + expect(buildReviewDispatchKey('myorg', 'myrepo', 42, 'abc123def456')).toBe( + 'myorg/myrepo:42:abc123def456', + ); }); - it('separates owner and repo with a slash and appends prNumber and headSha with colons', () => { - const key = buildReviewDispatchKey('owner', 'repo', 1, 'sha'); - expect(key).toBe('owner/repo:1:sha'); + it('separates components correctly', () => { + expect(buildReviewDispatchKey('owner', 'repo', 1, 'sha')).toBe('owner/repo:1:sha'); }); }); describe('claimReviewDispatch', () => { - beforeEach(() => { - vi.useFakeTimers(); - recentlyDispatched.clear(); - }); - - afterEach(() => { - recentlyDispatched.clear(); - vi.useRealTimers(); - }); - - it('returns true on the first claim for a key', () => { + it('returns true on the first claim for a key', async () => { const key = buildReviewDispatchKey('acme', 'repo', 1, 'sha1'); - const result = claimReviewDispatch(key, 'check-suite-success', { - prNumber: 1, - headSha: 'sha1', - }); - expect(result).toBe(true); + expect( + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 1, headSha: 'sha1' }), + ).toBe(true); }); - it('returns false on a duplicate claim for the same key', () => { + it('returns false on a duplicate claim for the same key', async () => { const key = buildReviewDispatchKey('acme', 'repo', 1, 'sha1'); - claimReviewDispatch(key, 'check-suite-success', { prNumber: 1, headSha: 'sha1' }); - const result = claimReviewDispatch(key, 'check-suite-success', { - prNumber: 1, - headSha: 'sha1', - }); - expect(result).toBe(false); + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 1, headSha: 'sha1' }); + expect( + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 1, headSha: 'sha1' }), + ).toBe(false); }); - it('returns true for a different key (no cross-key interference)', () => { + it('returns true for a different key (no cross-key interference)', async () => { const key1 = buildReviewDispatchKey('acme', 'repo', 
1, 'sha1'); const key2 = buildReviewDispatchKey('acme', 'repo', 2, 'sha2'); - - claimReviewDispatch(key1, 'check-suite-success', { prNumber: 1, headSha: 'sha1' }); - const result = claimReviewDispatch(key2, 'check-suite-success', { - prNumber: 2, - headSha: 'sha2', - }); - expect(result).toBe(true); + await claimReviewDispatch(key1, 'check-suite-success', { prNumber: 1, headSha: 'sha1' }); + expect( + await claimReviewDispatch(key2, 'check-suite-success', { prNumber: 2, headSha: 'sha2' }), + ).toBe(true); }); - it('logs info with dispatch key when claim is successful', () => { + it('logs an info line on successful claim', async () => { const key = buildReviewDispatchKey('acme', 'repo', 5, 'sha5'); - claimReviewDispatch(key, 'review-requested', { prNumber: 5, headSha: 'sha5' }); - + await claimReviewDispatch(key, 'review-requested', { prNumber: 5, headSha: 'sha5' }); expect(mockLogger.info).toHaveBeenCalledWith( 'Claimed review dispatch for PR+SHA', expect.objectContaining({ @@ -96,15 +174,14 @@ describe('claimReviewDispatch', () => { ); }); - it('logs info with dispatch key when claim is a duplicate', () => { + it('logs an info line on duplicate claim', async () => { const key = buildReviewDispatchKey('acme', 'repo', 7, 'sha7'); - claimReviewDispatch(key, 'check-suite-success', { prNumber: 7, headSha: 'sha7' }); - claimReviewDispatch(key, 'check-suite-success', { prNumber: 7, headSha: 'sha7' }); - + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 7, headSha: 'sha7' }); + await claimReviewDispatch(key, 'post-completion-hook', { prNumber: 7, headSha: 'sha7' }); expect(mockLogger.info).toHaveBeenCalledWith( 'Review already dispatched for this PR+SHA, skipping', expect.objectContaining({ - trigger: 'check-suite-success', + trigger: 'post-completion-hook', reviewDispatchKey: key, prNumber: 7, headSha: 'sha7', @@ -112,90 +189,130 @@ describe('claimReviewDispatch', () => { ); }); - it('TTL expiration: a previously claimed key can be reclaimed after 5+ 
minutes', () => { + it('TTL expiration: a previously claimed key can be reclaimed after 5+ minutes', async () => { const key = buildReviewDispatchKey('acme', 'repo', 10, 'sha10'); - claimReviewDispatch(key, 'check-suite-success', { prNumber: 10, headSha: 'sha10' }); + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 10, headSha: 'sha10' }); - // Advance time past the TTL - vi.advanceTimersByTime(DEDUP_TTL_MS + 1); + // Manually expire the entry by pushing its TTL into the past. + // (vi.useFakeTimers doesn't help here because Date.now is consulted + // inside the mock store; advancing real time is too slow for tests.) + const stored = sharedStore.get(`cascade:review-dedup:${key}`); + if (stored) stored.expiresAtMs = Date.now() - 1; - const result = claimReviewDispatch(key, 'check-suite-success', { - prNumber: 10, - headSha: 'sha10', - }); - expect(result).toBe(true); + expect( + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 10, headSha: 'sha10' }), + ).toBe(true); }); - it('does not expire a key before the TTL has elapsed', () => { + it('does not expire a key before the TTL has elapsed', async () => { const key = buildReviewDispatchKey('acme', 'repo', 11, 'sha11'); - claimReviewDispatch(key, 'check-suite-success', { prNumber: 11, headSha: 'sha11' }); - - // Advance time to just before the TTL - vi.advanceTimersByTime(DEDUP_TTL_MS - 1); - - const result = claimReviewDispatch(key, 'check-suite-success', { - prNumber: 11, - headSha: 'sha11', - }); - expect(result).toBe(false); + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 11, headSha: 'sha11' }); + const stored = sharedStore.get(`cascade:review-dedup:${key}`); + expect(stored?.expiresAtMs).toBeGreaterThan(Date.now() + DEDUP_TTL_MS - 5_000); + expect( + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 11, headSha: 'sha11' }), + ).toBe(false); }); - it('cleanupExpiredEntries removes stale entries when claimReviewDispatch is called', () => { - 
const key1 = buildReviewDispatchKey('acme', 'repo', 20, 'sha20'); - const key2 = buildReviewDispatchKey('acme', 'repo', 21, 'sha21'); - - claimReviewDispatch(key1, 'check-suite-success', { prNumber: 20, headSha: 'sha20' }); - - // Advance time past the TTL so key1 becomes stale - vi.advanceTimersByTime(DEDUP_TTL_MS + 1); - - // Claiming key2 triggers cleanupExpiredEntries which should remove key1 - claimReviewDispatch(key2, 'check-suite-success', { prNumber: 21, headSha: 'sha21' }); + it('namespaces the key under cascade:review-dedup: in Redis', async () => { + const key = buildReviewDispatchKey('acme', 'repo', 99, 'sha99'); + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 99, headSha: 'sha99' }); + expect(sharedStore.has(`cascade:review-dedup:${key}`)).toBe(true); + expect(sharedStore.has(key)).toBe(false); // un-namespaced must NOT be present + }); - expect(recentlyDispatched.has(key1)).toBe(false); - expect(recentlyDispatched.has(key2)).toBe(true); + it('fails closed when Redis errors, returning false and capturing to Sentry', async () => { + const { Redis } = await import('ioredis'); + // Patch the prototype to force `set` to throw on the next call. 
+ const realSet = (Redis.prototype as unknown as { set: (...a: unknown[]) => unknown }).set; + (Redis.prototype as unknown as { set: () => unknown }).set = () => { + throw new Error('connection refused'); + }; + + try { + const key = buildReviewDispatchKey('acme', 'repo', 50, 'sha50'); + expect( + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 50, headSha: 'sha50' }), + ).toBe(false); + expect(mockLogger.error).toHaveBeenCalledWith( + 'Review-dispatch dedup Redis call failed — failing closed', + expect.objectContaining({ reviewDispatchKey: key }), + ); + expect(mockCaptureException).toHaveBeenCalledWith( + expect.any(Error), + expect.objectContaining({ + tags: expect.objectContaining({ source: 'review_dedup_redis_down' }), + }), + ); + } finally { + (Redis.prototype as unknown as { set: typeof realSet }).set = realSet; + } }); }); describe('releaseReviewDispatch', () => { - beforeEach(() => { - recentlyDispatched.clear(); - }); - - afterEach(() => { - recentlyDispatched.clear(); - }); - - it('removes a claimed key so it can be reclaimed immediately', () => { + it('removes a claimed key so it can be reclaimed immediately', async () => { const key = buildReviewDispatchKey('acme', 'repo', 30, 'sha30'); - claimReviewDispatch(key, 'check-suite-success', { prNumber: 30, headSha: 'sha30' }); - - releaseReviewDispatch(key); - - const result = claimReviewDispatch(key, 'check-suite-success', { - prNumber: 30, - headSha: 'sha30', - }); - expect(result).toBe(true); + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 30, headSha: 'sha30' }); + await releaseReviewDispatch(key); + expect( + await claimReviewDispatch(key, 'check-suite-success', { prNumber: 30, headSha: 'sha30' }), + ).toBe(true); }); - it('is a no-op for a key that was never claimed', () => { + it('is a no-op for a key that was never claimed', async () => { const key = buildReviewDispatchKey('acme', 'repo', 31, 'sha31'); - // Should not throw - expect(() => 
releaseReviewDispatch(key)).not.toThrow(); - expect(recentlyDispatched.has(key)).toBe(false); + await expect(releaseReviewDispatch(key)).resolves.toBeUndefined(); }); - it('only removes the specified key, leaving others intact', () => { + it('only removes the specified key, leaving others intact', async () => { const key1 = buildReviewDispatchKey('acme', 'repo', 40, 'sha40'); const key2 = buildReviewDispatchKey('acme', 'repo', 41, 'sha41'); + await claimReviewDispatch(key1, 'check-suite-success', { prNumber: 40, headSha: 'sha40' }); + await claimReviewDispatch(key2, 'check-suite-success', { prNumber: 41, headSha: 'sha41' }); + await releaseReviewDispatch(key1); + expect(sharedStore.has(`cascade:review-dedup:${key1}`)).toBe(false); + expect(sharedStore.has(`cascade:review-dedup:${key2}`)).toBe(true); + }); +}); - claimReviewDispatch(key1, 'check-suite-success', { prNumber: 40, headSha: 'sha40' }); - claimReviewDispatch(key2, 'check-suite-success', { prNumber: 41, headSha: 'sha41' }); - - releaseReviewDispatch(key1); +// ─── Cross-process invariant ──────────────────────────────────────────────── +// +// THIS is the regression pin for ucho/PR #194 (2026-05-01). Two cascade +// processes — the IMPL worker (post-completion-hook) and the router +// (check-suite-success) — both claimed the same dedup key from their own +// in-memory Map and BOTH dispatched a review. With Redis-backed dedup, the +// second process MUST see the first's claim. +// +// We simulate "two processes" by instantiating two IORedis clients from +// scratch via `new Redis()` (vi.mock's factory is shared, so both reach the +// same in-memory store — exactly mirroring two real processes hitting the +// same Redis backend). + +describe('cross-process dedup invariant (PR #194 regression pin)', () => { + // Direct-instance test: two IORedis clients constructed from scratch + // against the same Redis URL. 
Mirrors the real-prod shape where the + // router process and the IMPL worker process each instantiate their own + // client. Pre-PR-#1248 the dedup was an in-memory `Map` per process and + // these two clients would have observed independent state — both + // dispatches succeeded, both burned LLM tokens. The Redis-backed + // `SET NX EX` primitive must reject the second claim atomically. + it('two distinct IORedis instances against the shared backend share dedup state', async () => { + const { Redis } = await import('ioredis'); + const routerProcessClient = new Redis('redis://localhost:6379'); + const workerProcessClient = new Redis('redis://localhost:6379'); + + const key = `cascade:review-dedup:${buildReviewDispatchKey('zbigniewsobiecki', 'ucho', 194, '9ed484df')}`; + const firstResult = await routerProcessClient.set(key, 'check-suite-success', 'EX', 300, 'NX'); + const secondResult = await workerProcessClient.set( + key, + 'post-completion-hook', + 'EX', + 300, + 'NX', + ); - expect(recentlyDispatched.has(key1)).toBe(false); - expect(recentlyDispatched.has(key2)).toBe(true); + expect(firstResult).toBe('OK'); + expect(secondResult).toBeNull(); }); }); diff --git a/tests/unit/triggers/review-requested.test.ts b/tests/unit/triggers/review-requested.test.ts index 67b7e7ff..7726ae7d 100644 --- a/tests/unit/triggers/review-requested.test.ts +++ b/tests/unit/triggers/review-requested.test.ts @@ -8,7 +8,15 @@ vi.mock('../../../src/triggers/config-resolver.js', () => mockConfigResolverModu vi.mock('../../../src/triggers/shared/trigger-check.js', () => mockTriggerCheckModule); -import { recentlyDispatched } from '../../../src/triggers/github/review-dispatch-dedup.js'; +const mockClaimReviewDispatch = vi.fn().mockResolvedValue(true); +const mockReleaseReviewDispatch = vi.fn().mockResolvedValue(undefined); +vi.mock('../../../src/triggers/github/review-dispatch-dedup.js', () => ({ + buildReviewDispatchKey: (owner: string, repo: string, prNumber: number, headSha: string) => 
+ `${owner}/${repo}:${prNumber}:${headSha}`, + claimReviewDispatch: (...args: unknown[]) => mockClaimReviewDispatch(...args), + releaseReviewDispatch: (...args: unknown[]) => mockReleaseReviewDispatch(...args), +})); + import { ReviewRequestedTrigger } from '../../../src/triggers/github/review-requested.js'; import type { TriggerContext } from '../../../src/triggers/types.js'; import { createMockProject } from '../../helpers/factories.js'; @@ -29,7 +37,8 @@ describe('ReviewRequestedTrigger', () => { beforeEach(() => { vi.mocked(lookupWorkItemForPR).mockResolvedValue('abc123'); vi.mocked(checkTriggerEnabled).mockResolvedValue(true); - recentlyDispatched.clear(); + mockClaimReviewDispatch.mockReset().mockResolvedValue(true); + mockReleaseReviewDispatch.mockReset().mockResolvedValue(undefined); }); const makeReviewRequestedPayload = (reviewerLogin = 'cascade-reviewer') => ({ @@ -230,9 +239,9 @@ describe('ReviewRequestedTrigger', () => { }); it('overrides a prior dispatch and fires even when the same PR+SHA was already dispatched', async () => { - // Human-initiated review requests always supersede automated dispatch claims. - recentlyDispatched.set('owner/repo:42:abc123', Date.now()); - + // Human-initiated review requests always supersede automated dispatch claims: + // the handler unconditionally calls release before claim. We assert that + // release is invoked AND claim succeeds (post-release the slot is free). const ctx: TriggerContext = { project: mockProject, source: 'github', @@ -243,6 +252,13 @@ describe('ReviewRequestedTrigger', () => { const result = await trigger.handle(ctx); expect(result?.agentType).toBe('review'); + // The release-before-claim sequence: + expect(mockReleaseReviewDispatch).toHaveBeenCalledWith('owner/repo:42:abc123'); + expect(mockClaimReviewDispatch).toHaveBeenCalledWith( + 'owner/repo:42:abc123', + 'review-requested', + expect.objectContaining({ prNumber: 42, headSha: 'abc123' }), + ); }); });