diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java index 3b457d338d4..719323c03d6 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java @@ -31,6 +31,9 @@ import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.AsyncRabbitTemplate; +import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; @@ -56,8 +59,10 @@ import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.annotation.Publisher; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.config.EnablePublisher; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlowBuilder; import org.springframework.integration.dsl.Transformers; @@ -68,6 +73,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.handler.annotation.Payload; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -87,7 +93,7 @@ @SpringJUnitConfig @RabbitAvailable(queues = {"amqpOutboundInput", "amqpReplyChannel", "asyncReplies", "defaultReplyTo", "si.dsl.test", "si.dsl.exception.test.dlq", - "si.dsl.conv.exception.test.dlq", "testTemplateChannelTransacted"}) + "si.dsl.conv.exception.test.dlq", "testTemplateChannelTransacted", "publisherQueue"}) @DirtiesContext public class AmqpTests { @@ -282,8 +288,23 @@ void streamContainer() { verify(customizer).accept(any(), any()); } + @Autowired + QueueChannel fromRabbitViaPublisher; + + @Test + void messageReceivedFromRabbitListenerViaPublisher() { + this.amqpTemplate.convertAndSend("publisherQueue", "test data"); + + Message receive = this.fromRabbitViaPublisher.receive(10_000); + assertThat(receive).isNotNull() + .extracting(Message::getPayload) + .isEqualTo("TEST DATA"); + } + @Configuration @EnableIntegration + @EnableRabbit + @EnablePublisher public static class ContextConfiguration { @Bean @@ -495,6 +516,27 @@ public AmqpHeaderMapper mapperOut() { return DefaultAmqpHeaderMapper.outboundMapper(); } + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + ConnectionFactory rabbitConnectionFactory) { + + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(rabbitConnectionFactory); + return factory; + } + + @Bean + QueueChannel fromRabbitViaPublisher() { + return new QueueChannel(); + } + + @RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue("publisherQueue")) + @Publisher("fromRabbitViaPublisher") + @Payload("#args.payload.toUpperCase()") + public void consumeForPublisher(String payload) { + + } + } } diff --git a/spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java b/spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java index e183b8f635b..5a3a7973fc8 100644 --- a/spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java +++ b/spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,8 +26,11 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.integration.annotation.Publisher; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.config.EnablePublisher; import org.springframework.integration.core.GenericHandler; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.MessageChannels; @@ -84,6 +87,9 @@ public static void setup() { @Autowired private ApplicationEvents applicationEvents; + @Autowired + QueueChannel eventFromPublisher; + @Test public void testEventsFlow() { assertThat(this.applicationEvents.stream(MessagingEvent.class)).isEmpty(); @@ -115,9 +121,17 @@ public void testDelayRescheduling() { assertThat(messageGroupStore.getMessageCountForAllMessageGroups()).isEqualTo(0); } + @Test + public void eventFromPublisherAnnotation() { + this.applicationContext.publishEvent(new TestApplicationEvent3()); + Message receive = this.eventFromPublisher.receive(10000); + assertThat(receive).isNotNull() + .extracting(Message::getPayload).isEqualTo("TestApplicationEvent3"); + } @Configuration @EnableIntegration + @EnablePublisher public static class ContextConfiguration { @Bean @@ -176,6 +190,17 @@ public IntegrationFlow delayFlow() { .channel(MessageChannels.queue("delayedResults")); } + @Bean + QueueChannel eventFromPublisher() { + return new QueueChannel(); + } + + @EventListener + @Publisher("eventFromPublisher") + public String publishEventToChannel(TestApplicationEvent3 testApplicationEvent3) { + return testApplicationEvent3.getSource().toString(); + } + } @SuppressWarnings("serial") @@ -196,4 +221,13 @@ private static final class TestApplicationEvent2 extends ApplicationEvent { } + @SuppressWarnings("serial") + private static final class TestApplicationEvent3 extends ApplicationEvent { + + TestApplicationEvent3() { + super("TestApplicationEvent3"); + } + + } + } diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 3307953fda7..2c64c9f7a35 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -253,6 +253,39 @@ In that regard, it is more similar to the JMS message-driven channel adapter. Starting with version 5.5, the `AmqpInboundChannelAdapter` can be configured with an `org.springframework.amqp.rabbit.retry.MessageRecoverer` strategy which is used in the `RecoveryCallback` when the retry operation is called internally. See `setMessageRecoverer()` JavaDocs for more information. +The `@Publisher` annotation also can be used in combination with a `@RabbitListener`: + +==== +[source, java] +---- +@Configuration +@EnableIntegration +@EnableRabbit +@EnablePublisher +public static class ContextConfiguration { + + @Bean + QueueChannel fromRabbitViaPublisher() { + return new QueueChannel(); + } + + @RabbitListener(queuesToDeclare = @Queue("publisherQueue")) + @Publisher("fromRabbitViaPublisher") + @Payload("#args.payload.toUpperCase()") + public void consumeForPublisher(String payload) { + + } + +} +---- +==== + +By default, the `@Publisher` AOP interceptor deals with a return value from a method call. +However, the return value from a `@RabbitListener` method is treated as an AMQP reply message. +Therefore, such an approach cannot be used together with a `@Publisher`, so a `@Payload` annotation with respective SpEL expression against method arguments is a recommended way for this combination. +See more information about the `@Publisher` in the <<./message-publishing.adoc#publisher-annotation, Annotation-driven Configuration>> section. + + [[amqp-debatching]] ==== Batched Messages diff --git a/src/reference/asciidoc/event.adoc b/src/reference/asciidoc/event.adoc index 5c0ee86e4d7..a54a670822e 100644 --- a/src/reference/asciidoc/event.adoc +++ b/src/reference/asciidoc/event.adoc @@ -177,3 +177,31 @@ public IntegrationFlow eventsOutFlow(ApplicationEventPublishingMessageHandler ev } ---- ==== + +The `@Publisher` annotation also can be used in combination with an `@EventListener`: + +==== +[source, java] +---- +@Configuration +@EnableIntegration +@EnablePublisher +public static class ContextConfiguration { + + @Bean + QueueChannel eventFromPublisher() { + return new QueueChannel(); + } + + @EventListener + @Publisher("eventFromPublisher") + public String publishEventToChannel(TestApplicationEvent3 testApplicationEvent3) { + return testApplicationEvent3.getSource().toString(); + } + +} +---- +==== + +In this case a return value of the event listener method is used as a payload for a `Message` to be published to that `eventFromPublisher` channel. +See more information about the `@Publisher` in the <<./message-publishing.adoc#publisher-annotation, Annotation-driven Configuration>> section.