Skip to content

Commit 2842196

Browse files
committed
Upgrade kafka-clients to 3.2.0; other upgrades
Fix deprecation warnings.
1 parent 6473c33 commit 2842196

File tree

9 files changed

+22
-19
lines changed

9 files changed

+22
-19
lines changed

build.gradle

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,16 @@ ext {
5959
jaywayJsonPathVersion = '2.6.0'
6060
junit4Version = '4.13.2'
6161
junitJupiterVersion = '5.8.2'
62-
kafkaVersion = '3.1.0'
62+
kafkaVersion = '3.2.0'
6363
log4jVersion = '2.17.2'
64-
micrometerVersion = '1.9.0-RC1'
64+
micrometerVersion = '1.9.0'
6565
mockitoVersion = '4.5.0'
6666
reactorVersion = '2020.0.18'
6767
scalaVersion = '2.13'
6868
springBootVersion = '2.6.7' // docs module
69-
springDataVersion = '2021.2.0-RC1'
69+
springDataVersion = '2021.2.0'
7070
springRetryVersion = '1.3.3'
71-
springVersion = '5.3.19'
71+
springVersion = '5.3.20'
7272
zookeeperVersion = '3.6.3'
7373

7474
idPrefix = 'kafka'
@@ -155,7 +155,7 @@ subprojects { subproject ->
155155
eclipse.project.natures += 'org.springframework.ide.eclipse.core.springnature'
156156

157157
jacoco {
158-
toolVersion = '0.8.7'
158+
toolVersion = '0.8.6'
159159
}
160160

161161
// dependencies that are common across all java projects

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@
8787
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
8888
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
8989
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
90-
import org.springframework.kafka.retrytopic.RetryTopicBootstrapper;
9190
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
9291
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
9392
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -527,8 +526,9 @@ private void bootstrapRetryTopicIfNecessary() {
527526
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.beanFactory;
528527
if (!registry.containsBeanDefinition("internalRetryTopicBootstrapper")) {
529528
registry.registerBeanDefinition("internalRetryTopicBootstrapper",
530-
new RootBeanDefinition(RetryTopicBootstrapper.class));
531-
this.beanFactory.getBean("internalRetryTopicBootstrapper", RetryTopicBootstrapper.class).bootstrapRetryTopic();
529+
new RootBeanDefinition(org.springframework.kafka.retrytopic.RetryTopicBootstrapper.class));
530+
this.beanFactory.getBean("internalRetryTopicBootstrapper",
531+
org.springframework.kafka.retrytopic.RetryTopicBootstrapper.class).bootstrapRetryTopic();
532532
}
533533
}
534534

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
3838
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
3939
import org.springframework.kafka.retrytopic.RetryTopicConstants;
40-
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
4140
import org.springframework.kafka.support.EndpointHandlerMethod;
4241
import org.springframework.retry.annotation.Backoff;
4342
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
@@ -215,7 +214,8 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
215214
}
216215
}
217216
try {
218-
return this.beanFactory.getBean(RetryTopicInternalBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
217+
return this.beanFactory.getBean(
218+
org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
219219
KafkaOperations.class);
220220
}
221221
catch (NoSuchBeanDefinitionException ex) {

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -209,7 +209,7 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
209209
Map<String, TopicDescription> results = new HashMap<>();
210210
DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames));
211211
try {
212-
results.putAll(topics.all().get(this.operationTimeout, TimeUnit.SECONDS));
212+
results.putAll(topics.allTopicNames().get(this.operationTimeout, TimeUnit.SECONDS));
213213
return results;
214214
}
215215
catch (InterruptedException ie) {
@@ -251,7 +251,7 @@ private Map<String, NewPartitions> checkPartitions(Map<String, NewTopic> topicNa
251251
DescribeTopicsResult topicInfo, List<NewTopic> topicsToAdd) {
252252

253253
Map<String, NewPartitions> topicsToModify = new HashMap<>();
254-
topicInfo.values().forEach((n, f) -> {
254+
topicInfo.topicNameValues().forEach((n, f) -> {
255255
NewTopic topic = topicNameToTopic.get(n);
256256
try {
257257
TopicDescription topicDescription = f.get(this.operationTimeout, TimeUnit.SECONDS);

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -480,7 +480,7 @@ protected void checkTopics() {
480480
.toArray(String[]::new);
481481
}
482482
DescribeTopicsResult result = client.describeTopics(Arrays.asList(topics));
483-
missing = result.values()
483+
missing = result.topicNameValues()
484484
.entrySet()
485485
.stream()
486486
.filter(entry -> {

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,6 +62,7 @@ class ErrorHandlerAdapter implements CommonErrorHandler {
6262
this.batchErrorHandler = batchErrorHandler;
6363
}
6464

65+
@SuppressWarnings("deprecation")
6566
@Override
6667
public boolean remainingRecords() {
6768
return this.errorHandler instanceof RemainingRecordsErrorHandler;
@@ -121,6 +122,7 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
121122
}
122123
}
123124

125+
@SuppressWarnings("deprecation")
124126
@Override
125127
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
126128
MessageListenerContainer container) {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -131,7 +131,7 @@ public void testDefaultPartsAndReplicas() throws Exception {
131131
await().until(() -> {
132132
DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("optBoth", "optPart", "optRepl"));
133133
try {
134-
results.putAll(topics.all().get(10, TimeUnit.SECONDS));
134+
results.putAll(topics.allTopicNames().get(10, TimeUnit.SECONDS));
135135
return true;
136136
}
137137
catch (InterruptedException ie) {

spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
269269
.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager()
270270
.abort()
271271
.then(Mono.error(error))))
272-
.expectErrorMatches(throwable -> throwable instanceof KafkaException &&
272+
.expectErrorMatches(throwable -> throwable instanceof IllegalStateException &&
273273
throwable.getMessage().equals("TransactionalId reactive.transaction: Invalid transition " +
274274
"attempted from state READY to state ABORTING_TRANSACTION"))
275275
.verify(DEFAULT_VERIFY_TIMEOUT);

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
246246
factory.getContainerProperties().setMissingTopicsFatal(false);
247247
factory.setRecordInterceptor(new RecordInterceptor() {
248248

249+
@SuppressWarnings("deprecation")
249250
@Override
250251
@Nullable
251252
public ConsumerRecord intercept(ConsumerRecord record) {

0 commit comments

Comments
 (0)