Skip to content

Feature Request - handleReactive(ReactiveMessageHandler) #3763

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
jifeiiii opened this issue Mar 31, 2022 · 6 comments · Fixed by #8605
Closed

Feature Request - handleReactive(ReactiveMessageHandler) #3763

jifeiiii opened this issue Mar 31, 2022 · 6 comments · Fixed by #8605
Assignees
Milestone

Comments

@jifeiiii
Copy link

Expected Behavior
I'd like there to be a reactive handleReactive(ReactiveMessageHandler) operator that will enable me to easily write reactive code as a handler.

Current Behavior
There is a need to use the ReactiveMessageHandlerAdapter class and write the code inside of it.

Context
The workaround is pretty simple, but an implementation of such a function seems to be pretty simple either. Here's a SO question to support the issue.

@jifeiiii jifeiiii added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Mar 31, 2022
@artembilan
Copy link
Member

Just to clarify: such an operator indeed going to be based on the ReactiveMessageHandler only, which could be represent as Lambda for simple use-cases.
But most important it is going to be terminal operator with an exact meaning that there is no way to continue a flow after it.
Something similar to the IntegrationFlow nullChannel() or IntegrationFlow to(IntegrationFlow other).

@artembilan artembilan added in: dsl ideal-for-user-contribution An issue that would ideal for a user to get started with contributing. and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Mar 31, 2022
@artembilan artembilan added this to the 6.0.x milestone Mar 31, 2022
@jifeiiii
Copy link
Author

jifeiiii commented Apr 3, 2022

I also think that it would be useful to let the user choose how exactly to subscribe, and to manage publishOn easily

@artembilan
Copy link
Member

The publishOn() is a part of the reactive() endpoint configuration. It has nothing to do with particular handler.
I'm not sure what kind of choice you'd like to see for the subscription. This is going to be the end of the flow.
Therefore Mono.then() is the way to go in the particular ReactiveMessageHandler impl: the framework will do the rest for you.

@markusherbert
Copy link

@artembilan I think this issue talks about a correct point.. You currently can not access a flux and end the flow through it by yourself. This is really a blocker when it comes to developing custom outbound channel adapters or just custom reactive code. In this manner, I think Reactive Spring Integration is very problematic. The solution may be to create a handleFlux(Flux<Message<?>>). I think this is what lots of people search for (or me at least).

In my point of view, if Spring Integration enables this, it can easily become the 'project reactor framework' - a thing that lots of people would use happily.
Such handleFlux() is pretty simple to add, and can add lots of value for Reactive Spring Integration users (that currently need to do instanceof Flux inside the message handler class to perform flux operations.

@markusherbert
Copy link

In general, @artembilan, is it currently possible to subscribe manually? I think a built-in feature for this can provide a lot of flexibility to the reactive spring integration users

@artembilan
Copy link
Member

There is already something like handleFlux() in Spring Integration Java DSL. It is called fluxTransform():

	/**
	 * Populate a {@link FluxMessageChannel} to start a reactive processing for upstream data,
	 * wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)}
	 * and emit the result to one more {@link FluxMessageChannel}, subscribed in the downstream flow.
	 * @param fluxFunction the {@link Function} to process data reactive manner.
	 * @param <I> the input payload type.
	 * @param <O> the output type.
	 * @return the current {@link BaseIntegrationFlowDefinition}.
	 */
	@SuppressWarnings(UNCHECKED)
	public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publisher<O>> fluxFunction) {

I even mentioned this in the SO you are pointing to.
I'm not sure what else we need...

This particular issue is exactly for the ReactiveMessageHandler convenience which can be used only in the end of flow.
So, let's don't hijack it and talk only about specific task in a single GH issue!

@artembilan artembilan modified the milestones: 6.0.x, 6.1.x Jan 17, 2023
@artembilan artembilan self-assigned this Apr 28, 2023
@artembilan artembilan removed the ideal-for-user-contribution An issue that would ideal for a user to get started with contributing. label Apr 28, 2023
@artembilan artembilan modified the milestones: 6.1.x, 6.1.0 Apr 28, 2023
artembilan added a commit to artembilan/spring-integration that referenced this issue Apr 28, 2023
Fixes spring-projects#3763

Add a convenient terminal operator to `BaseIntegrationFlowDefinition`
based on a `ReactiveMessageHandler`.
Also add an overload like `handleReactive(ReactiveMessageHandlerSpec)`
to let end-user to choose a protocol-specific channel adapter
artembilan added a commit to artembilan/spring-integration that referenced this issue May 1, 2023
Fixes spring-projects#3763

Add a convenient terminal operator to `BaseIntegrationFlowDefinition`
based on a `ReactiveMessageHandler`.
Also add an overload like `handleReactive(ReactiveMessageHandlerSpec)`
to let end-user to choose a protocol-specific channel adapter
garyrussell added a commit that referenced this issue May 1, 2023
* GH-3763: Add `handleReactive()` for Java DSL

Fixes #3763

Add a convenient terminal operator to `BaseIntegrationFlowDefinition`
based on a `ReactiveMessageHandler`.
Also add an overload like `handleReactive(ReactiveMessageHandlerSpec)`
to let end-user to choose a protocol-specific channel adapter

* * Fix `Namespace Factory` wording in the `BaseIntegrationFlowDefinition` Javadocs

* Fix language in Docs

Co-authored-by: Gary Russell <[email protected]>

---------

Co-authored-by: Gary Russell <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants