
Conversation

tomazfernandes
Contributor

Fixes #2184

I also looked into leveraging KafkaException's self-logging features here, but for that to work we'd need to make a few adjustments to KafkaMessageListenerContainer#decorateException, and that looks like too much change for a bugfix to me.

I still think we could have a configurable feature where a KafkaException could look into its causes and use the deepest KafkaException's log level to log itself. That way we could, for example, throw the original KafkaBackOffException as DEBUG (or whatever the user configures) and leave all other exceptions as ERROR (or whatever the user configures), and no matter how many times it gets wrapped it would still be logged as DEBUG.

Not sure if that's something that would be useful for scenarios other than KafkaBackOffException, though.
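For illustration, the cause-chain idea could look something like this hypothetical sketch. The `LeveledException` class and method names are invented for the example; spring-kafka's actual `KafkaException` API is different, and this only shows the "deepest level wins" traversal:

```java
import java.util.logging.Level;

// Hypothetical stand-in for an exception carrying its own log level.
class LeveledException extends RuntimeException {

    private final Level logLevel;

    LeveledException(String message, Throwable cause, Level logLevel) {
        super(message, cause);
        this.logLevel = logLevel;
    }

    Level getLogLevel() {
        return this.logLevel;
    }

    /**
     * Walk the cause chain and return the log level of the deepest
     * level-aware exception, so e.g. an original KafkaBackOffException
     * configured as DEBUG keeps that level no matter how many times
     * it is wrapped by other exceptions.
     */
    static Level effectiveLevel(Throwable ex, Level defaultLevel) {
        Level result = defaultLevel;
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (t instanceof LeveledException) {
                result = ((LeveledException) t).getLogLevel();
            }
        }
        return result;
    }

}
```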

Please let me know if there's anything to change.

Thanks

@v-chernyshev

The change looks good to me.

Out of curiosity, why does getRecoveryStrategy behave differently depending on the result of getClassifier().classify? If the result is true then a functor that can perform the recovery is returned (so there are no immediate side effects). In case of a false outcome, however, the first record is recovered immediately, which is a bit unexpected given the method name.
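For illustration, the asymmetry in question has roughly this shape (a hypothetical sketch with invented names, not the actual spring-kafka code):

```java
import java.util.function.BiPredicate;

// Invented names; illustrates the shape of the behavior being asked about.
class RecoveryStrategyShape {

    interface RecoveryStrategy {
        void recover(Object record, Exception ex);
    }

    static boolean recovered = false;

    static RecoveryStrategy getRecoveryStrategy(
            BiPredicate<Object, Exception> classifier, Object firstRecord, Exception ex) {
        if (classifier.test(firstRecord, ex)) {
            // Retryable: return a functor; no immediate side effects.
            return (rec, e) -> { /* track failures, back off, maybe recover later */ };
        }
        // Not retryable: the first record is recovered immediately as a
        // side effect, which is surprising for a method named "get...".
        recoverNow(firstRecord, ex);
        return (rec, e) -> { /* always skip */ };
    }

    static void recoverNow(Object record, Exception ex) {
        recovered = true; // e.g. send to a dead-letter topic
    }

}
```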

@tomazfernandes
Contributor Author

@v-chernyshev thanks for looking into this. I don't have a proper answer to your question, so I defer to @garyrussell on this one.

@garyrussell
Contributor

It's historical; before 2.7, the method was called getSkipPredicate(), which returned FailedRecordTracker.skip() for true, and ALWAYS_SKIP_PREDICATE (or NEVER... if recovery failed) for false (but it also performed the immediate recovery for non-retryable exceptions).

On reflection, it could (and maybe should) be refactored to use the newer setBackOffFunction to return a FixedBackOff(0L, 0L) for exceptions that are classified as not retryable.

@tomazfernandes
Contributor Author

I agree there's room for streamlining this code a bit, and it would help keep things consistent across the various scenarios.

@garyrussell
Contributor

Right, but not in this PR; I'll add an issue to the backlog.

@garyrussell
Contributor

I think we should consider not calling the retry listeners for these exceptions, but that would be a different issue/PR.

@garyrussell garyrussell merged commit 448871a into spring-projects:main Mar 24, 2022
@garyrussell
Contributor

...and back-ported to 2.8.x as da0b820

@tomazfernandes
Contributor Author

tomazfernandes commented Mar 24, 2022

Right, but not in this PR; I'll add an issue to the backlog.

Sure. I was thinking something along these lines:
tomazfernandes@e045819

I'd still need to check carefully whether there's any behavior change, but all tests pass, and it makes sure all scenarios use the same recovery logic.

A possible improvement would be having the exception classification inside the FailedRecordTracker class, but we'd have to either pass the classification as an argument (which would require a signature change) or have it implement ExceptionClassifier and maybe delegate all method calls from FailedRecordProcessor. That looks like too much for too little to me.

Of course, this is just a suggestion; if anyone else has a different idea, that's ok.

Thanks.

@tomazfernandes
Contributor Author

A version without the inner class: it has fewer changes and should be easier to review.
tomazfernandes@9db49ee

@garyrussell
Contributor

I was thinking something like this...

% git diff
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
index fa2a6071..3c6198f0 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
@@ -28,7 +28,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
 import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
 
 /**
  * Common super class for classes that deal with failing to consume a consumer record.
@@ -43,14 +45,27 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
 
        private static final BiPredicate<ConsumerRecord<?, ?>, Exception> NEVER_SKIP_PREDICATE = (r, e) -> false;
 
+       private static final BackOff NO_RETRIES_OR_DELAY_BACKOFF = new FixedBackOff(0L, 0L);
+
+       private final BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> noRetriesForClassified =
+                       (rec, ex) -> {
+                               if (!getClassifier().classify(ex)) {
+                                       return NO_RETRIES_OR_DELAY_BACKOFF;
+                               }
+                               return this.userBackOffFunction.apply(rec, ex);
+                       };
+
        protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
 
        private final FailedRecordTracker failureTracker;
 
        private boolean commitRecovered;
 
+       private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> userBackOffFunction = (rec, ex) -> null;
+
        protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
                this.failureTracker = new FailedRecordTracker(recoverer, backOff, this.logger);
+               this.failureTracker.setBackOffFunction(this.noRetriesForClassified);
        }
 
        /**
@@ -77,7 +92,8 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
         * @since 2.6
         */
        public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
-               this.failureTracker.setBackOffFunction(backOffFunction);
+               Assert.notNull(backOffFunction, "'backOffFunction' cannot be null");
+               this.userBackOffFunction = backOffFunction;
        }
 
        /**

But your way works too.

@tomazfernandes
Contributor Author

Sure, this looks good too, and has the benefit of not having to expose the recovery logic to the FailedRecordProcessor.

I'm ok with both approaches, as both will let us have a single recovery logic for all paths.

So go ahead with yours if you prefer, no worries, or let me know and I can wrap up mine and open a PR.

garyrussell added a commit to garyrussell/spring-kafka that referenced this pull request Apr 4, 2022
Resolves spring-projects#2195

Add an option to avoid seeks after handling exceptions.

Instead, pause the consumer for one `poll()` and use the remaining records as the
result of that poll.

New methods on `CommonErrorHandler` - `handleOne` for record listeners, returning
a boolean to indicate whether the record was recovered and should not be redelivered.

`handleBatchAndReturnRemaining` for batch listeners, returning either the complete
set or a subset, e.g. when the `DEH` receives a `BatchListenerExecutionFailedException`
and commits a partial batch.

Also includes the classifier refactoring discussed here
spring-projects#2185 (comment)
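The contract described in the commit message can be sketched roughly like this. This is a simplified, invented interface for illustration; the real `CommonErrorHandler` methods in spring-kafka also take the `Consumer` and the listener container as parameters:

```java
import java.util.List;

// Simplified sketch of the new error-handler contract; types invented.
interface ErrorHandlerShape<R> {

    /**
     * Record listeners: return true if the record was recovered and
     * should not be redelivered on the next poll.
     */
    boolean handleOne(Exception thrownException, R record);

    /**
     * Batch listeners: return the records still to be delivered -- the
     * complete batch, or a subset when a partial batch was committed
     * (e.g. on a BatchListenerExecutionFailedException).
     */
    List<R> handleBatchAndReturnRemaining(Exception thrownException, List<R> records);

}
```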
garyrussell added five more commits to garyrussell/spring-kafka referencing this pull request (two on Apr 4 and one each on Apr 6, Apr 11, and Apr 12, 2022), each with the same message as above plus:

The new logic is disabled by default; we can consider enabling it in 3.0 and removing the deprecations.
artembilan pushed a commit that referenced this pull request Apr 13, 2022
Resolves #2195

Add an option to avoid seeks after handling exceptions.

Instead, pause the consumer for one `poll()` and use the remaining records as the
result of that poll.

New methods on `CommonErrorHandler` - `handleOne` for record listeners, returning
a boolean to indicate whether the record was recovered and should not be redelivered.

`handleBatchAndReturnRemaining` for batch listeners, returning either the complete
set or a subset, e.g. when the `DEH` receives a `BatchListenerExecutionFailedException`
and commits a partial batch.

Also includes the classifier refactoring discussed here
#2185 (comment)

The new logic is disabled by default; we can consider enabling it in 3.0 and removing
the deprecations.

* Fix race - do not call `resume()` on the container; the user might have paused after the error.

* Change Since to 2.9.

* Fix typos.

Co-authored-by: Artem Bilan <[email protected]>

* Remove unnecessary local variable; add docs.

* Polishing - see commit comment for more details

- move the resume logic to after the invokes and don't resume if pending records
- don't check `isPaused()` after empty poll due to errors; always restore the pending records

* Remove unnecessary boolean; fix deprecation warnings and delegating error handlers.

* Emergency stop container if the consumer returns records while paused after an error.

* Fix race in test - prevent consumer thread from changing pausedConsumers while the test thread is calling revoke/assign.

* Remove System.out().

* Add diagnostics to test.

* Fix race in test; wait until next poll after consumer thread pauses the partitions.

* Fix stubbing in emergency stop test.

* Remove unnecessary boolean.

**Cherry-pick to `2.9.x`**
garyrussell added a commit that referenced this pull request Apr 14, 2022, cherry-picking the same change (with the same commit message as above) to `2.9.x`.
Successfully merging this pull request may close these issues:

RetryableTopic - breaking changes in the error handler