Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import assert from 'assert';

export class CancellationPromise<T = unknown> extends Promise<T> {
private _resolve: (value: T | PromiseLike<T>) => void;
private _reject: (reason?: unknown) => void;
private _state: 'resolved' | 'rejected' | 'pending' = 'pending';

constructor(
executor: (
resolve: (value: T | PromiseLike<T>) => void,
reject: (reason?: unknown) => void,
) => void = () => null,
) {
let _resolve: ((value: T | PromiseLike<T>) => void) | undefined = undefined;
let _reject: ((reason?: unknown) => void) | undefined = undefined;

super((resolve, reject) => {
executor(resolve, reject);
_resolve = resolve;
_reject = reject;
});

assert(_resolve);
assert(_reject);

this._resolve = _resolve;
this._reject = _reject;
}

reject(reason?: unknown): void {
this._state = 'rejected';
this._reject(reason);
}

resolve(value?: T): void {
this._state = 'resolved';
this._resolve(value as T);
}

get isResolved() {
return this._state === 'resolved';
}

get isRejected() {
return this._state === 'rejected';
}

get isPending() {
return this._state === 'pending';
}

static resolved<R = unknown>(value?: R) {
const promise = new CancellationPromise<R>();
promise.resolve(value as R);
return promise;
}

static rejected<R extends Error = Error>(value: R) {
const promise = new CancellationPromise<R>();
promise.reject(value);
return promise;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
import {
EmmettError,
MessageProcessor,
type AnyEvent,
type AnyMessage,
type AsyncRetryOptions,
type DefaultRecord,
type GlobalPositionTypeOfRecordedMessageMetadata,
type Message,
type MessageConsumer,
type RecordedMessage,
} from '@event-driven-io/emmett';
import { MongoClient, type MongoClientOptions } from 'mongodb';
import { v4 as uuid } from 'uuid';
import type { MongoDBRecordedMessageMetadata } from '../event';
import type { MongoDBReadEventMetadata } from '../mongoDBEventStore';
import { CancellationPromise } from './CancellablePromise';
import {
changeStreamReactor,
mongoDBProjector,
type MongoDBProcessor,
type MongoDBProcessorOptions,
type MongoDBProjectorOptions,
} from './mongoDBProcessor';
import {
generateVersionPolicies,
mongoDBSubscription,
zipMongoDBMessageBatchPullerStartFrom,
type ChangeStreamFullDocumentValuePolicy,
type MongoDBSubscription,
} from './subscriptions';
import type { MongoDBResumeToken } from './subscriptions/types';

export type MessageConsumerOptions<
MessageType extends Message = AnyMessage,
MessageMetadataType extends
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
HandlerContext extends DefaultRecord | undefined = undefined,
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
> = {
consumerId?: string;

processors?: MessageProcessor<
MessageType,
MessageMetadataType,
HandlerContext,
CheckpointType
>[];
};

export type MongoDBEventStoreConsumerConfig<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ConsumerMessageType extends Message = any,
MessageMetadataType extends
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
HandlerContext extends DefaultRecord | undefined = undefined,
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
> = MessageConsumerOptions<
ConsumerMessageType,
MessageMetadataType,
HandlerContext,
CheckpointType
> & {
// from?: any;
// pulling?: {
// batchSize?: number;
// };
resilience?: {
resubscribeOptions?: AsyncRetryOptions;
};
changeStreamFullDocumentPolicy: ChangeStreamFullDocumentValuePolicy;
};

export type MongoDBConsumerOptions<
ConsumerEventType extends Message = Message,
MessageMetadataType extends
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
HandlerContext extends DefaultRecord | undefined = undefined,
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
> = MongoDBEventStoreConsumerConfig<
ConsumerEventType,
MessageMetadataType,
HandlerContext,
CheckpointType
> &
(
| {
connectionString: string;
clientOptions?: MongoClientOptions;
client?: never;
}
| {
client: MongoClient;
connectionString?: never;
clientOptions?: never;
}
);

export type MongoDBEventStoreConsumer<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ConsumerMessageType extends AnyMessage = any,
> = MessageConsumer<ConsumerMessageType> &
Readonly<{
reactor: <MessageType extends AnyMessage = ConsumerMessageType>(
options: MongoDBProcessorOptions<MessageType>,
) => MongoDBProcessor<MessageType>;
}> &
(AnyEvent extends ConsumerMessageType
? Readonly<{
projector: <
EventType extends AnyEvent = ConsumerMessageType & AnyEvent,
>(
options: MongoDBProjectorOptions<EventType>,
) => MongoDBProcessor<EventType>;
}>
: object);

export type MongoDBConsumerHandlerContext = {
client?: MongoClient;
};

export const mongoDBMessagesConsumer = <
ConsumerMessageType extends Message = AnyMessage,
MessageMetadataType extends
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
HandlerContext extends
MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext,
CheckpointType = MongoDBResumeToken,
>(
options: MongoDBConsumerOptions<
ConsumerMessageType,
MessageMetadataType,
HandlerContext,
CheckpointType
>,
): MongoDBEventStoreConsumer<ConsumerMessageType> => {
let start: Promise<void>;
let stream: MongoDBSubscription<CheckpointType>;
let isRunning = false;
let runningPromise = new CancellationPromise<null>();
const client =
'client' in options && options.client
? options.client
: new MongoClient(options.connectionString, options.clientOptions);
const processors = options.processors ?? [];

return {
consumerId: options.consumerId ?? uuid(),
get isRunning() {
return isRunning;
},
processors,
reactor: <MessageType extends AnyMessage = ConsumerMessageType>(
options: MongoDBProcessorOptions<MessageType>,
): MongoDBProcessor<MessageType> => {
const processor = changeStreamReactor(options);

processors.push(
processor as unknown as MessageProcessor<
ConsumerMessageType,
MessageMetadataType,
HandlerContext,
CheckpointType
>,
);

return processor;
},
projector: <EventType extends AnyEvent = ConsumerMessageType & AnyEvent>(
options: MongoDBProjectorOptions<EventType>,
): MongoDBProcessor<EventType> => {
const processor = mongoDBProjector(options);

processors.push(
processor as unknown as MessageProcessor<
ConsumerMessageType,
MessageMetadataType,
HandlerContext,
CheckpointType
>,
);

return processor;
},
start: () => {
start = (async () => {
if (processors.length === 0)
return Promise.reject(
new EmmettError(
'Cannot start consumer without at least a single processor',
),
);

isRunning = true;

runningPromise = new CancellationPromise<null>();

const positions = await Promise.all(
processors.map((o) => o.start({ client } as Partial<HandlerContext>)),
);
const startFrom =
zipMongoDBMessageBatchPullerStartFrom<CheckpointType>(positions);

stream = mongoDBSubscription<
ConsumerMessageType,
MessageMetadataType,
CheckpointType
>({
client,
from: startFrom,
eachBatch: async (
messages: RecordedMessage<
ConsumerMessageType,
MessageMetadataType
>[],
) => {
for (const processor of processors.filter(
({ isActive }) => isActive,
)) {
await processor.handle(messages, {
client,
} as Partial<HandlerContext>);
}
},
});

const db = options.client?.db?.();

if (!db) {
throw new EmmettError('MongoDB client is not connected');
}

// TODO: Remember to fix.
const versionPolicies = await generateVersionPolicies(db);
const policy = versionPolicies.changeStreamFullDocumentValuePolicy;

await stream.start({
getFullDocumentValue: policy,
startFrom,
});
})();

return start;
},
stop: async () => {
if (stream.isRunning) {
await stream.stop();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not the case for the esdb..

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isRunning = false;
runningPromise.resolve(null);
}
},
close: async () => {
if (stream.isRunning) {
await stream.stop();
isRunning = false;
runningPromise.resolve(null);
}
},
};
};

export const mongoDBChangeStreamMessagesConsumer = <
ConsumerMessageType extends Message = AnyMessage,
MessageMetadataType extends
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
HandlerContext extends
MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext,
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
>(
options: MongoDBConsumerOptions<
ConsumerMessageType,
MessageMetadataType,
HandlerContext,
CheckpointType
>,
): MongoDBEventStoreConsumer<ConsumerMessageType> =>
mongoDBMessagesConsumer<
ConsumerMessageType,
MessageMetadataType,
HandlerContext,
CheckpointType
>(options);
Loading