Skip to content

Commit 1e84b78

Browse files
committed
INT-3724: Gateway - Support CompletableFuture
JIRA: https://jira.spring.io/browse/INT-3724 Add support for `CompletableFuture<?>` return types on gateway methods, if JDK8 is being used. - If the return type is exactly `CompletableFuture` and an async executor is provided, use `CompletableFuture.supplyAsync()` - If there is no return async executor, return types can be `CompletableFuture` or a subclass and the flow can return such a future. - Also fixes a problem for return type `Future<Message<?>>` with no async executor; previously this caused a `ClassCastException` because the gateway returned the message - it assumed such return types would always run on an excutor. We can consider back-porting this last part, but nobody has complained.
1 parent 51b52ea commit 1e84b78

File tree

6 files changed

+389
-16
lines changed

6 files changed

+389
-16
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/MessagingGatewayRegistrar.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Set;
2525

2626
import org.springframework.beans.BeanMetadataAttribute;
27+
import org.springframework.beans.factory.BeanClassLoaderAware;
2728
import org.springframework.beans.factory.BeanDefinitionStoreException;
2829
import org.springframework.beans.factory.config.BeanDefinition;
2930
import org.springframework.beans.factory.config.BeanDefinitionHolder;
@@ -38,10 +39,12 @@
3839
import org.springframework.expression.common.LiteralExpression;
3940
import org.springframework.integration.annotation.AnnotationConstants;
4041
import org.springframework.integration.annotation.MessagingGateway;
42+
import org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean;
4143
import org.springframework.integration.gateway.GatewayMethodMetadata;
4244
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
4345
import org.springframework.integration.util.MessagingAnnotationUtils;
4446
import org.springframework.util.Assert;
47+
import org.springframework.util.ClassUtils;
4548
import org.springframework.util.MultiValueMap;
4649
import org.springframework.util.ObjectUtils;
4750
import org.springframework.util.StringUtils;
@@ -55,7 +58,14 @@
5558
* @author Andy Wilksinson
5659
* @since 4.0
5760
*/
58-
public class MessagingGatewayRegistrar implements ImportBeanDefinitionRegistrar {
61+
public class MessagingGatewayRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {
62+
63+
private ClassLoader beanClassLoader;
64+
65+
@Override
66+
public void setBeanClassLoader(ClassLoader classLoader) {
67+
this.beanClassLoader = classLoader;
68+
}
5969

6070
@Override
6171
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
@@ -72,6 +82,13 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
7282
}
7383

7484
public BeanDefinitionHolder parse(Map<String, Object> gatewayAttributes) {
85+
boolean completableFutureCapable = true;
86+
try {
87+
ClassUtils.forName("java.util.concurrent.CompletableFuture", this.beanClassLoader);
88+
}
89+
catch (Exception e1) {
90+
completableFutureCapable = false;
91+
}
7592

7693
String defaultPayloadExpression = (String) gatewayAttributes.get("defaultPayloadExpression");
7794

@@ -93,7 +110,9 @@ public BeanDefinitionHolder parse(Map<String, Object> gatewayAttributes) {
93110
boolean hasDefaultHeaders = !ObjectUtils.isEmpty(defaultHeaders);
94111
Assert.state(!hasMapper || !hasDefaultHeaders, "'defaultHeaders' are not allowed when a 'mapper' is provided");
95112

96-
BeanDefinitionBuilder gatewayProxyBuilder = BeanDefinitionBuilder.genericBeanDefinition(GatewayProxyFactoryBean.class);
113+
BeanDefinitionBuilder gatewayProxyBuilder = BeanDefinitionBuilder.genericBeanDefinition(
114+
completableFutureCapable ? GatewayCompletableFutureProxyFactoryBean.class
115+
: GatewayProxyFactoryBean.class);
97116

98117
if (hasDefaultHeaders || hasDefaultPayloadExpression) {
99118
BeanDefinitionBuilder methodMetadataBuilder = BeanDefinitionBuilder.genericBeanDefinition(GatewayMethodMetadata.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.integration.gateway;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.function.Supplier;
20+
21+
import org.aopalliance.intercept.MethodInvocation;
22+
23+
import org.springframework.core.task.AsyncTaskExecutor;
24+
import org.springframework.messaging.MessagingException;
25+
26+
27+
/**
28+
* A gateway proxy factory bean that can handle JDK8 {@link CompletableFuture}s. If a
29+
* gateway method returns {@link CompletableFuture} exactly, one will be returned and the
30+
* results of the
31+
* {@link CompletableFuture#supplyAsync(Supplier, java.util.concurrent.Executor))
32+
* supplyAsync} method call will be returned. If you wish your integration flow to return
33+
* a {@link CompletableFuture} to the gateway in a reply message, the async executor must
34+
* be set to {@code null}. If the return type is a subclass of {@link CompletableFuture},
35+
* it must be returned by the integration flow and the async executor (if present) is not
36+
* used.
37+
*
38+
* @author Gary Russell
39+
* @since 4.2
40+
*
41+
*/
42+
public class GatewayCompletableFutureProxyFactoryBean extends GatewayProxyFactoryBean {
43+
44+
public GatewayCompletableFutureProxyFactoryBean() {
45+
super();
46+
}
47+
48+
public GatewayCompletableFutureProxyFactoryBean(Class<?> serviceInterface) {
49+
super(serviceInterface);
50+
}
51+
52+
@Override
53+
public Object invoke(MethodInvocation invocation) throws Throwable {
54+
final Class<?> returnType = invocation.getMethod().getReturnType();
55+
if (CompletableFuture.class.equals(returnType)) { // exact
56+
AsyncTaskExecutor asyncExecutor = getAsyncExecutor();
57+
if (asyncExecutor != null) {
58+
return CompletableFuture.supplyAsync(new Invoker(invocation), asyncExecutor);
59+
}
60+
}
61+
return super.invoke(invocation);
62+
}
63+
64+
private class Invoker implements Supplier<Object> {
65+
66+
private final MethodInvocation invocation;
67+
68+
public Invoker(MethodInvocation methodInvocation) {
69+
this.invocation = methodInvocation;
70+
}
71+
72+
@Override
73+
public Object get() {
74+
try {
75+
return doInvoke(invocation, false);
76+
}
77+
catch (Error e) {//NOSONAR
78+
throw e;
79+
}
80+
catch (Throwable t) {//NOSONAR
81+
if (t instanceof RuntimeException) {
82+
throw (RuntimeException) t;
83+
}
84+
throw new MessagingException("asynchronous gateway invocation failed", t);
85+
}
86+
}
87+
88+
}
89+
90+
}

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@
6464
import org.springframework.util.StringUtils;
6565

6666
import reactor.Environment;
67+
import reactor.fn.Functions;
6768
import reactor.rx.Promise;
6869
import reactor.rx.Promises;
69-
import reactor.fn.Functions;
7070

7171
/**
7272
* Generates a proxy for the provided service interface to enable interaction
@@ -279,6 +279,10 @@ public final void setMapper(MethodArgsMessageMapper mapper) {
279279
this.argsMapper = mapper;
280280
}
281281

282+
protected AsyncTaskExecutor getAsyncExecutor() {
283+
return asyncExecutor;
284+
}
285+
282286
@Override
283287
protected void onInit() {
284288
synchronized (this.initializationMonitor) {
@@ -366,32 +370,32 @@ else if (Future.class.isAssignableFrom(returnType)) {
366370
return Promises.<Object>task((Environment) this.reactorEnvironment,
367371
Functions.supplier(new AsyncInvocationTask(invocation)));
368372
}
369-
return this.doInvoke(invocation);
373+
return this.doInvoke(invocation, true);
370374
}
371375

372-
private Object doInvoke(MethodInvocation invocation) throws Throwable {
376+
protected Object doInvoke(MethodInvocation invocation, boolean runningOnCallerThread) throws Throwable {
373377
Method method = invocation.getMethod();
374378
if (AopUtils.isToStringMethod(method)) {
375379
return "gateway proxy for service interface [" + this.serviceInterface + "]";
376380
}
377381
try {
378-
return this.invokeGatewayMethod(invocation);
382+
return this.invokeGatewayMethod(invocation, runningOnCallerThread);
379383
}
380384
catch (Throwable e) {//NOSONAR - ok to catch, rethrown below
381385
this.rethrowExceptionCauseIfPossible(e, invocation.getMethod());
382386
return null; // preceding call should always throw something
383387
}
384388
}
385389

386-
private Object invokeGatewayMethod(MethodInvocation invocation) throws Exception {
390+
private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningOnCallerThread) throws Exception {
387391
if (!this.initialized) {
388392
this.afterPropertiesSet();
389393
}
390394
Method method = invocation.getMethod();
391395
MethodInvocationGateway gateway = this.gatewayMap.get(method);
392396
Class<?> returnType = method.getReturnType();
393397
boolean shouldReturnMessage = Message.class.isAssignableFrom(returnType)
394-
|| hasReturnParameterizedWithMessage(method);
398+
|| hasReturnParameterizedWithMessage(method, runningOnCallerThread);
395399
boolean shouldReply = returnType != void.class;
396400
int paramCount = method.getParameterTypes().length;
397401
Object response = null;
@@ -593,9 +597,10 @@ private <T> T convert(Object source, Class<T> expectedReturnType) {
593597
}
594598
}
595599

596-
private static boolean hasReturnParameterizedWithMessage(Method method) {
597-
if (Future.class.isAssignableFrom(method.getReturnType())
598-
|| (reactorPresent && Promise.class.isAssignableFrom(method.getReturnType()))) {
600+
private static boolean hasReturnParameterizedWithMessage(Method method, boolean runningOnCallerThread) {
601+
if (!runningOnCallerThread &&
602+
(Future.class.isAssignableFrom(method.getReturnType())
603+
|| (reactorPresent && Promise.class.isAssignableFrom(method.getReturnType())))) {
599604
Type returnType = method.getGenericReturnType();
600605
if (returnType instanceof ParameterizedType) {
601606
Type[] typeArgs = ((ParameterizedType) returnType).getActualTypeArguments();
@@ -634,7 +639,7 @@ private AsyncInvocationTask(MethodInvocation invocation) {
634639
@Override
635640
public Object call() throws Exception {
636641
try {
637-
return doInvoke(this.invocation);
642+
return doInvoke(this.invocation, false);
638643
}
639644
catch (Error e) {//NOSONAR
640645
throw e;

spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests-context.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,30 @@
4848
default-reply-channel="replyChannel"
4949
reactor-environment="reactorEnvironment"/>
5050

51+
<gateway id="asyncCompletable"
52+
service-interface="org.springframework.integration.gateway.TestService"
53+
default-request-channel="requestChannel"
54+
default-reply-channel="replyChannel"
55+
async-executor="testExecutor"/>
56+
57+
<gateway id="completableNoAsync"
58+
service-interface="org.springframework.integration.gateway.TestService"
59+
default-request-channel="requestChannel"
60+
default-reply-channel="replyChannel"
61+
async-executor=""/>
62+
63+
<gateway id="customCompletable"
64+
service-interface="org.springframework.integration.gateway.TestService"
65+
default-request-channel="requestChannel"
66+
default-reply-channel="replyChannel"
67+
async-executor=""/>
68+
69+
<gateway id="customCompletableAttemptAsync"
70+
service-interface="org.springframework.integration.gateway.TestService"
71+
default-request-channel="requestChannel"
72+
default-reply-channel="replyChannel"
73+
async-executor="testExecutor"/>
74+
5175
<!-- no assertions for this. The fact that this config does not result in error is sufficient -->
5276
<gateway id="defaultConfig" default-request-channel="nullChannel"/>
5377

0 commit comments

Comments
 (0)