Skip to content

Commit 9db08b7

Browse files
committed
Instrument basic.get
1 parent 1f6538e commit 9db08b7

14 files changed

+330
-108
lines changed

pom.xml

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
5555
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
5656

57+
<spotless.check.skip>true</spotless.check.skip>
5758
<slf4j.version>1.7.36</slf4j.version>
5859
<metrics.version>4.2.19</metrics.version>
5960
<micrometer.version>1.11.1</micrometer.version>
@@ -88,7 +89,8 @@
8889
<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
8990
<checksum.maven.plugin.version>1.11</checksum.maven.plugin.version>
9091
<jshell-maven-plugin.version>1.3</jshell-maven-plugin.version>
91-
92+
<spotless.version>2.35.0</spotless.version>
93+
<google-java-format.version>1.17.0</google-java-format.version>
9294
<!--
9395
These groovy scripts are used later in this POM file to generate
9496
source files and resources for the library itself and for the
@@ -1104,6 +1106,42 @@
11041106
</configuration>
11051107
</plugin>
11061108

1109+
<plugin>
1110+
<groupId>com.diffplug.spotless</groupId>
1111+
<artifactId>spotless-maven-plugin</artifactId>
1112+
<version>${spotless.version}</version>
1113+
<configuration>
1114+
<java>
1115+
<includes>
1116+
<include>src/main/java/com/rabbitmq/client/observation/**/*.java</include>
1117+
<include>src/test/java/com/rabbitmq/client/test/functional/MicrometerObservationCollectorMetrics.java</include>
1118+
</includes>
1119+
<googleJavaFormat>
1120+
<version>${google-java-format.version}</version>
1121+
<style>GOOGLE</style>
1122+
</googleJavaFormat>
1123+
</java>
1124+
<!-- <ratchetFrom>origin/main</ratchetFrom>-->
1125+
<licenseHeader> <!-- specify either content or file, but not both -->
1126+
<content>// Copyright (c) $YEAR VMware, Inc. or its affiliates. All rights reserved.
1127+
//
1128+
// This software, the RabbitMQ Java client library, is triple-licensed under the
1129+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
1130+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
1131+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
1132+
// please see LICENSE-APACHE2.
1133+
//
1134+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
1135+
// either express or implied. See the LICENSE file for specific language governing
1136+
// rights and limitations of this software.
1137+
//
1138+
// If you have any questions regarding licensing, please contact us at
1139+
1140+
</content>
1141+
</licenseHeader>
1142+
</configuration>
1143+
</plugin>
1144+
11071145
</plugins>
11081146
<extensions>
11091147
<extension>

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,26 +1164,28 @@ public GetResponse basicGet(String queue, boolean autoAck)
11641164
.queue(queue)
11651165
.noAck(autoAck)
11661166
.build());
1167-
Method method = replyCommand.getMethod();
1168-
1169-
if (method instanceof Basic.GetOk) {
1170-
Basic.GetOk getOk = (Basic.GetOk)method;
1171-
Envelope envelope = new Envelope(getOk.getDeliveryTag(),
1172-
getOk.getRedelivered(),
1173-
getOk.getExchange(),
1174-
getOk.getRoutingKey());
1175-
BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
1176-
byte[] body = replyCommand.getContentBody();
1177-
int messageCount = getOk.getMessageCount();
1178-
1179-
metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
1180-
1181-
return new GetResponse(envelope, props, body, messageCount);
1182-
} else if (method instanceof Basic.GetEmpty) {
1183-
return null;
1184-
} else {
1185-
throw new UnexpectedMethodError(method);
1186-
}
1167+
return this.observationCollector.basicGet(() -> {
1168+
Method method = replyCommand.getMethod();
1169+
1170+
if (method instanceof Basic.GetOk) {
1171+
Basic.GetOk getOk = (Basic.GetOk)method;
1172+
Envelope envelope = new Envelope(getOk.getDeliveryTag(),
1173+
getOk.getRedelivered(),
1174+
getOk.getExchange(),
1175+
getOk.getRoutingKey());
1176+
BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
1177+
byte[] body = replyCommand.getContentBody();
1178+
int messageCount = getOk.getMessageCount();
1179+
1180+
metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
1181+
1182+
return new GetResponse(envelope, props, body, messageCount);
1183+
} else if (method instanceof Basic.GetEmpty) {
1184+
return null;
1185+
} else {
1186+
throw new UnexpectedMethodError(method);
1187+
}
1188+
}, queue);
11871189
}
11881190

11891191
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/observation/NoOpObservationCollector.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
//
1313
// If you have any questions regarding licensing, please contact us at
1414
15-
1615
package com.rabbitmq.client.observation;
1716

