Skip to content

Commit f15bf08

Browse files
garyrussellartembilan
authored andcommitted
* GH-757: Upgrade to Apache Kafka 2.0.0
Resolves #757 Kafka 2.0.0 Compatibility Tested against local build of the kafka trunk branch. - `poll(long)` is deprecated. - `poll(Duration)` does not block until the partitions are assigned. - Several KStream changes. - Some tests are still failing sporadically Increase default poll timeout; ensure unique group.id for all tests. Test against a build from the 2.0.0-rc1 tag instead of trunk. Rebase; Polishing; test against official 2.0.0 build * Rebase, polishing.
1 parent 7d68d23 commit f15bf08

29 files changed

+229
-137
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ subprojects { subproject ->
7676
junit4Version = '4.12'
7777
junitJupiterVersion = '5.1.1'
7878
junitPlatformVersion = '1.1.1'
79-
kafkaVersion = '1.1.1'
79+
kafkaVersion = '2.0.0'
8080
log4jVersion = '2.11.0'
8181
mockitoVersion = '2.18.0'
8282
scalaVersion = '2.11'

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.time.Duration;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.Collection;
@@ -28,8 +29,7 @@
2829
import java.util.Map;
2930
import java.util.Properties;
3031
import java.util.Set;
31-
import java.util.concurrent.CountDownLatch;
32-
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicBoolean;
3333
import java.util.stream.Collectors;
3434

3535
import org.I0Itec.zkclient.ZkClient;
@@ -405,7 +405,7 @@ public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topics)
405405
assertThat(this.topics)
406406
.as("topic(s):'" + diff + "' are not in embedded topic list")
407407
.containsAll(new HashSet<>(Arrays.asList(topics)));
408-
final CountDownLatch consumerLatch = new CountDownLatch(1);
408+
final AtomicBoolean assigned = new AtomicBoolean();
409409
consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() {
410410

411411
@Override
@@ -414,26 +414,31 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
414414

415415
@Override
416416
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
417-
consumerLatch.countDown();
417+
assigned.set(true);
418418
if (logger.isDebugEnabled()) {
419419
logger.debug("partitions assigned: " + partitions);
420420
}
421421
}
422422

