Skip to content

Commit 59e676a

Browse files
authored
Demonstrate @Publisher with an @EventListener (#8603)
* Demonstrate `@Publisher` with an `@EventListener` The `@EventListener` from Spring Framework is a good tool for POJO configuration to subscribe to events from an `ApplicationContext`. The `@Publisher` is an AOP tool to publish a `Message` from POJO method call. * Add a test and docs to demonstrate how the `@Publisher` can be used together with an `@EventListener` * Add `@Publisher` with a `@RabbitListener` sample in docs * * Fix FQCN for `@Queue` in the `AmqpTests`
1 parent 90bc65e commit 59e676a

File tree

4 files changed

+139
-2
lines changed

4 files changed

+139
-2
lines changed

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import org.springframework.amqp.core.MessageDeliveryMode;
3232
import org.springframework.amqp.core.Queue;
3333
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
34+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
35+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
36+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
3437
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
3538
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
3639
import org.springframework.amqp.rabbit.core.RabbitAdmin;
@@ -56,8 +59,10 @@
5659
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
5760
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
5861
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
62+
import org.springframework.integration.annotation.Publisher;
5963
import org.springframework.integration.channel.QueueChannel;
6064
import org.springframework.integration.config.EnableIntegration;
65+
import org.springframework.integration.config.EnablePublisher;
6166
import org.springframework.integration.dsl.IntegrationFlow;
6267
import org.springframework.integration.dsl.IntegrationFlowBuilder;
6368
import org.springframework.integration.dsl.Transformers;
@@ -68,6 +73,7 @@
6873
import org.springframework.messaging.Message;
6974
import org.springframework.messaging.MessageChannel;
7075
import org.springframework.messaging.PollableChannel;
76+
import org.springframework.messaging.handler.annotation.Payload;
7177
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
7278
import org.springframework.test.annotation.DirtiesContext;
7379
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -87,7 +93,7 @@
8793
@SpringJUnitConfig
8894
@RabbitAvailable(queues = {"amqpOutboundInput", "amqpReplyChannel", "asyncReplies",
8995
"defaultReplyTo", "si.dsl.test", "si.dsl.exception.test.dlq",
90-
"si.dsl.conv.exception.test.dlq", "testTemplateChannelTransacted"})
96+
"si.dsl.conv.exception.test.dlq", "testTemplateChannelTransacted", "publisherQueue"})
9197
@DirtiesContext
9298
public class AmqpTests {
9399

@@ -282,8 +288,23 @@ void streamContainer() {
282288
verify(customizer).accept(any(), any());
283289
}
284290

291+
@Autowired
292+
QueueChannel fromRabbitViaPublisher;
293+
294+
@Test
295+
void messageReceivedFromRabbitListenerViaPublisher() {
296+
this.amqpTemplate.convertAndSend("publisherQueue", "test data");
297+
298+
Message<?> receive = this.fromRabbitViaPublisher.receive(10_000);
299+
assertThat(receive).isNotNull()
300+
.extracting(Message::getPayload)
301+
.isEqualTo("TEST DATA");
302+
}
303+
285304
@Configuration
286305
@EnableIntegration
306+
@EnableRabbit
307+
@EnablePublisher
287308
public static class ContextConfiguration {
288309

289310
@Bean
@@ -495,6 +516,27 @@ public AmqpHeaderMapper mapperOut() {
495516
return DefaultAmqpHeaderMapper.outboundMapper();
496517
}
497518

519+
@Bean
520+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
521+
ConnectionFactory rabbitConnectionFactory) {
522+
523+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
524+
factory.setConnectionFactory(rabbitConnectionFactory);
525+
return factory;
526+
}
527+
528+
@Bean
529+
QueueChannel fromRabbitViaPublisher() {
530+
return new QueueChannel();
531+
}
532+
533+
@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue("publisherQueue"))
534+
@Publisher("fromRabbitViaPublisher")
535+
@Payload("#args.payload.toUpperCase()")
536+
public void consumeForPublisher(String payload) {
537+
538+
}
539+
498540
}
499541

500542
}

spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,8 +26,11 @@
2626
import org.springframework.context.ApplicationListener;
2727
import org.springframework.context.annotation.Bean;
2828
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.context.event.EventListener;
30+
import org.springframework.integration.annotation.Publisher;
2931
import org.springframework.integration.channel.QueueChannel;
3032
import org.springframework.integration.config.EnableIntegration;
33+
import org.springframework.integration.config.EnablePublisher;
3134
import org.springframework.integration.core.GenericHandler;
3235
import org.springframework.integration.dsl.IntegrationFlow;
3336
import org.springframework.integration.dsl.MessageChannels;
@@ -84,6 +87,9 @@ public static void setup() {
8487
@Autowired
8588
private ApplicationEvents applicationEvents;
8689

90+
@Autowired
91+
QueueChannel eventFromPublisher;
92+
8793
@Test
8894
public void testEventsFlow() {
8995
assertThat(this.applicationEvents.stream(MessagingEvent.class)).isEmpty();
@@ -115,9 +121,17 @@ public void testDelayRescheduling() {
115121
assertThat(messageGroupStore.getMessageCountForAllMessageGroups()).isEqualTo(0);
116122
}
117123

124+
@Test
125+
public void eventFromPublisherAnnotation() {
126+
this.applicationContext.publishEvent(new TestApplicationEvent3());
127+
Message<?> receive = this.eventFromPublisher.receive(10000);
128+
assertThat(receive).isNotNull()
129+
.extracting(Message::getPayload).isEqualTo("TestApplicationEvent3");
130+
}
118131

119132
@Configuration
120133
@EnableIntegration
134+
@EnablePublisher
121135
public static class ContextConfiguration {
122136

123137
@Bean
@@ -176,6 +190,17 @@ public IntegrationFlow delayFlow() {
176190
.channel(MessageChannels.queue("delayedResults"));
177191
}
178192

193+
@Bean
194+
QueueChannel eventFromPublisher() {
195+
return new QueueChannel();
196+
}
197+
198+
@EventListener
199+
@Publisher("eventFromPublisher")
200+
public String publishEventToChannel(TestApplicationEvent3 testApplicationEvent3) {
201+
return testApplicationEvent3.getSource().toString();
202+
}
203+
179204
}
180205

181206
@SuppressWarnings("serial")
@@ -196,4 +221,13 @@ private static final class TestApplicationEvent2 extends ApplicationEvent {
196221

197222
}
198223

224+
@SuppressWarnings("serial")
225+
private static final class TestApplicationEvent3 extends ApplicationEvent {
226+
227+
TestApplicationEvent3() {
228+
super("TestApplicationEvent3");
229+
}
230+
231+
}
232+
199233
}

src/reference/asciidoc/amqp.adoc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,39 @@ In that regard, it is more similar to the JMS message-driven channel adapter.
253253
Starting with version 5.5, the `AmqpInboundChannelAdapter` can be configured with an `org.springframework.amqp.rabbit.retry.MessageRecoverer` strategy which is used in the `RecoveryCallback` when the retry operation is called internally.
254254
See `setMessageRecoverer()` JavaDocs for more information.
255255

256+
The `@Publisher` annotation also can be used in combination with a `@RabbitListener`:
257+
258+
====
259+
[source, java]
260+
----
261+
@Configuration
262+
@EnableIntegration
263+
@EnableRabbit
264+
@EnablePublisher
265+
public static class ContextConfiguration {
266+
267+
@Bean
268+
QueueChannel fromRabbitViaPublisher() {
269+
return new QueueChannel();
270+
}
271+
272+
@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
273+
@Publisher("fromRabbitViaPublisher")
274+
@Payload("#args.payload.toUpperCase()")
275+
public void consumeForPublisher(String payload) {
276+
277+
}
278+
279+
}
280+
----
281+
====
282+
283+
By default, the `@Publisher` AOP interceptor deals with a return value from a method call.
284+
However, the return value from a `@RabbitListener` method is treated as an AMQP reply message.
285+
Therefore, such an approach cannot be used together with a `@Publisher`, so a `@Payload` annotation with respective SpEL expression against method arguments is a recommended way for this combination.
286+
See more information about the `@Publisher` in the <<./message-publishing.adoc#publisher-annotation, Annotation-driven Configuration>> section.
287+
288+
256289
[[amqp-debatching]]
257290
==== Batched Messages
258291

src/reference/asciidoc/event.adoc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,3 +177,31 @@ public IntegrationFlow eventsOutFlow(ApplicationEventPublishingMessageHandler ev
177177
}
178178
----
179179
====
180+
181+
The `@Publisher` annotation also can be used in combination with an `@EventListener`:
182+
183+
====
184+
[source, java]
185+
----
186+
@Configuration
187+
@EnableIntegration
188+
@EnablePublisher
189+
public static class ContextConfiguration {
190+
191+
@Bean
192+
QueueChannel eventFromPublisher() {
193+
return new QueueChannel();
194+
}
195+
196+
@EventListener
197+
@Publisher("eventFromPublisher")
198+
public String publishEventToChannel(TestApplicationEvent3 testApplicationEvent3) {
199+
return testApplicationEvent3.getSource().toString();
200+
}
201+
202+
}
203+
----
204+
====
205+
206+
In this case a return value of the event listener method is used as a payload for a `Message` to be published to that `eventFromPublisher` channel.
207+
See more information about the `@Publisher` in the <<./message-publishing.adoc#publisher-annotation, Annotation-driven Configuration>> section.

0 commit comments

Comments
 (0)