Skip to content

Commit d221da6

Browse files
authored
Merge pull request #1017 from rabbitmq/marcingrzejszczak-observation
Support Micrometer observation
2 parents fb545d1 + ee87b27 commit d221da6

30 files changed

+1757
-67
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/usr/bin/env bash
2+
3+
./mvnw -q -P '!setup-test-cluster' test-compile exec:java \
4+
-Dexec.mainClass=io.micrometer.docs.DocsGeneratorCommand \
5+
-Dexec.classpathScope="test" \
6+
-Dexec.args='src/main/java/com/rabbitmq/client/observation/micrometer .* target/micrometer-observation-docs'

pom.xml

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,19 @@
5454
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
5555
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
5656

57+
<spotless.check.skip>true</spotless.check.skip>
5758
<slf4j.version>1.7.36</slf4j.version>
5859
<metrics.version>4.2.19</metrics.version>
5960
<micrometer.version>1.11.2</micrometer.version>
61+
<micrometer-tracing.version>1.1.4</micrometer-tracing.version>
6062
<opentelemetry.version>1.28.0</opentelemetry.version>
6163
<jackson.version>2.15.2</jackson.version>
6264
<logback.version>1.2.12</logback.version>
6365
<junit.jupiter.version>5.9.3</junit.jupiter.version>
6466
<mockito.version>5.4.0</mockito.version>
6567
<assertj.version>3.24.2</assertj.version>
68+
<micrometer-tracing-test.version>1.1.2</micrometer-tracing-test.version>
69+
<micrometer-docs-generator.version>1.0.2</micrometer-docs-generator.version>
6670
<jetty.version>9.4.51.v20230217</jetty.version>
6771
<bouncycastle.version>1.70</bouncycastle.version>
6872
<netcrusher.version>0.10</netcrusher.version>
@@ -87,7 +91,8 @@
8791
<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
8892
<checksum.maven.plugin.version>1.11</checksum.maven.plugin.version>
8993
<jshell-maven-plugin.version>1.3</jshell-maven-plugin.version>
90-
94+
<spotless.version>2.35.0</spotless.version>
95+
<google-java-format.version>1.17.0</google-java-format.version>
9196
<!--
9297
These groovy scripts are used later in this POM file to generate
9398
source files and resources for the library itself and for the
@@ -791,10 +796,21 @@
791796
<version>${gson.version}</version>
792797
<scope>test</scope>
793798
</dependency>
799+
<dependency>
800+
<groupId>io.micrometer</groupId>
801+
<artifactId>micrometer-tracing-integration-test</artifactId>
802+
<version>${micrometer-tracing-test.version}</version>
803+
<scope>test</scope>
804+
</dependency>
805+
<dependency>
806+
<groupId>io.micrometer</groupId>
807+
<artifactId>micrometer-docs-generator</artifactId>
808+
<version>${micrometer-docs-generator.version}</version>
809+
<scope>test</scope>
810+
</dependency>
794811

795812
</dependencies>
796813

797-
798814
<dependencyManagement>
799815
<dependencies>
800816
<dependency>
@@ -1091,6 +1107,42 @@
10911107
</configuration>
10921108
</plugin>
10931109

1110+
<plugin>
1111+
<groupId>com.diffplug.spotless</groupId>
1112+
<artifactId>spotless-maven-plugin</artifactId>
1113+
<version>${spotless.version}</version>
1114+
<configuration>
1115+
<java>
1116+
<includes>
1117+
<include>src/main/java/com/rabbitmq/client/observation/**/*.java</include>
1118+
<include>src/test/java/com/rabbitmq/client/test/functional/MicrometerObservationCollectorMetrics.java</include>
1119+
</includes>
1120+
<googleJavaFormat>
1121+
<version>${google-java-format.version}</version>
1122+
<style>GOOGLE</style>
1123+
</googleJavaFormat>
1124+
</java>
1125+
<!-- <ratchetFrom>origin/main</ratchetFrom>-->
1126+
<licenseHeader> <!-- specify either content or file, but not both -->
1127+
<content>// Copyright (c) $YEAR VMware, Inc. or its affiliates. All rights reserved.
1128+
//
1129+
// This software, the RabbitMQ Java client library, is triple-licensed under the
1130+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
1131+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
1132+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
1133+
// please see LICENSE-APACHE2.
1134+
//
1135+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
1136+
// either express or implied. See the LICENSE file for specific language governing
1137+
// rights and limitations of this software.
1138+
//
1139+
// If you have any questions regarding licensing, please contact us at
1140+
1141+
</content>
1142+
</licenseHeader>
1143+
</configuration>
1144+
</plugin>
1145+
10941146
</plugins>
10951147
<extensions>
10961148
<extension>

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
2323
import com.rabbitmq.client.impl.recovery.RetryHandler;
2424
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
25+
import com.rabbitmq.client.observation.ObservationCollector;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728

