Skip to content

Commit f066447

Browse files
committed
A few more improvements
1 parent 5cefaf7 commit f066447

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ export class RunLocker {
175175
routine?: (signal: redlock.RedlockAbortSignal) => Promise<T>
176176
): Promise<T> {
177177
const currentContext = this.asyncLocalStorage.getStore();
178-
const joinedResources = resources.sort().join(",");
178+
const joinedResources = [...resources].sort().join(",");
179179

180180
// Handle overloaded parameters
181181
let actualDuration: number;
@@ -251,7 +251,9 @@ export class RunLocker {
251251
lockId: string,
252252
lockStartTime: number
253253
): Promise<T> {
254-
const joinedResources = resources.sort().join(",");
254+
// Sort resources to ensure consistent lock acquisition order and prevent deadlocks
255+
const sortedResources = [...resources].sort();
256+
const joinedResources = sortedResources.join(",");
255257

256258
// Use configured retry settings with exponential backoff
257259
const { maxRetries, baseDelay, maxDelay, backoffMultiplier, jitterFactor, maxTotalWaitTime } =
@@ -266,14 +268,14 @@ export class RunLocker {
266268
let lastError: Error | undefined;
267269

268270
for (let attempt = 0; attempt <= maxRetries; attempt++) {
269-
const [error, acquiredLock] = await tryCatch(this.redlock.acquire(resources, duration));
271+
const [error, acquiredLock] = await tryCatch(this.redlock.acquire(sortedResources, duration));
270272

271273
if (!error && acquiredLock) {
272274
lock = acquiredLock;
273275
if (attempt > 0) {
274276
this.logger.debug("[RunLocker] Lock acquired after retries", {
275277
name,
276-
resources,
278+
resources: sortedResources,
277279
attempts: attempt + 1,
278280
totalWaitTime: Math.round(totalWaitTime),
279281
});
@@ -287,16 +289,16 @@ export class RunLocker {
287289
if (totalWaitTime >= maxTotalWaitTime) {
288290
this.logger.warn("[RunLocker] Lock acquisition exceeded total wait time limit", {
289291
name,
290-
resources,
292+
resources: sortedResources,
291293
attempts: attempt + 1,
292294
totalWaitTime: Math.round(totalWaitTime),
293295
maxTotalWaitTime,
294296
});
295297
throw new LockAcquisitionTimeoutError(
296-
resources,
298+
sortedResources,
297299
Math.round(totalWaitTime),
298300
attempt + 1,
299-
`Lock acquisition on resources [${resources.join(
301+
`Lock acquisition on resources [${sortedResources.join(
300302
", "
301303
)}] exceeded total wait time limit of ${maxTotalWaitTime}ms`
302304
);
@@ -306,16 +308,16 @@ export class RunLocker {
306308
if (attempt === maxRetries) {
307309
this.logger.warn("[RunLocker] Lock acquisition exhausted all retries", {
308310
name,
309-
resources,
311+
resources: sortedResources,
310312
attempts: attempt + 1,
311313
totalWaitTime: Math.round(totalWaitTime),
312314
lastError: lastError.message,
313315
});
314316
throw new LockAcquisitionTimeoutError(
315-
resources,
317+
sortedResources,
316318
Math.round(totalWaitTime),
317319
attempt + 1,
318-
`Lock acquisition on resources [${resources.join(", ")}] failed after ${
320+
`Lock acquisition on resources [${sortedResources.join(", ")}] failed after ${
319321
attempt + 1
320322
} attempts`
321323
);
@@ -334,14 +336,14 @@ export class RunLocker {
334336
maxDelay
335337
);
336338
const jitter = exponentialDelay * jitterFactor * (Math.random() * 2 - 1); // ±jitterFactor% jitter
337-
const delay = Math.max(0, Math.round(exponentialDelay + jitter));
339+
const delay = Math.min(maxDelay, Math.max(0, Math.round(exponentialDelay + jitter)));
338340

339341
// Update total wait time before delay
340342
totalWaitTime += delay;
341343

342344
this.logger.debug("[RunLocker] Lock acquisition failed, retrying with backoff", {
343345
name,
344-
resources,
346+
resources: sortedResources,
345347
attempt: attempt + 1,
346348
delay,
347349
totalWaitTime: Math.round(totalWaitTime),
@@ -356,7 +358,7 @@ export class RunLocker {
356358
// For other errors (non-retryable), throw immediately
357359
this.logger.error("[RunLocker] Lock acquisition failed with non-retryable error", {
358360
name,
359-
resources,
361+
resources: sortedResources,
360362
attempt: attempt + 1,
361363
error: lastError.message,
362364
errorName: lastError.name,
@@ -390,7 +392,7 @@ export class RunLocker {
390392
// Track active lock
391393
this.activeLocks.set(lockId, {
392394
lockType: name,
393-
resources: resources,
395+
resources: sortedResources,
394396
});
395397

396398
let lockSuccess = true;
@@ -426,7 +428,7 @@ export class RunLocker {
426428
if (releaseError) {
427429
this.logger.warn("[RunLocker] Error releasing lock", {
428430
error: releaseError,
429-
resources,
431+
resources: sortedResources,
430432
lockValue: lock!.value,
431433
});
432434
}

0 commit comments

Comments
 (0)