diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/MessagingGatewayRegistrar.java b/spring-integration-core/src/main/java/org/springframework/integration/config/MessagingGatewayRegistrar.java index d31c86953d1..55b530b9b58 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/MessagingGatewayRegistrar.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/MessagingGatewayRegistrar.java @@ -24,6 +24,7 @@ import java.util.Set; import org.springframework.beans.BeanMetadataAttribute; +import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.beans.factory.BeanDefinitionStoreException; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; @@ -38,10 +39,12 @@ import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.annotation.AnnotationConstants; import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean; import org.springframework.integration.gateway.GatewayMethodMetadata; import org.springframework.integration.gateway.GatewayProxyFactoryBean; import org.springframework.integration.util.MessagingAnnotationUtils; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.MultiValueMap; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; @@ -55,7 +58,14 @@ * @author Andy Wilksinson * @since 4.0 */ -public class MessagingGatewayRegistrar implements ImportBeanDefinitionRegistrar { +public class MessagingGatewayRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware { + + private ClassLoader beanClassLoader; + + @Override + public void setBeanClassLoader(ClassLoader classLoader) { + this.beanClassLoader = classLoader; + } @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { @@ -72,6 +82,13 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B } public BeanDefinitionHolder parse(Map gatewayAttributes) { + boolean completableFutureCapable = true; + try { + ClassUtils.forName("java.util.concurrent.CompletableFuture", this.beanClassLoader); + } + catch (Exception e1) { + completableFutureCapable = false; + } String defaultPayloadExpression = (String) gatewayAttributes.get("defaultPayloadExpression"); @@ -93,7 +110,9 @@ public BeanDefinitionHolder parse(Map gatewayAttributes) { boolean hasDefaultHeaders = !ObjectUtils.isEmpty(defaultHeaders); Assert.state(!hasMapper || !hasDefaultHeaders, "'defaultHeaders' are not allowed when a 'mapper' is provided"); - BeanDefinitionBuilder gatewayProxyBuilder = BeanDefinitionBuilder.genericBeanDefinition(GatewayProxyFactoryBean.class); + BeanDefinitionBuilder gatewayProxyBuilder = BeanDefinitionBuilder.genericBeanDefinition( + completableFutureCapable ? GatewayCompletableFutureProxyFactoryBean.class + : GatewayProxyFactoryBean.class); if (hasDefaultHeaders || hasDefaultPayloadExpression) { BeanDefinitionBuilder methodMetadataBuilder = BeanDefinitionBuilder.genericBeanDefinition(GatewayMethodMetadata.class); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayCompletableFutureProxyFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayCompletableFutureProxyFactoryBean.java new file mode 100644 index 00000000000..966e136b0e4 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayCompletableFutureProxyFactoryBean.java @@ -0,0 +1,90 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.gateway; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +import org.aopalliance.intercept.MethodInvocation; + +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.messaging.MessagingException; + + +/** + * A gateway proxy factory bean that can handle JDK8 {@link CompletableFuture}s. If a + * gateway method returns {@link CompletableFuture} exactly, one will be returned and the + * results of the + * {@link CompletableFuture#supplyAsync(Supplier, java.util.concurrent.Executor) + * supplyAsync} method call will be returned. If you wish your integration flow to return + * a {@link CompletableFuture} to the gateway in a reply message, the async executor must + * be set to {@code null}. If the return type is a subclass of {@link CompletableFuture}, + * it must be returned by the integration flow and the async executor (if present) is not + * used. + * + * @author Gary Russell + * @since 4.2 + * + */ +public class GatewayCompletableFutureProxyFactoryBean extends GatewayProxyFactoryBean { + + public GatewayCompletableFutureProxyFactoryBean() { + super(); + } + + public GatewayCompletableFutureProxyFactoryBean(Class serviceInterface) { + super(serviceInterface); + } + + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + final Class returnType = invocation.getMethod().getReturnType(); + if (CompletableFuture.class.equals(returnType)) { // exact + AsyncTaskExecutor asyncExecutor = getAsyncExecutor(); + if (asyncExecutor != null) { + return CompletableFuture.supplyAsync(new Invoker(invocation), asyncExecutor); + } + } + return super.invoke(invocation); + } + + private class Invoker implements Supplier { + + private final MethodInvocation invocation; + + public Invoker(MethodInvocation methodInvocation) { + this.invocation = methodInvocation; + } + + @Override + public Object get() { + try { + return doInvoke(invocation, false); + } + catch (Error e) {//NOSONAR + throw e; + } + catch (Throwable t) {//NOSONAR + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + throw new MessagingException("asynchronous gateway invocation failed", t); + } + } + + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java index 987b956b820..bc2419eddb0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java @@ -64,9 +64,9 @@ import org.springframework.util.StringUtils; import reactor.Environment; +import reactor.fn.Functions; import reactor.rx.Promise; import reactor.rx.Promises; -import reactor.fn.Functions; /** * Generates a proxy for the provided service interface to enable interaction @@ -279,6 +279,10 @@ public final void setMapper(MethodArgsMessageMapper mapper) { this.argsMapper = mapper; } + protected AsyncTaskExecutor getAsyncExecutor() { + return asyncExecutor; + } + @Override protected void onInit() { synchronized (this.initializationMonitor) { @@ -366,16 +370,16 @@ else if (Future.class.isAssignableFrom(returnType)) { return Promises.task((Environment) this.reactorEnvironment, Functions.supplier(new AsyncInvocationTask(invocation))); } - return this.doInvoke(invocation); + return this.doInvoke(invocation, true); } - private Object doInvoke(MethodInvocation invocation) throws Throwable { + protected Object doInvoke(MethodInvocation invocation, boolean runningOnCallerThread) throws Throwable { Method method = invocation.getMethod(); if (AopUtils.isToStringMethod(method)) { return "gateway proxy for service interface [" + this.serviceInterface + "]"; } try { - return this.invokeGatewayMethod(invocation); + return this.invokeGatewayMethod(invocation, runningOnCallerThread); } catch (Throwable e) {//NOSONAR - ok to catch, rethrown below this.rethrowExceptionCauseIfPossible(e, invocation.getMethod()); @@ -383,7 +387,7 @@ private Object doInvoke(MethodInvocation invocation) throws Throwable { } } - private Object invokeGatewayMethod(MethodInvocation invocation) throws Exception { + private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningOnCallerThread) throws Exception { if (!this.initialized) { this.afterPropertiesSet(); } @@ -391,7 +395,7 @@ private Object invokeGatewayMethod(MethodInvocation invocation) throws Exception MethodInvocationGateway gateway = this.gatewayMap.get(method); Class returnType = method.getReturnType(); boolean shouldReturnMessage = Message.class.isAssignableFrom(returnType) - || hasReturnParameterizedWithMessage(method); + || hasReturnParameterizedWithMessage(method, runningOnCallerThread); boolean shouldReply = returnType != void.class; int paramCount = method.getParameterTypes().length; Object response = null; @@ -593,9 +597,10 @@ private T convert(Object source, Class expectedReturnType) { } } - private static boolean hasReturnParameterizedWithMessage(Method method) { - if (Future.class.isAssignableFrom(method.getReturnType()) - || (reactorPresent && Promise.class.isAssignableFrom(method.getReturnType()))) { + private static boolean hasReturnParameterizedWithMessage(Method method, boolean runningOnCallerThread) { + if (!runningOnCallerThread && + (Future.class.isAssignableFrom(method.getReturnType()) + || (reactorPresent && Promise.class.isAssignableFrom(method.getReturnType())))) { Type returnType = method.getGenericReturnType(); if (returnType instanceof ParameterizedType) { Type[] typeArgs = ((ParameterizedType) returnType).getActualTypeArguments(); @@ -634,7 +639,7 @@ private AsyncInvocationTask(MethodInvocation invocation) { @Override public Object call() throws Exception { try { - return doInvoke(this.invocation); + return doInvoke(this.invocation, false); } catch (Error e) {//NOSONAR throw e; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests-context.xml b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests-context.xml index 7c012803fa4..2f983ef205e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests-context.xml +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests-context.xml @@ -48,6 +48,30 @@ default-reply-channel="replyChannel" reactor-environment="reactorEnvironment"/> + + + + + + + + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java index 8b6b70b4f96..b9f9d5af3f9 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java @@ -16,32 +16,45 @@ package org.springframework.integration.config.xml; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.IntegrationConfigUtils; import org.springframework.integration.gateway.RequestReplyExchanger; import org.springframework.integration.gateway.TestService; +import org.springframework.integration.gateway.TestService.MyCompletableFuture; +import org.springframework.integration.gateway.TestService.MyCompletableMessageFuture; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.ChannelInterceptorAdapter; import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.test.annotation.DirtiesContext; @@ -105,8 +118,15 @@ public void testAsyncGateway() throws Exception { @Test public void testAsyncDisabledGateway() throws Exception { - Object service = context.getBean("&asyncOff"); - assertNull(TestUtils.getPropertyValue(service, "asyncExecutor")); + PollableChannel requestChannel = (PollableChannel) context.getBean("requestChannel"); + MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + this.startResponder(requestChannel, replyChannel); + TestService service = context.getBean("asyncOff", TestService.class); + Future> result = service.async("futureSync"); + Message reply = result.get(1, TimeUnit.SECONDS); + assertEquals("futureSync", reply.getPayload()); + Object serviceBean = context.getBean("&asyncOff"); + assertNull(TestUtils.getPropertyValue(serviceBean, "asyncExecutor")); } @Test @@ -137,13 +157,207 @@ public void testPromiseGateway() throws Exception { assertNotNull(TestUtils.getPropertyValue(context.getBean("&promise"), "asyncExecutor")); } + @Test + @DirtiesContext + public void testAsyncCompletable() throws Exception { + QueueChannel requestChannel = (QueueChannel) context.getBean("requestChannel"); + final AtomicReference thread = new AtomicReference<>(); + requestChannel.addInterceptor(new ChannelInterceptorAdapter() { + + @Override + public Message preSend(Message message, MessageChannel channel) { + thread.set(Thread.currentThread()); + return super.preSend(message, channel); + } + }); + MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + this.startResponder(requestChannel, replyChannel); + TestService service = context.getBean("asyncCompletable", TestService.class); + CompletableFuture result = service.completable("foo").thenApply(t -> t.toUpperCase()); + String reply = result.get(1, TimeUnit.SECONDS); + assertEquals("FOO", reply); + assertThat(thread.get().getName(), startsWith("testExec-")); + assertNotNull(TestUtils.getPropertyValue(context.getBean("&asyncCompletable"), "asyncExecutor")); + } + + @Test + @DirtiesContext + public void testAsyncCompletableNoAsync() throws Exception { + QueueChannel requestChannel = (QueueChannel) context.getBean("requestChannel"); + final AtomicReference thread = new AtomicReference<>(); + requestChannel.addInterceptor(new ChannelInterceptorAdapter() { + + @Override + public Message preSend(Message message, MessageChannel channel) { + thread.set(Thread.currentThread()); + return super.preSend(message, channel); + } + }); + MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + this.startResponder(requestChannel, replyChannel); + TestService service = context.getBean("completableNoAsync", TestService.class); + CompletableFuture result = service.completable("flowCompletable"); + String reply = result.get(1, TimeUnit.SECONDS); + assertEquals("SYNC_COMPLETABLE", reply); + assertEquals(Thread.currentThread(), thread.get()); + assertNull(TestUtils.getPropertyValue(context.getBean("&completableNoAsync"), "asyncExecutor")); + } + + @Test + @DirtiesContext + public void testCustomCompletableNoAsync() throws Exception { + QueueChannel requestChannel = (QueueChannel) context.getBean("requestChannel"); + final AtomicReference thread = new AtomicReference<>(); + requestChannel.addInterceptor(new ChannelInterceptorAdapter() { + + @Override + public Message preSend(Message message, MessageChannel channel) { + thread.set(Thread.currentThread()); + return super.preSend(message, channel); + } + }); + MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + this.startResponder(requestChannel, replyChannel); + TestService service = context.getBean("completableNoAsync", TestService.class); + MyCompletableFuture result = service.customCompletable("flowCustomCompletable"); + String reply = result.get(1, TimeUnit.SECONDS); + assertEquals("SYNC_CUSTOM_COMPLETABLE", reply); + assertEquals(Thread.currentThread(), thread.get()); + assertNull(TestUtils.getPropertyValue(context.getBean("&completableNoAsync"), "asyncExecutor")); + } + + @Test + @DirtiesContext + public void testCustomCompletableNoAsyncAttemptAsync() throws Exception { + Object gateway = context.getBean("&customCompletableAttemptAsync"); + Log logger = spy(TestUtils.getPropertyValue(gateway, "logger", Log.class)); + when(logger.isDebugEnabled()).thenReturn(true); + new DirectFieldAccessor(gateway).setPropertyValue("logger", logger); + QueueChannel requestChannel = (QueueChannel) context.getBean("requestChannel"); + final AtomicReference thread = new AtomicReference<>(); + requestChannel.addInterceptor(new ChannelInterceptorAdapter() { + + @Override + public Message preSend(Message message, MessageChannel channel) { + thread.set(Thread.currentThread()); + return super.preSend(message, channel); + } + }); + MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + this.startResponder(requestChannel, replyChannel); + TestService service = context.getBean("customCompletableAttemptAsync", TestService.class); + MyCompletableFuture result = service.customCompletable("flowCustomCompletable"); + String reply = result.get(1, TimeUnit.SECONDS); + assertEquals("SYNC_CUSTOM_COMPLETABLE", reply); + assertEquals(Thread.currentThread(), thread.get()); + assertNotNull(TestUtils.getPropertyValue(gateway, "asyncExecutor")); + verify(logger).debug("AsyncTaskExecutor submit*() return types are incompatible with the method return type; " + + "running on calling thread; the downstream flow must return the required Future: " + + "MyCompletableFuture"); + } + + @Test + @DirtiesContext + public void testAsyncCompletableMessge() throws Exception { + QueueChannel requestChannel = (QueueChannel) context.getBean("requestChannel"); + final AtomicReference thread = new AtomicReference<>(); + requestChannel.addInterceptor(new ChannelInterceptorAdapter() { + + @Override + public Message preSend(Message message, MessageChannel channel) { + thread.set(Thread.currentThread()); + return super.preSend(message, channel); + } + }); + MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + this.startResponder(requestChannel, replyChannel); + TestService service = context.getBean("asyncCompletable", TestService.class); + CompletableFuture> result = service.completableReturnsMessage("foo"); + Message reply = result.get(1, TimeUnit.SECONDS); + assertEquals("foo", reply.getPayload()); + assertThat(thread.get().getName(), startsWith("testExec-")); + assertNotNull(TestUtils.getPropertyValue(context.getBean("&asyncCompletable"), "asyncExecutor")); + } + + @Test + @DirtiesContext + public void testAsyncCompletableNoAsyncMessage() throws Exception { + QueueChannel requestChannel = (QueueChannel) context.getBean("requestChannel"); + final AtomicReference thread = new AtomicReference<>(); + requestChannel.addInterceptor(new ChannelInterceptorAdapter() { + + @Override + public Message preSend(Message message, MessageChannel channel) { + thread.set(Thread.currentThread()); + return super.preSend(message, channel); + } + }); + MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + this.startResponder(requestChannel, replyChannel); + TestService service = context.getBean("completableNoAsync", TestService.class); + CompletableFuture> result = service.completableReturnsMessage("flowCompletableM"); + Message reply = result.get(1, TimeUnit.SECONDS); + assertEquals("flowCompletableM", reply.getPayload()); + assertEquals(Thread.currentThread(), thread.get()); + assertNull(TestUtils.getPropertyValue(context.getBean("&completableNoAsync"), "asyncExecutor")); + } + + @Test + @DirtiesContext + public void testCustomCompletableNoAsyncMessage() throws Exception { + QueueChannel requestChannel = (QueueChannel) context.getBean("requestChannel"); + final AtomicReference thread = new AtomicReference<>(); + requestChannel.addInterceptor(new ChannelInterceptorAdapter() { + + @Override + public Message preSend(Message message, MessageChannel channel) { + thread.set(Thread.currentThread()); + return super.preSend(message, channel); + } + }); + MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + this.startResponder(requestChannel, replyChannel); + TestService service = context.getBean("completableNoAsync", TestService.class); + MyCompletableMessageFuture result = service.customCompletableReturnsMessage("flowCustomCompletableM"); + Message reply = result.get(1, TimeUnit.SECONDS); + assertEquals("flowCustomCompletableM", reply.getPayload()); + assertEquals(Thread.currentThread(), thread.get()); + assertNull(TestUtils.getPropertyValue(context.getBean("&completableNoAsync"), "asyncExecutor")); + } + private void startResponder(final PollableChannel requestChannel, final MessageChannel replyChannel) { Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { - Message request = requestChannel.receive(); + Message request = requestChannel.receive(60000); + assertNotNull("Request not received", request); Message reply = MessageBuilder.fromMessage(request) .setCorrelationId(request.getHeaders().getId()).build(); + Object payload = null; + if (request.getPayload().equals("futureSync")) { + payload = new AsyncResult>(reply); + } + else if (request.getPayload().equals("flowCompletable")) { + payload = CompletableFuture.completedFuture("SYNC_COMPLETABLE"); + } + else if (request.getPayload().equals("flowCustomCompletable")) { + MyCompletableFuture myCompletableFuture = new MyCompletableFuture(); + myCompletableFuture.complete("SYNC_CUSTOM_COMPLETABLE"); + payload = myCompletableFuture; + } + else if (request.getPayload().equals("flowCompletableM")) { + payload = CompletableFuture.>completedFuture(reply); + } + else if (request.getPayload().equals("flowCustomCompletableM")) { + MyCompletableMessageFuture myCompletableFuture = new MyCompletableMessageFuture(); + myCompletableFuture.complete(reply); + payload = myCompletableFuture; + } + if (payload != null) { + reply = MessageBuilder.withPayload(payload) + .copyHeaders(reply.getHeaders()) + .build(); + } replyChannel.send(reply); } }); @@ -157,6 +371,10 @@ private static class TestExecutor extends SimpleAsyncTaskExecutor implements Bea private volatile String beanName; + public TestExecutor() { + setThreadNamePrefix("testExec-"); + } + @Override public void setBeanName(String beanName) { this.beanName = beanName; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/gateway/TestService.java b/spring-integration-core/src/test/java/org/springframework/integration/gateway/TestService.java index 9a7e834549c..1fe220a421e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/gateway/TestService.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/gateway/TestService.java @@ -16,10 +16,11 @@ package org.springframework.integration.gateway; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; -import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Payload; import reactor.rx.Promise; @@ -51,4 +52,20 @@ public interface TestService { Promise> promise(String s); + CompletableFuture completable(String s); + + MyCompletableFuture customCompletable(String s); + + CompletableFuture> completableReturnsMessage(String s); + + MyCompletableMessageFuture customCompletableReturnsMessage(String s); + + public class MyCompletableFuture extends CompletableFuture { + + } + + public class MyCompletableMessageFuture extends CompletableFuture> { + + } + } diff --git a/src/reference/asciidoc/gateway.adoc b/src/reference/asciidoc/gateway.adoc index 3fadcf3c56c..fea5ae64eb9 100644 --- a/src/reference/asciidoc/gateway.adoc +++ b/src/reference/asciidoc/gateway.adoc @@ -359,6 +359,8 @@ Finally, you might want to consider setting downstream flags such as 'requires-r [[async-gateway]] ==== Asynchronous Gateway +===== Introduction + As a pattern, the Messaging Gateway is a very nice way to hide messaging-specific code while still exposing the full capabilities of the messaging system. As you've seen, the `GatewayProxyFactoryBean` provides a convenient way to expose a Proxy over a service-interface thus giving you POJO-based access to a messaging system (based on objects in your own domain, or primitives/Strings, etc).  But when a gateway is exposed via simple POJO methods which return values it does imply that for each Request message (generated when the method is invoked) there must be a Reply message (generated when the method has returned). @@ -402,7 +404,7 @@ int finalResult =  result.get(1000, TimeUnit.SECONDS); For a more detailed example, please refer to the https://github.com/SpringSource/spring-integration-samples/tree/master/intermediate/async-gateway[_async-gateway_] sample distributed within the Spring Integration samples. -*ListenableFuture* +===== ListenableFuture Starting with _version 4.1_, async gateway methods can also return `ListenableFuture` (introduced in Spring Framework 4.0). These return types allow you to provide a callback which is invoked when the result is available (or an exception occurs). @@ -425,12 +427,12 @@ result.addCallback(new ListenableFutureCallback() { }); ---- -*Asynchronous Gateway and AsyncTaskExecutor* +===== AsyncTaskExecutor By default, the `GatewayProxyFactoryBean` uses `org.springframework.core.task.SimpleAsyncTaskExecutor` when submitting internal `AsyncInvocationTask` instances for any gateway method whose return type is `Future`. However the `async-executor` attribute in the `` element's configuration allows you to provide a reference to any implementation of `java.util.concurrent.Executor` available within the Spring application context. -The (default) `SimpleAsyncTaskExecutor` supports both `Future` and `ListenableFuture` return types, returning `FutureTask` or `ListenableFutureTask` respectively. +The (default) `SimpleAsyncTaskExecutor` supports both `Future` and `ListenableFuture` return types, returning `FutureTask` or `ListenableFutureTask` respectively. Also see <> below. Even though there is a default executor, it is often useful to provide an external one so that you can identify its threads in logs (when using XML, the thread name is based on the executor's bean name): [source,java] @@ -468,9 +470,92 @@ public interface NoExecGateway { IMPORTANT: If the return type is a specific concrete `Future` implementation or some other subinterface that is not supported by the configured executor, the flow will run on the caller's thread and the flow must return the required type in the reply message payload. -*Asynchronous Gateway and Reactor Promise* +[[gw-completable-future]] +===== CompletableFuture + +Starting with _version 4.2_, gateway methods can now return `CompletableFuture`. +There are several modes of operation when returning this type: + +When an async executor is provided *and* the return type is exactly `CompletableFuture` (not a subclass), the framework +will run the task on the executor and immediately return a `CompletableFuture` to the caller. +`CompletableFuture.supplyAsync(Supplier supplier, Executor executor)` is used to create the future. + +When the async executor is explicitly set to `null` and the return type is `CompletableFuture` *or* the return type +is a subclass of `CompletableFuture`, the flow is invoked on the caller's thread. +In this scenario, it is expected that the downstream flow will return a `CompletableFuture` of the appropriate type. + +*Usage Scenarios* + +[source, java] +---- + +CompletableFuture order(Order order); +---- + +[source, xml] +---- + + +---- + +In this scenario, the caller thread returns immediately with a `CompletableFuture` which will be completed +when the downstream flow replies to the gateway (with an `Invoice` object). + +[source, java] +---- + +CompletableFuture order(Order order); +---- + +[source, xml] +---- + + +---- + +In this scenario, the caller thread will return with a CompletableFuture when the downstream flow provides +it as the payload of the reply to the gateway. +Some other process must complete the future when the invoice is ready. + +[source, java] +---- + +MyCompletableFuture order(Order order); +---- + +[source, xml] +---- + + +---- + +In this scenario, the caller thread will return with a CompletableFuture when the downstream flow provides +it as the payload of the reply to the gateway. +Some other process must complete the future when the invoice is ready. +If `DEBUG` logging is enabled, a log is emitted indicating that the async executor cannot be used for this scenario. + + +`CompletionFuture` s can be used to perform additional manipulation on the reply, such as: + +[source, java] +---- + +CompletableFuture process(String data); + +... + +CompletableFuture result = process("foo") + .thenApply(t -> t.toUpperCase()); + +... + +String out = result.get(10, TimeUnit.SECONDS); +---- + +===== Reactor Promise -Also starting with _version 4.1_, the `GatewayProxyFactoryBean` allows the use of a `Reactor` with gateway interface methods, utilizing a https://github.com/reactor/reactor/wiki/Promises[`Promise`] return type. +Starting with _version 4.1_, the `GatewayProxyFactoryBean` allows the use of a `Reactor` with gateway interface methods, utilizing a https://github.com/reactor/reactor/wiki/Promises[`Promise`] return type. The internal `AsyncInvocationTask` is wrapped in a `reactor.function.Supplier` with the provided `reactorEnvironment`, using a default `RingBufferDispatcher` for the `Promise` consumption. Note, a `reactorEnvironment` reference is required whenever a service interface has at least one method with a `Promise` return type. (Only those methods run on the reactor's dispatcher). diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 0b170c5766f..9eb9d327f5a 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -145,3 +145,10 @@ See <> for more information. The `AbstractPersistentFileListFilter` has a new property `flushOnUpdate` which, when set to true, will `flush()` the metadata store if it implements `Flushable` (e.g. the `PropertiesPersistenMetadataStore`). + + +[[x4.2-gw-completable-future]] +==== Gateway Methods can Return CompletableFuture + +When using Java 8, gateway methods can now return `CompletableFuture`. +See <> for more information.