Skip to content
This repository was archived by the owner on Mar 20, 2023. It is now read-only.

Commit 7979932

Browse files
authored
fix(api): [EventsSubscription] retry for subcsription process - listen / catchup #370 (#373)
* fix(api): EventsSubscription retry for subcsription process * Replace retry from ts-retry-promise lib with retryUntil - own implementation. * Add graceful shutdown of EventsSubscription * add retryTimesPolicy
1 parent b21f090 commit 7979932

File tree

10 files changed

+570
-45
lines changed

10 files changed

+570
-45
lines changed

packages/api/src/module/write/shared/application/events-subscription/events-subscription.fixture.spec.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { v4 as uuid } from 'uuid';
44
import { ApplicationEvent } from '@/module/application-command-events';
55
import { DomainEvent } from '@/module/domain.event';
66
import { PrismaModule } from '@/prisma/prisma.module';
7-
import { AnotherSampleDomainEvent, initWriteTestModule, SampleDomainEvent } from '@/shared/test-utils';
7+
import { AnotherSampleDomainEvent, initWriteTestModule, SampleDomainEvent, sequence } from '@/shared/test-utils';
88
import { EventStreamName } from '@/write/shared/application/event-stream-name.value-object';
99
import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry';
1010
import { SharedModule } from '@/write/shared/shared.module';
@@ -139,3 +139,7 @@ export async function initEventsSubscriptionConcurrencyTestFixture(options: Subs
139139
},
140140
};
141141
}
142+
143+
export function expectEventsOrder(events: ApplicationEvent[], expectedNumberOfEvents: number) {
144+
expect(events.map((x) => x.globalOrder)).toStrictEqual(sequence(expectedNumberOfEvents).map((x) => x + 1));
145+
}

packages/api/src/module/write/shared/application/events-subscription/events-subscription.spec.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import {
1111
sampleDomainEventType2,
1212
sequence,
1313
} from '@/shared/test-utils';
14-
import { using } from '@/shared/using';
14+
import { retryTimesPolicy } from '@/shared/utils/retry-until';
15+
import { using } from '@/shared/utils/using';
1516
import { EventStreamName } from '@/write/shared/application/event-stream-name.value-object';
1617
import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription';
1718

1819
import {
20+
expectEventsOrder,
1921
initEventsSubscriptionConcurrencyTestFixture,
2022
initTestEventsSubscription,
2123
} from './events-subscription.fixture.spec';
@@ -38,7 +40,14 @@ describe('Events subscription', () => {
3840
subscription = sut.eventsSubscriptions
3941
.subscription(sut.randomUuid(), {
4042
start: { from: { globalPosition: 1 } },
41-
retry: { retries: 3, backoff: 'LINEAR', delay: 50 },
43+
retry: {
44+
backoff: 'FIXED',
45+
delay: 50,
46+
maxBackoff: 1000,
47+
resetBackoffAfter: 6 * 1000,
48+
until: retryTimesPolicy(3),
49+
},
50+
queue: { maxRetryCount: 5, waitingTimeOnRetry: 50 },
4251
})
4352
.onInitialPosition(onInitialPosition)
4453
.onEvent<SampleDomainEvent>('SampleDomainEvent', onSampleDomainEvent)
@@ -309,7 +318,7 @@ describe('Events subscription concurrency tests', () => {
309318
subscriptionId: sut.subscriptionId,
310319
position: numberOfEvents,
311320
});
312-
expect(processedEvents.map((x) => x.globalOrder)).toStrictEqual(sequence(numberOfEvents).map((x) => x + 1));
321+
expectEventsOrder(processedEvents, numberOfEvents);
313322
});
314323
});
315324
});

packages/api/src/module/write/shared/application/events-subscription/events-subscription.ts

Lines changed: 82 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
import { Logger } from '@nestjs/common';
33
import { EventEmitter2 } from '@nestjs/event-emitter';
44
import { PrismaClient } from '@prisma/client';
5-
import { retry, RetryConfig, wait } from 'ts-retry-promise';
5+
import { wait } from 'ts-retry-promise';
66

77
import { ApplicationEvent } from '@/module/application-command-events';
88
import { DomainEvent } from '@/module/domain.event';
99
import { PrismaService } from '@/prisma/prisma.service';
10+
import { NotificationToken } from '@/shared/utils/notification-token';
11+
import { retryUntil } from '@/shared/utils/retry-until';
1012
import { EventRepository } from '@/write/shared/application/event-repository';
1113

