diff --git a/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonDocument.java b/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonDocument.java index 5ab265c2bc8..70ed10a75a8 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonDocument.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonDocument.java @@ -53,38 +53,31 @@ final class ByteBufBsonDocument extends BsonDocument { private final transient ByteBuf byteBuf; - static List createList(final ByteBufferBsonOutput bsonOutput, final int startPosition) { - List duplicateByteBuffers = bsonOutput.getByteBuffers(); - CompositeByteBuf outputByteBuf = new CompositeByteBuf(duplicateByteBuffers); - outputByteBuf.position(startPosition); + /** + * Create a list of ByteBufBsonDocument from a buffer positioned at the start of the first document of an OP_MSG Section + * of type Document Sequence (Kind 1). + *

+ * The provided buffer will be positioned at the end of the section upon normal completion of the method + */ + static List createList(final ByteBuf outputByteBuf) { List documents = new ArrayList<>(); - int curDocumentStartPosition = startPosition; while (outputByteBuf.hasRemaining()) { - int documentSizeInBytes = outputByteBuf.getInt(); - ByteBuf slice = outputByteBuf.duplicate(); - slice.position(curDocumentStartPosition); - slice.limit(curDocumentStartPosition + documentSizeInBytes); - documents.add(new ByteBufBsonDocument(slice)); - curDocumentStartPosition += documentSizeInBytes; - outputByteBuf.position(outputByteBuf.position() + documentSizeInBytes - 4); - } - for (ByteBuf byteBuffer : duplicateByteBuffers) { - byteBuffer.release(); + ByteBufBsonDocument curDocument = createOne(outputByteBuf); + documents.add(curDocument); } return documents; } - static ByteBufBsonDocument createOne(final ByteBufferBsonOutput bsonOutput, final int startPosition) { - List duplicateByteBuffers = bsonOutput.getByteBuffers(); - CompositeByteBuf outputByteBuf = new CompositeByteBuf(duplicateByteBuffers); - outputByteBuf.position(startPosition); + /** + * Create a ByteBufBsonDocument from a buffer positioned at the start of a BSON document. + * The provided buffer will be positioned at the end of the document upon normal completion of the method + */ + static ByteBufBsonDocument createOne(final ByteBuf outputByteBuf) { + int documentStart = outputByteBuf.position(); int documentSizeInBytes = outputByteBuf.getInt(); - ByteBuf slice = outputByteBuf.duplicate(); - slice.position(startPosition); - slice.limit(startPosition + documentSizeInBytes); - for (ByteBuf byteBuffer : duplicateByteBuffers) { - byteBuffer.release(); - } + int documentEnd = documentStart + documentSizeInBytes; + ByteBuf slice = outputByteBuf.duplicate().position(documentStart).limit(documentEnd); + outputByteBuf.position(documentEnd); return new ByteBufBsonDocument(slice); } @@ -138,10 +131,6 @@ T findInDocument(final Finder finder) { return finder.notFound(); } - int getSizeInBytes() { - return byteBuf.getInt(byteBuf.position()); - } - BsonDocument toBaseBsonDocument() { ByteBuf duplicateByteBuf = byteBuf.duplicate(); try (BsonBinaryReader bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicateByteBuf))) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java index 24b30d60acb..901d7e9bb30 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java @@ -17,6 +17,7 @@ package com.mongodb.internal.connection; import com.mongodb.MongoClientException; +import com.mongodb.MongoInternalException; import com.mongodb.MongoNamespace; import com.mongodb.ReadPreference; import com.mongodb.ServerApi; @@ -30,9 +31,12 @@ import org.bson.BsonElement; import org.bson.BsonInt64; import org.bson.BsonString; +import org.bson.ByteBuf; import org.bson.FieldNameValidator; import org.bson.io.BsonOutput; +import java.io.ByteArrayOutputStream; +import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -46,6 +50,8 @@ import static com.mongodb.connection.ServerType.SHARD_ROUTER; import static com.mongodb.connection.ServerType.STANDALONE; import static com.mongodb.internal.connection.BsonWriterHelper.writePayload; +import static com.mongodb.internal.connection.ByteBufBsonDocument.createList; +import static com.mongodb.internal.connection.ByteBufBsonDocument.createOne; import static com.mongodb.internal.connection.ReadConcernHelper.getReadConcernDocument; import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_TWO_WIRE_VERSION; import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_ZERO_WIRE_VERSION; @@ -107,30 +113,76 @@ public final class CommandMessage extends RequestMessage { this.serverApi = serverApi; } + /** + * Create a BsonDocument representing the logical document encoded by an OP_MSG. + *

+ * The returned document will contain all the fields from the Body (Kind 0) Section, as well as all fields represented by + * OP_MSG Document Sequence (Kind 1) Sections. + */ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) { - ByteBufBsonDocument byteBufBsonDocument = ByteBufBsonDocument.createOne(bsonOutput, - getEncodingMetadata().getFirstDocumentPosition()); - BsonDocument commandBsonDocument; - - if (containsPayload()) { - commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); - - int payloadStartPosition = getEncodingMetadata().getFirstDocumentPosition() - + byteBufBsonDocument.getSizeInBytes() - + 1 // payload type - + 4 // payload size - + payload.getPayloadName().getBytes(StandardCharsets.UTF_8).length + 1; // null-terminated UTF-8 payload name - commandBsonDocument.append(payload.getPayloadName(), - new BsonArray(ByteBufBsonDocument.createList(bsonOutput, payloadStartPosition))); - } else { - commandBsonDocument = byteBufBsonDocument; + List byteBuffers = bsonOutput.getByteBuffers(); + try { + CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers); + try { + byteBuf.position(getEncodingMetadata().getFirstDocumentPosition()); + ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf); + + // If true, it means there is at least one Kind 1:Document Sequence in the OP_MSG + if (byteBuf.hasRemaining()) { + BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); + + // Each loop iteration processes one Document Sequence + // When there are no more bytes remaining, there are no more Document Sequences + while (byteBuf.hasRemaining()) { + // skip reading the payload type, we know it is 1 + byteBuf.position(byteBuf.position() + 1); + int sequenceStart = byteBuf.position(); + int sequenceSizeInBytes = byteBuf.getInt(); + int sectionEnd = sequenceStart + sequenceSizeInBytes; + + String fieldName = getSequenceIdentifier(byteBuf); + // If this assertion fires, it means that the driver has started using document sequences for nested fields. If + // so, this method will need to change in order to append the value to the correct nested document. + assertFalse(fieldName.contains(".")); + + ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd); + try { + commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice))); + } finally { + documentsByteBufSlice.release(); + } + byteBuf.position(sectionEnd); + } + return commandBsonDocument; + } else { + return byteBufBsonDocument; + } + } finally { + byteBuf.release(); + } + } finally { + byteBuffers.forEach(ByteBuf::release); } - - return commandBsonDocument; } - boolean containsPayload() { - return payload != null; + /** + * Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type + * Document Sequence (Kind 1). + *

+ * Upon normal completion of the method, the buffer will be positioned at the start of the first BSON object in the sequence. + */ + private String getSequenceIdentifier(final ByteBuf byteBuf) { + ByteArrayOutputStream sequenceIdentifierBytes = new ByteArrayOutputStream(); + byte curByte = byteBuf.get(); + while (curByte != 0) { + sequenceIdentifierBytes.write(curByte); + curByte = byteBuf.get(); + } + try { + return sequenceIdentifierBytes.toString(StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + throw new MongoInternalException("Unexpected exception", e); + } } boolean isResponseExpected() {