diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index f623d4c423b..abd762cc622 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -65,6 +65,7 @@ import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.handler.MessageProcessor; import org.springframework.integration.handler.MessageTriggerAction; +import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; import org.springframework.integration.handler.ServiceActivatingHandler; import org.springframework.integration.router.AbstractMessageRouter; import org.springframework.integration.router.ErrorMessageExceptionTypeRouter; @@ -91,6 +92,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.ReactiveMessageHandler; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.InterceptableChannel; import org.springframework.util.Assert; @@ -891,7 +893,8 @@ public
B filter(@Nullable Class
expectedType, GenericSelector
genericS /** * Populate a {@link ServiceActivatingHandler} for the selected protocol specific - * {@link MessageHandler} implementation from {@code Namespace Factory}: + * {@link MessageHandler} implementation + * from the respective namespace factory (e.g. {@code Http, Kafka, Files}): *
* {@code * .handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey")) @@ -1097,7 +1100,8 @@ public B handle(MessageProcessorSpec> messageProcessorSpec, /** * Populate a {@link ServiceActivatingHandler} for the selected protocol specific - * {@link MessageHandler} implementation from {@code Namespace Factory}: + * {@link MessageHandler} implementation + * from the respective namespace factory (e.g. {@code Http, Kafka, Files}). * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. * Typically, used with a Lambda expression: *@@ -1221,7 +1225,7 @@ public B enrich(ConsumerenricherConfigurer) { * Populate a {@link MessageTransformingHandler} for * a {@link org.springframework.integration.transformer.HeaderEnricher} * using header values from provided {@link MapBuilder}. - * Can be used together with {@code Namespace Factory}: + * Can be used together with a namespace factory: * * {@code * .enrichHeaders(Mail.headers() @@ -1242,7 +1246,7 @@ public B enrichHeaders(MapBuilder, String, Object> headers) { * a {@link org.springframework.integration.transformer.HeaderEnricher} * using header values from provided {@link MapBuilder}. * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. - * Can be used together with {@code Namespace Factory}: + * Can be used together with a namespace factory: ** {@code * .enrichHeaders(Mail.headers() @@ -2916,6 +2920,68 @@ public IntegrationFlow nullChannel() { .get(); } + /** + * Populate a terminal consumer endpoint for the selected protocol specific + * {@link MessageHandler} implementation + * from the respective namespace factory (e.g. {@code Http, Kafka, Files}). + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageHandlerSpec the {@link MessageHandlerSpec} to configure the protocol specific + * {@link MessageHandler}. + * @paramthe {@link MessageHandler} type. + * @return the current {@link BaseIntegrationFlowDefinition}. + * @since 6.1 + */ + public IntegrationFlow handleReactive( + ReactiveMessageHandlerSpec, H> messageHandlerSpec) { + + return handleReactive(messageHandlerSpec, null); + } + + /** + * Populate a terminal consumer endpoint for the selected protocol specific + * {@link MessageHandler} implementation + * from the respective namespace factory (e.g. {@code Http, Kafka, Files}). + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageHandlerSpec the {@link MessageHandlerSpec} to configure the protocol specific + * {@link MessageHandler}. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param the {@link MessageHandler} type. + * @return the current {@link BaseIntegrationFlowDefinition}. + * @since 6.1 + */ + public IntegrationFlow handleReactive( + ReactiveMessageHandlerSpec, H> messageHandlerSpec, + @Nullable Consumer > endpointConfigurer) { + + return + addComponents(messageHandlerSpec.getComponentsToRegister()). + handleReactive(messageHandlerSpec.getObject().getDelegate(), endpointConfigurer); + } + + /** + * Add a {@link ReactiveMessageHandler} as a terminal {@link IntegrationFlow} operator. + * @param reactiveMessageHandler the {@link ReactiveMessageHandler} to finish the flow. + * @return The {@link IntegrationFlow} instance based on this definition. + * @since 6.1 + */ + public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler) { + return handleReactive(reactiveMessageHandler, null); + } + + /** + * Add a {@link ReactiveMessageHandler} as a terminal {@link IntegrationFlow} operator. + * @param reactiveMessageHandler the {@link ReactiveMessageHandler} to finish the flow. + * @param endpointConfigurer the {@link Consumer} to configure a target endpoint for the handler. + * @return The {@link IntegrationFlow} instance based on this definition. + * @since 6.1 + */ + public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler, + @Nullable Consumer > endpointConfigurer) { + + return handle(new ReactiveMessageHandlerAdapter(reactiveMessageHandler), endpointConfigurer) + .get(); + } + /** * Finish this flow with delegation to other {@link IntegrationFlow} instance. * @param other the {@link IntegrationFlow} to compose with. diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java index c8c79f2df52..7260d897066 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java @@ -70,7 +70,6 @@ import org.springframework.integration.gateway.GatewayProxyFactoryBean; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.LoggingHandler; -import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice; import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; @@ -718,8 +717,7 @@ public MessageHandler loggingMessageHandler() { public IntegrationFlow wireTapFlow1() { return IntegrationFlow.from("tappedChannel1") .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo"))) - .handle(new ReactiveMessageHandlerAdapter((message) -> Mono.just(message).log().then())) - .get(); + .handleReactive((message) -> Mono.just(message).log().then()); } @Bean diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java index d4237c39999..d29f289aee0 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java @@ -425,7 +425,7 @@ private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway( public IntegrationFlow reactiveStore() { return f -> f .channel(MessageChannels.flux()) - .handle(MongoDb.reactiveOutboundChannelAdapter(REACTIVE_MONGO_DATABASE_FACTORY)); + .handleReactive(MongoDb.reactiveOutboundChannelAdapter(REACTIVE_MONGO_DATABASE_FACTORY)); } } diff --git a/spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java b/spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java index 07118b0e11f..53706c15627 100644 --- a/spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java +++ b/spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,12 +114,12 @@ IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) { e -> e.poller(p -> p.fixedDelay(100)).autoStartup(false).id("r2dbcInboundChannelAdapter")) . >handle((p, h) -> p, e -> e.async(true)) .channel(MessageChannels.flux()) - .handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate) - .queryType(R2dbcMessageHandler.Type.UPDATE) - .tableNameExpression("payload.class.simpleName") - .criteria((message) -> Criteria.where("id").is(2)) - .values("{age:36}")) - .get(); + .handleReactive( + R2dbc.outboundChannelAdapter(r2dbcEntityTemplate) + .queryType(R2dbcMessageHandler.Type.UPDATE) + .tableNameExpression("payload.class.simpleName") + .criteria((message) -> Criteria.where("id").is(2)) + .values("{age:36}")); } } diff --git a/src/reference/asciidoc/r2dbc.adoc b/src/reference/asciidoc/r2dbc.adoc index b4e905bfef1..2f9e90cd9fc 100644 --- a/src/reference/asciidoc/r2dbc.adoc +++ b/src/reference/asciidoc/r2dbc.adoc @@ -110,10 +110,10 @@ With Java DSL a configuration for this channel adapter is like this: ==== [source, java] ---- -.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate) +.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate) .queryType(R2dbcMessageHandler.Type.UPDATE) .tableNameExpression("payload.class.simpleName") .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId"))) .values("{age:36}")) ---- -==== \ No newline at end of file +==== diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index ad4f7333e9c..070cdd8d2bf 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -154,6 +154,29 @@ However, when a `ReactiveStreamsConsumer` is involved in the flow (e.g. when cha One of the out-of-the-box `ReactiveMessageHandler` implementation is a `ReactiveMongoDbStoringMessageHandler` for Outbound Channel Adapter. See <<./mongodb.adoc#mongodb-reactive-channel-adapters,MongoDB Reactive Channel Adapters>> for more information. +Starting with version 6.1, the `IntegrationFlowDefinition` exposes a convenient `handleReactive(ReactiveMessageHandler)` terminal operator. +Any `ReactiveMessageHandler` implementation (even just a plain lambda using the `Mono` API) can be used for this operator. +The framework subscribes to the returned `Mono ` automatically. +Here is a simple sample of possible configuration for this operator: + +==== +[source, java] +---- +@Bean +public IntegrationFlow wireTapFlow1() { + return IntegrationFlow.from("tappedChannel1") + .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo"))) + .handleReactive((message) -> Mono.just(message).log().then()); +} +---- +==== + +An overloaded version of this operator accepts a `Consumer >` to customize a consumer endpoint around the provided `ReactiveMessageHandler`. + +In addition, a `ReactiveMessageHandlerSpec`-based variants are also provided. +In most cases they are used for protocol-specific channel adapter implementations. +See the next section following links to the target technologies with respective reactive channel adapters. + [[reactive-channel-adapters]] === Reactive Channel Adapters diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index c5b60ae53fb..b0dc787b624 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -29,6 +29,11 @@ See <<./zip.adoc#zip,Zip Support>> for more information. The `ContextHolderRequestHandlerAdvice` allows to store a value from a request message into some context around `MessageHandler` execution. See <<./handler-advice.adoc#context-holder-advice, Context Holder Advice>> for more information. +[[x6.1-handle-reactive]] +==== The `handleReactive()` operator for Java DSL +The `IntegrationFlow` can now end with a convenient `handleReactive(ReactiveMessageHandler)` operator. +See <<./reactive-streams.adoc#reactive-message-handler, `ReactiveMessageHandler`>> for more information. + [[x6.1-general]] === General Changes