Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const kConnection = Symbol('connection');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRTTPinger = Symbol('rttPinger');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');

const STATE_IDLE = 'idle';
Expand Down Expand Up @@ -81,7 +79,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
[kCancellationToken]: CancellationToken;
/** @internal */
[kMonitorId]?: MonitorInterval;
[kRTTPinger]?: RTTPinger;
rttPinger?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
Expand Down Expand Up @@ -198,8 +196,8 @@ function resetMonitorState(monitor: Monitor) {
monitor[kMonitorId]?.stop();
monitor[kMonitorId] = undefined;

monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

monitor[kCancellationToken].emit('cancel');

Expand Down Expand Up @@ -252,8 +250,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}
: { socketTimeoutMS: connectTimeoutMS };

if (isAwaitable && monitor[kRTTPinger] == null) {
monitor[kRTTPinger] = new RTTPinger(
if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor[kCancellationToken],
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
Expand All @@ -272,9 +270,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const rttPinger = monitor[kRTTPinger];
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
Expand All @@ -290,8 +289,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);
start = now();
} else {
monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
Expand Down Expand Up @@ -384,7 +383,7 @@ export interface RTTPingerOptions extends ConnectionOptions {
/** @internal */
export class RTTPinger {
/** @internal */
[kConnection]?: Connection;
connection?: Connection;
/** @internal */
[kCancellationToken]: CancellationToken;
/** @internal */
Expand All @@ -394,7 +393,7 @@ export class RTTPinger {
closed: boolean;

constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
this[kConnection] = undefined;
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this.closed = false;
Expand All @@ -411,8 +410,8 @@ export class RTTPinger {
this.closed = true;
clearTimeout(this[kMonitorId]);

this[kConnection]?.destroy({ force: true });
this[kConnection] = undefined;
this.connection?.destroy({ force: true });
this.connection = undefined;
}
}

Expand All @@ -431,8 +430,8 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

if (rttPinger[kConnection] == null) {
rttPinger[kConnection] = conn;
if (rttPinger.connection == null) {
rttPinger.connection = conn;
}

rttPinger[kRoundTripTime] = calculateDurationInMs(start);
Expand All @@ -442,11 +441,11 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
);
}

const connection = rttPinger[kConnection];
const connection = rttPinger.connection;
if (connection == null) {
connect(options, (err, conn) => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
Expand All @@ -457,15 +456,16 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

connection.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
if (err) {
rttPinger[kConnection] = undefined;
const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
connection.commandAsync(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => {
rttPinger.connection?.destroy({ force: true });
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}

measureAndReschedule();
});
);
}

/**
Expand Down
24 changes: 10 additions & 14 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ const stateTransition = makeStateMachine({
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
});

/** @internal */
const kMonitor = Symbol('monitor');

/** @internal */
export type ServerOptions = Omit<ConnectionPoolOptions, 'id' | 'generation' | 'hostAddress'> &
MonitorOptions;
Expand Down Expand Up @@ -119,7 +116,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
serverApi?: ServerApi;
hello?: Document;
commandAsync: (ns: MongoDBNamespace, cmd: Document, options: CommandOptions) => Promise<Document>;
[kMonitor]: Monitor | null;
monitor: Monitor | null;

/** @event */
static readonly SERVER_HEARTBEAT_STARTED = SERVER_HEARTBEAT_STARTED;
Expand Down Expand Up @@ -175,22 +172,21 @@ export class Server extends TypedEventEmitter<ServerEvents> {
});

if (this.loadBalanced) {
this[kMonitor] = null;
this.monitor = null;
// monitoring is disabled in load balancing mode
return;
}

// create the monitor
// TODO(NODE-4144): Remove new variable for type narrowing
const monitor = new Monitor(this, this.s.options);
this[kMonitor] = monitor;
this.monitor = new Monitor(this, this.s.options);

for (const event of HEARTBEAT_EVENTS) {
monitor.on(event, (e: any) => this.emit(event, e));
this.monitor.on(event, (e: any) => this.emit(event, e));
}

monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(this.description.hostAddress, event.reply, {
Expand Down Expand Up @@ -246,7 +242,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// a load balancer. It never transitions out of this state and
// has no monitor.
if (!this.loadBalanced) {
this[kMonitor]?.connect();
this.monitor?.connect();
} else {
stateTransition(this, STATE_CONNECTED);
this.emit(Server.CONNECT, this);
Expand All @@ -272,7 +268,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
stateTransition(this, STATE_CLOSING);

if (!this.loadBalanced) {
this[kMonitor]?.close();
this.monitor?.close();
}

this.pool.close(options, err => {
Expand All @@ -290,7 +286,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
*/
requestCheck(): void {
if (!this.loadBalanced) {
this[kMonitor]?.requestCheck();
this.monitor?.requestCheck();
}
}

Expand Down Expand Up @@ -465,7 +461,7 @@ function markServerUnknown(server: Server, error?: MongoServerError) {
}

if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
server[kMonitor]?.reset();
server.monitor?.reset();
}

server.emit(
Expand Down
Loading