From c609ee26ffd16cc3c9ab470742b5df4e2aca6696 Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Thu, 22 Jan 2026 11:36:13 -0300 Subject: [PATCH] Event target Signed-off-by: Marcos Candeia --- apps/mesh/migrations/027-event-target.ts | 21 +++++++++++++++++++++ apps/mesh/migrations/index.ts | 2 ++ apps/mesh/src/api/app.ts | 1 + apps/mesh/src/event-bus/event-bus.ts | 1 + apps/mesh/src/event-bus/interface.ts | 6 ++++++ apps/mesh/src/storage/event-bus.ts | 20 +++++++++++++++++--- apps/mesh/src/storage/types.ts | 3 +++ 7 files changed, 51 insertions(+), 3 deletions(-) create mode 100644 apps/mesh/migrations/027-event-target.ts diff --git a/apps/mesh/migrations/027-event-target.ts b/apps/mesh/migrations/027-event-target.ts new file mode 100644 index 0000000000..d43328a026 --- /dev/null +++ b/apps/mesh/migrations/027-event-target.ts @@ -0,0 +1,21 @@ +/** + * Add target column to events table + * + * Allows events to be targeted to a specific connection. + * When target is set, only subscriptions from that connection will receive the event. + * When target is null, the event is broadcast to all matching subscriptions. + */ + +import { Kysely } from "kysely"; + +export async function up(db: Kysely): Promise { + // Add target column to events table + await db.schema + .alterTable("events") + .addColumn("target", "text") // Target connection ID (nullable = broadcast) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.alterTable("events").dropColumn("target").execute(); +} diff --git a/apps/mesh/migrations/index.ts b/apps/mesh/migrations/index.ts index 3fcc724a11..3162dafb59 100644 --- a/apps/mesh/migrations/index.ts +++ b/apps/mesh/migrations/index.ts @@ -25,6 +25,7 @@ import * as migration023optimizethreadindexes from "./023-optimize-thread-indexe import * as migration024consolidatevirtualmcp from "./024-consolidate-virtual-mcp.ts"; import * as migration025addmonitoringvirtualmcpid from "./025-add-monitoring-virtual-mcp-id.ts"; import * as migration026restrictchildconnectiondelete from "./026-restrict-child-connection-delete.ts"; +import * as migration027eventtarget from "./027-event-target.ts"; const migrations = { "001-initial-schema": migration001initialschema, @@ -55,6 +56,7 @@ const migrations = { "025-add-monitoring-virtual-mcp-id": migration025addmonitoringvirtualmcpid, "026-restrict-child-connection-delete": migration026restrictchildconnectiondelete, + "027-event-target": migration027eventtarget, } satisfies Record; export default migrations; diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index abb89b5c4f..6d10a76d29 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -596,6 +596,7 @@ export function createApp(options: CreateAppOptions = {}) { data: await c.req.json(), type: `public:${c.req.param("type")}`, subject: c.req.query("subject"), + target: c.req.query("target"), deliverAt: c.req.query("deliverAt"), cron: c.req.query("cron"), }, diff --git a/apps/mesh/src/event-bus/event-bus.ts b/apps/mesh/src/event-bus/event-bus.ts index 17f3a39de1..7540cc8ce5 100644 --- a/apps/mesh/src/event-bus/event-bus.ts +++ b/apps/mesh/src/event-bus/event-bus.ts @@ -113,6 +113,7 @@ export class EventBus implements IEventBus { subject: input.subject, time: now, data: input.data, + target: input.target, cron: input.cron, }); diff --git a/apps/mesh/src/event-bus/interface.ts b/apps/mesh/src/event-bus/interface.ts index 793ee85691..7f62db1f88 100644 --- a/apps/mesh/src/event-bus/interface.ts +++ b/apps/mesh/src/event-bus/interface.ts @@ -27,6 +27,12 @@ export interface PublishEventInput { subject?: string; /** Event payload (any JSON value) */ data?: unknown; + /** + * Optional target connection ID. + * If provided, only this specific connection will receive the event. + * If omitted, the event is broadcast to all matching subscriptions. + */ + target?: string; /** * Optional scheduled delivery time (ISO 8601 timestamp). * If provided, the event will not be delivered until this time. diff --git a/apps/mesh/src/storage/event-bus.ts b/apps/mesh/src/storage/event-bus.ts index 9c2dd324f7..23b4b7c87c 100644 --- a/apps/mesh/src/storage/event-bus.ts +++ b/apps/mesh/src/storage/event-bus.ts @@ -39,6 +39,8 @@ export interface CreateEventInput { datacontenttype?: string; dataschema?: string | null; data?: unknown | null; + /** Target connection ID - if set, only this connection will receive the event */ + target?: string | null; cron?: string | null; } @@ -304,6 +306,7 @@ class KyselyEventBusStorage implements EventBusStorage { datacontenttype: input.datacontenttype ?? "application/json", dataschema: input.dataschema ?? null, data: input.data ? JSON.stringify(input.data) : null, + target: input.target ?? null, cron: input.cron ?? null, status: "pending", attempts: 0, @@ -325,6 +328,7 @@ class KyselyEventBusStorage implements EventBusStorage { datacontenttype: input.datacontenttype ?? "application/json", dataschema: input.dataschema ?? null, data: input.data ?? null, + target: input.target ?? null, cron: input.cron ?? null, status: "pending", attempts: 0, @@ -477,7 +481,7 @@ class KyselyEventBusStorage implements EventBusStorage { async getMatchingSubscriptions(event: Event): Promise { // Find enabled subscriptions that match the event type // and either have no publisher filter or match the event source - const rows = await this.db + let query = this.db .selectFrom("event_subscriptions") .selectAll() .where("organization_id", "=", event.organizationId) @@ -488,8 +492,14 @@ class KyselyEventBusStorage implements EventBusStorage { eb("publisher", "is", null), eb("publisher", "=", event.source), ]), - ) - .execute(); + ); + + // If target is specified, only deliver to that specific connection + if (event.target) { + query = query.where("connection_id", "=", event.target); + } + + const rows = await query.execute(); return rows.map((row) => ({ id: row.id, @@ -630,6 +640,7 @@ class KyselyEventBusStorage implements EventBusStorage { "e.datacontenttype", "e.dataschema", "e.data", + "e.target", "e.cron", "e.status as event_status", "e.attempts as event_attempts", @@ -673,6 +684,7 @@ class KyselyEventBusStorage implements EventBusStorage { datacontenttype: row.datacontenttype, dataschema: row.dataschema, data: row.data ? JSON.parse(row.data as string) : null, + target: row.target, cron: row.cron, status: row.event_status as EventStatus, attempts: row.event_attempts, @@ -841,6 +853,7 @@ class KyselyEventBusStorage implements EventBusStorage { datacontenttype: row.datacontenttype, dataschema: row.dataschema, data: row.data ? JSON.parse(row.data as string) : null, + target: row.target, cron: row.cron, status: row.status as EventStatus, attempts: row.attempts, @@ -883,6 +896,7 @@ class KyselyEventBusStorage implements EventBusStorage { datacontenttype: row.datacontenttype, dataschema: row.dataschema, data: row.data ? JSON.parse(row.data as string) : null, + target: row.target, cron: row.cron, status: row.status as EventStatus, attempts: row.attempts, diff --git a/apps/mesh/src/storage/types.ts b/apps/mesh/src/storage/types.ts index b60ae26df2..8e66155e9e 100644 --- a/apps/mesh/src/storage/types.ts +++ b/apps/mesh/src/storage/types.ts @@ -462,6 +462,8 @@ export interface EventTable { datacontenttype: string; // Content type (default: "application/json") dataschema: string | null; // Schema URI data: JsonObject | null; // JSON payload + // Routing + target: string | null; // Target connection ID (null = broadcast to all matching subscriptions) // Recurring event support cron: string | null; // Cron expression for recurring delivery // Delivery tracking @@ -488,6 +490,7 @@ export interface Event { datacontenttype: string; dataschema: string | null; data: unknown | null; + target: string | null; cron: string | null; status: EventStatus; attempts: number;