diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index a96ba7c88fe..20f139c1215 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -75,6 +75,8 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan private boolean async; + private boolean asyncExplicitlySet; + @Nullable private String outputChannelName; @@ -119,6 +121,7 @@ public void setOutputChannelName(String outputChannelName) { */ public final void setAsync(boolean async) { this.async = async; + this.asyncExplicitlySet = true; } /** @@ -213,6 +216,13 @@ protected void onInit() { this.messagingTemplate.setBeanFactory(beanFactory); } this.messagingTemplate.setDestinationResolver(getChannelResolver()); + setAsyncIfCan(); + } + + private void setAsyncIfCan() { + if (!this.asyncExplicitlySet) { + setAsync(this.outputChannel instanceof ReactiveStreamsSubscribableChannel); + } } @Override @@ -222,6 +232,7 @@ public MessageChannel getOutputChannel() { if (channelName != null) { this.outputChannel = getChannelResolver().resolveDestination(channelName); this.outputChannelName = null; + setAsyncIfCan(); } return this.outputChannel; } diff --git a/spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java b/spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java index 53706c15627..65ab7fbf38f 100644 --- a/spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java +++ b/spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java @@ -22,7 +22,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import org.springframework.beans.factory.annotation.Autowired; @@ -112,7 +111,7 @@ IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) { .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId())), e -> e.poller(p -> p.fixedDelay(100)).autoStartup(false).id("r2dbcInboundChannelAdapter")) - .>handle((p, h) -> p, e -> e.async(true)) + .handle((p, h) -> p) .channel(MessageChannels.flux()) .handleReactive( R2dbc.outboundChannelAdapter(r2dbcEntityTemplate) diff --git a/src/reference/asciidoc/r2dbc.adoc b/src/reference/asciidoc/r2dbc.adoc index 2f9e90cd9fc..0bdd8b84231 100644 --- a/src/reference/asciidoc/r2dbc.adoc +++ b/src/reference/asciidoc/r2dbc.adoc @@ -71,7 +71,7 @@ IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) { .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId())), e -> e.poller(p -> p.fixedDelay(100))) - .>handle((p, h) -> p, e -> e.async(true)) + .handle((p, h) -> p) .channel(MessageChannels.flux()) .get(); } diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index 070cdd8d2bf..574002dcef2 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -42,7 +42,7 @@ See the next section for more information. === Reactive Reply Payload -When a reply producing `MessageHandler` returns a reactive type payload for a reply message, it is processed in an asynchronous manner with a regular `MessageChannel` implementation provided for the `outputChannel` and flattened with on demand subscription when the output channel is a `ReactiveStreamsSubscribableChannel` implementation, e.g. `FluxMessageChannel`. +When a reply producing `MessageHandler` returns a reactive type payload for a reply message, it is processed in an asynchronous manner with a regular `MessageChannel` implementation provided for the `outputChannel` (the `async` must be set to `true`) and flattened with on demand subscription when the output channel is a `ReactiveStreamsSubscribableChannel` implementation, e.g. `FluxMessageChannel`. With a standard imperative `MessageChannel` use-case, and if a reply payload is a *multi-value* publisher (see `ReactiveAdapter.isMultiValue()` for more information), it is wrapped into a `Mono.just()`. A result of this, the `Mono` has to be subscribed explicitly downstream or flattened by the `FluxMessageChannel` downstream. With a `ReactiveStreamsSubscribableChannel` for the `outputChannel`, there is no need to be concerned about return type and subscription; everything is processed smoothly by the framework internally. diff --git a/src/reference/asciidoc/service-activator.adoc b/src/reference/asciidoc/service-activator.adoc index d8f84509e51..3b76c99878b 100644 --- a/src/reference/asciidoc/service-activator.adoc +++ b/src/reference/asciidoc/service-activator.adoc @@ -225,6 +225,10 @@ If the service completes the future with an `Exception`, normal error processing An `ErrorMessage` is sent to the `errorChannel` message header, if present. Otherwise, an `ErrorMessage` is sent to the default `errorChannel` (if available). +Starting with version 6.1, if the output channel of the `AbstractMessageProducingHandler` is configured to a `ReactiveStreamsSubscribableChannel`, the async mode is turned on by default. +If the handler result is not a reactive type or `CompletableFuture`, then regular reply producing process happens despite the output channel type. + +See also <<./reactive-streams.adoc#reactive-streams,Reactive Streams Support>> for more information. [[service-activator-return-type]] ==== Service Activator and Method Return Type diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 3c313954e90..0ea1a793362 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -51,6 +51,9 @@ Since `IntegrationComponentSpec` is a `FactoryBean`, its bean definition must st The Java DSL and the framework by itself will manage the `IntegrationComponentSpec` lifecycle. See <<./dsl.adoc#java-dsl, Java DSL>> for more information. + - The `AbstractMessageProducingHandler` is marked as an `async` by default if its output channel is configured to a `ReactiveStreamsSubscribableChannel`. +See <<./service-activator.adoc#async-service-activator,Asynchronous Service Activator>> for more information. + [[x6.1-web-sockets]] === Web Sockets Changes