Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions apps/mesh/migrations/027-event-target.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Add target column to events table
*
* Allows events to be targeted to a specific connection.
* When target is set, only subscriptions from that connection will receive the event.
* When target is null, the event is broadcast to all matching subscriptions.
*/

import { Kysely } from "kysely";

export async function up(db: Kysely<unknown>): Promise<void> {
// Add target column to events table
await db.schema
.alterTable("events")
.addColumn("target", "text") // Target connection ID (nullable = broadcast)
.execute();
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.alterTable("events").dropColumn("target").execute();
}
2 changes: 2 additions & 0 deletions apps/mesh/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import * as migration023optimizethreadindexes from "./023-optimize-thread-indexe
import * as migration024consolidatevirtualmcp from "./024-consolidate-virtual-mcp.ts";
import * as migration025addmonitoringvirtualmcpid from "./025-add-monitoring-virtual-mcp-id.ts";
import * as migration026restrictchildconnectiondelete from "./026-restrict-child-connection-delete.ts";
import * as migration027eventtarget from "./027-event-target.ts";

const migrations = {
"001-initial-schema": migration001initialschema,
Expand Down Expand Up @@ -55,6 +56,7 @@ const migrations = {
"025-add-monitoring-virtual-mcp-id": migration025addmonitoringvirtualmcpid,
"026-restrict-child-connection-delete":
migration026restrictchildconnectiondelete,
"027-event-target": migration027eventtarget,
} satisfies Record<string, Migration>;

export default migrations;
1 change: 1 addition & 0 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ export function createApp(options: CreateAppOptions = {}) {
data: await c.req.json(),
type: `public:${c.req.param("type")}`,
subject: c.req.query("subject"),
target: c.req.query("target"),
deliverAt: c.req.query("deliverAt"),
cron: c.req.query("cron"),
},
Expand Down
1 change: 1 addition & 0 deletions apps/mesh/src/event-bus/event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ export class EventBus implements IEventBus {
subject: input.subject,
time: now,
data: input.data,
target: input.target,
cron: input.cron,
});

Expand Down
6 changes: 6 additions & 0 deletions apps/mesh/src/event-bus/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ export interface PublishEventInput {
subject?: string;
/** Event payload (any JSON value) */
data?: unknown;
/**
* Optional target connection ID.
* If provided, only this specific connection will receive the event.
* If omitted, the event is broadcast to all matching subscriptions.
*/
target?: string;
/**
* Optional scheduled delivery time (ISO 8601 timestamp).
* If provided, the event will not be delivered until this time.
Expand Down
20 changes: 17 additions & 3 deletions apps/mesh/src/storage/event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export interface CreateEventInput {
datacontenttype?: string;
dataschema?: string | null;
data?: unknown | null;
/** Target connection ID - if set, only this connection will receive the event */
target?: string | null;
cron?: string | null;
}

Expand Down Expand Up @@ -304,6 +306,7 @@ class KyselyEventBusStorage implements EventBusStorage {
datacontenttype: input.datacontenttype ?? "application/json",
dataschema: input.dataschema ?? null,
data: input.data ? JSON.stringify(input.data) : null,
target: input.target ?? null,
cron: input.cron ?? null,
status: "pending",
attempts: 0,
Expand All @@ -325,6 +328,7 @@ class KyselyEventBusStorage implements EventBusStorage {
datacontenttype: input.datacontenttype ?? "application/json",
dataschema: input.dataschema ?? null,
data: input.data ?? null,
target: input.target ?? null,
cron: input.cron ?? null,
status: "pending",
attempts: 0,
Expand Down Expand Up @@ -477,7 +481,7 @@ class KyselyEventBusStorage implements EventBusStorage {
async getMatchingSubscriptions(event: Event): Promise<EventSubscription[]> {
// Find enabled subscriptions that match the event type
// and either have no publisher filter or match the event source
const rows = await this.db
let query = this.db
.selectFrom("event_subscriptions")
.selectAll()
.where("organization_id", "=", event.organizationId)
Expand All @@ -488,8 +492,14 @@ class KyselyEventBusStorage implements EventBusStorage {
eb("publisher", "is", null),
eb("publisher", "=", event.source),
]),
)
.execute();
);

// If target is specified, only deliver to that specific connection
if (event.target) {
query = query.where("connection_id", "=", event.target);
}

const rows = await query.execute();

return rows.map((row) => ({
id: row.id,
Expand Down Expand Up @@ -630,6 +640,7 @@ class KyselyEventBusStorage implements EventBusStorage {
"e.datacontenttype",
"e.dataschema",
"e.data",
"e.target",
"e.cron",
"e.status as event_status",
"e.attempts as event_attempts",
Expand Down Expand Up @@ -673,6 +684,7 @@ class KyselyEventBusStorage implements EventBusStorage {
datacontenttype: row.datacontenttype,
dataschema: row.dataschema,
data: row.data ? JSON.parse(row.data as string) : null,
target: row.target,
cron: row.cron,
status: row.event_status as EventStatus,
attempts: row.event_attempts,
Expand Down Expand Up @@ -841,6 +853,7 @@ class KyselyEventBusStorage implements EventBusStorage {
datacontenttype: row.datacontenttype,
dataschema: row.dataschema,
data: row.data ? JSON.parse(row.data as string) : null,
target: row.target,
cron: row.cron,
status: row.status as EventStatus,
attempts: row.attempts,
Expand Down Expand Up @@ -883,6 +896,7 @@ class KyselyEventBusStorage implements EventBusStorage {
datacontenttype: row.datacontenttype,
dataschema: row.dataschema,
data: row.data ? JSON.parse(row.data as string) : null,
target: row.target,
cron: row.cron,
status: row.status as EventStatus,
attempts: row.attempts,
Expand Down
3 changes: 3 additions & 0 deletions apps/mesh/src/storage/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ export interface EventTable {
datacontenttype: string; // Content type (default: "application/json")
dataschema: string | null; // Schema URI
data: JsonObject<unknown> | null; // JSON payload
// Routing
target: string | null; // Target connection ID (null = broadcast to all matching subscriptions)
// Recurring event support
cron: string | null; // Cron expression for recurring delivery
// Delivery tracking
Expand All @@ -488,6 +490,7 @@ export interface Event {
datacontenttype: string;
dataschema: string | null;
data: unknown | null;
target: string | null;
cron: string | null;
status: EventStatus;
attempts: number;
Expand Down
Loading