From e0213df662d6fa4d7a067ac55450fa0d13d9e903 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 24 Apr 2025 10:52:39 -0400 Subject: [PATCH 01/11] feat(NODE-6882): close checked out connection in MongoClient.close() --- src/cmap/connection_pool.ts | 11 ++++ src/error.ts | 56 +++++++++++++++++++ src/index.ts | 2 + src/mongo_client.ts | 6 +- src/sdam/server.ts | 6 +- src/sdam/topology.ts | 14 +++-- .../change-streams/change_stream.test.ts | 11 +++- test/integration/crud/misc_cursors.test.js | 8 ++- test/integration/index_management.test.ts | 17 +++--- .../examples/causal_consistency.test.js | 4 +- .../node-specific/mongo_client.test.ts | 4 +- test/unit/index.test.ts | 2 + 12 files changed, 117 insertions(+), 24 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 71f509481b1..8ff775c34dc 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -17,6 +17,7 @@ import { } from '../constants'; import { type AnyError, + ConnectionPoolClosedError, type MongoError, MongoInvalidArgumentError, MongoMissingCredentialsError, @@ -489,6 +490,16 @@ export class ConnectionPool extends TypedEventEmitter { } } + closeCheckedOutConnections() { + for (const conn of this.checkedOut) { + this.emitAndLog( + ConnectionPool.CONNECTION_CLOSED, + new ConnectionClosedEvent(this, conn, 'poolClosed') + ); + conn.onError(new ConnectionPoolClosedError()); + } + } + /** Close the pool */ close(): void { if (this.closed) { diff --git a/src/error.ts b/src/error.ts index 31ae5c9c4d8..73e9f95a76e 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1018,6 +1018,62 @@ export class MongoTopologyClosedError extends MongoAPIError { } } +/** + * An error generated when the MongoClient is closed and async + * operations are interrupted. + * + * @public + * @category Error + */ +export class MongoClientClosedError extends MongoAPIError { + /** + * **Do not use this constructor!** + * + * Meant for internal use only. + * + * @remarks + * This class is only meant to be constructed within the driver. This constructor is + * not subject to semantic versioning compatibility guarantees and may change at any time. + * + * @public + **/ + constructor(message = 'MongoClient is closed') { + super(message); + } + + override get name(): string { + return 'MongoClientClosedError'; + } +} + +/** + * An error generated when a ConnectionPool is closed and async + * operations are interrupted. + * + * @public + * @category Error + */ +export class ConnectionPoolClosedError extends MongoAPIError { + /** + * **Do not use this constructor!** + * + * Meant for internal use only. + * + * @remarks + * This class is only meant to be constructed within the driver. This constructor is + * not subject to semantic versioning compatibility guarantees and may change at any time. + * + * @public + **/ + constructor(message = 'ConnectionPool is closed') { + super(message); + } + + override get name(): string { + return 'ConnectionPoolClosedError'; + } +} + /** @public */ export interface MongoNetworkErrorOptions { /** Indicates the timeout happened before a connection handshake completed */ diff --git a/src/index.ts b/src/index.ts index 476b5affc3b..9d4365ee385 100644 --- a/src/index.ts +++ b/src/index.ts @@ -45,6 +45,7 @@ export { export { ClientEncryption } from './client-side-encryption/client_encryption'; export { ChangeStreamCursor } from './cursor/change_stream_cursor'; export { + ConnectionPoolClosedError, MongoAPIError, MongoAWSError, MongoAzureError, @@ -53,6 +54,7 @@ export { MongoClientBulkWriteCursorError, MongoClientBulkWriteError, MongoClientBulkWriteExecutionError, + MongoClientClosedError, MongoCompatibilityError, MongoCursorExhaustedError, MongoCursorInUseError, diff --git a/src/mongo_client.ts b/src/mongo_client.ts index e77afc4026a..f4bda2b5db6 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -641,7 +641,9 @@ export class MongoClient extends TypedEventEmitter implements } /** - * Cleans up client-side resources used by the MongoCLient and . This includes: + * Cleans up client-side resources used by the MongoClient. + * + * This includes: * * - Closes all open, unused connections (see note). * - Ends all in-use sessions with {@link ClientSession#endSession|ClientSession.endSession()}. @@ -677,6 +679,8 @@ export class MongoClient extends TypedEventEmitter implements writable: false }); + this.topology?.closeCheckedOutConnections(); + const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close()); this.s.activeCursors.clear(); diff --git a/src/sdam/server.ts b/src/sdam/server.ts index c6798316974..4d7052e3270 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -246,8 +246,12 @@ export class Server extends TypedEventEmitter { } } + closeCheckedOutConnections() { + return this.pool.closeCheckedOutConnections(); + } + /** Destroy the server connection */ - destroy(): void { + close(): void { if (this.s.state === STATE_CLOSED) { return; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index a67f17dd9bb..4da824d059a 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -490,6 +490,12 @@ export class Topology extends TypedEventEmitter { } } + closeCheckedOutConnections() { + for (const server of this.s.servers.values()) { + return server.closeCheckedOutConnections(); + } + } + /** Close this topology */ close(): void { if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { @@ -497,7 +503,7 @@ export class Topology extends TypedEventEmitter { } for (const server of this.s.servers.values()) { - destroyServer(server, this); + closeServer(server, this); } this.s.servers.clear(); @@ -791,12 +797,12 @@ export class Topology extends TypedEventEmitter { } /** Destroys a server, and removes all event listeners from the instance */ -function destroyServer(server: Server, topology: Topology) { +function closeServer(server: Server, topology: Topology) { for (const event of LOCAL_SERVER_EVENTS) { server.removeAllListeners(event); } - server.destroy(); + server.close(); topology.emitAndLog( Topology.SERVER_CLOSED, new ServerClosedEvent(topology.s.id, server.description.address) @@ -903,7 +909,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes // prepare server for garbage collection if (server) { - destroyServer(server, topology); + closeServer(server, topology); } } } diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index f8aabb83215..8cc7b3c3b10 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -63,6 +63,7 @@ describe('Change Streams', function () { await csDb.createCollection('test').catch(() => null); collection = csDb.collection('test'); changeStream = collection.watch(); + changeStream.on('error', () => null); }); afterEach(async () => { @@ -702,15 +703,19 @@ describe('Change Streams', function () { const outStream = new PassThrough({ objectMode: true }); - // @ts-expect-error: transform requires a Document return type - changeStream.stream({ transform: JSON.stringify }).pipe(outStream); + const transform = doc => ({ doc: JSON.stringify(doc) }); + changeStream + .stream({ transform }) + .on('error', () => null) + .pipe(outStream) + .on('error', () => null); const willBeData = once(outStream, 'data'); await collection.insertMany([{ a: 1 }]); const [data] = await willBeData; - const parsedEvent = JSON.parse(data); + const parsedEvent = JSON.parse(data.doc); expect(parsedEvent).to.have.nested.property('fullDocument.a', 1); outStream.destroy(); diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 2eca8a008a5..4ce1861d71d 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -11,7 +11,7 @@ const { Writable } = require('stream'); const { once, on } = require('events'); const { setTimeout } = require('timers'); const { ReadPreference } = require('../../mongodb'); -const { ServerType } = require('../../mongodb'); +const { ServerType, ConnectionPoolClosedError } = require('../../mongodb'); const { formatSort } = require('../../mongodb'); describe('Cursor', function () { @@ -1872,7 +1872,11 @@ describe('Cursor', function () { expect(cursor).to.have.property('closed', true); const error = await rejectedEarlyBecauseClientClosed; - expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call + if (this.configuration.topologyType === 'LoadBalanced') { + expect(error).to.be.instanceOf(ConnectionPoolClosedError); + } else { + expect(error).to.be.null; + } }); it('shouldAwaitData', { diff --git a/test/integration/index_management.test.ts b/test/integration/index_management.test.ts index b79d8c18047..ee1855fc024 100644 --- a/test/integration/index_management.test.ts +++ b/test/integration/index_management.test.ts @@ -770,20 +770,19 @@ describe('Indexes', function () { expect(events).to.be.an('array').with.lengthOf(1); expect(events[0]).nested.property('command.commitQuorum').to.equal(0); - await collection.drop(err => { - expect(err).to.not.exist; - }); + await collection.drop(); } }; } it( 'should run command with commitQuorum if specified on db.createIndex', - commitQuorumTest((db, collection) => - db.createIndex(collection.collectionName, 'a', { - // @ts-expect-error revaluate this? - writeConcern: { w: 'majority' }, - commitQuorum: 0 - }) + commitQuorumTest( + async (db, collection) => + await db.createIndex(collection.collectionName, 'a', { + // @ts-expect-error revaluate this? + writeConcern: { w: 'majority' }, + commitQuorum: 0 + }) ) ); it( diff --git a/test/integration/node-specific/examples/causal_consistency.test.js b/test/integration/node-specific/examples/causal_consistency.test.js index 41e97323d87..7a4db5eda01 100644 --- a/test/integration/node-specific/examples/causal_consistency.test.js +++ b/test/integration/node-specific/examples/causal_consistency.test.js @@ -31,8 +31,8 @@ describe('examples(causal-consistency):', function () { it('supports causal consistency', async function () { const session = client.startSession({ causalConsistency: true }); - collection.insertOne({ darmok: 'jalad' }, { session }); - collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session }); + await collection.insertOne({ darmok: 'jalad' }, { session }); + await collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session }); const results = await collection.find({}, { session }).toArray(); diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 9a0cea014bd..8dccb079014 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -1035,7 +1035,7 @@ describe('class MongoClient', function () { client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev)); }); - it('are all closed', async () => { + it('are all closed', async function () { const cursors = Array.from({ length: 30 }, (_, skip) => collection.find({}, { skip, batchSize: 1 }) ); @@ -1043,7 +1043,7 @@ describe('class MongoClient', function () { expect(client.s.activeCursors).to.have.lengthOf(30); await client.close(); expect(client.s.activeCursors).to.have.lengthOf(0); - expect(kills).to.have.lengthOf(30); + expect(kills).to.have.lengthOf(this.configuration.topologyType === 'LoadBalanced' ? 0 : 30); }); it('creating cursors after close adds to activeCursors', async () => { diff --git a/test/unit/index.test.ts b/test/unit/index.test.ts index b24639f2c80..463b1719eef 100644 --- a/test/unit/index.test.ts +++ b/test/unit/index.test.ts @@ -36,6 +36,7 @@ const EXPECTED_EXPORTS = [ 'ConnectionClosedEvent', 'ConnectionCreatedEvent', 'ConnectionPoolClearedEvent', + 'ConnectionPoolClosedError', 'ConnectionPoolClosedEvent', 'ConnectionPoolCreatedEvent', 'ConnectionPoolMonitoringEvent', @@ -71,6 +72,7 @@ const EXPECTED_EXPORTS = [ 'MongoClientBulkWriteCursorError', 'MongoClientBulkWriteError', 'MongoClientBulkWriteExecutionError', + 'MongoClientClosedError', 'MongoCompatibilityError', 'MongoCryptAzureKMSRequestError', 'MongoCryptCreateDataKeyError', From 7f711c42e4292fe4ab3154f50f2dc116a05df1e5 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 24 Apr 2025 11:40:02 -0400 Subject: [PATCH 02/11] docs and force flag --- src/client-side-encryption/auto_encrypter.ts | 4 ++-- src/encrypter.ts | 13 +++---------- src/mongo_client.ts | 18 +++++++++--------- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/src/client-side-encryption/auto_encrypter.ts b/src/client-side-encryption/auto_encrypter.ts index 18e2b62cc4a..6f64495c1de 100644 --- a/src/client-side-encryption/auto_encrypter.ts +++ b/src/client-side-encryption/auto_encrypter.ts @@ -375,8 +375,8 @@ export class AutoEncrypter { /** * Cleans up the `_mongocryptdClient`, if present. */ - async teardown(force: boolean): Promise { - await this._mongocryptdClient?.close(force); + async teardown(): Promise { + await this._mongocryptdClient?.close(); } /** diff --git a/src/encrypter.ts b/src/encrypter.ts index 3c7bf2aaed6..f4c9659d220 100644 --- a/src/encrypter.ts +++ b/src/encrypter.ts @@ -1,11 +1,8 @@ -import { callbackify } from 'util'; - import { AutoEncrypter, type AutoEncryptionOptions } from './client-side-encryption/auto_encrypter'; import { MONGO_CLIENT_EVENTS } from './constants'; import { getMongoDBClientEncryption } from './deps'; import { MongoInvalidArgumentError, MongoMissingDependencyError } from './error'; import { MongoClient, type MongoClientOptions } from './mongo_client'; -import { type Callback } from './utils'; /** @internal */ export interface EncrypterOptions { @@ -98,20 +95,16 @@ export class Encrypter { } } - closeCallback(client: MongoClient, force: boolean, callback: Callback) { - callbackify(this.close.bind(this))(client, force, callback); - } - - async close(client: MongoClient, force: boolean): Promise { + async close(client: MongoClient): Promise { let error; try { - await this.autoEncrypter.teardown(force); + await this.autoEncrypter.teardown(); } catch (autoEncrypterError) { error = autoEncrypterError; } const internalClient = this.internalClient; if (internalClient != null && client !== internalClient) { - return await internalClient.close(force); + return await internalClient.close(); } if (error != null) { throw error; diff --git a/src/mongo_client.ts b/src/mongo_client.ts index f4bda2b5db6..fe3097c502f 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -645,23 +645,23 @@ export class MongoClient extends TypedEventEmitter implements * * This includes: * - * - Closes all open, unused connections (see note). + * - Closes in-use connections. + * - Closes all active cursors. * - Ends all in-use sessions with {@link ClientSession#endSession|ClientSession.endSession()}. + * - aborts in progress transactions if is one related to the session. * - Ends all unused sessions server-side. + * - Closes all remaining idle connections. * - Cleans up any resources being used for auto encryption if auto encryption is enabled. * - * @remarks Any in-progress operations are not killed and any connections used by in progress operations - * will be cleaned up lazily as operations finish. - * - * @param force - Force close, emitting no events + * @param _force - currently an unused flag that has no effect. Defaults to `false`. */ - async close(force = false): Promise { + async close(_force = false): Promise { if (this.closeLock) { return await this.closeLock; } try { - this.closeLock = this._close(force); + this.closeLock = this._close(); await this.closeLock; } finally { // release @@ -670,7 +670,7 @@ export class MongoClient extends TypedEventEmitter implements } /* @internal */ - private async _close(force = false): Promise { + private async _close(): Promise { // There's no way to set hasBeenClosed back to false Object.defineProperty(this.s, 'hasBeenClosed', { value: true, @@ -726,7 +726,7 @@ export class MongoClient extends TypedEventEmitter implements const { encrypter } = this.options; if (encrypter) { - await encrypter.close(this, force); + await encrypter.close(this); } } From 5ea0fc7e406ccdd3df7338c3ae6b0f527a6978e9 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 24 Apr 2025 11:41:27 -0400 Subject: [PATCH 03/11] chore: close naming --- src/client-side-encryption/auto_encrypter.ts | 2 +- src/encrypter.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client-side-encryption/auto_encrypter.ts b/src/client-side-encryption/auto_encrypter.ts index 6f64495c1de..caca83e367f 100644 --- a/src/client-side-encryption/auto_encrypter.ts +++ b/src/client-side-encryption/auto_encrypter.ts @@ -375,7 +375,7 @@ export class AutoEncrypter { /** * Cleans up the `_mongocryptdClient`, if present. */ - async teardown(): Promise { + async close(): Promise { await this._mongocryptdClient?.close(); } diff --git a/src/encrypter.ts b/src/encrypter.ts index f4c9659d220..5a627ea67e9 100644 --- a/src/encrypter.ts +++ b/src/encrypter.ts @@ -98,7 +98,7 @@ export class Encrypter { async close(client: MongoClient): Promise { let error; try { - await this.autoEncrypter.teardown(); + await this.autoEncrypter.close(); } catch (autoEncrypterError) { error = autoEncrypterError; } From a733ed790cfc37e74f64084f915549d85375917e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 25 Apr 2025 12:47:56 -0400 Subject: [PATCH 04/11] chore: remove ConnectionPoolClosedError --- src/cmap/connection_pool.ts | 4 ++-- src/error.ts | 28 ---------------------- src/index.ts | 1 - test/integration/crud/misc_cursors.test.js | 4 ++-- test/unit/index.test.ts | 1 - 5 files changed, 4 insertions(+), 34 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 8ff775c34dc..a214805bfbc 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -17,7 +17,7 @@ import { } from '../constants'; import { type AnyError, - ConnectionPoolClosedError, + MongoClientClosedError, type MongoError, MongoInvalidArgumentError, MongoMissingCredentialsError, @@ -496,7 +496,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(this, conn, 'poolClosed') ); - conn.onError(new ConnectionPoolClosedError()); + conn.onError(new MongoClientClosedError()); } } diff --git a/src/error.ts b/src/error.ts index 73e9f95a76e..f74c4d41a28 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1046,34 +1046,6 @@ export class MongoClientClosedError extends MongoAPIError { } } -/** - * An error generated when a ConnectionPool is closed and async - * operations are interrupted. - * - * @public - * @category Error - */ -export class ConnectionPoolClosedError extends MongoAPIError { - /** - * **Do not use this constructor!** - * - * Meant for internal use only. - * - * @remarks - * This class is only meant to be constructed within the driver. This constructor is - * not subject to semantic versioning compatibility guarantees and may change at any time. - * - * @public - **/ - constructor(message = 'ConnectionPool is closed') { - super(message); - } - - override get name(): string { - return 'ConnectionPoolClosedError'; - } -} - /** @public */ export interface MongoNetworkErrorOptions { /** Indicates the timeout happened before a connection handshake completed */ diff --git a/src/index.ts b/src/index.ts index 9d4365ee385..b8865042958 100644 --- a/src/index.ts +++ b/src/index.ts @@ -45,7 +45,6 @@ export { export { ClientEncryption } from './client-side-encryption/client_encryption'; export { ChangeStreamCursor } from './cursor/change_stream_cursor'; export { - ConnectionPoolClosedError, MongoAPIError, MongoAWSError, MongoAzureError, diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 4ce1861d71d..18462c47471 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -11,7 +11,7 @@ const { Writable } = require('stream'); const { once, on } = require('events'); const { setTimeout } = require('timers'); const { ReadPreference } = require('../../mongodb'); -const { ServerType, ConnectionPoolClosedError } = require('../../mongodb'); +const { ServerType, MongoClientClosedError } = require('../../mongodb'); const { formatSort } = require('../../mongodb'); describe('Cursor', function () { @@ -1873,7 +1873,7 @@ describe('Cursor', function () { const error = await rejectedEarlyBecauseClientClosed; if (this.configuration.topologyType === 'LoadBalanced') { - expect(error).to.be.instanceOf(ConnectionPoolClosedError); + expect(error).to.be.instanceOf(MongoClientClosedError); } else { expect(error).to.be.null; } diff --git a/test/unit/index.test.ts b/test/unit/index.test.ts index 463b1719eef..653792492ce 100644 --- a/test/unit/index.test.ts +++ b/test/unit/index.test.ts @@ -36,7 +36,6 @@ const EXPECTED_EXPORTS = [ 'ConnectionClosedEvent', 'ConnectionCreatedEvent', 'ConnectionPoolClearedEvent', - 'ConnectionPoolClosedError', 'ConnectionPoolClosedEvent', 'ConnectionPoolCreatedEvent', 'ConnectionPoolMonitoringEvent', From b31b403ce4d798bc53fae6d1aaf5e903ea073d0c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 28 Apr 2025 12:16:22 -0400 Subject: [PATCH 05/11] chore: add test to demonstrate check in --- .../connection_pool.test.ts | 74 ++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts b/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts index 437961b1201..0f29220abc4 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts @@ -2,7 +2,14 @@ import { once } from 'node:events'; import { expect } from 'chai'; -import { type ConnectionPoolCreatedEvent, type Db, type MongoClient } from '../../mongodb'; +import { + type ConnectionCheckedInEvent, + type ConnectionCheckedOutEvent, + type ConnectionPoolCreatedEvent, + type Db, + type MongoClient +} from '../../mongodb'; +import { clearFailPoint, configureFailPoint, sleep } from '../../tools/utils'; describe('Connection Pool', function () { let client: MongoClient; @@ -64,5 +71,70 @@ describe('Connection Pool', function () { }); }); }); + + describe( + 'ConnectionCheckedInEvent', + { requires: { mongodb: '>=4.4', topology: 'single' } }, + function () { + let client: MongoClient; + + beforeEach(async function () { + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS: 500 + } + }); + + client = this.configuration.newClient(); + await client.connect(); + await Promise.all(Array.from({ length: 100 }, () => client.db().command({ ping: 1 }))); + }); + + afterEach(async function () { + await clearFailPoint(this.configuration); + await client.close(); + }); + + describe('when a MongoClient is closed', function () { + it( + 'a connection pool emits checked in events for closed connections', + { requires: { mongodb: '>=4.4', topology: 'single' } }, + async () => { + const connectionCheckedOutEvents: ConnectionCheckedOutEvent[] = []; + client.on('connectionCheckedOut', event => connectionCheckedOutEvents.push(event)); + const connectionCheckedInEvents: ConnectionCheckedInEvent[] = []; + client.on('connectionCheckedIn', event => connectionCheckedInEvents.push(event)); + + const inserts = Promise.allSettled([ + client.db('test').collection('test').insertOne({ a: 1 }), + client.db('test').collection('test').insertOne({ a: 1 }), + client.db('test').collection('test').insertOne({ a: 1 }) + ]); + + // wait until all pings are pending on the server + while (connectionCheckedOutEvents.length < 3) await sleep(1); + + const insertConnectionIds = connectionCheckedOutEvents.map( + ({ address, connectionId }) => `${address} + ${connectionId}` + ); + + await client.close(); + + const insertCheckIns = connectionCheckedInEvents.filter(({ address, connectionId }) => + insertConnectionIds.includes(`${address} + ${connectionId}`) + ); + + expect(insertCheckIns).to.have.lengthOf(3); + + await inserts; + } + ); + }); + } + ); }); }); From f376ce7690fada99416ce8023f527de0e8d06b78 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 30 Apr 2025 14:13:48 -0400 Subject: [PATCH 06/11] test: check-in before close --- src/cmap/connection_pool.ts | 4 -- .../connection_pool.test.ts | 44 +++++++++++-------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index a214805bfbc..e5cfdcd6aeb 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -492,10 +492,6 @@ export class ConnectionPool extends TypedEventEmitter { closeCheckedOutConnections() { for (const conn of this.checkedOut) { - this.emitAndLog( - ConnectionPool.CONNECTION_CLOSED, - new ConnectionClosedEvent(this, conn, 'poolClosed') - ); conn.onError(new MongoClientClosedError()); } } diff --git a/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts b/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts index 0f29220abc4..99d792ccf4d 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts @@ -2,13 +2,7 @@ import { once } from 'node:events'; import { expect } from 'chai'; -import { - type ConnectionCheckedInEvent, - type ConnectionCheckedOutEvent, - type ConnectionPoolCreatedEvent, - type Db, - type MongoClient -} from '../../mongodb'; +import { type ConnectionPoolCreatedEvent, type Db, type MongoClient } from '../../mongodb'; import { clearFailPoint, configureFailPoint, sleep } from '../../tools/utils'; describe('Connection Pool', function () { @@ -104,10 +98,13 @@ describe('Connection Pool', function () { 'a connection pool emits checked in events for closed connections', { requires: { mongodb: '>=4.4', topology: 'single' } }, async () => { - const connectionCheckedOutEvents: ConnectionCheckedOutEvent[] = []; - client.on('connectionCheckedOut', event => connectionCheckedOutEvents.push(event)); - const connectionCheckedInEvents: ConnectionCheckedInEvent[] = []; - client.on('connectionCheckedIn', event => connectionCheckedInEvents.push(event)); + const allClientEvents = []; + const pushToClientEvents = e => allClientEvents.push(e); + + client + .on('connectionCheckedOut', pushToClientEvents) + .on('connectionCheckedIn', pushToClientEvents) + .on('connectionClosed', pushToClientEvents); const inserts = Promise.allSettled([ client.db('test').collection('test').insertOne({ a: 1 }), @@ -116,19 +113,28 @@ describe('Connection Pool', function () { ]); // wait until all pings are pending on the server - while (connectionCheckedOutEvents.length < 3) await sleep(1); + while (allClientEvents.filter(e => e.name === 'connectionCheckedOut').length < 3) { + await sleep(1); + } - const insertConnectionIds = connectionCheckedOutEvents.map( - ({ address, connectionId }) => `${address} + ${connectionId}` - ); + const insertConnectionIds = allClientEvents + .filter(e => e.name === 'connectionCheckedOut') + .map(({ address, connectionId }) => `${address} + ${connectionId}`); await client.close(); - const insertCheckIns = connectionCheckedInEvents.filter(({ address, connectionId }) => - insertConnectionIds.includes(`${address} + ${connectionId}`) - ); + const insertCheckInAndCloses = allClientEvents + .filter(e => e.name === 'connectionCheckedIn' || e.name === 'connectionClosed') + .filter(({ address, connectionId }) => + insertConnectionIds.includes(`${address} + ${connectionId}`) + ); - expect(insertCheckIns).to.have.lengthOf(3); + expect(insertCheckInAndCloses).to.have.lengthOf(6); + + // check that each check-in is followed by a close (not proceeded by one) + expect(insertCheckInAndCloses.map(e => e.name)).to.deep.equal( + Array.from({ length: 3 }, () => ['connectionCheckedIn', 'connectionClosed']).flat(1) + ); await inserts; } From 6674aa8939b9e35a267e269885878679b23240c3 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 30 Apr 2025 15:04:33 -0400 Subject: [PATCH 07/11] chore: update error message --- src/error.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/error.ts b/src/error.ts index f74c4d41a28..80b7f760f65 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1037,7 +1037,7 @@ export class MongoClientClosedError extends MongoAPIError { * * @public **/ - constructor(message = 'MongoClient is closed') { + constructor(message = 'Operation interrupted because client was closed') { super(message); } From f04fbd4e37df864e02664e9c1c1af6717272a507 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 30 Apr 2025 15:18:19 -0400 Subject: [PATCH 08/11] chore: error and checkin --- src/cmap/connection_pool.ts | 4 ++-- src/error.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index e5cfdcd6aeb..00321ec2342 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -485,12 +485,12 @@ export class ConnectionPool extends TypedEventEmitter { for (const connection of this.checkedOut) { if (connection.generation <= minGeneration) { connection.onError(new PoolClearedOnNetworkError(this)); - this.checkIn(connection); } } } - closeCheckedOutConnections() { + /** For MongoClient.close() procedures */ + public closeCheckedOutConnections() { for (const conn of this.checkedOut) { conn.onError(new MongoClientClosedError()); } diff --git a/src/error.ts b/src/error.ts index 80b7f760f65..08e4b86d949 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1037,8 +1037,8 @@ export class MongoClientClosedError extends MongoAPIError { * * @public **/ - constructor(message = 'Operation interrupted because client was closed') { - super(message); + constructor() { + super('Operation interrupted because client was closed'); } override get name(): string { From ffed234c40e7b4324ef79a5a459da592542d8deb Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 30 Apr 2025 16:05:07 -0400 Subject: [PATCH 09/11] test: deadlock if in-use connections not closed --- .../node-specific/mongo_client.test.ts | 52 ++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 8dccb079014..522d1851ccb 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -12,13 +12,14 @@ import { Db, getTopology, MongoClient, + MongoClientClosedError, MongoNotConnectedError, MongoServerSelectionError, ReadPreference, ServerDescription, Topology } from '../../mongodb'; -import { runLater } from '../../tools/utils'; +import { clearFailPoint, configureFailPoint, runLater } from '../../tools/utils'; import { setupDatabase } from '../shared'; describe('class MongoClient', function () { @@ -1064,6 +1065,55 @@ describe('class MongoClient', function () { expect(client.s.activeCursors).to.have.lengthOf(1); }); }); + + describe( + 'maxPoolSize is not fully used when running clean up operations', + { requires: { mongodb: '>=4.4', topology: 'single' } }, + function () { + let client; + + beforeEach(async function () { + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS: 500 + } + }); + + client = this.configuration.newClient({}, { maxPoolSize: 1, monitorCommands: true }); + }); + + afterEach(async function () { + await clearFailPoint(this.configuration); + await client.close(); + }); + + it( + 'closes in-use connections before running clean up operations avoiding a deadlock', + { requires: { mongodb: '>=4.4', topology: 'single' } }, + async () => { + const inserted = client + .db('t') + .collection('t') + .insertOne({ a: 1 }) + .catch(error => error); + + await once(client, 'commandStarted'); + + const start = performance.now(); + await client.close(); + const error = await inserted; + const end = performance.now(); + + expect(end - start).to.be.lessThan(100); + expect(error).to.be.instanceOf(MongoClientClosedError); + } + ); + } + ); }); context('when connecting', function () { From 5082c668d909fec91c385ff0fdd2bbb6f0d4bd12 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 30 Apr 2025 16:27:47 -0400 Subject: [PATCH 10/11] metadata filter not working? --- .../connection_pool.test.ts | 145 +++++++++--------- .../node-specific/mongo_client.test.ts | 16 +- 2 files changed, 88 insertions(+), 73 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts b/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts index 99d792ccf4d..a765f42afe2 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts @@ -66,81 +66,88 @@ describe('Connection Pool', function () { }); }); - describe( - 'ConnectionCheckedInEvent', - { requires: { mongodb: '>=4.4', topology: 'single' } }, - function () { - let client: MongoClient; + const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: 'single' } }; + + describe('ConnectionCheckedInEvent', metadata, function () { + let client: MongoClient; + + beforeEach(async function () { + if (!this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) { + return; + } + if (!this.configuration.filters.MongoDBTopologyFilter.filter({ metadata })) { + return; + } + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS: 500 + } + }); - beforeEach(async function () { - await configureFailPoint(this.configuration, { - configureFailPoint: 'failCommand', - mode: 'alwaysOn', - data: { - failCommands: ['insert'], - blockConnection: true, - blockTimeMS: 500 + client = this.configuration.newClient(); + await client.connect(); + await Promise.all(Array.from({ length: 100 }, () => client.db().command({ ping: 1 }))); + }); + + afterEach(async function () { + if (this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) { + await clearFailPoint(this.configuration); + } + await client.close(); + }); + + describe('when a MongoClient is closed', function () { + it( + 'a connection pool emits checked in events for closed connections', + metadata, + async () => { + const allClientEvents = []; + const pushToClientEvents = e => allClientEvents.push(e); + + client + .on('connectionCheckedOut', pushToClientEvents) + .on('connectionCheckedIn', pushToClientEvents) + .on('connectionClosed', pushToClientEvents); + + const inserts = Promise.allSettled([ + client.db('test').collection('test').insertOne({ a: 1 }), + client.db('test').collection('test').insertOne({ a: 1 }), + client.db('test').collection('test').insertOne({ a: 1 }) + ]); + + // wait until all pings are pending on the server + while (allClientEvents.filter(e => e.name === 'connectionCheckedOut').length < 3) { + await sleep(1); } - }); - client = this.configuration.newClient(); - await client.connect(); - await Promise.all(Array.from({ length: 100 }, () => client.db().command({ ping: 1 }))); - }); + const insertConnectionIds = allClientEvents + .filter(e => e.name === 'connectionCheckedOut') + .map(({ address, connectionId }) => `${address} + ${connectionId}`); - afterEach(async function () { - await clearFailPoint(this.configuration); - await client.close(); - }); + await client.close(); - describe('when a MongoClient is closed', function () { - it( - 'a connection pool emits checked in events for closed connections', - { requires: { mongodb: '>=4.4', topology: 'single' } }, - async () => { - const allClientEvents = []; - const pushToClientEvents = e => allClientEvents.push(e); - - client - .on('connectionCheckedOut', pushToClientEvents) - .on('connectionCheckedIn', pushToClientEvents) - .on('connectionClosed', pushToClientEvents); - - const inserts = Promise.allSettled([ - client.db('test').collection('test').insertOne({ a: 1 }), - client.db('test').collection('test').insertOne({ a: 1 }), - client.db('test').collection('test').insertOne({ a: 1 }) - ]); - - // wait until all pings are pending on the server - while (allClientEvents.filter(e => e.name === 'connectionCheckedOut').length < 3) { - await sleep(1); - } - - const insertConnectionIds = allClientEvents - .filter(e => e.name === 'connectionCheckedOut') - .map(({ address, connectionId }) => `${address} + ${connectionId}`); - - await client.close(); - - const insertCheckInAndCloses = allClientEvents - .filter(e => e.name === 'connectionCheckedIn' || e.name === 'connectionClosed') - .filter(({ address, connectionId }) => - insertConnectionIds.includes(`${address} + ${connectionId}`) - ); - - expect(insertCheckInAndCloses).to.have.lengthOf(6); - - // check that each check-in is followed by a close (not proceeded by one) - expect(insertCheckInAndCloses.map(e => e.name)).to.deep.equal( - Array.from({ length: 3 }, () => ['connectionCheckedIn', 'connectionClosed']).flat(1) + const insertCheckInAndCloses = allClientEvents + .filter(e => e.name === 'connectionCheckedIn' || e.name === 'connectionClosed') + .filter(({ address, connectionId }) => + insertConnectionIds.includes(`${address} + ${connectionId}`) ); - await inserts; - } - ); - }); - } - ); + expect(insertCheckInAndCloses).to.have.lengthOf(6); + + // check that each check-in is followed by a close (not proceeded by one) + expect(insertCheckInAndCloses.map(e => e.name)).to.deep.equal( + Array.from({ length: 3 }, () => ['connectionCheckedIn', 'connectionClosed']).flat(1) + ); + + await inserts; + } + ); + }); + }); }); }); diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 522d1851ccb..06bb58f83cf 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -1066,13 +1066,22 @@ describe('class MongoClient', function () { }); }); + const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: 'single' } }; + describe( 'maxPoolSize is not fully used when running clean up operations', - { requires: { mongodb: '>=4.4', topology: 'single' } }, + metadata, function () { let client; beforeEach(async function () { + if (!this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) { + return; + } + if (!this.configuration.filters.MongoDBTopologyFilter.filter({ metadata })) { + return; + } + await configureFailPoint(this.configuration, { configureFailPoint: 'failCommand', mode: 'alwaysOn', @@ -1093,7 +1102,7 @@ describe('class MongoClient', function () { it( 'closes in-use connections before running clean up operations avoiding a deadlock', - { requires: { mongodb: '>=4.4', topology: 'single' } }, + metadata, async () => { const inserted = client .db('t') @@ -1105,11 +1114,10 @@ describe('class MongoClient', function () { const start = performance.now(); await client.close(); - const error = await inserted; + await inserted; const end = performance.now(); expect(end - start).to.be.lessThan(100); - expect(error).to.be.instanceOf(MongoClientClosedError); } ); } From a46ca1da9afc2b8fe4f1aa2f1cf9f7f2acf6387c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 2 May 2025 12:49:16 -0400 Subject: [PATCH 11/11] test: make sure the getMore is started --- test/integration/crud/misc_cursors.test.js | 52 +++++++++++-------- .../node-specific/mongo_client.test.ts | 1 - 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 18462c47471..c558ec57978 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1848,36 +1848,44 @@ describe('Cursor', function () { } }); - it('closes cursors when client is closed even if it has not been exhausted', async function () { - await client - .db() - .dropCollection('test_cleanup_tailable') - .catch(() => null); + it( + 'closes cursors when client is closed even if it has not been exhausted', + { requires: { topology: '!replicaset' } }, + async function () { + await client + .db() + .dropCollection('test_cleanup_tailable') + .catch(() => null); - const collection = await client - .db() - .createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 }); + const collection = await client + .db() + .createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 }); - // insert only 2 docs in capped coll of 3 - await collection.insertMany([{ a: 1 }, { a: 1 }]); + // insert only 2 docs in capped coll of 3 + await collection.insertMany([{ a: 1 }, { a: 1 }]); - const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 }); + const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 }); - await cursor.next(); - await cursor.next(); - // will block for maxAwaitTimeMS (except we are closing the client) - const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error); + await cursor.next(); + await cursor.next(); - await client.close(); - expect(cursor).to.have.property('closed', true); + const nextCommand = once(client, 'commandStarted'); + // will block for maxAwaitTimeMS (except we are closing the client) + const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error); + + for ( + let [{ commandName }] = await nextCommand; + commandName !== 'getMore'; + [{ commandName }] = await once(client, 'commandStarted') + ); - const error = await rejectedEarlyBecauseClientClosed; - if (this.configuration.topologyType === 'LoadBalanced') { + await client.close(); + expect(cursor).to.have.property('closed', true); + + const error = await rejectedEarlyBecauseClientClosed; expect(error).to.be.instanceOf(MongoClientClosedError); - } else { - expect(error).to.be.null; } - }); + ); it('shouldAwaitData', { // Add a tag that our runner can trigger on diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 06bb58f83cf..dd3d012fdb8 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -12,7 +12,6 @@ import { Db, getTopology, MongoClient, - MongoClientClosedError, MongoNotConnectedError, MongoServerSelectionError, ReadPreference,