1214
import { OrderedEventQueue } from './ordered-event-queue';
@@ -17,14 +19,20 @@ export type EventsSubscriptionConfig = {
1719
readonly eventHandlers: ApplicationEventHandler[];
1820
};
1921

20-
type SubscriptionRetriesConfig = Pick<RetryConfig<unknown>, 'retries' | 'delay' | 'backoff'>;
21-
type SubscriptionStartOptions = { from: { globalPosition: number } };
22-
type SubscriptionQueueOptions = { maxRetryCount: number; waitingTimeOnRetry: number };
22+
type SubscriptionRetriesConfig = {
23+
delay: number;
24+
backoff: 'FIXED' | 'EXPONENTIAL' | 'LINEAR';
25+
maxBackoff: number;
26+
resetBackoffAfter: number;
27+
until?: (e: Error | void) => boolean;
28+
};
29+
type SubscriptionStartConfig = { from: { globalPosition: number } };
30+
type SubscriptionQueueConfig = { maxRetryCount: number; waitingTimeOnRetry: number };
2331

2432
export type SubscriptionOptions = {
25-
start: SubscriptionStartOptions;
26-
queue: SubscriptionQueueOptions;
27-
retry?: SubscriptionRetriesConfig;
33+
readonly start: SubscriptionStartConfig;
34+
readonly queue: SubscriptionQueueConfig;
35+
readonly retry: SubscriptionRetriesConfig;
2836
};
2937

