@@ -20,6 +20,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2020import { type AsyncDisposable , configureResourceManagement } from '../resource_management' ;
2121import type { Server } from '../sdam/server' ;
2222import { ClientSession , maybeClearPinnedConnection } from '../sessions' ;
23+ import { TimeoutContext } from '../timeout' ;
2324import { type MongoDBNamespace , squashError } from '../utils' ;
2425
2526/**
@@ -59,6 +60,17 @@ export interface CursorStreamOptions {
5960/** @public */
6061export type CursorFlag = ( typeof CURSOR_FLAGS ) [ number ] ;
6162
63+ /** @public */
64+ export const CursorTimeoutMode = Object . freeze ( {
65+ ITERATION : 'iteration' ,
66+ LIFETIME : 'cursorLifetime'
67+ } as const ) ;
68+
69+ /** @public
70+ * TODO(NODE-5688): Document and release
71+ * */
72+ export type CursorTimeoutMode = ( typeof CursorTimeoutMode ) [ keyof typeof CursorTimeoutMode ] ;
73+
6274/** @public */
6375export interface AbstractCursorOptions extends BSONSerializeOptions {
6476 session ?: ClientSession ;
@@ -104,6 +116,8 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
104116 noCursorTimeout ?: boolean ;
105117 /** @internal TODO(NODE-5688): make this public */
106118 timeoutMS ?: number ;
119+ /** @internal TODO(NODE-5688): make this public */
120+ timeoutMode ?: CursorTimeoutMode ;
107121}
108122
109123/** @internal */
@@ -116,6 +130,8 @@ export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPre
116130 oplogReplay ?: boolean ;
117131 exhaust ?: boolean ;
118132 partial ?: boolean ;
133+
134+ omitMaxTimeMS ?: boolean ;
119135} ;
120136
121137/** @public */
@@ -153,6 +169,8 @@ export abstract class AbstractCursor<
153169 private isKilled : boolean ;
154170 /** @internal */
155171 protected readonly cursorOptions : InternalAbstractCursorOptions ;
172+ /** @internal */
173+ protected timeoutContext ?: TimeoutContext ;
156174
157175 /** @event */
158176 static readonly CLOSE = 'close' as const ;
@@ -182,6 +200,30 @@ export abstract class AbstractCursor<
182200 ...pluckBSONSerializeOptions ( options )
183201 } ;
184202 this . cursorOptions . timeoutMS = options . timeoutMS ;
203+ if ( this . cursorOptions . timeoutMS != null ) {
204+ if ( options . timeoutMode == null ) {
205+ if ( options . tailable ) {
206+ this . cursorOptions . timeoutMode = CursorTimeoutMode . ITERATION ;
207+ } else {
208+ this . cursorOptions . timeoutMode = CursorTimeoutMode . LIFETIME ;
209+ }
210+ } else {
211+ if ( options . tailable && this . cursorOptions . timeoutMode === CursorTimeoutMode . LIFETIME ) {
212+ throw new MongoInvalidArgumentError (
213+ "Cannot set tailable cursor's timeoutMode to LIFETIME"
214+ ) ;
215+ }
216+ this . cursorOptions . timeoutMode = options . timeoutMode ;
217+ }
218+ } else {
219+ if ( options . timeoutMode != null )
220+ throw new MongoInvalidArgumentError ( 'Cannot set timeoutMode without setting timeoutMS' ) ;
221+ }
222+ this . cursorOptions . omitMaxTimeMS =
223+ this . cursorOptions . timeoutMS != null &&
224+ ( ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION &&
225+ ! this . cursorOptions . tailable ) ||
226+ ( this . cursorOptions . tailable && ! this . cursorOptions . awaitData ) ) ;
185227
186228 const readConcern = ReadConcern . fromOptions ( options ) ;
187229 if ( readConcern ) {
@@ -389,12 +431,21 @@ export abstract class AbstractCursor<
389431 return false ;
390432 }
391433
392- do {
393- if ( ( this . documents ?. length ?? 0 ) !== 0 ) {
394- return true ;
434+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
435+ this . timeoutContext ?. refresh ( ) ;
436+ }
437+ try {
438+ do {
439+ if ( ( this . documents ?. length ?? 0 ) !== 0 ) {
440+ return true ;
441+ }
442+ await this . fetchBatch ( ) ;
443+ } while ( ! this . isDead || ( this . documents ?. length ?? 0 ) !== 0 ) ;
444+ } finally {
445+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
446+ this . timeoutContext ?. clear ( ) ;
395447 }
396- await this . fetchBatch ( ) ;
397- } while ( ! this . isDead || ( this . documents ?. length ?? 0 ) !== 0 ) ;
448+ }
398449
399450 return false ;
400451 }
@@ -404,15 +455,24 @@ export abstract class AbstractCursor<
404455 if ( this . cursorId === Long . ZERO ) {
405456 throw new MongoCursorExhaustedError ( ) ;
406457 }
458+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
459+ this . timeoutContext ?. refresh ( ) ;
460+ }
407461
408- do {
409- const doc = this . documents ?. shift ( this . cursorOptions ) ;
410- if ( doc != null ) {
411- if ( this . transform != null ) return await this . transformDocument ( doc ) ;
412- return doc ;
462+ try {
463+ do {
464+ const doc = this . documents ?. shift ( this . cursorOptions ) ;
465+ if ( doc != null ) {
466+ if ( this . transform != null ) return await this . transformDocument ( doc ) ;
467+ return doc ;
468+ }
469+ await this . fetchBatch ( ) ;
470+ } while ( ! this . isDead || ( this . documents ?. length ?? 0 ) !== 0 ) ;
471+ } finally {
472+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
473+ this . timeoutContext ?. clear ( ) ;
413474 }
414- await this . fetchBatch ( ) ;
415- } while ( ! this . isDead || ( this . documents ?. length ?? 0 ) !== 0 ) ;
475+ }
416476
417477 return null ;
418478 }
@@ -425,18 +485,27 @@ export abstract class AbstractCursor<
425485 throw new MongoCursorExhaustedError ( ) ;
426486 }
427487
428- let doc = this . documents ?. shift ( this . cursorOptions ) ;
429- if ( doc != null ) {
430- if ( this . transform != null ) return await this . transformDocument ( doc ) ;
431- return doc ;
488+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
489+ this . timeoutContext ?. refresh ( ) ;
432490 }
491+ try {
492+ let doc = this . documents ?. shift ( this . cursorOptions ) ;
493+ if ( doc != null ) {
494+ if ( this . transform != null ) return await this . transformDocument ( doc ) ;
495+ return doc ;
496+ }
433497
434- await this . fetchBatch ( ) ;
498+ await this . fetchBatch ( ) ;
435499
436- doc = this . documents ?. shift ( this . cursorOptions ) ;
437- if ( doc != null ) {
438- if ( this . transform != null ) return await this . transformDocument ( doc ) ;
439- return doc ;
500+ doc = this . documents ?. shift ( this . cursorOptions ) ;
501+ if ( doc != null ) {
502+ if ( this . transform != null ) return await this . transformDocument ( doc ) ;
503+ return doc ;
504+ }
505+ } finally {
506+ if ( this . cursorOptions . timeoutMode === CursorTimeoutMode . ITERATION && this . cursorId != null ) {
507+ this . timeoutContext ?. clear ( ) ;
508+ }
440509 }
441510
442511 return null ;
@@ -465,8 +534,8 @@ export abstract class AbstractCursor<
465534 /**
466535 * Frees any client-side resources used by the cursor.
467536 */
468- async close ( ) : Promise < void > {
469- await this . cleanup ( ) ;
537+ async close ( options ?: { timeoutMS ?: number } ) : Promise < void > {
538+ await this . cleanup ( options ?. timeoutMS ) ;
470539 }
471540
472541 /**
@@ -647,6 +716,8 @@ export abstract class AbstractCursor<
647716
648717 this . cursorId = null ;
649718 this . documents ?. clear ( ) ;
719+ this . timeoutContext ?. clear ( ) ;
720+ this . timeoutContext = undefined ;
650721 this . isClosed = false ;
651722 this . isKilled = false ;
652723 this . initialized = false ;
@@ -697,7 +768,7 @@ export abstract class AbstractCursor<
697768 }
698769 ) ;
699770
700- return await executeOperation ( this . cursorClient , getMoreOperation ) ;
771+ return await executeOperation ( this . cursorClient , getMoreOperation , this . timeoutContext ) ;
701772 }
702773
703774 /**
@@ -708,6 +779,12 @@ export abstract class AbstractCursor<
708779 * a significant refactor.
709780 */
710781 private async cursorInit ( ) : Promise < void > {
782+ if ( this . cursorOptions . timeoutMS != null ) {
783+ this . timeoutContext = TimeoutContext . create ( {
784+ serverSelectionTimeoutMS : this . client . options . serverSelectionTimeoutMS ,
785+ timeoutMS : this . cursorOptions . timeoutMS
786+ } ) ;
787+ }
711788 try {
712789 const state = await this . _initialize ( this . cursorSession ) ;
713790 const response = state . response ;
@@ -719,7 +796,7 @@ export abstract class AbstractCursor<
719796 } catch ( error ) {
720797 // the cursor is now initialized, even if an error occurred
721798 this . initialized = true ;
722- await this . cleanup ( error ) ;
799+ await this . cleanup ( undefined , error ) ;
723800 throw error ;
724801 }
725802
@@ -753,14 +830,15 @@ export abstract class AbstractCursor<
753830
754831 // otherwise need to call getMore
755832 const batchSize = this . cursorOptions . batchSize || 1000 ;
833+ this . cursorOptions . omitMaxTimeMS = this . cursorOptions . timeoutMS != null ;
756834
757835 try {
758836 const response = await this . getMore ( batchSize ) ;
759837 this . cursorId = response . id ;
760838 this . documents = response ;
761839 } catch ( error ) {
762840 try {
763- await this . cleanup ( error ) ;
841+ await this . cleanup ( undefined , error ) ;
764842 } catch ( error ) {
765843 // `cleanupCursor` should never throw, squash and throw the original error
766844 squashError ( error ) ;
@@ -781,7 +859,7 @@ export abstract class AbstractCursor<
781859 }
782860
783861 /** @internal */
784- private async cleanup ( error ?: Error ) {
862+ private async cleanup ( timeoutMS ?: number , error ?: Error ) {
785863 this . isClosed = true ;
786864 const session = this . cursorSession ;
787865 try {
@@ -796,11 +874,23 @@ export abstract class AbstractCursor<
796874 this . isKilled = true ;
797875 const cursorId = this . cursorId ;
798876 this . cursorId = Long . ZERO ;
877+ let timeoutContext : TimeoutContext | undefined ;
878+ if ( timeoutMS != null ) {
879+ this . timeoutContext ?. clear ( ) ;
880+ timeoutContext = TimeoutContext . create ( {
881+ serverSelectionTimeoutMS : this . client . options . serverSelectionTimeoutMS ,
882+ timeoutMS
883+ } ) ;
884+ } else {
885+ this . timeoutContext ?. refresh ( ) ;
886+ timeoutContext = this . timeoutContext ;
887+ }
799888 await executeOperation (
800889 this . cursorClient ,
801890 new KillCursorsOperation ( cursorId , this . cursorNamespace , this . selectedServer , {
802891 session
803- } )
892+ } ) ,
893+ timeoutContext
804894 ) ;
805895 }
806896 } catch ( error ) {
0 commit comments