Skip to content

Commit a7cf43d

Browse files
committed
Fix KafkaMLCTests for proper CountDownLatch place
In the `KafkaMessageListenerContainerTests.invokeBatchInterceptorSuccessFailureOnRetry()` we are expecting interaction with a `BatchInterceptor`, when its `success` or `failure` happens already after the `MessageListener` call, but a `CountDownLatch` thread barrier in the test setup is counted in the `MessageListener` leading to race condition when we exit from the barrier to verification phase, but `BatchInterceptor.success()` has not been called yet in the other thread. * Fix `KafkaMessageListenerContainerTests.invokeBatchInterceptorSuccessFailureOnRetry()` moving `CountDownLatch` to the `BatchInterceptor.success()` implementation. Leave `MessageListener` just only with an `AtomicInteger` for retry attempts
1 parent 5d5291f commit a7cf43d

File tree

1 file changed

+12
-4
lines changed

1 file changed

+12
-4
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4313,14 +4313,14 @@ public void invokeBatchInterceptorSuccessFailureOnRetry() throws Exception {
43134313
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
43144314
new TopicPartitionOffset("test-topic", 0) };
43154315

4316-
CountDownLatch latch = new CountDownLatch(4); // 3 failures, 1 success
4316+
AtomicInteger attempts = new AtomicInteger(3); // 3 failures, 1 success
4317+
// Cannot be lambda: Mockito doesn't mock final classes
43174318
BatchMessageListener<Integer, String> batchMessageListener = spy(
43184319
new BatchMessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes
43194320

43204321
@Override
43214322
public void onMessage(List<ConsumerRecord<Integer, String>> data) {
4322-
latch.countDown();
4323-
if (latch.getCount() > 0) {
4323+
if (attempts.getAndDecrement() > 0) {
43244324
throw new IllegalArgumentException("Failed record");
43254325
}
43264326
}
@@ -4334,6 +4334,8 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) {
43344334
containerProps.setMessageListener(batchMessageListener);
43354335
containerProps.setClientId("clientId");
43364336

4337+
CountDownLatch successLatch = new CountDownLatch(1);
4338+
43374339
BatchInterceptor<Integer, String> batchInterceptor = spy(new BatchInterceptor<Integer, String>() {
43384340

43394341
@Override
@@ -4342,14 +4344,20 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
43424344
return records;
43434345
}
43444346

4347+
@Override
4348+
public void success(ConsumerRecords<Integer, String> records, Consumer<Integer, String> consumer) {
4349+
successLatch.countDown();
4350+
}
4351+
43454352
});
43464353

43474354
KafkaMessageListenerContainer<Integer, String> container =
43484355
new KafkaMessageListenerContainer<>(cf, containerProps);
43494356
container.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0, 3)));
43504357
container.setBatchInterceptor(batchInterceptor);
43514358
container.start();
4352-
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
4359+
4360+
assertThat(successLatch.await(10, TimeUnit.SECONDS)).isTrue();
43534361

43544362
InOrder inOrder = inOrder(batchInterceptor, batchMessageListener, consumer);
43554363
for (int i = 0; i < 3; i++) {

0 commit comments

Comments
 (0)