423423
});
424-
ConsumerRecords<?, ?> records = consumer.poll(0); // force assignment
425-
if (records.count() > 0) {
424+
ConsumerRecords<?, ?> records = null;
425+
int n = 0;
426+
while (!assigned.get() && n++ < 600) {
427+
records = consumer.poll(Duration.ofMillis(100)); // force assignment
428+
}
429+
if (records != null && records.count() > 0) {
430+
final ConsumerRecords<?, ?> theRecords = records;
426431
if (logger.isDebugEnabled()) {
427432
logger.debug("Records received on initial poll for assignment; re-seeking to beginning; "
428433
+ records.partitions().stream()
429-
.flatMap(p -> records.records(p).stream())
434+
.flatMap(p -> theRecords.records(p).stream())
430435
// map to same format as send metadata toString()
431436
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
432437
.collect(Collectors.toList()));
433438
}
434439
consumer.seekToBeginning(records.partitions());
435440
}
436-
assertThat(consumerLatch.await(30, TimeUnit.SECONDS))
441+
assertThat(assigned.get())
437442
.as("Failed to be assigned partitions from the embedded topics")
438443
.isTrue();
439444
logger.debug("Subscription Initiated");

spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, Stri
9797
* @return this for chaining configuration
9898
* @see KafkaConfig
9999
*/
100+
@Override
100101
public KafkaEmbedded brokerProperties(Map<String, String> brokerProperties) {
101102
super.brokerProperties(brokerProperties);
102103
return this;
@@ -109,6 +110,7 @@ public KafkaEmbedded brokerProperties(Map<String, String> brokerProperties) {
109110
* @return the {@link KafkaEmbedded}.
110111
* @since 2.1.4
111112
*/
113+
@Override
112114
public KafkaEmbedded brokerProperty(String property, Object value) {
113115
super.brokerProperty(property, value);
114116
return this;

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.time.Duration;
2122
import java.util.HashMap;
2223
import java.util.Map;
2324
import java.util.stream.Collectors;
@@ -191,7 +192,7 @@ public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
191192
*/
192193
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout) {
193194
logger.debug("Polling...");
194-
ConsumerRecords<K, V> received = consumer.poll(timeout);
195+
ConsumerRecords<K, V> received = consumer.poll(Duration.ofMillis(timeout));
195196
if (logger.isDebugEnabled()) {
196197
logger.debug("Received: " + received.count() + ", "
197198
+ received.partitions().stream()

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.annotation;
1818

19+
import java.util.Properties;
20+
1921
import org.apache.kafka.streams.StreamsConfig;
2022

2123
import org.springframework.beans.factory.ObjectProvider;
@@ -35,6 +37,7 @@
3537
* annotation. See {@link EnableKafkaStreams} Javadoc for complete usage.
3638
*
3739
* @author Artem Bilan
40+
* @author Gary Russell
3841
*
3942
* @since 1.1.4
4043
*/
@@ -54,16 +57,16 @@ public class KafkaStreamsDefaultConfiguration {
5457

5558
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
5659
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
57-
@Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME) ObjectProvider<StreamsConfig> streamsConfigProvider) {
58-
StreamsConfig streamsConfig = streamsConfigProvider.getIfAvailable();
60+
@Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME) ObjectProvider<Properties> streamsConfigProvider) {
61+
Properties streamsConfig = streamsConfigProvider.getIfAvailable();
5962
if (streamsConfig != null) {
6063
return new StreamsBuilderFactoryBean(streamsConfig);
6164
}
6265
else {
6366
throw new UnsatisfiedDependencyException(KafkaStreamsDefaultConfiguration.class.getName(),
6467
DEFAULT_STREAMS_BUILDER_BEAN_NAME, "streamsConfig", "There is no '" +
65-
DEFAULT_STREAMS_CONFIG_BEAN_NAME + "' StreamsConfig bean in the application context.\n" +
66-
"Consider to declare one or don't use @EnableKafkaStreams.");
68+
DEFAULT_STREAMS_CONFIG_BEAN_NAME + "' Properties bean in the application context.\n" +
69+
"Consider declaring one or don't use @EnableKafkaStreams.");
6770
}
6871
}
6972

spring-kafka/src/main/java/org/springframework/kafka/core/StreamsBuilderFactoryBean.java

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.core;
1818

1919
import java.util.Map;
20+
import java.util.Properties;
2021
import java.util.concurrent.TimeUnit;
2122

2223
import org.apache.kafka.streams.KafkaClientSupplier;
@@ -56,6 +57,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
5657

5758
private StreamsConfig streamsConfig;
5859

60+
private Properties properties;
61+
5962
private final CleanupConfig cleanupConfig;
6063

6164
private KafkaStreamsCustomizer kafkaStreamsCustomizer;
@@ -77,9 +80,10 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
7780
private volatile boolean running;
7881

7982
/**
80-
* Default constructor that creates the factory without a {@link StreamsConfig}.
81-
* It is the factory user's responsibility to properly set {@link StreamsConfig}
82-
* using {@link StreamsBuilderFactoryBean#setStreamsConfig(StreamsConfig)}
83+
* Default constructor that creates the factory without configuration
84+
* {@link Properties}. It is the factory user's responsibility to properly set
85+
* {@link Properties} using
86+
* {@link StreamsBuilderFactoryBean#setStreamsConfiguration(Properties)}.
8387
* @since 2.1.3.
8488
*/
8589
public StreamsBuilderFactoryBean() {
@@ -100,29 +104,58 @@ public StreamsBuilderFactoryBean(StreamsConfig streamsConfig) {
100104
* @param streamsConfig the streams configuration.
101105
* @param cleanupConfig the cleanup configuration.
102106
* @since 2.1.2.
107+
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(Properties, CleanupConfig)}
103108
*/
109+
@Deprecated
104110
public StreamsBuilderFactoryBean(StreamsConfig streamsConfig, CleanupConfig cleanupConfig) {
105111
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
106112
Assert.notNull(cleanupConfig, "'cleanupConfig' must not be null");
107113
this.streamsConfig = streamsConfig;
108114
this.cleanupConfig = cleanupConfig;
109115
}
110116

117+
/**
118+
* Construct an instance with the supplied streams configuration and
119+
* clean up configuration.
120+
* @param streamsConfig the streams configuration.
121+
* @param cleanupConfig the cleanup configuration.
122+
* @since 2.2
123+
*/
124+
public StreamsBuilderFactoryBean(Properties streamsConfig, CleanupConfig cleanupConfig) {
125+
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
126+
Assert.notNull(cleanupConfig, "'cleanupConfig' must not be null");
127+
this.properties = streamsConfig;
128+
this.cleanupConfig = cleanupConfig;
129+
}
130+
111131
/**
112132
* Construct an instance with the supplied streams configuration.
113133
* @param streamsConfig the streams configuration.
134+
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(Properties)}.
114135
*/
136+
@Deprecated
115137
public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig) {
116138
this(streamsConfig, new CleanupConfig());
117139
}
118140

141+
/**
142+
* Construct an instance with the supplied streams configuration.
143+
* @param streamsConfig the streams configuration.
144+
* @since 2.2
145+
*/
146+
public StreamsBuilderFactoryBean(Properties streamsConfig) {
147+
this(streamsConfig, new CleanupConfig());
148+
}
149+
119150
/**
120151
* Construct an instance with the supplied streams configuration and
121152
* clean up configuration.
122153
* @param streamsConfig the streams configuration.
123154
* @param cleanupConfig the cleanup configuration.
124155
* @since 2.1.2.
156+
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(Properties, CleanupConfig)}.
125157
*/
158+
@Deprecated
126159
public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig, CleanupConfig cleanupConfig) {
127160
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
128161
Assert.notNull(cleanupConfig, "'cleanupConfig' must not be null");
@@ -137,6 +170,7 @@ public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig, CleanupConfi
137170
*/
138171
public void setStreamsConfig(StreamsConfig streamsConfig) {
139172
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
173+
Assert.isNull(this.properties, "Cannot have both streamsConfig and streams configuration properties");
140174
this.streamsConfig = streamsConfig;
141175
}
142176

@@ -145,6 +179,22 @@ public StreamsConfig getStreamsConfig() {
145179
return this.streamsConfig;
146180
}
147181

182+
/**
183+
* Set {@link StreamsConfig} on this factory.
184+
* @param streamsConfig the streams configuration.
185+
* @since 2.2
186+
*/
187+
public void setStreamsConfiguration(Properties streamsConfig) {
188+
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
189+
Assert.isNull(this.streamsConfig, "Cannot have both streamsConfig and streams configuration properties");
190+
this.properties = streamsConfig;
191+
}
192+
193+
@Nullable
194+
public Properties getStreamsConfiguration() {
195+
return this.properties;
196+
}
197+
148198
public void setClientSupplier(KafkaClientSupplier clientSupplier) {
149199
Assert.notNull(clientSupplier, "'clientSupplier' must not be null");
150200
this.clientSupplier = clientSupplier; // NOSONAR (sync)
@@ -191,7 +241,8 @@ public Class<?> getObjectType() {
191241
@Override
192242
protected StreamsBuilder createInstance() throws Exception {
193243
if (this.autoStartup) {
194-
Assert.notNull(this.streamsConfig, "'streamsConfig' must not be null");
244+
Assert.state(this.streamsConfig != null || this.properties != null,
245+
"'streamsConfig' or streams configuration properties must not be null");
195246
}
196247
return new StreamsBuilder();
197248
}
@@ -217,16 +268,23 @@ public void stop(Runnable callback) {
217268
}
218269
}
219270

271+
@SuppressWarnings("deprecation")
220272
@Override
221273
public synchronized void start() {
222274
if (!this.running) {
223275
try {
224-
Assert.notNull(this.streamsConfig, "'streamsConfig' must not be null");
276+
Assert.state(this.streamsConfig != null || this.properties != null,
277+
"'streamsConfig' or streams configuration properties must not be null");
225278
Topology topology = getObject().build();
226279
if (logger.isDebugEnabled()) {
227280
logger.debug(topology.describe());
228281
}
229-
this.kafkaStreams = new KafkaStreams(topology, this.streamsConfig, this.clientSupplier);
282+
if (this.properties != null) {
283+
this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
284+
}
285+
else {
286+
this.kafkaStreams = new KafkaStreams(topology, this.streamsConfig, this.clientSupplier);
287+
}
230288
this.kafkaStreams.setStateListener(this.stateListener);
231289
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
232290
this.kafkaStreams.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,12 @@ public enum AckMode {
9595
/**
9696
* The default {@link #setPollTimeout(long) pollTimeout} (ms).
9797
*/
98-
public static final long DEFAULT_POLL_TIMEOUT = 1000L;
98+
public static final long DEFAULT_POLL_TIMEOUT = 5_000L;
9999

100100
/**
101101
* The default {@link #setShutdownTimeout(long) shutDownTimeout} (ms).
102102
*/
103-
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
103+
public static final long DEFAULT_SHUTDOWN_TIMEOUT = 10_000L;
104104

105105
/**
106106
* The default {@link #setMonitorInterval(int) monitorInterval} (s).

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.lang.reflect.Type;
20+
import java.time.Duration;
2021
import java.util.ArrayList;
2122
import java.util.Arrays;
2223
import java.util.Collection;
@@ -704,7 +705,8 @@ public void run() {
704705
}
705706
publishConsumerPausedEvent(this.consumer.assignment());
706707
}
707-
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
708+
ConsumerRecords<K, V> records = this.consumer
709+
.poll(Duration.ofMillis(this.containerProperties.getPollTimeout()));
708710
this.lastPoll = System.currentTimeMillis();
709711
if (this.consumerPaused && !isPaused()) {
710712
if (this.logger.isDebugEnabled()) {

0 commit comments

Comments
 (0)