Skip to content

Commit f3aa5d0

Browse files
committed
Merge pull request #30776 from TheCK
* gh-30776: Polish "Add a configuration property for Kafka's async acks" Add a configuration property for Kafka's async acks Closes gh-30776
2 parents d9e9ab0 + d36b63d commit f3aa5d0

File tree

3 files changed

+30
-12
lines changed

3 files changed

+30
-12
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ private void configureContainer(ContainerProperties container) {
166166
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
167167
Listener properties = this.properties.getListener();
168168
map.from(properties::getAckMode).to(container::setAckMode);
169+
map.from(properties::getAsyncAcks).to(container::setAsyncAcks);
169170
map.from(properties::getClientId).to(container::setClientId);
170171
map.from(properties::getAckCount).to(container::setAckCount);
171172
map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime);

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

+14
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,12 @@ public enum Type {
881881
*/
882882
private AckMode ackMode;
883883

884+
/**
885+
* Support for asynchronous record acknowledgements. Only applies when
886+
* spring.kafka.listener.ack-mode is manual or manual-immediate.
887+
*/
888+
private Boolean asyncAcks;
889+
884890
/**
885891
* Prefix for the listener's consumer client.id property.
886892
*/
@@ -969,6 +975,14 @@ public void setAckMode(AckMode ackMode) {
969975
this.ackMode = ackMode;
970976
}
971977

978+
public Boolean getAsyncAcks() {
979+
return this.asyncAcks;
980+
}
981+
982+
public void setAsyncAcks(Boolean asyncAcks) {
983+
this.asyncAcks = asyncAcks;
984+
}
985+
972986
public String getClientId() {
973987
return this.clientId;
974988
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -450,18 +450,20 @@ void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() {
450450
@SuppressWarnings("unchecked")
451451
@Test
452452
void listenerProperties() {
453-
this.contextRunner.withPropertyValues("spring.kafka.template.default-topic=testTopic",
454-
"spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL",
455-
"spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123",
456-
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3",
457-
"spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5",
458-
"spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s",
459-
"spring.kafka.listener.idle-event-interval=1s",
460-
"spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45",
461-
"spring.kafka.listener.log-container-config=true", "spring.kafka.listener.missing-topics-fatal=true",
462-
"spring.kafka.jaas.enabled=true", "spring.kafka.listener.immediate-stop=true",
463-
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
464-
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
453+
this.contextRunner
454+
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
455+
"spring.kafka.template.transaction-id-prefix=txOverride",
456+
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
457+
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
458+
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
459+
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
460+
"spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
461+
"spring.kafka.listener.idle-partition-event-interval=1s",
462+
"spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
463+
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
464+
"spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo",
465+
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE",
466+
"spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true")
465467
.run((context) -> {
466468
DefaultKafkaProducerFactory<?, ?> producerFactory = context
467469
.getBean(DefaultKafkaProducerFactory.class);
@@ -477,6 +479,7 @@ void listenerProperties() {
477479
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
478480
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
479481
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);
482+
assertThat(containerProperties.isAsyncAcks()).isEqualTo(true);
480483
assertThat(containerProperties.getClientId()).isEqualTo("client");
481484
assertThat(containerProperties.getAckCount()).isEqualTo(123);
482485
assertThat(containerProperties.getAckTime()).isEqualTo(456L);

0 commit comments

Comments
 (0)