Skip to content

Commit 0756bc9

Browse files
committed
feat: error on invalid queue size
1 parent f2c6e78 commit 0756bc9

File tree

2 files changed

+79
-17
lines changed

2 files changed

+79
-17
lines changed

src/cmap/connection.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
MongoMissingDependencyError,
1818
MongoNetworkError,
1919
MongoNetworkTimeoutError,
20+
MongoRuntimeError,
2021
MongoServerError,
2122
MongoWriteConcernError
2223
} from '../error';
@@ -68,6 +69,8 @@ const kAutoEncrypter = Symbol('autoEncrypter');
6869
/** @internal */
6970
const kDelayedTimeoutId = Symbol('delayedTimeoutId');
7071

72+
const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description';
73+
7174
/** @internal */
7275
export interface CommandOptions extends BSONSerializeOptions {
7376
command?: boolean;
@@ -374,15 +377,20 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
374377
if (!operationDescription && this.isMonitoringConnection) {
375378
// NODE-4783: How do we recover from this when the initial hello's requestId is not
376379
// the responseTo when hello responses have been skipped?
377-
//
378-
// Get the first orphaned operation description.
379-
const entry = this[kQueue].entries().next();
380-
if (entry) {
381-
const [requestId, orphaned]: [number, OperationDescription] = entry.value;
382-
// If the orphaned operation description exists then set it.
383-
operationDescription = orphaned;
384-
// Remove the entry with the bad request id from the queue.
385-
this[kQueue].delete(requestId);
380+
381+
// First check if the map is of invalid size
382+
if (this[kQueue].size > 1) {
383+
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
384+
} else {
385+
// Get the first orphaned operation description.
386+
const entry = this[kQueue].entries().next();
387+
if (entry) {
388+
const [requestId, orphaned]: [number, OperationDescription] = entry.value;
389+
// If the orphaned operation description exists then set it.
390+
operationDescription = orphaned;
391+
// Remove the entry with the bad request id from the queue.
392+
this[kQueue].delete(requestId);
393+
}
386394
}
387395
}
388396

test/unit/cmap/connection.test.ts

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { expect } from 'chai';
2-
import { EventEmitter, on } from 'events';
2+
import { EventEmitter, once } from 'events';
33
import { Socket } from 'net';
44
import * as sinon from 'sinon';
55
import { Readable } from 'stream';
@@ -9,7 +9,7 @@ import { BinMsg } from '../../../src/cmap/commands';
99
import { connect } from '../../../src/cmap/connect';
1010
import { Connection, hasSessionSupport } from '../../../src/cmap/connection';
1111
import { MessageStream } from '../../../src/cmap/message_stream';
12-
import { MongoNetworkTimeoutError } from '../../../src/error';
12+
import { MongoNetworkTimeoutError, MongoRuntimeError } from '../../../src/error';
1313
import { isHello, ns } from '../../../src/utils';
1414
import * as mock from '../../tools/mongodb-mock/index';
1515
import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils';
@@ -172,12 +172,13 @@ describe('new Connection()', function () {
172172
let callbackSpy;
173173
const inputStream = new Readable();
174174
const document = { ok: 1 };
175+
const last = { isWritablePrimary: true };
175176

176177
beforeEach(function () {
177178
callbackSpy = sinon.spy();
178179
const firstHello = generateOpMsgBuffer(document);
179180
const secondHello = generateOpMsgBuffer(document);
180-
const thirdHello = generateOpMsgBuffer(document);
181+
const thirdHello = generateOpMsgBuffer(last);
181182
const buffer = Buffer.concat([firstHello, secondHello, thirdHello]);
182183

183184
connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults));
@@ -199,9 +200,10 @@ describe('new Connection()', function () {
199200
inputStream.push(null);
200201
});
201202

202-
it('calls the operation description callback with the document', async function () {
203-
await on(inputStream, 'message');
204-
expect(callbackSpy).to.be.calledOnceWith(undefined, document);
203+
it('calls the callback with the last hello document', async function () {
204+
const messages = await once(connection, 'message');
205+
expect(messages[0].responseTo).to.equal(0);
206+
expect(callbackSpy).to.be.calledOnceWith(undefined, last);
205207
});
206208
});
207209

@@ -230,8 +232,8 @@ describe('new Connection()', function () {
230232
const msg = generateOpMsgBuffer(document);
231233
const msgHeader: MessageHeader = {
232234
length: msg.readInt32LE(0),
233-
requestId: msg.readInt32LE(4),
234-
responseTo: msg.readInt32LE(8),
235+
requestId: 1,
236+
responseTo: 0, // This will not match.
235237
opCode: msg.readInt32LE(12)
236238
};
237239
const msgBody = msg.subarray(16);
@@ -284,6 +286,58 @@ describe('new Connection()', function () {
284286
expect(callbackSpy).to.be.calledOnceWith(undefined, document);
285287
});
286288
});
289+
290+
context('when more than one operation description is in the queue', function () {
291+
let spyOne;
292+
let spyTwo;
293+
const document = { ok: 1 };
294+
295+
beforeEach(function () {
296+
spyOne = sinon.spy();
297+
spyTwo = sinon.spy();
298+
299+
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
300+
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
301+
connection.isMonitoringConnection = true;
302+
const queueSymbol = getSymbolFrom(connection, 'queue');
303+
queue = connection[queueSymbol];
304+
305+
// Create the operation descriptions.
306+
const descriptionOne: OperationDescription = {
307+
requestId: 1,
308+
cb: spyOne
309+
};
310+
const descriptionTwo: OperationDescription = {
311+
requestId: 2,
312+
cb: spyTwo
313+
};
314+
315+
// Stick an operation description in the queue.
316+
queue.set(2, descriptionOne);
317+
queue.set(3, descriptionTwo);
318+
// Emit a message that matches the existing operation description.
319+
const msg = generateOpMsgBuffer(document);
320+
const msgHeader: MessageHeader = {
321+
length: msg.readInt32LE(0),
322+
requestId: 2,
323+
responseTo: 1,
324+
opCode: msg.readInt32LE(12)
325+
};
326+
const msgBody = msg.subarray(16);
327+
328+
const message = new BinMsg(msg, msgHeader, msgBody);
329+
connection.onMessage(message);
330+
});
331+
332+
it('calls all operation description callbacks with an error', function () {
333+
expect(spyOne).to.be.calledOnce;
334+
expect(spyTwo).to.be.calledOnce;
335+
const errorOne = spyOne.firstCall.args[0];
336+
const errorTwo = spyTwo.firstCall.args[0];
337+
expect(errorOne).to.be.instanceof(MongoRuntimeError);
338+
expect(errorTwo).to.be.instanceof(MongoRuntimeError);
339+
});
340+
});
287341
});
288342
});
289343

0 commit comments

Comments
 (0)