@@ -19,6 +19,7 @@ import {
1919 MongoMissingDependencyError ,
2020 MongoNetworkError ,
2121 MongoNetworkTimeoutError ,
22+ MongoOperationTimeoutError ,
2223 MongoParseError ,
2324 MongoServerError ,
2425 MongoUnexpectedServerResponseError
@@ -30,7 +31,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
3031import { ReadPreference , type ReadPreferenceLike } from '../read_preference' ;
3132import { ServerType } from '../sdam/common' ;
3233import { applySession , type ClientSession , updateSessionFromResponse } from '../sessions' ;
33- import { type TimeoutContext } from '../timeout' ;
34+ import { type TimeoutContext , TimeoutError } from '../timeout' ;
3435import {
3536 BufferPool ,
3637 calculateDurationInMs ,
@@ -417,6 +418,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
417418 ...options
418419 } ;
419420
421+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
422+ const { maxTimeMS } = options . timeoutContext ;
423+ if ( maxTimeMS > 0 && Number . isFinite ( maxTimeMS ) ) cmd . maxTimeMS = maxTimeMS ;
424+ }
425+
420426 const message = this . supportsOpMsg
421427 ? new OpMsgRequest ( db , cmd , commandOptions )
422428 : new OpQueryRequest ( db , cmd , commandOptions ) ;
@@ -431,7 +437,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
431437 ) : AsyncGenerator < MongoDBResponse > {
432438 this . throwIfAborted ( ) ;
433439
434- if ( typeof options . socketTimeoutMS === 'number' ) {
440+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
441+ this . socket . setTimeout ( 0 ) ;
442+ } else if ( typeof options . socketTimeoutMS === 'number' ) {
435443 this . socket . setTimeout ( options . socketTimeoutMS ) ;
436444 } else if ( this . socketTimeoutMS !== 0 ) {
437445 this . socket . setTimeout ( this . socketTimeoutMS ) ;
@@ -440,7 +448,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
440448 try {
441449 await this . writeCommand ( message , {
442450 agreedCompressor : this . description . compressor ?? 'none' ,
443- zlibCompressionLevel : this . description . zlibCompressionLevel
451+ zlibCompressionLevel : this . description . zlibCompressionLevel ,
452+ timeoutContext : options . timeoutContext
444453 } ) ;
445454
446455 if ( options . noResponse ) {
@@ -450,7 +459,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
450459
451460 this . throwIfAborted ( ) ;
452461
453- for await ( const response of this . readMany ( ) ) {
462+ if (
463+ options . timeoutContext ?. csotEnabled ( ) &&
464+ options . timeoutContext . minRoundTripTime != null &&
465+ options . timeoutContext . remainingTimeMS < options . timeoutContext . minRoundTripTime
466+ ) {
467+ throw new MongoOperationTimeoutError (
468+ 'Server roundtrip time is greater than the time remaining'
469+ ) ;
470+ }
471+
472+ for await ( const response of this . readMany ( { timeoutContext : options . timeoutContext } ) ) {
454473 this . socket . setTimeout ( 0 ) ;
455474 const bson = response . parse ( ) ;
456475
@@ -623,7 +642,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
623642 */
624643 private async writeCommand (
625644 command : WriteProtocolMessageType ,
626- options : { agreedCompressor ?: CompressorName ; zlibCompressionLevel ?: number }
645+ options : {
646+ agreedCompressor ?: CompressorName ;
647+ zlibCompressionLevel ?: number ;
648+ timeoutContext ?: TimeoutContext ;
649+ }
627650 ) : Promise < void > {
628651 const finalCommand =
629652 options . agreedCompressor === 'none' || ! OpCompressedRequest . canCompress ( command )
@@ -635,8 +658,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
635658
636659 const buffer = Buffer . concat ( await finalCommand . toBin ( ) ) ;
637660
661+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
662+ if (
663+ options . timeoutContext . minRoundTripTime != null &&
664+ options . timeoutContext . remainingTimeMS < options . timeoutContext . minRoundTripTime
665+ ) {
666+ throw new MongoOperationTimeoutError (
667+ 'Server roundtrip time is greater than the time remaining'
668+ ) ;
669+ }
670+ }
671+
638672 if ( this . socket . write ( buffer ) ) return ;
639- return await once ( this . socket , 'drain' ) ;
673+
674+ const drainEvent = once < void > ( this . socket , 'drain' ) ;
675+ const timeout = options ?. timeoutContext ?. timeoutForSocketWrite ;
676+ if ( timeout ) {
677+ try {
678+ return await Promise . race ( [ drainEvent , timeout ] ) ;
679+ } catch ( error ) {
680+ if ( TimeoutError . is ( error ) ) {
681+ throw new MongoOperationTimeoutError ( 'Timed out at socket write' ) ;
682+ }
683+ throw error ;
684+ }
685+ }
686+ return await drainEvent ;
640687 }
641688
642689 /**
@@ -648,9 +695,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
648695 *
649696 * Note that `for-await` loops call `return` automatically when the loop is exited.
650697 */
651- private async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpReply > {
698+ private async * readMany ( options : {
699+ timeoutContext ?: TimeoutContext ;
700+ } ) : AsyncGenerator < OpMsgResponse | OpReply > {
652701 try {
653- this . dataEvents = onData ( this . messageStream ) ;
702+ this . dataEvents = onData ( this . messageStream , options ) ;
703+
654704 for await ( const message of this . dataEvents ) {
655705 const response = await decompressResponse ( message ) ;
656706 yield response ;
0 commit comments