Skip to content

Commit 7c7c7f0

Browse files
committed
fix(NODE-5502): recursive calls to next cause memory leak
1 parent 435f88b commit 7c7c7f0

File tree

2 files changed

+72
-78
lines changed

2 files changed

+72
-78
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 68 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ export abstract class AbstractCursor<
220220
return this[kId] ?? undefined;
221221
}
222222

223+
/** @internal */
224+
get isDead() {
225+
return (this[kId]?.isZero() ?? false) || this[kClosed] || this[kKilled];
226+
}
227+
223228
/** @internal */
224229
get client(): MongoClient {
225230
return this[kClient];
@@ -671,7 +676,7 @@ export abstract class AbstractCursor<
671676
return cleanupCursor(this, { error }, () => callback(error, undefined));
672677
}
673678

674-
if (cursorIsDead(this)) {
679+
if (this.isDead) {
675680
return cleanupCursor(this, undefined, () => callback());
676681
}
677682

@@ -701,96 +706,83 @@ async function next<T>(
701706
transform: boolean;
702707
}
703708
): Promise<T | null> {
704-
const cursorId = cursor[kId];
705709
if (cursor.closed) {
706710
return null;
707711
}
708712

709-
if (cursor[kDocuments].length !== 0) {
710-
const doc = cursor[kDocuments].shift();
711-
712-
if (doc != null && transform && cursor[kTransform]) {
713-
try {
714-
return cursor[kTransform](doc);
715-
} catch (error) {
716-
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
717-
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
718-
// error instead.
719-
});
720-
throw error;
721-
}
713+
do {
714+
if (cursor[kId] == null) {
715+
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
716+
await promisify(cursor[kInit].bind(cursor))();
722717
}
723718

724-
return doc;
725-
}
719+
if (cursor[kDocuments].length !== 0) {
720+
const doc = cursor[kDocuments].shift();
726721

727-
if (cursorId == null) {
728-
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
729-
const init = promisify(cb => cursor[kInit](cb));
730-
await init();
731-
return next(cursor, { blocking, transform });
732-
}
733-
734-
if (cursorIsDead(cursor)) {
735-
// if the cursor is dead, we clean it up
736-
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
737-
// and we should surface the error
738-
await cleanupCursorAsync(cursor, {});
739-
return null;
740-
}
722+
if (doc != null && transform && cursor[kTransform]) {
723+
try {
724+
return cursor[kTransform](doc);
725+
} catch (error) {
726+
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
727+
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
728+
// error instead.
729+
});
730+
throw error;
731+
}
732+
}
741733

742-
// otherwise need to call getMore
743-
const batchSize = cursor[kOptions].batchSize || 1000;
744-
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
745-
cursor._getMore(batchSize, cb)
746-
);
747-
748-
let response: Document | undefined;
749-
try {
750-
response = await getMore(batchSize);
751-
} catch (error) {
752-
if (error) {
753-
await cleanupCursorAsync(cursor, { error }).catch(() => {
754-
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
755-
// error instead.
756-
});
757-
throw error;
734+
return doc;
758735
}
759-
}
760736

761-
if (response) {
762-
const cursorId =
763-
typeof response.cursor.id === 'number'
764-
? Long.fromNumber(response.cursor.id)
765-
: typeof response.cursor.id === 'bigint'
766-
? Long.fromBigInt(response.cursor.id)
767-
: response.cursor.id;
737+
if (cursor.isDead) {
738+
// if the cursor is dead, we clean it up
739+
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
740+
// and we should surface the error
741+
await cleanupCursorAsync(cursor, {});
742+
return null;
743+
}
768744

769-
cursor[kDocuments].pushMany(response.cursor.nextBatch);
770-
cursor[kId] = cursorId;
771-
}
745+
// otherwise need to call getMore
746+
const batchSize = cursor[kOptions].batchSize || 1000;
772747

773-
if (cursorIsDead(cursor)) {
774-
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
775-
// we intentionally clean up the cursor to release its session back into the pool before the cursor
776-
// is iterated. This prevents a cursor that is exhausted on the server from holding
777-
// onto a session indefinitely until the AbstractCursor is iterated.
778-
//
779-
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
780-
// and we should surface the error
781-
await cleanupCursorAsync(cursor, {});
782-
}
748+
try {
749+
const response = await promisify(cursor._getMore.bind(cursor))(batchSize);
750+
751+
if (response) {
752+
const cursorId =
753+
typeof response.cursor.id === 'number'
754+
? Long.fromNumber(response.cursor.id)
755+
: typeof response.cursor.id === 'bigint'
756+
? Long.fromBigInt(response.cursor.id)
757+
: response.cursor.id;
758+
759+
cursor[kDocuments].pushMany(response.cursor.nextBatch);
760+
cursor[kId] = cursorId;
761+
}
762+
} catch (error) {
763+
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
764+
// error instead.
765+
await cleanupCursorAsync(cursor, { error }).catch(() => null);
766+
throw error;
767+
}
783768

784-
if (cursor[kDocuments].length === 0 && blocking === false) {
785-
return null;
786-
}
769+
if (cursor.isDead) {
770+
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
771+
// we intentionally clean up the cursor to release its session back into the pool before the cursor
772+
// is iterated. This prevents a cursor that is exhausted on the server from holding
773+
// onto a session indefinitely until the AbstractCursor is iterated.
774+
//
775+
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
776+
// and we should surface the error
777+
await cleanupCursorAsync(cursor, {});
778+
}
787779

788-
return next(cursor, { blocking, transform });
789-
}
780+
if (cursor[kDocuments].length === 0 && blocking === false) {
781+
return null;
782+
}
783+
} while (!cursor.isDead || cursor[kDocuments].length !== 0);
790784

791-
function cursorIsDead(cursor: AbstractCursor): boolean {
792-
const cursorId = cursor[kId];
793-
return !!cursorId && cursorId.isZero();
785+
return null;
794786
}
795787

796788
const cleanupCursorAsync = promisify(cleanupCursor);

src/cursor/find_cursor.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Document } from '../bson';
1+
import { type Document, Long } from '../bson';
22
import { MongoInvalidArgumentError, MongoTailableCursorError } from '../error';
33
import type { ExplainVerbosityLike } from '../explain';
44
import type { MongoClient } from '../mongo_client';
@@ -101,7 +101,9 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
101101
limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize;
102102

103103
if (batchSize <= 0) {
104-
this.close().finally(() => callback());
104+
this.close().finally(() =>
105+
callback(undefined, { cursor: { id: Long.ZERO, nextBatch: [] } })
106+
);
105107
return;
106108
}
107109
}

0 commit comments

Comments
 (0)