Skip to content

Commit 24bc347

Browse files
committed
fix(NODE-5993): connection's aborted promise leak
1 parent 28b7040 commit 24bc347

File tree

4 files changed

+152
-55
lines changed

4 files changed

+152
-55
lines changed

src/cmap/connection.ts

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { type Readable, Transform, type TransformCallback } from 'stream';
22
import { clearTimeout, setTimeout } from 'timers';
3-
import { promisify } from 'util';
43

54
import type { BSONSerializeOptions, Document, ObjectId } from '../bson';
65
import type { AutoEncrypter } from '../client-side-encryption/auto_encrypter';
@@ -182,18 +181,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
182181
* Once connection is established, command logging can log events (if enabled)
183182
*/
184183
public established: boolean;
184+
/** Indicates that the connection (including underlying TCP socket) has been closed. */
185+
public closed = false;
185186

186187
private lastUseTime: number;
187188
private clusterTime: Document | null = null;
189+
private error: Error | null = null;
190+
private dataEvents: AsyncGenerator<Buffer, void, void> | null = null;
188191

189192
private readonly socketTimeoutMS: number;
190193
private readonly monitorCommands: boolean;
191194
private readonly socket: Stream;
192-
private readonly controller: AbortController;
193-
private readonly signal: AbortSignal;
194195
private readonly messageStream: Readable;
195-
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>;
196-
private readonly aborted: Promise<never>;
197196

198197
/** @event */
199198
static readonly COMMAND_STARTED = COMMAND_STARTED;
@@ -213,6 +212,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
213212
constructor(stream: Stream, options: ConnectionOptions) {
214213
super();
215214

215+
this.socket = stream;
216216
this.id = options.id;
217217
this.address = streamIdentifier(stream, options);
218218
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
@@ -225,39 +225,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
225225
this.generation = options.generation;
226226
this.lastUseTime = now();
227227

228-
this.socket = stream;
229-
230-
// TODO: Remove signal from connection layer
231-
this.controller = new AbortController();
232-
const { signal } = this.controller;
233-
this.signal = signal;
234-
const { promise: aborted, reject } = promiseWithResolvers<never>();
235-
aborted.then(undefined, () => null); // Prevent unhandled rejection
236-
this.signal.addEventListener(
237-
'abort',
238-
function onAbort() {
239-
reject(signal.reason);
240-
},
241-
{ once: true }
242-
);
243-
this.aborted = aborted;
244-
245228
this.messageStream = this.socket
246229
.on('error', this.onError.bind(this))
247230
.pipe(new SizedMessageTransform({ connection: this }))
248231
.on('error', this.onError.bind(this));
249232
this.socket.on('close', this.onClose.bind(this));
250233
this.socket.on('timeout', this.onTimeout.bind(this));
251-
252-
const socketWrite = promisify(this.socket.write.bind(this.socket));
253-
this.socketWrite = async buffer => {
254-
return Promise.race([socketWrite(buffer), this.aborted]);
255-
};
256-
}
257-
258-
/** Indicates that the connection (including underlying TCP socket) has been closed. */
259-
public get closed(): boolean {
260-
return this.signal.aborted;
261234
}
262235

263236
public get hello() {
@@ -357,7 +330,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
357330
}
358331

359332
this.socket.destroy();
360-
this.controller.abort(error);
333+
if (error) {
334+
this.error = error;
335+
this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection
336+
}
337+
this.closed = true;
361338
this.emit(Connection.CLOSE);
362339
}
363340

@@ -598,7 +575,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
598575
}
599576

600577
private throwIfAborted() {
601-
this.signal.throwIfAborted();
578+
if (this.error) throw this.error;
602579
}
603580

