diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 7fe587fb2a..d0d3b8e9c9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2212,6 +2212,10 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords recor invokeBatchErrorHandler(records, recordList, e); commitOffsetsIfNeededAfterHandlingError(records); } + catch (RecordInRetryException rire) { + this.logger.info("Record in retry and not yet recovered"); + return rire; + } catch (KafkaException ke) { ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); return ke; @@ -2715,6 +2719,10 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco invokeErrorHandler(cRecord, iterator, e); commitOffsetsIfNeededAfterHandlingError(cRecord); } + catch (RecordInRetryException rire) { + this.logger.info("Record in retry and not yet recovered"); + return rire; + } catch (KafkaException ke) { ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); return ke; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInRetryException.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInRetryException.java new file mode 100644 index 0000000000..0673b0f006 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInRetryException.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import javax.annotation.Nullable; + +import org.springframework.core.NestedRuntimeException; + +/** + * Internal {@link NestedRuntimeException} that is used as an exception thrown + * when the record is in retry and not yet recovered during error handling. + * This is to prevent the record from being prematurely committed in the middle of a retry. + * + * Intended only for framework use and thus the package-protected access. + * + * @author Soby Chacko + * @since 3.3.0 + */ +@SuppressWarnings("serial") +class RecordInRetryException extends NestedRuntimeException { + + /** + * Package protected constructor to create an instance with the provided properties. + * + * @param message logging message + * @param cause {@link Throwable} + */ + RecordInRetryException(String message, @Nullable Throwable cause) { + super(message, cause); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index 4a4aa10419..332294357b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -32,7 +32,6 @@ import org.springframework.core.NestedRuntimeException; import org.springframework.core.log.LogAccessor; -import org.springframework.kafka.KafkaException; import org.springframework.kafka.KafkaException.Level; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.KafkaUtils; @@ -46,6 +45,7 @@ * @author Gary Russell * @author Francois Rosiere * @author Wang Zhiyang + * @author Soby Chacko * @since 2.2 * */ @@ -224,7 +224,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List record, Exception original, Exce List> records = Arrays.asList(record1, record2); IllegalStateException illegalState = new IllegalStateException(); Consumer consumer = mock(Consumer.class); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> handler.handleRemaining(illegalState, records, + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> handler.handleRemaining(illegalState, records, consumer, mock(MessageListenerContainer.class))) .withCause(illegalState); handler.handleRemaining(new DeserializationException("intended", null, false, illegalState), records, @@ -214,7 +214,7 @@ void testEarlyExitBackOff() { MessageListenerContainer container = mock(MessageListenerContainer.class); given(container.isRunning()).willReturn(false); long t1 = System.currentTimeMillis(); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> handler.handleRemaining(illegalState, + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> handler.handleRemaining(illegalState, records, consumer, container)); assertThat(System.currentTimeMillis() < t1 + 5_000); } @@ -230,7 +230,7 @@ void testNoEarlyExitBackOff() { MessageListenerContainer container = mock(MessageListenerContainer.class); given(container.isRunning()).willReturn(true); long t1 = System.currentTimeMillis(); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> handler.handleRemaining(illegalState, + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> handler.handleRemaining(illegalState, records, consumer, container)); assertThat(System.currentTimeMillis() >= t1 + 200); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java index 22f5c38d21..382b6eb944 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java @@ -43,13 +43,13 @@ import org.springframework.core.log.LogAccessor; import org.springframework.data.util.DirectFieldAccessFallbackBeanWrapper; -import org.springframework.kafka.KafkaException; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; /** * @author Gary Russell * @author Francois Rosiere + * @author Soby Chacko * @since 3.0.3 * */ @@ -118,10 +118,10 @@ void testExceptionDuringCommit() { willThrow(new RebalanceInProgressException("rebalance in progress")).given(consumer).commitSync(anyMap(), any()); final MessageListenerContainer mockMLC = mock(MessageListenerContainer.class); willReturn(new ContainerProperties("topic")).given(mockMLC).getContainerProperties(); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> testFBP.handle(new BatchListenerFailedException("topic", rec2), records, consumer, mockMLC, mock(Runnable.class)) - ).withMessage("Seek to current after exception"); + ).withMessage("Record in retry and not yet recovered"); } static class TestFBP extends FailedBatchProcessor { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java index abff94e5a6..1c68947dd1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java @@ -52,7 +52,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.springframework.kafka.KafkaException; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaOperations; @@ -70,6 +69,7 @@ /** * @author Gary Russell + * @author Soby Chacko * @since 2.2 * */ @@ -180,7 +180,7 @@ public void seekToCurrentErrorHandlerRecovers() { records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); @@ -227,14 +227,14 @@ public void recoveryFailed(ConsumerRecord record, Exception original, Exce records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L); eh.handleRemaining(new RuntimeException(), records, consumer, null); @@ -267,11 +267,11 @@ public void seekToCurrentErrorHandlerRecovererFailsBackOffNotReset() { records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L); eh.handleRemaining(new RuntimeException(), records, consumer, null); // immediate re-attempt recovery @@ -308,7 +308,7 @@ private void seekToCurrentErrorHandlerRecoversManualAcks(boolean syncCommits) { OffsetCommitCallback commitCallback = (offsets, ex) -> { }; properties.setCommitCallback(commitCallback); given(container.getContainerProperties()).willReturn(properties); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> eh.handleRemaining(new RuntimeException(), records, consumer, container)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verify(consumer).seek(new TopicPartition("foo", 1), 0L); @@ -340,7 +340,7 @@ public void testNeverRecover() { records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); for (int i = 0; i < 20; i++) { - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); } verify(consumer, times(20)).seek(new TopicPartition("foo", 0), 0L);