Skip to content

Commit 128372f

Browse files
garyrussellartembilan
authored andcommitted
GH-2357: Switch to CompletableFuture
Resolves #2357 Spring Framework is planning to deprecate `ListenableFuture` in 6.0. Add methods to the `KafkaOperations` (`KafkaTemplate`) that return `CompletableFuture` instead; the `ListenableFuture` methods will be removed in 3.0. Provide mechanisms to ease the migration; allowing users to use `CompletableFuture`s in this release, which will significantly reduce the effort to switch in 3.0. **2.9 Only; I will issue a separate PR for main**
1 parent a4ee066 commit 128372f

14 files changed

+679
-29
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,21 @@ interface ProducerCallback<K, V, T> {
210210

211211
See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html[Javadoc] for more detail.
212212

213+
IMPORTANT: In version 3.0, the methods that return `ListenableFuture` will be changed to return `CompletableFuture`.
214+
To facilitate the migration, the 2.9 version has a method `.usingCompletableFuture()` which will provide the same methods with `CompletableFuture` return types.
215+
216+
====
217+
[source, java]
218+
----
219+
KafkaOperations2<String, String> template = new KafkaTemplate<>().usingCompletableFuture();
220+
CompletableFuture<SendResult<String, String>> future = template.send(topic1, 0, 0, "buz")
221+
.whenComplete((sr, thrown) -> {
222+
...
223+
});
224+
)
225+
----
226+
====
227+
213228
The `sendDefault` API requires that a default topic has been provided to the template.
214229

215230
The API takes in a `timestamp` as a parameter and stores this timestamp in the record.
@@ -553,6 +568,9 @@ The result is a `ListenableFuture` that is asynchronously populated with the res
553568
The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`.
554569
You can use this future to determine the result of the send operation.
555570

571+
IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) will be `CompletableFuture` s instead of `ListenableFuture` s.
572+
To assit in the transition, using this release, you can convert these types to a `CompleteableFuture` by calling `asCompletable()` on the returned `Future`.
573+
556574
If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).
557575

558576
Starting with version 2.8.8, the template has a new method `waitForAssignment`.
@@ -791,6 +809,9 @@ RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
791809

792810
These will use the template's default `replyTimeout`, there are also overloaded versions that can take a timeout in the method call.
793811

812+
IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) will be `CompletableFuture` s instead of `ListenableFuture` s.
813+
To assit in the transition, using this release, you can convert these types to a `CompleteableFuture` by calling `asCompletable()` on the returned `Future`.
814+
794815
Use the first method if the consumer's `Deserializer` or the template's `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message.
795816

796817
Use the second method if you need to provide type information for the return type, to assist the message converter.

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,18 @@ You can now configure which inbound headers should be mapped.
3535
Also available in version 2.8.8 or later.
3636
See <<headers>> for more information.
3737

38+
[[x29-template-changes]]
39+
==== `KafkaTemplate` Changes
40+
41+
In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s.
42+
See <<kafka-template>> for assistance in transitioning when using this release.
43+
3844
[[x29-rkt-changes]]
3945
==== `ReplyingKafkaTemplate` Changes
4046

4147
The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized.
4248
Also available in version 2.8.8 or later.
4349
See <<replying-template>>.
50+
51+
In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s.
52+
See <<replying-template>> and <<exchanging-messages>> for assistance in transitioning when using this release.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collection;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.concurrent.CompletableFuture;
2324

