Skip to content

In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic #3834

@FabioBentoLuiz

Description

@FabioBentoLuiz

In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.4

Describe the bug

Routing of messages to custom DLTs based on thrown exceptions is not working. The listener bean initialization fails with the error:

java.lang.IllegalArgumentException: In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic.

Stacktrace:

Caused by: java.lang.IllegalArgumentException: In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic.
at org.springframework.util.Assert.isTrue(Assert.java:116)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.validateDestinations(DefaultDestinationTopicResolver.java:256)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.addDestinationTopics(DefaultDestinationTopicResolver.java:240)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor.lambda$processRegisteredDestinations$1(DefaultDestinationTopicProcessor.java:64)
at java.base/java.util.HashMap$Values.forEach(HashMap.java:1073)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor.processRegisteredDestinations(DefaultDestinationTopicProcessor.java:64)
at org.springframework.kafka.retrytopic.RetryTopicConfigurer.processMainAndRetryListeners(RetryTopicConfigurer.java:321)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:544)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:517)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMultiMethodListeners(KafkaListenerAnnotationBeanPostProcessor.java:486)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:418)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:439)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1815)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:601)
... 41 more

I see that at DefaultDestinationTopicResolver.java:256 an assertion happens:

private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
	for (int i = 0; i < destinationsToAdd.size(); i++) {
		DestinationTopic destination = destinationsToAdd.get(i);
		if (destination.isReusableRetryTopic()) {
			Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
					((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
					String.format("In the destination topic chain, the type %s can only be "
							+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
		}
	}
}

At this point, my destinationsToAdd (see demo application attached) looks like:

Pos Topic Type
0 test-topic MAIN
1 test-topic.retry REUSABLE_RETRY_TOPIC
2 test-topic.custom.dlt DLT
3 test-topic.dlt DLT

As the topic with type REUSABLE_RETRY_TOPIC is not the last or last but one followed by a DLT, the assertion fails.

To Reproduce

Configure a RetryableTopic and set the exceptionBasedDltRouting parameter:

@KafkaListener(topics = "test-topic", id = "test-listener", idIsGroup = false, groupId = "test-group")
@RetryableTopic(
        backoff = @Backoff(delay = 100),
        attempts = "10",
        autoCreateTopics = "false",
        autoStartDltHandler = "true",
        dltTopicSuffix = ".dlt",
        retryTopicSuffix = ".retry",
        exceptionBasedDltRouting = {
                @ExceptionBasedDltDestination(
                        suffix = ".custom",
                        exceptions = {MyCustomException.class}
                )}
)

Expected behavior

The listener bean is created without error and if the process fails multiple times due MyCustomException, the DLT containing the ".custom.dlt" suffix will be considered as the tarrget topic for the message before the general pupose DLT is considered.

Sample

demo1.zip

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions