Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/triggers/github/check-suite-success.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import {
resolveWorkItemId,
} from './utils.js';

export { recentlyDispatched } from './review-dispatch-dedup.js';

/**
* Dispatches an outcome agent when a check_suite completes with success
* conclusion on a PR authored by the implementer persona.
Expand Down Expand Up @@ -235,9 +233,11 @@ export class CheckSuiteSuccessTrigger implements TriggerHandler {
}

// PR+SHA-scoped dedup prevents duplicate reviews across both duplicate
// check_suite deliveries and other review-producing triggers.
// check_suite deliveries and other review-producing triggers — including
// the post-completion-hook in the IMPL worker process. Backed by Redis
// so the dedup holds across processes (router + workers + future replicas).
const dedupKey = buildReviewDispatchKey(owner, repo, prNumber, headSha);
if (!claimReviewDispatch(dedupKey, this.name, { prNumber, headSha })) {
if (!(await claimReviewDispatch(dedupKey, this.name, { prNumber, headSha }))) {
return skip(
this.name,
`Review dispatch for PR #${prNumber}@${headSha} already claimed by another path (dedup)`,
Expand Down Expand Up @@ -271,7 +271,10 @@ export class CheckSuiteSuccessTrigger implements TriggerHandler {
workItemId,
workItemUrl,
workItemTitle,
onBlocked: () => releaseReviewDispatch(dedupKey),
onBlocked: () => {
// Fire-and-forget — release is best-effort and the TTL is the safety net.
void releaseReviewDispatch(dedupKey);
},
};
}
}
155 changes: 124 additions & 31 deletions src/triggers/github/review-dispatch-dedup.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,64 @@
/**
* Review-dispatch deduplication, Redis-backed.
*
* `claimReviewDispatch(key, ...)` returns `true` exactly once per key within
* the TTL window — across ALL processes that share the same Redis backend.
* Subsequent calls (from the same or any other process) return `false` and
* the caller must skip the dispatch.
*
* Why Redis (and not the in-memory Map this module used to be):
* the dedup key (`owner/repo:prNumber:headSha`) is claimed from at least
* THREE distinct processes:
* 1. Router process — `check-suite-success` and `review-requested` triggers.
* 2. IMPL worker process — `agent-execution.ts` post-completion-hook
* (fires the review immediately after impl completes, regardless of
* whether GitHub eventually delivers the check_suite-success event).
* 3. Future router replicas / horizontally-scaled deployments.
*
* The pre-Redis Map was module-scoped and per-process: each process started
* with an empty Map, so the dedup never crossed process boundaries.
* Production confirmed live duplicate dispatch on ucho/PR #194 (2026-05-01) —
* post-completion-hook (worker) and check-suite-success (router) BOTH
* dispatched a review for the same SHA, both claimed `true` from their own
* empty Map, both burned LLM tokens. See PR #1248 for the diagnosis.
*
* Redis primitive: `SET key value NX EX <ttl>` — atomic check-and-set with
* TTL. Returns `'OK'` on first claim, `null` on duplicate. No race window.
*
* Failure mode: when Redis is unreachable, `claim` returns `false` (treats
* the call as a duplicate) and Sentry-captures under `review_dedup_redis_down`.
* Better to skip a legit dispatch than to dispatch a duplicate; mirrors
* spec-017's fail-closed pipeline-capacity-gate posture.
*/

import { Redis } from 'ioredis';
import { captureException } from '../../sentry.js';
import { logger } from '../../utils/logging.js';

// 5-minute TTL — short enough to never wedge the dispatch path for a clean
// PR. Originally 30 min; shortened in PR #1245 follow-up after the
// success-handler refactor eliminated the worker-bail-out path. With the new
// defer-on-incomplete behavior, dispatches correlate with actually-running
// workers, so the longer TTL had no defensive value and amplified incidents.
const DEDUP_TTL_MS = 5 * 60 * 1000;
// 5 minutes — kept short because dispatches now correlate with actually-
// running workers (post the PR #1246 defer-on-incomplete refactor); a longer
// TTL has no defensive value and amplifies any wedged-state incident.
const DEDUP_TTL_SEC = 5 * 60;

const KEY_NS = 'cascade:review-dedup:';

let redisInstance: Redis | null = null;

export const recentlyDispatched = new Map<string, number>();
/**
* Lazy singleton — first call connects, subsequent calls reuse the same
* client. The worker process pays the connection cost only if it actually
* dispatches a review (post-completion-hook).
*/
function getRedis(): Redis {
if (!redisInstance) {
const redisUrl = process.env.REDIS_URL;
if (!redisUrl) {
throw new Error('REDIS_URL is required for review-dispatch dedup');
}
redisInstance = new Redis(redisUrl);
}
return redisInstance;
}

