Skip to content

GH-3763: Add handleReactive() for Java DSL #8605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.handler.MessageTriggerAction;
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
Expand All @@ -91,6 +92,7 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.InterceptableChannel;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -891,7 +893,8 @@ public <P> B filter(@Nullable Class<P> expectedType, GenericSelector<P> genericS

/**
* Populate a {@link ServiceActivatingHandler} for the selected protocol specific
* {@link MessageHandler} implementation from {@code Namespace Factory}:
* {@link MessageHandler} implementation
* from the respective namespace factory (e.g. {@code Http, Kafka, Files}):
* <pre class="code">
* {@code
* .handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey"))
Expand Down Expand Up @@ -1097,7 +1100,8 @@ public B handle(MessageProcessorSpec<?> messageProcessorSpec,

/**
* Populate a {@link ServiceActivatingHandler} for the selected protocol specific
* {@link MessageHandler} implementation from {@code Namespace Factory}:
* {@link MessageHandler} implementation
* from the respective namespace factory (e.g. {@code Http, Kafka, Files}).
* In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
* Typically, used with a Lambda expression:
* <pre class="code">
Expand Down Expand Up @@ -1221,7 +1225,7 @@ public B enrich(Consumer<EnricherSpec> enricherConfigurer) {
* Populate a {@link MessageTransformingHandler} for
* a {@link org.springframework.integration.transformer.HeaderEnricher}
* using header values from provided {@link MapBuilder}.
* Can be used together with {@code Namespace Factory}:
* Can be used together with a namespace factory:
* <pre class="code">
* {@code
* .enrichHeaders(Mail.headers()
Expand All @@ -1242,7 +1246,7 @@ public B enrichHeaders(MapBuilder<?, String, Object> headers) {
* a {@link org.springframework.integration.transformer.HeaderEnricher}
* using header values from provided {@link MapBuilder}.
* In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
* Can be used together with {@code Namespace Factory}:
* Can be used together with a namespace factory:
* <pre class="code">
* {@code
* .enrichHeaders(Mail.headers()
Expand Down Expand Up @@ -2916,6 +2920,68 @@ public IntegrationFlow nullChannel() {
.get();
}

/**
* Populate a terminal consumer endpoint for the selected protocol specific
* {@link MessageHandler} implementation
* from the respective namespace factory (e.g. {@code Http, Kafka, Files}).
* In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
* @param messageHandlerSpec the {@link MessageHandlerSpec} to configure the protocol specific
* {@link MessageHandler}.
* @param <H> the {@link MessageHandler} type.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @since 6.1
*/
public <H extends ReactiveMessageHandler> IntegrationFlow handleReactive(
ReactiveMessageHandlerSpec<?, H> messageHandlerSpec) {

return handleReactive(messageHandlerSpec, null);
}

/**
* Populate a terminal consumer endpoint for the selected protocol specific
* {@link MessageHandler} implementation
* from the respective namespace factory (e.g. {@code Http, Kafka, Files}).
* In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
* @param messageHandlerSpec the {@link MessageHandlerSpec} to configure the protocol specific
* {@link MessageHandler}.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <H> the {@link MessageHandler} type.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @since 6.1
*/
public <H extends ReactiveMessageHandler> IntegrationFlow handleReactive(
ReactiveMessageHandlerSpec<?, H> messageHandlerSpec,
@Nullable Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer) {

return
addComponents(messageHandlerSpec.getComponentsToRegister()).
handleReactive(messageHandlerSpec.getObject().getDelegate(), endpointConfigurer);
}

/**
* Add a {@link ReactiveMessageHandler} as a terminal {@link IntegrationFlow} operator.
* @param reactiveMessageHandler the {@link ReactiveMessageHandler} to finish the flow.
* @return The {@link IntegrationFlow} instance based on this definition.
* @since 6.1
*/
public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler) {
return handleReactive(reactiveMessageHandler, null);
}

/**
* Add a {@link ReactiveMessageHandler} as a terminal {@link IntegrationFlow} operator.
* @param reactiveMessageHandler the {@link ReactiveMessageHandler} to finish the flow.
* @param endpointConfigurer the {@link Consumer} to configure a target endpoint for the handler.
* @return The {@link IntegrationFlow} instance based on this definition.
* @since 6.1
*/
public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler,
@Nullable Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer) {

return handle(new ReactiveMessageHandlerAdapter(reactiveMessageHandler), endpointConfigurer)
.get();
}

/**
* Finish this flow with delegation to other {@link IntegrationFlow} instance.
* @param other the {@link IntegrationFlow} to compose with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
Expand Down Expand Up @@ -718,8 +717,7 @@ public MessageHandler loggingMessageHandler() {
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handle(new ReactiveMessageHandlerAdapter((message) -> Mono.just(message).log().then()))
.get();
.handleReactive((message) -> Mono.just(message).log().then());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway(
public IntegrationFlow reactiveStore() {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(REACTIVE_MONGO_DATABASE_FACTORY));
.handleReactive(MongoDb.reactiveOutboundChannelAdapter(REACTIVE_MONGO_DATABASE_FACTORY));
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -114,12 +114,12 @@ IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
e -> e.poller(p -> p.fixedDelay(100)).autoStartup(false).id("r2dbcInboundChannelAdapter"))
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
.channel(MessageChannels.flux())
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(2))
.values("{age:36}"))
.get();
.handleReactive(
R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(2))
.values("{age:36}"));
}

}
Expand Down
4 changes: 2 additions & 2 deletions src/reference/asciidoc/r2dbc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ With Java DSL a configuration for this channel adapter is like this:
====
[source, java]
----
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))
----
====
====
23 changes: 23 additions & 0 deletions src/reference/asciidoc/reactive-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,29 @@ However, when a `ReactiveStreamsConsumer` is involved in the flow (e.g. when cha
One of the out-of-the-box `ReactiveMessageHandler` implementation is a `ReactiveMongoDbStoringMessageHandler` for Outbound Channel Adapter.
See <<./mongodb.adoc#mongodb-reactive-channel-adapters,MongoDB Reactive Channel Adapters>> for more information.

Starting with version 6.1, the `IntegrationFlowDefinition` exposes a convenient `handleReactive(ReactiveMessageHandler)` terminal operator.
Any `ReactiveMessageHandler` implementation (even just a plain lambda using the `Mono` API) can be used for this operator.
The framework subscribes to the returned `Mono<Void>` automatically.
Here is a simple sample of possible configuration for this operator:

====
[source, java]
----
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
----
====

An overloaded version of this operator accepts a `Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>` to customize a consumer endpoint around the provided `ReactiveMessageHandler`.

In addition, a `ReactiveMessageHandlerSpec`-based variants are also provided.
In most cases they are used for protocol-specific channel adapter implementations.
See the next section following links to the target technologies with respective reactive channel adapters.

[[reactive-channel-adapters]]
=== Reactive Channel Adapters

Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ See <<./zip.adoc#zip,Zip Support>> for more information.
The `ContextHolderRequestHandlerAdvice` allows to store a value from a request message into some context around `MessageHandler` execution.
See <<./handler-advice.adoc#context-holder-advice, Context Holder Advice>> for more information.

[[x6.1-handle-reactive]]
==== The `handleReactive()` operator for Java DSL
The `IntegrationFlow` can now end with a convenient `handleReactive(ReactiveMessageHandler)` operator.
See <<./reactive-streams.adoc#reactive-message-handler, `ReactiveMessageHandler`>> for more information.

[[x6.1-general]]
=== General Changes

Expand Down