Skip to content

GH-2427: Allow RuntimeException to be Classified #2458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class ExceptionClassifier extends KafkaExceptionLogLevelAware {
* Construct the instance.
*/
public ExceptionClassifier() {
this.classifier = configureDefaultClassifier();
this.classifier = configureDefaultClassifier(true);
}

/**
Expand All @@ -64,9 +64,27 @@ public static List<Class<? extends Throwable>> defaultFatalExceptionsList() {
ClassCastException.class);
}

private static ExtendedBinaryExceptionClassifier configureDefaultClassifier() {
private static ExtendedBinaryExceptionClassifier configureDefaultClassifier(boolean defaultClassification) {
return new ExtendedBinaryExceptionClassifier(defaultFatalExceptionsList().stream()
.collect(Collectors.toMap(ex -> ex, ex -> false)), true);
.collect(Collectors.toMap(ex -> ex, ex -> false)), defaultClassification);
}

/**
* By default, unmatched types classify as true. Call this method to make the default
* false, and optionally retain types implicitly classified as false. This should be
* called before calling any of the classification modification methods. This can be
* useful if you want to classify a super class of one or more of the standard fatal
* exceptions as retryable.
* @param retainStandardFatal true to retain.
* @since 3.0
*/
public void defaultFalse(boolean retainStandardFatal) {
if (retainStandardFatal) {
this.classifier = configureDefaultClassifier(false);
}
else {
defaultFalse();
}
}

/**
Expand Down Expand Up @@ -94,6 +112,7 @@ protected BinaryExceptionClassifier getClassifier() {
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen

private final BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> noRetriesForClassified =
(rec, ex) -> {
if (!getClassifier().classify(ex)) {
Exception theEx = ex;
if (theEx instanceof TimestampedException && theEx.getCause() instanceof Exception cause) {
theEx = cause;
}
if (theEx instanceof ListenerExecutionFailedException && theEx.getCause() instanceof Exception cause) {
theEx = cause;
}
if (!getClassifier().classify(theEx)) {
return NO_RETRIES_OR_DELAY_BACKOFF;
}
return this.userBackOffFunction.apply(rec, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class ListenerContainerFactoryConfigurer {

private Class<? extends Exception>[] blockingExceptionTypes = null;

private boolean retainStandardFatal;

private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
};

Expand Down Expand Up @@ -141,6 +143,16 @@ public final void setBlockingRetryableExceptions(Class<? extends Exception>... e
this.blockingExceptionTypes = Arrays.copyOf(exceptionTypes, exceptionTypes.length);
}

/**
* Set to true to retain standard fatal exceptions as not retryable when configuring
* blocking retries.
* @param retainStandardFatal true to retain standard fatal exceptions.
* @since 3.0
*/
public void setRetainStandardFatal(boolean retainStandardFatal) {
this.retainStandardFatal = retainStandardFatal;
}

