Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions src/cli/main.tui-dispatch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,32 @@ const CLI_PATH = join(import.meta.dir, "main.ts");

describe("bare grove → TUI dispatch", () => {
test("bare grove does not exit with usage error", async () => {
// When invoked without a TTY, handleTuiDirect may block or fail to render.
// The key assertion: it should NOT exit with code 2 ("unknown command").
// We give it 2s then kill it — if it hasn't exited with code 2 by then,
// it successfully reached the TUI path.
// When invoked without a TTY, handleTuiDirect may block or fail to
// render. The key assertion: it should NOT exit with code 2
// ("unknown command"). We give it 5s then kill it — if it hasn't
// exited with code 2 by then, it successfully reached the TUI path.
const proc = Bun.spawn(["bun", "run", CLI_PATH], {
stdout: "pipe",
stderr: "pipe",
env: { ...process.env, TERM: "dumb" },
});

// Race: either the process exits on its own, or we kill after 2s
const timeout = new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 2000));
// Race: either the process exits on its own, or we kill it after 5s.
// Bun's cold start on larger entry points (grove's main.ts pulls in a
// lot of TS) routinely takes >2s on a warm machine, so the old 2s
// timeout would fire before the process had a chance to exit on its
// own, and the subsequent `await proc.exited` would then hit the
// outer bun:test 5s timeout.
const timeout = new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 5000));
const result = await Promise.race([
proc.exited.then((code) => ({ kind: "exited" as const, code })),
timeout.then((t) => ({ kind: t })),
]);

if (result.kind === "timeout") {
// Process is still running (TUI is blocking) — this means it successfully
// dispatched to handleTuiDirect and didn't exit with "unknown command"
// Process is still running (TUI is blocking) — this means it
// successfully dispatched to handleTuiDirect and didn't exit with
// "unknown command"
proc.kill();
await proc.exited;
} else {
Expand All @@ -40,7 +46,7 @@ describe("bare grove → TUI dispatch", () => {
expect(result.code).not.toBe(2);
expect(stderr).not.toContain("unknown command");
}
});
}, 15_000); // Outer test timeout must exceed the inner 5s kill deadline.

