From 14026dddabcbbe5fb819c7c6c1223df455413198 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Thu, 4 Dec 2025 09:27:42 +0100 Subject: [PATCH 1/4] Added setup for the PoC of Fusion Streams --- src/packages/emmett/package.json | 13 +++++++++++++ src/packages/emmett/src/fusionStreams.ts | 1 + src/packages/emmett/src/fusionStreams/index.ts | 0 src/packages/emmett/tsup.config.ts | 2 +- 4 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 src/packages/emmett/src/fusionStreams.ts create mode 100644 src/packages/emmett/src/fusionStreams/index.ts diff --git a/src/packages/emmett/package.json b/src/packages/emmett/package.json index b5336c77..82a87ee1 100644 --- a/src/packages/emmett/package.json +++ b/src/packages/emmett/package.json @@ -48,6 +48,16 @@ "types": "./dist/cli.d.cts", "default": "./dist/cli.cjs" } + }, + "./fusionStreams": { + "import": { + "types": "./dist/fusionStreams.d.ts", + "default": "./dist/fusionStreams.js" + }, + "require": { + "types": "./dist/fusionStreams.d.cts", + "default": "./dist/fusionStreams.cjs" + } } }, "typesVersions": { @@ -57,6 +67,9 @@ ], "cli": [ "./dist/cli.d.ts" + ], + "fusionStreams": [ + "./dist/fusionStreams.d.ts" ] } }, diff --git a/src/packages/emmett/src/fusionStreams.ts b/src/packages/emmett/src/fusionStreams.ts new file mode 100644 index 00000000..f130f972 --- /dev/null +++ b/src/packages/emmett/src/fusionStreams.ts @@ -0,0 +1 @@ +export * from './fusionStreams'; diff --git a/src/packages/emmett/src/fusionStreams/index.ts b/src/packages/emmett/src/fusionStreams/index.ts new file mode 100644 index 00000000..e69de29b diff --git a/src/packages/emmett/tsup.config.ts b/src/packages/emmett/tsup.config.ts index 8872bdc9..b32c83e7 100644 --- a/src/packages/emmett/tsup.config.ts +++ b/src/packages/emmett/tsup.config.ts @@ -15,7 +15,7 @@ export default defineConfig([ watch: env === 'development', target: 'esnext', outDir: 'dist', //env === 'production' ? 'dist' : 'lib', - entry: ['src/index.ts', 'src/cli.ts'], + entry: ['src/index.ts', 'src/cli.ts', 'src/fusionStreams.ts'], sourcemap: true, tsconfig: 'tsconfig.build.json', // workaround for https://github.com/egoist/tsup/issues/571#issuecomment-1760052931 }, From b4bc2b2239a8e425443caf832e52b4327b29a17d Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Thu, 4 Dec 2025 16:47:39 +0100 Subject: [PATCH 2/4] Added basic base class for Node.js streams stransformations --- .../emmett/src/fusionStreams/index.ts | 1 + .../fusionStreams/transformations/index.ts | 1 + .../sequentialTransformation.ts | 52 +++++++++++++++++++ 3 files changed, 54 insertions(+) create mode 100644 src/packages/emmett/src/fusionStreams/transformations/index.ts create mode 100644 src/packages/emmett/src/fusionStreams/transformations/sequentialTransformation.ts diff --git a/src/packages/emmett/src/fusionStreams/index.ts b/src/packages/emmett/src/fusionStreams/index.ts index e69de29b..38156cbd 100644 --- a/src/packages/emmett/src/fusionStreams/index.ts +++ b/src/packages/emmett/src/fusionStreams/index.ts @@ -0,0 +1 @@ +export * from './transformations'; diff --git a/src/packages/emmett/src/fusionStreams/transformations/index.ts b/src/packages/emmett/src/fusionStreams/transformations/index.ts new file mode 100644 index 00000000..953f3aa1 --- /dev/null +++ b/src/packages/emmett/src/fusionStreams/transformations/index.ts @@ -0,0 +1 @@ +export * from './sequentialTransformation'; diff --git a/src/packages/emmett/src/fusionStreams/transformations/sequentialTransformation.ts b/src/packages/emmett/src/fusionStreams/transformations/sequentialTransformation.ts new file mode 100644 index 00000000..7956ca21 --- /dev/null +++ b/src/packages/emmett/src/fusionStreams/transformations/sequentialTransformation.ts @@ -0,0 +1,52 @@ +import { Transform, type WritableOptions } from 'stream'; + +export type SequentialTransformHandler< + IncomingMessageType = unknown, + OutgoingMessageType = unknown, +> = { + handler: ( + message: IncomingMessageType, + ) => Promise; +}; + +export type SequentialTransformOptions< + IncomingMessageType = unknown, + OutgoingMessageType = unknown, +> = { + handler: SequentialTransformHandler; +} & WritableOptions; + +export class SequentialTransform< + IncomingMessageType = unknown, + OutgoingMessageType = unknown, +> extends Transform { + private handler: SequentialTransformHandler< + IncomingMessageType, + OutgoingMessageType + >; + constructor( + options: SequentialTransformOptions< + IncomingMessageType, + OutgoingMessageType + >, + ) { + super({ objectMode: true, ...options }); + this.handler = options.handler; + } + + async _transform( + message: IncomingMessageType, + _encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ): Promise { + try { + const result = await this.handler.handler(message); + + this.push(result); + + callback(); + } catch (error) { + callback(error as Error); + } + } +} From 9588abe6b773287d27921367a0a5fa8f173e778b Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Mon, 8 Dec 2025 14:16:40 +0100 Subject: [PATCH 3/4] Used SequentialTransform base class in ESDB consumer --- .../consumers/subscriptions/index.ts | 55 ++++++++++--------- src/packages/emmett/package.json | 14 ++--- .../fusionStreams/transformations/index.ts | 2 +- ...ansformation.ts => sequentialTransform.ts} | 29 +++++++--- .../src/{fusionStreams.ts => nodejs.ts} | 0 src/packages/emmett/tsup.config.ts | 2 +- 6 files changed, 59 insertions(+), 43 deletions(-) rename src/packages/emmett/src/fusionStreams/transformations/{sequentialTransformation.ts => sequentialTransform.ts} (58%) rename src/packages/emmett/src/{fusionStreams.ts => nodejs.ts} (100%) diff --git a/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts index 6390fa7c..278f4f95 100644 --- a/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts @@ -10,6 +10,11 @@ import { type Message, type MessageHandlerResult, } from '@event-driven-io/emmett'; +import { + SequentialTransform, + type SequentialTransformHandler, + type SequentialTransformHandlerResult, +} from '@event-driven-io/emmett/nodejs'; import { END, EventStoreDBClient, @@ -18,7 +23,7 @@ import { type ResolvedEvent, type StreamSubscription, } from '@eventstore/db-client'; -import { pipeline, Transform, Writable, type WritableOptions } from 'stream'; +import { pipeline, Writable, type WritableOptions } from 'stream'; import { mapFromESDBEvent, type EventStoreDBReadEventMetadata, @@ -120,49 +125,45 @@ type SubscriptionSequentialHandlerOptions< class SubscriptionSequentialHandler< MessageType extends AnyMessage = AnyMessage, -> extends Transform { +> extends SequentialTransform, bigint | null> { private options: SubscriptionSequentialHandlerOptions; private from: EventStoreDBEventStoreConsumerType | undefined; public isRunning: boolean; constructor(options: SubscriptionSequentialHandlerOptions) { - super({ objectMode: true, ...options }); - this.options = options; - this.from = options.from; - this.isRunning = true; - } - - async _transform( - resolvedEvent: ResolvedEvent, - _encoding: BufferEncoding, - callback: (error?: Error | null) => void, - ): Promise { - try { + const handler: SequentialTransformHandler< + ResolvedEvent, + bigint + > = async ( + resolvedEvent: ResolvedEvent, + ): Promise> => { if (!this.isRunning || !resolvedEvent.event) { - callback(); - return; + return { resultType: 'SKIP' }; } const message = mapFromESDBEvent(resolvedEvent, this.from); - const messageCheckpoint = getCheckpoint(message); + const messageCheckpoint = getCheckpoint(message)!; const result = await this.options.eachBatch([message]); if (result && result.type === 'STOP') { this.isRunning = false; - if (!result.error) this.push(messageCheckpoint); + if (!result.error) { + return { resultType: 'ACK', message: messageCheckpoint }; + } - this.push(result); - this.push(null); - callback(); - return; + return { resultType: 'STOP', ...result }; } - this.push(messageCheckpoint); - callback(); - } catch (error) { - callback(error as Error); - } + return { resultType: 'ACK', message: messageCheckpoint }; + }; + super({ + ...options, + handler, + }); + this.options = options; + this.from = options.from; + this.isRunning = true; } } diff --git a/src/packages/emmett/package.json b/src/packages/emmett/package.json index 82a87ee1..3b9e0fcf 100644 --- a/src/packages/emmett/package.json +++ b/src/packages/emmett/package.json @@ -49,14 +49,14 @@ "default": "./dist/cli.cjs" } }, - "./fusionStreams": { + "./nodejs": { "import": { - "types": "./dist/fusionStreams.d.ts", - "default": "./dist/fusionStreams.js" + "types": "./dist/nodejs.d.ts", + "default": "./dist/nodejs.js" }, "require": { - "types": "./dist/fusionStreams.d.cts", - "default": "./dist/fusionStreams.cjs" + "types": "./dist/nodejs.d.cts", + "default": "./dist/nodejs.cjs" } } }, @@ -68,8 +68,8 @@ "cli": [ "./dist/cli.d.ts" ], - "fusionStreams": [ - "./dist/fusionStreams.d.ts" + "nodejs": [ + "./dist/nodejs.d.ts" ] } }, diff --git a/src/packages/emmett/src/fusionStreams/transformations/index.ts b/src/packages/emmett/src/fusionStreams/transformations/index.ts index 953f3aa1..c258d561 100644 --- a/src/packages/emmett/src/fusionStreams/transformations/index.ts +++ b/src/packages/emmett/src/fusionStreams/transformations/index.ts @@ -1 +1 @@ -export * from './sequentialTransformation'; +export * from './sequentialTransform'; diff --git a/src/packages/emmett/src/fusionStreams/transformations/sequentialTransformation.ts b/src/packages/emmett/src/fusionStreams/transformations/sequentialTransform.ts similarity index 58% rename from src/packages/emmett/src/fusionStreams/transformations/sequentialTransformation.ts rename to src/packages/emmett/src/fusionStreams/transformations/sequentialTransform.ts index 7956ca21..4d6f9138 100644 --- a/src/packages/emmett/src/fusionStreams/transformations/sequentialTransformation.ts +++ b/src/packages/emmett/src/fusionStreams/transformations/sequentialTransform.ts @@ -1,13 +1,19 @@ import { Transform, type WritableOptions } from 'stream'; +import type { EmmettError } from '../../errors'; + +export type SequentialTransformHandlerResultType = 'ACK' | 'SKIP' | 'STOP'; + +export type SequentialTransformHandlerResult = + | { resultType: 'ACK'; message: OutgoingMessageType } + | { resultType: 'SKIP'; reason?: string } + | { resultType: 'STOP'; reason?: string; error?: EmmettError }; export type SequentialTransformHandler< IncomingMessageType = unknown, OutgoingMessageType = unknown, -> = { - handler: ( - message: IncomingMessageType, - ) => Promise; -}; +> = ( + message: IncomingMessageType, +) => Promise>; export type SequentialTransformOptions< IncomingMessageType = unknown, @@ -40,9 +46,18 @@ export class SequentialTransform< callback: (error?: Error | null) => void, ): Promise { try { - const result = await this.handler.handler(message); + const result = await this.handler(message); - this.push(result); + switch (result.resultType) { + case 'ACK': + this.push(result); + break; + case 'SKIP': + break; + case 'STOP': + this.push(null); + break; + } callback(); } catch (error) { diff --git a/src/packages/emmett/src/fusionStreams.ts b/src/packages/emmett/src/nodejs.ts similarity index 100% rename from src/packages/emmett/src/fusionStreams.ts rename to src/packages/emmett/src/nodejs.ts diff --git a/src/packages/emmett/tsup.config.ts b/src/packages/emmett/tsup.config.ts index b32c83e7..1e54d40f 100644 --- a/src/packages/emmett/tsup.config.ts +++ b/src/packages/emmett/tsup.config.ts @@ -15,7 +15,7 @@ export default defineConfig([ watch: env === 'development', target: 'esnext', outDir: 'dist', //env === 'production' ? 'dist' : 'lib', - entry: ['src/index.ts', 'src/cli.ts', 'src/fusionStreams.ts'], + entry: ['src/index.ts', 'src/cli.ts', 'src/nodejs.ts'], sourcemap: true, tsconfig: 'tsconfig.build.json', // workaround for https://github.com/egoist/tsup/issues/571#issuecomment-1760052931 }, From d79455f74f81772a83a38c267ee17aed46256d7e Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Wed, 10 Dec 2025 18:40:21 +0100 Subject: [PATCH 4/4] Added Result type --- src/packages/emmett/src/typing/index.ts | 2 + src/packages/emmett/src/typing/result.ts | 65 ++++++++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 src/packages/emmett/src/typing/result.ts diff --git a/src/packages/emmett/src/typing/index.ts b/src/packages/emmett/src/typing/index.ts index 2ea5fce1..935a6c90 100644 --- a/src/packages/emmett/src/typing/index.ts +++ b/src/packages/emmett/src/typing/index.ts @@ -8,6 +8,8 @@ export * from './messageHandling'; export * from './decider'; export * from './workflow'; +export * from './result'; + export type Brand = K & { readonly __brand: T }; export type Flavour = K & { readonly __brand?: T }; diff --git a/src/packages/emmett/src/typing/result.ts b/src/packages/emmett/src/typing/result.ts new file mode 100644 index 00000000..4f53eff4 --- /dev/null +++ b/src/packages/emmett/src/typing/result.ts @@ -0,0 +1,65 @@ +import type { Brand } from '.'; + +export const EmptySuccessValue = Symbol.for('emt:result:success:emptyvalue'); +export type EmptySuccessValue = typeof EmptySuccessValue; + +export const EmptyFailureValue = Symbol.for('emt:result:failure:emptyvalue'); +export type EmptyFailureValue = typeof EmptyFailureValue; + +export type Success = Readonly< + Brand< + { + ok: true; + value: Value; + }, + 'SuccessResult' + > +>; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type AnySuccess = Success; + +export type Failure = Readonly< + Brand< + { + ok: false; + error: ErrorType; + }, + 'FailureResult' + > +>; + +export type Result = + | Success + | Failure; + +export const success = ( + ...args: Value extends EmptySuccessValue ? [] : [value: Value] +): Success => { + const [value] = args; + + return { + ok: true, + value: value ?? EmptySuccessValue, + __brand: 'SuccessResult', + } as unknown as Success; +}; +success.empty = success(); + +export const failure = ( + ...args: Error extends EmptyFailureValue ? [] : [error: Error] +): Failure => { + const [error] = args; + + return { + ok: false, + error: error ?? EmptyFailureValue, + __brand: 'FailureResult', + } as unknown as Failure; +}; +failure.empty = failure(); + +export const Result = { + success, + failure, +};