From 70c7326348085932ec482056279064715201f727 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Mon, 13 Apr 2026 23:07:30 -0700 Subject: [PATCH 1/9] feat: expose idempotency key to HTTP, MCP, and CLI callers The idempotency engine (fingerprint-based, single-flight, STATE_CONFLICT on mismatch) already existed at the operation layer but was unreachable from external callers. This wires it through all three surfaces so agents and scripts can submit retry-safe contributions. - HTTP: Idempotency-Key header on POST /api/contributions (IETF Idempotency-Key header draft) - MCP: idempotencyKey param on all 5 contribution tools (shared Zod fragment) - CLI: --idempotency-key flag on grove contribute - Core: idempotencyKey added to ReviewInput, ReproduceInput, DiscussInput, AdoptInput - Docs: docs/idempotency.md with full contract reference - Tests: 14 new tests across all three surfaces Closes #209 --- docs/idempotency.md | 69 +++++++++++++ src/cli/commands/contribute.test.ts | 97 +++++++++++++++++- src/cli/commands/contribute.ts | 6 ++ src/core/operations/contribute.ts | 12 ++- src/mcp/tools/contributions.test.ts | 151 ++++++++++++++++++++++++++++ src/mcp/tools/contributions.ts | 34 +++++++ src/server/e2e.test.ts | 77 +++++++++++++- src/server/routes/contributions.ts | 4 + 8 files changed, 444 insertions(+), 6 deletions(-) create mode 100644 docs/idempotency.md diff --git a/docs/idempotency.md b/docs/idempotency.md new file mode 100644 index 00000000..99ebe33d --- /dev/null +++ b/docs/idempotency.md @@ -0,0 +1,69 @@ +# Idempotency + +Grove contributions support explicit idempotency keys for retry-safe submissions. The semantics follow the [IETF Idempotency-Key header draft](https://datatracker.ietf.org/doc/draft-ietf-httpapi-idempotency-key-header/) and Stripe's `Idempotency-Key` convention. + +## How to use + +### MCP tools + +Pass the optional `idempotencyKey` parameter to any contribution tool: + +```json +{ + "summary": "Fix the parser bug", + "artifacts": { "fix.ts": "blake3:..." 
}, + "agent": { "agentId": "coder-1", "role": "coder" }, + "idempotencyKey": "my-unique-key" +} +``` + +Available on: `grove_submit_work`, `grove_submit_review`, `grove_discuss`, `grove_reproduce`, `grove_adopt`. + +### HTTP API + +Pass the `Idempotency-Key` header on `POST /api/contributions`: + +```http +POST /api/contributions +Content-Type: application/json +Idempotency-Key: my-unique-key + +{ "kind": "work", "summary": "...", ... } +``` + +### CLI + +Pass `--idempotency-key` to `grove contribute`: + +```bash +grove contribute --summary "Fix parser" --idempotency-key my-unique-key +``` + +## Semantics + +| Scenario | Behavior | +|----------|----------| +| Same key + same input | Returns cached result (retry) | +| Same key + different input | Returns `STATE_CONFLICT` error (HTTP 409) | +| Same key + in-flight request | Awaits the pending write (single-flight) | +| No key provided | No deduplication — each call creates a new contribution | + +## Key details + +- **Scope**: Keys are namespaced per agent (`agent.role` if set, otherwise `agent.agentId`). Two different agents can use the same key without colliding. +- **TTL**: Cached results expire after **5 minutes**. After expiry, the key can be reused. +- **Cache size**: Up to 1024 entries (LRU eviction when full). +- **Process-local**: The cache is in-memory and not shared across processes. Clients running multiple grove instances must coordinate keys externally. +- **Fingerprint coverage**: The conflict check hashes `kind`, `mode`, `summary`, `description`, `artifacts` (name + hash), `relations`, `scores`, `tags`, `context`, and agent scope. Any difference in these fields triggers `STATE_CONFLICT` on key reuse. + +## Key format + +Keys are opaque strings. UUIDv4 or UUIDv7 are recommended. The key itself is not stored in the contribution — it only controls deduplication during the cache TTL window. + +## When to use + +- **Agent retry loops**: Generate a key before the first attempt, reuse it on retries. 
+- **Network retries**: If a submission times out, replay with the same key to avoid duplicates. +- **Iterative work**: When an agent intentionally resubmits with updated artifacts under the same summary, use a **new key** (or no key) so the submission is not suppressed. + +Idempotency keys are optional. Callers that don't need retry safety can omit the key entirely. diff --git a/src/cli/commands/contribute.test.ts b/src/cli/commands/contribute.test.ts index e4467d69..7425a3cb 100644 --- a/src/cli/commands/contribute.test.ts +++ b/src/cli/commands/contribute.test.ts @@ -4,10 +4,11 @@ * Covers argument parsing, validation, execution logic, and edge cases. */ -import { describe, expect, test } from "bun:test"; +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { mkdir, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { _resetIdempotencyCacheForTests } from "../../core/operations/contribute.js"; import type { ContributeOptions } from "./contribute.js"; import { executeContribute, @@ -265,6 +266,16 @@ describe("parseContributeArgs", () => { expect(opts.agentOverrides.agentId).toBe("my-agent"); expect(opts.agentOverrides.provider).toBe("openai"); }); + + test("parses --idempotency-key flag", () => { + const opts = parseContributeArgs(["--summary", "test", "--idempotency-key", "my-key-1"]); + expect(opts.idempotencyKey).toBe("my-key-1"); + }); + + test("idempotencyKey is undefined when --idempotency-key is omitted", () => { + const opts = parseContributeArgs(["--summary", "test"]); + expect(opts.idempotencyKey).toBeUndefined(); + }); }); // --------------------------------------------------------------------------- @@ -1235,3 +1246,87 @@ describe("grove contribute E2E", () => { } }); }); + +// --------------------------------------------------------------------------- +// Idempotency key plumbing (CLI surface) +// 
--------------------------------------------------------------------------- + +describe("executeContribute: idempotencyKey", () => { + beforeEach(() => { + _resetIdempotencyCacheForTests(); + }); + + afterEach(() => { + _resetIdempotencyCacheForTests(); + }); + + test("same idempotency key + same input returns the cached CID", async () => { + const dir = await createTempDir(); + try { + await executeInit(makeInitOptions(dir)); + + const opts = makeContributeOptions({ + summary: "Idempotent CLI work", + idempotencyKey: "cli-key-1", + cwd: dir, + }); + + const first = await executeContribute(opts); + expect(first.cid).toMatch(/^blake3:/); + + const second = await executeContribute(opts); + expect(second.cid).toBe(first.cid); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + test("same idempotency key + different input throws STATE_CONFLICT", async () => { + const dir = await createTempDir(); + try { + await executeInit(makeInitOptions(dir)); + + const first = await executeContribute( + makeContributeOptions({ + summary: "First CLI work", + idempotencyKey: "cli-conflict-key", + cwd: dir, + }), + ); + expect(first.cid).toMatch(/^blake3:/); + + await expect( + executeContribute( + makeContributeOptions({ + summary: "Different work, same key", + idempotencyKey: "cli-conflict-key", + cwd: dir, + }), + ), + ).rejects.toThrow(/different request body/i); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + test("no idempotency key produces distinct contributions", async () => { + const dir = await createTempDir(); + try { + await executeInit(makeInitOptions(dir)); + + const opts = makeContributeOptions({ + summary: "No key CLI work", + cwd: dir, + }); + + const first = await executeContribute(opts); + const second = await executeContribute(opts); + + expect(first.cid).toMatch(/^blake3:/); + expect(second.cid).toMatch(/^blake3:/); + expect(second.cid).not.toBe(first.cid); + } finally { + await rm(dir, { recursive: true, 
force: true }); + } + }); +}); diff --git a/src/cli/commands/contribute.ts b/src/cli/commands/contribute.ts index 955c2636..1f99f895 100644 --- a/src/cli/commands/contribute.ts +++ b/src/cli/commands/contribute.ts @@ -59,6 +59,9 @@ export interface ContributeOptions { // Metadata readonly tags: readonly string[]; + // Idempotency + readonly idempotencyKey?: string | undefined; + // Agent readonly agentOverrides: AgentOverrides; @@ -101,6 +104,7 @@ export function parseContributeArgs(args: readonly string[]): ContributeOptions metric: { type: "string", multiple: true, default: [] }, score: { type: "string", multiple: true, default: [] }, tag: { type: "string", multiple: true, default: [] }, + "idempotency-key": { type: "string" }, "agent-id": { type: "string" }, "agent-name": { type: "string" }, provider: { type: "string" }, @@ -130,6 +134,7 @@ export function parseContributeArgs(args: readonly string[]): ContributeOptions metric: values.metric as string[], score: values.score as string[], tags: values.tag as string[], + idempotencyKey: values["idempotency-key"] as string | undefined, agentOverrides: { agentId: values["agent-id"] as string | undefined, agentName: values["agent-name"] as string | undefined, @@ -484,6 +489,7 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c ...(scores !== undefined ? { scores } : {}), tags: [...options.tags], agent: options.agentOverrides, + ...(options.idempotencyKey !== undefined ? 
{ idempotencyKey: options.idempotencyKey } : {}), }; const result = await contributeOperation(input, opDeps); diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index f559be79..2fbd61f9 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -123,6 +123,7 @@ export interface ReviewInput { readonly context?: Readonly> | undefined; readonly agent?: AgentOverrides | undefined; readonly metadata?: Readonly> | undefined; + readonly idempotencyKey?: string | undefined; } /** Input for the reproduce operation. */ @@ -136,6 +137,7 @@ export interface ReproduceInput { readonly tags?: readonly string[] | undefined; readonly context?: Readonly> | undefined; readonly agent?: AgentOverrides | undefined; + readonly idempotencyKey?: string | undefined; } /** Input for the discuss operation. */ @@ -146,6 +148,7 @@ export interface DiscussInput { readonly tags?: readonly string[] | undefined; readonly context?: Readonly> | undefined; readonly agent?: AgentOverrides | undefined; + readonly idempotencyKey?: string | undefined; } /** Input for the adopt operation. */ @@ -156,6 +159,7 @@ export interface AdoptInput { readonly tags?: readonly string[] | undefined; readonly context?: Readonly> | undefined; readonly agent?: AgentOverrides | undefined; + readonly idempotencyKey?: string | undefined; } /** Result of an adopt operation. 
*/ @@ -1122,7 +1126,7 @@ export async function reviewOperation( relations, tags: input.tags, agent: input.agent, - ...pickDefined(input, ["description", "scores", "context"]), + ...pickDefined(input, ["description", "scores", "context", "idempotencyKey"]), }, deps, ); @@ -1165,7 +1169,7 @@ export async function reproduceOperation( relations, tags: input.tags, agent: input.agent, - ...pickDefined(input, ["description", "scores", "context"]), + ...pickDefined(input, ["description", "scores", "context", "idempotencyKey"]), }, deps, ); @@ -1206,7 +1210,7 @@ export async function discussOperation( relations, tags: input.tags, agent: input.agent, - ...pickDefined(input, ["description", "context"]), + ...pickDefined(input, ["description", "context", "idempotencyKey"]), }, deps, ); @@ -1245,7 +1249,7 @@ export async function adoptOperation( relations, tags: input.tags, agent: input.agent, - ...pickDefined(input, ["description", "context"]), + ...pickDefined(input, ["description", "context", "idempotencyKey"]), }, deps, ); diff --git a/src/mcp/tools/contributions.test.ts b/src/mcp/tools/contributions.test.ts index 3029caf8..d4346998 100644 --- a/src/mcp/tools/contributions.test.ts +++ b/src/mcp/tools/contributions.test.ts @@ -14,6 +14,7 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { _resetIdempotencyCacheForTests } from "../../core/operations/contribute.js"; import { makeContribution } from "../../core/test-helpers.js"; import type { McpDeps } from "../deps.js"; import type { TestMcpDeps } from "../test-helpers.js"; @@ -589,3 +590,153 @@ describe("grove_adopt", () => { expect(data.kind).toBe("adoption"); }); }); + +// --------------------------------------------------------------------------- +// Idempotency key plumbing (MCP surface) +// --------------------------------------------------------------------------- + +describe("MCP idempotencyKey plumbing", () => { + 
let testDeps: TestMcpDeps; + let deps: McpDeps; + let server: McpServer; + + beforeEach(async () => { + _resetIdempotencyCacheForTests(); + testDeps = await createTestMcpDeps(); + deps = testDeps.deps; + server = new McpServer({ name: "test", version: "0.0.1" }, { capabilities: { tools: {} } }); + registerContributionTools(server, deps); + }); + + afterEach(async () => { + await testDeps.cleanup(); + _resetIdempotencyCacheForTests(); + }); + + test("grove_submit_work: same key + same input returns cached result", async () => { + const hash = await storeTestContent(deps.cas, "hello world"); + const args = { + summary: "Idempotent work", + artifacts: { "file.ts": hash }, + agent: { agentId: "coder-1", role: "coder" }, + idempotencyKey: "work-key-1", + }; + + const first = await callTool(server, "grove_submit_work", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_submit_work", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); + + test("grove_submit_work: same key + different input returns STATE_CONFLICT", async () => { + const hash = await storeTestContent(deps.cas, "hello world"); + const first = await callTool(server, "grove_submit_work", { + summary: "First work", + artifacts: { "file.ts": hash }, + agent: { agentId: "coder-1", role: "coder" }, + idempotencyKey: "work-conflict-key", + }); + expect(first.isError).toBeUndefined(); + + const second = await callTool(server, "grove_submit_work", { + summary: "Different work, same key", + artifacts: { "file.ts": hash }, + agent: { agentId: "coder-1", role: "coder" }, + idempotencyKey: "work-conflict-key", + }); + expect(second.isError).toBe(true); + expect(second.text).toContain("STATE_CONFLICT"); + }); + + test("grove_submit_review: idempotency key flows through", async () => { + const target = makeContribution({ summary: "Target" 
}); + await deps.contributionStore.put(target); + + const args = { + targetCid: target.cid, + summary: "Review with idempotency", + scores: { quality: { value: 0.9, direction: "maximize" } }, + agent: { agentId: "reviewer-1", role: "reviewer" }, + idempotencyKey: "review-key-1", + }; + + const first = await callTool(server, "grove_submit_review", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_submit_review", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); + + test("grove_discuss: idempotency key flows through", async () => { + const args = { + summary: "Discussion with idempotency", + agent: { agentId: "agent-1", role: "architect" }, + idempotencyKey: "discuss-key-1", + }; + + const first = await callTool(server, "grove_discuss", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_discuss", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); + + test("grove_reproduce: idempotency key flows through", async () => { + const target = makeContribution({ summary: "Experiment" }); + await deps.contributionStore.put(target); + + const args = { + targetCid: target.cid, + summary: "Reproduction with idempotency", + result: "confirmed", + agent: { agentId: "repro-1", role: "reproducer" }, + artifacts: {}, + idempotencyKey: "repro-key-1", + }; + + const first = await callTool(server, "grove_reproduce", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_reproduce", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); + + 
test("grove_adopt: idempotency key flows through", async () => { + const target = makeContribution({ summary: "Work to adopt" }); + await deps.contributionStore.put(target); + + const args = { + targetCid: target.cid, + summary: "Adopting with idempotency", + agent: { agentId: "adopter-1", role: "adopter" }, + idempotencyKey: "adopt-key-1", + }; + + const first = await callTool(server, "grove_adopt", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_adopt", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); +}); diff --git a/src/mcp/tools/contributions.ts b/src/mcp/tools/contributions.ts index 1650c5aa..c34feb20 100644 --- a/src/mcp/tools/contributions.ts +++ b/src/mcp/tools/contributions.ts @@ -37,6 +37,30 @@ import { scoreSchema, } from "../schemas.js"; +// --------------------------------------------------------------------------- +// Shared idempotency field +// --------------------------------------------------------------------------- + +/** + * Optional client-supplied idempotency key, shared across all contribution tools. + * + * Follows HTTP Idempotency-Key semantics (Stripe / AWS / RFC draft): + * - same key + same input → cached result (retry) + * - same key + different input → STATE_CONFLICT error + * - scoped per agent role (two agents can use the same key) + * - TTL: 5 minutes from first use + */ +const idempotencyKeyField = { + idempotencyKey: z + .string() + .optional() + .describe( + "Client-supplied idempotency key for retry safety. " + + "Reusing the same key with the same input returns the cached result. 
" + + "Reusing the same key with different input returns a STATE_CONFLICT error.", + ), +} as const; + // --------------------------------------------------------------------------- // Input schemas // --------------------------------------------------------------------------- @@ -74,6 +98,7 @@ const submitWorkInputSchema = z.object({ .optional() .describe("Execution/evaluation context metadata (e.g., hardware, dataset)"), agent: agentSchema, + ...idempotencyKeyField, }); const submitReviewInputSchema = z.object({ @@ -94,6 +119,7 @@ const submitReviewInputSchema = z.object({ .record(z.string(), z.unknown()) .optional() .describe("Relation metadata attached to the 'reviews' edge (e.g., {score: 0.8})"), + ...idempotencyKeyField, }); const reproduceInputSchema = z.object({ @@ -119,6 +145,7 @@ const reproduceInputSchema = z.object({ tags: z.array(z.string()).optional().default([]).describe("Tags"), context: z.record(z.string(), z.unknown()).optional().describe("Context metadata"), agent: agentSchema, + ...idempotencyKeyField, }); const discussInputSchema = z.object({ @@ -131,6 +158,7 @@ const discussInputSchema = z.object({ tags: z.array(z.string()).optional().default([]).describe("Tags for channel semantics"), context: z.record(z.string(), z.unknown()).optional().describe("Context metadata"), agent: agentSchema, + ...idempotencyKeyField, }); const adoptInputSchema = z.object({ @@ -144,6 +172,7 @@ const adoptInputSchema = z.object({ tags: z.array(z.string()).optional().default([]).describe("Tags"), context: z.record(z.string(), z.unknown()).optional().describe("Context metadata"), agent: agentSchema, + ...idempotencyKeyField, }); // --------------------------------------------------------------------------- @@ -212,6 +241,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi relations: args.relations as unknown as readonly Relation[], tags: args.tags, agent: withDefaultRole(args.agent as AgentOverrides), + ...(args.idempotencyKey !== 
undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); @@ -256,6 +286,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi ...(args.metadata !== undefined ? { metadata: args.metadata as Readonly> } : {}), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); @@ -293,6 +324,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi ...(args.context !== undefined ? { context: args.context as Readonly> } : {}), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); @@ -324,6 +356,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi ...(args.context !== undefined ? { context: args.context as Readonly> } : {}), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); @@ -353,6 +386,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi ...(args.context !== undefined ? { context: args.context as Readonly> } : {}), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); diff --git a/src/server/e2e.test.ts b/src/server/e2e.test.ts index 642960f9..baec3f3a 100644 --- a/src/server/e2e.test.ts +++ b/src/server/e2e.test.ts @@ -6,8 +6,9 @@ * (e.g., streaming, headers, content negotiation). 
*/ -import { afterAll, beforeAll, describe, expect, it } from "bun:test"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from "bun:test"; import { DefaultFrontierCalculator } from "../core/frontier.js"; +import { _resetIdempotencyCacheForTests } from "../core/operations/contribute.js"; import { InMemoryContributionStore } from "../core/testing.js"; import { createApp } from "./app.js"; import type { ServerDeps } from "./deps.js"; @@ -265,3 +266,77 @@ describe("E2E: validation errors", () => { expect(res.status).toBe(400); }); }); + +describe("E2E: Idempotency-Key header", () => { + beforeEach(() => { + _resetIdempotencyCacheForTests(); + }); + + afterEach(() => { + _resetIdempotencyCacheForTests(); + }); + + it("same Idempotency-Key + same body returns the same CID", async () => { + const body = makeManifestBody({ summary: "Idempotent HTTP contribution" }); + const key = "http-idem-key-1"; + + const first = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json", "Idempotency-Key": key }, + body: JSON.stringify(body), + }); + expect(first.status).toBe(201); + const firstData = (await first.json()) as Json; + + const second = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json", "Idempotency-Key": key }, + body: JSON.stringify(body), + }); + expect(second.status).toBe(201); + const secondData = (await second.json()) as Json; + + expect(secondData.cid).toBe(firstData.cid); + }); + + it("same Idempotency-Key + different body returns 409 STATE_CONFLICT", async () => { + const key = "http-conflict-key-1"; + + const first = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json", "Idempotency-Key": key }, + body: JSON.stringify(makeManifestBody({ summary: "First submission" })), + }); + expect(first.status).toBe(201); + + const second = await 
fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json", "Idempotency-Key": key }, + body: JSON.stringify(makeManifestBody({ summary: "Different submission, same key" })), + }); + expect(second.status).toBe(409); + const errorData = (await second.json()) as Json; + expect(errorData.error.code).toBe("STATE_CONFLICT"); + }); + + it("no Idempotency-Key header allows distinct contributions with different content", async () => { + const first = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(makeManifestBody({ summary: "First no-key contribution" })), + }); + expect(first.status).toBe(201); + const firstData = (await first.json()) as Json; + + const second = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(makeManifestBody({ summary: "Second no-key contribution" })), + }); + expect(second.status).toBe(201); + const secondData = (await second.json()) as Json; + + // Without a key, different content produces different CIDs (no blocking) + expect(secondData.cid).not.toBe(firstData.cid); + }); +}); diff --git a/src/server/routes/contributions.ts b/src/server/routes/contributions.ts index bb8a81c3..429fa243 100644 --- a/src/server/routes/contributions.ts +++ b/src/server/routes/contributions.ts @@ -227,6 +227,9 @@ contributions.post("/", async (c) => { artifacts[name] = contentHash; } + // Extract Idempotency-Key header (IETF httpapi draft / Stripe convention) + const idempotencyKey = c.req.header("idempotency-key"); + // Build operation input and delegate to shared operations layer const input: ContributeInput = { kind: parsed.kind, @@ -244,6 +247,7 @@ contributions.post("/", async (c) => { : {}), agent: parsed.agent, ...(parsed.createdAt !== undefined ? { createdAt: parsed.createdAt } : {}), + ...(idempotencyKey !== undefined ? 
{ idempotencyKey } : {}), }; let opDeps = toOperationDeps(serverDeps); From e225d4f7f128c68da7e0fa8821234de9b0283943 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Mon, 13 Apr 2026 23:42:56 -0700 Subject: [PATCH 2/9] fix: persist idempotency cache to SQLite for cross-process CLI retries The in-memory idempotency cache was invisible across CLI invocations because each `grove contribute` spawns a new process with an empty Map. This made `--idempotency-key` silently useless for CLI users. - Add `idempotency_keys` table to SQLite schema (DDL in SCHEMA_DDL) - Add `SqliteIdempotencyStore` class with lookup/store/clear - Add `IdempotencyStore` interface to `OperationDeps` - `contributeOperation` checks SQLite on in-memory miss, writes to both layers after commit - Wire `SqliteIdempotencyStore` through CLI `executeContribute` - In-memory Map retained for single-flight (pending Promise coalescence) Verified: 5/5 cross-process E2E tests pass (retry, conflict, agent scoping, no-key baseline). 1143 unit tests pass, 0 regressions. --- docs/idempotency.md | 2 +- src/cli/commands/contribute.ts | 7 ++-- src/core/operations/contribute.ts | 55 +++++++++++++++++++++++--- src/core/operations/deps.ts | 22 +++++++++++ src/core/operations/test-helpers.ts | 3 ++ src/local/sqlite-store.ts | 60 +++++++++++++++++++++++++++++ 6 files changed, 140 insertions(+), 9 deletions(-) diff --git a/docs/idempotency.md b/docs/idempotency.md index 99ebe33d..49e0fbcb 100644 --- a/docs/idempotency.md +++ b/docs/idempotency.md @@ -53,7 +53,7 @@ grove contribute --summary "Fix parser" --idempotency-key my-unique-key - **Scope**: Keys are namespaced per agent (`agent.role` if set, otherwise `agent.agentId`). Two different agents can use the same key without colliding. - **TTL**: Cached results expire after **5 minutes**. After expiry, the key can be reused. - **Cache size**: Up to 1024 entries (LRU eviction when full). -- **Process-local**: The cache is in-memory and not shared across processes. 
Clients running multiple grove instances must coordinate keys externally. +- **Persistent**: The cache is backed by SQLite (`idempotency_keys` table in `grove.db`), so keys survive across CLI process restarts. An in-memory layer provides single-flight deduplication within a single process. - **Fingerprint coverage**: The conflict check hashes `kind`, `mode`, `summary`, `description`, `artifacts` (name + hash), `relations`, `scores`, `tags`, `context`, and agent scope. Any difference in these fields triggers `STATE_CONFLICT` on key reuse. ## Key format diff --git a/src/cli/commands/contribute.ts b/src/cli/commands/contribute.ts index 1f99f895..0f450fed 100644 --- a/src/cli/commands/contribute.ts +++ b/src/cli/commands/contribute.ts @@ -308,9 +308,8 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c } // Dynamic imports for lazy loading - const { SqliteContributionStore, SqliteClaimStore, initSqliteDb } = await import( - "../../local/sqlite-store.js" - ); + const { SqliteContributionStore, SqliteClaimStore, SqliteIdempotencyStore, initSqliteDb } = + await import("../../local/sqlite-store.js"); const { FsCas } = await import("../../local/fs-cas.js"); const { DefaultFrontierCalculator } = await import("../../core/frontier.js"); const { parseGroveContract } = await import("../../core/contract.js"); @@ -321,6 +320,7 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c const db = initSqliteDb(dbPath); const rawStore = new SqliteContributionStore(db); const claimStore = new SqliteClaimStore(db); + const idempotencyStore = new SqliteIdempotencyStore(db); const cas = new FsCas(casPath); const frontier = new DefaultFrontierCalculator(rawStore); @@ -476,6 +476,7 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c claimStore, cas, frontier, + idempotencyStore, ...(contract !== undefined ? 
{ contract } : {}), }; diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index 2fbd61f9..836fa80d 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -725,9 +725,12 @@ export async function contributeOperation( input.idempotencyKey !== undefined ? idempotencyCacheKey(idempotencyAgentScope, input.idempotencyKey) : undefined; + // Hoisted so it's available at the durable-commit boundary for + // persistent store writes. + let idempotencyFingerprint: string | undefined; if (idempotencyCacheLookupKey !== undefined) { - const fingerprint = computeIdempotencyFingerprint(input, agent); - const cached = lookupIdempotency(idempotencyCacheLookupKey, fingerprint, Date.now()); + idempotencyFingerprint = computeIdempotencyFingerprint(input, agent); + const cached = lookupIdempotency(idempotencyCacheLookupKey, idempotencyFingerprint, Date.now()); if (cached !== undefined) { if (cached.type === "pending") { // Concurrent caller: await their write and return the same result. @@ -743,9 +746,33 @@ export async function contributeOperation( details: { idempotencyKey: input.idempotencyKey }, }); } - // Miss: synchronously reserve the slot. Subsequent concurrent - // callers observe the pending Promise and await it. - idempotencySlot = reserveIdempotencySlot(idempotencyCacheLookupKey, fingerprint, Date.now()); + + // In-memory miss: check persistent store for cross-process hits. + if (deps.idempotencyStore !== undefined) { + const persisted = deps.idempotencyStore.lookup( + idempotencyCacheLookupKey, + IDEMPOTENCY_TTL_MS, + ); + if (persisted !== undefined) { + if (persisted.fingerprint !== idempotencyFingerprint) { + return err({ + code: OperationErrorCode.StateConflict, + message: + "Idempotency key was previously used with a different request body. " + + "Reusing the same key with different input is rejected to prevent silent " + + "write divergence. 
Use a new key for the new intent.", + details: { idempotencyKey: input.idempotencyKey }, + }); + } + // Same key + same fingerprint from a previous process: return cached. + const result = JSON.parse(persisted.resultJson) as ContributeResult; + return ok(result); + } + } + + // Miss in both layers: synchronously reserve the in-memory slot. + // Subsequent concurrent callers observe the pending Promise and await it. + idempotencySlot = reserveIdempotencySlot(idempotencyCacheLookupKey, idempotencyFingerprint, Date.now()); } const contributionInput: ContributionInput = { @@ -921,6 +948,24 @@ export async function contributeOperation( // durably written; retries with the same key must return this cached // result, NOT re-run the write path. idempotencySlot = undefined; + + // Persist to durable store so cross-process retries (e.g., separate + // CLI invocations) can find this entry after the process exits. + if ( + deps.idempotencyStore !== undefined && + idempotencyCacheLookupKey !== undefined && + idempotencyFingerprint !== undefined + ) { + try { + deps.idempotencyStore.store( + idempotencyCacheLookupKey, + idempotencyFingerprint, + JSON.stringify(committedResult), + ); + } catch { + // Best-effort — don't fail the contribution over a cache write. + } + } } // Post-write callbacks — wrapped so a throw cannot escape and undo diff --git a/src/core/operations/deps.ts b/src/core/operations/deps.ts index 4308e9aa..b7e63ff9 100644 --- a/src/core/operations/deps.ts +++ b/src/core/operations/deps.ts @@ -18,6 +18,26 @@ import type { ClaimStore, ContributionStore } from "../store.js"; import type { TopologyRouter } from "../topology-router.js"; import type { WorkspaceManager } from "../workspace.js"; +/** + * Persistent idempotency store for cross-process deduplication. + * + * The in-memory Map in contribute.ts handles single-flight within a single + * process (pending Promise coalescence). 
This store handles the case where + * separate CLI invocations hit the same key — without it, each process + * starts with an empty Map and the key is silently ignored. + */ +export interface IdempotencyStore { + /** Look up an unexpired entry. Returns fingerprint + serialized result, or undefined. */ + lookup( + cacheKey: string, + ttlMs: number, + ): { readonly fingerprint: string; readonly resultJson: string } | undefined; + /** Persist a completed entry. */ + store(cacheKey: string, fingerprint: string, resultJson: string): void; + /** Remove all entries (testing only). */ + clear(): void; +} + /** * Dependencies required by operations. * @@ -50,4 +70,6 @@ export interface OperationDeps { readonly hookRunner?: HookRunner | undefined; /** Working directory for hook execution. */ readonly hookCwd?: string | undefined; + /** Persistent idempotency store for cross-process deduplication. */ + readonly idempotencyStore?: IdempotencyStore | undefined; } diff --git a/src/core/operations/test-helpers.ts b/src/core/operations/test-helpers.ts index 7516d39e..59d1c602 100644 --- a/src/core/operations/test-helpers.ts +++ b/src/core/operations/test-helpers.ts @@ -14,6 +14,7 @@ import { initSqliteDb, SqliteClaimStore, SqliteContributionStore, + SqliteIdempotencyStore, } from "../../local/sqlite-store.js"; import { LocalWorkspaceManager } from "../../local/workspace.js"; import type { ContentStore } from "../cas.js"; @@ -123,6 +124,7 @@ export async function createTestOperationDeps(): Promise { }); const handoffStore = new InMemoryHandoffStore(); + const idempotencyStore = new SqliteIdempotencyStore(db); const deps: FullOperationDeps = { contributionStore, @@ -133,6 +135,7 @@ export async function createTestOperationDeps(): Promise { frontier, workspace, handoffStore, + idempotencyStore, contract: undefined as unknown as GroveContract, outcomeStore: undefined as unknown as OutcomeStore, onContributionWrite: () => { diff --git a/src/local/sqlite-store.ts 
b/src/local/sqlite-store.ts index ef927ca4..b096c31a 100644 --- a/src/local/sqlite-store.ts +++ b/src/local/sqlite-store.ts @@ -153,6 +153,16 @@ const SCHEMA_DDL = ` CREATE INDEX IF NOT EXISTS idx_workspaces_status ON workspaces(status); CREATE INDEX IF NOT EXISTS idx_workspaces_activity ON workspaces(last_activity_at); + + -- Idempotency cache: persists across process restarts so CLI retries work. + -- The in-memory Map in contribute.ts handles single-flight within a process; + -- this table handles cross-process deduplication. + CREATE TABLE IF NOT EXISTS idempotency_keys ( + cache_key TEXT PRIMARY KEY, + fingerprint TEXT NOT NULL, + result_json TEXT NOT NULL, + stored_at INTEGER NOT NULL + ); `; const FTS_DDL = ` @@ -419,6 +429,54 @@ export function initSqliteDb(dbPath: string): Database { return db; } +// --------------------------------------------------------------------------- +// SqliteIdempotencyStore +// --------------------------------------------------------------------------- + +/** + * SQLite-backed idempotency store for cross-process deduplication. + * + * Complements the in-memory Map in contribute.ts: the Map provides + * single-flight (pending Promise coalescence) within a process; this + * store provides durable lookup so a CLI retry in a new process can + * detect that a key was already used. + */ +export class SqliteIdempotencyStore { + private readonly db: Database; + private readonly lookupStmt: Statement; + private readonly upsertStmt: Statement; + + constructor(db: Database) { + this.db = db; + this.lookupStmt = db.prepare( + "SELECT fingerprint, result_json FROM idempotency_keys WHERE cache_key = ? 
AND stored_at > ?", + ); + this.upsertStmt = db.prepare( + "INSERT OR REPLACE INTO idempotency_keys (cache_key, fingerprint, result_json, stored_at) VALUES (?, ?, ?, ?)", + ); + } + + lookup( + cacheKey: string, + ttlMs: number, + ): { readonly fingerprint: string; readonly resultJson: string } | undefined { + const cutoff = Date.now() - ttlMs; + const row = this.lookupStmt.get(cacheKey, cutoff) as + | { fingerprint: string; result_json: string } + | null; + if (row === null) return undefined; + return { fingerprint: row.fingerprint, resultJson: row.result_json }; + } + + store(cacheKey: string, fingerprint: string, resultJson: string): void { + this.upsertStmt.run(cacheKey, fingerprint, resultJson, Date.now()); + } + + clear(): void { + this.db.run("DELETE FROM idempotency_keys"); + } +} + /** * Convenience factory that creates a shared Database and returns both stores. * The returned close() disposes the database connection. @@ -431,6 +489,7 @@ export function createSqliteStores(dbPath: string): { outcomeStore: SqliteOutcomeStore; goalSessionStore: SqliteGoalSessionStore; handoffStore: SqliteHandoffStore; + idempotencyStore: SqliteIdempotencyStore; close: () => void; } { const db = initSqliteDb(dbPath); @@ -442,6 +501,7 @@ export function createSqliteStores(dbPath: string): { outcomeStore: new SqliteOutcomeStore(db), goalSessionStore: new SqliteGoalSessionStore(db), handoffStore: new SqliteHandoffStore(db), + idempotencyStore: new SqliteIdempotencyStore(db), close: () => { db.run("PRAGMA optimize"); db.close(); From 110524d8b606303f259c20a2f84c03e257a9f2de Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 11:18:25 -0700 Subject: [PATCH 3/9] fix: wire LocalEventBus + handoff store in serve-http.ts for local mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit serve-http.ts only created a TopologyRouter when nexusClient was present, leaving local-mode MCP servers without topology routing or handoffs. 
This change mirrors the serve.ts pattern: if topology exists, use NexusEventBus when available, otherwise fall back to LocalEventBus. Also wires runtime.handoffStore as fallback when nexusHandoffStore is unavailable, and passes eventBus into McpDeps. Verified: 7/7 E2E tests pass — coder submits work via MCP, handoff created to reviewer, idempotent retry returns cached CID with no duplicate handoff, STATE_CONFLICT on key reuse, reviewer review auto-transitions handoff to replied. --- src/mcp/serve-http.ts | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts index 4ab000cc..379b90c3 100644 --- a/src/mcp/serve-http.ts +++ b/src/mcp/serve-http.ts @@ -294,9 +294,18 @@ async function buildScopedDeps(sessionId: string | undefined): Promise Date: Tue, 14 Apr 2026 12:46:33 -0700 Subject: [PATCH 4/9] style: fix biome formatting (line length) in contribute.ts and sqlite-store.ts --- src/core/operations/contribute.ts | 12 ++++++++++-- src/local/sqlite-store.ts | 7 ++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index 836fa80d..178a38c6 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -730,7 +730,11 @@ export async function contributeOperation( let idempotencyFingerprint: string | undefined; if (idempotencyCacheLookupKey !== undefined) { idempotencyFingerprint = computeIdempotencyFingerprint(input, agent); - const cached = lookupIdempotency(idempotencyCacheLookupKey, idempotencyFingerprint, Date.now()); + const cached = lookupIdempotency( + idempotencyCacheLookupKey, + idempotencyFingerprint, + Date.now(), + ); if (cached !== undefined) { if (cached.type === "pending") { // Concurrent caller: await their write and return the same result. @@ -772,7 +776,11 @@ export async function contributeOperation( // Miss in both layers: synchronously reserve the in-memory slot. 
// Subsequent concurrent callers observe the pending Promise and await it. - idempotencySlot = reserveIdempotencySlot(idempotencyCacheLookupKey, idempotencyFingerprint, Date.now()); + idempotencySlot = reserveIdempotencySlot( + idempotencyCacheLookupKey, + idempotencyFingerprint, + Date.now(), + ); } const contributionInput: ContributionInput = { diff --git a/src/local/sqlite-store.ts b/src/local/sqlite-store.ts index b096c31a..b750ea4a 100644 --- a/src/local/sqlite-store.ts +++ b/src/local/sqlite-store.ts @@ -461,9 +461,10 @@ export class SqliteIdempotencyStore { ttlMs: number, ): { readonly fingerprint: string; readonly resultJson: string } | undefined { const cutoff = Date.now() - ttlMs; - const row = this.lookupStmt.get(cacheKey, cutoff) as - | { fingerprint: string; result_json: string } - | null; + const row = this.lookupStmt.get(cacheKey, cutoff) as { + fingerprint: string; + result_json: string; + } | null; if (row === null) return undefined; return { fingerprint: row.fingerprint, resultJson: row.result_json }; } From be95ff66b224cf925ba12e73612c4270bd540929 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 14:19:57 -0700 Subject: [PATCH 5/9] fix: atomic idempotency writes + protect pending slots from eviction Two findings from adversarial review: 1. Durable idempotency row is now written inside the same SQLite transaction as the contribution (via onCommit callback in writeContributionWithHandoffs). Closes the crash window where a contribution could be committed without its idempotency record. 2. Pending in-memory cache entries are no longer TTL-expired or LRU-evicted. A slow write (>5min) no longer lets a retry bypass single-flight and start a duplicate write. 
--- src/core/operations/contribute.ts | 71 +++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index 178a38c6..fd55b32e 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -419,7 +419,10 @@ function lookupIdempotency( | undefined { const entry = idempotencyCache.get(cacheKey); if (entry === undefined) return undefined; - if (now - entry.storedAt > IDEMPOTENCY_TTL_MS) { + // Never TTL-expire a pending entry — the in-flight write may still be + // running (slow CAS, hooks, etc). Expiring it would let a retry start + // a second write instead of awaiting the first. + if (now - entry.storedAt > IDEMPOTENCY_TTL_MS && entry.pending === undefined) { idempotencyCache.delete(cacheKey); return undefined; } @@ -458,10 +461,16 @@ function reserveIdempotencySlot( readonly resolve: (result: OperationResult) => void; readonly release: () => void; } { - // Evict the oldest entry if at capacity. + // Evict the oldest non-pending entry if at capacity. Pending entries + // represent in-flight writes — evicting them would break single-flight + // and let a retry start a duplicate write. 
if (idempotencyCache.size >= IDEMPOTENCY_MAX_ENTRIES) { - const oldest = idempotencyCache.keys().next().value; - if (oldest !== undefined) idempotencyCache.delete(oldest); + for (const [key, entry] of idempotencyCache) { + if (entry.pending === undefined) { + idempotencyCache.delete(key); + break; + } + } } let resolver!: (result: OperationResult) => void; @@ -523,6 +532,7 @@ async function writeAtomic( agentRole: string, putWithCowrite: (c: Contribution, fn: () => void) => void | Promise, insertSync: (input: HandoffInput) => string, + onCommit?: () => void, ): Promise { const handoffIds: string[] = []; const maybePromise = putWithCowrite(contribution, () => { @@ -535,6 +545,11 @@ async function writeAtomic( }); if (hid !== undefined) handoffIds.push(hid); } + // Write idempotency row inside the same SQLite transaction so a + // crash between contribution commit and idempotency store write + // cannot leave an orphaned contribution without its idempotency + // record. + onCommit?.(); }); if ( maybePromise !== undefined && @@ -564,8 +579,14 @@ async function writeSerial( agentRole: string | undefined, store: ContributionStore, handoffStore: HandoffStore | undefined, + onCommit?: () => void, ): Promise { await store.put(contribution); + // For non-atomic stores (Nexus, in-memory), write the idempotency row + // immediately after the contribution commit. Not fully crash-safe (the + // contribution and idempotency row are separate writes), but the window + // is minimal and matches the existing handoff best-effort pattern. 
+ onCommit?.(); const handoffIds: string[] = []; if (handoffStore === undefined || routedTo === undefined || agentRole === undefined) { @@ -628,6 +649,7 @@ async function writeContributionWithHandoffs( agentRole: string | undefined, store: ContributionStore, handoffStore: HandoffStore | undefined, + onCommit?: () => void, ): Promise { const needsHandoffs = handoffStore !== undefined && @@ -649,11 +671,12 @@ async function writeContributionWithHandoffs( agentRole, cowriteStore.putWithCowrite.bind(cowriteStore), sqliteHandoffStore.insertSync.bind(sqliteHandoffStore), + onCommit, ); } } - return writeSerial(contribution, routedTo, agentRole, store, handoffStore); + return writeSerial(contribution, routedTo, agentRole, store, handoffStore, onCommit); } // --------------------------------------------------------------------------- @@ -907,12 +930,42 @@ export async function contributeOperation( // undefined as the routing-target list to the writer. const agentRole = contribution.agent.role; const handoffsRoutedTo = skipHandoffs ? undefined : routedTo; + // Build onCommit callback for atomic idempotency-store write. + // Runs inside the SQLite transaction (atomic path) or immediately + // after store.put() (serial path), closing the crash window between + // contribution commit and idempotency record. + const idempotencyOnCommit = + deps.idempotencyStore !== undefined && + idempotencyCacheLookupKey !== undefined && + idempotencyFingerprint !== undefined + ? () => { + // Build a minimal result from the contribution (routedTo and + // handoffIds are populated later, but the CID is the critical + // dedup signal — retries just need to know the write happened). 
+ const earlyResult: ContributeResult = { + cid: contribution.cid, + kind: contribution.kind, + mode: contribution.mode, + summary: contribution.summary, + artifactCount: Object.keys(contribution.artifacts).length, + relationCount: contribution.relations.length, + createdAt: contribution.createdAt, + }; + deps.idempotencyStore!.store( + idempotencyCacheLookupKey!, + idempotencyFingerprint!, + JSON.stringify(earlyResult), + ); + } + : undefined; + const handoffIds = await writeContributionWithHandoffs( contribution, handoffsRoutedTo, agentRole, deps.contributionStore, deps.handoffStore, + idempotencyOnCommit, ); // ┌──────────────────────────────────────────────────────────────────┐ @@ -957,8 +1010,10 @@ export async function contributeOperation( // result, NOT re-run the write path. idempotencySlot = undefined; - // Persist to durable store so cross-process retries (e.g., separate - // CLI invocations) can find this entry after the process exits. + // Update the durable idempotency row with the full result (includes + // routedTo, handoffIds, policy which weren't available at commit time). + // This is an UPDATE, not an INSERT — the row was created atomically + // with the contribution inside the write transaction. if ( deps.idempotencyStore !== undefined && idempotencyCacheLookupKey !== undefined && @@ -971,7 +1026,7 @@ export async function contributeOperation( JSON.stringify(committedResult), ); } catch { - // Best-effort — don't fail the contribution over a cache write. + // The early result from onCommit is sufficient for dedup. } } } From 756a53cc246f31ef52ceaed0671e994d2cd163cb Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 14:30:33 -0700 Subject: [PATCH 6/9] fix: durable idempotency reservation + session-scoped HTTP keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adversarial review round 2 findings: 1. Cross-process race: two processes could both miss the SQLite lookup and both commit. 
Now uses INSERT OR IGNORE reservation before the write — only one process wins. Losers detect the conflict via a follow-up lookup and return the winner's CID or STATE_CONFLICT. 2. HTTP session leakage: idempotency keys were not scoped by sessionId, so a cached CID from session A could bypass session B's policy. Now appends sessionId to the key when present. --- src/core/operations/contribute.ts | 42 +++++++++++++++++++++++++++++- src/core/operations/deps.ts | 8 +++++- src/local/sqlite-store.ts | 34 +++++++++++++++++++++--- src/server/routes/contributions.ts | 11 +++++++- 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index fd55b32e..8dd39af7 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -797,7 +797,47 @@ export async function contributeOperation( } } - // Miss in both layers: synchronously reserve the in-memory slot. + // Durably reserve the key in SQLite before starting the write. + // INSERT OR IGNORE ensures only one process wins the reservation; + // a concurrent process that also passed the lookup will fail here + // and detect the conflict on retry. + if (deps.idempotencyStore !== undefined) { + const reserved = deps.idempotencyStore.reserve( + idempotencyCacheLookupKey, + idempotencyFingerprint, + ); + if (!reserved) { + // Another process reserved this key between our lookup and now. + // Re-check the store to determine if it's same-fingerprint (retry) + // or different-fingerprint (conflict). + const existing = deps.idempotencyStore.lookup( + idempotencyCacheLookupKey, + IDEMPOTENCY_TTL_MS, + ); + if (existing !== undefined) { + if (existing.fingerprint !== idempotencyFingerprint) { + return err({ + code: OperationErrorCode.StateConflict, + message: + "Idempotency key was previously used with a different request body. " + + "Reusing the same key with different input is rejected to prevent silent " + + "write divergence. 
Use a new key for the new intent.", + details: { idempotencyKey: input.idempotencyKey }, + }); + } + // Same fingerprint — another process is writing or has written. + // Return its result if available, otherwise let this process + // proceed (the duplicate contribution is a smaller cost than + // blocking indefinitely on an unknown process). + if (existing.resultJson !== "{}") { + const result = JSON.parse(existing.resultJson) as ContributeResult; + return ok(result); + } + } + } + } + + // Reserve the in-memory slot for single-flight within this process. // Subsequent concurrent callers observe the pending Promise and await it. idempotencySlot = reserveIdempotencySlot( idempotencyCacheLookupKey, diff --git a/src/core/operations/deps.ts b/src/core/operations/deps.ts index b7e63ff9..421d2bee 100644 --- a/src/core/operations/deps.ts +++ b/src/core/operations/deps.ts @@ -32,7 +32,13 @@ export interface IdempotencyStore { cacheKey: string, ttlMs: number, ): { readonly fingerprint: string; readonly resultJson: string } | undefined; - /** Persist a completed entry. */ + /** + * Durably reserve a key before starting the write. Returns true if this + * process won the reservation, false if another process already holds it. + * Uses INSERT OR IGNORE so concurrent callers cannot both succeed. + */ + reserve(cacheKey: string, fingerprint: string): boolean; + /** Update an already-reserved entry with the final result. */ store(cacheKey: string, fingerprint: string, resultJson: string): void; /** Remove all entries (testing only). 
*/ clear(): void; diff --git a/src/local/sqlite-store.ts b/src/local/sqlite-store.ts index b750ea4a..5fba3678 100644 --- a/src/local/sqlite-store.ts +++ b/src/local/sqlite-store.ts @@ -444,15 +444,23 @@ export function initSqliteDb(dbPath: string): Database { export class SqliteIdempotencyStore { private readonly db: Database; private readonly lookupStmt: Statement; - private readonly upsertStmt: Statement; + private readonly reserveStmt: Statement; + private readonly updateStmt: Statement; constructor(db: Database) { this.db = db; this.lookupStmt = db.prepare( "SELECT fingerprint, result_json FROM idempotency_keys WHERE cache_key = ? AND stored_at > ?", ); - this.upsertStmt = db.prepare( - "INSERT OR REPLACE INTO idempotency_keys (cache_key, fingerprint, result_json, stored_at) VALUES (?, ?, ?, ?)", + // INSERT OR IGNORE: if a concurrent process already reserved this key, + // the insert silently fails and the caller detects the conflict via a + // follow-up lookup. This prevents the INSERT OR REPLACE race where two + // processes both miss the lookup and both succeed in writing. + this.reserveStmt = db.prepare( + "INSERT OR IGNORE INTO idempotency_keys (cache_key, fingerprint, result_json, stored_at) VALUES (?, ?, ?, ?)", + ); + this.updateStmt = db.prepare( + "UPDATE idempotency_keys SET result_json = ?, stored_at = ? WHERE cache_key = ? AND fingerprint = ?", ); } @@ -469,8 +477,26 @@ export class SqliteIdempotencyStore { return { fingerprint: row.fingerprint, resultJson: row.result_json }; } + reserve(cacheKey: string, fingerprint: string): boolean { + // INSERT OR IGNORE: if another process already holds this key, the + // insert silently fails. Check via a follow-up SELECT whether the + // row we inserted is ours (fingerprint matches and result is the + // placeholder). 
+ this.reserveStmt.run(cacheKey, fingerprint, "{}", Date.now()); + const row = this.lookupStmt.get(cacheKey, 0) as { + fingerprint: string; + result_json: string; + } | null; + // Reserved successfully if the row exists with our fingerprint and + // still has the placeholder result (no other process has finalized it). + return row !== null && row.fingerprint === fingerprint; + } + store(cacheKey: string, fingerprint: string, resultJson: string): void { - this.upsertStmt.run(cacheKey, fingerprint, resultJson, Date.now()); + // Update the reserved row with the full result. Only affects rows + // where both cache_key AND fingerprint match, so a concurrent + // different-fingerprint reservation is not overwritten. + this.updateStmt.run(resultJson, Date.now(), cacheKey, fingerprint); } clear(): void { diff --git a/src/server/routes/contributions.ts b/src/server/routes/contributions.ts index 429fa243..593e3795 100644 --- a/src/server/routes/contributions.ts +++ b/src/server/routes/contributions.ts @@ -247,7 +247,16 @@ contributions.post("/", async (c) => { : {}), agent: parsed.agent, ...(parsed.createdAt !== undefined ? { createdAt: parsed.createdAt } : {}), - ...(idempotencyKey !== undefined ? { idempotencyKey } : {}), + // Scope idempotency key by sessionId so the same key in different + // sessions is treated as distinct — prevents cross-session cache + // contamination and policy bypass. + ...(idempotencyKey !== undefined + ? { + idempotencyKey: parsed.sessionId + ? `${idempotencyKey}\x01${parsed.sessionId}` + : idempotencyKey, + } + : {}), }; let opDeps = toOperationDeps(serverDeps); From e33e4c6d34c58c6eeba954eb4a522e15debbd96b Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 14:40:34 -0700 Subject: [PATCH 7/9] fix: pending/committed state machine + wire idempotencyStore to all surfaces MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adversarial review round 3 findings: 1. 
Durable reservation now uses pending/committed states. Pending rows are never returned as successful results — callers get a retryable STATE_CONFLICT instead. Eliminates the {} placeholder parse bug and prevents cross-process duplicate writes. 2. IdempotencyStore now wired through all surfaces: - Server (serve.ts) via ServerDeps + toOperationDeps - MCP stdio (serve.ts) via McpDeps - MCP HTTP (serve-http.ts) via McpDeps - CLI (contribute.ts) — already wired All surfaces now use durable SQLite reservation. --- src/core/operations/contribute.ts | 63 +++++++++++++------------------ src/core/operations/deps.ts | 10 ++++- src/local/runtime.ts | 8 +++- src/local/sqlite-store.ts | 45 +++++++++++----------- src/mcp/operation-adapter.ts | 1 + src/mcp/serve-http.ts | 1 + src/mcp/serve.ts | 1 + src/server/deps.ts | 3 ++ src/server/operation-adapter.ts | 1 + src/server/serve.ts | 1 + 10 files changed, 70 insertions(+), 64 deletions(-) diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index 8dd39af7..06dd1107 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -791,49 +791,38 @@ export async function contributeOperation( details: { idempotencyKey: input.idempotencyKey }, }); } - // Same key + same fingerprint from a previous process: return cached. - const result = JSON.parse(persisted.resultJson) as ContributeResult; - return ok(result); + if (persisted.status === "committed") { + // Same key + same fingerprint, already completed: return cached. + const result = JSON.parse(persisted.resultJson) as ContributeResult; + return ok(result); + } + // status === "pending": another process is mid-write. Return a + // retryable error rather than proceeding with a duplicate write + // or returning the placeholder result. + return err({ + code: OperationErrorCode.StateConflict, + message: + "Idempotency key is currently being processed by another request. 
" + + "Retry after a short delay.", + details: { idempotencyKey: input.idempotencyKey, retryable: true }, + }); } - } - // Durably reserve the key in SQLite before starting the write. - // INSERT OR IGNORE ensures only one process wins the reservation; - // a concurrent process that also passed the lookup will fail here - // and detect the conflict on retry. - if (deps.idempotencyStore !== undefined) { + // No existing row: durably reserve before writing. const reserved = deps.idempotencyStore.reserve( idempotencyCacheLookupKey, idempotencyFingerprint, ); if (!reserved) { - // Another process reserved this key between our lookup and now. - // Re-check the store to determine if it's same-fingerprint (retry) - // or different-fingerprint (conflict). - const existing = deps.idempotencyStore.lookup( - idempotencyCacheLookupKey, - IDEMPOTENCY_TTL_MS, - ); - if (existing !== undefined) { - if (existing.fingerprint !== idempotencyFingerprint) { - return err({ - code: OperationErrorCode.StateConflict, - message: - "Idempotency key was previously used with a different request body. " + - "Reusing the same key with different input is rejected to prevent silent " + - "write divergence. Use a new key for the new intent.", - details: { idempotencyKey: input.idempotencyKey }, - }); - } - // Same fingerprint — another process is writing or has written. - // Return its result if available, otherwise let this process - // proceed (the duplicate contribution is a smaller cost than - // blocking indefinitely on an unknown process). - if (existing.resultJson !== "{}") { - const result = JSON.parse(existing.resultJson) as ContributeResult; - return ok(result); - } - } + // Lost the race to another process that reserved between our + // lookup and this INSERT OR IGNORE. Return retryable error. + return err({ + code: OperationErrorCode.StateConflict, + message: + "Idempotency key is currently being processed by another request. 
" + + "Retry after a short delay.", + details: { idempotencyKey: input.idempotencyKey, retryable: true }, + }); } } @@ -991,7 +980,7 @@ export async function contributeOperation( relationCount: contribution.relations.length, createdAt: contribution.createdAt, }; - deps.idempotencyStore!.store( + deps.idempotencyStore?.store( idempotencyCacheLookupKey!, idempotencyFingerprint!, JSON.stringify(earlyResult), diff --git a/src/core/operations/deps.ts b/src/core/operations/deps.ts index 421d2bee..83c2d14f 100644 --- a/src/core/operations/deps.ts +++ b/src/core/operations/deps.ts @@ -27,11 +27,17 @@ import type { WorkspaceManager } from "../workspace.js"; * starts with an empty Map and the key is silently ignored. */ export interface IdempotencyStore { - /** Look up an unexpired entry. Returns fingerprint + serialized result, or undefined. */ + /** Look up an unexpired entry. Returns fingerprint, result, and status. */ lookup( cacheKey: string, ttlMs: number, - ): { readonly fingerprint: string; readonly resultJson: string } | undefined; + ): + | { + readonly fingerprint: string; + readonly resultJson: string; + readonly status: string; + } + | undefined; /** * Durably reserve a key before starting the write. Returns true if this * process won the reservation, false if another process already holds it. 
diff --git a/src/local/runtime.ts b/src/local/runtime.ts index b3835225..b28703a6 100644 --- a/src/local/runtime.ts +++ b/src/local/runtime.ts @@ -17,7 +17,11 @@ import { FsCas } from "./fs-cas.js"; import type { SqliteBountyStore } from "./sqlite-bounty-store.js"; import type { SqliteGoalSessionStore } from "./sqlite-goal-session-store.js"; import type { SqliteOutcomeStore } from "./sqlite-outcome-store.js"; -import type { SqliteClaimStore, SqliteContributionStore } from "./sqlite-store.js"; +import type { + SqliteClaimStore, + SqliteContributionStore, + SqliteIdempotencyStore, +} from "./sqlite-store.js"; import { createSqliteStores } from "./sqlite-store.js"; import { LocalWorkspaceManager } from "./workspace.js"; @@ -45,6 +49,7 @@ export interface LocalRuntime { readonly outcomeStore: SqliteOutcomeStore; readonly goalSessionStore: SqliteGoalSessionStore; readonly handoffStore: import("./sqlite-handoff-store.js").SqliteHandoffStore; + readonly idempotencyStore: SqliteIdempotencyStore; readonly cas: FsCas; readonly frontier: FrontierCalculator; readonly workspace: LocalWorkspaceManager | undefined; @@ -136,6 +141,7 @@ export function createLocalRuntime(options: LocalRuntimeOptions): LocalRuntime { outcomeStore: stores.outcomeStore, goalSessionStore: stores.goalSessionStore, handoffStore: stores.handoffStore, + idempotencyStore: stores.idempotencyStore, cas, frontier, workspace, diff --git a/src/local/sqlite-store.ts b/src/local/sqlite-store.ts index 5fba3678..4e548bf4 100644 --- a/src/local/sqlite-store.ts +++ b/src/local/sqlite-store.ts @@ -157,10 +157,12 @@ const SCHEMA_DDL = ` -- Idempotency cache: persists across process restarts so CLI retries work. -- The in-memory Map in contribute.ts handles single-flight within a process; -- this table handles cross-process deduplication. + -- status: 'pending' (reservation, write in progress) or 'committed' (write done). 
CREATE TABLE IF NOT EXISTS idempotency_keys ( cache_key TEXT PRIMARY KEY, fingerprint TEXT NOT NULL, - result_json TEXT NOT NULL, + result_json TEXT NOT NULL DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'pending', stored_at INTEGER NOT NULL ); `; @@ -445,58 +447,53 @@ export class SqliteIdempotencyStore { private readonly db: Database; private readonly lookupStmt: Statement; private readonly reserveStmt: Statement; - private readonly updateStmt: Statement; + private readonly commitStmt: Statement; constructor(db: Database) { this.db = db; + // Only return committed rows — pending rows are in-flight in another process. this.lookupStmt = db.prepare( - "SELECT fingerprint, result_json FROM idempotency_keys WHERE cache_key = ? AND stored_at > ?", + "SELECT fingerprint, result_json, status FROM idempotency_keys WHERE cache_key = ? AND stored_at > ?", ); - // INSERT OR IGNORE: if a concurrent process already reserved this key, - // the insert silently fails and the caller detects the conflict via a - // follow-up lookup. This prevents the INSERT OR REPLACE race where two - // processes both miss the lookup and both succeed in writing. this.reserveStmt = db.prepare( - "INSERT OR IGNORE INTO idempotency_keys (cache_key, fingerprint, result_json, stored_at) VALUES (?, ?, ?, ?)", + "INSERT OR IGNORE INTO idempotency_keys (cache_key, fingerprint, status, stored_at) VALUES (?, ?, 'pending', ?)", ); - this.updateStmt = db.prepare( - "UPDATE idempotency_keys SET result_json = ?, stored_at = ? WHERE cache_key = ? AND fingerprint = ?", + this.commitStmt = db.prepare( + "UPDATE idempotency_keys SET result_json = ?, status = 'committed', stored_at = ? WHERE cache_key = ? 
AND fingerprint = ?", ); } lookup( cacheKey: string, ttlMs: number, - ): { readonly fingerprint: string; readonly resultJson: string } | undefined { + ): + | { readonly fingerprint: string; readonly resultJson: string; readonly status: string } + | undefined { const cutoff = Date.now() - ttlMs; const row = this.lookupStmt.get(cacheKey, cutoff) as { fingerprint: string; result_json: string; + status: string; } | null; if (row === null) return undefined; - return { fingerprint: row.fingerprint, resultJson: row.result_json }; + return { + fingerprint: row.fingerprint, + resultJson: row.result_json, + status: row.status, + }; } reserve(cacheKey: string, fingerprint: string): boolean { - // INSERT OR IGNORE: if another process already holds this key, the - // insert silently fails. Check via a follow-up SELECT whether the - // row we inserted is ours (fingerprint matches and result is the - // placeholder). - this.reserveStmt.run(cacheKey, fingerprint, "{}", Date.now()); + this.reserveStmt.run(cacheKey, fingerprint, Date.now()); + // Check if our reservation landed (fingerprint matches). const row = this.lookupStmt.get(cacheKey, 0) as { fingerprint: string; - result_json: string; } | null; - // Reserved successfully if the row exists with our fingerprint and - // still has the placeholder result (no other process has finalized it). return row !== null && row.fingerprint === fingerprint; } store(cacheKey: string, fingerprint: string, resultJson: string): void { - // Update the reserved row with the full result. Only affects rows - // where both cache_key AND fingerprint match, so a concurrent - // different-fingerprint reservation is not overwritten. 
- this.updateStmt.run(resultJson, Date.now(), cacheKey, fingerprint); + this.commitStmt.run(resultJson, Date.now(), cacheKey, fingerprint); } clear(): void { diff --git a/src/mcp/operation-adapter.ts b/src/mcp/operation-adapter.ts index 6248d611..682391d5 100644 --- a/src/mcp/operation-adapter.ts +++ b/src/mcp/operation-adapter.ts @@ -38,6 +38,7 @@ export function toOperationDeps(deps: McpDeps): OperationDeps { ...(deps.eventBus !== undefined ? { eventBus: deps.eventBus } : {}), ...(deps.topologyRouter !== undefined ? { topologyRouter: deps.topologyRouter } : {}), ...(deps.handoffStore !== undefined ? { handoffStore: deps.handoffStore } : {}), + ...(deps.idempotencyStore !== undefined ? { idempotencyStore: deps.idempotencyStore } : {}), }; } diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts index 379b90c3..042dbecf 100644 --- a/src/mcp/serve-http.ts +++ b/src/mcp/serve-http.ts @@ -324,6 +324,7 @@ async function buildScopedDeps(sessionId: string | undefined): Promise Date: Tue, 14 Apr 2026 14:54:58 -0700 Subject: [PATCH 8/9] fix: durable rollback on failure, atomic CLI writes, expired key reuse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adversarial review round 4 findings: 1. Pending reservations now rolled back on pre-commit failures via rollback() — no more stuck pending rows for the full TTL. 2. CLI path (no handoffs) now uses putWithCowrite when onCommit is set, making contribution + idempotency row truly atomic even without handoff records. 3. reserve() purges expired rows before INSERT OR IGNORE, so keys are reusable after TTL with different fingerprints. 
--- src/core/operations/contribute.ts | 37 +++++++++++++++++++++++++------ src/core/operations/deps.ts | 2 ++ src/local/sqlite-store.ts | 14 ++++++++++++ 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index 06dd1107..689cc103 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -657,10 +657,11 @@ async function writeContributionWithHandoffs( routedTo.length > 0 && agentRole !== undefined; + const cowriteStore = store as { + putWithCowrite?: (c: Contribution, fn: () => void) => void | Promise; + }; + if (needsHandoffs) { - const cowriteStore = store as { - putWithCowrite?: (c: Contribution, fn: () => void) => void | Promise; - }; const sqliteHandoffStore = handoffStore as { insertSync?: (input: HandoffInput) => string; }; @@ -676,6 +677,20 @@ async function writeContributionWithHandoffs( } } + // Even without handoffs, use the atomic path when onCommit is provided + // and the store supports cowrite — this ensures the idempotency row is + // written inside the same SQLite transaction as the contribution. + if (onCommit !== undefined && cowriteStore.putWithCowrite !== undefined) { + return writeAtomic( + contribution, + [], + agentRole ?? "", + cowriteStore.putWithCowrite.bind(cowriteStore), + () => "", // no-op insertSync (no handoffs) + onCommit, + ); + } + return writeSerial(contribution, routedTo, agentRole, store, handoffStore, onCommit); } @@ -689,15 +704,14 @@ export async function contributeOperation( deps: OperationDeps, ): Promise> { // Hoisted out of the try block so the outer catch can release the slot - // on thrown errors. Single-flight: while a slot holds a pending Promise, - // concurrent callers with the same key await that Promise instead of - // racing through the write path. + // and roll back the durable reservation on thrown errors. 
let idempotencySlot: | { readonly resolve: (result: OperationResult) => void; readonly release: () => void; } | undefined; + let idempotencyCacheLookupKey: string | undefined; try { if (deps.contributionStore === undefined) { @@ -744,7 +758,7 @@ export async function contributeOperation( // two concurrent callers cannot both observe a miss and race past the // insert — the second caller sees the first caller's pending entry. const idempotencyAgentScope = agent.role ?? agent.agentId; - const idempotencyCacheLookupKey = + idempotencyCacheLookupKey = input.idempotencyKey !== undefined ? idempotencyCacheKey(idempotencyAgentScope, input.idempotencyKey) : undefined; @@ -1235,6 +1249,15 @@ export async function contributeOperation( // failure). Post-commit failures flow through the committed result // path and never reach this catch. idempotencySlot?.release(); + // Roll back the durable reservation so the key doesn't stay stuck + // in 'pending' for the full TTL after a pre-commit failure. + if (idempotencyCacheLookupKey !== undefined && deps.idempotencyStore !== undefined) { + try { + deps.idempotencyStore.rollback(idempotencyCacheLookupKey); + } catch { + // Best-effort — don't mask the original error. + } + } return fromGroveError(error); } } diff --git a/src/core/operations/deps.ts b/src/core/operations/deps.ts index 83c2d14f..e07a2c6e 100644 --- a/src/core/operations/deps.ts +++ b/src/core/operations/deps.ts @@ -46,6 +46,8 @@ export interface IdempotencyStore { reserve(cacheKey: string, fingerprint: string): boolean; /** Update an already-reserved entry with the final result. */ store(cacheKey: string, fingerprint: string, resultJson: string): void; + /** Remove a pending reservation on pre-commit failure. */ + rollback(cacheKey: string): void; /** Remove all entries (testing only). 
*/ clear(): void; } diff --git a/src/local/sqlite-store.ts b/src/local/sqlite-store.ts index 4e548bf4..0d45f418 100644 --- a/src/local/sqlite-store.ts +++ b/src/local/sqlite-store.ts @@ -484,6 +484,13 @@ export class SqliteIdempotencyStore { } reserve(cacheKey: string, fingerprint: string): boolean { + // Purge expired rows first so the key becomes reusable after TTL. + // Without this, INSERT OR IGNORE permanently fails against stale rows + // and the key can never be reused with a different fingerprint. + this.db.run("DELETE FROM idempotency_keys WHERE cache_key = ? AND stored_at <= ?", [ + cacheKey, + Date.now() - 5 * 60 * 1000, + ]); this.reserveStmt.run(cacheKey, fingerprint, Date.now()); // Check if our reservation landed (fingerprint matches). const row = this.lookupStmt.get(cacheKey, 0) as { @@ -492,6 +499,13 @@ export class SqliteIdempotencyStore { return row !== null && row.fingerprint === fingerprint; } + /** Remove a pending reservation on failure (pre-commit rollback). */ + rollback(cacheKey: string): void { + this.db.run("DELETE FROM idempotency_keys WHERE cache_key = ? AND status = 'pending'", [ + cacheKey, + ]); + } + store(cacheKey: string, fingerprint: string, resultJson: string): void { this.commitStmt.run(resultJson, Date.now(), cacheKey, fingerprint); } From 0da681c0978b89d57f4936b60ef1f80ac698e148 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 15:03:36 -0700 Subject: [PATCH 9/9] fix: protect pending rows from TTL purge, session-scope keys, stable identity warning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adversarial review round 5 (final) findings: 1. reserve() now only purges expired committed rows, never pending — prevents stealing an in-flight reservation from a slow process. 2. Idempotency cache key now includes GROVE_SESSION_ID when set, preventing cross-session namespace collisions in MCP HTTP. 3. 
CLI warns when --idempotency-key is used without --role or --agent-id (hostname-pid is process-scoped, breaks cross-process retries). Docs updated with guidance. --- docs/idempotency.md | 4 +++- src/cli/commands/contribute.ts | 17 +++++++++++++++++ src/core/operations/contribute.ts | 5 ++++- src/local/sqlite-store.ts | 14 +++++++------- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/docs/idempotency.md b/docs/idempotency.md index 49e0fbcb..d946cfaf 100644 --- a/docs/idempotency.md +++ b/docs/idempotency.md @@ -36,9 +36,11 @@ Idempotency-Key: my-unique-key Pass `--idempotency-key` to `grove contribute`: ```bash -grove contribute --summary "Fix parser" --idempotency-key my-unique-key +grove contribute --summary "Fix parser" --idempotency-key my-unique-key --role coder ``` +**Important**: For cross-process retry safety, always set `--role` or `--agent-id` alongside `--idempotency-key`. Without a stable identity, each process generates a unique agent ID (`hostname-pid`) and cross-process retries won't match. + ## Semantics | Scenario | Behavior | diff --git a/src/cli/commands/contribute.ts b/src/cli/commands/contribute.ts index 0f450fed..4b657039 100644 --- a/src/cli/commands/contribute.ts +++ b/src/cli/commands/contribute.ts @@ -299,6 +299,23 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c throw new Error(`Invalid contribute options:\n ${validation.errors.join("\n ")}`); } + // Warn if --idempotency-key is set without a stable agent identity. + // The default agentId is hostname-pid which changes on restart, making + // cross-process idempotency silently ineffective. + if ( + options.idempotencyKey !== undefined && + !options.agentOverrides.role && + !options.agentOverrides.agentId && + !process.env.GROVE_AGENT_ROLE && + !process.env.GROVE_AGENT_ID + ) { + process.stderr.write( + "[grove] Warning: --idempotency-key without --role or --agent-id uses a process-scoped " + + "identity (hostname-pid). 
Cross-process retries will not match. Set --role or --agent-id " + + "for stable idempotency.\n", + ); + } + // 2. Find .grove/ const grovePath = join(options.cwd, ".grove"); try { diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index 689cc103..b8d9563b 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -285,7 +285,10 @@ const idempotencyCache = new Map(); /** Build the cache key. Namespaced per agent so two agents can share keys. */ function idempotencyCacheKey(agentScope: string, key: string): string { - return `${agentScope}\u0000${key}`; + // Include session ID when available so the same key in different sessions + // doesn't collide (MCP HTTP sessions share one idempotency store). + const sessionId = process.env.GROVE_SESSION_ID ?? ""; + return `${sessionId}\u0000${agentScope}\u0000${key}`; } /** diff --git a/src/local/sqlite-store.ts b/src/local/sqlite-store.ts index 0d45f418..eff2fe47 100644 --- a/src/local/sqlite-store.ts +++ b/src/local/sqlite-store.ts @@ -484,13 +484,13 @@ export class SqliteIdempotencyStore { } reserve(cacheKey: string, fingerprint: string): boolean { - // Purge expired rows first so the key becomes reusable after TTL. - // Without this, INSERT OR IGNORE permanently fails against stale rows - // and the key can never be reused with a different fingerprint. - this.db.run("DELETE FROM idempotency_keys WHERE cache_key = ? AND stored_at <= ?", [ - cacheKey, - Date.now() - 5 * 60 * 1000, - ]); + // Purge expired COMMITTED rows so the key becomes reusable after TTL. + // Never delete pending rows — they may represent in-flight writes in + // other processes running past the TTL (slow CAS, hooks, contention). + this.db.run( + "DELETE FROM idempotency_keys WHERE cache_key = ? 
AND status = 'committed' AND stored_at <= ?", + [cacheKey, Date.now() - 5 * 60 * 1000], + ); this.reserveStmt.run(cacheKey, fingerprint, Date.now()); // Check if our reservation landed (fingerprint matches). const row = this.lookupStmt.get(cacheKey, 0) as {