public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer) {
Assert.notNull(containerCustomizer, "'containerCustomizer' cannot be null");
this.containerCustomizer = containerCustomizer;
Expand All @@ -153,7 +165,7 @@ public void setErrorHandlerCustomizer(Consumer<DefaultErrorHandler> errorHandler
protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
Configuration configuration) {
DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
errorHandler.defaultFalse();
errorHandler.defaultFalse(this.retainStandardFatal);
errorHandler.setCommitRecovered(true);
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
if (this.blockingExceptionTypes != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private void processDeadLetterPublishingContainerFactory(
CustomizersConfigurer customizersConfigurer = new CustomizersConfigurer();
configureCustomizers(customizersConfigurer);
JavaUtils.INSTANCE
.acceptIfNotNull(customizersConfigurer.deadLetterPublishingRecovererCustomizer,
.acceptIfNotNull(customizersConfigurer.getDeadLetterPublishingRecovererCustomizer(),
deadLetterPublishingRecovererFactory::setDeadLetterPublishingRecovererCustomizer);
Consumer<DeadLetterPublishingRecovererFactory> dlprfConsumer = configureDeadLetterPublishingContainerFactory();
Assert.notNull(dlprfConsumer, "configureDeadLetterPublishingContainerFactory must not return null");
Expand All @@ -159,24 +159,28 @@ protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPubl

/**
* Internal method for processing the {@link ListenerContainerFactoryConfigurer}.
* Consider overriding {@link #configureListenerContainerFactoryConfigurer()}
* if further customization is required.
* @param listenerContainerFactoryConfigurer the {@link ListenerContainerFactoryConfigurer} instance.
* Consider overriding {@link #configureListenerContainerFactoryConfigurer()} if
* further customization is required.
* @param listenerContainerFactoryConfigurer the
* {@link ListenerContainerFactoryConfigurer} instance.
*/
private void processListenerContainerFactoryConfigurer(ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer) {
private void processListenerContainerFactoryConfigurer(
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer) {

CustomizersConfigurer customizersConfigurer = new CustomizersConfigurer();
configureCustomizers(customizersConfigurer);
BlockingRetriesConfigurer blockingRetriesConfigurer = new BlockingRetriesConfigurer();
configureBlockingRetries(blockingRetriesConfigurer);
JavaUtils.INSTANCE
.acceptIfNotNull(blockingRetriesConfigurer.backOff,
.acceptIfNotNull(blockingRetriesConfigurer.getBackOff(),
listenerContainerFactoryConfigurer::setBlockingRetriesBackOff)
.acceptIfNotNull(blockingRetriesConfigurer.retryableExceptions,
.acceptIfNotNull(blockingRetriesConfigurer.getRetryableExceptions(),
listenerContainerFactoryConfigurer::setBlockingRetryableExceptions)
.acceptIfNotNull(customizersConfigurer.errorHandlerCustomizer,
.acceptIfNotNull(customizersConfigurer.getErrorHandlerCustomizer(),
listenerContainerFactoryConfigurer::setErrorHandlerCustomizer)
.acceptIfNotNull(customizersConfigurer.listenerContainerCustomizer,
.acceptIfNotNull(customizersConfigurer.getListenerContainerCustomizer(),
listenerContainerFactoryConfigurer::setContainerCustomizer);
listenerContainerFactoryConfigurer.setRetainStandardFatal(true);
Consumer<ListenerContainerFactoryConfigurer> lcfcConfigurer = configureListenerContainerFactoryConfigurer();
Assert.notNull(lcfcConfigurer, "configureListenerContainerFactoryConfigurer must not return null.");
lcfcConfigurer.accept(listenerContainerFactoryConfigurer);
Expand Down Expand Up @@ -341,6 +345,15 @@ public BlockingRetriesConfigurer backOff(BackOff backoff) {
this.backOff = backoff;
return this;
}

BackOff getBackOff() {
return this.backOff;
}

Class<? extends Exception>[] getRetryableExceptions() {
return this.retryableExceptions;
}

}

/**
Expand Down Expand Up @@ -387,6 +400,19 @@ public CustomizersConfigurer customizeDeadLetterPublishingRecoverer(Consumer<Dea
this.deadLetterPublishingRecovererCustomizer = dlprCustomizer;
return this;
}

Consumer<DefaultErrorHandler> getErrorHandlerCustomizer() {
return this.errorHandlerCustomizer;
}

Consumer<ConcurrentMessageListenerContainer<?, ?>> getListenerContainerCustomizer() {
return this.listenerContainerCustomizer;
}

Consumer<DeadLetterPublishingRecoverer> getDeadLetterPublishingRecovererCustomizer() {
return this.deadLetterPublishingRecovererCustomizer;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
Expand Down Expand Up @@ -898,4 +899,22 @@ void nonCompliantProducerFactory() throws Exception {
assertThat(timeoutCaptor.getValue()).isEqualTo(Duration.ofSeconds(125).toMillis());
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void blockingRetryRuntimeException() {
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
CompletableFuture future = mock(CompletableFuture.class);
given(template.send(any(ProducerRecord.class))).willReturn(future);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.defaultFalse(true);
recoverer.addRetryableExceptions(RuntimeException.class);
recoverer.accept(record, new ListenerExecutionFailedException("test", "group",
new TimestampedException(
new ListenerExecutionFailedException("test", new ConversionException("test", null)))));
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
verify(template).send(producerRecordCaptor.capture());
ProducerRecord outRecord = producerRecordCaptor.getValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

/**
* @author Tomaz Fernandes
* @author Gary Russell
* @since 2.7
*/
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -295,6 +296,42 @@ void shouldUseGivenBackOffAndExceptions() {

}

@Test
void shouldUseGivenBackOffAndExceptionsKeepStandard() {

// given
given(container.getContainerProperties()).willReturn(containerProperties);
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
given(containerProperties.getMessageListener()).willReturn(listener);
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
BackOff backOffMock = mock(BackOff.class);
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
given(backOffMock.start()).willReturn(backOffExecutionMock);

ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);
configurer.setBlockingRetriesBackOff(backOffMock);
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
configurer.setRetainStandardFatal(true);

// when
KafkaListenerContainerFactory<?> decoratedFactory =
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
decoratedFactory.createListenerContainer(endpoint);

// then
then(backOffMock).should().start();
then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture());
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler;
assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue();
assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue();
assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isFalse();

}

@Test
void shouldThrowIfBackOffOrRetryablesAlreadySet() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetrie
then(lcfc).should().setErrorHandlerCustomizer(errorHandlerCustomizer);
assertThatThrownBy(lcfc::setBlockingRetryableExceptions).isInstanceOf(IllegalStateException.class);
then(lcfc).should().setBlockingRetriesBackOff(backoff);
then(lcfc).should().setRetainStandardFatal(true);
then(dlprfCustomizer).should().accept(dlprf);
then(rtconfigurer).should().accept(topicConfigurer);
then(lcfcConsumer).should().accept(lcfc);
Expand Down