@@ -21,13 +21,14 @@ import {
2121 MongoInvalidArgumentError ,
2222 MongoMissingCredentialsError ,
2323 MongoNetworkError ,
24+ MongoOperationTimeoutError ,
2425 MongoRuntimeError ,
2526 MongoServerError
2627} from '../error' ;
2728import { CancellationToken , TypedEventEmitter } from '../mongo_types' ;
2829import type { Server } from '../sdam/server' ;
2930import { Timeout , TimeoutError } from '../timeout' ;
30- import { type Callback , List , makeCounter , now , promiseWithResolvers } from '../utils' ;
31+ import { type Callback , csotMin , List , makeCounter , promiseWithResolvers } from '../utils' ;
3132import { connect } from './connect' ;
3233import { Connection , type ConnectionEvents , type ConnectionOptions } from './connection' ;
3334import {
@@ -102,7 +103,6 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
102103export interface WaitQueueMember {
103104 resolve : ( conn : Connection ) => void ;
104105 reject : ( err : AnyError ) => void ;
105- timeout : Timeout ;
106106 [ kCancelled ] ?: boolean ;
107107 checkoutTime : number ;
108108}
@@ -355,37 +355,57 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
355355 * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
356356 * explicitly destroyed by the new owner.
357357 */
358- async checkOut ( ) : Promise < Connection > {
359- const checkoutTime = now ( ) ;
358+ async checkOut ( options ?: { timeout ?: Timeout } ) : Promise < Connection > {
360359 this . emitAndLog (
361360 ConnectionPool . CONNECTION_CHECK_OUT_STARTED ,
362361 new ConnectionCheckOutStartedEvent ( this )
363362 ) ;
364363
365364 const waitQueueTimeoutMS = this . options . waitQueueTimeoutMS ;
365+ const serverSelectionTimeoutMS = this [ kServer ] . topology . s . serverSelectionTimeoutMS ;
366366
367367 const { promise, resolve, reject } = promiseWithResolvers < Connection > ( ) ;
368368
369- const timeout = Timeout . expires ( waitQueueTimeoutMS ) ;
369+ let timeout : Timeout | null = null ;
370+ if ( options ?. timeout ) {
371+ // CSOT enabled
372+ // Determine if we're using the timeout passed in or a new timeout
373+ if ( options . timeout . duration > 0 || serverSelectionTimeoutMS > 0 ) {
374+ // This check determines whether or not Topology.selectServer used the configured
375+ // `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
376+ if (
377+ options . timeout . duration === serverSelectionTimeoutMS ||
378+ csotMin ( options . timeout . duration , serverSelectionTimeoutMS ) < serverSelectionTimeoutMS
379+ ) {
380+ // server selection used `timeoutMS`, so we should use the existing timeout as the timeout
381+ // here
382+ timeout = options . timeout ;
383+ } else {
384+ // server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
385+ // the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
386+ // cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
387+ timeout = Timeout . expires ( serverSelectionTimeoutMS - options . timeout . timeElapsed ) ;
388+ }
389+ }
390+ } else {
391+ timeout = Timeout . expires ( waitQueueTimeoutMS ) ;
392+ }
370393
371394 const waitQueueMember : WaitQueueMember = {
372395 resolve,
373- reject,
374- timeout,
375- checkoutTime
396+ reject
376397 } ;
377398
378399 this [ kWaitQueue ] . push ( waitQueueMember ) ;
379400 process . nextTick ( ( ) => this . processWaitQueue ( ) ) ;
380401
381402 try {
382- return await Promise . race ( [ promise , waitQueueMember . timeout ] ) ;
403+ timeout ?. throwIfExpired ( ) ;
404+ return await ( timeout ? Promise . race ( [ promise , timeout ] ) : promise ) ;
383405 } catch ( error ) {
384406 if ( TimeoutError . is ( error ) ) {
385407 waitQueueMember [ kCancelled ] = true ;
386408
387- waitQueueMember . timeout . clear ( ) ;
388-
389409 this . emitAndLog (
390410 ConnectionPool . CONNECTION_CHECK_OUT_FAILED ,
391411 new ConnectionCheckOutFailedEvent ( this , 'timeout' , waitQueueMember . checkoutTime )
@@ -396,9 +416,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
396416 : 'Timed out while checking out a connection from connection pool' ,
397417 this . address
398418 ) ;
419+ if ( options ?. timeout ) {
420+ throw new MongoOperationTimeoutError ( 'Timed out during connection checkout' , {
421+ cause : timeoutError
422+ } ) ;
423+ }
399424 throw timeoutError ;
400425 }
401426 throw error ;
427+ } finally {
428+ if ( timeout !== options ?. timeout ) timeout ?. clear ( ) ;
402429 }
403430 }
404431
@@ -764,7 +791,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
764791 ConnectionPool . CONNECTION_CHECK_OUT_FAILED ,
765792 new ConnectionCheckOutFailedEvent ( this , reason , waitQueueMember . checkoutTime , error )
766793 ) ;
767- waitQueueMember . timeout . clear ( ) ;
768794 this [ kWaitQueue ] . shift ( ) ;
769795 waitQueueMember . reject ( error ) ;
770796 continue ;
@@ -785,7 +811,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
785811 ConnectionPool . CONNECTION_CHECKED_OUT ,
786812 new ConnectionCheckedOutEvent ( this , connection , waitQueueMember . checkoutTime )
787813 ) ;
788- waitQueueMember . timeout . clear ( ) ;
789814
790815 this [ kWaitQueue ] . shift ( ) ;
791816 waitQueueMember . resolve ( connection ) ;
@@ -828,8 +853,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
828853 ) ;
829854 waitQueueMember . resolve ( connection ) ;
830855 }
831-
832- waitQueueMember . timeout . clear ( ) ;
833856 }
834857 process . nextTick ( ( ) => this . processWaitQueue ( ) ) ;
835858 } ) ;
0 commit comments