3038
export type PrismaTransactionManager = Omit<PrismaClient, '$connect' | '$disconnect' | '$on' | '$transaction' | '$use'>;
@@ -65,6 +73,10 @@ export class EventsSubscription {
6573

6674
private readonly queue = new OrderedEventQueue();
6775

76+
private shutdownToken = new NotificationToken();
77+
78+
private running = false;
79+
6880
private readonly eventEmitterListener: (_: unknown, event: ApplicationEvent) => void = (
6981
_: unknown,
7082
event: ApplicationEvent,
@@ -89,43 +101,36 @@ export class EventsSubscription {
89101
* See handleEvent for more details how handling event working.
90102
*/
91103
async start(): Promise<void> {
92-
const retryConfig: Partial<RetryConfig<void>> = {
93-
...(this.configuration.options?.retry ?? {
94-
retries: 'INFINITELY',
95-
backoff: 'EXPONENTIAL',
96-
delay: 3000,
97-
}),
98-
logger: (msg) => this.logger.error(msg),
99-
// FIXME: after timeout EventsSubscription will stop processing
100-
timeout: 2147483647, // max timeout => max 32-bit signed integer
101-
};
104+
if (this.running) {
105+
this.logger.warn(`${this.subscriptionId} is running`);
102106

103-
this.eventEmitter.onAny(this.eventEmitterListener);
107+
return;
108+
}
104109

105-
// FIXME: it will work for 2147483647/(1000*60*60*24) = 24.85 days :D
106-
retry(async () => {
107-
await this.catchUp().catch((e) => {
108-
this.logger.warn(`EventsSubscription ${this.subscriptionId} processing error in CatchUp phase.`, e);
109-
throw e;
110-
});
111-
await this.listen().catch(async (e) => {
112-
this.logger.warn(`EventsSubscription ${this.subscriptionId} processing error in listen phase.`, e);
113-
throw e;
114-
});
115-
}, retryConfig).catch((e) =>
116-
this.logger.error(
117-
`EventsSubscription ${this.subscriptionId} stopped processing of events after ${retryConfig.retries} retries.`,
118-
e,
119-
),
120-
);
110+
this.logger.debug(`${this.subscriptionId} started`);
111+
this.queue.clear();
112+
this.eventEmitter.onAny(this.eventEmitterListener);
113+
this.shutdownToken.reset();
114+
this.running = true;
115+
this.run();
121116
}
122117

123118
/**
124119
* Stops listening for new events.
125120
*/
126121
async stop(): Promise<void> {
122+
if (!this.running) {
123+
this.logger.warn(`${this.subscriptionId} finished`);
124+
125+
return;
126+
}
127+
128+
this.logger.debug(`${this.subscriptionId} shutdown has been requested`);
127129
this.eventEmitter.offAny(this.eventEmitterListener);
128-
this.queue.clear();
130+
this.queue.stop();
131+
this.running = false;
132+
await this.shutdownToken.wait(60 * 1000).catch((e: Error) => this.logger.warn(e, e?.stack));
133+
this.logger.debug(`${this.subscriptionId} finished`);
129134
}
130135

131136
/**
@@ -143,6 +148,8 @@ export class EventsSubscription {
143148
let event = await this.queue.pop();
144149

145150
while (!OrderedEventQueue.isStopToken(event)) {
151+
this.logger.debug(`${this.subscriptionId} recived event(${event.id},${event.globalOrder})`);
152+
146153
if (await this.globalOrderIsPreserved(event)) {
147154
await this.handleEvent(event);
148155
} else {
@@ -170,14 +177,16 @@ export class EventsSubscription {
170177
const subscriptionState = await this.prismaService.eventsSubscription.findUnique({
171178
where: { id: this.subscriptionId },
172179
});
180+
181+
this.eventsRetryCount.clear();
182+
this.queue.clear();
183+
173184
const eventsToCatchup = await this.eventRepository.readAll({
174185
fromGlobalPosition: subscriptionState?.currentPosition
175186
? subscriptionState.currentPosition + 1
176187
: this.configuration.options.start.from.globalPosition,
177188
});
178189

179-
this.eventsRetryCount.clear();
180-
this.queue.clear();
181190
eventsToCatchup.forEach((event) => this.queue.push(event));
182191
}
183192

@@ -209,7 +218,6 @@ export class EventsSubscription {
209218
}
210219

211220
async retryWithWaitingForCorrectOrder(event: ApplicationEvent): Promise<void> {
212-
// TODO cancel wait when subscriber recive stop signal
213221
const count = this.eventsRetryCount.get(event.id) ?? 0;
214222

215223
if (count >= this.configuration.options.queue.maxRetryCount) {
@@ -220,6 +228,7 @@ export class EventsSubscription {
220228

221229
this.queue.push(event);
222230
this.eventsRetryCount.set(event.id, count + 1);
231+
// TODO cancel wait when subscriber recive stop signal
223232
await wait(this.configuration.options.queue.waitingTimeOnRetry);
224233
}
225234

@@ -260,4 +269,39 @@ export class EventsSubscription {
260269
private handlingEventTypes() {
261270
return this.configuration.eventHandlers.map((h) => h.eventType);
262271
}
272+
273+
private async run() {
274+
try {
275+
await retryUntil(
276+
async () => {
277+
this.logger.debug(`${this.subscriptionId} is starting CatchUp phase.`);
278+
await this.catchUp().catch((e) => {
279+
this.logger.warn(`${this.subscriptionId} processing error in CatchUp phase.`);
280+
throw e;
281+
});
282+
this.logger.debug(`${this.subscriptionId} finished CatchUp phase.`);
283+
284+
this.logger.debug(`${this.subscriptionId} is listening.`);
285+
await this.listen().catch(async (e) => {
286+
this.logger.warn(`${this.subscriptionId} processing error in listen phase.`);
287+
throw e;
288+
});
289+
this.logger.debug(`${this.subscriptionId} finished listen phase.`);
290+
},
291+
{
292+
...this.configuration.options.retry,
293+
until: this.configuration.options.retry.until ?? (() => this.running),
294+
logger: (msg) => this.logger.warn(`${this.subscriptionId} ${msg}`),
295+
},
296+
);
297+
} catch (e) {
298+
this.logger.error(
299+
`${this.subscriptionId} stopped processing events with unexpected error`,
300+
(e as Error)?.stack,
301+
e,
302+
);
303+
} finally {
304+
this.shutdownToken.notify();
305+
}
306+
}
263307
}

packages/api/src/module/write/shared/application/events-subscription/events-subscriptions-registry.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ export class EventsSubscriptionsRegistry implements CanCreateSubscription {
3232
maxRetryCount: env.SUBSCRIPTION_QUEUE_MAX_RETRY_COUNT,
3333
waitingTimeOnRetry: env.SUBSCRIPTION_QUEUE_WAITING_TIME_ON_RETRY_MS,
3434
},
35+
retry: {
36+
backoff: 'EXPONENTIAL',
37+
maxBackoff: 60 * 1000,
38+
resetBackoffAfter: 5 * 60 * 1000,
39+
delay: 10,
40+
},
3541
};
3642
const startConfig: SubscriptionOptions = {
3743
...defaultSubscriptionConfig,
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
1+
import { Injectable, OnApplicationShutdown, OnModuleInit } from '@nestjs/common';
22
import { PrismaClient } from '@prisma/client';
33

44
@Injectable()
5-
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
5+
export class PrismaService extends PrismaClient implements OnModuleInit, OnApplicationShutdown {
66
async onModuleInit() {
77
await this.$connect();
88
}
99

10-
async onModuleDestroy() {
10+
async onApplicationShutdown() {
1111
await this.$disconnect();
1212
}
1313
}

0 commit comments

Comments
 (0)