Skip to content

GH-2357: Remove Remaining Uses of ListenableFuture #2368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.TopicPartitionOffset;
Expand Down Expand Up @@ -62,7 +62,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis

private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

private final List<AsyncListenableTaskExecutor> executors = new ArrayList<>();
private final List<AsyncTaskExecutor> executors = new ArrayList<>();

private int concurrency = 1;

Expand Down Expand Up @@ -237,7 +237,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
stopAbnormally(() -> {
});
});
AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
AsyncTaskExecutor exec = container.getContainerProperties().getListenerTaskExecutor();
if (exec == null) {
if ((this.executors.size() > index)) {
exec = this.executors.get(index);
Expand All @@ -246,7 +246,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
this.executors.add(exec);
}
container.getContainerProperties().setConsumerTaskExecutor(exec);
container.getContainerProperties().setListenerTaskExecutor(exec);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
Expand Down Expand Up @@ -427,11 +428,26 @@ public void setAckTime(long ackTime) {
/**
* Set the executor for threads that poll the consumer.
* @param consumerTaskExecutor the executor
* @deprecated in favor of {@link #setListenerTaskExecutor(AsyncTaskExecutor)}.
*/
@Deprecated
public void setConsumerTaskExecutor(@Nullable AsyncListenableTaskExecutor consumerTaskExecutor) {
this.consumerTaskExecutor = consumerTaskExecutor;
}

/**
* Set the executor for threads that poll the consumer.
* @param listenerTaskExecutor the executor - must be
* {@link AsyncListenableTaskExecutor} until 3.0.
* @since 2.8.9
*/
public void setListenerTaskExecutor(@Nullable AsyncTaskExecutor listenerTaskExecutor) {
if (listenerTaskExecutor != null) {
Assert.isInstanceOf(AsyncListenableTaskExecutor.class, listenerTaskExecutor);
}
this.consumerTaskExecutor = (AsyncListenableTaskExecutor) listenerTaskExecutor;
}

/**
* Set the timeout for shutting down the container. This is the maximum amount of
* time that the invocation to {@code #stop(Runnable)} will block for, before
Expand Down Expand Up @@ -510,12 +526,23 @@ public Object getMessageListener() {
/**
* Return the consumer task executor.
* @return the executor.
* @deprecated in favor of {@link #getListenerTaskExecutor()}.
*/
@Deprecated
@Nullable
public AsyncListenableTaskExecutor getConsumerTaskExecutor() {
return this.consumerTaskExecutor;
}

/**
* Return the consumer task executor.
* @return the executor.
*/
@Nullable
public AsyncTaskExecutor getListenerTaskExecutor() {
return this.consumerTaskExecutor;
}

public long getShutdownTimeout() {
return this.shutdownTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,12 @@ protected void doStart() {
checkAckMode(containerProperties);

Object messageListener = containerProperties.getMessageListener();
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
AsyncListenableTaskExecutor consumerExecutor = (AsyncListenableTaskExecutor) containerProperties
.getListenerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
containerProperties.setListenerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public Message<P> get(long timeout, TimeUnit unit)
}

@Override
@SuppressWarnings(UNCHECKED)
public synchronized Completable asCompletable() {
if (this.completable == null) {
this.completable = new Completable(this, this);
Expand All @@ -74,9 +75,9 @@ public synchronized Completable asCompletable() {
* A {@link CompletableFuture} version.
* @since 2.9
*/
public class Completable extends RequestReplyMessageFuture.Completable {
public class Completable extends RequestReplyMessageFuture<K, V>.Completable {

Completable(RequestReplyMessageFuture requestReplyMessageFuture, Future<Message<?>> delegate) { // NOSONAR
Completable(RequestReplyMessageFuture<K, V> requestReplyMessageFuture, Future<Message<?>> delegate) { // NOSONAR
requestReplyMessageFuture.super(delegate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo("2");
}

@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "deprecation" })
@Test
public void testNestedTxProducerIsCached() throws Exception {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -61,7 +62,6 @@
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand All @@ -74,7 +74,7 @@ public class DefaultKafkaProducerFactoryTests {
@Test
void testProducerClosedAfterBadTransition() throws Exception {
final Producer producer = mock(Producer.class);
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer.send(any(), any())).willReturn(new CompletableFuture());
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -88,7 +89,6 @@
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -420,7 +420,7 @@ void testWithCallbackFailure() throws Exception {
willAnswer(inv -> {
Callback callback = inv.getArgument(1);
callback.onCompletion(null, new RuntimeException("test"));
return new SettableListenableFuture<RecordMetadata>();
return new CompletableFuture<RecordMetadata>();
}).given(producer).send(any(), any());
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
given(pf.createProducer()).willReturn(producer);
Expand Down Expand Up @@ -454,7 +454,7 @@ void testWithCallbackFailureFunctional() throws Exception {
willAnswer(inv -> {
Callback callback = inv.getArgument(1);
callback.onCompletion(null, new RuntimeException("test"));
return new SettableListenableFuture<RecordMetadata>();
return new CompletableFuture<RecordMetadata>();
}).given(producer).send(any(), any());
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
given(pf.createProducer()).willReturn(producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -80,7 +81,6 @@
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -317,10 +317,10 @@ public void testTransactionSynchronizationExceptionOnCommit() {
public void testDeadLetterPublisherWhileTransactionActive() {
@SuppressWarnings("unchecked")
Producer<Object, Object> producer1 = mock(Producer.class);
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer1.send(any(), any())).willReturn(new CompletableFuture<>());
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer2.send(any(), any())).willReturn(new CompletableFuture<>());
producer1.initTransactions();

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -504,10 +504,10 @@ public void testAbort() {
public void testExecuteInTransactionNewInnerTx() {
@SuppressWarnings("unchecked")
Producer<Object, Object> producer1 = mock(Producer.class);
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer1.send(any(), any())).willReturn(new CompletableFuture<>());
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer2.send(any(), any())).willReturn(new CompletableFuture<>());
producer1.initTransactions();
AtomicBoolean first = new AtomicBoolean(true);

Expand Down Expand Up @@ -608,15 +608,15 @@ public static class DeclarativeConfig {
@Bean
public Producer producer1() {
Producer mock = mock(Producer.class);
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(mock.send(any(), any())).willReturn(new CompletableFuture<>());
return mock;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Producer producer2() {
Producer mock = mock(Producer.class);
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(mock.send(any(), any())).willReturn(new CompletableFuture<>());
return mock;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,13 +25,13 @@

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.Producer;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand All @@ -44,9 +44,9 @@ public class RoutingKafkaTemplateTests {
@Test
public void routing() {
Producer<Object, Object> p1 = mock(Producer.class);
given(p1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(p1.send(any(), any())).willReturn(new CompletableFuture<>());
Producer<Object, Object> p2 = mock(Producer.class);
given(p2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(p2.send(any(), any())).willReturn(new CompletableFuture<>());
ProducerFactory<Object, Object> pf1 = mock(ProducerFactory.class);
ProducerFactory<Object, Object> pf2 = mock(ProducerFactory.class);
given(pf1.createProducer()).willReturn(p1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void testThreadStarvation() throws InterruptedException {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(1);
exec.afterPropertiesSet();
containerProperties.setConsumerTaskExecutor(exec);
containerProperties.setListenerTaskExecutor(exec);
containerProperties.setConsumerStartTimeout(Duration.ofMillis(50));
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
containerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void testDelegateType() throws Exception {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.initialize();
containerProps.setConsumerTaskExecutor(scheduler);
containerProps.setListenerTaskExecutor(scheduler);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("delegate");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -103,7 +104,6 @@
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -216,7 +216,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a
return null;
}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
}
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer.send(any(), any())).willReturn(new CompletableFuture<>());
final CountDownLatch closeLatch = new CountDownLatch(2);
willAnswer(i -> {
closeLatch.countDown();
Expand Down Expand Up @@ -479,7 +479,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
ConsumerFactory cf = mock(ConsumerFactory.class);
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
Producer producer = mock(Producer.class);
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer.send(any(), any())).willReturn(new CompletableFuture<>());

final CountDownLatch closeLatch = new CountDownLatch(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -97,7 +98,6 @@
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -555,7 +555,7 @@ public void testAggregateOrphansNotStored() throws Exception {
willAnswer(invocation -> {
ProducerRecord rec = invocation.getArgument(0);
correlation.set(rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID).value());
return new SettableListenableFuture<>();
return new CompletableFuture<>();
}).given(producer).send(any(), any());
AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container,
(list, timeout) -> true);
Expand Down Expand Up @@ -694,8 +694,8 @@ void nullDuration() throws Exception {
Producer producer = mock(Producer.class);
willAnswer(invocation -> {
Callback callback = invocation.getArgument(1);
SettableListenableFuture<Object> future = new SettableListenableFuture<>();
future.set("done");
CompletableFuture<Object> future = new CompletableFuture<>();
future.complete("done");
callback.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0L, 0, 0L, 0, 0), null);
return future;
}).given(producer).send(any(), any());
Expand All @@ -718,7 +718,7 @@ void requestTimeoutWithMessage() throws Exception {
ProducerFactory pf = mock(ProducerFactory.class);
Producer producer = mock(Producer.class);
willAnswer(invocation -> {
return new SettableListenableFuture<>();
return new CompletableFuture<>();
}).given(producer).send(any(), any());
given(pf.createProducer()).willReturn(producer);
GenericMessageListenerContainer container = mock(GenericMessageListenerContainer.class);
Expand Down
Loading