diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaBackOffAwareErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaBackOffAwareErrorHandler.java new file mode 100644 index 0000000000..17cf637af1 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaBackOffAwareErrorHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright 2016-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.List; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.springframework.kafka.KafkaException; +import org.springframework.util.backoff.BackOff; + +/** + * An error handler prepared to handle a {@link KafkaBackoffException} + * thrown by the listener. + * + * @author Tomaz Fernandes + * @since 2.8.2 + * + */ +public class KafkaBackOffAwareErrorHandler extends DefaultErrorHandler { + + public KafkaBackOffAwareErrorHandler() { + } + + public KafkaBackOffAwareErrorHandler(BackOff backOff) { + super(backOff); + } + + public KafkaBackOffAwareErrorHandler(ConsumerRecordRecoverer recoverer) { + super(recoverer); + } + + public KafkaBackOffAwareErrorHandler(ConsumerRecordRecoverer recoverer, BackOff backOff) { + super(recoverer, backOff); + } + + @Override + public void handleRemaining(Exception thrownException, List> records, Consumer consumer, MessageListenerContainer container) { + SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR + getRecoveryStrategy(records, thrownException), this.logger, SeekUtils.isBackoffException(thrownException) + ? getKafkaBackOffExceptionLogLevel() + : getLogLevel()); + } + + protected KafkaException.Level getKafkaBackOffExceptionLogLevel() { + return KafkaException.Level.DEBUG; + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaExceptionLogLevelAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaExceptionLogLevelAware.java index 14d29347b8..b5ac18792a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaExceptionLogLevelAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaExceptionLogLevelAware.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ public void setLogLevel(KafkaException.Level logLevel) { } /** - * Set the level at which the exception thrown by this handler is logged. + * Get the level at which the exception thrown by this handler is logged. * @return the level. */ protected KafkaException.Level getLogLevel() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index b25ad95109..1b62781d75 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -2495,12 +2495,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord recor commitOffsetsIfNeeded(record); } catch (KafkaException ke) { - if (ke.contains(KafkaBackoffException.class)) { - this.logger.warn(ke.getMessage()); - } - else { - ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); - } + ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); return ke; } catch (RuntimeException ee) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java index 5d4953c427..363fb38ecf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.listener.KafkaBackOffAwareErrorHandler; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.util.Assert; @@ -141,7 +142,7 @@ public void setErrorHandlerCustomizer(Consumer errorHandlerC } private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { - DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, + DefaultErrorHandler errorHandler = new KafkaBackOffAwareErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 0)); errorHandler.setCommitRecovered(true); this.errorHandlerCustomizer.accept(errorHandler);