Skip to content

Commit 3de1e89

Browse files
committed
GH-2382: Fix RetryingBatchErrorHandler Cross Talk
Resolves #2382 Change `retrying` boolean to `ThreadLocal<Boolean>`.
1 parent f001423 commit 3de1e89

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
5555
@SuppressWarnings("deprecation")
5656
private final CommonErrorHandler seeker = new ErrorHandlerAdapter(new SeekToCurrentBatchErrorHandler());
5757

58-
private boolean ackAfterHandle = true;
58+
private final ThreadLocal<Boolean> retrying = ThreadLocal.withInitial(() -> false);
5959

60-
private boolean retrying;
60+
private boolean ackAfterHandle = true;
6161

6262
/**
6363
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
@@ -104,14 +104,18 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
104104
this.logger.error(thrownException, "Called with no records; consumer exception");
105105
return;
106106
}
107-
this.retrying = true;
108-
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
109-
this.seeker, this.recoverer, this.logger, getLogLevel());
110-
this.retrying = false;
107+
this.retrying.set(true);
108+
try {
109+
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
110+
this.seeker, this.recoverer, this.logger, getLogLevel());
111+
}
112+
finally {
113+
this.retrying.set(false);
114+
}
111115
}
112116

113117
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
114-
if (this.retrying) {
118+
if (this.retrying.get()) {
115119
consumer.pause(consumer.assignment());
116120
}
117121
}

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
1920
import static org.assertj.core.api.Assertions.assertThat;
2021
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2123
import static org.mockito.ArgumentMatchers.any;
2224
import static org.mockito.BDDMockito.given;
2325
import static org.mockito.BDDMockito.willAnswer;
26+
import static org.mockito.BDDMockito.willThrow;
2427
import static org.mockito.Mockito.inOrder;
2528
import static org.mockito.Mockito.mock;
2629
import static org.mockito.Mockito.times;
2730
import static org.mockito.Mockito.verify;
2831
import static org.mockito.Mockito.verifyNoMoreInteractions;
2932

33+
import java.lang.reflect.Field;
3034
import java.util.ArrayList;
3135
import java.util.Collections;
3236
import java.util.HashMap;
@@ -42,6 +46,7 @@
4246
import org.mockito.InOrder;
4347

4448
import org.springframework.kafka.KafkaException;
49+
import org.springframework.util.ReflectionUtils;
4550
import org.springframework.util.backoff.FixedBackOff;
4651

4752
/**
@@ -205,4 +210,35 @@ void rePauseOnRebalance() {
205210
verifyNoMoreInteractions(consumer);
206211
}
207212

213+
@Test
214+
void resetRetryingFlagOnExceptionFromRetryBatch() {
215+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 1L), (consumerRecord, e) -> { });
216+
217+
Consumer<?, ?> consumer = mock(Consumer.class);
218+
// KafkaException could be thrown from SeekToCurrentBatchErrorHandler, but it is hard to mock
219+
KafkaException exception = new KafkaException("Failed consumer.resume()");
220+
willThrow(exception).given(consumer).resume(any());
221+
222+
MessageListenerContainer container = mock(MessageListenerContainer.class);
223+
given(container.isRunning()).willReturn(true);
224+
225+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
226+
map.put(new TopicPartition("foo", 0),
227+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
228+
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
229+
230+
assertThatThrownBy(() -> eh.handle(new RuntimeException(), records, consumer, container, () -> { }))
231+
.isSameAs(exception);
232+
233+
assertThat(getRetryingFieldValue(eh))
234+
.withFailMessage("retrying field was not reset to false")
235+
.isFalse();
236+
}
237+
238+
private boolean getRetryingFieldValue(FallbackBatchErrorHandler errorHandler) {
239+
Field field = ReflectionUtils.findField(FallbackBatchErrorHandler.class, "retrying");
240+
ReflectionUtils.makeAccessible(field);
241+
ThreadLocal<Boolean> value = (ThreadLocal<Boolean>) ReflectionUtils.getField(field, errorHandler);
242+
return value.get();
243+
}
208244
}

0 commit comments

Comments
 (0)