Skip to content

Conversation

garyrussell
Copy link
Contributor

Resolves #2340

The RetryingBatchErrorHandler - now called the FallbackBatchErrorHandler
pauses and resumes the consumer during retries, to allow it to poll the
consumer to avoid a forced rebalance.

However, if a normal rebalance occurs, for example if a new member joins,
the error handler does not re-pause the consumer and silently consumes
new records.

Add a mechanism to always re-pause the consume when in this retry mode.

cherry-pick to 2.9.x, 2.8.x

As well as the unit test, tested proper functionality by running two copies of a Boot app.

Resolves spring-projects#2340

The `RetryingBatchErrorHandler` - now called the `FallbackBatchErrorHandler`
pauses and resumes the consumer during retries, to allow it to poll the
consumer to avoid a forced rebalance.

However, if a normal rebalance occurs, for example if a new member joins,
the error handler does not re-pause the consumer and silently consumes
new records.

Add a mechanism to always re-pause the consume when in this retry mode.

**cherry-pick to 2.9.x, 2.8.x**

private boolean ackAfterHandle = true;

private boolean retrying;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

volatile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need; always used from the consumer thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, you mean that onPartitionsAssigned() is called withing that ErrorHandlingUtils.retryBatch() process?
Otherwise I don't see how this variable is involved...

Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ErrorHandlingUtils.retryBatch pauses the consumer and polls in a while loop until retries are exhausted, and then resumes.

We need to repause the consumer if onPartitionsAssigned() is called while we are in that loop (it is called from poll() in the kafka-clients).

So the thread that tests this field is the same thread that is looping within retryBatch.

@artembilan artembilan merged commit edfbd1d into spring-projects:main Jul 11, 2022
@artembilan
Copy link
Member

... and cherry-picked to 2.9.x .

It does not apply clearly to 2.8.x.
Would you mind taking a look, @garyrussell ?

Thanks

@garyrussell garyrussell deleted the GH-2340 branch July 11, 2022 20:31
@garyrussell
Copy link
Contributor Author

Back-ported to 2.8.x

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Losing messages in RetryingBatchErrorHandler/ErrorHandlingUtils during rebalancing

2 participants