Skip to content

Commit e38c4cb

Browse files
authored
GH-2282: Fix Multi RetryableTopic Same Topic
Resolves #2282

* Fix docs.
* Copyrights; author tags.
* Remove unnecessary method override in KLEAdapter.
* Fix new test after rebase.
* Fix test after rebase. Move `getEndpointId()` call to within `ListenerScope`.
1 parent 70b9143 commit e38c4cb

21 files changed

+335
-159
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,37 @@ public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProv
775775
----
776776
====
777777

778+
[[multi-retry]]
779+
==== Multiple Listeners, Same Topic(s)
780+
781+
Starting with version 3.0, it is now possible to configure multiple listeners on the same topic(s).
782+
In order to do this, you must use custom topic naming to isolate the retry topics from each other.
783+
This is best shown with an example:
784+
785+
====
786+
[source, java]
787+
----
788+
@RetryableTopic(...
789+
retryTopicSuffix = "-listener1", dltTopicSuffix = "-listener1-dlt",
790+
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
791+
@KafkaListener(id = "listener1", groupId = "group1", topics = TWO_LISTENERS_TOPIC, ...)
792+
void listen1(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
793+
...
794+
}
795+
796+
@RetryableTopic(...
797+
retryTopicSuffix = "-listener2", dltTopicSuffix = "-listener2-dlt",
798+
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
799+
@KafkaListener(id = "listener2", groupId = "group2", topics = TWO_LISTENERS_TOPIC, ...)
800+
void listen2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
801+
...
802+
}
803+
----
804+
====
805+
806+
The `topicSuffixingStrategy` is optional.
807+
The framework will configure and use a separate set of retry topics for each listener.
808+
778809
==== Dlt Strategies
779810

780811
The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails.

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ You can now set a different `concurrency` for the retry containers; by default,
4040

4141
See <<retry-config>> for more information.
4242

43+
You can now configure multiple `@RetryableTopic` listeners on the same topic in the same application context.
44+
Previously, this was not possible.
45+
See <<multi-retry>> for more information.
46+
4347
[[x30-lc-changes]]
4448
==== Listener Container Changes
4549

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ private void processMultiMethodListeners(Collection<KafkaListener> classLevelLis
468468
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
469469
String beanRef = classLevelListener.beanRef();
470470
this.listenerScope.addListener(beanRef, bean);
471+
endpoint.setId(getEndpointId(classLevelListener));
471472
processListener(endpoint, classLevelListener, bean, beanName, resolveTopics(classLevelListener),
472473
resolveTopicPartitions(classLevelListener));
473474
this.listenerScope.removeListener(beanRef);
@@ -478,6 +479,7 @@ protected void processKafkaListener(KafkaListener kafkaListener, Method method,
478479
Method methodToUse = checkProxy(method, bean);
479480
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
480481
endpoint.setMethod(methodToUse);
482+
endpoint.setId(getEndpointId(kafkaListener));
481483

482484
String beanRef = kafkaListener.beanRef();
483485
this.listenerScope.addListener(beanRef, bean);
@@ -627,7 +629,6 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
627629

628630
endpoint.setBean(bean);
629631
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
630-
endpoint.setId(getEndpointId(kafkaListener));
631632
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
632633
endpoint.setTopicPartitions(tps);
633634
endpoint.setTopics(topics);

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,8 @@ public void afterPropertiesSet() {
356356
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
357357
C instance = createContainerInstance(endpoint);
358358
JavaUtils.INSTANCE
359-
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
359+
.acceptIfNotNull(endpoint.getId(), instance::setBeanName)
360+
.acceptIfNotNull(endpoint.getMainListenerId(), instance::setMainListenerId);
360361
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
361362
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
362363
}

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
120120

121121
private String correlationHeaderName;
122122

123+
@Nullable
124+
private String mainListenerId;
125+
123126
@Override
124127
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
125128
this.beanFactory = beanFactory;
@@ -154,6 +157,16 @@ public void setId(String id) {
154157
this.id = id;
155158
}
156159

160+
public void setMainListenerId(String id) {
161+
this.mainListenerId = id;
162+
}
163+
164+
@Override
165+
@Nullable
166+
public String getMainListenerId() {
167+
return this.mainListenerId;
168+
}
169+
157170
@Nullable
158171
@Override
159172
public String getId() {

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,4 +155,14 @@ default byte[] getListenerInfo() {
155155
return null;
156156
}
157157

158+
/**
159+
* Return the main listener id if this container is for a retry topic.
160+
* @return the main listener id or null.
161+
* @since 3.0
162+
*/
163+
@Nullable
164+
default String getMainListenerId() {
165+
return null;
166+
}
167+
158168
}

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ public abstract class AbstractMessageListenerContainer<K, V>
119119

120120
private volatile boolean stoppedNormally = true;
121121

122+
@Nullable
123+
private String mainListenerId;
124+
122125
/**
123126
* Construct an instance with the provided factory and properties.
124127
* @param consumerFactory the factory.
@@ -376,6 +379,21 @@ public String getListenerId() {
376379
return this.beanName; // the container factory sets the bean name to the id attribute
377380
}
378381

382+
/**
383+
* Set the main listener id, if this container is for a retry topic.
384+
* @param id the id.
385+
* @since 3.0
386+
*/
387+
public void setMainListenerId(String id) {
388+
this.mainListenerId = id;
389+
}
390+
391+
@Override
392+
@Nullable
393+
public String getMainListenerId() {
394+
return this.mainListenerId;
395+
}
396+
379397
@Nullable
380398
@Override
381399
public byte[] getListenerInfo() {

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,17 @@ default String getListenerId() {
196196
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
197197
}
198198

199+
/**
200+
* The 'id' attribute of the main {@code @KafkaListener} container, if this container
201+
* is for a retry topic; null otherwise.
202+
* @return the id.
203+
* @since 3.0
204+
*/
205+
@Nullable
206+
default String getMainListenerId() {
207+
throw new UnsupportedOperationException("This container does not support retrieving the main listener id");
208+
}
209+
199210
/**
200211
* Get arbitrary static information that will be added to the
201212
* {@link KafkaHeaders#LISTENER_INFO} header of all records.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Set;
2525
import java.util.function.BiFunction;
2626
import java.util.function.Consumer;
27+
import java.util.function.Function;
2728

2829
import org.apache.commons.logging.LogFactory;
2930
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -158,10 +159,10 @@ public void alwaysLogListenerException() {
158159
}
159160

160161
@SuppressWarnings("unchecked")
161-
public DeadLetterPublishingRecoverer create() {
162+
public DeadLetterPublishingRecoverer create(String mainListenerId) {
163+
Assert.notNull(mainListenerId, "'listenerId' cannot be null");
162164
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(// NOSONAR anon. class size
163-
this::resolveTemplate,
164-
false, (this::resolveDestination)) {
165+
templateResolver(mainListenerId), false, destinationResolver(mainListenerId)) {
165166

166167
@Override
167168
protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
@@ -185,7 +186,8 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
185186
}
186187
};
187188

188-
recoverer.setHeadersFunction((consumerRecord, e) -> addHeaders(consumerRecord, e, getAttempts(consumerRecord)));
189+
recoverer.setHeadersFunction(
190+
(consumerRecord, e) -> addHeaders(mainListenerId, consumerRecord, e, getAttempts(consumerRecord)));
189191
if (this.headersFunction != null) {
190192
recoverer.addHeadersFunction(this.headersFunction);
191193
}
@@ -199,33 +201,35 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
199201
return recoverer;
200202
}
201203

202-
private KafkaOperations<?, ?> resolveTemplate(ProducerRecord<?, ?> outRecord) {
203-
return this.destinationTopicResolver
204-
.getDestinationTopicByName(outRecord.topic())
204+
private Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver(String mainListenerId) {
205+
return outRecord -> this.destinationTopicResolver
206+
.getDestinationTopicByName(mainListenerId, outRecord.topic())
205207
.getKafkaOperations();
206208
}
207209

208210
public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
209211
this.recovererCustomizer = customizer;
210212
}
211213

212-
private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e) {
213-
if (SeekUtils.isBackoffException(e)) {
214-
throw (NestedRuntimeException) e; // Necessary to not commit the offset and seek to current again
215-
}
214+
private BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver(String mainListenerId) {
215+
return (cr, ex) -> {
216+
if (SeekUtils.isBackoffException(ex)) {
217+
throw (NestedRuntimeException) ex; // Necessary to not commit the offset and seek to current again
218+
}
216219

217-
DestinationTopic nextDestination = this.destinationTopicResolver.resolveDestinationTopic(
218-
cr.topic(), getAttempts(cr), e, getOriginalTimestampHeaderLong(cr));
220+
DestinationTopic nextDestination = this.destinationTopicResolver.resolveDestinationTopic(mainListenerId,
221+
cr.topic(), getAttempts(cr), ex, getOriginalTimestampHeaderLong(cr));
219222

220-
LOGGER.debug(() -> "Resolved topic: " + (nextDestination.isNoOpsTopic()
221-
? "none"
222-
: nextDestination.getDestinationName()));
223+
LOGGER.debug(() -> "Resolved topic: " + (nextDestination.isNoOpsTopic()
224+
? "none"
225+
: nextDestination.getDestinationName()));
223226

224-
maybeLogListenerException(e, cr, nextDestination);
227+
maybeLogListenerException(ex, cr, nextDestination);
225228

226-
return nextDestination.isNoOpsTopic()
227-
? null
228-
: resolveTopicPartition(cr, nextDestination);
229+
return nextDestination.isNoOpsTopic()
230+
? null
231+
: resolveTopicPartition(cr, nextDestination);
232+
};
229233
}
230234

231235
private void maybeLogListenerException(Exception e, ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
@@ -299,25 +303,27 @@ else if (value.length == Integer.BYTES) {
299303
return 1;
300304
}
301305

302-
private Headers addHeaders(ConsumerRecord<?, ?> consumerRecord, Exception e, int attempts) {
306+
private Headers addHeaders(String mainListenerId, ConsumerRecord<?, ?> consumerRecord, Exception e, int attempts) {
303307
Headers headers = new RecordHeaders();
304308
byte[] originalTimestampHeader = getOriginalTimestampHeaderBytes(consumerRecord);
305309
headers.add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampHeader);
306310
headers.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS,
307311
ByteBuffer.wrap(new byte[Integer.BYTES]).putInt(attempts + 1).array());
308312
headers.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP,
309-
BigInteger.valueOf(getNextExecutionTimestamp(consumerRecord, e, originalTimestampHeader))
313+
BigInteger
314+
.valueOf(getNextExecutionTimestamp(mainListenerId, consumerRecord, e, originalTimestampHeader))
310315
.toByteArray());
311316
return headers;
312317
}
313318

314-
private long getNextExecutionTimestamp(ConsumerRecord<?, ?> consumerRecord, Exception e,
319+
private long getNextExecutionTimestamp(String mainListenerId, ConsumerRecord<?, ?> consumerRecord, Exception e,
315320
byte[] originalTimestampHeader) {
316321

317322
long originalTimestamp = new BigInteger(originalTimestampHeader).longValue();
318323
long failureTimestamp = getFailureTimestamp(e);
319-
long nextExecutionTimestamp = failureTimestamp + this.destinationTopicResolver
320-
.resolveDestinationTopic(consumerRecord.topic(), getAttempts(consumerRecord), e, originalTimestamp)
324+
long nextExecutionTimestamp = failureTimestamp + this.destinationTopicResolver
325+
.resolveDestinationTopic(mainListenerId, consumerRecord.topic(), getAttempts(consumerRecord), e,
326+
originalTimestamp)
321327
.getDestinationDelay();
322328
LOGGER.debug(() -> String.format("FailureTimestamp: %s, Original timestamp: %s, nextExecutionTimestamp: %s",
323329
failureTimestamp, originalTimestamp, nextExecutionTimestamp));

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
* Default implementation of the {@link DestinationTopicProcessor} interface.
2828
*
2929
* @author Tomaz Fernandes
30+
* @author Gary Russell
3031
* @since 2.7
3132
*
3233
*/
@@ -60,7 +61,8 @@ public void processRegisteredDestinations(Consumer<Collection<String>> topicsCal
6061
context
6162
.destinationsByTopicMap
6263
.values()
63-
.forEach(topicDestinations -> this.destinationTopicResolver.addDestinationTopics(topicDestinations));
64+
.forEach(topicDestinations -> this.destinationTopicResolver.addDestinationTopics(
65+
context.listenerId, topicDestinations));
6466
topicsCallback.accept(getAllTopicsNamesForThis(context));
6567
}
6668

0 commit comments

Comments (0)