Skip to content

Commit 2cd352e

Browse files
use token bucket and apply exponential backoff in retry loop
1 parent 6b81ec4 commit 2cd352e

File tree

2 files changed

+116
-52
lines changed

2 files changed

+116
-52
lines changed

src/operations/execute_operation.ts

Lines changed: 113 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { setTimeout } from 'node:timers/promises';
2+
13
import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
24
import {
35
isRetryableReadError,
@@ -10,6 +12,7 @@ import {
1012
MongoInvalidArgumentError,
1113
MongoNetworkError,
1214
MongoNotConnectedError,
15+
MongoOperationTimeoutError,
1316
MongoRuntimeError,
1417
MongoServerError,
1518
MongoTransactionError,
@@ -26,6 +29,7 @@ import {
2629
import type { Topology } from '../sdam/topology';
2730
import type { ClientSession } from '../sessions';
2831
import { TimeoutContext } from '../timeout';
32+
import { RETRY_COST, TOKEN_REFRESH_RATE } from '../token_bucket';
2933
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
3034
import { AggregateOperation } from './aggregate';
3135
import { AbstractOperation, Aspect } from './operation';
@@ -232,71 +236,138 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
232236
session.incrementTransactionNumber();
233237
}
234238

235-
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
239+
// The maximum number of retry attempts using regular retryable reads/writes logic (not including
240+
// SystemOverloadError retries, which are counted separately).
241+
const maxNonOverloadRetryAttempts = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
236242
let previousOperationError: MongoError | undefined;
237243
let previousServer: ServerDescription | undefined;
244+
const nonOverloadRetryAttempt = 0;
245+
246+
let systemOverloadRetryAttempt = 0;
247+
const maxSystemOverloadRetryAttempts = 5;
238248

239-
for (let tries = 0; tries < maxTries; tries++) {
249+
while (true) {
240250
if (previousOperationError) {
241-
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
242-
throw new MongoServerError({
243-
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
244-
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
245-
originalError: previousOperationError
251+
if (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadError)) {
252+
systemOverloadRetryAttempt += 1;
253+
254+
// if the SystemOverloadError is not retryable, throw.
255+
if (!previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError)) {
256+
throw previousOperationError;
257+
}
258+
259+
// if we have exhausted overload retry attempts, throw.
260+
if (systemOverloadRetryAttempt > maxSystemOverloadRetryAttempts) {
261+
throw previousOperationError;
262+
}
263+
264+
const delayMS =
265+
Math.random() *
266+
Math.min(
267+
10_000, // MAX_BACKOFF,
268+
100 * 2 ** systemOverloadRetryAttempt
269+
);
270+
271+
// if the delay would exhaust the CSOT timeout, short-circuit.
272+
if (timeoutContext.csotEnabled() && delayMS > timeoutContext.remainingTimeMS) {
273+
// TODO: is this the right error to throw?
274+
throw new MongoOperationTimeoutError(
275+
`MongoDB SystemOverload exponential backoff would exceed timeoutMS deadline: remaining CSOT deadline=${timeoutContext.remainingTimeMS}, backoff delayMS=${delayMS}`,
276+
{
277+
cause: previousOperationError
278+
}
279+
);
280+
}
281+
282+
await setTimeout(delayMS);
283+
284+
if (!topology.tokenBucket.consume(RETRY_COST)) {
285+
throw previousOperationError;
286+
}
287+
288+
server = await topology.selectServer(selector, {
289+
session,
290+
operationName: operation.commandName,
291+
previousServer,
292+
signal: operation.options.signal
293+
});
294+
} else {
295+
// we have no more retry attempts, throw.
296+
if (nonOverloadRetryAttempt >= maxNonOverloadRetryAttempts) {
297+
throw previousOperationError;
298+
}
299+
300+
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
301+
throw new MongoServerError({
302+
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
303+
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
304+
originalError: previousOperationError
305+
});
306+
}
307+
308+
if (
309+
(operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) ||
310+
(hasWriteAspect && !isRetryableWriteError(previousOperationError)) ||
311+
(hasReadAspect && !isRetryableReadError(previousOperationError))
312+
) {
313+
throw previousOperationError;
314+
}
315+
316+
if (
317+
previousOperationError instanceof MongoNetworkError &&
318+
operation.hasAspect(Aspect.CURSOR_CREATING) &&
319+
session != null &&
320+
session.isPinned &&
321+
!session.inTransaction()
322+
) {
323+
session.unpin({ force: true, forceClear: true });
324+
}
325+
326+
server = await topology.selectServer(selector, {
327+
session,
328+
operationName: operation.commandName,
329+
previousServer,
330+
signal: operation.options.signal
246331
});
247-
}
248-
249-
if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
250-
throw previousOperationError;
251-
}
252-
253-
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
254-
throw previousOperationError;
255-
256-
if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
257-
throw previousOperationError;
258-
}
259-
260-
if (
261-
previousOperationError instanceof MongoNetworkError &&
262-
operation.hasAspect(Aspect.CURSOR_CREATING) &&
263-
session != null &&
264-
session.isPinned &&
265-
!session.inTransaction()
266-
) {
267-
session.unpin({ force: true, forceClear: true });
268-
}
269-
270-
server = await topology.selectServer(selector, {
271-
session,
272-
operationName: operation.commandName,
273-
previousServer,
274-
signal: operation.options.signal
275-
});
276332

277-
if (hasWriteAspect && !supportsRetryableWrites(server)) {
278-
throw new MongoUnexpectedServerResponseError(
279-
'Selected server does not support retryable writes'
280-
);
333+
if (hasWriteAspect && !supportsRetryableWrites(server)) {
334+
throw new MongoUnexpectedServerResponseError(
335+
'Selected server does not support retryable writes'
336+
);
337+
}
281338
}
282339
}
283340

284341
operation.server = server;
285342

286343
try {
287-
// If tries > 0 and we are command batching we need to reset the batch.
288-
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
344+
// If this is a non-overload retry (nonOverloadRetryAttempt > 0) and we are command batching, we need to reset the batch.
345+
if (nonOverloadRetryAttempt > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
289346
operation.resetBatch();
290347
}
291348

292349
try {
293350
const result = await server.command(operation, timeoutContext);
351+
const isRetry = nonOverloadRetryAttempt > 0 || systemOverloadRetryAttempt > 0;
352+
topology.tokenBucket.deposit(
353+
isRetry
354+
? // on successful retry, deposit the retry cost + the refresh rate.
355+
TOKEN_REFRESH_RATE + RETRY_COST
356+
: // otherwise, just deposit the refresh rate.
357+
TOKEN_REFRESH_RATE
358+
);
294359
return operation.handleOk(result);
295360
} catch (error) {
296361
return operation.handleError(error);
297362
}
298363
} catch (operationError) {
299364
if (!(operationError instanceof MongoError)) throw operationError;
365+
366+
if (!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadError)) {
367+
// if an operation fails with an error that does not carry the SystemOverloadError label, deposit RETRY_COST tokens.
368+
topology.tokenBucket.deposit(RETRY_COST);
369+
}
370+
300371
if (
301372
previousOperationError != null &&
302373
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
@@ -310,9 +381,4 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
310381
timeoutContext.clear();
311382
}
312383
}
313-
314-
throw (
315-
previousOperationError ??
316-
new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
317-
);
318384
}

src/sdam/topology.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import type { ClientSession } from '../sessions';
3737
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
38+
import { TokenBucket } from '../token_bucket';
3839
import type { Transaction } from '../transactions';
3940
import {
4041
addAbortListener,
@@ -201,18 +202,15 @@ export type TopologyEvents = {
201202
* @internal
202203
*/
203204
export class Topology extends TypedEventEmitter<TopologyEvents> {
204-
/** @internal */
205205
s: TopologyPrivate;
206-
/** @internal */
207206
waitQueue: List<ServerSelectionRequest>;
208-
/** @internal */
209207
hello?: Document;
210-
/** @internal */
211208
_type?: string;
212209

210+
tokenBucket = new TokenBucket(1000);
211+
213212
client!: MongoClient;
214213

215-
/** @internal */
216214
private connectionLock?: Promise<Topology>;
217215

218216
/** @event */

0 commit comments

Comments
 (0)