Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ Minimum: `1`

The max number of runs in the given interval of time.

Can be changed later by setting `queue.intervalCap`. If changed in the middle of a running interval, the new cap applies immediately. If the new cap is lower than the used cap, the "debt" is not carried over into the next interval.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Debt is not carried over into the next interval" is bit unclear. Say exactly what happens:

If the new cap is lower than the number already run in the current interval, no more tasks start until the next interval begins.


##### interval

Type: `number`\
Expand Down
34 changes: 27 additions & 7 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ Promise queue with concurrency control.
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<EventName> { // eslint-disable-line @typescript-eslint/naming-convention
readonly #carryoverConcurrencyCount: boolean;

readonly #isIntervalIgnored: boolean;

#intervalCount = 0;

readonly #intervalCap: number;
#intervalCap: number;

#rateLimitedInInterval = false;
#rateLimitFlushScheduled = false;
Expand Down Expand Up @@ -58,6 +56,12 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
*/
timeout?: number;

static #assertValidIntervalCap(value: unknown): asserts value is number {
if (!(typeof value === 'number' && value >= 1)) {
throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${value?.toString() ?? ''}\` (${typeof value})`);
}
}

constructor(options?: Options<QueueType, EnqueueOptionsType>) {
super();

Expand All @@ -72,16 +76,13 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
...options,
} as Options<QueueType, EnqueueOptionsType>;

if (!(typeof options.intervalCap === 'number' && options.intervalCap >= 1)) {
throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${options.intervalCap?.toString() ?? ''}\` (${typeof options.intervalCap})`);
}
PQueue.#assertValidIntervalCap(options.intervalCap);

if (options.interval === undefined || !(Number.isFinite(options.interval) && options.interval >= 0)) {
throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${options.interval?.toString() ?? ''}\` (${typeof options.interval})`);
}

this.#carryoverConcurrencyCount = options.carryoverConcurrencyCount!;
this.#isIntervalIgnored = options.intervalCap === Number.POSITIVE_INFINITY || options.interval === 0;
this.#intervalCap = options.intervalCap;
this.#interval = options.interval;
this.#queue = new options.queueClass!();
Expand All @@ -98,6 +99,25 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
this.#setupRateLimitTracking();
}

get #isIntervalIgnored(): boolean {
return this.#intervalCap === Number.POSITIVE_INFINITY || this.#interval === 0;
}

get intervalCap(): number {
return this.#intervalCap;
}

set intervalCap(value: number) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the cap mid-interval must claer ratelimit state. When setting intervalCap = Number.POSITIVE_INFINITY, you switch to “interval ignored.” You do call #processQueue(), but if #rateLimitedInInterval or #rateLimitFlushScheduled are set, they can still throttle execution until the old timeout fires. Reset those flags when the cap becomes infinite, then process.

if (value === this.#intervalCap) {
return;
}

PQueue.#assertValidIntervalCap(value);

this.#intervalCap = value;
this.#processQueue();
}

get #doesIntervalAllowAnother(): boolean {
return this.#isIntervalIgnored || this.#intervalCount < this.#intervalCap;
}
Expand Down
2 changes: 2 additions & 0 deletions source/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOpt
/**
The max number of runs in the given interval of time.

Can be changed later by setting `queue.intervalCap`. If changed in the middle of a running interval, the new cap applies immediately. If the new cap is lower than the used cap, the "debt" is not carried over into the next interval.

Minimum: `1`.

@default Infinity
Expand Down
3 changes: 3 additions & 0 deletions test-d/index.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ import PQueue from '../source/index.js';
const queue = new PQueue();

expectType<Promise<string>>(queue.add(async () => '🦄'));

expectType<number>(queue.intervalCap);
queue.intervalCap = 5;
Loading