Skip to content

Commit 304a1bf

Browse files
committed
* @RetryableTopic support annotated on Class.
* Support process `@RetryableTopic` from Class in `RetryTopicConfigurationProvider`. * Support process `@DltHandler` when `@RetryableTopic` annotated on the Class in `RetryableTopicAnnotationProcessor`. part of #3105
1 parent 65105b6 commit 304a1bf

File tree

5 files changed

+176
-58
lines changed

5 files changed

+176
-58
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,10 @@
1616

1717
package org.springframework.kafka.annotation;
1818

19+
import java.lang.reflect.AnnotatedElement;
1920
import java.lang.reflect.Method;
2021
import java.util.Map;
22+
import java.util.Objects;
2123

2224
import org.apache.commons.logging.LogFactory;
2325

@@ -53,6 +55,8 @@
5355
*
5456
* @author Tomaz Fernandes
5557
* @author Gary Russell
58+
* @author Wang Zhiyang
59+
*
5660
* @since 2.7
5761
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
5862
* @see RetryableTopic
@@ -96,19 +100,34 @@ public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory, @Nulla
96100
this.resolver = resolver;
97101
this.expressionContext = expressionContext;
98102
}
103+
99104
@Nullable
100105
public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) {
101-
RetryableTopic annotation = MergedAnnotations.from(method, SearchStrategy.TYPE_HIERARCHY,
102-
RepeatableContainers.none())
103-
.get(RetryableTopic.class)
104-
.synthesize(MergedAnnotation::isPresent)
105-
.orElse(null);
106+
return findRetryConfigurationFor(topics, method, null, bean);
107+
}
108+
109+
@Nullable
110+
public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, @Nullable Method method,
111+
@Nullable Class<?> clazz, Object bean) {
112+
113+
RetryableTopic annotation = getRetryableTopicAnnotationFromAnnotatedElement(
114+
Objects.requireNonNullElse(method, clazz));
115+
Class<?> declaringClass = method != null ? method.getDeclaringClass() : clazz;
106116
return annotation != null
107117
? new RetryableTopicAnnotationProcessor(this.beanFactory, this.resolver, this.expressionContext)
108-
.processAnnotation(topics, method, annotation, bean)
118+
.processAnnotation(topics, declaringClass, annotation, bean)
109119
: maybeGetFromContext(topics);
110120
}
111121

122+
@Nullable
123+
private RetryableTopic getRetryableTopicAnnotationFromAnnotatedElement(AnnotatedElement element) {
124+
return MergedAnnotations.from(element, SearchStrategy.TYPE_HIERARCHY,
125+
RepeatableContainers.none())
126+
.get(RetryableTopic.class)
127+
.synthesize(MergedAnnotation::isPresent)
128+
.orElse(null);
129+
}
130+
112131
@Nullable
113132
private RetryTopicConfiguration maybeGetFromContext(String[] topics) {
114133
if (this.beanFactory == null || !ListableBeanFactory.class.isAssignableFrom(this.beanFactory.getClass())) {

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*
4848
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
4949
*/
50-
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
50+
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE })
5151
@Retention(RetentionPolicy.RUNTIME)
5252
@Documented
5353
public @interface RetryableTopic {

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
* @author Tomaz Fernandes
6464
* @author Gary Russell
6565
* @author Adrian Chlebosz
66+
* @author Wang Zhiyang
67+
*
6668
* @since 2.7
6769
*
6870
*/
@@ -115,6 +117,13 @@ public RetryableTopicAnnotationProcessor(@Nullable BeanFactory beanFactory, @Nul
115117
public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation,
116118
Object bean) {
117119

120+
Class<?> clazz = method.getDeclaringClass();
121+
return processAnnotation(topics, clazz, annotation, bean);
122+
}
123+
124+
public RetryTopicConfiguration processAnnotation(String[] topics, Class<?> clazz, RetryableTopic annotation,
125+
Object bean) {
126+
118127
Long resolvedTimeout = resolveExpressionAsLong(annotation.timeout(), "timeout", false);
119128
long timeout = RetryTopicConstants.NOT_SET;
120129
if (resolvedTimeout != null) {
@@ -140,7 +149,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
140149
.customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory))
141150
.retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix"))
142151
.dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix"))
143-
.dltHandlerMethod(getDltProcessor(method, bean))
152+
.dltHandlerMethod(getDltProcessor(clazz, bean))
144153
.includeTopics(Arrays.asList(topics))
145154
.listenerFactory(resolveExpressionAsString(annotation.listenerContainerFactory(), "listenerContainerFactory"))
146155
.autoCreateTopics(resolveExpressionAsBoolean(annotation.autoCreateTopics(), "autoCreateTopics"),
@@ -218,9 +227,8 @@ private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnn
218227
.collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions())));
219228
}
220229

