Conversation


@arturwojnar arturwojnar commented Jul 31, 2025

This PR provides a consumer built on MongoDB Change Streams.

An example of what got me thinking about this feature 👇

switch (type) {
  case 'B2BInstitutionUsedLicencesIncreased':
    if (state._disabled) {
      throw new InstitutionInvalidState({
        details: { currentState: 'deleted' },
        message: `Cannot use a licence of a disabled institution.`,
      })
    }

    // Hack.
    const licenceId = generateLicenceId()
    await getEventStore().appendToStream<LicenceEvent>(
      toStreamName(LICENCES_STREAM_TYPE, event.patientId.toString()),
      [
        literalObject<B2BLincenceObtained>({
          type: 'B2BLincenceObtained',
          metadata: {
            patientId: event.patientId,
            institutionId: metadata.institutionId,
            licenceId,
          },
          data: { obtainedAt: event.timestamp, duration: state.licenceDuration as Duration },
        }),
      ],
    )

    return literalObject<B2BInstitution>({
      ...state,
      usedLicences: state.usedLicences + 1,
      availableLicences: state.availableLicences - 1,
    })
}

☝️ This is a snippet of a projection's evolve function. As you can see, to achieve reactiveness I did something very ugly 🤮: I converted evolve into an asynchronous function, and while processing the B2BInstitutionUsedLicencesIncreased event, I append another event, B2BLincenceObtained, to a different stream.

The latter event is a consequence of the first, but it should not be implicitly and directly coupled to it, especially not inside the projection's internals.

So, reactiveness is the crux of this PR.

MongoDB offers the Change Stream functionality, which is a simple pull mechanism on top of MongoDB's oplog.

By subscribing to the Change Stream and storing the last processed message's position (called a token here), we implement, in fact, a simple version of the outbox pattern.
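To illustrate the idea, here's a minimal sketch of that checkpointing loop. Everything here is illustrative, not the actual PR code: the hypothetical CheckpointStore interface stands in for the Mongo-backed checkpoint collection, and any AsyncIterable of events stands in for the real ChangeStream.

```typescript
type ChangeEvent<T> = { token: string; data: T };

// Hypothetical abstraction over the checkpoint collection.
interface CheckpointStore {
  read(processorId: string): Promise<string | null>;
  store(processorId: string, token: string): Promise<void>;
}

class InMemoryCheckpointStore implements CheckpointStore {
  private tokens = new Map<string, string>();
  async read(processorId: string): Promise<string | null> {
    return this.tokens.get(processorId) ?? null;
  }
  async store(processorId: string, token: string): Promise<void> {
    this.tokens.set(processorId, token);
  }
}

// Pull events one by one; persist the resume token only after the handler
// succeeds, so a restarted consumer resumes from the last processed position.
async function runConsumer<T>(
  processorId: string,
  stream: AsyncIterable<ChangeEvent<T>>,
  checkpoints: CheckpointStore,
  eachMessage: (data: T) => Promise<void>,
): Promise<void> {
  for await (const event of stream) {
    await eachMessage(event.data);
    await checkpoints.store(processorId, event.token);
  }
}
```

Storing the token after the handler gives at-least-once delivery: a crash between the handler and the store call means the message is redelivered on restart.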

Here's the fixed version:

consumer.reactor<B2BInstitutionUsedLicencesIncreased>({
  processorId: v4(),
  eachMessage: async (event) => {
    const licenceId = generateLicenceId()
    await getEventStore().appendToStream<LicenceEvent>(
      toStreamName(LICENCES_STREAM_TYPE, event.patientId.toString()),
      [
        literalObject<B2BLincenceObtained>({
          type: 'B2BLincenceObtained',
          metadata: {
            patientId: event.patientId,
            institutionId: event.metadata.institutionId,
            licenceId,
          },
          // the duration now travels with the event instead of being read
          // from the projection's state
          data: { obtainedAt: event.timestamp, duration: event.metadata.licenceDuration },
        }),
      ],
    )
  },
  connectionOptions: {
    client,
  },
});

@arturwojnar arturwojnar marked this pull request as draft July 31, 2025 08:31
options: { processorId: string; partition?: string; collectionName?: string },
): Promise<ReadProcessorCheckpointResult> => {
const result = await client
.db()
Collaborator

SUGGESTION: Here, we'll also need to provide an option to pass the db name through options.

};
};

export const generateVersionPolicies = async (db: Db) => {
Collaborator

QUESTION: Could you explain more about why it's needed and provide some reference links for that?

Author

It checks the MongoDB version and returns a version-dependent configuration. Hmm, actually, there is a difference between versions 5 and 6. What is your minimum supported version?

Collaborator

The minimum is MongoDB 4 for now.

event.data.productItem.productId,
);

if (event.data.productItem.productId === lastProductItemId) {
Collaborator

SUGGESTION: You can use the built-in stop-after capabilities here to make this check easier. See

}
| { success: false; reason: 'IGNORED' | 'MISMATCH' };

export const storeProcessorCheckpoint = async <Position extends string | null>(
Collaborator

SUGGESTION: It'd be great to add some integration tests for that.

Collaborator

@oskardudycz oskardudycz left a comment

@arturwojnar thanks for doing it!

I did a first round of review and added some questions/comments/suggestions, but overall it looks like a great first step. After clarifications and adding more tests, I'll be happy to merge it!

It'd also be great if you could update the description with the highlights of this PR.

@oskardudycz
Collaborator

@arturwojnar could you ensure that linter, build and tests are passing? See: https://github.com/event-driven-io/emmett/actions/runs/16644021116/job/47194905100?pr=258. Thanks in advance!

Collaborator

@alex-laycalvert alex-laycalvert left a comment

I think this is a good start based off of the other consumers. Nothing that @oskardudycz hasn't already mentioned. I think some of the boilerplate can be removed since the addition of the centralized consumer types and such.

We probably want to add some documentation noting that this consumer implementation requires change streams to be enabled (i.e. a replica set or sharded cluster) and can't be used in single-instance environments.


stream = subscribe<Event, CheckpointType>(startFrom);

void (async () => {
Author

@oskardudycz I do it this way, let's say sequentially, one by one, instead of relying on stream events (stream.on('data')).

This is because, as the tests show for three consecutive stream appends, the value of lastCheckpoint (processors.ts) doesn't have time to update due to long-running I/O operations. As a result, storeProcessorCheckpoint fails with MISMATCH and checkpoints don't get persisted.

This solution isn't perfect, though, because there is still a possibility of a race: at line 293 we have asynchronous handling of the processor function.

In my Hermes repo I handle this case by implementing a queue of messages to ACK, so messages can finish processing out of order but still be stored/ACKed in order.
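For context, here's a sketch of that in-order ACK queue idea (names are illustrative, not the actual Hermes API): handlers may complete in any order, but checkpoints are persisted strictly in arrival order, only up to the first still-unfinished message.

```typescript
class AckQueue {
  private pending: { token: string; done: boolean }[] = [];

  constructor(private persist: (token: string) => void) {}

  // Register a message in arrival order; returns an ack callback the
  // handler invokes when it finishes processing.
  enqueue(token: string): () => void {
    const entry = { token, done: false };
    this.pending.push(entry);
    return () => {
      entry.done = true;
      this.flush();
    };
  }

  // Persist tokens for the contiguous prefix of finished messages.
  private flush(): void {
    while (this.pending.length > 0 && this.pending[0].done) {
      this.persist(this.pending.shift()!.token);
    }
  }
}
```

If message 2 finishes before message 1, nothing is persisted until message 1's ack arrives, at which point both tokens are flushed in order.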

Collaborator

@arturwojnar check the ESDB implementation; there it's handled with a custom transform and callbacks.

This guarantees sequential processing and passing messages on to the producer, which should eliminate out-of-order handling and race conditions. I'm considering applying a similar pattern inside processors as well.
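A sketch of what such a transform-based approach might look like (an assumed shape, not the actual ESDB consumer code): because _transform's callback is invoked only after the async handler resolves, Node's stream machinery won't deliver the next message until the previous one is fully processed.

```typescript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Wrap an async handler in an object-mode Transform; completing the
// callback only after the handler resolves enforces one-at-a-time,
// in-order processing.
const sequentialProcessor = <T>(handle: (msg: T) => Promise<void>) =>
  new Transform({
    objectMode: true,
    transform(msg: T, _enc, callback) {
      handle(msg)
        .then(() => callback(null, msg)) // pass the message downstream in order
        .catch((err) => callback(err as Error));
    },
  });
```

Plugged into a pipeline between the source stream and a checkpointing sink, this preserves ordering even when individual handlers take varying amounts of time.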

}
},
close: async () => {
if (stream) {
Author

the same as stop... :/

},
stop: async () => {
if (stream.isRunning) {
await stream.stop();
Author

it's not the case for the ESDB..
