Non-Blocking Retries don't work with BatchListener #2699

@wapkch

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.0.7

Describe the bug

When a batch listener is used together with a retry topic, an IllegalArgumentException is thrown at startup.

To Reproduce

  1. Define a batch listener together with retry topic:
@SpringBootApplication
public class SpringKafkaDemoApplication {

    @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 2_000, maxDelay = 10_000, multiplier = 2))
    @KafkaListener(id = "fooGroup", topics = "topic1", batch = "true")
    public void listen(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment) throws Exception {
        // ...
    }

}
  2. Start the Spring Boot application; the following IllegalArgumentException is thrown:
java.lang.IllegalArgumentException: The provided class BatchMessagingMessageListenerAdapter is not assignable from MessageListener: class org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter
	at org.springframework.util.Assert.assignableCheckFailed(Assert.java:731) ~[spring-core-6.0.9.jar:6.0.9]
	at org.springframework.util.Assert.isAssignable(Assert.java:681) ~[spring-core-6.0.9.jar:6.0.9]
	at org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer.checkAndCast(ListenerContainerFactoryConfigurer.java:201) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer.setupBackoffAwareMessageListenerAdapter(ListenerContainerFactoryConfigurer.java:190) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer$RetryTopicListenerContainerFactoryDecorator.decorate(ListenerContainerFactoryConfigurer.java:241) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer$RetryTopicListenerContainerFactoryDecorator.createListenerContainer(ListenerContainerFactoryConfigurer.java:226) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer$RetryTopicListenerContainerFactoryDecorator.createListenerContainer(ListenerContainerFactoryConfigurer.java:207) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:280) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:219) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:192) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:196) ~[spring-kafka-3.0.7.jar:3.0.7]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:186) ~[spring-kafka-3.0.7.jar:3.0.7]

Expected behavior
I can't find anything in the documentation that explains why a batch listener cannot be used together with a retry topic.

What would be the problem if we just recovered the failed record reported in BatchListenerFailedException using the retry topic and then sought to the remaining records?

Can we make batch listener and retry topic work together?
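
To make the proposal above concrete, here is a minimal sketch in plain Java of the split being suggested. It only models the idea and is not how Spring for Apache Kafka works: `BatchSplitSketch`, `SplitResult`, and `splitAtFailure` are hypothetical names, and the failed index is assumed to come from something like BatchListenerFailedException.

```java
import java.util.List;

// Hypothetical illustration only: none of these types exist in Spring for Apache Kafka.
public class BatchSplitSketch {

    /** Outcome of splitting a polled batch at the record that failed. */
    public record SplitResult<T>(List<T> processed, T failed, List<T> remaining) { }

    /**
     * Splits a batch at failedIndex:
     * records before it were handled successfully and could be committed,
     * the record at it would be forwarded to the retry topic,
     * records after it would be fetched again after a seek.
     */
    public static <T> SplitResult<T> splitAtFailure(List<T> batch, int failedIndex) {
        if (failedIndex < 0 || failedIndex >= batch.size()) {
            throw new IndexOutOfBoundsException("failedIndex out of range: " + failedIndex);
        }
        return new SplitResult<>(
                List.copyOf(batch.subList(0, failedIndex)),
                batch.get(failedIndex),
                List.copyOf(batch.subList(failedIndex + 1, batch.size())));
    }

    public static void main(String[] args) {
        List<String> batch = List.of("r0", "r1", "r2", "r3");
        SplitResult<String> result = splitAtFailure(batch, 2);
        System.out.println("commit:  " + result.processed());
        System.out.println("retry:   " + result.failed());
        System.out.println("re-seek: " + result.remaining());
    }
}
```

Whether the framework can safely apply this split inside the retry-topic machinery is exactly the open question of this issue; the sketch only states the intended semantics.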
