From 562ed31183801eecc211b08384d0d4e0b1b33a33 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 3 Apr 2024 13:31:04 -0400 Subject: [PATCH 1/8] refactor(NODE-6056): implement MongoDBResponse class --- src/bson.ts | 1 + src/cmap/auth/gssapi.ts | 8 +- src/cmap/commands.ts | 160 +++--------------- src/cmap/connect.ts | 2 +- src/cmap/connection.ts | 130 ++++++++++---- src/cmap/wire_protocol/compression.ts | 10 +- src/cmap/wire_protocol/on_demand/document.ts | 1 + src/cmap/wire_protocol/responses.ts | 105 ++++++++++++ src/index.ts | 5 +- src/sessions.ts | 5 +- .../node-specific/bson-options/raw.test.ts | 18 +- .../bson-options/utf8_validation.test.ts | 6 +- test/mongodb.ts | 1 + test/types/connection.test-d.ts | 25 +++ test/unit/cmap/commands.test.js | 32 ++-- .../wire_protocol/on_demand/document.test.ts | 18 ++ .../unit/cmap/wire_protocol/responses.test.ts | 63 +++++++ test/unit/commands.test.ts | 143 ---------------- 18 files changed, 372 insertions(+), 361 deletions(-) create mode 100644 src/cmap/wire_protocol/responses.ts create mode 100644 test/types/connection.test-d.ts create mode 100644 test/unit/cmap/wire_protocol/responses.test.ts diff --git a/src/bson.ts b/src/bson.ts index a44f7e2519f..7938a2b173a 100644 --- a/src/bson.ts +++ b/src/bson.ts @@ -33,6 +33,7 @@ export function parseToElementsToArray(bytes: Uint8Array, offset?: number): BSON const res = BSON.onDemand.parseToElements(bytes, offset); return Array.isArray(res) ? res : [...res]; } + export const getInt32LE = BSON.onDemand.NumberUtils.getInt32LE; export const getFloat64LE = BSON.onDemand.NumberUtils.getFloat64LE; export const getBigInt64LE = BSON.onDemand.NumberUtils.getBigInt64LE; diff --git a/src/cmap/auth/gssapi.ts b/src/cmap/auth/gssapi.ts index e4e2658db24..2595546a588 100644 --- a/src/cmap/auth/gssapi.ts +++ b/src/cmap/auth/gssapi.ts @@ -29,11 +29,9 @@ type MechanismProperties = { async function externalCommand( connection: Connection, command: ReturnType | ReturnType -): Promise<{ payload: string; conversationId: any }> { - return await (connection.command(ns('$external.$cmd'), command, undefined) as Promise<{ - payload: string; - conversationId: any; - }>); +): Promise<{ payload: string; conversationId: number }> { + const response = await connection.command(ns('$external.$cmd'), command); + return response as { payload: string; conversationId: number }; } let krb: typeof Kerberos; diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index 57a605cf248..19dd8e1a657 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -37,7 +37,6 @@ export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest; export interface OpQueryOptions extends CommandOptions { socketTimeoutMS?: number; session?: ClientSession; - documentsReturnedIn?: string; numberToSkip?: number; numberToReturn?: number; returnFieldSelector?: Document; @@ -53,9 +52,6 @@ export interface OpQueryOptions extends CommandOptions { exhaustAllowed?: boolean; } -/************************************************************** - * QUERY - **************************************************************/ /** @internal */ export class OpQueryRequest { ns: string; @@ -284,16 +280,11 @@ export interface MessageHeader { } /** @internal */ -export interface OpResponseOptions extends BSONSerializeOptions { - documentsReturnedIn?: string | null; -} - -/** @internal */ -export class OpQueryResponse { +export class OpReply { parsed: boolean; raw: Buffer; data: Buffer; - opts: OpResponseOptions; + opts: BSONSerializeOptions; length: number; requestId: number; responseTo: number; @@ -303,7 +294,6 @@ export class OpQueryResponse { cursorId?: Long; startingFrom?: number; numberReturned?: number; - documents: (Document | Buffer)[] = new Array(0); cursorNotFound?: boolean; queryFailure?: boolean; shardConfigStale?: boolean; @@ -313,7 +303,8 @@ export class OpQueryResponse { promoteValues: boolean; promoteBuffers: boolean; bsonRegExp?: boolean; - index?: number; + index = 0; + sections: Uint8Array[] = []; /** moreToCome is an OP_MSG only concept */ moreToCome = false; @@ -322,7 +313,7 @@ export class OpQueryResponse { message: Buffer, msgHeader: MessageHeader, msgBody: Buffer, - opts?: OpResponseOptions + opts?: BSONSerializeOptions ) { this.parsed = false; this.raw = message; @@ -356,29 +347,9 @@ export class OpQueryResponse { return this.parsed; } - parse(options: OpResponseOptions): void { + parse(): Uint8Array { // Don't parse again if not needed - if (this.parsed) return; - options = options ?? {}; - - // Allow the return of raw documents instead of parsing - const raw = options.raw || false; - const documentsReturnedIn = options.documentsReturnedIn || null; - const useBigInt64 = options.useBigInt64 ?? this.opts.useBigInt64; - const promoteLongs = options.promoteLongs ?? this.opts.promoteLongs; - const promoteValues = options.promoteValues ?? this.opts.promoteValues; - const promoteBuffers = options.promoteBuffers ?? this.opts.promoteBuffers; - const bsonRegExp = options.bsonRegExp ?? this.opts.bsonRegExp; - let bsonSize; - - // Set up the options - const _options: BSONSerializeOptions = { - useBigInt64, - promoteLongs, - promoteValues, - promoteBuffers, - bsonRegExp - }; + if (this.parsed) return this.sections[0]; // Position within OP_REPLY at which documents start // (See https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#wire-op-reply) @@ -390,8 +361,11 @@ export class OpQueryResponse { this.startingFrom = this.data.readInt32LE(12); this.numberReturned = this.data.readInt32LE(16); - // Preallocate document array - this.documents = new Array(this.numberReturned); + if (this.numberReturned < 0 || this.numberReturned > 2 ** 32 - 1) { + throw new RangeError( + `OP_REPLY numberReturned is an invalid array length ${this.numberReturned}` + ); + } this.cursorNotFound = (this.responseFlags & CURSOR_NOT_FOUND) !== 0; this.queryFailure = (this.responseFlags & QUERY_FAILURE) !== 0; @@ -400,67 +374,26 @@ export class OpQueryResponse { // Parse Body for (let i = 0; i < this.numberReturned; i++) { - bsonSize = + const bsonSize = this.data[this.index] | (this.data[this.index + 1] << 8) | (this.data[this.index + 2] << 16) | (this.data[this.index + 3] << 24); - // If we have raw results specified slice the return document - if (raw) { - this.documents[i] = this.data.slice(this.index, this.index + bsonSize); - } else { - this.documents[i] = BSON.deserialize( - this.data.slice(this.index, this.index + bsonSize), - _options - ); - } + const section = this.data.subarray(this.index, this.index + bsonSize); + this.sections.push(section); // Adjust the index this.index = this.index + bsonSize; } - if (this.documents.length === 1 && documentsReturnedIn != null && raw) { - const fieldsAsRaw: Document = {}; - fieldsAsRaw[documentsReturnedIn] = true; - _options.fieldsAsRaw = fieldsAsRaw; - - const doc = BSON.deserialize(this.documents[0] as Buffer, _options); - this.documents = [doc]; - } - // Set parsed this.parsed = true; + + return this.sections[0]; } } -// Implementation of OP_MSG spec: -// https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst -// -// struct Section { -// uint8 payloadType; -// union payload { -// document document; // payloadType == 0 -// struct sequence { // payloadType == 1 -// int32 size; -// cstring identifier; -// document* documents; -// }; -// }; -// }; - -// struct OP_MSG { -// struct MsgHeader { -// int32 messageLength; -// int32 requestID; -// int32 responseTo; -// int32 opCode = 2013; -// }; -// uint32 flagBits; -// Section+ sections; -// [uint32 checksum;] -// }; - // Msg Flags const OPTS_CHECKSUM_PRESENT = 1; const OPTS_MORE_TO_COME = 2; @@ -587,7 +520,7 @@ export class OpMsgResponse { parsed: boolean; raw: Buffer; data: Buffer; - opts: OpResponseOptions; + opts: BSONSerializeOptions; length: number; requestId: number; responseTo: number; @@ -603,14 +536,14 @@ export class OpMsgResponse { promoteValues: boolean; promoteBuffers: boolean; bsonRegExp: boolean; - documents: (Document | Buffer)[]; - index?: number; + index = 0; + sections: Uint8Array[] = []; constructor( message: Buffer, msgHeader: MessageHeader, msgBody: Buffer, - opts?: OpResponseOptions + opts?: BSONSerializeOptions ) { this.parsed = false; this.raw = message; @@ -642,47 +575,26 @@ export class OpMsgResponse { this.promoteBuffers = typeof this.opts.promoteBuffers === 'boolean' ? this.opts.promoteBuffers : false; this.bsonRegExp = typeof this.opts.bsonRegExp === 'boolean' ? this.opts.bsonRegExp : false; - - this.documents = []; } isParsed(): boolean { return this.parsed; } - parse(options: OpResponseOptions): void { + parse(): Uint8Array { // Don't parse again if not needed - if (this.parsed) return; - options = options ?? {}; + if (this.parsed) return this.sections[0]; this.index = 4; - // Allow the return of raw documents instead of parsing - const raw = options.raw || false; - const documentsReturnedIn = options.documentsReturnedIn || null; - const useBigInt64 = options.useBigInt64 ?? this.opts.useBigInt64; - const promoteLongs = options.promoteLongs ?? this.opts.promoteLongs; - const promoteValues = options.promoteValues ?? this.opts.promoteValues; - const promoteBuffers = options.promoteBuffers ?? this.opts.promoteBuffers; - const bsonRegExp = options.bsonRegExp ?? this.opts.bsonRegExp; - const validation = this.parseBsonSerializationOptions(options); - - // Set up the options - const bsonOptions: BSONSerializeOptions = { - useBigInt64, - promoteLongs, - promoteValues, - promoteBuffers, - bsonRegExp, - validation - // Due to the strictness of the BSON libraries validation option we need this cast - } as BSONSerializeOptions & { validation: { utf8: { writeErrors: boolean } } }; while (this.index < this.data.length) { const payloadType = this.data.readUInt8(this.index++); if (payloadType === 0) { const bsonSize = this.data.readUInt32LE(this.index); - const bin = this.data.slice(this.index, this.index + bsonSize); - this.documents.push(raw ? bin : BSON.deserialize(bin, bsonOptions)); + const bin = this.data.subarray(this.index, this.index + bsonSize); + + this.sections.push(bin); + this.index += bsonSize; } else if (payloadType === 1) { // It was decided that no driver makes use of payload type 1 @@ -692,25 +604,9 @@ export class OpMsgResponse { } } - if (this.documents.length === 1 && documentsReturnedIn != null && raw) { - const fieldsAsRaw: Document = {}; - fieldsAsRaw[documentsReturnedIn] = true; - bsonOptions.fieldsAsRaw = fieldsAsRaw; - const doc = BSON.deserialize(this.documents[0] as Buffer, bsonOptions); - this.documents = [doc]; - } - this.parsed = true; - } - - parseBsonSerializationOptions({ enableUtf8Validation }: BSONSerializeOptions): { - utf8: { writeErrors: false } | false; - } { - if (enableUtf8Validation === false) { - return { utf8: false }; - } - return { utf8: { writeErrors: false } }; + return this.sections[0]; } } diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index 861062ebe93..abc530f8805 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -103,7 +103,7 @@ export async function performInitialHandshake( const handshakeDoc = await prepareHandshakeDocument(authContext); // @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options - const handshakeOptions: CommandOptions = { ...options }; + const handshakeOptions: CommandOptions = { ...options, raw: false }; if (typeof options.connectTimeoutMS === 'number') { // The handshake technically is a monitoring check, so its socket timeout should be connectTimeoutMS handshakeOptions.socketTimeoutMS = options.connectTimeoutMS; diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 1a4db3401f9..34c928d517f 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -54,7 +54,7 @@ import { OpMsgRequest, type OpMsgResponse, OpQueryRequest, - type OpQueryResponse, + type OpReply, type WriteProtocolMessageType } from './commands'; import type { Stream } from './connect'; @@ -62,6 +62,7 @@ import type { ClientMetadata } from './handshake/client_metadata'; import { StreamDescription, type StreamDescriptionOptions } from './stream_description'; import { type CompressorName, decompressResponse } from './wire_protocol/compression'; import { onData } from './wire_protocol/on_data'; +import { MongoDBResponse } from './wire_protocol/responses'; import { getReadPreference, isSharded } from './wire_protocol/shared'; /** @internal */ @@ -412,7 +413,11 @@ export class Connection extends TypedEventEmitter { return message; } - private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) { + private async *sendWire( + message: WriteProtocolMessageType, + options: CommandOptions, + returnAs?: typeof MongoDBResponse + ): AsyncGenerator { this.throwIfAborted(); if (typeof options.socketTimeoutMS === 'number') { @@ -428,7 +433,7 @@ export class Connection extends TypedEventEmitter { }); if (options.noResponse) { - yield { ok: 1 }; + yield MongoDBResponse.empty; return; } @@ -436,21 +441,9 @@ export class Connection extends TypedEventEmitter { for await (const response of this.readMany()) { this.socket.setTimeout(0); - response.parse(options); - - const [document] = response.documents; + const bson = response.parse(); - if (!Buffer.isBuffer(document)) { - const { session } = options; - if (session) { - updateSessionFromResponse(session, document); - } - - if (document.$clusterTime) { - this.clusterTime = document.$clusterTime; - this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); - } - } + const document = new (returnAs ?? MongoDBResponse)(bson, 0, false); yield document; this.throwIfAborted(); @@ -469,7 +462,8 @@ export class Connection extends TypedEventEmitter { private async *sendCommand( ns: MongoDBNamespace, command: Document, - options: CommandOptions = {} + options: CommandOptions = {}, + returnAs?: typeof MongoDBResponse ) { const message = this.prepareCommand(ns.db, command, options); @@ -488,16 +482,26 @@ export class Connection extends TypedEventEmitter { let document; try { this.throwIfAborted(); - for await (document of this.sendWire(message, options)) { - if (!Buffer.isBuffer(document) && document.writeConcernError) { - throw new MongoWriteConcernError(document.writeConcernError, document); + for await (document of this.sendWire(message, options, returnAs)) { + if (options.session != null) { + updateSessionFromResponse(options.session, document); + } + + if (document.$clusterTime) { + this.clusterTime = document.$clusterTime; + this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); + } + + if (document.has('writeConcernError')) { + const objectWithWriteConcernError = document.toObject(options); + throw new MongoWriteConcernError( + objectWithWriteConcernError.writeConcernError, + objectWithWriteConcernError + ); } - if ( - !Buffer.isBuffer(document) && - (document.ok === 0 || document.$err || document.errmsg || document.code) - ) { - throw new MongoServerError(document); + if (document.isError) { + throw new MongoServerError(document.toObject(options)); } if (this.shouldEmitAndLogCommand) { @@ -509,14 +513,29 @@ export class Connection extends TypedEventEmitter { new CommandSucceededEvent( this, message, - options.noResponse ? undefined : document, + options.noResponse ? undefined : document.toObject(options), started, this.description.serverConnectionId ) ); } - yield document; + if (returnAs == null) { + // If `documentsReturnedIn` not set or raw is not enabled, use input bson options + // Otherwise, support raw flag. Raw only works for cursors that hardcode firstBatch/nextBatch fields + const bsonOptions = + options.documentsReturnedIn == null || !options.raw + ? options + : { + ...options, + raw: false, + fieldsAsRaw: { [options.documentsReturnedIn]: true } + }; + yield document.toObject(bsonOptions); + } else { + yield document; + } + this.throwIfAborted(); } } catch (error) { @@ -530,7 +549,7 @@ export class Connection extends TypedEventEmitter { new CommandSucceededEvent( this, message, - options.noResponse ? undefined : document, + options.noResponse ? undefined : document?.toObject(options), started, this.description.serverConnectionId ) @@ -555,13 +574,35 @@ export class Connection extends TypedEventEmitter { } } + public async command( + ns: MongoDBNamespace, + command: Document, + options: CommandOptions | undefined, + returnAs: T | undefined + ): Promise>; + + public async command( + ns: MongoDBNamespace, + command: Document, + options?: undefined + ): Promise; + + public async command( + ns: MongoDBNamespace, + command: Document, + options: CommandOptions + ): Promise; + + public async command(ns: MongoDBNamespace, command: Document): Promise; + public async command( ns: MongoDBNamespace, command: Document, - options: CommandOptions = {} + options: CommandOptions = {}, + returnAs?: typeof MongoDBResponse ): Promise { this.throwIfAborted(); - for await (const document of this.sendCommand(ns, command, options)) { + for await (const document of this.sendCommand(ns, command, options, returnAs)) { return document; } throw new MongoUnexpectedServerResponseError('Unable to get response from server'); @@ -622,7 +663,7 @@ export class Connection extends TypedEventEmitter { * * Note that `for-await` loops call `return` automatically when the loop is exited. */ - private async *readMany(): AsyncGenerator { + private async *readMany(): AsyncGenerator { try { this.dataEvents = onData(this.messageStream); for await (const message of this.dataEvents) { @@ -687,11 +728,26 @@ export class CryptoConnection extends Connection { this.autoEncrypter = options.autoEncrypter; } - /** @internal @override */ - override async command( + public override async command( + ns: MongoDBNamespace, + command: Document, + options: CommandOptions | undefined, + returnAs: T + ): Promise>; + + public override async command( + ns: MongoDBNamespace, + command: Document, + options?: CommandOptions + ): Promise; + + public override async command(ns: MongoDBNamespace, command: Document): Promise; + + override async command( ns: MongoDBNamespace, cmd: Document, - options: CommandOptions + options?: CommandOptions, + returnAs?: T | undefined ): Promise { const { autoEncrypter } = this; if (!autoEncrypter) { @@ -701,7 +757,7 @@ export class CryptoConnection extends Connection { const serverWireVersion = maxWireVersion(this); if (serverWireVersion === 0) { // This means the initial handshake hasn't happened yet - return await super.command(ns, cmd, options); + return await super.command(ns, cmd, options, returnAs); } if (serverWireVersion < 8) { @@ -735,7 +791,7 @@ export class CryptoConnection extends Connection { } } - const response = await super.command(ns, encrypted, options); + const response = await super.command(ns, encrypted, options, returnAs); return await autoEncrypter.decrypt(response, options); } diff --git a/src/cmap/wire_protocol/compression.ts b/src/cmap/wire_protocol/compression.ts index 06c83f29123..b45ac219822 100644 --- a/src/cmap/wire_protocol/compression.ts +++ b/src/cmap/wire_protocol/compression.ts @@ -8,7 +8,7 @@ import { type MessageHeader, OpCompressedRequest, OpMsgResponse, - OpQueryResponse, + OpReply, type WriteProtocolMessageType } from '../commands'; import { OP_COMPRESSED, OP_MSG } from './constants'; @@ -163,9 +163,7 @@ export async function compressCommand( * * This method does not parse the response's BSON. */ -export async function decompressResponse( - message: Buffer -): Promise { +export async function decompressResponse(message: Buffer): Promise { const messageHeader: MessageHeader = { length: message.readInt32LE(0), requestId: message.readInt32LE(4), @@ -174,7 +172,7 @@ export async function decompressResponse( }; if (messageHeader.opCode !== OP_COMPRESSED) { - const ResponseType = messageHeader.opCode === OP_MSG ? OpMsgResponse : OpQueryResponse; + const ResponseType = messageHeader.opCode === OP_MSG ? OpMsgResponse : OpReply; const messageBody = message.subarray(MESSAGE_HEADER_SIZE); return new ResponseType(message, messageHeader, messageBody); } @@ -189,7 +187,7 @@ export async function decompressResponse( const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9); // recalculate based on wrapped opcode - const ResponseType = header.opCode === OP_MSG ? OpMsgResponse : OpQueryResponse; + const ResponseType = header.opCode === OP_MSG ? OpMsgResponse : OpReply; const messageBody = await decompress(compressorID, compressedBuffer); if (messageBody.length !== header.length) { throw new MongoDecompressionError('Message body and message header must be the same length'); diff --git a/src/cmap/wire_protocol/on_demand/document.ts b/src/cmap/wire_protocol/on_demand/document.ts index 96115a30848..ba8804fc6c8 100644 --- a/src/cmap/wire_protocol/on_demand/document.ts +++ b/src/cmap/wire_protocol/on_demand/document.ts @@ -23,6 +23,7 @@ const enum BSONElementOffset { length = 4 } +/** @internal */ export type JSTypeOf = { [BSONType.null]: null; [BSONType.undefined]: null; diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts new file mode 100644 index 00000000000..d2b49e4961f --- /dev/null +++ b/src/cmap/wire_protocol/responses.ts @@ -0,0 +1,105 @@ +import { type BSONSerializeOptions, BSONType, type Document, type Timestamp } from '../../bson'; +import { type ClusterTime } from '../../sdam/common'; +import { OnDemandDocument } from './on_demand/document'; + +/** @internal */ +export class MongoDBResponse extends OnDemandDocument { + // {ok:1} + static empty = new MongoDBResponse(new Uint8Array([13, 0, 0, 0, 16, 111, 107, 0, 1, 0, 0, 0, 0])); + + /** Indicates this document is a server error */ + public get isError() { + let isError = this.ok === 0; + isError ||= this.has('$err'); + isError ||= this.has('errmsg'); + isError ||= this.has('code'); + return isError; + } + + /** + * Drivers can safely assume that the `recoveryToken` field is always a BSON document but drivers MUST NOT modify the + * contents of the document. + */ + get recoveryToken(): Document | null { + return ( + this.get('recoveryToken', BSONType.object)?.toObject({ + promoteValues: false, + promoteLongs: false, + promoteBuffers: false + }) ?? null + ); + } + + /** + * The server creates a cursor in response to a snapshot find/aggregate command and reports atClusterTime within the cursor field in the response. + * For the distinct command the server adds a top-level atClusterTime field to the response. + * The atClusterTime field represents the timestamp of the read and is guaranteed to be majority committed. + */ + public get atClusterTime(): Timestamp | null { + return ( + this.get('cursor', BSONType.object)?.get('atClusterTime', BSONType.timestamp) ?? + this.get('atClusterTime', BSONType.timestamp) ?? + null + ); + } + + public get operationTime(): Timestamp | null { + return this.get('operationTime', BSONType.timestamp); + } + + public get ok(): 0 | 1 { + return this.getNumber('ok') ? 1 : 0; + } + + public get $err(): string | null { + return this.get('$err', BSONType.string); + } + + public get errmsg(): string | null { + return this.get('errmsg', BSONType.string); + } + + public get code(): number | null { + return this.getNumber('code'); + } + + private clusterTime?: ClusterTime | null; + public get $clusterTime(): ClusterTime | null { + if (!('clusterTime' in this)) { + const clusterTimeDoc = this.get('$clusterTime', BSONType.object); + if (clusterTimeDoc == null) { + this.clusterTime = null; + return null; + } + const clusterTime = clusterTimeDoc.get('clusterTime', BSONType.timestamp, true); + const signature = clusterTimeDoc.get('signature', BSONType.object)?.toObject(); + // @ts-expect-error: `signature` is incorrectly typed. It is public API. + this.clusterTime = { clusterTime, signature }; + } + return this.clusterTime ?? null; + } + + public override toObject(options: BSONSerializeOptions = {}): Record { + const exactBSONOptions = { + useBigInt64: options.useBigInt64, + promoteLongs: options.promoteLongs, + promoteValues: options.promoteValues, + promoteBuffers: options.promoteBuffers, + bsonRegExp: options.bsonRegExp, + raw: options.raw ?? false, + fieldsAsRaw: options.fieldsAsRaw ?? {}, + validation: this.parseBsonSerializationOptions(options) + }; + return super.toObject(exactBSONOptions); + } + + private parseBsonSerializationOptions({ enableUtf8Validation }: BSONSerializeOptions): { + utf8: { writeErrors: false } | false; + } { + if (enableUtf8Validation === false) { + return { utf8: false }; + } + + return { utf8: { writeErrors: false } }; + } +} diff --git a/src/index.ts b/src/index.ts index 60ebd96067a..86634940c8f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -264,8 +264,7 @@ export type { OpMsgResponse, OpQueryOptions, OpQueryRequest, - OpQueryResponse, - OpResponseOptions, + OpReply, WriteProtocolMessageType } from './cmap/commands'; export type { HandshakeDocument } from './cmap/connect'; @@ -290,6 +289,8 @@ export type { ClientMetadata, ClientMetadataOptions } from './cmap/handshake/cli export type { ConnectionPoolMetrics } from './cmap/metrics'; export type { StreamDescription, StreamDescriptionOptions } from './cmap/stream_description'; export type { CompressorName } from './cmap/wire_protocol/compression'; +export type { JSTypeOf, OnDemandDocument } from './cmap/wire_protocol/on_demand/document'; +export type { MongoDBResponse } from './cmap/wire_protocol/responses'; export type { CollectionOptions, CollectionPrivate, ModifyResult } from './collection'; export type { COMMAND_FAILED, diff --git a/src/sessions.ts b/src/sessions.ts index 8f2892f0c0f..4822f0483a4 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -4,6 +4,7 @@ import { promisify } from 'util'; import { Binary, type Document, Long, type Timestamp } from './bson'; import type { CommandOptions, Connection } from './cmap/connection'; import { ConnectionPoolMetrics } from './cmap/metrics'; +import { type MongoDBResponse } from './cmap/wire_protocol/responses'; import { isSharded } from './cmap/wire_protocol/shared'; import { PINNED, UNPINNED } from './constants'; import type { AbstractCursor } from './cursor/abstract_cursor'; @@ -1040,7 +1041,7 @@ export function applySession( return; } -export function updateSessionFromResponse(session: ClientSession, document: Document): void { +export function updateSessionFromResponse(session: ClientSession, document: MongoDBResponse): void { if (document.$clusterTime) { _advanceClusterTime(session, document.$clusterTime); } @@ -1056,7 +1057,7 @@ export function updateSessionFromResponse(session: ClientSession, document: Docu if (session?.[kSnapshotEnabled] && session[kSnapshotTime] == null) { // find and aggregate commands return atClusterTime on the cursor // distinct includes it in the response body - const atClusterTime = document.cursor?.atClusterTime || document.atClusterTime; + const atClusterTime = document.atClusterTime; if (atClusterTime) { session[kSnapshotTime] = atClusterTime; } diff --git a/test/integration/node-specific/bson-options/raw.test.ts b/test/integration/node-specific/bson-options/raw.test.ts index 30d68415f40..91f6423f3f2 100644 --- a/test/integration/node-specific/bson-options/raw.test.ts +++ b/test/integration/node-specific/bson-options/raw.test.ts @@ -1,11 +1,6 @@ import { expect } from 'chai'; -import { - type Collection, - type MongoClient, - MongoCompatibilityError, - ObjectId -} from '../../../mongodb'; +import { type Collection, type MongoClient, ObjectId } from '../../../mongodb'; describe('raw bson support', () => { describe('raw', () => { @@ -29,14 +24,9 @@ describe('raw bson support', () => { .findOne({ myData: 23 }, passOptionTo === 'operation' ? option : undefined) .catch(error => error); - if (passOptionTo === 'client') { - // TODO(NODE-3946): When the raw option is passed to the client it crashed parsing hellos - // since they are returned as buffers and not js objects - expect(insertResult).to.be.instanceOf(MongoCompatibilityError); - } else { - expect(insertResult).to.have.property('insertedId').that.is.instanceOf(ObjectId); - expect(findOneResult).to.be.instanceOf(Buffer); - } + expect(insertResult).to.have.property('acknowledged').to.be.true; + expect(insertResult).to.have.property('insertedId').that.is.instanceOf(ObjectId); + expect(findOneResult).to.be.instanceOf(Buffer); } finally { await client.close(); } diff --git a/test/integration/node-specific/bson-options/utf8_validation.test.ts b/test/integration/node-specific/bson-options/utf8_validation.test.ts index 8151e63a33c..05e51d5277e 100644 --- a/test/integration/node-specific/bson-options/utf8_validation.test.ts +++ b/test/integration/node-specific/bson-options/utf8_validation.test.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { OpMsgResponse } from '../../../mongodb'; +import { MongoDBResponse } from '../../../mongodb'; const EXPECTED_VALIDATION_DISABLED_ARGUMENT = { utf8: false @@ -13,11 +13,11 @@ const EXPECTED_VALIDATION_ENABLED_ARGUMENT = { } }; -describe('class OpMsgResponse', () => { +describe('class MongoDBResponse', () => { let bsonSpy: sinon.SinonSpy; beforeEach(() => { - bsonSpy = sinon.spy(OpMsgResponse.prototype, 'parseBsonSerializationOptions'); + bsonSpy = sinon.spy(MongoDBResponse.prototype, 'parseBsonSerializationOptions'); }); afterEach(() => { diff --git a/test/mongodb.ts b/test/mongodb.ts index 35a2213da65..9f9f1185eba 100644 --- a/test/mongodb.ts +++ b/test/mongodb.ts @@ -131,6 +131,7 @@ export * from '../src/cmap/stream_description'; export * from '../src/cmap/wire_protocol/compression'; export * from '../src/cmap/wire_protocol/constants'; export * from '../src/cmap/wire_protocol/on_demand/document'; +export * from '../src/cmap/wire_protocol/responses'; export * from '../src/cmap/wire_protocol/shared'; export * from '../src/collection'; export * from '../src/connection_string'; diff --git a/test/types/connection.test-d.ts b/test/types/connection.test-d.ts new file mode 100644 index 00000000000..190016fa085 --- /dev/null +++ b/test/types/connection.test-d.ts @@ -0,0 +1,25 @@ +import { expectError, expectType } from 'tsd'; + +import { type Connection, type Document, MongoDBResponse, ns } from '../mongodb'; + +declare const connection: Connection; + +expectType(await connection.command(ns('a'), { cmd: 1 })); +expectType(await connection.command(ns('a'), { cmd: 1 }, undefined)); +expectType(await connection.command(ns('a'), { cmd: 1 }, { socketTimeoutMS: 1 })); +// TODO fix TS or simplify arguments +// expectType( +// await connection.command(ns('a'), { cmd: 1 }, { socketTimeoutMS: 1 }, undefined) +// ); + +class A extends MongoDBResponse { + myProperty = 0n; +} + +expectType(await connection.command(ns('a'), { cmd: 1 }, undefined, A)); +expectType(await connection.command(ns('a'), { cmd: 1 }, { socketTimeoutMS: 1 }, A)); +expectType( + (await connection.command(ns('a'), { cmd: 1 }, { socketTimeoutMS: 1 }, A)).myProperty +); + +expectError(await connection.command(ns('a'), { cmd: 1 }, { socketTimeoutMS: 1 }, Boolean)); diff --git a/test/unit/cmap/commands.test.js b/test/unit/cmap/commands.test.js index 496a4b5e300..6a0ced9fde1 100644 --- a/test/unit/cmap/commands.test.js +++ b/test/unit/cmap/commands.test.js @@ -1,5 +1,5 @@ const { expect } = require('chai'); -const { OpQueryResponse } = require('../../mongodb'); +const { OpReply } = require('../../mongodb'); describe('commands', function () { describe('Response', function () { @@ -16,7 +16,7 @@ describe('commands', function () { const body = Buffer.from([]); it('throws an exception', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(() => response.parse()).to.throw(RangeError, /outside buffer bounds/); }); }); @@ -33,8 +33,8 @@ describe('commands', function () { body.writeInt32LE(-1, 16); it('throws an exception', function () { - const response = new OpQueryResponse(message, header, body); - expect(() => response.parse()).to.throw(RangeError, /Invalid array length/); + const response = new OpReply(message, header, body); + expect(() => response.parse()).to.throw(RangeError, /Invalid array length/i); }); }); }); @@ -54,55 +54,55 @@ describe('commands', function () { it('does not throw an exception', function () { let error; try { - new OpQueryResponse(message, header, body); + new OpReply(message, header, body); } catch (err) { error = err; } expect(error).to.be.undefined; }); - it('initializes the documents to an empty array', function () { - const response = new OpQueryResponse(message, header, body); - expect(response.documents).to.be.empty; + it('initializes the sections to an empty array', function () { + const response = new OpReply(message, header, body); + expect(response.sections).to.be.empty; }); it('does not set the responseFlags', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(response.responseFlags).to.be.undefined; }); it('does not set the cursorNotFound flag', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(response.cursorNotFound).to.be.undefined; }); it('does not set the cursorId', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(response.cursorId).to.be.undefined; }); it('does not set startingFrom', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(response.startingFrom).to.be.undefined; }); it('does not set numberReturned', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(response.numberReturned).to.be.undefined; }); it('does not set queryFailure', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(response.queryFailure).to.be.undefined; }); it('does not set shardConfigStale', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(response.shardConfigStale).to.be.undefined; }); it('does not set awaitCapable', function () { - const response = new OpQueryResponse(message, header, body); + const response = new OpReply(message, header, body); expect(response.awaitCapable).to.be.undefined; }); }); diff --git a/test/unit/cmap/wire_protocol/on_demand/document.test.ts b/test/unit/cmap/wire_protocol/on_demand/document.test.ts index 100062dc3f2..6a7d5bb10cb 100644 --- a/test/unit/cmap/wire_protocol/on_demand/document.test.ts +++ b/test/unit/cmap/wire_protocol/on_demand/document.test.ts @@ -145,6 +145,18 @@ describe('class OnDemandDocument', () => { expect(document.get('b', BSONType.null)).to.be.null; }); + it('supports returning null for null and undefined bson elements', () => { + const bson = Uint8Array.from([ + ...[11, 0, 0, 0], // doc size + ...[6, 97, 0], // a: undefined (6) + ...[10, 98, 0], // b: null (10) + 0 // doc term + ]); + const document = new OnDemandDocument(bson, 0, false); + expect(document.get('a', BSONType.undefined)).to.be.null; + expect(document.get('b', BSONType.null)).to.be.null; + }); + it('supports returning int', () => { expect(document.get('int', BSONType.int, true)).to.deep.equal(input.int); }); @@ -171,6 +183,12 @@ describe('class OnDemandDocument', () => { ); }); + it('supports returning binData, subtype 2', () => { + expect(document.get('binDataSubtype2', BSONType.binData, true)).to.deep.equal( + input.binDataSubtype2 + ); + }); + it('supports returning bool', () => { expect(document.get('bool', BSONType.bool, true)).to.deep.equal(input.bool); }); diff --git a/test/unit/cmap/wire_protocol/responses.test.ts b/test/unit/cmap/wire_protocol/responses.test.ts new file mode 100644 index 00000000000..fda8401bc53 --- /dev/null +++ b/test/unit/cmap/wire_protocol/responses.test.ts @@ -0,0 +1,63 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { BSON, MongoDBResponse, OnDemandDocument } from '../../../mongodb'; + +describe('class MongoDBResponse', () => { + it('is a subclass of OnDemandDocument', () => { + expect(new MongoDBResponse(BSON.serialize({ ok: 1 }))).to.be.instanceOf(OnDemandDocument); + }); + + context('get isError', () => { + it('returns true when ok is 0', () => { + const doc = new MongoDBResponse(BSON.serialize({ ok: 0 })); + expect(doc.isError).to.be.true; + }); + + it('short circuits detection of $err, errmsg, code', () => { + const doc = new MongoDBResponse(BSON.serialize({ ok: 0 })); + expect(doc.isError).to.be.true; + expect(doc).to.not.have.property('cache.$err'); + expect(doc).to.not.have.property('cache.errmsg'); + expect(doc).to.not.have.property('cache.code'); + }); + }); + + context('utf8 validation', () => { + afterEach(() => sinon.restore()); + + context('when enableUtf8Validation is not specified', () => { + const options = { enableUtf8Validation: undefined }; + it('calls BSON deserialize with writeErrors validation turned off', () => { + const res = new MongoDBResponse(BSON.serialize({})); + const toObject = sinon.spy(Object.getPrototypeOf(Object.getPrototypeOf(res)), 'toObject'); + res.toObject(options); + expect(toObject).to.have.been.calledWith( + sinon.match({ validation: { utf8: { writeErrors: false } } }) + ); + }); + }); + + context('when enableUtf8Validation is true', () => { + const options = { enableUtf8Validation: true }; + it('calls BSON deserialize with writeErrors validation turned off', () => { + const res = new MongoDBResponse(BSON.serialize({})); + const toObject = sinon.spy(Object.getPrototypeOf(Object.getPrototypeOf(res)), 'toObject'); + res.toObject(options); + expect(toObject).to.have.been.calledWith( + sinon.match({ validation: { utf8: { writeErrors: false } } }) + ); + }); + }); + + context('when enableUtf8Validation is false', () => { + const options = { enableUtf8Validation: false }; + it('calls BSON deserialize with all validation disabled', () => { + const res = new MongoDBResponse(BSON.serialize({})); + const toObject = sinon.spy(Object.getPrototypeOf(Object.getPrototypeOf(res)), 'toObject'); + res.toObject(options); + expect(toObject).to.have.been.calledWith(sinon.match({ validation: { utf8: false } })); + }); + }); + }); +}); diff --git a/test/unit/commands.test.ts b/test/unit/commands.test.ts index e1444912c73..f6ba300b7aa 100644 --- a/test/unit/commands.test.ts +++ b/test/unit/commands.test.ts @@ -1,4 +1,3 @@ -import { BSONError, deserialize } from 'bson'; import { expect } from 'chai'; import * as sinon from 'sinon'; @@ -7,156 +6,14 @@ import * as compression from '../../src/cmap/wire_protocol/compression'; import { compress, Compressor, - type MessageHeader, OP_MSG, OP_QUERY, OpCompressedRequest, OpMsgRequest, - OpMsgResponse, OpQueryRequest, uncompressibleCommands } from '../mongodb'; -const msgHeader: MessageHeader = { - length: 735, - requestId: 14704565, - responseTo: 4, - opCode: 2013 -}; - -// when top-level key writeErrors contains an error message that has invalid utf8 -const invalidUtf8ErrorMsg = - '0000000000ca020000106e00000000000477726974654572726f727300a50200000330009d02000010696e646578000000000010636f646500f82a0000036b65795061747465726e000f0000001074657874000100000000036b657956616c756500610100000274657874005201000064e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e298830000026572726d736700f1000000453131303030206475706c6963617465206b6579206572726f7220636f6c6c656374696f6e3a20626967646174612e7465737420696e6465783a20746578745f3120647570206b65793a207b20746578743a202264e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e2982e2e2e22207d000000016f6b00000000000000f03f00'; -const msgBodyInvalidUtf8WriteErrors = Buffer.from(invalidUtf8ErrorMsg, 'hex'); -const invalidUtf8ErrorMsgDeserializeInput = Buffer.from(invalidUtf8ErrorMsg.substring(10), 'hex'); -const invalidUtf8InWriteErrorsJSON = { - n: 0, - writeErrors: [ - { - index: 0, - code: 11000, - keyPattern: { - text: 1 - }, - keyValue: { - text: 'd☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃' - }, - errmsg: - 'E11000 duplicate key error collection: bigdata.test index: text_1 dup key: { text: "d☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃☃�..." }' - } - ], - ok: 1 -}; - -// when another top-level key besides writeErrors has invalid utf8 -const nKeyWithInvalidUtf8 = - '0000000000cc020000026e0005000000f09f98ff000477726974654572726f727300a60200000330009e02000010696e646578000000000010636f646500f82a0000036b65795061747465726e000f0000001074657874000100000000036b657956616c756500610100000274657874005201000064e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e298830000026572726d736700f2000000453131303030206475706c6963617465206b6579206572726f7220636f6c6c656374696f6e3a20626967646174612e7465737420696e6465783a20746578745f3120647570206b65793a207b20746578743a202264e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883e29883efbfbd2e2e2e22207d000000106f6b000100000000'; -const msgBodyNKeyWithInvalidUtf8 = Buffer.from(nKeyWithInvalidUtf8, 'hex'); - -describe('BinMsg BSON utf8 validation', () => { - it('correctly deserializes data with replacement characters for invalid utf8 in writeErrors object', () => { - // this is a sanity check to make sure nothing unexpected is happening in the deserialize method itself - - const options = { validation: { utf8: { writeErrors: false } as const } }; - const deserializerCall = () => deserialize(invalidUtf8ErrorMsgDeserializeInput, options); - expect(deserializerCall()).to.deep.equals(invalidUtf8InWriteErrorsJSON); - }); - - context('when enableUtf8Validation option is not specified', () => { - const binMsgInvalidUtf8ErrorMsg = new OpMsgResponse( - Buffer.alloc(0), - msgHeader, - msgBodyInvalidUtf8WriteErrors - ); - - const options = {}; - it('does not validate the writeErrors key', () => { - expect(() => binMsgInvalidUtf8ErrorMsg.parse(options)).to.not.throw(); - }); - - it('validates keys other than the writeErrors key', () => { - const binMsgAnotherKeyWithInvalidUtf8 = new OpMsgResponse( - Buffer.alloc(0), - msgHeader, - msgBodyNKeyWithInvalidUtf8 - ); - expect(() => binMsgAnotherKeyWithInvalidUtf8.parse(options)).to.throw( - BSONError, - 'Invalid UTF-8 string in BSON document' - ); - }); - }); - - context('when validation is disabled', () => { - const binMsgInvalidUtf8ErrorMsg = new OpMsgResponse( - Buffer.alloc(0), - msgHeader, - msgBodyInvalidUtf8WriteErrors - ); - - const options = { enableUtf8Validation: false }; - it('should not validate the writeErrors key', () => { - expect(() => binMsgInvalidUtf8ErrorMsg.parse(options)).to.not.throw(); - }); - - it('does not validate keys other than the writeErrors key', () => { - const binMsgAnotherKeyWithInvalidUtf8 = new OpMsgResponse( - Buffer.alloc(0), - msgHeader, - msgBodyNKeyWithInvalidUtf8 - ); - expect(() => binMsgAnotherKeyWithInvalidUtf8.parse(options)).to.not.throw( - BSONError, - 'Invalid UTF-8 string in BSON document' - ); - }); - }); - - it('disables validation by default for writeErrors if no validation specified', () => { - const binMsgInvalidUtf8ErrorMsg = new OpMsgResponse( - Buffer.alloc(0), - msgHeader, - msgBodyInvalidUtf8WriteErrors - ); - const options = { - bsonRegExp: false, - promoteBuffers: false, - promoteLongs: true, - promoteValues: true - }; - - expect(() => binMsgInvalidUtf8ErrorMsg.parse(options)).to.not.throw(); - }); - - context('utf8 validation enabled', () => { - const options = { enableUtf8Validation: true }; - it('does not validate the writeErrors key', () => { - const binMsgInvalidUtf8ErrorMsg = new OpMsgResponse( - Buffer.alloc(0), - msgHeader, - msgBodyInvalidUtf8WriteErrors - ); - - expect(() => binMsgInvalidUtf8ErrorMsg.parse(options)).not.to.throw( - BSONError, - 'Invalid UTF-8 string in BSON document' - ); - }); - - it('validates keys other than the writeErrors key', () => { - const binMsgAnotherKeyWithInvalidUtf8 = new OpMsgResponse( - Buffer.alloc(0), - msgHeader, - msgBodyNKeyWithInvalidUtf8 - ); - expect(() => binMsgAnotherKeyWithInvalidUtf8.parse(options)).to.throw( - BSONError, - 'Invalid UTF-8 string in BSON document' - ); - }); - }); -}); - describe('class OpCompressedRequest', () => { context('canCompress()', () => { for (const command of uncompressibleCommands) { From caab9b74fbbc8296b33ed1e815bfc1a367715afc Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 4 Apr 2024 14:26:26 -0400 Subject: [PATCH 2/8] chore: some comments --- src/cmap/connection.ts | 24 +++++++++---------- src/cmap/wire_protocol/responses.ts | 3 +-- .../unit/cmap/wire_protocol/responses.test.ts | 15 ++++++++++++ 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 34c928d517f..954cd262869 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -416,7 +416,7 @@ export class Connection extends TypedEventEmitter { private async *sendWire( message: WriteProtocolMessageType, options: CommandOptions, - returnAs?: typeof MongoDBResponse + responseType?: typeof MongoDBResponse ): AsyncGenerator { this.throwIfAborted(); @@ -443,7 +443,7 @@ export class Connection extends TypedEventEmitter { this.socket.setTimeout(0); const bson = response.parse(); - const document = new (returnAs ?? MongoDBResponse)(bson, 0, false); + const document = new (responseType ?? MongoDBResponse)(bson, 0, false); yield document; this.throwIfAborted(); @@ -463,7 +463,7 @@ export class Connection extends TypedEventEmitter { ns: MongoDBNamespace, command: Document, options: CommandOptions = {}, - returnAs?: typeof MongoDBResponse + responseType?: typeof MongoDBResponse ) { const message = this.prepareCommand(ns.db, command, options); @@ -482,7 +482,7 @@ export class Connection extends TypedEventEmitter { let document; try { this.throwIfAborted(); - for await (document of this.sendWire(message, options, returnAs)) { + for await (document of this.sendWire(message, options, responseType)) { if (options.session != null) { updateSessionFromResponse(options.session, document); } @@ -520,7 +520,7 @@ export class Connection extends TypedEventEmitter { ); } - if (returnAs == null) { + if (responseType == null) { // If `documentsReturnedIn` not set or raw is not enabled, use input bson options // Otherwise, support raw flag. Raw only works for cursors that hardcode firstBatch/nextBatch fields const bsonOptions = @@ -578,7 +578,7 @@ export class Connection extends TypedEventEmitter { ns: MongoDBNamespace, command: Document, options: CommandOptions | undefined, - returnAs: T | undefined + responseType: T | undefined ): Promise>; public async command( @@ -599,10 +599,10 @@ export class Connection extends TypedEventEmitter { ns: MongoDBNamespace, command: Document, options: CommandOptions = {}, - returnAs?: typeof MongoDBResponse + responseType?: typeof MongoDBResponse ): Promise { this.throwIfAborted(); - for await (const document of this.sendCommand(ns, command, options, returnAs)) { + for await (const document of this.sendCommand(ns, command, options, responseType)) { return document; } throw new MongoUnexpectedServerResponseError('Unable to get response from server'); @@ -732,7 +732,7 @@ export class CryptoConnection extends Connection { ns: MongoDBNamespace, command: Document, options: CommandOptions | undefined, - returnAs: T + responseType: T ): Promise>; public override async command( @@ -747,7 +747,7 @@ export class CryptoConnection extends Connection { ns: MongoDBNamespace, cmd: Document, options?: CommandOptions, - returnAs?: T | undefined + responseType?: T | undefined ): Promise { const { autoEncrypter } = this; if (!autoEncrypter) { @@ -757,7 +757,7 @@ export class CryptoConnection extends Connection { const serverWireVersion = maxWireVersion(this); if (serverWireVersion === 0) { // This means the initial handshake hasn't happened yet - return await super.command(ns, cmd, options, returnAs); + return await super.command(ns, cmd, options, responseType); } if (serverWireVersion < 8) { @@ -791,7 +791,7 @@ export class CryptoConnection extends Connection { } } - const response = await super.command(ns, encrypted, options, returnAs); + const response = await super.command(ns, encrypted, options, responseType); return await autoEncrypter.decrypt(response, options); } diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts index d2b49e4961f..03f15e4820e 100644 --- a/src/cmap/wire_protocol/responses.ts +++ b/src/cmap/wire_protocol/responses.ts @@ -38,8 +38,7 @@ export class MongoDBResponse extends OnDemandDocument { public get atClusterTime(): Timestamp | null { return ( this.get('cursor', BSONType.object)?.get('atClusterTime', BSONType.timestamp) ?? - this.get('atClusterTime', BSONType.timestamp) ?? - null + this.get('atClusterTime', BSONType.timestamp) ); } diff --git a/test/unit/cmap/wire_protocol/responses.test.ts b/test/unit/cmap/wire_protocol/responses.test.ts index fda8401bc53..fc5ee88ae16 100644 --- a/test/unit/cmap/wire_protocol/responses.test.ts +++ b/test/unit/cmap/wire_protocol/responses.test.ts @@ -14,6 +14,21 @@ describe('class MongoDBResponse', () => { expect(doc.isError).to.be.true; }); + it('returns true when $err is defined', () => { + const doc = new MongoDBResponse(BSON.serialize({ $err: 0 })); + expect(doc.isError).to.be.true; + }); + + it('returns true when errmsg is defined', () => { + const doc = new MongoDBResponse(BSON.serialize({ errmsg: 0 })); + expect(doc.isError).to.be.true; + }); + + it('returns true when code is defined', () => { + const doc = new MongoDBResponse(BSON.serialize({ code: 0 })); + expect(doc.isError).to.be.true; + }); + it('short circuits detection of $err, errmsg, code', () => { const doc = new MongoDBResponse(BSON.serialize({ ok: 0 })); expect(doc.isError).to.be.true; From 072b200b350c5cb97a5585f8502acf8205411b2d Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 4 Apr 2024 15:34:04 -0400 Subject: [PATCH 3/8] fix: type overloads for command --- src/cmap/connection.ts | 30 ++++++++++------------------- src/cmap/wire_protocol/responses.ts | 5 +++++ src/index.ts | 2 +- test/types/connection.test-d.ts | 4 ---- 4 files changed, 16 insertions(+), 25 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 954cd262869..d2be408a1d1 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -62,7 +62,7 @@ import type { ClientMetadata } from './handshake/client_metadata'; import { StreamDescription, type StreamDescriptionOptions } from './stream_description'; import { type CompressorName, decompressResponse } from './wire_protocol/compression'; import { onData } from './wire_protocol/on_data'; -import { MongoDBResponse } from './wire_protocol/responses'; +import { MongoDBResponse, type MongoDBResponseConstructor } from './wire_protocol/responses'; import { getReadPreference, isSharded } from './wire_protocol/shared'; /** @internal */ @@ -416,7 +416,7 @@ export class Connection extends TypedEventEmitter { private async *sendWire( message: WriteProtocolMessageType, options: CommandOptions, - responseType?: typeof MongoDBResponse + responseType?: MongoDBResponseConstructor ): AsyncGenerator { this.throwIfAborted(); @@ -462,8 +462,8 @@ export class Connection extends TypedEventEmitter { private async *sendCommand( ns: MongoDBNamespace, command: Document, - options: CommandOptions = {}, - responseType?: typeof MongoDBResponse + options: CommandOptions, + responseType?: MongoDBResponseConstructor ) { const message = this.prepareCommand(ns.db, command, options); @@ -574,32 +574,24 @@ export class Connection extends TypedEventEmitter { } } - public async command( + public async command( ns: MongoDBNamespace, command: Document, options: CommandOptions | undefined, responseType: T | undefined - ): Promise>; - - public async command( - ns: MongoDBNamespace, - command: Document, - options?: undefined - ): Promise; + ): Promise>; public async command( ns: MongoDBNamespace, command: Document, - options: CommandOptions + options?: CommandOptions ): Promise; - public async command(ns: MongoDBNamespace, command: Document): Promise; - public async command( ns: MongoDBNamespace, command: Document, options: CommandOptions = {}, - responseType?: typeof MongoDBResponse + responseType?: MongoDBResponseConstructor ): Promise { this.throwIfAborted(); for await (const document of this.sendCommand(ns, command, options, responseType)) { @@ -728,7 +720,7 @@ export class CryptoConnection extends Connection { this.autoEncrypter = options.autoEncrypter; } - public override async command( + public override async command( ns: MongoDBNamespace, command: Document, options: CommandOptions | undefined, @@ -741,9 +733,7 @@ export class CryptoConnection extends Connection { options?: CommandOptions ): Promise; - public override async command(ns: MongoDBNamespace, command: Document): Promise; - - override async command( + override async command( ns: MongoDBNamespace, cmd: Document, options?: CommandOptions, diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts index 03f15e4820e..1f3be513181 100644 --- a/src/cmap/wire_protocol/responses.ts +++ b/src/cmap/wire_protocol/responses.ts @@ -2,6 +2,11 @@ import { type BSONSerializeOptions, BSONType, type Document, type Timestamp } fr import { type ClusterTime } from '../../sdam/common'; import { OnDemandDocument } from './on_demand/document'; +/** @internal */ +export type MongoDBResponseConstructor = { + new (bson: Uint8Array, offset?: number, isArray?: boolean): MongoDBResponse; +}; + /** @internal */ export class MongoDBResponse extends OnDemandDocument { // {ok:1} diff --git a/src/index.ts b/src/index.ts index 86634940c8f..795f6835c8c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -290,7 +290,7 @@ export type { ConnectionPoolMetrics } from './cmap/metrics'; export type { StreamDescription, StreamDescriptionOptions } from './cmap/stream_description'; export type { CompressorName } from './cmap/wire_protocol/compression'; export type { JSTypeOf, OnDemandDocument } from './cmap/wire_protocol/on_demand/document'; -export type { MongoDBResponse } from './cmap/wire_protocol/responses'; +export type { MongoDBResponse, MongoDBResponseConstructor } from './cmap/wire_protocol/responses'; export type { CollectionOptions, CollectionPrivate, ModifyResult } from './collection'; export type { COMMAND_FAILED, diff --git a/test/types/connection.test-d.ts b/test/types/connection.test-d.ts index 190016fa085..25953c5e16f 100644 --- a/test/types/connection.test-d.ts +++ b/test/types/connection.test-d.ts @@ -7,10 +7,6 @@ declare const connection: Connection; expectType(await connection.command(ns('a'), { cmd: 1 })); expectType(await connection.command(ns('a'), { cmd: 1 }, undefined)); expectType(await connection.command(ns('a'), { cmd: 1 }, { socketTimeoutMS: 1 })); -// TODO fix TS or simplify arguments -// expectType( -// await connection.command(ns('a'), { cmd: 1 }, { socketTimeoutMS: 1 }, undefined) -// ); class A extends MongoDBResponse { myProperty = 0n; From 15d8b615da2084ec24750aed407e4138759a9642 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 4 Apr 2024 16:27:35 -0400 Subject: [PATCH 4/8] chore: ensure command monitoring receives the same object instance --- src/cmap/connection.ts | 41 +++++++++--------- .../connection.test.ts | 43 +++++++++++++++++++ 2 files changed, 64 insertions(+), 20 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index d2be408a1d1..b14a8a3a9f6 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -479,7 +479,21 @@ export class Connection extends TypedEventEmitter { ); } - let document; + // If `documentsReturnedIn` not set or raw is not enabled, use input bson options + // Otherwise, support raw flag. Raw only works for cursors that hardcode firstBatch/nextBatch fields + const bsonOptions = + options.documentsReturnedIn == null || !options.raw + ? options + : { + ...options, + raw: false, + fieldsAsRaw: { [options.documentsReturnedIn]: true } + }; + + /** MongoDBResponse instance or subclass */ + let document: MongoDBResponse | undefined = undefined; + /** Cached result of a toObject call */ + let object: Document | undefined = undefined; try { this.throwIfAborted(); for await (document of this.sendWire(message, options, responseType)) { @@ -493,15 +507,12 @@ export class Connection extends TypedEventEmitter { } if (document.has('writeConcernError')) { - const objectWithWriteConcernError = document.toObject(options); - throw new MongoWriteConcernError( - objectWithWriteConcernError.writeConcernError, - objectWithWriteConcernError - ); + object ??= document.toObject(bsonOptions); + throw new MongoWriteConcernError(object.writeConcernError, object); } if (document.isError) { - throw new MongoServerError(document.toObject(options)); + throw new MongoServerError((object ??= document.toObject(bsonOptions))); } if (this.shouldEmitAndLogCommand) { @@ -513,7 +524,7 @@ export class Connection extends TypedEventEmitter { new CommandSucceededEvent( this, message, - options.noResponse ? undefined : document.toObject(options), + options.noResponse ? undefined : (object ??= document.toObject(bsonOptions)), started, this.description.serverConnectionId ) @@ -521,17 +532,7 @@ export class Connection extends TypedEventEmitter { } if (responseType == null) { - // If `documentsReturnedIn` not set or raw is not enabled, use input bson options - // Otherwise, support raw flag. Raw only works for cursors that hardcode firstBatch/nextBatch fields - const bsonOptions = - options.documentsReturnedIn == null || !options.raw - ? options - : { - ...options, - raw: false, - fieldsAsRaw: { [options.documentsReturnedIn]: true } - }; - yield document.toObject(bsonOptions); + yield (object ??= document.toObject(bsonOptions)); } else { yield document; } @@ -549,7 +550,7 @@ export class Connection extends TypedEventEmitter { new CommandSucceededEvent( this, message, - options.noResponse ? undefined : document?.toObject(options), + options.noResponse ? undefined : (object ??= document?.toObject(bsonOptions)), started, this.description.serverConnectionId ) diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index 421a9e02bbf..1192dfdbcd4 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -14,6 +14,7 @@ import { makeClientMetadata, MongoClient, MongoClientAuthProviders, + MongoDBResponse, MongoServerError, ns, ServerHeartbeatStartedEvent, @@ -101,6 +102,48 @@ describe('Connection', function () { } } }); + + afterEach(() => sinon.restore()); + + it('command monitoring event do not deserialize more than once', { + metadata: { requires: { apiVersion: false, topology: '!load-balanced' } }, + test: async function () { + const connectOptions: ConnectionOptions = { + ...commonConnectOptions, + connectionType: Connection, + ...this.configuration.options, + monitorCommands: true, + metadata: makeClientMetadata({ driverInfo: {} }), + extendedMetadata: addContainerMetadata(makeClientMetadata({ driverInfo: {} })) + }; + + let conn; + try { + conn = await connect(connectOptions); + + const toObjectSpy = sinon.spy(MongoDBResponse.prototype, 'toObject'); + + const events: any[] = []; + conn.on('commandStarted', event => events.push(event)); + conn.on('commandSucceeded', event => events.push(event)); + conn.on('commandFailed', event => events.push(event)); + + const hello = await conn.command(ns('admin.$cmd'), { ping: 1 }); + expect(toObjectSpy).to.have.been.calledOnce; + expect(hello).to.have.property('ok', 1); + expect(events).to.have.lengthOf(2); + + toObjectSpy.resetHistory(); + + const garbage = await conn.command(ns('admin.$cmd'), { garbage: 1 }).catch(e => e); + expect(toObjectSpy).to.have.been.calledOnce; + expect(garbage).to.have.property('ok', 0); + expect(events).to.have.lengthOf(4); + } finally { + conn?.destroy(); + } + } + }); }); describe('Connection - functional', function () { From 6ee9150fa2115e2fb7565f219c54edab2f722309 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 4 Apr 2024 17:12:19 -0400 Subject: [PATCH 5/8] test: add UTF integration test --- .../bson-options/utf8_validation.test.ts | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/test/integration/node-specific/bson-options/utf8_validation.test.ts b/test/integration/node-specific/bson-options/utf8_validation.test.ts index 05e51d5277e..43ca5ff9d06 100644 --- a/test/integration/node-specific/bson-options/utf8_validation.test.ts +++ b/test/integration/node-specific/bson-options/utf8_validation.test.ts @@ -1,7 +1,13 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { MongoDBResponse } from '../../../mongodb'; +import { + BSON, + type MongoClient, + MongoDBResponse, + MongoServerError, + OpMsgResponse +} from '../../../mongodb'; const EXPECTED_VALIDATION_DISABLED_ARGUMENT = { utf8: false @@ -107,4 +113,43 @@ describe('class MongoDBResponse', () => { }); } }); + + context( + 'when the server is given a long multibyte utf sequence and there is a writeError', + () => { + let client: MongoClient; + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + sinon.restore(); + await client.db('parsing').dropDatabase(); + await client.close(); + }); + + it('does not throw a UTF-8 parsing error', async () => { + // Insert a large string of multibyte UTF-8 characters + const _id = '\u{1F92A}'.repeat(1000); + + const test = client.db('parsing').collection<{ _id: string }>('parsing'); + await test.insertOne({ _id }); + + const spy = sinon.spy(OpMsgResponse.prototype, 'parse'); + + const error = await test.insertOne({ _id }).catch(error => error); + + // Check that the server sent us broken BSON (bad UTF) + expect(() => { + BSON.deserialize(spy.returnValues[0], { validation: { utf8: true } }); + }).to.throw(BSON.BSONError, /Invalid UTF/i); + + // Assert the driver squashed it + expect(error).to.be.instanceOf(MongoServerError); + expect(error.message).to.match(/duplicate/i); + expect(error.message).to.not.match(/utf/i); + expect(error.errmsg).to.include('\uFFFD'); + }); + } + ); }); From d5621c9c3553ba479f84065c2b948710b9616716 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 4 Apr 2024 17:51:32 -0400 Subject: [PATCH 6/8] docs: add comment about $err --- src/cmap/wire_protocol/responses.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts index 1f3be513181..b776a4de568 100644 --- a/src/cmap/wire_protocol/responses.ts +++ b/src/cmap/wire_protocol/responses.ts @@ -15,9 +15,9 @@ export class MongoDBResponse extends OnDemandDocument { /** Indicates this document is a server error */ public get isError() { let isError = this.ok === 0; - isError ||= this.has('$err'); isError ||= this.has('errmsg'); isError ||= this.has('code'); + isError ||= this.has('$err'); // The '$err' field is used in OP_REPLY responses return isError; } From c24f57647734f45df6c7355cf7a34e1c12b49d74 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 4 Apr 2024 18:11:19 -0400 Subject: [PATCH 7/8] test: make utf test work on older servers --- .../node-specific/bson-options/utf8_validation.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/node-specific/bson-options/utf8_validation.test.ts b/test/integration/node-specific/bson-options/utf8_validation.test.ts index 43ca5ff9d06..5c3f94e7fb7 100644 --- a/test/integration/node-specific/bson-options/utf8_validation.test.ts +++ b/test/integration/node-specific/bson-options/utf8_validation.test.ts @@ -130,7 +130,7 @@ describe('class MongoDBResponse', () => { it('does not throw a UTF-8 parsing error', async () => { // Insert a large string of multibyte UTF-8 characters - const _id = '\u{1F92A}'.repeat(1000); + const _id = '\u{1F92A}'.repeat(100); const test = client.db('parsing').collection<{ _id: string }>('parsing'); await test.insertOne({ _id }); From 199d77b34d832e2bc13cb9a52e59b6c52fde458b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 5 Apr 2024 12:39:03 -0400 Subject: [PATCH 8/8] reset cache object --- src/cmap/connection.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index b14a8a3a9f6..3b54f02aaa3 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -497,6 +497,7 @@ export class Connection extends TypedEventEmitter { try { this.throwIfAborted(); for await (document of this.sendWire(message, options, responseType)) { + object = undefined; if (options.session != null) { updateSessionFromResponse(options.session, document); }