@@ -52,7 +53,6 @@
5253
*/
5354
public class ConnectionFactory implements Cloneable {
5455

55-
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactory.class);
5656
private static final int MAX_UNSIGNED_SHORT = 65535;
5757

5858
/** Default user name */
@@ -138,6 +138,7 @@ public class ConnectionFactory implements Cloneable {
138138
private RecoveryDelayHandler recoveryDelayHandler;
139139

140140
private MetricsCollector metricsCollector;
141+
private ObservationCollector observationCollector = ObservationCollector.NO_OP;
141142

142143
private boolean nio = false;
143144
private FrameHandlerFactory frameHandlerFactory;
@@ -978,6 +979,18 @@ public MetricsCollector getMetricsCollector() {
978979
return metricsCollector;
979980
}
980981

982+
/**
983+
* Set observation collector.
984+
*
985+
* @param observationCollector the collector instance
986+
* @since 5.19.0
987+
* @see ObservationCollector
988+
* @see com.rabbitmq.client.observation.micrometer.MicrometerObservationCollectorBuilder
989+
*/
990+
public void setObservationCollector(ObservationCollector observationCollector) {
991+
this.observationCollector = observationCollector;
992+
}
993+
981994
/**
982995
* Set a {@link CredentialsRefreshService} instance to handle credentials refresh if appropriate.
983996
* <p>
@@ -1249,7 +1262,8 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
12491262
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
12501263
// No Sonar: no need to close this resource because we're the one that creates it
12511264
// and hands it over to the user
1252-
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector); //NOSONAR
1265+
AutorecoveringConnection conn = new AutorecoveringConnection(
1266+
params, fhFactory, addressResolver, metricsCollector, observationCollector); //NOSONAR
12531267

12541268
conn.init();
12551269
return conn;
@@ -1316,7 +1330,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
13161330
}
13171331

13181332
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector) {
1319-
return new AMQConnection(params, frameHandler, metricsCollector);
1333+
return new AMQConnection(params, frameHandler, metricsCollector, observationCollector);
13201334
}
13211335

13221336
/**

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,4 @@ default void basicPublishUnrouted(Channel channel) {
6666

6767
void basicCancel(Channel channel, String consumerTag);
6868

69-
}
69+
}

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.rabbitmq.client.AMQP.Queue;
2424
import com.rabbitmq.client.AMQP.Tx;
2525
import com.rabbitmq.client.Method;
26+
import com.rabbitmq.client.observation.ObservationCollector;
2627
import com.rabbitmq.utility.BlockingValueOrException;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
@@ -78,6 +79,8 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
7879
private final TrafficListener _trafficListener;
7980
private final int maxInboundMessageBodySize;
8081

82+
private final ObservationCollector.ConnectionInfo connectionInfo;
83+
8184
/**
8285
* Construct a channel on the given connection, with the given channel number.
8386
* @param connection the underlying connection for this channel
@@ -94,6 +97,7 @@ public AMQChannel(AMQConnection connection, int channelNumber) {
9497
this._trafficListener = connection.getTrafficListener();
9598
this.maxInboundMessageBodySize = connection.getMaxInboundMessageBodySize();
9699
this._command = new AMQCommand(this.maxInboundMessageBodySize);
100+
this.connectionInfo = connection.connectionInfo();
97101
}
98102

99103
/**
@@ -584,4 +588,8 @@ public AMQCommand transformReply(AMQCommand command) {
584588
return command;
585589
}
586590
}
591+
592+
protected ObservationCollector.ConnectionInfo connectionInfo() {
593+
return this.connectionInfo;
594+
}
587595
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.*;
2020
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
2121
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
22+
import com.rabbitmq.client.observation.ObservationCollector;
2223
import com.rabbitmq.utility.BlockingCell;
2324
import com.rabbitmq.utility.Utility;
2425
import org.slf4j.Logger;
@@ -68,6 +69,7 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6869
private final int workPoolTimeout;
6970

7071
private final AtomicBoolean finalShutdownStarted = new AtomicBoolean(false);
72+
private volatile ObservationCollector.ConnectionInfo connectionInfo;
7173

7274
/**
7375
* Retrieve a copy of the default table of client properties that
@@ -140,6 +142,7 @@ public static Map<String, Object> defaultClientProperties() {
140142
private final CredentialsProvider credentialsProvider;
141143
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<>();
142144
protected final MetricsCollector metricsCollector;
145+
protected final ObservationCollector observationCollector;
143146
private final int channelRpcTimeout;
144147
private final boolean channelShouldCheckRpcResponseType;
145148
private final TrafficListener trafficListener;
@@ -210,13 +213,14 @@ public Map<String, Object> getServerProperties() {
210213
}
211214

212215
public AMQConnection(ConnectionParams params, FrameHandler frameHandler) {
213-
this(params, frameHandler, new NoOpMetricsCollector());
216+
this(params, frameHandler, new NoOpMetricsCollector(), ObservationCollector.NO_OP);
214217
}
215218

216219
/** Construct a new connection
217220
* @param params parameters for it
218221
*/
219-
public AMQConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector)
222+
public AMQConnection(ConnectionParams params, FrameHandler frameHandler,
223+
MetricsCollector metricsCollector, ObservationCollector observationCollector)
220224
{
221225
checkPreconditions();
222226
this.credentialsProvider = params.getCredentialsProvider();
@@ -255,6 +259,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
255259
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
256260

257261
this.metricsCollector = metricsCollector;
262+
this.observationCollector = observationCollector;
258263

259264
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
260265
(connection, exception) -> { throw exception; }; // we just propagate the exception for non-recoverable connections
@@ -425,6 +430,11 @@ public void start()
425430

426431
setHeartbeat(negotiatedHeartbeat);
427432

433+
this.connectionInfo = new DefaultConnectionInfo(
434+
this._frameHandler.getAddress().getHostAddress(),
435+
this._frameHandler.getPort()
436+
);
437+
428438
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
429439
.channelMax(negotiatedChannelMax)
430440
.frameMax(frameMax)
@@ -475,7 +485,9 @@ public void start()
475485
}
476486

