From 0fdb5f49b5c33bceb27899ef123027a9a238ab00 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 24 Oct 2022 15:40:28 -0400 Subject: [PATCH] GH-2427: Allow RuntimeException to be Classified Resolves https://github.com/spring-projects/spring-kafka/issues/2457 Previously, classifying `RuntimeException` either for no retries or for blocking retries would cause undesirable effects - its classification would be found first because the classifier first traverses up the class hierarchy to find a match before traversing down the cause links. When classifying exception for retry, unwrap the 'LEFE' cause from a `TimestampedException` and/or `ListenerExecutionFailedException` so that the classification is made on that cause. By default, the retry classifier has no classified exceptions when used in "classify for retry" mode (instead of the default "classify for no retry" mode). This means that, if `retryOn(RuntimeException.class)` is used, then all `RuntimeException`s will be retried (including those that are usually considered fatal). With mixed blocking/non-blocking retries, change the behavior to include the standard fatal exceptions in case the user configures all `RuntimeException`s to use blocking retries. Also tested with a Boot app to see that a conversion exception bypasses all retries and goes straight to the DLT. --- .../kafka/listener/ExceptionClassifier.java | 25 +++++++++-- .../kafka/listener/FailedRecordProcessor.java | 9 +++- .../ListenerContainerFactoryConfigurer.java | 14 +++++- .../RetryTopicConfigurationSupport.java | 44 +++++++++++++++---- .../DeadLetterPublishingRecovererTests.java | 19 ++++++++ ...stenerContainerFactoryConfigurerTests.java | 37 ++++++++++++++++ .../RetryTopicConfigurationSupportTests.java | 1 + 7 files changed, 135 insertions(+), 14 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java index 43f23448b7..6bf67859b0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java @@ -45,7 +45,7 @@ public abstract class ExceptionClassifier extends KafkaExceptionLogLevelAware { * Construct the instance. */ public ExceptionClassifier() { - this.classifier = configureDefaultClassifier(); + this.classifier = configureDefaultClassifier(true); } /** @@ -64,9 +64,27 @@ public static List> defaultFatalExceptionsList() { ClassCastException.class); } - private static ExtendedBinaryExceptionClassifier configureDefaultClassifier() { + private static ExtendedBinaryExceptionClassifier configureDefaultClassifier(boolean defaultClassification) { return new ExtendedBinaryExceptionClassifier(defaultFatalExceptionsList().stream() - .collect(Collectors.toMap(ex -> ex, ex -> false)), true); + .collect(Collectors.toMap(ex -> ex, ex -> false)), defaultClassification); + } + + /** + * By default, unmatched types classify as true. Call this method to make the default + * false, and optionally retain types implicitly classified as false. This should be + * called before calling any of the classification modification methods. This can be + * useful if you want to classify a super class of one or more of the standard fatal + * exceptions as retryable. + * @param retainStandardFatal true to retain. + * @since 3.0 + */ + public void defaultFalse(boolean retainStandardFatal) { + if (retainStandardFatal) { + this.classifier = configureDefaultClassifier(false); + } + else { + defaultFalse(); + } } /** @@ -94,6 +112,7 @@ protected BinaryExceptionClassifier getClassifier() { *