Skip to content

Commit 6ce78d2

Browse files
committed
fix(NODE-5502): recursive calls to next cause memory leak
1 parent 435f88b commit 6ce78d2

File tree

3 files changed

+76
-77
lines changed

3 files changed

+76
-77
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 71 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -701,96 +701,90 @@ async function next<T>(
701701
transform: boolean;
702702
}
703703
): Promise<T | null> {
704-
const cursorId = cursor[kId];
705704
if (cursor.closed) {
706705
return null;
707706
}
708707

709-
if (cursor[kDocuments].length !== 0) {
710-
const doc = cursor[kDocuments].shift();
711-
712-
if (doc != null && transform && cursor[kTransform]) {
713-
try {
714-
return cursor[kTransform](doc);
715-
} catch (error) {
716-
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
717-
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
718-
// error instead.
719-
});
720-
throw error;
721-
}
708+
do {
709+
if (cursor[kId] == null) {
710+
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
711+
await promisify(cb => cursor[kInit](cb))();
722712
}
723713

724-
return doc;
725-
}
714+
if (cursor[kDocuments].length !== 0) {
715+
const doc = cursor[kDocuments].shift();
726716

727-
if (cursorId == null) {
728-
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
729-
const init = promisify(cb => cursor[kInit](cb));
730-
await init();
731-
return next(cursor, { blocking, transform });
732-
}
717+
if (doc != null && transform && cursor[kTransform]) {
718+
try {
719+
return cursor[kTransform](doc);
720+
} catch (error) {
721+
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
722+
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
723+
// error instead.
724+
});
725+
throw error;
726+
}
727+
}
733728

734-
if (cursorIsDead(cursor)) {
735-
// if the cursor is dead, we clean it up
736-
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
737-
// and we should surface the error
738-
await cleanupCursorAsync(cursor, {});
739-
return null;
740-
}
729+
return doc;
730+
}
741731

742-
// otherwise need to call getMore
743-
const batchSize = cursor[kOptions].batchSize || 1000;
744-
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
745-
cursor._getMore(batchSize, cb)
746-
);
732+
if (cursorIsDead(cursor)) {
733+
// if the cursor is dead, we clean it up
734+
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
735+
// and we should surface the error
736+
await cleanupCursorAsync(cursor, {});
737+
return null;
738+
}
739+
740+
// otherwise need to call getMore
741+
const batchSize = cursor[kOptions].batchSize || 1000;
747742

748-
let response: Document | undefined;
749-
try {
750-
response = await getMore(batchSize);
751-
} catch (error) {
752-
if (error) {
743+
try {
744+
const response = await promisify((batchSize: number, cb: Callback<Document>) =>
745+
cursor._getMore(batchSize, cb)
746+
)(batchSize);
747+
748+
if (response) {
749+
const cursorId =
750+
typeof response.cursor.id === 'number'
751+
? Long.fromNumber(response.cursor.id)
752+
: typeof response.cursor.id === 'bigint'
753+
? Long.fromBigInt(response.cursor.id)
754+
: response.cursor.id;
755+
756+
cursor[kDocuments].pushMany(response.cursor.nextBatch);
757+
cursor[kId] = cursorId;
758+
}
759+
} catch (error) {
753760
await cleanupCursorAsync(cursor, { error }).catch(() => {
754761
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
755762
// error instead.
756763
});
757764
throw error;
758765
}
759-
}
760766

761-
if (response) {
762-
const cursorId =
763-
typeof response.cursor.id === 'number'
764-
? Long.fromNumber(response.cursor.id)
765-
: typeof response.cursor.id === 'bigint'
766-
? Long.fromBigInt(response.cursor.id)
767-
: response.cursor.id;
768-
769-
cursor[kDocuments].pushMany(response.cursor.nextBatch);
770-
cursor[kId] = cursorId;
771-
}
772-
773-
if (cursorIsDead(cursor)) {
774-
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
775-
// we intentionally clean up the cursor to release its session back into the pool before the cursor
776-
// is iterated. This prevents a cursor that is exhausted on the server from holding
777-
// onto a session indefinitely until the AbstractCursor is iterated.
778-
//
779-
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
780-
// and we should surface the error
781-
await cleanupCursorAsync(cursor, {});
782-
}
767+
if (cursorIsDead(cursor)) {
768+
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
769+
// we intentionally clean up the cursor to release its session back into the pool before the cursor
770+
// is iterated. This prevents a cursor that is exhausted on the server from holding
771+
// onto a session indefinitely until the AbstractCursor is iterated.
772+
//
773+
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
774+
// and we should surface the error
775+
await cleanupCursorAsync(cursor, {});
776+
}
783777

784-
if (cursor[kDocuments].length === 0 && blocking === false) {
785-
return null;
786-
}
778+
if (cursor[kDocuments].length === 0 && blocking === false) {
779+
return null;
780+
}
781+
} while (!cursorIsDead(cursor) || cursor[kDocuments].length !== 0);
787782

