diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts new file mode 100644 index 00000000..74204424 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts @@ -0,0 +1,63 @@ +import assert from 'assert'; + +export class CancellationPromise extends Promise { + private _resolve: (value: T | PromiseLike) => void; + private _reject: (reason?: unknown) => void; + private _state: 'resolved' | 'rejected' | 'pending' = 'pending'; + + constructor( + executor: ( + resolve: (value: T | PromiseLike) => void, + reject: (reason?: unknown) => void, + ) => void = () => null, + ) { + let _resolve: ((value: T | PromiseLike) => void) | undefined = undefined; + let _reject: ((reason?: unknown) => void) | undefined = undefined; + + super((resolve, reject) => { + executor(resolve, reject); + _resolve = resolve; + _reject = reject; + }); + + assert(_resolve); + assert(_reject); + + this._resolve = _resolve; + this._reject = _reject; + } + + reject(reason?: unknown): void { + this._state = 'rejected'; + this._reject(reason); + } + + resolve(value?: T): void { + this._state = 'resolved'; + this._resolve(value as T); + } + + get isResolved() { + return this._state === 'resolved'; + } + + get isRejected() { + return this._state === 'rejected'; + } + + get isPending() { + return this._state === 'pending'; + } + + static resolved(value?: R) { + const promise = new CancellationPromise(); + promise.resolve(value as R); + return promise; + } + + static rejected(value: R) { + const promise = new CancellationPromise(); + promise.reject(value); + return promise; + } +} diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts new file mode 100644 index 00000000..758af5b4 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -0,0 +1,282 @@ +import { + EmmettError, + MessageProcessor, + type AnyEvent, + type AnyMessage, + type AsyncRetryOptions, + type DefaultRecord, + type GlobalPositionTypeOfRecordedMessageMetadata, + type Message, + type MessageConsumer, + type RecordedMessage, +} from '@event-driven-io/emmett'; +import { MongoClient, type MongoClientOptions } from 'mongodb'; +import { v4 as uuid } from 'uuid'; +import type { MongoDBRecordedMessageMetadata } from '../event'; +import type { MongoDBReadEventMetadata } from '../mongoDBEventStore'; +import { CancellationPromise } from './CancellablePromise'; +import { + changeStreamReactor, + mongoDBProjector, + type MongoDBProcessor, + type MongoDBProcessorOptions, + type MongoDBProjectorOptions, +} from './mongoDBProcessor'; +import { + generateVersionPolicies, + mongoDBSubscription, + zipMongoDBMessageBatchPullerStartFrom, + type ChangeStreamFullDocumentValuePolicy, + type MongoDBSubscription, +} from './subscriptions'; +import type { MongoDBResumeToken } from './subscriptions/types'; + +export type MessageConsumerOptions< + MessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends DefaultRecord | undefined = undefined, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +> = { + consumerId?: string; + + processors?: MessageProcessor< + MessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >[]; +}; + +export type MongoDBEventStoreConsumerConfig< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ConsumerMessageType extends Message = any, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends DefaultRecord | undefined = undefined, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +> = MessageConsumerOptions< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType +> & { + // from?: any; + // pulling?: { + // batchSize?: number; + // }; + resilience?: { + resubscribeOptions?: AsyncRetryOptions; + }; + changeStreamFullDocumentPolicy: ChangeStreamFullDocumentValuePolicy; +}; + +export type MongoDBConsumerOptions< + ConsumerEventType extends Message = Message, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends DefaultRecord | undefined = undefined, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +> = MongoDBEventStoreConsumerConfig< + ConsumerEventType, + MessageMetadataType, + HandlerContext, + CheckpointType +> & + ( + | { + connectionString: string; + clientOptions?: MongoClientOptions; + client?: never; + } + | { + client: MongoClient; + connectionString?: never; + clientOptions?: never; + } + ); + +export type MongoDBEventStoreConsumer< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ConsumerMessageType extends AnyMessage = any, +> = MessageConsumer & + Readonly<{ + reactor: ( + options: MongoDBProcessorOptions, + ) => MongoDBProcessor; + }> & + (AnyEvent extends ConsumerMessageType + ? Readonly<{ + projector: < + EventType extends AnyEvent = ConsumerMessageType & AnyEvent, + >( + options: MongoDBProjectorOptions, + ) => MongoDBProcessor; + }> + : object); + +export type MongoDBConsumerHandlerContext = { + client?: MongoClient; +}; + +export const mongoDBMessagesConsumer = < + ConsumerMessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends + MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext, + CheckpointType = MongoDBResumeToken, +>( + options: MongoDBConsumerOptions< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >, +): MongoDBEventStoreConsumer => { + let start: Promise; + let stream: MongoDBSubscription; + let isRunning = false; + let runningPromise = new CancellationPromise(); + const client = + 'client' in options && options.client + ? options.client + : new MongoClient(options.connectionString, options.clientOptions); + const processors = options.processors ?? []; + + return { + consumerId: options.consumerId ?? uuid(), + get isRunning() { + return isRunning; + }, + processors, + reactor: ( + options: MongoDBProcessorOptions, + ): MongoDBProcessor => { + const processor = changeStreamReactor(options); + + processors.push( + processor as unknown as MessageProcessor< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >, + ); + + return processor; + }, + projector: ( + options: MongoDBProjectorOptions, + ): MongoDBProcessor => { + const processor = mongoDBProjector(options); + + processors.push( + processor as unknown as MessageProcessor< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >, + ); + + return processor; + }, + start: () => { + start = (async () => { + if (processors.length === 0) + return Promise.reject( + new EmmettError( + 'Cannot start consumer without at least a single processor', + ), + ); + + isRunning = true; + + runningPromise = new CancellationPromise(); + + const positions = await Promise.all( + processors.map((o) => o.start({ client } as Partial)), + ); + const startFrom = + zipMongoDBMessageBatchPullerStartFrom(positions); + + stream = mongoDBSubscription< + ConsumerMessageType, + MessageMetadataType, + CheckpointType + >({ + client, + from: startFrom, + eachBatch: async ( + messages: RecordedMessage< + ConsumerMessageType, + MessageMetadataType + >[], + ) => { + for (const processor of processors.filter( + ({ isActive }) => isActive, + )) { + await processor.handle(messages, { + client, + } as Partial); + } + }, + }); + + const db = options.client?.db?.(); + + if (!db) { + throw new EmmettError('MongoDB client is not connected'); + } + + // TODO: Remember to fix. + const versionPolicies = await generateVersionPolicies(db); + const policy = versionPolicies.changeStreamFullDocumentValuePolicy; + + await stream.start({ + getFullDocumentValue: policy, + startFrom, + }); + })(); + + return start; + }, + stop: async () => { + if (stream.isRunning) { + await stream.stop(); + isRunning = false; + runningPromise.resolve(null); + } + }, + close: async () => { + if (stream.isRunning) { + await stream.stop(); + isRunning = false; + runningPromise.resolve(null); + } + }, + }; +}; + +export const mongoDBChangeStreamMessagesConsumer = < + ConsumerMessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends + MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +>( + options: MongoDBConsumerOptions< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >, +): MongoDBEventStoreConsumer => + mongoDBMessagesConsumer< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >(options); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts new file mode 100644 index 00000000..7551ad82 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts @@ -0,0 +1,255 @@ +import { + type AnyEvent, + type AnyMessage, + type Checkpointer, + type Event, + type Message, + type MessageHandlerResult, + type MessageProcessingScope, + MessageProcessor, + type ProjectorOptions, + type ReactorOptions, + type RecordedMessage, + projector, + reactor, +} from '@event-driven-io/emmett'; +import { MongoClient } from 'mongodb'; +import type { ReadEventMetadataWithGlobalPosition } from '../event'; +import type { MongoDBEventStoreConnectionOptions } from '../mongoDBEventStore'; +import { readProcessorCheckpoint } from './readProcessorCheckpoint'; +import { storeProcessorCheckpoint } from './storeProcessorCheckpoint'; +import type { MongoDBResumeToken } from './subscriptions/types'; + +type MongoDBConnectionOptions = { + connectionOptions: MongoDBEventStoreConnectionOptions; +}; + +export type MongoDBProcessorHandlerContext = { + client: MongoClient; +}; + +export type CommonRecordedMessageMetadata = + Readonly<{ + messageId: string; + streamPosition: StreamPosition; + streamName: string; + }>; + +export type WithGlobalPosition = Readonly<{ + globalPosition: GlobalPosition; +}>; + +export type RecordedMessageMetadata< + GlobalPosition = undefined, + StreamPosition = MongoDBResumeToken, +> = CommonRecordedMessageMetadata & + // eslint-disable-next-line @typescript-eslint/no-empty-object-type + (GlobalPosition extends undefined ? {} : WithGlobalPosition); + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type AnyRecordedMessageMetadata = RecordedMessageMetadata; + +export type MongoDBProcessor = + MessageProcessor< + MessageType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + >; + +export type MongoDBProcessorOptions = + ReactorOptions< + MessageType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + > & { connectionOptions: MongoDBEventStoreConnectionOptions }; + +export type MongoDBCheckpointer< + MessageType extends AnyMessage = AnyMessage, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + CheckpointType = any, +> = Checkpointer< + MessageType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext, + CheckpointType +>; + +export type MongoDBProjectorOptions = + ProjectorOptions< + EventType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + > & + MongoDBConnectionOptions; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const isResumeToken = (value: any): value is MongoDBResumeToken => + typeof value === 'object' && + '_data' in value && + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + typeof value._data === 'string'; + +export const getCheckpoint = < + MessageType extends AnyMessage = AnyMessage, + CheckpointType = MongoDBCheckpointer, + MessageMetadataType extends + ReadEventMetadataWithGlobalPosition = ReadEventMetadataWithGlobalPosition, +>( + message: RecordedMessage, +): CheckpointType | null => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return 'checkpoint' in message.metadata && + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + isResumeToken(message.metadata.checkpoint) + ? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + message.metadata.checkpoint + : 'globalPosition' in message.metadata && + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + isResumeToken(message.metadata.globalPosition) + ? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + message.metadata.globalPosition + : 'streamPosition' in message.metadata && + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + isResumeToken(message.metadata.streamPosition) + ? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + message.metadata.streamPosition + : null; +}; + +export const mongoDBCheckpointer = < + MessageType extends Message = Message, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + CheckpointType = any, +>(): MongoDBCheckpointer => ({ + read: async (options, context) => { + const result = await readProcessorCheckpoint( + context.client, + options, + ); + + return { lastCheckpoint: result?.lastProcessedPosition }; + }, + store: async (options, context) => { + const newPosition = getCheckpoint( + options.message, + ); + + const result = await storeProcessorCheckpoint( + context.client, + { + lastProcessedPosition: options.lastCheckpoint, + newPosition, + processorId: options.processorId, + partition: options.partition, + version: options.version || 0, + }, + ); + + return result.success + ? { success: true, newCheckpoint: result.newPosition } + : result; + }, +}); + +const mongoDBProcessingScope = (options: { + client: MongoClient; + processorId: string; +}): MessageProcessingScope => { + // const processorConnectionString = options.connectionString; + + const processingScope: MessageProcessingScope< + MongoDBProcessorHandlerContext + > = async ( + handler: ( + context: MongoDBProcessorHandlerContext, + ) => Result | Promise, + partialContext: Partial, + ) => { + // const connection = partialContext?.connection; + // const connectionString = + // processorConnectionString ?? connection?.connectionString; + + // if (!connectionString) + // throw new EmmettError( + // `MongoDB processor '${options.processorId}' is missing connection string. Ensure that you passed it through options`, + // ); + + return handler({ + client: options.client, + ...partialContext, + }); + }; + + return processingScope; +}; + +export const mongoDBProjector = ( + options: MongoDBProjectorOptions, +): MongoDBProcessor => { + const { connectionOptions } = options; + const hooks = { + onStart: options.hooks?.onStart, + onClose: options.hooks?.onClose + ? async () => { + if (options.hooks?.onClose) await options.hooks?.onClose(); + } + : undefined, + }; + const client = + 'client' in connectionOptions && connectionOptions.client + ? connectionOptions.client + : new MongoClient( + connectionOptions.connectionString, + connectionOptions.clientOptions, + ); + + return projector< + EventType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + >({ + ...options, + hooks, + processingScope: mongoDBProcessingScope({ + client, + processorId: + options.processorId ?? `projection:${options.projection.name}`, + }), + + checkpoints: mongoDBCheckpointer(), + }); +}; + +export const changeStreamReactor = < + MessageType extends AnyMessage = AnyMessage, +>( + options: MongoDBProcessorOptions, +): MongoDBProcessor => { + const connectionOptions = options.connectionOptions || {}; + const client = + 'client' in connectionOptions && connectionOptions.client + ? connectionOptions.client + : new MongoClient( + connectionOptions.connectionString, + connectionOptions.clientOptions, + ); + + const hooks = { + onStart: options.hooks?.onStart, + onClose: options.hooks?.onClose + ? async () => { + if (options.hooks?.onClose) await options.hooks?.onClose(); + } + : undefined, + }; + + return reactor({ + ...options, + hooks, + processingScope: mongoDBProcessingScope({ + client, + processorId: options.processorId, + }), + checkpoints: mongoDBCheckpointer(), + }); +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.e2e.spec.ts new file mode 100644 index 00000000..774a76c3 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.e2e.spec.ts @@ -0,0 +1,131 @@ +import { assertDeepEqual } from '@event-driven-io/emmett'; +import { + MongoDBContainer, + type StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { MongoClient } from 'mongodb'; +import { after, before, describe, it } from 'node:test'; +import { readProcessorCheckpoint } from './readProcessorCheckpoint'; +import { storeProcessorCheckpoint } from './storeProcessorCheckpoint'; +import type { MongoDBResumeToken } from './subscriptions/types'; + +void describe('storeProcessorCheckpoint and readProcessorCheckpoint tests', () => { + let mongodb: StartedMongoDBContainer; + let client: MongoClient; + + const processorId = 'processorId-1'; + const resumeToken1: MongoDBResumeToken = { + _data: + '82687E948D000000032B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }; + const resumeToken2: MongoDBResumeToken = { + _data: + '82687E949E000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }; + const resumeToken3: MongoDBResumeToken = { + _data: + '82687E94D4000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }; + + before(async () => { + mongodb = await new MongoDBContainer('mongo:8.0.10').start(); + client = new MongoClient(mongodb.getConnectionString(), { + directConnection: true, + }); + + await client.connect(); + }); + + after(async () => { + await client.close(); + await mongodb.stop(); + }); + + void it('should store successfully last proceeded MongoDB resume token for the first time', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: null, + newPosition: resumeToken1, + version: 1, + }); + + assertDeepEqual(result, { + success: true, + newPosition: resumeToken1, + }); + }); + + void it('should store successfully a new checkpoint expecting the previous token', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: resumeToken1, + newPosition: resumeToken2, + version: 2, + }); + + assertDeepEqual(result, { + success: true, + newPosition: resumeToken2, + }); + }); + + void it('it returns IGNORED when the newPosition is the same or earlier than the lastProcessedPosition', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: resumeToken2, + newPosition: resumeToken1, + version: 3, + }); + + assertDeepEqual(result, { + success: false, + reason: 'IGNORED', + }); + }); + + void it('it returns MISMATCH when the lastProcessedPosition is not the one that is currently stored', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: resumeToken1, + newPosition: resumeToken3, + version: 3, + }); + + assertDeepEqual(result, { + success: false, + reason: 'MISMATCH', + }); + }); + + void it('it can save a checkpoint with a specific partition', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: null, + newPosition: resumeToken1, + partition: 'partition-2', + version: 1, + }); + + assertDeepEqual(result, { + success: true, + newPosition: resumeToken1, + }); + }); + + void it('it can read a position of a processor with the default partition', async () => { + const result = await readProcessorCheckpoint(client, { + processorId, + }); + + assertDeepEqual(result, { lastProcessedPosition: resumeToken2 }); + }); + + void it('it can read a position of a processor with a defined partition', async () => { + const result = await readProcessorCheckpoint(client, { + processorId, + partition: 'partition-2', + }); + + assertDeepEqual(result, { lastProcessedPosition: resumeToken1 }); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts new file mode 100644 index 00000000..b6075d53 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts @@ -0,0 +1,35 @@ +import type { MongoClient } from 'mongodb'; +import { + DefaultProcessotCheckpointCollectionName, + type ReadProcessorCheckpointSqlResult, +} from './types'; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type ReadProcessorCheckpointResult = { + lastProcessedPosition: CheckpointType | null; +}; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const readProcessorCheckpoint = async ( + client: MongoClient, + options: { + processorId: string; + partition?: string; + collectionName?: string; + databaseName?: string; + }, +): Promise> => { + const result = await client + .db(options.databaseName) + .collection>( + options.collectionName || DefaultProcessotCheckpointCollectionName, + ) + .findOne({ + subscriptionId: options.processorId, + partitionId: options.partition || null, + }); + + return { + lastProcessedPosition: result !== null ? result.lastProcessedToken : null, + }; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts new file mode 100644 index 00000000..ee17cae0 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts @@ -0,0 +1,78 @@ +import type { MongoClient } from 'mongodb'; +import { compareTwoTokens } from './subscriptions'; +import { + type ReadProcessorCheckpointSqlResult, + DefaultProcessotCheckpointCollectionName, +} from './types'; + +export type StoreLastProcessedProcessorPositionResult = + | { + success: true; + newPosition: Position; + } + | { success: false; reason: 'IGNORED' | 'MISMATCH' }; + +export const storeProcessorCheckpoint = async ( + client: MongoClient, + { + processorId, + version, + newPosition, + lastProcessedPosition, + partition, + collectionName, + dbName, + }: { + processorId: string; + version: number; + newPosition: Position | null; + lastProcessedPosition: Position | null; + partition?: string; + collectionName?: string; + dbName?: string; + }, +): Promise< + StoreLastProcessedProcessorPositionResult< + null extends Position ? Position | null : Position + > +> => { + const checkpoints = client + .db(dbName) + .collection( + collectionName || DefaultProcessotCheckpointCollectionName, + ); + + const filter = { + subscriptionId: processorId, + partitionId: partition || null, + }; + + const current = await checkpoints.findOne(filter); + + // MISMATCH: we have a checkpoint but lastProcessedPosition doesn’t match + if ( + current && + compareTwoTokens(current.lastProcessedToken, lastProcessedPosition) !== 0 + ) { + return { success: false, reason: 'MISMATCH' }; + } + + // IGNORED: same or earlier position + if (current?.lastProcessedToken && newPosition) { + if (compareTwoTokens(current.lastProcessedToken, newPosition) !== -1) { + return { success: false, reason: 'IGNORED' }; + } + } + + const updateResult = await checkpoints.updateOne( + { ...filter, lastProcessedToken: lastProcessedPosition }, + { $set: { lastProcessedToken: newPosition, version } }, + { upsert: true }, + ); + + if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) { + return { success: true, newPosition: newPosition! }; + } + + return { success: false, reason: 'MISMATCH' }; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts new file mode 100644 index 00000000..8badf3ef --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -0,0 +1,601 @@ +import { + asyncRetry, + EmmettError, + IllegalStateError, + JSONParser, + type AnyMessage, + type AsyncRetryOptions, + type BatchRecordedMessageHandlerWithoutContext, + type CurrentMessageProcessorPosition, + type Event, + type Message, + type MessageHandlerResult, + type ReadEvent, + type RecordedMessage, + type RecordedMessageMetadata, +} from '@event-driven-io/emmett'; +import { + ChangeStream, + Timestamp, + type ChangeStreamInsertDocument, + type ChangeStreamReplaceDocument, + type ChangeStreamUpdateDocument, + type Db, + type Document, + type MongoClient, +} from 'mongodb'; +import { pipeline, Transform, Writable, type WritableOptions } from 'stream'; +import type { MongoDBRecordedMessageMetadata } from '../../event'; +import type { + EventStream, + MongoDBReadEventMetadata, +} from '../../mongoDBEventStore'; +import { isMongoDBResumeToken, type MongoDBResumeToken } from './types'; + +export type MongoDBSubscriptionOptions< + MessageType extends Message = Message, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + // CheckpointType = MongoDBResumeToken, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + CheckpointType = any, +> = { + from?: CurrentMessageProcessorPosition; + client: MongoClient; + // batchSize: number; + eachBatch: BatchRecordedMessageHandlerWithoutContext< + MessageType, + MessageMetadataType + >; + resilience?: { + resubscribeOptions?: AsyncRetryOptions; + }; +}; +export type ChangeStreamFullDocumentValuePolicy = () => + | 'whenAvailable' + | 'updateLookup'; +export type MongoDBSubscriptionDocument = + | ChangeStreamInsertDocument + | ChangeStreamUpdateDocument + | ChangeStreamReplaceDocument; +// https://www.mongodb.com/docs/manual/reference/command/buildInfo/ +export type BuildInfo = { + version: string; + gitVersion: string; + sysInfo: string; + loaderFlags: string; + compilerFlags: string; + allocator: string; + versionArray: number[]; + openssl: Document; + javascriptEngine: string; + bits: number; + debug: boolean; + maxBsonObjectSize: number; + storageEngines: string[]; + ok: number; +}; +export type MongoDBSubscriptionStartFrom = + CurrentMessageProcessorPosition; + +export type MongoDBSubscriptionStartOptions = + { + startFrom: MongoDBSubscriptionStartFrom; + getFullDocumentValue: ChangeStreamFullDocumentValuePolicy; + dbName?: string; + }; + +export type MongoDBSubscription = { + isRunning: boolean; + start(options: MongoDBSubscriptionStartOptions): Promise; + stop(): Promise; +}; + +export type StreamSubscription< + EventType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, +> = ChangeStream< + EventStream, MessageMetadataType>, + MongoDBSubscriptionDocument< + EventStream, RecordedMessageMetadata> + > +>; +export type MessageArrayElement = `messages.${string}`; +export type UpdateDescription = { + _id: MongoDBResumeToken; + operationType: 'update'; + updateDescription: { + updatedFields: Record & { + 'metadata.streamPosition': number; + 'metadata.updatedAt': Date; + }; + }; +}; +export type FullDocument< + EventType extends Event = Event, + EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, + T extends EventStream = EventStream, +> = { + _id: MongoDBResumeToken; + operationType: 'insert'; + fullDocument: T; +}; +export type OplogChange< + EventType extends Message = AnyMessage, + EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, + T extends EventStream = EventStream< + Extract, + EventMetaDataType + >, +> = + | FullDocument, EventMetaDataType, T> + | UpdateDescription< + ReadEvent, EventMetaDataType> + >; + +type SubscriptionSequentialHandlerOptions< + MessageType extends AnyMessage = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + CheckpointType = MongoDBResumeToken, +> = MongoDBSubscriptionOptions< + MessageType, + MessageMetadataType, + CheckpointType +> & + WritableOptions; + +class SubscriptionSequentialHandler< + MessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + CheckpointType = MongoDBResumeToken, +> extends Transform { + private options: SubscriptionSequentialHandlerOptions< + MessageType, + MessageMetadataType, + CheckpointType + >; + // private from: EventStoreDBEventStoreConsumerType | undefined; + public isRunning: boolean; + + constructor( + options: SubscriptionSequentialHandlerOptions< + MessageType, + MessageMetadataType, + CheckpointType + >, + ) { + super({ objectMode: true, ...options }); + this.options = options; + // this.from = options.from; + this.isRunning = true; + } + + async _transform( + change: OplogChange, + _encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ): Promise { + try { + if (!this.isRunning || !change) { + callback(); + return; + } + + const messageCheckpoint = change._id; + const streamChange = + change.operationType === 'insert' + ? change.fullDocument + : change.operationType === 'update' + ? { + messages: Object.entries(change.updateDescription.updatedFields) + .filter(([key]) => key.startsWith('messages.')) + .map(([, value]) => value as ReadEvent), + } + : void 0; + + if (!streamChange) { + return; + } + + const messages = streamChange.messages.map((message) => { + return { + kind: message.kind, + type: message.type, + data: message.data, + metadata: { + ...message.metadata, + globalPosition: messageCheckpoint, + }, + } as unknown as RecordedMessage; + }); + + const result = await this.options.eachBatch(messages); + + if (result && result.type === 'STOP') { + this.isRunning = false; + if (!result.error) this.push(messageCheckpoint); + + this.push(result); + this.push(null); + callback(); + return; + } + + this.push(messageCheckpoint); + callback(); + } catch (error) { + callback(error as Error); + } + } +} + +const REGEXP = + /^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$/; + +export const isDatabaseUnavailableError = (error: unknown) => + error instanceof Error && + 'type' in error && + error.type === 'unavailable' && + 'code' in error && + error.code === 14; + +export const MongoDBResubscribeDefaultOptions: AsyncRetryOptions = { + forever: true, + minTimeout: 100, + factor: 1.5, + shouldRetryError: (error) => !isDatabaseUnavailableError(error), +}; + +export const parseSemVer = (value: string = '') => { + const versions = REGEXP.exec(value); + + return { + major: Number(versions?.[1]) || void 0, + minor: Number(versions?.[2]) || void 0, + bugfix: Number(versions?.[3]) || void 0, + rc: versions?.[4] || void 0, + }; +}; + +export const generateVersionPolicies = async (db: Db) => { + const buildInfo = (await db.admin().buildInfo()) as BuildInfo; + const semver = parseSemVer(buildInfo.version); + const major = semver.major || 0; + const throwNotSupportedError = (): never => { + throw new EmmettError( + `Not supported MongoDB version: ${buildInfo.version}.`, + ); + }; + + const supportedVersionCheckPolicy = () => { + if (major < 5) { + throwNotSupportedError(); + } + }; + const changeStreamFullDocumentValuePolicy: ChangeStreamFullDocumentValuePolicy = + () => { + if (major >= 6) { + return 'whenAvailable'; + } else if (major === 5) { + return 'updateLookup'; + } else { + throw new EmmettError(`Major number is ${major}`); + } + }; + + return { + supportedVersionCheckPolicy, + changeStreamFullDocumentValuePolicy, + }; +}; + +// const DEFAULT_PARTITION_KEY_NAME = 'default'; +const createChangeStream = < + EventType extends Message = AnyMessage, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + CheckpointType = any, +>( + getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, + db: Db, + resumeToken?: CurrentMessageProcessorPosition, + // partitionKey: string = DEFAULT_PARTITION_KEY_NAME, +) => { + const $match = { + 'ns.coll': { $regex: /^emt:/, $ne: 'emt:processors' }, + $or: [ + { operationType: 'insert' }, + { + operationType: 'update', + }, + ], + // 'fullDocument.partitionKey': partitionKey, + }; + const pipeline = [ + { + $match, + }, + ]; + + return db.watch< + EventStream>, + MongoDBSubscriptionDocument< + EventStream> + > + >(pipeline, { + fullDocument: getFullDocumentValue(), + ...(resumeToken === 'BEGINNING' + ? { + /* + The MongoDB's API is designed around starting from now or resuming from a known position + (resumeAfter, startAfter, or startAtOperationTime). + By passing a date set a long time ago (year 2000), we force MongoDB to start + from the earliest possible position in the oplog. + If the retention is 48 hours, then it will be 24 hours back. + */ + startAtOperationTime: new Timestamp({ + t: 946684800, + i: 0, + }), + } + : resumeToken === 'END' + ? void 0 + : resumeToken?.lastCheckpoint), + }); +}; + +const subscribe = + (getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db) => + ( + resumeToken?: MongoDBSubscriptionStartFrom, + ) => { + return createChangeStream(getFullDocumentValue, db, resumeToken); + }; + +export const mongoDBSubscription = < + MessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + ResumeToken = MongoDBResumeToken, +>({ + client, + from, + // batchSize, + eachBatch, + resilience, +}: MongoDBSubscriptionOptions< + MessageType, + MessageMetadataType +>): MongoDBSubscription => { + let isRunning = false; + + let start: Promise; + let processor: SubscriptionSequentialHandler< + MessageType, + MessageMetadataType + >; + + let subscription: StreamSubscription; + + const resubscribeOptions: AsyncRetryOptions = + resilience?.resubscribeOptions ?? { + ...MongoDBResubscribeDefaultOptions, + shouldRetryResult: () => isRunning, + shouldRetryError: (error) => + isRunning && MongoDBResubscribeDefaultOptions.shouldRetryError!(error), + }; + + const stopSubscription = async (callback?: () => void): Promise => { + isRunning = false; + if (processor) processor.isRunning = false; + + if (subscription.closed) { + return new Promise((resolve, reject) => { + try { + callback?.(); + resolve(); + } catch (error) { + reject( + error instanceof Error + ? error + : typeof error === 'string' + ? new Error(error) + : new Error('Unknown error'), + ); + } + }); + } else { + try { + await subscription.close(); + } finally { + callback?.(); + } + } + }; + + const pipeMessages = ( + options: MongoDBSubscriptionStartOptions, + ) => { + let retry = 0; + + return asyncRetry( + () => + new Promise((resolve, reject) => { + console.info( + `Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify( + options.startFrom, + )}`, + ); + subscription = subscribe( + options.getFullDocumentValue, + client.db(options.dbName), + )(options.startFrom); + + processor = new SubscriptionSequentialHandler< + MessageType, + MessageMetadataType + >({ + client, + from, + // batchSize, + eachBatch, + resilience, + }); + + const handler = new (class extends Writable { + async _write( + result: MongoDBResumeToken | MessageHandlerResult, + _encoding: string, + done: () => void, + ) { + if (!isRunning) return; + + if (isMongoDBResumeToken(result)) { + options.startFrom = { + lastCheckpoint: result as ResumeToken, + }; + done(); + return; + } + + if (result && result.type === 'STOP' && result.error) { + console.error( + `Subscription stopped with error code: ${result.error.errorCode}, message: ${ + result.error.message + }.`, + ); + } + + await stopSubscription(); + done(); + } + })({ objectMode: true }); + + pipeline( + subscription, + processor, + handler, + async (error: Error | null) => { + console.info(`Stopping subscription.`); + await stopSubscription(() => { + if (!error) { + console.info('Subscription ended successfully.'); + resolve(); + return; + } + + if ( + error.message === 'ChangeStream is closed' && + error.name === 'MongoAPIError' + ) { + console.info('Subscription ended successfully.'); + resolve(); + return; + } + + console.error( + `Received error: ${JSONParser.stringify(error)}.`, + ); + reject(error); + }); + }, + ); + + console.log('OK'); + }), + resubscribeOptions, + ); + }; + + return { + get isRunning() { + return isRunning; + }, + start: (options) => { + if (isRunning) return start; + + start = (async () => { + isRunning = true; + return pipeMessages(options); + })(); + + return start; + }, + stop: async () => { + if (!isRunning) return start ? await start : Promise.resolve(); + await stopSubscription(); + await start; + }, + }; +}; + +/** + * Compares two MongoDB Resume Tokens. + * @param token1 Token 1. + * @param token2 Token 2. + * @returns 0 - if the tokens are the same, 1 - if the token1 is later, -1 - is the token1 is earlier. + */ +const compareTwoMongoDBTokens = ( + token1: MongoDBResumeToken, + token2: MongoDBResumeToken, +) => { + const bufA = Buffer.from(token1._data, 'hex'); + const bufB = Buffer.from(token2._data, 'hex'); + + return Buffer.compare(bufA, bufB); +}; + +const compareTwoTokens = (token1: unknown, token2: unknown) => { + if (token1 === null && token2) { + return -1; + } + + if (token1 && token2 === null) { + return 1; + } + + if (token1 === null && token2 === null) { + return 0; + } + + if (isMongoDBResumeToken(token1) && isMongoDBResumeToken(token2)) { + return compareTwoMongoDBTokens(token1, token2); + } + + throw new IllegalStateError(`Type of tokens is not comparable`); +}; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const zipMongoDBMessageBatchPullerStartFrom = ( + options: (CurrentMessageProcessorPosition | undefined)[], +): CurrentMessageProcessorPosition => { + if ( + options.length === 0 || + options.some((o) => o === undefined || o === 'BEGINNING') + ) { + return 'BEGINNING'; + } + + if (options.every((o) => o === 'END')) { + return 'END'; + } + + const positionTokens = options.filter( + (o) => o !== undefined && o !== 'BEGINNING' && o !== 'END', + ); + + const sorted = positionTokens.sort((a, b) => { + return compareTwoTokens(a.lastCheckpoint, b.lastCheckpoint); + }); + + return sorted[0]!; +}; + +export { + compareTwoMongoDBTokens, + compareTwoTokens, + subscribe, + zipMongoDBMessageBatchPullerStartFrom, +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts new file mode 100644 index 00000000..34c876db --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts @@ -0,0 +1,11 @@ +export type MongoDBResumeToken = Readonly<{ _data: string }>; +export const isMongoDBResumeToken = ( + value: unknown, +): value is MongoDBResumeToken => { + return !!( + typeof value === 'object' && + value && + '_data' in value && + typeof value._data === 'string' + ); +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts new file mode 100644 index 00000000..35da5f5b --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts @@ -0,0 +1,32 @@ +import { assertEqual, assertNotEqual } from '@event-driven-io/emmett'; +import assert from 'assert'; +import { describe, it } from 'node:test'; +import { zipMongoDBMessageBatchPullerStartFrom } from './'; + +void describe('zipMongoDBMessageBatchPullerStartFrom', () => { + void it('it can get the earliest MongoDB oplog token', () => { + // tokens are sorted in descending order, so the earliest message is at the end + const input = [ + { + lastCheckpoint: { + _data: `82687E94D4000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + { + lastCheckpoint: { + _data: `82687E949E000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + { + lastCheckpoint: { + _data: `82687E948D000000032B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + ]; + const result = zipMongoDBMessageBatchPullerStartFrom(input); + + assertNotEqual('string', typeof result); + assert(typeof result !== 'string'); + assertEqual(input[2]?.lastCheckpoint._data, result.lastCheckpoint._data); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts new file mode 100644 index 00000000..2184c762 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts @@ -0,0 +1,12 @@ +import { toStreamCollectionName } from '../mongoDBEventStore'; + +export const DefaultProcessotCheckpointCollectionName = + toStreamCollectionName(`processors`); + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type ReadProcessorCheckpointSqlResult = { + lastProcessedToken: Position; + subscriptionId: string; + partitionId: string | null; + version: number; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/event.ts b/src/packages/emmett-mongodb/src/eventStore/event.ts new file mode 100644 index 00000000..4a098a28 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/event.ts @@ -0,0 +1,14 @@ +import type { + BigIntStreamPosition, + RecordedMessageMetadata, + RecordedMessageMetadataWithGlobalPosition, +} from '@event-driven-io/emmett'; +import type { MongoDBResumeToken } from './consumers/subscriptions/types'; + +export type ReadEventMetadataWithGlobalPosition< + GlobalPosition = MongoDBResumeToken, +> = RecordedMessageMetadataWithGlobalPosition; +export type MongoDBRecordedMessageMetadata = RecordedMessageMetadata< + MongoDBResumeToken, + BigIntStreamPosition +>; diff --git a/src/packages/emmett-mongodb/src/eventStore/example.ts b/src/packages/emmett-mongodb/src/eventStore/example.ts new file mode 100644 index 00000000..49f809ea --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/example.ts @@ -0,0 +1,48 @@ +import { type Event } from '@event-driven-io/emmett'; +import { MongoClient } from 'mongodb'; +import { getMongoDBEventStore } from '../eventStore'; + +export type PricedProductItem = { + productId: string; + quantity: number; + price: number; +}; +export type ProductItemAdded = Event< + 'ProductItemAdded', + { productItem: PricedProductItem } +>; +export type DiscountApplied = Event< + 'DiscountApplied', + { percent: number; couponId: string } +>; + +export type ShoppingCartEvent = ProductItemAdded | DiscountApplied; + +const connectionString = `mongodb://localhost:30003,localhost:30004/ylah-access?replicaSet=rsmongo&retryWrites=true&w=majority`; + +const main = async () => { + const mongo = new MongoClient(connectionString); + await mongo.connect(); + const es = getMongoDBEventStore({ + client: mongo, + }); + await es.appendToStream('test', [ + { + type: 'ProductItemAdded', + data: { + productItem: { + price: 100, + productId: '111-000', + quantity: 1, + }, + }, + }, + ]); + process.on('SIGTERM', async () => { + console.info(`Closing...`); + await mongo.close(); + }); +}; + +// eslint-disable-next-line @typescript-eslint/no-floating-promises +main(); diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts new file mode 100644 index 00000000..bb0af710 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -0,0 +1,238 @@ +import { + assertEqual, + assertIsNotNull, + assertNotEqual, + assertTrue, + STREAM_DOES_NOT_EXIST, +} from '@event-driven-io/emmett'; +import { + MongoDBContainer, + type StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import assert from 'assert'; +import { MongoClient, type Collection } from 'mongodb'; +import { after, before, describe, it } from 'node:test'; +import { v4 as uuid, v4 } from 'uuid'; +import { + getMongoDBEventStore, + toStreamCollectionName, + toStreamName, + type EventStream, + type MongoDBEventStore, +} from '.'; +import { + type PricedProductItem, + type ProductItemAdded, + type ShoppingCartEvent, +} from '../testing'; +import { CancellationPromise } from './consumers/CancellablePromise'; +import { + mongoDBMessagesConsumer, + type MongoDBEventStoreConsumer, +} from './consumers/mongoDBEventsConsumer'; +import type { MongoDBProcessor } from './consumers/mongoDBProcessor'; +import { + compareTwoMongoDBTokens, + generateVersionPolicies, +} from './consumers/subscriptions'; +import type { MongoDBResumeToken } from './consumers/subscriptions/types'; + +void describe('MongoDBEventStore subscription', () => { + let mongodb: StartedMongoDBContainer; + let eventStore: MongoDBEventStore; + let client: MongoClient; + let collection: Collection; + let consumer: MongoDBEventStoreConsumer; + let processor: MongoDBProcessor | undefined; + let lastResumeToken: MongoDBResumeToken | null = null; + + const messageProcessingPromise1 = new CancellationPromise(); + const messageProcessingPromise2 = new CancellationPromise(); + const lastProductItemIdTest1 = '789'; + const lastProductItemIdTest2 = '999'; + const expectedProductItemIds = [ + '123', + '456', + lastProductItemIdTest1, + lastProductItemIdTest2, + ] as const; + + const shoppingCartId = uuid(); + const streamType = 'shopping_cart'; + const streamName = toStreamName(streamType, shoppingCartId); + const noop = () => {}; + const productItem = (productId: string) => + ({ + productId, + quantity: 10, + price: 3, + }) as PricedProductItem; + + before(async () => { + mongodb = await new MongoDBContainer('mongo:8.0.10').start(); + client = new MongoClient(mongodb.getConnectionString(), { + directConnection: true, + }); + + await client.connect(); + const db = client.db(); + collection = db.collection( + toStreamCollectionName('shopping_cart'), + ); + + eventStore = getMongoDBEventStore({ + client, + }); + const versionPolicy = await generateVersionPolicies(db); + + consumer = mongoDBMessagesConsumer({ + client, + changeStreamFullDocumentPolicy: + versionPolicy.changeStreamFullDocumentValuePolicy, + }); + }); + + after(async () => { + if (consumer) { + await consumer.close(); + } + await client.close(); + await mongodb.stop(); + }); + + void it('should react to new events added by the appendToStream', async () => { + let receivedMessageCount: 0 | 1 | 2 = 0; + + processor = consumer.reactor({ + processorId: v4(), + stopAfter: (event) => { + if (event.data.productItem.productId === lastProductItemIdTest1) { + messageProcessingPromise1.resolve(); + consumer.stop().catch(noop); + } + if (event.data.productItem.productId === lastProductItemIdTest2) { + messageProcessingPromise2.resolve(); + consumer.stop().catch(noop); + } + + return ( + event.data.productItem.productId === lastProductItemIdTest1 || + event.data.productItem.productId === lastProductItemIdTest2 + ); + }, + eachMessage: (event) => { + lastResumeToken = event.metadata.globalPosition; + + assertTrue(receivedMessageCount <= 3); + assertEqual( + expectedProductItemIds[receivedMessageCount], + event.data.productItem.productId, + ); + + receivedMessageCount++; + }, + connectionOptions: { + client, + }, + }); + + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[0]) }, + }, + ], + { expectedStreamVersion: STREAM_DOES_NOT_EXIST }, + ); + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[1]) }, + }, + ], + { expectedStreamVersion: 1n }, + ); + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[2]) }, + }, + ], + { expectedStreamVersion: 2n }, + ); + + await consumer.start(); + + const stream = await collection.findOne( + { streamName }, + { useBigInt64: true }, + ); + + assertIsNotNull(stream); + assertEqual(3n, stream.metadata.streamPosition); + assertEqual(shoppingCartId, stream.metadata.streamId); + assertEqual(streamType, stream.metadata.streamType); + assertTrue(stream.metadata.createdAt instanceof Date); + assertTrue(stream.metadata.updatedAt instanceof Date); + }); + + void it('should renew after the last event', async () => { + assertTrue(!!processor); + assert(processor); + + let stream = await collection.findOne( + { streamName }, + { useBigInt64: true }, + ); + assertIsNotNull(stream); + assertEqual(3n, stream.metadata.streamPosition); + + const position = await processor.start({ client }); + + assertTrue(!!position); + assertNotEqual(typeof position, 'string'); + assert(position); + assert(typeof position !== 'string'); + + // processor after restart is renewed after the 3rd position. + assertEqual( + 0, + compareTwoMongoDBTokens(position.lastCheckpoint, lastResumeToken!), + ); + + const consumerPromise = consumer.start(); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[3]) }, + }, + ], + { expectedStreamVersion: 3n }, + ); + + await consumerPromise; + + stream = await collection.findOne({ streamName }, { useBigInt64: true }); + assertIsNotNull(stream); + assertEqual(4n, stream.metadata.streamPosition); + + // lastResumeToken has changed after the last message + assertEqual( + 1, + compareTwoMongoDBTokens(lastResumeToken!, position.lastCheckpoint), + ); + + await consumer.stop(); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts index 179e1332..d6bc6dda 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts @@ -1,15 +1,15 @@ +import { assertEqual, type Event } from '@event-driven-io/emmett'; +import { + MongoDBContainer, + StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { MongoClient } from 'mongodb'; import { after, before, describe, it } from 'node:test'; import { v7 as uuid } from 'uuid'; -import { type Event, assertEqual } from '@event-driven-io/emmett'; import { getMongoDBEventStore, type MongoDBReadEvent, } from './mongoDBEventStore'; -import { - MongoDBContainer, - StartedMongoDBContainer, -} from '@testcontainers/mongodb'; -import { MongoClient } from 'mongodb'; type TestEvent = Event<'test', { counter: number }, { some: boolean }>;