Skip to content

Commit 90c6e95

Browse files
garyrussellartembilan
authored andcommitted
GH-2357: Remove Remaining Uses of ListenableFuture
Resolves #2357 **Back-ported from main with deprecated accessors**
1 parent c24db16 commit 90c6e95

15 files changed

+75
-46
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
import org.springframework.context.ApplicationContext;
3535
import org.springframework.context.ApplicationEventPublisher;
36-
import org.springframework.core.task.AsyncListenableTaskExecutor;
36+
import org.springframework.core.task.AsyncTaskExecutor;
3737
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3838
import org.springframework.kafka.core.ConsumerFactory;
3939
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -62,7 +62,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
6262

6363
private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
6464

65-
private final List<AsyncListenableTaskExecutor> executors = new ArrayList<>();
65+
private final List<AsyncTaskExecutor> executors = new ArrayList<>();
6666

6767
private int concurrency = 1;
6868

@@ -237,7 +237,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
237237
stopAbnormally(() -> {
238238
});
239239
});
240-
AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
240+
AsyncTaskExecutor exec = container.getContainerProperties().getListenerTaskExecutor();
241241
if (exec == null) {
242242
if ((this.executors.size() > index)) {
243243
exec = this.executors.get(index);
@@ -246,7 +246,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
246246
exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
247247
this.executors.add(exec);
248248
}
249-
container.getContainerProperties().setConsumerTaskExecutor(exec);
249+
container.getContainerProperties().setListenerTaskExecutor(exec);
250250
}
251251
}
252252

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.springframework.aop.framework.ProxyFactory;
3232
import org.springframework.aop.support.AopUtils;
3333
import org.springframework.core.task.AsyncListenableTaskExecutor;
34+
import org.springframework.core.task.AsyncTaskExecutor;
3435
import org.springframework.kafka.support.TopicPartitionOffset;
3536
import org.springframework.lang.Nullable;
3637
import org.springframework.scheduling.TaskScheduler;
@@ -427,11 +428,26 @@ public void setAckTime(long ackTime) {
427428
/**
428429
* Set the executor for threads that poll the consumer.
429430
* @param consumerTaskExecutor the executor
431+
* @deprecated in favor of {@link #setListenerTaskExecutor(AsyncTaskExecutor)}.
430432
*/
433+
@Deprecated
431434
public void setConsumerTaskExecutor(@Nullable AsyncListenableTaskExecutor consumerTaskExecutor) {
432435
this.consumerTaskExecutor = consumerTaskExecutor;
433436
}
434437

438+
/**
439+
* Set the executor for threads that poll the consumer.
440+
* @param listenerTaskExecutor the executor - must be
441+
* {@link AsyncListenableTaskExecutor} until 3.0.
442+
* @since 2.8.9
443+
*/
444+
public void setListenerTaskExecutor(@Nullable AsyncTaskExecutor listenerTaskExecutor) {
445+
if (listenerTaskExecutor != null) {
446+
Assert.isInstanceOf(AsyncListenableTaskExecutor.class, listenerTaskExecutor);
447+
}
448+
this.consumerTaskExecutor = (AsyncListenableTaskExecutor) listenerTaskExecutor;
449+
}
450+
435451
/**
436452
* Set the timeout for shutting down the container. This is the maximum amount of
437453
* time that the invocation to {@code #stop(Runnable)} will block for, before
@@ -510,12 +526,23 @@ public Object getMessageListener() {
510526
/**
511527
* Return the consumer task executor.
512528
* @return the executor.
529+
* @deprecated in favor of {@link #getListenerTaskExecutor()}.
513530
*/
531+
@Deprecated
514532
@Nullable
515533
public AsyncListenableTaskExecutor getConsumerTaskExecutor() {
516534
return this.consumerTaskExecutor;
517535
}
518536

537+
/**
538+
* Return the consumer task executor.
539+
* @return the executor.
540+
*/
541+
@Nullable
542+
public AsyncTaskExecutor getListenerTaskExecutor() {
543+
return this.consumerTaskExecutor;
544+
}
545+
519546
public long getShutdownTimeout() {
520547
return this.shutdownTimeout;
521548
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,11 +351,12 @@ protected void doStart() {
351351
checkAckMode(containerProperties);
352352

353353
Object messageListener = containerProperties.getMessageListener();
354-
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
354+
AsyncListenableTaskExecutor consumerExecutor = (AsyncListenableTaskExecutor) containerProperties
355+
.getListenerTaskExecutor();
355356
if (consumerExecutor == null) {
356357
consumerExecutor = new SimpleAsyncTaskExecutor(
357358
(getBeanName() == null ? "" : getBeanName()) + "-C-");
358-
containerProperties.setConsumerTaskExecutor(consumerExecutor);
359+
containerProperties.setListenerTaskExecutor(consumerExecutor);
359360
}
360361
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
361362
ListenerType listenerType = determineListenerType(listener);

spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public Message<P> get(long timeout, TimeUnit unit)
6262
}
6363

6464
@Override
65+
@SuppressWarnings(UNCHECKED)
6566
public synchronized Completable asCompletable() {
6667
if (this.completable == null) {
6768
this.completable = new Completable(this, this);
@@ -74,9 +75,9 @@ public synchronized Completable asCompletable() {
7475
* A {@link CompletableFuture} version.
7576
* @since 2.9
7677
*/
77-
public class Completable extends RequestReplyMessageFuture.Completable {
78+
public class Completable extends RequestReplyMessageFuture<K, V>.Completable {
7879

79-
Completable(RequestReplyMessageFuture requestReplyMessageFuture, Future<Message<?>> delegate) { // NOSONAR
80+
Completable(RequestReplyMessageFuture<K, V> requestReplyMessageFuture, Future<Message<?>> delegate) { // NOSONAR
8081
requestReplyMessageFuture.super(delegate);
8182
}
8283

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
329329
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo("2");
330330
}
331331

332-
@SuppressWarnings("unchecked")
332+
@SuppressWarnings({ "unchecked", "deprecation" })
333333
@Test
334334
public void testNestedTxProducerIsCached() throws Exception {
335335
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.List;
3838
import java.util.Map;
3939
import java.util.Queue;
40+
import java.util.concurrent.CompletableFuture;
4041
import java.util.concurrent.atomic.AtomicBoolean;
4142
import java.util.concurrent.atomic.AtomicInteger;
4243

@@ -61,7 +62,6 @@
6162
import org.springframework.kafka.transaction.KafkaTransactionManager;
6263
import org.springframework.transaction.CannotCreateTransactionException;
6364
import org.springframework.transaction.support.TransactionTemplate;
64-
import org.springframework.util.concurrent.SettableListenableFuture;
6565

6666
/**
6767
* @author Gary Russell
@@ -74,7 +74,7 @@ public class DefaultKafkaProducerFactoryTests {
7474
@Test
7575
void testProducerClosedAfterBadTransition() throws Exception {
7676
final Producer producer = mock(Producer.class);
77-
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
77+
given(producer.send(any(), any())).willReturn(new CompletableFuture());
7878
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
7979

8080
@Override

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Map;
3939
import java.util.Set;
4040
import java.util.UUID;
41+
import java.util.concurrent.CompletableFuture;
4142
import java.util.concurrent.CountDownLatch;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.concurrent.atomic.AtomicInteger;
@@ -88,7 +89,6 @@
8889
import org.springframework.messaging.support.MessageBuilder;
8990
import org.springframework.util.concurrent.ListenableFuture;
9091
import org.springframework.util.concurrent.ListenableFutureCallback;
91-
import org.springframework.util.concurrent.SettableListenableFuture;
9292

9393
/**
9494
* @author Gary Russell
@@ -420,7 +420,7 @@ void testWithCallbackFailure() throws Exception {
420420
willAnswer(inv -> {
421421
Callback callback = inv.getArgument(1);
422422
callback.onCompletion(null, new RuntimeException("test"));
423-
return new SettableListenableFuture<RecordMetadata>();
423+
return new CompletableFuture<RecordMetadata>();
424424
}).given(producer).send(any(), any());
425425
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
426426
given(pf.createProducer()).willReturn(producer);
@@ -454,7 +454,7 @@ void testWithCallbackFailureFunctional() throws Exception {
454454
willAnswer(inv -> {
455455
Callback callback = inv.getArgument(1);
456456
callback.onCompletion(null, new RuntimeException("test"));
457-
return new SettableListenableFuture<RecordMetadata>();
457+
return new CompletableFuture<RecordMetadata>();
458458
}).given(producer).send(any(), any());
459459
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
460460
given(pf.createProducer()).willReturn(producer);

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Iterator;
4040
import java.util.Map;
4141
import java.util.concurrent.BlockingQueue;
42+
import java.util.concurrent.CompletableFuture;
4243
import java.util.concurrent.LinkedBlockingDeque;
4344
import java.util.concurrent.atomic.AtomicBoolean;
4445

@@ -80,7 +81,6 @@
8081
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
8182
import org.springframework.transaction.support.DefaultTransactionStatus;
8283
import org.springframework.transaction.support.TransactionTemplate;
83-
import org.springframework.util.concurrent.SettableListenableFuture;
8484

8585
/**
8686
* @author Gary Russell
@@ -317,10 +317,10 @@ public void testTransactionSynchronizationExceptionOnCommit() {
317317
public void testDeadLetterPublisherWhileTransactionActive() {
318318
@SuppressWarnings("unchecked")
319319
Producer<Object, Object> producer1 = mock(Producer.class);
320-
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
320+
given(producer1.send(any(), any())).willReturn(new CompletableFuture<>());
321321
@SuppressWarnings("unchecked")
322322
Producer<Object, Object> producer2 = mock(Producer.class);
323-
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
323+
given(producer2.send(any(), any())).willReturn(new CompletableFuture<>());
324324
producer1.initTransactions();
325325

326326
@SuppressWarnings("unchecked")
@@ -504,10 +504,10 @@ public void testAbort() {
504504
public void testExecuteInTransactionNewInnerTx() {
505505
@SuppressWarnings("unchecked")
506506
Producer<Object, Object> producer1 = mock(Producer.class);
507-
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
507+
given(producer1.send(any(), any())).willReturn(new CompletableFuture<>());
508508
@SuppressWarnings("unchecked")
509509
Producer<Object, Object> producer2 = mock(Producer.class);
510-
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
510+
given(producer2.send(any(), any())).willReturn(new CompletableFuture<>());
511511
producer1.initTransactions();
512512
AtomicBoolean first = new AtomicBoolean(true);
513513

@@ -608,15 +608,15 @@ public static class DeclarativeConfig {
608608
@Bean
609609
public Producer producer1() {
610610
Producer mock = mock(Producer.class);
611-
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
611+
given(mock.send(any(), any())).willReturn(new CompletableFuture<>());
612612
return mock;
613613
}
614614

615615
@SuppressWarnings({ "rawtypes", "unchecked" })
616616
@Bean
617617
public Producer producer2() {
618618
Producer mock = mock(Producer.class);
619-
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
619+
given(mock.send(any(), any())).willReturn(new CompletableFuture<>());
620620
return mock;
621621
}
622622

spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,13 +25,13 @@
2525

2626
import java.util.LinkedHashMap;
2727
import java.util.Map;
28+
import java.util.concurrent.CompletableFuture;
2829
import java.util.regex.Pattern;
2930

3031
import org.apache.kafka.clients.producer.Producer;
3132
import org.junit.jupiter.api.Test;
3233

3334
import org.springframework.kafka.test.utils.KafkaTestUtils;
34-
import org.springframework.util.concurrent.SettableListenableFuture;
3535

3636
/**
3737
* @author Gary Russell
@@ -44,9 +44,9 @@ public class RoutingKafkaTemplateTests {
4444
@Test
4545
public void routing() {
4646
Producer<Object, Object> p1 = mock(Producer.class);
47-
given(p1.send(any(), any())).willReturn(new SettableListenableFuture<>());
47+
given(p1.send(any(), any())).willReturn(new CompletableFuture<>());
4848
Producer<Object, Object> p2 = mock(Producer.class);
49-
given(p2.send(any(), any())).willReturn(new SettableListenableFuture<>());
49+
given(p2.send(any(), any())).willReturn(new CompletableFuture<>());
5050
ProducerFactory<Object, Object> pf1 = mock(ProducerFactory.class);
5151
ProducerFactory<Object, Object> pf2 = mock(ProducerFactory.class);
5252
given(pf1.createProducer()).willReturn(p1);

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ void testThreadStarvation() throws InterruptedException {
101101
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
102102
exec.setCorePoolSize(1);
103103
exec.afterPropertiesSet();
104-
containerProperties.setConsumerTaskExecutor(exec);
104+
containerProperties.setListenerTaskExecutor(exec);
105105
containerProperties.setConsumerStartTimeout(Duration.ofMillis(50));
106106
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
107107
containerProperties);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public void testDelegateType() throws Exception {
218218
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
219219
scheduler.setPoolSize(10);
220220
scheduler.initialize();
221-
containerProps.setConsumerTaskExecutor(scheduler);
221+
containerProps.setListenerTaskExecutor(scheduler);
222222
KafkaMessageListenerContainer<Integer, String> container =
223223
new KafkaMessageListenerContainer<>(cf, containerProps);
224224
container.setBeanName("delegate");

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.List;
4646
import java.util.Map;
4747
import java.util.Set;
48+
import java.util.concurrent.CompletableFuture;
4849
import java.util.concurrent.CountDownLatch;
4950
import java.util.concurrent.TimeUnit;
5051
import java.util.concurrent.atomic.AtomicBoolean;
@@ -103,7 +104,6 @@
103104
import org.springframework.transaction.support.DefaultTransactionDefinition;
104105
import org.springframework.transaction.support.DefaultTransactionStatus;
105106
import org.springframework.util.backoff.FixedBackOff;
106-
import org.springframework.util.concurrent.SettableListenableFuture;
107107

108108
/**
109109
* @author Gary Russell
@@ -216,7 +216,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a
216216
return null;
217217
}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
218218
}
219-
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
219+
given(producer.send(any(), any())).willReturn(new CompletableFuture<>());
220220
final CountDownLatch closeLatch = new CountDownLatch(2);
221221
willAnswer(i -> {
222222
closeLatch.countDown();
@@ -479,7 +479,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
479479
ConsumerFactory cf = mock(ConsumerFactory.class);
480480
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
481481
Producer producer = mock(Producer.class);
482-
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
482+
given(producer.send(any(), any())).willReturn(new CompletableFuture<>());
483483

484484
final CountDownLatch closeLatch = new CountDownLatch(1);
485485

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Iterator;
3535
import java.util.List;
3636
import java.util.Map;
37+
import java.util.concurrent.CompletableFuture;
3738
import java.util.concurrent.CountDownLatch;
3839
import java.util.concurrent.ExecutionException;
3940
import java.util.concurrent.TimeUnit;
@@ -97,7 +98,6 @@
9798
import org.springframework.messaging.support.MessageBuilder;
9899
import org.springframework.test.annotation.DirtiesContext;
99100
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
100-
import org.springframework.util.concurrent.SettableListenableFuture;
101101

102102
/**
103103
* @author Gary Russell
@@ -555,7 +555,7 @@ public void testAggregateOrphansNotStored() throws Exception {
555555
willAnswer(invocation -> {
556556
ProducerRecord rec = invocation.getArgument(0);
557557
correlation.set(rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID).value());
558-
return new SettableListenableFuture<>();
558+
return new CompletableFuture<>();
559559
}).given(producer).send(any(), any());
560560
AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container,
561561
(list, timeout) -> true);
@@ -694,8 +694,8 @@ void nullDuration() throws Exception {
694694
Producer producer = mock(Producer.class);
695695
willAnswer(invocation -> {
696696
Callback callback = invocation.getArgument(1);
697-
SettableListenableFuture<Object> future = new SettableListenableFuture<>();
698-
future.set("done");
697+
CompletableFuture<Object> future = new CompletableFuture<>();
698+
future.complete("done");
699699
callback.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0L, 0, 0L, 0, 0), null);
700700
return future;
701701
}).given(producer).send(any(), any());
@@ -718,7 +718,7 @@ void requestTimeoutWithMessage() throws Exception {
718718
ProducerFactory pf = mock(ProducerFactory.class);
719719
Producer producer = mock(Producer.class);
720720
willAnswer(invocation -> {
721-
return new SettableListenableFuture<>();
721+
return new CompletableFuture<>();
722722
}).given(producer).send(any(), any());
723723
given(pf.createProducer()).willReturn(producer);
724724
GenericMessageListenerContainer container = mock(GenericMessageListenerContainer.class);

0 commit comments

Comments
 (0)