From 8c6af5c6e4766397b8e5e3e57181cd2076d0c7b4 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Wed, 9 Jul 2025 21:33:43 +0200 Subject: [PATCH 01/17] feat: mongodb processor and subscription --- .../consumers/CancellablePromise.ts | 63 +++++ .../consumers/mongoDBEventsConsumer.ts | 228 ++++++++++++++++ .../eventStore/consumers/mongoDBProcessor.ts | 255 ++++++++++++++++++ .../consumers/readProcessorCheckpoint.ts | 29 ++ .../consumers/storeProcessorCheckpoint.ts | 69 +++++ .../consumers/subscriptions/index.ts | 196 ++++++++++++++ .../consumers/subscriptions/types.ts | 1 + .../src/eventStore/consumers/types.ts | 12 + .../emmett-mongodb/src/eventStore/event.ts | 15 ++ .../emmett-mongodb/src/eventStore/example.ts | 47 ++++ ...mongoDBEventStore.subscription.e2e.spec.ts | 145 ++++++++++ .../schema/readProcessorCheckpoint.ts | 2 + 12 files changed, 1062 insertions(+) create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/types.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/event.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/example.ts create mode 100644 src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts new file mode 100644 index 00000000..74204424 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts @@ -0,0 +1,63 @@ +import assert from 'assert'; + +export class CancellationPromise extends Promise { + private _resolve: (value: T | PromiseLike) => void; + private _reject: (reason?: unknown) => void; + private _state: 'resolved' | 'rejected' | 'pending' = 'pending'; + + constructor( + executor: ( + resolve: (value: T | PromiseLike) => void, + reject: (reason?: unknown) => void, + ) => void = () => null, + ) { + let _resolve: ((value: T | PromiseLike) => void) | undefined = undefined; + let _reject: ((reason?: unknown) => void) | undefined = undefined; + + super((resolve, reject) => { + executor(resolve, reject); + _resolve = resolve; + _reject = reject; + }); + + assert(_resolve); + assert(_reject); + + this._resolve = _resolve; + this._reject = _reject; + } + + reject(reason?: unknown): void { + this._state = 'rejected'; + this._reject(reason); + } + + resolve(value?: T): void { + this._state = 'resolved'; + this._resolve(value as T); + } + + get isResolved() { + return this._state === 'resolved'; + } + + get isRejected() { + return this._state === 'rejected'; + } + + get isPending() { + return this._state === 'pending'; + } + + static resolved(value?: R) { + const promise = new CancellationPromise(); + promise.resolve(value as R); + return promise; + } + + static rejected(value: R) { + const promise = new CancellationPromise(); + promise.reject(value); + return promise; + } +} diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts new file mode 100644 index 00000000..6bf9de9e --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -0,0 +1,228 @@ +import { + EmmettError, + type AnyEvent, + type AnyMessage, + type AsyncRetryOptions, + type CommonRecordedMessageMetadata, + type Event, + type GlobalPositionTypeOfRecordedMessageMetadata, + type Message, + type MessageConsumer, + type RecordedMessage, +} from '@event-driven-io/emmett'; +import { ChangeStream, MongoClient, type MongoClientOptions } from 'mongodb'; +import { v4 as uuid } from 'uuid'; +import type { + MongoDBRecordedMessageMetadata, + ReadEventMetadataWithGlobalPosition, +} from '../event'; +import type { EventStream } from '../mongoDBEventStore'; +import { + changeStreamReactor, + mongoDBProjector, + type MongoDBProcessor, + type MongoDBProcessorOptions, + type MongoDBProjectorOptions, +} from './mongoDBProcessor'; +import { + subscribe as _subscribe, + zipMongoDBEventStoreMessageBatchPullerStartFrom, + type ChangeStreamFullDocumentValuePolicy, + type MongoDBSubscriptionDocument, +} from './subscriptions'; + +const noop = () => Promise.resolve(); + +export type MessageConsumerOptions< + MessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +> = { + consumerId?: string; + + processors?: MongoDBProcessor[]; +}; + +export type EventStoreDBEventStoreConsumerConfig< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ConsumerMessageType extends Message = any, +> = MessageConsumerOptions & { + // from?: any; + pulling?: { + batchSize?: number; + }; + resilience?: { + resubscribeOptions?: AsyncRetryOptions; + }; + changeStreamFullDocumentPolicy: ChangeStreamFullDocumentValuePolicy; +}; + +export type MongoDBConsumerOptions< + ConsumerEventType extends Message = Message, +> = EventStoreDBEventStoreConsumerConfig & + ( + | { + connectionString: string; + clientOptions?: MongoClientOptions; + client?: never; + onHandleStart?: ( + messages: RecordedMessage< + ConsumerEventType, + ReadEventMetadataWithGlobalPosition + >[], + ) => Promise; + onHandleEnd?: ( + messages: RecordedMessage< + ConsumerEventType, + ReadEventMetadataWithGlobalPosition + >[], + ) => Promise; + } + | { + client: MongoClient; + connectionString?: never; + clientOptions?: never; + onHandleStart?: ( + messages: RecordedMessage< + ConsumerEventType, + ReadEventMetadataWithGlobalPosition + >[], + ) => Promise; + onHandleEnd?: ( + messages: RecordedMessage< + ConsumerEventType, + ReadEventMetadataWithGlobalPosition + >[], + ) => Promise; + } + ); + +export type EventStoreDBEventStoreConsumer< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ConsumerMessageType extends AnyMessage = any, +> = MessageConsumer & + Readonly<{ + reactor: ( + options: MongoDBProcessorOptions, + ) => MongoDBProcessor; + }> & + (AnyEvent extends ConsumerMessageType + ? Readonly<{ + projector: < + EventType extends AnyEvent = ConsumerMessageType & AnyEvent, + >( + options: MongoDBProjectorOptions, + ) => MongoDBProcessor; + }> + : object); + +export const mongoDBEventsConsumer = < + ConsumerMessageType extends Message = AnyMessage, +>( + options: MongoDBConsumerOptions, +): EventStoreDBEventStoreConsumer => { + let start: Promise; + let stream: ChangeStream< + EventStream, + MongoDBSubscriptionDocument< + EventStream + > + >; + let isRunning = false; + const client = + 'client' in options && options.client + ? options.client + : new MongoClient(options.connectionString, options.clientOptions); + const processors = options.processors ?? []; + const subscribe = _subscribe( + options.changeStreamFullDocumentPolicy, + client.db(), + ); + const onHandleStart = options.onHandleStart || noop; + const onHandleEnd = options.onHandleEnd || noop; + + return { + consumerId: options.consumerId ?? uuid(), + get isRunning() { + return isRunning; + }, + processors, + reactor: ( + options: MongoDBProcessorOptions, + ): MongoDBProcessor => { + const processor = changeStreamReactor(options); + + processors.push(processor as unknown as MongoDBProcessor); + + return processor; + }, + projector: ( + options: MongoDBProjectorOptions, + ): MongoDBProcessor => { + const processor = mongoDBProjector(options); + + processors.push(processor as unknown as MongoDBProcessor); + + return processor; + }, + start: () => { + start = (async () => { + if (processors.length === 0) + return Promise.reject( + new EmmettError( + 'Cannot start consumer without at least a single processor', + ), + ); + + isRunning = true; + + const positions = await Promise.all( + processors.map((o) => o.start(options)), + ); + const startFrom = + zipMongoDBEventStoreMessageBatchPullerStartFrom(positions); + + stream = subscribe(); + stream.on('change', async (change) => { + const resumeToken = change._id; + const streamChange = ( + change as unknown as { fullDocument: EventStream } + ).fullDocument; + const messages = streamChange.messages.map((message) => { + return { + kind: message.kind, + type: message.type, + data: message.data, + metadata: { + ...message.metadata, + streamPosition: resumeToken, + }, + } as unknown as RecordedMessage< + ConsumerMessageType, + ReadEventMetadataWithGlobalPosition + >; + }); + + await onHandleStart(messages); + + for (const processor of processors.filter( + ({ isActive }) => isActive, + )) { + await processor.handle(messages, { client }); + } + + await onHandleEnd(messages); + }); + })(); + + return start; + }, + stop: async () => { + return Promise.resolve(); + }, + close: async () => { + await stream.close(); + }, + }; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts new file mode 100644 index 00000000..452a8143 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts @@ -0,0 +1,255 @@ +import { + type AnyEvent, + type AnyMessage, + type Checkpointer, + type Event, + type Message, + type MessageHandlerResult, + type MessageProcessingScope, + MessageProcessor, + type ProjectorOptions, + type ReactorOptions, + type RecordedMessage, + type StreamPositionTypeOfRecordedMessageMetadata, + projector, + reactor, +} from '@event-driven-io/emmett'; +import { MongoClient } from 'mongodb'; +import type { + MongoDBRecordedMessageMetadata, + ReadEventMetadataWithGlobalPosition, + StringStreamPosition, +} from '../event'; +import type { MongoDBEventStoreConnectionOptions } from '../mongoDBEventStore'; +import { readProcessorCheckpoint } from './readProcessorCheckpoint'; +import { storeProcessorCheckpoint } from './storeProcessorCheckpoint'; +import type { MongoDBResumeToken } from './subscriptions/types'; + +type MongoDBConnectionOptions = { + connectionOptions: MongoDBEventStoreConnectionOptions; +}; + +export type MongoDBProcessorHandlerContext = { + client: MongoClient; + // execute: SQLExecutor; + // connection: { + // connectionString: string; + // client: NodePostgresClient; + // transaction: NodePostgresTransaction; + // pool: Dumbo; + // }; +}; + +export type CommonRecordedMessageMetadata< + StreamPosition = StringStreamPosition, +> = Readonly<{ + messageId: string; + streamPosition: StreamPosition; + streamName: string; +}>; + +export type WithGlobalPosition = Readonly<{ + globalPosition: GlobalPosition; +}>; + +export type RecordedMessageMetadata< + GlobalPosition = undefined, + StreamPosition = StringStreamPosition, +> = CommonRecordedMessageMetadata & + // eslint-disable-next-line @typescript-eslint/no-empty-object-type + (GlobalPosition extends undefined ? {} : WithGlobalPosition); + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type AnyRecordedMessageMetadata = RecordedMessageMetadata; + +export type MongoDBProcessor = + MessageProcessor< + MessageType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + >; + +export type MongoDBProcessorOptions = + ReactorOptions< + MessageType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + > & { connectionOptions: MongoDBEventStoreConnectionOptions }; + +export type MongoDBCheckpointer = + Checkpointer< + MessageType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + >; + +export type MongoDBProjectorOptions = + ProjectorOptions< + EventType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + > & + MongoDBConnectionOptions; + +const isResumeToken = (value: object): value is MongoDBResumeToken => + '_data' in value && + typeof value._data === 'string' && + typeof value._data.trim === 'function' && + value._data.trim() !== ''; + +export const getCheckpoint = < + MessageType extends AnyMessage = AnyMessage, + MessageMetadataType extends + MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + CheckpointType = StreamPositionTypeOfRecordedMessageMetadata, +>( + message: RecordedMessage, +): CheckpointType | null => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return 'checkpoint' in message.metadata && + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + isResumeToken(message.metadata.checkpoint) + ? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + message.metadata.checkpoint + : 'globalPosition' in message.metadata && + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + isResumeToken(message.metadata.globalPosition) + ? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + message.metadata.globalPosition + : 'streamPosition' in message.metadata && + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + isResumeToken(message.metadata.streamPosition) + ? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + message.metadata.streamPosition + : null; +}; + +export const mongoDBCheckpointer = < + MessageType extends Message = Message, +>(): MongoDBCheckpointer => ({ + read: async (options, context) => { + const result = await readProcessorCheckpoint(context.client, options); + + return { lastCheckpoint: result?.lastProcessedPosition }; + }, + store: async (options, context) => { + const newPosition: MongoDBResumeToken | null = getCheckpoint( + options.message, + ); + + const result = await storeProcessorCheckpoint(context.client, { + lastProcessedPosition: options.lastCheckpoint, + newPosition, + processorId: options.processorId, + partition: options.partition, + version: options.version, + }); + + return result.success + ? { success: true, newCheckpoint: result.newPosition } + : result; + }, +}); + +const mongoDBProcessingScope = (options: { + client: MongoClient; + processorId: string; +}): MessageProcessingScope => { + // const processorConnectionString = options.connectionString; + + const processingScope: MessageProcessingScope< + MongoDBProcessorHandlerContext + > = async ( + handler: ( + context: MongoDBProcessorHandlerContext, + ) => Result | Promise, + partialContext: Partial, + ) => { + // const connection = partialContext?.connection; + // const connectionString = + // processorConnectionString ?? connection?.connectionString; + + // if (!connectionString) + // throw new EmmettError( + // `MongoDB processor '${options.processorId}' is missing connection string. Ensure that you passed it through options`, + // ); + + return handler({ + client: options.client, + ...partialContext, + }); + }; + + return processingScope; +}; + +export const mongoDBProjector = ( + options: MongoDBProjectorOptions, +): MongoDBProcessor => { + const { connectionOptions } = options; + const hooks = { + onStart: options.hooks?.onStart, + onClose: options.hooks?.onClose + ? async () => { + if (options.hooks?.onClose) await options.hooks?.onClose(); + } + : undefined, + }; + const client = + 'client' in connectionOptions && connectionOptions.client + ? connectionOptions.client + : new MongoClient( + connectionOptions.connectionString, + connectionOptions.clientOptions, + ); + + return projector< + EventType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext + >({ + ...options, + hooks, + processingScope: mongoDBProcessingScope({ + client, + processorId: + options.processorId ?? `projection:${options.projection.name}`, + }), + + checkpoints: mongoDBCheckpointer(), + }); +}; + +export const changeStreamReactor = < + MessageType extends AnyMessage = AnyMessage, +>( + options: MongoDBProcessorOptions, +): MongoDBProcessor => { + const connectionOptions = options.connectionOptions || {}; + const client = + 'client' in connectionOptions && connectionOptions.client + ? connectionOptions.client + : new MongoClient( + connectionOptions.connectionString, + connectionOptions.clientOptions, + ); + + const hooks = { + onStart: options.hooks?.onStart, + onClose: options.hooks?.onClose + ? async () => { + if (options.hooks?.onClose) await options.hooks?.onClose(); + } + : undefined, + }; + + return reactor({ + ...options, + hooks, + processingScope: mongoDBProcessingScope({ + client, + processorId: options.processorId, + }), + checkpoints: mongoDBCheckpointer(), + }); +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts new file mode 100644 index 00000000..9b858549 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts @@ -0,0 +1,29 @@ +import type { MongoClient } from 'mongodb'; +import { + DefaultProcessotCheckpointCollectionName, + type ReadProcessorCheckpointSqlResult, +} from './types'; +import type { MongoDBResumeToken } from './subscriptions/types'; + +export type ReadProcessorCheckpointResult = { + lastProcessedPosition: MongoDBResumeToken | null; +}; + +export const readProcessorCheckpoint = async ( + client: MongoClient, + options: { processorId: string; partition?: string; collectionName?: string }, +): Promise => { + const result = await client + .db() + .collection( + options.collectionName || DefaultProcessotCheckpointCollectionName, + ) + .findOne({ + subscriptionId: options.processorId, + partitionId: options.partition || null, + }); + + return { + lastProcessedPosition: result !== null ? result.lastProcessedToken : null, + }; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts new file mode 100644 index 00000000..6bd5ea05 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts @@ -0,0 +1,69 @@ +import type { MongoClient } from 'mongodb'; +import type { MongoDBResumeToken } from './subscriptions/types'; +import { + DefaultProcessotCheckpointCollectionName, + type ReadProcessorCheckpointSqlResult, +} from './types'; + +export type StoreLastProcessedProcessorPositionResult< + Position extends MongoDBResumeToken | null = MongoDBResumeToken, +> = + | { + success: true; + newPosition: Position; + } + | { success: false; reason: 'IGNORED' | 'MISMATCH' }; + +export const storeProcessorCheckpoint = async ( + client: MongoClient, + options: { + processorId: string; + version: number | undefined; + newPosition: null extends Position + ? MongoDBResumeToken | null + : MongoDBResumeToken; + lastProcessedPosition: MongoDBResumeToken | null; + partition?: string; + collectionName?: string; + }, +): Promise< + StoreLastProcessedProcessorPositionResult< + null extends Position ? MongoDBResumeToken | null : MongoDBResumeToken + > +> => { + try { + const result = await client + .db() + .collection( + options.collectionName || DefaultProcessotCheckpointCollectionName, + ) + .updateOne( + { + subscriptionId: options.processorId, + partitionId: options.partition || null, + lastProcessedToken: options.lastProcessedPosition, + }, + { + $set: { + subscriptionId: options.processorId, + partitionId: options.partition || null, + lastProcessedToken: options.newPosition, + version: options.version, + }, + }, + { + upsert: true, + }, + ); + + return result.modifiedCount || result.upsertedCount + ? { success: true, newPosition: options.newPosition } + : { + success: false, + reason: result.matchedCount === 0 ? 'IGNORED' : 'MISMATCH', + }; + } catch (error) { + console.log(error); + throw error; + } +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts new file mode 100644 index 00000000..6908ad64 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -0,0 +1,196 @@ +import type { + AsyncRetryOptions, + BatchRecordedMessageHandlerWithoutContext, + Event, + Message, + ReadEventMetadataWithGlobalPosition, +} from '@event-driven-io/emmett'; +import type { + ChangeStreamDeleteDocument, + ChangeStreamInsertDocument, + ChangeStreamReplaceDocument, + ChangeStreamUpdateDocument, + Db, + Document, + MongoClient, + ResumeToken, +} from 'mongodb'; +import type { EventStream } from '../../mongoDBEventStore'; +import type { MongoDBResumeToken } from './types'; + +export type MongoDBSubscriptionOptions = + { + // from?: EventStoreDBEventStoreConsumerType; + client: MongoClient; + batchSize: number; + eachBatch: BatchRecordedMessageHandlerWithoutContext< + MessageType, + ReadEventMetadataWithGlobalPosition + >; + resilience?: { + resubscribeOptions?: AsyncRetryOptions; + }; + }; +export type ChangeStreamFullDocumentValuePolicy = () => + | 'whenAvailable' + | 'updateLookup'; +export type MongoDBSubscriptionDocument = + | ChangeStreamInsertDocument + | ChangeStreamUpdateDocument + | ChangeStreamReplaceDocument + | ChangeStreamDeleteDocument; +// https://www.mongodb.com/docs/manual/reference/command/buildInfo/ +export type BuildInfo = { + version: string; + gitVersion: string; + sysInfo: string; + loaderFlags: string; + compilerFlags: string; + allocator: string; + versionArray: number[]; + openssl: Document; + javascriptEngine: string; + bits: number; + debug: boolean; + maxBsonObjectSize: number; + storageEngines: string[]; + ok: number; +}; +export type MongoDBSubscriptionStartFrom = + | { lastCheckpoint: MongoDBResumeToken } + | 'BEGINNING' + | 'END'; + +export type MongoDBSubscriptionStartOptions = { + startFrom: MongoDBSubscriptionStartFrom; +}; + +// export type EventStoreDBEventStoreConsumerType = +// | { +// stream: $all; +// options?: Exclude; +// } +// | { +// stream: string; +// options?: Exclude; +// }; +const REGEXP = + /^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$/; + +export const parseSemVer = (value: string = '') => { + const versions = REGEXP.exec(value); + + return { + major: Number(versions?.[1]) || void 0, + minor: Number(versions?.[2]) || void 0, + bugfix: Number(versions?.[3]) || void 0, + rc: versions?.[4] || void 0, + }; +}; + +export const generateVersionPolicies = async (db: Db) => { + const buildInfo = (await db.admin().buildInfo()) as BuildInfo; + const semver = parseSemVer(buildInfo.version); + const major = semver.major || 0; + const throwNotSupportedError = (): never => { + throw new Error(); + // throw new NotSupportedMongoVersionError({ + // currentVersion: buildInfo.version, + // supportedVersions: SupportedMajorMongoVersions, + // }); + }; + + const supportedVersionCheckPolicy = () => { + if (major < 5) { + throwNotSupportedError(); + } + }; + const changeStreamFullDocumentValuePolicy: ChangeStreamFullDocumentValuePolicy = + () => { + if (major >= 6) { + return 'whenAvailable'; + } else if (major === 5) { + return 'updateLookup'; + } else { + throw new Error(`Major number is ${major}`); + // throwNotSupportedError(); + } + }; + + return { + supportedVersionCheckPolicy, + changeStreamFullDocumentValuePolicy, + }; +}; + +const createChangeStream = ( + getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, + db: Db, + // messages: Collection>, + // partitionKey: string, + resumeToken?: ResumeToken, +) => { + //: Partial>> + const $match = { + 'ns.coll': { $regex: /^emt:/ }, + $or: [ + { operationType: 'insert' }, + { + operationType: 'update', + 'updateDescription.updatedFields.messages': { $exists: true }, + }, + ], + // 'fullDocument.partitionKey': partitionKey, + }; + const pipeline = [ + { + $match, + }, + ]; + + return db.watch< + EventStream, + MongoDBSubscriptionDocument> + >(pipeline, { + fullDocument: getFullDocumentValue(), + startAfter: resumeToken, + }); +}; + +const subscribe = + (getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db) => + (resumeToken?: ResumeToken) => { + return createChangeStream(getFullDocumentValue, db, resumeToken); + }; + +const zipMongoDBMessageBatchPullerStartFrom = ( + options: (MongoDBSubscriptionStartFrom | undefined)[], +): MongoDBSubscriptionStartFrom => { + if ( + options.length === 0 || + options.some((o) => o === undefined || o === 'BEGINNING') + ) { + return 'BEGINNING'; + } + + if (options.every((o) => o === 'END')) { + return 'END'; + } + + const positionTokens = options.filter( + (o) => o !== undefined && o !== 'BEGINNING' && o !== 'END', + ); + + const sorted = positionTokens.sort((a, b) => { + const bufA = Buffer.from(a.lastCheckpoint._data, 'hex'); // or 'base64', depending on encoding + const bufB = Buffer.from(b.lastCheckpoint._data, 'hex'); + return Buffer.compare(bufA, bufB); + }); + + return sorted[0]!; +}; + +export { + subscribe, + zipMongoDBMessageBatchPullerStartFrom as zipMongoDBEventStoreMessageBatchPullerStartFrom, +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts new file mode 100644 index 00000000..59a35868 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts @@ -0,0 +1 @@ +export type MongoDBResumeToken = Readonly<{ _data: string }>; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts new file mode 100644 index 00000000..3f4b63fb --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts @@ -0,0 +1,12 @@ +import { toStreamCollectionName } from '../mongoDBEventStore'; +import type { MongoDBResumeToken } from './subscriptions/types'; + +export const DefaultProcessotCheckpointCollectionName = + toStreamCollectionName(`processors`); + +export type ReadProcessorCheckpointSqlResult = { + lastProcessedToken: MongoDBResumeToken | null; + subscriptionId: string; + partitionId: string | null; + version: number; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/event.ts b/src/packages/emmett-mongodb/src/eventStore/event.ts new file mode 100644 index 00000000..82a81cd6 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/event.ts @@ -0,0 +1,15 @@ +import type { + RecordedMessageMetadata, + RecordedMessageMetadataWithGlobalPosition, +} from '@event-driven-io/emmett'; +import type { MongoDBResumeToken } from './consumers/subscriptions/types'; + +export type StringStreamPosition = MongoDBResumeToken; +export type StringGlobalPosition = MongoDBResumeToken; +export type ReadEventMetadataWithGlobalPosition< + GlobalPosition = StringGlobalPosition, +> = RecordedMessageMetadataWithGlobalPosition; +export type MongoDBRecordedMessageMetadata = RecordedMessageMetadata< + undefined, + StringStreamPosition +>; diff --git a/src/packages/emmett-mongodb/src/eventStore/example.ts b/src/packages/emmett-mongodb/src/eventStore/example.ts new file mode 100644 index 00000000..b9fddbf2 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/example.ts @@ -0,0 +1,47 @@ +import { type Event } from '@event-driven-io/emmett'; +import { MongoClient } from 'mongodb'; +import { getMongoDBEventStore } from '../eventStore'; + +export type PricedProductItem = { + productId: string; + quantity: number; + price: number; +}; +export type ProductItemAdded = Event< + 'ProductItemAdded', + { productItem: PricedProductItem } +>; +export type DiscountApplied = Event< + 'DiscountApplied', + { percent: number; couponId: string } +>; + +export type ShoppingCartEvent = ProductItemAdded | DiscountApplied; + +const connectionString = `mongodb://localhost:30003,localhost:30004/ylah-access?replicaSet=rsmongo&retryWrites=true&w=majority`; + +const main = async () => { + const mongo = new MongoClient(connectionString); + await mongo.connect(); + const es = getMongoDBEventStore({ + client: mongo, + }); + await es.appendToStream('test', [ + { + type: 'ProductItemAdded', + data: { + productItem: { + price: 100, + productId: '111-000', + quantity: 1, + }, + }, + }, + ]); + process.on('SIGTERM', async () => { + console.info(`Closing...`); + await mongo.close(); + }); +}; + +main(); diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts new file mode 100644 index 00000000..631c9f5b --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -0,0 +1,145 @@ +import { + assertEqual, + assertIsNotNull, + assertTrue, + STREAM_DOES_NOT_EXIST, +} from '@event-driven-io/emmett'; +import { + MongoDBContainer, + type StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { MongoClient, type Collection } from 'mongodb'; +import { after, before, beforeEach, describe, it } from 'node:test'; +import { v4 as uuid, v4 } from 'uuid'; +import { + getMongoDBEventStore, + toStreamCollectionName, + toStreamName, + type EventStream, + type MongoDBEventStore, +} from '.'; +import { + type PricedProductItem, + type ProductItemAdded, + type ShoppingCartEvent, +} from '../testing'; +import { CancellationPromise } from './consumers/CancellablePromise'; +import { + mongoDBEventsConsumer, + type EventStoreDBEventStoreConsumer, +} from './consumers/mongoDBEventsConsumer'; +import { generateVersionPolicies } from './consumers/subscriptions'; + +void describe('MongoDBEventStore subscription', () => { + let mongodb: StartedMongoDBContainer; + let eventStore: MongoDBEventStore; + let client: MongoClient; + let collection: Collection; + let consumer: EventStoreDBEventStoreConsumer; + let messageProcessingPromise = new CancellationPromise(); + + const noop = () => {}; + const timeoutGuard = async ( + action: () => Promise, + timeoutAfterMs = 1000, + ) => { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + reject(new Error('timeout')); + clearTimeout(timer); + }, timeoutAfterMs); + + action() + .catch(noop) + .finally(() => { + clearTimeout(timer); + resolve(); + }); + }); + }; + + before(async () => { + mongodb = await new MongoDBContainer('mongo:8.0.10').start(); + client = new MongoClient(mongodb.getConnectionString(), { + directConnection: true, + }); + + await client.connect(); + const db = client.db(); + collection = db.collection( + toStreamCollectionName('shopping_cart'), + ); + + eventStore = getMongoDBEventStore({ + client, + }); + const versionPolicy = await generateVersionPolicies(db); + + consumer = mongoDBEventsConsumer({ + client, + changeStreamFullDocumentPolicy: + versionPolicy.changeStreamFullDocumentValuePolicy, + onHandleEnd: () => { + messageProcessingPromise.resolve(); + return Promise.resolve(); + }, + }); + + consumer.reactor({ + processorId: v4(), + eachMessage: (event) => { + console.log(event); + }, + connectionOptions: { + client, + }, + }); + + await consumer.start(); + }); + + after(async () => { + try { + if (consumer) { + await consumer.close(); + } + await client.close(); + await mongodb.stop(); + } catch (error) { + console.log(error); + } + }); + + beforeEach(() => { + messageProcessingPromise = new CancellationPromise(); + }); + + void it('should create a new stream with metadata with appendToStream', async () => { + const productItem: PricedProductItem = { + productId: '123', + quantity: 10, + price: 3, + }; + const shoppingCartId = uuid(); + const streamType = 'shopping_cart'; + const streamName = toStreamName(streamType, shoppingCartId); + + await eventStore.appendToStream( + streamName, + [{ type: 'ProductItemAdded', data: { productItem } }], + { expectedStreamVersion: STREAM_DOES_NOT_EXIST }, + ); + + const stream = await collection.findOne( + { streamName }, + { useBigInt64: true }, + ); + await timeoutGuard(() => messageProcessingPromise); + assertIsNotNull(stream); + assertEqual(1n, stream.metadata.streamPosition); + assertEqual(shoppingCartId, stream.metadata.streamId); + assertEqual(streamType, stream.metadata.streamType); + assertTrue(stream.metadata.createdAt instanceof Date); + assertTrue(stream.metadata.updatedAt instanceof Date); + }); +}); diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/readProcessorCheckpoint.ts b/src/packages/emmett-postgresql/src/eventStore/schema/readProcessorCheckpoint.ts index fb0fd042..8611dea5 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/readProcessorCheckpoint.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/readProcessorCheckpoint.ts @@ -2,6 +2,8 @@ import { singleOrNull, sql, type SQLExecutor } from '@event-driven-io/dumbo'; import { defaultTag, subscriptionsTable } from './typing'; type ReadProcessorCheckpointSqlResult = { + subscriptionId: string; + partitionId: string | null; last_processed_position: string; }; From bb80bf3c4a9207dd7a0e55bb6eb58f56bfdaade3 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Thu, 31 Jul 2025 10:19:42 +0200 Subject: [PATCH 02/17] feat: receiving updates on messages --- .../consumers/mongoDBEventsConsumer.ts | 58 +++++++++++-- .../eventStore/consumers/mongoDBProcessor.ts | 15 ++-- .../consumers/subscriptions/index.ts | 9 +- ...MongoDBMessageBatchPullerStartFrom.spec.ts | 32 +++++++ .../emmett-mongodb/src/eventStore/event.ts | 6 +- ...mongoDBEventStore.subscription.e2e.spec.ts | 83 ++++++++++++++----- ...ongoDBEventstore.onAfterCommit.e2e.spec.ts | 12 +-- 7 files changed, 161 insertions(+), 54 deletions(-) create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 6bf9de9e..0724c135 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -8,6 +8,7 @@ import { type GlobalPositionTypeOfRecordedMessageMetadata, type Message, type MessageConsumer, + type ReadEvent, type RecordedMessage, } from '@event-driven-io/emmett'; import { ChangeStream, MongoClient, type MongoClientOptions } from 'mongodb'; @@ -16,7 +17,10 @@ import type { MongoDBRecordedMessageMetadata, ReadEventMetadataWithGlobalPosition, } from '../event'; -import type { EventStream } from '../mongoDBEventStore'; +import type { + EventStream, + MongoDBReadEventMetadata, +} from '../mongoDBEventStore'; import { changeStreamReactor, mongoDBProjector, @@ -26,7 +30,7 @@ import { } from './mongoDBProcessor'; import { subscribe as _subscribe, - zipMongoDBEventStoreMessageBatchPullerStartFrom, + zipMongoDBMessageBatchPullerStartFrom, type ChangeStreamFullDocumentValuePolicy, type MongoDBSubscriptionDocument, } from './subscriptions'; @@ -117,6 +121,30 @@ export type EventStoreDBEventStoreConsumer< }> : object); +type MessageArrayElement = `messages.${string}`; +type UpdateDescription = { + updateDescription: { + updatedFields: Record & { + 'metadata.streamPosition': number; + 'metadata.updatedAt': Date; + }; + }; +}; +type FullDocument< + EventType extends Event = Event, + EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, + T extends EventStream = EventStream, +> = { + fullDocument: T; +}; +type OplogChange< + EventType extends Event = Event, + EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, + T extends EventStream = EventStream, +> = + | FullDocument + | UpdateDescription>; + export const mongoDBEventsConsumer = < ConsumerMessageType extends Message = AnyMessage, >( @@ -180,15 +208,29 @@ export const mongoDBEventsConsumer = < const positions = await Promise.all( processors.map((o) => o.start(options)), ); - const startFrom = - zipMongoDBEventStoreMessageBatchPullerStartFrom(positions); + const startFrom = zipMongoDBMessageBatchPullerStartFrom(positions); - stream = subscribe(); + stream = subscribe( + typeof startFrom !== 'string' ? startFrom.lastCheckpoint : void 0, + ); stream.on('change', async (change) => { const resumeToken = change._id; - const streamChange = ( - change as unknown as { fullDocument: EventStream } - ).fullDocument; + const typedChange = change as OplogChange; + const streamChange = + 'updateDescription' in typedChange + ? { + messages: Object.entries( + typedChange.updateDescription.updatedFields, + ) + .filter(([key]) => key.startsWith('messages.')) + .map(([, value]) => value as ReadEvent), + } + : typedChange.fullDocument; + + if (!streamChange) { + return; + } + const messages = streamChange.messages.map((message) => { return { kind: message.kind, diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts index 452a8143..a0ea47e4 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts @@ -3,6 +3,7 @@ import { type AnyMessage, type Checkpointer, type Event, + type GlobalPositionTypeOfRecordedMessageMetadata, type Message, type MessageHandlerResult, type MessageProcessingScope, @@ -10,13 +11,11 @@ import { type ProjectorOptions, type ReactorOptions, type RecordedMessage, - type StreamPositionTypeOfRecordedMessageMetadata, projector, reactor, } from '@event-driven-io/emmett'; import { MongoClient } from 'mongodb'; import type { - MongoDBRecordedMessageMetadata, ReadEventMetadataWithGlobalPosition, StringStreamPosition, } from '../event'; @@ -91,17 +90,17 @@ export type MongoDBProjectorOptions = > & MongoDBConnectionOptions; -const isResumeToken = (value: object): value is MongoDBResumeToken => +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const isResumeToken = (value: any): value is MongoDBResumeToken => '_data' in value && - typeof value._data === 'string' && - typeof value._data.trim === 'function' && - value._data.trim() !== ''; + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + typeof value._data === 'string'; export const getCheckpoint = < MessageType extends AnyMessage = AnyMessage, MessageMetadataType extends - MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, - CheckpointType = StreamPositionTypeOfRecordedMessageMetadata, + ReadEventMetadataWithGlobalPosition = ReadEventMetadataWithGlobalPosition, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, >( message: RecordedMessage, ): CheckpointType | null => { diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index 6908ad64..8508ffc7 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -132,12 +132,12 @@ const createChangeStream = ( ) => { //: Partial>> const $match = { - 'ns.coll': { $regex: /^emt:/ }, + 'ns.coll': { $regex: /^emt:/, $ne: 'emt:processors' }, $or: [ { operationType: 'insert' }, { operationType: 'update', - 'updateDescription.updatedFields.messages': { $exists: true }, + // 'updateDescription.updatedFields.messages': { $exists: true }, }, ], // 'fullDocument.partitionKey': partitionKey, @@ -190,7 +190,4 @@ const zipMongoDBMessageBatchPullerStartFrom = ( return sorted[0]!; }; -export { - subscribe, - zipMongoDBMessageBatchPullerStartFrom as zipMongoDBEventStoreMessageBatchPullerStartFrom, -}; +export { subscribe, zipMongoDBMessageBatchPullerStartFrom }; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts new file mode 100644 index 00000000..35da5f5b --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts @@ -0,0 +1,32 @@ +import { assertEqual, assertNotEqual } from '@event-driven-io/emmett'; +import assert from 'assert'; +import { describe, it } from 'node:test'; +import { zipMongoDBMessageBatchPullerStartFrom } from './'; + +void describe('zipMongoDBMessageBatchPullerStartFrom', () => { + void it('it can get the earliest MongoDB oplog token', () => { + // tokens are sorted in descending order, so the earliest message is at the end + const input = [ + { + lastCheckpoint: { + _data: `82687E94D4000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + { + lastCheckpoint: { + _data: `82687E949E000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + { + lastCheckpoint: { + _data: `82687E948D000000032B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + ]; + const result = zipMongoDBMessageBatchPullerStartFrom(input); + + assertNotEqual('string', typeof result); + assert(typeof result !== 'string'); + assertEqual(input[2]?.lastCheckpoint._data, result.lastCheckpoint._data); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/event.ts b/src/packages/emmett-mongodb/src/eventStore/event.ts index 82a81cd6..b5418520 100644 --- a/src/packages/emmett-mongodb/src/eventStore/event.ts +++ b/src/packages/emmett-mongodb/src/eventStore/event.ts @@ -7,9 +7,9 @@ import type { MongoDBResumeToken } from './consumers/subscriptions/types'; export type StringStreamPosition = MongoDBResumeToken; export type StringGlobalPosition = MongoDBResumeToken; export type ReadEventMetadataWithGlobalPosition< - GlobalPosition = StringGlobalPosition, + GlobalPosition extends StringGlobalPosition = StringGlobalPosition, > = RecordedMessageMetadataWithGlobalPosition; export type MongoDBRecordedMessageMetadata = RecordedMessageMetadata< - undefined, - StringStreamPosition + StringGlobalPosition, + undefined >; diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts index 631c9f5b..a1a2db1e 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -79,23 +79,7 @@ void describe('MongoDBEventStore subscription', () => { client, changeStreamFullDocumentPolicy: versionPolicy.changeStreamFullDocumentValuePolicy, - onHandleEnd: () => { - messageProcessingPromise.resolve(); - return Promise.resolve(); - }, }); - - consumer.reactor({ - processorId: v4(), - eachMessage: (event) => { - console.log(event); - }, - connectionOptions: { - client, - }, - }); - - await consumer.start(); }); after(async () => { @@ -115,28 +99,81 @@ void describe('MongoDBEventStore subscription', () => { }); void it('should create a new stream with metadata with appendToStream', async () => { - const productItem: PricedProductItem = { - productId: '123', - quantity: 10, - price: 3, - }; + const productItem = (productId: string) => + ({ + productId, + quantity: 10, + price: 3, + }) as PricedProductItem; const shoppingCartId = uuid(); const streamType = 'shopping_cart'; const streamName = toStreamName(streamType, shoppingCartId); + const lastProductItemId = '789'; + const expectedProductItemIds = ['123', '456', lastProductItemId] as const; + let receivedMessageCount: 0 | 1 | 2 = 0; + + consumer.reactor({ + processorId: v4(), + eachMessage: (event) => { + assertTrue(receivedMessageCount <= 2); + assertEqual( + expectedProductItemIds[receivedMessageCount], + event.data.productItem.productId, + ); + + if (event.data.productItem.productId === lastProductItemId) { + messageProcessingPromise.resolve(); + } + + receivedMessageCount++; + }, + connectionOptions: { + client, + }, + }); + + await consumer.start(); await eventStore.appendToStream( streamName, - [{ type: 'ProductItemAdded', data: { productItem } }], + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[0]) }, + }, + ], { expectedStreamVersion: STREAM_DOES_NOT_EXIST }, ); + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[1]) }, + }, + ], + { expectedStreamVersion: 1n }, + ); + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[2]) }, + }, + ], + { expectedStreamVersion: 2n }, + ); const stream = await collection.findOne( { streamName }, { useBigInt64: true }, ); + await timeoutGuard(() => messageProcessingPromise); + assertIsNotNull(stream); - assertEqual(1n, stream.metadata.streamPosition); + assertEqual(3n, stream.metadata.streamPosition); assertEqual(shoppingCartId, stream.metadata.streamId); assertEqual(streamType, stream.metadata.streamType); assertTrue(stream.metadata.createdAt instanceof Date); diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts index 179e1332..d6bc6dda 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts @@ -1,15 +1,15 @@ +import { assertEqual, type Event } from '@event-driven-io/emmett'; +import { + MongoDBContainer, + StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { MongoClient } from 'mongodb'; import { after, before, describe, it } from 'node:test'; import { v7 as uuid } from 'uuid'; -import { type Event, assertEqual } from '@event-driven-io/emmett'; import { getMongoDBEventStore, type MongoDBReadEvent, } from './mongoDBEventStore'; -import { - MongoDBContainer, - StartedMongoDBContainer, -} from '@testcontainers/mongodb'; -import { MongoClient } from 'mongodb'; type TestEvent = Event<'test', { counter: number }, { some: boolean }>; From bb1ebfe462a0b1be0861308b8b16dddc1c7eebdd Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Wed, 6 Aug 2025 16:19:16 +0200 Subject: [PATCH 03/17] refactor: removed the esdb copy-paste leftovers --- .../src/eventStore/consumers/mongoDBEventsConsumer.ts | 8 ++++---- .../src/eventStore/consumers/subscriptions/index.ts | 4 ++-- .../eventStore/mongoDBEventStore.subscription.e2e.spec.ts | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 0724c135..0e603bcc 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -48,7 +48,7 @@ export type MessageConsumerOptions< processors?: MongoDBProcessor[]; }; -export type EventStoreDBEventStoreConsumerConfig< +export type MongoDBEventStoreConsumerConfig< // eslint-disable-next-line @typescript-eslint/no-explicit-any ConsumerMessageType extends Message = any, > = MessageConsumerOptions & { @@ -64,7 +64,7 @@ export type EventStoreDBEventStoreConsumerConfig< export type MongoDBConsumerOptions< ConsumerEventType extends Message = Message, -> = EventStoreDBEventStoreConsumerConfig & +> = MongoDBEventStoreConsumerConfig & ( | { connectionString: string; @@ -102,7 +102,7 @@ export type MongoDBConsumerOptions< } ); -export type EventStoreDBEventStoreConsumer< +export type MongoDBEventStoreConsumer< // eslint-disable-next-line @typescript-eslint/no-explicit-any ConsumerMessageType extends AnyMessage = any, > = MessageConsumer & @@ -149,7 +149,7 @@ export const mongoDBEventsConsumer = < ConsumerMessageType extends Message = AnyMessage, >( options: MongoDBConsumerOptions, -): EventStoreDBEventStoreConsumer => { +): MongoDBEventStoreConsumer => { let start: Promise; let stream: ChangeStream< EventStream, diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index 8508ffc7..49d0578b 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -20,7 +20,7 @@ import type { MongoDBResumeToken } from './types'; export type MongoDBSubscriptionOptions = { - // from?: EventStoreDBEventStoreConsumerType; + // from?: MongoDBEventStoreConsumerType; client: MongoClient; batchSize: number; eachBatch: BatchRecordedMessageHandlerWithoutContext< @@ -65,7 +65,7 @@ export type MongoDBSubscriptionStartOptions = { startFrom: MongoDBSubscriptionStartFrom; }; -// export type EventStoreDBEventStoreConsumerType = +// export type MongoDBEventStoreConsumerType = // | { // stream: $all; // options?: Exclude; diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts index a1a2db1e..8282df90 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -26,7 +26,7 @@ import { import { CancellationPromise } from './consumers/CancellablePromise'; import { mongoDBEventsConsumer, - type EventStoreDBEventStoreConsumer, + type MongoDBEventStoreConsumer, } from './consumers/mongoDBEventsConsumer'; import { generateVersionPolicies } from './consumers/subscriptions'; @@ -35,7 +35,7 @@ void describe('MongoDBEventStore subscription', () => { let eventStore: MongoDBEventStore; let client: MongoClient; let collection: Collection; - let consumer: EventStoreDBEventStoreConsumer; + let consumer: MongoDBEventStoreConsumer; let messageProcessingPromise = new CancellationPromise(); const noop = () => {}; From ae793305fa554f6ea5fdc9a493fe5ea0546a1f32 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Wed, 6 Aug 2025 18:39:42 +0200 Subject: [PATCH 04/17] fix: revert the processors type back to generic MessageProcessor --- .../consumers/mongoDBEventsConsumer.ts | 65 ++++++++++++++++--- ...mongoDBEventStore.subscription.e2e.spec.ts | 19 ++++++ 2 files changed, 76 insertions(+), 8 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 0e603bcc..17bd79da 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -1,9 +1,11 @@ import { EmmettError, + MessageProcessor, type AnyEvent, type AnyMessage, type AsyncRetryOptions, type CommonRecordedMessageMetadata, + type DefaultRecord, type Event, type GlobalPositionTypeOfRecordedMessageMetadata, type Message, @@ -41,17 +43,32 @@ export type MessageConsumerOptions< MessageType extends Message = AnyMessage, MessageMetadataType extends MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends DefaultRecord | undefined = undefined, CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, > = { consumerId?: string; - processors?: MongoDBProcessor[]; + processors?: MessageProcessor< + MessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >[]; }; export type MongoDBEventStoreConsumerConfig< // eslint-disable-next-line @typescript-eslint/no-explicit-any ConsumerMessageType extends Message = any, -> = MessageConsumerOptions & { + MessageMetadataType extends + MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends DefaultRecord | undefined = undefined, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +> = MessageConsumerOptions< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType +> & { // from?: any; pulling?: { batchSize?: number; @@ -64,7 +81,16 @@ export type MongoDBEventStoreConsumerConfig< export type MongoDBConsumerOptions< ConsumerEventType extends Message = Message, -> = MongoDBEventStoreConsumerConfig & + MessageMetadataType extends + MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends DefaultRecord | undefined = undefined, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +> = MongoDBEventStoreConsumerConfig< + ConsumerEventType, + MessageMetadataType, + HandlerContext, + CheckpointType +> & ( | { connectionString: string; @@ -147,8 +173,17 @@ type OplogChange< export const mongoDBEventsConsumer = < ConsumerMessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends DefaultRecord | undefined = undefined, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, >( - options: MongoDBConsumerOptions, + options: MongoDBConsumerOptions< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >, ): MongoDBEventStoreConsumer => { let start: Promise; let stream: ChangeStream< @@ -181,7 +216,14 @@ export const mongoDBEventsConsumer = < ): MongoDBProcessor => { const processor = changeStreamReactor(options); - processors.push(processor as unknown as MongoDBProcessor); + processors.push( + processor as unknown as MessageProcessor< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >, + ); return processor; }, @@ -190,11 +232,18 @@ export const mongoDBEventsConsumer = < ): MongoDBProcessor => { const processor = mongoDBProjector(options); - processors.push(processor as unknown as MongoDBProcessor); + processors.push( + processor as unknown as MessageProcessor< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >, + ); return processor; }, - start: () => { + start: (context: Partial) => { start = (async () => { if (processors.length === 0) return Promise.reject( @@ -206,7 +255,7 @@ export const mongoDBEventsConsumer = < isRunning = true; const positions = await Promise.all( - processors.map((o) => o.start(options)), + processors.map((o) => o.start(context)), ); const startFrom = zipMongoDBMessageBatchPullerStartFrom(positions); diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts index 8282df90..f21a607b 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -28,6 +28,7 @@ import { mongoDBEventsConsumer, type MongoDBEventStoreConsumer, } from './consumers/mongoDBEventsConsumer'; +import { changeStreamReactor } from './consumers/mongoDBProcessor'; import { generateVersionPolicies } from './consumers/subscriptions'; void describe('MongoDBEventStore subscription', () => { @@ -111,7 +112,25 @@ void describe('MongoDBEventStore subscription', () => { const lastProductItemId = '789'; const expectedProductItemIds = ['123', '456', lastProductItemId] as const; let receivedMessageCount: 0 | 1 | 2 = 0; + changeStreamReactor({ + connectionOptions: { + client, + }, + processorId: v4(), + eachMessage: (event) => { + assertTrue(receivedMessageCount <= 2); + assertEqual( + expectedProductItemIds[receivedMessageCount], + event.data.productItem.productId, + ); + + if (event.data.productItem.productId === lastProductItemId) { + messageProcessingPromise.resolve(); + } + receivedMessageCount++; + }, + }); consumer.reactor({ processorId: v4(), eachMessage: (event) => { From 753103c7981257865f8927365683dad9aad7f6ba Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Wed, 6 Aug 2025 18:41:13 +0200 Subject: [PATCH 05/17] fix: removed onHandleStart --- .../consumers/mongoDBEventsConsumer.ts | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 17bd79da..5c3582ca 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -96,35 +96,11 @@ export type MongoDBConsumerOptions< connectionString: string; clientOptions?: MongoClientOptions; client?: never; - onHandleStart?: ( - messages: RecordedMessage< - ConsumerEventType, - ReadEventMetadataWithGlobalPosition - >[], - ) => Promise; - onHandleEnd?: ( - messages: RecordedMessage< - ConsumerEventType, - ReadEventMetadataWithGlobalPosition - >[], - ) => Promise; } | { client: MongoClient; connectionString?: never; clientOptions?: never; - onHandleStart?: ( - messages: RecordedMessage< - ConsumerEventType, - ReadEventMetadataWithGlobalPosition - >[], - ) => Promise; - onHandleEnd?: ( - messages: RecordedMessage< - ConsumerEventType, - ReadEventMetadataWithGlobalPosition - >[], - ) => Promise; } ); @@ -202,8 +178,6 @@ export const mongoDBEventsConsumer = < options.changeStreamFullDocumentPolicy, client.db(), ); - const onHandleStart = options.onHandleStart || noop; - const onHandleEnd = options.onHandleEnd || noop; return { consumerId: options.consumerId ?? uuid(), @@ -295,15 +269,11 @@ export const mongoDBEventsConsumer = < >; }); - await onHandleStart(messages); - for (const processor of processors.filter( ({ isActive }) => isActive, )) { await processor.handle(messages, { client }); } - - await onHandleEnd(messages); }); })(); From 592a8662d1f727a760c7dc1ac75a02d04aae9c3a Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Wed, 6 Aug 2025 18:42:40 +0200 Subject: [PATCH 06/17] refactor: to mongoDbEventsConsumer renamed to mongoDBMessagesConsumer --- .../src/eventStore/consumers/mongoDBEventsConsumer.ts | 2 +- .../src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 5c3582ca..29835d45 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -147,7 +147,7 @@ type OplogChange< | FullDocument | UpdateDescription>; -export const mongoDBEventsConsumer = < +export const mongoDBMessagesConsumer = < ConsumerMessageType extends Message = AnyMessage, MessageMetadataType extends MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts index f21a607b..6a2bc530 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -25,7 +25,7 @@ import { } from '../testing'; import { CancellationPromise } from './consumers/CancellablePromise'; import { - mongoDBEventsConsumer, + mongoDBMessagesConsumer, type MongoDBEventStoreConsumer, } from './consumers/mongoDBEventsConsumer'; import { changeStreamReactor } from './consumers/mongoDBProcessor'; @@ -76,7 +76,7 @@ void describe('MongoDBEventStore subscription', () => { }); const versionPolicy = await generateVersionPolicies(db); - consumer = mongoDBEventsConsumer({ + consumer = mongoDBMessagesConsumer({ client, changeStreamFullDocumentPolicy: versionPolicy.changeStreamFullDocumentValuePolicy, From 30fa0441139b83e5a5a16a1993fb98d7aa3d4e55 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Wed, 6 Aug 2025 18:48:00 +0200 Subject: [PATCH 07/17] feat: databaseName as parameter for the readProcessorCheckpoint --- .../eventStore/consumers/readProcessorCheckpoint.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts index 9b858549..fc28c266 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts @@ -1,9 +1,9 @@ import type { MongoClient } from 'mongodb'; +import type { MongoDBResumeToken } from './subscriptions/types'; import { DefaultProcessotCheckpointCollectionName, type ReadProcessorCheckpointSqlResult, } from './types'; -import type { MongoDBResumeToken } from './subscriptions/types'; export type ReadProcessorCheckpointResult = { lastProcessedPosition: MongoDBResumeToken | null; @@ -11,10 +11,15 @@ export type ReadProcessorCheckpointResult = { export const readProcessorCheckpoint = async ( client: MongoClient, - options: { processorId: string; partition?: string; collectionName?: string }, + options: { + processorId: string; + partition?: string; + collectionName?: string; + databaseName?: string; + }, ): Promise => { const result = await client - .db() + .db(options.databaseName) .collection( options.collectionName || DefaultProcessotCheckpointCollectionName, ) From 2de9d86cbc349ea46095a422534ebdfc281bca3a Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Sat, 9 Aug 2025 20:51:21 +0200 Subject: [PATCH 08/17] test: storeProcessorCheckpoint and readProcessorCheckpoint tests --- ...ocessorCheckpoint.subscription.e2e.spec.ts | 147 ++++++++++++++++++ .../consumers/storeProcessorCheckpoint.ts | 88 ++++++++--- .../consumers/subscriptions/index.ts | 22 ++- 3 files changed, 232 insertions(+), 25 deletions(-) create mode 100644 src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts new file mode 100644 index 00000000..5d583d34 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts @@ -0,0 +1,147 @@ +import { assertDeepEqual } from '@event-driven-io/emmett'; +import { + MongoDBContainer, + type StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { MongoClient, type Collection } from 'mongodb'; +import { after, before, describe, it } from 'node:test'; +import { + getMongoDBEventStore, + toStreamCollectionName, + type EventStream, + type MongoDBEventStore, +} from '../mongoDBEventStore'; +import { readProcessorCheckpoint } from './readProcessorCheckpoint'; +import { storeProcessorCheckpoint } from './storeProcessorCheckpoint'; +import type { MongoDBResumeToken } from './subscriptions/types'; + +void describe('storeProcessorCheckpoint and readProcessorCheckpoint tests', () => { + let mongodb: StartedMongoDBContainer; + let eventStore: MongoDBEventStore; + let client: MongoClient; + let collection: Collection; + + const processorId = 'processorId-1'; + const resumeToken1: MongoDBResumeToken = { + _data: + '82687E948D000000032B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }; + const resumeToken2: MongoDBResumeToken = { + _data: + '82687E949E000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }; + const resumeToken3: MongoDBResumeToken = { + _data: + '82687E94D4000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }; + + before(async () => { + mongodb = await new MongoDBContainer('mongo:8.0.10').start(); + client = new MongoClient(mongodb.getConnectionString(), { + directConnection: true, + }); + + await client.connect(); + const db = client.db(); + collection = db.collection( + toStreamCollectionName('shopping_cart'), + ); + + eventStore = getMongoDBEventStore({ + client, + }); + }); + + after(async () => { + await client.close(); + await mongodb.stop(); + }); + + void it('should store successfully last proceeded MongoDB resume token for the first time', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: null, + newPosition: resumeToken1, + version: 1, + }); + + assertDeepEqual(result, { + success: true, + newPosition: resumeToken1, + }); + }); + + void it('should store successfully a new checkpoint expecting the previous token', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: resumeToken1, + newPosition: resumeToken2, + version: 2, + }); + + assertDeepEqual(result, { + success: true, + newPosition: resumeToken2, + }); + }); + + void it('it returns IGNORED when the newPosition is the same or earlier than the lastProcessedPosition', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: resumeToken2, + newPosition: resumeToken1, + version: 3, + }); + + assertDeepEqual(result, { + success: false, + reason: 'IGNORED', + }); + }); + + void it('it returns MISMATCH when the lastProcessedPosition is not the one that is currently stored', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: resumeToken1, + newPosition: resumeToken3, + version: 3, + }); + + assertDeepEqual(result, { + success: false, + reason: 'MISMATCH', + }); + }); + + void it('it can save a checkpoint with a specific partition', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastProcessedPosition: null, + newPosition: resumeToken1, + partition: 'partition-2', + version: 1, + }); + + assertDeepEqual(result, { + success: true, + newPosition: resumeToken1, + }); + }); + + void it('it can read a position of a processor with the default partition', async () => { + const result = await readProcessorCheckpoint(client, { + processorId, + }); + + assertDeepEqual(result, { lastProcessedPosition: resumeToken2 }); + }); + + void it('it can read a position of a processor with a defined partition', async () => { + const result = await readProcessorCheckpoint(client, { + processorId, + partition: 'partition-2', + }); + + assertDeepEqual(result, { lastProcessedPosition: resumeToken1 }); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts index 6bd5ea05..be350e6f 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts @@ -1,4 +1,5 @@ import type { MongoClient } from 'mongodb'; +import { compareTwoTokens } from './subscriptions'; import type { MongoDBResumeToken } from './subscriptions/types'; import { DefaultProcessotCheckpointCollectionName, @@ -18,13 +19,14 @@ export const storeProcessorCheckpoint = async ( client: MongoClient, options: { processorId: string; - version: number | undefined; + version: number; newPosition: null extends Position ? MongoDBResumeToken | null : MongoDBResumeToken; lastProcessedPosition: MongoDBResumeToken | null; partition?: string; collectionName?: string; + dbName?: string; }, ): Promise< StoreLastProcessedProcessorPositionResult< @@ -32,35 +34,79 @@ export const storeProcessorCheckpoint = async ( > > => { try { - const result = await client - .db() + const checkpoints = client + .db(options.dbName) .collection( options.collectionName || DefaultProcessotCheckpointCollectionName, - ) - .updateOne( - { - subscriptionId: options.processorId, - partitionId: options.partition || null, - lastProcessedToken: options.lastProcessedPosition, - }, - { - $set: { + ); + const currentCheckpoint = await checkpoints.findOne({ + subscriptionId: options.processorId, + partitionId: options.partition || null, + }); + const matchedCheckpoint = await checkpoints.findOne({ + subscriptionId: options.processorId, + partitionId: options.partition || null, + lastProcessedToken: options.lastProcessedPosition, + }); + + if (currentCheckpoint && !matchedCheckpoint) { + return { + success: false, + reason: 'MISMATCH', + }; + } + + if (matchedCheckpoint?.lastProcessedToken && options?.newPosition) { + const comparison = compareTwoTokens( + matchedCheckpoint.lastProcessedToken, + options.newPosition, + ); + + // if the tokens are the same or + // the `currentCheckpoint.lastProcessedToken` is later than the `options.newPosition`. + if (comparison !== -1) { + return { + success: false, + reason: 'IGNORED', + }; + } + } + + const result = currentCheckpoint + ? await checkpoints.findOneAndUpdate( + { subscriptionId: options.processorId, partitionId: options.partition || null, - lastProcessedToken: options.newPosition, - version: options.version, + lastProcessedToken: options.lastProcessedPosition, }, - }, - { - upsert: true, - }, - ); + { + $set: { + lastProcessedToken: options.newPosition, + version: options.version, + }, + }, + { + returnDocument: 'after', + }, + ) + : await checkpoints.insertOne({ + subscriptionId: options.processorId, + partitionId: options.partition || null, + lastProcessedToken: options.newPosition, + version: options.version, + }); - return result.modifiedCount || result.upsertedCount + return (result && + 'acknowledged' in result && + result.acknowledged && + result.insertedId) || + (result && + 'lastProcessedToken' in result && + result.lastProcessedToken?._data === options.newPosition?._data) ? { success: true, newPosition: options.newPosition } : { success: false, - reason: result.matchedCount === 0 ? 'IGNORED' : 'MISMATCH', + reason: 'MISMATCH', }; } catch (error) { console.log(error); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index 49d0578b..f0564541 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -163,6 +163,22 @@ const subscribe = return createChangeStream(getFullDocumentValue, db, resumeToken); }; +/** + * Compares two MongoDB Resume Tokens. + * @param token1 Token 1. + * @param token2 Token 2. + * @returns 0 - if the tokens are the same, 1 - if the token1 is later, -1 - is the token1 is earlier. + */ +const compareTwoTokens = ( + token1: MongoDBResumeToken, + token2: MongoDBResumeToken, +) => { + const bufA = Buffer.from(token1._data, 'hex'); + const bufB = Buffer.from(token2._data, 'hex'); + + return Buffer.compare(bufA, bufB); +}; + const zipMongoDBMessageBatchPullerStartFrom = ( options: (MongoDBSubscriptionStartFrom | undefined)[], ): MongoDBSubscriptionStartFrom => { @@ -182,12 +198,10 @@ const zipMongoDBMessageBatchPullerStartFrom = ( ); const sorted = positionTokens.sort((a, b) => { - const bufA = Buffer.from(a.lastCheckpoint._data, 'hex'); // or 'base64', depending on encoding - const bufB = Buffer.from(b.lastCheckpoint._data, 'hex'); - return Buffer.compare(bufA, bufB); + return compareTwoTokens(a.lastCheckpoint, b.lastCheckpoint); }); return sorted[0]!; }; -export { subscribe, zipMongoDBMessageBatchPullerStartFrom }; +export { compareTwoTokens, subscribe, zipMongoDBMessageBatchPullerStartFrom }; From fc53348b9c22a47efeae7c8cf31d25099ce9e4a7 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Sat, 9 Aug 2025 21:27:51 +0200 Subject: [PATCH 09/17] refactor: storeProcessorCheckpoint --- .../consumers/storeProcessorCheckpoint.ts | 107 +++++++----------- 1 file changed, 40 insertions(+), 67 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts index be350e6f..de411c30 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts @@ -2,8 +2,8 @@ import type { MongoClient } from 'mongodb'; import { compareTwoTokens } from './subscriptions'; import type { MongoDBResumeToken } from './subscriptions/types'; import { - DefaultProcessotCheckpointCollectionName, type ReadProcessorCheckpointSqlResult, + DefaultProcessotCheckpointCollectionName, } from './types'; export type StoreLastProcessedProcessorPositionResult< @@ -17,7 +17,15 @@ export type StoreLastProcessedProcessorPositionResult< export const storeProcessorCheckpoint = async ( client: MongoClient, - options: { + { + processorId, + version, + newPosition, + lastProcessedPosition, + partition, + collectionName, + dbName, + }: { processorId: string; version: number; newPosition: null extends Position @@ -35,81 +43,46 @@ export const storeProcessorCheckpoint = async ( > => { try { const checkpoints = client - .db(options.dbName) + .db(dbName) .collection( - options.collectionName || DefaultProcessotCheckpointCollectionName, + collectionName || DefaultProcessotCheckpointCollectionName, ); - const currentCheckpoint = await checkpoints.findOne({ - subscriptionId: options.processorId, - partitionId: options.partition || null, - }); - const matchedCheckpoint = await checkpoints.findOne({ - subscriptionId: options.processorId, - partitionId: options.partition || null, - lastProcessedToken: options.lastProcessedPosition, - }); - if (currentCheckpoint && !matchedCheckpoint) { - return { - success: false, - reason: 'MISMATCH', - }; - } + const filter = { + subscriptionId: processorId, + partitionId: partition || null, + }; - if (matchedCheckpoint?.lastProcessedToken && options?.newPosition) { - const comparison = compareTwoTokens( - matchedCheckpoint.lastProcessedToken, - options.newPosition, - ); + const current = await checkpoints.findOne(filter); - // if the tokens are the same or - // the `currentCheckpoint.lastProcessedToken` is later than the `options.newPosition`. - if (comparison !== -1) { - return { - success: false, - reason: 'IGNORED', - }; + // MISMATCH: we have a checkpoint but lastProcessedPosition doesn’t match + if ( + current && + current.lastProcessedToken?._data !== lastProcessedPosition?._data + ) { + return { success: false, reason: 'MISMATCH' }; + } + + // IGNORED: same or earlier position + if (current?.lastProcessedToken && newPosition) { + if (compareTwoTokens(current.lastProcessedToken, newPosition) !== -1) { + return { success: false, reason: 'IGNORED' }; } } - const result = currentCheckpoint - ? await checkpoints.findOneAndUpdate( - { - subscriptionId: options.processorId, - partitionId: options.partition || null, - lastProcessedToken: options.lastProcessedPosition, - }, - { - $set: { - lastProcessedToken: options.newPosition, - version: options.version, - }, - }, - { - returnDocument: 'after', - }, - ) - : await checkpoints.insertOne({ - subscriptionId: options.processorId, - partitionId: options.partition || null, - lastProcessedToken: options.newPosition, - version: options.version, - }); + const updateResult = await checkpoints.updateOne( + { ...filter, lastProcessedToken: lastProcessedPosition }, + { $set: { lastProcessedToken: newPosition, version } }, + { upsert: true }, + ); + + if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) { + return { success: true, newPosition }; + } - return (result && - 'acknowledged' in result && - result.acknowledged && - result.insertedId) || - (result && - 'lastProcessedToken' in result && - result.lastProcessedToken?._data === options.newPosition?._data) - ? { success: true, newPosition: options.newPosition } - : { - success: false, - reason: 'MISMATCH', - }; + return { success: false, reason: 'MISMATCH' }; } catch (error) { - console.log(error); + console.error(error); throw error; } }; From 847580dd2ec74eb85b7928084abb3efd09055349 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Sat, 9 Aug 2025 21:31:49 +0200 Subject: [PATCH 10/17] chore: eslint fix --- .../consumers/mongoDBEventsConsumer.ts | 2 -- ...rocessorCheckpoint.subscription.e2e.spec.ts | 18 +----------------- 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 29835d45..1f58a560 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -37,8 +37,6 @@ import { type MongoDBSubscriptionDocument, } from './subscriptions'; -const noop = () => Promise.resolve(); - export type MessageConsumerOptions< MessageType extends Message = AnyMessage, MessageMetadataType extends diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts index 5d583d34..774a76c3 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts @@ -3,23 +3,15 @@ import { MongoDBContainer, type StartedMongoDBContainer, } from '@testcontainers/mongodb'; -import { MongoClient, type Collection } from 'mongodb'; +import { MongoClient } from 'mongodb'; import { after, before, describe, it } from 'node:test'; -import { - getMongoDBEventStore, - toStreamCollectionName, - type EventStream, - type MongoDBEventStore, -} from '../mongoDBEventStore'; import { readProcessorCheckpoint } from './readProcessorCheckpoint'; import { storeProcessorCheckpoint } from './storeProcessorCheckpoint'; import type { MongoDBResumeToken } from './subscriptions/types'; void describe('storeProcessorCheckpoint and readProcessorCheckpoint tests', () => { let mongodb: StartedMongoDBContainer; - let eventStore: MongoDBEventStore; let client: MongoClient; - let collection: Collection; const processorId = 'processorId-1'; const resumeToken1: MongoDBResumeToken = { @@ -42,14 +34,6 @@ void describe('storeProcessorCheckpoint and readProcessorCheckpoint tests', () = }); await client.connect(); - const db = client.db(); - collection = db.collection( - toStreamCollectionName('shopping_cart'), - ); - - eventStore = getMongoDBEventStore({ - client, - }); }); after(async () => { From fa8cb43d6be3900e0cd24eb828fe7486724c11d6 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Sat, 9 Aug 2025 21:52:22 +0200 Subject: [PATCH 11/17] feat: handling an unknown Position --- ...pec.ts => processorCheckpoint.e2e.spec.ts} | 0 .../consumers/storeProcessorCheckpoint.ts | 17 +++---- .../consumers/subscriptions/index.ts | 46 +++++++++++++++---- .../consumers/subscriptions/types.ts | 10 ++++ .../src/eventStore/consumers/types.ts | 5 +- 5 files changed, 54 insertions(+), 24 deletions(-) rename src/packages/emmett-mongodb/src/eventStore/consumers/{processorCheckpoint.subscription.e2e.spec.ts => processorCheckpoint.e2e.spec.ts} (100%) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.e2e.spec.ts similarity index 100% rename from src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.subscription.e2e.spec.ts rename to src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.e2e.spec.ts diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts index de411c30..95f62812 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts @@ -1,21 +1,18 @@ import type { MongoClient } from 'mongodb'; import { compareTwoTokens } from './subscriptions'; -import type { MongoDBResumeToken } from './subscriptions/types'; import { type ReadProcessorCheckpointSqlResult, DefaultProcessotCheckpointCollectionName, } from './types'; -export type StoreLastProcessedProcessorPositionResult< - Position extends MongoDBResumeToken | null = MongoDBResumeToken, -> = +export type StoreLastProcessedProcessorPositionResult = | { success: true; newPosition: Position; } | { success: false; reason: 'IGNORED' | 'MISMATCH' }; -export const storeProcessorCheckpoint = async ( +export const storeProcessorCheckpoint = async ( client: MongoClient, { processorId, @@ -28,17 +25,15 @@ export const storeProcessorCheckpoint = async ( }: { processorId: string; version: number; - newPosition: null extends Position - ? MongoDBResumeToken | null - : MongoDBResumeToken; - lastProcessedPosition: MongoDBResumeToken | null; + newPosition: Position; + lastProcessedPosition: Position | null; partition?: string; collectionName?: string; dbName?: string; }, ): Promise< StoreLastProcessedProcessorPositionResult< - null extends Position ? MongoDBResumeToken | null : MongoDBResumeToken + null extends Position ? Position | null : Position > > => { try { @@ -58,7 +53,7 @@ export const storeProcessorCheckpoint = async ( // MISMATCH: we have a checkpoint but lastProcessedPosition doesn’t match if ( current && - current.lastProcessedToken?._data !== lastProcessedPosition?._data + compareTwoTokens(current.lastProcessedToken, lastProcessedPosition) !== 0 ) { return { success: false, reason: 'MISMATCH' }; } diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index f0564541..15a8a8f1 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -1,9 +1,10 @@ -import type { - AsyncRetryOptions, - BatchRecordedMessageHandlerWithoutContext, - Event, - Message, - ReadEventMetadataWithGlobalPosition, +import { + IllegalStateError, + type AsyncRetryOptions, + type BatchRecordedMessageHandlerWithoutContext, + type Event, + type Message, + type ReadEventMetadataWithGlobalPosition, } from '@event-driven-io/emmett'; import type { ChangeStreamDeleteDocument, @@ -16,7 +17,7 @@ import type { ResumeToken, } from 'mongodb'; import type { EventStream } from '../../mongoDBEventStore'; -import type { MongoDBResumeToken } from './types'; +import { isMongoDBResumeToken, type MongoDBResumeToken } from './types'; export type MongoDBSubscriptionOptions = { @@ -169,7 +170,7 @@ const subscribe = * @param token2 Token 2. * @returns 0 - if the tokens are the same, 1 - if the token1 is later, -1 - is the token1 is earlier. */ -const compareTwoTokens = ( +const compareTwoMongoDBTokens = ( token1: MongoDBResumeToken, token2: MongoDBResumeToken, ) => { @@ -179,6 +180,26 @@ const compareTwoTokens = ( return Buffer.compare(bufA, bufB); }; +const compareTwoTokens = (token1: unknown, token2: unknown) => { + if (token1 === null && token2) { + return -1; + } + + if (token1 && token2 === null) { + return 1; + } + + if (token1 === null && token2 === null) { + return 0; + } + + if (isMongoDBResumeToken(token1) && isMongoDBResumeToken(token2)) { + return compareTwoMongoDBTokens(token1, token2); + } + + throw new IllegalStateError(`Type of tokens is not comparable`); +}; + const zipMongoDBMessageBatchPullerStartFrom = ( options: (MongoDBSubscriptionStartFrom | undefined)[], ): MongoDBSubscriptionStartFrom => { @@ -198,10 +219,15 @@ const zipMongoDBMessageBatchPullerStartFrom = ( ); const sorted = positionTokens.sort((a, b) => { - return compareTwoTokens(a.lastCheckpoint, b.lastCheckpoint); + return compareTwoMongoDBTokens(a.lastCheckpoint, b.lastCheckpoint); }); return sorted[0]!; }; -export { compareTwoTokens, subscribe, zipMongoDBMessageBatchPullerStartFrom }; +export { + compareTwoMongoDBTokens, + compareTwoTokens, + subscribe, + zipMongoDBMessageBatchPullerStartFrom, +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts index 59a35868..34c876db 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/types.ts @@ -1 +1,11 @@ export type MongoDBResumeToken = Readonly<{ _data: string }>; +export const isMongoDBResumeToken = ( + value: unknown, +): value is MongoDBResumeToken => { + return !!( + typeof value === 'object' && + value && + '_data' in value && + typeof value._data === 'string' + ); +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts index 3f4b63fb..3886d764 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts @@ -1,11 +1,10 @@ import { toStreamCollectionName } from '../mongoDBEventStore'; -import type { MongoDBResumeToken } from './subscriptions/types'; export const DefaultProcessotCheckpointCollectionName = toStreamCollectionName(`processors`); -export type ReadProcessorCheckpointSqlResult = { - lastProcessedToken: MongoDBResumeToken | null; +export type ReadProcessorCheckpointSqlResult = { + lastProcessedToken: Position; subscriptionId: string; partitionId: string | null; version: number; From 4c03a71b67b46def8713d14920f509ead2a4f1e7 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Sun, 10 Aug 2025 22:39:56 +0200 Subject: [PATCH 12/17] feat: starting from the earliest position --- .../consumers/mongoDBEventsConsumer.ts | 31 ++++---- .../eventStore/consumers/mongoDBProcessor.ts | 27 ++----- .../consumers/subscriptions/index.ts | 75 +++++++++++-------- .../emmett-mongodb/src/eventStore/event.ts | 6 +- ...mongoDBEventStore.subscription.e2e.spec.ts | 4 +- 5 files changed, 72 insertions(+), 71 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 1f58a560..8fd2e4be 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -15,10 +15,7 @@ import { } from '@event-driven-io/emmett'; import { ChangeStream, MongoClient, type MongoClientOptions } from 'mongodb'; import { v4 as uuid } from 'uuid'; -import type { - MongoDBRecordedMessageMetadata, - ReadEventMetadataWithGlobalPosition, -} from '../event'; +import type { MongoDBRecordedMessageMetadata } from '../event'; import type { EventStream, MongoDBReadEventMetadata, @@ -145,11 +142,15 @@ type OplogChange< | FullDocument | UpdateDescription>; +export type MongoDBConsumerHandlerContext = { + client?: MongoClient; +}; export const mongoDBMessagesConsumer = < ConsumerMessageType extends Message = AnyMessage, MessageMetadataType extends MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, - HandlerContext extends DefaultRecord | undefined = undefined, + HandlerContext extends + MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext, CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, >( options: MongoDBConsumerOptions< @@ -215,7 +216,7 @@ export const mongoDBMessagesConsumer = < return processor; }, - start: (context: Partial) => { + start: () => { start = (async () => { if (processors.length === 0) return Promise.reject( @@ -227,13 +228,13 @@ export const mongoDBMessagesConsumer = < isRunning = true; const positions = await Promise.all( - processors.map((o) => o.start(context)), + processors.map((o) => o.start({ client } as Partial)), ); - const startFrom = zipMongoDBMessageBatchPullerStartFrom(positions); + const startFrom = + zipMongoDBMessageBatchPullerStartFrom(positions); + + stream = subscribe(startFrom); - stream = subscribe( - typeof startFrom !== 'string' ? startFrom.lastCheckpoint : void 0, - ); stream.on('change', async (change) => { const resumeToken = change._id; const typedChange = change as OplogChange; @@ -263,14 +264,14 @@ export const mongoDBMessagesConsumer = < }, } as unknown as RecordedMessage< ConsumerMessageType, - ReadEventMetadataWithGlobalPosition + MessageMetadataType >; }); for (const processor of processors.filter( ({ isActive }) => isActive, )) { - await processor.handle(messages, { client }); + await processor.handle(messages, { client } as Partial); } }); })(); @@ -278,10 +279,12 @@ export const mongoDBMessagesConsumer = < return start; }, stop: async () => { - return Promise.resolve(); + await stream.close(); + isRunning = false; }, close: async () => { await stream.close(); + isRunning = false; }, }; }; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts index a0ea47e4..e5ce285b 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts @@ -15,10 +15,7 @@ import { reactor, } from '@event-driven-io/emmett'; import { MongoClient } from 'mongodb'; -import type { - ReadEventMetadataWithGlobalPosition, - StringStreamPosition, -} from '../event'; +import type { ReadEventMetadataWithGlobalPosition } from '../event'; import type { MongoDBEventStoreConnectionOptions } from '../mongoDBEventStore'; import { readProcessorCheckpoint } from './readProcessorCheckpoint'; import { storeProcessorCheckpoint } from './storeProcessorCheckpoint'; @@ -30,22 +27,14 @@ type MongoDBConnectionOptions = { export type MongoDBProcessorHandlerContext = { client: MongoClient; - // execute: SQLExecutor; - // connection: { - // connectionString: string; - // client: NodePostgresClient; - // transaction: NodePostgresTransaction; - // pool: Dumbo; - // }; }; -export type CommonRecordedMessageMetadata< - StreamPosition = StringStreamPosition, -> = Readonly<{ - messageId: string; - streamPosition: StreamPosition; - streamName: string; -}>; +export type CommonRecordedMessageMetadata = + Readonly<{ + messageId: string; + streamPosition: StreamPosition; + streamName: string; + }>; export type WithGlobalPosition = Readonly<{ globalPosition: GlobalPosition; @@ -53,7 +42,7 @@ export type WithGlobalPosition = Readonly<{ export type RecordedMessageMetadata< GlobalPosition = undefined, - StreamPosition = StringStreamPosition, + StreamPosition = MongoDBResumeToken, > = CommonRecordedMessageMetadata & // eslint-disable-next-line @typescript-eslint/no-empty-object-type (GlobalPosition extends undefined ? {} : WithGlobalPosition); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index 15a8a8f1..b5cdd227 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -2,19 +2,20 @@ import { IllegalStateError, type AsyncRetryOptions, type BatchRecordedMessageHandlerWithoutContext, + type CurrentMessageProcessorPosition, type Event, type Message, type ReadEventMetadataWithGlobalPosition, } from '@event-driven-io/emmett'; -import type { - ChangeStreamDeleteDocument, - ChangeStreamInsertDocument, - ChangeStreamReplaceDocument, - ChangeStreamUpdateDocument, - Db, - Document, - MongoClient, - ResumeToken, +import { + Timestamp, + type ChangeStreamDeleteDocument, + type ChangeStreamInsertDocument, + type ChangeStreamReplaceDocument, + type ChangeStreamUpdateDocument, + type Db, + type Document, + type MongoClient, } from 'mongodb'; import type { EventStream } from '../../mongoDBEventStore'; import { isMongoDBResumeToken, type MongoDBResumeToken } from './types'; @@ -58,23 +59,12 @@ export type BuildInfo = { ok: number; }; export type MongoDBSubscriptionStartFrom = - | { lastCheckpoint: MongoDBResumeToken } - | 'BEGINNING' - | 'END'; + CurrentMessageProcessorPosition; export type MongoDBSubscriptionStartOptions = { startFrom: MongoDBSubscriptionStartFrom; }; -// export type MongoDBEventStoreConsumerType = -// | { -// stream: $all; -// options?: Exclude; -// } -// | { -// stream: string; -// options?: Exclude; -// }; const REGEXP = /^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$/; @@ -124,21 +114,22 @@ export const generateVersionPolicies = async (db: Db) => { }; }; -const createChangeStream = ( +const DEFAULT_PARTITION_KEY_NAME = 'default'; +const createChangeStream = < + EventType extends Event = Event, + CheckpointType = any, +>( getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db, - // messages: Collection>, - // partitionKey: string, - resumeToken?: ResumeToken, + resumeToken?: CurrentMessageProcessorPosition, + partitionKey: string = DEFAULT_PARTITION_KEY_NAME, ) => { - //: Partial>> const $match = { 'ns.coll': { $regex: /^emt:/, $ne: 'emt:processors' }, $or: [ { operationType: 'insert' }, { operationType: 'update', - // 'updateDescription.updatedFields.messages': { $exists: true }, }, ], // 'fullDocument.partitionKey': partitionKey, @@ -154,13 +145,31 @@ const createChangeStream = ( MongoDBSubscriptionDocument> >(pipeline, { fullDocument: getFullDocumentValue(), - startAfter: resumeToken, + ...(resumeToken === 'BEGINNING' + ? { + /* + The MongoDB's API is designed around starting from now or resuming from a known position + (resumeAfter, startAfter, or startAtOperationTime). + By passing a date set a long time ago (year 2000), we force MongoDB to start + from the earliest possible position in the oplog. + If the retention is 48 hours, then it will be 24 hours back. + */ + startAtOperationTime: new Timestamp({ + t: 946684800, + i: 0, + }), + } + : resumeToken === 'END' + ? void 0 + : resumeToken?.lastCheckpoint), }); }; const subscribe = (getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db) => - (resumeToken?: ResumeToken) => { + ( + resumeToken?: CurrentMessageProcessorPosition, + ) => { return createChangeStream(getFullDocumentValue, db, resumeToken); }; @@ -200,9 +209,9 @@ const compareTwoTokens = (token1: unknown, token2: unknown) => { throw new IllegalStateError(`Type of tokens is not comparable`); }; -const zipMongoDBMessageBatchPullerStartFrom = ( - options: (MongoDBSubscriptionStartFrom | undefined)[], -): MongoDBSubscriptionStartFrom => { +const zipMongoDBMessageBatchPullerStartFrom = ( + options: (CurrentMessageProcessorPosition | undefined)[], +): CurrentMessageProcessorPosition => { if ( options.length === 0 || options.some((o) => o === undefined || o === 'BEGINNING') @@ -219,7 +228,7 @@ const zipMongoDBMessageBatchPullerStartFrom = ( ); const sorted = positionTokens.sort((a, b) => { - return compareTwoMongoDBTokens(a.lastCheckpoint, b.lastCheckpoint); + return compareTwoTokens(a.lastCheckpoint, b.lastCheckpoint); }); return sorted[0]!; diff --git a/src/packages/emmett-mongodb/src/eventStore/event.ts b/src/packages/emmett-mongodb/src/eventStore/event.ts index b5418520..341ae716 100644 --- a/src/packages/emmett-mongodb/src/eventStore/event.ts +++ b/src/packages/emmett-mongodb/src/eventStore/event.ts @@ -4,12 +4,10 @@ import type { } from '@event-driven-io/emmett'; import type { MongoDBResumeToken } from './consumers/subscriptions/types'; -export type StringStreamPosition = MongoDBResumeToken; -export type StringGlobalPosition = MongoDBResumeToken; export type ReadEventMetadataWithGlobalPosition< - GlobalPosition extends StringGlobalPosition = StringGlobalPosition, + GlobalPosition extends MongoDBResumeToken = MongoDBResumeToken, > = RecordedMessageMetadataWithGlobalPosition; export type MongoDBRecordedMessageMetadata = RecordedMessageMetadata< - StringGlobalPosition, + MongoDBResumeToken, undefined >; diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts index 6a2bc530..6dbb22f0 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -184,12 +184,14 @@ void describe('MongoDBEventStore subscription', () => { { expectedStreamVersion: 2n }, ); + await timeoutGuard(() => messageProcessingPromise); + const stream = await collection.findOne( { streamName }, { useBigInt64: true }, ); - await timeoutGuard(() => messageProcessingPromise); + await consumer.stop(); assertIsNotNull(stream); assertEqual(3n, stream.metadata.streamPosition); From e4d5904b4658297ccd16c030315ed0e699aea4cc Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Mon, 11 Aug 2025 23:28:53 +0200 Subject: [PATCH 13/17] fix: eslint all fixed --- .../consumers/mongoDBEventsConsumer.ts | 4 +- .../eventStore/consumers/mongoDBProcessor.ts | 50 +++++++++----- .../consumers/readProcessorCheckpoint.ts | 13 ++-- .../consumers/storeProcessorCheckpoint.ts | 69 +++++++++---------- .../consumers/subscriptions/index.ts | 7 +- .../src/eventStore/consumers/types.ts | 3 +- .../emmett-mongodb/src/eventStore/event.ts | 2 +- .../emmett-mongodb/src/eventStore/example.ts | 1 + 8 files changed, 82 insertions(+), 67 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 8fd2e4be..723f1852 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -271,7 +271,9 @@ export const mongoDBMessagesConsumer = < for (const processor of processors.filter( ({ isActive }) => isActive, )) { - await processor.handle(messages, { client } as Partial); + await processor.handle(messages, { + client, + } as Partial); } }); })(); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts index e5ce285b..74919e43 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts @@ -3,7 +3,6 @@ import { type AnyMessage, type Checkpointer, type Event, - type GlobalPositionTypeOfRecordedMessageMetadata, type Message, type MessageHandlerResult, type MessageProcessingScope, @@ -64,12 +63,16 @@ export type MongoDBProcessorOptions = MongoDBProcessorHandlerContext > & { connectionOptions: MongoDBEventStoreConnectionOptions }; -export type MongoDBCheckpointer = - Checkpointer< - MessageType, - ReadEventMetadataWithGlobalPosition, - MongoDBProcessorHandlerContext - >; +export type MongoDBCheckpointer< + MessageType extends AnyMessage = AnyMessage, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + CheckpointType = any, +> = Checkpointer< + MessageType, + ReadEventMetadataWithGlobalPosition, + MongoDBProcessorHandlerContext, + CheckpointType +>; export type MongoDBProjectorOptions = ProjectorOptions< @@ -87,9 +90,10 @@ const isResumeToken = (value: any): value is MongoDBResumeToken => export const getCheckpoint = < MessageType extends AnyMessage = AnyMessage, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + CheckpointType = any, MessageMetadataType extends - ReadEventMetadataWithGlobalPosition = ReadEventMetadataWithGlobalPosition, - CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, + ReadEventMetadataWithGlobalPosition = ReadEventMetadataWithGlobalPosition, >( message: RecordedMessage, ): CheckpointType | null => { @@ -114,24 +118,32 @@ export const getCheckpoint = < export const mongoDBCheckpointer = < MessageType extends Message = Message, ->(): MongoDBCheckpointer => ({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + CheckpointType = any, +>(): MongoDBCheckpointer => ({ read: async (options, context) => { - const result = await readProcessorCheckpoint(context.client, options); + const result = await readProcessorCheckpoint( + context.client, + options, + ); return { lastCheckpoint: result?.lastProcessedPosition }; }, store: async (options, context) => { - const newPosition: MongoDBResumeToken | null = getCheckpoint( + const newPosition = getCheckpoint( options.message, ); - const result = await storeProcessorCheckpoint(context.client, { - lastProcessedPosition: options.lastCheckpoint, - newPosition, - processorId: options.processorId, - partition: options.partition, - version: options.version, - }); + const result = await storeProcessorCheckpoint( + context.client, + { + lastProcessedPosition: options.lastCheckpoint, + newPosition, + processorId: options.processorId, + partition: options.partition, + version: options.version || 0, + }, + ); return result.success ? { success: true, newCheckpoint: result.newPosition } diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts index fc28c266..b6075d53 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts @@ -1,15 +1,16 @@ import type { MongoClient } from 'mongodb'; -import type { MongoDBResumeToken } from './subscriptions/types'; import { DefaultProcessotCheckpointCollectionName, type ReadProcessorCheckpointSqlResult, } from './types'; -export type ReadProcessorCheckpointResult = { - lastProcessedPosition: MongoDBResumeToken | null; +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type ReadProcessorCheckpointResult = { + lastProcessedPosition: CheckpointType | null; }; -export const readProcessorCheckpoint = async ( +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const readProcessorCheckpoint = async ( client: MongoClient, options: { processorId: string; @@ -17,10 +18,10 @@ export const readProcessorCheckpoint = async ( collectionName?: string; databaseName?: string; }, -): Promise => { +): Promise> => { const result = await client .db(options.databaseName) - .collection( + .collection>( options.collectionName || DefaultProcessotCheckpointCollectionName, ) .findOne({ diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts index 95f62812..4fb21ea3 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts @@ -25,7 +25,7 @@ export const storeProcessorCheckpoint = async ( }: { processorId: string; version: number; - newPosition: Position; + newPosition: Position | null; lastProcessedPosition: Position | null; partition?: string; collectionName?: string; @@ -36,48 +36,43 @@ export const storeProcessorCheckpoint = async ( null extends Position ? Position | null : Position > > => { - try { - const checkpoints = client - .db(dbName) - .collection( - collectionName || DefaultProcessotCheckpointCollectionName, - ); + const checkpoints = client + .db(dbName) + .collection( + collectionName || DefaultProcessotCheckpointCollectionName, + ); - const filter = { - subscriptionId: processorId, - partitionId: partition || null, - }; + const filter = { + subscriptionId: processorId, + partitionId: partition || null, + }; - const current = await checkpoints.findOne(filter); + const current = await checkpoints.findOne(filter); - // MISMATCH: we have a checkpoint but lastProcessedPosition doesn’t match - if ( - current && - compareTwoTokens(current.lastProcessedToken, lastProcessedPosition) !== 0 - ) { - return { success: false, reason: 'MISMATCH' }; - } + // MISMATCH: we have a checkpoint but lastProcessedPosition doesn’t match + if ( + current && + compareTwoTokens(current.lastProcessedToken, lastProcessedPosition) !== 0 + ) { + return { success: false, reason: 'MISMATCH' }; + } - // IGNORED: same or earlier position - if (current?.lastProcessedToken && newPosition) { - if (compareTwoTokens(current.lastProcessedToken, newPosition) !== -1) { - return { success: false, reason: 'IGNORED' }; - } + // IGNORED: same or earlier position + if (current?.lastProcessedToken && newPosition) { + if (compareTwoTokens(current.lastProcessedToken, newPosition) !== -1) { + return { success: false, reason: 'IGNORED' }; } + } - const updateResult = await checkpoints.updateOne( - { ...filter, lastProcessedToken: lastProcessedPosition }, - { $set: { lastProcessedToken: newPosition, version } }, - { upsert: true }, - ); - - if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) { - return { success: true, newPosition }; - } + const updateResult = await checkpoints.updateOne( + { ...filter, lastProcessedToken: lastProcessedPosition }, + { $set: { lastProcessedToken: newPosition, version } }, + { upsert: true }, + ); - return { success: false, reason: 'MISMATCH' }; - } catch (error) { - console.error(error); - throw error; + if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) { + return { success: true, newPosition }; } + + return { success: false, reason: 'MISMATCH' }; }; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index b5cdd227..d9d3c7aa 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -114,15 +114,16 @@ export const generateVersionPolicies = async (db: Db) => { }; }; -const DEFAULT_PARTITION_KEY_NAME = 'default'; +// const DEFAULT_PARTITION_KEY_NAME = 'default'; const createChangeStream = < EventType extends Event = Event, + // eslint-disable-next-line @typescript-eslint/no-explicit-any CheckpointType = any, >( getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db, resumeToken?: CurrentMessageProcessorPosition, - partitionKey: string = DEFAULT_PARTITION_KEY_NAME, + // partitionKey: string = DEFAULT_PARTITION_KEY_NAME, ) => { const $match = { 'ns.coll': { $regex: /^emt:/, $ne: 'emt:processors' }, @@ -167,6 +168,7 @@ const createChangeStream = < const subscribe = (getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db) => + // eslint-disable-next-line @typescript-eslint/no-explicit-any ( resumeToken?: CurrentMessageProcessorPosition, ) => { @@ -209,6 +211,7 @@ const compareTwoTokens = (token1: unknown, token2: unknown) => { throw new IllegalStateError(`Type of tokens is not comparable`); }; +// eslint-disable-next-line @typescript-eslint/no-explicit-any const zipMongoDBMessageBatchPullerStartFrom = ( options: (CurrentMessageProcessorPosition | undefined)[], ): CurrentMessageProcessorPosition => { diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts index 3886d764..2184c762 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts @@ -3,7 +3,8 @@ import { toStreamCollectionName } from '../mongoDBEventStore'; export const DefaultProcessotCheckpointCollectionName = toStreamCollectionName(`processors`); -export type ReadProcessorCheckpointSqlResult = { +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type ReadProcessorCheckpointSqlResult = { lastProcessedToken: Position; subscriptionId: string; partitionId: string | null; diff --git a/src/packages/emmett-mongodb/src/eventStore/event.ts b/src/packages/emmett-mongodb/src/eventStore/event.ts index 341ae716..795282c7 100644 --- a/src/packages/emmett-mongodb/src/eventStore/event.ts +++ b/src/packages/emmett-mongodb/src/eventStore/event.ts @@ -5,7 +5,7 @@ import type { import type { MongoDBResumeToken } from './consumers/subscriptions/types'; export type ReadEventMetadataWithGlobalPosition< - GlobalPosition extends MongoDBResumeToken = MongoDBResumeToken, + GlobalPosition = MongoDBResumeToken, > = RecordedMessageMetadataWithGlobalPosition; export type MongoDBRecordedMessageMetadata = RecordedMessageMetadata< MongoDBResumeToken, diff --git a/src/packages/emmett-mongodb/src/eventStore/example.ts b/src/packages/emmett-mongodb/src/eventStore/example.ts index b9fddbf2..49f809ea 100644 --- a/src/packages/emmett-mongodb/src/eventStore/example.ts +++ b/src/packages/emmett-mongodb/src/eventStore/example.ts @@ -44,4 +44,5 @@ const main = async () => { }); }; +// eslint-disable-next-line @typescript-eslint/no-floating-promises main(); From ead380c0bf21cd4cac919fdfcc587f6be5c012fc Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Fri, 15 Aug 2025 20:15:57 +0200 Subject: [PATCH 14/17] feat: processing messages one by one --- .../consumers/mongoDBEventsConsumer.ts | 132 ++++++++++----- .../consumers/storeProcessorCheckpoint.ts | 2 +- .../consumers/subscriptions/index.ts | 9 +- ...mongoDBEventStore.subscription.e2e.spec.ts | 155 ++++++++++++------ 4 files changed, 200 insertions(+), 98 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index 723f1852..c34bcda4 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -20,6 +20,7 @@ import type { EventStream, MongoDBReadEventMetadata, } from '../mongoDBEventStore'; +import { CancellationPromise } from './CancellablePromise'; import { changeStreamReactor, mongoDBProjector, @@ -168,6 +169,7 @@ export const mongoDBMessagesConsumer = < > >; let isRunning = false; + let runningPromise = new CancellationPromise(); const client = 'client' in options && options.client ? options.client @@ -227,6 +229,8 @@ export const mongoDBMessagesConsumer = < isRunning = true; + runningPromise = new CancellationPromise(); + const positions = await Promise.all( processors.map((o) => o.start({ client } as Partial)), ); @@ -235,58 +239,104 @@ export const mongoDBMessagesConsumer = < stream = subscribe(startFrom); - stream.on('change', async (change) => { - const resumeToken = change._id; - const typedChange = change as OplogChange; - const streamChange = - 'updateDescription' in typedChange - ? { - messages: Object.entries( - typedChange.updateDescription.updatedFields, - ) - .filter(([key]) => key.startsWith('messages.')) - .map(([, value]) => value as ReadEvent), - } - : typedChange.fullDocument; + void (async () => { + while (!stream.closed && isRunning) { + const hasNext = await Promise.race([ + stream.hasNext(), + runningPromise, + ]); - if (!streamChange) { - return; - } + if (hasNext === null) { + break; + } + + if (!hasNext) { + continue; + } + + const change = await stream.next(); + const resumeToken = change._id; + const typedChange = change as OplogChange; + const streamChange = + 'updateDescription' in typedChange + ? { + messages: Object.entries( + typedChange.updateDescription.updatedFields, + ) + .filter(([key]) => key.startsWith('messages.')) + .map(([, value]) => value as ReadEvent), + } + : typedChange.fullDocument; - const messages = streamChange.messages.map((message) => { - return { - kind: message.kind, - type: message.type, - data: message.data, - metadata: { - ...message.metadata, - streamPosition: resumeToken, - }, - } as unknown as RecordedMessage< - ConsumerMessageType, - MessageMetadataType - >; - }); + if (!streamChange) { + return; + } - for (const processor of processors.filter( - ({ isActive }) => isActive, - )) { - await processor.handle(messages, { - client, - } as Partial); + const messages = streamChange.messages.map((message) => { + return { + kind: message.kind, + type: message.type, + data: message.data, + metadata: { + ...message.metadata, + globalPosition: resumeToken, + }, + } as unknown as RecordedMessage< + ConsumerMessageType, + MessageMetadataType + >; + }); + + for (const processor of processors.filter( + ({ isActive }) => isActive, + )) { + await processor.handle(messages, { + client, + } as Partial); + } } - }); + + console.log('END'); + })(); })(); return start; }, stop: async () => { - await stream.close(); - isRunning = false; + if (stream) { + await stream.close(); + isRunning = false; + runningPromise.resolve(null); + } }, close: async () => { - await stream.close(); - isRunning = false; + if (stream) { + await stream.close(); + isRunning = false; + runningPromise.resolve(null); + } }, }; }; + +export const mongoDBChangeStreamMessagesConsumer = < + ConsumerMessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + HandlerContext extends + MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext, + CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, +>( + options: MongoDBConsumerOptions< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >, +): MongoDBEventStoreConsumer => + mongoDBMessagesConsumer< + ConsumerMessageType, + MessageMetadataType, + HandlerContext, + CheckpointType + >(options); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts index 4fb21ea3..ee17cae0 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts @@ -71,7 +71,7 @@ export const storeProcessorCheckpoint = async ( ); if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) { - return { success: true, newPosition }; + return { success: true, newPosition: newPosition! }; } return { success: false, reason: 'MISMATCH' }; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index d9d3c7aa..c6ded0a5 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -1,4 +1,5 @@ import { + EmmettError, IllegalStateError, type AsyncRetryOptions, type BatchRecordedMessageHandlerWithoutContext, @@ -84,11 +85,9 @@ export const generateVersionPolicies = async (db: Db) => { const semver = parseSemVer(buildInfo.version); const major = semver.major || 0; const throwNotSupportedError = (): never => { - throw new Error(); - // throw new NotSupportedMongoVersionError({ - // currentVersion: buildInfo.version, - // supportedVersions: SupportedMajorMongoVersions, - // }); + throw new EmmettError( + `Not supported MongoDB version: ${buildInfo.version}.`, + ); }; const supportedVersionCheckPolicy = () => { diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts index 6dbb22f0..b3a87af0 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -1,6 +1,7 @@ import { assertEqual, assertIsNotNull, + assertNotEqual, assertTrue, STREAM_DOES_NOT_EXIST, } from '@event-driven-io/emmett'; @@ -8,8 +9,9 @@ import { MongoDBContainer, type StartedMongoDBContainer, } from '@testcontainers/mongodb'; +import assert from 'assert'; import { MongoClient, type Collection } from 'mongodb'; -import { after, before, beforeEach, describe, it } from 'node:test'; +import { after, before, describe, it } from 'node:test'; import { v4 as uuid, v4 } from 'uuid'; import { getMongoDBEventStore, @@ -28,8 +30,12 @@ import { mongoDBMessagesConsumer, type MongoDBEventStoreConsumer, } from './consumers/mongoDBEventsConsumer'; -import { changeStreamReactor } from './consumers/mongoDBProcessor'; -import { generateVersionPolicies } from './consumers/subscriptions'; +import type { MongoDBProcessor } from './consumers/mongoDBProcessor'; +import { + compareTwoMongoDBTokens, + generateVersionPolicies, +} from './consumers/subscriptions'; +import type { MongoDBResumeToken } from './consumers/subscriptions/types'; void describe('MongoDBEventStore subscription', () => { let mongodb: StartedMongoDBContainer; @@ -37,9 +43,30 @@ void describe('MongoDBEventStore subscription', () => { let client: MongoClient; let collection: Collection; let consumer: MongoDBEventStoreConsumer; - let messageProcessingPromise = new CancellationPromise(); + let processor: MongoDBProcessor | undefined; + let lastResumeToken: MongoDBResumeToken | null = null; + + const messageProcessingPromise1 = new CancellationPromise(); + const messageProcessingPromise2 = new CancellationPromise(); + const lastProductItemIdTest1 = '789'; + const lastProductItemIdTest2 = '999'; + const expectedProductItemIds = [ + '123', + '456', + lastProductItemIdTest1, + lastProductItemIdTest2, + ] as const; + const shoppingCartId = uuid(); + const streamType = 'shopping_cart'; + const streamName = toStreamName(streamType, shoppingCartId); const noop = () => {}; + const productItem = (productId: string) => + ({ + productId, + quantity: 10, + price: 3, + }) as PricedProductItem; const timeoutGuard = async ( action: () => Promise, timeoutAfterMs = 1000, @@ -84,66 +111,40 @@ void describe('MongoDBEventStore subscription', () => { }); after(async () => { - try { - if (consumer) { - await consumer.close(); - } - await client.close(); - await mongodb.stop(); - } catch (error) { - console.log(error); + if (consumer) { + await consumer.close(); } + await client.close(); + await mongodb.stop(); }); - beforeEach(() => { - messageProcessingPromise = new CancellationPromise(); - }); - - void it('should create a new stream with metadata with appendToStream', async () => { - const productItem = (productId: string) => - ({ - productId, - quantity: 10, - price: 3, - }) as PricedProductItem; - const shoppingCartId = uuid(); - const streamType = 'shopping_cart'; - const streamName = toStreamName(streamType, shoppingCartId); - const lastProductItemId = '789'; - const expectedProductItemIds = ['123', '456', lastProductItemId] as const; + void it('should react to new events added by the appendToStream', async () => { let receivedMessageCount: 0 | 1 | 2 = 0; - changeStreamReactor({ - connectionOptions: { - client, - }, - processorId: v4(), - eachMessage: (event) => { - assertTrue(receivedMessageCount <= 2); - assertEqual( - expectedProductItemIds[receivedMessageCount], - event.data.productItem.productId, - ); - if (event.data.productItem.productId === lastProductItemId) { - messageProcessingPromise.resolve(); + processor = consumer.reactor({ + processorId: v4(), + stopAfter: (event) => { + if (event.data.productItem.productId === lastProductItemIdTest1) { + messageProcessingPromise1.resolve(); + } + if (event.data.productItem.productId === lastProductItemIdTest2) { + messageProcessingPromise2.resolve(); } - receivedMessageCount++; + return ( + event.data.productItem.productId === lastProductItemIdTest1 || + event.data.productItem.productId === lastProductItemIdTest2 + ); }, - }); - consumer.reactor({ - processorId: v4(), eachMessage: (event) => { - assertTrue(receivedMessageCount <= 2); + lastResumeToken = event.metadata.globalPosition; + + assertTrue(receivedMessageCount <= 3); assertEqual( expectedProductItemIds[receivedMessageCount], event.data.productItem.productId, ); - if (event.data.productItem.productId === lastProductItemId) { - messageProcessingPromise.resolve(); - } - receivedMessageCount++; }, connectionOptions: { @@ -184,7 +185,7 @@ void describe('MongoDBEventStore subscription', () => { { expectedStreamVersion: 2n }, ); - await timeoutGuard(() => messageProcessingPromise); + await timeoutGuard(() => messageProcessingPromise1); const stream = await collection.findOne( { streamName }, @@ -200,4 +201,56 @@ void describe('MongoDBEventStore subscription', () => { assertTrue(stream.metadata.createdAt instanceof Date); assertTrue(stream.metadata.updatedAt instanceof Date); }); + + void it('should renew after the last event', async () => { + assertTrue(!!processor); + assert(processor); + + let stream = await collection.findOne( + { streamName }, + { useBigInt64: true }, + ); + assertIsNotNull(stream); + assertEqual(3n, stream.metadata.streamPosition); + + await consumer.start(); + + const position = await processor.start({ client }); + + assertTrue(!!position); + assertNotEqual(typeof position, 'string'); + assert(position); + assert(typeof position !== 'string'); + + // processor after restart is renewed after the 3rd position. + assertEqual( + 0, + compareTwoMongoDBTokens(position.lastCheckpoint, lastResumeToken!), + ); + + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[3]) }, + }, + ], + { expectedStreamVersion: 3n }, + ); + + await timeoutGuard(() => messageProcessingPromise2); + + stream = await collection.findOne({ streamName }, { useBigInt64: true }); + assertIsNotNull(stream); + assertEqual(4n, stream.metadata.streamPosition); + + // lastResumeToken has changed after the last message + assertEqual( + 1, + compareTwoMongoDBTokens(lastResumeToken!, position.lastCheckpoint), + ); + + await consumer.stop(); + }); }); From 13afb1e06afaeb132993eb43ebc3367ced4dc398 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Fri, 15 Aug 2025 20:17:16 +0200 Subject: [PATCH 15/17] fix: removed incorrect change --- .../src/eventStore/schema/readProcessorCheckpoint.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/readProcessorCheckpoint.ts b/src/packages/emmett-postgresql/src/eventStore/schema/readProcessorCheckpoint.ts index 8611dea5..fb0fd042 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/readProcessorCheckpoint.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/readProcessorCheckpoint.ts @@ -2,8 +2,6 @@ import { singleOrNull, sql, type SQLExecutor } from '@event-driven-io/dumbo'; import { defaultTag, subscriptionsTable } from './typing'; type ReadProcessorCheckpointSqlResult = { - subscriptionId: string; - partitionId: string | null; last_processed_position: string; }; From 6f787605c3dd8c8944df91862e59119db640f16f Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Thu, 11 Sep 2025 22:24:26 +0200 Subject: [PATCH 16/17] test: fix --- .../consumers/mongoDBEventsConsumer.ts | 148 ++----- .../eventStore/consumers/mongoDBProcessor.ts | 3 +- .../consumers/subscriptions/index.ts | 409 ++++++++++++++++-- .../emmett-mongodb/src/eventStore/event.ts | 3 +- ...mongoDBEventStore.subscription.e2e.spec.ts | 14 +- 5 files changed, 434 insertions(+), 143 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index c34bcda4..c21be3a8 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -4,22 +4,16 @@ import { type AnyEvent, type AnyMessage, type AsyncRetryOptions, - type CommonRecordedMessageMetadata, type DefaultRecord, - type Event, type GlobalPositionTypeOfRecordedMessageMetadata, type Message, type MessageConsumer, - type ReadEvent, type RecordedMessage, } from '@event-driven-io/emmett'; -import { ChangeStream, MongoClient, type MongoClientOptions } from 'mongodb'; +import { MongoClient, type MongoClientOptions } from 'mongodb'; import { v4 as uuid } from 'uuid'; import type { MongoDBRecordedMessageMetadata } from '../event'; -import type { - EventStream, - MongoDBReadEventMetadata, -} from '../mongoDBEventStore'; +import type { MongoDBReadEventMetadata } from '../mongoDBEventStore'; import { CancellationPromise } from './CancellablePromise'; import { changeStreamReactor, @@ -29,16 +23,18 @@ import { type MongoDBProjectorOptions, } from './mongoDBProcessor'; import { - subscribe as _subscribe, + generateVersionPolicies, + mongoDBSubscription, zipMongoDBMessageBatchPullerStartFrom, type ChangeStreamFullDocumentValuePolicy, - type MongoDBSubscriptionDocument, + type MongoDBSubscription, } from './subscriptions'; +import type { MongoDBResumeToken } from './subscriptions/types'; export type MessageConsumerOptions< MessageType extends Message = AnyMessage, MessageMetadataType extends - MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, HandlerContext extends DefaultRecord | undefined = undefined, CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, > = { @@ -56,7 +52,7 @@ export type MongoDBEventStoreConsumerConfig< // eslint-disable-next-line @typescript-eslint/no-explicit-any ConsumerMessageType extends Message = any, MessageMetadataType extends - MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, HandlerContext extends DefaultRecord | undefined = undefined, CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, > = MessageConsumerOptions< @@ -78,7 +74,7 @@ export type MongoDBEventStoreConsumerConfig< export type MongoDBConsumerOptions< ConsumerEventType extends Message = Message, MessageMetadataType extends - MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, HandlerContext extends DefaultRecord | undefined = undefined, CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, > = MongoDBEventStoreConsumerConfig< @@ -119,40 +115,17 @@ export type MongoDBEventStoreConsumer< }> : object); -type MessageArrayElement = `messages.${string}`; -type UpdateDescription = { - updateDescription: { - updatedFields: Record & { - 'metadata.streamPosition': number; - 'metadata.updatedAt': Date; - }; - }; -}; -type FullDocument< - EventType extends Event = Event, - EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, - T extends EventStream = EventStream, -> = { - fullDocument: T; -}; -type OplogChange< - EventType extends Event = Event, - EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, - T extends EventStream = EventStream, -> = - | FullDocument - | UpdateDescription>; - export type MongoDBConsumerHandlerContext = { client?: MongoClient; }; + export const mongoDBMessagesConsumer = < ConsumerMessageType extends Message = AnyMessage, MessageMetadataType extends - MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, HandlerContext extends MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext, - CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, + CheckpointType = MongoDBResumeToken, >( options: MongoDBConsumerOptions< ConsumerMessageType, @@ -162,12 +135,7 @@ export const mongoDBMessagesConsumer = < >, ): MongoDBEventStoreConsumer => { let start: Promise; - let stream: ChangeStream< - EventStream, - MongoDBSubscriptionDocument< - EventStream - > - >; + let stream: MongoDBSubscription; let isRunning = false; let runningPromise = new CancellationPromise(); const client = @@ -175,10 +143,6 @@ export const mongoDBMessagesConsumer = < ? options.client : new MongoClient(options.connectionString, options.clientOptions); const processors = options.processors ?? []; - const subscribe = _subscribe( - options.changeStreamFullDocumentPolicy, - client.db(), - ); return { consumerId: options.consumerId ?? uuid(), @@ -237,56 +201,19 @@ export const mongoDBMessagesConsumer = < const startFrom = zipMongoDBMessageBatchPullerStartFrom(positions); - stream = subscribe(startFrom); - - void (async () => { - while (!stream.closed && isRunning) { - const hasNext = await Promise.race([ - stream.hasNext(), - runningPromise, - ]); - - if (hasNext === null) { - break; - } - - if (!hasNext) { - continue; - } - - const change = await stream.next(); - const resumeToken = change._id; - const typedChange = change as OplogChange; - const streamChange = - 'updateDescription' in typedChange - ? { - messages: Object.entries( - typedChange.updateDescription.updatedFields, - ) - .filter(([key]) => key.startsWith('messages.')) - .map(([, value]) => value as ReadEvent), - } - : typedChange.fullDocument; - - if (!streamChange) { - return; - } - - const messages = streamChange.messages.map((message) => { - return { - kind: message.kind, - type: message.type, - data: message.data, - metadata: { - ...message.metadata, - globalPosition: resumeToken, - }, - } as unknown as RecordedMessage< - ConsumerMessageType, - MessageMetadataType - >; - }); - + stream = mongoDBSubscription< + ConsumerMessageType, + MessageMetadataType, + CheckpointType + >({ + client, + from: startFrom, + eachBatch: async ( + messages: RecordedMessage< + ConsumerMessageType, + MessageMetadataType + >[], + ) => { for (const processor of processors.filter( ({ isActive }) => isActive, )) { @@ -294,24 +221,31 @@ export const mongoDBMessagesConsumer = < client, } as Partial); } - } + }, + }); + + // TODO: Remember to fix. + const policy = (await generateVersionPolicies(options.client?.db()!)) + .changeStreamFullDocumentValuePolicy; - console.log('END'); - })(); + await stream.start({ + getFullDocumentValue: policy, + startFrom, + }); })(); return start; }, stop: async () => { - if (stream) { - await stream.close(); + if (stream.isRunning) { + await stream.stop(); isRunning = false; runningPromise.resolve(null); } }, close: async () => { - if (stream) { - await stream.close(); + if (stream.isRunning) { + await stream.stop(); isRunning = false; runningPromise.resolve(null); } @@ -322,7 +256,7 @@ export const mongoDBMessagesConsumer = < export const mongoDBChangeStreamMessagesConsumer = < ConsumerMessageType extends Message = AnyMessage, MessageMetadataType extends - MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, HandlerContext extends MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext, CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata, diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts index 74919e43..4131ef02 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts @@ -84,6 +84,7 @@ export type MongoDBProjectorOptions = // eslint-disable-next-line @typescript-eslint/no-explicit-any const isResumeToken = (value: any): value is MongoDBResumeToken => + typeof value === 'object' && '_data' in value && // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access typeof value._data === 'string'; @@ -91,7 +92,7 @@ const isResumeToken = (value: any): value is MongoDBResumeToken => export const getCheckpoint = < MessageType extends AnyMessage = AnyMessage, // eslint-disable-next-line @typescript-eslint/no-explicit-any - CheckpointType = any, + CheckpointType = MongoDBCheckpointer, MessageMetadataType extends ReadEventMetadataWithGlobalPosition = ReadEventMetadataWithGlobalPosition, >( diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index c6ded0a5..f269c8a7 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -1,16 +1,22 @@ import { + asyncRetry, EmmettError, IllegalStateError, + JSONParser, + type AnyMessage, type AsyncRetryOptions, type BatchRecordedMessageHandlerWithoutContext, type CurrentMessageProcessorPosition, type Event, type Message, - type ReadEventMetadataWithGlobalPosition, + type MessageHandlerResult, + type ReadEvent, + type RecordedMessage, + type RecordedMessageMetadata, } from '@event-driven-io/emmett'; import { + ChangeStream, Timestamp, - type ChangeStreamDeleteDocument, type ChangeStreamInsertDocument, type ChangeStreamReplaceDocument, type ChangeStreamUpdateDocument, @@ -18,30 +24,38 @@ import { type Document, type MongoClient, } from 'mongodb'; -import type { EventStream } from '../../mongoDBEventStore'; +import { pipeline, Transform, Writable, type WritableOptions } from 'stream'; +import type { MongoDBRecordedMessageMetadata } from '../../event'; +import type { + EventStream, + MongoDBReadEventMetadata, +} from '../../mongoDBEventStore'; import { isMongoDBResumeToken, type MongoDBResumeToken } from './types'; -export type MongoDBSubscriptionOptions = - { - // from?: MongoDBEventStoreConsumerType; - client: MongoClient; - batchSize: number; - eachBatch: BatchRecordedMessageHandlerWithoutContext< - MessageType, - ReadEventMetadataWithGlobalPosition - >; - resilience?: { - resubscribeOptions?: AsyncRetryOptions; - }; +export type MongoDBSubscriptionOptions< + MessageType extends Message = Message, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + CheckpointType = MongoDBResumeToken, +> = { + from?: CurrentMessageProcessorPosition; + client: MongoClient; + batchSize: number; + eachBatch: BatchRecordedMessageHandlerWithoutContext< + MessageType, + MessageMetadataType + >; + resilience?: { + resubscribeOptions?: AsyncRetryOptions; }; +}; export type ChangeStreamFullDocumentValuePolicy = () => | 'whenAvailable' | 'updateLookup'; export type MongoDBSubscriptionDocument = | ChangeStreamInsertDocument | ChangeStreamUpdateDocument - | ChangeStreamReplaceDocument - | ChangeStreamDeleteDocument; + | ChangeStreamReplaceDocument; // https://www.mongodb.com/docs/manual/reference/command/buildInfo/ export type BuildInfo = { version: string; @@ -59,16 +73,180 @@ export type BuildInfo = { storageEngines: string[]; ok: number; }; -export type MongoDBSubscriptionStartFrom = - CurrentMessageProcessorPosition; +export type MongoDBSubscriptionStartFrom = + CurrentMessageProcessorPosition; + +export type MongoDBSubscriptionStartOptions = + { + startFrom: MongoDBSubscriptionStartFrom; + getFullDocumentValue: ChangeStreamFullDocumentValuePolicy; + dbName?: string; + }; + +export type MongoDBSubscription = { + isRunning: boolean; + start(options: MongoDBSubscriptionStartOptions): Promise; + stop(): Promise; +}; -export type MongoDBSubscriptionStartOptions = { - startFrom: MongoDBSubscriptionStartFrom; +export type StreamSubscription< + EventType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, +> = ChangeStream< + EventStream, MessageMetadataType>, + MongoDBSubscriptionDocument< + EventStream, RecordedMessageMetadata> + > +>; +export type MessageArrayElement = `messages.${string}`; +export type UpdateDescription = { + _id: MongoDBResumeToken; + operationType: 'update'; + updateDescription: { + updatedFields: Record & { + 'metadata.streamPosition': number; + 'metadata.updatedAt': Date; + }; + }; +}; +export type FullDocument< + EventType extends Event = Event, + EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, + T extends EventStream = EventStream, +> = { + _id: MongoDBResumeToken; + operationType: 'insert'; + fullDocument: T; }; +export type OplogChange< + EventType extends Message = AnyMessage, + EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, + T extends EventStream = EventStream< + Extract, + EventMetaDataType + >, +> = + | FullDocument, EventMetaDataType, T> + | UpdateDescription< + ReadEvent, EventMetaDataType> + >; + +type SubscriptionSequentialHandlerOptions< + MessageType extends AnyMessage = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + CheckpointType = MongoDBResumeToken, +> = MongoDBSubscriptionOptions< + MessageType, + MessageMetadataType, + CheckpointType +> & + WritableOptions; + +class SubscriptionSequentialHandler< + MessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + CheckpointType = MongoDBResumeToken, +> extends Transform { + private options: SubscriptionSequentialHandlerOptions< + MessageType, + MessageMetadataType, + CheckpointType + >; + // private from: EventStoreDBEventStoreConsumerType | undefined; + public isRunning: boolean; + + constructor( + options: SubscriptionSequentialHandlerOptions< + MessageType, + MessageMetadataType, + CheckpointType + >, + ) { + super({ objectMode: true, ...options }); + this.options = options; + // this.from = options.from; + this.isRunning = true; + } + + async _transform( + change: OplogChange, + _encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ): Promise { + try { + if (!this.isRunning || !change) { + callback(); + return; + } + + const messageCheckpoint = change._id; + const streamChange = + change.operationType === 'insert' + ? change.fullDocument + : change.operationType === 'update' + ? { + messages: Object.entries(change.updateDescription.updatedFields) + .filter(([key]) => key.startsWith('messages.')) + .map(([, value]) => value as ReadEvent), + } + : void 0; + + if (!streamChange) { + return; + } + + const messages = streamChange.messages.map((message) => { + return { + kind: message.kind, + type: message.type, + data: message.data, + metadata: { + ...message.metadata, + globalPosition: messageCheckpoint, + }, + } as unknown as RecordedMessage; + }); + + const result = await this.options.eachBatch(messages); + + if (result && result.type === 'STOP') { + this.isRunning = false; + if (!result.error) this.push(messageCheckpoint); + + this.push(result); + this.push(null); + callback(); + return; + } + + this.push(messageCheckpoint); + callback(); + } catch (error) { + callback(error as Error); + } + } +} const REGEXP = /^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$/; +export const isDatabaseUnavailableError = (error: unknown) => + error instanceof Error && + 'type' in error && + error.type === 'unavailable' && + 'code' in error && + error.code === 14; + +export const MongoDBResubscribeDefaultOptions: AsyncRetryOptions = { + forever: true, + minTimeout: 100, + factor: 1.5, + shouldRetryError: (error) => !isDatabaseUnavailableError(error), +}; + export const parseSemVer = (value: string = '') => { const versions = REGEXP.exec(value); @@ -102,8 +280,7 @@ export const generateVersionPolicies = async (db: Db) => { } else if (major === 5) { return 'updateLookup'; } else { - throw new Error(`Major number is ${major}`); - // throwNotSupportedError(); + throw new EmmettError(`Major number is ${major}`); } }; @@ -115,7 +292,7 @@ export const generateVersionPolicies = async (db: Db) => { // const DEFAULT_PARTITION_KEY_NAME = 'default'; const createChangeStream = < - EventType extends Event = Event, + EventType extends Message = AnyMessage, // eslint-disable-next-line @typescript-eslint/no-explicit-any CheckpointType = any, >( @@ -141,8 +318,10 @@ const createChangeStream = < ]; return db.watch< - EventStream, - MongoDBSubscriptionDocument> + EventStream>, + MongoDBSubscriptionDocument< + EventStream> + > >(pipeline, { fullDocument: getFullDocumentValue(), ...(resumeToken === 'BEGINNING' @@ -168,12 +347,186 @@ const createChangeStream = < const subscribe = (getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db) => // eslint-disable-next-line @typescript-eslint/no-explicit-any - ( - resumeToken?: CurrentMessageProcessorPosition, + ( + resumeToken?: MongoDBSubscriptionStartFrom, ) => { return createChangeStream(getFullDocumentValue, db, resumeToken); }; +export const mongoDBSubscription = < + MessageType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, + ResumeToken = MongoDBResumeToken, +>({ + client, + from, + batchSize, + eachBatch, + resilience, +}: MongoDBSubscriptionOptions< + MessageType, + MessageMetadataType +>): MongoDBSubscription => { + let isRunning = false; + + let start: Promise; + let processor: SubscriptionSequentialHandler< + MessageType, + MessageMetadataType + >; + + let subscription: StreamSubscription; + + const resubscribeOptions: AsyncRetryOptions = + resilience?.resubscribeOptions ?? { + ...MongoDBResubscribeDefaultOptions, + shouldRetryResult: () => isRunning, + shouldRetryError: (error) => + isRunning && MongoDBResubscribeDefaultOptions.shouldRetryError!(error), + }; + + const stopSubscription = async (callback?: () => void): Promise => { + isRunning = false; + if (processor) processor.isRunning = false; + + if (subscription.closed) { + return new Promise((resolve, reject) => { + try { + callback?.(); + resolve(); + } catch (error) { + reject(error); + } + }); + } else { + try { + await subscription.close(); + } catch (error) { + throw error; + } finally { + callback?.(); + } + } + }; + + const pipeMessages = ( + options: MongoDBSubscriptionStartOptions, + ) => { + let retry = 0; + + return asyncRetry( + () => + new Promise((resolve, reject) => { + console.info( + `Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify( + options.startFrom, + )}`, + ); + subscription = subscribe( + options.getFullDocumentValue, + client.db(options.dbName), + )(options.startFrom); + + processor = new SubscriptionSequentialHandler< + MessageType, + MessageMetadataType + >({ + client, + from, + batchSize, + eachBatch, + resilience, + }); + + const handler = new (class extends Writable { + async _write( + result: MongoDBResumeToken | MessageHandlerResult, + _encoding: string, + done: () => void, + ) { + if (!isRunning) return; + + if (isMongoDBResumeToken(result)) { + options.startFrom = { + lastCheckpoint: result, + }; + done(); + return; + } + + if (result && result.type === 'STOP' && result.error) { + console.error( + `Subscription stopped with error code: ${result.error.errorCode}, message: ${ + result.error.message + }.`, + ); + } + + await stopSubscription(); + done(); + } + })({ objectMode: true }); + + pipeline( + subscription, + processor, + handler, + async (error: Error | null) => { + console.info(`Stopping subscription.`); + await stopSubscription(() => { + if (!error) { + console.info('Subscription ended successfully.'); + resolve(); + return; + } + + if ( + error.message === 'ChangeStream is closed' && + error.name === 'MongoAPIError' + ) { + console.info('Subscription ended successfully.'); + resolve(); + return; + } + + console.error( + `Received error: ${JSONParser.stringify(error)}.`, + ); + reject(error); + }); + }, + ); + + console.log('OK'); + }), + resubscribeOptions, + ); + }; + + return { + get isRunning() { + return isRunning; + }, + start: (options) => { + if (isRunning) return start; + + start = (async () => { + isRunning = true; + const a = pipeMessages(options); + return a; + })(); + + return start; + }, + stop: async () => { + if (!isRunning) return start ? await start : Promise.resolve(); + await stopSubscription(); + await start; + }, + }; +}; + /** * Compares two MongoDB Resume Tokens. * @param token1 Token 1. diff --git a/src/packages/emmett-mongodb/src/eventStore/event.ts b/src/packages/emmett-mongodb/src/eventStore/event.ts index 795282c7..4a098a28 100644 --- a/src/packages/emmett-mongodb/src/eventStore/event.ts +++ b/src/packages/emmett-mongodb/src/eventStore/event.ts @@ -1,4 +1,5 @@ import type { + BigIntStreamPosition, RecordedMessageMetadata, RecordedMessageMetadataWithGlobalPosition, } from '@event-driven-io/emmett'; @@ -9,5 +10,5 @@ export type ReadEventMetadataWithGlobalPosition< > = RecordedMessageMetadataWithGlobalPosition; export type MongoDBRecordedMessageMetadata = RecordedMessageMetadata< MongoDBResumeToken, - undefined + BigIntStreamPosition >; diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts index b3a87af0..023f48ca 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -126,9 +126,11 @@ void describe('MongoDBEventStore subscription', () => { stopAfter: (event) => { if (event.data.productItem.productId === lastProductItemIdTest1) { messageProcessingPromise1.resolve(); + consumer.stop(); } if (event.data.productItem.productId === lastProductItemIdTest2) { messageProcessingPromise2.resolve(); + consumer.stop(); } return ( @@ -152,8 +154,6 @@ void describe('MongoDBEventStore subscription', () => { }, }); - await consumer.start(); - await eventStore.appendToStream( streamName, [ @@ -185,15 +185,17 @@ void describe('MongoDBEventStore subscription', () => { { expectedStreamVersion: 2n }, ); - await timeoutGuard(() => messageProcessingPromise1); + try { + await consumer.start(); + } catch (err) { + console.error(err); + } const stream = await collection.findOne( { streamName }, { useBigInt64: true }, ); - await consumer.stop(); - assertIsNotNull(stream); assertEqual(3n, stream.metadata.streamPosition); assertEqual(shoppingCartId, stream.metadata.streamId); @@ -202,7 +204,7 @@ void describe('MongoDBEventStore subscription', () => { assertTrue(stream.metadata.updatedAt instanceof Date); }); - void it('should renew after the last event', async () => { + void it.skip('should renew after the last event', async () => { assertTrue(!!processor); assert(processor); From 57fcd848370e9f55f1687f25dd5c021213d01c17 Mon Sep 17 00:00:00 2001 From: Artur Wojnar Date: Wed, 17 Sep 2025 20:15:48 +0200 Subject: [PATCH 17/17] test: tests, eslint, ts fixed --- .../consumers/mongoDBEventsConsumer.ts | 16 +++++--- .../eventStore/consumers/mongoDBProcessor.ts | 1 - .../consumers/subscriptions/index.ts | 26 +++++++------ ...mongoDBEventStore.subscription.e2e.spec.ts | 38 +++++-------------- 4 files changed, 35 insertions(+), 46 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts index c21be3a8..758af5b4 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts @@ -62,9 +62,9 @@ export type MongoDBEventStoreConsumerConfig< CheckpointType > & { // from?: any; - pulling?: { - batchSize?: number; - }; + // pulling?: { + // batchSize?: number; + // }; resilience?: { resubscribeOptions?: AsyncRetryOptions; }; @@ -224,9 +224,15 @@ export const mongoDBMessagesConsumer = < }, }); + const db = options.client?.db?.(); + + if (!db) { + throw new EmmettError('MongoDB client is not connected'); + } + // TODO: Remember to fix. - const policy = (await generateVersionPolicies(options.client?.db()!)) - .changeStreamFullDocumentValuePolicy; + const versionPolicies = await generateVersionPolicies(db); + const policy = versionPolicies.changeStreamFullDocumentValuePolicy; await stream.start({ getFullDocumentValue: policy, diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts index 4131ef02..7551ad82 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts @@ -91,7 +91,6 @@ const isResumeToken = (value: any): value is MongoDBResumeToken => export const getCheckpoint = < MessageType extends AnyMessage = AnyMessage, - // eslint-disable-next-line @typescript-eslint/no-explicit-any CheckpointType = MongoDBCheckpointer, MessageMetadataType extends ReadEventMetadataWithGlobalPosition = ReadEventMetadataWithGlobalPosition, diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts index f269c8a7..8badf3ef 100644 --- a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -36,11 +36,13 @@ export type MongoDBSubscriptionOptions< MessageType extends Message = Message, MessageMetadataType extends MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata, - CheckpointType = MongoDBResumeToken, + // CheckpointType = MongoDBResumeToken, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + CheckpointType = any, > = { from?: CurrentMessageProcessorPosition; client: MongoClient; - batchSize: number; + // batchSize: number; eachBatch: BatchRecordedMessageHandlerWithoutContext< MessageType, MessageMetadataType @@ -346,7 +348,6 @@ const createChangeStream = < const subscribe = (getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db) => - // eslint-disable-next-line @typescript-eslint/no-explicit-any ( resumeToken?: MongoDBSubscriptionStartFrom, ) => { @@ -361,7 +362,7 @@ export const mongoDBSubscription = < >({ client, from, - batchSize, + // batchSize, eachBatch, resilience, }: MongoDBSubscriptionOptions< @@ -396,14 +397,18 @@ export const mongoDBSubscription = < callback?.(); resolve(); } catch (error) { - reject(error); + reject( + error instanceof Error + ? error + : typeof error === 'string' + ? new Error(error) + : new Error('Unknown error'), + ); } }); } else { try { await subscription.close(); - } catch (error) { - throw error; } finally { callback?.(); } @@ -434,7 +439,7 @@ export const mongoDBSubscription = < >({ client, from, - batchSize, + // batchSize, eachBatch, resilience, }); @@ -449,7 +454,7 @@ export const mongoDBSubscription = < if (isMongoDBResumeToken(result)) { options.startFrom = { - lastCheckpoint: result, + lastCheckpoint: result as ResumeToken, }; done(); return; @@ -513,8 +518,7 @@ export const mongoDBSubscription = < start = (async () => { isRunning = true; - const a = pipeMessages(options); - return a; + return pipeMessages(options); })(); return start; diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts index 023f48ca..bb0af710 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts @@ -67,24 +67,6 @@ void describe('MongoDBEventStore subscription', () => { quantity: 10, price: 3, }) as PricedProductItem; - const timeoutGuard = async ( - action: () => Promise, - timeoutAfterMs = 1000, - ) => { - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - reject(new Error('timeout')); - clearTimeout(timer); - }, timeoutAfterMs); - - action() - .catch(noop) - .finally(() => { - clearTimeout(timer); - resolve(); - }); - }); - }; before(async () => { mongodb = await new MongoDBContainer('mongo:8.0.10').start(); @@ -126,11 +108,11 @@ void describe('MongoDBEventStore subscription', () => { stopAfter: (event) => { if (event.data.productItem.productId === lastProductItemIdTest1) { messageProcessingPromise1.resolve(); - consumer.stop(); + consumer.stop().catch(noop); } if (event.data.productItem.productId === lastProductItemIdTest2) { messageProcessingPromise2.resolve(); - consumer.stop(); + consumer.stop().catch(noop); } return ( @@ -185,11 +167,7 @@ void describe('MongoDBEventStore subscription', () => { { expectedStreamVersion: 2n }, ); - try { - await consumer.start(); - } catch (err) { - console.error(err); - } + await consumer.start(); const stream = await collection.findOne( { streamName }, @@ -204,7 +182,7 @@ void describe('MongoDBEventStore subscription', () => { assertTrue(stream.metadata.updatedAt instanceof Date); }); - void it.skip('should renew after the last event', async () => { + void it('should renew after the last event', async () => { assertTrue(!!processor); assert(processor); @@ -215,8 +193,6 @@ void describe('MongoDBEventStore subscription', () => { assertIsNotNull(stream); assertEqual(3n, stream.metadata.streamPosition); - await consumer.start(); - const position = await processor.start({ client }); assertTrue(!!position); @@ -230,6 +206,10 @@ void describe('MongoDBEventStore subscription', () => { compareTwoMongoDBTokens(position.lastCheckpoint, lastResumeToken!), ); + const consumerPromise = consumer.start(); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + await eventStore.appendToStream( streamName, [ @@ -241,7 +221,7 @@ void describe('MongoDBEventStore subscription', () => { { expectedStreamVersion: 3n }, ); - await timeoutGuard(() => messageProcessingPromise2); + await consumerPromise; stream = await collection.findOne({ streamName }, { useBigInt64: true }); assertIsNotNull(stream);