Skip to content

Conversation

realcbb
Copy link
Contributor

@realcbb realcbb commented Mar 2, 2018

In situations like committing acks while consumer group has already rebalanced, and the container error handler is instance of ConsumerAwareErrorHandler or ConsumerAwareBatchErrorHandler, the error handler will throw an exception which would not be caught. Then the consumer will be dead, it can not receive messages any more.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, indeed!

Can we have some test-case though to cover this situation?

Also, be sure to run gradlew check to see the Checkstyle violations.

Thanks

@realcbb
Copy link
Contributor Author

realcbb commented Mar 3, 2018

@artembilan Sorry for checkstyle violations. And i have rethought these questions.

  1. How about just make the error logs, but not let the container error handler deal with the exception thrown from the top try block in the while loop? Should user-defined container error handler handle exceptions on this level?
    Besides, some error handlers eg. ContainerAwareErrorHandler will still just throw an UnsupportedOperationException even if the code was edited like me do in the pull request, and the real original exception details will even not be logged.

  2. How about make ContainerAwareErrorHandler extend ConsumerAwareErrorHandler rather than RemainingRecordsErrorHandler, or how about make RemainingRecordsErrorHandler extend ContainerAwareErrorHandler rather than the opposite? In my opinion, ContainerAwareErrorHandler has no necessary to deal with remaining records, and SeekToCurrentErrorHandler should implement RemainingRecordsErrorHandler rather than ContainerAwareErrorHandler.

@artembilan
Copy link
Member

Thank you for the update @realcbb !
But I still don't see any test-cases to cover the problem.

I can't say anything (yet) about your suggestions, but that definitely looks like a separate issue.
More over sounds like a breaking change and we can't do that in the current point release.

Anyway such a decision I deffer to @garyrussell

@realcbb
Copy link
Contributor Author

realcbb commented Mar 6, 2018

I didn't supplement the test-cases yet because of the suggestion 1 I mentioned in the previous comment.
I mean, is it better like this,

while (isRunning()) {
    try {
        ...
    }
    catch (WakeupException e) {
        // Ignore, we're stopping
    }
    catch (NoOffsetForPartitionException nofpe) {
        this.fatalError = true;
        ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
        break;
    }
    catch (Exception e) {
        ////////////////// Do not let error handler handle exceptions here
        this.logger.error("Container exception", ex);
    }
}

We have many error handlers now, ErrorHandler, ConsumerAwareErrorHandler and ContainerAwareErrorHandler for example, each one has its own handle method for some purpose. If we use error handler to handle exceptions here, we should tell users which specific handle method is used to handle such kind of exceptions.
More over, I think user-defined error handler should not be responsible for exceptions on that level.
May I have your opinions? @artembilan @garyrussell

For suggestion 2 I mentioned in the previous comment, it's another question. Maybe I should open a new issue to talk about that.

@artembilan
Copy link
Member

Yeah... You know if your first suggestion is going to be behavior change and that is not what we can allow ourselves to do in the point release: we just can't introduce some breaking change here even if it sounds reasonable.

The next minor version is a good place to reconsider these refactorings. Right now I would just concentrate on the bug fix and raise appropriate issues for the next version.

Therefore we need a test-case to cover what you explain in the PR header. That's all.
Although I really would ask to add such a comment to the commit message - for better traceability.

Thanks for understanding.

@realcbb
Copy link
Contributor Author

realcbb commented Mar 12, 2018

@artembilan
Sorry for so late. I just supplement the test-case, but there are some conflicts to be resolved. I have no write permission. Does it need your help?

@realcbb
Copy link
Contributor Author

realcbb commented Mar 12, 2018

I updated and committed it again. But it said that there are still conflicts.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To solve collisions you have to rebase your fix-1 branch against the latest upstream master:

git checkout master
git pull upstream master
git checkout -
git rebase master
git push origin fix-1 -f

@realcbb
Copy link
Contributor Author

realcbb commented Mar 16, 2018

@artembilan Thanks! Please check it.

@artembilan artembilan merged commit 6bd58b2 into spring-projects:master Mar 16, 2018
@artembilan
Copy link
Member

Thank you!

Looking forward for more!

if (containerErrorHandler != null) {
if (containerErrorHandler instanceof ConsumerAwareErrorHandler
|| containerErrorHandler instanceof ConsumerAwareBatchErrorHandler) {
containerErrorHandler.handle(e, null, this.consumer);
Copy link
Contributor

@garyrussell garyrussell Mar 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a problem here; I noticed this while running tests...

java.lang.UnsupportedOperationException: Container should never call this
	at org.springframework.kafka.listener.RemainingRecordsErrorHandler.handle(RemainingRecordsErrorHandler.java:39)
	at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:1)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:752)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)

testExceptionWhenCommitAfterRebalance

garyrussell added a commit to garyrussell/spring-kafka that referenced this pull request Mar 21, 2018
Fixes spring-projects#614

PR spring-projects#595 added support for calling error handlers for general errors not related
to listener invocation.

However, the wrong `handle()` method was called.

Add a default implementation of the lowest interface method in the hierarchies to
`ErrorHandler` and `BatchErrorHandler` respectively and invoke that so the
right method will always be invoked, regardless of the error handler type.

Also see spring-projects#615
garyrussell added a commit to garyrussell/spring-kafka that referenced this pull request Mar 21, 2018
Fixes spring-projects#614

PR spring-projects#595 added support for calling error handlers for general errors not related
to listener invocation.

However, the wrong `handle()` method was called.

Add a default implementation of the lowest interface method in the hierarchies to
`ErrorHandler` and `BatchErrorHandler` respectively and invoke that so the
right method will always be invoked, regardless of the error handler type.

Also see spring-projects#615
garyrussell added a commit to garyrussell/spring-kafka that referenced this pull request Mar 21, 2018
Fixes spring-projects#614

PR spring-projects#595 added support for calling error handlers for general errors not related
to listener invocation.

However, the wrong `handle()` method was called.

Add a default implementation of the lowest interface method in the hierarchies to
`ErrorHandler` and `BatchErrorHandler` respectively and invoke that so the
right method will always be invoked, regardless of the error handler type.

Also see spring-projects#615
garyrussell added a commit to garyrussell/spring-kafka that referenced this pull request Mar 21, 2018
Fixes spring-projects#614

PR spring-projects#595 added support for calling error handlers for general errors not related
to listener invocation.

However, the wrong `handle()` method was called.

Add a default implementation of the lowest interface method in the hierarchies to
`ErrorHandler` and `BatchErrorHandler` respectively and invoke that so the
right method will always be invoked, regardless of the error handler type.

Also see spring-projects#615
artembilan pushed a commit that referenced this pull request Mar 21, 2018
Fixes #614

PR #595 added support for calling error handlers for general errors not related
to listener invocation.

However, the wrong `handle()` method was called.

Add a default implementation of the lowest interface method in the hierarchies to
`ErrorHandler` and `BatchErrorHandler` respectively and invoke that so the
right method will always be invoked, regardless of the error handler type.

Also see #615
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants