Skip to content

Commit 852b6e7

Browse files
committed
Used SequentialTransform base class in ESDB consumer
1 parent 79db026 commit 852b6e7

File tree

6 files changed

+59
-43
lines changed

6 files changed

+59
-43
lines changed

src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ import {
1010
type Message,
1111
type MessageHandlerResult,
1212
} from '@event-driven-io/emmett';
13+
import {
14+
SequentialTransform,
15+
type SequentialTransformHandler,
16+
type SequentialTransformHandlerResult,
17+
} from '@event-driven-io/emmett/nodejs';
1318
import {
1419
END,
1520
EventStoreDBClient,
@@ -18,7 +23,7 @@ import {
1823
type ResolvedEvent,
1924
type StreamSubscription,
2025
} from '@eventstore/db-client';
21-
import { pipeline, Transform, Writable, type WritableOptions } from 'stream';
26+
import { pipeline, Writable, type WritableOptions } from 'stream';
2227
import {
2328
mapFromESDBEvent,
2429
type EventStoreDBReadEventMetadata,
@@ -120,49 +125,45 @@ type SubscriptionSequentialHandlerOptions<
120125

121126
class SubscriptionSequentialHandler<
122127
MessageType extends AnyMessage = AnyMessage,
123-
> extends Transform {
128+
> extends SequentialTransform<ResolvedEvent<MessageType>, bigint | null> {
124129
private options: SubscriptionSequentialHandlerOptions<MessageType>;
125130
private from: EventStoreDBEventStoreConsumerType | undefined;
126131
public isRunning: boolean;
127132

128133
constructor(options: SubscriptionSequentialHandlerOptions<MessageType>) {
129-
super({ objectMode: true, ...options });
130-
this.options = options;
131-
this.from = options.from;
132-
this.isRunning = true;
133-
}
134-
135-
async _transform(
136-
resolvedEvent: ResolvedEvent<MessageType>,
137-
_encoding: BufferEncoding,
138-
callback: (error?: Error | null) => void,
139-
): Promise<void> {
140-
try {
134+
const handler: SequentialTransformHandler<
135+
ResolvedEvent<MessageType>,
136+
bigint
137+
> = async (
138+
resolvedEvent: ResolvedEvent<MessageType>,
139+
): Promise<SequentialTransformHandlerResult<bigint>> => {
141140
if (!this.isRunning || !resolvedEvent.event) {
142-
callback();
143-
return;
141+
return { resultType: 'SKIP' };
144142
}
145143

146144
const message = mapFromESDBEvent(resolvedEvent, this.from);
147-
const messageCheckpoint = getCheckpoint(message);
145+
const messageCheckpoint = getCheckpoint(message)!;
148146

149147
const result = await this.options.eachBatch([message]);
150148

151149
if (result && result.type === 'STOP') {
152150
this.isRunning = false;
153-
if (!result.error) this.push(messageCheckpoint);
151+
if (!result.error) {
152+
return { resultType: 'ACK', message: messageCheckpoint };
153+
}
154154

155-
this.push(result);
156-
this.push(null);
157-
callback();
158-
return;
155+
return { resultType: 'STOP', ...result };
159156
}
160157

161-
this.push(messageCheckpoint);
162-
callback();
163-
} catch (error) {
164-
callback(error as Error);
165-
}
158+
return { resultType: 'ACK', message: messageCheckpoint };
159+
};
160+
super({
161+
...options,
162+
handler,
163+
});
164+
this.options = options;
165+
this.from = options.from;
166+
this.isRunning = true;
166167
}
167168
}
168169

src/packages/emmett/package.json

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,14 @@
4949
"default": "./dist/cli.cjs"
5050
}
5151
},
52-
"./fusionStreams": {
52+
"./nodejs": {
5353
"import": {
54-
"types": "./dist/fusionStreams.d.ts",
55-
"default": "./dist/fusionStreams.js"
54+
"types": "./dist/nodejs.d.ts",
55+
"default": "./dist/nodejs.js"
5656
},
5757
"require": {
58-
"types": "./dist/fusionStreams.d.cts",
59-
"default": "./dist/fusionStreams.cjs"
58+
"types": "./dist/nodejs.d.cts",
59+
"default": "./dist/nodejs.cjs"
6060
}
6161
}
6262
},
@@ -68,8 +68,8 @@
6868
"cli": [
6969
"./dist/cli.d.ts"
7070
],
71-
"fusionStreams": [
72-
"./dist/fusionStreams.d.ts"
71+
"nodejs": [
72+
"./dist/nodejs.d.ts"
7373
]
7474
}
7575
},
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
export * from './sequentialTransformation';
1+
export * from './sequentialTransform';

src/packages/emmett/src/fusionStreams/transformations/sequentialTransformation.ts renamed to src/packages/emmett/src/fusionStreams/transformations/sequentialTransform.ts

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
import { Transform, type WritableOptions } from 'stream';
2+
import type { EmmettError } from '../../errors';
3+
4+
export type SequentialTransformHandlerResultType = 'ACK' | 'SKIP' | 'STOP';
5+
6+
export type SequentialTransformHandlerResult<OutgoingMessageType = unknown> =
7+
| { resultType: 'ACK'; message: OutgoingMessageType }
8+
| { resultType: 'SKIP'; reason?: string }
9+
| { resultType: 'STOP'; reason?: string; error?: EmmettError };
210

311
export type SequentialTransformHandler<
412
IncomingMessageType = unknown,
513
OutgoingMessageType = unknown,
6-
> = {
7-
handler: (
8-
message: IncomingMessageType,
9-
) => Promise<OutgoingMessageType | null>;
10-
};
14+
> = (
15+
message: IncomingMessageType,
16+
) => Promise<SequentialTransformHandlerResult<OutgoingMessageType>>;
1117

1218
export type SequentialTransformOptions<
1319
IncomingMessageType = unknown,
@@ -40,9 +46,18 @@ export class SequentialTransform<
4046
callback: (error?: Error | null) => void,
4147
): Promise<void> {
4248
try {
43-
const result = await this.handler.handler(message);
49+
const result = await this.handler(message);
4450

45-
this.push(result);
51+
switch (result.resultType) {
52+
case 'ACK':
53+
this.push(result);
54+
break;
55+
case 'SKIP':
56+
break;
57+
case 'STOP':
58+
this.push(null);
59+
break;
60+
}
4661

4762
callback();
4863
} catch (error) {

src/packages/emmett/tsup.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export default defineConfig([
1515
watch: env === 'development',
1616
target: 'esnext',
1717
outDir: 'dist', //env === 'production' ? 'dist' : 'lib',
18-
entry: ['src/index.ts', 'src/cli.ts', 'src/fusionStreams.ts'],
18+
entry: ['src/index.ts', 'src/cli.ts', 'src/nodejs.ts'],
1919
sourcemap: true,
2020
tsconfig: 'tsconfig.build.json', // workaround for https://github.com/egoist/tsup/issues/571#issuecomment-1760052931
2121
},

0 commit comments

Comments
 (0)