221-
private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) {
222-
Class<?> declaringClass = listenerMethod.getDeclaringClass();
223-
return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass))
230+
private EndpointHandlerMethod getDltProcessor(Class<?> clazz, Object bean) {
231+
return Arrays.stream(ReflectionUtils.getDeclaredMethods(clazz))
224232
.filter(method -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null)
225233
.map(method -> RetryTopicConfigurer.createHandlerMethodWith(bean, method))
226234
.findFirst()

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.times;
2727

28-
import java.lang.annotation.ElementType;
2928
import java.lang.annotation.Retention;
3029
import java.lang.annotation.RetentionPolicy;
31-
import java.lang.annotation.Target;
3230
import java.lang.reflect.Method;
3331
import java.util.Collections;
3432

@@ -96,10 +94,13 @@ void shouldProvideFromAnnotation() {
9694
// given
9795
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
9896
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, annotatedMethod, bean);
97+
RetryTopicConfiguration configurationFromClass = provider
98+
.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
9999

100100
// then
101101
then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class);
102-
102+
assertThat(configuration).isNotNull();
103+
assertThat(configurationFromClass).isNotNull();
103104
}
104105

105106
@Test
@@ -113,10 +114,13 @@ void shouldProvideFromBeanFactory() {
113114
// given
114115
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
115116
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
117+
RetryTopicConfiguration configurationFromClass = provider
118+
.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);
116119

117120
// then
118-
then(this.beanFactory).should(times(1)).getBeansOfType(RetryTopicConfiguration.class);
121+
then(this.beanFactory).should(times(2)).getBeansOfType(RetryTopicConfiguration.class);
119122
assertThat(configuration).isEqualTo(retryTopicConfiguration);
123+
assertThat(configurationFromClass).isEqualTo(retryTopicConfiguration);
120124

121125
}
122126

@@ -131,10 +135,13 @@ void shouldFindNone() {
131135
// given
132136
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
133137
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
138+
RetryTopicConfiguration configurationFromClass = provider
139+
.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);
134140

135141
// then
136-
then(this.beanFactory).should(times(1)).getBeansOfType(RetryTopicConfiguration.class);
142+
then(this.beanFactory).should(times(2)).getBeansOfType(RetryTopicConfiguration.class);
137143
assertThat(configuration).isNull();
144+
assertThat(configurationFromClass).isNull();
138145

139146
}
140147

@@ -147,10 +154,15 @@ void shouldProvideFromMetaAnnotation() {
147154
// given
148155
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
149156
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, metaAnnotatedMethod, bean);
157+
RetryTopicConfiguration configurationFromClass = provider
158+
.findRetryConfigurationFor(topics, null, MetaAnnotatedClass.class, bean);
150159

151160
// then
152161
then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class);
162+
assertThat(configuration).isNotNull();
153163
assertThat(configuration.getConcurrency()).isEqualTo(3);
164+
assertThat(configurationFromClass).isNotNull();
165+
assertThat(configurationFromClass.getConcurrency()).isEqualTo(3);
154166

155167
}
156168

@@ -160,9 +172,12 @@ void shouldNotConfigureIfBeanFactoryNull() {
160172
// given
161173
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(null);
162174
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
175+
RetryTopicConfiguration configurationFromClass
176+
= provider.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);
163177

164178
// then
165179
assertThat(configuration).isNull();
180+
assertThat(configurationFromClass).isNull();
166181

167182
}
168183

@@ -175,7 +190,6 @@ public void nonAnnotatedMethod() {
175190
// NoOps
176191
}
177192

178-
@Target({ElementType.METHOD})
179193
@Retention(RetentionPolicy.RUNTIME)
180194
@RetryableTopic
181195
@interface MetaAnnotatedRetryableTopic {
@@ -187,4 +201,19 @@ public void nonAnnotatedMethod() {
187201
public void metaAnnotatedMethod() {
188202
// NoOps
189203
}
204+
205+
@RetryableTopic
206+
public static class AnnotatedClass {
207+
// NoOps
208+
}
209+
210+
public static class NonAnnotatedClass {
211+
// NoOps
212+
}
213+
214+
@MetaAnnotatedRetryableTopic
215+
public static class MetaAnnotatedClass {
216+
// NoOps
217+
}
218+
190219
}

0 commit comments

Comments
 (0)