Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan

private boolean async;

private boolean asyncExplicitlySet;

@Nullable
private String outputChannelName;

Expand Down Expand Up @@ -119,6 +121,7 @@ public void setOutputChannelName(String outputChannelName) {
*/
public final void setAsync(boolean async) {
this.async = async;
this.asyncExplicitlySet = true;
}

/**
Expand Down Expand Up @@ -213,6 +216,13 @@ protected void onInit() {
this.messagingTemplate.setBeanFactory(beanFactory);
}
this.messagingTemplate.setDestinationResolver(getChannelResolver());
setAsyncIfCan();
}

private void setAsyncIfCan() {
if (!this.asyncExplicitlySet) {
setAsync(this.outputChannel instanceof ReactiveStreamsSubscribableChannel);
}
}

@Override
Expand All @@ -222,6 +232,7 @@ public MessageChannel getOutputChannel() {
if (channelName != null) {
this.outputChannel = getChannelResolver().resolveDestination(channelName);
this.outputChannelName = null;
setAsyncIfCan();
}
return this.outputChannel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -112,7 +111,7 @@ IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)).autoStartup(false).id("r2dbcInboundChannelAdapter"))
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
.handle((p, h) -> p)
.channel(MessageChannels.flux())
.handleReactive(
R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
Expand Down
2 changes: 1 addition & 1 deletion src/reference/asciidoc/r2dbc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
.handle((p, h) -> p)
.channel(MessageChannels.flux())
.get();
}
Expand Down
2 changes: 1 addition & 1 deletion src/reference/asciidoc/reactive-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ See the next section for more information.

=== Reactive Reply Payload

When a reply producing `MessageHandler` returns a reactive type payload for a reply message, it is processed in an asynchronous manner with a regular `MessageChannel` implementation provided for the `outputChannel` and flattened with on demand subscription when the output channel is a `ReactiveStreamsSubscribableChannel` implementation, e.g. `FluxMessageChannel`.
When a reply producing `MessageHandler` returns a reactive type payload for a reply message, it is processed in an asynchronous manner with a regular `MessageChannel` implementation provided for the `outputChannel` (the `async` must be set to `true`) and flattened with on demand subscription when the output channel is a `ReactiveStreamsSubscribableChannel` implementation, e.g. `FluxMessageChannel`.
With a standard imperative `MessageChannel` use-case, and if a reply payload is a *multi-value* publisher (see `ReactiveAdapter.isMultiValue()` for more information), it is wrapped into a `Mono.just()`.
A result of this, the `Mono` has to be subscribed explicitly downstream or flattened by the `FluxMessageChannel` downstream.
With a `ReactiveStreamsSubscribableChannel` for the `outputChannel`, there is no need to be concerned about return type and subscription; everything is processed smoothly by the framework internally.
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/service-activator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ If the service completes the future with an `Exception`, normal error processing
An `ErrorMessage` is sent to the `errorChannel` message header, if present.
Otherwise, an `ErrorMessage` is sent to the default `errorChannel` (if available).

Starting with version 6.1, if the output channel of the `AbstractMessageProducingHandler` is configured to a `ReactiveStreamsSubscribableChannel`, the async mode is turned on by default.
If the handler result is not a reactive type or `CompletableFuture<?>`, then regular reply producing process happens despite the output channel type.

See also <<./reactive-streams.adoc#reactive-streams,Reactive Streams Support>> for more information.

[[service-activator-return-type]]
==== Service Activator and Method Return Type
Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ Since `IntegrationComponentSpec` is a `FactoryBean`, its bean definition must st
The Java DSL and the framework by itself will manage the `IntegrationComponentSpec` lifecycle.
See <<./dsl.adoc#java-dsl, Java DSL>> for more information.

- The `AbstractMessageProducingHandler` is marked as an `async` by default if its output channel is configured to a `ReactiveStreamsSubscribableChannel`.
See <<./service-activator.adoc#async-service-activator,Asynchronous Service Activator>> for more information.

[[x6.1-web-sockets]]
=== Web Sockets Changes

Expand Down