Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import {
type Message,
type MessageHandlerResult,
} from '@event-driven-io/emmett';
import {
SequentialTransform,
type SequentialTransformHandler,
type SequentialTransformHandlerResult,
} from '@event-driven-io/emmett/nodejs';
import {
END,
EventStoreDBClient,
Expand All @@ -18,7 +23,7 @@ import {
type ResolvedEvent,
type StreamSubscription,
} from '@eventstore/db-client';
import { pipeline, Transform, Writable, type WritableOptions } from 'stream';
import { pipeline, Writable, type WritableOptions } from 'stream';
import {
mapFromESDBEvent,
type EventStoreDBReadEventMetadata,
Expand Down Expand Up @@ -120,49 +125,45 @@ type SubscriptionSequentialHandlerOptions<

class SubscriptionSequentialHandler<
MessageType extends AnyMessage = AnyMessage,
> extends Transform {
> extends SequentialTransform<ResolvedEvent<MessageType>, bigint | null> {
private options: SubscriptionSequentialHandlerOptions<MessageType>;
private from: EventStoreDBEventStoreConsumerType | undefined;
public isRunning: boolean;

constructor(options: SubscriptionSequentialHandlerOptions<MessageType>) {
super({ objectMode: true, ...options });
this.options = options;
this.from = options.from;
this.isRunning = true;
}

async _transform(
resolvedEvent: ResolvedEvent<MessageType>,
_encoding: BufferEncoding,
callback: (error?: Error | null) => void,
): Promise<void> {
try {
const handler: SequentialTransformHandler<
ResolvedEvent<MessageType>,
bigint
> = async (
resolvedEvent: ResolvedEvent<MessageType>,
): Promise<SequentialTransformHandlerResult<bigint>> => {
if (!this.isRunning || !resolvedEvent.event) {
callback();
return;
return { resultType: 'SKIP' };
}

const message = mapFromESDBEvent(resolvedEvent, this.from);
const messageCheckpoint = getCheckpoint(message);
const messageCheckpoint = getCheckpoint(message)!;

const result = await this.options.eachBatch([message]);

if (result && result.type === 'STOP') {
this.isRunning = false;
if (!result.error) this.push(messageCheckpoint);
if (!result.error) {
return { resultType: 'ACK', message: messageCheckpoint };
}

this.push(result);
this.push(null);
callback();
return;
return { resultType: 'STOP', ...result };
}

this.push(messageCheckpoint);
callback();
} catch (error) {
callback(error as Error);
}
return { resultType: 'ACK', message: messageCheckpoint };
};
super({
...options,
handler,
});
this.options = options;
this.from = options.from;
this.isRunning = true;
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/packages/emmett/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
"types": "./dist/cli.d.cts",
"default": "./dist/cli.cjs"
}
},
"./nodejs": {
"import": {
"types": "./dist/nodejs.d.ts",
"default": "./dist/nodejs.js"
},
"require": {
"types": "./dist/nodejs.d.cts",
"default": "./dist/nodejs.cjs"
}
}
},
"typesVersions": {
Expand All @@ -57,6 +67,9 @@
],
"cli": [
"./dist/cli.d.ts"
],
"nodejs": [
"./dist/nodejs.d.ts"
]
}
},
Expand Down
1 change: 1 addition & 0 deletions src/packages/emmett/src/fusionStreams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './transformations';
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './sequentialTransform';
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Transform, type WritableOptions } from 'stream';
import type { EmmettError } from '../../errors';

export type SequentialTransformHandlerResultType = 'ACK' | 'SKIP' | 'STOP';

export type SequentialTransformHandlerResult<OutgoingMessageType = unknown> =
| { resultType: 'ACK'; message: OutgoingMessageType }
| { resultType: 'SKIP'; reason?: string }
| { resultType: 'STOP'; reason?: string; error?: EmmettError };

export type SequentialTransformHandler<
IncomingMessageType = unknown,
OutgoingMessageType = unknown,
> = (
message: IncomingMessageType,
) => Promise<SequentialTransformHandlerResult<OutgoingMessageType>>;

export type SequentialTransformOptions<
IncomingMessageType = unknown,
OutgoingMessageType = unknown,
> = {
handler: SequentialTransformHandler<IncomingMessageType, OutgoingMessageType>;
} & WritableOptions;

export class SequentialTransform<
IncomingMessageType = unknown,
OutgoingMessageType = unknown,
> extends Transform {
private handler: SequentialTransformHandler<
IncomingMessageType,
OutgoingMessageType
>;
constructor(
options: SequentialTransformOptions<
IncomingMessageType,
OutgoingMessageType
>,
) {
super({ objectMode: true, ...options });
this.handler = options.handler;
}

async _transform(
message: IncomingMessageType,
_encoding: BufferEncoding,
callback: (error?: Error | null) => void,
): Promise<void> {
try {
const result = await this.handler(message);

switch (result.resultType) {
case 'ACK':
this.push(result);
break;
case 'SKIP':
break;
case 'STOP':
this.push(null);
break;
}

callback();
} catch (error) {
callback(error as Error);
}
}
}
1 change: 1 addition & 0 deletions src/packages/emmett/src/nodejs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './fusionStreams';
2 changes: 2 additions & 0 deletions src/packages/emmett/src/typing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export * from './messageHandling';
export * from './decider';
export * from './workflow';

export * from './result';

export type Brand<K, T> = K & { readonly __brand: T };
export type Flavour<K, T> = K & { readonly __brand?: T };

Expand Down
65 changes: 65 additions & 0 deletions src/packages/emmett/src/typing/result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import type { Brand } from '.';

export const EmptySuccessValue = Symbol.for('emt:result:success:emptyvalue');
export type EmptySuccessValue = typeof EmptySuccessValue;

export const EmptyFailureValue = Symbol.for('emt:result:failure:emptyvalue');
export type EmptyFailureValue = typeof EmptyFailureValue;

export type Success<Value = EmptySuccessValue> = Readonly<
Brand<
{
ok: true;
value: Value;
},
'SuccessResult'
>
>;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type AnySuccess = Success<any>;

export type Failure<ErrorType = unknown> = Readonly<
Brand<
{
ok: false;
error: ErrorType;
},
'FailureResult'
>
>;

export type Result<Value = EmptySuccessValue, ErrorType = EmptyFailureValue> =
| Success<Value>
| Failure<ErrorType>;

export const success = <Value = EmptySuccessValue>(
...args: Value extends EmptySuccessValue ? [] : [value: Value]
): Success<Value> => {
const [value] = args;

return {
ok: true,
value: value ?? EmptySuccessValue,
__brand: 'SuccessResult',
} as unknown as Success<Value>;
};
success.empty = success();

export const failure = <Error = EmptyFailureValue>(
...args: Error extends EmptyFailureValue ? [] : [error: Error]
): Failure<Error> => {
const [error] = args;

return {
ok: false,
error: error ?? EmptyFailureValue,
__brand: 'FailureResult',
} as unknown as Failure<Error>;
};
failure.empty = failure();

export const Result = {
success,
failure,
};
2 changes: 1 addition & 1 deletion src/packages/emmett/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export default defineConfig([
watch: env === 'development',
target: 'esnext',
outDir: 'dist', //env === 'production' ? 'dist' : 'lib',
entry: ['src/index.ts', 'src/cli.ts'],
entry: ['src/index.ts', 'src/cli.ts', 'src/nodejs.ts'],
sourcemap: true,
tsconfig: 'tsconfig.build.json', // workaround for https://github.com/egoist/tsup/issues/571#issuecomment-1760052931
},
Expand Down
Loading