Skip to content

Commit 72d4284

Browse files
Aleksei-Chgaryrussell
authored andcommitted
@RetryableTopic Support SpEL for listenerContainerFactory and kafkaTemplate properties
1 parent e38c4cb commit 72d4284

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
135135
.dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix"))
136136
.dltHandlerMethod(getDltProcessor(method, bean))
137137
.includeTopics(Arrays.asList(topics))
138-
.listenerFactory(annotation.listenerContainerFactory())
138+
.listenerFactory(resolveExpressionAsString(annotation.listenerContainerFactory(), "listenerContainerFactory"))
139139
.autoCreateTopics(resolveExpressionAsBoolean(annotation.autoCreateTopics(), "autoCreateTopics"),
140140
resolveExpressionAsInteger(annotation.numPartitions(), "numPartitions", true),
141141
resolveExpressionAsShort(annotation.replicationFactor(), "replicationFactor", true))
@@ -147,7 +147,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
147147
.autoStartDltHandler(autoStartDlt)
148148
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
149149
.timeoutAfter(timeout)
150-
.create(getKafkaTemplate(annotation.kafkaTemplate(), topics));
150+
.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
151151
}
152152

153153
private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, BeanFactory beanFactory) { // NOSONAR

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
RetryTopicIntegrationTests.THIRD_TOPIC,
9696
RetryTopicIntegrationTests.FOURTH_TOPIC,
9797
RetryTopicIntegrationTests.TWO_LISTENERS_TOPIC })
98-
@TestPropertySource(properties = "five.attempts=5")
98+
@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"})
9999
public class RetryTopicIntegrationTests extends AbstractRetryTopicIntegrationTests {
100100

101101
private static final Logger logger = LoggerFactory.getLogger(RetryTopicIntegrationTests.class);
@@ -286,7 +286,7 @@ static class ThirdTopicListener {
286286
backoff = @Backoff(delay = 250, maxDelay = 1000, multiplier = 1.5),
287287
numPartitions = "#{3}",
288288
timeout = "${missing.property:2000}",
289-
include = MyRetryException.class, kafkaTemplate = "kafkaTemplate",
289+
include = MyRetryException.class, kafkaTemplate = "${kafka.template}",
290290
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
291291
concurrency = "1")
292292
@KafkaListener(id = "thirdTopicId", topics = THIRD_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY,
@@ -311,7 +311,7 @@ static class FourthTopicListener {
311311
CountDownLatchContainer container;
312312

313313
@RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "4", backoff = @Backoff(300),
314-
kafkaTemplate = "kafkaTemplate")
314+
kafkaTemplate = "${kafka.template}")
315315
@KafkaListener(topics = FOURTH_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
316316
public void listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
317317
logger.debug("Message {} received in topic {} ", message, receivedTopic);
@@ -337,7 +337,7 @@ static class FifthTopicListener1 {
337337
numPartitions = "2",
338338
retryTopicSuffix = "-listener1", dltTopicSuffix = "-listener1-dlt",
339339
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
340-
kafkaTemplate = "kafkaTemplate")
340+
kafkaTemplate = "${kafka.template}")
341341
@KafkaListener(id = "fifthTopicId1", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC,
342342
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))},
343343
containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
@@ -369,7 +369,7 @@ static class FifthTopicListener2 {
369369
numPartitions = "2",
370370
retryTopicSuffix = "-listener2", dltTopicSuffix = "-listener2-dlt",
371371
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
372-
kafkaTemplate = "kafkaTemplate")
372+
kafkaTemplate = "${kafka.template}")
373373
@KafkaListener(id = "fifthTopicId2", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC,
374374
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))},
375375
containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
@@ -397,7 +397,7 @@ static class NoRetryTopicListener {
397397

398398
@RetryableTopic(attempts = "3", numPartitions = "3", exclude = MyDontRetryException.class,
399399
backoff = @Backoff(delay = 50, maxDelay = 100, multiplier = 3),
400-
traversingCauses = "true", kafkaTemplate = "kafkaTemplate")
400+
traversingCauses = "true", kafkaTemplate = "${kafka.template}")
401401
@KafkaListener(topics = NOT_RETRYABLE_EXCEPTION_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
402402
public void listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
403403
container.countDownIfNotKnown(receivedTopic, container.countDownLatchNoRetry);
@@ -592,7 +592,7 @@ public ProducerFactory<String, String> producerFactory() {
592592
return new DefaultKafkaProducerFactory<>(configProps);
593593
}
594594

595-
@Bean
595+
@Bean("customKafkaTemplate")
596596
public KafkaTemplate<String, String> kafkaTemplate() {
597597
return new KafkaTemplate<>(producerFactory());
598598
}

0 commit comments

Comments
 (0)