Skip to content

Commit 76910bd

Browse files
add backoff and jitter logic into operation retry loop
1 parent c764010 commit 76910bd

File tree

4 files changed

+64
-34
lines changed

4 files changed

+64
-34
lines changed

src/error.ts

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,9 @@ export const MongoErrorLabel = Object.freeze({
9999
ResetPool: 'ResetPool',
100100
PoolRequestedRetry: 'PoolRequestedRetry',
101101
InterruptInUseConnections: 'InterruptInUseConnections',
102-
NoWritesPerformed: 'NoWritesPerformed'
102+
NoWritesPerformed: 'NoWritesPerformed',
103+
SystemOverloadError: 'SystemOverloadError',
104+
RetryableError: 'RetryableError'
103105
} as const);
104106

105107
/** @public */

src/operations/execute_operation.ts

Lines changed: 54 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,5 @@
1+
import { setTimeout } from 'node:timers/promises';
2+
13
import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
24
import {
35
isRetryableReadError,
@@ -26,6 +28,7 @@ import {
2628
import type { Topology } from '../sdam/topology';
2729
import type { ClientSession } from '../sessions';
2830
import { TimeoutContext } from '../timeout';
31+
import { RETRY_COST, TOKEN_REFRESH_RATE } from '../token_bucket';
2932
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
3033
import { AggregateOperation } from './aggregate';
3134
import { AbstractOperation, Aspect } from './operation';
@@ -232,11 +235,12 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
232235
session.incrementTransactionNumber();
233236
}
234237

235-
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
238+
const MAX_ATTEMPTS = 5;
239+
const maxTries = willRetry ? MAX_ATTEMPTS : 1;
236240
let previousOperationError: MongoError | undefined;
237241
let previousServer: ServerDescription | undefined;
238242

239-
for (let tries = 0; tries < maxTries; tries++) {
243+
for (let attempt = 0; attempt < maxTries; attempt++) {
240244
if (previousOperationError) {
241245
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
242246
throw new MongoServerError({
@@ -246,14 +250,40 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
246250
});
247251
}
248252

249-
if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
253+
const isRetryableError =
254+
// any command with the RetryableError label is retryable
255+
previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError) ||
256+
// COMMAND_BATCHING commands are retryable depending on the contents of the batches
257+
(operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) ||
258+
(hasWriteAspect && isRetryableWriteError(previousOperationError)) ||
259+
(hasReadAspect && !isRetryableReadError(previousOperationError));
260+
261+
if (!isRetryableError) {
250262
throw previousOperationError;
251263
}
252264

253-
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
254-
throw previousOperationError;
265+
const isSystemOverloadError = previousOperationError.hasErrorLabel(
266+
MongoErrorLabel.SystemOverloadError
267+
);
268+
if (isSystemOverloadError) {
269+
const delay =
270+
Math.random() *
271+
Math.min(
272+
10_000, // MAX_BACKOFF
273+
100 * 2 ** attempt
274+
);
275+
276+
// short circuit if CSOT would expire while sleeping
277+
if (timeoutContext.csotEnabled() && timeoutContext.remainingTimeMS < delay) {
278+
throw previousOperationError;
279+
}
280+
281+
await setTimeout(delay, undefined, {
282+
// TODO: handle abort signal here?
283+
});
284+
}
255285

256-
if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
286+
if (!topology.tokenBucket.consume(RETRY_COST)) {
257287
throw previousOperationError;
258288
}
259289

@@ -284,19 +314,35 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
284314
operation.server = server;
285315

286316
try {
287-
// If tries > 0 and we are command batching we need to reset the batch.
288-
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
317+
// If attempt > 0 and we are command batching we need to reset the batch.
318+
if (attempt > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
289319
operation.resetBatch();
290320
}
291321

292322
try {
293323
const result = await server.command(operation, timeoutContext);
324+
325+
// On success, deposit the refresh rate into the bucket. If the attempt
326+
// was a retry, also deposit the retry cost to allow for another retry.
327+
topology.tokenBucket.deposit(TOKEN_REFRESH_RATE);
328+
if (attempt > 0) {
329+
topology.tokenBucket.deposit(RETRY_COST);
330+
}
331+
294332
return operation.handleOk(result);
295333
} catch (error) {
296334
return operation.handleError(error);
297335
}
298336
} catch (operationError) {
299337
if (!(operationError instanceof MongoError)) throw operationError;
338+
339+
// If a retry fails with a non-SystemOverloadError, this indicates that the server
340+
// either was not overloaded OR was overloaded but has recovered enough to let this
341+
// request through. Either way, deposit the retry cost to allow for another retry.
342+
if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadError)) {
343+
topology.tokenBucket.deposit(RETRY_COST);
344+
}
345+
300346
if (
301347
previousOperationError != null &&
302348
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)

src/sdam/topology.ts

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import type { ClientSession } from '../sessions';
3737
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
38+
import { TokenBucket } from '../token_bucket';
3839
import type { Transaction } from '../transactions';
3940
import {
4041
addAbortListener,
@@ -201,18 +202,15 @@ export type TopologyEvents = {
201202
* @internal
202203
*/
203204
export class Topology extends TypedEventEmitter<TopologyEvents> {
204-
/** @internal */
205205
s: TopologyPrivate;
206-
/** @internal */
207206
waitQueue: List<ServerSelectionRequest>;
208-
/** @internal */
209207
hello?: Document;
210-
/** @internal */
211208
_type?: string;
212209

210+
tokenBucket = new TokenBucket(1000);
211+
213212
client!: MongoClient;
214213

215-
/** @internal */
216214
private connectionLock?: Promise<Topology>;
217215

218216
/** @event */

src/token_bucket.ts

Lines changed: 4 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
class TokenBucket {
1+
export class TokenBucket {
22
private budget: number;
33
constructor(allowance: number) {
44
this.budget = allowance;
@@ -15,22 +15,6 @@ class TokenBucket {
1515
}
1616
}
1717

18-
const TOKEN_REFRESH_RATE = 0.1;
19-
const INITIAL_SIZE = 1000;
20-
const RETRY_COST = 1;
21-
22-
export class RetryTokenBucket {
23-
private bucket = new TokenBucket(INITIAL_SIZE);
24-
25-
onCommandSuccess(retry: boolean) {
26-
this.bucket.deposit(TOKEN_REFRESH_RATE);
27-
28-
if (retry) {
29-
this.bucket.deposit(RETRY_COST);
30-
}
31-
}
32-
33-
canRetry() {
34-
return this.bucket.consume(RETRY_COST);
35-
}
36-
}
18+
export const TOKEN_REFRESH_RATE = 0.1;
19+
export const INITIAL_SIZE = 1000;
20+
export const RETRY_COST = 1;

0 commit comments

Comments
 (0)