export function buildReviewDispatchKey(
owner: string,
Expand All @@ -18,42 +69,84 @@ export function buildReviewDispatchKey(
return `${owner}/${repo}:${prNumber}:${headSha}`;
}

function cleanupExpiredEntries(now: number): void {
for (const [key, ts] of recentlyDispatched) {
if (now - ts > DEDUP_TTL_MS) {
recentlyDispatched.delete(key);
}
}
}

export function claimReviewDispatch(
/**
* Atomically claim a dispatch slot for the given key. Returns `true` exactly
* once per key within the TTL window across ALL connected processes.
*
* Fails closed on Redis errors: returns `false` so the caller skips the
* dispatch. Sentry-captures the underlying error under
* `review_dedup_redis_down`.
*/
export async function claimReviewDispatch(
key: string,
triggerName: string,
context: { prNumber: number; headSha: string },
): boolean {
const now = Date.now();
cleanupExpiredEntries(now);

if (recentlyDispatched.has(key)) {
): Promise<boolean> {
const namespacedKey = `${KEY_NS}${key}`;
try {
const result = await getRedis().set(namespacedKey, triggerName, 'EX', DEDUP_TTL_SEC, 'NX');
if (result === 'OK') {
logger.info('Claimed review dispatch for PR+SHA', {
trigger: triggerName,
reviewDispatchKey: key,
prNumber: context.prNumber,
headSha: context.headSha,
});
return true;
}
logger.info('Review already dispatched for this PR+SHA, skipping', {
trigger: triggerName,
reviewDispatchKey: key,
prNumber: context.prNumber,
headSha: context.headSha,
});
return false;
} catch (err) {
logger.error('Review-dispatch dedup Redis call failed — failing closed', {
trigger: triggerName,
reviewDispatchKey: key,
error: String(err),
});
captureException(err, {
tags: { source: 'review_dedup_redis_down' },
extra: { reviewDispatchKey: key, trigger: triggerName },
level: 'error',
});
return false;
}
}

recentlyDispatched.set(key, now);
logger.info('Claimed review dispatch for PR+SHA', {
trigger: triggerName,
reviewDispatchKey: key,
prNumber: context.prNumber,
headSha: context.headSha,
});
return true;
/**
* Release a previously-claimed dispatch slot. Used by `onBlocked` callbacks
* when downstream rejects the dispatch (work-item lock, agent-type
* concurrency, etc.) so the next legitimate trigger can claim.
*
* Errors are logged but never thrown — release is best-effort, and the TTL
* is the safety net.
*/
export async function releaseReviewDispatch(key: string): Promise<void> {
const namespacedKey = `${KEY_NS}${key}`;
try {
await getRedis().del(namespacedKey);
} catch (err) {
logger.warn('Review-dispatch dedup release failed (TTL will reap)', {
reviewDispatchKey: key,
error: String(err),
});
}
}

export function releaseReviewDispatch(key: string): void {
recentlyDispatched.delete(key);
/**
* Test-only: flush the dedup namespace and reset the singleton. Intended for
* `beforeEach` in unit tests and for the integration suite's per-test
* cleanup. Never call from production code.
*
* @internal
*/
export async function __resetForTests(): Promise<void> {
if (!redisInstance) return;
const keys = await redisInstance.keys(`${KEY_NS}*`);
if (keys.length > 0) await redisInstance.del(...keys);
await redisInstance.quit().catch(() => {});
redisInstance = null;
}
8 changes: 5 additions & 3 deletions src/triggers/github/review-requested.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ export class ReviewRequestedTrigger implements TriggerHandler {
const workItemId = await resolveWorkItemId(ctx.project.id, prNumber);
const reviewDispatchKey = buildReviewDispatchKey(owner, repo, prNumber, headSha);
// Human-initiated review requests override any prior automated dispatch claim.
releaseReviewDispatch(reviewDispatchKey);
if (!claimReviewDispatch(reviewDispatchKey, this.name, { prNumber, headSha })) {
await releaseReviewDispatch(reviewDispatchKey);
if (!(await claimReviewDispatch(reviewDispatchKey, this.name, { prNumber, headSha }))) {
return skip(
this.name,
`Review dispatch for PR #${prNumber}@${headSha} already claimed by another path (dedup)`,
Expand Down Expand Up @@ -128,7 +128,9 @@ export class ReviewRequestedTrigger implements TriggerHandler {
prUrl: payload.pull_request.html_url,
prTitle: payload.pull_request.title,
workItemId,
onBlocked: () => releaseReviewDispatch(reviewDispatchKey),
onBlocked: () => {
void releaseReviewDispatch(reviewDispatchKey);
},
};
}
}
2 changes: 1 addition & 1 deletion src/triggers/shared/agent-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ async function tryDispatchPostCompletionReview(
}

const dedupKey = buildReviewDispatchKey(owner, repo, prNumber, headSha);
if (!claimReviewDispatch(dedupKey, 'post-completion-hook', { prNumber, headSha })) {
if (!(await claimReviewDispatch(dedupKey, 'post-completion-hook', { prNumber, headSha }))) {
logger.info('Skipping post-completion review: already dispatched', {
prNumber,
workItemId,
Expand Down
45 changes: 30 additions & 15 deletions tests/unit/triggers/check-suite-success.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@ vi.mock('../../../src/triggers/shared/trigger-check.js', () => mockTriggerCheckM

vi.mock('../../../src/github/client.js', () => mockGitHubClientModule);

// Stub the Redis-backed dedup module so tests don't need a Redis connection.
// Each `claim` resolves to true (success) by default; per-test overrides via
// `mockClaimReviewDispatch.mockResolvedValueOnce(false)` simulate a duplicate.
const mockClaimReviewDispatch = vi.fn().mockResolvedValue(true);
const mockReleaseReviewDispatch = vi.fn().mockResolvedValue(undefined);
vi.mock('../../../src/triggers/github/review-dispatch-dedup.js', () => ({
buildReviewDispatchKey: (owner: string, repo: string, prNumber: number, headSha: string) =>
`${owner}/${repo}:${prNumber}:${headSha}`,
claimReviewDispatch: (...args: unknown[]) => mockClaimReviewDispatch(...args),
releaseReviewDispatch: (...args: unknown[]) => mockReleaseReviewDispatch(...args),
}));

import { githubClient } from '../../../src/github/client.js';
import { resetFixAttempts } from '../../../src/triggers/github/check-suite-failure.js';
import {
CheckSuiteSuccessTrigger,
recentlyDispatched,
} from '../../../src/triggers/github/check-suite-success.js';
import { CheckSuiteSuccessTrigger } from '../../../src/triggers/github/check-suite-success.js';
import { ReviewRequestedTrigger } from '../../../src/triggers/github/review-requested.js';
import type { TriggerContext } from '../../../src/triggers/types.js';
import { createCheckSuitePayload, createMockProject } from '../../helpers/factories.js';
Expand All @@ -43,7 +52,8 @@ describe('CheckSuiteSuccessTrigger', () => {

beforeEach(() => {
vi.mocked(lookupWorkItemForPR).mockResolvedValue('abc123');
recentlyDispatched.clear();
mockClaimReviewDispatch.mockReset().mockResolvedValue(true);
mockReleaseReviewDispatch.mockReset().mockResolvedValue(undefined);
resetFixAttempts(42);
// Default: aggregate status reflects all checks passing. Tests that need
// a mixed-state SHA override this per-case.
Expand Down Expand Up @@ -239,6 +249,9 @@ describe('CheckSuiteSuccessTrigger', () => {
user: { login: 'cascade-impl' },
});
vi.mocked(githubClient.getPRReviews).mockResolvedValue([]);
// Simulate Redis state: review-requested claims first (true), then
// check-suite-success loses the SET NX EX race (false).
mockClaimReviewDispatch.mockReset().mockResolvedValueOnce(true).mockResolvedValueOnce(false);

const reviewRequestedContext: TriggerContext = {
project: mockProject,
Expand Down Expand Up @@ -618,6 +631,10 @@ describe('CheckSuiteSuccessTrigger', () => {
});
vi.mocked(githubClient.getPRReviews).mockResolvedValue([]);

// Simulate the Redis-backed dedup: first claim succeeds, second loses
// the SET NX EX race and returns false.
mockClaimReviewDispatch.mockReset().mockResolvedValueOnce(true).mockResolvedValueOnce(false);

const ctx: TriggerContext = {
project: mockProject,
source: 'github',
Expand Down Expand Up @@ -660,16 +677,14 @@ describe('CheckSuiteSuccessTrigger', () => {
const result = await trigger.handle(ctx);
expect(result).not.toBeNull();
expect(result?.onBlocked).toBeTypeOf('function');
expect(recentlyDispatched.size).toBe(1);
// First handle() called claim once.
expect(mockClaimReviewDispatch).toHaveBeenCalledTimes(1);

// Simulate router calling onBlocked (work-item lock or concurrency block)
// Simulate router calling onBlocked (work-item lock or concurrency block) —
// it should release the dedup so a subsequent legitimate trigger can claim.
result?.onBlocked?.();
expect(recentlyDispatched.size).toBe(0);

// After onBlocked, a subsequent call should succeed (not be deduped)
const result2 = await trigger.handle(ctx);
expect(result2).not.toBeNull();
expect(result2?.agentType).toBe('review');
expect(mockReleaseReviewDispatch).toHaveBeenCalledTimes(1);
expect(mockReleaseReviewDispatch).toHaveBeenCalledWith('owner/repo:42:sha123');
});

it('allows review for same PR with a new SHA after dedup', async () => {
Expand Down Expand Up @@ -1196,8 +1211,8 @@ describe('CheckSuiteSuccessTrigger', () => {
const implResult = await trigger.handle(implCtx);
expect(implResult).not.toBeNull();

// External PR — clear dedup since we're testing author mode, not dedup
recentlyDispatched.clear();
// External PR — reset dedup mock since we're testing author mode, not dedup
mockClaimReviewDispatch.mockReset().mockResolvedValue(true);
vi.mocked(lookupWorkItemForPR).mockResolvedValue(null);
setupMocks('external-contributor');
const extCtx: TriggerContext = {
Expand Down
Loading
Loading