diff --git a/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts index 6390fa7c..278f4f95 100644 --- a/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts @@ -10,6 +10,11 @@ import { type Message, type MessageHandlerResult, } from '@event-driven-io/emmett'; +import { + SequentialTransform, + type SequentialTransformHandler, + type SequentialTransformHandlerResult, +} from '@event-driven-io/emmett/nodejs'; import { END, EventStoreDBClient, @@ -18,7 +23,7 @@ import { type ResolvedEvent, type StreamSubscription, } from '@eventstore/db-client'; -import { pipeline, Transform, Writable, type WritableOptions } from 'stream'; +import { pipeline, Writable, type WritableOptions } from 'stream'; import { mapFromESDBEvent, type EventStoreDBReadEventMetadata, @@ -120,49 +125,45 @@ type SubscriptionSequentialHandlerOptions< class SubscriptionSequentialHandler< MessageType extends AnyMessage = AnyMessage, -> extends Transform { +> extends SequentialTransform, bigint | null> { private options: SubscriptionSequentialHandlerOptions; private from: EventStoreDBEventStoreConsumerType | undefined; public isRunning: boolean; constructor(options: SubscriptionSequentialHandlerOptions) { - super({ objectMode: true, ...options }); - this.options = options; - this.from = options.from; - this.isRunning = true; - } - - async _transform( - resolvedEvent: ResolvedEvent, - _encoding: BufferEncoding, - callback: (error?: Error | null) => void, - ): Promise { - try { + const handler: SequentialTransformHandler< + ResolvedEvent, + bigint + > = async ( + resolvedEvent: ResolvedEvent, + ): Promise> => { if (!this.isRunning || !resolvedEvent.event) { - callback(); - return; + return { resultType: 'SKIP' }; } const message = mapFromESDBEvent(resolvedEvent, this.from); - const messageCheckpoint = getCheckpoint(message); + const messageCheckpoint = getCheckpoint(message)!; const result = await this.options.eachBatch([message]); if (result && result.type === 'STOP') { this.isRunning = false; - if (!result.error) this.push(messageCheckpoint); + if (!result.error) { + return { resultType: 'ACK', message: messageCheckpoint }; + } - this.push(result); - this.push(null); - callback(); - return; + return { resultType: 'STOP', ...result }; } - this.push(messageCheckpoint); - callback(); - } catch (error) { - callback(error as Error); - } + return { resultType: 'ACK', message: messageCheckpoint }; + }; + super({ + ...options, + handler, + }); + this.options = options; + this.from = options.from; + this.isRunning = true; } } diff --git a/src/packages/emmett/package.json b/src/packages/emmett/package.json index b5336c77..3b9e0fcf 100644 --- a/src/packages/emmett/package.json +++ b/src/packages/emmett/package.json @@ -48,6 +48,16 @@ "types": "./dist/cli.d.cts", "default": "./dist/cli.cjs" } + }, + "./nodejs": { + "import": { + "types": "./dist/nodejs.d.ts", + "default": "./dist/nodejs.js" + }, + "require": { + "types": "./dist/nodejs.d.cts", + "default": "./dist/nodejs.cjs" + } } }, "typesVersions": { @@ -57,6 +67,9 @@ ], "cli": [ "./dist/cli.d.ts" + ], + "nodejs": [ + "./dist/nodejs.d.ts" ] } }, diff --git a/src/packages/emmett/src/fusionStreams/index.ts b/src/packages/emmett/src/fusionStreams/index.ts new file mode 100644 index 00000000..38156cbd --- /dev/null +++ b/src/packages/emmett/src/fusionStreams/index.ts @@ -0,0 +1 @@ +export * from './transformations'; diff --git a/src/packages/emmett/src/fusionStreams/transformations/index.ts b/src/packages/emmett/src/fusionStreams/transformations/index.ts new file mode 100644 index 00000000..c258d561 --- /dev/null +++ b/src/packages/emmett/src/fusionStreams/transformations/index.ts @@ -0,0 +1 @@ +export * from './sequentialTransform'; diff --git a/src/packages/emmett/src/fusionStreams/transformations/sequentialTransform.ts b/src/packages/emmett/src/fusionStreams/transformations/sequentialTransform.ts new file mode 100644 index 00000000..4d6f9138 --- /dev/null +++ b/src/packages/emmett/src/fusionStreams/transformations/sequentialTransform.ts @@ -0,0 +1,67 @@ +import { Transform, type WritableOptions } from 'stream'; +import type { EmmettError } from '../../errors'; + +export type SequentialTransformHandlerResultType = 'ACK' | 'SKIP' | 'STOP'; + +export type SequentialTransformHandlerResult = + | { resultType: 'ACK'; message: OutgoingMessageType } + | { resultType: 'SKIP'; reason?: string } + | { resultType: 'STOP'; reason?: string; error?: EmmettError }; + +export type SequentialTransformHandler< + IncomingMessageType = unknown, + OutgoingMessageType = unknown, +> = ( + message: IncomingMessageType, +) => Promise>; + +export type SequentialTransformOptions< + IncomingMessageType = unknown, + OutgoingMessageType = unknown, +> = { + handler: SequentialTransformHandler; +} & WritableOptions; + +export class SequentialTransform< + IncomingMessageType = unknown, + OutgoingMessageType = unknown, +> extends Transform { + private handler: SequentialTransformHandler< + IncomingMessageType, + OutgoingMessageType + >; + constructor( + options: SequentialTransformOptions< + IncomingMessageType, + OutgoingMessageType + >, + ) { + super({ objectMode: true, ...options }); + this.handler = options.handler; + } + + async _transform( + message: IncomingMessageType, + _encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ): Promise { + try { + const result = await this.handler(message); + + switch (result.resultType) { + case 'ACK': + this.push(result); + break; + case 'SKIP': + break; + case 'STOP': + this.push(null); + break; + } + + callback(); + } catch (error) { + callback(error as Error); + } + } +} diff --git a/src/packages/emmett/src/nodejs.ts b/src/packages/emmett/src/nodejs.ts new file mode 100644 index 00000000..f130f972 --- /dev/null +++ b/src/packages/emmett/src/nodejs.ts @@ -0,0 +1 @@ +export * from './fusionStreams'; diff --git a/src/packages/emmett/src/typing/index.ts b/src/packages/emmett/src/typing/index.ts index 2ea5fce1..935a6c90 100644 --- a/src/packages/emmett/src/typing/index.ts +++ b/src/packages/emmett/src/typing/index.ts @@ -8,6 +8,8 @@ export * from './messageHandling'; export * from './decider'; export * from './workflow'; +export * from './result'; + export type Brand = K & { readonly __brand: T }; export type Flavour = K & { readonly __brand?: T }; diff --git a/src/packages/emmett/src/typing/result.ts b/src/packages/emmett/src/typing/result.ts new file mode 100644 index 00000000..4f53eff4 --- /dev/null +++ b/src/packages/emmett/src/typing/result.ts @@ -0,0 +1,65 @@ +import type { Brand } from '.'; + +export const EmptySuccessValue = Symbol.for('emt:result:success:emptyvalue'); +export type EmptySuccessValue = typeof EmptySuccessValue; + +export const EmptyFailureValue = Symbol.for('emt:result:failure:emptyvalue'); +export type EmptyFailureValue = typeof EmptyFailureValue; + +export type Success = Readonly< + Brand< + { + ok: true; + value: Value; + }, + 'SuccessResult' + > +>; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type AnySuccess = Success; + +export type Failure = Readonly< + Brand< + { + ok: false; + error: ErrorType; + }, + 'FailureResult' + > +>; + +export type Result = + | Success + | Failure; + +export const success = ( + ...args: Value extends EmptySuccessValue ? [] : [value: Value] +): Success => { + const [value] = args; + + return { + ok: true, + value: value ?? EmptySuccessValue, + __brand: 'SuccessResult', + } as unknown as Success; +}; +success.empty = success(); + +export const failure = ( + ...args: Error extends EmptyFailureValue ? [] : [error: Error] +): Failure => { + const [error] = args; + + return { + ok: false, + error: error ?? EmptyFailureValue, + __brand: 'FailureResult', + } as unknown as Failure; +}; +failure.empty = failure(); + +export const Result = { + success, + failure, +}; diff --git a/src/packages/emmett/tsup.config.ts b/src/packages/emmett/tsup.config.ts index 8872bdc9..1e54d40f 100644 --- a/src/packages/emmett/tsup.config.ts +++ b/src/packages/emmett/tsup.config.ts @@ -15,7 +15,7 @@ export default defineConfig([ watch: env === 'development', target: 'esnext', outDir: 'dist', //env === 'production' ? 'dist' : 'lib', - entry: ['src/index.ts', 'src/cli.ts'], + entry: ['src/index.ts', 'src/cli.ts', 'src/nodejs.ts'], sourcemap: true, tsconfig: 'tsconfig.build.json', // workaround for https://github.com/egoist/tsup/issues/571#issuecomment-1760052931 },