Skip to content

Commit 9a3696e

Browse files
committed
Extracting logic from InternalStreamConnection
1 parent d4df11b commit 9a3696e

File tree

4 files changed

+141
-128
lines changed

4 files changed

+141
-128
lines changed

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 10 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,13 @@
2323
import com.mongodb.MongoException;
2424
import com.mongodb.MongoInternalException;
2525
import com.mongodb.MongoInterruptedException;
26-
import com.mongodb.MongoNamespace;
2726
import com.mongodb.MongoOperationTimeoutException;
2827
import com.mongodb.MongoSocketClosedException;
2928
import com.mongodb.MongoSocketReadException;
3029
import com.mongodb.MongoSocketReadTimeoutException;
3130
import com.mongodb.MongoSocketWriteException;
3231
import com.mongodb.MongoSocketWriteTimeoutException;
3332
import com.mongodb.ServerAddress;
34-
import com.mongodb.UnixServerAddress;
3533
import com.mongodb.annotations.NotThreadSafe;
3634
import com.mongodb.connection.AsyncCompletionHandler;
3735
import com.mongodb.connection.ClusterConnectionMode;
@@ -54,9 +52,7 @@
5452
import com.mongodb.internal.session.SessionContext;
5553
import com.mongodb.internal.time.Timeout;
5654
import com.mongodb.internal.tracing.Span;
57-
import com.mongodb.internal.tracing.TracingManager;
5855
import com.mongodb.lang.Nullable;
59-
import io.micrometer.common.KeyValues;
6056
import org.bson.BsonBinaryReader;
6157
import org.bson.BsonDocument;
6258
import org.bson.ByteBuf;
@@ -100,21 +96,7 @@
10096
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
10197
import static com.mongodb.internal.thread.InterruptionUtil.translateInterruptedException;
10298
import static com.mongodb.internal.tracing.MongodbObservation.HighCardinalityKeyNames.QUERY_TEXT;
103-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.CLIENT_CONNECTION_ID;
104-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.COLLECTION;
105-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.COMMAND_NAME;
106-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.CURSOR_ID;
107-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.NAMESPACE;
108-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.NETWORK_TRANSPORT;
109-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.QUERY_SUMMARY;
11099
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.RESPONSE_STATUS_CODE;
111-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SERVER_ADDRESS;
112-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SERVER_CONNECTION_ID;
113-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SERVER_PORT;
114-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SERVER_TYPE;
115-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SESSION_ID;
116-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SYSTEM;
117-
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.TRANSACTION_NUMBER;
118100
import static java.util.Arrays.asList;
119101

