Skip to content

Commit 7bf48ed

Browse files
committed
Merge pull request #16740 from garyrussell
* pr/16740: Polish "Add configuration property for Spring Kafka's missingTopicsFatal" Add configuration property for Spring Kafka's missingTopicsFatal
2 parents 68085c9 + 3c46b9e commit 7bf48ed

File tree

3 files changed

+31
-0
lines changed

3 files changed

+31
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ private void configureContainer(ContainerProperties container) {
158158
map.from(properties::getMonitorInterval).as(Duration::getSeconds)
159159
.as(Number::intValue).to(container::setMonitorInterval);
160160
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
161+
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
161162
map.from(this.transactionManager).to(container::setTransactionManager);
162163
}
163164

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,12 @@ public enum Type {
873873
*/
874874
private Boolean logContainerConfig;
875875

876+
/**
877+
* Whether the container should fail to start if at least one of the configured
878+
* topics are not present on the broker.
879+
*/
880+
private boolean missingTopicsFatal = true;
881+
876882
public Type getType() {
877883
return this.type;
878884
}
@@ -961,6 +967,14 @@ public void setLogContainerConfig(Boolean logContainerConfig) {
961967
this.logContainerConfig = logContainerConfig;
962968
}
963969

970+
public boolean isMissingTopicsFatal() {
971+
return this.missingTopicsFatal;
972+
}
973+
974+
public void setMissingTopicsFatal(boolean missingTopicsFatal) {
975+
this.missingTopicsFatal = missingTopicsFatal;
976+
}
977+
964978
}
965979

966980
public static class Ssl {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.junit.jupiter.api.Test;
3939

4040
import org.springframework.boot.autoconfigure.AutoConfigurations;
41+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
4142
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
4243
import org.springframework.context.annotation.Bean;
4344
import org.springframework.context.annotation.Configuration;
@@ -457,6 +458,7 @@ public void listenerProperties() {
457458
"spring.kafka.listener.idle-event-interval=1s",
458459
"spring.kafka.listener.monitor-interval=45",
459460
"spring.kafka.listener.log-container-config=true",
461+
"spring.kafka.listener.missing-topics-fatal=false",
460462
"spring.kafka.jaas.enabled=true",
461463
"spring.kafka.producer.transaction-id-prefix=foo",
462464
"spring.kafka.jaas.login-module=foo",
@@ -491,6 +493,7 @@ public void listenerProperties() {
491493
.isEqualTo(1000L);
492494
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
493495
assertThat(containerProperties.isLogContainerConfig()).isTrue();
496+
assertThat(containerProperties.isMissingTopicsFatal()).isFalse();
494497
assertThat(ReflectionTestUtils.getField(kafkaListenerContainerFactory,
495498
"concurrency")).isEqualTo(3);
496499
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
@@ -509,6 +512,19 @@ public void listenerProperties() {
509512
});
510513
}
511514

515+
@Test
516+
public void listenerPropertiesMatchDefaults() {
517+
this.contextRunner.run((context) -> {
518+
Listener listenerProperties = new KafkaProperties().getListener();
519+
AbstractKafkaListenerContainerFactory<?, ?, ?> kafkaListenerContainerFactory = (AbstractKafkaListenerContainerFactory<?, ?, ?>) context
520+
.getBean(KafkaListenerContainerFactory.class);
521+
ContainerProperties containerProperties = kafkaListenerContainerFactory
522+
.getContainerProperties();
523+
assertThat(containerProperties.isMissingTopicsFatal())
524+
.isEqualTo(listenerProperties.isMissingTopicsFatal());
525+
});
526+
}
527+
512528
@Test
513529
public void testKafkaTemplateRecordMessageConverters() {
514530
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)

0 commit comments

Comments
 (0)