Skip to content

Commit 45a6b06

Browse files
garyrussellartembilan
authored andcommitted
GH-2076: Fix Async Commit Retries
#2076 Do not attempt to retry asynchronous commits. - a later commit for the same topic/partition may have already succeeded - many consecutive retryable exceptions can cause a stack overflow **cherry-pick to 2.8.x, 2.7.x** * Remove unused parameter; polish javadocs. # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent dfce233 commit 45a6b06

File tree

3 files changed

+22
-47
lines changed

3 files changed

+22
-47
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -289,6 +289,7 @@ public OffsetCommitCallback getCommitCallback() {
289289
* @see #setSyncCommitTimeout(Duration)
290290
* @see #setCommitCallback(OffsetCommitCallback)
291291
* @see #setCommitLogLevel(org.springframework.kafka.support.LogIfLevelEnabled.Level)
292+
* @see #setCommitRetries(int)
292293
*/
293294
public void setSyncCommits(boolean syncCommits) {
294295
this.syncCommits = syncCommits;
@@ -368,9 +369,10 @@ public void setAuthorizationExceptionRetryInterval(Duration authorizationExcepti
368369
/**
369370
* The number of retries allowed when a
370371
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
371-
* by the consumer.
372+
* by the consumer when using {@link #setSyncCommits(boolean)} set to true.
372373
* @return the number of retries.
373374
* @since 2.3.9
375+
* @see #setSyncCommits(boolean)
374376
*/
375377
public int getCommitRetries() {
376378
return this.commitRetries;
@@ -379,9 +381,11 @@ public int getCommitRetries() {
379381
/**
380382
* Set number of retries allowed when a
381383
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
382-
* by the consumer. Default 3 (4 attempts total).
384+
* by the consumer when using {@link #setSyncCommits(boolean)} set to true. Default 3
385+
* (4 attempts total).
383386
* @param commitRetries the commitRetries.
384387
* @since 2.3.9
388+
* @see #setSyncCommits(boolean)
385389
*/
386390
public void setCommitRetries(int commitRetries) {
387391
this.commitRetries = commitRetries;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,7 +1368,7 @@ private void fixTxOffsetsIfNeeded() {
13681368
commitSync(toFix);
13691369
}
13701370
else {
1371-
commitAsync(toFix, 0);
1371+
commitAsync(toFix);
13721372
}
13731373
}
13741374
else {
@@ -1682,18 +1682,14 @@ else if (this.syncCommits) {
16821682
commitSync(commits);
16831683
}
16841684
else {
1685-
commitAsync(commits, 0);
1685+
commitAsync(commits);
16861686
}
16871687
}
16881688

1689-
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
1689+
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits) {
16901690
this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> {
1691-
if (exception instanceof RetriableCommitFailedException
1692-
&& retries < this.containerProperties.getCommitRetries()) {
1693-
commitAsync(commits, retries + 1);
1694-
}
1695-
else {
1696-
this.commitCallback.onComplete(offsetsAttempted, exception);
1691+
this.commitCallback.onComplete(offsetsAttempted, exception);
1692+
if (exception == null) {
16971693
if (this.fixTxOffsets) {
16981694
this.lastCommits.putAll(commits);
16991695
}
@@ -2451,7 +2447,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
24512447
commitSync(offsetsToCommit);
24522448
}
24532449
else {
2454-
commitAsync(offsetsToCommit, 0);
2450+
commitAsync(offsetsToCommit);
24552451
}
24562452
}
24572453
else {
@@ -2706,7 +2702,7 @@ private void commitIfNecessary() {
27062702
commitSync(commits);
27072703
}
27082704
else {
2709-
commitAsync(commits, 0);
2705+
commitAsync(commits);
27102706
}
27112707
}
27122708
catch (@SuppressWarnings(UNUSED) WakeupException e) {
@@ -3051,7 +3047,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
30513047
}
30523048
}
30533049
else {
3054-
commitAsync(offsetsToCommit, 0);
3050+
commitAsync(offsetsToCommit);
30553051
}
30563052
}
30573053
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.apache.kafka.clients.consumer.KafkaConsumer;
7171
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
7272
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
73-
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
7473
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
7574
import org.apache.kafka.clients.producer.ProducerConfig;
7675
import org.apache.kafka.common.TopicPartition;
@@ -3124,17 +3123,8 @@ public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
31243123
}
31253124

31263125
@Test
3127-
void testCommitSyncRetries() throws Exception {
3128-
testCommitRetriesGuts(true);
3129-
}
3130-
3131-
@Test
3132-
void testCommitAsyncRetries() throws Exception {
3133-
testCommitRetriesGuts(false);
3134-
}
3135-
31363126
@SuppressWarnings({ "unchecked", "rawtypes" })
3137-
private void testCommitRetriesGuts(boolean sync) throws Exception {
3127+
void testCommitSyncRetries() throws Exception {
31383128
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
31393129
Consumer<Integer, String> consumer = mock(Consumer.class);
31403130
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
@@ -3153,24 +3143,14 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
31533143
return first.getAndSet(false) ? consumerRecords : emptyRecords;
31543144
});
31553145
CountDownLatch latch = new CountDownLatch(4);
3156-
if (sync) {
3157-
willAnswer(i -> {
3158-
latch.countDown();
3159-
throw new RetriableCommitFailedException("");
3160-
}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
3161-
}
3162-
else {
3163-
willAnswer(i -> {
3164-
OffsetCommitCallback callback = i.getArgument(1);
3165-
callback.onComplete(i.getArgument(0), new RetriableCommitFailedException(""));
3166-
latch.countDown();
3167-
return null;
3168-
}).given(consumer).commitAsync(anyMap(), any());
3169-
}
31703146
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
31713147
new TopicPartitionOffset("foo", 0) };
31723148
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3173-
containerProps.setSyncCommits(sync);
3149+
willAnswer(i -> {
3150+
latch.countDown();
3151+
throw new RetriableCommitFailedException("");
3152+
}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
3153+
containerProps.setSyncCommits(true);
31743154
containerProps.setGroupId("grp");
31753155
containerProps.setClientId("clientId");
31763156
containerProps.setIdleEventInterval(100L);
@@ -3182,12 +3162,7 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
31823162
container.start();
31833163
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
31843164
container.stop();
3185-
if (sync) {
3186-
verify(consumer, times(4)).commitSync(any(), any());
3187-
}
3188-
else {
3189-
verify(consumer, times(4)).commitAsync(any(), any());
3190-
}
3165+
verify(consumer, times(4)).commitSync(any(), any());
31913166
}
31923167

31933168
@Test

0 commit comments

Comments
 (0)