2425
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -67,15 +68,21 @@ public interface KafkaOperations<K, V> {
6768
* Send the data to the default topic with no key or partition.
6869
* @param data The data.
6970
* @return a Future for the {@link SendResult}.
71+
* @deprecated see {@link #usingCompletableFuture()}
72+
* @see #usingCompletableFuture()
7073
*/
74+
@Deprecated
7175
ListenableFuture<SendResult<K, V>> sendDefault(V data);
7276

7377
/**
7478
* Send the data to the default topic with the provided key and no partition.
7579
* @param key the key.
7680
* @param data The data.
7781
* @return a Future for the {@link SendResult}.
82+
* @deprecated see {@link #usingCompletableFuture()}
83+
* @see #usingCompletableFuture()
7884
*/
85+
@Deprecated
7986
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
8087

8188
/**
@@ -84,7 +91,10 @@ public interface KafkaOperations<K, V> {
8491
* @param key the key.
8592
* @param data the data.
8693
* @return a Future for the {@link SendResult}.
94+
* @deprecated see {@link #usingCompletableFuture()}
95+
* @see #usingCompletableFuture()
8796
*/
97+
@Deprecated
8898
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
8999

90100
/**
@@ -95,15 +105,21 @@ public interface KafkaOperations<K, V> {
95105
* @param data the data.
96106
* @return a Future for the {@link SendResult}.
97107
* @since 1.3
108+
* @deprecated see {@link #usingCompletableFuture()}
109+
* @see #usingCompletableFuture()
98110
*/
111+
@Deprecated
99112
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
100113

101114
/**
102115
* Send the data to the provided topic with no key or partition.
103116
* @param topic the topic.
104117
* @param data The data.
105118
* @return a Future for the {@link SendResult}.
119+
* @deprecated see {@link #usingCompletableFuture()}
120+
* @see #usingCompletableFuture()
106121
*/
122+
@Deprecated
107123
ListenableFuture<SendResult<K, V>> send(String topic, V data);
108124

109125
/**
@@ -112,7 +128,10 @@ public interface KafkaOperations<K, V> {
112128
* @param key the key.
113129
* @param data The data.
114130
* @return a Future for the {@link SendResult}.
131+
* @deprecated see {@link #usingCompletableFuture()}
132+
* @see #usingCompletableFuture()
115133
*/
134+
@Deprecated
116135
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
117136

118137
/**
@@ -122,7 +141,10 @@ public interface KafkaOperations<K, V> {
122141
* @param key the key.
123142
* @param data the data.
124143
* @return a Future for the {@link SendResult}.
144+
* @deprecated see {@link #usingCompletableFuture()}
145+
* @see #usingCompletableFuture()
125146
*/
147+
@Deprecated
126148
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
127149

128150
/**
@@ -134,26 +156,35 @@ public interface KafkaOperations<K, V> {
134156
* @param data the data.
135157
* @return a Future for the {@link SendResult}.
136158
* @since 1.3
159+
* @deprecated see {@link #usingCompletableFuture()}
160+
* @see #usingCompletableFuture()
137161
*/
162+
@Deprecated
138163
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
139164

140165
/**
141166
* Send the provided {@link ProducerRecord}.
142167
* @param record the record.
143168
* @return a Future for the {@link SendResult}.
144169
* @since 1.3
170+
* @deprecated see {@link #usingCompletableFuture()}
171+
* @see #usingCompletableFuture()
145172
*/
173+
@Deprecated
146174
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
147175

148176
/**
149177
* Send a message with routing information in message headers. The message payload
150178
* may be converted before sending.
151179
* @param message the message to send.
152180
* @return a Future for the {@link SendResult}.
181+
* @deprecated see {@link #usingCompletableFuture()}
153182
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
154183
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
155184
* @see org.springframework.kafka.support.KafkaHeaders#KEY
185+
* @see #usingCompletableFuture()
156186
*/
187+
@Deprecated
157188
ListenableFuture<SendResult<K, V>> send(Message<?> message);
158189

159190
/**
@@ -328,6 +359,148 @@ default ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested
328359
*/
329360
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
330361

362+
/**
363+
* Return an implementation that returns {@link CompletableFuture} instead of
364+
* {@link ListenableFuture}. The methods returning {@link ListenableFuture} will be
365+
* removed in 3.0
366+
* @return the implementation.
367+
* @since 2.9.
368+
*/
369+
default KafkaOperations2<K, V> usingCompletableFuture() {
370+
return new KafkaOperations2<K, V>() {
371+
372+
KafkaOperations<K, V> ops;
373+
374+
@Override
375+
public CompletableFuture<SendResult<K, V>> sendDefault(V data) {
376+
return KafkaOperations.this.sendDefault(data).completable();
377+
}
378+
379+
@Override
380+
public CompletableFuture<SendResult<K, V>> sendDefault(K key, V data) {
381+
return KafkaOperations.this.sendDefault(key, data).completable();
382+
}
383+
384+
@Override
385+
public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data) {
386+
return KafkaOperations.this.sendDefault(partition, key, data).completable();
387+
}
388+
389+
@Override
390+
public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data) {
391+
return KafkaOperations.this.sendDefault(partition, timestamp, key, data).completable();
392+
}
393+
394+
@Override
395+
public CompletableFuture<SendResult<K, V>> send(String topic, V data) {
396+
return KafkaOperations.this.send(topic, data).completable();
397+
}
398+
399+
@Override
400+
public CompletableFuture<SendResult<K, V>> send(String topic, K key, V data) {
401+
return KafkaOperations.this.send(topic, key, data).completable();
402+
}
403+
404+
@Override
405+
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data) {
406+
return KafkaOperations.this.send(topic, partition, key, data).completable();
407+
}
408+
409+
@Override
410+
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
411+
V data) {
412+
return KafkaOperations.this.send(topic, partition, timestamp, key, data).completable();
413+
}
414+
415+
@Override
416+
public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
417+
return KafkaOperations.this.send(record).completable();
418+
}
419+
420+
@Override
421+
public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
422+
return KafkaOperations.this.send(message).completable();
423+
}
424+
425+
@Override
426+
public List<PartitionInfo> partitionsFor(String topic) {
427+
return KafkaOperations.this.partitionsFor(topic);
428+
}
429+
430+
@Override
431+
public Map<MetricName, ? extends Metric> metrics() {
432+
return KafkaOperations.this.metrics();
433+
}
434+
435+
@Override
436+
@Nullable
437+
public <T> T execute(ProducerCallback<K, V, T> callback) {
438+
return KafkaOperations.this.execute(callback);
439+
}
440+
441+
@Override
442+
@Nullable
443+
public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
444+
return KafkaOperations.this.executeInTransaction(callback);
445+
}
446+
447+
@Override
448+
public void flush() {
449+
KafkaOperations.this.flush();
450+
}
451+
452+
@Override
453+
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
454+
ConsumerGroupMetadata groupMetadata) {
455+
KafkaOperations.this.sendOffsetsToTransaction(offsets, groupMetadata);
456+
}
457+
458+
@Override
459+
public boolean isTransactional() {
460+
return KafkaOperations.this.isTransactional();
461+
}
462+
463+
@Override
464+
public boolean isAllowNonTransactional() {
465+
return KafkaOperations.this.isAllowNonTransactional();
466+
}
467+
468+
@Override
469+
public boolean inTransaction() {
470+
return KafkaOperations.this.inTransaction();
471+
}
472+
473+
@Override
474+
public ProducerFactory<K, V> getProducerFactory() {
475+
return KafkaOperations.this.getProducerFactory();
476+
}
477+
478+
@Override
479+
@Nullable
480+
public ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
481+
return KafkaOperations.this.receive(topic, partition, offset);
482+
}
483+
484+
@Override
485+
@Nullable
486+
public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
487+
return KafkaOperations.this.receive(topic, partition, offset, pollTimeout);
488+
}
489+
490+
@Override
491+
public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested) {
492+
return KafkaOperations.this.receive(requested);
493+
}
494+
495+
@Override
496+
public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
497+
return KafkaOperations.this.receive(requested, pollTimeout);
498+
}
499+
500+
};
501+
502+
}
503+
331504
/**
332505
* A callback for executing arbitrary operations on the {@link Producer}.
333506
* @param <K> the key type.

0 commit comments

Comments
 (0)