@@ -21,6 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2121import { type AsyncDisposable , configureResourceManagement } from '../resource_management' ;
2222import type { Server } from '../sdam/server' ;
2323import { ClientSession , maybeClearPinnedConnection } from '../sessions' ;
24+ import { TimeoutContext } from '../timeout' ;
2425import { type MongoDBNamespace , squashError } from '../utils' ;
2526
2627/**
@@ -60,6 +61,17 @@ export interface CursorStreamOptions {
6061/** @public */
6162export type CursorFlag = ( typeof CURSOR_FLAGS ) [ number ] ;
6263
64+ /** @public */
65+ export const CursorTimeoutMode = Object . freeze ( {
66+ ITERATION : 'iteration' ,
67+ LIFETIME : 'cursorLifetime'
68+ } as const ) ;
69+
70+ /** @public
71+ * TODO(NODE-5688): Document and release
72+ * */
73+ export type CursorTimeoutMode = ( typeof CursorTimeoutMode ) [ keyof typeof CursorTimeoutMode ] ;
74+
6375/** @public */
6476export interface AbstractCursorOptions extends BSONSerializeOptions {
6577 session ?: ClientSession ;
@@ -105,6 +117,8 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
105117 noCursorTimeout ?: boolean ;
106118 /** @internal TODO(NODE-5688): make this public */
107119 timeoutMS ?: number ;
120+ /** @internal TODO(NODE-5688): make this public */
121+ timeoutMode ?: CursorTimeoutMode ;
108122}
109123
110124/** @internal */
@@ -117,6 +131,8 @@ export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPre
117131 oplogReplay ?: boolean ;
118132 exhaust ?: boolean ;
119133 partial ?: boolean ;
134+
135+ omitMaxTimeMS ?: boolean ;
120136} ;
121137
122138/** @public */
@@ -154,6 +170,8 @@ export abstract class AbstractCursor<
154170 private isKilled : boolean ;
155171 /** @internal */
156172 protected readonly cursorOptions : InternalAbstractCursorOptions ;
173+ /** @internal */
174+ protected timeoutContext ?: TimeoutContext ;
157175
158176 /** @event */
159177 static readonly CLOSE = 'close' as const ;
@@ -186,6 +204,30 @@ export abstract class AbstractCursor<
186204 ...pluckBSONSerializeOptions ( options )
187205 } ;
188206 this . cursorOptions . timeoutMS = options . timeoutMS ;
207+ if ( this . cursorOptions . timeoutMS != null ) {
208+ if ( options . timeoutMode == null ) {
209+ if ( options . tailable ) {
210+ this . cursorOptions . timeoutMode = CursorTimeoutMode . ITERATION ;
211+ } else {
212+ this . cursorOptions . timeoutMode = CursorTimeoutMode . LIFETIME ;
213+ }
214+ } else {
215+ if ( options . tailable && this . cursorOptions . timeoutMode === CursorTimeoutMode . LIFETIME ) {
216+ throw new MongoInvalidArgumentError (
217+ "Cannot set tailable cursor's timeoutMode to LIFETIME"
218+ ) ;
219+ }
220+ this . cursorOptions . timeoutMode = options . timeoutMode ;
221+ }
222+ } else {
223+ if ( options . timeoutMode != null )
224+ throw new MongoInvalidArgumentError ( 'Cannot set timeoutMode without setting timeoutMS' ) ;
225+ }
226+ this . cursorOptions . omitMaxTimeMS =
227+ this . cursorOptions . timeoutMS != null &&
228+ ( ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION &&
229+ ! this . cursorOptions . tailable ) ||
230+ ( this . cursorOptions . tailable && ! this . cursorOptions . awaitData ) ) ;
189231
190232 const readConcern = ReadConcern . fromOptions ( options ) ;
191233 if ( readConcern ) {
@@ -400,12 +442,21 @@ export abstract class AbstractCursor<
400442 return false ;
401443 }
402444
403- do {
404- if ( ( this . documents ?. length ?? 0 ) !== 0 ) {
405- return true ;
445+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
446+ this . timeoutContext ?. refresh ( ) ;
447+ }
448+ try {
449+ do {
450+ if ( ( this . documents ?. length ?? 0 ) !== 0 ) {
451+ return true ;
452+ }
453+ await this . fetchBatch ( ) ;
454+ } while ( ! this . isDead || ( this . documents ?. length ?? 0 ) !== 0 ) ;
455+ } finally {
456+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
457+ this . timeoutContext ?. clear ( ) ;
406458 }
407- await this . fetchBatch ( ) ;
408- } while ( ! this . isDead || ( this . documents ?. length ?? 0 ) !== 0 ) ;
459+ }
409460
410461 return false ;
411462 }
@@ -415,15 +466,24 @@ export abstract class AbstractCursor<
415466 if ( this . cursorId === Long . ZERO ) {
416467 throw new MongoCursorExhaustedError ( ) ;
417468 }
469+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
470+ this . timeoutContext ?. refresh ( ) ;
471+ }
418472
419- do {
420- const doc = this . documents ?. shift ( this . deserializationOptions ) ;
421- if ( doc != null ) {
422- if ( this . transform != null ) return await this . transformDocument ( doc ) ;
423- return doc ;
473+ try {
474+ do {
475+ const doc = this . documents ?. shift ( this . deserializationOptions ) ;
476+ if ( doc != null ) {
477+ if ( this . transform != null ) return await this . transformDocument ( doc ) ;
478+ return doc ;
479+ }
480+ await this . fetchBatch ( ) ;
481+ } while ( ! this . isDead || ( this . documents ?. length ?? 0 ) !== 0 ) ;
482+ } finally {
483+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
484+ this . timeoutContext ?. clear ( ) ;
424485 }
425- await this . fetchBatch ( ) ;
426- } while ( ! this . isDead || ( this . documents ?. length ?? 0 ) !== 0 ) ;
486+ }
427487
428488 return null ;
429489 }
@@ -436,18 +496,27 @@ export abstract class AbstractCursor<
436496 throw new MongoCursorExhaustedError ( ) ;
437497 }
438498
439- let doc = this . documents ?. shift ( this . deserializationOptions ) ;
440- if ( doc != null ) {
441- if ( this . transform != null ) return await this . transformDocument ( doc ) ;
442- return doc ;
499+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
500+ this . timeoutContext ?. refresh ( ) ;
443501 }
502+ try {
503+ let doc = this . documents ?. shift ( this . deserializationOptions ) ;
504+ if ( doc != null ) {
505+ if ( this . transform != null ) return await this . transformDocument ( doc ) ;
506+ return doc ;
507+ }
444508
445- await this . fetchBatch ( ) ;
509+ await this . fetchBatch ( ) ;
446510
447- doc = this . documents ?. shift ( this . deserializationOptions ) ;
448- if ( doc != null ) {
449- if ( this . transform != null ) return await this . transformDocument ( doc ) ;
450- return doc ;
511+ doc = this . documents ?. shift ( this . deserializationOptions ) ;
512+ if ( doc != null ) {
513+ if ( this . transform != null ) return await this . transformDocument ( doc ) ;
514+ return doc ;
515+ }
516+ } finally {
517+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
518+ this . timeoutContext ?. clear ( ) ;
519+ }
451520 }
452521
453522 return null ;
@@ -476,8 +545,8 @@ export abstract class AbstractCursor<
476545 /**
477546 * Frees any client-side resources used by the cursor.
478547 */
479- async close ( ) : Promise < void > {
480- await this . cleanup ( ) ;
548+ async close ( options ?: { timeoutMS ?: number } ) : Promise < void > {
549+ await this . cleanup ( options ?. timeoutMS ) ;
481550 }
482551
483552 /**
@@ -658,6 +727,8 @@ export abstract class AbstractCursor<
658727
659728 this . cursorId = null ;
660729 this . documents ?. clear ( ) ;
730+ this . timeoutContext ?. clear ( ) ;
731+ this . timeoutContext = undefined ;
661732 this . isClosed = false ;
662733 this . isKilled = false ;
663734 this . initialized = false ;
@@ -707,7 +778,7 @@ export abstract class AbstractCursor<
707778 }
708779 ) ;
709780
710- return await executeOperation ( this . cursorClient , getMoreOperation ) ;
781+ return await executeOperation ( this . cursorClient , getMoreOperation , this . timeoutContext ) ;
711782 }
712783
713784 /**
@@ -718,6 +789,12 @@ export abstract class AbstractCursor<
718789 * a significant refactor.
719790 */
720791 private async cursorInit ( ) : Promise < void > {
792+ if ( this . cursorOptions . timeoutMS != null ) {
793+ this . timeoutContext = TimeoutContext . create ( {
794+ serverSelectionTimeoutMS : this . client . options . serverSelectionTimeoutMS ,
795+ timeoutMS : this . cursorOptions . timeoutMS
796+ } ) ;
797+ }
721798 try {
722799 const state = await this . _initialize ( this . cursorSession ) ;
723800 const response = state . response ;
@@ -729,7 +806,7 @@ export abstract class AbstractCursor<
729806 } catch ( error ) {
730807 // the cursor is now initialized, even if an error occurred
731808 this . initialized = true ;
732- await this . cleanup ( error ) ;
809+ await this . cleanup ( undefined , error ) ;
733810 throw error ;
734811 }
735812
@@ -763,14 +840,15 @@ export abstract class AbstractCursor<
763840
764841 // otherwise need to call getMore
765842 const batchSize = this . cursorOptions . batchSize || 1000 ;
843+ this . cursorOptions . omitMaxTimeMS = this . cursorOptions . timeoutMS != null ;
766844
767845 try {
768846 const response = await this . getMore ( batchSize ) ;
769847 this . cursorId = response . id ;
770848 this . documents = response ;
771849 } catch ( error ) {
772850 try {
773- await this . cleanup ( error ) ;
851+ await this . cleanup ( undefined , error ) ;
774852 } catch ( error ) {
775853 // `cleanupCursor` should never throw, squash and throw the original error
776854 squashError ( error ) ;
@@ -791,7 +869,7 @@ export abstract class AbstractCursor<
791869 }
792870
793871 /** @internal */
794- private async cleanup ( error ?: Error ) {
872+ private async cleanup ( timeoutMS ?: number , error ?: Error ) {
795873 this . isClosed = true ;
796874 const session = this . cursorSession ;
797875 try {
@@ -806,11 +884,23 @@ export abstract class AbstractCursor<
806884 this . isKilled = true ;
807885 const cursorId = this . cursorId ;
808886 this . cursorId = Long . ZERO ;
887+ let timeoutContext : TimeoutContext | undefined ;
888+ if ( timeoutMS != null ) {
889+ this . timeoutContext ?. clear ( ) ;
890+ timeoutContext = TimeoutContext . create ( {
891+ serverSelectionTimeoutMS : this . client . options . serverSelectionTimeoutMS ,
892+ timeoutMS
893+ } ) ;
894+ } else {
895+ this . timeoutContext ?. refresh ( ) ;
896+ timeoutContext = this . timeoutContext ;
897+ }
809898 await executeOperation (
810899 this . cursorClient ,
811900 new KillCursorsOperation ( cursorId , this . cursorNamespace , this . selectedServer , {
812901 session
813- } )
902+ } ) ,
903+ timeoutContext
814904 ) ;
815905 }
816906 } catch ( error ) {
0 commit comments