@@ -19,6 +19,7 @@ import {
1919 MongoMissingDependencyError ,
2020 MongoNetworkError ,
2121 MongoNetworkTimeoutError ,
22+ MongoOperationTimeoutError ,
2223 MongoParseError ,
2324 MongoServerError ,
2425 MongoUnexpectedServerResponseError
@@ -30,7 +31,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
3031import { ReadPreference , type ReadPreferenceLike } from '../read_preference' ;
3132import { ServerType } from '../sdam/common' ;
3233import { applySession , type ClientSession , updateSessionFromResponse } from '../sessions' ;
33- import { type TimeoutContext } from '../timeout' ;
34+ import { type TimeoutContext , TimeoutError } from '../timeout' ;
3435import {
3536 BufferPool ,
3637 calculateDurationInMs ,
@@ -417,6 +418,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
417418 ...options
418419 } ;
419420
421+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
422+ const { maxTimeMS } = options . timeoutContext ;
423+ if ( maxTimeMS > 0 && Number . isFinite ( maxTimeMS ) ) cmd . maxTimeMS = maxTimeMS ;
424+ }
425+
420426 const message = this . supportsOpMsg
421427 ? new OpMsgRequest ( db , cmd , commandOptions )
422428 : new OpQueryRequest ( db , cmd , commandOptions ) ;
@@ -431,7 +437,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
431437 ) : AsyncGenerator < MongoDBResponse > {
432438 this . throwIfAborted ( ) ;
433439
434- if ( typeof options . socketTimeoutMS === 'number' ) {
440+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
441+ this . socket . setTimeout ( 0 ) ;
442+ } else if ( typeof options . socketTimeoutMS === 'number' ) {
435443 this . socket . setTimeout ( options . socketTimeoutMS ) ;
436444 } else if ( this . socketTimeoutMS !== 0 ) {
437445 this . socket . setTimeout ( this . socketTimeoutMS ) ;
@@ -440,7 +448,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
440448 try {
441449 await this . writeCommand ( message , {
442450 agreedCompressor : this . description . compressor ?? 'none' ,
443- zlibCompressionLevel : this . description . zlibCompressionLevel
451+ zlibCompressionLevel : this . description . zlibCompressionLevel ,
452+ timeoutContext : options . timeoutContext
444453 } ) ;
445454
446455 if ( options . noResponse || message . moreToCome ) {
@@ -450,7 +459,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
450459
451460 this . throwIfAborted ( ) ;
452461
453- for await ( const response of this . readMany ( ) ) {
462+ if (
463+ options . timeoutContext ?. csotEnabled ( ) &&
464+ options . timeoutContext . minRoundTripTime != null &&
465+ options . timeoutContext . remainingTimeMS < options . timeoutContext . minRoundTripTime
466+ ) {
467+ throw new MongoOperationTimeoutError (
468+ 'Server roundtrip time is greater than the time remaining'
469+ ) ;
470+ }
471+
472+ for await ( const response of this . readMany ( { timeoutContext : options . timeoutContext } ) ) {
454473 this . socket . setTimeout ( 0 ) ;
455474 const bson = response . parse ( ) ;
456475
@@ -627,7 +646,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
627646 */
628647 private async writeCommand (
629648 command : WriteProtocolMessageType ,
630- options : { agreedCompressor ?: CompressorName ; zlibCompressionLevel ?: number }
649+ options : {
650+ agreedCompressor ?: CompressorName ;
651+ zlibCompressionLevel ?: number ;
652+ timeoutContext ?: TimeoutContext ;
653+ }
631654 ) : Promise < void > {
632655 const finalCommand =
633656 options . agreedCompressor === 'none' || ! OpCompressedRequest . canCompress ( command )
@@ -639,8 +662,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
639662
640663 const buffer = Buffer . concat ( await finalCommand . toBin ( ) ) ;
641664
665+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
666+ if (
667+ options . timeoutContext . minRoundTripTime != null &&
668+ options . timeoutContext . remainingTimeMS < options . timeoutContext . minRoundTripTime
669+ ) {
670+ throw new MongoOperationTimeoutError (
671+ 'Server roundtrip time is greater than the time remaining'
672+ ) ;
673+ }
674+ }
675+
642676 if ( this . socket . write ( buffer ) ) return ;
643- return await once ( this . socket , 'drain' ) ;
677+
678+ const drainEvent = once < void > ( this . socket , 'drain' ) ;
679+ const timeout = options ?. timeoutContext ?. timeoutForSocketWrite ;
680+ if ( timeout ) {
681+ try {
682+ return await Promise . race ( [ drainEvent , timeout ] ) ;
683+ } catch ( error ) {
684+ if ( TimeoutError . is ( error ) ) {
685+ throw new MongoOperationTimeoutError ( 'Timed out at socket write' ) ;
686+ }
687+ throw error ;
688+ }
689+ }
690+ return await drainEvent ;
644691 }
645692
646693 /**
@@ -652,9 +699,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
652699 *
653700 * Note that `for-await` loops call `return` automatically when the loop is exited.
654701 */
655- private async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpReply > {
702+ private async * readMany ( options : {
703+ timeoutContext ?: TimeoutContext ;
704+ } ) : AsyncGenerator < OpMsgResponse | OpReply > {
656705 try {
657- this . dataEvents = onData ( this . messageStream ) ;
706+ this . dataEvents = onData ( this . messageStream , options ) ;
707+
658708 for await ( const message of this . dataEvents ) {
659709 const response = await decompressResponse ( message ) ;
660710 yield response ;
0 commit comments