Skip to content

Commit a2cf74a

Browse files
hpoettkerfmbenhassine
authored andcommitted
Replace ListenableFuture with CompletableFuture
Related to spring-projects/spring-kafka#2357
1 parent ee7d126 commit a2cf74a

File tree

3 files changed

+23
-24
lines changed

3 files changed

+23
-24
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,10 +21,10 @@
2121
import org.springframework.kafka.core.KafkaTemplate;
2222
import org.springframework.kafka.support.SendResult;
2323
import org.springframework.util.Assert;
24-
import org.springframework.util.concurrent.ListenableFuture;
2524

2625
import java.util.ArrayList;
2726
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
2929

3030
/**
@@ -42,32 +42,32 @@ public class KafkaItemWriter<K, T> extends KeyValueItemWriter<K, T> {
4242

4343
protected KafkaTemplate<K, T> kafkaTemplate;
4444

45-
private final List<ListenableFuture<SendResult<K, T>>> listenableFutures = new ArrayList<>();
45+
private final List<CompletableFuture<SendResult<K, T>>> completableFutures = new ArrayList<>();
4646

4747
private long timeout = -1;
4848

4949
@Override
5050
protected void writeKeyValue(K key, T value) {
5151
if (this.delete) {
52-
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, null));
52+
this.completableFutures.add(this.kafkaTemplate.sendDefault(key, null));
5353
}
5454
else {
55-
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, value));
55+
this.completableFutures.add(this.kafkaTemplate.sendDefault(key, value));
5656
}
5757
}
5858

5959
@Override
6060
protected void flush() throws Exception {
6161
this.kafkaTemplate.flush();
62-
for (ListenableFuture<SendResult<K, T>> future : this.listenableFutures) {
62+
for (var future : this.completableFutures) {
6363
if (this.timeout >= 0) {
6464
future.get(this.timeout, TimeUnit.MILLISECONDS);
6565
}
6666
else {
6767
future.get();
6868
}
6969
}
70-
this.listenableFutures.clear();
70+
this.completableFutures.clear();
7171
}
7272

7373
@Override

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Properties;
25+
import java.util.concurrent.CompletableFuture;
2526
import java.util.concurrent.ExecutionException;
2627

2728
import org.apache.kafka.clients.admin.NewTopic;
@@ -38,10 +39,8 @@
3839
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3940
import org.springframework.kafka.core.KafkaTemplate;
4041
import org.springframework.kafka.core.ProducerFactory;
41-
import org.springframework.kafka.support.SendResult;
4242
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
4343
import org.springframework.kafka.test.utils.KafkaTestUtils;
44-
import org.springframework.util.concurrent.ListenableFuture;
4544

4645
import static org.hamcrest.MatcherAssert.assertThat;
4746
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -187,12 +186,12 @@ public void testValidation() {
187186
@Test
188187
public void testReadFromSinglePartition() throws ExecutionException, InterruptedException {
189188
this.template.setDefaultTopic("topic1");
190-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
189+
var futures = new ArrayList<CompletableFuture<?>>();
191190
futures.add(this.template.sendDefault("val0"));
192191
futures.add(this.template.sendDefault("val1"));
193192
futures.add(this.template.sendDefault("val2"));
194193
futures.add(this.template.sendDefault("val3"));
195-
for (ListenableFuture<SendResult<String, String>> future : futures) {
194+
for (var future : futures) {
196195
future.get();
197196
}
198197

@@ -221,12 +220,12 @@ public void testReadFromSinglePartition() throws ExecutionException, Interrupted
221220
@Test
222221
public void testReadFromSinglePartitionFromCustomOffset() throws ExecutionException, InterruptedException {
223222
this.template.setDefaultTopic("topic5");
224-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
223+
var futures = new ArrayList<CompletableFuture<?>>();
225224
futures.add(this.template.sendDefault("val0")); // <-- offset 0
226225
futures.add(this.template.sendDefault("val1")); // <-- offset 1
227226
futures.add(this.template.sendDefault("val2")); // <-- offset 2
228227
futures.add(this.template.sendDefault("val3")); // <-- offset 3
229-
for (ListenableFuture<SendResult<String, String>> future : futures) {
228+
for (var future : futures) {
230229
future.get();
231230
}
232231

@@ -257,10 +256,10 @@ public void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Excep
257256
// first run: read a topic from the beginning
258257

259258
this.template.setDefaultTopic("topic6");
260-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
259+
var futures = new ArrayList<CompletableFuture<?>>();
261260
futures.add(this.template.sendDefault("val0")); // <-- offset 0
262261
futures.add(this.template.sendDefault("val1")); // <-- offset 1
263-
for (ListenableFuture<SendResult<String, String>> future : futures) {
262+
for (var future : futures) {
264263
future.get();
265264
}
266265
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0);
@@ -311,12 +310,12 @@ public void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Excep
311310
@Test
312311
public void testReadFromMultiplePartitions() throws ExecutionException, InterruptedException {
313312
this.template.setDefaultTopic("topic2");
314-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
313+
var futures = new ArrayList<CompletableFuture<?>>();
315314
futures.add(this.template.sendDefault("val0"));
316315
futures.add(this.template.sendDefault("val1"));
317316
futures.add(this.template.sendDefault("val2"));
318317
futures.add(this.template.sendDefault("val3"));
319-
for (ListenableFuture<SendResult<String, String>> future : futures) {
318+
for (var future : futures) {
320319
future.get();
321320
}
322321

@@ -339,13 +338,13 @@ public void testReadFromMultiplePartitions() throws ExecutionException, Interrup
339338
@Test
340339
public void testReadFromSinglePartitionAfterRestart() throws ExecutionException, InterruptedException {
341340
this.template.setDefaultTopic("topic3");
342-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
341+
var futures = new ArrayList<CompletableFuture<?>>();
343342
futures.add(this.template.sendDefault("val0"));
344343
futures.add(this.template.sendDefault("val1"));
345344
futures.add(this.template.sendDefault("val2"));
346345
futures.add(this.template.sendDefault("val3"));
347346
futures.add(this.template.sendDefault("val4"));
348-
for (ListenableFuture<SendResult<String, String>> future : futures) {
347+
for (var future : futures) {
349348
future.get();
350349
}
351350
ExecutionContext executionContext = new ExecutionContext();
@@ -375,7 +374,7 @@ public void testReadFromSinglePartitionAfterRestart() throws ExecutionException,
375374

376375
@Test
377376
public void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, InterruptedException {
378-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
377+
var futures = new ArrayList<CompletableFuture<?>>();
379378
futures.add(this.template.send("topic4", 0, null, "val0"));
380379
futures.add(this.template.send("topic4", 0, null, "val2"));
381380
futures.add(this.template.send("topic4", 0, null, "val4"));
@@ -385,7 +384,7 @@ public void testReadFromMultiplePartitionsAfterRestart() throws ExecutionExcepti
385384
futures.add(this.template.send("topic4", 1, null, "val5"));
386385
futures.add(this.template.send("topic4", 1, null, "val7"));
387386

388-
for (ListenableFuture<?> future : futures) {
387+
for (var future : futures) {
389388
future.get();
390389
}
391390

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717

1818
import java.util.Arrays;
1919
import java.util.List;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.TimeUnit;
2122

2223
import org.junit.Before;
@@ -28,7 +29,6 @@
2829
import org.springframework.core.convert.converter.Converter;
2930
import org.springframework.kafka.core.KafkaTemplate;
3031
import org.springframework.kafka.support.SendResult;
31-
import org.springframework.util.concurrent.ListenableFuture;
3232

3333
import static org.junit.Assert.assertEquals;
3434
import static org.junit.Assert.fail;
@@ -46,7 +46,7 @@ public class KafkaItemWriterTests {
4646
private KafkaTemplate<String, String> kafkaTemplate;
4747

4848
@Mock
49-
private ListenableFuture<SendResult<String, String>> future;
49+
private CompletableFuture<SendResult<String, String>> future;
5050

5151
private KafkaItemKeyMapper itemKeyMapper;
5252

0 commit comments

Comments
 (0)