@@ -3,6 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';
33import { type Document , Long } from '../bson' ;
44import { connect } from '../cmap/connect' ;
55import { Connection , type ConnectionOptions } from '../cmap/connection' ;
6+ import { getFAASEnv } from '../cmap/handshake/client_metadata' ;
67import { LEGACY_HELLO_COMMAND } from '../constants' ;
78import { MongoError , MongoErrorLabel , MongoNetworkTimeoutError } from '../error' ;
89import { CancellationToken , TypedEventEmitter } from '../mongo_types' ;
@@ -44,6 +45,11 @@ function isInCloseState(monitor: Monitor) {
4445 return monitor . s . state === STATE_CLOSED || monitor . s . state === STATE_CLOSING ;
4546}
4647
48+ /** @public */
49+ export const ServerMonitoringModes = [ 'auto' , 'poll' , 'stream' ] ;
50+ /** @public */
51+ export type ServerMonitoringMode = ( typeof ServerMonitoringModes ) [ number ] ;
52+
4753/** @internal */
4854export interface MonitorPrivate {
4955 state : string ;
@@ -55,6 +61,7 @@ export interface MonitorOptions
5561 connectTimeoutMS : number ;
5662 heartbeatFrequencyMS : number ;
5763 minHeartbeatFrequencyMS : number ;
64+ serverMonitoringMode : ServerMonitoringMode ;
5865}
5966
6067/** @public */
@@ -73,9 +80,16 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
7380 s : MonitorPrivate ;
7481 address : string ;
7582 options : Readonly <
76- Pick < MonitorOptions , 'connectTimeoutMS' | 'heartbeatFrequencyMS' | 'minHeartbeatFrequencyMS' >
83+ Pick <
84+ MonitorOptions ,
85+ | 'connectTimeoutMS'
86+ | 'heartbeatFrequencyMS'
87+ | 'minHeartbeatFrequencyMS'
88+ | 'serverMonitoringMode'
89+ >
7790 > ;
7891 connectOptions : ConnectionOptions ;
92+ isRunningInFaasEnv : boolean ;
7993 [ kServer ] : Server ;
8094 [ kConnection ] ?: Connection ;
8195 [ kCancellationToken ] : CancellationToken ;
@@ -103,8 +117,11 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
103117 this . options = Object . freeze ( {
104118 connectTimeoutMS : options . connectTimeoutMS ?? 10000 ,
105119 heartbeatFrequencyMS : options . heartbeatFrequencyMS ?? 10000 ,
106- minHeartbeatFrequencyMS : options . minHeartbeatFrequencyMS ?? 500
120+ minHeartbeatFrequencyMS : options . minHeartbeatFrequencyMS ?? 500 ,
121+ serverMonitoringMode : options . serverMonitoringMode
107122 } ) ;
123+ console . log ( getFAASEnv ( ) ) ;
124+ this . isRunningInFaasEnv = getFAASEnv ( ) != null ;
108125
109126 const cancellationToken = this [ kCancellationToken ] ;
110127 // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
@@ -207,10 +224,26 @@ function resetMonitorState(monitor: Monitor) {
207224 monitor [ kConnection ] = undefined ;
208225}
209226
227+ function useStreamingProtocol ( monitor : Monitor , topologyVersion : TopologyVersion | null ) : boolean {
228+ // If we have no topology version we always poll no matter
229+ // what the user provided.
230+ if ( topologyVersion == null ) return false ;
231+
232+ const serverMonitoringMode = monitor . options . serverMonitoringMode ;
233+ if ( serverMonitoringMode === 'poll' ) return false ;
234+ if ( serverMonitoringMode === 'stream' ) return true ;
235+
236+ // If we are in auto mode, we need to figure out if we're in a FaaS
237+ // environment or not and choose the appropriate mode.
238+ if ( monitor . isRunningInFaasEnv ) return false ;
239+ return true ;
240+ }
241+
210242function checkServer ( monitor : Monitor , callback : Callback < Document | null > ) {
211243 let start = now ( ) ;
212244 const topologyVersion = monitor [ kServer ] . description . topologyVersion ;
213- const isAwaitable = topologyVersion != null ;
245+ console . log ( 'checkServer' , topologyVersion ) ;
246+ const isAwaitable = useStreamingProtocol ( monitor , topologyVersion ) ;
214247 monitor . emit (
215248 Server . SERVER_HEARTBEAT_STARTED ,
216249 new ServerHeartbeatStartedEvent ( monitor . address , isAwaitable )
@@ -286,7 +319,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
286319 const duration =
287320 isAwaitable && rttPinger ? rttPinger . roundTripTime : calculateDurationInMs ( start ) ;
288321
289- const awaited = isAwaitable && hello . topologyVersion != null ;
322+ console . log ( 'command' , hello . topologyVersion ) ;
323+ const awaited = useStreamingProtocol ( monitor , hello . topologyVersion ) ;
290324 monitor . emit (
291325 Server . SERVER_HEARTBEAT_SUCCEEDED ,
292326 new ServerHeartbeatSucceededEvent ( monitor . address , duration , hello , awaited )
@@ -370,7 +404,8 @@ function monitorServer(monitor: Monitor) {
370404 }
371405
372406 // if the check indicates streaming is supported, immediately reschedule monitoring
373- if ( hello && hello . topologyVersion ) {
407+ console . log ( 'checkServerCallback' , hello ?. topologyVersion ) ;
408+ if ( useStreamingProtocol ( monitor , hello ?. topologyVersion ) ) {
374409 setTimeout ( ( ) => {
375410 if ( ! isInCloseState ( monitor ) ) {
376411 monitor [ kMonitorId ] ?. wake ( ) ;
0 commit comments