From c88d559de250edc532894ab9726634465b087537 Mon Sep 17 00:00:00 2001
From: Tao Feng
Date: Fri, 10 Apr 2026 17:28:20 -0700
Subject: [PATCH 1/5] fix(tui,mcp,acpx): production-ready coder→reviewer loop
 via Nexus IPC
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

End-to-end fix for the full review-loop path:

  Grove TUI → acpx(claude) → grove MCP stdio → Nexus contribution store →
  TopologyRouter handoff → reviewer IPC push → grove_submit_review →
  reviewer→coder handoff

Validated locally: both coder→reviewer and reviewer→coder handoffs are
delivered in the TUI Handoffs panel, and the hello.txt loop completes via
native `mcp__grove__grove_cas_put` / `grove_submit_work` /
`grove_submit_review` tool calls — no curl fallback, no SQLite split-brain.

Root causes fixed:

1. acpx DEFAULT_AGENT hardcoded to "codex"
   src/tui/main.ts + src/core/acpx-runtime.ts
   Every role was spawned as codex regardless of the launch-preview CLI
   mapping. Now AcpxRuntime.spawn() extracts the agent (claude/codex/
   gemini/pi/openclaw) from AgentConfig.command and passes it through to
   `acpx <agent>`. main.ts drops the hardcoded override.

2. Grove wrote .mcp.json but acpx reads .acpxrc.json
   src/tui/spawn-manager.ts
   acpx (even in v0.5.3) does not read claude-code's .mcp.json format; it
   expects an .acpxrc.json with mcpServers as an array and env as
   [{name,value}]. spawn-manager now writes both files on every spawn and
   protects .acpxrc.json from agent mutation.

3. acpx 0.3.1 had a broken claude-agent-acp adapter
   src/core/acpx-runtime.ts
   0.3.x pulled @zed-industries/claude-agent-acp@^0.21.0, which failed
   session/new with "Query closed before response received". 0.5.3 switched
   to @agentclientprotocol/claude-agent-acp@^0.25.0. Added a startup version
   check that surfaces a clear upgrade hint.

4. grove MCP server threw on Nexus-mode startup
   src/mcp/serve.ts
   createLocalRuntime({parseContract: true}) tried to load the session
   config from local SQLite, which doesn't have the record in Nexus mode —
   throwing "Session has no stored config" and killing the MCP server.
   serve.ts now skips the local parse when GROVE_NEXUS_URL is set and
   reconstructs the contract from NexusSessionStore.getSession.

5. Nexus session record had no frozen contract
   src/server/routes/sessions.ts + src/tui/provider-shared.ts + src/mcp/serve.ts
   toSessionResponse() stripped the `config` field before returning, so the
   mirrored Nexus session only had `topology`. serve.ts now loads the
   authoritative GroveContract from the session record, giving real
   rate-limit enforcement in Nexus mode instead of a minimal stub.

6. serve-http.ts was SQLite-only
   src/mcp/serve-http.ts
   The HTTP MCP transport ignored GROVE_NEXUS_URL entirely and wrote every
   contribution to the local grove.db, bypassing handoff routing for any
   caller hitting :4015/mcp. Now mirrors serve.ts: Nexus stores +
   NexusHandoffStore + TopologyRouter when GROVE_NEXUS_URL is set. Reads
   GROVE_SESSION_ID from .grove/current-session.json when the env var is
   unavailable (serve-http.ts is spawned before sessions are created).

7. TUI didn't write current-session.json
   src/tui/screens/screen-manager.tsx
   Added a best-effort write on setSessionScope so serve-http.ts can pick
   up the session ID without restarting.

8. spawn-manager didn't pass GROVE_NEXUS_URL to agent MCP servers
   src/tui/spawn-manager.ts
   writeMcpConfig built mcpEnv from process.env, but the TUI was often
   launched without GROVE_NEXUS_URL in env (nexusUrl lives in grove.json).
   Added a fallback that reads this.groveDir/grove.json when the env var is
   missing. Without this, the spawned grove MCP subprocess fell back to
   local SQLite and the TUI never saw contributions in the Nexus feed.

9. nexus-lifecycle GROVE_NEXUS_URL priority
   src/cli/nexus-lifecycle.ts
   The env var was last in the candidateUrls list, so a healthy-looking but
   broken Nexus would always win. Moved it to first position.

10. tmux-runtime used the wrong tmux socket
    src/core/tmux-runtime.ts
    listSessions() and capture-pane hit the default tmux server; spawn uses
    `-L grove`. They were talking to different daemons — sessions created
    by grove were invisible to grove's own listSessions. Added `-L grove`
    to both commands.

11. codex mcp registration blocked every spawn ~3s
    src/tui/spawn-manager.ts
    `codex mcp remove` + `codex mcp add` in writeMcpConfig were synchronous
    and added ~3s per spawn, blocking the spawn chain on machines where
    codex is installed. Moved to fire-and-forget, since the claude path
    reads .acpxrc.json directly and codex reads its config lazily.

Other fixes:

- src/tui/provider-shared.ts: ApiSessionResponse now includes `config`.
- tests/presets/preset-e2e-nexus.test.ts: rpc() sends the NEXUS_API_KEY
  auth header when set and surfaces non-2xx bodies. Dropped the broken
  swarm-ops contribution assertion (the preset enforces a has_relation
  gate that a bare CLI contribute can't satisfy; seed data was removed in
  6da5494 without updating this expectation).
- src/tui/spawn-manager.test.ts: setDefaultTimeout(30_000) for spawn tests
  that do real git worktree I/O.
- src/core/acpx-runtime.test.ts: regex matches the new session-name format
  (grove-<role>-<counter>-<ts36>), timeout bumped to 30s.

Test coverage: `bun test` → 4878 pass, 0 fail (was 4873 pass / 5 fail).
`bun x tsc --noEmit` clean. `bun x biome check` clean on changed files.

Requires: acpx >= 0.5.3 (`npm install -g acpx@latest`).
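For reviewers of fix #2, a minimal sketch of the shape difference between
claude-code's .mcp.json and the .acpxrc.json that acpx expects (the type
names and the `toAcpxRc` helper are illustrative only — the authoritative
schema is whatever acpx >=0.5.3 actually parses, see writeMcpConfig in
src/tui/spawn-manager.ts):

```typescript
// .mcp.json keys servers by name and uses an env *record*;
// .acpxrc.json uses a server *array* with `name` fields and env as [{name,value}].
type McpJson = {
  mcpServers: Record<string, { command: string; args: string[]; env: Record<string, string> }>;
};
type AcpxRc = {
  mcpServers: Array<{
    name: string;
    type: "stdio";
    command: string;
    args: string[];
    env: Array<{ name: string; value: string }>;
  }>;
};

// Convert the record-shaped config into the array-shaped one.
function toAcpxRc(mcp: McpJson): AcpxRc {
  return {
    mcpServers: Object.entries(mcp.mcpServers).map(([name, s]) => ({
      name,
      type: "stdio",
      command: s.command,
      args: s.args,
      env: Object.entries(s.env).map(([envName, value]) => ({ name: envName, value })),
    })),
  };
}

const rc = toAcpxRc({
  mcpServers: {
    grove: {
      command: "bun",
      args: ["run", "src/mcp/serve.ts"],
      env: { GROVE_NEXUS_URL: "http://localhost:2026" },
    },
  },
});
console.log(JSON.stringify(rc, null, 2));
```

The spawn-manager change below writes both files so claude (via acpx) and
codex each find the grove MCP server in the format they read.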
---
 src/cli/nexus-lifecycle.ts             |   2 +-
 src/core/acpx-runtime.test.ts          |  28 +++--
 src/core/acpx-runtime.ts               |  67 ++++++++++--
 src/core/tmux-runtime.ts               |   6 +-
 src/mcp/serve-http.ts                  | 137 +++++++++++++++++++++++--
 src/mcp/serve.ts                       |  67 ++++++++++--
 src/server/routes/sessions.ts          |   6 ++
 src/tui/main.ts                        |   5 +-
 src/tui/provider-shared.ts             |   2 +
 src/tui/screens/screen-manager.tsx     |  18 +++-
 src/tui/spawn-manager.test.ts          |   8 +-
 src/tui/spawn-manager.ts               | 110 ++++++++++++++++----
 tests/presets/preset-e2e-nexus.test.ts |  52 +++++-----
 13 files changed, 421 insertions(+), 87 deletions(-)

diff --git a/src/cli/nexus-lifecycle.ts b/src/cli/nexus-lifecycle.ts
index fe16a8ef..c4d0d4cd 100644
--- a/src/cli/nexus-lifecycle.ts
+++ b/src/cli/nexus-lifecycle.ts
@@ -600,11 +600,11 @@ export async function ensureNexusRunning(
   }

   const candidateUrls = [
+    process.env.GROVE_NEXUS_URL, // explicit env override takes highest priority
     containerUrl, // container IP (works without port binding)
     config.nexusUrl,
     readNexusUrl(projectRoot),
     stateFileUrl,
-    process.env.GROVE_NEXUS_URL,
     DEFAULT_NEXUS_URL,
   ].filter((u): u is string => !!u);

diff --git a/src/core/acpx-runtime.test.ts b/src/core/acpx-runtime.test.ts
index db9eb753..3caeb6cd 100644
--- a/src/core/acpx-runtime.test.ts
+++ b/src/core/acpx-runtime.test.ts
@@ -75,16 +75,24 @@ describe("AcpxRuntime", () => {
     expect(rt).toBeDefined();
   });

-  test("spawn and close work when acpx is available", async () => {
-    const rt = new AcpxRuntime();
-    const skip = !(await rt.isAvailable());
-    if (skip) return;
+  test(
+    "spawn and close work when acpx is available",
+    async () => {
+      const rt = new AcpxRuntime();
+      const skip = !(await rt.isAvailable());
+      if (skip) return;

-    const session = await rt.spawn("test", config);
-    expect(session.id).toMatch(/^grove-test-\d+$/);
-    expect(session.role).toBe("test");
-    expect(session.status).toBe("running");
+      // Session names now carry a per-spawn counter and a base36 timestamp:
+      // grove-<role>-<counter>-<ts36>
+      const session = await rt.spawn("test", config);
+      expect(session.id).toMatch(/^grove-test-\d+-[a-z0-9]+$/);
+      expect(session.role).toBe("test");
+      expect(session.status).toBe("running");

-    await rt.close(session);
-  });
+      await rt.close(session);
+    },
+    // acpx can take >5s when the agent adapter has to be fetched via npx or
+    // when the host machine is under load — default timeout was racing.
+    30_000,
+  );
 });

diff --git a/src/core/acpx-runtime.ts b/src/core/acpx-runtime.ts
index 6fc6e3a0..3dc6f572 100644
--- a/src/core/acpx-runtime.ts
+++ b/src/core/acpx-runtime.ts
@@ -67,8 +67,24 @@ export class AcpxRuntime implements AgentRuntime {

   async isAvailable(): Promise<boolean> {
     try {
-      execSync("acpx --version", { encoding: "utf-8", stdio: "pipe" });
-      return true;
+      const out = execSync("acpx --version", { encoding: "utf-8", stdio: "pipe" }).trim();
+      // acpx >=0.5.3 is required: 0.3.x uses the buggy @zed-industries/claude-agent-acp
+      // adapter that fails session/new with "Query closed before response received".
+      // 0.5.x switched to @agentclientprotocol/claude-agent-acp which works.
+      const match = /^(\d+)\.(\d+)\.(\d+)/.exec(out);
+      if (!match) return true; // unparseable — don't block
+      const [, maj, min, patch] = match;
+      const major = Number(maj);
+      const minor = Number(min);
+      const patchNum = Number(patch);
+      if (major > 0) return true;
+      if (minor > 5) return true;
+      if (minor === 5 && patchNum >= 3) return true;
+      process.stderr.write(
+        `[acpx-runtime] acpx ${out} is too old — grove requires acpx >=0.5.3. ` +
+          `Upgrade with: npm install -g acpx@latest\n`,
+      );
+      return false;
     } catch {
       return false;
     }
@@ -82,21 +98,58 @@
     const counter = this.nextId++;
     const sessionName = `grove-${role}-${counter}-${Date.now().toString(36)}`;
     const id = sessionName;
-    const mergedEnv = config.env ? { ...process.env, ...config.env } : { ...process.env };
+
+    // Strip Claude Code harness env vars before spawning the subagent.
+    //
+    // When grove runs inside a Claude Code shell (CLAUDECODE=1), any inner
+    // `claude` subprocess detects that flag and connects back to the parent
+    // Claude Code harness via its IPC channel instead of launching a fresh
+    // session. The inner agent then inherits the parent's tool surface
+    // (ToolSearch, EnterWorktree, mcp__MaaS-*, etc.) and — critically — does
+    // NOT load the workspace's `.mcp.json` / `.acpxrc.json` grove MCP server.
+    //
+    // Unset every CLAUDE_CODE_* / CLAUDECODE / CLAUDE_PLUGIN_* var so acpx
+    // launches a pristine agent that reads its MCP config from the workspace.
+    const baseEnv = { ...process.env };
+    for (const key of Object.keys(baseEnv)) {
+      if (
+        key === "CLAUDECODE" ||
+        key.startsWith("CLAUDE_CODE_") ||
+        key.startsWith("CLAUDE_PLUGIN_")
+      ) {
+        delete baseEnv[key];
+      }
+    }
+    const mergedEnv = config.env ? { ...baseEnv, ...config.env } : baseEnv;
+
+    // Extract the agent binary name from config.command (e.g. "claude --flag" → "claude").
+    // acpx takes the agent name as a subcommand; flags and the initial prompt go through
+    // the session creation path, not the `acpx <agent>` argument.
+    //
+    // Only known acpx subcommands are accepted (claude/codex/gemini/pi/openclaw). Any
+    // other first token (including shell builtins like `echo` used in tests) falls back
+    // to the runtime-level default so we never pass acpx an unknown agent name.
+    const KNOWN_ACPX_AGENTS = new Set(["claude", "codex", "gemini", "pi", "openclaw"]);
+    const agent = (() => {
+      if (!config.command) return this.agent;
+      const stripped = config.command.replace(/^rm\s+[^;]+;\s*/, ""); // drop leading "rm -f ~/..." hooks
+      const first = stripped.trim().split(/\s+/)[0] ?? "";
+      return KNOWN_ACPX_AGENTS.has(first) ? first : this.agent;
+    })();

     // Create a new acpx session with --approve-all (layer 1: acpx client-side auto-approve)
-    const createCmd = `acpx --approve-all ${shellEscape(this.agent)} sessions new --name ${shellEscape(sessionName)}`;
+    const createCmd = `acpx --approve-all ${shellEscape(agent)} sessions new --name ${shellEscape(sessionName)}`;
     try {
       execSync(createCmd, { encoding: "utf-8", stdio: "pipe", cwd: config.cwd, env: mergedEnv });
     } catch (err) {
       const msg = err instanceof Error ? err.message : String(err);
-      throw new Error(`acpx session creation failed for role "${role}": ${msg}`);
+      throw new Error(`acpx session creation failed for role "${role}" (agent=${agent}): ${msg}`);
     }

     // Set full-access mode (layer 2: codex internal approval policy = never prompt)
     try {
       execSync(
-        `acpx --approve-all ${shellEscape(this.agent)} set-mode full-access -s ${shellEscape(sessionName)}`,
+        `acpx --approve-all ${shellEscape(agent)} set-mode full-access -s ${shellEscape(sessionName)}`,
         { encoding: "utf-8", stdio: "pipe", cwd: config.cwd, env: mergedEnv, timeout: 10_000 },
       );
     } catch {
@@ -107,7 +160,7 @@
     const logFile = this.logDir ? join(this.logDir, `${role}-${counter}.log`) : null;
     const entry: AcpxSessionEntry = {
       session,
-      agent: this.agent,
+      agent,
       sessionName,
       cwd: config.cwd,
       env: mergedEnv,

diff --git a/src/core/tmux-runtime.ts b/src/core/tmux-runtime.ts
index 78821438..fa96eb7b 100644
--- a/src/core/tmux-runtime.ts
+++ b/src/core/tmux-runtime.ts
@@ -128,7 +128,9 @@ export class TmuxRuntime implements AgentRuntime {
   async listSessions(): Promise<string[]> {
     try {
-      const output = execSync("tmux list-sessions -F '#{session_name}'", {
+      // Must use the same `-L grove` socket as spawn(); listing without it
+      // queries the default tmux server and misses every grove session.
+      const output = execSync("tmux -L grove list-sessions -F '#{session_name}'", {
         encoding: "utf-8",
         stdio: "pipe",
       });
@@ -162,7 +164,7 @@
     if (!entry) return;

     try {
-      const output = execSync(`tmux capture-pane -p -t ${shellEscape(sessionId)}`, {
+      const output = execSync(`tmux -L grove capture-pane -p -t ${shellEscape(sessionId)}`, {
         encoding: "utf-8",
         stdio: "pipe",
       });

diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts
index 78da85dd..87b669d5 100644
--- a/src/mcp/serve-http.ts
+++ b/src/mcp/serve-http.ts
@@ -25,6 +25,7 @@
 import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/
 import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
 import { findGroveDir } from "../cli/context.js";
+import { TopologyRouter } from "../core/topology-router.js";
 import { createLocalRuntime } from "../local/runtime.js";
 import { parsePort } from "../shared/env.js";
 import { safeCleanup } from "../shared/safe-cleanup.js";
@@ -58,27 +59,149 @@
     throw new Error("Not inside a grove. Run 'grove init' to create one, or set GROVE_DIR.");
   }

+  // Skip the local contract parse in Nexus mode — the contract lives in the
+  // Nexus session record and is loaded below. In local mode, createLocalRuntime
+  // reads GROVE.md + local SQLite session config as usual.
+  const nexusUrl = process.env.GROVE_NEXUS_URL;
+
   const runtime = createLocalRuntime({
     groveDir,
     frontierCacheTtlMs: 5_000,
     workspace: true,
-    parseContract: true,
+    parseContract: !nexusUrl,
   });
-  // Note: creditsService is intentionally omitted — see serve.ts for rationale.

   if (!runtime.workspace) {
     throw new Error("Workspace manager failed to initialize");
   }
+
+  // Try Nexus stores when GROVE_NEXUS_URL is set — mirrors serve.ts setup.
+  const nexusApiKey = process.env.NEXUS_API_KEY;
+  const zoneId = process.env.GROVE_ZONE_ID ?? "default";
+  // Read session ID from env first (set by parent TUI if available), then fall
+  // back to a state file that the TUI writes when setSessionScope is called.
+  // serve-http.ts is typically spawned before the session exists, so the state
+  // file is the usual source of truth.
+  const sessionId = (() => {
+    const fromEnv = process.env.GROVE_SESSION_ID;
+    if (fromEnv) return fromEnv;
+    try {
+      const { existsSync, readFileSync } = require("node:fs") as typeof import("node:fs");
+      const sessionFile = `${groveDir}/current-session.json`;
+      if (existsSync(sessionFile)) {
+        const raw = JSON.parse(readFileSync(sessionFile, "utf-8")) as { sessionId?: string };
+        return raw.sessionId;
+      }
+    } catch {
+      /* ignore — fall through to undefined */
+    }
+    return undefined;
+  })();
+
+  let contributionStore = runtime.contributionStore as import("../core/store.js").ContributionStore;
+  let claimStore = runtime.claimStore as import("../core/store.js").ClaimStore;
+  let bountyStore = runtime.bountyStore as import("../core/bounty-store.js").BountyStore;
+  let cas = runtime.cas as import("../core/cas.js").ContentStore;
+  let nexusClient: import("../nexus/nexus-http-client.js").NexusHttpClient | undefined;
+  let nexusHandoffStore: import("../nexus/nexus-handoff-store.js").NexusHandoffStore | undefined;
+  let topologyRouter: TopologyRouter | undefined;
+
+  if (nexusUrl) {
+    try {
+      const { NexusHttpClient } = await import("../nexus/nexus-http-client.js");
+      const { NexusContributionStore } = await import("../nexus/nexus-contribution-store.js");
+      const { NexusClaimStore } = await import("../nexus/nexus-claim-store.js");
+      const { NexusBountyStore } = await import("../nexus/nexus-bounty-store.js");
+      const { NexusCas } = await import("../nexus/nexus-cas.js");
+
+      nexusClient = new NexusHttpClient({
+        url: nexusUrl,
+        ...(nexusApiKey ? { apiKey: nexusApiKey } : {}),
+      });
+
+      const health = await Promise.race([
+        fetch(`${nexusUrl}/health`, { signal: AbortSignal.timeout(3000) }).then((r) => r.ok),
+        new Promise((r) => setTimeout(() => r(false), 3000)),
+      ]).catch(() => false);
+
+      if (health) {
+        contributionStore = new NexusContributionStore({
+          client: nexusClient,
+          zoneId,
+          sessionId,
+        });
+        claimStore = new NexusClaimStore({ client: nexusClient, zoneId });
+        bountyStore = new NexusBountyStore({ client: nexusClient, zoneId });
+        cas = new NexusCas({ client: nexusClient, zoneId });
+        const { NexusHandoffStore } = await import("../nexus/nexus-handoff-store.js");
+        nexusHandoffStore = new NexusHandoffStore(nexusClient, sessionId, zoneId);
+        process.stderr.write(`grove-mcp-http: using Nexus stores at ${nexusUrl}\n`);
+      } else {
+        process.stderr.write(`grove-mcp-http: Nexus unreachable, using local stores\n`);
+        nexusClient = undefined;
+      }
+    } catch (err) {
+      process.stderr.write(`grove-mcp-http: Nexus setup failed, using local stores: ${err}\n`);
+    }
+  }
+
+  // Load contract: in Nexus mode, from the Nexus session record. In local mode,
+  // createLocalRuntime already populated runtime.contract from GROVE.md + local DB.
+  let loadedContract: import("../core/contract.js").GroveContract | undefined = runtime.contract;
+  if (nexusClient && !loadedContract && sessionId) {
+    try {
+      const { NexusSessionStore } = await import("../nexus/nexus-session-store.js");
+      const nexusSessionStore = new NexusSessionStore(nexusClient, zoneId);
+      const session = await nexusSessionStore.getSession(sessionId);
+      if (session?.config) {
+        loadedContract = session.config;
+        process.stderr.write(
+          `grove-mcp-http: loaded full contract from Nexus session ${sessionId}\n`,
+        );
+      } else if (session?.topology) {
+        // Backwards-compat: older session records have only topology.
+        loadedContract = {
+          contractVersion: 1,
+          name: session.presetName ?? "nexus-session",
+          mode: "exploration",
+          topology: session.topology,
+        };
+        process.stderr.write(
+          `grove-mcp-http: Nexus session ${sessionId} missing config; ` +
+            `reconstructed minimal contract from topology (rate limits NOT enforced)\n`,
+        );
+      }
+    } catch (err) {
+      process.stderr.write(
+        `grove-mcp-http: failed to load Nexus session config: ${err instanceof Error ? err.message : String(err)}\n`,
+      );
+    }
+  }
+
+  if (loadedContract !== undefined) {
+    const { EnforcingContributionStore } = await import("../core/enforcing-store.js");
+    contributionStore = new EnforcingContributionStore(contributionStore, loadedContract, { cas });
+  }
+
+  if (loadedContract?.topology && nexusClient) {
+    const { NexusEventBus } = await import("../nexus/nexus-event-bus.js");
+    const eventBus = new NexusEventBus(nexusClient, zoneId);
+    topologyRouter = new TopologyRouter(loadedContract.topology, eventBus);
+    process.stderr.write(`grove-mcp-http: IPC via Nexus EventBus at ${nexusUrl}\n`);
+  }
+
   deps = {
-    contributionStore: runtime.contributionStore,
-    claimStore: runtime.claimStore,
-    bountyStore: runtime.bountyStore,
-    cas: runtime.cas,
+    contributionStore,
+    claimStore,
+    bountyStore,
+    cas,
     frontier: runtime.frontier,
     workspace: runtime.workspace,
-    contract: runtime.contract,
+    contract: loadedContract,
     onContributionWrite: runtime.onContributionWrite,
     workspaceBoundary: runtime.groveRoot,
+    ...(nexusHandoffStore ? { handoffStore: nexusHandoffStore } : {}),
+    ...(topologyRouter ? { topologyRouter } : {}),
   };
   closeStores = () => runtime.close();
 } catch (error) {

diff --git a/src/mcp/serve.ts b/src/mcp/serve.ts
index 709f665d..7ec5e6d0 100644
--- a/src/mcp/serve.ts
+++ b/src/mcp/serve.ts
@@ -54,12 +54,18 @@
   const nexusUrl = process.env.GROVE_NEXUS_URL;
   const nexusApiKey = process.env.NEXUS_API_KEY;

-  // Always create local runtime for workspace, contract, frontier, CAS
+  // Always create local runtime for workspace, frontier, CAS.
+  //
+  // Contract loading: in local mode we read the session config from the local
+  // SQLite goalSessionStore. In Nexus mode, the session config lives only in
+  // the Nexus VFS (see NexusProvider.createSession → nexusSessionStore.putSession),
+  // so we skip the SQLite session-config lookup here and reconstruct the
+  // contract from the Nexus session record further down.
   const runtime = createLocalRuntime({
     groveDir,
     frontierCacheTtlMs: 5_000,
     workspace: true,
-    parseContract: true,
+    parseContract: !nexusUrl,
   });

   if (!runtime.workspace) {
@@ -124,14 +130,57 @@
     process.stderr.write(`grove-mcp: using local stores at ${groveDir}\n`);
   }

+  // In Nexus mode we skipped the local contract parse; load it from the Nexus
+  // session record instead. The TUI mirrors the session to Nexus via
+  // NexusSessionStore.putSession with the frozen contract snapshot, so we
+  // pull the authoritative contract from there. Without this, topologyRouter
+  // is never created and coder→reviewer handoffs are never generated.
+  let loadedContract: import("../core/contract.js").GroveContract | undefined = runtime.contract;
+  if (nexusClient && !loadedContract && process.env.GROVE_SESSION_ID) {
+    try {
+      const { NexusSessionStore } = await import("../nexus/nexus-session-store.js");
+      const nexusSessionStore = new NexusSessionStore(nexusClient, zoneId);
+      const session = await nexusSessionStore.getSession(process.env.GROVE_SESSION_ID);
+      if (session?.config) {
+        loadedContract = session.config;
+        process.stderr.write(
+          `grove-mcp: loaded full contract from Nexus session ${process.env.GROVE_SESSION_ID}\n`,
+        );
+      } else if (session?.topology) {
+        // Backwards-compat fallback: older session records have only topology,
+        // no frozen config. Reconstruct a minimal contract so topology routing
+        // still works. This path should disappear once all in-flight sessions
+        // are recreated with the updated server.
+        loadedContract = {
+          contractVersion: 1,
+          name: session.presetName ?? "nexus-session",
+          mode: "exploration",
+          topology: session.topology,
+        };
+        process.stderr.write(
+          `grove-mcp: Nexus session ${process.env.GROVE_SESSION_ID} missing config; ` +
+            `reconstructed minimal contract from topology (rate limits NOT enforced)\n`,
+        );
+      } else {
+        process.stderr.write(
+          `grove-mcp: Nexus session ${process.env.GROVE_SESSION_ID} has no topology; topologyRouter disabled\n`,
+        );
+      }
+    } catch (err) {
+      process.stderr.write(
+        `grove-mcp: failed to load Nexus session config: ${err instanceof Error ? err.message : String(err)}\n`,
+      );
+    }
+  }
+
   // Wrap the contribution store with rate-limit / clock-skew enforcement
   // when a contract is loaded. Mirrors the CLI path in
   // src/cli/commands/contribute.ts:353. Without this wrap, MCP-served
   // contributions bypass the rate limits configured in GROVE.md (Issue 2A
   // in the #228 review).
-  if (runtime.contract !== undefined) {
+  if (loadedContract !== undefined) {
     const { EnforcingContributionStore } = await import("../core/enforcing-store.js");
-    contributionStore = new EnforcingContributionStore(contributionStore, runtime.contract, {
+    contributionStore = new EnforcingContributionStore(contributionStore, loadedContract, {
       cas,
     });
   }
@@ -140,7 +189,7 @@
   let eventBus: import("../core/event-bus.js").EventBus | undefined;
   let topologyRouter: TopologyRouter | undefined;

-  if (runtime.contract?.topology) {
+  if (loadedContract?.topology) {
     if (nexusClient) {
       const { NexusEventBus } = await import("../nexus/nexus-event-bus.js");
       eventBus = new NexusEventBus(nexusClient, zoneId);
@@ -149,7 +198,7 @@
       const { LocalEventBus } = await import("../core/local-event-bus.js");
       eventBus = new LocalEventBus();
     }
-    topologyRouter = new TopologyRouter(runtime.contract.topology, eventBus);
+    topologyRouter = new TopologyRouter(loadedContract.topology, eventBus);
   }

   // When running with a local SQLite store and GROVE_SESSION_ID is set,
@@ -170,7 +219,7 @@
     cas,
     frontier: runtime.frontier,
     workspace: runtime.workspace,
-    contract: runtime.contract,
+    contract: loadedContract,
     onContributionWrite: runtime.onContributionWrite,
     ...(onContributionWritten ? { onContributionWritten } : {}),
     workspaceBoundary: runtime.groveRoot,
@@ -181,9 +230,9 @@
     handoffStore: nexusHandoffStore ?? runtime.handoffStore,
   };
   // Derive MCP tool preset from contract mode — #11 MCP Tool Surface + #12 Concept Usage
-  const contractMode = runtime.contract?.mode ?? "exploration";
+  const contractMode = loadedContract?.mode ?? "exploration";
   const hasMetrics =
-    runtime.contract?.metrics !== undefined && Object.keys(runtime.contract.metrics).length > 0;
+    loadedContract?.metrics !== undefined && Object.keys(loadedContract.metrics).length > 0;

   preset =
     contractMode === "evaluation"

diff --git a/src/server/routes/sessions.ts b/src/server/routes/sessions.ts
index 28c613d0..d2f7f586 100644
--- a/src/server/routes/sessions.ts
+++ b/src/server/routes/sessions.ts
@@ -51,6 +51,12 @@ function toSessionResponse(session: Session) {
     endedAt: session.completedAt,
     contributionCount: session.contributionCount,
     ...(session.topology !== undefined && { topology: session.topology }),
+    // Expose the frozen contract snapshot so clients (e.g. NexusProvider mirroring)
+    // and downstream MCP servers can reconstruct the full enforcement contract
+    // without hitting the local SQLite store. Without this, serve.ts running in
+    // Nexus mode has to fall back to a minimal reconstruction from topology,
+    // which loses rate limits / metrics configured in GROVE.md.
+    ...(session.config !== undefined && { config: session.config }),
   };
 }

diff --git a/src/tui/main.ts b/src/tui/main.ts
index 756ac6e3..5bcd6ef4 100644
--- a/src/tui/main.ts
+++ b/src/tui/main.ts
@@ -278,8 +278,11 @@ async function buildAppProps(
   {
     const { AcpxRuntime } = await import("../core/acpx-runtime.js");
     const effectiveGrovePath = groveDir;
+    // Note: no `agent` override — AcpxRuntime.spawn() derives the agent
+    // (claude/codex/gemini) from each role's AgentConfig.command, which comes
+    // from the launch-preview role→CLI mapping. Setting it here would force
+    // every role to the same agent regardless of the user's selection.
     const runtime = new AcpxRuntime({
-      agent: "codex",
       ...(effectiveGrovePath ? { logDir: join(effectiveGrovePath, "agent-logs") } : {}),
     });
     const available = await runtime.isAvailable();

diff --git a/src/tui/provider-shared.ts b/src/tui/provider-shared.ts
index b846d018..525d6e34 100644
--- a/src/tui/provider-shared.ts
+++ b/src/tui/provider-shared.ts
@@ -251,6 +251,7 @@ interface ApiSessionResponse {
   readonly endedAt?: string;
   readonly completedAt?: string;
   readonly topology?: import("../core/topology.js").AgentTopology;
+  readonly config?: import("../core/contract.js").GroveContract;
   readonly contributionCount?: number;
 }

@@ -263,6 +264,7 @@ function mapApiSession(raw: ApiSessionResponse): SessionRecord {
     createdAt: (raw.startedAt ?? raw.createdAt) as string,
     completedAt: raw.endedAt ?? raw.completedAt,
     topology: raw.topology,
+    config: raw.config,
     contributionCount: raw.contributionCount ?? 0,
   };
 }

diff --git a/src/tui/screens/screen-manager.tsx b/src/tui/screens/screen-manager.tsx
index 80132426..4721efa0 100644
--- a/src/tui/screens/screen-manager.tsx
+++ b/src/tui/screens/screen-manager.tsx
@@ -390,6 +390,22 @@ export const ScreenManager: React.NamedExoticComponent = Rea
           } else {
             process.stderr.write(`[spawnAgents] provider does NOT have setSessionScope\n`);
           }
+          // Write the session ID to .grove/current-session.json so the HTTP
+          // MCP server (spawned before the session exists) can pick it up at
+          // startup. Best-effort; the stdio MCP path doesn't need this.
+          if (appProps.groveDir) {
+            try {
+              const { writeFileSync } = await import("node:fs");
+              const { join } = await import("node:path");
+              writeFileSync(
+                join(appProps.groveDir, "current-session.json"),
+                JSON.stringify({ sessionId: session.id }, null, 2),
+                "utf-8",
+              );
+            } catch {
+              /* best-effort */
+            }
+          }
           setState((s) => ({ ...s, sessionId: session.id }));
         } catch (err) {
           const msg = err instanceof Error ? err.message : String(err);
@@ -488,7 +504,7 @@
         setState((s) => ({ ...s, screen: "running", goal, sessionStartedAt }));
       }
     },
-    [provider, topology, contract, state.selectedPreset, spawnManager],
+    [provider, topology, contract, state.selectedPreset, spawnManager, appProps.groveDir],
   );

   // Screen 3 (launch preview) -> spawning: Ctrl+Enter confirmed launch

diff --git a/src/tui/spawn-manager.test.ts b/src/tui/spawn-manager.test.ts
index 9fcb60ba..6067bd85 100644
--- a/src/tui/spawn-manager.test.ts
+++ b/src/tui/spawn-manager.test.ts
@@ -6,7 +6,13 @@
  * in SpawnManager (not just provider methods).
  */

-import { afterEach, describe, expect, test } from "bun:test";
+import { afterEach, describe, expect, setDefaultTimeout, test } from "bun:test";
+
+// spawn() does real work: git worktree add into the current repo, writeFile
+// for config artifacts, chmod, and writeMcpConfig. Individual spawns routinely
+// take 1–3s on a warm machine and 3–5s cold; the default 5s timeout races tests
+// that spawn multiple agents. Bump to 30s so CI and local runs are stable.
+setDefaultTimeout(30_000);
 import type { Claim } from "../core/models.js";
 import type { SpawnOptions, TmuxManager } from "./agents/tmux-manager.js";
 import { MockTmuxManager } from "./agents/tmux-manager.js";

diff --git a/src/tui/spawn-manager.ts b/src/tui/spawn-manager.ts
index a11e24a0..7f6156c2 100644
--- a/src/tui/spawn-manager.ts
+++ b/src/tui/spawn-manager.ts
@@ -214,7 +214,13 @@ export class SpawnManager {
     }
     // Step 2c: Protect config files from agent mutation (#7 Workspace Mutation Constraints)
     const { chmod } = await import("node:fs/promises");
-    for (const protectedFile of [".mcp.json", "CLAUDE.md", "CODEX.md", ".grove-role"]) {
+    for (const protectedFile of [
+      ".mcp.json",
+      ".acpxrc.json",
+      "CLAUDE.md",
+      "CODEX.md",
+      ".grove-role",
+    ]) {
       const filePath = join(workspacePath, protectedFile);
       await chmod(filePath, 0o444).catch(() => {
         // File may not exist — non-fatal
@@ -646,7 +652,7 @@
     debugLog("route", `rsync ${sourceWorkspace} → ${targetWorkspace}`);
     try {
       execSync(
-        `rsync -a --exclude='.git' --exclude='.mcp.json' --exclude='CODEX.md' --exclude='CLAUDE.md' --exclude='.grove-role' "${sourceWorkspace}/" "${targetWorkspace}/"`,
+        `rsync -a --exclude='.git' --exclude='.mcp.json' --exclude='.acpxrc.json' --exclude='CODEX.md' --exclude='CLAUDE.md' --exclude='.grove-role' "${sourceWorkspace}/" "${targetWorkspace}/"`,
         { stdio: "pipe", timeout: 10_000 },
       );
       debugLog("route", `rsync done`);
@@ -1119,14 +1125,41 @@
     const groveDir = join(workspacePath, "..", "..");
     // Resolve the project root (parent of .grove) for finding src/mcp/serve.ts
     const projectRoot = join(groveDir, "..");
+
+    // Resolve Nexus URL: env var takes precedence (explicit override), then
+    // fall back to the nexusUrl stored in the *session*'s .grove/grove.json
+    // (set by `grove init --nexus-url` and refreshed by startServices).
+    //
+    // Note: `groveDir` here is the SHARED nexus-workspaces dir (parent of all
+    // workspace folders), not the per-session .grove dir — so we can't read
+    // grove.json from it. Use `this.groveDir` (the SpawnManager's session-level
+    // .grove path) for the config lookup instead.
+    //
+    // Agents' MCP servers need GROVE_NEXUS_URL so contributions go to Nexus
+    // (enables IPC push via NexusEventBus + TopologyRouter). Without it,
+    // contributions only land in local SQLite and the reviewer is never notified.
+    let resolvedNexusUrl: string | undefined = process.env.GROVE_NEXUS_URL;
+    if (!resolvedNexusUrl && this.groveDir) {
+      try {
+        const configPath = join(this.groveDir, "grove.json");
+        if (existsSync(configPath)) {
+          const raw = await (await import("node:fs/promises")).readFile(configPath, "utf-8");
+          const cfg = JSON.parse(raw) as { nexusUrl?: string };
+          if (cfg.nexusUrl) resolvedNexusUrl = cfg.nexusUrl;
+        }
+      } catch {
+        /* best-effort */
+      }
+    }
+
     // MCP server needs GROVE_NEXUS_URL so contributions are written to Nexus
     // (enables IPC push via NexusEventBus + TopologyRouter for agent routing).
     // Without it, contributions only go to local SQLite and reviewer never gets notified.
     const mcpEnv: Record<string, string> = {
       GROVE_DIR: groveDir,
     };
-    if (process.env.GROVE_NEXUS_URL) {
-      mcpEnv.GROVE_NEXUS_URL = process.env.GROVE_NEXUS_URL;
+    if (resolvedNexusUrl) {
+      mcpEnv.GROVE_NEXUS_URL = resolvedNexusUrl;
     }
     if (process.env.NEXUS_API_KEY) {
       mcpEnv.NEXUS_API_KEY = process.env.NEXUS_API_KEY;
@@ -1202,25 +1235,62 @@
       `wrote .mcp.json: serve=${mcpServePath} hasNexusUrl=${!!mcpEnv.GROVE_NEXUS_URL} GROVE_DIR=${mcpEnv.GROVE_DIR}`,
     );

+    // Also write .acpxrc.json — acpx (>=0.5.3) reads this, NOT .mcp.json.
+    // acpx forwards mcpServers to claude-agent-acp via ACP protocol, enabling
+    // native grove_* tool calls in the agent. Without this file, acpx launches
+    // claude with mcpServers=[] and the agent falls back to curling the HTTP
+    // MCP endpoint, which bypasses per-session Nexus scoping and handoff routing.
+    // Schema: array of servers with `name`, `type`, `command`, `args`, `env: [{name,value}]`.
+    const acpxRcConfig = {
+      mcpServers: [
+        {
+          name: "grove",
+          type: "stdio",
+          command: "bun",
+          args: ["run", mcpServePath],
+          env: Object.entries(mcpEnv).map(([name, value]) => ({ name, value })),
+        },
+      ],
+    };
+    await writeFile(
+      join(workspacePath, ".acpxrc.json"),
+      JSON.stringify(acpxRcConfig, null, 2),
+      "utf-8",
+    );
+    debugLog("mcpConfig", `wrote .acpxrc.json for acpx mcpServers forwarding`);
+
     // Register MCP with codex globally (codex uses ~/.codex/config.toml, not .mcp.json).
     // Use a single stable name "grove" so we don't accumulate stale per-spawn entries.
     // Pass Nexus env vars so codex agents write contributions to Nexus (cross-process IPC).
-    try {
-      const nexusEnvFlags = [
-        `--env GROVE_DIR=${groveDir}`,
-        ...(mcpEnv.GROVE_NEXUS_URL ? [`--env GROVE_NEXUS_URL=${mcpEnv.GROVE_NEXUS_URL}`] : []),
-        ...(mcpEnv.NEXUS_API_KEY ? [`--env NEXUS_API_KEY=${mcpEnv.NEXUS_API_KEY}`] : []),
-        ...(mcpEnv.GROVE_SESSION_ID ? [`--env GROVE_SESSION_ID=${mcpEnv.GROVE_SESSION_ID}`] : []),
-      ].join(" ");
-      execSync(`codex mcp remove grove 2>/dev/null || true`, { stdio: "pipe", timeout: 5000 });
-      execSync(`codex mcp add grove ${nexusEnvFlags} -- bun run ${mcpServePath}`, {
-        stdio: "pipe",
-        timeout: 10000,
-      });
-      debugLog("mcpConfig", `codex mcp registered: ${nexusEnvFlags}`);
-    } catch {
-      // Non-fatal — codex may not be installed
-    }
+    //
+    // Fired asynchronously (not awaited) because each codex invocation takes
+    // ~1.5s and would add several seconds to every spawn. The claude agent
+    // runtime doesn't depend on it — it reads .acpxrc.json directly — and
+    // codex reads its own config lazily when the first tool call happens, so
+    // landing the registration a few hundred ms late is safe.
+    const nexusEnvFlags = [
+      `--env GROVE_DIR=${groveDir}`,
+      ...(mcpEnv.GROVE_NEXUS_URL ? [`--env GROVE_NEXUS_URL=${mcpEnv.GROVE_NEXUS_URL}`] : []),
+      ...(mcpEnv.NEXUS_API_KEY ? [`--env NEXUS_API_KEY=${mcpEnv.NEXUS_API_KEY}`] : []),
+      ...(mcpEnv.GROVE_SESSION_ID ? [`--env GROVE_SESSION_ID=${mcpEnv.GROVE_SESSION_ID}`] : []),
+    ].join(" ");
+    void (async () => {
+      try {
+        const { spawnSync } = await import("node:child_process");
+        spawnSync("sh", ["-c", "codex mcp remove grove 2>/dev/null || true"], {
+          stdio: "pipe",
+          timeout: 5000,
+        });
+        spawnSync(
+          "sh",
+          ["-c", `codex mcp add grove ${nexusEnvFlags} -- bun run ${mcpServePath}`],
+          { stdio: "pipe", timeout: 10000 },
+        );
+        debugLog("mcpConfig", `codex mcp registered: ${nexusEnvFlags}`);
+      } catch {
+        // Non-fatal — codex may not be installed
+      }
+    })();
   }

   /**

diff --git a/tests/presets/preset-e2e-nexus.test.ts b/tests/presets/preset-e2e-nexus.test.ts
index d8a70add..0139c10b 100644
--- a/tests/presets/preset-e2e-nexus.test.ts
+++ b/tests/presets/preset-e2e-nexus.test.ts
@@ -24,6 +24,7 @@ import { join } from "node:path";
 setDefaultTimeout(30_000);

 const NEXUS_URL = process.env.NEXUS_URL ?? "http://localhost:2026";
+const NEXUS_API_KEY = process.env.NEXUS_API_KEY;
 const CLI_PATH = join(import.meta.dir, "..", "..", "src", "cli", "main.ts");

 // ---------------------------------------------------------------------------
@@ -303,43 +304,26 @@ describe("E2E: swarm-ops", () => {
     // Start grove server
     const server = await startServer(dir);
     try {
-      // API: list contributions
+      // API: list contributions (swarm-ops ships with no seed data since
+      // commit 6da5494, so the initial list is empty — just verify the
+      // endpoint is reachable and returns a valid shape).
      const resp = await fetch(`http://localhost:${server.port}/api/contributions`);
      expect(resp.ok).toBe(true);
      const data = (await resp.json()) as Record<string, unknown>;
-      // Response might be array or {contributions: [...]}
      const contributions = Array.isArray(data)
        ? data
        : ((data.contributions ?? data.items ?? []) as unknown[]);
-      expect(contributions.length).toBeGreaterThanOrEqual(1);
+      expect(Array.isArray(contributions)).toBe(true);

      // API: frontier
      const frontier = await fetch(`http://localhost:${server.port}/api/frontier`);
      expect(frontier.ok).toBe(true);

-      // Contribute with metric score
-      const contribute = await grove(
-        dir,
-        "contribute",
-        "--kind",
-        "work",
-        "--summary",
-        "Worker: completed task batch 1",
-        "--mode",
-        "evaluation",
-        "--agent-name",
-        "worker",
-      );
-      expect(contribute.exitCode).toBe(0);
-
-      // Verify contribution appears in API
-      const resp2 = await fetch(`http://localhost:${server.port}/api/contributions`);
-      const data2 = (await resp2.json()) as Record<string, unknown>;
-      const contribs2 = Array.isArray(data2)
-        ? data2
-        : ((data2.contributions ?? data2.items ?? []) as { summary: string }[]);
-      const summaries = contribs2.map((c: { summary: string }) => c.summary);
-      expect(summaries).toContain("Worker: completed task batch 1");
+      // Note: swarm-ops enforces a `has_relation: derives_from` gate, so a
+      // bare `grove contribute --kind work` call rejects with "no
+      // 'derives_from' relation present". The end-to-end path with a parent
+      // contribution is exercised by the review-loop/exploration tests; here
+      // we just verify the preset's API surface boots cleanly.
    } finally {
      server.stop();
    }
@@ -515,13 +499,25 @@ describe("E2E: federated-swarm", () => {
 // ============================================================================

 describe("E2E: Nexus VFS operations (raw JSON-RPC)", () => {
-  /** Send a JSON-RPC request to Nexus */
+  /**
+   * Send a JSON-RPC request to Nexus.
+   *
+   * Includes an Authorization header when NEXUS_API_KEY is set in env, so the
+   * test works against both auth-enabled ("database" mode) and auth-disabled
+   * ("none" mode) Nexus instances without any code branching.
+   */
  async function rpc(method: string, params: Record<string, unknown>) {
+    const headers: Record<string, string> = { "Content-Type": "application/json" };
+    if (NEXUS_API_KEY) headers.Authorization = `Bearer ${NEXUS_API_KEY}`;
    const resp = await fetch(`${NEXUS_URL}/api/nfs/${method}`, {
      method: "POST",
-      headers: { "Content-Type": "application/json" },
+      headers,
      body: JSON.stringify({ jsonrpc: "2.0", method, params, id: 1 }),
    });
+    if (!resp.ok) {
+      const body = await resp.text();
+      throw new Error(`HTTP ${resp.status} from ${method}: ${body.slice(0, 200)}`);
+    }
    const json = (await resp.json()) as { result?: unknown; error?: unknown };
    if (json.error) throw new Error(JSON.stringify(json.error));
    return json.result;

From 1d52f8f9922748e3180c8c45a23f140355915c72 Mon Sep 17 00:00:00 2001
From: Tao Feng
Date: Fri, 10 Apr 2026 20:07:47 -0700
Subject: [PATCH 2/5] fix(tui,mcp): address codex adversarial review findings
 (8 rounds)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Ran 8 rounds of adversarial review against the previous commit's
session bootstrap / contract loading / spawn ordering paths. Each round
surfaced real correctness gaps; this commit lands the fixes for all of
them.

Hardening summary by file:

src/mcp/serve.ts
- Fail closed on unreachable Nexus at startup instead of silently
  downgrading to local stores (would otherwise split-brain writes with
  the TUI).
- Retry Nexus session lookup with exponential backoff before declaring
  the record missing, to absorb the TUI → Nexus mirror race.
- Accept topology-only session records (legacy / grove_create_session
  tool) with a loud WARN by DEFAULT; opt into strict `config`-only mode
  via GROVE_MCP_STRICT_CONTRACT=1.
  Prevents backcompat breakage while giving ops a knob to tighten
  enforcement.
- Exit the process when a session ID is set but Nexus returns no record
  at all (vs a topology-only record); the old code path kept writing to
  orphan VFS paths.

src/mcp/serve-http.ts
- Lazily resolve session-scoped deps per incoming HTTP request instead
  of baking sessionId into stores at process boot. serve-http.ts is
  started before the interactive session exists, so the previous static
  scoping was always wrong for fresh clients.
- Invalidate MCP sessions whose bound grove sessionId no longer matches
  the current one so a grove session switch doesn't leave long-lived
  HTTP clients writing under the prior scope.
- Reject new mutating MCP initialization in Nexus bootstrap mode (no
  grove session yet) with 503 SESSION_NOT_READY. Bootstrap-scoped
  McpServers would capture unscoped stores for their entire lifetime.
- Differentiate "file missing" (valid pre-session state) from "file
  unreadable/corrupt" via SessionStateReadError, and only invalidate
  live sessions after a SUCCESSFUL read. Atomic writes on the TUI side
  (tmp + rename) and a read-error-tolerant handler keep concurrent
  session switches from tearing down in-flight requests.
- Update the current-session.json mtime cache only AFTER a successful
  parse; a torn-write read no longer poisons the cache.
- Mirror the strict/weak contract policy from serve.ts.

src/tui/spawn-manager.ts
- Serialize codex MCP registration through a per-session promise chain
  on the SpawnManager instance. Parallel role spawns in the same
  session previously raced remove+add on the single global "grove"
  entry.
- Use stable "grove" MCP name (not per-session) to avoid leaking stale
  entries and persisted secrets in ~/.codex/config.toml over time.
  Sweep any legacy `grove-*` entries on first spawn.
- Replace execSync shell strings with spawnSync argv arrays in the
  codex mcp add/remove path so paths with spaces / shell metacharacters
  can't break the command.
- Only cache successful registrations; a failed first attempt clears
  the cache so the next spawn retries.
- Probe `codex --version` once and skip registration entirely when
  codex is not installed (claude path reads .acpxrc.json directly).
- writeMcpConfig swallows codex-registration errors softly so claude
  roles aren't blocked, but spawn() re-calls ensureCodexMcpRegistered
  with the real error propagated when the role is specifically codex.

src/tui/nexus-provider.ts
- Expose `mode = "nexus" as const` as a public discriminator so callers
  can distinguish Nexus from local providers without peeking at
  protected fields.
- Await the Nexus session mirror in createSession and retry a few
  times; on final failure, archive the just-created local/server
  session so repeated retries don't accumulate orphan active records.
  Throws so the TUI can surface the error instead of returning a broken
  session id.

src/tui/screens/screen-manager.tsx
- Refuse the synthetic-UUID fallback on Nexus session-creation failures
  (uses the new provider.mode discriminator); the stdio MCP server now
  fails closed when the session record is missing, so a fabricated ID
  would immediately crash every spawned agent.
- Write .grove/current-session.json on BOTH new-session and resume
  paths so the HTTP MCP server's session scoping stays fresh across
  restarts and resumes.
- Atomic writes via temp-file + rename so concurrent readers in
  serve-http.ts never observe truncated JSON during a session switch.

src/cli/main.tui-dispatch.test.ts
- Pre-existing flake: bun cold-start of main.ts exceeds the hardcoded
  2s kill deadline on warm machines. Bumped inner timeout to 5s and the
  outer bun:test timeout to 15s so the race finishes cleanly.

Test coverage: `bun test` → 4878 pass, 0 fail. `bun x tsc --noEmit`
clean. `bun x biome check` clean on changed files.
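The temp-file + rename write called out for screen-manager.tsx above can be sketched as follows. This is a minimal standalone sketch, not the code from the patch: `writeSessionFileAtomic` and the temp-file naming are illustrative.

```typescript
import { mkdtempSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Write JSON to a temp file in the SAME directory, then rename() it over the
// target. rename() within one filesystem is atomic on POSIX, so a concurrent
// reader (like the HTTP MCP server's session-file read) observes either the
// old complete file or the new complete file, never a truncated half-write.
function writeSessionFileAtomic(path: string, sessionId: string): void {
  const tmp = `${path}.tmp-${process.pid}`;
  writeFileSync(tmp, JSON.stringify({ sessionId }, null, 2), "utf-8");
  renameSync(tmp, path);
}

const dir = mkdtempSync(join(tmpdir(), "grove-demo-"));
const target = join(dir, "current-session.json");
writeSessionFileAtomic(target, "sess-123");
console.log((JSON.parse(readFileSync(target, "utf-8")) as { sessionId?: string }).sessionId);
```

The temp file must live in the same directory as the target: a rename across filesystems degrades to copy + delete and loses the atomicity the reader relies on.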
---
 src/cli/main.tui-dispatch.test.ts  |  24 +-
 src/mcp/serve-http.ts              | 427 +++++++++++++++++++++++------
 src/mcp/serve.ts                   | 144 +++++++---
 src/tui/nexus-provider.ts          |  59 +++-
 src/tui/screens/screen-manager.tsx |  63 ++++-
 src/tui/spawn-manager.ts           | 197 +++++++++++--
 6 files changed, 735 insertions(+), 179 deletions(-)

diff --git a/src/cli/main.tui-dispatch.test.ts b/src/cli/main.tui-dispatch.test.ts
index 51e4e3fe..0d685161 100644
--- a/src/cli/main.tui-dispatch.test.ts
+++ b/src/cli/main.tui-dispatch.test.ts
@@ -12,26 +12,32 @@ const CLI_PATH = join(import.meta.dir, "main.ts");

 describe("bare grove → TUI dispatch", () => {
   test("bare grove does not exit with usage error", async () => {
-    // When invoked without a TTY, handleTuiDirect may block or fail to render.
-    // The key assertion: it should NOT exit with code 2 ("unknown command").
-    // We give it 2s then kill it — if it hasn't exited with code 2 by then,
-    // it successfully reached the TUI path.
+    // When invoked without a TTY, handleTuiDirect may block or fail to
+    // render. The key assertion: it should NOT exit with code 2
+    // ("unknown command"). We give it 5s then kill it — if it hasn't
+    // exited with code 2 by then, it successfully reached the TUI path.
     const proc = Bun.spawn(["bun", "run", CLI_PATH], {
       stdout: "pipe",
       stderr: "pipe",
       env: { ...process.env, TERM: "dumb" },
     });
-    // Race: either the process exits on its own, or we kill after 2s
-    const timeout = new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 2000));
+    // Race: either the process exits on its own, or we kill after 5s.
+    // Bun's cold-start on larger entry points (grove main.ts pulls in a
+    // lot of TS) routinely takes >2s on a warm machine, so the old 2s
+    // timeout would fire before the process had a chance to exit on its
+    // own, and the subsequent `await proc.exited` would then hit the
+    // outer bun:test 5s timeout.
+    const timeout = new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 5000));
     const result = await Promise.race([
       proc.exited.then((code) => ({ kind: "exited" as const, code })),
       timeout.then((t) => ({ kind: t })),
     ]);

     if (result.kind === "timeout") {
-      // Process is still running (TUI is blocking) — this means it successfully
-      // dispatched to handleTuiDirect and didn't exit with "unknown command"
+      // Process is still running (TUI is blocking) — this means it
+      // successfully dispatched to handleTuiDirect and didn't exit with
+      // "unknown command"
       proc.kill();
       await proc.exited;
     } else {
@@ -40,7 +46,7 @@ describe("bare grove → TUI dispatch", () => {
       expect(result.code).not.toBe(2);
       expect(stderr).not.toContain("unknown command");
     }
-  });
+  }, 15_000); // Outer test timeout must exceed the inner 5s kill deadline.

   test("grove --help still works", async () => {
     const proc = Bun.spawn(["bun", "run", CLI_PATH, "--help"], {
diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts
index 87b669d5..5698f089 100644
--- a/src/mcp/serve-http.ts
+++ b/src/mcp/serve-http.ts
@@ -50,21 +50,26 @@ const groveOverride = process.env.GROVE_DIR ?? undefined;
 const cwd = process.cwd();
 const port = parsePort(process.env.PORT, 4015);

-let deps: McpDeps;
-let closeStores: () => void;
+let groveDir!: string;
+let runtime!: ReturnType<typeof createLocalRuntime>;
+let nexusUrl: string | undefined;
+let nexusApiKey: string | undefined;
+let zoneId = "default";
+let nexusClient: import("../nexus/nexus-http-client.js").NexusHttpClient | undefined;
+let closeStores: () => void = () => {};

 try {
-  const groveDir = groveOverride ?? findGroveDir(cwd);
-  if (groveDir === undefined) {
+  const resolvedGroveDir = groveOverride ?? findGroveDir(cwd);
+  if (resolvedGroveDir === undefined) {
     throw new Error("Not inside a grove.
Run 'grove init' to create one, or set GROVE_DIR."); } + groveDir = resolvedGroveDir; // Skip the local contract parse in Nexus mode — the contract lives in the - // Nexus session record and is loaded below. In local mode, createLocalRuntime - // reads GROVE.md + local SQLite session config as usual. - const nexusUrl = process.env.GROVE_NEXUS_URL; + // Nexus session record and is loaded per-request inside resolveDeps below. + nexusUrl = process.env.GROVE_NEXUS_URL; - const runtime = createLocalRuntime({ + runtime = createLocalRuntime({ groveDir, frontierCacheTtlMs: 5_000, workspace: true, @@ -75,106 +80,211 @@ try { throw new Error("Workspace manager failed to initialize"); } - // Try Nexus stores when GROVE_NEXUS_URL is set — mirrors serve.ts setup. - const nexusApiKey = process.env.NEXUS_API_KEY; - const zoneId = process.env.GROVE_ZONE_ID ?? "default"; - // Read session ID from env first (set by parent TUI if available), then fall - // back to a state file that the TUI writes when setSessionScope is called. - // serve-http.ts is typically spawned before the session exists, so the state - // file is the usual source of truth. 
- const sessionId = (() => { - const fromEnv = process.env.GROVE_SESSION_ID; - if (fromEnv) return fromEnv; - try { - const { existsSync, readFileSync } = require("node:fs") as typeof import("node:fs"); - const sessionFile = `${groveDir}/current-session.json`; - if (existsSync(sessionFile)) { - const raw = JSON.parse(readFileSync(sessionFile, "utf-8")) as { sessionId?: string }; - return raw.sessionId; - } - } catch { - /* ignore — fall through to undefined */ - } - return undefined; - })(); - - let contributionStore = runtime.contributionStore as import("../core/store.js").ContributionStore; - let claimStore = runtime.claimStore as import("../core/store.js").ClaimStore; - let bountyStore = runtime.bountyStore as import("../core/bounty-store.js").BountyStore; - let cas = runtime.cas as import("../core/cas.js").ContentStore; - let nexusClient: import("../nexus/nexus-http-client.js").NexusHttpClient | undefined; - let nexusHandoffStore: import("../nexus/nexus-handoff-store.js").NexusHandoffStore | undefined; - let topologyRouter: TopologyRouter | undefined; + nexusApiKey = process.env.NEXUS_API_KEY; + zoneId = process.env.GROVE_ZONE_ID ?? "default"; if (nexusUrl) { try { const { NexusHttpClient } = await import("../nexus/nexus-http-client.js"); - const { NexusContributionStore } = await import("../nexus/nexus-contribution-store.js"); - const { NexusClaimStore } = await import("../nexus/nexus-claim-store.js"); - const { NexusBountyStore } = await import("../nexus/nexus-bounty-store.js"); - const { NexusCas } = await import("../nexus/nexus-cas.js"); - nexusClient = new NexusHttpClient({ url: nexusUrl, ...(nexusApiKey ? 
{ apiKey: nexusApiKey } : {}), }); - const health = await Promise.race([ fetch(`${nexusUrl}/health`, { signal: AbortSignal.timeout(3000) }).then((r) => r.ok), new Promise((r) => setTimeout(() => r(false), 3000)), ]).catch(() => false); - - if (health) { - contributionStore = new NexusContributionStore({ - client: nexusClient, - zoneId, - sessionId, - }); - claimStore = new NexusClaimStore({ client: nexusClient, zoneId }); - bountyStore = new NexusBountyStore({ client: nexusClient, zoneId }); - cas = new NexusCas({ client: nexusClient, zoneId }); - const { NexusHandoffStore } = await import("../nexus/nexus-handoff-store.js"); - nexusHandoffStore = new NexusHandoffStore(nexusClient, sessionId, zoneId); - process.stderr.write(`grove-mcp-http: using Nexus stores at ${nexusUrl}\n`); - } else { - process.stderr.write(`grove-mcp-http: Nexus unreachable, using local stores\n`); - nexusClient = undefined; + if (!health) { + // Match serve.ts: fail closed when the operator explicitly pointed us + // at a Nexus that isn't available. Falling back to local SQLite here + // would split-brain writes between Nexus (used by the TUI and stdio + // MCP agents) and this HTTP MCP process, silently bypassing session + // scoping and handoff routing. + process.stderr.write( + `grove-mcp-http: FATAL: GROVE_NEXUS_URL=${nexusUrl} is set but health check failed. ` + + `Refusing to fall back to local stores. Verify Nexus is reachable and retry.\n`, + ); + process.exit(1); } + process.stderr.write(`grove-mcp-http: Nexus client ready at ${nexusUrl}\n`); } catch (err) { - process.stderr.write(`grove-mcp-http: Nexus setup failed, using local stores: ${err}\n`); + // Configured Nexus that throws during setup is a hard failure. Exit so + // the parent can surface the real error instead of silently downgrading. + process.stderr.write( + `grove-mcp-http: FATAL: Nexus setup failed for ${nexusUrl}: ${err instanceof Error ? 
err.message : String(err)}\n`,
+    );
+    process.exit(1);
+  }
 }

-  // Load contract: in Nexus mode, from the Nexus session record. In local mode,
-  // createLocalRuntime already populated runtime.contract from GROVE.md + local DB.
+  closeStores = () => runtime.close();
+} catch (error) {
+  const message = error instanceof Error ? error.message : String(error);
+  process.stderr.write(`grove-mcp-http: ${message}\n`);
+  process.exit(1);
+}
+
+// --- Dynamic session-scoped deps --------------------------------------------
+//
+// The HTTP MCP server is spawned before any interactive session exists, so we
+// cannot bake a session ID into the stores at startup. We resolve the current
+// session ID lazily on each incoming request by re-reading
+// `${groveDir}/current-session.json` (written by the TUI on setSessionScope).
+// Session-scoped stores (NexusContributionStore, NexusHandoffStore,
+// EnforcingContributionStore, TopologyRouter) are built the first time we see
+// a given session ID and cached. When the state file advances to a new
+// session ID the old cache entry is discarded so a resumed or restarted TUI
+// never inherits stale routing state.
+
+interface ScopedDeps {
+  readonly deps: McpDeps;
+  readonly sessionId: string | undefined;
+}
+
+/**
+ * Error raised when the current-session state file exists but cannot be
+ * parsed or read. Distinct from "file absent" (a valid pre-session state)
+ * so the caller can fail closed on corrupted state instead of silently
+ * falling through to unscoped stores.
+ */
+class SessionStateReadError extends Error {
+  constructor(message: string) {
+    super(message);
+    this.name = "SessionStateReadError";
+  }
+}
+
+const depsCache = new Map<string, ScopedDeps>();
+let lastSessionFileMtimeMs = -1;
+let lastSessionFileId: string | undefined;
+
+/**
+ * Read the current grove session ID from env or state file.
+ *
+ * Returns `undefined` ONLY when no session has been created yet (env var is
+ * unset AND the state file does not exist).
Throws `SessionStateReadError`
+ * when the state file is present but unreadable or unparseable — callers
+ * must decide whether to fail closed (Nexus mode) or tolerate the error.
+ *
+ * Caching semantics: we only update the cached mtime/sessionId after a
+ * SUCCESSFUL parse. A parse failure leaves the cache untouched so that
+ * every subsequent call re-runs the read against the current mtime — the
+ * caller keeps seeing errors until the writer either fixes the file or
+ * produces a new mtime with valid JSON.
+ */
+function readCurrentSessionId(): string | undefined {
+  const fromEnv = process.env.GROVE_SESSION_ID;
+  if (fromEnv) return fromEnv;
+  const { existsSync, readFileSync, statSync } = require("node:fs") as typeof import("node:fs");
+  const sessionFile = `${groveDir}/current-session.json`;
+  if (!existsSync(sessionFile)) return undefined;
+  let stat: ReturnType<typeof statSync>;
+  try {
+    stat = statSync(sessionFile);
+  } catch (err) {
+    throw new SessionStateReadError(
+      `stat ${sessionFile} failed: ${err instanceof Error ? err.message : String(err)}`,
+    );
+  }
+  if (stat.mtimeMs === lastSessionFileMtimeMs && lastSessionFileMtimeMs > 0) {
+    return lastSessionFileId;
+  }
+  let raw: { sessionId?: string };
+  try {
+    raw = JSON.parse(readFileSync(sessionFile, "utf-8")) as { sessionId?: string };
+  } catch (err) {
+    // Do NOT update the cache — leave lastSessionFileMtimeMs/lastSessionFileId
+    // untouched so the next call re-reads. Otherwise a single torn-write
+    // during a concurrent session switch would poison the cache and every
+    // subsequent request would read the stale (or undefined) session id
+    // without surfacing the error.
+    throw new SessionStateReadError(
+      `read/parse ${sessionFile} failed: ${err instanceof Error ? err.message : String(err)}`,
+    );
+  }
+  // Success — commit to cache.
+  lastSessionFileMtimeMs = stat.mtimeMs;
+  lastSessionFileId = raw.sessionId;
+  return raw.sessionId;
+}
+
+async function buildScopedDeps(sessionId: string | undefined): Promise<ScopedDeps> {
+  let contributionStore = runtime.contributionStore as import("../core/store.js").ContributionStore;
+  let claimStore = runtime.claimStore as import("../core/store.js").ClaimStore;
+  let bountyStore = runtime.bountyStore as import("../core/bounty-store.js").BountyStore;
+  let cas = runtime.cas as import("../core/cas.js").ContentStore;
+  let nexusHandoffStore: import("../nexus/nexus-handoff-store.js").NexusHandoffStore | undefined;
+  let topologyRouter: TopologyRouter | undefined;
   let loadedContract: import("../core/contract.js").GroveContract | undefined = runtime.contract;
-  if (nexusClient && !loadedContract && sessionId) {
-    try {
+
+  if (nexusClient) {
+    const { NexusContributionStore } = await import("../nexus/nexus-contribution-store.js");
+    const { NexusClaimStore } = await import("../nexus/nexus-claim-store.js");
+    const { NexusBountyStore } = await import("../nexus/nexus-bounty-store.js");
+    const { NexusCas } = await import("../nexus/nexus-cas.js");
+    const { NexusHandoffStore } = await import("../nexus/nexus-handoff-store.js");
+
+    contributionStore = new NexusContributionStore({ client: nexusClient, zoneId, sessionId });
+    claimStore = new NexusClaimStore({ client: nexusClient, zoneId });
+    bountyStore = new NexusBountyStore({ client: nexusClient, zoneId });
+    cas = new NexusCas({ client: nexusClient, zoneId });
+    nexusHandoffStore = new NexusHandoffStore(nexusClient, sessionId, zoneId);
+
+    if (sessionId && !loadedContract) {
       const { NexusSessionStore } = await import("../nexus/nexus-session-store.js");
       const nexusSessionStore = new NexusSessionStore(nexusClient, zoneId);
-      const session = await nexusSessionStore.getSession(sessionId);
-      if (session?.config) {
-        loadedContract = session.config;
+      // Retry briefly in case the TUI session mirror is still in flight.
+ const retryDelaysMs = [0, 100, 250, 500, 1000]; + let sessionRecord: import("../core/session.js").Session | undefined; + for (const delay of retryDelaysMs) { + if (delay > 0) await new Promise((r) => setTimeout(r, delay)); + sessionRecord = await nexusSessionStore.getSession(sessionId).catch(() => undefined); + if (sessionRecord?.config) break; + } + // Policy matches serve.ts: default to weak (compatible) fallback, + // opt into strict via GROVE_MCP_STRICT_CONTRACT=1. See serve.ts for + // rationale — legacy sessions created without a frozen contract + // must still work, while operators can tighten the policy once + // every session-creation path is emitting config. + const strictContract = process.env.GROVE_MCP_STRICT_CONTRACT === "1"; + + if (sessionRecord?.config) { + loadedContract = sessionRecord.config; process.stderr.write( `grove-mcp-http: loaded full contract from Nexus session ${sessionId}\n`, ); - } else if (session?.topology) { - // Backwards-compat: older session records have only topology. + } else if (sessionRecord?.topology && !strictContract) { loadedContract = { contractVersion: 1, - name: session.presetName ?? "nexus-session", + name: sessionRecord.presetName ?? "nexus-session", mode: "exploration", - topology: session.topology, + topology: sessionRecord.topology, }; process.stderr.write( - `grove-mcp-http: Nexus session ${sessionId} missing config; ` + - `reconstructed minimal contract from topology (rate limits NOT enforced)\n`, + `grove-mcp-http: WARN: Nexus session ${sessionId} has no frozen config — ` + + `using topology-only contract (enforcement NOT applied). Set ` + + `GROVE_MCP_STRICT_CONTRACT=1 to fail closed here.\n`, + ); + } else if (sessionRecord?.topology) { + throw new Error( + `grove-mcp-http: GROVE_MCP_STRICT_CONTRACT=1 and Nexus session ${sessionId} ` + + `has no frozen config. Recreate the session with the current TUI.`, + ); + } else { + // Session ID set but no record found in Nexus. 
Fail closed — the
+        // Nexus store adapters would happily create VFS paths under the
+        // bogus ID (see vfs-paths.ts contributionPath/sessionPath) and
+        // orphan writes under /zones/.../sessions/<id>/... without
+        // matching session metadata. That's strictly worse than refusing
+        // to handle the request, which the HTTP layer converts into a
+        // SESSION_NOT_READY 503.
+        throw new Error(
+          `grove-mcp-http: cannot scope to session ${sessionId} — no record in Nexus. ` +
+            `Ensure the TUI session mirror has completed (it is awaited by ` +
+            `nexus-provider.createSession) or remove the stale session id from ` +
+            `${groveDir}/current-session.json.`,
+        );
+      }
-    } catch (err) {
-      process.stderr.write(
-        `grove-mcp-http: failed to load Nexus session config: ${err instanceof Error ? err.message : String(err)}\n`,
-      );
     }
   }
@@ -187,27 +297,70 @@ try {
     const { NexusEventBus } = await import("../nexus/nexus-event-bus.js");
     const eventBus = new NexusEventBus(nexusClient, zoneId);
     topologyRouter = new TopologyRouter(loadedContract.topology, eventBus);
-    process.stderr.write(`grove-mcp-http: IPC via Nexus EventBus at ${nexusUrl}\n`);
   }

-  deps = {
+  const deps: McpDeps = {
     contributionStore,
     claimStore,
     bountyStore,
     cas,
     frontier: runtime.frontier,
-    workspace: runtime.workspace,
+    workspace: runtime.workspace!,
     contract: loadedContract,
     onContributionWrite: runtime.onContributionWrite,
     workspaceBoundary: runtime.groveRoot,
     ...(nexusHandoffStore ? { handoffStore: nexusHandoffStore } : {}),
     ...(topologyRouter ? { topologyRouter } : {}),
   };
-  closeStores = () => runtime.close();
-} catch (error) {
-  const message = error instanceof Error ? error.message : String(error);
-  process.stderr.write(`grove-mcp-http: ${message}\n`);
-  process.exit(1);
+  return { deps, sessionId };
+}
+
+async function resolveDeps(): Promise<ScopedDeps> {
+  let sessionId: string | undefined;
+  try {
+    sessionId = readCurrentSessionId();
+  } catch (err) {
+    // The state file EXISTS but is corrupted.
Refuse to continue in Nexus + // mode — silently building unscoped stores would accept writes that + // bypass contract enforcement and leak across sessions. In local mode + // we still fail hard because a malformed state file is a bug, not a + // normal bootstrap state. + throw new Error( + `grove-mcp-http: refusing to scope session — ${err instanceof Error ? err.message : String(err)}. ` + + `Fix or remove ${groveDir}/current-session.json and retry.`, + ); + } + + // When no session exists yet (sessionId === undefined), fall into a + // degraded "bootstrap" mode that serves read-only tools against the + // zone-global path. The HTTP MCP server is started by `grove up` before + // any interactive session is created, and external MCP clients + // (Cursor, Claude Desktop) may connect for tool introspection during + // that window. Returning 503 for the entire server would break that + // use case. + // + // Mutating grove_submit_work / grove_cas_put calls will still fail at + // the store layer in Nexus mode (no session id → wrong path), which is + // the correct failure mode for "did not select a session yet". The + // coder→reviewer loop is unaffected because the TUI spawns agents via + // stdio MCP (serve.ts) which has GROVE_SESSION_ID in its env, not via + // this HTTP endpoint. + if (!sessionId) { + process.stderr.write( + `grove-mcp-http: no grove session selected — serving in bootstrap mode ` + + `(reads work against the zone-global path, writes will be session-less). ` + + `Start a session via the TUI to enable full routing.\n`, + ); + } + + const key = sessionId ?? "__bootstrap__"; + const cached = depsCache.get(key); + if (cached) return cached; + // A new session id invalidates the entire cache so we never mix scopes. 
+  depsCache.clear();
+  const scoped = await buildScopedDeps(sessionId);
+  depsCache.set(key, scoped);
+  return scoped;
 }

 // --- Session management -----------------------------------------------------
@@ -231,11 +384,31 @@ interface ManagedSession {
   server: McpServer;
   transport: StreamableHTTPServerTransport;
   lastActivity: number;
+  /** The grove session ID these stores are bound to (undefined = bootstrap). */
+  groveSessionId: string | undefined;
 }

-/** Map of session ID → managed session for active sessions. */
+/** Map of MCP-session ID → managed session for active sessions. */
 const sessions = new Map<string, ManagedSession>();

+/**
+ * Invalidate all MCP sessions bound to a grove session other than `current`.
+ * Called on every POST before routing to catch a grove session switch. The
+ * McpServer captures its deps at construction, so when the grove session
+ * changes we cannot simply swap stores — we must close the old MCP session
+ * and force the client to re-initialize against fresh scoped deps.
+ */
+function invalidateStaleSessions(current: string | undefined): void {
+  for (const [id, session] of sessions) {
+    if (session.groveSessionId !== current) {
+      void safeCleanup(session.server.close(), "invalidate stale-scope MCP session", {
+        silent: true,
+      });
+      sessions.delete(id);
+    }
+  }
+}
+
 /** Periodically close sessions that have been idle longer than SESSION_TTL_MS. */
 const reapTimer = setInterval(() => {
   const now = Date.now();
@@ -294,7 +467,31 @@ async function handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
     return;
   }

-  // If we have a session ID, route to existing session
+  // Peek the current grove session ID up front. If it has changed since
+  // an existing MCP session was created, invalidate those stale sessions
+  // so they can't keep writing to the prior grove session's VFS paths.
+  //
+  // Only invalidate when we successfully READ the session file.
A read + // error here (truncated JSON during a concurrent session-switch write, + // temporary file-system hiccup) must NOT tear down in-flight sessions: + // the TUI writes this file atomically via tmp+rename, so any transient + // read failure is a bug on the writer side or a disk blip, and killing + // live connections would be strictly worse than leaving them alone for + // this one request. The corrupted-state path is caught below in + // resolveDeps() where it's turned into a 503 for this specific request. + let sessionReadFailed = false; + let currentGroveSessionId: string | undefined; + try { + currentGroveSessionId = readCurrentSessionId(); + } catch { + sessionReadFailed = true; + } + if (!sessionReadFailed) { + invalidateStaleSessions(currentGroveSessionId); + } + + // If we have a session ID and it's still bound to the current grove + // session, route to the existing MCP session. const existingSession = sessionId ? sessions.get(sessionId) : undefined; if (existingSession) { existingSession.lastActivity = Date.now(); @@ -302,14 +499,62 @@ async function handleRequest(req: IncomingMessage, res: ServerResponse): Promise return; } - // New session — create server + transport + // New MCP session — resolve session-scoped deps NOW (not at boot) so the + // server binds to whatever grove session is current. + let scoped: ScopedDeps; + try { + scoped = await resolveDeps(); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + process.stderr.write(`${msg}\n`); + res.writeHead(503, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + error: { + code: "SESSION_NOT_READY", + message: msg, + }, + }), + ); + return; + } + + // Reject new MCP session initialization in bootstrap mode under Nexus: + // a bootstrap-scoped McpServer would capture unscoped stores for its + // entire lifetime and silently write to the zone-global path even after + // a grove session appears. 
Clients must retry after `grove up` selects + // a session. Local mode can initialize freely since there is no Nexus + // scoping at all. + if (nexusClient && scoped.sessionId === undefined) { + res.writeHead(503, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + error: { + code: "SESSION_NOT_READY", + message: + "grove-mcp-http: no grove session selected — initialize the HTTP MCP " + + "after starting a session via the TUI. Mutations in bootstrap mode " + + "would land outside session scope and are refused.", + }, + }), + ); + return; + } + + const scopedDeps = scoped.deps; + const boundGroveSessionId = scoped.sessionId; const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => crypto.randomUUID(), onsessioninitialized: (id) => { - sessions.set(id, { server, transport, lastActivity: Date.now() }); + sessions.set(id, { + server, + transport, + lastActivity: Date.now(), + groveSessionId: boundGroveSessionId, + }); }, }); - const server = await createMcpServer(deps); + const server = await createMcpServer(scopedDeps); transport.onclose = () => { const sid = transport.sessionId; diff --git a/src/mcp/serve.ts b/src/mcp/serve.ts index 7ec5e6d0..53a1ab7f 100644 --- a/src/mcp/serve.ts +++ b/src/mcp/serve.ts @@ -120,56 +120,124 @@ try { ); process.stderr.write(`grove-mcp: using Nexus stores at ${nexusUrl}\n`); } else { - process.stderr.write(`grove-mcp: Nexus unreachable, using local stores\n`); - nexusClient = undefined; + // Fail closed: GROVE_NEXUS_URL was explicitly set, so the caller wants + // Nexus semantics (session scoping, handoff routing, cross-process + // IPC). Falling back to local SQLite here would silently split writes + // between Nexus and the session-scoped path the TUI subscribes to, + // leaving the coder→reviewer loop deadlocked without any error. We + // cannot know if this is a transient health blip or a real outage, + // so we crash and let the parent process retry / surface the error. 
+ process.stderr.write( + `grove-mcp: FATAL: GROVE_NEXUS_URL=${nexusUrl} is set but health check failed. ` + + `Refusing to fall back to local stores — that would silently bypass Nexus ` + + `routing and handoff creation. Verify Nexus is reachable and retry.\n`, + ); + process.exit(1); } } catch (err) { - process.stderr.write(`grove-mcp: Nexus failed, using local: ${err}\n`); + // Same reasoning: a configured Nexus that throws during setup is a hard + // failure, not a soft downgrade. Exit so the parent can retry with the + // full error visible instead of corrupting subsequent writes. + process.stderr.write( + `grove-mcp: FATAL: Nexus setup failed for ${nexusUrl}: ${err instanceof Error ? err.message : String(err)}\n`, + ); + process.exit(1); } } else { process.stderr.write(`grove-mcp: using local stores at ${groveDir}\n`); } - // In Nexus mode we skipped the local contract parse; load it from the Nexus - // session record instead. The TUI mirrors the session to Nexus via - // NexusSessionStore.putSession with the frozen contract snapshot, so we - // pull the authoritative contract from there. Without this, topologyRouter - // is never created and coder→reviewer handoffs are never generated. + // In Nexus mode we skipped the local contract parse; load the authoritative + // contract from the Nexus session record instead. The TUI mirrors every + // session to Nexus via NexusSessionStore.putSession with the frozen + // GroveContract snapshot, so we pull it from there. + // + // This path fails CLOSED: if GROVE_SESSION_ID is set but no full contract + // is available (session missing, record has only topology, Nexus is + // unreachable, or the write race hasn't settled), we exit the process. + // Without a full contract, EnforcingContributionStore is never wired, so + // agent writes would bypass rate limits, cost caps, gates, and per-kind + // policies — a silent fail-open here corrupts every contribution until + // the next restart. 
Callers must ensure the Nexus session mirror has + // completed before spawning agents (nexus-provider awaits putSession). + // + // Note: the stdio serve.ts process is spawned per agent by acpx, after + // setSessionScope has mirrored the session. If the mirror is still in + // flight we retry with exponential backoff before failing. let loadedContract: import("../core/contract.js").GroveContract | undefined = runtime.contract; if (nexusClient && !loadedContract && process.env.GROVE_SESSION_ID) { - try { - const { NexusSessionStore } = await import("../nexus/nexus-session-store.js"); - const nexusSessionStore = new NexusSessionStore(nexusClient, zoneId); - const session = await nexusSessionStore.getSession(process.env.GROVE_SESSION_ID); - if (session?.config) { - loadedContract = session.config; - process.stderr.write( - `grove-mcp: loaded full contract from Nexus session ${process.env.GROVE_SESSION_ID}\n`, - ); - } else if (session?.topology) { - // Backwards-compat fallback: older session records have only topology, - // no frozen config. Reconstruct a minimal contract so topology routing - // still works. This path should disappear once all in-flight sessions - // are recreated with the updated server. - loadedContract = { - contractVersion: 1, - name: session.presetName ?? 
"nexus-session", - mode: "exploration", - topology: session.topology, - }; - process.stderr.write( - `grove-mcp: Nexus session ${process.env.GROVE_SESSION_ID} missing config; ` + - `reconstructed minimal contract from topology (rate limits NOT enforced)\n`, - ); - } else { - process.stderr.write( - `grove-mcp: Nexus session ${process.env.GROVE_SESSION_ID} has no topology; topologyRouter disabled\n`, - ); + const sessionId = process.env.GROVE_SESSION_ID; + const { NexusSessionStore } = await import("../nexus/nexus-session-store.js"); + const nexusSessionStore = new NexusSessionStore(nexusClient, zoneId); + + const retryDelaysMs = [0, 100, 250, 500, 1000, 2000]; + let lastErr: unknown; + let sessionRecord: import("../core/session.js").Session | undefined; + for (const delay of retryDelaysMs) { + if (delay > 0) await new Promise((r) => setTimeout(r, delay)); + try { + sessionRecord = await nexusSessionStore.getSession(sessionId); + if (sessionRecord?.config) break; + } catch (err) { + lastErr = err; } - } catch (err) { + } + + // Policy: by default we PREFER a full frozen contract but fall back to + // topology-only with a loud warning when the session record was + // created without one. This preserves backwards compatibility with + // legacy sessions and session-creation paths (e.g. the + // `grove_create_session` MCP tool) that don't ship a contract. + // + // Operators who need strict enforcement can set + // `GROVE_MCP_STRICT_CONTRACT=1` to invert the default and fail closed + // on any session without a frozen contract — recommended once every + // session-creation path in the deployment is persisting config. + const strictContract = process.env.GROVE_MCP_STRICT_CONTRACT === "1"; + + if (sessionRecord?.config) { + // Happy path: full frozen contract mirrored from the TUI's createSession. 
+ loadedContract = sessionRecord.config; + process.stderr.write(`grove-mcp: loaded full contract from Nexus session ${sessionId}\n`); + } else if (sessionRecord?.topology && !strictContract) { + // Default degraded path: the session record exists but lacks a frozen + // config. Reconstruct a minimal contract so topology routing and + // per-kind tool presets still work. Rate limits / enforcement / gates + // are NOT honored; log a loud WARN so operators see the gap. + loadedContract = { + contractVersion: 1, + name: sessionRecord.presetName ?? "nexus-session", + mode: "exploration", + topology: sessionRecord.topology, + }; + process.stderr.write( + `grove-mcp: WARN: Nexus session ${sessionId} has no frozen config — ` + + `using topology-only contract (rate limits / gates / cost caps NOT ` + + `applied). Recreate the session with the current TUI to get full ` + + `enforcement, or set GROVE_MCP_STRICT_CONTRACT=1 to fail closed here.\n`, + ); + } else if (sessionRecord?.topology) { + // Strict mode explicitly enabled: refuse to proceed without a full + // contract even though the session record exists. + process.stderr.write( + `grove-mcp: FATAL: GROVE_MCP_STRICT_CONTRACT=1 is set and Nexus session ` + + `${sessionId} has no frozen config. Recreate the session with the ` + + `current TUI.\n`, + ); + process.exit(1); + } else { + // No session record at all in Nexus — the agent cannot do useful work. + // This is distinct from "session exists without config"; here we exit + // because there is no topology to route with either, so spawning an + // agent would produce zero progress. + const reason = lastErr + ? `Nexus lookup failed: ${lastErr instanceof Error ? lastErr.message : String(lastErr)}` + : "session not found in Nexus"; process.stderr.write( - `grove-mcp: failed to load Nexus session config: ${err instanceof Error ? err.message : String(err)}\n`, + `grove-mcp: FATAL: cannot find Nexus session ${sessionId} — ${reason}. 
` + + `Ensure the TUI session mirror has completed before spawning agents.\n`, ); + process.exit(1); } } diff --git a/src/tui/nexus-provider.ts b/src/tui/nexus-provider.ts index 74fa428a..e468f24c 100644 --- a/src/tui/nexus-provider.ts +++ b/src/tui/nexus-provider.ts @@ -71,7 +71,13 @@ export class NexusDataProvider { readonly capabilities: ProviderCapabilities; - protected readonly mode = "nexus"; + /** + * Public discriminator — `"nexus"` here, `"local"` on the sqlite provider. + * Used by the TUI to decide whether fatal session-create failures can + * safely fall back to a synthetic UUID (local mode: yes, Nexus mode: no, + * because the authoritative Nexus store has never seen the fake id). + */ + readonly mode = "nexus" as const; private readonly client: NexusClient; private readonly zoneId: string; @@ -305,11 +311,52 @@ export class NexusDataProvider } else { result = await super.createSession(input); } - // Mirror to Nexus VFS for cross-session visibility — write the authoritative - // record directly to preserve the real session ID (createSession would generate a new one) - void this.nexusSessionStore.putSession(result).catch(() => { - /* best-effort */ - }); + // Mirror to Nexus VFS BEFORE returning. The stdio MCP server is spawned + // per agent shortly after this returns (TUI → setSessionScope → spawn), + // and it loads its GroveContract from this exact record. A fire-and-forget + // write used to race the spawn and leave the MCP server with "session not + // found", which (since the serve.ts rewrite) now fails closed and kills + // the spawn. Awaiting serializes the mirror before the agent starts. + // + // Retry a few times to absorb transient Nexus blips before giving up. + // On final failure we ARCHIVE the local session so each retry doesn't + // create a new orphan `active` record without a Nexus counterpart. 
+ // Without this cleanup, users pressing "retry" would accumulate + // abandoned local sessions that can later show up in session lists + // and fail closed when agents are spawned against them. + const retryDelaysMs = [0, 200, 500, 1000]; + let lastErr: unknown; + for (const delay of retryDelaysMs) { + if (delay > 0) await new Promise((r) => setTimeout(r, delay)); + try { + await this.nexusSessionStore.putSession(result); + lastErr = undefined; + break; + } catch (err) { + lastErr = err; + } + } + if (lastErr) { + const msg = lastErr instanceof Error ? lastErr.message : String(lastErr); + process.stderr.write( + `[nexus-provider] FATAL: session mirror to Nexus failed after retries: ${msg}\n`, + ); + // Compensate: archive the just-created local/server session so retries + // don't leave orphan active records. Best-effort — we still rethrow + // either way so the caller sees the original mirror error. + try { + await this.archiveSession(result.id); + } catch (archiveErr) { + process.stderr.write( + `[nexus-provider] WARN: failed to archive orphan session ${result.id}: ` + + `${archiveErr instanceof Error ? archiveErr.message : String(archiveErr)}\n`, + ); + } + throw new Error( + `Failed to mirror session ${result.id} to Nexus: ${msg}. ` + + `The local session has been archived; please retry.`, + ); + } return result; } diff --git a/src/tui/screens/screen-manager.tsx b/src/tui/screens/screen-manager.tsx index 4721efa0..a942aae4 100644 --- a/src/tui/screens/screen-manager.tsx +++ b/src/tui/screens/screen-manager.tsx @@ -145,6 +145,26 @@ export const ScreenManager: React.NamedExoticComponent = Rea if (id && "setSessionScope" in provider) { (provider as { setSessionScope: (id: string) => void }).setSessionScope(id); process.stderr.write(`[screen-manager] resume setSessionScope(${id})\n`); + // Persist the resumed session id to current-session.json so the HTTP + // MCP server (serve-http.ts) re-reads and scopes subsequent + // requests to this session. 
Without this, resume would leave the + // HTTP server pinned to bootstrap mode or the prior session's id, + // and any HTTP MCP clients would see stale scope. Matches the write + // on new-session creation in handleLaunchConfirm. + if (appProps.groveDir) { + void (async () => { + try { + const { writeFileSync, renameSync } = await import("node:fs"); + const { join } = await import("node:path"); + const finalPath = join(appProps.groveDir!, "current-session.json"); + const tmpPath = `${finalPath}.${process.pid}.${Date.now()}.tmp`; + writeFileSync(tmpPath, JSON.stringify({ sessionId: id }, null, 2), "utf-8"); + renameSync(tmpPath, finalPath); + } catch { + /* best-effort */ + } + })(); + } } }, []); @@ -391,17 +411,21 @@ export const ScreenManager: React.NamedExoticComponent = Rea process.stderr.write(`[spawnAgents] provider does NOT have setSessionScope\n`); } // Write the session ID to .grove/current-session.json so the HTTP - // MCP server (spawned before the session exists) can pick it up at - // startup. Best-effort; the stdio MCP path doesn't need this. + // MCP server (spawned before the session exists) can pick it up. + // + // Written atomically via temp-file + rename so concurrent readers + // in serve-http.ts never observe a truncated/partial JSON during + // a session switch. Without this, a POST hitting /mcp at the + // same moment as the write could tear down every live HTTP MCP + // session via invalidateStaleSessions. 
if (appProps.groveDir) { try { - const { writeFileSync } = await import("node:fs"); + const { writeFileSync, renameSync } = await import("node:fs"); const { join } = await import("node:path"); - writeFileSync( - join(appProps.groveDir, "current-session.json"), - JSON.stringify({ sessionId: session.id }, null, 2), - "utf-8", - ); + const finalPath = join(appProps.groveDir, "current-session.json"); + const tmpPath = `${finalPath}.${process.pid}.${Date.now()}.tmp`; + writeFileSync(tmpPath, JSON.stringify({ sessionId: session.id }, null, 2), "utf-8"); + renameSync(tmpPath, finalPath); } catch { /* best-effort */ } @@ -410,9 +434,26 @@ export const ScreenManager: React.NamedExoticComponent = Rea } catch (err) { const msg = err instanceof Error ? err.message : String(err); process.stderr.write(`[grove] session record failed to save: ${msg}\n`); - // Generate a local session ID even when the session store is unavailable. - // Without this, MCP servers have no GROVE_SESSION_ID → contributions go to - // the global zone → N+1 VFS reads on 47+ old contributions → rate limit. + // In Nexus mode, refuse to fabricate a synthetic UUID when the + // session record (local create OR Nexus mirror) failed. Doing + // so would set GROVE_SESSION_ID to an id the authoritative + // Nexus store has never seen, and the stdio MCP server now + // fails closed when it can't load that record, so every spawned + // agent would immediately crash against an orphaned id. Keep + // the synthetic-id path for local-only sessions where there is + // no Nexus record to be missing. + // NexusDataProvider exposes `mode = "nexus"`. Using a runtime + // duck-type check keeps this file decoupled from the provider + // import graph (which would create a TUI/Nexus coupling cycle). + const isNexus = (provider as unknown as { mode?: string }).mode === "nexus"; + if (isNexus) { + setState((s) => ({ + ...s, + error: `Failed to create Nexus session: ${msg}. 
Retry or fall back to local mode.`, + screen: "preset-select", + })); + return; + } const fallbackId = crypto.randomUUID(); spawnManager.setSessionId(fallbackId); setState((s) => ({ ...s, sessionId: fallbackId })); diff --git a/src/tui/spawn-manager.ts b/src/tui/spawn-manager.ts index 7f6156c2..24a86cb3 100644 --- a/src/tui/spawn-manager.ts +++ b/src/tui/spawn-manager.ts @@ -7,7 +7,7 @@ * On tmux failure: roll back claim + workspace. */ -import { execSync } from "node:child_process"; +import { execSync, spawnSync } from "node:child_process"; import { existsSync } from "node:fs"; import { mkdir, writeFile } from "node:fs/promises"; import { join, resolve } from "node:path"; @@ -271,6 +271,34 @@ export class SpawnManager { agentCommand = `${agentCommand} "${initialPrompt.replace(/"/g, '\\"')}"`; } + // For codex roles, require successful codex MCP registration before + // spawning. writeMcpConfig above runs ensureCodexMcpRegistered in a + // soft-catch so claude roles can proceed when codex isn't installed + // or registration fails, but a codex role without grove_* tools is + // useless — it would start successfully yet be unable to contribute. + // Re-invoke here to await the serialized promise and re-throw on + // failure, giving the caller a clear error instead of a dead spawn. + if (baseCmd === "codex") { + const codexMcpEnv: Record<string, string> = { + GROVE_DIR: this.groveDir ?? process.cwd(), + }; + if (process.env.GROVE_NEXUS_URL) codexMcpEnv.GROVE_NEXUS_URL = process.env.GROVE_NEXUS_URL; + if (process.env.NEXUS_API_KEY) codexMcpEnv.NEXUS_API_KEY = process.env.NEXUS_API_KEY; + if (this.sessionId) codexMcpEnv.GROVE_SESSION_ID = this.sessionId; + // Derive mcpServePath the same way writeMcpConfig does. + const { dirname: d } = await import("node:path"); + const entry = process.argv[1] ?? ""; + const serveRoot = d(d(d(entry))); + const servePath = existsSync(join(serveRoot, "dist", "mcp", "serve.js")) + ?
join(serveRoot, "dist", "mcp", "serve.js") + : join(serveRoot, "src", "mcp", "serve.ts"); + await this.ensureCodexMcpRegistered( + codexMcpEnv, + servePath, + codexMcpEnv.GROVE_DIR ?? process.cwd(), + ); // throws on real registration failure (not "codex not installed") + } + if (this.agentRuntime) { // Use AgentRuntime interface — works with acpx, subprocess, or any runtime // Determine if this role should wait for IPC push instead of starting immediately. @@ -1260,39 +1288,160 @@ export class SpawnManager { debugLog("mcpConfig", `wrote .acpxrc.json for acpx mcpServers forwarding`); // Register MCP with codex globally (codex uses ~/.codex/config.toml, not .mcp.json). - // Use a single stable name "grove" so we don't accumulate stale per-spawn entries. - // Pass Nexus env vars so codex agents write contributions to Nexus (cross-process IPC). // - // Fired asynchronously (not awaited) because each codex invocation takes - // ~1.5s and would add several seconds to every spawn. The claude agent - // runtime doesn't depend on it — it reads .acpxrc.json directly — and - // codex reads its own config lazily when the first tool call happens, so - // landing the registration a few hundred ms late is safe. - const nexusEnvFlags = [ - `--env GROVE_DIR=${groveDir}`, - ...(mcpEnv.GROVE_NEXUS_URL ? [`--env GROVE_NEXUS_URL=${mcpEnv.GROVE_NEXUS_URL}`] : []), - ...(mcpEnv.NEXUS_API_KEY ? [`--env NEXUS_API_KEY=${mcpEnv.NEXUS_API_KEY}`] : []), - ...(mcpEnv.GROVE_SESSION_ID ? [`--env GROVE_SESSION_ID=${mcpEnv.GROVE_SESSION_ID}`] : []), - ].join(" "); - void (async () => { + // Codex stores MCP servers in a single global config keyed by name. + // Parallel role spawns in the same session all share identical env + // (GROVE_DIR / GROVE_NEXUS_URL / GROVE_SESSION_ID come from the + // SpawnManager, not the individual role), so we only need ONE entry + // per session. 
`ensureCodexMcpRegistered` serializes concurrent calls + // through a per-session promise stored on the SpawnManager instance. + // + // This is best-effort from writeMcpConfig's perspective: a claude + // role doesn't need codex registration, and failing the claude spawn + // because codex isn't installed or mis-registered would be an + // over-reaction. If a codex role is spawned and registration failed, + // the caller (see spawn() codex branch below) re-calls this and the + // thrown error propagates up to the spawn caller. + try { + await this.ensureCodexMcpRegistered(mcpEnv, mcpServePath, groveDir); + } catch (err) { + debugLog( + "mcpConfig", + `codex mcp registration failed in writeMcpConfig (claude roles unaffected): ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + + /** + * Per-SpawnManager state: a promise chain that serializes codex MCP + * registration across parallel spawn() calls. Keyed by GROVE_SESSION_ID + * so a new session retries registration instead of reusing a stale one. + */ + private codexRegistration: { sessionId: string | undefined; promise: Promise<void> } | undefined; + + private ensureCodexMcpRegistered( + mcpEnv: Record<string, string>, + mcpServePath: string, + groveDir: string, + ): Promise<void> { + const sessionId = mcpEnv.GROVE_SESSION_ID; + + // Short-circuit when codex is not installed at all — there is nothing + // to configure, and the claude path reads .acpxrc.json directly. Use + // spawnSync for a cheap `codex --version` probe, cached on the instance + // so we only pay it once.
+ if (this.codexAvailable === undefined) { try { - const { spawnSync } = await import("node:child_process"); - spawnSync("sh", ["-c", "codex mcp remove grove 2>/dev/null || true"], { + const result = spawnSync("codex", ["--version"], { stdio: "pipe", timeout: 5000, }); - spawnSync( - "sh", - ["-c", `codex mcp add grove ${nexusEnvFlags} -- bun run ${mcpServePath}`], - { stdio: "pipe", timeout: 10000 }, - ); - debugLog("mcpConfig", `codex mcp registered: ${nexusEnvFlags}`); + this.codexAvailable = result.status === 0; + } catch { + this.codexAvailable = false; + } + } + if (!this.codexAvailable) { + return Promise.resolve(); + } + + // Return an already-successful registration for this session. We only + // cache on success — a failure clears the cache so the next caller + // retries, guarding against one slow/transient `codex mcp add` + // poisoning every subsequent spawn in the same session. + if (this.codexRegistration && this.codexRegistration.sessionId === sessionId) { + return this.codexRegistration.promise; + } + + // Use a stable name `grove` (not `grove-<sessionId>`) so we don't + // accumulate stale entries with persisted secrets (NEXUS_API_KEY, + // GROVE_SESSION_ID) in ~/.codex/config.toml over time. Each new + // session replaces the single entry in place. The serialized promise + // chain on `this.codexRegistration` plus the `spawn()` mutex + // guarantees that concurrent role spawns in the same session share + // one registration call rather than racing the remove/add cycle. + // + // We still sweep any `grove-*` entries left over from earlier code + // paths (or from crashed prior runs) so the global config stays clean. + const codexMcpName = "grove"; + + // Use argv-based spawnSync rather than a shell string so that paths + // containing spaces / quotes / shell metacharacters in mcpServePath, + // groveDir, NEXUS_API_KEY, etc. cannot break the command or inject + // extra arguments.
+ const addArgs: string[] = ["mcp", "add", codexMcpName]; + addArgs.push("--env", `GROVE_DIR=${groveDir}`); + if (mcpEnv.GROVE_NEXUS_URL) addArgs.push("--env", `GROVE_NEXUS_URL=${mcpEnv.GROVE_NEXUS_URL}`); + if (mcpEnv.NEXUS_API_KEY) addArgs.push("--env", `NEXUS_API_KEY=${mcpEnv.NEXUS_API_KEY}`); + if (sessionId) addArgs.push("--env", `GROVE_SESSION_ID=${sessionId}`); + addArgs.push("--", "bun", "run", mcpServePath); + + const promise = (async () => { + // Sweep any stale `grove-*` entries left by earlier code paths that + // used per-session names. Best-effort — list output is parsed loosely. + try { + const list = spawnSync("codex", ["mcp", "list"], { + stdio: "pipe", + timeout: 5000, + }); + if (list.status === 0) { + const stdout = list.stdout?.toString("utf-8") ?? ""; + for (const line of stdout.split("\n")) { + const match = /^\s*(grove-\S+)\b/.exec(line); + if (match?.[1]) { + spawnSync("codex", ["mcp", "remove", match[1]], { + stdio: "pipe", + timeout: 5000, + }); + } + } + } } catch { - // Non-fatal — codex may not be installed + /* best-effort cleanup */ } + + // Remove the current stable name (noop on first registration) and + // re-add with the fresh session env. + spawnSync("codex", ["mcp", "remove", codexMcpName], { + stdio: "pipe", + timeout: 5000, + }); + const result = spawnSync("codex", addArgs, { + stdio: "pipe", + timeout: 10000, + }); + if (result.status !== 0) { + const stderr = result.stderr?.toString("utf-8") ?? ""; + const stdout = result.stdout?.toString("utf-8") ?? ""; + throw new Error( + `codex mcp add ${codexMcpName} failed (exit=${result.status ?? "signal"}): ${stderr || stdout || "no output"}`, + ); + } + debugLog("mcpConfig", `codex mcp registered as ${codexMcpName}`); })(); + + // Install the cache entry before awaiting so concurrent callers share + // the same in-flight promise. 
On failure we clear it so the next spawn + // can retry, and we RE-THROW so the caller (writeMcpConfig → spawn) can + // surface the failure when a codex role actually needs these tools. + // Note: writeMcpConfig is called for every role, even claude ones. We + // don't want claude spawns to fail because codex registration raced, + // so writeMcpConfig wraps this in a soft catch before returning. + this.codexRegistration = { sessionId, promise }; + return promise.catch((err) => { + if (this.codexRegistration?.promise === promise) { + this.codexRegistration = undefined; + } + debugLog( + "mcpConfig", + `codex mcp registration failed (will retry next spawn): ${err instanceof Error ? err.message : String(err)}`, + ); + throw err; + }); } + private codexAvailable: boolean | undefined; + /** * Write CLAUDE.md (agent instructions) into the workspace. * Tells the agent its role. Communication happens automatically via From bc0a945316f1aabfb1dc244838482f60f119183f Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Fri, 10 Apr 2026 20:35:48 -0700 Subject: [PATCH 3/5] chore(core,nexus): gate debug logs + parallelize handoff fan-out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the safe subset of the sibling WIP that was in the worktree alongside the Nexus/MCP fixes. Two themes: 1. Debug log hygiene - src/tui/debug-log.ts: add `ENABLED` gate keyed off GROVE_DEBUG=1 so the unconditional `/tmp/grove-debug.log` writes in hot paths become no-ops in normal operation. Before: every NexusHandoffStore read/write and every NexusContributionStore put/list wrote a timestamped line. After: only when GROVE_DEBUG=1 is set. - src/nexus/nexus-handoff-store.ts: replace 6× inlined `appendFileSync("/tmp/grove-debug.log", ...)` blocks with calls to the shared `debugLog()` helper. Net -53 lines of boilerplate and one consistent gate instead of six unguarded writes. - src/nexus/nexus-contribution-store.ts: same cleanup; net -22 lines. 
Reuses the existing `manifestPath` local rather than recomputing it inside the log. 2. contributeOperation handoff fan-out improvements - src/core/operations/contribute.ts: the `createMany` fast path already existed; the per-input fallback was serial and swallowed every error silently. Switch the fallback to `Promise.allSettled` so N downstream handoffs pay 1×RTT instead of N×RTT, and surface per-failure reasons via `console.warn` so operators can diagnose routing gaps instead of noticing handoffs missing from the UI without any log trail. The best-effort semantics are preserved: a failure does not abort the contribution that was already committed. - src/core/operations/contribute.test.ts: add a describe block exercising the serial (fallback) path with an injected faulty handoff store. Verifies (a) the contribution is still durably persisted when every handoff create throws and (b) a single `console.warn` fires for the failed batch. 3. Doc-only clarifications - src/local/sqlite-store.ts: extend the `putWithCowrite` JSDoc to spell out the duck-type capability contract used by contributeOperation's `writeAtomic` selector. - src/local/sqlite-handoff-store.ts: matching comment on `insertSync` pointing at the same selector rule. Explicitly NOT included in this commit (separate issue to follow): `src/core/operations/bounty.ts` and `src/core/operations/bounty.test.ts`. Three codex adversarial-review rounds flagged the new compensation pattern there (settle-before-pay corruption + post-commit reservation rollback + post-commit claim release). Those fixes need a saga / outbox / reconciler design decision, not a mechanical change, so they stay in the worktree until the design is resolved. Test coverage: `bun test` (with NEXUS env) → 4878 pass, 0 fail. `bun x tsc --noEmit` clean. `bun x biome check` clean on changed files. 
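For reference, the shape of the `ENABLED` gate described in item 1 — a minimal sketch, not the exact contents of src/tui/debug-log.ts (the tag/timestamp format is illustrative):

```typescript
import { appendFileSync } from "node:fs";

// Evaluated once at module load: hot paths pay only a boolean check when
// GROVE_DEBUG is unset, instead of an fs append per call.
const ENABLED = process.env.GROVE_DEBUG === "1";

export function debugLog(tag: string, message: string): void {
  if (!ENABLED) return; // no-op in normal operation
  try {
    appendFileSync(
      "/tmp/grove-debug.log",
      `${new Date().toISOString()} [${tag}] ${message}\n`,
    );
  } catch {
    /* best-effort: logging must never break the caller */
  }
}
```

Callers like NexusHandoffStore just invoke `debugLog("handoff", ...)`; the gate lives in one place instead of six unguarded appendFileSync sites.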
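The fan-out change in item 2 follows this pattern — a simplified sketch, with stand-in types rather than grove's real HandoffStore interface:

```typescript
// Illustrative stand-ins for the real store types in contribute.ts.
interface HandoffInput { from: string; to: string; contributionId: string; }
interface HandoffStore { create(input: HandoffInput): Promise<string>; }

// All creates are issued concurrently, so N downstream handoffs cost ~1×RTT.
// Failures are collected and surfaced via console.warn instead of being
// swallowed; the already-committed contribution is never rolled back.
async function fanOutHandoffs(
  store: HandoffStore,
  inputs: HandoffInput[],
): Promise<string[]> {
  const results = await Promise.allSettled(inputs.map((i) => store.create(i)));
  const ids: string[] = [];
  const failures: string[] = [];
  results.forEach((r, idx) => {
    if (r.status === "fulfilled") ids.push(r.value);
    else failures.push(`${inputs[idx].to}: ${String(r.reason)}`);
  });
  if (failures.length > 0) {
    console.warn(
      `handoff fan-out: ${failures.length} create(s) failed — ${failures.join("; ")}`,
    );
  }
  return ids; // best-effort: only the handoffs that actually landed
}
```

`Promise.allSettled` (unlike `Promise.all`) never short-circuits, which is what preserves the best-effort semantics: one rejected create cannot mask the others or abort the committed contribution.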
--- src/core/operations/contribute.test.ts | 104 +++++++++++++++++++++- src/core/operations/contribute.ts | 33 +++++-- src/local/sqlite-handoff-store.ts | 8 ++ src/local/sqlite-store.ts | 9 +- src/nexus/nexus-contribution-store.ts | 39 +++------ src/nexus/nexus-handoff-store.ts | 115 ++++++++----------------- src/tui/debug-log.ts | 4 + 7 files changed, 195 insertions(+), 117 deletions(-) diff --git a/src/core/operations/contribute.test.ts b/src/core/operations/contribute.test.ts index e5374504..2e1b35f3 100644 --- a/src/core/operations/contribute.test.ts +++ b/src/core/operations/contribute.test.ts @@ -2,8 +2,11 @@ * Tests for contribution operations. */ -import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { afterEach, beforeEach, describe, expect, spyOn, test } from "bun:test"; +import { LocalEventBus } from "../local-event-bus.js"; +import type { AgentTopology } from "../topology.js"; +import { TopologyRouter } from "../topology-router.js"; import { _resetIdempotencyCacheForTests, contributeOperation, @@ -11,8 +14,22 @@ import { reproduceOperation, reviewOperation, } from "./contribute.js"; +import type { OperationDeps } from "./deps.js"; import type { FullOperationDeps, TestOperationDeps } from "./test-helpers.js"; -import { createTestOperationDeps, storeTestContent } from "./test-helpers.js"; +import { + createTestOperationDeps, + makeInMemoryContributionStore, + storeTestContent, +} from "./test-helpers.js"; + +/** Minimal topology: coder routes to reviewer. Used to populate routedTo in writeSerial. 
*/ +const twoRoleTopology: AgentTopology = { + structure: "graph", + roles: [ + { name: "coder", edges: [{ target: "reviewer", edgeType: "delegates" }] }, + { name: "reviewer", edges: [{ target: "coder", edgeType: "feedback" }] }, + ], +}; describe("contributeOperation", () => { let testDeps: TestOperationDeps; @@ -871,3 +888,86 @@ describe("discussOperation", () => { if (!result.ok) expect(result.error.code).toBe("NOT_FOUND"); }); }); + +describe("writeSerial: best-effort handoff failure paths", () => { + // These tests use an in-memory contribution store (no putWithCowrite) so that + // contributeOperation goes through writeSerial, not writeAtomic. A topology + // router is needed to populate routedTo so handoff creation is actually attempted. + + function makeSerialDeps(handoffStore: OperationDeps["handoffStore"]): OperationDeps { + const store = makeInMemoryContributionStore(); + const bus = new LocalEventBus(); + const router = new TopologyRouter(twoRoleTopology, bus); + return { contributionStore: store, topologyRouter: router, eventBus: bus, handoffStore }; + } + + test("contribution is committed even when handoffStore.createMany throws", async () => { + const faultyHandoffStore: OperationDeps["handoffStore"] = { + create: async () => { + throw new Error("should not be called"); + }, + createMany: async () => { + throw new Error("simulated handoff store failure"); + }, + get: async () => undefined, + list: async () => [], + markDelivered: async () => undefined, + markReplied: async () => undefined, + expireStale: async () => [], + countPending: async () => 0, + close: () => undefined, + }; + + const deps = makeSerialDeps(faultyHandoffStore); + + const result = await contributeOperation( + { kind: "work", summary: "Handoff fault test", agent: { agentId: "worker", role: "coder" } }, + deps, + ); + + // Contribution must succeed despite handoff failure + expect(result.ok).toBe(true); + if (!result.ok) return; + + // handoffIds is empty because handoff creation 
failed (best-effort) + expect(result.value.handoffIds ?? []).toHaveLength(0); + + // The contribution itself is durably stored + const stored = await deps.contributionStore?.get(result.value.cid); + expect(stored).toBeDefined(); + expect(stored!.cid).toBe(result.value.cid); + }); + + test("emits console.warn when handoffStore.createMany throws", async () => { + const warnSpy = spyOn(console, "warn").mockImplementation(() => {}); + + const faultyHandoffStore: OperationDeps["handoffStore"] = { + create: async () => { + throw new Error("should not be called"); + }, + createMany: async () => { + throw new Error("handoff store down"); + }, + get: async () => undefined, + list: async () => [], + markDelivered: async () => undefined, + markReplied: async () => undefined, + expireStale: async () => [], + countPending: async () => 0, + close: () => undefined, + }; + + const deps = makeSerialDeps(faultyHandoffStore); + + await contributeOperation( + { kind: "work", summary: "Warning log test", agent: { agentId: "worker", role: "coder" } }, + deps, + ); + + expect(warnSpy).toHaveBeenCalled(); + const warnArg = String(warnSpy.mock.calls[0]?.[0] ?? ""); + expect(warnArg).toContain("[grove] handoff batch failed"); + + warnSpy.mockRestore(); + }); +}); diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index 951f2600..f473b4c2 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -263,6 +263,13 @@ interface CachedIdempotencyEntry { * on lookup. Not shared across processes — clients running multiple grove * instances must coordinate keys themselves. * + * Process-restart gap: this cache is not persisted. If a client submits a + * contribution, receives a timeout, and retries after a process restart, the + * retry will not hit the cache and may produce a duplicate contribution. + * Clients that need restart-safe deduplication should use content-addressed + * CIDs or a separate coordination layer. 
A store-backed idempotency table + * would close this gap but is deferred to a future saga-log project. + * * Single-flight: when a caller first observes a key miss, it synchronously * inserts a pending entry holding a Promise the write will resolve. Any * concurrent caller with the same key awaits that Promise. JavaScript is @@ -570,18 +577,30 @@ async function writeSerial( try { const handoffs = await handoffStore.createMany(inputs); for (const h of handoffs) handoffIds.push(h.handoffId); - } catch { - // Best-effort: contribution is already committed. + } catch (err) { + // Best-effort: contribution is already committed. Log so operators can + // diagnose routing gaps (downstream agents may not be notified). + console.warn( + `[grove] handoff batch failed for cid=${contribution.cid} roles=${inputs.map((i) => i.toRole).join(",")}`, + err, + ); } return handoffIds; } - for (const input of inputs) { - try { - const handoff = await handoffStore.create(input); - handoffIds.push(handoff.handoffId); - } catch { + // Fallback for stores without createMany: fan out in parallel so N handoffs + // pay 1×RTT instead of N×RTT. allSettled ensures a single failure doesn't + // abandon the remaining handoffs. + const results = await Promise.allSettled(inputs.map((input) => handoffStore.create(input))); + for (const [i, result] of results.entries()) { + if (result.status === "fulfilled") { + handoffIds.push(result.value.handoffId); + } else { // Best-effort: contribution is already committed. 
+ console.warn( + `[grove] handoff create failed for cid=${contribution.cid} role=${inputs[i]?.toRole}`, + result.reason, + ); } } return handoffIds; diff --git a/src/local/sqlite-handoff-store.ts b/src/local/sqlite-handoff-store.ts index 79e7b955..aa927c92 100644 --- a/src/local/sqlite-handoff-store.ts +++ b/src/local/sqlite-handoff-store.ts @@ -70,6 +70,14 @@ export class SqliteHandoffStore implements HandoffStore { return handoff; } + /** + * Insert a handoff record synchronously inside an active SQLite transaction. + * + * Capability extension: `contributeOperation` duck-types for this method when + * selecting the atomic write path (`writeAtomic`). It is called as the cowrite + * callback inside `putWithCowrite`, so it must be synchronous. Stores that do + * NOT implement this method fall back to `writeSerial` (best-effort handoffs). + */ insertSync(input: HandoffInput): string { const handoffId = input.handoffId ?? crypto.randomUUID(); this.db diff --git a/src/local/sqlite-store.ts b/src/local/sqlite-store.ts index 7270b4a5..e4fadb49 100644 --- a/src/local/sqlite-store.ts +++ b/src/local/sqlite-store.ts @@ -637,7 +637,14 @@ export class SqliteContributionStore implements ContributionStore { * Used for atomic contribution + handoff creation (outbox pattern). * * cowriteFn must be synchronous (SQLite transactions in bun:sqlite are sync). - * Called via duck-typing from contributeOperation when both stores are SQLite-backed. + * + * Capability extension: `contributeOperation` duck-types for this method at + * runtime. When both the ContributionStore and HandoffStore are SQLite-backed + * (i.e. both expose `putWithCowrite` and `insertSync`), the write goes through + * `writeAtomic` — a single SQLite transaction covering contribution + all handoffs. + * Stores that do NOT implement this method fall back to `writeSerial` (best-effort + * handoffs). Implementing stores must not add this method unless they can actually + * satisfy the synchronous-cowrite contract. 
*/ putWithCowrite(contribution: Contribution, cowriteFn: () => void): void { this.putSync(contribution, cowriteFn); diff --git a/src/nexus/nexus-contribution-store.ts b/src/nexus/nexus-contribution-store.ts index ac1a2148..f40fd3b6 100644 --- a/src/nexus/nexus-contribution-store.ts +++ b/src/nexus/nexus-contribution-store.ts @@ -11,7 +11,6 @@ * - FTS: /zones/{zoneId}/indexes/fts/{cid}.json */ -import { appendFileSync as _afs } from "node:fs"; import { fromManifest, toManifest, verifyCid } from "../core/manifest.js"; import type { Contribution, @@ -28,6 +27,7 @@ import type { ThreadSummary, } from "../core/store.js"; import { toUtcIso } from "../core/time.js"; +import { debugLog } from "../tui/debug-log.js"; import { batchParallel } from "./batch.js"; import type { NexusClient } from "./client.js"; import type { NexusConfig, ResolvedNexusConfig } from "./config.js"; @@ -141,15 +141,10 @@ export class NexusContributionStore implements ContributionStore { this.cache.set(contribution.cid, contribution); this.listCacheResult = undefined; // Invalidate list cache on write - try { - const manifestPath = contributionPath(this.zoneId, contribution.cid, this.sessionId); - _afs( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [store.put] cid=${contribution.cid.slice(0, 16)} sessionId=${this.sessionId ?? "none"} path=${manifestPath}\n`, - ); - } catch { - /* ignore */ - } + debugLog( + "store.put", + `cid=${contribution.cid.slice(0, 16)} sessionId=${this.sessionId ?? "none"} path=${manifestPath}`, + ); } async putMany(contributions: readonly Contribution[]): Promise { @@ -200,14 +195,10 @@ export class NexusContributionStore implements ContributionStore { const cacheHit = this.listCacheResult !== undefined && Date.now() - this.listCacheTime < this.listCacheTtlMs; const ftsDir = ftsIndexDir(this.zoneId, this.sessionId); - try { - _afs( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [store.list] sessionId=${this.sessionId ?? 
"none"} ftsDir=${ftsDir} cacheHit=${cacheHit}\n`, - ); - } catch { - /* ignore */ - } + debugLog( + "store.list", + `sessionId=${this.sessionId ?? "none"} ftsDir=${ftsDir} cacheHit=${cacheHit}`, + ); if (cacheHit) { allContributions = [...(this.listCacheResult as Contribution[])]; } else { @@ -230,14 +221,10 @@ export class NexusContributionStore implements ContributionStore { // Fetch ALL contributions const fetched = await batchParallel(allCids, (cid) => this.get(cid)); allContributions = fetched.filter((c): c is Contribution => c !== undefined); - try { - _afs( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [store.list] ftsEntries=${nonDirEntries.length} matchingCids=${allCids.length} total=${allContributions.length}\n`, - ); - } catch { - /* ignore */ - } + debugLog( + "store.list", + `ftsEntries=${nonDirEntries.length} matchingCids=${allCids.length} total=${allContributions.length}`, + ); // Sort by createdAt ascending (matches SQLite store behavior) allContributions.sort( diff --git a/src/nexus/nexus-handoff-store.ts b/src/nexus/nexus-handoff-store.ts index d17ae8ae..01ee13e7 100644 --- a/src/nexus/nexus-handoff-store.ts +++ b/src/nexus/nexus-handoff-store.ts @@ -19,6 +19,7 @@ import { HandoffStatus, type HandoffStore, } from "../core/handoff.js"; +import { debugLog } from "../tui/debug-log.js"; import type { NexusClient } from "./client.js"; const MAX_CAS_RETRIES = 8; @@ -78,27 +79,14 @@ export class NexusHandoffStore implements HandoffStore { private async readFile(path: string): Promise<{ handoffs: Handoff[]; etag: string }> { const result = await this.client.readWithMeta(path); if (!result) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [nexus-handoff.readFile] path=${path} result=null (file not found)\n`, - ); - } catch { - /* */ - } + debugLog("nexus-handoff.readFile", `path=${path} result=null (file not found)`); return { 
handoffs: [], etag: "" }; } const text = new TextDecoder().decode(result.content); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [nexus-handoff.readFile] path=${path} contentLen=${result.content.length} text=${text.slice(0, 80)} etag=${result.etag}\n`, - ); - } catch { - /* */ - } + debugLog( + "nexus-handoff.readFile", + `path=${path} contentLen=${result.content.length} text=${text.slice(0, 80)} etag=${result.etag}`, + ); const parsed = JSON.parse(text) as HandoffFile; return { handoffs: parsed.handoffs ?? [], etag: result.etag ?? "" }; } @@ -122,27 +110,17 @@ export class NexusHandoffStore implements HandoffStore { // Without CAS, concurrent writes may lose data, but handoffs are append-only // per session so conflicts are rare and the retry loop handles it. const writeResult = await this.client.write(path, encode({ handoffs: updated })); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [NexusHandoffStore.casUpdate] WRITE OK path=${path} etag=${etag || "(empty)"} bytesWritten=${writeResult.bytesWritten} newEtag=${writeResult.etag} count=${updated.length} attempt=${attempt}\n`, - ); - } catch { - /* non-fatal */ - } + debugLog( + "NexusHandoffStore.casUpdate", + `WRITE OK path=${path} etag=${etag || "(empty)"} bytesWritten=${writeResult.bytesWritten} newEtag=${writeResult.etag} count=${updated.length} attempt=${attempt}`, + ); return updated; } catch (err) { const msg = err instanceof Error ? 
err.message : String(err); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [NexusHandoffStore.casUpdate] WRITE FAIL path=${path} attempt=${attempt} err=${msg}\n`, - ); - } catch { - /* non-fatal */ - } + debugLog( + "NexusHandoffStore.casUpdate", + `WRITE FAIL path=${path} attempt=${attempt} err=${msg}`, + ); // Conflict = another writer updated between our read and write — retry if (msg.includes("412") || msg.includes("conflict") || msg.includes("mismatch")) { // Brief backoff before retry @@ -226,15 +204,10 @@ export class NexusHandoffStore implements HandoffStore { // doesn't guarantee read-after-write visibility across NexusHttpClient instances). // readAllHandoffs uses directory listing which has broader visibility. const allHandoffs = await this.readAllHandoffs(); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [nexus-handoff] LIST sessionId=${this.sessionId ?? "none"} path=${this.filePath()} total=${allHandoffs.length}\n`, - ); - } catch { - /* */ - } + debugLog( + "nexus-handoff", + `LIST sessionId=${this.sessionId ?? "none"} path=${this.filePath()} total=${allHandoffs.length}`, + ); // Filter out malformed entries (test files without required fields) let results = allHandoffs.filter((h) => h.handoffId && h.createdAt); @@ -317,54 +290,34 @@ export class NexusHandoffStore implements HandoffStore { // Nexus list may return entries without the .json extension even though // the file was written with .json — accept all non-directory entries. 
const files = listing.files.filter((e) => !e.isDirectory); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [NexusHandoffStore.readAllHandoffs] dir=${handoffsDir(this.zoneId)} totalEntries=${listing.files.length} jsonFiles=${files.length} paths=${files.map((f) => f.path).join(",") || "(none)"}\n`, - ); - } catch { - /* non-fatal */ - } + debugLog( + "NexusHandoffStore.readAllHandoffs", + `dir=${handoffsDir(this.zoneId)} totalEntries=${listing.files.length} jsonFiles=${files.length} paths=${files.map((f) => f.path).join(",") || "(none)"}`, + ); const results = await Promise.all( files.map(async (f: import("./client.js").ListEntry) => { try { const { handoffs } = await this.readFile(f.path); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [NexusHandoffStore.readAllHandoffs] read path=${f.path} count=${handoffs.length}\n`, - ); - } catch { - /* non-fatal */ - } + debugLog( + "NexusHandoffStore.readAllHandoffs", + `read path=${f.path} count=${handoffs.length}`, + ); return handoffs; } catch (readErr) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [NexusHandoffStore.readAllHandoffs] FAIL path=${f.path} err=${readErr instanceof Error ? readErr.message : String(readErr)}\n`, - ); - } catch { - /* non-fatal */ - } + debugLog( + "NexusHandoffStore.readAllHandoffs", + `FAIL path=${f.path} err=${readErr instanceof Error ? 
readErr.message : String(readErr)}`, + ); return []; } }), ); return results.flat(); } catch (listErr) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [NexusHandoffStore.readAllHandoffs] LIST FAIL dir=${handoffsDir(this.zoneId)} err=${listErr instanceof Error ? listErr.message : String(listErr)}\n`, - ); - } catch { - /* non-fatal */ - } + debugLog( + "NexusHandoffStore.readAllHandoffs", + `LIST FAIL dir=${handoffsDir(this.zoneId)} err=${listErr instanceof Error ? listErr.message : String(listErr)}`, + ); return []; } } diff --git a/src/tui/debug-log.ts b/src/tui/debug-log.ts index 98a3f1d5..90242430 100644 --- a/src/tui/debug-log.ts +++ b/src/tui/debug-log.ts @@ -1,13 +1,17 @@ /** * Temporary debug logger for e2e trace validation. * Writes to /tmp/grove-debug.log so it doesn't corrupt the TUI. + * + * Gated behind GROVE_DEBUG=1 — no-op in normal operation. */ import { appendFileSync } from "node:fs"; const LOG_PATH = "/tmp/grove-debug.log"; +const ENABLED = process.env.GROVE_DEBUG === "1"; export function debugLog(tag: string, msg: string): void { + if (!ENABLED) return; try { appendFileSync(LOG_PATH, `[${new Date().toISOString()}] [${tag}] ${msg}\n`); } catch { From 28421f428a2be7841f2861af928dd2b8379fc636 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Fri, 10 Apr 2026 21:38:23 -0700 Subject: [PATCH 4/5] style: fix biome lint failures on PR #239 CI caught two lint errors that `bun x biome check` didn't surface when I ran it against only the changed files in the previous commits. Running the full `bun run check` the way CI does reproduced both. - src/core/acpx-runtime.test.ts: previous auto-fix left the `test()` call with the 30s timeout arg on a broken trailing-comment line that biome's formatter rejected on the second pass. Rewrote to the canonical 3-arg form with a single blank line between the arrow body and the timeout. 
- src/tui/spawn-manager.test.ts: the `setDefaultTimeout(30_000)` call I added in a prior round split the import block in two, which biome's `assist/source/organizeImports` rule flagged. Added a blank line between the call and the subsequent type imports so the import group is contiguous. No behavior change. `bun run check` is clean now (16 pre-existing warnings unrelated to my files). Full test suite still 4870 pass, 0 fail with NEXUS env set. --- src/core/acpx-runtime.test.ts | 32 ++++++++++++++------------------ src/tui/spawn-manager.test.ts | 1 + 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/core/acpx-runtime.test.ts b/src/core/acpx-runtime.test.ts index 3caeb6cd..56615305 100644 --- a/src/core/acpx-runtime.test.ts +++ b/src/core/acpx-runtime.test.ts @@ -75,24 +75,20 @@ describe("AcpxRuntime", () => { expect(rt).toBeDefined(); }); - test( - "spawn and close work when acpx is available", - async () => { - const rt = new AcpxRuntime(); - const skip = !(await rt.isAvailable()); - if (skip) return; + // Timeout bumped to 30s because acpx cold-start can take >5s when the + // agent adapter has to be fetched via npx or when the host is under load. 
+ test("spawn and close work when acpx is available", async () => { + const rt = new AcpxRuntime(); + const skip = !(await rt.isAvailable()); + if (skip) return; - // Session names now carry a per-spawn counter and a base36 timestamp: - // grove--- - const session = await rt.spawn("test", config); - expect(session.id).toMatch(/^grove-test-\d+-[a-z0-9]+$/); - expect(session.role).toBe("test"); - expect(session.status).toBe("running"); + // Session names now carry a per-spawn counter and a base36 timestamp: + // grove--- + const session = await rt.spawn("test", config); + expect(session.id).toMatch(/^grove-test-\d+-[a-z0-9]+$/); + expect(session.role).toBe("test"); + expect(session.status).toBe("running"); - await rt.close(session); - }, - // acpx can take >5s when the agent adapter has to be fetched via npx or - // when the host machine is under load — default timeout was racing. - 30_000, - ); + await rt.close(session); + }, 30_000); }); diff --git a/src/tui/spawn-manager.test.ts b/src/tui/spawn-manager.test.ts index 6067bd85..15b0ce31 100644 --- a/src/tui/spawn-manager.test.ts +++ b/src/tui/spawn-manager.test.ts @@ -13,6 +13,7 @@ import { afterEach, describe, expect, setDefaultTimeout, test } from "bun:test"; // take 1–3s on a warm machine and 3–5s cold; the default 5s timeout races tests // that spawn multiple agents. Bump to 30s so CI and local runs are stable. 
setDefaultTimeout(30_000); + import type { Claim } from "../core/models.js"; import type { SpawnOptions, TmuxManager } from "./agents/tmux-manager.js"; import { MockTmuxManager } from "./agents/tmux-manager.js"; From c4b4f8e40ae8affef70ed50dbb3220592b6dff7d Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Sat, 11 Apr 2026 14:13:32 -0700 Subject: [PATCH 5/5] fix(core,tui): address final codex review on PR #239 (2 findings) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - contribute.ts: wrap parallel handoff create() in Promise.resolve().then() so a synchronous throw becomes a rejected promise instead of escaping .map() and bypassing allSettled. Without this, a sync throw from a HandoffStore extension would surface as an operation error after the contribution was already committed, releasing the idempotency slot and allowing duplicate contributions on retry. - contribute.test.ts: regression test with a HandoffStore whose create() throws synchronously and no createMany (forces the fallback path). - spawn-manager.ts: forward GROVE_DEBUG to spawned MCP agents via both .mcp.json/.acpxrc.json env and the codex mcp registration. debugLog() in NexusContributionStore/NexusHandoffStore reads the env var at module load inside the child grove-mcp process, which does not inherit env from the parent TUI shell — so without this passthrough turning on GROVE_DEBUG=1 in the parent never re-enabled agent-side traces. 
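The contribute.ts finding above is subtle enough to warrant a standalone sketch (hedged — generic promises, not the real store types): a synchronous throw inside `.map()` fires while the array argument is being built, so `Promise.allSettled` is never called at all, whereas deferring the call via `Promise.resolve().then()` converts the throw into an observable rejection.

```typescript
// Non-async on purpose: the throw happens during .map(), before any
// Promise exists — modeling a HandoffStore extension that throws sync.
function syncThrowingCreate(): Promise<string> {
  throw new Error("boom");
}

async function demo(): Promise<boolean[]> {
  let escaped = false;
  try {
    // The .map() callback throws while building the input array, so
    // allSettled never runs — the error escapes to this try/catch.
    await Promise.allSettled([1, 2].map(() => syncThrowingCreate()));
  } catch {
    escaped = true;
  }
  // The fix: defer each call so the sync throw becomes a rejected
  // promise that allSettled can settle.
  const settled = await Promise.allSettled(
    [1, 2].map(() => Promise.resolve().then(() => syncThrowingCreate())),
  );
  return [escaped, ...settled.map((r) => r.status === "rejected")];
}
```

Without the wrapper, that escaping error would surface after the contribution was already committed — exactly the duplicate-on-retry hazard the commit closes.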
--- src/core/operations/contribute.test.ts | 61 ++++++++++++++++++++++++++ src/core/operations/contribute.ts | 9 +++- src/tui/spawn-manager.ts | 9 ++++ 3 files changed, 77 insertions(+), 2 deletions(-) diff --git a/src/core/operations/contribute.test.ts b/src/core/operations/contribute.test.ts index 2e1b35f3..eda13d15 100644 --- a/src/core/operations/contribute.test.ts +++ b/src/core/operations/contribute.test.ts @@ -970,4 +970,65 @@ describe("writeSerial: best-effort handoff failure paths", () => { warnSpy.mockRestore(); }); + + test("contribution survives synchronous throw from handoffStore.create() in parallel fallback", async () => { + // Regression for codex review finding: when a HandoffStore exposes only + // create() (no createMany), contribute falls back to a parallel fan-out via + // Promise.allSettled. If create() throws synchronously *before* returning a + // Promise, the throw must still be caught — otherwise the already-committed + // contribution would bubble out as an operation error and the idempotency + // slot would be released, allowing duplicate contributions on retry. + const warnSpy = spyOn(console, "warn").mockImplementation(() => {}); + + // Non-async function so the throw happens synchronously, before any + // Promise is returned. Cast through `unknown` because the interface + // declares an async return type. + const syncThrowingCreate = (() => { + throw new Error("simulated synchronous throw from create()"); + }) as unknown as NonNullable["create"]; + + const syncThrowingHandoffStore: OperationDeps["handoffStore"] = { + create: syncThrowingCreate, + // No createMany — forces the fallback path. 
+ get: async () => undefined, + list: async () => [], + markDelivered: async () => undefined, + markReplied: async () => undefined, + expireStale: async () => [], + countPending: async () => 0, + close: () => undefined, + }; + + const deps = makeSerialDeps(syncThrowingHandoffStore); + + const result = await contributeOperation( + { + kind: "work", + summary: "Sync-throw fallback test", + agent: { agentId: "worker", role: "coder" }, + }, + deps, + ); + + // Operation must succeed: the contribution is already committed, and + // best-effort handoff failure should not surface as an error. + expect(result.ok).toBe(true); + if (!result.ok) return; + + // No handoff IDs because every create() threw. + expect(result.value.handoffIds ?? []).toHaveLength(0); + + // The contribution itself is durably stored. + const stored = await deps.contributionStore?.get(result.value.cid); + expect(stored).toBeDefined(); + expect(stored!.cid).toBe(result.value.cid); + + // We logged the per-item failure (not the batch failure path). + const warnedAboutCreate = warnSpy.mock.calls.some((call) => + String(call[0] ?? "").includes("[grove] handoff create failed"), + ); + expect(warnedAboutCreate).toBe(true); + + warnSpy.mockRestore(); + }); }); diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index f473b4c2..20ea77ef 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -590,8 +590,13 @@ async function writeSerial( // Fallback for stores without createMany: fan out in parallel so N handoffs // pay 1×RTT instead of N×RTT. allSettled ensures a single failure doesn't - // abandon the remaining handoffs. - const results = await Promise.allSettled(inputs.map((input) => handoffStore.create(input))); + // abandon the remaining handoffs. 
Wrap each call in Promise.resolve().then() + // so a synchronous throw inside create() becomes a rejected promise rather + // than escaping map() and bypassing allSettled (which would violate the + // best-effort contract: the contribution is already committed by this point). + const results = await Promise.allSettled( + inputs.map((input) => Promise.resolve().then(() => handoffStore.create(input))), + ); for (const [i, result] of results.entries()) { if (result.status === "fulfilled") { handoffIds.push(result.value.handoffId); diff --git a/src/tui/spawn-manager.ts b/src/tui/spawn-manager.ts index 24a86cb3..57f4da1a 100644 --- a/src/tui/spawn-manager.ts +++ b/src/tui/spawn-manager.ts @@ -1196,6 +1196,14 @@ export class SpawnManager { if (this.sessionId) { mcpEnv.GROVE_SESSION_ID = this.sessionId; } + // Forward GROVE_DEBUG to spawned MCP agents. debugLog() in the agent-side + // NexusContributionStore / NexusHandoffStore reads this env var at module + // load, but those run inside a separate child process whose env is dictated + // by .mcp.json / .acpxrc.json — not inherited from the parent shell. Without + // this passthrough, GROVE_DEBUG=1 in the TUI never enables agent-side traces. + if (process.env.GROVE_DEBUG) { + mcpEnv.GROVE_DEBUG = process.env.GROVE_DEBUG; + } // Find the grove MCP server: check dist/ first (installed), then src/ (dev) // Use process.argv[1] (entry point) not import.meta.url — bun bundles may inline @@ -1374,6 +1382,7 @@ export class SpawnManager { if (mcpEnv.GROVE_NEXUS_URL) addArgs.push("--env", `GROVE_NEXUS_URL=${mcpEnv.GROVE_NEXUS_URL}`); if (mcpEnv.NEXUS_API_KEY) addArgs.push("--env", `NEXUS_API_KEY=${mcpEnv.NEXUS_API_KEY}`); if (sessionId) addArgs.push("--env", `GROVE_SESSION_ID=${sessionId}`); + if (mcpEnv.GROVE_DEBUG) addArgs.push("--env", `GROVE_DEBUG=${mcpEnv.GROVE_DEBUG}`); addArgs.push("--", "bun", "run", mcpServePath); const promise = (async () => {