Skip to content

Commit af3eff3

Browse files
committed
Fix KafkaItemReaderTests
Tests in this class fail intermittently because they send messages to Kafka in an asynchronous way and assert on the results immediately without waiting for the send operation to complete. This commit updates the tests to wait for send results before asserting on them (similar to a140a9f). (cherry picked from commit 5826111)
1 parent c1fbed0 commit af3eff3

File tree

1 file changed

+44
-25
lines changed

1 file changed

+44
-25
lines changed

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java

+44-25
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3939
import org.springframework.kafka.core.KafkaTemplate;
4040
import org.springframework.kafka.core.ProducerFactory;
41+
import org.springframework.kafka.support.SendResult;
4142
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
4243
import org.springframework.kafka.test.utils.KafkaTestUtils;
4344
import org.springframework.util.concurrent.ListenableFuture;
@@ -186,12 +187,16 @@ public void testValidation() {
186187
}
187188

188189
@Test
189-
public void testReadFromSinglePartition() {
190+
public void testReadFromSinglePartition() throws ExecutionException, InterruptedException {
190191
this.template.setDefaultTopic("topic1");
191-
this.template.sendDefault("val0");
192-
this.template.sendDefault("val1");
193-
this.template.sendDefault("val2");
194-
this.template.sendDefault("val3");
192+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
193+
futures.add(this.template.sendDefault("val0"));
194+
futures.add(this.template.sendDefault("val1"));
195+
futures.add(this.template.sendDefault("val2"));
196+
futures.add(this.template.sendDefault("val3"));
197+
for (ListenableFuture<SendResult<String, String>> future : futures) {
198+
future.get();
199+
}
195200

196201
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic1", 0);
197202
this.reader.setPollTimeout(Duration.ofSeconds(1));
@@ -216,12 +221,16 @@ public void testReadFromSinglePartition() {
216221
}
217222

218223
@Test
219-
public void testReadFromSinglePartitionFromCustomOffset() {
224+
public void testReadFromSinglePartitionFromCustomOffset() throws ExecutionException, InterruptedException {
220225
this.template.setDefaultTopic("topic5");
221-
this.template.sendDefault("val0"); // <-- offset 0
222-
this.template.sendDefault("val1"); // <-- offset 1
223-
this.template.sendDefault("val2"); // <-- offset 2
224-
this.template.sendDefault("val3"); // <-- offset 3
226+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
227+
futures.add(this.template.sendDefault("val0")); // <-- offset 0
228+
futures.add(this.template.sendDefault("val1")); // <-- offset 1
229+
futures.add(this.template.sendDefault("val2")); // <-- offset 2
230+
futures.add(this.template.sendDefault("val3")); // <-- offset 3
231+
for (ListenableFuture<SendResult<String, String>> future : futures) {
232+
future.get();
233+
}
225234

226235
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic5", 0);
227236

@@ -250,9 +259,12 @@ public void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Excep
250259
// first run: read a topic from the beginning
251260

252261
this.template.setDefaultTopic("topic6");
253-
this.template.sendDefault("val0"); // <-- offset 0
254-
this.template.sendDefault("val1"); // <-- offset 1
255-
262+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
263+
futures.add(this.template.sendDefault("val0")); // <-- offset 0
264+
futures.add(this.template.sendDefault("val1")); // <-- offset 1
265+
for (ListenableFuture<SendResult<String, String>> future : futures) {
266+
future.get();
267+
}
256268
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0);
257269
this.reader.setPollTimeout(Duration.ofSeconds(1));
258270
this.reader.open(new ExecutionContext());
@@ -299,12 +311,16 @@ public void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Excep
299311
}
300312

301313
@Test
302-
public void testReadFromMultiplePartitions() {
314+
public void testReadFromMultiplePartitions() throws ExecutionException, InterruptedException {
303315
this.template.setDefaultTopic("topic2");
304-
this.template.sendDefault("val0");
305-
this.template.sendDefault("val1");
306-
this.template.sendDefault("val2");
307-
this.template.sendDefault("val3");
316+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
317+
futures.add(this.template.sendDefault("val0"));
318+
futures.add(this.template.sendDefault("val1"));
319+
futures.add(this.template.sendDefault("val2"));
320+
futures.add(this.template.sendDefault("val3"));
321+
for (ListenableFuture<SendResult<String, String>> future : futures) {
322+
future.get();
323+
}
308324

309325
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic2", 0, 1);
310326
this.reader.setPollTimeout(Duration.ofSeconds(1));
@@ -323,14 +339,17 @@ public void testReadFromMultiplePartitions() {
323339
}
324340

325341
@Test
326-
public void testReadFromSinglePartitionAfterRestart() {
342+
public void testReadFromSinglePartitionAfterRestart() throws ExecutionException, InterruptedException {
327343
this.template.setDefaultTopic("topic3");
328-
this.template.sendDefault("val0");
329-
this.template.sendDefault("val1");
330-
this.template.sendDefault("val2");
331-
this.template.sendDefault("val3");
332-
this.template.sendDefault("val4");
333-
344+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
345+
futures.add(this.template.sendDefault("val0"));
346+
futures.add(this.template.sendDefault("val1"));
347+
futures.add(this.template.sendDefault("val2"));
348+
futures.add(this.template.sendDefault("val3"));
349+
futures.add(this.template.sendDefault("val4"));
350+
for (ListenableFuture<SendResult<String, String>> future : futures) {
351+
future.get();
352+
}
334353
ExecutionContext executionContext = new ExecutionContext();
335354
Map<TopicPartition, Long> offsets = new HashMap<>();
336355
offsets.put(new TopicPartition("topic3", 0), 1L);

0 commit comments

Comments
 (0)