Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
Expand All @@ -56,8 +59,10 @@
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.annotation.Publisher;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnablePublisher;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.Transformers;
Expand All @@ -68,6 +73,7 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
Expand All @@ -87,7 +93,7 @@
@SpringJUnitConfig
@RabbitAvailable(queues = {"amqpOutboundInput", "amqpReplyChannel", "asyncReplies",
"defaultReplyTo", "si.dsl.test", "si.dsl.exception.test.dlq",
"si.dsl.conv.exception.test.dlq", "testTemplateChannelTransacted"})
"si.dsl.conv.exception.test.dlq", "testTemplateChannelTransacted", "publisherQueue"})
@DirtiesContext
public class AmqpTests {

Expand Down Expand Up @@ -282,8 +288,23 @@ void streamContainer() {
verify(customizer).accept(any(), any());
}

@Autowired
QueueChannel fromRabbitViaPublisher;

@Test
void messageReceivedFromRabbitListenerViaPublisher() {
this.amqpTemplate.convertAndSend("publisherQueue", "test data");

Message<?> receive = this.fromRabbitViaPublisher.receive(10_000);
assertThat(receive).isNotNull()
.extracting(Message::getPayload)
.isEqualTo("TEST DATA");
}

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

@Bean
Expand Down Expand Up @@ -495,6 +516,27 @@ public AmqpHeaderMapper mapperOut() {
return DefaultAmqpHeaderMapper.outboundMapper();
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory rabbitConnectionFactory) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory);
return factory;
}

@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}

@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {

}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,8 +26,11 @@
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.Publisher;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnablePublisher;
import org.springframework.integration.core.GenericHandler;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
Expand Down Expand Up @@ -84,6 +87,9 @@ public static void setup() {
@Autowired
private ApplicationEvents applicationEvents;

@Autowired
QueueChannel eventFromPublisher;

@Test
public void testEventsFlow() {
assertThat(this.applicationEvents.stream(MessagingEvent.class)).isEmpty();
Expand Down Expand Up @@ -115,9 +121,17 @@ public void testDelayRescheduling() {
assertThat(messageGroupStore.getMessageCountForAllMessageGroups()).isEqualTo(0);
}

@Test
public void eventFromPublisherAnnotation() {
this.applicationContext.publishEvent(new TestApplicationEvent3());
Message<?> receive = this.eventFromPublisher.receive(10000);
assertThat(receive).isNotNull()
.extracting(Message::getPayload).isEqualTo("TestApplicationEvent3");
}

@Configuration
@EnableIntegration
@EnablePublisher
public static class ContextConfiguration {

@Bean
Expand Down Expand Up @@ -176,6 +190,17 @@ public IntegrationFlow delayFlow() {
.channel(MessageChannels.queue("delayedResults"));
}

@Bean
QueueChannel eventFromPublisher() {
return new QueueChannel();
}

@EventListener
@Publisher("eventFromPublisher")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this would work, do we really want to promote it? An ApplicationEventListeningMessageProducer is a much simpler approach.

public String publishEventToChannel(TestApplicationEvent3 testApplicationEvent3) {
return testApplicationEvent3.getSource().toString();
}

}

@SuppressWarnings("serial")
Expand All @@ -196,4 +221,13 @@ private static final class TestApplicationEvent2 extends ApplicationEvent {

}

@SuppressWarnings("serial")
private static final class TestApplicationEvent3 extends ApplicationEvent {

TestApplicationEvent3() {
super("TestApplicationEvent3");
}

}

}
33 changes: 33 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,39 @@ In that regard, it is more similar to the JMS message-driven channel adapter.
Starting with version 5.5, the `AmqpInboundChannelAdapter` can be configured with an `org.springframework.amqp.rabbit.retry.MessageRecoverer` strategy which is used in the `RecoveryCallback` when the retry operation is called internally.
See `setMessageRecoverer()` JavaDocs for more information.

The `@Publisher` annotation also can be used in combination with a `@RabbitListener`:

====
[source, java]
----
@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}

@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {

}

}
----
====

By default, the `@Publisher` AOP interceptor deals with a return value from a method call.
However, the return value from a `@RabbitListener` method is treated as an AMQP reply message.
Therefore, such an approach cannot be used together with a `@Publisher`, so a `@Payload` annotation with respective SpEL expression against method arguments is a recommended way for this combination.
See more information about the `@Publisher` in the <<./message-publishing.adoc#publisher-annotation, Annotation-driven Configuration>> section.


[[amqp-debatching]]
==== Batched Messages

Expand Down
28 changes: 28 additions & 0 deletions src/reference/asciidoc/event.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,31 @@ public IntegrationFlow eventsOutFlow(ApplicationEventPublishingMessageHandler ev
}
----
====

The `@Publisher` annotation also can be used in combination with an `@EventListener`:

====
[source, java]
----
@Configuration
@EnableIntegration
@EnablePublisher
public static class ContextConfiguration {

@Bean
QueueChannel eventFromPublisher() {
return new QueueChannel();
}

@EventListener
@Publisher("eventFromPublisher")
public String publishEventToChannel(TestApplicationEvent3 testApplicationEvent3) {
return testApplicationEvent3.getSource().toString();
}

}
----
====

In this case a return value of the event listener method is used as a payload for a `Message` to be published to that `eventFromPublisher` channel.
See more information about the `@Publisher` in the <<./message-publishing.adoc#publisher-annotation, Annotation-driven Configuration>> section.