788-
return next(cursor, { blocking, transform });
783+
return null;
789784
}
790785

791786
function cursorIsDead(cursor: AbstractCursor): boolean {
792-
const cursorId = cursor[kId];
793-
return !!cursorId && cursorId.isZero();
787+
return cursor[kId]?.isZero() ?? true;
794788
}
795789

796790
const cleanupCursorAsync = promisify(cleanupCursor);
@@ -882,7 +876,7 @@ export function assertUninitialized(cursor: AbstractCursor): void {
882876

883877
class ReadableCursorStream extends Readable {
884878
private _cursor: AbstractCursor;
885-
private _readInProgress = false;
879+
private _readInProgress: Promise<Document> | null = null;
886880

887881
constructor(cursor: AbstractCursor) {
888882
super({
@@ -895,8 +889,7 @@ class ReadableCursorStream extends Readable {
895889

896890
// eslint-disable-next-line @typescript-eslint/no-unused-vars
897891
override _read(size: number): void {
898-
if (!this._readInProgress) {
899-
this._readInProgress = true;
892+
if (this._readInProgress == null) {
900893
this._readNext();
901894
}
902895
}
@@ -909,8 +902,13 @@ class ReadableCursorStream extends Readable {
909902
}
910903

911904
private _readNext() {
912-
next(this._cursor, { blocking: true, transform: true }).then(
905+
if (this._readInProgress) return;
906+
907+
this._readInProgress = next(this._cursor, { blocking: true, transform: true });
908+
909+
this._readInProgress.then(
913910
result => {
911+
this._readInProgress = null;
914912
if (result == null) {
915913
this.push(null);
916914
} else if (this.destroyed) {
@@ -919,11 +917,10 @@ class ReadableCursorStream extends Readable {
919917
if (this.push(result)) {
920918
return this._readNext();
921919
}
922-
923-
this._readInProgress = false;
924920
}
925921
},
926922
err => {
923+
this._readInProgress = null;
927924
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
928925
// desired behavior is that a stream ends cleanly when a user explicitly closes
929926
// a client during iteration. Alternatively, we could do the "right" thing and

src/cursor/find_cursor.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Document } from '../bson';
1+
import { type Document, Long } from '../bson';
22
import { MongoInvalidArgumentError, MongoTailableCursorError } from '../error';
33
import type { ExplainVerbosityLike } from '../explain';
44
import type { MongoClient } from '../mongo_client';
@@ -101,7 +101,9 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
101101
limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize;
102102

103103
if (batchSize <= 0) {
104-
this.close().finally(() => callback());
104+
this.close().finally(() =>
105+
callback(undefined, { cursor: { id: Long.ZERO, nextBatch: [] } })
106+
);
105107
return;
106108
}
107109
}

test/integration/change-streams/change_stream.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ describe('Change Streams', function () {
448448
const willBeChange = once(changeStream, 'change');
449449
await once(changeStream.cursor, 'init');
450450

451-
collection.insertOne({ a: 1 });
451+
await collection.insertOne({ a: 1 });
452452

453453
const [change] = await willBeChange;
454454
expect(change).to.have.property('operationType', 'insert');

0 commit comments

Comments
 (0)