@@ -24,6 +24,7 @@ import {
2424 MongoMissingDependencyError ,
2525 MongoNetworkError ,
2626 MongoNetworkTimeoutError ,
27+ MongoOperationTimeoutError ,
2728 MongoParseError ,
2829 MongoServerError ,
2930 MongoUnexpectedServerResponseError
@@ -35,7 +36,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
3536import { ReadPreference , type ReadPreferenceLike } from '../read_preference' ;
3637import { ServerType } from '../sdam/common' ;
3738import { applySession , type ClientSession , updateSessionFromResponse } from '../sessions' ;
38- import { type TimeoutContext } from '../timeout' ;
39+ import { type TimeoutContext , TimeoutError } from '../timeout' ;
3940import {
4041 BufferPool ,
4142 calculateDurationInMs ,
@@ -424,6 +425,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
424425 ...options
425426 } ;
426427
428+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
429+ const { maxTimeMS } = options . timeoutContext ;
430+ if ( maxTimeMS > 0 && Number . isFinite ( maxTimeMS ) ) cmd . maxTimeMS = maxTimeMS ;
431+ }
432+
427433 const message = this . supportsOpMsg
428434 ? new OpMsgRequest ( db , cmd , commandOptions )
429435 : new OpQueryRequest ( db , cmd , commandOptions ) ;
@@ -438,7 +444,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
438444 ) : AsyncGenerator < MongoDBResponse > {
439445 this . throwIfAborted ( ) ;
440446
441- if ( typeof options . socketTimeoutMS === 'number' ) {
447+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
448+ this . socket . setTimeout ( 0 ) ;
449+ } else if ( typeof options . socketTimeoutMS === 'number' ) {
442450 this . socket . setTimeout ( options . socketTimeoutMS ) ;
443451 } else if ( this . socketTimeoutMS !== 0 ) {
444452 this . socket . setTimeout ( this . socketTimeoutMS ) ;
@@ -447,7 +455,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
447455 try {
448456 await this . writeCommand ( message , {
449457 agreedCompressor : this . description . compressor ?? 'none' ,
450- zlibCompressionLevel : this . description . zlibCompressionLevel
458+ zlibCompressionLevel : this . description . zlibCompressionLevel ,
459+ timeoutContext : options . timeoutContext
451460 } ) ;
452461
453462 if ( options . noResponse || message . moreToCome ) {
@@ -457,7 +466,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
457466
458467 this . throwIfAborted ( ) ;
459468
460- for await ( const response of this . readMany ( ) ) {
469+ if (
470+ options . timeoutContext ?. csotEnabled ( ) &&
471+ options . timeoutContext . minRoundTripTime != null &&
472+ options . timeoutContext . remainingTimeMS < options . timeoutContext . minRoundTripTime
473+ ) {
474+ throw new MongoOperationTimeoutError (
475+ 'Server roundtrip time is greater than the time remaining'
476+ ) ;
477+ }
478+
479+ for await ( const response of this . readMany ( { timeoutContext : options . timeoutContext } ) ) {
461480 this . socket . setTimeout ( 0 ) ;
462481 const bson = response . parse ( ) ;
463482
@@ -634,7 +653,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
634653 */
635654 private async writeCommand (
636655 command : WriteProtocolMessageType ,
637- options : { agreedCompressor ?: CompressorName ; zlibCompressionLevel ?: number }
656+ options : {
657+ agreedCompressor ?: CompressorName ;
658+ zlibCompressionLevel ?: number ;
659+ timeoutContext ?: TimeoutContext ;
660+ }
638661 ) : Promise < void > {
639662 const finalCommand =
640663 options . agreedCompressor === 'none' || ! OpCompressedRequest . canCompress ( command )
@@ -646,8 +669,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
646669
647670 const buffer = Buffer . concat ( await finalCommand . toBin ( ) ) ;
648671
672+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
673+ if (
674+ options . timeoutContext . minRoundTripTime != null &&
675+ options . timeoutContext . remainingTimeMS < options . timeoutContext . minRoundTripTime
676+ ) {
677+ throw new MongoOperationTimeoutError (
678+ 'Server roundtrip time is greater than the time remaining'
679+ ) ;
680+ }
681+ }
682+
649683 if ( this . socket . write ( buffer ) ) return ;
650- return await once ( this . socket , 'drain' ) ;
684+
685+ const drainEvent = once < void > ( this . socket , 'drain' ) ;
686+ const timeout = options ?. timeoutContext ?. timeoutForSocketWrite ;
687+ if ( timeout ) {
688+ try {
689+ return await Promise . race ( [ drainEvent , timeout ] ) ;
690+ } catch ( error ) {
691+ if ( TimeoutError . is ( error ) ) {
692+ throw new MongoOperationTimeoutError ( 'Timed out at socket write' ) ;
693+ }
694+ throw error ;
695+ }
696+ }
697+ return await drainEvent ;
651698 }
652699
653700 /**
@@ -659,10 +706,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
659706 *
660707 * Note that `for-await` loops call `return` automatically when the loop is exited.
661708 */
662- private async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpReply > {
709+ private async * readMany ( options : {
710+ timeoutContext ?: TimeoutContext ;
711+ } ) : AsyncGenerator < OpMsgResponse | OpReply > {
663712 try {
664- this . dataEvents = onData ( this . messageStream ) ;
713+ this . dataEvents = onData ( this . messageStream , options ) ;
665714 this . messageStream . resume ( ) ;
715+
666716 for await ( const message of this . dataEvents ) {
667717 const response = await decompressResponse ( message ) ;
668718 yield response ;
0 commit comments