477487
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
478-
ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
488+
ChannelManager result = new ChannelManager(
489+
this._workService, channelMax, threadFactory,
490+
this.metricsCollector, this.observationCollector);
479491
configureChannelManager(result);
480492
return result;
481493
}
@@ -1198,4 +1210,30 @@ public TrafficListener getTrafficListener() {
11981210
int getMaxInboundMessageBodySize() {
11991211
return maxInboundMessageBodySize;
12001212
}
1213+
1214+
private static class DefaultConnectionInfo implements ObservationCollector.ConnectionInfo {
1215+
1216+
private final String peerAddress;
1217+
private final int peerPort;
1218+
1219+
private DefaultConnectionInfo(String peerAddress, int peerPort) {
1220+
this.peerAddress = peerAddress;
1221+
this.peerPort = peerPort;
1222+
}
1223+
1224+
@Override
1225+
public String getPeerAddress() {
1226+
return peerAddress;
1227+
}
1228+
1229+
@Override
1230+
public int getPeerPort() {
1231+
return this.peerPort;
1232+
}
1233+
1234+
}
1235+
1236+
ObservationCollector.ConnectionInfo connectionInfo() {
1237+
return this.connectionInfo;
1238+
}
12011239
}

src/main/java/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.MetricsCollector;
2020
import com.rabbitmq.client.NoOpMetricsCollector;
2121
import com.rabbitmq.client.ShutdownSignalException;
22+
import com.rabbitmq.client.observation.ObservationCollector;
2223
import com.rabbitmq.utility.IntAllocator;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public class ChannelManager {
5556
private int channelShutdownTimeout = (int) ((ConnectionFactory.DEFAULT_HEARTBEAT * AMQConnection.CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER) * 1000);
5657

5758
protected final MetricsCollector metricsCollector;
59+
protected final ObservationCollector observationCollector;
5860

5961
public int getChannelMax(){
6062
return _channelMax;
@@ -65,11 +67,13 @@ public ChannelManager(ConsumerWorkService workService, int channelMax) {
6567
}
6668

6769
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
68-
this(workService, channelMax, threadFactory, new NoOpMetricsCollector());
70+
this(workService, channelMax, threadFactory,
71+
new NoOpMetricsCollector(), ObservationCollector.NO_OP);
6972
}
7073

7174

72-
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory, MetricsCollector metricsCollector) {
75+
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory,
76+
MetricsCollector metricsCollector, ObservationCollector observationCollector) {
7377
if (channelMax < 0)
7478
throw new IllegalArgumentException("create ChannelManager: 'channelMax' must be greater or equal to 0.");
7579
if (channelMax == 0) {
@@ -83,6 +87,7 @@ public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFac
8387
this.workService = workService;
8488
this.threadFactory = threadFactory;
8589
this.metricsCollector = metricsCollector;
90+
this.observationCollector = observationCollector;
8691
}
8792

8893
/**
@@ -214,7 +219,8 @@ private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
214219
}
215220

216221
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
217-
return new ChannelN(connection, channelNumber, workService, this.metricsCollector);
222+
return new ChannelN(connection, channelNumber, workService,
223+
this.metricsCollector, this.observationCollector);
218224
}
219225

220226
/**

0 commit comments

Comments
 (0)