Skip to content
82 changes: 46 additions & 36 deletions src/core/event-bus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import { TopologyRouter } from "./topology-router.js";
// --- LocalEventBus tests ---

describe("LocalEventBus", () => {
test("publish delivers to subscriber of target role", () => {
test("publish delivers to subscriber of target role", async () => {
const bus = new LocalEventBus();
const received: GroveEvent[] = [];
bus.subscribe("reviewer", (e) => received.push(e));

bus.publish({
await bus.publish({
type: "contribution",
sourceRole: "coder",
targetRole: "reviewer",
Expand All @@ -25,12 +25,12 @@ describe("LocalEventBus", () => {
bus.close();
});

test("publish does not deliver to wrong role", () => {
test("publish does not deliver to wrong role", async () => {
const bus = new LocalEventBus();
const received: GroveEvent[] = [];
bus.subscribe("coder", (e) => received.push(e));

bus.publish({
await bus.publish({
type: "contribution",
sourceRole: "coder",
targetRole: "reviewer",
Expand All @@ -42,14 +42,14 @@ describe("LocalEventBus", () => {
bus.close();
});

test("unsubscribe stops delivery", () => {
test("unsubscribe stops delivery", async () => {
const bus = new LocalEventBus();
const received: GroveEvent[] = [];
const handler = (e: GroveEvent) => received.push(e);
bus.subscribe("reviewer", handler);
bus.unsubscribe("reviewer", handler);

bus.publish({
await bus.publish({
type: "contribution",
sourceRole: "coder",
targetRole: "reviewer",
Expand All @@ -61,13 +61,13 @@ describe("LocalEventBus", () => {
bus.close();
});

test("multiple subscribers on same role all receive", () => {
test("multiple subscribers on same role all receive", async () => {
const bus = new LocalEventBus();
let count = 0;
bus.subscribe("reviewer", () => count++);
bus.subscribe("reviewer", () => count++);

bus.publish({
await bus.publish({
type: "contribution",
sourceRole: "coder",
targetRole: "reviewer",
Expand All @@ -79,13 +79,13 @@ describe("LocalEventBus", () => {
bus.close();
});

test("close removes all listeners", () => {
test("close removes all listeners", async () => {
const bus = new LocalEventBus();
let count = 0;
bus.subscribe("reviewer", () => count++);
bus.close();

bus.publish({
await bus.publish({
type: "contribution",
sourceRole: "coder",
targetRole: "reviewer",
Expand All @@ -95,6 +95,19 @@ describe("LocalEventBus", () => {

expect(count).toBe(0);
});

test("publish returns ok result", async () => {
const bus = new LocalEventBus();
const result = await bus.publish({
type: "contribution",
sourceRole: "coder",
targetRole: "reviewer",
payload: {},
timestamp: new Date().toISOString(),
});
expect(result.ok).toBe(true);
bus.close();
});
});

// --- TopologyRouter tests ---
Expand All @@ -114,58 +127,58 @@ describe("TopologyRouter", () => {
],
};

test("routes from coder to reviewer via delegates edge", () => {
test("routes from coder to reviewer via delegates edge", async () => {
const bus = new LocalEventBus();
const router = new TopologyRouter(reviewLoopTopology, bus);
const received: GroveEvent[] = [];
bus.subscribe("reviewer", (e) => received.push(e));

const targets = router.route("coder", { cid: "blake3:abc" });
const results = await router.route("coder", { cid: "blake3:abc" });

expect(targets).toEqual(["reviewer"]);
expect(results.map((r) => r.targetRole)).toEqual(["reviewer"]);
expect(received).toHaveLength(1);
expect(received[0]!.payload).toEqual({ cid: "blake3:abc" });
bus.close();
});

test("routes from reviewer to coder via feedback edge", () => {
test("routes from reviewer to coder via feedback edge", async () => {
const bus = new LocalEventBus();
const router = new TopologyRouter(reviewLoopTopology, bus);
const received: GroveEvent[] = [];
bus.subscribe("coder", (e) => received.push(e));

const targets = router.route("reviewer", { verdict: "changes_requested" });
const results = await router.route("reviewer", { verdict: "changes_requested" });

expect(targets).toEqual(["coder"]);
expect(results.map((r) => r.targetRole)).toEqual(["coder"]);
expect(received).toHaveLength(1);
bus.close();
});

test("no edges for unknown role -> empty targets", () => {
test("no edges for unknown role -> empty targets", async () => {
const bus = new LocalEventBus();
const router = new TopologyRouter(reviewLoopTopology, bus);
const targets = router.route("unknown-role", {});
expect(targets).toHaveLength(0);
const results = await router.route("unknown-role", {});
expect(results).toHaveLength(0);
bus.close();
});

test("flat topology (no edges) -> no routing", () => {
test("flat topology (no edges) -> no routing", async () => {
const flat: AgentTopology = {
structure: "flat",
roles: [{ name: "worker1" }, { name: "worker2" }],
};
const bus = new LocalEventBus();
const router = new TopologyRouter(flat, bus);

const targets1 = router.route("worker1", {});
const targets2 = router.route("worker2", {});
const results1 = await router.route("worker1", {});
const results2 = await router.route("worker2", {});

expect(targets1).toHaveLength(0);
expect(targets2).toHaveLength(0);
expect(results1).toHaveLength(0);
expect(results2).toHaveLength(0);
bus.close();
});

test("tree topology: parent routes to children", () => {
test("tree topology: parent routes to children", async () => {
const tree: AgentTopology = {
structure: "tree",
roles: [
Expand All @@ -187,23 +200,23 @@ describe("TopologyRouter", () => {
bus.subscribe("worker1", (e) => received1.push(e));
bus.subscribe("worker2", (e) => received2.push(e));

const targets = router.route("coordinator", { plan: "implement auth" });
const results = await router.route("coordinator", { plan: "implement auth" });

expect(targets).toEqual(["worker1", "worker2"]);
expect(results.map((r) => r.targetRole)).toEqual(["worker1", "worker2"]);
expect(received1).toHaveLength(1);
expect(received2).toHaveLength(1);
bus.close();
});

test("broadcastStop sends to all roles", () => {
test("broadcastStop sends to all roles", async () => {
const bus = new LocalEventBus();
const router = new TopologyRouter(reviewLoopTopology, bus);
const coderEvents: GroveEvent[] = [];
const reviewerEvents: GroveEvent[] = [];
bus.subscribe("coder", (e) => coderEvents.push(e));
bus.subscribe("reviewer", (e) => reviewerEvents.push(e));

router.broadcastStop("Budget exceeded");
await router.broadcastStop("Budget exceeded");

expect(coderEvents).toHaveLength(1);
expect(coderEvents[0]!.type).toBe("stop");
Expand Down Expand Up @@ -239,14 +252,13 @@ describe("TopologyRouter", () => {
const bus = new LocalEventBus();
const router = new TopologyRouter(multiEdge, bus);
const edges = router.targetsFor("coder");
// Both edges preserved — distinct (target, edgeType) pairs
expect(edges).toHaveLength(2);
expect(edges).toContainEqual({ target: "reviewer", edgeType: "delegates" });
expect(edges).toContainEqual({ target: "reviewer", edgeType: "feeds" });
bus.close();
});

test("route() publishes one event per target even when multiple edge types point to same target", () => {
test("route() publishes one event per target even when multiple edge types point to same target", async () => {
const multiEdge: AgentTopology = {
structure: "graph",
roles: [
Expand All @@ -265,10 +277,9 @@ describe("TopologyRouter", () => {
const received: GroveEvent[] = [];
bus.subscribe("reviewer", (e) => received.push(e));

const targets = router.route("coder", {});
const results = await router.route("coder", {});

// route() deduplicates by target: one event despite two distinct edges
expect(targets).toEqual(["reviewer"]);
expect(results.map((r) => r.targetRole)).toEqual(["reviewer"]);
expect(received).toHaveLength(1);
bus.close();
});
Expand All @@ -281,15 +292,14 @@ describe("TopologyRouter", () => {
name: "coder",
edges: [
{ target: "reviewer", edgeType: "delegates" },
{ target: "reviewer", edgeType: "delegates" }, // exact duplicate
{ target: "reviewer", edgeType: "delegates" },
],
},
{ name: "reviewer" },
],
};
const bus = new LocalEventBus();
const router = new TopologyRouter(exactDupes, bus);
// Exact (target, edgeType) duplicate is collapsed to one entry
expect(router.targetsFor("coder")).toEqual([{ target: "reviewer", edgeType: "delegates" }]);
bus.close();
});
Expand Down
19 changes: 15 additions & 4 deletions src/core/event-bus.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/**
* Event bus abstraction for agent communication.
*
* Two implementations:
* - LocalEventBus: Node.js EventEmitter for single-machine setups
* Three implementations:
* - LocalEventBus: Node.js EventEmitter for single-machine setups (tests/local)
* - NexusEventBus: Relays events via Nexus IPC API, returns message IDs
* - Future: RedisEventBus, NatsEventBus for federated setups
*/

Expand All @@ -15,13 +16,23 @@ export interface GroveEvent {
readonly timestamp: string;
}

/** Result of publishing an event. */
export interface PublishResult {
readonly ok: boolean;
/** IPC message ID — present when the event was relayed via Nexus IPC. */
readonly messageId?: string | undefined;
readonly error?: string | undefined;
/** True when failure is infrastructure (404, connection refused), not delivery rejection. */
readonly infrastructureError?: boolean | undefined;
}

/** Callback for event subscriptions. */
export type EventHandler = (event: GroveEvent) => void;

/** Event bus for ephemeral agent notifications. */
export interface EventBus {
/** Publish an event. */
publish(event: GroveEvent): void;
/** Publish an event. Returns delivery result with optional IPC message ID. */
publish(event: GroveEvent): Promise<PublishResult>;
/** Subscribe to events for a specific role. */
subscribe(role: string, handler: EventHandler): void;
/** Unsubscribe a handler. */
Expand Down
Loading
Loading