From 944e4b1414e40f925445b8263e0b9525735f7502 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 14 Feb 2023 12:21:08 -0500 Subject: [PATCH 1/2] make DestroyOptions required on connection.destroy --- src/cmap/connect.ts | 2 +- src/cmap/connection.ts | 10 +-- src/cmap/connection_pool.ts | 4 +- src/sdam/server.ts | 5 +- src/sdam/topology.ts | 17 ++--- test/integration/crud/misc_cursors.test.js | 69 ++++++------------- .../node-specific/topology.test.js | 18 +++-- ...records_for_mongos_discovery.prose.test.ts | 2 +- .../assorted/server_selection_spec_helper.js | 2 +- test/unit/error.test.ts | 2 +- test/unit/sdam/monitor.test.ts | 4 +- test/unit/sdam/topology.test.js | 18 ++--- 12 files changed, 60 insertions(+), 93 deletions(-) diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index e3f44cc9101..a69289befaa 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -96,7 +96,7 @@ function performInitialHandshake( ) { const callback: Callback = function (err, ret) { if (err && conn) { - conn.destroy(); + conn.destroy({ force: false }); } _callback(err, ret); }; diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index b3155239233..c35f5e53918 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -133,7 +133,7 @@ export interface ConnectionOptions /** @internal */ export interface DestroyOptions { /** Force the destruction. */ - force?: boolean; + force: boolean; } /** @public */ @@ -443,16 +443,10 @@ export class Connection extends TypedEventEmitter { callback(undefined, message.documents[0]); } - destroy(options?: DestroyOptions, callback?: Callback): void { - if (typeof options === 'function') { - callback = options; - options = { force: false }; - } - + destroy(options: DestroyOptions, callback?: Callback): void { this.removeAllListeners(Connection.PINNED); this.removeAllListeners(Connection.UNPINNED); - options = Object.assign({ force: false }, options); if (this[kStream] == null || this.destroyed) { this.destroyed = true; if (typeof callback === 'function') { diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index e52fc9081d4..ca3270db5c6 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -510,7 +510,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(this, conn, 'poolClosed') ); - conn.destroy(options, cb); + conn.destroy({ force: !!options.force }, cb); }, err => { this[kConnections].clear(); @@ -586,7 +586,7 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionClosedEvent(this, connection, reason) ); // destroy the connection - process.nextTick(() => connection.destroy()); + process.nextTick(() => connection.destroy({ force: false })); } private connectionIsStale(connection: Connection) { diff --git a/src/sdam/server.ts b/src/sdam/server.ts index e9679de9a2a..4ed6c1cb9c9 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -241,7 +241,10 @@ export class Server extends TypedEventEmitter { /** Destroy the server connection */ destroy(options?: DestroyOptions, callback?: Callback): void { - if (typeof options === 'function') (callback = options), (options = {}); + if (typeof options === 'function') { + callback = options; + options = { force: false }; + } options = Object.assign({}, { force: false }, options); if (this.s.state === STATE_CLOSED) { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index a8623269133..3b447a6f00c 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -466,26 +466,17 @@ export class Topology extends TypedEventEmitter { } /** Close this topology */ - close(callback: Callback): void; close(options: CloseOptions): void; close(options: CloseOptions, callback: Callback): void; - close(options?: CloseOptions | Callback, callback?: Callback): void { - if (typeof options === 'function') { - callback = options; - options = {}; - } - - if (typeof options === 'boolean') { - options = { force: options }; - } - options = options ?? {}; + close(options?: CloseOptions, callback?: Callback): void { + options = options ?? { force: false }; if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { return callback?.(); } const destroyedServers = Array.from(this.s.servers.values(), server => { - return promisify(destroyServer)(server, this, options as CloseOptions); + return promisify(destroyServer)(server, this, { force: !!options?.force }); }); Promise.all(destroyedServers) @@ -740,7 +731,7 @@ function destroyServer( options?: DestroyOptions, callback?: Callback ) { - options = options ?? {}; + options = options ?? { force: false }; for (const event of LOCAL_SERVER_EVENTS) { server.removeAllListeners(event); } diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 87e888ac375..d00ea10e3df 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -14,6 +14,7 @@ const { ReadPreference } = require('../../mongodb'); const { ServerType } = require('../../mongodb'); const { formatSort } = require('../../mongodb'); const { getSymbolFrom } = require('../../tools/utils'); +const { MongoExpiredSessionError } = require('../../../src/error'); describe('Cursor', function () { before(function () { @@ -1852,61 +1853,31 @@ describe('Cursor', function () { } }); - it('should close dead tailable cursors', { - metadata: { - os: '!win32' // NODE-2943: timeout on windows - }, - - test: function (done) { - // http://www.mongodb.org/display/DOCS/Tailable+Cursors - - const configuration = this.configuration; - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); - - const db = client.db(configuration.db); - const options = { capped: true, size: 10000000 }; - db.createCollection( - 'test_if_dead_tailable_cursors_close', - options, - function (err, collection) { - expect(err).to.not.exist; + it('closes cursors when client is closed even if it has not been exhausted', async function () { + await client + .db() + .dropCollection('test_cleanup_tailable') + .catch(() => null); - let closeCount = 0; - const docs = Array.from({ length: 100 }).map(() => ({ a: 1 })); - collection.insertMany(docs, { w: 'majority', wtimeoutMS: 5000 }, err => { - expect(err).to.not.exist; - - const cursor = collection.find({}, { tailable: true, awaitData: true }); - const stream = cursor.stream(); + const collection = await client + .db() + .createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 }); - stream.resume(); - - var validator = () => { - closeCount++; - if (closeCount === 2) { - done(); - } - }; + // insert only 2 docs in capped coll of 3 + await collection.insertMany([{ a: 1 }, { a: 1 }]); - // we validate that the stream "ends" either cleanly or with an error - stream.on('end', validator); - stream.on('error', validator); + const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 }); - cursor.on('close', validator); + await cursor.next(); + await cursor.next(); + // will block for maxAwaitTimeMS (except we are closing the client) + const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error); - const docs = Array.from({ length: 100 }).map(() => ({ a: 1 })); - collection.insertMany(docs, err => { - expect(err).to.not.exist; + await client.close(); + expect(cursor).to.have.property('killed', true); - setTimeout(() => client.close()); - }); - }); - } - ); - }); - } + const error = await rejectedEarlyBecauseClientClosed; + expect(error).to.be.instanceOf(MongoExpiredSessionError); }); it('shouldAwaitData', { diff --git a/test/integration/node-specific/topology.test.js b/test/integration/node-specific/topology.test.js index ee806b96912..912c1443c49 100644 --- a/test/integration/node-specific/topology.test.js +++ b/test/integration/node-specific/topology.test.js @@ -10,12 +10,20 @@ describe('Topology', function () { const states = []; topology.on('stateChanged', (_, newState) => states.push(newState)); topology.connect(err => { - expect(err).to.not.exist; - topology.close(err => { + try { expect(err).to.not.exist; - expect(topology.isDestroyed()).to.be.true; - expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']); - done(); + } catch (error) { + done(error); + } + topology.close({}, err => { + try { + expect(err).to.not.exist; + expect(topology.isDestroyed()).to.be.true; + expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']); + done(); + } catch (error) { + done(error); + } }); }); } diff --git a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts index 27e680fc751..8d8b2b8412c 100644 --- a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts +++ b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts @@ -103,7 +103,7 @@ describe('Polling Srv Records for Mongos Discovery', () => { afterEach(function (done) { if (context.topology) { - context.topology.close(done); + context.topology.close({}, done); } else { done(); } diff --git a/test/unit/assorted/server_selection_spec_helper.js b/test/unit/assorted/server_selection_spec_helper.js index 7c895eba5df..32b2147eb14 100644 --- a/test/unit/assorted/server_selection_spec_helper.js +++ b/test/unit/assorted/server_selection_spec_helper.js @@ -109,7 +109,7 @@ function executeServerSelectionTest(testDefinition, testDone) { }); function done(err) { - topology.close(e => testDone(e || err)); + topology.close({}, e => testDone(e || err)); } topology.connect(err => { diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index d0cd53bb22f..0a6b05cd99f 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -381,7 +381,7 @@ describe('MongoErrors', () => { makeAndConnectReplSet((err, topology) => { // cleanup the server before calling done - const cleanup = err => topology.close(err2 => done(err || err2)); + const cleanup = err => topology.close({}, err2 => done(err || err2)); if (err) { return cleanup(err); diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index 497183ecb62..ecb62a5de90 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -56,7 +56,7 @@ describe('monitoring', function () { const serverDescription = Array.from(topology.description.servers.values())[0]; expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); - topology.close(done as any); + topology.close({}, done as any); }, 500); }); }).skipReason = 'TODO(NODE-3819): Unskip flaky tests'; @@ -96,7 +96,7 @@ describe('monitoring', function () { const serverDescription = Array.from(topology.description.servers.values())[0]; expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); - topology.close(done); + topology.close({}, done); }); }).skipReason = 'TODO(NODE-3600): Unskip flaky tests'; diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index 5ca93bd3d47..68c52b878ab 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -26,7 +26,7 @@ describe('Topology (unit)', function () { } if (topology) { - topology.close(); + topology.close({}); } }); @@ -107,7 +107,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.true; - topology.close(done); + topology.close({}, done); }); }); @@ -127,7 +127,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.false; - topology.close(done); + topology.close({}, done); }); }); @@ -147,7 +147,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.false; - topology.close(done); + topology.close({}, done); }); }); }); @@ -182,7 +182,7 @@ describe('Topology (unit)', function () { expect(err).to.exist; expect(err).to.match(/timed out/); - topology.close(done); + topology.close({}, done); }); }); }); @@ -325,7 +325,7 @@ describe('Topology (unit)', function () { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; - topology.close(done); + topology.close({}, done); }); }); }); @@ -467,7 +467,7 @@ describe('Topology (unit)', function () { it('should clean up listeners on close', function (done) { topology.s.state = 'connected'; // fake state to test clean up logic - topology.close(e => { + topology.close({}, e => { const srvPollerListeners = topology.s.srvPoller.listeners( SrvPoller.SRV_RECORD_DISCOVERY ); @@ -547,7 +547,7 @@ describe('Topology (unit)', function () { // occurs `requestCheck` will be called for an immediate check. expect(requestCheck).property('callCount').to.equal(1); - topology.close(done); + topology.close({}, done); }); }); }); @@ -559,7 +559,7 @@ describe('Topology (unit)', function () { this.emit('connect'); }); - topology.close(() => { + topology.close({}, () => { topology.selectServer(ReadPreference.primary, { serverSelectionTimeoutMS: 2000 }, err => { expect(err).to.exist; expect(err).to.match(/Topology is closed/); From 7ba26d3a3d6c7360e9054145ffa0f67203ca4128 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 14 Feb 2023 12:45:28 -0500 Subject: [PATCH 2/2] chore: fix import --- test/integration/crud/misc_cursors.test.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index d00ea10e3df..e224dbd3c75 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -10,11 +10,10 @@ const sinon = require('sinon'); const { Writable } = require('stream'); const { once, on } = require('events'); const { setTimeout } = require('timers'); -const { ReadPreference } = require('../../mongodb'); +const { ReadPreference, MongoExpiredSessionError } = require('../../mongodb'); const { ServerType } = require('../../mongodb'); const { formatSort } = require('../../mongodb'); const { getSymbolFrom } = require('../../tools/utils'); -const { MongoExpiredSessionError } = require('../../../src/error'); describe('Cursor', function () { before(function () {