Skip to content

Commit a140a9f

Browse files
committed
Attempting to fix the KafkaItemReaderTests#testReadFromMultiplePartitionsAfterRestart
1 parent 683340e commit a140a9f

File tree

1 file changed

+16
-9
lines changed

1 file changed

+16
-9
lines changed

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Properties;
25+
import java.util.concurrent.ExecutionException;
2526

2627
import org.apache.kafka.clients.admin.NewTopic;
2728
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -38,6 +39,7 @@
3839
import org.springframework.kafka.core.ProducerFactory;
3940
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
4041
import org.springframework.kafka.test.utils.KafkaTestUtils;
42+
import org.springframework.util.concurrent.ListenableFuture;
4143

4244
import static org.hamcrest.Matchers.containsInAnyOrder;
4345
import static org.hamcrest.Matchers.is;
@@ -269,15 +271,20 @@ public void testReadFromSinglePartitionAfterRestart() {
269271
}
270272

271273
@Test
272-
public void testReadFromMultiplePartitionsAfterRestart() {
273-
this.template.send("topic4", 0, null, "val0");
274-
this.template.send("topic4", 0, null, "val2");
275-
this.template.send("topic4", 0, null, "val4");
276-
this.template.send("topic4", 0, null, "val6");
277-
this.template.send("topic4", 1, null, "val1");
278-
this.template.send("topic4", 1, null, "val3");
279-
this.template.send("topic4", 1, null, "val5");
280-
this.template.send("topic4", 1, null, "val7");
274+
public void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, InterruptedException {
275+
List<ListenableFuture> futures = new ArrayList<>();
276+
futures.add(this.template.send("topic4", 0, null, "val0"));
277+
futures.add(this.template.send("topic4", 0, null, "val2"));
278+
futures.add(this.template.send("topic4", 0, null, "val4"));
279+
futures.add(this.template.send("topic4", 0, null, "val6"));
280+
futures.add(this.template.send("topic4", 1, null, "val1"));
281+
futures.add(this.template.send("topic4", 1, null, "val3"));
282+
futures.add(this.template.send("topic4", 1, null, "val5"));
283+
futures.add(this.template.send("topic4", 1, null, "val7"));
284+
285+
for (ListenableFuture future : futures) {
286+
future.get();
287+
}
281288

282289
ExecutionContext executionContext = new ExecutionContext();
283290
Map<TopicPartition, Long> offsets = new HashMap<>();

0 commit comments

Comments
 (0)