Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
Expand Down Expand Up @@ -132,7 +132,7 @@ public void setMessagingConverter(SmartMessageConverter messagingConverter) {
private String getReplyTopic() {
Method replyingMethod = getMethod();
if (replyingMethod != null) {
SendTo ann = AnnotationUtils.getAnnotation(replyingMethod, SendTo.class);
SendTo ann = AnnotatedElementUtils.findMergedAnnotation(replyingMethod, SendTo.class);
if (ann != null) {
if (replyingMethod.getReturnType().equals(void.class)) {
this.logger.warn(() -> "Method "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.kafka.KafkaException;
Expand Down Expand Up @@ -193,12 +193,12 @@ private void setupReplyTo(InvocableHandlerMethod handler) {
Method method = handler.getMethod();
SendTo ann = null;
if (method != null) {
ann = AnnotationUtils.getAnnotation(method, SendTo.class);
ann = AnnotatedElementUtils.findMergedAnnotation(method, SendTo.class);
replyTo = extractSendTo(method.toString(), ann);
}
if (ann == null) {
Class<?> beanType = handler.getBeanType();
ann = AnnotationUtils.getAnnotation(beanType, SendTo.class);
ann = AnnotatedElementUtils.findMergedAnnotation(beanType, SendTo.class);
replyTo = extractSendTo(beanType.getSimpleName(), ann);
}
if (ann != null && replyTo == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42" })
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42",
"annotated43", "annotated43reply"})
@TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10")
public class EnableKafkaIntegrationTests {

Expand Down Expand Up @@ -431,6 +432,15 @@ public void testInterface() throws Exception {
template.send("annotated7", 0, "foo");
template.flush();
assertThat(this.ifaceListener.getLatch1().await(60, TimeUnit.SECONDS)).isTrue();
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testInterface");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "annotated43reply");
template.send("annotated43", 0, "foo");
ConsumerRecord<Integer, String> reply = KafkaTestUtils.getSingleRecord(consumer, "annotated43reply");
assertThat(reply).extracting(rec -> rec.value()).isEqualTo("FOO");
consumer.close();
}

@Test
Expand Down Expand Up @@ -1373,8 +1383,8 @@ public MultiListenerNoDefault multiNoDefault() {
}

@Bean
public MultiListenerSendTo multiListenerSendTo() {
return new MultiListenerSendTo();
public MultiListenerSendToImpl multiListenerSendTo() {
return new MultiListenerSendToImpl();
}

@Bean
Expand Down Expand Up @@ -2296,6 +2306,10 @@ interface IfaceListener<T> {

void listen(T foo);

@SendTo("annotated43reply")
@KafkaListener(id = "ifcR", topics = "annotated43")
String reply(String in);

}

static class IfaceListenerImpl implements IfaceListener<String> {
Expand All @@ -2316,6 +2330,11 @@ public void listenTx(String foo) {
latch2.countDown();
}

@Override
public String reply(String in) {
return in.toUpperCase();
}

public CountDownLatch getLatch1() {
return latch1;
}
Expand Down Expand Up @@ -2422,15 +2441,25 @@ public void bar(@Valid ValidatedClass val) {

@KafkaListener(id = "multiSendTo", topics = "annotated25")
@SendTo("annotated25reply1")
static class MultiListenerSendTo {
interface MultiListenerSendTo {

@KafkaHandler
String foo(String in);

@KafkaHandler
@SendTo("!{'annotated25reply2'}")
String bar(KafkaNull nul, int key);

}

static class MultiListenerSendToImpl implements MultiListenerSendTo {

@Override
public String foo(String in) {
return in.toUpperCase();
}

@KafkaHandler
@SendTo("!{'annotated25reply2'}")
@Override
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
return "BAR";
Expand Down