diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index f451d94b2c..4c1c3ff32c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -33,7 +33,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.support.TopicPartitionOffset; @@ -62,7 +62,7 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis private final List> containers = new ArrayList<>(); - private final List executors = new ArrayList<>(); + private final List executors = new ArrayList<>(); private int concurrency = 1; @@ -237,7 +237,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer { }); }); - AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor(); + AsyncTaskExecutor exec = container.getContainerProperties().getListenerTaskExecutor(); if (exec == null) { if ((this.executors.size() > index)) { exec = this.executors.get(index); @@ -246,7 +246,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer listener = (GenericMessageListener) messageListener; ListenerType listenerType = determineListenerType(listener); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java index 104c1ab070..39668627e2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java @@ -62,6 +62,7 @@ public Message

get(long timeout, TimeUnit unit) } @Override + @SuppressWarnings(UNCHECKED) public synchronized Completable asCompletable() { if (this.completable == null) { this.completable = new Completable(this, this); @@ -74,9 +75,9 @@ public synchronized Completable asCompletable() { * A {@link CompletableFuture} version. * @since 2.9 */ - public class Completable extends RequestReplyMessageFuture.Completable { + public class Completable extends RequestReplyMessageFuture.Completable { - Completable(RequestReplyMessageFuture requestReplyMessageFuture, Future> delegate) { // NOSONAR + Completable(RequestReplyMessageFuture requestReplyMessageFuture, Future> delegate) { // NOSONAR requestReplyMessageFuture.super(delegate); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index 466e6a9b30..cf7fd05816 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -329,7 +329,7 @@ protected KafkaConsumer createKafkaConsumer(Map assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo("2"); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void testNestedTxProducerIsCached() throws Exception { Map producerProps = KafkaTestUtils.producerProps(this.embeddedKafka); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index 3e8d167467..14fe34a09d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -61,7 +62,6 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.transaction.CannotCreateTransactionException; import org.springframework.transaction.support.TransactionTemplate; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -74,7 +74,7 @@ public class DefaultKafkaProducerFactoryTests { @Test void testProducerClosedAfterBadTransition() throws Exception { final Producer producer = mock(Producer.class); - given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer.send(any(), any())).willReturn(new CompletableFuture()); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index a1330aa0dd..1f15c245c1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -88,7 +89,6 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -420,7 +420,7 @@ void testWithCallbackFailure() throws Exception { willAnswer(inv -> { Callback callback = inv.getArgument(1); callback.onCompletion(null, new RuntimeException("test")); - return new SettableListenableFuture(); + return new CompletableFuture(); }).given(producer).send(any(), any()); ProducerFactory pf = mock(ProducerFactory.class); given(pf.createProducer()).willReturn(producer); @@ -454,7 +454,7 @@ void testWithCallbackFailureFunctional() throws Exception { willAnswer(inv -> { Callback callback = inv.getArgument(1); callback.onCompletion(null, new RuntimeException("test")); - return new SettableListenableFuture(); + return new CompletableFuture(); }).given(producer).send(any(), any()); ProducerFactory pf = mock(ProducerFactory.class); given(pf.createProducer()).willReturn(producer); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 9a9434e825..e8b8fa8046 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -39,6 +39,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,7 +81,6 @@ import org.springframework.transaction.support.AbstractPlatformTransactionManager; import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.transaction.support.TransactionTemplate; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -317,10 +317,10 @@ public void testTransactionSynchronizationExceptionOnCommit() { public void testDeadLetterPublisherWhileTransactionActive() { @SuppressWarnings("unchecked") Producer producer1 = mock(Producer.class); - given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer1.send(any(), any())).willReturn(new CompletableFuture<>()); @SuppressWarnings("unchecked") Producer producer2 = mock(Producer.class); - given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer2.send(any(), any())).willReturn(new CompletableFuture<>()); producer1.initTransactions(); @SuppressWarnings("unchecked") @@ -504,10 +504,10 @@ public void testAbort() { public void testExecuteInTransactionNewInnerTx() { @SuppressWarnings("unchecked") Producer producer1 = mock(Producer.class); - given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer1.send(any(), any())).willReturn(new CompletableFuture<>()); @SuppressWarnings("unchecked") Producer producer2 = mock(Producer.class); - given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer2.send(any(), any())).willReturn(new CompletableFuture<>()); producer1.initTransactions(); AtomicBoolean first = new AtomicBoolean(true); @@ -608,7 +608,7 @@ public static class DeclarativeConfig { @Bean public Producer producer1() { Producer mock = mock(Producer.class); - given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(mock.send(any(), any())).willReturn(new CompletableFuture<>()); return mock; } @@ -616,7 +616,7 @@ public Producer producer1() { @Bean public Producer producer2() { Producer mock = mock(Producer.class); - given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(mock.send(any(), any())).willReturn(new CompletableFuture<>()); return mock; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java index dfd89a0c0a..361d68f47f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,13 +25,13 @@ import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; import org.apache.kafka.clients.producer.Producer; import org.junit.jupiter.api.Test; import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -44,9 +44,9 @@ public class RoutingKafkaTemplateTests { @Test public void routing() { Producer p1 = mock(Producer.class); - given(p1.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(p1.send(any(), any())).willReturn(new CompletableFuture<>()); Producer p2 = mock(Producer.class); - given(p2.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(p2.send(any(), any())).willReturn(new CompletableFuture<>()); ProducerFactory pf1 = mock(ProducerFactory.class); ProducerFactory pf2 = mock(ProducerFactory.class); given(pf1.createProducer()).willReturn(p1); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index 94c322a909..a7ade0c02c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -101,7 +101,7 @@ void testThreadStarvation() throws InterruptedException { ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); exec.setCorePoolSize(1); exec.afterPropertiesSet(); - containerProperties.setConsumerTaskExecutor(exec); + containerProperties.setListenerTaskExecutor(exec); containerProperties.setConsumerStartTimeout(Duration.ofMillis(50)); ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index ce42e4e510..bff8a15d98 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -218,7 +218,7 @@ public void testDelegateType() throws Exception { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); scheduler.initialize(); - containerProps.setConsumerTaskExecutor(scheduler); + containerProps.setListenerTaskExecutor(scheduler); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("delegate"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 6149905975..b132bf585d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -103,7 +104,6 @@ import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.util.backoff.FixedBackOff; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -216,7 +216,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a return null; }).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class)); } - given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer.send(any(), any())).willReturn(new CompletableFuture<>()); final CountDownLatch closeLatch = new CountDownLatch(2); willAnswer(i -> { closeLatch.countDown(); @@ -479,7 +479,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides()); Producer producer = mock(Producer.class); - given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer.send(any(), any())).willReturn(new CompletableFuture<>()); final CountDownLatch closeLatch = new CountDownLatch(1); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 7bcf383aa8..69e3f921d3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -97,7 +98,6 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -555,7 +555,7 @@ public void testAggregateOrphansNotStored() throws Exception { willAnswer(invocation -> { ProducerRecord rec = invocation.getArgument(0); correlation.set(rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID).value()); - return new SettableListenableFuture<>(); + return new CompletableFuture<>(); }).given(producer).send(any(), any()); AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container, (list, timeout) -> true); @@ -694,8 +694,8 @@ void nullDuration() throws Exception { Producer producer = mock(Producer.class); willAnswer(invocation -> { Callback callback = invocation.getArgument(1); - SettableListenableFuture future = new SettableListenableFuture<>(); - future.set("done"); + CompletableFuture future = new CompletableFuture<>(); + future.complete("done"); callback.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0L, 0, 0L, 0, 0), null); return future; }).given(producer).send(any(), any()); @@ -718,7 +718,7 @@ void requestTimeoutWithMessage() throws Exception { ProducerFactory pf = mock(ProducerFactory.class); Producer producer = mock(Producer.class); willAnswer(invocation -> { - return new SettableListenableFuture<>(); + return new CompletableFuture<>(); }).given(producer).send(any(), any()); given(pf.createProducer()).willReturn(producer); GenericMessageListenerContainer container = mock(GenericMessageListenerContainer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java index 338b87d25a..61f5b1ddd6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,7 +74,6 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Artem Bilan @@ -106,7 +106,7 @@ public class KafkaStreamsTests { private KafkaTemplate kafkaTemplate; @Autowired - private SettableListenableFuture> resultFuture; + private CompletableFuture> resultFuture; @Autowired private StreamsBuilderFactoryBean streamsBuilderFactoryBean; @@ -267,13 +267,13 @@ public ConsumerFactory consumerFactory() { } @Bean - public SettableListenableFuture> resultFuture() { - return new SettableListenableFuture<>(); + public CompletableFuture> resultFuture() { + return new CompletableFuture<>(); } @KafkaListener(topics = "${streaming.topic.two}") public void listener(ConsumerRecord payload) { - resultFuture().set(payload); + resultFuture().complete(payload); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java index 566d33a857..bb4098c95f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -65,7 +66,6 @@ import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -81,7 +81,7 @@ public class RecoveringDeserializationExceptionHandlerTests { private KafkaTemplate kafkaTemplate; @Autowired - private SettableListenableFuture> resultFuture; + private CompletableFuture> resultFuture; @Test void viaStringProperty() { @@ -237,13 +237,13 @@ public ConsumerFactory consumerFactory() { } @Bean - public SettableListenableFuture> resultFuture() { - return new SettableListenableFuture<>(); + public CompletableFuture> resultFuture() { + return new CompletableFuture<>(); } @KafkaListener(topics = "recovererDLQ") public void listener(ConsumerRecord payload) { - resultFuture().set(payload); + resultFuture().complete(payload); } }