Skip to content

Commit aebc325

Browse files
Wrapping call in observe(...) and nulling sender
Due to the fact that in the tests we have both the producer and the consumer and they have the same parent, the zipkin graph looks bizarre. With these changes we're changing the test sending code to simulate sending a message from a different service that will result in a creation of a new trace identifier. Due to this we will have 2 sets of trace ids created, one for sending and one for polling. Sending: null_observation -> send -> receive message Polling: test_span -> receive (very short)
1 parent 303fa97 commit aebc325

File tree

3 files changed

+44
-21
lines changed

3 files changed

+44
-21
lines changed

src/main/java/com/rabbitmq/client/observation/micrometer/DefaultDeliverObservationConvention.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public DefaultDeliverObservationConvention(String name, String operation) {
3535
this.operation = operation;
3636
}
3737

38+
// TODO: If the name is not fixed we won't be able to parse it to automatically document the name
3839
@Override
3940
public String getName() {
4041
return name;

src/main/java/com/rabbitmq/client/observation/micrometer/MicrometerObservationCollector.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ public Consumer basicConsume(String queue, String consumerTag, Consumer consumer
102102

103103
@Override
104104
public GetResponse basicGet(BasicGetCall call, String queue) {
105-
Observation parentObservation = Observation.start("rabbitmq.receive", registry);
106-
try {
105+
return Observation.createNotStarted("rabbitmq.receive", registry).observe(() -> {
107106
GetResponse response = call.get();
108107
if (response != null) {
109108
Map<String, Object> headers;
@@ -113,26 +112,20 @@ public GetResponse basicGet(BasicGetCall call, String queue) {
113112
headers = response.getProps().getHeaders();
114113
}
115114
DeliverContext context =
116-
new DeliverContext(
117-
response.getEnvelope().getExchange(),
118-
response.getEnvelope().getRoutingKey(),
119-
queue,
120-
headers,
121-
response.getBody() == null ? 0 : response.getBody().length);
115+
new DeliverContext(
116+
response.getEnvelope().getExchange(),
117+
response.getEnvelope().getRoutingKey(),
118+
queue,
119+
headers,
120+
response.getBody() == null ? 0 : response.getBody().length);
122121
Observation observation =
123-
RabbitMqObservationDocumentation.RECEIVE_OBSERVATION.observation(
124-
customReceiveConvention, defaultReceiveConvention, () -> context, registry);
125-
observation.parentObservation(parentObservation);
122+
RabbitMqObservationDocumentation.RECEIVE_OBSERVATION.observation(
123+
customReceiveConvention, defaultReceiveConvention, () -> context, registry);
126124
observation.start();
127125
observation.stop();
128126
}
129127
return response;
130-
} catch (RuntimeException e) {
131-
parentObservation.error(e);
132-
throw e;
133-
} finally {
134-
parentObservation.stop();
135-
}
128+
});
136129
}
137130

138131
private static class ObservationConsumer implements Consumer {

src/test/java/com/rabbitmq/client/test/functional/MicrometerObservationCollectorMetrics.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,25 @@
2222
import com.rabbitmq.client.observation.micrometer.MicrometerObservationCollectorBuilder;
2323
import com.rabbitmq.client.test.BrokerTestCase;
2424
import com.rabbitmq.client.test.TestUtils;
25+
import io.micrometer.observation.Observation;
2526
import io.micrometer.observation.ObservationRegistry;
27+
import io.micrometer.tracing.Tracer;
28+
import io.micrometer.tracing.exporter.FinishedSpan;
2629
import io.micrometer.tracing.test.SampleTestRunner;
2730
import io.micrometer.tracing.test.simple.SpanAssert;
2831
import io.micrometer.tracing.test.simple.SpansAssert;
2932
import java.io.IOException;
3033
import java.nio.charset.StandardCharsets;
34+
import java.util.Collection;
35+
import java.util.Comparator;
36+
import java.util.List;
37+
import java.util.Map;
3138
import java.util.Objects;
3239
import java.util.concurrent.CountDownLatch;
3340
import java.util.concurrent.TimeUnit;
3441
import java.util.stream.Collectors;
42+
43+
import org.assertj.core.api.BDDAssertions;
3544
import org.junit.jupiter.api.Nested;
3645

3746
public class MicrometerObservationCollectorMetrics extends BrokerTestCase {
@@ -111,6 +120,17 @@ private abstract static class IntegrationTest extends SampleTestRunner {
111120
public TracingSetup[] getTracingSetup() {
112121
return new TracingSetup[] {TracingSetup.IN_MEMORY_BRAVE, TracingSetup.ZIPKIN_BRAVE};
113122
}
123+
124+
void runWithNullingObservation(ObservationRegistry registry, Tracer tracer, Observation.CheckedRunnable<?> runnable) {
125+
Observation noParentObservation = Observation.createNotStarted("null_observation", registry);
126+
noParentObservation.parentObservation(null);
127+
try (Tracer.SpanInScope ws = tracer.withSpan(null)) {
128+
noParentObservation.observeChecked(runnable);
129+
}
130+
catch (Throwable e) {
131+
throw new RuntimeException(e);
132+
}
133+
}
114134
}
115135

116136
@Nested
@@ -186,7 +206,7 @@ public SampleTestRunnerConsumer yourCode() {
186206
publishConnection = connectionFactory.newConnection();
187207
Channel channel = publishConnection.createChannel();
188208

189-
sendMessage(channel);
209+
runWithNullingObservation(getObservationRegistry(), buildingBlocks.getTracer(), () -> sendMessage(channel));
190210

191211
consumeConnection = connectionFactory.newConnection();
192212
Channel basicGetChannel = consumeConnection.createChannel();
@@ -196,8 +216,14 @@ public SampleTestRunnerConsumer yourCode() {
196216
buildingBlocks.getFinishedSpans().stream()
197217
.map(Objects::toString)
198218
.collect(Collectors.joining("\n")));
199-
SpansAssert.assertThat(buildingBlocks.getFinishedSpans()).haveSameTraceId();
200-
SpanAssert.assertThat(buildingBlocks.getFinishedSpans().get(0))
219+
Map<String, List<FinishedSpan>> finishedSpans = buildingBlocks.getFinishedSpans().stream()
220+
.collect(Collectors.groupingBy(FinishedSpan::getTraceId));
221+
BDDAssertions.then(finishedSpans).as("One trace id for sending, one for polling").hasSize(2);
222+
Collection<List<FinishedSpan>> spans = finishedSpans.values();
223+
List<FinishedSpan> sendAndReceiveSpans = spans.stream().filter(f -> f.size() == 3).findFirst().orElseThrow(() -> new AssertionError("null_observation (fake nulling observation) -> produce -> consume"));
224+
sendAndReceiveSpans.sort(Comparator.comparing(FinishedSpan::getStartTimestamp));
225+
SpanAssert.assertThat(sendAndReceiveSpans.get(0)).hasNameEqualTo("null_observation");
226+
SpanAssert.assertThat(sendAndReceiveSpans.get(1))
201227
.hasNameEqualTo("metrics.queue publish")
202228
.hasTag("messaging.rabbitmq.destination.routing_key", "metrics.queue")
203229
.hasTag("messaging.destination.name", "amq.default")
@@ -206,12 +232,15 @@ public SampleTestRunnerConsumer yourCode() {
206232
.hasTag("net.sock.peer.port", "5672")
207233
.hasTag("net.protocol.name", "amqp")
208234
.hasTag("net.protocol.version", "0.9.1");
209-
SpanAssert.assertThat(buildingBlocks.getFinishedSpans().get(1))
235+
SpanAssert.assertThat(sendAndReceiveSpans.get(2))
210236
.hasNameEqualTo("metrics.queue receive")
211237
.hasTag("messaging.rabbitmq.destination.routing_key", "metrics.queue")
212238
.hasTag("messaging.destination.name", "amq.default")
213239
.hasTag("messaging.source.name", "metrics.queue")
214240
.hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length));
241+
List<FinishedSpan> pollingSpans = spans.stream().filter(f -> f.size() == 1).findFirst().orElseThrow(() -> new AssertionError("rabbitmq.receive (child of test span)"));
242+
SpanAssert.assertThat(pollingSpans.get(0))
243+
.hasNameEqualTo("rabbitmq.receive");
215244
waitAtMost(
216245
() ->
217246
getMeterRegistry().find("rabbitmq.publish").timer() != null

0 commit comments

Comments
 (0)