
Commit 5c70ec1

garyrussell authored and artembilan committed
GH-2459: FallbackBatchErrorHandler Retryable Ex
Resolves #2459

The `FallbackBatchErrorHandler` was not an `ExceptionClassifier`; the default error handler should propagate its exception classifications to the fallback batch handler.

**2.9.x - I will back port**
1 parent 2966c7d commit 5c70ec1
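As context for the change, a minimal sketch of the user-facing effect (the recoverer and exception type here are illustrative, not taken from the commit): classifications configured on a `DefaultErrorHandler` used with a batch listener now also reach the internal `FallbackBatchErrorHandler`.

```java
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Sketch only. Before this change, the not-retryable classification below did not
// reach the internal FallbackBatchErrorHandler, so a whole-batch failure caused by
// an IllegalStateException was still retried per the BackOff; after it, the
// classification propagates and the batch goes straight to the recoverer.
DefaultErrorHandler handler = new DefaultErrorHandler(
        (record, ex) -> System.err.println("Recovered " + record.topic() + "-" + record.offset()),
        new FixedBackOff(1000L, 3L));
handler.addNotRetryableExceptions(IllegalStateException.class);
// handler would then be set on the listener container factory via setCommonErrorHandler(handler)
```

The new `notRetryable()` test added below exercises exactly this path.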

6 files changed: 158 additions, 5 deletions

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 29 additions & 1 deletion
```diff
@@ -19,13 +19,16 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 
 import org.springframework.kafka.support.TopicPartitionOffset;
+import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
 
 /**
@@ -35,7 +38,7 @@
  * @since 2.7.4
  *
  */
-class ErrorHandlerAdapter implements CommonErrorHandler {
+class ErrorHandlerAdapter extends ExceptionClassifier implements CommonErrorHandler {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private static final ConsumerRecords EMPTY_BATCH = new ConsumerRecords(Collections.emptyMap());
@@ -171,5 +174,30 @@ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartit
         }
     }
 
+    @Override
+    protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
+        if (this.batchErrorHandler instanceof ExceptionClassifier) {
+            notRetryable.forEach(ex -> ((ExceptionClassifier) this.batchErrorHandler).addNotRetryableExceptions(ex));
+        }
+    }
+
+    @Override
+    public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
+        super.setClassifications(classifications, defaultValue);
+        if (this.batchErrorHandler instanceof ExceptionClassifier) {
+            ((ExceptionClassifier) this.batchErrorHandler).setClassifications(classifications, defaultValue);
+        }
+    }
+
+    @Override
+    @Nullable
+    public Boolean removeClassification(Class<? extends Exception> exceptionType) {
+        Boolean removed = super.removeClassification(exceptionType);
+        if (this.batchErrorHandler instanceof ExceptionClassifier) {
+            ((ExceptionClassifier) this.batchErrorHandler).removeClassification(exceptionType);
+        }
+        return removed;
+    }
+
 }
```

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 53 additions & 2 deletions
```diff
@@ -26,6 +26,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 
+import org.springframework.classify.BinaryExceptionClassifier;
 import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.KafkaException;
 import org.springframework.lang.Nullable;
@@ -77,18 +78,49 @@ public static void clearRetryListeners() {
     * @param recoverer the recoverer.
     * @param logger the logger.
     * @param logLevel the log level.
+    * @deprecated in favor of
+    * {@link #retryBatch(Exception, ConsumerRecords, Consumer, MessageListenerContainer, Runnable, BackOff, CommonErrorHandler, BiConsumer, LogAccessor, org.springframework.kafka.KafkaException.Level, List, BinaryExceptionClassifier)}.
     */
+    @Deprecated
     public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
             MessageListenerContainer container, Runnable invokeListener, BackOff backOff,
             CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
             KafkaException.Level logLevel) {
 
+        retryBatch(thrownException, records, consumer, container, invokeListener, backOff, seeker, null, logger,
+                logLevel, null, new BinaryExceptionClassifier(true));
+    }
+
+    /**
+     * Retry a complete batch by pausing the consumer and then, in a loop, poll the
+     * consumer, wait for the next back off, then call the listener. When retries are
+     * exhausted, call the recoverer with the {@link ConsumerRecords}.
+     * @param thrownException the exception.
+     * @param records the records.
+     * @param consumer the consumer.
+     * @param container the container.
+     * @param invokeListener the {@link Runnable} to run (call the listener).
+     * @param backOff the backOff.
+     * @param seeker the common error handler that re-seeks the entire batch.
+     * @param recoverer the recoverer.
+     * @param logger the logger.
+     * @param logLevel the log level.
+     * @param retryListenersArg the retry listeners.
+     * @param classifier the exception classifier.
+     * @since 2.8.11
+     */
+    public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
+            MessageListenerContainer container, Runnable invokeListener, BackOff backOff,
+            CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
+            KafkaException.Level logLevel, @Nullable List<RetryListener> retryListenersArg,
+            BinaryExceptionClassifier classifier) {
+
         BackOffExecution execution = backOff.start();
         long nextBackOff = execution.nextBackOff();
         String failed = null;
         Set<TopicPartition> assignment = consumer.assignment();
         consumer.pause(assignment);
-        List<RetryListener> listeners = retryListeners.get();
+        List<RetryListener> listeners = retryListenersArg != null ? retryListenersArg : retryListeners.get();
         int attempt = 1;
         listen(listeners, records, thrownException, attempt++);
         ConsumerRecord<?, ?> first = records.iterator().next();
@@ -98,7 +130,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
                     .publishConsumerPausedEvent(assignment, "For batch retry");
         }
         try {
-            while (nextBackOff != BackOffExecution.STOP) {
+            Boolean retryable = classifier.classify(unwrapIfNeeded(thrownException));
+            while (Boolean.TRUE.equals(retryable) && nextBackOff != BackOffExecution.STOP) {
                 consumer.poll(Duration.ZERO);
                 try {
                     ListenerUtils.stoppableSleep(container, nextBackOff);
@@ -171,4 +204,22 @@ public static String recordsToString(ConsumerRecords<?, ?> records) {
         return sb.toString();
     }
 
+    /**
+     * Remove a {@link TimestampedException}, if present.
+     * Remove a {@link ListenerExecutionFailedException}, if present.
+     * @param exception the exception.
+     * @return the unwrapped cause or cause of cause.
+     * @since 2.8.11
+     */
+    public static Exception unwrapIfNeeded(Exception exception) {
+        Exception theEx = exception;
+        if (theEx instanceof TimestampedException && theEx.getCause() instanceof Exception) {
+            theEx = (Exception) theEx.getCause();
+        }
+        if (theEx instanceof ListenerExecutionFailedException && theEx.getCause() instanceof Exception) {
+            theEx = (Exception) theEx.getCause();
+        }
+        return theEx;
+    }
+
 }
```

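The key behavioral change is that `retryBatch` now consults a `BinaryExceptionClassifier` (after unwrapping `TimestampedException`/`ListenerExecutionFailedException` wrappers) before entering the back-off loop. A small standalone sketch of that classifier contract, with an illustrative type map that is not code from the commit:

```java
import java.util.Map;

import org.springframework.classify.BinaryExceptionClassifier;

// Default value 'true' means "retry unless classified otherwise", matching the
// BinaryExceptionClassifier(true) fallback used by the deprecated overload above.
BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(
        Map.of(IllegalStateException.class, false), true);

classifier.classify(new IllegalStateException("bad state")); // false -> retry loop is skipped, recoverer runs
classifier.classify(new RuntimeException("transient"));      // true  -> batch is retried per the BackOff
```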
spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java

Lines changed: 11 additions & 0 deletions
```diff
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.springframework.classify.BinaryExceptionClassifier;
 import org.springframework.kafka.support.converter.ConversionException;
@@ -130,6 +131,16 @@ public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifi
     @SuppressWarnings("varargs")
     public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
         add(false, exceptionTypes);
+        notRetryable(Arrays.stream(exceptionTypes));
+    }
+
+    /**
+     * Subclasses can override this to receive notification of configuration of not
+     * retryable exceptions.
+     * @param notRetryable the not retryable exceptions.
+     * @since 2.9.3
+     */
+    protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
     }
 
     /**
```

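The new `notRetryable(Stream)` hook is what lets a delegating handler mirror classification changes into a nested handler, as `ErrorHandlerAdapter` above and `FailedBatchProcessor` below do. A hedged sketch of a hypothetical subclass using the hook (the class name and delegate are illustrative only):

```java
import java.util.stream.Stream;

import org.springframework.kafka.listener.ExceptionClassifier;

// Hypothetical subclass: mirrors not-retryable types into a delegate classifier.
// notRetryable(...) is invoked by addNotRetryableExceptions(...) after the local
// classifier has already been updated.
public class ForwardingClassifier extends ExceptionClassifier {

    private final ExceptionClassifier delegate;

    public ForwardingClassifier(ExceptionClassifier delegate) {
        this.delegate = delegate;
    }

    @Override
    protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
        notRetryable.forEach(ex -> this.delegate.addNotRetryableExceptions(ex));
    }

}
```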
spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 35 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.stream.Stream;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -35,6 +36,7 @@
 import org.apache.kafka.common.TopicPartition;
 
 import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.KafkaException.Level;
 import org.springframework.lang.Nullable;
 import org.springframework.util.backoff.BackOff;
 
@@ -84,6 +86,39 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
         this.fallbackBatchHandler = fallbackHandler;
     }
 
+    @Override
+    public void setLogLevel(Level logLevel) {
+        super.setLogLevel(logLevel);
+        if (this.fallbackBatchHandler instanceof KafkaExceptionLogLevelAware) {
+            ((KafkaExceptionLogLevelAware) this.fallbackBatchHandler).setLogLevel(logLevel);
+        }
+    }
+
+    @Override
+    protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
+        if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
+            notRetryable.forEach(ex -> ((ExceptionClassifier) this.fallbackBatchHandler).addNotRetryableExceptions(ex));
+        }
+    }
+
+    @Override
+    public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
+        super.setClassifications(classifications, defaultValue);
+        if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
+            ((ExceptionClassifier) this.fallbackBatchHandler).setClassifications(classifications, defaultValue);
+        }
+    }
+
+    @Override
+    @Nullable
+    public Boolean removeClassification(Class<? extends Exception> exceptionType) {
+        Boolean removed = super.removeClassification(exceptionType);
+        if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
+            ((ExceptionClassifier) this.fallbackBatchHandler).removeClassification(exceptionType);
+        }
+        return removed;
+    }
+
     /**
      * Return the fallback batch error handler.
      * @return the handler.
```

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -41,7 +41,7 @@
  * @since 2.3.7
  *
  */
-class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware
+class FallbackBatchErrorHandler extends ExceptionClassifier
         implements ListenerInvokingBatchErrorHandler {
 
     private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
@@ -105,7 +105,7 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
         this.retrying.set(true);
         try {
             ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
-                    this.seeker, this.recoverer, this.logger, getLogLevel());
+                    this.seeker, this.recoverer, this.logger, getLogLevel(), null, getClassifier());
         }
         finally {
             this.retrying.set(false);
```

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java

Lines changed: 28 additions & 0 deletions
```diff
@@ -24,6 +24,7 @@
 import static org.mockito.BDDMockito.willAnswer;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -234,6 +235,33 @@ void fallbackListener() {
         verify(retryListener).recovered(any(ConsumerRecords.class), any());
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    void notRetryable() {
+        Consumer mockConsumer = mock(Consumer.class);
+        ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
+        DefaultErrorHandler beh = new DefaultErrorHandler(recoverer, new FixedBackOff(0, 2));
+        beh.addNotRetryableExceptions(IllegalStateException.class);
+        RetryListener retryListener = mock(RetryListener.class);
+        beh.setRetryListeners(retryListener);
+        TopicPartition tp = new TopicPartition("foo", 0);
+        ConsumerRecords<?, ?> records = new ConsumerRecords(Collections.singletonMap(tp,
+                List.of(new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
+                                new RecordHeaders(), Optional.empty()),
+                        new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
+                                new RecordHeaders(), Optional.empty()))));
+        MessageListenerContainer container = mock(MessageListenerContainer.class);
+        given(container.isRunning()).willReturn(true);
+        beh.handleBatch(new ListenerExecutionFailedException("test", new IllegalStateException()),
+                records, mockConsumer, container, () -> {
+                });
+        verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(1));
+        // no retries
+        verify(retryListener, never()).failedDelivery(any(ConsumerRecords.class), any(), eq(2));
+        verify(recoverer, times(2)).accept(any(), any()); // each record in batch
+        verify(retryListener).recovered(any(ConsumerRecords.class), any());
+    }
+
     @Configuration
     @EnableKafka
     public static class Config {
```
