Skip to content

Commit 34172b6

Browse files
garyrussellartembilan
authored andcommitted
GH-696: Add missingTopicsFatal container property
Fixes #696 Container start() fails if topics are missing.
1 parent 2355bda commit 34172b6

File tree

7 files changed

+153
-1
lines changed

7 files changed

+153
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.ArrayList;
20+
import java.util.Arrays;
1921
import java.util.Collection;
22+
import java.util.List;
2023
import java.util.concurrent.CountDownLatch;
2124
import java.util.concurrent.TimeUnit;
2225

2326
import org.apache.commons.logging.Log;
2427
import org.apache.commons.logging.LogFactory;
28+
import org.apache.kafka.clients.consumer.Consumer;
2529
import org.apache.kafka.clients.consumer.ConsumerConfig;
2630
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2731
import org.apache.kafka.common.TopicPartition;
@@ -181,6 +185,7 @@ public boolean isAutoStartup() {
181185
return this.autoStartup;
182186
}
183187

188+
@Override
184189
public void setAutoStartup(boolean autoStartup) {
185190
this.autoStartup = autoStartup;
186191
}
@@ -251,6 +256,33 @@ public final void start() {
251256
}
252257
}
253258

259+
protected void checkTopics() {
260+
if (this.containerProperties.isMissingTopicsFatal() && this.containerProperties.getTopicPattern() == null) {
261+
try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(this.containerProperties.getGroupId(),
262+
this.containerProperties.getClientId(), null)) {
263+
if (consumer != null) {
264+
String[] topics = this.containerProperties.getTopics();
265+
if (topics == null) {
266+
topics = Arrays.stream(this.containerProperties.getTopicPartitions())
267+
.map(tp -> tp.topic())
268+
.toArray(String[]::new);
269+
}
270+
List<String> missing = new ArrayList<>();
271+
for (String topic : topics) {
272+
if (consumer.partitionsFor(topic) == null) {
273+
missing.add(topic);
274+
}
275+
}
276+
if (missing.size() > 0) {
277+
throw new IllegalStateException(
278+
"Topic(s) " + missing.toString()
279+
+ " is/are not present and missingTopicsFatal is true");
280+
}
281+
}
282+
}
283+
}
284+
}
285+
254286
public void checkGroupId() {
255287
if (this.containerProperties.getTopicPartitions() == null) {
256288
boolean hasGroupIdConsumerConfig = true; // assume true for non-standard containers

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public boolean isContainerPaused() {
132132
@Override
133133
protected void doStart() {
134134
if (!isRunning()) {
135+
checkTopics();
135136
ContainerProperties containerProperties = getContainerProperties();
136137
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
137138
if (topicPartitions != null

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ public enum AckMode {
206206

207207
private LogIfLevelEnabled.Level commitLogLevel = LogIfLevelEnabled.Level.DEBUG;
208208

209+
private boolean missingTopicsFatal = true;
210+
209211
public ContainerProperties(String... topics) {
210212
Assert.notEmpty(topics, "An array of topicPartitions must be provided");
211213
this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
@@ -556,6 +558,27 @@ public void setCommitLogLevel(LogIfLevelEnabled.Level commitLogLevel) {
556558
this.commitLogLevel = commitLogLevel;
557559
}
558560

561+
/**
562+
* If true, the container won't start if any of the configured topics are not present
563+
* on the broker. Does not apply when topic patterns are configured. Default true;
564+
* @return the missingTopicsFatal.
565+
* @since 2.2
566+
*/
567+
public boolean isMissingTopicsFatal() {
568+
return this.missingTopicsFatal;
569+
}
570+
571+
/**
572+
* Set to false to allow the container to start even if any of the configured topics
573+
* are not present on the broker. Does not apply when topic patterns are configured.
574+
* Default true;
575+
* @param missingTopicsFatal the missingTopicsFatal.
576+
* @since 2.2
577+
*/
578+
public void setMissingTopicsFatal(boolean missingTopicsFatal) {
579+
this.missingTopicsFatal = missingTopicsFatal;
580+
}
581+
559582
@Override
560583
public String toString() {
561584
return "ContainerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ protected void doStart() {
227227
if (isRunning()) {
228228
return;
229229
}
230+
if (this.clientIdSuffix == null) { // stand-alone container
231+
checkTopics();
232+
}
230233
ContainerProperties containerProperties = getContainerProperties();
231234
if (!this.consumerFactory.isAutoCommit()) {
232235
AckMode ackMode = containerProperties.getAckMode();
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.fail;
21+
22+
import java.util.Map;
23+
24+
import org.junit.ClassRule;
25+
import org.junit.Test;
26+
27+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
28+
import org.springframework.kafka.test.rule.KafkaEmbedded;
29+
import org.springframework.kafka.test.utils.KafkaTestUtils;
30+
31+
/**
32+
* @author Gary Russell
33+
* @since 2.2
34+
*
35+
*/
36+
public class MissingTopicsTests {
37+
38+
@ClassRule
39+
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true)
40+
.brokerProperty("auto.create.topics.enable", false);
41+
42+
@Test
43+
public void testMissingTopicCMLC() throws Exception {
44+
Map<String, Object> props = KafkaTestUtils.consumerProps("missing1", "true", embeddedKafka);
45+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
46+
ContainerProperties containerProps = new ContainerProperties("notexisting");
47+
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
48+
ConcurrentMessageListenerContainer<Integer, String> container =
49+
new ConcurrentMessageListenerContainer<>(cf, containerProps);
50+
container.setBeanName("testMissing1");
51+
try {
52+
container.start();
53+
fail("Expected exception");
54+
}
55+
catch (IllegalStateException e) {
56+
assertThat(e.getMessage()).contains("missingTopicsFatal");
57+
}
58+
}
59+
60+
@Test
61+
public void testMissingTopicKMLC() throws Exception {
62+
Map<String, Object> props = KafkaTestUtils.consumerProps("missing2", "true", embeddedKafka);
63+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
64+
ContainerProperties containerProps = new ContainerProperties("notexisting");
65+
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
66+
KafkaMessageListenerContainer<Integer, String> container =
67+
new KafkaMessageListenerContainer<>(cf, containerProps);
68+
container.setBeanName("testMissing2");
69+
try {
70+
container.start();
71+
fail("Expected exception");
72+
}
73+
catch (IllegalStateException e) {
74+
assertThat(e.getMessage()).contains("missingTopicsFatal");
75+
}
76+
container.getContainerProperties().setMissingTopicsFatal(false);
77+
container.start();
78+
container.stop();
79+
}
80+
81+
82+
}

src/reference/asciidoc/kafka.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,12 +616,17 @@ return container;
616616

617617
Refer to the JavaDocs for `ContainerProperties` for more information about the various properties that can be set.
618618

619-
Since version _2.1.1_, a new property `logContainerConfig` is available; when true, and INFO logging is enabled, each listener container will write a log message summarizing its configuration properties.
619+
Since _version 2.1.1_, a new property `logContainerConfig` is available; when true, and INFO logging is enabled, each listener container will write a log message summarizing its configuration properties.
620620

621621
By default, logging of topic offset commits is performed with the DEBUG logging level.
622622
Starting with _version 2.1.2_, there is a new property in `ContainerProperties` called `commitLogLevel` which allows you to specify the log level for these messages.
623623
For example, to change the log level to INFO, use `containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);`.
624624

625+
Starting with _version 2.2_, a new container property `missingTopicsFatal` has been added (default `true`).
626+
This prevents the container from starting if any of the configured topics are not present on the broker; it does not apply if the container is configured to listen to a topic pattern (regex).
627+
Previously, the container threads looped within the `consumer.poll()` method waiting for the topic to appear, while logging many messages; aside from the logs, there was no indication that there was a problem.
628+
To restore the previous behavior, set the property to `false`.
629+
625630
====== ConcurrentMessageListenerContainer
626631

627632
The single constructor is similar to the first `KafkaListenerContainer` constructor:

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,9 @@ A new `AfterRollbackProcessor` strategy is provided - see <<after-rollback>> for
2020

2121
The `ConcurrentKafkaListenerContainerFactory` can now be used to create/configure any `ConcurrentMessageListenerContainer`, not just those for `@KafkaListener` annotations.
2222
See <<container-factory>> for more information.
23+
24+
==== Listener Container Changes
25+
26+
A new container property `missingTopicsFatal` has been added.
27+
28+
See <<kafka-container>> for more information.

0 commit comments

Comments
 (0)