Skip to content

Conversation

garyrussell
Copy link
Contributor

Resolves #2332

Use manual assignments, if present for pause.
Also, in the rebalance listener, use isPaused() rather than consumerPaused
to determine whether the partitions should be paused.

Add logs and events for pauses in the rebal listener.

cherry-pick to 2.9.x, 2.8.x

Resolves spring-projects#2332

Use manual assignments, if present, for pause.
Also, in the rebalance listener, use `isPaused()` rather than `consumerPaused`
to determine whether the partitions should be paused.

Add logs and events for pauses in the rebal listener.

Revert errant log4j commit.

**cherry-pick to 2.9.x, 2.8.x**
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
publishConsumerPausedEvent(this.consumer.assignment());
Collection<TopicPartition> assigned = getAssignedPartitions();
if (!CollectionUtils.isEmpty(assigned)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this one has to come together with an import org.springframework.util.CollectionUtils;

@artembilan artembilan merged commit c0e210c into spring-projects:main Jul 6, 2022
@artembilan
Copy link
Member

Cherry-picked to 2.9.x, but it doesn't back-port clear to 2.8.x.
No pauseForPending in the KafkaMessageListenerContainer and no DefaultErrorHandler.setSeekAfterError() used in the PauseContainerManualAssignmentTests.
Thanks

@garyrussell garyrussell deleted the GH-2332 branch July 7, 2022 13:24
@garyrussell
Copy link
Contributor Author

Back ported to 2.8.x; found a problem during the back port - fixed there, as well as on main and 2.9.x a63359c

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Container pause() Does Not Work with Manual TopicPartition Assignment
2 participants