diff --git a/src/cli/commands/inbox.test.ts b/src/cli/commands/inbox.test.ts new file mode 100644 index 0000000..f4c341e --- /dev/null +++ b/src/cli/commands/inbox.test.ts @@ -0,0 +1,243 @@ +/** + * Tests for `grove inbox` command — specifically the send path's + * contract enforcement (Codex adversarial review finding #3). + * + * Before the fix, `grove inbox send` fed sendMessageAsDiscussion a + * bare OperationDeps with no contract and no handoff store, so + * GROVE.md role-kind rules did not apply to CLI message sends — + * agents could bypass allowed_kinds=['work'] via the CLI even when + * the same operation was blocked via MCP. + * + * These tests run the actual handleInbox function (not an internal + * helper) so they exercise the full command bootstrap path: locate + * .grove, open SQLite stores, read GROVE.md, wrap with + * EnforcingContributionStore, build OperationDeps, call + * sendMessageAsDiscussion. + */ + +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdir, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { initSqliteDb, SqliteContributionStore } from "../../local/sqlite-store.js"; +import { handleInbox } from "./inbox.js"; +import type { InitOptions } from "./init.js"; +import { executeInit } from "./init.js"; + +async function createTempDir(): Promise { + const dir = join( + tmpdir(), + `grove-inbox-test-${Date.now()}-${Math.random().toString(36).slice(2)}`, + ); + await mkdir(dir, { recursive: true }); + return dir; +} + +function makeInitOptions(cwd: string): InitOptions { + return { + name: "test-grove", + mode: "evaluation", + seed: [], + metric: [], + force: false, + agentOverrides: { agentId: "test-agent" }, + cwd, + }; +} + +/** + * Capture console.error / console.log / process.exitCode for the + * duration of a handleInbox call so we can observe the command's + * side effects without mocking. + */ +async function runInbox( + args: readonly string[], + groveOverride: string, +): Promise<{ stdout: string[]; stderr: string[]; exitCode: number | undefined }> { + const stdout: string[] = []; + const stderr: string[] = []; + const origLog = console.log; + const origError = console.error; + const origExitCode = process.exitCode; + process.exitCode = undefined; + console.log = (...args) => { + stdout.push(args.map((a) => (typeof a === "string" ? a : JSON.stringify(a))).join(" ")); + }; + console.error = (...args) => { + stderr.push(args.map((a) => (typeof a === "string" ? a : JSON.stringify(a))).join(" ")); + }; + try { + await handleInbox(args, groveOverride); + } finally { + console.log = origLog; + console.error = origError; + } + const exitCode = process.exitCode; + process.exitCode = origExitCode; + return { stdout, stderr, exitCode }; +} + +describe("grove inbox send — contract enforcement (Codex finding #3)", () => { + let dir: string; + + beforeEach(async () => { + dir = await createTempDir(); + }); + + afterEach(async () => { + await rm(dir, { recursive: true, force: true }); + }); + + test("succeeds when GROVE.md does not exist (no contract, no enforcement)", async () => { + await executeInit(makeInitOptions(dir)); + // Overwrite the default GROVE.md with nothing by deleting it. + await rm(join(dir, "GROVE.md"), { force: true }); + + const { exitCode, stdout } = await runInbox( + ["send", "plain body", "--to", "@reviewer", "--agent-id", "alice", "--json"], + join(dir, ".grove"), + ); + expect(exitCode).not.toBe(1); + expect(stdout.join("\n")).toMatch(/"cid"\s*:/); + }); + + test("succeeds when contract allows discussion kind", async () => { + await executeInit(makeInitOptions(dir)); + // Contract with no agent_constraints — everything allowed. + await writeFile( + join(dir, "GROVE.md"), + `--- +contract_version: 3 +name: test-grove +mode: exploration +--- +`, + "utf-8", + ); + + const { exitCode } = await runInbox( + ["send", "allowed body", "--to", "@reviewer", "--agent-id", "alice", "--json"], + join(dir, ".grove"), + ); + expect(exitCode).not.toBe(1); + + // Verify the contribution actually landed. + const db = initSqliteDb(join(dir, ".grove", "grove.db")); + const store = new SqliteContributionStore(db); + try { + const all = await store.list({ kind: "discussion" }); + expect(all.length).toBeGreaterThanOrEqual(1); + expect(all.some((c) => c.context?.message_body === "allowed body")).toBe(true); + } finally { + store.close(); + } + }); + + // ------------------------------------------------------------------------- + // Codex round 3 finding #1: malformed GROVE.md must fail closed. + // + // An earlier version of this patch wrapped readFile + parseGroveContract + // in one broad try/catch. A YAML syntax error was indistinguishable from + // "file does not exist", so a broken contract silently fell through to + // the unenforced path — reopening the exact bypass the enforcement fix + // was supposed to close. + // + // Fix: only swallow ENOENT; let parse errors propagate. This test writes + // a GROVE.md with invalid YAML and asserts handleInbox rejects the send + // with an error (not a silent success). + // ------------------------------------------------------------------------- + test("fails closed when GROVE.md is malformed (YAML parse error)", async () => { + await executeInit(makeInitOptions(dir)); + // Malformed YAML frontmatter — unclosed bracket, invalid structure. + await writeFile( + join(dir, "GROVE.md"), + `--- +contract_version: 3 +name: test-grove +mode: evaluation +agent_constraints: + allowed_kinds: [work +--- +`, + "utf-8", + ); + + // handleInbox re-throws parse errors through to the CLI dispatcher; + // the test wrapper captures uncaught rejections so we assert on that. + let caughtError: unknown; + try { + await runInbox( + ["send", "should-fail", "--to", "@reviewer", "--agent-id", "alice"], + join(dir, ".grove"), + ); + } catch (err) { + caughtError = err; + } + + // Either the command threw synchronously OR exited non-zero with an + // error visible. In both cases: no discussion contribution landed. + const db = initSqliteDb(join(dir, ".grove", "grove.db")); + const store = new SqliteContributionStore(db); + try { + const discussions = await store.list({ kind: "discussion" }); + expect(discussions).toHaveLength(0); + } finally { + store.close(); + } + + // And we surfaced *some* signal of failure (thrown or exit=1). + // The important property is: we did NOT silently succeed with no + // enforcement — which is what the previous code did. + expect(caughtError).toBeDefined(); + }); + + // ------------------------------------------------------------------------- + // Codex round 2 finding #3: contract enforcement must apply to + // the CLI inbox send path, not only the MCP path. + // ------------------------------------------------------------------------- + test("rejects when contract restricts allowed_kinds to ['work']", async () => { + await executeInit(makeInitOptions(dir)); + // Contract that blocks discussion contributions. + await writeFile( + join(dir, "GROVE.md"), + `--- +contract_version: 3 +name: test-grove +mode: evaluation +agent_constraints: + allowed_kinds: [work] +--- +`, + "utf-8", + ); + + const { exitCode, stderr } = await runInbox( + [ + "send", + "should-be-blocked", + "--to", + "@reviewer", + "--agent-id", + "alice", + "--agent-name", + "Alice", + ], + join(dir, ".grove"), + ); + + // Must fail with a non-zero exit. + expect(exitCode).toBe(1); + // Must surface the role_kind violation to stderr. + expect(stderr.join("\n")).toMatch(/not allowed to submit kind 'discussion'/); + + // And crucially: NO discussion contribution should land in the DAG. + const db = initSqliteDb(join(dir, ".grove", "grove.db")); + const store = new SqliteContributionStore(db); + try { + const discussions = await store.list({ kind: "discussion" }); + expect(discussions).toHaveLength(0); + } finally { + store.close(); + } + }); +}); diff --git a/src/cli/commands/inbox.ts b/src/cli/commands/inbox.ts index 5ee343e..ada6de7 100644 --- a/src/cli/commands/inbox.ts +++ b/src/cli/commands/inbox.ts @@ -6,12 +6,13 @@ * grove inbox read [--from ] [--since ] [--limit ] [--json] */ +import { readFile } from "node:fs/promises"; +import { join } from "node:path"; import { parseArgs } from "node:util"; -import { createContribution } from "../../core/manifest.js"; -import type { ContributionInput } from "../../core/models.js"; import type { AgentOverrides } from "../../core/operations/agent.js"; import { resolveAgent } from "../../core/operations/agent.js"; -import { readInbox, sendMessage } from "../../core/operations/messaging.js"; +import type { OperationDeps } from "../../core/operations/deps.js"; +import { readInbox, sendMessageAsDiscussion } from "../../core/operations/messaging.js"; import { formatTable, formatTimestamp, outputJson } from "../format.js"; // --------------------------------------------------------------------------- @@ -64,40 +65,103 @@ async function handleSend(args: readonly string[], groveOverride?: string): Prom return; } - const { initCliDeps } = await import("../context.js"); - const deps = initCliDeps(process.cwd(), groveOverride); + // Mirror the `grove discuss` bootstrap so the CLI send path goes through + // the same contract-enforcement pipeline as the MCP path. Previously this + // command wrapped a bare `CliDeps` which carried no contract, so GROVE.md + // role-kind rules and rate limits silently did not apply to inbox sends — + // a loophole the MCP fix didn't close. + const { resolveGroveDir } = await import("../utils/grove-dir.js"); + const { createSqliteStores } = await import("../../local/sqlite-store.js"); + const { FsCas } = await import("../../local/fs-cas.js"); + const { DefaultFrontierCalculator } = await import("../../core/frontier.js"); + const { parseGroveContract } = await import("../../core/contract.js"); + const { EnforcingContributionStore } = await import("../../core/enforcing-store.js"); + + const { groveDir, dbPath } = resolveGroveDir(groveOverride); + const stores = createSqliteStores(dbPath); + const cas = new FsCas(join(groveDir, "cas")); + const frontier = new DefaultFrontierCalculator(stores.contributionStore); + + // Load GROVE.md from the grove root (parent of .grove/). + // + // Separate the readFile catch from the parse call so a MALFORMED contract + // fails closed instead of silently falling through to unenforced mode. + // The first pass of this patch combined both into one try/catch, which + // meant a YAML syntax error would be indistinguishable from "file does + // not exist" — any broken contract file reopened the CLI bypass we're + // supposed to be closing. + // + // ENOENT is the only acceptable fallthrough. Everything else (parse + // errors, permission denied, schema validation) propagates to the + // outer error handler so the operator sees the failure. + const groveRoot = join(groveDir, ".."); + const grovemdPath = join(groveRoot, "GROVE.md"); + let contract: Awaited> | undefined; + let grovemdContent: string | undefined; + try { + grovemdContent = await readFile(grovemdPath, "utf-8"); + } catch (err) { + const code = (err as NodeJS.ErrnoException)?.code; + if (code !== "ENOENT") { + // Permission error, I/O error, etc. — surface loudly. + throw err; + } + // GROVE.md does not exist — proceed without enforcement, same as + // `grove discuss` in a grove without a contract. + } + if (grovemdContent !== undefined) { + // parseGroveContract intentionally runs OUTSIDE the catch: YAML/schema + // errors must propagate, not be swallowed as "no contract". + contract = parseGroveContract(grovemdContent); + } + + // Wrap with EnforcingContributionStore when a contract exists. Without + // this, rate-limits / allowed-kinds / clock-skew checks would not fire + // for inbox sends even when configured in GROVE.md. + const contributionStore = contract + ? new EnforcingContributionStore(stores.contributionStore, contract, { cas }) + : stores.contributionStore; try { const agentOverrides: AgentOverrides = { agentId: values["agent-id"] as string | undefined, agentName: values["agent-name"] as string | undefined, }; - const agent = resolveAgent(agentOverrides); - const computeCid = (input: ContributionInput): string => { - return createContribution(input).cid; + const opDeps: OperationDeps = { + contributionStore, + claimStore: stores.claimStore, + cas, + frontier, + handoffStore: stores.handoffStore, + ...(contract !== undefined ? { contract } : {}), }; - const result = await sendMessage( - deps.store, + const result = await sendMessageAsDiscussion( { - agent, + agent: agentOverrides, body, recipients, - inReplyTo: values["reply-to"] as string | undefined, + ...(values["reply-to"] !== undefined ? { inReplyTo: values["reply-to"] as string } : {}), tags: (values.tag ?? []) as string[], }, - computeCid, + opDeps, ); + if (!result.ok) { + console.error(`Error: ${result.error.message}`); + process.exitCode = 1; + return; + } + if (values.json) { - outputJson({ cid: result.cid, recipients, body }); + outputJson({ cid: result.value.cid, recipients, body }); } else { - console.log(`Message sent: ${result.cid}`); + console.log(`Message sent: ${result.value.cid}`); console.log(` to: ${recipients.join(", ")}`); } } finally { - deps.close(); + stores.contributionStore.close(); } } diff --git a/src/core/enforcing-store.test.ts b/src/core/enforcing-store.test.ts index dd1d8cd..f0c8705 100644 --- a/src/core/enforcing-store.test.ts +++ b/src/core/enforcing-store.test.ts @@ -1462,3 +1462,101 @@ describe("EnforcingContributionStore delegation", () => { } }); }); + +// --------------------------------------------------------------------------- +// EnforcingContributionStore — putWithCowrite preservation (Codex finding #3) +// --------------------------------------------------------------------------- +// +// contributeOperation's capability detection looks for `putWithCowrite` on +// the contribution store. Without the wrapper exposing that method, +// wrapping a SQLite store with EnforcingContributionStore would silently +// drop local MCP sessions from the atomic contribution+handoff write +// path to the best-effort serial path — leaving orphaned contributions +// on handoff failure. + +describe("EnforcingContributionStore: putWithCowrite preservation", () => { + test("exposes putWithCowrite when inner store supports it", async () => { + const { dir, db, contributionStore } = await setupStores(); + try { + const contract = makeContract(); + const store = new EnforcingContributionStore(contributionStore, contract); + + // The wrapper must forward putWithCowrite so contributeOperation's + // capability check detects the atomic cowrite path. + expect(typeof (store as { putWithCowrite?: unknown }).putWithCowrite).toBe("function"); + + // Inner SQLite store also has it — confirms the pattern the wrapper + // delegates to. + expect( + typeof (contributionStore as unknown as { putWithCowrite?: unknown }).putWithCowrite, + ).toBe("function"); + } finally { + await cleanup(dir, db); + } + }); + + test("putWithCowrite runs enforcement then delegates to inner cowrite", async () => { + const { dir, db, contributionStore } = await setupStores(); + try { + const contract = makeContract({ + rateLimits: { maxContributionsPerAgentPerHour: 5 }, + }); + const store = new EnforcingContributionStore(contributionStore, contract); + + const contribution = makeRecentContribution({ summary: "atomic-cowrite" }); + + // Observable: the cowrite callback fires synchronously (inside the + // SQLite transaction) before putWithCowrite's Promise resolves. + let cowriteRan = false; + await ( + store as unknown as { + putWithCowrite: (c: typeof contribution, fn: () => void) => Promise; + } + ).putWithCowrite(contribution, () => { + cowriteRan = true; + }); + + expect(cowriteRan).toBe(true); + const stored = await contributionStore.get(contribution.cid); + expect(stored?.cid).toBe(contribution.cid); + } finally { + await cleanup(dir, db); + } + }); + + test("putWithCowrite still enforces rate limits", async () => { + const { dir, db, contributionStore } = await setupStores(); + try { + const contract = makeContract({ + rateLimits: { maxContributionsPerAgentPerHour: 1 }, + }); + const store = new EnforcingContributionStore(contributionStore, contract); + const cowriteFn = (): void => { + /* no handoffs in this test */ + }; + type CowriteStore = { putWithCowrite: (c: unknown, fn: () => void) => Promise }; + + // First contribution from agent-X: succeeds. + const c1 = makeRecentContribution({ + summary: "first", + agent: { agentId: "agent-X" }, + }); + await (store as unknown as CowriteStore).putWithCowrite(c1, cowriteFn); + + // Second contribution from same agent: rate-limited. + const c2 = makeRecentContribution({ + summary: "second", + agent: { agentId: "agent-X" }, + }); + await expect( + (store as unknown as CowriteStore).putWithCowrite(c2, cowriteFn), + ).rejects.toThrow(RateLimitError); + + // Confirm the second was never written. + const stored = await contributionStore.get(c2.cid); + expect(stored).toBeUndefined(); + } finally { + await cleanup(dir, db); + } + }); +}); diff --git a/src/core/enforcing-store.ts b/src/core/enforcing-store.ts index 47f644c..11d330a 100644 --- a/src/core/enforcing-store.ts +++ b/src/core/enforcing-store.ts @@ -184,6 +184,62 @@ export class EnforcingContributionStore implements ContributionStore { }); }; + /** + * Atomic contribution + handoff cowrite, delegated to the inner store. + * + * Without this method, wrapping a SqliteContributionStore with + * EnforcingContributionStore hides the atomic-cowrite capability from + * `contributeOperation`'s capability check, forcing it to fall back to + * the serial (non-atomic) write path and leaving orphaned contributions + * on handoff insertion failure. + * + * Flow: + * 1. Acquire the shared write mutex (same as put()) + * 2. Run rate-limit / clock-skew enforcement + * 3. Run the per-CID preWriteHook (for TOCTOU-safe policy enforcement) + * 4. Delegate to inner.putWithCowrite if available (SQLite) — the + * inner store runs the cowrite sync inside a single transaction + * 5. If the inner store doesn't support cowrite (unexpected when the + * caller already checked), fall back to a serial put + sync fn + * as a defensive last resort + * + * Async because enforcement involves a `countSince` DB query. The + * cowrite callback itself is sync (runs inside the inner transaction). + * Callers (`writeContributionWithHandoffs`) must handle the Promise. + */ + putWithCowrite = async (contribution: Contribution, cowriteFn: () => void): Promise => { + return this.writeMutex.runExclusive(async () => { + const existing = await this.inner.get(contribution.cid); + if (existing !== undefined) { + // Idempotent: contribution already present, skip enforcement and + // the cowrite (no handoffs to insert for an existing row). + return; + } + + await this.enforceContributionLimits(contribution, 0, []); + const hook = this.preWriteHooks.get(contribution.cid); + if (hook) { + this.preWriteHooks.delete(contribution.cid); + await hook(contribution); + } + + const innerCowrite = ( + this.inner as unknown as { + putWithCowrite?: (c: Contribution, fn: () => void) => void; + } + ).putWithCowrite; + if (innerCowrite !== undefined) { + innerCowrite.call(this.inner, contribution, cowriteFn); + } else { + // Defensive fallback — inner store does not support atomic cowrite. + // Do the best we can: put the contribution, then run the cowrite fn + // serially. This loses atomicity but matches the serial write path. + await this.inner.put(contribution); + cowriteFn(); + } + }); + }; + putMany = async (contributions: readonly Contribution[]): Promise => { return this.writeMutex.runExclusive(async () => { // Filter out already-existing CIDs and intra-batch duplicates. diff --git a/src/core/handoff.ts b/src/core/handoff.ts index 4ccc69e..f91d26b 100644 --- a/src/core/handoff.ts +++ b/src/core/handoff.ts @@ -38,6 +38,20 @@ export interface HandoffQuery { export interface HandoffStore { create(input: HandoffInput): Promise; + /** + * Create multiple handoff records in a single round-trip. + * + * Used by the contributeOperation serial write path to avoid an N+1 + * pattern when fanning a contribution out to multiple downstream roles + * (e.g., coder → [reviewer, tester, auditor] would otherwise pay + * 3×rtt against a remote handoff store). + * + * Implementations should preserve input order in the returned array. + * Default implementation calls create() in a loop — override with + * a single batch operation when the backing store supports it + * (e.g., one HTTP POST for Nexus, one BEGIN/COMMIT for SQLite). + */ + createMany?(inputs: readonly HandoffInput[]): Promise; get(id: string): Promise; list(query?: HandoffQuery): Promise; markDelivered(id: string): Promise; diff --git a/src/core/operations/context-schemas.test.ts b/src/core/operations/context-schemas.test.ts new file mode 100644 index 0000000..284879f --- /dev/null +++ b/src/core/operations/context-schemas.test.ts @@ -0,0 +1,151 @@ +import { describe, expect, test } from "bun:test"; + +import { + buildMessageContext, + buildPlanContext, + isEphemeralMessageContext, + type PlanTask, + parseMessageContext, + parsePlanContext, +} from "./context-schemas.js"; + +// --------------------------------------------------------------------------- +// PlanContext +// --------------------------------------------------------------------------- + +describe("buildPlanContext + parsePlanContext", () => { + const tasks: readonly PlanTask[] = [ + { id: "t1", title: "Design API", status: "todo" }, + { id: "t2", title: "Write tests", status: "in_progress", assignee: "@alice" }, + { id: "t3", title: "Ship", status: "done" }, + ]; + + test("round-trips title and tasks", () => { + const ctx = buildPlanContext({ title: "Phase 1", tasks }); + expect(ctx.plan_title).toBe("Phase 1"); + const parsed = parsePlanContext(ctx); + expect(parsed?.plan_title).toBe("Phase 1"); + expect(parsed?.tasks).toEqual([...tasks]); + }); + + test("parse returns undefined when context is missing", () => { + expect(parsePlanContext(undefined)).toBeUndefined(); + }); + + test("parse returns undefined when plan_title is missing", () => { + expect(parsePlanContext({ tasks: tasks as unknown as never })).toBeUndefined(); + }); + + test("parse returns undefined when tasks is missing", () => { + expect(parsePlanContext({ plan_title: "Phase 1" })).toBeUndefined(); + }); + + test("parse returns undefined when task status is invalid", () => { + const bad = { + plan_title: "Phase 1", + tasks: [{ id: "t1", title: "Bad", status: "wibble" }], + }; + expect(parsePlanContext(bad as never)).toBeUndefined(); + }); + + test("parse returns undefined when task missing required fields", () => { + const bad = { plan_title: "Phase 1", tasks: [{ id: "t1" }] }; + expect(parsePlanContext(bad as never)).toBeUndefined(); + }); + + test("parse accepts task without assignee", () => { + const ctx = buildPlanContext({ + title: "P", + tasks: [{ id: "t1", title: "Solo", status: "todo" }], + }); + const parsed = parsePlanContext(ctx); + expect(parsed?.tasks[0]?.assignee).toBeUndefined(); + }); + + test("parse rejects context with non-array tasks", () => { + expect(parsePlanContext({ plan_title: "P", tasks: "not-an-array" as never })).toBeUndefined(); + }); +}); + +// --------------------------------------------------------------------------- +// MessageContext +// --------------------------------------------------------------------------- + +describe("buildMessageContext + parseMessageContext", () => { + test("round-trips recipients and body", () => { + const ctx = buildMessageContext({ recipients: ["@bob"], body: "hello" }); + expect(ctx.ephemeral).toBe(true); + expect(ctx.recipients).toEqual(["@bob"]); + expect(ctx.message_body).toBe("hello"); + const parsed = parseMessageContext(ctx); + expect(parsed?.recipients).toEqual(["@bob"]); + expect(parsed?.message_body).toBe("hello"); + }); + + test("multi-recipient round trip", () => { + const ctx = buildMessageContext({ recipients: ["@bob", "@alice", "@all"], body: "hi" }); + const parsed = parseMessageContext(ctx); + expect(parsed?.recipients).toEqual(["@bob", "@alice", "@all"]); + }); + + test("parse returns undefined when context is missing", () => { + expect(parseMessageContext(undefined)).toBeUndefined(); + }); + + test("parse returns undefined when ephemeral is false", () => { + expect( + parseMessageContext({ + ephemeral: false, + recipients: ["@bob"], + message_body: "hi", + } as never), + ).toBeUndefined(); + }); + + test("parse returns undefined when ephemeral is missing", () => { + expect(parseMessageContext({ recipients: ["@bob"], message_body: "hi" })).toBeUndefined(); + }); + + test("parse returns undefined when recipients is empty", () => { + expect( + parseMessageContext({ ephemeral: true, recipients: [], message_body: "hi" }), + ).toBeUndefined(); + }); + + test("parse returns undefined when message_body is missing", () => { + expect(parseMessageContext({ ephemeral: true, recipients: ["@bob"] })).toBeUndefined(); + }); + + test("parse rejects unrelated context that happens to have similar keys", () => { + expect( + parseMessageContext({ ephemeral: 1, recipients: ["@bob"], message_body: "hi" } as never), + ).toBeUndefined(); + }); +}); + +// --------------------------------------------------------------------------- +// isEphemeralMessageContext +// --------------------------------------------------------------------------- + +describe("isEphemeralMessageContext", () => { + test("true when context.ephemeral === true", () => { + expect(isEphemeralMessageContext({ ephemeral: true })).toBe(true); + }); + + test("false when context is undefined", () => { + expect(isEphemeralMessageContext(undefined)).toBe(false); + }); + + test("false when ephemeral is missing", () => { + expect(isEphemeralMessageContext({ recipients: ["@bob"] })).toBe(false); + }); + + test("false when ephemeral is the literal false", () => { + expect(isEphemeralMessageContext({ ephemeral: false })).toBe(false); + }); + + test("false when ephemeral is a truthy non-true value", () => { + expect(isEphemeralMessageContext({ ephemeral: 1 as never })).toBe(false); + expect(isEphemeralMessageContext({ ephemeral: "true" as never })).toBe(false); + }); +}); diff --git a/src/core/operations/context-schemas.ts b/src/core/operations/context-schemas.ts new file mode 100644 index 0000000..0c2438c --- /dev/null +++ b/src/core/operations/context-schemas.ts @@ -0,0 +1,150 @@ +/** + * Typed context schemas for kind-specific contribution data. + * + * Plans and messages stuff structured fields into the generic + * `context: Record` field on a contribution. Without a + * central schema, every read site uses unsafe casts (`as string`, + * `as PlanTask[]`) and every write site is a free-form object literal. + * + * This module is the single source of truth for those magic context keys: + * + * - `PlanContext` — fields stored on `kind=plan` contributions + * - `MessageContext` — fields stored on `kind=discussion` contributions + * that are messages (vs. regular discussions) + * + * Each schema has a builder (writer side) and a parser (reader side). + * The builder returns a `Record` ready to drop into + * `ContributeInput.context`. The parser narrows untyped context into a + * typed object or returns undefined when the context doesn't match. + */ + +import { z } from "zod"; + +import type { JsonValue } from "../models.js"; + +// --------------------------------------------------------------------------- +// Plan context +// --------------------------------------------------------------------------- + +export const PLAN_TASK_STATUSES = ["todo", "in_progress", "done", "blocked"] as const; +export type PlanTaskStatus = (typeof PLAN_TASK_STATUSES)[number]; + +export interface PlanTask { + readonly id: string; + readonly title: string; + readonly status: PlanTaskStatus; + readonly assignee?: string | undefined; +} + +export interface PlanContext { + readonly plan_title: string; + readonly tasks: readonly PlanTask[]; +} + +const PlanTaskSchema = z.object({ + id: z.string(), + title: z.string(), + status: z.enum(PLAN_TASK_STATUSES), + assignee: z.string().optional(), +}); + +const PlanContextSchema = z.object({ + plan_title: z.string(), + tasks: z.array(PlanTaskSchema), +}); + +/** + * Build a Plan context payload for `ContributeInput.context`. + * Encapsulates the field names so writers don't have to remember the + * magic keys. + */ +export function buildPlanContext(input: { + readonly title: string; + readonly tasks: readonly PlanTask[]; +}): Record { + return { + plan_title: input.title, + tasks: input.tasks as unknown as JsonValue, + }; +} + +/** + * Parse the plan-specific fields out of an untyped contribution context. + * Returns undefined when the context is missing or malformed (no exception + * — readers can treat this as "not a plan context"). + */ +export function parsePlanContext( + context: Readonly> | undefined, +): PlanContext | undefined { + if (context === undefined) return undefined; + const result = PlanContextSchema.safeParse(context); + return result.success ? result.data : undefined; +} + +// --------------------------------------------------------------------------- +// Message context +// --------------------------------------------------------------------------- + +export interface MessageContext { + readonly ephemeral: true; + readonly recipients: readonly string[]; + readonly message_body: string; +} + +const MessageContextSchema = z.object({ + ephemeral: z.literal(true), + recipients: z.array(z.string()).min(1), + message_body: z.string(), +}); + +/** + * Build a Message context payload for `ContributeInput.context`. + * Messages are ephemeral discussions with addressed recipients — this + * helper sets the ephemeral marker, recipients list, and body verbatim. + */ +export function buildMessageContext(input: { + readonly recipients: readonly string[]; + readonly body: string; +}): Record { + return { + ephemeral: true, + recipients: [...input.recipients], + message_body: input.body, + }; +} + +/** + * Parse the message-specific fields out of an untyped contribution context. + * Returns undefined when the context is missing or doesn't have the + * ephemeral message shape (e.g., a regular non-ephemeral discussion). + */ +export function parseMessageContext( + context: Readonly> | undefined, +): MessageContext | undefined { + if (context === undefined) return undefined; + const result = MessageContextSchema.safeParse(context); + return result.success ? result.data : undefined; +} + +/** + * Quick predicate: true when a context has the ephemeral flag set. + * + * "Ephemeral" means the contribution should skip topology routing and + * handoff creation — it's coordination noise, not work. Callers include + * `contributeOperation` (to skip the routing/handoff path for chat and + * session terminators) and `readInbox` (indirectly, via + * `parseMessageContext` which requires ephemeral=true). + * + * Two known shapes trigger this: + * - Chat messages: { ephemeral: true, recipients: [...], message_body: "..." } + * - grove_done markers: { ephemeral: true, done: true, reason: "..." } + * + * The name includes "Message" for historical reasons — originally this + * predicate was only used by the messaging path. It's now a general + * "should this contribution be treated as ephemeral?" check. + */ +export function isEphemeralMessageContext( + context: Readonly> | undefined, +): boolean { + return context !== undefined && context.ephemeral === true; +} diff --git a/src/core/operations/contribute-routing.test.ts b/src/core/operations/contribute-routing.test.ts index d0a76bb..d022044 100644 --- a/src/core/operations/contribute-routing.test.ts +++ b/src/core/operations/contribute-routing.test.ts @@ -456,3 +456,401 @@ describe("contributeOperation: hook execution", () => { expect(hookCalls).toHaveLength(0); }); }); + +// --------------------------------------------------------------------------- +// Plan + ephemeral message routing semantics (Issues 1A + 13A + 12A) +// --------------------------------------------------------------------------- +// +// These tests pin down the per-kind routing rules locked in during the +// #228 review. They prevent 1A from being a silent behavior change. +// +// kind | handoffs | route event | stop conditions +// plan | no | yes | no +// ephemeral msg | no | no | no +// work / discuss | yes | yes | yes +// +// Without these tests, anyone refactoring the kind-based skip logic +// in contribute.ts could silently change semantics. + +describe("contributeOperation: plan and ephemeral routing rules", () => { + test("plan kind fires the routing event but creates no handoff", async () => { + const bus = new LocalEventBus(); + const router = new TopologyRouter(reviewLoopTopology, bus); + const store = makeInMemoryContributionStore(); + + const received: GroveEvent[] = []; + bus.subscribe("reviewer", (e) => received.push(e)); + + // Spy handoff store: tracks any create() calls. + const handoffCreates: unknown[] = []; + const handoffStore = { + create: async (input: unknown) => { + handoffCreates.push(input); + return { handoffId: "fake-handoff" }; + }, + get: async () => undefined, + list: async () => [], + markDelivered: async () => undefined, + markReplied: async () => undefined, + expireStale: async () => [], + countPending: async () => 0, + close: () => undefined, + } as unknown as NonNullable; + + const deps: OperationDeps = { + contributionStore: store, + topologyRouter: router, + eventBus: bus, + handoffStore, + }; + + const result = await contributeOperation( + { + kind: "plan", + mode: "exploration", + summary: "Plan: routed but no handoff", + context: { plan_title: "P", tasks: [] }, + agent: { agentId: "planner-1", role: "coder" }, + }, + deps, + ); + + expect(result.ok).toBe(true); + + // Plans skip handoffs entirely. + expect(handoffCreates).toHaveLength(0); + if (result.ok) { + expect(result.value.handoffIds).toBeUndefined(); + } + + // But the routing event still fires (so live UIs can observe plan creation). + // Wait a tick because the event is fire-and-forget. + await new Promise((r) => setTimeout(r, 5)); + expect(received).toHaveLength(1); + expect(received[0]!.payload.kind).toBe("plan"); + + bus.close(); + }); + + test("ephemeral message kind skips both routing event AND handoffs", async () => { + const bus = new LocalEventBus(); + const router = new TopologyRouter(reviewLoopTopology, bus); + const store = makeInMemoryContributionStore(); + + const received: GroveEvent[] = []; + bus.subscribe("reviewer", (e) => received.push(e)); + + const handoffCreates: unknown[] = []; + const handoffStore = { + create: async (input: unknown) => { + handoffCreates.push(input); + return { handoffId: "fake-handoff" }; + }, + get: async () => undefined, + list: async () => [], + markDelivered: async () => undefined, + markReplied: async () => undefined, + expireStale: async () => [], + countPending: async () => 0, + close: () => undefined, + } as unknown as NonNullable; + + const deps: OperationDeps = { + contributionStore: store, + topologyRouter: router, + eventBus: bus, + handoffStore, + }; + + const result = await contributeOperation( + { + kind: "discussion", + mode: "exploration", + summary: "chat", + context: { ephemeral: true, recipients: ["@reviewer"], message_body: "hi" }, + agent: { agentId: "coder-1", role: "coder" }, + }, + deps, + ); + + expect(result.ok).toBe(true); + + // Ephemeral messages skip handoffs. + expect(handoffCreates).toHaveLength(0); + + // And skip the routing event. + await new Promise((r) => setTimeout(r, 5)); + expect(received).toHaveLength(0); + + bus.close(); + }); + + test("non-ephemeral discussion routes normally (creates handoff and event)", async () => { + const bus = new LocalEventBus(); + const router = new TopologyRouter(reviewLoopTopology, bus); + const store = makeInMemoryContributionStore(); + + const received: GroveEvent[] = []; + bus.subscribe("reviewer", (e) => received.push(e)); + + const handoffCreates: unknown[] = []; + const handoffStore = { + create: async (input: unknown) => { + handoffCreates.push(input); + return { handoffId: "fake-handoff" }; + }, + get: async () => undefined, + list: async () => [], + markDelivered: async () => undefined, + markReplied: async () => undefined, + expireStale: async () => [], + countPending: async () => 0, + close: () => undefined, + } as unknown as NonNullable; + + const deps: OperationDeps = { + contributionStore: store, + topologyRouter: router, + eventBus: bus, + handoffStore, + }; + + const result = await contributeOperation( + { + kind: "discussion", + mode: "exploration", + summary: "structured discussion", + // Note: NO ephemeral flag — this is a regular discussion contribution + agent: { agentId: "coder-1", role: "coder" }, + }, + deps, + ); + + expect(result.ok).toBe(true); + + // Regular discussions DO generate handoffs and route events. + expect(handoffCreates).toHaveLength(1); + if (result.ok) { + expect(result.value.handoffIds).toBeDefined(); + expect(result.value.handoffIds).toHaveLength(1); + } + + await new Promise((r) => setTimeout(r, 5)); + expect(received).toHaveLength(1); + expect(received[0]!.payload.kind).toBe("discussion"); + + bus.close(); + }); + + test("plan does not trigger broadcastStop (Issue 13A: stop conditions skipped)", async () => { + const bus = new LocalEventBus(); + const router = new TopologyRouter(reviewLoopTopology, bus); + const store = makeInMemoryContributionStore(); + + const coderStops: GroveEvent[] = []; + const reviewerStops: GroveEvent[] = []; + bus.subscribe("coder", (e) => { + if (e.type === "stop") coderStops.push(e); + }); + bus.subscribe("reviewer", (e) => { + if (e.type === "stop") reviewerStops.push(e); + }); + + // Pre-populate so a budget=1 stop condition would normally fire. + await store.put({ + cid: "blake3:0000000000000000000000000000000000000000000000000000000000000099", + manifestVersion: 1, + kind: "work", + mode: "evaluation", + summary: "pre-existing", + artifacts: {}, + relations: [], + tags: [], + agent: { agentId: "agent-0" }, + createdAt: new Date().toISOString(), + }); + + const deps: OperationDeps = { + contributionStore: store, + topologyRouter: router, + eventBus: bus, + contract: { + contractVersion: 2, + name: "plan-skip-stop-test", + stopConditions: { budget: { maxContributions: 1 } }, + }, + }; + + // A plan write would normally cross the budget threshold and trigger + // broadcastStop, but plans skip stop-condition evaluation per Issue 13A. + const result = await contributeOperation( + { + kind: "plan", + mode: "exploration", + summary: "Plan: skip-stop", + context: { plan_title: "Skip", tasks: [] }, + agent: { agentId: "planner-1", role: "coder" }, + }, + deps, + ); + + expect(result.ok).toBe(true); + + // Wait for any fire-and-forget broadcast. + await new Promise((r) => setTimeout(r, 5)); + + // No stop events should have been broadcast — plans bypass stop conditions. + expect(coderStops).toHaveLength(0); + expect(reviewerStops).toHaveLength(0); + if (result.ok) { + expect(result.value.policy?.stopResult?.stopped).not.toBe(true); + } + + bus.close(); + }); + + // ------------------------------------------------------------------------- + // grove_done discussion: session terminator, not new work + // ------------------------------------------------------------------------- + // + // grove_done writes a kind=discussion contribution with context.done=true + // (see src/mcp/tools/done.ts). Done markers are asymmetric from chat: + // + // - SKIP handoff creation: session is ending, downstream has no new + // work to pick up. + // - KEEP the routing event: event-driven clients like + // useDoneDetection() subscribe to contribution events on the bus; + // without the route event firing, they would never observe + // completion and the session would be stranded in "running". + // + // An earlier version of this branch suppressed both handoff AND route + // event for done markers (via isEphemeralMessageContext). Codex round 3 + // caught the regression: event-bus mode disables polling in + // useDoneDetection, so losing the route event meant the UI never + // advanced after grove_done. This test pins the correct asymmetry in + // place so that collapse cannot happen again. + test("grove_done discussion (done=true) skips handoff but fires route event", async () => { + const bus = new LocalEventBus(); + const router = new TopologyRouter(reviewLoopTopology, bus); + const store = makeInMemoryContributionStore(); + + const received: GroveEvent[] = []; + bus.subscribe("coder", (e) => received.push(e)); + + const handoffCreates: unknown[] = []; + const handoffStore = { + create: async (input: unknown) => { + handoffCreates.push(input); + return { handoffId: "fake-handoff" }; + }, + get: async () => undefined, + list: async () => [], + markDelivered: async () => undefined, + markReplied: async () => undefined, + expireStale: async () => [], + countPending: async () => 0, + close: () => undefined, + } as unknown as NonNullable; + + const deps: OperationDeps = { + contributionStore: store, + topologyRouter: router, + eventBus: bus, + handoffStore, + }; + + // Exact shape that src/mcp/tools/done.ts writes when a reviewer approves. + const result = await contributeOperation( + { + kind: "discussion", + mode: "exploration", + summary: "[DONE] Approved — code meets standards", + context: { + done: true, + reason: "Approved — code meets standards", + ephemeral: true, + }, + agent: { agentId: "reviewer-1", role: "reviewer" }, + }, + deps, + ); + + expect(result.ok).toBe(true); + + // No handoff created for the done marker — session is ending. + expect(handoffCreates).toHaveLength(0); + + // Route event DID fire — event-driven done detection depends on this. + await new Promise((r) => setTimeout(r, 5)); + expect(received).toHaveLength(1); + expect(received[0]!.type).toBe("contribution"); + expect(received[0]!.payload.kind).toBe("discussion"); + expect(received[0]!.payload.summary).toMatch(/\[DONE\]/); + + bus.close(); + }); + + // ------------------------------------------------------------------------- + // Ephemeral chat vs done marker: asymmetry test + // ------------------------------------------------------------------------- + // + // Ephemeral chat (context.ephemeral=true WITHOUT context.done) skips + // BOTH handoffs AND the route event — chat is background noise. Done + // markers (context.done=true) skip handoffs but KEEP the route event. + // This test asserts both legs explicitly so future refactors of the + // classification logic can't collapse them back together. + test("ephemeral chat (no done flag) skips both handoff AND route event", async () => { + const bus = new LocalEventBus(); + const router = new TopologyRouter(reviewLoopTopology, bus); + const store = makeInMemoryContributionStore(); + + const received: GroveEvent[] = []; + bus.subscribe("reviewer", (e) => received.push(e)); + + const handoffCreates: unknown[] = []; + const handoffStore = { + create: async (input: unknown) => { + handoffCreates.push(input); + return { handoffId: "fake-handoff" }; + }, + get: async () => undefined, + list: async () => [], + markDelivered: async () => undefined, + markReplied: async () => undefined, + expireStale: async () => [], + countPending: async () => 0, + close: () => undefined, + } as unknown as NonNullable; + + const deps: OperationDeps = { + contributionStore: store, + topologyRouter: router, + eventBus: bus, + handoffStore, + }; + + const result = await contributeOperation( + { + kind: "discussion", + mode: "exploration", + summary: "chat message", + // Shape sendMessageAsDiscussion writes: ephemeral + message fields, + // NO done flag. Must skip everything. + context: { + ephemeral: true, + recipients: ["@reviewer"], + message_body: "hi", + }, + agent: { agentId: "coder-1", role: "coder" }, + }, + deps, + ); + + expect(result.ok).toBe(true); + expect(handoffCreates).toHaveLength(0); + await new Promise((r) => setTimeout(r, 5)); + expect(received).toHaveLength(0); + + bus.close(); + }); +}); diff --git a/src/core/operations/contribute.test.ts b/src/core/operations/contribute.test.ts index 373473b..e537450 100644 --- a/src/core/operations/contribute.test.ts +++ b/src/core/operations/contribute.test.ts @@ -5,6 +5,7 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { + _resetIdempotencyCacheForTests, contributeOperation, discussOperation, reproduceOperation, @@ -224,6 +225,436 @@ describe("contributeOperation", () => { }); }); +describe("contributeOperation: idempotencyKey", () => { + let testDeps: TestOperationDeps; + let deps: FullOperationDeps; + + beforeEach(async () => { + _resetIdempotencyCacheForTests(); + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + _resetIdempotencyCacheForTests(); + }); + + test("repeated call with same key + same input returns the cached result", async () => { + const firstInput = { + kind: "work" as const, + summary: "first call", + agent: { agentId: "agent-1" }, + idempotencyKey: "key-1", + }; + const first = await contributeOperation(firstInput, deps); + expect(first.ok).toBe(true); + if (!first.ok) return; + + // Identical retry — should return the cached result. + const second = await contributeOperation({ ...firstInput }, deps); + expect(second.ok).toBe(true); + if (!second.ok) return; + + // Same CID proves the cache served it. + expect(second.value.cid).toBe(first.value.cid); + expect(second.value.summary).toBe("first call"); + }); + + test("same key + different input is rejected with STATE_CONFLICT", async () => { + // Stripe/AWS Idempotency-Key semantics: reusing a key with a different + // request body is a client bug (the key no longer identifies a single + // logical operation). Reject instead of silently returning the first + // call's result, which would hide the mistake. + const first = await contributeOperation( + { + kind: "work", + summary: "first call", + agent: { agentId: "agent-1" }, + idempotencyKey: "key-1", + }, + deps, + ); + expect(first.ok).toBe(true); + if (!first.ok) return; + + const second = await contributeOperation( + { + kind: "work", + summary: "different summary, same key", + agent: { agentId: "agent-1" }, + idempotencyKey: "key-1", + }, + deps, + ); + expect(second.ok).toBe(false); + if (second.ok) return; + expect(second.error.code).toBe("STATE_CONFLICT"); + expect(second.error.message).toMatch(/different request body/i); + }); + + test("concurrent calls with same key are single-flight (one write)", async () => { + // Two overlapping retries with the same key must NOT both write. + // The first reserves the slot, the second awaits the pending Promise. + const input = { + kind: "work" as const, + summary: "single flight", + agent: { agentId: "agent-1" }, + idempotencyKey: "single-flight-key", + }; + const [r1, r2] = await Promise.all([ + contributeOperation(input, deps), + contributeOperation(input, deps), + ]); + expect(r1.ok && r2.ok).toBe(true); + if (!r1.ok || !r2.ok) return; + // Both callers observe the same CID — single write occurred. + expect(r1.value.cid).toBe(r2.value.cid); + + // Verify via the store: only one contribution with that summary. + const stored = await deps.contributionStore.list({ limit: 20 }); + const matching = stored.filter((c) => c.summary === "single flight"); + expect(matching).toHaveLength(1); + }); + + test("different keys produce different contributions", async () => { + const first = await contributeOperation( + { + kind: "work", + summary: "alpha", + agent: { agentId: "agent-1" }, + idempotencyKey: "key-A", + }, + deps, + ); + const second = await contributeOperation( + { + kind: "work", + summary: "beta", + agent: { agentId: "agent-1" }, + idempotencyKey: "key-B", + }, + deps, + ); + expect(first.ok && second.ok).toBe(true); + if (!first.ok || !second.ok) return; + expect(first.value.cid).not.toBe(second.value.cid); + }); + + test("same key from different agents does not collide", async () => { + const alice = await contributeOperation( + { + kind: "work", + summary: "alice's work", + agent: { agentId: "alice" }, + idempotencyKey: "shared-key", + }, + deps, + ); + const bob = await contributeOperation( + { + kind: "work", + summary: "bob's work", + agent: { agentId: "bob" }, + idempotencyKey: "shared-key", + }, + deps, + ); + expect(alice.ok && bob.ok).toBe(true); + if (!alice.ok || !bob.ok) return; + expect(alice.value.cid).not.toBe(bob.value.cid); + expect(alice.value.summary).toBe("alice's work"); + expect(bob.value.summary).toBe("bob's work"); + }); + + test("agent role scopes idempotency: identical payload from two coders shares cache", async () => { + // When the agent has a role, the idempotency namespace is per-role — + // two agent instances of the same role submitting the SAME logical + // request (identical fingerprint) are treated as retries of one call. + // + // This models multi-instance roles (e.g., max_instances=2 for coder) + // where either instance might retry the same work submission. + const sharedPayload = { + kind: "work" as const, + summary: "shared coder work", + idempotencyKey: "coder-shared-key", + }; + const first = await contributeOperation( + { ...sharedPayload, agent: { agentId: "coder-instance-1", role: "coder" } }, + deps, + ); + const second = await contributeOperation( + { ...sharedPayload, agent: { agentId: "coder-instance-2", role: "coder" } }, + deps, + ); + expect(first.ok && second.ok).toBe(true); + if (!first.ok || !second.ok) return; + // Same CID — both callers got the cached result. + expect(second.value.cid).toBe(first.value.cid); + }); + + test("role scope: same key + different summary across instances is STATE_CONFLICT", async () => { + // Within the same role namespace, reusing a key with different + // summary is still a conflict — scope-sharing does not mean anything + // goes, it just means the key identifies one logical operation + // across all instances of that role. + const first = await contributeOperation( + { + kind: "work", + summary: "coder-1 work", + agent: { agentId: "coder-instance-1", role: "coder" }, + idempotencyKey: "role-conflict-key", + }, + deps, + ); + expect(first.ok).toBe(true); + if (!first.ok) return; + + const second = await contributeOperation( + { + kind: "work", + summary: "coder-2 different work", + agent: { agentId: "coder-instance-2", role: "coder" }, + idempotencyKey: "role-conflict-key", + }, + deps, + ); + expect(second.ok).toBe(false); + if (second.ok) return; + expect(second.error.code).toBe("STATE_CONFLICT"); + }); + + test("fingerprint rejects same key + different context (plan tasks)", async () => { + // Plans store their task list in context.tasks. Two calls with the + // same key but different task lists must be rejected, not silently + // return the first result. + const first = await contributeOperation( + { + kind: "plan", + mode: "exploration", + summary: "Phase 1", + context: { + plan_title: "Phase 1", + tasks: [{ id: "t1", title: "Design", status: "todo" }] as never, + }, + agent: { agentId: "a1" }, + idempotencyKey: "plan-context-key", + }, + deps, + ); + expect(first.ok).toBe(true); + if (!first.ok) return; + + const second = await contributeOperation( + { + kind: "plan", + mode: "exploration", + summary: "Phase 1", + context: { + plan_title: "Phase 1", + // Same task id, DIFFERENT status — would leave the first call's + // stored state behind if not rejected. + tasks: [{ id: "t1", title: "Design", status: "done" }] as never, + }, + agent: { agentId: "a1" }, + idempotencyKey: "plan-context-key", + }, + deps, + ); + expect(second.ok).toBe(false); + if (second.ok) return; + expect(second.error.code).toBe("STATE_CONFLICT"); + }); + + test("fingerprint rejects same key + different scores", async () => { + const first = await contributeOperation( + { + kind: "work", + summary: "metric submission", + scores: { latency: { value: 42, direction: "minimize" } }, + agent: { agentId: "a1" }, + idempotencyKey: "score-key", + }, + deps, + ); + expect(first.ok).toBe(true); + if (!first.ok) return; + + const second = await contributeOperation( + { + kind: "work", + summary: "metric submission", + scores: { latency: { value: 99, direction: "minimize" } }, + agent: { agentId: "a1" }, + idempotencyKey: "score-key", + }, + deps, + ); + expect(second.ok).toBe(false); + if (second.ok) return; + expect(second.error.code).toBe("STATE_CONFLICT"); + }); + + test("fingerprint rejects same key + renamed artifact (same hash)", async () => { + // Store a single blob in CAS, reference it under two different names. + const hash = await storeTestContent(deps.cas, "hello world"); + + const first = await contributeOperation( + { + kind: "work", + summary: "artifact submission", + artifacts: { "greeting.txt": hash }, + agent: { agentId: "a1" }, + idempotencyKey: "artifact-name-key", + }, + deps, + ); + expect(first.ok).toBe(true); + if (!first.ok) return; + + const second = await contributeOperation( + { + kind: "work", + summary: "artifact submission", + // Same hash, different filename — must be rejected, not coalesced. + artifacts: { "hello.txt": hash }, + agent: { agentId: "a1" }, + idempotencyKey: "artifact-name-key", + }, + deps, + ); + expect(second.ok).toBe(false); + if (second.ok) return; + expect(second.error.code).toBe("STATE_CONFLICT"); + }); + + test("fingerprint is insensitive to context key order", async () => { + // { a: 1, b: 2 } and { b: 2, a: 1 } must produce the same fingerprint + // so equivalent payloads don't spuriously conflict. This is why + // canonicalizeForFingerprint deeply sorts object keys. + const first = await contributeOperation( + { + kind: "work", + summary: "key-order test", + context: { alpha: 1, beta: 2, nested: { x: 1, y: 2 } } as never, + agent: { agentId: "a1" }, + idempotencyKey: "key-order-key", + }, + deps, + ); + const second = await contributeOperation( + { + kind: "work", + summary: "key-order test", + // Different insertion order at both levels — should still match. + context: { nested: { y: 2, x: 1 }, beta: 2, alpha: 1 } as never, + agent: { agentId: "a1" }, + idempotencyKey: "key-order-key", + }, + deps, + ); + expect(first.ok && second.ok).toBe(true); + if (!first.ok || !second.ok) return; + expect(second.value.cid).toBe(first.value.cid); + }); + + test("post-commit callback failure does NOT release the idempotency slot", async () => { + // Simulates a scenario where a user-supplied onContributionWritten + // callback throws AFTER the contribution was durably committed. + // Previous behavior: catch handler released the slot, and a retry + // with the same key produced a second contribution with a new cid. + // Fix: slot is resolved immediately after commit — post-write + // failures are logged but don't undo the cache. + let throwOnce = true; + const depsWithThrowingCallback: typeof deps = { + ...deps, + onContributionWritten: () => { + if (throwOnce) { + throwOnce = false; + throw new Error("simulated post-commit callback failure"); + } + }, + }; + + const input = { + kind: "work" as const, + summary: "post-commit-failure", + agent: { agentId: "a1" }, + idempotencyKey: "post-commit-key", + }; + const first = await contributeOperation(input, depsWithThrowingCallback); + expect(first.ok).toBe(true); + if (!first.ok) return; + + // Retry with the same key — must return the cached result, not + // create a second contribution. + const second = await contributeOperation(input, depsWithThrowingCallback); + expect(second.ok).toBe(true); + if (!second.ok) return; + expect(second.value.cid).toBe(first.value.cid); + + // Store-level check: exactly ONE contribution with that summary. + const stored = await deps.contributionStore.list({ limit: 20 }); + const matching = stored.filter((c) => c.summary === "post-commit-failure"); + expect(matching).toHaveLength(1); + }); + + test("ephemeral flag on non-discussion kind is rejected", async () => { + // Regression guard: context.ephemeral=true is reserved for discussions + // (chat messages + grove_done markers). Allowing it on work/review + // would route real progress through the skip path — no handoff, no + // topology event, no stop check, AND the frontier filters out + // ephemeral contributions, making the work invisible. + for (const kind of ["work", "review", "reproduction", "adoption"] as const) { + const result = await contributeOperation( + { + kind, + summary: `ephemeral ${kind}`, + context: { ephemeral: true }, + agent: { agentId: "a1", role: "coder" }, + }, + deps, + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toMatch(/ephemeral.*only valid on kind=discussion/i); + } + }); + + test("ephemeral flag on discussion is allowed (normal path)", async () => { + const result = await contributeOperation( + { + kind: "discussion", + mode: "exploration", + summary: "ephemeral chat", + context: { ephemeral: true, recipients: ["@bob"], message_body: "hi" }, + agent: { agentId: "a1", role: "coder" }, + }, + deps, + ); + expect(result.ok).toBe(true); + }); + + test("no idempotencyKey means no dedup", async () => { + const first = await contributeOperation( + { kind: "work", summary: "duplicate", agent: { agentId: "a1" } }, + deps, + ); + const second = await contributeOperation( + { kind: "work", summary: "duplicate", agent: { agentId: "a1" } }, + deps, + ); + expect(first.ok && second.ok).toBe(true); + if (!first.ok || !second.ok) return; + // Without an idempotency key, two identical contributions are produced. + // (CIDs may collide if the timestamps round to the same millisecond, + // but conceptually each call is a separate contribution.) + expect(first.value.cid).toBeTruthy(); + expect(second.value.cid).toBeTruthy(); + }); +}); + describe("reviewOperation", () => { let testDeps: TestOperationDeps; let deps: FullOperationDeps; diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index fd0c006..951f260 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -9,7 +9,7 @@ import { fireAndForget } from "../../shared/fire-and-forget.js"; import { pickDefined } from "../../shared/pick-defined.js"; -import type { HandoffInput } from "../handoff.js"; +import type { HandoffInput, HandoffStore } from "../handoff.js"; import { createContribution } from "../manifest.js"; import { ContributionKind as CK, @@ -29,9 +29,10 @@ import type { ContributionStore } from "../store.js"; import { toUtcIso } from "../time.js"; import type { AgentOverrides } from "./agent.js"; import { resolveAgent } from "./agent.js"; +import { isEphemeralMessageContext } from "./context-schemas.js"; import type { OperationDeps } from "./deps.js"; import type { OperationResult } from "./result.js"; -import { fromGroveError, notFound, ok, validationErr } from "./result.js"; +import { err, fromGroveError, notFound, OperationErrorCode, ok, validationErr } from "./result.js"; // --------------------------------------------------------------------------- // Result types @@ -95,6 +96,19 @@ export interface ContributeInput { readonly agent?: AgentOverrides | undefined; /** Optional timestamp for replay/import. Defaults to current time if omitted. */ readonly createdAt?: string | undefined; + /** + * Optional client-supplied idempotency key. When set, repeated calls with + * the same key (within IDEMPOTENCY_TTL_MS) return the previously-stored + * contribution metadata instead of creating a new contribution. + * + * Follows HTTP `Idempotency-Key` conventions: opaque string, scoped per + * agent (the key is namespaced by `agent.role ?? agent.agentId`). Two + * different agents can use the same key without colliding. + * + * If omitted, no deduplication is performed — clients that need retry + * safety should generate a key and pass it on every retry. + */ + readonly idempotencyKey?: string | undefined; } /** Input for the review operation. */ @@ -172,7 +186,9 @@ async function validateRelations( } /** - * Validate that all artifact hashes exist in CAS (batch). + * Validate that all artifact hashes exist in CAS. + * Existence checks run in parallel via Promise.all so a contribution with + * N artifacts pays 1×rtt instead of N×rtt against a remote CAS. * Returns a validation error if any hash is missing, or undefined if all valid. */ async function validateArtifacts( @@ -182,8 +198,12 @@ async function validateArtifacts( if (deps.cas === undefined) { return validationErr("Artifact validation not available (missing cas)"); } - for (const [name, hash] of Object.entries(artifacts)) { - const exists = await deps.cas.exists(hash); + const cas = deps.cas; + const entries = Object.entries(artifacts); + const checks = await Promise.all( + entries.map(async ([name, hash]) => ({ name, hash, exists: await cas.exists(hash) })), + ); + for (const { name, hash, exists } of checks) { if (!exists) { return validationErr(`Artifact '${name}' references non-existent hash: ${hash}`); } @@ -204,6 +224,408 @@ function resolveMode( return CM.Evaluation; } +// --------------------------------------------------------------------------- +// Idempotency cache +// --------------------------------------------------------------------------- + +/** Time window during which a cached idempotency result is reused. */ +const IDEMPOTENCY_TTL_MS = 5 * 60 * 1000; + +/** Maximum number of cached idempotency entries (LRU eviction). */ +const IDEMPOTENCY_MAX_ENTRIES = 1024; + +/** + * An entry in the idempotency cache. Can be in one of two states: + * - `pending`: a write is currently in-flight. Subsequent callers with + * the same key must await this Promise rather than starting a second + * write (single-flight). + * - `value`: the write has completed and the result is cached. + * + * Both states carry a `fingerprint` — a canonical hash of the request's + * intent (kind, summary, agent, relations, artifacts, tags). Lookups with + * a mismatched fingerprint are rejected with a STATE_CONFLICT error + * instead of silently returning the first call's result — that matches + * HTTP Idempotency-Key semantics (see Stripe, AWS, RFC draft) and surfaces + * client bugs where the same key is reused across different intents. + */ +interface CachedIdempotencyEntry { + readonly fingerprint: string; + readonly storedAt: number; + readonly pending?: Promise>; + readonly value?: ContributeResult; +} + +/** + * Per-process cache of idempotency-key → contribute result. + * + * Map iteration order is insertion order, so we can implement a simple LRU + * by deleting + re-inserting on read. Entries are also expired by timestamp + * on lookup. Not shared across processes — clients running multiple grove + * instances must coordinate keys themselves. + * + * Single-flight: when a caller first observes a key miss, it synchronously + * inserts a pending entry holding a Promise the write will resolve. Any + * concurrent caller with the same key awaits that Promise. JavaScript is + * single-threaded, so the check-then-insert is atomic without a mutex. + */ +const idempotencyCache = new Map(); + +/** Build the cache key. Namespaced per agent so two agents can share keys. */ +function idempotencyCacheKey(agentScope: string, key: string): string { + return `${agentScope}\u0000${key}`; +} + +/** + * Deeply canonicalize a JSON-like value for stable fingerprint hashing. + * + * JSON.stringify preserves object-key insertion order, so two objects + * with the same keys in different order would produce different + * strings. This walker recursively sorts object keys and normalizes + * arrays (preserving order, since array order is usually meaningful). + * Used to fingerprint the `context` and `scores` fields where field + * order is not semantic. + */ +function canonicalizeForFingerprint(value: unknown): unknown { + if (value === null || value === undefined) return null; + if (Array.isArray(value)) { + return value.map((v) => canonicalizeForFingerprint(v)); + } + if (typeof value === "object") { + const obj = value as Record; + const out: Record = {}; + for (const key of Object.keys(obj).sort()) { + out[key] = canonicalizeForFingerprint(obj[key]); + } + return out; + } + return value; +} + +/** + * Canonical fingerprint of a contribute request's intent. + * + * Captures the full persisted payload shape so any difference that + * could produce a different stored contribution is reflected in the + * hash: + * + * - kind, mode, summary, description + * - `context` (deep-canonicalized so key order doesn't matter) — + * plans store their task list here, messages store recipients + + * body, grove_done stores the ephemeral flag + reason + * - `scores` (deep-canonicalized) — per-metric values + * - `artifacts` as sorted (name → hash) pairs, not just hashes — + * a rename changes identity even if the hash is the same + * - `relations` sorted by targetCid (+ type, metadata) + * - `tags` sorted + * - agentScope (role ?? agentId) mirroring the cache-key namespace + * + * Excludes `createdAt` (always varies), `agent` object (scope stands + * in), and `idempotencyKey` (that's the lookup key itself). + * + * Two requests with the same key + same fingerprint → treat as retries, + * return cached result. + * Two requests with the same key + different fingerprint → reject with + * STATE_CONFLICT to surface the client bug. + * + * NB: the fingerprint must include every field that could make the + * stored contribution differ in an observable way. Adding new fields + * to ContributeInput or the contribution manifest WITHOUT adding them + * here re-opens the "same key, different stored payload" loophole. + */ +function computeIdempotencyFingerprint( + input: ContributeInput, + agent: { readonly agentId: string; readonly role?: string | undefined }, +): string { + const canonical = JSON.stringify({ + kind: input.kind, + mode: input.mode ?? null, + summary: input.summary, + description: input.description ?? null, + // Sort artifacts by name and keep name→hash pairs so a rename + // (same hash, different filename) produces a different fingerprint. + artifacts: input.artifacts + ? Object.entries(input.artifacts) + .slice() + .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0)) + .map(([name, hash]) => ({ name, hash })) + : [], + relations: input.relations + ? [...input.relations] + .map((r) => ({ + target: r.targetCid, + type: r.relationType, + metadata: canonicalizeForFingerprint(r.metadata), + })) + .sort((a, b) => { + if (a.target !== b.target) return a.target < b.target ? -1 : 1; + return a.type < b.type ? -1 : a.type > b.type ? 1 : 0; + }) + : [], + tags: input.tags ? [...input.tags].sort() : [], + // Context varies per kind (plan tasks, message body, done marker, etc). + // Deep canonicalization so { a: 1, b: 2 } and { b: 2, a: 1 } hash the same. + context: canonicalizeForFingerprint(input.context), + // Scores are per-metric numeric payloads; omitting them would let a + // caller silently overwrite a metric by reusing an idempotency key. + scores: canonicalizeForFingerprint(input.scores), + // Scope mirrors the cache key: role wins if present, else agentId. + // Without this, two agents sharing a role would see the same cache + // key but different fingerprints → spurious STATE_CONFLICT on what + // should be a shared-scope retry. + agentScope: agent.role ?? agent.agentId, + }); + // Simple non-cryptographic hash — collisions would only cause a + // false-positive "same input" response, and the attacker would need to + // control the idempotency key namespace anyway (scoped per-agent). + let h = 0; + for (let i = 0; i < canonical.length; i++) { + h = (h * 31 + canonical.charCodeAt(i)) | 0; + } + return (h >>> 0).toString(16); +} + +/** + * Check the cache for an existing entry. Returns: + * - `{ type: "pending", promise }` — an in-flight write with the same + * fingerprint. Caller should await and return. + * - `{ type: "value", result }` — a completed cached result. Caller + * should return it directly. + * - `{ type: "conflict", message }` — a cached entry exists but with + * a different fingerprint. Caller should return STATE_CONFLICT. + * - `undefined` — no usable entry (miss or expired). Caller should + * reserve the slot via `reserveIdempotencySlot` and run the write. + */ +function lookupIdempotency( + cacheKey: string, + fingerprint: string, + now: number, +): + | { readonly type: "pending"; readonly promise: Promise> } + | { readonly type: "value"; readonly result: ContributeResult } + | { readonly type: "conflict"; readonly message: string } + | undefined { + const entry = idempotencyCache.get(cacheKey); + if (entry === undefined) return undefined; + if (now - entry.storedAt > IDEMPOTENCY_TTL_MS) { + idempotencyCache.delete(cacheKey); + return undefined; + } + if (entry.fingerprint !== fingerprint) { + return { + type: "conflict", + message: + "Idempotency key was previously used with a different request body. " + + "Reusing the same key with different input is rejected to prevent silent " + + "write divergence. Use a new key for the new intent.", + }; + } + // LRU touch: move to end of insertion order. + idempotencyCache.delete(cacheKey); + idempotencyCache.set(cacheKey, entry); + if (entry.pending !== undefined) { + return { type: "pending", promise: entry.pending }; + } + if (entry.value !== undefined) { + return { type: "value", result: entry.value }; + } + return undefined; +} + +/** + * Synchronously reserve a cache slot with a pending Promise. Subsequent + * concurrent calls with the same key will find this pending entry and + * await it (single-flight). Returns a resolver the caller must invoke + * exactly once with the final OperationResult. + */ +function reserveIdempotencySlot( + cacheKey: string, + fingerprint: string, + now: number, +): { + readonly resolve: (result: OperationResult) => void; + readonly release: () => void; +} { + // Evict the oldest entry if at capacity. + if (idempotencyCache.size >= IDEMPOTENCY_MAX_ENTRIES) { + const oldest = idempotencyCache.keys().next().value; + if (oldest !== undefined) idempotencyCache.delete(oldest); + } + + let resolver!: (result: OperationResult) => void; + const pending = new Promise>((r) => { + resolver = r; + }); + + idempotencyCache.set(cacheKey, { fingerprint, storedAt: now, pending }); + + return { + resolve: (result) => { + // Transition the slot from pending → final. + if (result.ok) { + idempotencyCache.set(cacheKey, { + fingerprint, + storedAt: Date.now(), + value: result.value, + }); + } else { + // On error, delete the slot so retries can make progress. + idempotencyCache.delete(cacheKey); + } + resolver(result); + }, + release: () => { + // Called on unexpected exception (e.g., thrown error not caught by + // fromGroveError). Remove the slot so retries aren't blocked. + idempotencyCache.delete(cacheKey); + }, + }; +} + +/** + * Test-only: clear the idempotency cache between test cases. Not exported + * from the package index — only intended for in-package tests. + */ +export function _resetIdempotencyCacheForTests(): void { + idempotencyCache.clear(); +} + +// --------------------------------------------------------------------------- +// Contribution write paths +// --------------------------------------------------------------------------- + +/** + * Atomic write path: SQLite stores supporting `putWithCowrite` write the + * contribution and all handoff records inside a single SQLite transaction. + * Used when both the contribution store and handoff store are SQLite-backed. + * + * The `putWithCowrite` parameter may be sync (raw SqliteContributionStore) + * or async (EnforcingContributionStore wrapping a SQLite store — its + * enforcement hooks are async but the inner cowrite callback still runs + * synchronously inside the transaction, so handoff IDs are populated + * before the outer Promise resolves). + */ +async function writeAtomic( + contribution: Contribution, + routedTo: readonly string[], + agentRole: string, + putWithCowrite: (c: Contribution, fn: () => void) => void | Promise, + insertSync: (input: HandoffInput) => string, +): Promise { + const handoffIds: string[] = []; + const maybePromise = putWithCowrite(contribution, () => { + for (const targetRole of routedTo) { + const hid = insertSync({ + sourceCid: contribution.cid, + fromRole: agentRole, + toRole: targetRole, + requiresReply: false, + }); + if (hid !== undefined) handoffIds.push(hid); + } + }); + if ( + maybePromise !== undefined && + typeof (maybePromise as { then?: unknown }).then === "function" + ) { + await maybePromise; + } + return handoffIds; +} + +/** + * Serial write path: write the contribution first, then create the handoff + * records. Used when the store does not support atomic cowrite (in-memory + * stores, Nexus VFS handoff store). + * + * Uses handoffStore.createMany() when available to fan out N handoffs in + * one round-trip (Issue 15A in the #228 review). Falls back to a sequential + * create() loop for stores that don't implement the batch API. + * + * Best-effort handoffs: a handoff insertion failure must not fail the + * already-committed contribution write. The contribution is in the DAG; + * handoff records are the secondary artifact. + */ +async function writeSerial( + contribution: Contribution, + routedTo: readonly string[] | undefined, + agentRole: string | undefined, + store: ContributionStore, + handoffStore: HandoffStore | undefined, +): Promise { + await store.put(contribution); + + const handoffIds: string[] = []; + if (handoffStore === undefined || routedTo === undefined || agentRole === undefined) { + return handoffIds; + } + + const inputs: HandoffInput[] = routedTo.map((targetRole) => ({ + sourceCid: contribution.cid, + fromRole: agentRole, + toRole: targetRole, + requiresReply: false, + })); + + if (handoffStore.createMany !== undefined) { + try { + const handoffs = await handoffStore.createMany(inputs); + for (const h of handoffs) handoffIds.push(h.handoffId); + } catch { + // Best-effort: contribution is already committed. + } + return handoffIds; + } + + for (const input of inputs) { + try { + const handoff = await handoffStore.create(input); + handoffIds.push(handoff.handoffId); + } catch { + // Best-effort: contribution is already committed. + } + } + return handoffIds; +} + +/** + * Dispatch to the atomic or serial write path based on store capabilities. + * Centralizes the duck-typing on `putWithCowrite` / `insertSync` so the + * caller doesn't have to manage capability detection. + */ +async function writeContributionWithHandoffs( + contribution: Contribution, + routedTo: readonly string[] | undefined, + agentRole: string | undefined, + store: ContributionStore, + handoffStore: HandoffStore | undefined, +): Promise { + const needsHandoffs = + handoffStore !== undefined && + routedTo !== undefined && + routedTo.length > 0 && + agentRole !== undefined; + + if (needsHandoffs) { + const cowriteStore = store as { + putWithCowrite?: (c: Contribution, fn: () => void) => void | Promise; + }; + const sqliteHandoffStore = handoffStore as { + insertSync?: (input: HandoffInput) => string; + }; + if (cowriteStore.putWithCowrite !== undefined && sqliteHandoffStore.insertSync !== undefined) { + return writeAtomic( + contribution, + routedTo, + agentRole, + cowriteStore.putWithCowrite.bind(cowriteStore), + sqliteHandoffStore.insertSync.bind(sqliteHandoffStore), + ); + } + } + + return writeSerial(contribution, routedTo, agentRole, store, handoffStore); +} + // --------------------------------------------------------------------------- // Operations // --------------------------------------------------------------------------- @@ -213,6 +635,17 @@ export async function contributeOperation( input: ContributeInput, deps: OperationDeps, ): Promise> { + // Hoisted out of the try block so the outer catch can release the slot + // on thrown errors. Single-flight: while a slot holds a pending Promise, + // concurrent callers with the same key await that Promise instead of + // racing through the write path. + let idempotencySlot: + | { + readonly resolve: (result: OperationResult) => void; + readonly release: () => void; + } + | undefined; + try { if (deps.contributionStore === undefined) { return validationErr("Contribution operations not available (missing contributionStore)"); @@ -239,41 +672,50 @@ export async function contributeOperation( // Normalize to UTC Z-format so lexicographic ORDER BY works without datetime(). const createdAt = toUtcIso(input.createdAt ?? new Date().toISOString()); - // Idempotency check: skip if same summary + agent role exists within last 60s. - // Prevents duplicate contributions from MCP tool retries or agent calling the tool multiple times. - { - const recentWindow = new Date(Date.now() - 60_000).toISOString(); - try { - const recent = await deps.contributionStore.list({ limit: 20 }); - const agentMatch = (c: Contribution) => - agent.role ? c.agent.role === agent.role : c.agent.agentId === agent.agentId; - const isDuplicate = recent.some( - (c) => - c.summary === input.summary && - agentMatch(c) && - c.kind === input.kind && - c.createdAt >= recentWindow, - ); - if (isDuplicate) { - // Return the existing contribution's info instead of creating a duplicate - const existing = recent.find( - (c) => c.summary === input.summary && agentMatch(c) && c.kind === input.kind, - ); - if (existing) { - return ok({ - cid: existing.cid, - kind: existing.kind, - mode: existing.mode, - summary: existing.summary, - artifactCount: Object.keys(existing.artifacts).length, - relationCount: existing.relations.length, - createdAt: existing.createdAt, - }); - } + // --- Idempotency: single-flight with request fingerprinting --- + // + // Explicit client-supplied key, namespaced per agent. HTTP + // Idempotency-Key semantics (Stripe / AWS / RFC draft): + // + // - same key + same fingerprint → return cached result (retry) + // - same key + different fingerprint → STATE_CONFLICT (bug) + // - same key + in-flight → await the pending Promise (single-flight) + // + // Replaces the previous heuristic that did a 60s same-summary lookup; + // that approach false-positived on legitimate retries (e.g., updatePlan + // called twice with the same title) and missed real retries under + // concurrency because the window query was unbounded across all agents. + // + // Critically: the check-then-reserve is synchronous (no await between + // lookup miss and slot insertion). JavaScript is single-threaded, so + // two concurrent callers cannot both observe a miss and race past the + // insert — the second caller sees the first caller's pending entry. + const idempotencyAgentScope = agent.role ?? agent.agentId; + const idempotencyCacheLookupKey = + input.idempotencyKey !== undefined + ? idempotencyCacheKey(idempotencyAgentScope, input.idempotencyKey) + : undefined; + if (idempotencyCacheLookupKey !== undefined) { + const fingerprint = computeIdempotencyFingerprint(input, agent); + const cached = lookupIdempotency(idempotencyCacheLookupKey, fingerprint, Date.now()); + if (cached !== undefined) { + if (cached.type === "pending") { + // Concurrent caller: await their write and return the same result. + return cached.promise; } - } catch { - // Best-effort — continue with normal contribution if check fails + if (cached.type === "value") { + return ok(cached.result); + } + // type === "conflict" — key reused with different input. + return err({ + code: OperationErrorCode.StateConflict, + message: cached.message, + details: { idempotencyKey: input.idempotencyKey }, + }); } + // Miss: synchronously reserve the slot. Subsequent concurrent + // callers observe the pending Promise and await it. + idempotencySlot = reserveIdempotencySlot(idempotencyCacheLookupKey, fingerprint, Date.now()); } const contributionInput: ContributionInput = { @@ -292,6 +734,69 @@ export async function contributeOperation( const contribution = createContribution(contributionInput); + // --- Ephemeral flag is reserved for discussions --- + // context.ephemeral=true is a routing/frontier skip signal intended for + // chat messages and grove_done session terminators, both of which are + // kind=discussion. Allowing it on work/review/reproduction/adoption would + // make real progress invisible to the topology router, handoff store, + // stop-condition evaluator, AND the frontier calculator (which filters + // out ephemeral contributions). Reject the combination explicitly so + // caller bugs surface instead of silently dropping work. + if (contribution.kind !== CK.Discussion && contribution.context?.ephemeral === true) { + const errResult = validationErr( + `context.ephemeral=true is only valid on kind=discussion contributions ` + + `(chat messages and session terminators). Got kind='${contribution.kind}'. ` + + `The ephemeral flag suppresses topology routing, handoff creation, and ` + + `frontier inclusion — setting it on progress contributions would make them invisible.`, + ); + // Resolve the idempotency slot with this permanent error so any + // concurrent retry with the same key gets the same response. + idempotencySlot?.resolve(errResult); + return errResult; + } + + // --- Routing classification --- + // Plans are coordination metadata (not progress). Done markers + // (kind=discussion + context.done=true, written by grove_done) signal + // session termination — they should NOT create handoff records for + // downstream roles but MUST still fire a topology event so event-driven + // clients like useDoneDetection() can observe session completion. + // Ephemeral chat (kind=discussion + context.ephemeral=true WITHOUT + // context.done) is background noise that should be invisible to the + // routing layer entirely. + // + // kind | handoffs | route event | stop conditions + // plan | no | yes | no + // discussion (done) | no | yes | no + // discussion (chat) | no | no | no + // discussion (plain) | yes | yes | yes + // work / review / etc | yes | yes | yes + // + // Earlier versions of this branch collapsed done markers into the + // "ephemeral discussion" row, which suppressed the route event too. + // That turned out to strand event-driven done detection: when an + // EventBus is present, useDoneDetection disables polling and waits + // exclusively for contribution events on the bus. Without a route + // event for the done marker, the UI never advanced out of "running" + // after all roles signaled done. The two discussion rows must remain + // distinct. + const isPlan = contribution.kind === CK.Plan; + const isDoneMarker = contribution.kind === CK.Discussion && contribution.context?.done === true; + const isEphemeralChat = + contribution.kind === CK.Discussion && + isEphemeralMessageContext(contribution.context) && + !isDoneMarker; + // Done markers + ephemeral chat + plans all skip handoff creation. + // A done marker is "session over — no work to pick up"; a chat message + // is noise; a plan is coordination metadata. + const skipHandoffs = isPlan || isDoneMarker || isEphemeralChat; + // ONLY ephemeral chat skips the route event. Plans and done markers + // still publish so downstream UIs / observers can react. + const skipRouteEvent = isEphemeralChat; + // None of these three count toward budget / quorum / deliberation + // stop conditions. + const skipStopConditions = isPlan || isDoneMarker || isEphemeralChat; + // --- Policy enforcement (TOCTOU-safe: runs inside store mutex) --- let policyResult: PolicyEnforcementResult | undefined; let enforcer: PolicyEnforcer | undefined; @@ -305,11 +810,11 @@ export async function contributeOperation( }; if (store.setPreWriteHook) { store.setPreWriteHook(contribution.cid, async (c: Contribution) => { - policyResult = await enforcer?.enforce(c, true); + policyResult = await enforcer?.enforce(c, true, { skipStopConditions }); }); } else { // Fallback: enforce outside mutex (non-EnforcingContributionStore) - policyResult = await enforcer.enforce(contribution, true); + policyResult = await enforcer.enforce(contribution, true, { skipStopConditions }); } } @@ -327,91 +832,74 @@ export async function contributeOperation( } } - // --- Write: contribution + handoffs atomically where possible --- - const handoffIds: string[] = []; - const needsHandoffs = - deps.handoffStore !== undefined && routedTo !== undefined && routedTo.length > 0; + // --- Write: contribution + handoffs (atomic when supported, serial otherwise) --- + // Plans + ephemeral messages skip handoff creation entirely by passing + // undefined as the routing-target list to the writer. const agentRole = contribution.agent.role; + const handoffsRoutedTo = skipHandoffs ? undefined : routedTo; + const handoffIds = await writeContributionWithHandoffs( + contribution, + handoffsRoutedTo, + agentRole, + deps.contributionStore, + deps.handoffStore, + ); - // Duck-type check for atomic cowrite capability (SqliteContributionStore + SqliteHandoffStore) - const cowriteStore = deps.contributionStore as { - putWithCowrite?: (c: Contribution, fn: () => void) => void; + // ┌──────────────────────────────────────────────────────────────────┐ + // │ DURABLE COMMIT BOUNDARY │ + // │ │ + // │ The contribution (and any atomic-path handoffs) are now durably │ + // │ written to the store. Everything below this line is post-write │ + // │ side-effect — it must NEVER cause the already-committed write │ + // │ to be "undone" from the caller's perspective. │ + // │ │ + // │ Idempotency resolution happens HERE, not at the end of the │ + // │ function, because a throw in persistOutcome or a user-supplied │ + // │ callback would otherwise release the slot and let a retry │ + // │ produce a duplicate contribution with a fresh createdAt. │ + // │ │ + // │ The cached response reflects committed state only — subsequent │ + // │ post-write updates (stop-condition recheck changing │ + // │ policyResult.stopResult) are NOT propagated into the cache. │ + // │ The first caller still sees the full updated result via the │ + // │ direct return below; cached retries see the committed-only │ + // │ snapshot, which is still correct (the contribution is the │ + // │ same, only the advisory stop signal differs). │ + // └──────────────────────────────────────────────────────────────────┘ + const committedResult: ContributeResult = { + cid: contribution.cid, + kind: contribution.kind, + mode: contribution.mode, + summary: contribution.summary, + artifactCount: Object.keys(contribution.artifacts).length, + relationCount: contribution.relations.length, + createdAt: contribution.createdAt, + ...(routedTo !== undefined ? { routedTo } : {}), + ...(handoffIds.length > 0 ? { handoffIds } : {}), + ...(policyResult !== undefined ? { policy: policyResult } : {}), }; - const sqliteHandoffStore = needsHandoffs - ? (deps.handoffStore as { insertSync?: (input: HandoffInput) => string }) - : undefined; - if ( - needsHandoffs && - cowriteStore.putWithCowrite !== undefined && - sqliteHandoffStore?.insertSync !== undefined && - routedTo !== undefined && - agentRole !== undefined - ) { - // Atomic path: contribution + all handoff records in one SQLite transaction - cowriteStore.putWithCowrite(contribution, () => { - for (const targetRole of routedTo) { - const hid = sqliteHandoffStore.insertSync?.({ - sourceCid: contribution.cid, - fromRole: agentRole, - toRole: targetRole, - requiresReply: false, - }); - if (hid !== undefined) handoffIds.push(hid); - } - }); - } else { - // Non-atomic path: separate writes (in-memory stores or Nexus VFS handoff store) - await deps.contributionStore.put(contribution); - if (needsHandoffs && routedTo !== undefined && agentRole !== undefined) { - const handoffStore = deps.handoffStore; - if (handoffStore !== undefined) { - // Identify which handoff store type is in use - const isNexus = !("insertSync" in handoffStore); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [handoff] storeType=${isNexus ? "NexusHandoffStore" : "SqliteHandoffStore"} targets=${(routedTo as readonly string[]).join(",")}\n`, - ); - } catch { - /* */ - } - for (const targetRole of routedTo) { - try { - const handoff = await handoffStore.create({ - sourceCid: contribution.cid, - fromRole: agentRole, - toRole: targetRole, - requiresReply: false, - }); - handoffIds.push(handoff.handoffId); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [handoff] CREATED ${agentRole}→${targetRole} cid=${contribution.cid.slice(0, 16)} id=${handoff.handoffId}\n`, - ); - } catch { - /* */ - } - } catch (handoffErr) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [handoff] FAILED ${agentRole}→${targetRole} cid=${contribution.cid.slice(0, 16)} err=${handoffErr instanceof Error ? handoffErr.message : String(handoffErr)}\n`, - ); - } catch { - /* */ - } - } - } - } - } + if (idempotencySlot !== undefined) { + idempotencySlot.resolve(ok(committedResult)); + // Clear the local reference so the outer catch(error) handler can't + // release the slot for a post-commit failure. The contribution is + // durably written; retries with the same key must return this cached + // result, NOT re-run the write path. + idempotencySlot = undefined; + } + + // Post-write callbacks — wrapped so a throw cannot escape and undo + // the commit from the caller's perspective. + try { + deps.onContributionWrite?.(); + deps.onContributionWritten?.(contribution.cid); + } catch (callbackErr) { + process.stderr.write( + `[grove] Warning: onContributionWrite* callback threw after commit: ${ + callbackErr instanceof Error ? callbackErr.message : String(callbackErr) + }\n`, + ); } - deps.onContributionWrite?.(); - deps.onContributionWritten?.(contribution.cid); // --- Post-write: mark upstream handoffs as replied (fire-and-forget) --- // When this contribution targets another CID (reviews/responds_to), find @@ -443,12 +931,32 @@ export async function contributeOperation( } // --- Post-write: persist derived outcome (outside mutex scope) --- + // Wrapped: a throw from the outcome store must not undo the committed + // contribution write or leak to the caller as a fresh error. The + // contribution is already in the DAG; failed outcome persistence is + // a downstream bookkeeping issue, not a write failure. if (policyResult?.derivedOutcome !== undefined && enforcer !== undefined) { - await enforcer.persistOutcome(contribution.cid, policyResult.derivedOutcome); + try { + await enforcer.persistOutcome(contribution.cid, policyResult.derivedOutcome); + } catch (persistErr) { + process.stderr.write( + `[grove] Warning: persistOutcome failed after commit (cid=${contribution.cid.slice(0, 16)}): ${ + persistErr instanceof Error ? persistErr.message : String(persistErr) + }\n`, + ); + } } // --- Post-write: route events via topology (fire-and-forget) --- - if (routedTo !== undefined && deps.topologyRouter !== undefined && agentRole !== undefined) { + // Ephemeral messages skip the routing event entirely so chat doesn't + // wake downstream agents. Plans still fire the event so live UIs can + // observe plan creation, but the handoff record was already suppressed. + if ( + !skipRouteEvent && + routedTo !== undefined && + deps.topologyRouter !== undefined && + agentRole !== undefined + ) { fireAndForget("topology routing", () => deps.topologyRouter?.route(agentRole, { cid: contribution.cid, @@ -466,10 +974,15 @@ export async function contributeOperation( // this contribution. This runs outside the write mutex, so it doesn't block // concurrent writers. Only re-checks when the pre-write result said not stopped. // + // Plans + ephemeral messages skip this recheck — they were excluded from the + // pre-write evaluation too (skipStopConditions), so there is no threshold- + // crossing semantics to recover here. + // // Best-effort: errors here must not fail the already-committed write. A store // read failure during the recheck is logged but does not surface as a failed // operation — the contribution is already in the DAG. if ( + !skipStopConditions && policyResult !== undefined && !policyResult.stopResult?.stopped && deps.contract?.stopConditions !== undefined && @@ -522,7 +1035,12 @@ export async function contributeOperation( } } - return ok({ + // Build the final result returned to the DIRECT caller. This includes + // any post-write updates to policyResult (e.g., stop-condition recheck + // detecting a threshold crossing). Cached retries get the narrower + // `committedResult` built above — that's intentional, see the + // DURABLE COMMIT BOUNDARY comment. + const result: ContributeResult = { cid: contribution.cid, kind: contribution.kind, mode: contribution.mode, @@ -533,8 +1051,18 @@ export async function contributeOperation( ...(routedTo !== undefined ? { routedTo } : {}), ...(handoffIds.length > 0 ? { handoffIds } : {}), ...(policyResult !== undefined ? { policy: policyResult } : {}), - }); + }; + + return ok(result); } catch (error) { + // Release the idempotency slot ONLY if it's still reserved here. The + // slot is cleared (set to undefined) immediately after the durable + // commit boundary above — so this release path can only fire for + // errors that happened BEFORE the contribution was durably written + // (validation, policy enforcement inside the mutex, store write + // failure). Post-commit failures flow through the committed result + // path and never reach this catch. + idempotencySlot?.release(); return fromGroveError(error); } } diff --git a/src/core/operations/messaging.test.ts b/src/core/operations/messaging.test.ts index 15b9882..8c82f7e 100644 --- a/src/core/operations/messaging.test.ts +++ b/src/core/operations/messaging.test.ts @@ -1,106 +1,228 @@ -import { describe, expect, test } from "bun:test"; +/** + * Tests for sendMessageAsDiscussion + readInbox. + * + * sendMessageAsDiscussion was previously the standalone `sendMessage` + * helper that bypassed PolicyEnforcer/TopologyRouter (the #228 bug for + * the messaging path). It now sugars over contributeOperation, so this + * file also covers the regression test: a contract that disallows + * `discussion` for an agent must reject sendMessageAsDiscussion. + */ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; + +import type { GroveContract } from "../contract.js"; +import { EnforcingContributionStore } from "../enforcing-store.js"; import type { Contribution, ContributionInput } from "../models.js"; -import { ContributionKind, RelationType } from "../models.js"; +import { ContributionKind, ContributionMode, RelationType } from "../models.js"; +import type { ContributionStore } from "../store.js"; import { InMemoryContributionStore } from "../testing.js"; -import { readInbox, sendMessage } from "./messaging.js"; +import { _resetIdempotencyCacheForTests } from "./contribute.js"; +import type { OperationDeps } from "./deps.js"; +import { readInbox, sendMessageAsDiscussion } from "./messaging.js"; // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- -/** Simple deterministic CID mock for testing. */ -function mockComputeCid(input: ContributionInput): string { - const raw = JSON.stringify(input); - // Use a simple hash-like hex string derived from content length + chars - let hash = 0; - for (let i = 0; i < raw.length; i++) { - hash = (hash * 31 + raw.charCodeAt(i)) | 0; - } - const hex = Math.abs(hash).toString(16).padStart(8, "0"); - return `blake3:${hex.repeat(8)}`; -} - const AGENT_ALICE = { agentId: "alice", agentName: "Alice" }; const AGENT_BOB = { agentId: "bob", agentName: "Bob" }; +function makeDeps(store: ContributionStore, contract?: GroveContract): OperationDeps { + return { + contributionStore: store, + ...(contract !== undefined ? { contract } : {}), + }; +} + // --------------------------------------------------------------------------- -// sendMessage +// sendMessageAsDiscussion — happy path + validation // --------------------------------------------------------------------------- -describe("sendMessage", () => { +describe("sendMessageAsDiscussion", () => { + beforeEach(() => { + _resetIdempotencyCacheForTests(); + }); + + afterEach(() => { + _resetIdempotencyCacheForTests(); + }); + test("creates ephemeral discussion contribution with recipients", async () => { const store = new InMemoryContributionStore(); - const result = await sendMessage( - store, + const result = await sendMessageAsDiscussion( + { agent: AGENT_ALICE, body: "Hello Bob!", recipients: ["@bob"] }, + makeDeps(store), + ); + + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.recipients).toEqual(["@bob"]); + expect(result.value.summary).toBe("Hello Bob!"); + + const stored = await store.get(result.value.cid); + expect(stored?.kind).toBe(ContributionKind.Discussion); + expect(stored?.mode).toBe(ContributionMode.Exploration); + expect(stored?.context?.ephemeral).toBe(true); + expect(stored?.context?.recipients).toEqual(["@bob"]); + expect(stored?.context?.message_body).toBe("Hello Bob!"); + expect(stored?.tags).toContain("message"); + }); + + test("rejects empty body via VALIDATION_ERROR", async () => { + const store = new InMemoryContributionStore(); + const result = await sendMessageAsDiscussion( + { agent: AGENT_ALICE, body: " ", recipients: ["@bob"] }, + makeDeps(store), + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toMatch(/empty/); + }); + + test("rejects empty recipients via VALIDATION_ERROR", async () => { + const store = new InMemoryContributionStore(); + const result = await sendMessageAsDiscussion( + { agent: AGENT_ALICE, body: "Hi", recipients: [] }, + makeDeps(store), + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toMatch(/recipient/); + }); + + test("creates responds_to relation when inReplyTo is provided", async () => { + const store = new InMemoryContributionStore(); + // First create a parent message so the responds_to target exists. + const parent = await sendMessageAsDiscussion( + { agent: AGENT_ALICE, body: "parent", recipients: ["@bob"] }, + makeDeps(store), + ); + expect(parent.ok).toBe(true); + if (!parent.ok) return; + + const reply = await sendMessageAsDiscussion( { - agent: AGENT_ALICE, - body: "Hello Bob!", - recipients: ["@bob"], + agent: AGENT_BOB, + body: "reply", + recipients: ["@alice"], + inReplyTo: parent.value.cid, }, - mockComputeCid, + makeDeps(store), ); + expect(reply.ok).toBe(true); + if (!reply.ok) return; - expect(result.kind).toBe(ContributionKind.Discussion); - expect(result.context?.ephemeral).toBe(true); - expect(result.context?.recipients).toEqual(["@bob"]); - expect(result.context?.message_body).toBe("Hello Bob!"); - expect(result.tags).toContain("message"); - expect(result.manifestVersion).toBe(1); - - // Verify it was stored - const stored = await store.get(result.cid); - expect(stored).toBeDefined(); + const stored = await store.get(reply.value.cid); + expect(stored?.relations).toHaveLength(1); + expect(stored?.relations[0]?.relationType).toBe(RelationType.RespondsTo); + expect(stored?.relations[0]?.targetCid).toBe(parent.value.cid); }); - test("rejects empty body", async () => { + test("rejects inReplyTo when target does not exist (validateRelations)", async () => { const store = new InMemoryContributionStore(); - await expect( - sendMessage( - store, - { - agent: AGENT_ALICE, - body: " ", - recipients: ["@bob"], - }, - mockComputeCid, - ), - ).rejects.toThrow(/empty/); + const result = await sendMessageAsDiscussion( + { + agent: AGENT_ALICE, + body: "orphan reply", + recipients: ["@bob"], + inReplyTo: "blake3:0000000000000000000000000000000000000000000000000000000000000000", + }, + makeDeps(store), + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("NOT_FOUND"); }); - test("rejects empty recipients", async () => { + test("multi-recipient message stores all addressees", async () => { const store = new InMemoryContributionStore(); - await expect( - sendMessage( - store, - { - agent: AGENT_ALICE, - body: "Hello!", - recipients: [], - }, - mockComputeCid, - ), - ).rejects.toThrow(/at least one recipient/); + const result = await sendMessageAsDiscussion( + { + agent: AGENT_ALICE, + body: "team broadcast", + recipients: ["@bob", "@charlie", "@dave"], + }, + makeDeps(store), + ); + expect(result.ok).toBe(true); + if (!result.ok) return; + const stored = await store.get(result.value.cid); + expect(stored?.context?.recipients).toEqual(["@bob", "@charlie", "@dave"]); }); - test("creates responds_to relation when inReplyTo provided", async () => { + test("idempotencyKey: identical retry returns cached result", async () => { const store = new InMemoryContributionStore(); - const parentCid = "blake3:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + const input = { + agent: AGENT_ALICE, + body: "say once", + recipients: ["@bob"], + idempotencyKey: "broadcast-1", + }; + const first = await sendMessageAsDiscussion(input, makeDeps(store)); + const second = await sendMessageAsDiscussion({ ...input }, makeDeps(store)); + expect(first.ok && second.ok).toBe(true); + if (!first.ok || !second.ok) return; + expect(second.value.cid).toBe(first.value.cid); + }); - const result = await sendMessage( - store, + test("idempotencyKey: same key + different body is rejected with STATE_CONFLICT", async () => { + const store = new InMemoryContributionStore(); + const first = await sendMessageAsDiscussion( { agent: AGENT_ALICE, - body: "Replying to your message", + body: "first body", recipients: ["@bob"], - inReplyTo: parentCid, + idempotencyKey: "broadcast-2", }, - mockComputeCid, + makeDeps(store), ); + expect(first.ok).toBe(true); + if (!first.ok) return; + const second = await sendMessageAsDiscussion( + { + agent: AGENT_ALICE, + body: "different body", + recipients: ["@bob"], + idempotencyKey: "broadcast-2", + }, + makeDeps(store), + ); + expect(second.ok).toBe(false); + if (second.ok) return; + expect(second.error.code).toBe("STATE_CONFLICT"); + }); + + // ------------------------------------------------------------------------- + // Regression test for #228: contract role-kind constraint must apply. + // Before #228 fix: sendMessage bypassed PolicyEnforcer entirely so an + // agent restricted to allowedKinds=["work"] could still send messages. + // After #228 fix: sendMessageAsDiscussion routes through contributeOperation + // and PolicyEnforcer rejects with role_kind violation. + // ------------------------------------------------------------------------- + test("#228 regression: blocked when allowedKinds excludes 'discussion'", async () => { + const rawStore = new InMemoryContributionStore(); + const contract: GroveContract = { + contractVersion: 1, + name: "test-contract", + mode: ContributionMode.Evaluation, + agentConstraints: { allowedKinds: ["work"] }, + }; + const store = new EnforcingContributionStore(rawStore, contract); - expect(result.relations).toHaveLength(1); - expect(result.relations[0]?.relationType).toBe(RelationType.RespondsTo); - expect(result.relations[0]?.targetCid).toBe(parentCid); + const result = await sendMessageAsDiscussion( + { agent: AGENT_ALICE, body: "should fail", recipients: ["@bob"] }, + makeDeps(store, contract), + ); + expect(result.ok).toBe(false); + if (result.ok) return; + // PolicyEnforcer throws PolicyViolationError which the operation + // catches and converts to a result error. + expect(result.error.code).toBeTruthy(); + // Make sure no contribution was actually written. + const stored = await rawStore.list({ kind: ContributionKind.Discussion }); + expect(stored).toHaveLength(0); }); }); @@ -116,7 +238,6 @@ describe("readInbox", () => { ): Promise { const results: Contribution[] = []; for (const msg of messages) { - // Build contribution directly to control createdAt const input: ContributionInput = { kind: ContributionKind.Discussion, mode: "exploration", @@ -133,12 +254,13 @@ describe("readInbox", () => { agent: msg.from, createdAt: msg.createdAt, }; - const cid = mockComputeCid(input); - const contribution: Contribution = { - ...input, - cid, - manifestVersion: 1, - }; + // Compute a deterministic-ish CID (just hash the input by length+chars) + const raw = JSON.stringify(input); + let hash = 0; + for (let i = 0; i < raw.length; i++) hash = (hash * 31 + raw.charCodeAt(i)) | 0; + const hex = Math.abs(hash).toString(16).padStart(8, "0"); + const cid = `blake3:${hex.repeat(8)}`; + const contribution: Contribution = { ...input, cid, manifestVersion: 1 }; await store.put(contribution); results.push(contribution); } @@ -252,4 +374,86 @@ describe("readInbox", () => { expect(inbox).toHaveLength(1); expect(inbox[0]?.body).toBe("Broadcast"); }); + + test("multi-recipient query with OR semantics", async () => { + const store = new InMemoryContributionStore(); + await seedMessages(store, [ + { + from: AGENT_ALICE, + body: "to-bob", + recipients: ["@bob"], + createdAt: "2026-01-01T00:00:00Z", + }, + { + from: AGENT_ALICE, + body: "to-charlie", + recipients: ["@charlie"], + createdAt: "2026-01-01T01:00:00Z", + }, + { + from: AGENT_ALICE, + body: "to-dave", + recipients: ["@dave"], + createdAt: "2026-01-01T02:00:00Z", + }, + ]); + const inbox = await readInbox(store, { recipients: ["@bob", "@charlie"] }); + expect(inbox).toHaveLength(2); + const bodies = inbox.map((m) => m.body); + expect(bodies).toContain("to-bob"); + expect(bodies).toContain("to-charlie"); + expect(bodies).not.toContain("to-dave"); + }); + + test("empty inbox returns empty array", async () => { + const store = new InMemoryContributionStore(); + const inbox = await readInbox(store, { recipient: "@bob" }); + expect(inbox).toEqual([]); + }); + + test("non-message discussions (no ephemeral flag) are excluded", async () => { + const store = new InMemoryContributionStore(); + // Seed a regular discussion that is NOT a message — no ephemeral flag. + const regular: ContributionInput = { + kind: ContributionKind.Discussion, + mode: "exploration", + summary: "regular discussion", + description: "regular discussion", + artifacts: {}, + relations: [], + tags: [], + // Note: no ephemeral, no recipients, no message_body + agent: AGENT_ALICE, + createdAt: "2026-01-01T00:00:00Z", + }; + const c: Contribution = { ...regular, cid: "blake3:reg".padEnd(72, "0"), manifestVersion: 1 }; + await store.put(c); + + // Also seed a real message + await seedMessages(store, [ + { + from: AGENT_BOB, + body: "real msg", + recipients: ["@bob"], + createdAt: "2026-01-02T00:00:00Z", + }, + ]); + + const inbox = await readInbox(store, { recipient: "@bob" }); + expect(inbox).toHaveLength(1); + expect(inbox[0]?.body).toBe("real msg"); + }); + + test("limit caps the number of returned messages", async () => { + const store = new InMemoryContributionStore(); + const seedData = Array.from({ length: 10 }, (_, i) => ({ + from: AGENT_ALICE, + body: `msg-${i}`, + recipients: ["@bob"], + createdAt: `2026-01-01T${String(i).padStart(2, "0")}:00:00Z`, + })); + await seedMessages(store, seedData); + const inbox = await readInbox(store, { recipient: "@bob", limit: 3 }); + expect(inbox).toHaveLength(3); + }); }); diff --git a/src/core/operations/messaging.ts b/src/core/operations/messaging.ts index bb9db3a..657cc20 100644 --- a/src/core/operations/messaging.ts +++ b/src/core/operations/messaging.ts @@ -1,41 +1,58 @@ /** - * Messaging operations — send and read agent-to-agent messages. + * Inbox query operations. * - * Messages are modeled as `discussion`-kind contributions with - * `responds_to` relations and `recipients` + `ephemeral` context fields. - * This reuses the existing contribution graph (DRY) while keeping - * messages out of frontier ranking (ephemeral flag). + * Sending a message is just a discussion contribution with an ephemeral + * context payload — see `sendMessageAsDiscussion()` below for the helper + * that wraps `discussOperation`. This module's primary responsibility is + * the read side: filtering, sorting, and projecting discussion contributions + * back into InboxMessage shapes. * - * All messages flow through the ContributionStore — Nexus-first, - * with local SQLite as fallback. + * All messages flow through the ContributionStore — Nexus-first, with + * local SQLite as fallback. */ -import type { AgentIdentity, Contribution, ContributionInput } from "../models.js"; +import type { Contribution } from "../models.js"; import { ContributionKind, ContributionMode, RelationType } from "../models.js"; import type { ContributionStore } from "../store.js"; +import type { AgentOverrides } from "./agent.js"; +import { buildMessageContext, parseMessageContext } from "./context-schemas.js"; +import { contributeOperation } from "./contribute.js"; +import type { OperationDeps } from "./deps.js"; +import type { OperationResult } from "./result.js"; +import { ok, validationErr } from "./result.js"; // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- -/** Input for sending a message. */ +/** Input for sending a message via the discussion-as-message helper. */ export interface SendMessageInput { - /** The agent sending the message. */ - readonly agent: AgentIdentity; - /** Message body text. */ + /** Sending agent (overrides; resolved by discussOperation). */ + readonly agent?: AgentOverrides | undefined; + /** Message body text. Must be non-empty after trimming. */ readonly body: string; - /** Recipients: "@agent-name" handles, "@all" for broadcast. */ + /** Recipients: "@agent-name" handles, "@all" for broadcast. Must be non-empty. */ readonly recipients: readonly string[]; /** Optional CID to respond to (creates responds_to relation). */ readonly inReplyTo?: string | undefined; /** Optional tags for filtering. */ readonly tags?: readonly string[] | undefined; + /** Optional idempotency key for retry safety. */ + readonly idempotencyKey?: string | undefined; +} + +/** Result of sending a message via the helper. */ +export interface SendMessageResult { + readonly cid: string; + readonly summary: string; + readonly recipients: readonly string[]; + readonly createdAt: string; } /** A message read from the inbox. */ export interface InboxMessage { readonly cid: string; - readonly from: AgentIdentity; + readonly from: import("../models.js").AgentIdentity; readonly body: string; readonly recipients: readonly string[]; readonly inReplyTo?: string | undefined; @@ -63,54 +80,53 @@ export interface InboxQuery { // --------------------------------------------------------------------------- /** - * Send a message as a discussion contribution. + * Send a message by creating an ephemeral discussion contribution. + * + * Sugar over contributeOperation: builds a discussion-kind contribution + * with the ephemeral message context (recipients + body) and an optional + * responds_to relation. Routes through the canonical write pipeline so + * role-kind constraints, hooks, and idempotency apply uniformly with + * other contributions. * - * The message is stored as a `discussion`-kind contribution with: - * - `context.ephemeral = true` (excluded from frontier ranking) - * - `context.recipients` (array of @handles) - * - `context.message_body` (the text content) - * - Optional `responds_to` relation for threaded replies + * Replaces the previous standalone sendMessage that took a raw store + + * computeCid and bypassed PolicyEnforcer / TopologyRouter (the #228 bug + * for the messaging path). */ -export async function sendMessage( - store: ContributionStore, +export async function sendMessageAsDiscussion( input: SendMessageInput, - computeCid: (input: ContributionInput) => string, -): Promise { + deps: OperationDeps, +): Promise> { if (input.recipients.length === 0) { - throw new Error("Message must have at least one recipient"); + return validationErr("Message must have at least one recipient"); } if (input.body.trim().length === 0) { - throw new Error("Message body cannot be empty"); + return validationErr("Message body cannot be empty"); } - const contributionInput: ContributionInput = { - kind: ContributionKind.Discussion, - mode: ContributionMode.Exploration, - summary: truncateSummary(input.body), - description: input.body, - artifacts: {}, - relations: input.inReplyTo - ? [{ targetCid: input.inReplyTo, relationType: RelationType.RespondsTo }] - : [], - tags: [...(input.tags ?? []), "message"], - context: { - ephemeral: true, - recipients: [...input.recipients], - message_body: input.body, + const result = await contributeOperation( + { + kind: ContributionKind.Discussion, + mode: ContributionMode.Exploration, + summary: truncateSummary(input.body), + description: input.body, + relations: input.inReplyTo + ? [{ targetCid: input.inReplyTo, relationType: RelationType.RespondsTo }] + : [], + tags: [...(input.tags ?? []), "message"], + context: buildMessageContext({ recipients: input.recipients, body: input.body }), + ...(input.agent !== undefined ? { agent: input.agent } : {}), + ...(input.idempotencyKey !== undefined ? { idempotencyKey: input.idempotencyKey } : {}), }, - agent: input.agent, - createdAt: new Date().toISOString(), - }; - - const cid = computeCid(contributionInput); - const contribution: Contribution = { - ...contributionInput, - cid, - manifestVersion: 1, - }; - - await store.put(contribution); - return contribution; + deps, + ); + + if (!result.ok) return result as OperationResult; + return ok({ + cid: result.value.cid, + summary: result.value.summary, + recipients: [...input.recipients], + createdAt: result.value.createdAt, + }); } /** @@ -138,63 +154,68 @@ export async function readInbox( ...(query?.sessionId !== undefined ? { sessionId: query.sessionId } : {}), }); - let messages = contributions.filter((c) => { - if (c.context?.ephemeral !== true) return false; - if (!Array.isArray(c.context.recipients)) return false; - return true; + // Pair each contribution with its parsed message context up front so we + // don't re-parse on every filter pass and so wrong-shape contributions are + // dropped exactly once. + let messages = contributions.flatMap((c) => { + const ctx = parseMessageContext(c.context); + return ctx === undefined ? [] : [{ contribution: c, context: ctx }]; }); // Filter by single recipient (legacy) if (query?.recipient !== undefined) { const target = query.recipient; - messages = messages.filter((c) => { - const recipients = c.context?.recipients as string[]; - return recipients.includes(target) || recipients.includes("@all"); - }); + messages = messages.filter( + ({ context }) => context.recipients.includes(target) || context.recipients.includes("@all"), + ); } // Filter by multiple recipients (matches if any handle appears in the message) if (query?.recipients !== undefined && query.recipients.length > 0) { const handles = new Set(query.recipients); - messages = messages.filter((c) => { - const recipients = c.context?.recipients as string[]; - return recipients.some((r) => handles.has(r)); - }); + messages = messages.filter(({ context }) => context.recipients.some((r) => handles.has(r))); } // Filter by sender if (query?.fromAgentId !== undefined) { - messages = messages.filter((c) => c.agent.agentId === query.fromAgentId); + messages = messages.filter( + ({ contribution }) => contribution.agent.agentId === query.fromAgentId, + ); } // Filter by timestamp if (query?.since !== undefined) { const sinceMs = Date.parse(query.since); - messages = messages.filter((c) => Date.parse(c.createdAt) >= sinceMs); + messages = messages.filter(({ contribution }) => Date.parse(contribution.createdAt) >= sinceMs); } // Sort by most recent first - messages.sort((a, b) => Date.parse(b.createdAt) - Date.parse(a.createdAt)); + messages.sort( + (a, b) => Date.parse(b.contribution.createdAt) - Date.parse(a.contribution.createdAt), + ); // Apply limit const limit = query?.limit ?? 50; messages = messages.slice(0, limit); - return messages.map(contributionToMessage); + return messages.map(({ contribution, context }) => contributionToMessage(contribution, context)); } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- -function contributionToMessage(c: Contribution): InboxMessage { +function contributionToMessage( + c: Contribution, + ctx: { readonly recipients: readonly string[]; readonly message_body: string }, +): InboxMessage { const inReplyTo = c.relations.find((r) => r.relationType === RelationType.RespondsTo)?.targetCid; return { cid: c.cid, from: c.agent, - body: (c.context?.message_body as string) ?? c.description ?? c.summary, - recipients: (c.context?.recipients as string[]) ?? [], + body: ctx.message_body, + recipients: [...ctx.recipients], inReplyTo, createdAt: c.createdAt, tags: [...c.tags], diff --git a/src/core/operations/plan.test.ts b/src/core/operations/plan.test.ts new file mode 100644 index 0000000..3ae7407 --- /dev/null +++ b/src/core/operations/plan.test.ts @@ -0,0 +1,519 @@ +/** + * Tests for createPlanOperation + updatePlanOperation. + * + * After Issue 1A in the #228 review, plan operations route through + * contributeOperation, so this file covers: + * + * - Validation (title, tasks) + * - Stats computation (computeStats helper) + * - derives_from chain (updatePlan) + * - Title fall-through from previous version + * - Wrong-kind previous CID (Issue 6A) + * - Plan-kind constraint enforcement via PolicyEnforcer (Issue 1A regression) + * - idempotencyKey passthrough (Issue 4A) + * - Routing rules: plans don't generate handoffs, do fire route events + * - Stop conditions skipped for plan kind (Issue 13A) + */ + +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; + +import type { GroveContract } from "../contract.js"; +import { EnforcingContributionStore } from "../enforcing-store.js"; +import { ContributionKind, ContributionMode } from "../models.js"; +import type { ContributionStore } from "../store.js"; +import type { PlanTask } from "./context-schemas.js"; +import { _resetIdempotencyCacheForTests } from "./contribute.js"; +import type { OperationDeps } from "./deps.js"; +import { createPlanOperation, updatePlanOperation } from "./plan.js"; +import type { FullOperationDeps, TestOperationDeps } from "./test-helpers.js"; +import { createTestOperationDeps } from "./test-helpers.js"; + +const SAMPLE_TASKS: readonly PlanTask[] = [ + { id: "t1", title: "Design API", status: "todo" }, + { id: "t2", title: "Implement", status: "in_progress" }, + { id: "t3", title: "Tests", status: "done" }, + { id: "t4", title: "Deploy", status: "blocked" }, +]; + +// --------------------------------------------------------------------------- +// createPlanOperation +// --------------------------------------------------------------------------- + +describe("createPlanOperation", () => { + let testDeps: TestOperationDeps; + let deps: FullOperationDeps; + + beforeEach(async () => { + _resetIdempotencyCacheForTests(); + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + _resetIdempotencyCacheForTests(); + }); + + test("creates a plan with stats", async () => { + const result = await createPlanOperation( + { + title: "Phase 1", + tasks: SAMPLE_TASKS, + agent: { agentId: "planner" }, + }, + deps, + ); + + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.title).toBe("Phase 1"); + expect(result.value.taskCount).toBe(4); + expect(result.value.todo).toBe(1); + expect(result.value.inProgress).toBe(1); + expect(result.value.done).toBe(1); + expect(result.value.blocked).toBe(1); + expect(result.value.cid).toMatch(/^blake3:/); + }); + + test("stores as kind=plan, mode=exploration with plan tag", async () => { + const result = await createPlanOperation( + { title: "P", tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(true); + if (!result.ok) return; + const stored = await deps.contributionStore.get(result.value.cid); + expect(stored?.kind).toBe(ContributionKind.Plan); + expect(stored?.mode).toBe(ContributionMode.Exploration); + expect(stored?.tags).toContain("plan"); + }); + + test("stores typed PlanContext via buildPlanContext", async () => { + const result = await createPlanOperation( + { title: "Phase 1", tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(true); + if (!result.ok) return; + const stored = await deps.contributionStore.get(result.value.cid); + expect(stored?.context?.plan_title).toBe("Phase 1"); + expect(stored?.context?.tasks).toEqual(SAMPLE_TASKS as never); + }); + + test("preserves user-supplied tags alongside 'plan' tag", async () => { + const result = await createPlanOperation( + { + title: "P", + tasks: SAMPLE_TASKS, + tags: ["sprint-1", "auth"], + agent: { agentId: "a" }, + }, + deps, + ); + expect(result.ok).toBe(true); + if (!result.ok) return; + const stored = await deps.contributionStore.get(result.value.cid); + expect(stored?.tags).toContain("plan"); + expect(stored?.tags).toContain("sprint-1"); + expect(stored?.tags).toContain("auth"); + }); + + test("rejects empty title", async () => { + const result = await createPlanOperation( + { title: " ", tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toMatch(/title/i); + }); + + test("rejects empty task list", async () => { + const result = await createPlanOperation( + { title: "P", tasks: [], agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toMatch(/task/i); + }); + + test("computeStats counts mixed statuses correctly", async () => { + const tasks: readonly PlanTask[] = [ + { id: "1", title: "a", status: "done" }, + { id: "2", title: "b", status: "done" }, + { id: "3", title: "c", status: "in_progress" }, + { id: "4", title: "d", status: "todo" }, + { id: "5", title: "e", status: "todo" }, + { id: "6", title: "f", status: "todo" }, + { id: "7", title: "g", status: "blocked" }, + ]; + const result = await createPlanOperation( + { title: "Mixed", tasks, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.taskCount).toBe(7); + expect(result.value.done).toBe(2); + expect(result.value.inProgress).toBe(1); + expect(result.value.todo).toBe(3); + expect(result.value.blocked).toBe(1); + }); + + // ------------------------------------------------------------------------- + // #228 regression: PolicyEnforcer must apply + // ------------------------------------------------------------------------- + test("#228 regression: blocked when allowedKinds excludes 'plan'", async () => { + const contract: GroveContract = { + contractVersion: 1, + name: "no-plans", + mode: ContributionMode.Evaluation, + agentConstraints: { allowedKinds: ["work"] }, + }; + const wrappedStore = new EnforcingContributionStore(deps.contributionStore, contract); + const wrappedDeps: OperationDeps = { + ...deps, + contributionStore: wrappedStore, + contract, + }; + + const result = await createPlanOperation( + { title: "blocked plan", tasks: SAMPLE_TASKS, agent: { agentId: "coder" } }, + wrappedDeps, + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBeTruthy(); + // The plan should NOT be in the underlying store. + const stored = await deps.contributionStore.list({ kind: ContributionKind.Plan }); + expect(stored).toHaveLength(0); + }); + + // ------------------------------------------------------------------------- + // Idempotency (Issue 4A) + // ------------------------------------------------------------------------- + test("idempotencyKey: identical retry returns cached result", async () => { + const input = { + title: "Idempotent plan", + tasks: SAMPLE_TASKS, + agent: { agentId: "a" }, + idempotencyKey: "plan-key-1", + }; + const first = await createPlanOperation(input, deps); + const second = await createPlanOperation({ ...input }, deps); + expect(first.ok && second.ok).toBe(true); + if (!first.ok || !second.ok) return; + expect(second.value.cid).toBe(first.value.cid); + }); + + test("idempotencyKey: same key + different title is rejected with STATE_CONFLICT", async () => { + const first = await createPlanOperation( + { + title: "Original title", + tasks: SAMPLE_TASKS, + agent: { agentId: "a" }, + idempotencyKey: "plan-key-2", + }, + deps, + ); + expect(first.ok).toBe(true); + if (!first.ok) return; + const second = await createPlanOperation( + { + title: "Different title, same key", + tasks: SAMPLE_TASKS, + agent: { agentId: "a" }, + idempotencyKey: "plan-key-2", + }, + deps, + ); + expect(second.ok).toBe(false); + if (second.ok) return; + expect(second.error.code).toBe("STATE_CONFLICT"); + }); +}); + +// --------------------------------------------------------------------------- +// updatePlanOperation +// --------------------------------------------------------------------------- + +describe("updatePlanOperation", () => { + let testDeps: TestOperationDeps; + let deps: FullOperationDeps; + + beforeEach(async () => { + _resetIdempotencyCacheForTests(); + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + _resetIdempotencyCacheForTests(); + }); + + test("creates a v2 plan with derives_from relation to v1", async () => { + const v1 = await createPlanOperation( + { title: "Phase 1", tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(v1.ok).toBe(true); + if (!v1.ok) return; + + const updatedTasks: readonly PlanTask[] = [ + { id: "t1", title: "Design API", status: "done" }, + { id: "t2", title: "Implement", status: "done" }, + { id: "t3", title: "Tests", status: "done" }, + { id: "t4", title: "Deploy", status: "in_progress" }, + ]; + const v2 = await updatePlanOperation( + { + previousPlanCid: v1.value.cid, + tasks: updatedTasks, + agent: { agentId: "a" }, + }, + deps, + ); + expect(v2.ok).toBe(true); + if (!v2.ok) return; + + expect(v2.value.title).toBe("Phase 1"); // falls through from previous + expect(v2.value.done).toBe(3); + expect(v2.value.inProgress).toBe(1); + + const stored = await deps.contributionStore.get(v2.value.cid); + expect(stored?.relations).toHaveLength(1); + expect(stored?.relations[0]?.relationType).toBe("derives_from"); + expect(stored?.relations[0]?.targetCid).toBe(v1.value.cid); + }); + + test("title falls through from previous plan when omitted", async () => { + const v1 = await createPlanOperation( + { title: "Inherited Title", tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(v1.ok).toBe(true); + if (!v1.ok) return; + + const v2 = await updatePlanOperation( + { previousPlanCid: v1.value.cid, tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(v2.ok).toBe(true); + if (!v2.ok) return; + expect(v2.value.title).toBe("Inherited Title"); + }); + + test("explicit title override on update", async () => { + const v1 = await createPlanOperation( + { title: "v1 title", tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(v1.ok).toBe(true); + if (!v1.ok) return; + + const v2 = await updatePlanOperation( + { + previousPlanCid: v1.value.cid, + title: "v2 title", + tasks: SAMPLE_TASKS, + agent: { agentId: "a" }, + }, + deps, + ); + expect(v2.ok).toBe(true); + if (!v2.ok) return; + expect(v2.value.title).toBe("v2 title"); + }); + + test("returns NOT_FOUND when previous CID does not exist", async () => { + const result = await updatePlanOperation( + { + previousPlanCid: "blake3:0000000000000000000000000000000000000000000000000000000000000000", + tasks: SAMPLE_TASKS, + agent: { agentId: "a" }, + }, + deps, + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("NOT_FOUND"); + }); + + // ------------------------------------------------------------------------- + // Issue 6A: kind-check on previous CID + // ------------------------------------------------------------------------- + test("Issue 6A: rejects update when previous CID is not a plan kind", async () => { + // Seed a non-plan contribution by creating a real work contribution + // through contributeOperation, so the CID is valid blake3 hex. + const { contributeOperation } = await import("./contribute.js"); + const work = await contributeOperation( + { + kind: ContributionKind.Work, + mode: ContributionMode.Evaluation, + summary: "real work", + agent: { agentId: "worker" }, + }, + deps, + ); + expect(work.ok).toBe(true); + if (!work.ok) return; + + const result = await updatePlanOperation( + { previousPlanCid: work.value.cid, tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toMatch(/work.*not a plan|not a plan/i); + }); + + test("rejects empty task list", async () => { + const v1 = await createPlanOperation( + { title: "P", tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(v1.ok).toBe(true); + if (!v1.ok) return; + + const result = await updatePlanOperation( + { previousPlanCid: v1.value.cid, tasks: [], agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("VALIDATION_ERROR"); + }); + + test("derives_from chain across three versions", async () => { + const v1 = await createPlanOperation( + { title: "Multi-version", tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(v1.ok).toBe(true); + if (!v1.ok) return; + + const v2 = await updatePlanOperation( + { previousPlanCid: v1.value.cid, tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(v2.ok).toBe(true); + if (!v2.ok) return; + + const v3 = await updatePlanOperation( + { previousPlanCid: v2.value.cid, tasks: SAMPLE_TASKS, agent: { agentId: "a" } }, + deps, + ); + expect(v3.ok).toBe(true); + if (!v3.ok) return; + + expect(v3.value.title).toBe("Multi-version"); // inherited through both updates + + const v3Stored = await deps.contributionStore.get(v3.value.cid); + expect(v3Stored?.relations[0]?.targetCid).toBe(v2.value.cid); + + const v2Stored = await deps.contributionStore.get(v2.value.cid); + expect(v2Stored?.relations[0]?.targetCid).toBe(v1.value.cid); + }); +}); + +// --------------------------------------------------------------------------- +// Routing rules (Issues 1A + 13A) +// --------------------------------------------------------------------------- + +describe("plan routing semantics (Issues 1A + 13A)", () => { + let testDeps: TestOperationDeps; + let deps: FullOperationDeps; + + beforeEach(async () => { + _resetIdempotencyCacheForTests(); + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + _resetIdempotencyCacheForTests(); + }); + + test("creating a plan does NOT create handoff records, even with topology", async () => { + // Build a topology router that would route 'planner' -> 'coder' for any + // contribution. Plans should still skip handoff creation. + const topologyRouter = { + targetsFor: (role: string) => (role === "planner" ? ["coder"] : []), + route: async () => { + /* fire-and-forget event */ + }, + broadcastStop: async () => { + /* stop broadcast */ + }, + } as unknown as NonNullable; + + const depsWithRouting: OperationDeps = { ...deps, topologyRouter }; + + const result = await createPlanOperation( + { + title: "Routed plan", + tasks: SAMPLE_TASKS, + agent: { agentId: "planner-1", role: "planner" }, + }, + depsWithRouting, + ); + expect(result.ok).toBe(true); + if (!result.ok) return; + + // No handoff records should have been created. + const handoffs = await deps.handoffStore.list({ sourceCid: result.value.cid }); + expect(handoffs).toHaveLength(0); + }); + + test("plan write succeeds with stop conditions configured (skipped per 13A)", async () => { + // Configure stop conditions that would normally fire on every write. + // maxRoundsWithoutImprovement: 0 means "stop immediately if no + // improvement". Since plans skip stop-condition evaluation entirely + // (Issue 13A), the plan write succeeds without paying the O(n) scan + // and without triggering broadcastStop. + const contract: GroveContract = { + contractVersion: 1, + name: "with-stop-conditions", + mode: ContributionMode.Exploration, + stopConditions: { maxRoundsWithoutImprovement: 0 }, + }; + + const result = await createPlanOperation( + { + title: "Stop-bypass plan", + tasks: SAMPLE_TASKS, + agent: { agentId: "planner" }, + }, + { ...deps, contract } as OperationDeps, + ); + expect(result.ok).toBe(true); + }); +}); + +// --------------------------------------------------------------------------- +// Confirm contributionStore is required +// --------------------------------------------------------------------------- + +describe("plan operation deps", () => { + test("createPlan returns validation error when contributionStore is missing for updatePlan", async () => { + // This case applies to updatePlanOperation which checks store explicitly. + const emptyDeps = {} as ContributionStore as unknown as OperationDeps; + const result = await updatePlanOperation( + { + previousPlanCid: "blake3:0000000000000000000000000000000000000000000000000000000000000000", + tasks: SAMPLE_TASKS, + agent: { agentId: "a" }, + }, + emptyDeps, + ); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.code).toBe("VALIDATION_ERROR"); + }); +}); diff --git a/src/core/operations/plan.ts b/src/core/operations/plan.ts index d91c2de..88ebef2 100644 --- a/src/core/operations/plan.ts +++ b/src/core/operations/plan.ts @@ -1,32 +1,32 @@ /** - * Plan operations — create and update project plans. + * Plan operations — sugars over `contributeOperation`. * * Plans are modeled as `plan`-kind contributions with structured task * lists in the `context.tasks` field. Plan updates use `derives_from` * relations to form a version chain. + * + * Both operations route through `contributeOperation` so they go through + * the same role-kind / topology / hooks pipeline as `grove_submit_work`, + * preventing the bypass described in #228. Plans are coordination metadata + * (not progress) and opt out of handoff creation and stop-condition + * evaluation inside `contributeOperation` based on `kind === "plan"`. */ -import { createContribution } from "../manifest.js"; -import type { ContributionInput, JsonValue } from "../models.js"; import { ContributionKind, ContributionMode, RelationType } from "../models.js"; import type { AgentOverrides } from "./agent.js"; -import { resolveAgent } from "./agent.js"; +import { buildPlanContext, type PlanTask, parsePlanContext } from "./context-schemas.js"; +import { type ContributeResult, contributeOperation } from "./contribute.js"; import type { OperationDeps } from "./deps.js"; import type { OperationResult } from "./result.js"; import { notFound, ok, validationErr } from "./result.js"; +// Re-export for backwards compat with existing imports. +export type { PlanTask } from "./context-schemas.js"; + // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- -/** A single task within a plan. */ -export interface PlanTask { - readonly id: string; - readonly title: string; - readonly status: "todo" | "in_progress" | "done" | "blocked"; - readonly assignee?: string | undefined; -} - /** Input for creating a new plan. */ export interface CreatePlanInput { readonly title: string; @@ -34,6 +34,7 @@ export interface CreatePlanInput { readonly description?: string | undefined; readonly tags?: readonly string[] | undefined; readonly agent?: AgentOverrides | undefined; + readonly idempotencyKey?: string | undefined; } /** Input for updating an existing plan. */ @@ -44,6 +45,7 @@ export interface UpdatePlanInput { readonly description?: string | undefined; readonly tags?: readonly string[] | undefined; readonly agent?: AgentOverrides | undefined; + readonly idempotencyKey?: string | undefined; } /** Result of a plan operation. */ @@ -62,16 +64,16 @@ export interface PlanResult { // Operations // --------------------------------------------------------------------------- -/** Create a new plan contribution. */ +/** + * Create a new plan contribution. + * + * Sugar over contributeOperation: kind=plan, mode=exploration, with the + * plan title and tasks stored in context via buildPlanContext. + */ export async function createPlanOperation( input: CreatePlanInput, deps: OperationDeps, ): Promise> { - const store = deps.contributionStore; - if (!store) { - return validationErr("contributionStore is required"); - } - if (!input.title || input.title.trim().length === 0) { return validationErr("Plan title is required"); } @@ -79,40 +81,31 @@ export async function createPlanOperation( return validationErr("Plan must have at least one task"); } - const agent = resolveAgent(input.agent); - const now = new Date().toISOString(); - - const contributionInput: ContributionInput = { - kind: ContributionKind.Plan, - mode: ContributionMode.Exploration, - summary: `Plan: ${input.title}`, - description: input.description, - artifacts: {}, - relations: [], - tags: [...(input.tags ?? []), "plan"], - context: { - plan_title: input.title, - tasks: input.tasks as unknown as JsonValue, + const result = await contributeOperation( + { + kind: ContributionKind.Plan, + mode: ContributionMode.Exploration, + summary: `Plan: ${input.title}`, + ...(input.description !== undefined ? { description: input.description } : {}), + tags: [...(input.tags ?? []), "plan"], + context: buildPlanContext({ title: input.title, tasks: input.tasks }), + ...(input.agent !== undefined ? { agent: input.agent } : {}), + ...(input.idempotencyKey !== undefined ? { idempotencyKey: input.idempotencyKey } : {}), }, - agent, - createdAt: now, - }; - - const contribution = createContribution(contributionInput); - await store.put(contribution); - deps.onContributionWrite?.(); - deps.onContributionWritten?.(contribution.cid); + deps, + ); - const stats = computeStats(input.tasks); - return ok({ - cid: contribution.cid, - title: input.title, - ...stats, - createdAt: contribution.createdAt, - }); + if (!result.ok) return result as OperationResult; + return ok(toPlanResult(result.value, input.title, input.tasks)); } -/** Update an existing plan (creates a new version with derives_from relation). */ +/** + * Update an existing plan (creates a new version with derives_from relation). + * + * Sugar over contributeOperation. Validates that the previous CID resolves + * to an actual plan-kind contribution before creating the update — see + * Issue 6A in the #228 review. + */ export async function updatePlanOperation( input: UpdatePlanInput, deps: OperationDeps, @@ -122,59 +115,69 @@ export async function updatePlanOperation( return validationErr("contributionStore is required"); } - // Verify the previous plan exists + // Verify the previous CID resolves AND points at a plan. Doing the kind + // check here (instead of relying on validateRelations alone) gives a clear + // 'wrong kind' error and prevents constructing a plan update that derives + // from a work / review / discussion contribution. const previous = await store.get(input.previousPlanCid); if (!previous) { return notFound("Previous plan", input.previousPlanCid); } + if (previous.kind !== ContributionKind.Plan) { + return validationErr( + `Previous CID ${input.previousPlanCid} is a '${previous.kind}' contribution, not a plan`, + ); + } if (!input.tasks || input.tasks.length === 0) { return validationErr("Plan must have at least one task"); } - const title = input.title ?? (previous.context?.plan_title as string) ?? "Untitled Plan"; - const agent = resolveAgent(input.agent); - const now = new Date().toISOString(); - - const contributionInput: ContributionInput = { - kind: ContributionKind.Plan, - mode: ContributionMode.Exploration, - summary: `Plan update: ${title}`, - description: input.description, - artifacts: {}, - relations: [ - { - targetCid: input.previousPlanCid, - relationType: RelationType.DerivesFrom, - }, - ], - tags: [...(input.tags ?? []), "plan"], - context: { - plan_title: title, - tasks: input.tasks as unknown as JsonValue, + const previousContext = parsePlanContext(previous.context); + const title = input.title ?? previousContext?.plan_title ?? "Untitled Plan"; + + const result = await contributeOperation( + { + kind: ContributionKind.Plan, + mode: ContributionMode.Exploration, + summary: `Plan update: ${title}`, + ...(input.description !== undefined ? { description: input.description } : {}), + relations: [ + { + targetCid: input.previousPlanCid, + relationType: RelationType.DerivesFrom, + }, + ], + tags: [...(input.tags ?? []), "plan"], + context: buildPlanContext({ title, tasks: input.tasks }), + ...(input.agent !== undefined ? { agent: input.agent } : {}), + ...(input.idempotencyKey !== undefined ? { idempotencyKey: input.idempotencyKey } : {}), }, - agent, - createdAt: now, - }; - - const contribution = createContribution(contributionInput); - await store.put(contribution); - deps.onContributionWrite?.(); - deps.onContributionWritten?.(contribution.cid); + deps, + ); - const stats = computeStats(input.tasks); - return ok({ - cid: contribution.cid, - title, - ...stats, - createdAt: contribution.createdAt, - }); + if (!result.ok) return result as OperationResult; + return ok(toPlanResult(result.value, title, input.tasks)); } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- +function toPlanResult( + result: ContributeResult, + title: string, + tasks: readonly PlanTask[], +): PlanResult { + const stats = computeStats(tasks); + return { + cid: result.cid, + title, + ...stats, + createdAt: result.createdAt, + }; +} + function computeStats(tasks: readonly PlanTask[]): { taskCount: number; done: number; diff --git a/src/core/policy-enforcer.ts b/src/core/policy-enforcer.ts index 19d5849..20b3c40 100644 --- a/src/core/policy-enforcer.ts +++ b/src/core/policy-enforcer.ts @@ -138,8 +138,17 @@ export class PolicyEnforcer { * * @param contribution - The contribution to enforce (already created but not yet stored). * @param strict - If true, violations throw instead of being returned as flags. + * @param options.skipStopConditions - When true, skip the stop-condition + * evaluation step entirely. Used by callers that route coordination kinds + * (plans, ephemeral messages) through the enforcement pipeline for the + * role-kind check but don't want their coordination traffic to count + * toward progress-driven stop conditions or pay the O(n) scan cost. */ - async enforce(contribution: Contribution, strict = false): Promise { + async enforce( + contribution: Contribution, + strict = false, + options?: { readonly skipStopConditions?: boolean }, + ): Promise { // Reset best-score cache so each enforce() call gets a fresh view of the store. // The cache is repopulated lazily on the first findBestScore() call within // this invocation, keeping per-call O(n) scan cost to at most once. @@ -256,7 +265,7 @@ export class PolicyEnforcer { // run inside the write mutex when configured. Thread walks are // parallelized and depth-capped to bound latency (see stop-conditions.ts). let stopResult: StopCheckResult | undefined; - if (this.contract.stopConditions !== undefined) { + if (this.contract.stopConditions !== undefined && options?.skipStopConditions !== true) { try { const evalResult = await evaluateStopConditions(this.contract, this.contributionStore); stopResult = { diff --git a/src/mcp/serve.ts b/src/mcp/serve.ts index 2bc4281..709f665 100644 --- a/src/mcp/serve.ts +++ b/src/mcp/serve.ts @@ -54,17 +54,6 @@ try { const nexusUrl = process.env.GROVE_NEXUS_URL; const nexusApiKey = process.env.NEXUS_API_KEY; - // Debug: log MCP server config to file (stderr is ignored in detached mode) - try { - const { appendFileSync } = await import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [mcp-serve] groveDir=${groveDir} nexusUrl=${nexusUrl ?? "none"} hasApiKey=${!!nexusApiKey}\n`, - ); - } catch { - /* non-fatal */ - } - // Always create local runtime for workspace, contract, frontier, CAS const runtime = createLocalRuntime({ groveDir, @@ -113,15 +102,6 @@ try { zoneId, sessionId: process.env.GROVE_SESSION_ID, }); - try { - const { appendFileSync: afs } = await import("node:fs"); - afs( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [mcp-store] sessionId=${process.env.GROVE_SESSION_ID ?? "none"} zoneId=${zoneId}\n`, - ); - } catch { - /* ignore */ - } claimStore = new NexusClaimStore({ client: nexusClient, zoneId }); bountyStore = new NexusBountyStore({ client: nexusClient, zoneId }); outcomeStore = new NexusOutcomeStore({ client: nexusClient, zoneId }); @@ -133,26 +113,8 @@ try { zoneId, ); process.stderr.write(`grove-mcp: using Nexus stores at ${nexusUrl}\n`); - try { - const { appendFileSync } = await import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [mcp-serve] NEXUS STORES active at ${nexusUrl}\n`, - ); - } catch { - /* ignore */ - } } else { process.stderr.write(`grove-mcp: Nexus unreachable, using local stores\n`); - try { - const { appendFileSync } = await import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [mcp-serve] LOCAL STORES (Nexus unreachable at ${nexusUrl})\n`, - ); - } catch { - /* ignore */ - } nexusClient = undefined; } } catch (err) { @@ -162,6 +124,18 @@ try { process.stderr.write(`grove-mcp: using local stores at ${groveDir}\n`); } + // Wrap the contribution store with rate-limit / clock-skew enforcement + // when a contract is loaded. Mirrors the CLI path in + // src/cli/commands/contribute.ts:353. Without this wrap, MCP-served + // contributions bypass the rate limits configured in GROVE.md (Issue 2A + // in the #228 review). + if (runtime.contract !== undefined) { + const { EnforcingContributionStore } = await import("../core/enforcing-store.js"); + contributionStore = new EnforcingContributionStore(contributionStore, runtime.contract, { + cas, + }); + } + // Wire EventBus + TopologyRouter for IPC when topology exists. let eventBus: import("../core/event-bus.js").EventBus | undefined; let topologyRouter: TopologyRouter | undefined; @@ -176,15 +150,6 @@ try { eventBus = new LocalEventBus(); } topologyRouter = new TopologyRouter(runtime.contract.topology, eventBus); - try { - const { appendFileSync } = await import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [mcp-serve] TopologyRouter created, eventBus=${nexusClient ? "NexusEventBus" : "LocalEventBus"}\n`, - ); - } catch { - /* ignore */ - } } // When running with a local SQLite store and GROVE_SESSION_ID is set, @@ -215,19 +180,6 @@ try { // Nexus handoff store when available, falls back to local SQLite handoffStore: nexusHandoffStore ?? runtime.handoffStore, }; - // Debug: log which handoff store is active - try { - const { appendFileSync } = await import("node:fs"); - const storeType = nexusHandoffStore ? "NexusHandoffStore" : "SqliteHandoffStore(fallback)"; - const hasInsertSync = - typeof (deps.handoffStore as unknown as Record)?.insertSync === "function"; - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [mcp-serve] handoffStore=${storeType} hasInsertSync=${hasInsertSync} nexusHandoffStore=${nexusHandoffStore != null}\n`, - ); - } catch { - /* non-fatal */ - } // Derive MCP tool preset from contract mode — #11 MCP Tool Surface + #12 Concept Usage const contractMode = runtime.contract?.mode ?? "exploration"; const hasMetrics = diff --git a/src/mcp/server.integration.test.ts b/src/mcp/server.integration.test.ts index 71d7989..5e3ac28 100644 --- a/src/mcp/server.integration.test.ts +++ b/src/mcp/server.integration.test.ts @@ -231,3 +231,86 @@ describe("MCP server integration", () => { expect(getText(result)).toContain("NOT_FOUND"); }); }); + +// --------------------------------------------------------------------------- +// Wiring test: EnforcingContributionStore wrap for the MCP path +// --------------------------------------------------------------------------- +// +// Issue 11A in the #228 review. Rate limits live on the +// EnforcingContributionStore wrapper, which serve.ts is responsible for +// applying. Without this test, anyone removing the wrap from serve.ts +// would silently break GROVE.md rate-limit configuration in MCP mode. +// +// This test mirrors the wiring logic in src/mcp/serve.ts: build the +// raw store, wrap it with EnforcingContributionStore when a contract +// is loaded, then exercise it through the same MCP boundary the +// production server uses. The 2nd contribution must be rejected with +// a RateLimitError. + +describe("MCP server: rate-limit wiring (Issue 2A/11A)", () => { + test("rate-limited contract rejects 2nd contribution at the MCP boundary", async () => { + const { EnforcingContributionStore } = await import("../core/enforcing-store.js"); + const { ContributionMode } = await import("../core/models.js"); + const testDeps = await createTestMcpDeps(); + try { + // Build a contract with a tight per-agent limit. + const contract = { + contractVersion: 1, + name: "rate-limit-test", + mode: ContributionMode.Evaluation, + rateLimits: { maxContributionsPerAgentPerHour: 1 }, + }; + + // Wrap exactly as src/mcp/serve.ts now does (Issue 2A). + const wrappedStore = new EnforcingContributionStore( + testDeps.deps.contributionStore, + contract, + { cas: testDeps.deps.cas }, + ); + + const wrappedDeps: McpDeps = { + ...testDeps.deps, + contributionStore: wrappedStore, + contract, + }; + + const server = await createMcpServer(wrappedDeps); + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + const client = new Client({ name: "rate-test-client", version: "0.0.1" }); + await server.connect(serverTransport); + await client.connect(clientTransport); + + try { + // First call: should succeed. + const first = await client.callTool({ + name: "grove_submit_work", + arguments: { + summary: "first contribution", + tags: ["rate-test"], + artifacts: {}, + agent: { agentId: "limited-agent" }, + }, + }); + expect(first.isError).toBeFalsy(); + + // Second call: must be rejected by the rate limit. + const second = await client.callTool({ + name: "grove_submit_work", + arguments: { + summary: "second contribution", + tags: ["rate-test"], + artifacts: {}, + agent: { agentId: "limited-agent" }, + }, + }); + expect(second.isError).toBeTruthy(); + expect(getText(second)).toMatch(/rate.?limit|RATE_LIMIT/i); + } finally { + await client.close(); + await server.close(); + } + } finally { + await testDeps.cleanup(); + } + }); +}); diff --git a/src/mcp/tools/done.ts b/src/mcp/tools/done.ts index 3379764..26582e0 100644 --- a/src/mcp/tools/done.ts +++ b/src/mcp/tools/done.ts @@ -48,9 +48,15 @@ export function registerDoneTools(server: McpServer, deps: McpDeps): void { { kind: "discussion", summary: `[DONE] ${args.summary}`, + // ephemeral: true routes grove_done through the same skip-handoff / + // skip-route-event path as ephemeral messages. A session-terminator + // contribution should not create routing records that wake up + // downstream agents with "new work" to pick up. See the routing + // rules table in src/core/operations/contribute.ts (isEphemeralMessageContext). context: { done: true, reason: args.summary, + ephemeral: true, } as Readonly>, agent: { ...(args.agent as import("../../core/operations/agent.js").AgentOverrides), diff --git a/src/mcp/tools/messaging.ts b/src/mcp/tools/messaging.ts index dd6ebb5..139b873 100644 --- a/src/mcp/tools/messaging.ts +++ b/src/mcp/tools/messaging.ts @@ -18,8 +18,9 @@ import type { AgentOverrides } from "../../core/operations/agent.js"; import { resolveAgent } from "../../core/operations/agent.js"; import type { UsageReport } from "../../core/operations/cost-tracking.js"; import { reportUsage } from "../../core/operations/cost-tracking.js"; -import { readInbox, sendMessage } from "../../core/operations/messaging.js"; +import { readInbox, sendMessageAsDiscussion } from "../../core/operations/messaging.js"; import type { McpDeps } from "../deps.js"; +import { toMcpResult, toOperationDeps } from "../operation-adapter.js"; import { agentSchema } from "../schemas.js"; // --------------------------------------------------------------------------- @@ -81,43 +82,31 @@ const reportUsageInputSchema = z.object({ // --------------------------------------------------------------------------- export function registerMessagingTools(server: McpServer, deps: McpDeps): void { + const opDeps = toOperationDeps(deps); + // --- grove_send_message -------------------------------------------------- server.registerTool( "grove_send_message", { description: "Send a message to other agents in the boardroom. Messages are stored as ephemeral " + - "discussion contributions. Use @all to broadcast, or specify individual agent handles " + - "as recipients. Supports threaded replies via in_reply_to.", + "discussion contributions and routed through the same enforcement pipeline as " + + "grove_submit_work. Use @all to broadcast, or specify individual agent handles as " + + "recipients. Supports threaded replies via in_reply_to.", inputSchema: sendMessageInputSchema, }, async (args) => { - const agent = resolveAgent(args.agent as AgentOverrides | undefined); - - const contribution = await sendMessage( - deps.contributionStore, + const result = await sendMessageAsDiscussion( { - agent, + agent: args.agent as AgentOverrides | undefined, body: args.body, recipients: args.recipients, ...(args.in_reply_to !== undefined ? { inReplyTo: args.in_reply_to } : {}), tags: args.tags, }, - computeCid, + opDeps, ); - - return { - content: [ - { - type: "text" as const, - text: JSON.stringify({ - cid: contribution.cid, - summary: contribution.summary, - recipients: args.recipients, - }), - }, - ], - }; + return toMcpResult(result); }, ); diff --git a/src/nexus/nexus-handoff-store.ts b/src/nexus/nexus-handoff-store.ts index 2618267..d17ae8a 100644 --- a/src/nexus/nexus-handoff-store.ts +++ b/src/nexus/nexus-handoff-store.ts @@ -169,7 +169,23 @@ export class NexusHandoffStore implements HandoffStore { // --------------------------------------------------------------------------- async create(input: HandoffInput): Promise { - const handoff: Handoff = { + const [handoff] = await this.createMany([input]); + if (handoff === undefined) { + throw new Error("createMany returned no handoff"); + } + return handoff; + } + + /** + * Batch creation: collapses N handoff inserts into a single VFS file + * write (one casUpdate, one HTTP round-trip). Avoids the N+1 pattern + * the contributeOperation serial path used to have when fanning out + * to multiple downstream roles. + */ + async createMany(inputs: readonly HandoffInput[]): Promise { + if (inputs.length === 0) return []; + + const handoffs: Handoff[] = inputs.map((input) => ({ handoffId: input.handoffId ?? crypto.randomUUID(), sourceCid: input.sourceCid, fromRole: input.fromRole, @@ -182,38 +198,17 @@ export class NexusHandoffStore implements HandoffStore { requiresReply: input.requiresReply ?? false, ...(input.replyDueAt !== undefined ? { replyDueAt: input.replyDueAt } : {}), createdAt: new Date().toISOString(), - }; + })); const path = this.filePath(); - try { - await this.casUpdate(path, (existing) => { - // Idempotent: skip if already present - if (existing.some((h) => h.handoffId === handoff.handoffId)) return existing; - return [...existing, handoff]; - }); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [nexus-handoff] WRITE OK path=${path} id=${handoff.handoffId} ${handoff.fromRole}→${handoff.toRole}\n`, - ); - } catch { - /* */ - } - } catch (err) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [nexus-handoff] WRITE FAIL path=${path} err=${err instanceof Error ? err.message : String(err)}\n`, - ); - } catch { - /* */ - } - throw err; - } + await this.casUpdate(path, (existing) => { + // Idempotent merge: skip handoffs whose id is already present. + const existingIds = new Set(existing.map((h) => h.handoffId)); + const fresh = handoffs.filter((h) => !existingIds.has(h.handoffId)); + return fresh.length === 0 ? existing : [...existing, ...fresh]; + }); - return handoff; + return handoffs; } async get(handoffId: string): Promise { diff --git a/src/server/routes/boardroom.ts b/src/server/routes/boardroom.ts index 396a589..ad4ee00 100644 --- a/src/server/routes/boardroom.ts +++ b/src/server/routes/boardroom.ts @@ -17,8 +17,9 @@ import { z } from "zod"; import { computeCid } from "../../core/manifest.js"; import { ContributionKind, RelationType } from "../../core/models.js"; import { answerQuestion } from "../../core/operations/ask-user-bus.js"; -import { sendMessage } from "../../core/operations/messaging.js"; +import { sendMessageAsDiscussion } from "../../core/operations/messaging.js"; import type { ServerEnv } from "../deps.js"; +import { toOperationDeps } from "../operation-adapter.js"; // --------------------------------------------------------------------------- // File-local schemas (not exported — avoids isolatedDeclarations issues) @@ -240,25 +241,30 @@ boardroom.post("/answer", zValidator("json", answerBodySchema), async (c) => { */ boardroom.post("/message", zValidator("json", messageBodySchema), async (c) => { const deps = c.get("deps"); - const store = deps.contributionStore; - const body = c.req.valid("json"); - const operator = { agentId: "tui-operator", agentName: "operator" }; - const contribution = await sendMessage( - store, - { agent: operator, body: body.body, recipients: body.recipients, inReplyTo: body.inReplyTo }, - computeCid, + const result = await sendMessageAsDiscussion( + { + agent: { agentId: "tui-operator", agentName: "operator" }, + body: body.body, + recipients: body.recipients, + ...(body.inReplyTo !== undefined ? { inReplyTo: body.inReplyTo } : {}), + }, + toOperationDeps(deps), ); + if (!result.ok) { + return c.json({ error: result.error.message }, 400); + } + // Link the message contribution to the session so it is visible in scoped reads if (body.sessionId && deps.goalSessionStore) { await deps.goalSessionStore - .addContributionToSession(body.sessionId, contribution.cid) + .addContributionToSession(body.sessionId, result.value.cid) .catch(() => { /* best-effort */ }); } - return c.json({ cid: contribution.cid, summary: contribution.summary }); + return c.json({ cid: result.value.cid, summary: result.value.summary }); }); diff --git a/tests/boardroom/e2e-workflow.test.ts b/tests/boardroom/e2e-workflow.test.ts index 288fb61..1af4000 100644 --- a/tests/boardroom/e2e-workflow.test.ts +++ b/tests/boardroom/e2e-workflow.test.ts @@ -25,7 +25,8 @@ import { submitQuestion, } from "../../src/core/operations/ask-user-bus.js"; import { getSessionCosts, reportUsage } from "../../src/core/operations/cost-tracking.js"; -import { readInbox, sendMessage } from "../../src/core/operations/messaging.js"; +import type { OperationDeps } from "../../src/core/operations/deps.js"; +import { readInbox, sendMessageAsDiscussion } from "../../src/core/operations/messaging.js"; import type { AgentTopology } from "../../src/core/topology.js"; // --------------------------------------------------------------------------- @@ -209,36 +210,39 @@ describe("E2E boardroom workflow", () => { expect(parsed.profiles[0]?.name).toBe("@claude-eng"); // Step 2: Agent 1 sends a message to Agent 2 - const msg = await sendMessage( - store as never, + const opDeps: OperationDeps = { contributionStore: store as never }; + const msgResult = await sendMessageAsDiscussion( { agent: agent1, body: "I've started reviewing the auth module, line 42 looks suspicious", recipients: ["@codex-rev"], tags: ["review"], }, - mockComputeCid, + opDeps, ); - - expect(msg.kind).toBe(ContributionKind.Discussion); - expect(msg.context?.ephemeral).toBe(true); - expect(msg.context?.recipients).toEqual(["@codex-rev"]); + expect(msgResult.ok).toBe(true); + if (!msgResult.ok) return; + const msgStored = await store.get(msgResult.value.cid); + expect(msgStored?.kind).toBe(ContributionKind.Discussion); + expect(msgStored?.context?.ephemeral).toBe(true); + expect(msgStored?.context?.recipients).toEqual(["@codex-rev"]); // Step 3: Agent 2 replies - const reply = await sendMessage( - store as never, + const replyResult = await sendMessageAsDiscussion( { agent: agent2, body: "Good catch, that's a SQL injection vector. I'll analyze further.", recipients: ["@claude-eng"], - inReplyTo: msg.cid, + inReplyTo: msgResult.value.cid, }, - mockComputeCid, + opDeps, ); - - expect(reply.relations).toHaveLength(1); - expect(reply.relations[0]?.relationType).toBe(RelationType.RespondsTo); - expect(reply.relations[0]?.targetCid).toBe(msg.cid); + expect(replyResult.ok).toBe(true); + if (!replyResult.ok) return; + const replyStored = await store.get(replyResult.value.cid); + expect(replyStored?.relations).toHaveLength(1); + expect(replyStored?.relations[0]?.relationType).toBe(RelationType.RespondsTo); + expect(replyStored?.relations[0]?.targetCid).toBe(msgResult.value.cid); // Step 4: Read inbox for Agent 2 — should see Agent 1's message const agent2Inbox = await readInbox(store as never, { @@ -411,14 +415,13 @@ describe("E2E boardroom workflow", () => { const store = new InMemoryStore(); cidCounter = 200; - await sendMessage( - store as never, + await sendMessageAsDiscussion( { agent: agent1, body: "Team standup: what's everyone working on?", recipients: ["@all"], }, - mockComputeCid, + { contributionStore: store as never }, ); // Both agents should see the broadcast