Description
This is both a feature request and an issue. While working with this project and the spring-integration-kafka project, I found that KafkaConsumer can fetch messages much faster than I can consume them. With the current implementation, a batch of messages is read and each one is pushed downstream for processing. However, if processing the batch takes longer than session.timeout.ms (and the batch can be thousands of messages when there is a large Kafka lag), a rebalance is triggered because the KafkaConsumer was unable to send heartbeats (they are sent while calling poll()). This could be worked around by placing a QueueChannel after the KafkaMessageListenerContainer, but that only moves the problem: at some point the queue fills up, or I run out of memory.
So my suggestion is this: add flow-control support to the KafkaMessageListenerContainer so that I can pause and resume the KafkaConsumer when my downstream processing pipeline is backed up, preventing all the buffers from filling. KafkaConsumer supports pause()/resume() out of the box: a paused consumer keeps sending heartbeats (via poll()) but receives no more messages until it is resumed. This could then be wrapped in an integration component that manages the flow control based on MessageDeliveryExceptions or similar.
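To make the idea concrete, here is a minimal sketch of a pause()/resume() poll loop, not the container's actual API. It uses the kafka-clients MockConsumer so it runs without a broker, and a bounded queue stands in for the backed-up downstream pipeline; the pollOnce helper, topic name, and buffer capacity are all illustrative assumptions.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class PausingPollLoop {

    // Hypothetical helper: pause the partition when the downstream buffer is
    // full and resume it once the buffer drains. poll() keeps being called
    // either way, so heartbeats continue and no rebalance is triggered.
    static void pollOnce(Consumer<String, String> consumer,
                         TopicPartition tp,
                         BlockingQueue<ConsumerRecord<String, String>> buffer) {
        if (buffer.remainingCapacity() == 0 && consumer.paused().isEmpty()) {
            consumer.pause(Collections.singleton(tp));
        }
        else if (buffer.remainingCapacity() > 0 && !consumer.paused().isEmpty()) {
            consumer.resume(Collections.singleton(tp));
        }
        consumer.poll(Duration.ofMillis(100)).forEach(buffer::offer);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("test", 0);
        MockConsumer<String, String> consumer =
                new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singleton(tp));
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(tp, 0L);
        consumer.updateBeginningOffsets(offsets);

        // Downstream buffer with room for only two in-flight records.
        BlockingQueue<ConsumerRecord<String, String>> buffer =
                new ArrayBlockingQueue<>(2);

        consumer.addRecord(new ConsumerRecord<>("test", 0, 0L, "k", "v0"));
        consumer.addRecord(new ConsumerRecord<>("test", 0, 1L, "k", "v1"));
        pollOnce(consumer, tp, buffer);   // buffer fills to capacity
        pollOnce(consumer, tp, buffer);   // buffer full -> partition paused
        System.out.println("paused=" + !consumer.paused().isEmpty());

        buffer.clear();                   // downstream catches up
        pollOnce(consumer, tp, buffer);   // partition resumed
        System.out.println("resumed=" + consumer.paused().isEmpty());
    }
}
```

In a real container the pause/resume decision would need to cover all assigned partitions, and a single poll() can return more records than the buffer has room for, so the threshold would have to leave headroom for max.poll.records; the sketch ignores both for brevity.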
I did implement something like the above, but it required copying the existing KafkaMessageListenerContainer to make it work, because most of the logic lives in an inner class.