1817
import com.rabbitmq.client.AMQP;
1918
import com.rabbitmq.client.Consumer;
19+
import com.rabbitmq.client.GetResponse;
2020
import java.io.IOException;
2121

2222
final class NoOpObservationCollector implements ObservationCollector {
@@ -36,4 +36,9 @@ public void publish(
3636
public Consumer basicConsume(String queue, String consumerTag, Consumer consumer) {
3737
return consumer;
3838
}
39+
40+
@Override
41+
public GetResponse basicGet(BasicGetCall call, String queue) {
42+
return call.get();
43+
}
3944
}

src/main/java/com/rabbitmq/client/observation/ObservationCollector.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,43 @@
1212
//
1313
// If you have any questions regarding licensing, please contact us at
1414
15-
1615
package com.rabbitmq.client.observation;
1716

1817
import com.rabbitmq.client.AMQP;
1918
import com.rabbitmq.client.Consumer;
19+
import com.rabbitmq.client.GetResponse;
2020
import java.io.IOException;
2121

2222
/**
23+
* API to instrument operations in the AMQP client. The supported operations are publishing,
24+
* asynchronous delivery, and synchronous delivery (<code>basic.get</code>).
25+
*
26+
* <p>Implementations can gather information and send it to tracing backends. This allows e.g.
27+
* following the processing steps of a given message through different systems.
28+
*
29+
* <p>This is considered an SPI and is susceptible to change at any time.
30+
*
2331
* @since 5.18.0
32+
* @see com.rabbitmq.client.ConnectionFactory#setObservationCollector( ObservationCollector)
2433
*/
2534
public interface ObservationCollector {
2635

2736
ObservationCollector NO_OP = new NoOpObservationCollector();
2837

38+
/**
39+
* Decorate message publishing.
40+
*
41+
* <p>Implementations are expected to call {@link PublishCall#publish( PublishCall,
42+
* AMQP.Basic.Publish, AMQP.BasicProperties, byte[], ConnectionInfo)} to make sure the message is
43+
* actually sent.
44+
*
45+
* @param call
46+
* @param publish
47+
* @param properties
48+
* @param body
49+
* @param connectionInfo
50+
* @throws IOException
51+
*/
2952
void publish(
3053
PublishCall call,
3154
AMQP.Basic.Publish publish,
@@ -34,13 +57,44 @@ void publish(
3457
ConnectionInfo connectionInfo)
3558
throws IOException;
3659

60+
/**
61+
* Decorate consumer registration.
62+
*
63+
* <p>Implementations are expected to decorate the appropriate {@link Consumer} callbacks. The
64+
* original {@link Consumer} behavior should not be changed though.
65+
*
66+
* @param queue
67+
* @param consumerTag
68+
* @param consumer
69+
* @return
70+
*/
3771
Consumer basicConsume(String queue, String consumerTag, Consumer consumer);
3872

73+
/**
74+
* Decorate message polling with <code>basic.get</code>.
75+
*
76+
* <p>Implementations are expected to {@link BasicGetCall#basicGet( BasicGetCall, String)} and
77+
* return the same result.
78+
*
79+
* @param call
80+
* @param queue
81+
* @return
82+
*/
83+
GetResponse basicGet(BasicGetCall call, String queue);
84+
85+
/** Underlying publishing call. */
3986
interface PublishCall {
4087

4188
void publish(AMQP.BasicProperties properties) throws IOException;
4289
}
4390

91+
/** Underlying <code>basic.get</code> call. */
92+
interface BasicGetCall {
93+
94+
GetResponse get();
95+
}
96+
97+
/** Connection information. */
4498
interface ConnectionInfo {
4599

46100
String getPeerAddress();

src/main/java/com/rabbitmq/client/observation/micrometer/DefaultConsumeObservationConvention.java renamed to src/main/java/com/rabbitmq/client/observation/micrometer/DefaultDeliverObservationConvention.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//
1313
// If you have any questions regarding licensing, please contact us at
1414
15-
1615
package com.rabbitmq.client.observation.micrometer;
1716

1817
import com.rabbitmq.client.observation.micrometer.RabbitMqObservationDocumentation.HighCardinalityTags;
@@ -21,21 +20,19 @@
2120
import io.micrometer.common.util.StringUtils;
2221

2322
/**
24-
* Default implementation of {@link ConsumeObservationConvention}.
23+
* Default implementation of {@link DeliverObservationConvention}.
2524
*
2625
* @since 5.18.0
27-
* @see ConsumeObservationConvention
26+
* @see DeliverObservationConvention
2827
*/
29-
public class DefaultConsumeObservationConvention implements ConsumeObservationConvention {
28+
public class DefaultDeliverObservationConvention implements DeliverObservationConvention {
3029

3130
private final String name;
31+
private final String operation;
3232

33-
public DefaultConsumeObservationConvention() {
34-
this("rabbitmq.consume");
35-
}
36-
37-
public DefaultConsumeObservationConvention(String name) {
33+
public DefaultDeliverObservationConvention(String name, String operation) {
3834
this.name = name;
35+
this.operation = operation;
3936
}
4037

4138
@Override
@@ -44,8 +41,8 @@ public String getName() {
4441
}
4542

4643
@Override
47-
public String getContextualName(ConsumeContext context) {
48-
return source(context.getQueue()) + " consume";
44+
public String getContextualName(DeliverContext context) {
45+
return source(context.getQueue()) + " " + operation;
4946
}
5047

5148
private String exchange(String destination) {
@@ -57,14 +54,14 @@ private String source(String destination) {
5754
}
5855

5956
@Override
60-
public KeyValues getLowCardinalityKeyValues(ConsumeContext context) {
57+
public KeyValues getLowCardinalityKeyValues(DeliverContext context) {
6158
return KeyValues.of(
62-
LowCardinalityTags.MESSAGING_OPERATION.withValue("consume"),
59+
LowCardinalityTags.MESSAGING_OPERATION.withValue(this.operation),
6360
LowCardinalityTags.MESSAGING_SYSTEM.withValue("rabbitmq"));
6461
}
6562

6663
@Override
67-
public KeyValues getHighCardinalityKeyValues(ConsumeContext context) {
64+
public KeyValues getHighCardinalityKeyValues(DeliverContext context) {
6865
return KeyValues.of(
6966
HighCardinalityTags.MESSAGING_ROUTING_KEY.withValue(context.getRoutingKey()),
7067
HighCardinalityTags.MESSAGING_DESTINATION_NAME.withValue(exchange(context.getExchange())),

src/main/java/com/rabbitmq/client/observation/micrometer/DefaultPublishObservationConvention.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//
1313
// If you have any questions regarding licensing, please contact us at
1414
15-
1615
package com.rabbitmq.client.observation.micrometer;
1716

1817
import com.rabbitmq.client.observation.micrometer.RabbitMqObservationDocumentation.HighCardinalityTags;
@@ -24,7 +23,6 @@
2423
* Default implementation of {@link PublishObservationConvention}.
2524
*
2625
* @since 5.18.0
27-
* @see RabbitMqObservationDocumentation
2826
*/
2927
public class DefaultPublishObservationConvention implements PublishObservationConvention {
3028

src/main/java/com/rabbitmq/client/observation/micrometer/ConsumeContext.java renamed to src/main/java/com/rabbitmq/client/observation/micrometer/DeliverContext.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//
1313
// If you have any questions regarding licensing, please contact us at
1414
15-
1615
package com.rabbitmq.client.observation.micrometer;
1716

1817
import io.micrometer.observation.transport.ReceiverContext;
@@ -24,15 +23,19 @@
2423
*
2524
* @since 5.18.0
2625
*/
27-
public class ConsumeContext extends ReceiverContext<Map<String, Object>> {
26+
public class DeliverContext extends ReceiverContext<Map<String, Object>> {
2827

2928
private final String exchange;
3029
private final String routingKey;
3130
private final int payloadSizeBytes;
3231
private final String queue;
3332

34-
ConsumeContext(String exchange, String routingKey, String queue, Map<String, Object> headers,
35-
int payloadSizeBytes) {
33+
DeliverContext(
34+
String exchange,
35+
String routingKey,
36+
String queue,
37+
Map<String, Object> headers,
38+
int payloadSizeBytes) {
3639
super(
3740
(hdrs, key) -> {
3841
Object result = hdrs.get(key);
@@ -63,5 +66,4 @@ public int getPayloadSizeBytes() {
6366
public String getQueue() {
6467
return queue;
6568
}
66-
6769
}

src/main/java/com/rabbitmq/client/observation/micrometer/ConsumeObservationConvention.java renamed to src/main/java/com/rabbitmq/client/observation/micrometer/DeliverObservationConvention.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//
1313
// If you have any questions regarding licensing, please contact us at
1414
15-
1615
package com.rabbitmq.client.observation.micrometer;
1716

1817
import io.micrometer.observation.Observation;
@@ -22,12 +21,11 @@
2221
* {@link ObservationConvention} for RabbitMQ client instrumentation.
2322
*
2423
* @since 5.18.0
25-
* @see DefaultPublishObservationConvention
2624
*/
27-
public interface ConsumeObservationConvention extends ObservationConvention<ConsumeContext> {
25+
public interface DeliverObservationConvention extends ObservationConvention<DeliverContext> {
2826

2927
@Override
3028
default boolean supportsContext(Observation.Context context) {
31-
return context instanceof ConsumeContext;
29+
return context instanceof DeliverContext;
3230
}
3331
}

0 commit comments

Comments
 (0)