5151import com .mongodb .internal .logging .StructuredLogger ;
5252import com .mongodb .internal .session .SessionContext ;
5353import com .mongodb .internal .time .Timeout ;
54+ import com .mongodb .internal .tracing .Span ;
5455import com .mongodb .lang .Nullable ;
5556import org .bson .BsonBinaryReader ;
5657import org .bson .BsonDocument ;
9495import static com .mongodb .internal .connection .ProtocolHelper .isCommandOk ;
9596import static com .mongodb .internal .logging .LogMessage .Level .DEBUG ;
9697import static com .mongodb .internal .thread .InterruptionUtil .translateInterruptedException ;
98+ import static com .mongodb .internal .tracing .MongodbObservation .HighCardinalityKeyNames .QUERY_TEXT ;
99+ import static com .mongodb .internal .tracing .MongodbObservation .LowCardinalityKeyNames .RESPONSE_STATUS_CODE ;
97100import static java .util .Arrays .asList ;
98101
99102/**
@@ -432,22 +435,59 @@ public boolean reauthenticationIsTriggered(@Nullable final Throwable t) {
432435 private <T > T sendAndReceiveInternal (final CommandMessage message , final Decoder <T > decoder ,
433436 final OperationContext operationContext ) {
434437 CommandEventSender commandEventSender ;
438+ Span tracingSpan ;
435439 try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput (this )) {
436440 message .encode (bsonOutput , operationContext );
437- commandEventSender = createCommandEventSender (message , bsonOutput , operationContext );
438- commandEventSender .sendStartedEvent ();
441+ tracingSpan = operationContext
442+ .getTracingManager ()
443+ .createTracingSpan (message ,
444+ operationContext ,
445+ () -> message .getCommandDocument (bsonOutput ),
446+ cmdName -> SECURITY_SENSITIVE_COMMANDS .contains (cmdName )
447+ || SECURITY_SENSITIVE_HELLO_COMMANDS .contains (cmdName ),
448+ () -> getDescription ().getServerAddress (),
449+ () -> getDescription ().getConnectionId ()
450+ );
451+
452+ boolean isLoggingCommandNeeded = isLoggingCommandNeeded ();
453+ boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext .getTracingManager ().isCommandPayloadEnabled ();
454+
455+ // Only hydrate the command document if necessary
456+ BsonDocument commandDocument = null ;
457+ if (isLoggingCommandNeeded || isTracingCommandPayloadNeeded ) {
458+ commandDocument = message .getCommandDocument (bsonOutput );
459+ }
460+ if (isLoggingCommandNeeded ) {
461+ commandEventSender = new LoggingCommandEventSender (
462+ SECURITY_SENSITIVE_COMMANDS , SECURITY_SENSITIVE_HELLO_COMMANDS , description , commandListener ,
463+ operationContext , message , commandDocument ,
464+ COMMAND_PROTOCOL_LOGGER , loggerSettings );
465+ commandEventSender .sendStartedEvent ();
466+ } else {
467+ commandEventSender = new NoOpCommandEventSender ();
468+ }
469+ if (isTracingCommandPayloadNeeded ) {
470+ tracingSpan .tagHighCardinality (QUERY_TEXT .asString (), commandDocument );
471+ }
472+
439473 try {
440474 sendCommandMessage (message , bsonOutput , operationContext );
441475 } catch (Exception e ) {
476+ if (tracingSpan != null ) {
477+ tracingSpan .error (e );
478+ }
442479 commandEventSender .sendFailedEvent (e );
443480 throw e ;
444481 }
445482 }
446483
447484 if (message .isResponseExpected ()) {
448- return receiveCommandMessageResponse (decoder , commandEventSender , operationContext );
485+ return receiveCommandMessageResponse (decoder , commandEventSender , operationContext , tracingSpan );
449486 } else {
450487 commandEventSender .sendSucceededEventForOneWayCommand ();
488+ if (tracingSpan != null ) {
489+ tracingSpan .end ();
490+ }
451491 return null ;
452492 }
453493 }
@@ -466,7 +506,7 @@ public <T> void send(final CommandMessage message, final Decoder<T> decoder, fin
466506 @ Override
467507 public <T > T receive (final Decoder <T > decoder , final OperationContext operationContext ) {
468508 isTrue ("Response is expected" , hasMoreToCome );
469- return receiveCommandMessageResponse (decoder , new NoOpCommandEventSender (), operationContext );
509+ return receiveCommandMessageResponse (decoder , new NoOpCommandEventSender (), operationContext , null );
470510 }
471511
472512 @ Override
@@ -512,7 +552,7 @@ private void trySendMessage(final CommandMessage message, final ByteBufferBsonOu
512552 }
513553
514554 private <T > T receiveCommandMessageResponse (final Decoder <T > decoder , final CommandEventSender commandEventSender ,
515- final OperationContext operationContext ) {
555+ final OperationContext operationContext , @ Nullable final Span tracingSpan ) {
516556 boolean commandSuccessful = false ;
517557 try (ResponseBuffers responseBuffers = receiveResponseBuffers (operationContext )) {
518558 updateSessionContext (operationContext .getSessionContext (), responseBuffers );
@@ -537,7 +577,17 @@ private <T> T receiveCommandMessageResponse(final Decoder<T> decoder, final Comm
537577 if (!commandSuccessful ) {
538578 commandEventSender .sendFailedEvent (e );
539579 }
580+ if (tracingSpan != null ) {
581+ if (e instanceof MongoCommandException ) {
582+ tracingSpan .tagLowCardinality (RESPONSE_STATUS_CODE .withValue (String .valueOf (((MongoCommandException ) e ).getErrorCode ())));
583+ }
584+ tracingSpan .error (e );
585+ }
540586 throw e ;
587+ } finally {
588+ if (tracingSpan != null ) {
589+ tracingSpan .end ();
590+ }
541591 }
542592 }
543593
@@ -553,7 +603,18 @@ private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final
553603
554604 try {
555605 message .encode (bsonOutput , operationContext );
556- CommandEventSender commandEventSender = createCommandEventSender (message , bsonOutput , operationContext );
606+
607+ CommandEventSender commandEventSender ;
608+ if (isLoggingCommandNeeded ()) {
609+ BsonDocument commandDocument = message .getCommandDocument (bsonOutput );
610+ commandEventSender = new LoggingCommandEventSender (
611+ SECURITY_SENSITIVE_COMMANDS , SECURITY_SENSITIVE_HELLO_COMMANDS , description , commandListener ,
612+ operationContext , message , commandDocument ,
613+ COMMAND_PROTOCOL_LOGGER , loggerSettings );
614+ } else {
615+ commandEventSender = new NoOpCommandEventSender ();
616+ }
617+
557618 commandEventSender .sendStartedEvent ();
558619 Compressor localSendCompressor = sendCompressor ;
559620 if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS .contains (message .getCommandDocument (bsonOutput ).getFirstKey ())) {
@@ -952,19 +1013,13 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t
9521013
9531014 private static final StructuredLogger COMMAND_PROTOCOL_LOGGER = new StructuredLogger ("protocol.command" );
9541015
955- private CommandEventSender createCommandEventSender (final CommandMessage message , final ByteBufferBsonOutput bsonOutput ,
956- final OperationContext operationContext ) {
1016+ private boolean isLoggingCommandNeeded () {
9571017 boolean listensOrLogs = commandListener != null || COMMAND_PROTOCOL_LOGGER .isRequired (DEBUG , getClusterId ());
958- if (!recordEverything && (isMonitoringConnection || !opened () || !authenticated .get () || !listensOrLogs )) {
959- return new NoOpCommandEventSender ();
960- }
961- return new LoggingCommandEventSender (
962- SECURITY_SENSITIVE_COMMANDS , SECURITY_SENSITIVE_HELLO_COMMANDS , description , commandListener ,
963- operationContext , message , bsonOutput ,
964- COMMAND_PROTOCOL_LOGGER , loggerSettings );
1018+ return recordEverything || (!isMonitoringConnection && opened () && authenticated .get () && listensOrLogs );
9651019 }
9661020
9671021 private ClusterId getClusterId () {
9681022 return description .getConnectionId ().getServerId ().getClusterId ();
9691023 }
1024+
9701025}
0 commit comments