|
65 | 65 | import org.springframework.integration.handler.LoggingHandler;
|
66 | 66 | import org.springframework.integration.handler.MessageProcessor;
|
67 | 67 | import org.springframework.integration.handler.MessageTriggerAction;
|
| 68 | +import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; |
68 | 69 | import org.springframework.integration.handler.ServiceActivatingHandler;
|
69 | 70 | import org.springframework.integration.router.AbstractMessageRouter;
|
70 | 71 | import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
|
|
91 | 92 | import org.springframework.messaging.Message;
|
92 | 93 | import org.springframework.messaging.MessageChannel;
|
93 | 94 | import org.springframework.messaging.MessageHandler;
|
| 95 | +import org.springframework.messaging.ReactiveMessageHandler; |
94 | 96 | import org.springframework.messaging.support.ChannelInterceptor;
|
95 | 97 | import org.springframework.messaging.support.InterceptableChannel;
|
96 | 98 | import org.springframework.util.Assert;
|
@@ -2916,6 +2918,66 @@ public IntegrationFlow nullChannel() {
|
2916 | 2918 | .get();
|
2917 | 2919 | }
|
2918 | 2920 |
|
| 2921 | + /** |
| 2922 | + * Populate a terminal consumer endpoint for the selected protocol specific |
| 2923 | + * {@link MessageHandler} implementation from {@code Namespace Factory}: |
| 2924 | + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. |
| 2925 | + * @param messageHandlerSpec the {@link MessageHandlerSpec} to configure protocol specific |
| 2926 | + * {@link MessageHandler}. |
| 2927 | + * @param <H> the {@link MessageHandler} type. |
| 2928 | + * @return the current {@link BaseIntegrationFlowDefinition}. |
| 2929 | + * @since 6.1 |
| 2930 | + */ |
| 2931 | + public <H extends ReactiveMessageHandler> IntegrationFlow handleReactive( |
| 2932 | + ReactiveMessageHandlerSpec<?, H> messageHandlerSpec) { |
| 2933 | + |
| 2934 | + return handleReactive(messageHandlerSpec, null); |
| 2935 | + } |
| 2936 | + |
| 2937 | + /** |
| 2938 | + * Populate a terminal consumer endpoint for the selected protocol specific |
| 2939 | + * {@link MessageHandler} implementation from {@code Namespace Factory}: |
| 2940 | + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. |
| 2941 | + * @param messageHandlerSpec the {@link MessageHandlerSpec} to configure protocol specific |
| 2942 | + * {@link MessageHandler}. |
| 2943 | + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. |
| 2944 | + * @param <H> the {@link MessageHandler} type. |
| 2945 | + * @return the current {@link BaseIntegrationFlowDefinition}. |
| 2946 | + * @since 6.1 |
| 2947 | + */ |
| 2948 | + public <H extends ReactiveMessageHandler> IntegrationFlow handleReactive( |
| 2949 | + ReactiveMessageHandlerSpec<?, H> messageHandlerSpec, |
| 2950 | + @Nullable Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer) { |
| 2951 | + |
| 2952 | + return |
| 2953 | + addComponents(messageHandlerSpec.getComponentsToRegister()). |
| 2954 | + handleReactive(messageHandlerSpec.getObject().getDelegate(), endpointConfigurer); |
| 2955 | + } |
| 2956 | + |
| 2957 | + /** |
| 2958 | + * Add a {@link ReactiveMessageHandler} as a terminal {@link IntegrationFlow} operator. |
| 2959 | + * @param reactiveMessageHandler the {@link ReactiveMessageHandler} to finish the flow. |
| 2960 | + * @return The {@link IntegrationFlow} instance based on this definition. |
| 2961 | + * @since 6.1 |
| 2962 | + */ |
| 2963 | + public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler) { |
| 2964 | + return handleReactive(reactiveMessageHandler, null); |
| 2965 | + } |
| 2966 | + |
| 2967 | + /** |
| 2968 | + * Add a {@link ReactiveMessageHandler} as a terminal {@link IntegrationFlow} operator. |
| 2969 | + * @param reactiveMessageHandler the {@link ReactiveMessageHandler} to finish the flow. |
| 2970 | + * @param endpointConfigurer the {@link Consumer} to configure a target endpoint for the handler. |
| 2971 | + * @return The {@link IntegrationFlow} instance based on this definition. |
| 2972 | + * @since 6.1 |
| 2973 | + */ |
| 2974 | + public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler, |
| 2975 | + @Nullable Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer) { |
| 2976 | + |
| 2977 | + return handle(new ReactiveMessageHandlerAdapter(reactiveMessageHandler), endpointConfigurer) |
| 2978 | + .get(); |
| 2979 | + } |
| 2980 | + |
2919 | 2981 | /**
|
2920 | 2982 | * Finish this flow with delegation to other {@link IntegrationFlow} instance.
|
2921 | 2983 | * @param other the {@link IntegrationFlow} to compose with.
|
|
0 commit comments