120102
/**
@@ -456,7 +438,16 @@ private <T> T sendAndReceiveInternal(final CommandMessage message, final Decoder
456438
Span tracingSpan;
457439
try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this)) {
458440
message.encode(bsonOutput, operationContext);
459-
tracingSpan = createTracingSpan(message, operationContext, bsonOutput);
441+
tracingSpan = operationContext
442+
.getTracingManager()
443+
.createTracingSpan(message,
444+
operationContext,
445+
() -> message.getCommandDocument(bsonOutput),
446+
cmdName -> SECURITY_SENSITIVE_COMMANDS.contains(cmdName)
447+
|| SECURITY_SENSITIVE_HELLO_COMMANDS.contains(cmdName),
448+
() -> getDescription().getServerAddress(),
449+
() -> getDescription().getConnectionId()
450+
);
460451

461452
boolean isLoggingCommandNeeded = isLoggingCommandNeeded();
462453
boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext.getTracingManager().isCommandPayloadEnabled();
@@ -1031,103 +1022,4 @@ private ClusterId getClusterId() {
10311022
return description.getConnectionId().getServerId().getClusterId();
10321023
}
10331024

1034-
/**
1035-
* Creates a tracing span for the given command message.
1036-
* <p>
1037-
* The span is only created if tracing is enabled and the command is not security-sensitive.
1038-
* It attaches various tags to the span, such as database system, namespace, query summary, opcode,
1039-
* server address, port, server type, client and server connection IDs, and, if applicable,
1040-
* transaction number and session ID. For cursor fetching commands, the parent context is retrieved using the cursor ID.
1041-
* If command payload tracing is enabled, the command document is also attached as a tag.
1042-
*
1043-
* @param message the command message to trace
1044-
* @param operationContext the operation context containing tracing and session information
1045-
* @param bsonOutput the BSON output used to serialize the command
1046-
* @return the created {@link Span}, or {@code null} if tracing is not enabled or the command is security-sensitive
1047-
*/
1048-
@Nullable
1049-
private Span createTracingSpan(final CommandMessage message, final OperationContext operationContext, final ByteBufferBsonOutput bsonOutput) {
1050-
1051-
TracingManager tracingManager = operationContext.getTracingManager();
1052-
BsonDocument command = message.getCommandDocument(bsonOutput);
1053-
1054-
String commandName = command.getFirstKey();
1055-
1056-
if (!tracingManager.isEnabled()
1057-
|| SECURITY_SENSITIVE_COMMANDS.contains(commandName)
1058-
|| SECURITY_SENSITIVE_HELLO_COMMANDS.contains(commandName)) {
1059-
return null;
1060-
}
1061-
1062-
Span operationSpan = operationContext.getTracingSpan();
1063-
Span span = tracingManager
1064-
.addSpan(commandName, operationSpan != null ? operationSpan.context() : null);
1065-
1066-
if (command.containsKey("getMore")) {
1067-
long cursorId = command.getInt64("getMore").longValue();
1068-
span.tagLowCardinality(CURSOR_ID.withValue(String.valueOf(cursorId)));
1069-
if (operationSpan != null) {
1070-
operationSpan.tagLowCardinality(CURSOR_ID.withValue(String.valueOf(cursorId)));
1071-
}
1072-
}
1073-
1074-
tagNamespace(span, operationSpan, message, commandName);
1075-
tagServerAndConnectionInfo(span, message);
1076-
tagSessionAndTransactionInfo(span, operationContext);
1077-
1078-
return span;
1079-
}
1080-
1081-
private void tagNamespace(final Span span, @Nullable final Span parentSpan, final CommandMessage message, final String commandName) {
1082-
String namespace;
1083-
String collection = "";
1084-
if (parentSpan != null) {
1085-
MongoNamespace parentNamespace = parentSpan.getNamespace();
1086-
if (parentNamespace != null) {
1087-
namespace = parentNamespace.getDatabaseName();
1088-
collection =
1089-
MongoNamespace.COMMAND_COLLECTION_NAME.equalsIgnoreCase(parentNamespace.getCollectionName()) ? ""
1090-
: parentNamespace.getCollectionName();
1091-
} else {
1092-
namespace = message.getDatabase();
1093-
}
1094-
} else {
1095-
namespace = message.getDatabase();
1096-
}
1097-
String summary = commandName + " " + namespace + (collection.isEmpty() ? "" : "." + collection);
1098-
1099-
KeyValues keyValues = KeyValues.of(
1100-
SYSTEM.withValue("mongodb"),
1101-
NAMESPACE.withValue(namespace),
1102-
QUERY_SUMMARY.withValue(summary),
1103-
COMMAND_NAME.withValue(commandName));
1104-
1105-
if (!collection.isEmpty()) {
1106-
keyValues = keyValues.and(COLLECTION.withValue(collection));
1107-
}
1108-
span.tagLowCardinality(keyValues);
1109-
}
1110-
1111-
private void tagServerAndConnectionInfo(final Span span, final CommandMessage message) {
1112-
span.tagLowCardinality(KeyValues.of(
1113-
SERVER_ADDRESS.withValue(serverId.getAddress().getHost()),
1114-
SERVER_PORT.withValue(String.valueOf(serverId.getAddress().getPort())),
1115-
SERVER_TYPE.withValue(message.getSettings().getServerType().name()),
1116-
CLIENT_CONNECTION_ID.withValue(String.valueOf(this.description.getConnectionId().getLocalValue())),
1117-
SERVER_CONNECTION_ID.withValue(String.valueOf(this.description.getConnectionId().getServerValue())),
1118-
NETWORK_TRANSPORT.withValue(getServerAddress() instanceof UnixServerAddress ? "unix" : "tcp")
1119-
));
1120-
}
1121-
1122-
private void tagSessionAndTransactionInfo(final Span span, final OperationContext operationContext) {
1123-
SessionContext sessionContext = operationContext.getSessionContext();
1124-
if (sessionContext.hasSession() && !sessionContext.isImplicitSession()) {
1125-
span.tagLowCardinality(KeyValues.of(
1126-
TRANSACTION_NUMBER.withValue(String.valueOf(sessionContext.getTransactionNumber())),
1127-
SESSION_ID.withValue(String.valueOf(sessionContext.getSessionId()
1128-
.get(sessionContext.getSessionId().getFirstKey())
1129-
.asBinary().asUuid()))
1130-
));
1131-
}
1132-
}
11331025
}

