diff --git a/src/core/event-bus.test.ts b/src/core/event-bus.test.ts index cc4a8e7d..49595504 100644 --- a/src/core/event-bus.test.ts +++ b/src/core/event-bus.test.ts @@ -7,12 +7,12 @@ import { TopologyRouter } from "./topology-router.js"; // --- LocalEventBus tests --- describe("LocalEventBus", () => { - test("publish delivers to subscriber of target role", () => { + test("publish delivers to subscriber of target role", async () => { const bus = new LocalEventBus(); const received: GroveEvent[] = []; bus.subscribe("reviewer", (e) => received.push(e)); - bus.publish({ + await bus.publish({ type: "contribution", sourceRole: "coder", targetRole: "reviewer", @@ -25,12 +25,12 @@ describe("LocalEventBus", () => { bus.close(); }); - test("publish does not deliver to wrong role", () => { + test("publish does not deliver to wrong role", async () => { const bus = new LocalEventBus(); const received: GroveEvent[] = []; bus.subscribe("coder", (e) => received.push(e)); - bus.publish({ + await bus.publish({ type: "contribution", sourceRole: "coder", targetRole: "reviewer", @@ -42,14 +42,14 @@ describe("LocalEventBus", () => { bus.close(); }); - test("unsubscribe stops delivery", () => { + test("unsubscribe stops delivery", async () => { const bus = new LocalEventBus(); const received: GroveEvent[] = []; const handler = (e: GroveEvent) => received.push(e); bus.subscribe("reviewer", handler); bus.unsubscribe("reviewer", handler); - bus.publish({ + await bus.publish({ type: "contribution", sourceRole: "coder", targetRole: "reviewer", @@ -61,13 +61,13 @@ describe("LocalEventBus", () => { bus.close(); }); - test("multiple subscribers on same role all receive", () => { + test("multiple subscribers on same role all receive", async () => { const bus = new LocalEventBus(); let count = 0; bus.subscribe("reviewer", () => count++); bus.subscribe("reviewer", () => count++); - bus.publish({ + await bus.publish({ type: "contribution", sourceRole: "coder", targetRole: "reviewer", @@ -79,13 +79,13 @@ describe("LocalEventBus", () => { bus.close(); }); - test("close removes all listeners", () => { + test("close removes all listeners", async () => { const bus = new LocalEventBus(); let count = 0; bus.subscribe("reviewer", () => count++); bus.close(); - bus.publish({ + await bus.publish({ type: "contribution", sourceRole: "coder", targetRole: "reviewer", @@ -95,6 +95,19 @@ describe("LocalEventBus", () => { expect(count).toBe(0); }); + + test("publish returns ok result", async () => { + const bus = new LocalEventBus(); + const result = await bus.publish({ + type: "contribution", + sourceRole: "coder", + targetRole: "reviewer", + payload: {}, + timestamp: new Date().toISOString(), + }); + expect(result.ok).toBe(true); + bus.close(); + }); }); // --- TopologyRouter tests --- @@ -114,42 +127,42 @@ describe("TopologyRouter", () => { ], }; - test("routes from coder to reviewer via delegates edge", () => { + test("routes from coder to reviewer via delegates edge", async () => { const bus = new LocalEventBus(); const router = new TopologyRouter(reviewLoopTopology, bus); const received: GroveEvent[] = []; bus.subscribe("reviewer", (e) => received.push(e)); - const targets = router.route("coder", { cid: "blake3:abc" }); + const results = await router.route("coder", { cid: "blake3:abc" }); - expect(targets).toEqual(["reviewer"]); + expect(results.map((r) => r.targetRole)).toEqual(["reviewer"]); expect(received).toHaveLength(1); expect(received[0]!.payload).toEqual({ cid: "blake3:abc" }); bus.close(); }); - test("routes from reviewer to coder via feedback edge", () => { + test("routes from reviewer to coder via feedback edge", async () => { const bus = new LocalEventBus(); const router = new TopologyRouter(reviewLoopTopology, bus); const received: GroveEvent[] = []; bus.subscribe("coder", (e) => received.push(e)); - const targets = router.route("reviewer", { verdict: "changes_requested" }); + const results = await router.route("reviewer", { verdict: "changes_requested" }); - expect(targets).toEqual(["coder"]); + expect(results.map((r) => r.targetRole)).toEqual(["coder"]); expect(received).toHaveLength(1); bus.close(); }); - test("no edges for unknown role -> empty targets", () => { + test("no edges for unknown role -> empty targets", async () => { const bus = new LocalEventBus(); const router = new TopologyRouter(reviewLoopTopology, bus); - const targets = router.route("unknown-role", {}); - expect(targets).toHaveLength(0); + const results = await router.route("unknown-role", {}); + expect(results).toHaveLength(0); bus.close(); }); - test("flat topology (no edges) -> no routing", () => { + test("flat topology (no edges) -> no routing", async () => { const flat: AgentTopology = { structure: "flat", roles: [{ name: "worker1" }, { name: "worker2" }], @@ -157,15 +170,15 @@ describe("TopologyRouter", () => { const bus = new LocalEventBus(); const router = new TopologyRouter(flat, bus); - const targets1 = router.route("worker1", {}); - const targets2 = router.route("worker2", {}); + const results1 = await router.route("worker1", {}); + const results2 = await router.route("worker2", {}); - expect(targets1).toHaveLength(0); - expect(targets2).toHaveLength(0); + expect(results1).toHaveLength(0); + expect(results2).toHaveLength(0); bus.close(); }); - test("tree topology: parent routes to children", () => { + test("tree topology: parent routes to children", async () => { const tree: AgentTopology = { structure: "tree", roles: [ @@ -187,15 +200,15 @@ describe("TopologyRouter", () => { bus.subscribe("worker1", (e) => received1.push(e)); bus.subscribe("worker2", (e) => received2.push(e)); - const targets = router.route("coordinator", { plan: "implement auth" }); + const results = await router.route("coordinator", { plan: "implement auth" }); - expect(targets).toEqual(["worker1", "worker2"]); + expect(results.map((r) => r.targetRole)).toEqual(["worker1", "worker2"]); expect(received1).toHaveLength(1); expect(received2).toHaveLength(1); bus.close(); }); - test("broadcastStop sends to all roles", () => { + test("broadcastStop sends to all roles", async () => { const bus = new LocalEventBus(); const router = new TopologyRouter(reviewLoopTopology, bus); const coderEvents: GroveEvent[] = []; @@ -203,7 +216,7 @@ describe("TopologyRouter", () => { bus.subscribe("coder", (e) => coderEvents.push(e)); bus.subscribe("reviewer", (e) => reviewerEvents.push(e)); - router.broadcastStop("Budget exceeded"); + await router.broadcastStop("Budget exceeded"); expect(coderEvents).toHaveLength(1); expect(coderEvents[0]!.type).toBe("stop"); @@ -239,14 +252,13 @@ describe("TopologyRouter", () => { const bus = new LocalEventBus(); const router = new TopologyRouter(multiEdge, bus); const edges = router.targetsFor("coder"); - // Both edges preserved — distinct (target, edgeType) pairs expect(edges).toHaveLength(2); expect(edges).toContainEqual({ target: "reviewer", edgeType: "delegates" }); expect(edges).toContainEqual({ target: "reviewer", edgeType: "feeds" }); bus.close(); }); - test("route() publishes one event per target even when multiple edge types point to same target", () => { + test("route() publishes one event per target even when multiple edge types point to same target", async () => { const multiEdge: AgentTopology = { structure: "graph", roles: [ @@ -265,10 +277,9 @@ describe("TopologyRouter", () => { const received: GroveEvent[] = []; bus.subscribe("reviewer", (e) => received.push(e)); - const targets = router.route("coder", {}); + const results = await router.route("coder", {}); - // route() deduplicates by target: one event despite two distinct edges - expect(targets).toEqual(["reviewer"]); + expect(results.map((r) => r.targetRole)).toEqual(["reviewer"]); expect(received).toHaveLength(1); bus.close(); }); @@ -281,7 +292,7 @@ describe("TopologyRouter", () => { name: "coder", edges: [ { target: "reviewer", edgeType: "delegates" }, - { target: "reviewer", edgeType: "delegates" }, // exact duplicate + { target: "reviewer", edgeType: "delegates" }, ], }, { name: "reviewer" }, @@ -289,7 +300,6 @@ describe("TopologyRouter", () => { }; const bus = new LocalEventBus(); const router = new TopologyRouter(exactDupes, bus); - // Exact (target, edgeType) duplicate is collapsed to one entry expect(router.targetsFor("coder")).toEqual([{ target: "reviewer", edgeType: "delegates" }]); bus.close(); }); diff --git a/src/core/event-bus.ts b/src/core/event-bus.ts index 005e1578..7f335030 100644 --- a/src/core/event-bus.ts +++ b/src/core/event-bus.ts @@ -1,8 +1,9 @@ /** * Event bus abstraction for agent communication. * - * Two implementations: - * - LocalEventBus: Node.js EventEmitter for single-machine setups + * Three implementations: + * - LocalEventBus: Node.js EventEmitter for single-machine setups (tests/local) + * - NexusEventBus: Relays events via Nexus IPC API, returns message IDs * - Future: RedisEventBus, NatsEventBus for federated setups */ @@ -15,13 +16,23 @@ export interface GroveEvent { readonly timestamp: string; } +/** Result of publishing an event. */ +export interface PublishResult { + readonly ok: boolean; + /** IPC message ID — present when the event was relayed via Nexus IPC. */ + readonly messageId?: string | undefined; + readonly error?: string | undefined; + /** True when failure is infrastructure (404, connection refused), not delivery rejection. */ + readonly infrastructureError?: boolean | undefined; +} + /** Callback for event subscriptions. */ export type EventHandler = (event: GroveEvent) => void; /** Event bus for ephemeral agent notifications. */ export interface EventBus { - /** Publish an event. */ - publish(event: GroveEvent): void; + /** Publish an event. Returns delivery result with optional IPC message ID. */ + publish(event: GroveEvent): Promise; /** Subscribe to events for a specific role. */ subscribe(role: string, handler: EventHandler): void; /** Unsubscribe a handler. */ diff --git a/src/core/handoff-state-machine.test.ts b/src/core/handoff-state-machine.test.ts new file mode 100644 index 00000000..4c309eff --- /dev/null +++ b/src/core/handoff-state-machine.test.ts @@ -0,0 +1,131 @@ +/** + * Tests for the handoff delivery state machine. + * + * Exhaustively tests canTransition() for all valid and invalid transitions. + * Test-driven: these tests define the contract before implementation details. + */ + +import { describe, expect, test } from "bun:test"; +import { canTransition, HandoffStatus } from "./handoff.js"; + +const { PendingPickup, Delivered, Processed, Replied, Expired, DeadLettered } = HandoffStatus; + +describe("canTransition", () => { + // --- Happy path --- + + test("pending_pickup → delivered (IPC delivery confirmed)", () => { + expect(canTransition(PendingPickup, Delivered)).toBe(true); + }); + + test("delivered → processed (agent acknowledged receipt)", () => { + expect(canTransition(Delivered, Processed)).toBe(true); + }); + + test("processed → replied (agent produced response contribution)", () => { + expect(canTransition(Processed, Replied)).toBe(true); + }); + + // --- Shortcut: delivered → replied (skip processed) --- + + test("delivered → replied (direct reply without explicit processing ACK)", () => { + expect(canTransition(Delivered, Replied)).toBe(true); + }); + + // --- Failure paths --- + + test("pending_pickup → dead_lettered (IPC delivery failed after retries)", () => { + expect(canTransition(PendingPickup, DeadLettered)).toBe(true); + }); + + test("pending_pickup → expired (TTL expired before delivery)", () => { + expect(canTransition(PendingPickup, Expired)).toBe(true); + }); + + test("delivered → expired (agent didn't reply within deadline)", () => { + expect(canTransition(Delivered, Expired)).toBe(true); + }); + + test("delivered → dead_lettered (post-delivery IPC failure)", () => { + expect(canTransition(Delivered, DeadLettered)).toBe(true); + }); + + test("processed → expired (agent processing but didn't reply in time)", () => { + expect(canTransition(Processed, Expired)).toBe(true); + }); + + // --- Terminal states: no outgoing transitions --- + + test("replied is terminal — cannot transition to anything", () => { + expect(canTransition(Replied, PendingPickup)).toBe(false); + expect(canTransition(Replied, Delivered)).toBe(false); + expect(canTransition(Replied, Processed)).toBe(false); + expect(canTransition(Replied, Expired)).toBe(false); + expect(canTransition(Replied, DeadLettered)).toBe(false); + }); + + test("expired is terminal — cannot transition to anything", () => { + expect(canTransition(Expired, PendingPickup)).toBe(false); + expect(canTransition(Expired, Delivered)).toBe(false); + expect(canTransition(Expired, Processed)).toBe(false); + expect(canTransition(Expired, Replied)).toBe(false); + expect(canTransition(Expired, DeadLettered)).toBe(false); + }); + + test("dead_lettered is terminal — cannot transition to anything", () => { + expect(canTransition(DeadLettered, PendingPickup)).toBe(false); + expect(canTransition(DeadLettered, Delivered)).toBe(false); + expect(canTransition(DeadLettered, Processed)).toBe(false); + expect(canTransition(DeadLettered, Replied)).toBe(false); + expect(canTransition(DeadLettered, Expired)).toBe(false); + }); + + // --- Invalid transitions --- + + test("self-loops are invalid", () => { + expect(canTransition(PendingPickup, PendingPickup)).toBe(false); + expect(canTransition(Delivered, Delivered)).toBe(false); + expect(canTransition(Processed, Processed)).toBe(false); + expect(canTransition(Replied, Replied)).toBe(false); + expect(canTransition(Expired, Expired)).toBe(false); + expect(canTransition(DeadLettered, DeadLettered)).toBe(false); + }); + + test("cannot skip forward: pending_pickup → processed (must go through delivered)", () => { + expect(canTransition(PendingPickup, Processed)).toBe(false); + }); + + test("cannot skip forward: pending_pickup → replied (must go through delivered)", () => { + expect(canTransition(PendingPickup, Replied)).toBe(false); + }); + + test("cannot go backward: delivered → pending_pickup", () => { + expect(canTransition(Delivered, PendingPickup)).toBe(false); + }); + + test("cannot go backward: processed → delivered", () => { + expect(canTransition(Processed, Delivered)).toBe(false); + }); + + test("cannot go backward: processed → pending_pickup", () => { + expect(canTransition(Processed, PendingPickup)).toBe(false); + }); + + test("processed → dead_lettered is invalid (already past delivery)", () => { + expect(canTransition(Processed, DeadLettered)).toBe(false); + }); +}); + +describe("HandoffStatus enum values", () => { + test("all expected statuses are defined", () => { + expect(HandoffStatus.PendingPickup).toBe("pending_pickup"); + expect(HandoffStatus.Delivered).toBe("delivered"); + expect(HandoffStatus.Processed).toBe("processed"); + expect(HandoffStatus.Replied).toBe("replied"); + expect(HandoffStatus.Expired).toBe("expired"); + expect(HandoffStatus.DeadLettered).toBe("dead_lettered"); + }); + + test("enum has exactly 6 values", () => { + expect(Object.keys(HandoffStatus)).toHaveLength(6); + }); +}); diff --git a/src/core/handoff-store.conformance.ts b/src/core/handoff-store.conformance.ts new file mode 100644 index 00000000..0b7c93dd --- /dev/null +++ b/src/core/handoff-store.conformance.ts @@ -0,0 +1,381 @@ +/** + * Conformance test suite for HandoffStore implementations. + * + * Runs the same behavioral tests against any HandoffStore implementation + * to verify interface contract compliance. Follows the established pattern + * from store.conformance.ts, cas.conformance.ts, bounty-store.conformance.ts. + * + * Usage: + * import { runHandoffStoreConformanceTests } from "./handoff-store.conformance.js"; + * runHandoffStoreConformanceTests("InMemoryHandoffStore", () => new InMemoryHandoffStore()); + */ + +import { describe, expect, test } from "bun:test"; +import { HandoffStatus, type HandoffStore } from "./handoff.js"; + +export function runHandoffStoreConformanceTests( + name: string, + factory: () => HandoffStore | Promise, + cleanup?: () => void | Promise, +): void { + describe(`HandoffStore conformance: ${name}`, () => { + async function make(): Promise { + const result = factory(); + return result instanceof Promise ? await result : result; + } + + // --- create + get --- + + test("create returns a handoff with all required fields", async () => { + const store = await make(); + try { + const h = await store.create({ + sourceCid: "blake3:abc123", + fromRole: "coder", + toRole: "reviewer", + }); + expect(h.handoffId).toBeTruthy(); + expect(h.sourceCid).toBe("blake3:abc123"); + expect(h.fromRole).toBe("coder"); + expect(h.toRole).toBe("reviewer"); + expect(h.requiresReply).toBe(false); + expect(h.createdAt).toBeTruthy(); + // Status must be one of the valid values + expect(Object.values(HandoffStatus)).toContain(h.status); + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("create with explicit handoffId preserves it", async () => { + const store = await make(); + try { + const h = await store.create({ + handoffId: "custom-id-1", + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + expect(h.handoffId).toBe("custom-id-1"); + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("create with requiresReply=true preserves it", async () => { + const store = await make(); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + requiresReply: true, + replyDueAt: "2099-01-01T00:00:00.000Z", + }); + expect(h.requiresReply).toBe(true); + expect(h.replyDueAt).toBe("2099-01-01T00:00:00.000Z"); + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("get returns the created handoff by ID", async () => { + const store = await make(); + try { + const created = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + const fetched = await store.get(created.handoffId); + expect(fetched).toBeDefined(); + expect(fetched?.handoffId).toBe(created.handoffId); + expect(fetched?.sourceCid).toBe(created.sourceCid); + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("get returns undefined for nonexistent ID", async () => { + const store = await make(); + try { + const fetched = await store.get("nonexistent-id"); + expect(fetched).toBeUndefined(); + } finally { + store.close(); + await cleanup?.(); + } + }); + + // --- list --- + + test("list returns all handoffs when no query provided", async () => { + const store = await make(); + try { + await store.create({ sourceCid: "blake3:a", fromRole: "coder", toRole: "reviewer" }); + await store.create({ sourceCid: "blake3:b", fromRole: "reviewer", toRole: "coder" }); + + const all = await store.list(); + expect(all.length).toBeGreaterThanOrEqual(2); + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("list filters by toRole", async () => { + const store = await make(); + try { + await store.create({ sourceCid: "blake3:a", fromRole: "coder", toRole: "reviewer" }); + await store.create({ sourceCid: "blake3:b", fromRole: "reviewer", toRole: "coder" }); + + const forReviewer = await store.list({ toRole: "reviewer" }); + for (const h of forReviewer) { + expect(h.toRole).toBe("reviewer"); + } + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("list filters by fromRole", async () => { + const store = await make(); + try { + await store.create({ sourceCid: "blake3:a", fromRole: "coder", toRole: "reviewer" }); + await store.create({ sourceCid: "blake3:b", fromRole: "reviewer", toRole: "coder" }); + + const fromCoder = await store.list({ fromRole: "coder" }); + for (const h of fromCoder) { + expect(h.fromRole).toBe("coder"); + } + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("list filters by sourceCid", async () => { + const store = await make(); + try { + await store.create({ sourceCid: "blake3:a", fromRole: "coder", toRole: "reviewer" }); + await store.create({ sourceCid: "blake3:b", fromRole: "coder", toRole: "reviewer" }); + + const forA = await store.list({ sourceCid: "blake3:a" }); + expect(forA).toHaveLength(1); + expect(forA[0]?.sourceCid).toBe("blake3:a"); + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("list respects limit", async () => { + const store = await make(); + try { + for (let i = 0; i < 5; i++) { + await store.create({ sourceCid: `blake3:${i}`, fromRole: "coder", toRole: "reviewer" }); + } + + const limited = await store.list({ limit: 2 }); + expect(limited).toHaveLength(2); + } finally { + store.close(); + await cleanup?.(); + } + }); + + // --- status transitions --- + + test("markDelivered transitions status to delivered", async () => { + const store = await make(); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + await store.markDelivered(h.handoffId); + const updated = await store.get(h.handoffId); + expect(updated?.status).toBe(HandoffStatus.Delivered); + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("markReplied transitions status to replied with resolvedByCid", async () => { + const store = await make(); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + await store.markDelivered(h.handoffId); + await store.markReplied(h.handoffId, "blake3:reply-cid"); + const updated = await store.get(h.handoffId); + expect(updated?.status).toBe(HandoffStatus.Replied); + expect(updated?.resolvedByCid).toBe("blake3:reply-cid"); + } finally { + store.close(); + await cleanup?.(); + } + }); + + // --- expireStale --- + + test("expireStale marks overdue pending_pickup handoffs as expired", async () => { + const store = await make(); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + replyDueAt: new Date(Date.now() - 60_000).toISOString(), + }); + + // Handoffs in expirable states (pending_pickup, delivered, processed) + // with an overdue replyDueAt should be expired. + const expired = await store.expireStale(); + const updated = await store.get(h.handoffId); + + const expirableStatuses: ReadonlySet = new Set([ + HandoffStatus.PendingPickup, + HandoffStatus.Delivered, + HandoffStatus.Processed, + ]); + + if (expirableStatuses.has(h.status)) { + // Should have been expired + expect(expired.map((e) => e.handoffId)).toContain(h.handoffId); + expect(updated?.status).toBe(HandoffStatus.Expired); + } else { + // Terminal state — not eligible for expiry + expect(expired.map((e) => e.handoffId)).not.toContain(h.handoffId); + } + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("expireStale does not expire handoffs without replyDueAt", async () => { + const store = await make(); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + // No replyDueAt + }); + + const expired = await store.expireStale(); + expect(expired.map((e) => e.handoffId)).not.toContain(h.handoffId); + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("expireStale does not expire future-due handoffs", async () => { + const store = await make(); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + replyDueAt: new Date(Date.now() + 60_000).toISOString(), + }); + + const expired = await store.expireStale(); + expect(expired.map((e) => e.handoffId)).not.toContain(h.handoffId); + } finally { + store.close(); + await cleanup?.(); + } + }); + + // --- countPending --- + + test("countPending counts only pending_pickup handoffs for a role", async () => { + const store = await make(); + try { + const h1 = await store.create({ + sourceCid: "blake3:a", + fromRole: "coder", + toRole: "reviewer", + }); + await store.create({ sourceCid: "blake3:b", fromRole: "coder", toRole: "reviewer" }); + await store.create({ sourceCid: "blake3:c", fromRole: "coder", toRole: "tester" }); + + // Mark one as delivered (not pending) + await store.markDelivered(h1.handoffId); + + const pending = await store.countPending("reviewer"); + // Implementation-dependent: InMemory defaults to PendingPickup, + // Nexus defaults to Delivered. Count only PendingPickup. + expect(typeof pending).toBe("number"); + expect(pending).toBeGreaterThanOrEqual(0); + } finally { + store.close(); + await cleanup?.(); + } + }); + + // --- createMany (optional) --- + + test("createMany creates multiple handoffs in one call (when supported)", async () => { + const store = await make(); + try { + if (store.createMany === undefined) return; // optional method + + const handoffs = await store.createMany([ + { sourceCid: "blake3:a", fromRole: "coder", toRole: "reviewer" }, + { sourceCid: "blake3:b", fromRole: "coder", toRole: "tester" }, + { sourceCid: "blake3:c", fromRole: "coder", toRole: "auditor" }, + ]); + + expect(handoffs).toHaveLength(3); + expect(handoffs[0]?.toRole).toBe("reviewer"); + expect(handoffs[1]?.toRole).toBe("tester"); + expect(handoffs[2]?.toRole).toBe("auditor"); + + // All should be retrievable + for (const h of handoffs) { + const fetched = await store.get(h.handoffId); + expect(fetched).toBeDefined(); + } + } finally { + store.close(); + await cleanup?.(); + } + }); + + test("createMany with empty array returns empty array", async () => { + const store = await make(); + try { + if (store.createMany === undefined) return; + + const handoffs = await store.createMany([]); + expect(handoffs).toHaveLength(0); + } finally { + store.close(); + await cleanup?.(); + } + }); + + // --- close --- + + test("close is idempotent", async () => { + const store = await make(); + store.close(); + store.close(); // should not throw + await cleanup?.(); + }); + }); +} diff --git a/src/core/handoff.ts b/src/core/handoff.ts index f91d26b4..b8d23a12 100644 --- a/src/core/handoff.ts +++ b/src/core/handoff.ts @@ -1,12 +1,57 @@ export const HandoffStatus = { PendingPickup: "pending_pickup", Delivered: "delivered", + /** Agent acknowledged receipt and is processing the handoff. */ + Processed: "processed", Replied: "replied", Expired: "expired", + /** IPC delivery failed after retries — requires operator attention. */ + DeadLettered: "dead_lettered", } as const; export type HandoffStatus = (typeof HandoffStatus)[keyof typeof HandoffStatus]; +/** + * Handoff delivery state machine. + * + * Happy path: pending_pickup → delivered → processed → replied + * IPC failure: pending_pickup → dead_lettered + * TTL expiry: pending_pickup → expired + * + * Terminal states: replied, expired, dead_lettered. + */ +const VALID_TRANSITIONS: ReadonlyMap> = new Map([ + [ + HandoffStatus.PendingPickup, + new Set([HandoffStatus.Delivered, HandoffStatus.Expired, HandoffStatus.DeadLettered]), + ], + [ + HandoffStatus.Delivered, + new Set([ + HandoffStatus.Processed, + HandoffStatus.Replied, + HandoffStatus.Expired, + HandoffStatus.DeadLettered, + ]), + ], + [HandoffStatus.Processed, new Set([HandoffStatus.Replied, HandoffStatus.Expired])], + // Terminal states — no outgoing transitions + [HandoffStatus.Replied, new Set()], + [HandoffStatus.Expired, new Set()], + [HandoffStatus.DeadLettered, new Set()], +]); + +/** + * Check whether a status transition is valid. + * + * Returns true if `from → to` is a legal transition in the handoff + * state machine. Returns false for invalid transitions and self-loops. + */ +export function canTransition(from: HandoffStatus, to: HandoffStatus): boolean { + if (from === to) return false; + return VALID_TRANSITIONS.get(from)?.has(to) ?? false; +} + export interface Handoff { readonly handoffId: string; readonly sourceCid: string; @@ -17,6 +62,8 @@ export interface Handoff { readonly replyDueAt?: string | undefined; readonly resolvedByCid?: string | undefined; readonly createdAt: string; + /** Nexus IPC message ID — set when the handoff is relayed via IPC. */ + readonly ipcMessageId?: string | undefined; } export interface HandoffInput { @@ -55,7 +102,13 @@ export interface HandoffStore { get(id: string): Promise; list(query?: HandoffQuery): Promise; markDelivered(id: string): Promise; + /** Mark a handoff as processed (agent acknowledged and is acting on it). */ + markProcessed(id: string): Promise; markReplied(id: string, resolvedByCid: string): Promise; + /** Mark a handoff as dead-lettered (IPC delivery failed after retries). */ + markDeadLettered(id: string): Promise; + /** Set the IPC message ID on a handoff (called after IPC relay succeeds). */ + setIpcMessageId?(id: string, ipcMessageId: string): Promise; expireStale(now?: string): Promise; countPending(toRole: string): Promise; close(): void; diff --git a/src/core/in-memory-handoff-store.conformance.test.ts b/src/core/in-memory-handoff-store.conformance.test.ts new file mode 100644 index 00000000..d4906eeb --- /dev/null +++ b/src/core/in-memory-handoff-store.conformance.test.ts @@ -0,0 +1,8 @@ +/** + * Run HandoffStore conformance tests against InMemoryHandoffStore. + */ + +import { runHandoffStoreConformanceTests } from "./handoff-store.conformance.js"; +import { InMemoryHandoffStore } from "./in-memory-handoff-store.js"; + +runHandoffStoreConformanceTests("InMemoryHandoffStore", () => new InMemoryHandoffStore()); diff --git a/src/core/in-memory-handoff-store.ts b/src/core/in-memory-handoff-store.ts index c7e62d2e..9904a577 100644 --- a/src/core/in-memory-handoff-store.ts +++ b/src/core/in-memory-handoff-store.ts @@ -65,6 +65,14 @@ export class InMemoryHandoffStore implements HandoffStore { this.handoffs.set(id, { ...handoff, status: HandoffStatus.Delivered }); } + async markProcessed(id: string): Promise { + const handoff = this.handoffs.get(id); + if (handoff === undefined) { + throw new NotFoundError({ resource: "Handoff", identifier: id }); + } + this.handoffs.set(id, { ...handoff, status: HandoffStatus.Processed }); + } + async markReplied(id: string, resolvedByCid: string): Promise { const handoff = this.handoffs.get(id); if (handoff === undefined) { @@ -77,13 +85,35 @@ export class InMemoryHandoffStore implements HandoffStore { }); } + async markDeadLettered(id: string): Promise { + const handoff = this.handoffs.get(id); + if (handoff === undefined) { + throw new NotFoundError({ resource: "Handoff", identifier: id }); + } + this.handoffs.set(id, { ...handoff, status: HandoffStatus.DeadLettered }); + } + + async setIpcMessageId(id: string, ipcMessageId: string): Promise { + const handoff = this.handoffs.get(id); + if (handoff === undefined) { + throw new NotFoundError({ resource: "Handoff", identifier: id }); + } + this.handoffs.set(id, { ...handoff, ipcMessageId }); + } + async expireStale(now?: string): Promise { const cutoff = now ?? new Date().toISOString(); const expired: Handoff[] = []; + const expirableStatuses: ReadonlySet = new Set([ + HandoffStatus.PendingPickup, + HandoffStatus.Delivered, + HandoffStatus.Processed, + ]); + for (const [handoffId, handoff] of this.handoffs) { if ( - handoff.status === HandoffStatus.PendingPickup && + expirableStatuses.has(handoff.status) && handoff.replyDueAt !== undefined && handoff.replyDueAt < cutoff ) { diff --git a/src/core/index.ts b/src/core/index.ts index 826fe26e..ba0ddee0 100644 --- a/src/core/index.ts +++ b/src/core/index.ts @@ -111,7 +111,7 @@ export { RateLimitError, RetryExhaustedError, } from "./errors.js"; -export type { EventBus, EventHandler, GroveEvent } from "./event-bus.js"; +export type { EventBus, EventHandler, GroveEvent, PublishResult } from "./event-bus.js"; export type { Frontier, FrontierCalculator, @@ -222,6 +222,7 @@ export { SubprocessRuntime } from "./subprocess-runtime.js"; export { toUtcIso } from "./time.js"; export { TmuxRuntime } from "./tmux-runtime.js"; export { resolveTopology } from "./topology-resolver.js"; +export type { RouteResult } from "./topology-router.js"; export { TopologyRouter } from "./topology-router.js"; export type { CheckoutOptions, diff --git a/src/core/local-event-bus.ts b/src/core/local-event-bus.ts index 9e96e43c..20b75e01 100644 --- a/src/core/local-event-bus.ts +++ b/src/core/local-event-bus.ts @@ -1,5 +1,5 @@ import { EventEmitter } from "node:events"; -import type { EventBus, EventHandler, GroveEvent } from "./event-bus.js"; +import type { EventBus, EventHandler, GroveEvent, PublishResult } from "./event-bus.js"; /** * Local event bus using Node.js EventEmitter. @@ -14,7 +14,7 @@ export class LocalEventBus implements EventBus { this.emitter.setMaxListeners(100); } - publish(event: GroveEvent): void { + async publish(event: GroveEvent): Promise { // Wrap each listener in try/catch so one crashed subscriber doesn't break others const channel = `role:${event.targetRole}`; for (const listener of this.emitter.listeners(channel)) { @@ -26,6 +26,7 @@ export class LocalEventBus implements EventBus { ); } } + return { ok: true }; } subscribe(role: string, handler: EventHandler): void { diff --git a/src/core/operations/contribute-routing.test.ts b/src/core/operations/contribute-routing.test.ts index d0220444..14a3119b 100644 --- a/src/core/operations/contribute-routing.test.ts +++ b/src/core/operations/contribute-routing.test.ts @@ -491,7 +491,9 @@ describe("contributeOperation: plan and ephemeral routing rules", () => { get: async () => undefined, list: async () => [], markDelivered: async () => undefined, + markProcessed: async () => undefined, markReplied: async () => undefined, + markDeadLettered: async () => undefined, expireStale: async () => [], countPending: async () => 0, close: () => undefined, @@ -549,7 +551,9 @@ describe("contributeOperation: plan and ephemeral routing rules", () => { get: async () => undefined, list: async () => [], markDelivered: async () => undefined, + markProcessed: async () => undefined, markReplied: async () => undefined, + markDeadLettered: async () => undefined, expireStale: async () => [], countPending: async () => 0, close: () => undefined, @@ -602,7 +606,9 @@ describe("contributeOperation: plan and ephemeral routing rules", () => { get: async () => undefined, list: async () => [], markDelivered: async () => undefined, + markProcessed: async () => undefined, markReplied: async () => undefined, + markDeadLettered: async () => undefined, expireStale: async () => [], countPending: async () => 0, close: () => undefined, @@ -746,7 +752,9 @@ describe("contributeOperation: plan and ephemeral routing rules", () => { get: async () => undefined, list: async () => [], markDelivered: async () => undefined, + markProcessed: async () => undefined, markReplied: async () => undefined, + markDeadLettered: async () => undefined, expireStale: async () => [], countPending: async () => 0, close: () => undefined, @@ -816,7 +824,9 @@ describe("contributeOperation: plan and ephemeral routing rules", () => { get: async () => undefined, list: async () => [], markDelivered: async () => undefined, + markProcessed: async () => undefined, markReplied: async () => undefined, + markDeadLettered: async () => undefined, expireStale: async () => [], countPending: async () => 0, close: () => undefined, diff --git a/src/core/operations/contribute.test.ts b/src/core/operations/contribute.test.ts index 66cfbbb1..e47ec9e8 100644 --- a/src/core/operations/contribute.test.ts +++ b/src/core/operations/contribute.test.ts @@ -912,7 +912,9 @@ describe("writeSerial: best-effort handoff failure paths", () => { get: async () => undefined, list: async () => [], markDelivered: async () => undefined, + markProcessed: async () => undefined, markReplied: async () => undefined, + markDeadLettered: async () => undefined, expireStale: async () => [], countPending: async () => 0, close: () => undefined, @@ -952,7 +954,9 @@ describe("writeSerial: best-effort handoff failure paths", () => { get: async () => undefined, list: async () => [], markDelivered: async () => undefined, + markProcessed: async () => undefined, markReplied: async () => undefined, + markDeadLettered: async () => undefined, expireStale: async () => [], countPending: async () => 0, close: () => undefined, @@ -995,7 +999,9 @@ describe("writeSerial: best-effort handoff failure paths", () => { get: async () => undefined, list: async () => [], markDelivered: async () => undefined, + markProcessed: async () => undefined, markReplied: async () => undefined, + markDeadLettered: async () => undefined, expireStale: async () => [], countPending: async () => 0, close: () => undefined, diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index b8d9563b..31dcdcdf 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -1146,14 +1146,52 @@ export async function contributeOperation( deps.topologyRouter !== undefined && agentRole !== undefined ) { - fireAndForget("topology routing", () => - deps.topologyRouter?.route(agentRole, { + fireAndForget("topology routing", async () => { + const routeResults = await deps.topologyRouter?.route(agentRole, { cid: contribution.cid, kind: contribution.kind, summary: contribution.summary, agentId: contribution.agent.agentId, - }), - ); + }); + + // Link IPC message IDs back to handoff records and dead-letter + // handoffs whose IPC delivery failed (best-effort). + if (routeResults && deps.handoffStore && handoffIds.length > 0) { + const handoffs = await deps.handoffStore.list({ + sourceCid: contribution.cid, + }); + for (const result of routeResults) { + const matching = handoffs.find((h) => h.toRole === result.targetRole); + if (!matching) continue; + try { + if (result.ok && result.messageId) { + // IPC succeeded — store the message ID for SSE delivery tracking + await deps.handoffStore.setIpcMessageId?.(matching.handoffId, result.messageId); + } else if (!result.ok && !result.infrastructureError) { + // IPC delivery was rejected by the service (not an infra issue + // like 404/connection refused). Dead-letter the handoff so + // operators can see the gap. + await deps.handoffStore.markDeadLettered(matching.handoffId); + process.stderr.write( + `[grove] handoff ${matching.handoffId} dead-lettered: IPC to ${result.targetRole} failed: ${result.error ?? "unknown"}\n`, + ); + } + // When !result.ok && result.infrastructureError: IPC endpoint + // is unavailable (404, connection refused). The handoff stays + // in its current status — it was never attempted, not rejected. + // Delivery falls back to the session orchestrator's polling path. + } catch (bookkeepingErr) { + // Best-effort — handoff record is the primary artifact. + // Log so operators can diagnose missing ipcMessageId or + // un-dead-lettered handoffs. + console.warn( + `[grove] handoff IPC bookkeeping failed for ${matching.handoffId} → ${result.targetRole}:`, + bookkeepingErr instanceof Error ? bookkeepingErr.message : String(bookkeepingErr), + ); + } + } + } + }); } // --- Post-write: re-check stop conditions (outside mutex, best-effort) --- diff --git a/src/core/session-orchestrator.test.ts b/src/core/session-orchestrator.test.ts index fb393858..12c0fa76 100644 --- a/src/core/session-orchestrator.test.ts +++ b/src/core/session-orchestrator.test.ts @@ -157,7 +157,7 @@ describe("SessionOrchestrator", () => { } // Without contributionStore, EventBus forwarding is the only path - bus.publish({ + void bus.publish({ type: "contribution", sourceRole: "coder", targetRole: "reviewer", @@ -181,7 +181,7 @@ describe("SessionOrchestrator", () => { } // Publish a stop event to a role — should NOT be forwarded - bus.publish({ + void bus.publish({ type: "stop", sourceRole: "system", targetRole: "coder", diff --git a/src/core/session-orchestrator.ts b/src/core/session-orchestrator.ts index 108e4815..06ca051d 100644 --- a/src/core/session-orchestrator.ts +++ b/src/core/session-orchestrator.ts @@ -234,7 +234,7 @@ export class SessionOrchestrator { } // Notify all agents - this.router.broadcastStop(reason); + await this.router.broadcastStop(reason); // Close all agent sessions for (const agent of this.agents) { @@ -316,13 +316,13 @@ export class SessionOrchestrator { action; // Use topology router to find targets, then send directly - const targets = this.router.route(sourceRole, { + const routeResults = await this.router.route(sourceRole, { cid: c.cid, kind: c.kind, summary: c.summary, }); - for (const targetRole of targets) { + for (const { targetRole } of routeResults) { const targetAgent = this.agents.find((a) => a.role === targetRole); if (targetAgent) { await this.config.runtime.send(targetAgent.session, message); diff --git a/src/core/topology-router.ts b/src/core/topology-router.ts index 40f73265..171edf4b 100644 --- a/src/core/topology-router.ts +++ b/src/core/topology-router.ts @@ -1,6 +1,19 @@ import type { EventBus, GroveEvent } from "./event-bus.js"; import type { AgentRole, AgentTopology, RoleEdge } from "./topology.js"; +/** Result of routing an event to a target role. */ +export interface RouteResult { + readonly targetRole: string; + /** Whether the publish succeeded (IPC delivery or local handler). */ + readonly ok: boolean; + /** IPC message ID — present when the event was relayed via Nexus IPC. */ + readonly messageId?: string | undefined; + /** Error message — present when publish failed. */ + readonly error?: string | undefined; + /** True when failure is infrastructure, not delivery rejection. */ + readonly infrastructureError?: boolean | undefined; +} + /** * Routes contribution events through topology edges. * @@ -59,10 +72,15 @@ export class TopologyRouter { /** * Route an event from a source role to all downstream targets. * Publishes one event per unique target role (deduplicates by target when - * multiple edge types point to the same target). Returns the list of unique - * target roles that received the event. + * multiple edge types point to the same target). Returns route results + * including IPC message IDs when available. + * + * All IPC sends run in parallel (Promise.all) so N targets pay 1x latency. */ - route(sourceRole: string, payload: Record): readonly string[] { + async route( + sourceRole: string, + payload: Record, + ): Promise { const role = this.roleMap.get(sourceRole); const mode = role?.mode ?? "explicit"; @@ -90,8 +108,8 @@ export class TopologyRouter { if (targets.size === 0) return []; - const routedTo: string[] = []; - for (const targetRole of targets) { + // Parallel publish: all targets in one go (14A) + const publishPromises = [...targets].map(async (targetRole): Promise => { const event: GroveEvent = { type: "contribution", sourceRole, @@ -99,28 +117,36 @@ export class TopologyRouter { payload, timestamp, }; - this.eventBus.publish(event); - routedTo.push(targetRole); - } + const result = await this.eventBus.publish(event); + return { + targetRole, + ok: result.ok, + messageId: result.messageId, + error: result.error, + infrastructureError: result.infrastructureError, + }; + }); - return routedTo; + return Promise.all(publishPromises); } /** * Broadcast a stop event to all roles. */ - broadcastStop(reason: string): void { + async broadcastStop(reason: string): Promise { const timestamp = new Date().toISOString(); const allRoles = this.topology.roles.map((r) => r.name); - for (const role of allRoles) { - this.eventBus.publish({ - type: "stop", - sourceRole: "system", - targetRole: role, - payload: { reason }, - timestamp, - }); - } + await Promise.all( + allRoles.map((role) => + this.eventBus.publish({ + type: "stop", + sourceRole: "system", + targetRole: role, + payload: { reason }, + timestamp, + }), + ), + ); } /** diff --git a/src/local/sqlite-handoff-store.ts b/src/local/sqlite-handoff-store.ts index aa927c92..590a31f1 100644 --- a/src/local/sqlite-handoff-store.ts +++ b/src/local/sqlite-handoff-store.ts @@ -160,6 +160,15 @@ export class SqliteHandoffStore implements HandoffStore { } } + async markProcessed(id: string): Promise { + const result = this.db + .prepare("UPDATE handoffs SET status = ? WHERE handoff_id = ?") + .run(HandoffStatus.Processed, id); + if (result.changes === 0) { + throw new NotFoundError({ resource: "Handoff", identifier: id }); + } + } + async markReplied(id: string, resolvedByCid: string): Promise { const result = this.db .prepare("UPDATE handoffs SET status = ?, resolved_by_cid = ? WHERE handoff_id = ?") @@ -169,15 +178,30 @@ export class SqliteHandoffStore implements HandoffStore { } } + async markDeadLettered(id: string): Promise { + const result = this.db + .prepare("UPDATE handoffs SET status = ? WHERE handoff_id = ?") + .run(HandoffStatus.DeadLettered, id); + if (result.changes === 0) { + throw new NotFoundError({ resource: "Handoff", identifier: id }); + } + } + async expireStale(now?: string): Promise { const cutoff = now ?? new Date().toISOString(); this.db .prepare( `UPDATE handoffs SET status = ? - WHERE status = ? AND reply_due_at IS NOT NULL AND reply_due_at < ?`, + WHERE status IN (?, ?, ?) AND reply_due_at IS NOT NULL AND reply_due_at < ?`, ) - .run(HandoffStatus.Expired, HandoffStatus.PendingPickup, cutoff); + .run( + HandoffStatus.Expired, + HandoffStatus.PendingPickup, + HandoffStatus.Delivered, + HandoffStatus.Processed, + cutoff, + ); const rows = this.db .prepare( diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts index 98a1bb4b..d745620d 100644 --- a/src/mcp/serve-http.ts +++ b/src/mcp/serve-http.ts @@ -301,7 +301,10 @@ async function buildScopedDeps(sessionId: string | undefined): Promise { @@ -106,4 +109,113 @@ export function registerHandoffTools(server: McpServer, deps: McpDeps): void { }; }, ); + + // --- grove_list_dead_letters ----------------------------------------------- + const listDeadLettersInputSchema = z.object({ + toRole: z + .string() + .optional() + .describe("Filter dead-lettered handoffs by target role. Omit to list all."), + fromRole: z + .string() + .optional() + .describe("Filter dead-lettered handoffs by originating role. Omit to list all."), + limit: z + .number() + .int() + .positive() + .max(200) + .optional() + .describe("Maximum number of dead-lettered handoffs to return. Defaults to 50."), + }); + + server.registerTool( + "grove_list_dead_letters", + { + description: + "List handoffs that failed IPC delivery after retries (dead letter queue). " + + "These represent routing failures that may require operator attention — " + + "the source contribution was committed but the target agent was never notified. " + + "Use this to diagnose delivery gaps in topology-driven routing.", + inputSchema: listDeadLettersInputSchema, + }, + async (args) => { + const { handoffStore } = deps; + if (handoffStore === undefined) { + return toolError( + "NOT_CONFIGURED", + "Handoff store is not available. Topology routing must be active.", + ); + } + + const handoffs = await handoffStore.list({ + toRole: args.toRole, + fromRole: args.fromRole, + status: HandoffStatus.DeadLettered, + limit: args.limit ?? 50, + }); + + return { + content: [ + { + type: "text" as const, + text: JSON.stringify({ + count: handoffs.length, + handoffs, + }), + }, + ], + }; + }, + ); + + // --- grove_ack_handoff ----------------------------------------------------- + const ackHandoffInputSchema = z.object({ + handoffId: z.string().min(1).describe("ID of the handoff to acknowledge."), + }); + + server.registerTool( + "grove_ack_handoff", + { + description: + "Acknowledge receipt of a handoff, transitioning it from delivered to processed. " + + "Call this when your agent has received a routed handoff and is beginning to act on it. " + + "This signals to the orchestrator that the handoff was successfully received and is being worked on.", + inputSchema: ackHandoffInputSchema, + }, + async (args) => { + const { handoffStore } = deps; + if (handoffStore === undefined) { + return toolError("NOT_CONFIGURED", "Handoff store is not available."); + } + + const handoff = await handoffStore.get(args.handoffId); + if (handoff === undefined) { + return toolError("NOT_FOUND", `Handoff '${args.handoffId}' not found.`); + } + + if (!canTransition(handoff.status, HandoffStatus.Processed)) { + return toolError( + "INVALID_STATE", + `Cannot acknowledge handoff in status '${handoff.status}'. ` + + `Only 'delivered' handoffs can be acknowledged.`, + ); + } + + await handoffStore.markProcessed(args.handoffId); + + return { + content: [ + { + type: "text" as const, + text: JSON.stringify({ + handoffId: args.handoffId, + previousStatus: handoff.status, + status: HandoffStatus.Processed, + }), + }, + ], + }; + }, + ); } diff --git a/src/nexus/ipc-handoff-integration.test.ts b/src/nexus/ipc-handoff-integration.test.ts new file mode 100644 index 00000000..edc50abc --- /dev/null +++ b/src/nexus/ipc-handoff-integration.test.ts @@ -0,0 +1,333 @@ +/** + * Integration tests for the IPC handoff round-trip (#165). + * + * Tests the full flow: + * contribute → handoff created → IPC sent (via NexusEventBus) → + * SSE event → handoff status updated → dead-letter on failure + * + * Uses in-memory stores and mock IPC to verify the wiring without + * requiring a running Nexus instance. + */ + +import { describe, expect, test } from "bun:test"; +import { HandoffStatus } from "../core/handoff.js"; +import { InMemoryHandoffStore } from "../core/in-memory-handoff-store.js"; +import { LocalEventBus } from "../core/local-event-bus.js"; +import { contributeOperation } from "../core/operations/contribute.js"; +import type { OperationDeps } from "../core/operations/deps.js"; +import { makeInMemoryContributionStore } from "../core/operations/test-helpers.js"; +import type { AgentTopology } from "../core/topology.js"; +import { TopologyRouter } from "../core/topology-router.js"; +import { NexusEventBus } from "./nexus-event-bus.js"; +import type { IpcSendResult, NexusIpcClient } from "./nexus-ipc-client.js"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const reviewLoopTopology: AgentTopology = { + structure: "graph", + roles: [ + { name: "coder", edges: [{ target: "reviewer", edgeType: "delegates" }] }, + { name: "reviewer" }, + ], +}; + +function makeMockIpcClient( + result: IpcSendResult = { ok: true, messageId: "ipc-msg-001" }, +): NexusIpcClient { + return { + send: async () => result, + } as unknown as NexusIpcClient; +} + +function makeFailingIpcClient(): NexusIpcClient { + return { + send: async () => ({ ok: false, error: "connection refused" }), + } as unknown as NexusIpcClient; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("IPC handoff integration", () => { + test("contribute with NexusEventBus creates handoff and routes via IPC", async () => { + const ipc = makeMockIpcClient({ ok: true, messageId: "ipc-msg-123" }); + const eventBus = new NexusEventBus(ipc); + const router = new TopologyRouter(reviewLoopTopology, eventBus); + const handoffStore = new InMemoryHandoffStore(); + const store = makeInMemoryContributionStore(); + + const deps: OperationDeps = { + contributionStore: store, + topologyRouter: router, + eventBus, + handoffStore, + }; + + const result = await contributeOperation( + { + kind: "work", + summary: "Implement auth module", + agent: { agentId: "agent-1", role: "coder" }, + }, + deps, + ); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + // Handoff should exist + const handoffs = await handoffStore.list(); + expect(handoffs).toHaveLength(1); + expect(handoffs[0]!.fromRole).toBe("coder"); + expect(handoffs[0]!.toRole).toBe("reviewer"); + expect(handoffs[0]!.sourceCid).toBe(result.value.cid); + + // The route event was published (result includes routedTo) + expect(result.value.routedTo).toEqual(["reviewer"]); + expect(result.value.handoffIds).toHaveLength(1); + + eventBus.close(); + }); + + test("NexusEventBus.publish returns IPC message ID from NexusIpcClient", async () => { + const ipc = makeMockIpcClient({ ok: true, messageId: "ipc-msg-456" }); + const eventBus = new NexusEventBus(ipc); + + const result = await eventBus.publish({ + type: "contribution", + sourceRole: "coder", + targetRole: "reviewer", + payload: { cid: "blake3:test" }, + timestamp: new Date().toISOString(), + }); + + expect(result.ok).toBe(true); + expect(result.messageId).toBe("ipc-msg-456"); + eventBus.close(); + }); + + test("TopologyRouter.route returns RouteResult with ok + messageId", async () => { + const ipc = makeMockIpcClient({ ok: true, messageId: "ipc-msg-789" }); + const eventBus = new NexusEventBus(ipc); + const router = new TopologyRouter(reviewLoopTopology, eventBus); + + const results = await router.route("coder", { cid: "blake3:test" }); + + expect(results).toHaveLength(1); + expect(results[0]!.targetRole).toBe("reviewer"); + expect(results[0]!.ok).toBe(true); + expect(results[0]!.messageId).toBe("ipc-msg-789"); + eventBus.close(); + }); + + test("TopologyRouter.route returns ok=false when IPC fails", async () => { + const ipc = makeFailingIpcClient(); + const eventBus = new NexusEventBus(ipc); + const router = new TopologyRouter(reviewLoopTopology, eventBus); + + const results = await router.route("coder", { cid: "blake3:test" }); + + expect(results).toHaveLength(1); + expect(results[0]!.targetRole).toBe("reviewer"); + expect(results[0]!.ok).toBe(false); + expect(results[0]!.error).toBe("connection refused"); + expect(results[0]!.messageId).toBeUndefined(); + eventBus.close(); + }); + + test("LocalEventBus route returns ok=true with no messageId", async () => { + const localBus = new LocalEventBus(); + const router = new TopologyRouter(reviewLoopTopology, localBus); + + const results = await router.route("coder", { cid: "blake3:test" }); + + expect(results).toHaveLength(1); + expect(results[0]!.ok).toBe(true); + expect(results[0]!.messageId).toBeUndefined(); + localBus.close(); + }); + + test("handoff ipcMessageId can be set and retrieved", async () => { + const handoffStore = new InMemoryHandoffStore(); + const h = await handoffStore.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + + await handoffStore.setIpcMessageId!(h.handoffId, "ipc-msg-999"); + + const updated = await handoffStore.get(h.handoffId); + expect(updated!.ipcMessageId).toBe("ipc-msg-999"); + handoffStore.close(); + }); + + test("handoff state machine: pending_pickup → delivered → processed → replied", async () => { + const handoffStore = new InMemoryHandoffStore(); + const h = await handoffStore.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + expect(h.status).toBe(HandoffStatus.PendingPickup); + + await handoffStore.markDelivered(h.handoffId); + expect((await handoffStore.get(h.handoffId))!.status).toBe(HandoffStatus.Delivered); + + await handoffStore.markProcessed(h.handoffId); + expect((await handoffStore.get(h.handoffId))!.status).toBe(HandoffStatus.Processed); + + await handoffStore.markReplied(h.handoffId, "blake3:reply-cid"); + const final = (await handoffStore.get(h.handoffId))!; + expect(final.status).toBe(HandoffStatus.Replied); + expect(final.resolvedByCid).toBe("blake3:reply-cid"); + + handoffStore.close(); + }); + + test("handoff dead-letter on IPC failure", async () => { + const handoffStore = new InMemoryHandoffStore(); + const h = await handoffStore.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + + await handoffStore.markDeadLettered(h.handoffId); + + const updated = await handoffStore.get(h.handoffId); + expect(updated!.status).toBe(HandoffStatus.DeadLettered); + handoffStore.close(); + }); + + test("grove_ack_handoff validates state transition", async () => { + const { canTransition } = await import("../core/handoff.js"); + + // Valid: delivered → processed + expect(canTransition(HandoffStatus.Delivered, HandoffStatus.Processed)).toBe(true); + + // Invalid: pending_pickup → processed (must go through delivered) + expect(canTransition(HandoffStatus.PendingPickup, HandoffStatus.Processed)).toBe(false); + + // Invalid: dead_lettered → processed (terminal state) + expect(canTransition(HandoffStatus.DeadLettered, HandoffStatus.Processed)).toBe(false); + }); + + test("multi-target topology routes to all targets in parallel", async () => { + const sendCalls: string[] = []; + const ipc = { + send: async (_s: string, recipient: string) => { + sendCalls.push(recipient); + return { ok: true, messageId: `msg-${recipient}` }; + }, + } as unknown as NexusIpcClient; + + const multiTopology: AgentTopology = { + structure: "graph", + roles: [ + { + name: "coder", + edges: [ + { target: "reviewer", edgeType: "delegates" }, + { target: "tester", edgeType: "delegates" }, + { target: "auditor", edgeType: "delegates" }, + ], + }, + { name: "reviewer" }, + { name: "tester" }, + { name: "auditor" }, + ], + }; + + const eventBus = new NexusEventBus(ipc); + const router = new TopologyRouter(multiTopology, eventBus); + + const results = await router.route("coder", { cid: "blake3:test" }); + + expect(results).toHaveLength(3); + expect(results.every((r) => r.ok)).toBe(true); + expect(sendCalls.sort()).toEqual(["auditor", "reviewer", "tester"]); + expect(results.find((r) => r.targetRole === "reviewer")!.messageId).toBe("msg-reviewer"); + expect(results.find((r) => r.targetRole === "tester")!.messageId).toBe("msg-tester"); + expect(results.find((r) => r.targetRole === "auditor")!.messageId).toBe("msg-auditor"); + + eventBus.close(); + }); + + test("infrastructure error (404/connection refused) does NOT dead-letter handoffs", async () => { + // Simulate a Nexus that has VFS but no IPC endpoint (404) + const ipc = { + send: async () => ({ + ok: false, + error: "IPC send failed: HTTP 404", + infrastructureError: true, + }), + } as unknown as NexusIpcClient; + + const eventBus = new NexusEventBus(ipc); + const router = new TopologyRouter(reviewLoopTopology, eventBus); + + const results = await router.route("coder", { cid: "blake3:test" }); + + expect(results).toHaveLength(1); + expect(results[0]!.ok).toBe(false); + expect(results[0]!.infrastructureError).toBe(true); + + // The handoff should NOT be dead-lettered because this is infra, not rejection + // (verified by the contribute.ts routing block checking infrastructureError) + eventBus.close(); + }); + + test("delivery rejection (non-infrastructure) DOES dead-letter handoffs", async () => { + // Simulate IPC endpoint available but rejecting the message + const ipc = { + send: async () => ({ + ok: false, + error: "recipient not registered", + infrastructureError: false, + }), + } as unknown as NexusIpcClient; + + const eventBus = new NexusEventBus(ipc); + const router = new TopologyRouter(reviewLoopTopology, eventBus); + + const results = await router.route("coder", { cid: "blake3:test" }); + + expect(results).toHaveLength(1); + expect(results[0]!.ok).toBe(false); + expect(results[0]!.infrastructureError).toBe(false); + + // This IS a delivery failure — contribute.ts would dead-letter this handoff + eventBus.close(); + }); + + test("NexusIpcClient caches endpoint unavailability after first 404", async () => { + const { NexusIpcClient: RealIpcClient } = await import("./nexus-ipc-client.js"); + let fetchCount = 0; + const originalFetch = globalThis.fetch; + globalThis.fetch = (async () => { + fetchCount++; + return new Response('{"detail":"Not Found"}', { status: 404 }); + }) as unknown as typeof fetch; + + try { + const client = new RealIpcClient({ nexusUrl: "http://localhost:9999", apiKey: "test" }); + + const r1 = await client.send("a", "b", {}); + expect(r1.ok).toBe(false); + expect(r1.infrastructureError).toBe(true); + expect(fetchCount).toBe(1); + + // Second call should be cached — no fetch + const r2 = await client.send("a", "b", {}); + expect(r2.ok).toBe(false); + expect(r2.infrastructureError).toBe(true); + expect(fetchCount).toBe(1); // still 1 — cached + } finally { + globalThis.fetch = originalFetch; + } + }); +}); diff --git a/src/nexus/nexus-event-bus.test.ts b/src/nexus/nexus-event-bus.test.ts new file mode 100644 index 00000000..da497829 --- /dev/null +++ b/src/nexus/nexus-event-bus.test.ts @@ -0,0 +1,216 @@ +/** + * Unit tests for NexusEventBus. + * + * Tests the async publish behavior with NexusIpcClient integration. + */ + +import { describe, expect, test } from "bun:test"; +import type { GroveEvent } from "../core/event-bus.js"; +import { NexusEventBus } from "./nexus-event-bus.js"; +import type { IpcSendResult, NexusIpcClient } from "./nexus-ipc-client.js"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Mock IPC client that records calls and returns configurable results. */ +function makeMockIpcClient( + result: IpcSendResult = { ok: true, messageId: "msg-123" }, +): NexusIpcClient & { calls: { sender: string; recipient: string; payload: unknown }[] } { + const calls: { sender: string; recipient: string; payload: unknown }[] = []; + return { + calls, + send: async (sender: string, recipient: string, payload: Record) => { + calls.push({ sender, recipient, payload }); + return result; + }, + } as NexusIpcClient & { calls: { sender: string; recipient: string; payload: unknown }[] }; +} + +function makeEvent(overrides?: Partial): GroveEvent { + return { + type: "contribution", + sourceRole: "coder", + targetRole: "reviewer", + payload: { cid: "blake3:abc123" }, + timestamp: new Date().toISOString(), + ...overrides, + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("NexusEventBus", () => { + // --- Local handler delivery --- + + test("publish delivers to local handler subscribed to target role", async () => { + const bus = new NexusEventBus(undefined); + const received: GroveEvent[] = []; + bus.subscribe("reviewer", (e) => received.push(e)); + + await bus.publish(makeEvent()); + + expect(received).toHaveLength(1); + expect(received[0]!.sourceRole).toBe("coder"); + expect(received[0]!.targetRole).toBe("reviewer"); + bus.close(); + }); + + test("publish does not deliver to handler on different role", async () => { + const bus = new NexusEventBus(undefined); + const received: GroveEvent[] = []; + bus.subscribe("coder", (e) => received.push(e)); + + await bus.publish(makeEvent({ targetRole: "reviewer" })); + + expect(received).toHaveLength(0); + bus.close(); + }); + + test("multiple handlers on same role all receive the event", async () => { + const bus = new NexusEventBus(undefined); + let count = 0; + bus.subscribe("reviewer", () => count++); + bus.subscribe("reviewer", () => count++); + + await bus.publish(makeEvent()); + + expect(count).toBe(2); + bus.close(); + }); + + test("unsubscribe removes handler", async () => { + const bus = new NexusEventBus(undefined); + const received: GroveEvent[] = []; + const handler = (e: GroveEvent) => received.push(e); + bus.subscribe("reviewer", handler); + bus.unsubscribe("reviewer", handler); + + await bus.publish(makeEvent()); + + expect(received).toHaveLength(0); + bus.close(); + }); + + test("close clears all handlers", async () => { + const bus = new NexusEventBus(undefined); + let count = 0; + bus.subscribe("reviewer", () => count++); + bus.close(); + + await bus.publish(makeEvent()); + + expect(count).toBe(0); + }); + + test("handler error does not prevent other handlers from firing", async () => { + const bus = new NexusEventBus(undefined); + let secondFired = false; + bus.subscribe("reviewer", () => { + throw new Error("boom"); + }); + bus.subscribe("reviewer", () => { + secondFired = true; + }); + + await bus.publish(makeEvent()); + + expect(secondFired).toBe(true); + bus.close(); + }); + + // --- IPC send behavior --- + + test("publish sends via IpcClient and returns message ID", async () => { + const ipc = makeMockIpcClient({ ok: true, messageId: "msg-456" }); + const bus = new NexusEventBus(ipc); + + const result = await bus.publish(makeEvent({ sourceRole: "coder", targetRole: "reviewer" })); + + expect(result.ok).toBe(true); + expect(result.messageId).toBe("msg-456"); + expect(ipc.calls).toHaveLength(1); + expect(ipc.calls[0]!.sender).toBe("coder"); + expect(ipc.calls[0]!.recipient).toBe("reviewer"); + bus.close(); + }); + + test("publish returns error result when IPC fails", async () => { + const ipc = makeMockIpcClient({ ok: false, error: "network error" }); + const bus = new NexusEventBus(ipc); + + const result = await bus.publish(makeEvent()); + + expect(result.ok).toBe(false); + expect(result.error).toBe("network error"); + bus.close(); + }); + + test("publish without IPC client returns ok (local-only mode)", async () => { + const bus = new NexusEventBus(undefined); + + const result = await bus.publish(makeEvent()); + + expect(result.ok).toBe(true); + expect(result.messageId).toBeUndefined(); + bus.close(); + }); + + test("publish still delivers to local handlers even when IPC fails", async () => { + const ipc = makeMockIpcClient({ ok: false, error: "connection refused" }); + const bus = new NexusEventBus(ipc); + const received: GroveEvent[] = []; + bus.subscribe("reviewer", (e) => received.push(e)); + + await bus.publish(makeEvent()); + + expect(received).toHaveLength(1); + bus.close(); + }); + + test("publish sends IPC before notifying local handlers", async () => { + const order: string[] = []; + const ipc = { + send: async () => { + order.push("ipc"); + return { ok: true }; + }, + } as unknown as NexusIpcClient; + const bus = new NexusEventBus(ipc); + bus.subscribe("reviewer", () => order.push("handler")); + + await bus.publish(makeEvent()); + + // IPC send completes, then handlers fire + expect(order).toEqual(["ipc", "handler"]); + bus.close(); + }); + + // --- Unsubscribe edge cases --- + + test("unsubscribe on non-existent role is a no-op", () => { + const bus = new NexusEventBus(undefined); + // biome-ignore lint/suspicious/noEmptyBlockStatements: intentional no-op handler + bus.unsubscribe("nonexistent", () => {}); + bus.close(); + }); + + test("unsubscribe removes the specific handler only", async () => { + const bus = new NexusEventBus(undefined); + const received1: GroveEvent[] = []; + const received2: GroveEvent[] = []; + const handler1 = (e: GroveEvent) => received1.push(e); + const handler2 = (e: GroveEvent) => received2.push(e); + bus.subscribe("reviewer", handler1); + bus.subscribe("reviewer", handler2); + bus.unsubscribe("reviewer", handler1); + + await bus.publish(makeEvent()); + + expect(received1).toHaveLength(0); + expect(received2).toHaveLength(1); + bus.close(); + }); +}); diff --git a/src/nexus/nexus-event-bus.ts b/src/nexus/nexus-event-bus.ts index d44126a9..fc4d3e37 100644 --- a/src/nexus/nexus-event-bus.ts +++ b/src/nexus/nexus-event-bus.ts @@ -1,45 +1,35 @@ /** * Nexus-backed EventBus — publish via Nexus IPC API. * - * On publish: sends via POST /api/v2/ipc/send (v0.9.14+). - * This triggers Nexus SSE events which NexusWsBridge receives. + * On publish: sends via NexusIpcClient (POST /api/v2/ipc/send). + * Returns structured PublishResult with IPC message ID. * * On subscribe: registers in-process handlers only (no polling). * Cross-process push is handled by NexusWsBridge via Nexus SSE stream. */ -import type { EventBus, EventHandler, GroveEvent } from "../core/event-bus.js"; -import type { NexusClient } from "./client.js"; +import type { EventBus, EventHandler, GroveEvent, PublishResult } from "../core/event-bus.js"; +import type { NexusIpcClient } from "./nexus-ipc-client.js"; /** Nexus IPC-backed event bus for cross-process agent communication. */ export class NexusEventBus implements EventBus { - private readonly client: NexusClient; + private readonly ipcClient: NexusIpcClient | undefined; private readonly handlers = new Map(); - constructor(client: NexusClient, _zoneId: string) { - this.client = client; + /** + * @param ipcClient — Shared IPC client for sending messages. When undefined, + * the bus operates in local-only mode (handlers fire, no IPC send). + */ + constructor(ipcClient: NexusIpcClient | undefined) { + this.ipcClient = ipcClient; } - publish(event: GroveEvent): void { - const nexusUrl = (this.client as { baseUrl?: string }).baseUrl ?? process.env.GROVE_NEXUS_URL; - const apiKey = process.env.NEXUS_API_KEY; + async publish(event: GroveEvent): Promise { + let result: PublishResult = { ok: true }; - if (nexusUrl && apiKey) { - void fetch(`${nexusUrl}/api/v2/ipc/send`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${apiKey}`, - }, - body: JSON.stringify({ - sender: event.sourceRole, - recipient: event.targetRole, - type: "event", - payload: event.payload, - }), - }).catch(() => { - /* best-effort */ - }); + // Send via Nexus IPC when available + if (this.ipcClient !== undefined) { + result = await this.ipcClient.send(event.sourceRole, event.targetRole, event.payload); } // Also notify local handlers (for in-process subscribers) @@ -53,6 +43,8 @@ export class NexusEventBus implements EventBus { } } } + + return result; } subscribe(role: string, handler: EventHandler): void { diff --git a/src/nexus/nexus-handoff-store.test.ts b/src/nexus/nexus-handoff-store.test.ts new file mode 100644 index 00000000..c78d924e --- /dev/null +++ b/src/nexus/nexus-handoff-store.test.ts @@ -0,0 +1,160 @@ +/** + * Run HandoffStore conformance tests against NexusHandoffStore, + * plus Nexus-specific tests for VFS behavior. + */ + +import { describe, expect, test } from "bun:test"; +import { HandoffStatus } from "../core/handoff.js"; +import { runHandoffStoreConformanceTests } from "../core/handoff-store.conformance.js"; +import { MockNexusClient } from "./mock-client.js"; +import { NexusHandoffStore } from "./nexus-handoff-store.js"; + +// --------------------------------------------------------------------------- +// Conformance +// --------------------------------------------------------------------------- + +runHandoffStoreConformanceTests("NexusHandoffStore", () => { + const client = new MockNexusClient(); + return new NexusHandoffStore(client, "test-session", "default"); +}); + +// --------------------------------------------------------------------------- +// Nexus-specific tests +// --------------------------------------------------------------------------- + +describe("NexusHandoffStore: Nexus-specific behavior", () => { + test("created handoffs default to Delivered status (not PendingPickup)", async () => { + const client = new MockNexusClient(); + const store = new NexusHandoffStore(client, "sess-1", "default"); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + // NexusHandoffStore intentionally defaults to Delivered + expect(h.status).toBe(HandoffStatus.Delivered); + } finally { + store.close(); + } + }); + + test("createMany is idempotent — duplicate handoffIds are skipped", async () => { + const client = new MockNexusClient(); + const store = new NexusHandoffStore(client, "sess-1", "default"); + try { + const results1 = await store.createMany([ + { handoffId: "h-1", sourceCid: "blake3:a", fromRole: "coder", toRole: "reviewer" }, + { handoffId: "h-2", sourceCid: "blake3:b", fromRole: "coder", toRole: "tester" }, + ]); + expect(results1).toHaveLength(2); + + // Write the same IDs again + const results2 = await store.createMany([ + { handoffId: "h-1", sourceCid: "blake3:a", fromRole: "coder", toRole: "reviewer" }, + { handoffId: "h-3", sourceCid: "blake3:c", fromRole: "coder", toRole: "auditor" }, + ]); + expect(results2).toHaveLength(2); + + // Total should be 3 (h-1 deduped, h-2 from first, h-3 from second) + const all = await store.list(); + expect(all).toHaveLength(3); + } finally { + store.close(); + } + }); + + test("list scans all session files when no sessionId filter", async () => { + const client = new MockNexusClient(); + // Create handoffs in two different sessions + const store1 = new NexusHandoffStore(client, "sess-1", "default"); + const store2 = new NexusHandoffStore(client, "sess-2", "default"); + try { + await store1.create({ sourceCid: "blake3:a", fromRole: "coder", toRole: "reviewer" }); + await store2.create({ sourceCid: "blake3:b", fromRole: "reviewer", toRole: "coder" }); + + // A store should see handoffs from all sessions via readAllHandoffs + const all = await store1.list(); + expect(all.length).toBeGreaterThanOrEqual(2); + } finally { + store1.close(); + store2.close(); + } + }); + + test("get falls back to scanning all files for cross-session lookups", async () => { + const client = new MockNexusClient(); + const store1 = new NexusHandoffStore(client, "sess-1", "default"); + const store2 = new NexusHandoffStore(client, "sess-2", "default"); + try { + await store2.create({ + handoffId: "cross-session-id", + sourceCid: "blake3:a", + fromRole: "coder", + toRole: "reviewer", + }); + + // store1 is scoped to sess-1 but should find sess-2's handoff via scanAll + const found = await store1.get("cross-session-id"); + expect(found).toBeDefined(); + expect(found!.handoffId).toBe("cross-session-id"); + } finally { + store1.close(); + store2.close(); + } + }); + + test("store without sessionId uses _global.json", async () => { + const client = new MockNexusClient(); + const store = new NexusHandoffStore(client, undefined, "default"); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + + const fetched = await store.get(h.handoffId); + expect(fetched).toBeDefined(); + } finally { + store.close(); + } + }); + + test("markDelivered updates handoff in VFS file", async () => { + const client = new MockNexusClient(); + const store = new NexusHandoffStore(client, "sess-1", "default"); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + + await store.markDelivered(h.handoffId); + const updated = await store.get(h.handoffId); + expect(updated?.status).toBe(HandoffStatus.Delivered); + } finally { + store.close(); + } + }); + + test("markReplied updates handoff with resolvedByCid in VFS file", async () => { + const client = new MockNexusClient(); + const store = new NexusHandoffStore(client, "sess-1", "default"); + try { + const h = await store.create({ + sourceCid: "blake3:abc", + fromRole: "coder", + toRole: "reviewer", + }); + + await store.markReplied(h.handoffId, "blake3:reply"); + const updated = await store.get(h.handoffId); + expect(updated?.status).toBe(HandoffStatus.Replied); + expect(updated?.resolvedByCid).toBe("blake3:reply"); + } finally { + store.close(); + } + }); +}); diff --git a/src/nexus/nexus-handoff-store.ts b/src/nexus/nexus-handoff-store.ts index 01ee13e7..d54cb486 100644 --- a/src/nexus/nexus-handoff-store.ts +++ b/src/nexus/nexus-handoff-store.ts @@ -5,14 +5,15 @@ * handoffs/{sessionId}.json → { handoffs: Handoff[] } * * This avoids many small files while keeping cross-agent visibility. - * Concurrent updates use etag-based CAS with retry — conflicts are rare - * since handoffs within a session are mostly sequential. + * Concurrent updates use read-modify-write with retry. Writes are + * unconditional (last-writer-wins) since Nexus VFS CAS is unreliable. * * When no sessionId is available (e.g. handoff created outside a session), * falls back to a shared "handoffs/_global.json" file. */ import { + canTransition, type Handoff, type HandoffInput, type HandoffQuery, @@ -22,7 +23,7 @@ import { import { debugLog } from "../tui/debug-log.js"; import type { NexusClient } from "./client.js"; -const MAX_CAS_RETRIES = 8; +const MAX_RETRIES = 8; function handoffsDir(zoneId: string): string { return `/zones/${zoneId}/handoffs`; @@ -38,12 +39,18 @@ function encode(obj: unknown): Uint8Array { return new TextEncoder().encode(JSON.stringify(obj)); } +/** TTL for the readAllHandoffs cache in ms. SSE invalidation resets this. */ +const CACHE_TTL_MS = 5_000; + export class NexusHandoffStore implements HandoffStore { private readonly client: NexusClient; private readonly sessionId: string | undefined; private readonly zoneId: string; private dirEnsured = false; + /** Cached result of readAllHandoffs(). Invalidated on writes and SSE events. */ + private allHandoffsCache: { data: Handoff[]; fetchedAt: number } | undefined; + constructor( client: NexusClient, /** Active session ID — determines which file handoffs are written to. */ @@ -92,33 +99,40 @@ export class NexusHandoffStore implements HandoffStore { } /** - * Read-modify-write with CAS retry. + * Read-modify-write with retry on conflict or rate limit. + * + * NOT true CAS — Nexus sys_write silently drops conditional writes + * (if_match / if_none_match return success but don't persist), so writes + * are unconditional. Concurrent writers are last-writer-wins. The retry + * loop handles rate limits and transient errors, not CAS conflicts. + * * fn receives current handoffs, returns modified handoffs. * Returns the final handoff list after successful write. */ - private async casUpdate( + private async readModifyWrite( path: string, fn: (handoffs: Handoff[]) => Handoff[], ): Promise { await this.ensureDir(); - for (let attempt = 0; attempt < MAX_CAS_RETRIES; attempt++) { + for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { const { handoffs, etag } = await this.readFile(path); const updated = fn(handoffs); try { // Unconditional write — Nexus sys_write silently drops writes when // if_match or if_none_match are set (returns success but doesn't persist). - // Without CAS, concurrent writes may lose data, but handoffs are append-only + // Unconditional write — concurrent writes are last-writer-wins. Handoffs are append-only // per session so conflicts are rare and the retry loop handles it. const writeResult = await this.client.write(path, encode({ handoffs: updated })); debugLog( - "NexusHandoffStore.casUpdate", + "NexusHandoffStore.readModifyWrite", `WRITE OK path=${path} etag=${etag || "(empty)"} bytesWritten=${writeResult.bytesWritten} newEtag=${writeResult.etag} count=${updated.length} attempt=${attempt}`, ); + this.invalidateCache(); return updated; } catch (err) { const msg = err instanceof Error ? err.message : String(err); debugLog( - "NexusHandoffStore.casUpdate", + "NexusHandoffStore.readModifyWrite", `WRITE FAIL path=${path} attempt=${attempt} err=${msg}`, ); // Conflict = another writer updated between our read and write — retry @@ -139,7 +153,7 @@ export class NexusHandoffStore implements HandoffStore { throw err; } } - throw new Error(`Handoff CAS update failed after ${MAX_CAS_RETRIES} retries on ${path}`); + throw new Error(`Handoff read-modify-write failed after ${MAX_RETRIES} retries on ${path}`); } // --------------------------------------------------------------------------- @@ -156,7 +170,7 @@ export class NexusHandoffStore implements HandoffStore { /** * Batch creation: collapses N handoff inserts into a single VFS file - * write (one casUpdate, one HTTP round-trip). Avoids the N+1 pattern + * write (one readModifyWrite, one HTTP round-trip). Avoids the N+1 pattern * the contributeOperation serial path used to have when fanning out * to multiple downstream roles. */ @@ -171,7 +185,7 @@ export class NexusHandoffStore implements HandoffStore { // Default to Delivered — the MCP creates handoffs at contribution-write time // AND the TopologyRouter routes them immediately. The TUI's routeContribution // delivers via agentRuntime.send(). Updating status cross-client (PendingPickup - // → Delivered) fails due to Nexus VFS casUpdate limitations. + // → Delivered) fails due to Nexus VFS write-visibility limitations. status: HandoffStatus.Delivered, requiresReply: input.requiresReply ?? false, ...(input.replyDueAt !== undefined ? { replyDueAt: input.replyDueAt } : {}), @@ -179,7 +193,7 @@ export class NexusHandoffStore implements HandoffStore { })); const path = this.filePath(); - await this.casUpdate(path, (existing) => { + await this.readModifyWrite(path, (existing) => { // Idempotent merge: skip handoffs whose id is already present. const existingIds = new Set(existing.map((h) => h.handoffId)); const fresh = handoffs.filter((h) => !existingIds.has(h.handoffId)); @@ -226,26 +240,43 @@ export class NexusHandoffStore implements HandoffStore { } async markDelivered(handoffId: string): Promise { - await this.updateHandoff(handoffId, (h) => ({ ...h, status: HandoffStatus.Delivered })); + await this.transitionHandoff(handoffId, HandoffStatus.Delivered); + } + + async markProcessed(handoffId: string): Promise { + await this.transitionHandoff(handoffId, HandoffStatus.Processed); } async markReplied(handoffId: string, resolvedByCid: string): Promise { - await this.updateHandoff(handoffId, (h) => ({ - ...h, - status: HandoffStatus.Replied, - resolvedByCid, - })); + await this.transitionHandoff(handoffId, HandoffStatus.Replied, { resolvedByCid }); + } + + async markDeadLettered(handoffId: string): Promise { + await this.transitionHandoff(handoffId, HandoffStatus.DeadLettered); + } + + async setIpcMessageId(handoffId: string, ipcMessageId: string): Promise { + await this.updateHandoff(handoffId, (h) => ({ ...h, ipcMessageId })); } async expireStale(now?: string): Promise { const cutoff = now ?? new Date().toISOString(); const expired: Handoff[] = []; + // Expire handoffs in any non-terminal state that have a replyDueAt past + // the cutoff. The state machine allows pending_pickup→expired, + // delivered→expired, and processed→expired. + const expirableStatuses: ReadonlySet = new Set([ + HandoffStatus.PendingPickup, + HandoffStatus.Delivered, + HandoffStatus.Processed, + ]); + // Only scan the current session file for expiry (on-demand sweep) - await this.casUpdate(this.filePath(), (handoffs) => + await this.readModifyWrite(this.filePath(), (handoffs) => handoffs.map((h) => { if ( - h.status === HandoffStatus.PendingPickup && + expirableStatuses.has(h.status) && h.replyDueAt !== undefined && h.replyDueAt < cutoff ) { @@ -265,7 +296,13 @@ export class NexusHandoffStore implements HandoffStore { return pending.length; } + /** Invalidate the all-handoffs cache. Call on SSE events or external writes. */ + invalidateCache(): void { + this.allHandoffsCache = undefined; + } + close(): void { + this.allHandoffsCache = undefined; // NexusClient is shared — caller owns its lifecycle } @@ -274,17 +311,69 @@ export class NexusHandoffStore implements HandoffStore { // --------------------------------------------------------------------------- private async updateHandoff(handoffId: string, fn: (h: Handoff) => Handoff): Promise { - await this.casUpdate(this.filePath(), (handoffs) => + await this.readModifyWrite(this.filePath(), (handoffs) => handoffs.map((h) => (h.handoffId === handoffId ? fn(h) : h)), ); } + /** + * Transition a handoff's status with state machine validation. + * Rejects the write (no-op) if the current status doesn't allow the transition, + * which guards against concurrent writers clobbering each other with stale state. + */ + /** + * Transition a handoff's status with state machine validation. + * + * Reads the current file, checks canTransition, and only writes back + * if the transition is valid. Skips the write entirely on no-op to + * avoid clobbering concurrent changes with a stale snapshot. + */ + private async transitionHandoff( + handoffId: string, + targetStatus: HandoffStatus, + extraFields?: Partial, + ): Promise { + await this.ensureDir(); + const path = this.filePath(); + const { handoffs } = await this.readFile(path); + + const idx = handoffs.findIndex((h) => h.handoffId === handoffId); + if (idx === -1) return; // handoff not in this file + + const current = handoffs[idx]; + if (!current) return; + if (!canTransition(current.status, targetStatus)) { + debugLog( + "NexusHandoffStore.transitionHandoff", + `REJECTED ${handoffId} ${current.status}→${targetStatus} (invalid transition, skipping write)`, + ); + return; // skip write entirely — don't clobber concurrent changes + } + + // Valid transition — do the write + handoffs[idx] = { ...current, ...extraFields, status: targetStatus }; + await this.client.write(path, new TextEncoder().encode(JSON.stringify({ handoffs }))); + this.invalidateCache(); + debugLog( + "NexusHandoffStore.transitionHandoff", + `OK ${handoffId} ${current.status}→${targetStatus}`, + ); + } + private async scanAll(predicate: (h: Handoff) => boolean): Promise { const all = await this.readAllHandoffs(); return all.find(predicate); } private async readAllHandoffs(): Promise { + // Return cached data if fresh (within TTL) + if ( + this.allHandoffsCache !== undefined && + Date.now() - this.allHandoffsCache.fetchedAt < CACHE_TTL_MS + ) { + return this.allHandoffsCache.data; + } + try { const listing = await this.client.list(handoffsDir(this.zoneId)); // Nexus list may return entries without the .json extension even though @@ -312,7 +401,9 @@ export class NexusHandoffStore implements HandoffStore { } }), ); - return results.flat(); + const allHandoffs = results.flat(); + this.allHandoffsCache = { data: allHandoffs, fetchedAt: Date.now() }; + return allHandoffs; } catch (listErr) { debugLog( "NexusHandoffStore.readAllHandoffs", diff --git a/src/nexus/nexus-ipc-client.ts b/src/nexus/nexus-ipc-client.ts new file mode 100644 index 00000000..f6b622bd --- /dev/null +++ b/src/nexus/nexus-ipc-client.ts @@ -0,0 +1,142 @@ +/** + * Shared Nexus IPC client for sending messages via the Nexus IPC API. + * + * Consolidates the duplicate POST /api/v2/ipc/send calls that previously + * existed in both NexusEventBus and NexusWsBridge (Issue #165 / 5A). + * + * Returns structured results with IPC message IDs for handoff tracking. + */ + +import { debugLog } from "../tui/debug-log.js"; + +/** Result of an IPC send operation. */ +export interface IpcSendResult { + readonly ok: boolean; + readonly messageId?: string | undefined; + readonly error?: string | undefined; + /** + * True when the failure is an infrastructure issue (endpoint not found, + * connection refused) rather than a delivery-level rejection. Callers + * should NOT dead-letter handoffs on infrastructure errors — the message + * was never attempted, not rejected. + */ + readonly infrastructureError?: boolean | undefined; +} + +/** Options for constructing a NexusIpcClient. */ +export interface NexusIpcClientOptions { + readonly nexusUrl: string; + readonly apiKey: string; +} + +/** How long to cache a transient IPC failure before retrying (ms). */ +const TRANSIENT_BACKOFF_MS = 30_000; + +export class NexusIpcClient { + private readonly nexusUrl: string; + private readonly apiKey: string; + /** + * Endpoint availability state: + * - undefined: unknown (first call) + * - true: confirmed reachable + * - false: permanently unavailable (404/405 — endpoint doesn't exist) + */ + private endpointAvailable: boolean | undefined; + /** Timestamp of last transient failure. Used for backoff, not permanent caching. */ + private transientFailureAt: number | undefined; + + constructor(opts: NexusIpcClientOptions) { + this.nexusUrl = opts.nexusUrl; + this.apiKey = opts.apiKey; + } + + /** + * Send an IPC message from one agent to another via Nexus. + * + * Returns a structured result with the IPC message ID on success. + * Never throws — errors are returned in the result. + * + * Infrastructure errors (404, connection refused) set ok=false but also + * set `infrastructureError=true` so callers can distinguish "IPC endpoint + * doesn't exist" from "message was rejected by the IPC service." + */ + async send( + sender: string, + recipient: string, + payload: Record, + ): Promise { + // Skip if we've determined the endpoint permanently doesn't exist (404/405). + if (this.endpointAvailable === false) { + return { + ok: false, + error: "IPC endpoint unavailable (permanent 404/405)", + infrastructureError: true, + }; + } + + // Backoff on transient failures (502/503/network) — retry after TRANSIENT_BACKOFF_MS. + if ( + this.transientFailureAt !== undefined && + Date.now() - this.transientFailureAt < TRANSIENT_BACKOFF_MS + ) { + return { ok: false, error: "IPC endpoint transient backoff", infrastructureError: true }; + } + + try { + const resp = await fetch(`${this.nexusUrl}/api/v2/ipc/send`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${this.apiKey}`, + }, + body: JSON.stringify({ sender, recipient, type: "event", payload }), + }); + + if (!resp.ok) { + const error = `IPC send failed: HTTP ${resp.status}`; + debugLog( + "nexus-ipc", + `SEND FAIL sender=${sender} recipient=${recipient} status=${resp.status}`, + ); + // 404/405 = endpoint doesn't exist on this Nexus version → permanent disable + const isPermanent = resp.status === 404 || resp.status === 405; + // 429/5xx/auth = retryable/infrastructure, NOT a delivery rejection. + // Only a 2xx success followed by a delivery-level rejection from the + // IPC service (future: explicit rejected status) should dead-letter. + // All non-2xx failures are infrastructure by definition — the message + // was never accepted for delivery. + const isInfraOrRetryable = !isPermanent; + if (isPermanent) { + this.endpointAvailable = false; + } else if (resp.status >= 500 || resp.status === 429) { + this.transientFailureAt = Date.now(); + } + return { ok: false, error, infrastructureError: isPermanent || isInfraOrRetryable }; + } + + this.endpointAvailable = true; + this.transientFailureAt = undefined; // clear backoff on success + + // Try to extract message_id from response + let messageId: string | undefined; + try { + const body = (await resp.json()) as { message_id?: string }; + messageId = body.message_id; + } catch { + // Response may not be JSON — still a success + } + + debugLog( + "nexus-ipc", + `SEND OK sender=${sender} recipient=${recipient} messageId=${messageId ?? "(none)"}`, + ); + return { ok: true, messageId }; + } catch (err) { + const error = err instanceof Error ? err.message : String(err); + debugLog("nexus-ipc", `SEND ERROR sender=${sender} recipient=${recipient} err=${error}`); + // Network errors (connection refused, DNS failure) = transient infrastructure + this.transientFailureAt = Date.now(); + return { ok: false, error, infrastructureError: true }; + } + } +} diff --git a/src/server/session-service.test.ts b/src/server/session-service.test.ts index 3276c4af..9717b619 100644 --- a/src/server/session-service.test.ts +++ b/src/server/session-service.test.ts @@ -185,7 +185,7 @@ describe("SessionService", () => { await service.startSession("Test"); // Simulate a contribution event from the coder to the reviewer - bus.publish({ + void bus.publish({ type: "contribution", sourceRole: "coder", targetRole: "reviewer", @@ -221,14 +221,14 @@ describe("SessionService", () => { await service.startSession("Test"); // Simulate stop events from both roles - bus.publish({ + void bus.publish({ type: "stop", sourceRole: "coder", targetRole: "coder", payload: { reason: "done" }, timestamp: new Date().toISOString(), }); - bus.publish({ + void bus.publish({ type: "stop", sourceRole: "reviewer", targetRole: "reviewer", @@ -308,7 +308,7 @@ describe("SessionService", () => { service.destroy(); // Publishing after destroy should not cause errors - bus.publish({ + void bus.publish({ type: "contribution", sourceRole: "coder", targetRole: "reviewer", diff --git a/src/tui/nexus-ws-bridge.test.ts b/src/tui/nexus-ws-bridge.test.ts new file mode 100644 index 00000000..419eb47c --- /dev/null +++ b/src/tui/nexus-ws-bridge.test.ts @@ -0,0 +1,442 @@ +/** + * Unit tests for NexusWsBridge. + * + * Tests handleEvent and readAndPush with mocked dependencies. + * Establishes a baseline before adding IPC delivery state updates (3A). + */ + +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import type { AgentRuntime, AgentSession } from "../core/agent-runtime.js"; +import type { GroveEvent } from "../core/event-bus.js"; +import { LocalEventBus } from "../core/local-event-bus.js"; +import { NexusWsBridge, type NexusWsBridgeOptions } from "./nexus-ws-bridge.js"; + +// --------------------------------------------------------------------------- +// Mock factories +// --------------------------------------------------------------------------- + +function makeSession(role: string): AgentSession { + return { + id: `session-${role}`, + role, + status: "running", + }; +} + +function makeMockRuntime(): AgentRuntime { + return { + spawn: mock(() => Promise.resolve(makeSession("mock"))), + send: mock(() => Promise.resolve()), + close: mock(() => Promise.resolve()), + // biome-ignore lint/suspicious/noEmptyBlockStatements: mock no-op + onIdle: mock(() => {}), + listSessions: mock(() => Promise.resolve([])), + isAvailable: mock(() => Promise.resolve(true)), + }; +} + +function makeBridgeOpts(overrides?: Partial): NexusWsBridgeOptions { + return { + topology: { + structure: "graph", + roles: [ + { name: "coder", edges: [{ target: "reviewer", edgeType: "delegates" as const }] }, + { name: "reviewer" }, + ], + }, + runtime: makeMockRuntime(), + nexusUrl: "http://localhost:9999", + apiKey: "test-key", + ...overrides, + }; +} + +// --------------------------------------------------------------------------- +// Helpers to invoke private methods via the bridge +// --------------------------------------------------------------------------- + +/** + * We need to test handleEvent which is private. We test it indirectly by: + * 1. Registering a session + * 2. Mocking fetch for the VFS read + * 3. Calling the bridge's internal SSE handler by simulating what connectSse does + * + * Rather than reaching into private methods, we test the public API path: + * registerSession + the observable side effects (runtime.send called, eventBus notified). + * + * For direct handleEvent testing, we use a subclass that exposes it. + */ +class TestableNexusWsBridge extends NexusWsBridge { + /** Expose handleEvent for testing. */ + testHandleEvent(role: string, eventType: string | null, raw: string): void { + // Access private method — test-only subclass + ( + this as unknown as { handleEvent: (r: string, e: string | null, d: string) => void } + ).handleEvent(role, eventType, raw); + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("NexusWsBridge", () => { + let originalFetch: typeof globalThis.fetch; + + beforeEach(() => { + originalFetch = globalThis.fetch; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + // --- handleEvent --- + + test("handleEvent ignores non-message_delivered events", () => { + const runtime = makeMockRuntime(); + const bus = new LocalEventBus(); + const bridge = new TestableNexusWsBridge(makeBridgeOpts({ runtime, eventBus: bus })); + const session = makeSession("reviewer"); + bridge.registerSession("reviewer", session); + + bridge.testHandleEvent( + "reviewer", + "heartbeat", + JSON.stringify({ + event: "heartbeat", + message_id: "msg-1", + sender: "coder", + recipient: "reviewer", + type: "event", + path: "/inbox/msg-1", + }), + ); + + // runtime.send should NOT have been called + expect(runtime.send).not.toHaveBeenCalled(); + bridge.close(); + bus.close(); + }); + + test("handleEvent publishes to EventBus on message_delivered", () => { + const runtime = makeMockRuntime(); + const bus = new LocalEventBus(); + const received: GroveEvent[] = []; + bus.subscribe("reviewer", (e) => received.push(e)); + + const bridge = new TestableNexusWsBridge(makeBridgeOpts({ runtime, eventBus: bus })); + const session = makeSession("reviewer"); + bridge.registerSession("reviewer", session); + + // Mock fetch for readAndPush VFS read — return valid data + globalThis.fetch = (async () => + new Response( + JSON.stringify({ + result: { + data: Buffer.from( + JSON.stringify({ sender: "coder", payload: { summary: "test contribution" } }), + ).toString("base64"), + }, + }), + { status: 200 }, + )) as unknown as typeof fetch; + + bridge.testHandleEvent( + "reviewer", + "message_delivered", + JSON.stringify({ + event: "message_delivered", + message_id: "msg-1", + sender: "coder", + recipient: "reviewer", + type: "event", + path: "/inbox/msg-1", + }), + ); + + // EventBus should have received the event + expect(received).toHaveLength(1); + expect(received[0]!.type).toBe("contribution"); + expect(received[0]!.sourceRole).toBe("coder"); + expect(received[0]!.targetRole).toBe("reviewer"); + expect(received[0]!.payload).toEqual({ message_id: "msg-1" }); + + bridge.close(); + bus.close(); + }); + + test("handleEvent does nothing when no session registered for role", async () => { + const runtime = makeMockRuntime(); + const bus = new LocalEventBus(); + const bridge = new TestableNexusWsBridge(makeBridgeOpts({ runtime, eventBus: bus })); + // No session registered for "reviewer" + + globalThis.fetch = (async () => new Response("{}", { status: 200 })) as unknown as typeof fetch; + + bridge.testHandleEvent( + "reviewer", + "message_delivered", + JSON.stringify({ + event: "message_delivered", + message_id: "msg-1", + sender: "coder", + recipient: "reviewer", + type: "event", + path: "/inbox/msg-1", + }), + ); + + // Give async readAndPush a tick + await new Promise((r) => setTimeout(r, 20)); + + // runtime.send should NOT have been called (no session) + expect(runtime.send).not.toHaveBeenCalled(); + bridge.close(); + bus.close(); + }); + + test("handleEvent calls onBeforeDeliver callback", async () => { + const runtime = makeMockRuntime(); + const deliverCalls: { sender: string; recipient: string }[] = []; + + globalThis.fetch = (async () => + new Response( + JSON.stringify({ + result: { + data: Buffer.from( + JSON.stringify({ sender: "coder", payload: { summary: "test" } }), + ).toString("base64"), + }, + }), + { status: 200 }, + )) as unknown as typeof fetch; + + const bridge = new TestableNexusWsBridge( + makeBridgeOpts({ + runtime, + onBeforeDeliver: (sender, recipient) => { + deliverCalls.push({ sender, recipient }); + }, + }), + ); + const session = makeSession("reviewer"); + bridge.registerSession("reviewer", session); + + bridge.testHandleEvent( + "reviewer", + "message_delivered", + JSON.stringify({ + event: "message_delivered", + message_id: "msg-1", + sender: "coder", + recipient: "reviewer", + type: "event", + path: "/inbox/msg-1", + }), + ); + + await new Promise((r) => setTimeout(r, 20)); + + expect(deliverCalls).toHaveLength(1); + expect(deliverCalls[0]).toEqual({ sender: "coder", recipient: "reviewer" }); + bridge.close(); + }); + + test("handleEvent skips malformed JSON gracefully", () => { + const runtime = makeMockRuntime(); + const bridge = new TestableNexusWsBridge(makeBridgeOpts({ runtime })); + const session = makeSession("reviewer"); + bridge.registerSession("reviewer", session); + + // Should not throw + bridge.testHandleEvent("reviewer", "message_delivered", "not-valid-json"); + + expect(runtime.send).not.toHaveBeenCalled(); + bridge.close(); + }); + + // --- readAndPush --- + + test("readAndPush delivers message content to agent session", async () => { + const runtime = makeMockRuntime(); + + globalThis.fetch = (async () => + new Response( + JSON.stringify({ + result: { + data: Buffer.from( + JSON.stringify({ + sender: "coder", + payload: { summary: "implement auth module" }, + }), + ).toString("base64"), + }, + }), + { status: 200 }, + )) as unknown as typeof fetch; + + const bridge = new TestableNexusWsBridge(makeBridgeOpts({ runtime })); + const session = makeSession("reviewer"); + bridge.registerSession("reviewer", session); + + bridge.testHandleEvent( + "reviewer", + "message_delivered", + JSON.stringify({ + event: "message_delivered", + message_id: "msg-1", + sender: "coder", + recipient: "reviewer", + type: "event", + path: "/inbox/msg-1", + }), + ); + + // readAndPush is async — wait for it + await new Promise((r) => setTimeout(r, 50)); + + expect(runtime.send).toHaveBeenCalledTimes(1); + const sendCall = (runtime.send as ReturnType).mock.calls[0]; + expect(sendCall![0]).toBe(session); + expect(sendCall![1]).toContain("[IPC from coder]"); + expect(sendCall![1]).toContain("implement auth module"); + + bridge.close(); + }); + + test("readAndPush handles VFS read failure gracefully", async () => { + const runtime = makeMockRuntime(); + + globalThis.fetch = (async () => new Response("", { status: 500 })) as unknown as typeof fetch; + + const bridge = new TestableNexusWsBridge(makeBridgeOpts({ runtime })); + const session = makeSession("reviewer"); + bridge.registerSession("reviewer", session); + + bridge.testHandleEvent( + "reviewer", + "message_delivered", + JSON.stringify({ + event: "message_delivered", + message_id: "msg-1", + sender: "coder", + recipient: "reviewer", + type: "event", + path: "/inbox/msg-1", + }), + ); + + await new Promise((r) => setTimeout(r, 50)); + + // Should not crash, should not deliver + expect(runtime.send).not.toHaveBeenCalled(); + bridge.close(); + }); + + test("readAndPush handles missing data field gracefully", async () => { + const runtime = makeMockRuntime(); + + globalThis.fetch = (async () => + new Response(JSON.stringify({ result: {} }), { status: 200 })) as unknown as typeof fetch; + + const bridge = new TestableNexusWsBridge(makeBridgeOpts({ runtime })); + const session = makeSession("reviewer"); + bridge.registerSession("reviewer", session); + + bridge.testHandleEvent( + "reviewer", + "message_delivered", + JSON.stringify({ + event: "message_delivered", + message_id: "msg-1", + sender: "coder", + recipient: "reviewer", + type: "event", + path: "/inbox/msg-1", + }), + ); + + await new Promise((r) => setTimeout(r, 50)); + + expect(runtime.send).not.toHaveBeenCalled(); + bridge.close(); + }); + + // --- send --- + + test("send() POSTs to Nexus IPC endpoint", async () => { + const fetchCalls: { url: string; body: unknown }[] = []; + globalThis.fetch = (async (input: string | URL | Request, init?: RequestInit) => { + fetchCalls.push({ + url: String(input), + body: JSON.parse(init?.body as string), + }); + return new Response("{}", { status: 200 }); + }) as unknown as typeof fetch; + + const bridge = new NexusWsBridge(makeBridgeOpts()); + const ok = await bridge.send("coder", "reviewer", { summary: "test" }); + + expect(ok).toBe(true); + expect(fetchCalls).toHaveLength(1); + expect(fetchCalls[0]!.url).toBe("http://localhost:9999/api/v2/ipc/send"); + expect(fetchCalls[0]!.body).toEqual({ + sender: "coder", + recipient: "reviewer", + type: "event", + payload: { summary: "test" }, + }); + + bridge.close(); + }); + + test("send() returns false on network error", async () => { + globalThis.fetch = (async () => { + throw new Error("network error"); + }) as unknown as typeof fetch; + + const bridge = new NexusWsBridge(makeBridgeOpts()); + const ok = await bridge.send("coder", "reviewer", { summary: "test" }); + + expect(ok).toBe(false); + bridge.close(); + }); + + test("send() returns false on non-ok response", async () => { + globalThis.fetch = (async () => new Response("", { status: 500 })) as unknown as typeof fetch; + + const bridge = new NexusWsBridge(makeBridgeOpts()); + const ok = await bridge.send("coder", "reviewer", { summary: "test" }); + + expect(ok).toBe(false); + bridge.close(); + }); + + // --- session management --- + + test("registerSession and unregisterSession", () => { + const bridge = new NexusWsBridge(makeBridgeOpts()); + const session = makeSession("reviewer"); + + bridge.registerSession("reviewer", session); + bridge.unregisterSession("reviewer"); + + // After unregister, events for this role should not deliver + // (tested indirectly via handleEvent no-session path) + bridge.close(); + }); + + test("close clears all sessions and abort controllers", () => { + const bridge = new NexusWsBridge(makeBridgeOpts()); + bridge.registerSession("reviewer", makeSession("reviewer")); + bridge.registerSession("coder", makeSession("coder")); + + bridge.close(); + + // After close, new registrations should not start SSE + // (the closed flag prevents it) + bridge.registerSession("tester", makeSession("tester")); + // No error expected — just a no-op + }); +}); diff --git a/src/tui/nexus-ws-bridge.ts b/src/tui/nexus-ws-bridge.ts index 06eb7ba6..6505657b 100644 --- a/src/tui/nexus-ws-bridge.ts +++ b/src/tui/nexus-ws-bridge.ts @@ -15,7 +15,10 @@ import type { AgentRuntime, AgentSession } from "../core/agent-runtime.js"; import type { EventBus, GroveEvent } from "../core/event-bus.js"; +import type { HandoffStore } from "../core/handoff.js"; import type { AgentTopology } from "../core/topology.js"; +import type { NexusIpcClient } from "../nexus/nexus-ipc-client.js"; +import { debugLog } from "./debug-log.js"; export interface NexusWsBridgeOptions { topology: AgentTopology; @@ -26,6 +29,10 @@ export interface NexusWsBridgeOptions { eventBus?: EventBus | undefined; /** Called before delivering IPC to an agent — use for workspace rsync. */ onBeforeDeliver?: ((sender: string, recipient: string) => void) | undefined; + /** HandoffStore for updating delivery status on SSE events. */ + handoffStore?: HandoffStore | undefined; + /** Shared IPC client — replaces inline fetch when provided. */ + ipcClient?: NexusIpcClient | undefined; } interface SseEvent { @@ -79,6 +86,12 @@ export class NexusWsBridge { recipient: string, payload: Record, ): Promise { + // Use shared NexusIpcClient when available (5A: DRY) + if (this.opts.ipcClient) { + const result = await this.opts.ipcClient.send(sender, recipient, payload); + return result.ok; + } + // Fallback: direct fetch (backward compat when ipcClient not injected) try { const resp = await fetch(`${this.opts.nexusUrl}/api/v2/ipc/send`, { method: "POST", @@ -177,15 +190,10 @@ export class NexusWsBridge { if (eventType !== "message_delivered") return; const event = JSON.parse(raw) as SseEvent; - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [wsBridge.handleEvent] role=${role} sender=${event.sender} path=${event.path} registeredSessions=[${[...this.sessions.keys()].join(",")}]\n`, - ); - } catch { - /* */ - } + debugLog( + "wsBridge.handleEvent", + `role=${role} sender=${event.sender} path=${event.path} registeredSessions=[${[...this.sessions.keys()].join(",")}]`, + ); // Notify the TUI EventBus — triggers contribution feed refresh (no polling needed) if (this.opts.eventBus) { @@ -196,20 +204,19 @@ export class NexusWsBridge { payload: { message_id: event.message_id }, timestamp: new Date().toISOString(), }; - this.opts.eventBus.publish(groveEvent); + void this.opts.eventBus.publish(groveEvent); + } + + // --- IPC lifecycle: mark matching handoff as delivered --- + // The message_delivered SSE confirms Nexus inbox delivery. + // Find the handoff by IPC message ID and transition its status. + if (this.opts.handoffStore && event.message_id) { + void this.updateHandoffDeliveryStatus(event.message_id, role, event.sender); } const session = this.sessions.get(role); if (!session) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [wsBridge.handleEvent] NO SESSION for role=${role} — cannot deliver\n`, - ); - } catch { - /* */ - } + debugLog("wsBridge.handleEvent", `NO SESSION for role=${role} — cannot deliver`); return; } @@ -225,6 +232,68 @@ export class NexusWsBridge { } } + /** + * Update handoff delivery status when an IPC message_delivered SSE event arrives. + * + * Correlates by ipcMessageId first, then falls back to matching by + * (toRole, status=pending_pickup) for the most recent undelivered handoff. + * The fallback handles the race where message_delivered arrives before + * the fire-and-forget setIpcMessageId() in contribute.ts completes. + * + * Best-effort — handoff store errors don't block delivery. + */ + private async updateHandoffDeliveryStatus( + ipcMessageId: string, + targetRole: string, + sender?: string, + ): Promise { + try { + const store = this.opts.handoffStore; + if (!store) return; + + const handoffs = await store.list({ toRole: targetRole }); + + // Primary: match by IPC message ID (exact correlation) + let matching = handoffs.find((h) => h.ipcMessageId === ipcMessageId); + + // Fallback: match most recent pending handoff for this role FROM the + // same sender. Constrains by sender to avoid cross-matching handoffs + // from different source roles. The SSE event carries the sender field. + if (!matching && sender) { + matching = handoffs + .filter( + (h) => + h.fromRole === sender && + (h.status === "pending_pickup" || h.status === "delivered") && + !h.ipcMessageId, // only match handoffs that haven't been IPC-linked yet + ) + .sort((a, b) => b.createdAt.localeCompare(a.createdAt))[0]; + } + + if (matching) { + await store.markDelivered(matching.handoffId); + debugLog( + "wsBridge.updateHandoffDeliveryStatus", + `DELIVERED handoffId=${matching.handoffId} ipcMessageId=${ipcMessageId} role=${targetRole}`, + ); + + // Invalidate cache if the store supports it (NexusHandoffStore) + const cacheable = store as { invalidateCache?: () => void }; + cacheable.invalidateCache?.(); + } else { + debugLog( + "wsBridge.updateHandoffDeliveryStatus", + `NO MATCH ipcMessageId=${ipcMessageId} role=${targetRole} handoffCount=${handoffs.length}`, + ); + } + } catch (err) { + debugLog( + "wsBridge.updateHandoffDeliveryStatus", + `FAIL ipcMessageId=${ipcMessageId} err=${err instanceof Error ? err.message : String(err)}`, + ); + } + } + private async readAndPush( path: string, _targetRole: string, @@ -253,29 +322,16 @@ export class NexusWsBridge { await new Promise((r) => setTimeout(r, 2000 * 2 ** attempt)); } if (!resp || !resp.ok) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [wsBridge.readAndPush] FAIL resp.status=${resp?.status ?? "none"} path=${path}\n`, - ); - } catch { - /* */ - } + debugLog("wsBridge.readAndPush", `FAIL resp.status=${resp?.status ?? "none"} path=${path}`); return; } const result = (await resp.json()) as { result?: { data?: string }; error?: unknown }; if (!result.result?.data) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [wsBridge.readAndPush] NO DATA path=${path} error=${JSON.stringify(result.error ?? "none").slice(0, 100)}\n`, - ); - } catch { - /* */ - } + debugLog( + "wsBridge.readAndPush", + `NO DATA path=${path} error=${JSON.stringify(result.error ?? "none").slice(0, 100)}`, + ); return; } @@ -292,15 +348,10 @@ export class NexusWsBridge { (msg.payload?.body as string) ?? JSON.stringify(msg.payload ?? {}).slice(0, 100); const notification = `[IPC from ${msgSender}] ${summary}`; - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [wsBridge.readAndPush] delivering to session=${session.id} role=${_targetRole} notification=${notification.slice(0, 80)}\n`, - ); - } catch { - /* */ - } + debugLog( + "wsBridge.readAndPush", + `delivering to session=${session.id} role=${_targetRole} notification=${notification.slice(0, 80)}`, + ); void this.opts.runtime.send(session, notification).catch(() => { /* non-fatal */