diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java index 0fcdb63a16..3e496a4654 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +21,10 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.Assert; -import org.springframework.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -42,24 +42,24 @@ public class KafkaItemWriter extends KeyValueItemWriter { protected KafkaTemplate kafkaTemplate; - private final List>> listenableFutures = new ArrayList<>(); + private final List>> completableFutures = new ArrayList<>(); private long timeout = -1; @Override protected void writeKeyValue(K key, T value) { if (this.delete) { - this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, null)); + this.completableFutures.add(this.kafkaTemplate.sendDefault(key, null)); } else { - this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, value)); + this.completableFutures.add(this.kafkaTemplate.sendDefault(key, value)); } } @Override protected void flush() throws Exception { this.kafkaTemplate.flush(); - for (ListenableFuture> future : this.listenableFutures) { + for (var future : this.completableFutures) { if (this.timeout >= 0) { future.get(this.timeout, TimeUnit.MILLISECONDS); } @@ -67,7 +67,7 @@ protected void flush() throws Exception { future.get(); } } - this.listenableFutures.clear(); + this.completableFutures.clear(); } @Override diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java index 1e0f8959c9..8f5b99e035 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.NewTopic; @@ -38,10 +39,8 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.util.concurrent.ListenableFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -187,12 +186,12 @@ public void testValidation() { @Test public void testReadFromSinglePartition() throws ExecutionException, InterruptedException { this.template.setDefaultTopic("topic1"); - List>> futures = new ArrayList<>(); + var futures = new ArrayList>(); futures.add(this.template.sendDefault("val0")); futures.add(this.template.sendDefault("val1")); futures.add(this.template.sendDefault("val2")); futures.add(this.template.sendDefault("val3")); - for (ListenableFuture> future : futures) { + for (var future : futures) { future.get(); } @@ -221,12 +220,12 @@ public void testReadFromSinglePartition() throws ExecutionException, Interrupted @Test public void testReadFromSinglePartitionFromCustomOffset() throws ExecutionException, InterruptedException { this.template.setDefaultTopic("topic5"); - List>> futures = new ArrayList<>(); + var futures = new ArrayList>(); futures.add(this.template.sendDefault("val0")); // <-- offset 0 futures.add(this.template.sendDefault("val1")); // <-- offset 1 futures.add(this.template.sendDefault("val2")); // <-- offset 2 futures.add(this.template.sendDefault("val3")); // <-- offset 3 - for (ListenableFuture> future : futures) { + for (var future : futures) { future.get(); } @@ -257,10 +256,10 @@ public void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Excep // first run: read a topic from the beginning this.template.setDefaultTopic("topic6"); - List>> futures = new ArrayList<>(); + var futures = new ArrayList>(); futures.add(this.template.sendDefault("val0")); // <-- offset 0 futures.add(this.template.sendDefault("val1")); // <-- offset 1 - for (ListenableFuture> future : futures) { + for (var future : futures) { future.get(); } this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0); @@ -311,12 +310,12 @@ public void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Excep @Test public void testReadFromMultiplePartitions() throws ExecutionException, InterruptedException { this.template.setDefaultTopic("topic2"); - List>> futures = new ArrayList<>(); + var futures = new ArrayList>(); futures.add(this.template.sendDefault("val0")); futures.add(this.template.sendDefault("val1")); futures.add(this.template.sendDefault("val2")); futures.add(this.template.sendDefault("val3")); - for (ListenableFuture> future : futures) { + for (var future : futures) { future.get(); } @@ -339,13 +338,13 @@ public void testReadFromMultiplePartitions() throws ExecutionException, Interrup @Test public void testReadFromSinglePartitionAfterRestart() throws ExecutionException, InterruptedException { this.template.setDefaultTopic("topic3"); - List>> futures = new ArrayList<>(); + var futures = new ArrayList>(); futures.add(this.template.sendDefault("val0")); futures.add(this.template.sendDefault("val1")); futures.add(this.template.sendDefault("val2")); futures.add(this.template.sendDefault("val3")); futures.add(this.template.sendDefault("val4")); - for (ListenableFuture> future : futures) { + for (var future : futures) { future.get(); } ExecutionContext executionContext = new ExecutionContext(); @@ -375,7 +374,7 @@ public void testReadFromSinglePartitionAfterRestart() throws ExecutionException, @Test public void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, InterruptedException { - List>> futures = new ArrayList<>(); + var futures = new ArrayList>(); futures.add(this.template.send("topic4", 0, null, "val0")); futures.add(this.template.send("topic4", 0, null, "val2")); futures.add(this.template.send("topic4", 0, null, "val4")); @@ -385,7 +384,7 @@ public void testReadFromMultiplePartitionsAfterRestart() throws ExecutionExcepti futures.add(this.template.send("topic4", 1, null, "val5")); futures.add(this.template.send("topic4", 1, null, "val7")); - for (ListenableFuture future : futures) { + for (var future : futures) { future.get(); } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java index 374b4f2e07..32eae36293 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -28,7 +29,6 @@ import org.springframework.core.convert.converter.Converter; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; -import org.springframework.util.concurrent.ListenableFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -46,7 +46,7 @@ public class KafkaItemWriterTests { private KafkaTemplate kafkaTemplate; @Mock - private ListenableFuture> future; + private CompletableFuture> future; private KafkaItemKeyMapper itemKeyMapper;