41
41
import java .util .concurrent .CountDownLatch ;
42
42
import java .util .concurrent .atomic .AtomicReference ;
43
43
import java .util .function .Consumer ;
44
+ import java .util .function .Predicate ;
44
45
import java .util .function .Supplier ;
45
46
import java .util .stream .IntStream ;
46
47
import java .util .stream .Stream ;
@@ -57,6 +58,7 @@ public class AmqpInteroperabilityTest {
57
58
58
59
String stream ;
59
60
TestUtils .ClientFactory cf ;
61
+ String brokerVersion ;
60
62
61
63
static Stream <Codec > codecs () {
62
64
return Stream .of (new QpidProtonCodec (), new SwiftMqCodec ());
@@ -67,14 +69,28 @@ private static HeaderTestConfiguration htc(
67
69
return new HeaderTestConfiguration (headerValue , assertion );
68
70
}
69
71
72
+ private static PropertiesTestConfiguration ptc (
73
+ Predicate <String > condition ,
74
+ Consumer <AMQP .BasicProperties .Builder > builder ,
75
+ Consumer <Message > assertion ) {
76
+ return new PropertiesTestConfiguration (builder , assertion , condition );
77
+ }
78
+
70
79
private static PropertiesTestConfiguration ptc (
71
80
Consumer <AMQP .BasicProperties .Builder > builder , Consumer <Message > assertion ) {
72
- return new PropertiesTestConfiguration (builder , assertion );
81
+ return new PropertiesTestConfiguration (builder , assertion , ignored -> true );
82
+ }
83
+
84
+ static MessageOperation mo (
85
+ Predicate <String > brokerVersionCondition ,
86
+ Consumer <MessageBuilder > messageBuilderConsumer ,
87
+ Consumer <Delivery > deliveryConsumer ) {
88
+ return new MessageOperation (messageBuilderConsumer , deliveryConsumer , brokerVersionCondition );
73
89
}
74
90
75
91
static MessageOperation mo (
76
92
Consumer <MessageBuilder > messageBuilderConsumer , Consumer <Delivery > deliveryConsumer ) {
77
- return new MessageOperation (messageBuilderConsumer , deliveryConsumer );
93
+ return new MessageOperation (messageBuilderConsumer , deliveryConsumer , ignored -> true );
78
94
}
79
95
80
96
@ ParameterizedTest
@@ -88,10 +104,15 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
88
104
() ->
89
105
Stream .of (
90
106
ptc (
107
+ BEFORE_MESSAGE_CONTAINERS ,
91
108
b -> b .appId ("application id" ),
92
109
m ->
93
110
assertThat (m .getApplicationProperties ().get ("x-basic-app-id" ))
94
111
.isEqualTo ("application id" )),
112
+ ptc (
113
+ AFTER_MESSAGE_CONTAINERS ,
114
+ b -> b .appId ("application id" ),
115
+ m -> assertThat (m .getProperties ().getGroupId ()).isEqualTo ("application id" )),
95
116
ptc (
96
117
b -> b .contentEncoding ("content encoding" ),
97
118
m ->
@@ -137,10 +158,17 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
137
158
* 1000 )), // in seconds in 091, in ms in 1.0, so losing some
138
159
// precision
139
160
ptc (
161
+ BEFORE_MESSAGE_CONTAINERS ,
140
162
b -> b .type ("the type" ),
141
163
m ->
142
164
assertThat (m .getApplicationProperties ().get ("x-basic-type" ))
143
165
.isEqualTo ("the type" )),
166
+ ptc (
167
+ AFTER_MESSAGE_CONTAINERS ,
168
+ b -> b .type ("the type" ),
169
+ m ->
170
+ assertThat (m .getMessageAnnotations ().get ("x-basic-type" ))
171
+ .isEqualTo ("the type" )),
144
172
ptc (
145
173
b -> b .userId ("guest" ),
146
174
m ->
@@ -199,6 +227,7 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
199
227
AMQP .BasicProperties .Builder builder = new AMQP .BasicProperties .Builder ();
200
228
propertiesTestConfigurations
201
229
.get ()
230
+ .filter (configuration -> configuration .brokerVersionCondition .test (brokerVersion ))
202
231
.forEach (configuration -> configuration .builder .accept (builder ));
203
232
204
233
AMQP .BasicProperties properties = builder .headers (headers ).build ();
@@ -234,7 +263,10 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
234
263
.forEach (i -> assertThat (messageBodies .contains ("amqp " + i )).isTrue ());
235
264
Message message = messages .iterator ().next ();
236
265
237
- propertiesTestConfigurations .get ().forEach (c -> c .assertion .accept (message ));
266
+ propertiesTestConfigurations
267
+ .get ()
268
+ .filter (c -> c .brokerVersionCondition .test (brokerVersion ))
269
+ .forEach (c -> c .assertion .accept (message ));
238
270
239
271
assertThat (message .getMessageAnnotations ().get ("x-exchange" )).isEqualTo ("" );
240
272
assertThat (message .getMessageAnnotations ().get ("x-routing-key" )).isEqualTo (stream );
@@ -285,6 +317,7 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
285
317
LongStringHelper .asLongString (StringUtils .repeat ("*" , 300 )));
286
318
}),
287
319
mo (
320
+ BEFORE_MESSAGE_CONTAINERS ,
288
321
mb -> {
289
322
mb .properties ().messageId (messageIdUuid );
290
323
mb .properties ().correlationId (correlationIdUuid );
@@ -300,6 +333,19 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
300
333
"x-correlation-id-type" , LongStringHelper .asLongString ("uuid" ));
301
334
}),
302
335
mo (
336
+ AFTER_MESSAGE_CONTAINERS ,
337
+ mb -> {
338
+ mb .properties ().messageId (messageIdUuid );
339
+ mb .properties ().correlationId (correlationIdUuid );
340
+ },
341
+ d -> {
342
+ assertThat (d .getProperties ().getMessageId ())
343
+ .isEqualTo ("urn:uuid:" + messageIdUuid );
344
+ assertThat (d .getProperties ().getCorrelationId ())
345
+ .isEqualTo ("urn:uuid:" + correlationIdUuid );
346
+ }),
347
+ mo (
348
+ BEFORE_MESSAGE_CONTAINERS ,
303
349
mb -> {
304
350
mb .properties ().messageId (10 );
305
351
mb .properties ().correlationId (20 );
@@ -314,6 +360,19 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
314
360
"x-correlation-id-type" , LongStringHelper .asLongString ("ulong" ));
315
361
}),
316
362
mo (
363
+ AFTER_MESSAGE_CONTAINERS ,
364
+ mb -> {
365
+ mb .properties ().messageId (10 );
366
+ mb .properties ().correlationId (20 );
367
+ },
368
+ d -> {
369
+ assertThat (d .getProperties ().getMessageId ()).isEqualTo ("10" );
370
+ assertThat (d .getProperties ().getCorrelationId ()).isEqualTo ("20" );
371
+ assertThat (d .getProperties ().getHeaders ())
372
+ .doesNotContainKeys ("x-message-id-type" , "x-correlation-id-type" );
373
+ }),
374
+ mo (
375
+ BEFORE_MESSAGE_CONTAINERS ,
317
376
mb -> {
318
377
mb .properties ().messageId ("the message ID" .getBytes (UTF8 ));
319
378
mb .properties ().correlationId ("the correlation ID" .getBytes (UTF8 ));
@@ -329,6 +388,30 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
329
388
.containsEntry (
330
389
"x-correlation-id-type" , LongStringHelper .asLongString ("binary" ));
331
390
}),
391
+ mo (
392
+ AFTER_MESSAGE_CONTAINERS ,
393
+ mb -> {
394
+ mb .properties ().messageId ("the message ID" .getBytes (UTF8 ));
395
+ mb .properties ().correlationId ("the correlation ID" .getBytes (UTF8 ));
396
+ },
397
+ d -> {
398
+ assertThat (d .getProperties ().getMessageId ()).isNull ();
399
+ assertThat (d .getProperties ().getCorrelationId ()).isNull ();
400
+ assertThat (
401
+ d .getProperties ()
402
+ .getHeaders ()
403
+ .get ("x-message-id" )
404
+ .toString ()
405
+ .getBytes (UTF8 ))
406
+ .isEqualTo ("the message ID" .getBytes (UTF8 ));
407
+ assertThat (
408
+ d .getProperties ()
409
+ .getHeaders ()
410
+ .get ("x-correlation-id" )
411
+ .toString ()
412
+ .getBytes (UTF8 ))
413
+ .isEqualTo ("the correlation ID" .getBytes (UTF8 ));
414
+ }),
332
415
mo (
333
416
mb -> {
334
417
mb .properties ()
@@ -356,6 +439,8 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
356
439
357
440
testMessageOperations
358
441
.get ()
442
+ .filter (
443
+ testMessageOperation -> testMessageOperation .brokerVersionCondition .test (brokerVersion ))
359
444
.forEach (
360
445
testMessageOperation -> {
361
446
CountDownLatch confirmLatch = new CountDownLatch (messageCount );
@@ -476,6 +561,9 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
476
561
477
562
messageOperations
478
563
.get ()
564
+ .filter (
565
+ messageOperation ->
566
+ messageOperation .brokerVersionCondition .test (brokerVersion ))
479
567
.forEach (
480
568
messageOperation ->
481
569
messageOperation .messageBuilderConsumer .accept (messageBuilder ));
@@ -518,6 +606,9 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
518
606
519
607
messageOperations
520
608
.get ()
609
+ .filter (
610
+ messageOperation ->
611
+ messageOperation .brokerVersionCondition .test (brokerVersion ))
521
612
.forEach (messageOperation -> messageOperation .deliveryConsumer .accept (message ));
522
613
523
614
} catch (Exception e ) {
@@ -562,11 +653,15 @@ void messageWithEmptyBodyAndPropertiesShouldBeConvertedInAmqp(Codec codec) throw
562
653
private static class PropertiesTestConfiguration {
563
654
final Consumer <AMQP .BasicProperties .Builder > builder ;
564
655
final Consumer <Message > assertion ;
656
+ final Predicate <String > brokerVersionCondition ;
565
657
566
658
private PropertiesTestConfiguration (
567
- Consumer <AMQP .BasicProperties .Builder > builder , Consumer <Message > assertion ) {
659
+ Consumer <AMQP .BasicProperties .Builder > builder ,
660
+ Consumer <Message > assertion ,
661
+ Predicate <String > brokerVersionCondition ) {
568
662
this .builder = builder ;
569
663
this .assertion = assertion ;
664
+ this .brokerVersionCondition = brokerVersionCondition ;
570
665
}
571
666
}
572
667
@@ -584,11 +679,21 @@ private static class HeaderTestConfiguration {
584
679
private static class MessageOperation {
585
680
final Consumer <MessageBuilder > messageBuilderConsumer ;
586
681
final Consumer <Delivery > deliveryConsumer ;
682
+ final Predicate <String > brokerVersionCondition ;
587
683
588
684
MessageOperation (
589
- Consumer <MessageBuilder > messageBuilderConsumer , Consumer <Delivery > deliveryConsumer ) {
685
+ Consumer <MessageBuilder > messageBuilderConsumer ,
686
+ Consumer <Delivery > deliveryConsumer ,
687
+ Predicate <String > brokerVersionCondition ) {
590
688
this .messageBuilderConsumer = messageBuilderConsumer ;
591
689
this .deliveryConsumer = deliveryConsumer ;
690
+ this .brokerVersionCondition = brokerVersionCondition ;
592
691
}
593
692
}
693
+
694
+ private static final Predicate <String > BEFORE_MESSAGE_CONTAINERS =
695
+ TestUtils ::beforeMessageContainers ;
696
+
697
+ private static final Predicate <String > AFTER_MESSAGE_CONTAINERS =
698
+ TestUtils ::afterMessageContainers ;
594
699
}
0 commit comments