diff --git a/docs/idempotency.md b/docs/idempotency.md new file mode 100644 index 00000000..d946cfaf --- /dev/null +++ b/docs/idempotency.md @@ -0,0 +1,71 @@ +# Idempotency + +Grove contributions support explicit idempotency keys for retry-safe submissions. The semantics follow [RFC 8284](https://datatracker.ietf.org/doc/draft-ietf-httpapi-idempotency-key-header/) and Stripe's `Idempotency-Key` convention. + +## How to use + +### MCP tools + +Pass the optional `idempotencyKey` parameter to any contribution tool: + +```json +{ + "summary": "Fix the parser bug", + "artifacts": { "fix.ts": "blake3:..." }, + "agent": { "agentId": "coder-1", "role": "coder" }, + "idempotencyKey": "my-unique-key" +} +``` + +Available on: `grove_submit_work`, `grove_submit_review`, `grove_discuss`, `grove_reproduce`, `grove_adopt`. + +### HTTP API + +Pass the `Idempotency-Key` header on `POST /api/contributions`: + +```http +POST /api/contributions +Content-Type: application/json +Idempotency-Key: my-unique-key + +{ "kind": "work", "summary": "...", ... } +``` + +### CLI + +Pass `--idempotency-key` to `grove contribute`: + +```bash +grove contribute --summary "Fix parser" --idempotency-key my-unique-key --role coder +``` + +**Important**: For cross-process retry safety, always set `--role` or `--agent-id` alongside `--idempotency-key`. Without a stable identity, each process generates a unique agent ID (`hostname-pid`) and cross-process retries won't match. + +## Semantics + +| Scenario | Behavior | +|----------|----------| +| Same key + same input | Returns cached result (retry) | +| Same key + different input | Returns `STATE_CONFLICT` error (HTTP 409) | +| Same key + in-flight request | Awaits the pending write (single-flight) | +| No key provided | No deduplication — each call creates a new contribution | + +## Key details + +- **Scope**: Keys are namespaced per agent (`agent.role` if set, otherwise `agent.agentId`). Two different agents can use the same key without colliding. +- **TTL**: Cached results expire after **5 minutes**. After expiry, the key can be reused. +- **Cache size**: Up to 1024 entries (LRU eviction when full). +- **Persistent**: The cache is backed by SQLite (`idempotency_keys` table in `grove.db`), so keys survive across CLI process restarts. An in-memory layer provides single-flight deduplication within a single process. +- **Fingerprint coverage**: The conflict check hashes `kind`, `mode`, `summary`, `description`, `artifacts` (name + hash), `relations`, `scores`, `tags`, `context`, and agent scope. Any difference in these fields triggers `STATE_CONFLICT` on key reuse. + +## Key format + +Keys are opaque strings. UUIDv4 or UUIDv7 are recommended. The key itself is not stored in the contribution — it only controls deduplication during the cache TTL window. + +## When to use + +- **Agent retry loops**: Generate a key before the first attempt, reuse it on retries. +- **Network retries**: If a submission times out, replay with the same key to avoid duplicates. +- **Iterative work**: When an agent intentionally resubmits with updated artifacts under the same summary, use a **new key** (or no key) so the submission is not suppressed. + +Idempotency keys are optional. Callers that don't need retry safety can omit the key entirely. diff --git a/src/cli/commands/contribute.test.ts b/src/cli/commands/contribute.test.ts index e4467d69..7425a3cb 100644 --- a/src/cli/commands/contribute.test.ts +++ b/src/cli/commands/contribute.test.ts @@ -4,10 +4,11 @@ * Covers argument parsing, validation, execution logic, and edge cases. */ -import { describe, expect, test } from "bun:test"; +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { mkdir, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { _resetIdempotencyCacheForTests } from "../../core/operations/contribute.js"; import type { ContributeOptions } from "./contribute.js"; import { executeContribute, @@ -265,6 +266,16 @@ describe("parseContributeArgs", () => { expect(opts.agentOverrides.agentId).toBe("my-agent"); expect(opts.agentOverrides.provider).toBe("openai"); }); + + test("parses --idempotency-key flag", () => { + const opts = parseContributeArgs(["--summary", "test", "--idempotency-key", "my-key-1"]); + expect(opts.idempotencyKey).toBe("my-key-1"); + }); + + test("idempotencyKey is undefined when --idempotency-key is omitted", () => { + const opts = parseContributeArgs(["--summary", "test"]); + expect(opts.idempotencyKey).toBeUndefined(); + }); }); // --------------------------------------------------------------------------- @@ -1235,3 +1246,87 @@ describe("grove contribute E2E", () => { } }); }); + +// --------------------------------------------------------------------------- +// Idempotency key plumbing (CLI surface) +// --------------------------------------------------------------------------- + +describe("executeContribute: idempotencyKey", () => { + beforeEach(() => { + _resetIdempotencyCacheForTests(); + }); + + afterEach(() => { + _resetIdempotencyCacheForTests(); + }); + + test("same idempotency key + same input returns the cached CID", async () => { + const dir = await createTempDir(); + try { + await executeInit(makeInitOptions(dir)); + + const opts = makeContributeOptions({ + summary: "Idempotent CLI work", + idempotencyKey: "cli-key-1", + cwd: dir, + }); + + const first = await executeContribute(opts); + expect(first.cid).toMatch(/^blake3:/); + + const second = await executeContribute(opts); + expect(second.cid).toBe(first.cid); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + test("same idempotency key + different input throws STATE_CONFLICT", async () => { + const dir = await createTempDir(); + try { + await executeInit(makeInitOptions(dir)); + + const first = await executeContribute( + makeContributeOptions({ + summary: "First CLI work", + idempotencyKey: "cli-conflict-key", + cwd: dir, + }), + ); + expect(first.cid).toMatch(/^blake3:/); + + await expect( + executeContribute( + makeContributeOptions({ + summary: "Different work, same key", + idempotencyKey: "cli-conflict-key", + cwd: dir, + }), + ), + ).rejects.toThrow(/different request body/i); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + test("no idempotency key produces distinct contributions", async () => { + const dir = await createTempDir(); + try { + await executeInit(makeInitOptions(dir)); + + const opts = makeContributeOptions({ + summary: "No key CLI work", + cwd: dir, + }); + + const first = await executeContribute(opts); + const second = await executeContribute(opts); + + expect(first.cid).toMatch(/^blake3:/); + expect(second.cid).toMatch(/^blake3:/); + expect(second.cid).not.toBe(first.cid); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/src/cli/commands/contribute.ts b/src/cli/commands/contribute.ts index 955c2636..4b657039 100644 --- a/src/cli/commands/contribute.ts +++ b/src/cli/commands/contribute.ts @@ -59,6 +59,9 @@ export interface ContributeOptions { // Metadata readonly tags: readonly string[]; + // Idempotency + readonly idempotencyKey?: string | undefined; + // Agent readonly agentOverrides: AgentOverrides; @@ -101,6 +104,7 @@ export function parseContributeArgs(args: readonly string[]): ContributeOptions metric: { type: "string", multiple: true, default: [] }, score: { type: "string", multiple: true, default: [] }, tag: { type: "string", multiple: true, default: [] }, + "idempotency-key": { type: "string" }, "agent-id": { type: "string" }, "agent-name": { type: "string" }, provider: { type: "string" }, @@ -130,6 +134,7 @@ export function parseContributeArgs(args: readonly string[]): ContributeOptions metric: values.metric as string[], score: values.score as string[], tags: values.tag as string[], + idempotencyKey: values["idempotency-key"] as string | undefined, agentOverrides: { agentId: values["agent-id"] as string | undefined, agentName: values["agent-name"] as string | undefined, @@ -294,6 +299,23 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c throw new Error(`Invalid contribute options:\n ${validation.errors.join("\n ")}`); } + // Warn if --idempotency-key is set without a stable agent identity. + // The default agentId is hostname-pid which changes on restart, making + // cross-process idempotency silently ineffective. + if ( + options.idempotencyKey !== undefined && + !options.agentOverrides.role && + !options.agentOverrides.agentId && + !process.env.GROVE_AGENT_ROLE && + !process.env.GROVE_AGENT_ID + ) { + process.stderr.write( + "[grove] Warning: --idempotency-key without --role or --agent-id uses a process-scoped " + + "identity (hostname-pid). Cross-process retries will not match. Set --role or --agent-id " + + "for stable idempotency.\n", + ); + } + // 2. Find .grove/ const grovePath = join(options.cwd, ".grove"); try { @@ -303,9 +325,8 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c } // Dynamic imports for lazy loading - const { SqliteContributionStore, SqliteClaimStore, initSqliteDb } = await import( - "../../local/sqlite-store.js" - ); + const { SqliteContributionStore, SqliteClaimStore, SqliteIdempotencyStore, initSqliteDb } = + await import("../../local/sqlite-store.js"); const { FsCas } = await import("../../local/fs-cas.js"); const { DefaultFrontierCalculator } = await import("../../core/frontier.js"); const { parseGroveContract } = await import("../../core/contract.js"); @@ -316,6 +337,7 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c const db = initSqliteDb(dbPath); const rawStore = new SqliteContributionStore(db); const claimStore = new SqliteClaimStore(db); + const idempotencyStore = new SqliteIdempotencyStore(db); const cas = new FsCas(casPath); const frontier = new DefaultFrontierCalculator(rawStore); @@ -471,6 +493,7 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c claimStore, cas, frontier, + idempotencyStore, ...(contract !== undefined ? { contract } : {}), }; @@ -484,6 +507,7 @@ export async function executeContribute(options: ContributeOptions): Promise<{ c ...(scores !== undefined ? { scores } : {}), tags: [...options.tags], agent: options.agentOverrides, + ...(options.idempotencyKey !== undefined ? { idempotencyKey: options.idempotencyKey } : {}), }; const result = await contributeOperation(input, opDeps); diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index f559be79..b8d9563b 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -123,6 +123,7 @@ export interface ReviewInput { readonly context?: Readonly> | undefined; readonly agent?: AgentOverrides | undefined; readonly metadata?: Readonly> | undefined; + readonly idempotencyKey?: string | undefined; } /** Input for the reproduce operation. */ @@ -136,6 +137,7 @@ export interface ReproduceInput { readonly tags?: readonly string[] | undefined; readonly context?: Readonly> | undefined; readonly agent?: AgentOverrides | undefined; + readonly idempotencyKey?: string | undefined; } /** Input for the discuss operation. */ @@ -146,6 +148,7 @@ export interface DiscussInput { readonly tags?: readonly string[] | undefined; readonly context?: Readonly> | undefined; readonly agent?: AgentOverrides | undefined; + readonly idempotencyKey?: string | undefined; } /** Input for the adopt operation. */ @@ -156,6 +159,7 @@ export interface AdoptInput { readonly tags?: readonly string[] | undefined; readonly context?: Readonly> | undefined; readonly agent?: AgentOverrides | undefined; + readonly idempotencyKey?: string | undefined; } /** Result of an adopt operation. */ @@ -281,7 +285,10 @@ const idempotencyCache = new Map(); /** Build the cache key. Namespaced per agent so two agents can share keys. */ function idempotencyCacheKey(agentScope: string, key: string): string { - return `${agentScope}\u0000${key}`; + // Include session ID when available so the same key in different sessions + // doesn't collide (MCP HTTP sessions share one idempotency store). + const sessionId = process.env.GROVE_SESSION_ID ?? ""; + return `${sessionId}\u0000${agentScope}\u0000${key}`; } /** @@ -415,7 +422,10 @@ function lookupIdempotency( | undefined { const entry = idempotencyCache.get(cacheKey); if (entry === undefined) return undefined; - if (now - entry.storedAt > IDEMPOTENCY_TTL_MS) { + // Never TTL-expire a pending entry — the in-flight write may still be + // running (slow CAS, hooks, etc). Expiring it would let a retry start + // a second write instead of awaiting the first. + if (now - entry.storedAt > IDEMPOTENCY_TTL_MS && entry.pending === undefined) { idempotencyCache.delete(cacheKey); return undefined; } @@ -454,10 +464,16 @@ function reserveIdempotencySlot( readonly resolve: (result: OperationResult) => void; readonly release: () => void; } { - // Evict the oldest entry if at capacity. + // Evict the oldest non-pending entry if at capacity. Pending entries + // represent in-flight writes — evicting them would break single-flight + // and let a retry start a duplicate write. if (idempotencyCache.size >= IDEMPOTENCY_MAX_ENTRIES) { - const oldest = idempotencyCache.keys().next().value; - if (oldest !== undefined) idempotencyCache.delete(oldest); + for (const [key, entry] of idempotencyCache) { + if (entry.pending === undefined) { + idempotencyCache.delete(key); + break; + } + } } let resolver!: (result: OperationResult) => void; @@ -519,6 +535,7 @@ async function writeAtomic( agentRole: string, putWithCowrite: (c: Contribution, fn: () => void) => void | Promise, insertSync: (input: HandoffInput) => string, + onCommit?: () => void, ): Promise { const handoffIds: string[] = []; const maybePromise = putWithCowrite(contribution, () => { @@ -531,6 +548,11 @@ async function writeAtomic( }); if (hid !== undefined) handoffIds.push(hid); } + // Write idempotency row inside the same SQLite transaction so a + // crash between contribution commit and idempotency store write + // cannot leave an orphaned contribution without its idempotency + // record. + onCommit?.(); }); if ( maybePromise !== undefined && @@ -560,8 +582,14 @@ async function writeSerial( agentRole: string | undefined, store: ContributionStore, handoffStore: HandoffStore | undefined, + onCommit?: () => void, ): Promise { await store.put(contribution); + // For non-atomic stores (Nexus, in-memory), write the idempotency row + // immediately after the contribution commit. Not fully crash-safe (the + // contribution and idempotency row are separate writes), but the window + // is minimal and matches the existing handoff best-effort pattern. + onCommit?.(); const handoffIds: string[] = []; if (handoffStore === undefined || routedTo === undefined || agentRole === undefined) { @@ -624,6 +652,7 @@ async function writeContributionWithHandoffs( agentRole: string | undefined, store: ContributionStore, handoffStore: HandoffStore | undefined, + onCommit?: () => void, ): Promise { const needsHandoffs = handoffStore !== undefined && @@ -631,10 +660,11 @@ async function writeContributionWithHandoffs( routedTo.length > 0 && agentRole !== undefined; + const cowriteStore = store as { + putWithCowrite?: (c: Contribution, fn: () => void) => void | Promise; + }; + if (needsHandoffs) { - const cowriteStore = store as { - putWithCowrite?: (c: Contribution, fn: () => void) => void | Promise; - }; const sqliteHandoffStore = handoffStore as { insertSync?: (input: HandoffInput) => string; }; @@ -645,11 +675,26 @@ async function writeContributionWithHandoffs( agentRole, cowriteStore.putWithCowrite.bind(cowriteStore), sqliteHandoffStore.insertSync.bind(sqliteHandoffStore), + onCommit, ); } } - return writeSerial(contribution, routedTo, agentRole, store, handoffStore); + // Even without handoffs, use the atomic path when onCommit is provided + // and the store supports cowrite — this ensures the idempotency row is + // written inside the same SQLite transaction as the contribution. + if (onCommit !== undefined && cowriteStore.putWithCowrite !== undefined) { + return writeAtomic( + contribution, + [], + agentRole ?? "", + cowriteStore.putWithCowrite.bind(cowriteStore), + () => "", // no-op insertSync (no handoffs) + onCommit, + ); + } + + return writeSerial(contribution, routedTo, agentRole, store, handoffStore, onCommit); } // --------------------------------------------------------------------------- @@ -662,15 +707,14 @@ export async function contributeOperation( deps: OperationDeps, ): Promise> { // Hoisted out of the try block so the outer catch can release the slot - // on thrown errors. Single-flight: while a slot holds a pending Promise, - // concurrent callers with the same key await that Promise instead of - // racing through the write path. + // and roll back the durable reservation on thrown errors. let idempotencySlot: | { readonly resolve: (result: OperationResult) => void; readonly release: () => void; } | undefined; + let idempotencyCacheLookupKey: string | undefined; try { if (deps.contributionStore === undefined) { @@ -717,13 +761,20 @@ export async function contributeOperation( // two concurrent callers cannot both observe a miss and race past the // insert — the second caller sees the first caller's pending entry. const idempotencyAgentScope = agent.role ?? agent.agentId; - const idempotencyCacheLookupKey = + idempotencyCacheLookupKey = input.idempotencyKey !== undefined ? idempotencyCacheKey(idempotencyAgentScope, input.idempotencyKey) : undefined; + // Hoisted so it's available at the durable-commit boundary for + // persistent store writes. + let idempotencyFingerprint: string | undefined; if (idempotencyCacheLookupKey !== undefined) { - const fingerprint = computeIdempotencyFingerprint(input, agent); - const cached = lookupIdempotency(idempotencyCacheLookupKey, fingerprint, Date.now()); + idempotencyFingerprint = computeIdempotencyFingerprint(input, agent); + const cached = lookupIdempotency( + idempotencyCacheLookupKey, + idempotencyFingerprint, + Date.now(), + ); if (cached !== undefined) { if (cached.type === "pending") { // Concurrent caller: await their write and return the same result. @@ -739,9 +790,66 @@ export async function contributeOperation( details: { idempotencyKey: input.idempotencyKey }, }); } - // Miss: synchronously reserve the slot. Subsequent concurrent - // callers observe the pending Promise and await it. - idempotencySlot = reserveIdempotencySlot(idempotencyCacheLookupKey, fingerprint, Date.now()); + + // In-memory miss: check persistent store for cross-process hits. + if (deps.idempotencyStore !== undefined) { + const persisted = deps.idempotencyStore.lookup( + idempotencyCacheLookupKey, + IDEMPOTENCY_TTL_MS, + ); + if (persisted !== undefined) { + if (persisted.fingerprint !== idempotencyFingerprint) { + return err({ + code: OperationErrorCode.StateConflict, + message: + "Idempotency key was previously used with a different request body. " + + "Reusing the same key with different input is rejected to prevent silent " + + "write divergence. Use a new key for the new intent.", + details: { idempotencyKey: input.idempotencyKey }, + }); + } + if (persisted.status === "committed") { + // Same key + same fingerprint, already completed: return cached. + const result = JSON.parse(persisted.resultJson) as ContributeResult; + return ok(result); + } + // status === "pending": another process is mid-write. Return a + // retryable error rather than proceeding with a duplicate write + // or returning the placeholder result. + return err({ + code: OperationErrorCode.StateConflict, + message: + "Idempotency key is currently being processed by another request. " + + "Retry after a short delay.", + details: { idempotencyKey: input.idempotencyKey, retryable: true }, + }); + } + + // No existing row: durably reserve before writing. + const reserved = deps.idempotencyStore.reserve( + idempotencyCacheLookupKey, + idempotencyFingerprint, + ); + if (!reserved) { + // Lost the race to another process that reserved between our + // lookup and this INSERT OR IGNORE. Return retryable error. + return err({ + code: OperationErrorCode.StateConflict, + message: + "Idempotency key is currently being processed by another request. " + + "Retry after a short delay.", + details: { idempotencyKey: input.idempotencyKey, retryable: true }, + }); + } + } + + // Reserve the in-memory slot for single-flight within this process. + // Subsequent concurrent callers observe the pending Promise and await it. + idempotencySlot = reserveIdempotencySlot( + idempotencyCacheLookupKey, + idempotencyFingerprint, + Date.now(), + ); } const contributionInput: ContributionInput = { @@ -868,12 +976,42 @@ export async function contributeOperation( // undefined as the routing-target list to the writer. const agentRole = contribution.agent.role; const handoffsRoutedTo = skipHandoffs ? undefined : routedTo; + // Build onCommit callback for atomic idempotency-store write. + // Runs inside the SQLite transaction (atomic path) or immediately + // after store.put() (serial path), closing the crash window between + // contribution commit and idempotency record. + const idempotencyOnCommit = + deps.idempotencyStore !== undefined && + idempotencyCacheLookupKey !== undefined && + idempotencyFingerprint !== undefined + ? () => { + // Build a minimal result from the contribution (routedTo and + // handoffIds are populated later, but the CID is the critical + // dedup signal — retries just need to know the write happened). + const earlyResult: ContributeResult = { + cid: contribution.cid, + kind: contribution.kind, + mode: contribution.mode, + summary: contribution.summary, + artifactCount: Object.keys(contribution.artifacts).length, + relationCount: contribution.relations.length, + createdAt: contribution.createdAt, + }; + deps.idempotencyStore?.store( + idempotencyCacheLookupKey!, + idempotencyFingerprint!, + JSON.stringify(earlyResult), + ); + } + : undefined; + const handoffIds = await writeContributionWithHandoffs( contribution, handoffsRoutedTo, agentRole, deps.contributionStore, deps.handoffStore, + idempotencyOnCommit, ); // ┌──────────────────────────────────────────────────────────────────┐ @@ -917,6 +1055,26 @@ export async function contributeOperation( // durably written; retries with the same key must return this cached // result, NOT re-run the write path. idempotencySlot = undefined; + + // Update the durable idempotency row with the full result (includes + // routedTo, handoffIds, policy which weren't available at commit time). + // This is an UPDATE, not an INSERT — the row was created atomically + // with the contribution inside the write transaction. + if ( + deps.idempotencyStore !== undefined && + idempotencyCacheLookupKey !== undefined && + idempotencyFingerprint !== undefined + ) { + try { + deps.idempotencyStore.store( + idempotencyCacheLookupKey, + idempotencyFingerprint, + JSON.stringify(committedResult), + ); + } catch { + // The early result from onCommit is sufficient for dedup. + } + } } // Post-write callbacks — wrapped so a throw cannot escape and undo @@ -1094,6 +1252,15 @@ export async function contributeOperation( // failure). Post-commit failures flow through the committed result // path and never reach this catch. idempotencySlot?.release(); + // Roll back the durable reservation so the key doesn't stay stuck + // in 'pending' for the full TTL after a pre-commit failure. + if (idempotencyCacheLookupKey !== undefined && deps.idempotencyStore !== undefined) { + try { + deps.idempotencyStore.rollback(idempotencyCacheLookupKey); + } catch { + // Best-effort — don't mask the original error. + } + } return fromGroveError(error); } } @@ -1122,7 +1289,7 @@ export async function reviewOperation( relations, tags: input.tags, agent: input.agent, - ...pickDefined(input, ["description", "scores", "context"]), + ...pickDefined(input, ["description", "scores", "context", "idempotencyKey"]), }, deps, ); @@ -1165,7 +1332,7 @@ export async function reproduceOperation( relations, tags: input.tags, agent: input.agent, - ...pickDefined(input, ["description", "scores", "context"]), + ...pickDefined(input, ["description", "scores", "context", "idempotencyKey"]), }, deps, ); @@ -1206,7 +1373,7 @@ export async function discussOperation( relations, tags: input.tags, agent: input.agent, - ...pickDefined(input, ["description", "context"]), + ...pickDefined(input, ["description", "context", "idempotencyKey"]), }, deps, ); @@ -1245,7 +1412,7 @@ export async function adoptOperation( relations, tags: input.tags, agent: input.agent, - ...pickDefined(input, ["description", "context"]), + ...pickDefined(input, ["description", "context", "idempotencyKey"]), }, deps, ); diff --git a/src/core/operations/deps.ts b/src/core/operations/deps.ts index 4308e9aa..e07a2c6e 100644 --- a/src/core/operations/deps.ts +++ b/src/core/operations/deps.ts @@ -18,6 +18,40 @@ import type { ClaimStore, ContributionStore } from "../store.js"; import type { TopologyRouter } from "../topology-router.js"; import type { WorkspaceManager } from "../workspace.js"; +/** + * Persistent idempotency store for cross-process deduplication. + * + * The in-memory Map in contribute.ts handles single-flight within a single + * process (pending Promise coalescence). This store handles the case where + * separate CLI invocations hit the same key — without it, each process + * starts with an empty Map and the key is silently ignored. + */ +export interface IdempotencyStore { + /** Look up an unexpired entry. Returns fingerprint, result, and status. */ + lookup( + cacheKey: string, + ttlMs: number, + ): + | { + readonly fingerprint: string; + readonly resultJson: string; + readonly status: string; + } + | undefined; + /** + * Durably reserve a key before starting the write. Returns true if this + * process won the reservation, false if another process already holds it. + * Uses INSERT OR IGNORE so concurrent callers cannot both succeed. + */ + reserve(cacheKey: string, fingerprint: string): boolean; + /** Update an already-reserved entry with the final result. */ + store(cacheKey: string, fingerprint: string, resultJson: string): void; + /** Remove a pending reservation on pre-commit failure. */ + rollback(cacheKey: string): void; + /** Remove all entries (testing only). */ + clear(): void; +} + /** * Dependencies required by operations. * @@ -50,4 +84,6 @@ export interface OperationDeps { readonly hookRunner?: HookRunner | undefined; /** Working directory for hook execution. */ readonly hookCwd?: string | undefined; + /** Persistent idempotency store for cross-process deduplication. */ + readonly idempotencyStore?: IdempotencyStore | undefined; } diff --git a/src/core/operations/test-helpers.ts b/src/core/operations/test-helpers.ts index 7516d39e..59d1c602 100644 --- a/src/core/operations/test-helpers.ts +++ b/src/core/operations/test-helpers.ts @@ -14,6 +14,7 @@ import { initSqliteDb, SqliteClaimStore, SqliteContributionStore, + SqliteIdempotencyStore, } from "../../local/sqlite-store.js"; import { LocalWorkspaceManager } from "../../local/workspace.js"; import type { ContentStore } from "../cas.js"; @@ -123,6 +124,7 @@ export async function createTestOperationDeps(): Promise { }); const handoffStore = new InMemoryHandoffStore(); + const idempotencyStore = new SqliteIdempotencyStore(db); const deps: FullOperationDeps = { contributionStore, @@ -133,6 +135,7 @@ export async function createTestOperationDeps(): Promise { frontier, workspace, handoffStore, + idempotencyStore, contract: undefined as unknown as GroveContract, outcomeStore: undefined as unknown as OutcomeStore, onContributionWrite: () => { diff --git a/src/local/runtime.ts b/src/local/runtime.ts index b3835225..b28703a6 100644 --- a/src/local/runtime.ts +++ b/src/local/runtime.ts @@ -17,7 +17,11 @@ import { FsCas } from "./fs-cas.js"; import type { SqliteBountyStore } from "./sqlite-bounty-store.js"; import type { SqliteGoalSessionStore } from "./sqlite-goal-session-store.js"; import type { SqliteOutcomeStore } from "./sqlite-outcome-store.js"; -import type { SqliteClaimStore, SqliteContributionStore } from "./sqlite-store.js"; +import type { + SqliteClaimStore, + SqliteContributionStore, + SqliteIdempotencyStore, +} from "./sqlite-store.js"; import { createSqliteStores } from "./sqlite-store.js"; import { LocalWorkspaceManager } from "./workspace.js"; @@ -45,6 +49,7 @@ export interface LocalRuntime { readonly outcomeStore: SqliteOutcomeStore; readonly goalSessionStore: SqliteGoalSessionStore; readonly handoffStore: import("./sqlite-handoff-store.js").SqliteHandoffStore; + readonly idempotencyStore: SqliteIdempotencyStore; readonly cas: FsCas; readonly frontier: FrontierCalculator; readonly workspace: LocalWorkspaceManager | undefined; @@ -136,6 +141,7 @@ export function createLocalRuntime(options: LocalRuntimeOptions): LocalRuntime { outcomeStore: stores.outcomeStore, goalSessionStore: stores.goalSessionStore, handoffStore: stores.handoffStore, + idempotencyStore: stores.idempotencyStore, cas, frontier, workspace, diff --git a/src/local/sqlite-store.ts b/src/local/sqlite-store.ts index ef927ca4..eff2fe47 100644 --- a/src/local/sqlite-store.ts +++ b/src/local/sqlite-store.ts @@ -153,6 +153,18 @@ const SCHEMA_DDL = ` CREATE INDEX IF NOT EXISTS idx_workspaces_status ON workspaces(status); CREATE INDEX IF NOT EXISTS idx_workspaces_activity ON workspaces(last_activity_at); + + -- Idempotency cache: persists across process restarts so CLI retries work. + -- The in-memory Map in contribute.ts handles single-flight within a process; + -- this table handles cross-process deduplication. + -- status: 'pending' (reservation, write in progress) or 'committed' (write done). + CREATE TABLE IF NOT EXISTS idempotency_keys ( + cache_key TEXT PRIMARY KEY, + fingerprint TEXT NOT NULL, + result_json TEXT NOT NULL DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'pending', + stored_at INTEGER NOT NULL + ); `; const FTS_DDL = ` @@ -419,6 +431,90 @@ export function initSqliteDb(dbPath: string): Database { return db; } +// --------------------------------------------------------------------------- +// SqliteIdempotencyStore +// --------------------------------------------------------------------------- + +/** + * SQLite-backed idempotency store for cross-process deduplication. + * + * Complements the in-memory Map in contribute.ts: the Map provides + * single-flight (pending Promise coalescence) within a process; this + * store provides durable lookup so a CLI retry in a new process can + * detect that a key was already used. + */ +export class SqliteIdempotencyStore { + private readonly db: Database; + private readonly lookupStmt: Statement; + private readonly reserveStmt: Statement; + private readonly commitStmt: Statement; + + constructor(db: Database) { + this.db = db; + // Only return committed rows — pending rows are in-flight in another process. + this.lookupStmt = db.prepare( + "SELECT fingerprint, result_json, status FROM idempotency_keys WHERE cache_key = ? AND stored_at > ?", + ); + this.reserveStmt = db.prepare( + "INSERT OR IGNORE INTO idempotency_keys (cache_key, fingerprint, status, stored_at) VALUES (?, ?, 'pending', ?)", + ); + this.commitStmt = db.prepare( + "UPDATE idempotency_keys SET result_json = ?, status = 'committed', stored_at = ? WHERE cache_key = ? AND fingerprint = ?", + ); + } + + lookup( + cacheKey: string, + ttlMs: number, + ): + | { readonly fingerprint: string; readonly resultJson: string; readonly status: string } + | undefined { + const cutoff = Date.now() - ttlMs; + const row = this.lookupStmt.get(cacheKey, cutoff) as { + fingerprint: string; + result_json: string; + status: string; + } | null; + if (row === null) return undefined; + return { + fingerprint: row.fingerprint, + resultJson: row.result_json, + status: row.status, + }; + } + + reserve(cacheKey: string, fingerprint: string): boolean { + // Purge expired COMMITTED rows so the key becomes reusable after TTL. + // Never delete pending rows — they may represent in-flight writes in + // other processes running past the TTL (slow CAS, hooks, contention). + this.db.run( + "DELETE FROM idempotency_keys WHERE cache_key = ? AND status = 'committed' AND stored_at <= ?", + [cacheKey, Date.now() - 5 * 60 * 1000], + ); + this.reserveStmt.run(cacheKey, fingerprint, Date.now()); + // Check if our reservation landed (fingerprint matches). + const row = this.lookupStmt.get(cacheKey, 0) as { + fingerprint: string; + } | null; + return row !== null && row.fingerprint === fingerprint; + } + + /** Remove a pending reservation on failure (pre-commit rollback). */ + rollback(cacheKey: string): void { + this.db.run("DELETE FROM idempotency_keys WHERE cache_key = ? AND status = 'pending'", [ + cacheKey, + ]); + } + + store(cacheKey: string, fingerprint: string, resultJson: string): void { + this.commitStmt.run(resultJson, Date.now(), cacheKey, fingerprint); + } + + clear(): void { + this.db.run("DELETE FROM idempotency_keys"); + } +} + /** * Convenience factory that creates a shared Database and returns both stores. * The returned close() disposes the database connection. @@ -431,6 +527,7 @@ export function createSqliteStores(dbPath: string): { outcomeStore: SqliteOutcomeStore; goalSessionStore: SqliteGoalSessionStore; handoffStore: SqliteHandoffStore; + idempotencyStore: SqliteIdempotencyStore; close: () => void; } { const db = initSqliteDb(dbPath); @@ -442,6 +539,7 @@ export function createSqliteStores(dbPath: string): { outcomeStore: new SqliteOutcomeStore(db), goalSessionStore: new SqliteGoalSessionStore(db), handoffStore: new SqliteHandoffStore(db), + idempotencyStore: new SqliteIdempotencyStore(db), close: () => { db.run("PRAGMA optimize"); db.close(); diff --git a/src/mcp/operation-adapter.ts b/src/mcp/operation-adapter.ts index 6248d611..682391d5 100644 --- a/src/mcp/operation-adapter.ts +++ b/src/mcp/operation-adapter.ts @@ -38,6 +38,7 @@ export function toOperationDeps(deps: McpDeps): OperationDeps { ...(deps.eventBus !== undefined ? { eventBus: deps.eventBus } : {}), ...(deps.topologyRouter !== undefined ? { topologyRouter: deps.topologyRouter } : {}), ...(deps.handoffStore !== undefined ? { handoffStore: deps.handoffStore } : {}), + ...(deps.idempotencyStore !== undefined ? { idempotencyStore: deps.idempotencyStore } : {}), }; } diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts index 4ab000cc..042dbecf 100644 --- a/src/mcp/serve-http.ts +++ b/src/mcp/serve-http.ts @@ -294,9 +294,18 @@ async function buildScopedDeps(sessionId: string | undefined): Promise { expect(data.kind).toBe("adoption"); }); }); + +// --------------------------------------------------------------------------- +// Idempotency key plumbing (MCP surface) +// --------------------------------------------------------------------------- + +describe("MCP idempotencyKey plumbing", () => { + let testDeps: TestMcpDeps; + let deps: McpDeps; + let server: McpServer; + + beforeEach(async () => { + _resetIdempotencyCacheForTests(); + testDeps = await createTestMcpDeps(); + deps = testDeps.deps; + server = new McpServer({ name: "test", version: "0.0.1" }, { capabilities: { tools: {} } }); + registerContributionTools(server, deps); + }); + + afterEach(async () => { + await testDeps.cleanup(); + _resetIdempotencyCacheForTests(); + }); + + test("grove_submit_work: same key + same input returns cached result", async () => { + const hash = await storeTestContent(deps.cas, "hello world"); + const args = { + summary: "Idempotent work", + artifacts: { "file.ts": hash }, + agent: { agentId: "coder-1", role: "coder" }, + idempotencyKey: "work-key-1", + }; + + const first = await callTool(server, "grove_submit_work", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_submit_work", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); + + test("grove_submit_work: same key + different input returns STATE_CONFLICT", async () => { + const hash = await storeTestContent(deps.cas, "hello world"); + const first = await callTool(server, "grove_submit_work", { + summary: "First work", + artifacts: { "file.ts": hash }, + agent: { agentId: "coder-1", role: "coder" }, + idempotencyKey: "work-conflict-key", + }); + expect(first.isError).toBeUndefined(); + + const second = await callTool(server, "grove_submit_work", { + summary: "Different work, same key", + artifacts: { "file.ts": hash }, + agent: { agentId: "coder-1", role: "coder" }, + idempotencyKey: "work-conflict-key", + }); + expect(second.isError).toBe(true); + expect(second.text).toContain("STATE_CONFLICT"); + }); + + test("grove_submit_review: idempotency key flows through", async () => { + const target = makeContribution({ summary: "Target" }); + await deps.contributionStore.put(target); + + const args = { + targetCid: target.cid, + summary: "Review with idempotency", + scores: { quality: { value: 0.9, direction: "maximize" } }, + agent: { agentId: "reviewer-1", role: "reviewer" }, + idempotencyKey: "review-key-1", + }; + + const first = await callTool(server, "grove_submit_review", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_submit_review", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); + + test("grove_discuss: idempotency key flows through", async () => { + const args = { + summary: "Discussion with idempotency", + agent: { agentId: "agent-1", role: "architect" }, + idempotencyKey: "discuss-key-1", + }; + + const first = await callTool(server, "grove_discuss", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_discuss", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); + + test("grove_reproduce: idempotency key flows through", async () => { + const target = makeContribution({ summary: "Experiment" }); + await deps.contributionStore.put(target); + + const args = { + targetCid: target.cid, + summary: "Reproduction with idempotency", + result: "confirmed", + agent: { agentId: "repro-1", role: "reproducer" }, + artifacts: {}, + idempotencyKey: "repro-key-1", + }; + + const first = await callTool(server, "grove_reproduce", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_reproduce", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); + + test("grove_adopt: idempotency key flows through", async () => { + const target = makeContribution({ summary: "Work to adopt" }); + await deps.contributionStore.put(target); + + const args = { + targetCid: target.cid, + summary: "Adopting with idempotency", + agent: { agentId: "adopter-1", role: "adopter" }, + idempotencyKey: "adopt-key-1", + }; + + const first = await callTool(server, "grove_adopt", args); + expect(first.isError).toBeUndefined(); + const firstData = JSON.parse(first.text); + + const second = await callTool(server, "grove_adopt", args); + expect(second.isError).toBeUndefined(); + const secondData = JSON.parse(second.text); + + expect(secondData.cid).toBe(firstData.cid); + }); +}); diff --git a/src/mcp/tools/contributions.ts b/src/mcp/tools/contributions.ts index 1650c5aa..c34feb20 100644 --- a/src/mcp/tools/contributions.ts +++ b/src/mcp/tools/contributions.ts @@ -37,6 +37,30 @@ import { scoreSchema, } from "../schemas.js"; +// --------------------------------------------------------------------------- +// Shared idempotency field +// --------------------------------------------------------------------------- + +/** + * Optional client-supplied idempotency key, shared across all contribution tools. + * + * Follows HTTP Idempotency-Key semantics (Stripe / AWS / RFC draft): + * - same key + same input → cached result (retry) + * - same key + different input → STATE_CONFLICT error + * - scoped per agent role (two agents can use the same key) + * - TTL: 5 minutes from first use + */ +const idempotencyKeyField = { + idempotencyKey: z + .string() + .optional() + .describe( + "Client-supplied idempotency key for retry safety. " + + "Reusing the same key with the same input returns the cached result. " + + "Reusing the same key with different input returns a STATE_CONFLICT error.", + ), +} as const; + // --------------------------------------------------------------------------- // Input schemas // --------------------------------------------------------------------------- @@ -74,6 +98,7 @@ const submitWorkInputSchema = z.object({ .optional() .describe("Execution/evaluation context metadata (e.g., hardware, dataset)"), agent: agentSchema, + ...idempotencyKeyField, }); const submitReviewInputSchema = z.object({ @@ -94,6 +119,7 @@ const submitReviewInputSchema = z.object({ .record(z.string(), z.unknown()) .optional() .describe("Relation metadata attached to the 'reviews' edge (e.g., {score: 0.8})"), + ...idempotencyKeyField, }); const reproduceInputSchema = z.object({ @@ -119,6 +145,7 @@ const reproduceInputSchema = z.object({ tags: z.array(z.string()).optional().default([]).describe("Tags"), context: z.record(z.string(), z.unknown()).optional().describe("Context metadata"), agent: agentSchema, + ...idempotencyKeyField, }); const discussInputSchema = z.object({ @@ -131,6 +158,7 @@ const discussInputSchema = z.object({ tags: z.array(z.string()).optional().default([]).describe("Tags for channel semantics"), context: z.record(z.string(), z.unknown()).optional().describe("Context metadata"), agent: agentSchema, + ...idempotencyKeyField, }); const adoptInputSchema = z.object({ @@ -144,6 +172,7 @@ const adoptInputSchema = z.object({ tags: z.array(z.string()).optional().default([]).describe("Tags"), context: z.record(z.string(), z.unknown()).optional().describe("Context metadata"), agent: agentSchema, + ...idempotencyKeyField, }); // --------------------------------------------------------------------------- @@ -212,6 +241,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi relations: args.relations as unknown as readonly Relation[], tags: args.tags, agent: withDefaultRole(args.agent as AgentOverrides), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); @@ -256,6 +286,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi ...(args.metadata !== undefined ? { metadata: args.metadata as Readonly> } : {}), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); @@ -293,6 +324,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi ...(args.context !== undefined ? { context: args.context as Readonly> } : {}), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); @@ -324,6 +356,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi ...(args.context !== undefined ? { context: args.context as Readonly> } : {}), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); @@ -353,6 +386,7 @@ export function registerContributionTools(server: McpServer, deps: McpDeps): voi ...(args.context !== undefined ? { context: args.context as Readonly> } : {}), + ...(args.idempotencyKey !== undefined ? { idempotencyKey: args.idempotencyKey } : {}), }, opDeps, ); diff --git a/src/server/deps.ts b/src/server/deps.ts index ee52e483..ee72f227 100644 --- a/src/server/deps.ts +++ b/src/server/deps.ts @@ -11,6 +11,7 @@ import type { GroveContract } from "../core/contract.js"; import type { FrontierCalculator } from "../core/frontier.js"; import type { GossipService } from "../core/gossip/types.js"; import type { HandoffStore } from "../core/handoff.js"; +import type { IdempotencyStore } from "../core/operations/deps.js"; import type { OutcomeStore } from "../core/outcome.js"; import type { ClaimStore, ContributionStore } from "../core/store.js"; import type { AgentTopology } from "../core/topology.js"; @@ -36,6 +37,8 @@ export interface ServerDeps { readonly contract?: GroveContract | undefined; /** Optional handoff store. Routes return 501 when not configured. */ readonly handoffStore?: HandoffStore | undefined; + /** Optional idempotency store for cross-process deduplication. */ + readonly idempotencyStore?: IdempotencyStore | undefined; } /** Hono environment type carrying injected dependencies. */ diff --git a/src/server/e2e.test.ts b/src/server/e2e.test.ts index 642960f9..baec3f3a 100644 --- a/src/server/e2e.test.ts +++ b/src/server/e2e.test.ts @@ -6,8 +6,9 @@ * (e.g., streaming, headers, content negotiation). */ -import { afterAll, beforeAll, describe, expect, it } from "bun:test"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from "bun:test"; import { DefaultFrontierCalculator } from "../core/frontier.js"; +import { _resetIdempotencyCacheForTests } from "../core/operations/contribute.js"; import { InMemoryContributionStore } from "../core/testing.js"; import { createApp } from "./app.js"; import type { ServerDeps } from "./deps.js"; @@ -265,3 +266,77 @@ describe("E2E: validation errors", () => { expect(res.status).toBe(400); }); }); + +describe("E2E: Idempotency-Key header", () => { + beforeEach(() => { + _resetIdempotencyCacheForTests(); + }); + + afterEach(() => { + _resetIdempotencyCacheForTests(); + }); + + it("same Idempotency-Key + same body returns the same CID", async () => { + const body = makeManifestBody({ summary: "Idempotent HTTP contribution" }); + const key = "http-idem-key-1"; + + const first = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json", "Idempotency-Key": key }, + body: JSON.stringify(body), + }); + expect(first.status).toBe(201); + const firstData = (await first.json()) as Json; + + const second = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json", "Idempotency-Key": key }, + body: JSON.stringify(body), + }); + expect(second.status).toBe(201); + const secondData = (await second.json()) as Json; + + expect(secondData.cid).toBe(firstData.cid); + }); + + it("same Idempotency-Key + different body returns 409 STATE_CONFLICT", async () => { + const key = "http-conflict-key-1"; + + const first = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json", "Idempotency-Key": key }, + body: JSON.stringify(makeManifestBody({ summary: "First submission" })), + }); + expect(first.status).toBe(201); + + const second = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json", "Idempotency-Key": key }, + body: JSON.stringify(makeManifestBody({ summary: "Different submission, same key" })), + }); + expect(second.status).toBe(409); + const errorData = (await second.json()) as Json; + expect(errorData.error.code).toBe("STATE_CONFLICT"); + }); + + it("no Idempotency-Key header allows distinct contributions with different content", async () => { + const first = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(makeManifestBody({ summary: "First no-key contribution" })), + }); + expect(first.status).toBe(201); + const firstData = (await first.json()) as Json; + + const second = await fetch(`${baseUrl}/api/contributions`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(makeManifestBody({ summary: "Second no-key contribution" })), + }); + expect(second.status).toBe(201); + const secondData = (await second.json()) as Json; + + // Without a key, different content produces different CIDs (no blocking) + expect(secondData.cid).not.toBe(firstData.cid); + }); +}); diff --git a/src/server/operation-adapter.ts b/src/server/operation-adapter.ts index ebb21048..b07b6525 100644 --- a/src/server/operation-adapter.ts +++ b/src/server/operation-adapter.ts @@ -26,6 +26,7 @@ export function toOperationDeps(deps: ServerDeps): OperationDeps { frontier: deps.frontier, ...(deps.outcomeStore !== undefined ? { outcomeStore: deps.outcomeStore } : {}), ...(deps.contract !== undefined ? { contract: deps.contract } : {}), + ...(deps.idempotencyStore !== undefined ? { idempotencyStore: deps.idempotencyStore } : {}), }; } diff --git a/src/server/routes/contributions.ts b/src/server/routes/contributions.ts index bb8a81c3..593e3795 100644 --- a/src/server/routes/contributions.ts +++ b/src/server/routes/contributions.ts @@ -227,6 +227,9 @@ contributions.post("/", async (c) => { artifacts[name] = contentHash; } + // Extract Idempotency-Key header (RFC 8284 / Stripe convention) + const idempotencyKey = c.req.header("idempotency-key"); + // Build operation input and delegate to shared operations layer const input: ContributeInput = { kind: parsed.kind, @@ -244,6 +247,16 @@ contributions.post("/", async (c) => { : {}), agent: parsed.agent, ...(parsed.createdAt !== undefined ? { createdAt: parsed.createdAt } : {}), + // Scope idempotency key by sessionId so the same key in different + // sessions is treated as distinct — prevents cross-session cache + // contamination and policy bypass. + ...(idempotencyKey !== undefined + ? { + idempotencyKey: parsed.sessionId + ? `${idempotencyKey}\x01${parsed.sessionId}` + : idempotencyKey, + } + : {}), }; let opDeps = toOperationDeps(serverDeps); diff --git a/src/server/serve.ts b/src/server/serve.ts index 658f652f..48307cff 100644 --- a/src/server/serve.ts +++ b/src/server/serve.ts @@ -90,6 +90,7 @@ const deps: ServerDeps = { gossip: gossipService, topology: runtime.contract?.topology, contract: runtime.contract, + idempotencyStore: runtime.idempotencyStore, }; const app = createApp(deps);