604581
/**
@@ -621,7 +598,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
621598

622599
const buffer = Buffer.concat(await finalCommand.toBin());
623600

624-
return this.socketWrite(buffer);
601+
if (this.socket.write(buffer)) return;
602+
603+
const { promise: drained, resolve, reject } = promiseWithResolvers<void>();
604+
const onDrain = () => resolve();
605+
const onError = (error: Error) => reject(error);
606+
607+
this.socket.once('drain', onDrain).once('error', onError);
608+
try {
609+
return await drained;
610+
} finally {
611+
this.socket.off('drain', onDrain).off('error', onError);
612+
}
625613
}
626614

627615
/**
@@ -634,13 +622,19 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
634622
* Note that `for-await` loops call `return` automatically when the loop is exited.
635623
*/
636624
private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
637-
for await (const message of onData(this.messageStream, { signal: this.signal })) {
638-
const response = await decompressResponse(message);
639-
yield response;
625+
try {
626+
this.dataEvents = this.dataEvents = onData(this.messageStream);
627+
for await (const message of this.dataEvents) {
628+
const response = await decompressResponse(message);
629+
yield response;
640630

641-
if (!response.moreToCome) {
642-
return;
631+
if (!response.moreToCome) {
632+
return;
633+
}
643634
}
635+
} finally {
636+
this.dataEvents = null;
637+
this.throwIfAborted();
644638
}
645639
}
646640
}