test("grove --help still works", async () => {
const proc = Bun.spawn(["bun", "run", CLI_PATH, "--help"], {
Expand Down
2 changes: 1 addition & 1 deletion src/cli/nexus-lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,11 @@ export async function ensureNexusRunning(
}

const candidateUrls = [
process.env.GROVE_NEXUS_URL, // explicit env override takes highest priority
containerUrl, // container IP (works without port binding)
config.nexusUrl,
readNexusUrl(projectRoot),
stateFileUrl,
process.env.GROVE_NEXUS_URL,
DEFAULT_NEXUS_URL,
].filter((u): u is string => !!u);

Expand Down
8 changes: 6 additions & 2 deletions src/core/acpx-runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,20 @@ describe("AcpxRuntime", () => {
expect(rt).toBeDefined();
});

// Timeout bumped to 30s because acpx cold-start can take >5s when the
// agent adapter has to be fetched via npx or when the host is under load.
test("spawn and close work when acpx is available", async () => {
const rt = new AcpxRuntime();
const skip = !(await rt.isAvailable());
if (skip) return;

// Session names now carry a per-spawn counter and a base36 timestamp:
// grove-<role>-<counter>-<timestamp>
const session = await rt.spawn("test", config);
expect(session.id).toMatch(/^grove-test-\d+$/);
expect(session.id).toMatch(/^grove-test-\d+-[a-z0-9]+$/);
expect(session.role).toBe("test");
expect(session.status).toBe("running");

await rt.close(session);
});
}, 30_000);
});
67 changes: 60 additions & 7 deletions src/core/acpx-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,24 @@ export class AcpxRuntime implements AgentRuntime {

async isAvailable(): Promise<boolean> {
try {
execSync("acpx --version", { encoding: "utf-8", stdio: "pipe" });
return true;
const out = execSync("acpx --version", { encoding: "utf-8", stdio: "pipe" }).trim();
// acpx >=0.5.3 is required: 0.3.x uses the buggy @zed-industries/claude-agent-acp
// adapter that fails session/new with "Query closed before response received".
// 0.5.x switched to @agentclientprotocol/claude-agent-acp which works.
const match = /^(\d+)\.(\d+)\.(\d+)/.exec(out);
if (!match) return true; // unparseable — don't block
const [, maj, min, patch] = match;
const major = Number(maj);
const minor = Number(min);
const patchNum = Number(patch);
if (major > 0) return true;
if (minor > 5) return true;
if (minor === 5 && patchNum >= 3) return true;
process.stderr.write(
`[acpx-runtime] acpx ${out} is too old — grove requires acpx >=0.5.3. ` +
`Upgrade with: npm install -g acpx@latest\n`,
);
return false;
} catch {
return false;
}
Expand All @@ -82,21 +98,58 @@ export class AcpxRuntime implements AgentRuntime {
const counter = this.nextId++;
const sessionName = `grove-${role}-${counter}-${Date.now().toString(36)}`;
const id = sessionName;
const mergedEnv = config.env ? { ...process.env, ...config.env } : { ...process.env };

// Strip Claude Code harness env vars before spawning the subagent.
//
// When grove runs inside a Claude Code shell (CLAUDECODE=1), any inner
// `claude` subprocess detects that flag and connects back to the parent
// Claude Code harness via its IPC channel instead of launching a fresh
// session. The inner agent then inherits the parent's tool surface
// (ToolSearch, EnterWorktree, mcp__MaaS-*, etc.) and — critically — does
// NOT load the workspace's `.mcp.json` / `.acpxrc.json` grove MCP server.
//
// Unset every CLAUDE_CODE_* / CLAUDECODE / CLAUDE_PLUGIN_* var so acpx
// launches a pristine agent that reads its MCP config from the workspace.
const baseEnv = { ...process.env };
for (const key of Object.keys(baseEnv)) {
if (
key === "CLAUDECODE" ||
key.startsWith("CLAUDE_CODE_") ||
key.startsWith("CLAUDE_PLUGIN_")
) {
delete baseEnv[key];
}
}
const mergedEnv = config.env ? { ...baseEnv, ...config.env } : baseEnv;

// Extract the agent binary name from config.command (e.g. "claude --flag" → "claude").
// acpx takes the agent name as a subcommand; flags and the initial prompt go through
// the session creation path, not the `acpx <agent>` argument.
//
// Only known acpx subcommands are accepted (claude/codex/gemini/pi/openclaw). Any
// other first token (including shell builtins like `echo` used in tests) falls back
// to the runtime-level default so we never pass acpx an unknown agent name.
const KNOWN_ACPX_AGENTS = new Set(["claude", "codex", "gemini", "pi", "openclaw"]);
const agent = (() => {
if (!config.command) return this.agent;
const stripped = config.command.replace(/^rm\s+[^;]+;\s*/, ""); // drop leading "rm -f ~/..." hooks
const first = stripped.trim().split(/\s+/)[0] ?? "";
return KNOWN_ACPX_AGENTS.has(first) ? first : this.agent;
})();

// Create a new acpx session with --approve-all (layer 1: acpx client-side auto-approve)
const createCmd = `acpx --approve-all ${shellEscape(this.agent)} sessions new --name ${shellEscape(sessionName)}`;
const createCmd = `acpx --approve-all ${shellEscape(agent)} sessions new --name ${shellEscape(sessionName)}`;
try {
execSync(createCmd, { encoding: "utf-8", stdio: "pipe", cwd: config.cwd, env: mergedEnv });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(`acpx session creation failed for role "${role}": ${msg}`);
throw new Error(`acpx session creation failed for role "${role}" (agent=${agent}): ${msg}`);
}

// Set full-access mode (layer 2: codex internal approval policy = never prompt)
try {
execSync(
`acpx --approve-all ${shellEscape(this.agent)} set-mode full-access -s ${shellEscape(sessionName)}`,
`acpx --approve-all ${shellEscape(agent)} set-mode full-access -s ${shellEscape(sessionName)}`,
{ encoding: "utf-8", stdio: "pipe", cwd: config.cwd, env: mergedEnv, timeout: 10_000 },
);
} catch {
Expand All @@ -107,7 +160,7 @@ export class AcpxRuntime implements AgentRuntime {
const logFile = this.logDir ? join(this.logDir, `${role}-${counter}.log`) : null;
const entry: AcpxSessionEntry = {
session,
agent: this.agent,
agent,
sessionName,
cwd: config.cwd,
env: mergedEnv,
Expand Down
165 changes: 163 additions & 2 deletions src/core/operations/contribute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,34 @@
* Tests for contribution operations.
*/

import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { afterEach, beforeEach, describe, expect, spyOn, test } from "bun:test";

import { LocalEventBus } from "../local-event-bus.js";
import type { AgentTopology } from "../topology.js";
import { TopologyRouter } from "../topology-router.js";
import {
_resetIdempotencyCacheForTests,
contributeOperation,
discussOperation,
reproduceOperation,
reviewOperation,
} from "./contribute.js";
import type { OperationDeps } from "./deps.js";
import type { FullOperationDeps, TestOperationDeps } from "./test-helpers.js";
import { createTestOperationDeps, storeTestContent } from "./test-helpers.js";
import {
createTestOperationDeps,
makeInMemoryContributionStore,
storeTestContent,
} from "./test-helpers.js";

/**
 * Minimal two-role topology fixture: "coder" has a "delegates" edge to
 * "reviewer", and "reviewer" has a "feedback" edge back to "coder".
 * Used to populate routedTo in writeSerial (it is the topology handed to the
 * TopologyRouter built for the writeSerial handoff tests).
 */
const twoRoleTopology: AgentTopology = {
structure: "graph",
roles: [
{ name: "coder", edges: [{ target: "reviewer", edgeType: "delegates" }] },
{ name: "reviewer", edges: [{ target: "coder", edgeType: "feedback" }] },
],
};

describe("contributeOperation", () => {
let testDeps: TestOperationDeps;
Expand Down Expand Up @@ -871,3 +888,147 @@ describe("discussOperation", () => {
if (!result.ok) expect(result.error.code).toBe("NOT_FOUND");
});
});

describe("writeSerial: best-effort handoff failure paths", () => {
  // These tests use an in-memory contribution store (no putWithCowrite) so that
  // contributeOperation goes through writeSerial, not writeAtomic. A topology
  // router is needed to populate routedTo so handoff creation is actually attempted.

  /**
   * Builds operation deps wired for the serial write path: an in-memory
   * contribution store, a router over `twoRoleTopology`, a local event bus,
   * and the caller-supplied handoff store under test.
   */
  function makeSerialDeps(handoffStore: OperationDeps["handoffStore"]): OperationDeps {
    const store = makeInMemoryContributionStore();
    const bus = new LocalEventBus();
    const router = new TopologyRouter(twoRoleTopology, bus);
    return { contributionStore: store, topologyRouter: router, eventBus: bus, handoffStore };
  }

  /**
   * Handoff store whose batch path always fails.
   *
   * `createMany` rejects with `createManyMessage`; `create` rejects with
   * "should not be called" so an unexpected fallback to per-item creation is
   * visible. All remaining methods are inert no-ops.
   */
  function makeFaultyHandoffStore(createManyMessage: string): OperationDeps["handoffStore"] {
    return {
      create: async () => {
        throw new Error("should not be called");
      },
      createMany: async () => {
        throw new Error(createManyMessage);
      },
      get: async () => undefined,
      list: async () => [],
      markDelivered: async () => undefined,
      markReplied: async () => undefined,
      expireStale: async () => [],
      countPending: async () => 0,
      close: () => undefined,
    };
  }

  test("contribution is committed even when handoffStore.createMany throws", async () => {
    const deps = makeSerialDeps(makeFaultyHandoffStore("simulated handoff store failure"));

    const result = await contributeOperation(
      { kind: "work", summary: "Handoff fault test", agent: { agentId: "worker", role: "coder" } },
      deps,
    );

    // Contribution must succeed despite handoff failure
    expect(result.ok).toBe(true);
    if (!result.ok) return;

    // handoffIds is empty because handoff creation failed (best-effort)
    expect(result.value.handoffIds ?? []).toHaveLength(0);

    // The contribution itself is durably stored
    const stored = await deps.contributionStore?.get(result.value.cid);
    expect(stored).toBeDefined();
    expect(stored?.cid).toBe(result.value.cid);
  });

  test("emits console.warn when handoffStore.createMany throws", async () => {
    const warnSpy = spyOn(console, "warn").mockImplementation(() => {});

    // try/finally: restore console.warn even when an assertion below throws,
    // so a failure here cannot leave the mock installed for later tests.
    try {
      const deps = makeSerialDeps(makeFaultyHandoffStore("handoff store down"));

      await contributeOperation(
        { kind: "work", summary: "Warning log test", agent: { agentId: "worker", role: "coder" } },
        deps,
      );

      expect(warnSpy).toHaveBeenCalled();
      const warnArg = String(warnSpy.mock.calls[0]?.[0] ?? "");
      expect(warnArg).toContain("[grove] handoff batch failed");
    } finally {
      warnSpy.mockRestore();
    }
  });

  test("contribution survives synchronous throw from handoffStore.create() in parallel fallback", async () => {
    // Regression for codex review finding: when a HandoffStore exposes only
    // create() (no createMany), contribute falls back to a parallel fan-out via
    // Promise.allSettled. If create() throws synchronously *before* returning a
    // Promise, the throw must still be caught — otherwise the already-committed
    // contribution would bubble out as an operation error and the idempotency
    // slot would be released, allowing duplicate contributions on retry.
    const warnSpy = spyOn(console, "warn").mockImplementation(() => {});

    // try/finally: restore console.warn on every exit path (failed assertion
    // or the early return used for type narrowing).
    try {
      // Non-async function so the throw happens synchronously, before any
      // Promise is returned. Cast through `unknown` because the interface
      // declares an async return type.
      const syncThrowingCreate = (() => {
        throw new Error("simulated synchronous throw from create()");
      }) as unknown as NonNullable<OperationDeps["handoffStore"]>["create"];

      const syncThrowingHandoffStore: OperationDeps["handoffStore"] = {
        create: syncThrowingCreate,
        // No createMany — forces the fallback path.
        get: async () => undefined,
        list: async () => [],
        markDelivered: async () => undefined,
        markReplied: async () => undefined,
        expireStale: async () => [],
        countPending: async () => 0,
        close: () => undefined,
      };

      const deps = makeSerialDeps(syncThrowingHandoffStore);

      const result = await contributeOperation(
        {
          kind: "work",
          summary: "Sync-throw fallback test",
          agent: { agentId: "worker", role: "coder" },
        },
        deps,
      );

      // Operation must succeed: the contribution is already committed, and
      // best-effort handoff failure should not surface as an error.
      expect(result.ok).toBe(true);
      if (!result.ok) return;

      // No handoff IDs because every create() threw.
      expect(result.value.handoffIds ?? []).toHaveLength(0);

      // The contribution itself is durably stored.
      const stored = await deps.contributionStore?.get(result.value.cid);
      expect(stored).toBeDefined();
      expect(stored?.cid).toBe(result.value.cid);

      // We logged the per-item failure (not the batch failure path).
      const warnedAboutCreate = warnSpy.mock.calls.some((call) =>
        String(call[0] ?? "").includes("[grove] handoff create failed"),
      );
      expect(warnedAboutCreate).toBe(true);
    } finally {
      warnSpy.mockRestore();
    }
  });
});
Loading
Loading