driver-core/src/main/com/mongodb/internal/tracing/MongodbObservation.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,6 @@ public String asString() {
116116
return "server.port";
117117
}
118118
},
119-
SERVER_TYPE {
120-
@Override
121-
public String asString() {
122-
return "server.type";
123-
}
124-
},
125119
CLIENT_CONNECTION_ID {
126120
@Override
127121
public String asString() {

driver-core/src/main/com/mongodb/internal/tracing/TracingManager.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,34 @@
1717
package com.mongodb.internal.tracing;
1818

1919
import com.mongodb.MongoNamespace;
20+
import com.mongodb.ServerAddress;
21+
import com.mongodb.UnixServerAddress;
22+
import com.mongodb.connection.ConnectionId;
23+
import com.mongodb.internal.connection.CommandMessage;
24+
import com.mongodb.internal.connection.OperationContext;
25+
import com.mongodb.internal.session.SessionContext;
2026
import com.mongodb.lang.Nullable;
27+
import io.micrometer.common.KeyValues;
2128
import io.micrometer.observation.ObservationRegistry;
29+
import org.bson.BsonDocument;
30+
31+
import java.util.function.Predicate;
32+
import java.util.function.Supplier;
2233

2334
import static com.mongodb.MongoClientSettings.ENV_OTEL_ENABLED;
35+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.CLIENT_CONNECTION_ID;
36+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.COLLECTION;
37+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.COMMAND_NAME;
38+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.CURSOR_ID;
39+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.NAMESPACE;
40+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.NETWORK_TRANSPORT;
41+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.QUERY_SUMMARY;
42+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SERVER_ADDRESS;
43+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SERVER_CONNECTION_ID;
44+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SERVER_PORT;
45+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SESSION_ID;
2446
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.SYSTEM;
47+
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.TRANSACTION_NUMBER;
2548
import static java.lang.System.getenv;
2649

2750
/**
@@ -118,4 +141,101 @@ public boolean isEnabled() {
118141
public boolean isCommandPayloadEnabled() {
119142
return enableCommandPayload;
120143
}
144+
145+
146+
/** Create a tracing span for the given command message.
147+
* <p>
148+
* The span is only created if tracing is enabled and the command is not security-sensitive.
149+
* It attaches various tags to the span, such as database system, namespace, query summary, opcode,
150+
* server address, port, server type, client and server connection IDs, and, if applicable,
151+
* transaction number and session ID.
152+
* If command payload tracing is enabled, the command document is also attached as a tag.
153+
*
154+
* @param message the command message to trace
155+
* @param operationContext the operation context containing tracing and session information
156+
* @param commandDocumentSupplier a supplier that provides the command document when needed
157+
* @param isSensitiveCommand a predicate that determines if a command is security-sensitive based on its name
158+
* @param serverAddressSupplier a supplier that provides the server address when needed
159+
* @param connectionIdSupplier a supplier that provides the connection ID when needed
160+
* @return the created {@link Span}, or {@code null} if tracing is not enabled or the command is security-sensitive
161+
*/
162+
@Nullable
163+
public Span createTracingSpan(final CommandMessage message,
164+
final OperationContext operationContext,
165+
final Supplier<BsonDocument> commandDocumentSupplier,
166+
final Predicate<String> isSensitiveCommand,
167+
final Supplier<ServerAddress> serverAddressSupplier,
168+
final Supplier<ConnectionId> connectionIdSupplier
169+
) {
170+
171+
BsonDocument command = commandDocumentSupplier.get();
172+
String commandName = command.getFirstKey();
173+
if (!isEnabled() || isSensitiveCommand.test(commandName)) {
174+
return null;
175+
}
176+
177+
Span operationSpan = operationContext.getTracingSpan();
178+
Span span = addSpan(commandName, operationSpan != null ? operationSpan.context() : null);
179+
180+
if (command.containsKey("getMore")) {
181+
long cursorId = command.getInt64("getMore").longValue();
182+
span.tagLowCardinality(CURSOR_ID.withValue(String.valueOf(cursorId)));
183+
if (operationSpan != null) {
184+
operationSpan.tagLowCardinality(CURSOR_ID.withValue(String.valueOf(cursorId)));
185+
}
186+
}
187+
188+
// Tag namespace
189+
String namespace;
190+
String collection = "";
191+
if (operationSpan != null) {
192+
MongoNamespace parentNamespace = operationSpan.getNamespace();
193+
if (parentNamespace != null) {
194+
namespace = parentNamespace.getDatabaseName();
195+
collection =
196+
MongoNamespace.COMMAND_COLLECTION_NAME.equalsIgnoreCase(parentNamespace.getCollectionName()) ? ""
197+
: parentNamespace.getCollectionName();
198+
} else {
199+
namespace = message.getDatabase();
200+
}
201+
} else {
202+
namespace = message.getDatabase();
203+
}
204+
String summary = commandName + " " + namespace + (collection.isEmpty() ? "" : "." + collection);
205+
206+
KeyValues keyValues = KeyValues.of(
207+
SYSTEM.withValue("mongodb"),
208+
NAMESPACE.withValue(namespace),
209+
QUERY_SUMMARY.withValue(summary),
210+
COMMAND_NAME.withValue(commandName));
211+
212+
if (!collection.isEmpty()) {
213+
keyValues = keyValues.and(COLLECTION.withValue(collection));
214+
}
215+
span.tagLowCardinality(keyValues);
216+
217+
// tag server and connection info
218+
ServerAddress serverAddress = serverAddressSupplier.get();
219+
ConnectionId connectionId = connectionIdSupplier.get();
220+
span.tagLowCardinality(KeyValues.of(
221+
SERVER_ADDRESS.withValue(serverAddress.getHost()),
222+
SERVER_PORT.withValue(String.valueOf(serverAddress.getPort())),
223+
CLIENT_CONNECTION_ID.withValue(String.valueOf(connectionId.getLocalValue())),
224+
SERVER_CONNECTION_ID.withValue(String.valueOf(connectionId.getServerValue())),
225+
NETWORK_TRANSPORT.withValue(serverAddress instanceof UnixServerAddress ? "unix" : "tcp")
226+
));
227+
228+
// tag session and transaction info
229+
SessionContext sessionContext = operationContext.getSessionContext();
230+
if (sessionContext.hasSession() && !sessionContext.isImplicitSession()) {
231+
span.tagLowCardinality(KeyValues.of(
232+
TRANSACTION_NUMBER.withValue(String.valueOf(sessionContext.getTransactionNumber())),
233+
SESSION_ID.withValue(String.valueOf(sessionContext.getSessionId()
234+
.get(sessionContext.getSessionId().getFirstKey())
235+
.asBinary().asUuid()))
236+
));
237+
}
238+
239+
return span;
240+
}
121241
}

driver-core/src/test/unit/com/mongodb/MongoClientSettingsSpecification.groovy

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -552,10 +552,13 @@ class MongoClientSettingsSpecification extends Specification {
552552
def actual = MongoClientSettings.Builder.declaredFields.grep { !it.synthetic } *.name.sort()
553553
def expected = ['applicationName', 'autoEncryptionSettings', 'clusterSettingsBuilder', 'codecRegistry', 'commandListeners',
554554
'compressorList', 'connectionPoolSettingsBuilder', 'contextProvider', 'credential', 'dnsClient',
555+
'enableCommandPayloadTracing',
555556
'heartbeatConnectTimeoutMS', 'heartbeatSocketTimeoutMS', 'inetAddressResolver', 'loggerSettingsBuilder',
557+
'observationRegistry',
556558
'readConcern', 'readPreference', 'retryReads',
557559
'retryWrites', 'serverApi', 'serverSettingsBuilder', 'socketSettingsBuilder', 'sslSettingsBuilder',
558-
'timeoutMS', 'tracer', 'transportSettings', 'uuidRepresentation', 'writeConcern']
560+
'timeoutMS', 'transportSettings', 'uuidRepresentation',
561+
'writeConcern']
559562

560563
then:
561564
actual == expected
@@ -568,9 +571,13 @@ class MongoClientSettingsSpecification extends Specification {
568571
def expected = ['addCommandListener', 'applicationName', 'applyConnectionString', 'applyToClusterSettings',
569572
'applyToConnectionPoolSettings', 'applyToLoggerSettings', 'applyToServerSettings', 'applyToSocketSettings',
570573
'applyToSslSettings', 'autoEncryptionSettings', 'build', 'codecRegistry', 'commandListenerList',
571-
'compressorList', 'contextProvider', 'credential', 'dnsClient', 'heartbeatConnectTimeoutMS',
572-
'heartbeatSocketTimeoutMS', 'inetAddressResolver', 'readConcern', 'readPreference', 'retryReads', 'retryWrites',
573-
'serverApi', 'timeout', 'tracer', 'transportSettings', 'uuidRepresentation', 'writeConcern']
574+
'compressorList', 'contextProvider', 'credential', 'dnsClient',
575+
'heartbeatConnectTimeoutMS',
576+
'heartbeatSocketTimeoutMS', 'inetAddressResolver', 'observationRegistry', 'observationRegistry', 'readConcern',
577+
'readPreference',
578+
'retryReads', 'retryWrites',
579+
'serverApi', 'timeout', 'transportSettings',
580+
'uuidRepresentation', 'writeConcern']
574581

575582
then:
576583
actual == expected

0 commit comments

Comments
 (0)