GH-2195: DefaultErrorHandler Improvements #2207
Conversation
This looks to me like too big a change to be considered for back-porting.
Why not just aim the fix for 3.0 and be free of possible breaking changes?
Thanks
I agree; maybe this is the final driver to create a 2.9 branch, which we can release sooner (May). There is a big performance problem when using the retryable topic - see the discussion on the issue.
@garyrussell, I began looking into this today and should continue tomorrow - there's a lot going on in parts of the code I'm not that familiar with. Overall I think the solution looks great, I just want to get a better grasp of it. Thanks
Do we need to add anything to the docs for this change?
Thanks
So, is this OK now for merging?
Let @tomazfernandes have a few more days to review; maybe merge on Monday?
That would work nicely for me, thanks!
We still need to seek on the retry containers because the delay is enabled by pausing the partitions. I guess you should only configure this on the main container and use a small …
It would add a little more complexity (in the container), but maybe we could come up with a hybrid approach - perform the seeks if the listener throws a `KafkaBackOffException`. But that can be another PR.
I think the ideal scenario there would be, even for retry containers: say we process a record, it fails and goes through the recovery logic, which succeeds (message sent to the next topic). If the next record for that container does not trigger a back off (it's already past its due time), we shouldn't need to seek again. Makes sense? If it does back off, recovery does not succeed, and the `BackOffManager` pauses the partition before any of this logic is executed. I'm not sure where in the PR the partition is resumed, but if it does not resume when it didn't pause, and doesn't serve the failed record while paused, maybe we're good?
I think the trick here is understanding the interaction of this new logic with partition pausing. If we do seek in case of a `KafkaBackOffException` … But we might consider filtering out any records that are from a currently paused partition before serving them again; that way we don't need to seek all partitions due to one backing off.
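(For illustration only, a rough sketch of that filtering idea using plain kafka-clients calls - this is not code from the PR, and the helper class and method names are made up:)

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class PausedPartitionFilter {

	private PausedPartitionFilter() {
	}

	/**
	 * Return only the records whose partition is not currently paused, so a single
	 * backed-off partition does not force re-seeking (or re-serving) the others.
	 */
	static <K, V> ConsumerRecords<K, V> skipPaused(ConsumerRecords<K, V> remaining, Consumer<K, V> consumer) {
		Set<TopicPartition> paused = consumer.paused();
		Map<TopicPartition, List<ConsumerRecord<K, V>>> kept = new HashMap<>();
		for (TopicPartition tp : remaining.partitions()) {
			if (!paused.contains(tp)) {
				kept.put(tp, remaining.records(tp));
			}
		}
		return new ConsumerRecords<>(kept);
	}

}
```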
So summing up, I think that maybe rather than "coupling" the container to the … Sorry, I kind of …
Just to remember, the …
This clearly needs some more thought; so definitely not in the scope of this PR.
Sure! I was really just thinking out loud, sorry about that. I took a better look at these changes yesterday and they look good to me - they will surely increase performance a lot when dealing with errors, and they're also defensive enough that they shouldn't be a problem when disabled. I'm wondering, considering we'll be having milestone exposure to this, DYT maybe it's worth it to leave it on by default so we get more feedback? If there's any problem users can just disable it, and then we can release a new milestone with the fix. Maybe I'm missing something though - perhaps it's too big a behavior change for a minor release, for example. Thanks
That's an interesting thought; but I think we need to resolve the issue with the retry containers first because, if it is used there, yes, we will still pause the relevant partition, but there may be additional records from that partition in the remaining records. We'd need to weed those out somehow, and only seek the paused partition.
Sure. Also, that might impact users pausing partitions themselves. But I'm sure we'll work this out soon enough. As far as retryable topics go, we could disable it in RT setup if necessary.
Resolves spring-projects#2195 Add an option to avoid seeks after handling exceptions. Instead, pause the consumer for one `poll()` and use the remaining records as the result of that poll. New methods on `CommonErrorHandler`: `handleOne` for record listeners, returning a boolean to indicate whether the record was recovered and should not be redelivered; `handleBatchAndReturnRemaining` for batch listeners, returning either the complete set or a subset, e.g. when the `DEH` receives a `BatchListenerExecutionFailedException` and commits a partial batch. Also includes the classifier refactoring discussed here: spring-projects#2185 (comment). The new logic is disabled by default; we can consider enabling it in 3.0 and removing the deprecations.
…ve paused after the error.
Co-authored-by: Artem Bilan <[email protected]>
- move the resume logic to after the invokes and don't resume if there are pending records
- don't check `isPaused()` after an empty poll due to errors; always restore the pending records
…ers while the test thread is calling revoke/assign.
}

invokeIfHaveRecords(records);
// per the commits above: only resume when no records were retained after an error
if (this.pendingRecordsAfterError == null) {
Just a thought. Since we're not committed to listening to `pause` / `resume` calls while in this logic, I wonder if maybe we should also ignore it for partition pausing / resuming, and instead handle back offs relying on the `KafkaBackOffException` itself. The exception contains information such as the `TopicPartition` that was backed off. Of course, this or any other solution can be part of a different PR.
Thanks.
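(For illustration, a minimal sketch of what handling back offs off the exception itself could look like - the exception type and its accessor are local stand-ins defined below, not the framework's actual classes:)

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class BackOffAwareHandling {

	private BackOffAwareHandling() {
	}

	// Local stand-in for a back-off exception carrying the backed-off partition;
	// the real framework class name and accessor may differ.
	static class BackOffSignal extends RuntimeException {

		private final TopicPartition topicPartition;

		BackOffSignal(String message, TopicPartition topicPartition) {
			super(message);
			this.topicPartition = topicPartition;
		}

		TopicPartition getTopicPartition() {
			return this.topicPartition;
		}

	}

	static void onListenerError(Exception ex, Consumer<?, ?> consumer) {
		if (ex instanceof BackOffSignal) {
			// Only the partition that signaled the back off needs to be parked;
			// records from other partitions can still be served without any seeks.
			TopicPartition backedOff = ((BackOffSignal) ex).getTopicPartition();
			consumer.pause(Collections.singleton(backedOff));
		}
		// otherwise fall through to the normal error handling path
	}

}
```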
Yeah - we already know we need to come up with a solution for the retry containers if the primary container has this option set; we can cover this in another PR after M1.
Not really, at this point I'm more nit-picking about implementation details 😄 I think this looks great and will surely be a noticeable gain in performance regarding error handling. This was a really interesting journey into this logic, which is maybe the framework's heart, so thanks a lot @garyrussell for the opportunity. Thanks.
Sorry, this does not back-port to 2.9.x cleanly. Thanks
...cherry-picked to 2.9.x after resolving conflicts; also removed deprecated error handlers.
Resolves #2195

Add an option to avoid seeks after handling exceptions. Instead, pause the consumer for one `poll()` and use the remaining records as the result of that poll.

New methods on `CommonErrorHandler`:

- `handleOne` for record listeners, returning a boolean to indicate whether the record was recovered and should not be redelivered.
- `handleBatchAndReturnRemaining` for batch listeners, returning either the complete set or a subset, e.g. when the `DEH` receives a `BatchListenerExecutionFailedException` and commits a partial batch.

Also includes the classifier refactoring discussed here: #2185 (comment)

The new logic is disabled by default; we can consider enabling it in 3.0 and removing the deprecations.
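(For readers arriving from the issue: a minimal configuration sketch. It assumes the option is exposed as `setSeekAfterError(boolean)` on `DefaultErrorHandler`, as in the released 2.9 API; the recoverer and back-off values are placeholders.)

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class NoSeeksErrorHandlerConfig {

	static DefaultErrorHandler noSeeksErrorHandler() {
		DefaultErrorHandler handler = new DefaultErrorHandler(
				(ConsumerRecord<?, ?> failed, Exception ex) -> {
					// placeholder recoverer - e.g. log the record or publish it to a DLT
				},
				new FixedBackOff(1000L, 2L));
		// false = do not seek back after an error; pause the consumer for one poll()
		// and replay the retained records instead
		handler.setSeekAfterError(false);
		return handler;
	}

	static void configure(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
		// apply to the main container factory only; retry-topic containers still rely
		// on seeks/pauses for their delays (see the discussion above)
		factory.setCommonErrorHandler(noSeeksErrorHandler());
	}

}
```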