Skip to content

Commit 52c3f1c

Browse files
TheCKwilkinsona
authored andcommitted
Add a configuration property for Kafka's async acks
See gh-30776
1 parent d9e9ab0 commit 52c3f1c

File tree

3 files changed

+31
-12
lines changed

3 files changed

+31
-12
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ private void configureContainer(ContainerProperties container) {
166166
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
167167
Listener properties = this.properties.getListener();
168168
map.from(properties::getAckMode).to(container::setAckMode);
169+
map.from(properties::getAsyncAcks).to(container::setAsyncAcks);
169170
map.from(properties::getClientId).to(container::setClientId);
170171
map.from(properties::getAckCount).to(container::setAckCount);
171172
map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime);

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

+15
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,13 @@ public enum Type {
881881
*/
882882
private AckMode ackMode;
883883

884+
/**
885+
* Support for asynchronous record acknowledgments. Only applies with
886+
* ContainerProperties.AckMode.MANUAL or
887+
* ContainerProperties.AckMode.MANUAL_IMMEDIATE.
888+
*/
889+
private Boolean asyncAcks;
890+
884891
/**
885892
* Prefix for the listener's consumer client.id property.
886893
*/
@@ -969,6 +976,14 @@ public void setAckMode(AckMode ackMode) {
969976
this.ackMode = ackMode;
970977
}
971978

979+
public Boolean getAsyncAcks() {
980+
return this.asyncAcks;
981+
}
982+
983+
public void setAsyncAcks(Boolean asyncAcks) {
984+
this.asyncAcks = asyncAcks;
985+
}
986+
972987
public String getClientId() {
973988
return this.clientId;
974989
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -450,18 +450,20 @@ void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() {
450450
@SuppressWarnings("unchecked")
451451
@Test
452452
void listenerProperties() {
453-
this.contextRunner.withPropertyValues("spring.kafka.template.default-topic=testTopic",
454-
"spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL",
455-
"spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123",
456-
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3",
457-
"spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5",
458-
"spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s",
459-
"spring.kafka.listener.idle-event-interval=1s",
460-
"spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45",
461-
"spring.kafka.listener.log-container-config=true", "spring.kafka.listener.missing-topics-fatal=true",
462-
"spring.kafka.jaas.enabled=true", "spring.kafka.listener.immediate-stop=true",
463-
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
464-
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
453+
this.contextRunner
454+
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
455+
"spring.kafka.template.transaction-id-prefix=txOverride",
456+
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
457+
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
458+
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
459+
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
460+
"spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
461+
"spring.kafka.listener.idle-partition-event-interval=1s",
462+
"spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
463+
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
464+
"spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo",
465+
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE",
466+
"spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true")
465467
.run((context) -> {
466468
DefaultKafkaProducerFactory<?, ?> producerFactory = context
467469
.getBean(DefaultKafkaProducerFactory.class);
@@ -477,6 +479,7 @@ void listenerProperties() {
477479
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
478480
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
479481
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);
482+
assertThat(containerProperties.isAsyncAcks()).isEqualTo(true);
480483
assertThat(containerProperties.getClientId()).isEqualTo("client");
481484
assertThat(containerProperties.getAckCount()).isEqualTo(123);
482485
assertThat(containerProperties.getAckTime()).isEqualTo(456L);

0 commit comments

Comments
 (0)