src/cmap/wire_protocol/on_data.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ type PendingPromises = Omit<
1818
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
1919
* It will reject upon an error event or if the provided signal is aborted.
2020
*/
21-
export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) {
22-
const signal = options.signal;
23-
21+
export function onData(emitter: EventEmitter) {
2422
// Setup pending events and pending promise lists
2523
/**
2624
* When the caller has not yet called .next(), we store the
@@ -89,19 +87,8 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
8987
emitter.on('data', eventHandler);
9088
emitter.on('error', errorHandler);
9189

92-
if (signal.aborted) {
93-
// If the signal is aborted, set up the first .next() call to be a rejection
94-
queueMicrotask(abortListener);
95-
} else {
96-
signal.addEventListener('abort', abortListener, { once: true });
97-
}
98-
9990
return iterator;
10091

101-
function abortListener() {
102-
errorHandler(signal.reason);
103-
}
104-
10592
function eventHandler(value: Buffer) {
10693
const promise = unconsumedPromises.shift();
10794
if (promise != null) promise.resolve({ value, done: false });
@@ -119,7 +106,6 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
119106
// Adding event handlers
120107
emitter.off('data', eventHandler);
121108
emitter.off('error', errorHandler);
122-
signal.removeEventListener('abort', abortListener);
123109
finished = true;
124110
const doneResult = { value: undefined, done: finished } as const;
125111

test/integration/connection-monitoring-and-pooling/connection.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { expect } from 'chai';
2+
import { type EventEmitter, once } from 'events';
3+
import * as sinon from 'sinon';
4+
import { setTimeout } from 'timers';
25

36
import {
47
addContainerMetadata,
8+
Binary,
59
connect,
610
Connection,
711
type ConnectionOptions,
@@ -15,7 +19,9 @@ import {
1519
ServerHeartbeatStartedEvent,
1620
Topology
1721
} from '../../mongodb';
22+
import * as mock from '../../tools/mongodb-mock/index';
1823
import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration';
24+
import { getSymbolFrom, sleep } from '../../tools/utils';
1925
import { assert as test, setupDatabase } from '../shared';
2026

2127
const commonConnectOptions = {
@@ -200,6 +206,84 @@ describe('Connection', function () {
200206
client.connect();
201207
});
202208

209+
context(
210+
'when a large message is written to the socket',
211+
{ requires: { topology: 'single', auth: 'disabled' } },
212+
() => {
213+
let client, mockServer: import('../../tools/mongodb-mock/src/server').MockServer;
214+
215+
beforeEach(async function () {
216+
mockServer = await mock.createServer();
217+
218+
mockServer
219+
.addMessageHandler('insert', req => {
220+
setTimeout(() => {
221+
req.reply({ ok: 1 });
222+
}, 800);
223+
})
224+
.addMessageHandler('hello', req => {
225+
req.reply(Object.assign({}, mock.HELLO));
226+
})
227+
.addMessageHandler(LEGACY_HELLO_COMMAND, req => {
228+
req.reply(Object.assign({}, mock.HELLO));
229+
});
230+
231+
client = new MongoClient(`mongodb://${mockServer.uri()}`, {
232+
minPoolSize: 1,
233+
maxPoolSize: 1
234+
});
235+
});
236+
237+
afterEach(async function () {
238+
await client.close();
239+
mockServer.destroy();
240+
sinon.restore();
241+
});
242+
243+
it('waits for an async drain event because the write was buffered', async () => {
244+
const connectionReady = once(client, 'connectionReady');
245+
await client.connect();
246+
await connectionReady;
247+
248+
// Get the only connection
249+
const pool = [...client.topology.s.servers.values()][0].pool;
250+
251+
const connections = pool[getSymbolFrom(pool, 'connections')];
252+
expect(connections).to.have.lengthOf(1);
253+
254+
const connection = connections.first();
255+
const socket: EventEmitter = connection.socket;
256+
257+
// Spy on the socket event listeners
258+
const addedListeners: string[] = [];
259+
const removedListeners: string[] = [];
260+
socket
261+
.on('removeListener', name => removedListeners.push(name))
262+
.on('newListener', name => addedListeners.push(name));
263+
264+
// Make server sockets block
265+
for (const s of mockServer.sockets) s.pause();
266+
267+
const insert = client
268+
.db('test')
269+
.collection('test')
270+
// Anything above 16Kb should work I think (10mb to be extra sure)
271+
.insertOne({ a: new Binary(Buffer.alloc(10 * (2 ** 10) ** 2), 127) });
272+
273+
// Sleep a bit and unblock server sockets
274+
await sleep(10);
275+
for (const s of mockServer.sockets) s.resume();
276+
277+
// Let the operation finish
278+
await insert;
279+
280+
// Ensure that we used the drain event for this write
281+
expect(addedListeners).to.deep.equal(['drain', 'error']);
282+
expect(removedListeners).to.deep.equal(['drain', 'error']);
283+
});
284+
}
285+
);
286+
203287
context('when connecting with a username and password', () => {
204288
let utilClient: MongoClient;
205289
let client: MongoClient;

test/integration/node-specific/resource_clean_up.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import * as v8 from 'node:v8';
2+
13
import { expect } from 'chai';
24

5+
import { sleep } from '../../tools/utils';
36
import { runScript } from './resource_tracking_script_builder';
47

58
/**
@@ -86,4 +89,34 @@ describe('Driver Resources', () => {
8689
});
8790
});
8891
});
92+
93+
context('when 100s of operations are executed and complete', () => {
94+
beforeEach(function () {
95+
if (this.currentTest && typeof v8.queryObjects !== 'function') {
96+
this.currentTest.skipReason = 'Test requires v8.queryObjects API to count Promises';
97+
this.currentTest?.skip();
98+
}
99+
});
100+
101+
let client;
102+
beforeEach(async function () {
103+
client = this.configuration.newClient();
104+
});
105+
106+
afterEach(async function () {
107+
await client.close();
108+
});
109+
110+
it('does not leave behind additional promises', async () => {
111+
const test = client.db('test').collection('test');
112+
const promiseCountBefore = v8.queryObjects(Promise, { format: 'count' });
113+
for (let i = 0; i < 100; i++) {
114+
await test.findOne();
115+
}
116+
await sleep(10);
117+
const promiseCountAfter = v8.queryObjects(Promise, { format: 'count' });
118+
119+
expect(promiseCountAfter).to.be.within(promiseCountBefore - 5, promiseCountBefore + 5);
120+
});
121+
});
89122
});

0 commit comments

Comments
 (0)