1717import static com .rabbitmq .stream .impl .TestUtils .CountDownLatchConditions .completed ;
1818import static com .rabbitmq .stream .impl .TestUtils .localhost ;
1919import static com .rabbitmq .stream .impl .TestUtils .waitAtMost ;
20+ import static io .micrometer .tracing .test .simple .SpanAssert .assertThat ;
2021import static org .assertj .core .api .Assertions .assertThat ;
2122
2223import com .rabbitmq .stream .*;
24+ import com .rabbitmq .stream .codec .QpidProtonCodec ;
25+ import com .rabbitmq .stream .codec .SwiftMqCodec ;
2326import com .rabbitmq .stream .impl .TestUtils ;
2427import io .micrometer .tracing .test .SampleTestRunner ;
25- import io .micrometer .tracing .test .simple . SpanAssert ;
28+ import io .micrometer .tracing .test .reporter . BuildingBlocks ;
2629import io .micrometer .tracing .test .simple .SpansAssert ;
2730import io .netty .channel .EventLoopGroup ;
2831import java .nio .charset .StandardCharsets ;
@@ -48,26 +51,96 @@ EnvironmentBuilder environmentBuilder() {
4851 .addressResolver (add -> localhost ());
4952 }
5053
54+ ObservationCollector <?> observationCollector () {
55+ return new MicrometerObservationCollectorBuilder ().registry (getObservationRegistry ()).build ();
56+ }
57+
5158 @ Override
5259 public TracingSetup [] getTracingSetup () {
5360 return new TracingSetup [] {TracingSetup .IN_MEMORY_BRAVE , TracingSetup .ZIPKIN_BRAVE };
5461 }
62+
63+ void publishConsume (Codec codec , BuildingBlocks buildingBlocks ) throws Exception {
64+ try (Environment env =
65+ environmentBuilder ().codec (codec ).observationCollector (observationCollector ()).build ()) {
66+ Producer producer = env .producerBuilder ().stream (stream ).build ();
67+ CountDownLatch publishLatch = new CountDownLatch (1 );
68+ producer .send (
69+ producer .messageBuilder ().addData (PAYLOAD ).build (), status -> publishLatch .countDown ());
70+
71+ assertThat (publishLatch ).is (completed ());
72+
73+ CountDownLatch consumeLatch = new CountDownLatch (1 );
74+ env .consumerBuilder ().stream (stream )
75+ .offset (first ())
76+ .messageHandler ((ctx , msg ) -> consumeLatch .countDown ())
77+ .build ();
78+
79+ assertThat (consumeLatch ).is (completed ());
80+
81+ waitAtMost (() -> buildingBlocks .getFinishedSpans ().size () == 2 );
82+
83+ SpansAssert .assertThat (buildingBlocks .getFinishedSpans ()).haveSameTraceId ().hasSize (2 );
84+ assertThat (buildingBlocks .getFinishedSpans ().get (0 ))
85+ .hasNameEqualTo (stream + " publish" )
86+ .hasTag ("messaging.destination.name" , stream )
87+ .hasTag ("messaging.message.payload_size_bytes" , String .valueOf (PAYLOAD .length ))
88+ .hasTag ("net.protocol.name" , "rabbitmq-stream" )
89+ .hasTag ("net.protocol.version" , "1.0" );
90+ assertThat (buildingBlocks .getFinishedSpans ().get (1 ))
91+ .hasNameEqualTo (stream + " process" )
92+ .hasTag ("messaging.destination.name" , stream )
93+ .hasTag ("messaging.source.name" , stream )
94+ .hasTag ("messaging.message.payload_size_bytes" , String .valueOf (PAYLOAD .length ))
95+ .hasTag ("net.protocol.name" , "rabbitmq-stream" )
96+ .hasTag ("net.protocol.version" , "1.0" );
97+ waitAtMost (
98+ () ->
99+ getMeterRegistry ().find ("rabbitmq.stream.publish" ).timer () != null
100+ && getMeterRegistry ().find ("rabbitmq.stream.process" ).timer () != null );
101+ getMeterRegistry ()
102+ .get ("rabbitmq.stream.publish" )
103+ .tag ("messaging.operation" , "publish" )
104+ .tag ("messaging.system" , "rabbitmq" )
105+ .timer ();
106+ getMeterRegistry ()
107+ .get ("rabbitmq.stream.process" )
108+ .tag ("messaging.operation" , "process" )
109+ .tag ("messaging.system" , "rabbitmq" )
110+ .timer ();
111+ }
112+ }
55113 }
56114
57115 @ Nested
58- class PublishConsume extends IntegrationTest {
116+ class PublishConsumeQpidCodec extends IntegrationTest {
117+
118+ @ Override
119+ public SampleTestRunnerConsumer yourCode () {
120+ return (buildingBlocks , meterRegistry ) ->
121+ publishConsume (new QpidProtonCodec (), buildingBlocks );
122+ }
123+ }
124+
125+ @ Nested
126+ class PublishConsumeSwiftMqCodec extends IntegrationTest {
127+
128+ @ Override
129+ public SampleTestRunnerConsumer yourCode () {
130+ return (buildingBlocks , meterRegistry ) -> publishConsume (new SwiftMqCodec (), buildingBlocks );
131+ }
132+ }
133+
134+ @ Nested
135+ class ConsumeWithoutObservationShouldNotFail extends IntegrationTest {
59136
60137 @ Override
61138 public SampleTestRunnerConsumer yourCode () {
62139 return (buildingBlocks , meterRegistry ) -> {
63- try (Environment env =
64- environmentBuilder ()
65- .observationCollector (
66- new MicrometerObservationCollectorBuilder ()
67- .registry (getObservationRegistry ())
68- .build ())
69- .build ()) {
70- Producer producer = env .producerBuilder ().stream (stream ).build ();
140+ try (Environment publishEnv = environmentBuilder ().build ();
141+ Environment consumeEnv =
142+ environmentBuilder ().observationCollector (observationCollector ()).build ()) {
143+ Producer producer = publishEnv .producerBuilder ().stream (stream ).build ();
71144 CountDownLatch publishLatch = new CountDownLatch (1 );
72145 producer .send (
73146 producer .messageBuilder ().addData (PAYLOAD ).build (),
@@ -76,86 +149,30 @@ public SampleTestRunnerConsumer yourCode() {
76149 assertThat (publishLatch ).is (completed ());
77150
78151 CountDownLatch consumeLatch = new CountDownLatch (1 );
79- env .consumerBuilder ().stream (stream )
152+ consumeEnv .consumerBuilder ().stream (stream )
80153 .offset (first ())
81154 .messageHandler ((ctx , msg ) -> consumeLatch .countDown ())
82155 .build ();
83156
84157 assertThat (consumeLatch ).is (completed ());
85158
86- waitAtMost (() -> buildingBlocks .getFinishedSpans ().size () == 2 );
159+ waitAtMost (() -> buildingBlocks .getFinishedSpans ().size () == 1 );
87160
88- SpansAssert .assertThat (buildingBlocks .getFinishedSpans ()).haveSameTraceId ().hasSize (2 );
89- SpanAssert .assertThat (buildingBlocks .getFinishedSpans ().get (0 ))
90- .hasNameEqualTo (stream + " publish" )
91- .hasTag ("messaging.destination.name" , stream )
92- .hasTag ("messaging.message.payload_size_bytes" , String .valueOf (PAYLOAD .length ))
93- .hasTag ("net.protocol.name" , "rabbitmq-stream" )
94- .hasTag ("net.protocol.version" , "1.0" );
95- SpanAssert .assertThat (buildingBlocks .getFinishedSpans ().get (1 ))
161+ SpansAssert .assertThat (buildingBlocks .getFinishedSpans ()).haveSameTraceId ().hasSize (1 );
162+ assertThat (buildingBlocks .getFinishedSpans ().get (0 ))
96163 .hasNameEqualTo (stream + " process" )
97164 .hasTag ("messaging.destination.name" , stream )
98165 .hasTag ("messaging.source.name" , stream )
99166 .hasTag ("messaging.message.payload_size_bytes" , String .valueOf (PAYLOAD .length ))
100167 .hasTag ("net.protocol.name" , "rabbitmq-stream" )
101168 .hasTag ("net.protocol.version" , "1.0" );
102- waitAtMost (
103- () ->
104- getMeterRegistry ().find ("rabbitmq.stream.publish" ).timer () != null
105- && getMeterRegistry ().find ("rabbitmq.stream.process" ).timer () != null );
106- getMeterRegistry ()
107- .get ("rabbitmq.stream.publish" )
108- .tag ("messaging.operation" , "publish" )
109- .tag ("messaging.system" , "rabbitmq" )
110- .timer ();
169+ waitAtMost (() -> getMeterRegistry ().find ("rabbitmq.stream.process" ).timer () != null );
111170 getMeterRegistry ()
112171 .get ("rabbitmq.stream.process" )
113172 .tag ("messaging.operation" , "process" )
114173 .tag ("messaging.system" , "rabbitmq" )
115174 .timer ();
116175 }
117-
118- /*
119- assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
120- waitAtMost(() -> buildingBlocks.getFinishedSpans().size() == 2);
121- SpansAssert.assertThat(buildingBlocks.getFinishedSpans()).haveSameTraceId().hasSize(2);
122- SpanAssert.assertThat(buildingBlocks.getFinishedSpans().get(0))
123- .hasNameEqualTo("metrics.queue publish")
124- .hasTag("messaging.rabbitmq.destination.routing_key", "metrics.queue")
125- .hasTag("messaging.destination.name", "amq.default")
126- .hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length))
127- .hasTagWithKey("net.sock.peer.addr")
128- .hasTag("net.sock.peer.port", "5672")
129- .hasTag("net.protocol.name", "amqp")
130- .hasTag("net.protocol.version", "0.9.1");
131- SpanAssert.assertThat(buildingBlocks.getFinishedSpans().get(1))
132- .hasNameEqualTo("metrics.queue process")
133- .hasTag("messaging.rabbitmq.destination.routing_key", "metrics.queue")
134- .hasTag("messaging.destination.name", "amq.default")
135- .hasTag("messaging.source.name", "metrics.queue")
136- .hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length))
137- .hasTag("net.protocol.name", "amqp")
138- .hasTag("net.protocol.version", "0.9.1");
139- waitAtMost(
140- () ->
141- getMeterRegistry().find("rabbitmq.publish").timer() != null
142- && getMeterRegistry().find("rabbitmq.process").timer() != null);
143- getMeterRegistry()
144- .get("rabbitmq.publish")
145- .tag("messaging.operation", "publish")
146- .tag("messaging.system", "rabbitmq")
147- .timer();
148- getMeterRegistry()
149- .get("rabbitmq.process")
150- .tag("messaging.operation", "process")
151- .tag("messaging.system", "rabbitmq")
152- .timer();
153- } finally {
154- safeClose(publishConnection);
155- safeClose(consumeConnection);
156- }
157-
158- */
159176 };
160177 }
161178 }
0 commit comments