Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import org.springframework.beans.BeanMetadataAttribute;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
Expand All @@ -38,10 +39,12 @@
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.AnnotationConstants;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean;
import org.springframework.integration.gateway.GatewayMethodMetadata;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
Expand All @@ -55,7 +58,14 @@
* @author Andy Wilksinson
* @since 4.0
*/
public class MessagingGatewayRegistrar implements ImportBeanDefinitionRegistrar {
public class MessagingGatewayRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {

private ClassLoader beanClassLoader;

@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.beanClassLoader = classLoader;
}

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
Expand All @@ -72,6 +82,13 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
}

public BeanDefinitionHolder parse(Map<String, Object> gatewayAttributes) {
boolean completableFutureCapable = true;
try {
ClassUtils.forName("java.util.concurrent.CompletableFuture", this.beanClassLoader);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this one is a part of JVM I don't see reason to worry about classloader.
Although you have already done that we can live with that and merge as is.

}
catch (Exception e1) {
completableFutureCapable = false;
}

String defaultPayloadExpression = (String) gatewayAttributes.get("defaultPayloadExpression");

Expand All @@ -93,7 +110,9 @@ public BeanDefinitionHolder parse(Map<String, Object> gatewayAttributes) {
boolean hasDefaultHeaders = !ObjectUtils.isEmpty(defaultHeaders);
Assert.state(!hasMapper || !hasDefaultHeaders, "'defaultHeaders' are not allowed when a 'mapper' is provided");

BeanDefinitionBuilder gatewayProxyBuilder = BeanDefinitionBuilder.genericBeanDefinition(GatewayProxyFactoryBean.class);
BeanDefinitionBuilder gatewayProxyBuilder = BeanDefinitionBuilder.genericBeanDefinition(
completableFutureCapable ? GatewayCompletableFutureProxyFactoryBean.class
: GatewayProxyFactoryBean.class);

if (hasDefaultHeaders || hasDefaultPayloadExpression) {
BeanDefinitionBuilder methodMetadataBuilder = BeanDefinitionBuilder.genericBeanDefinition(GatewayMethodMetadata.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gateway;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.aopalliance.intercept.MethodInvocation;

import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.MessagingException;


/**
* A gateway proxy factory bean that can handle JDK8 {@link CompletableFuture}s. If a
* gateway method returns {@link CompletableFuture} exactly, one will be returned and the
* results of the
* {@link CompletableFuture#supplyAsync(Supplier, java.util.concurrent.Executor)
* supplyAsync} method call will be returned. If you wish your integration flow to return
* a {@link CompletableFuture} to the gateway in a reply message, the async executor must
* be set to {@code null}. If the return type is a subclass of {@link CompletableFuture},
* it must be returned by the integration flow and the async executor (if present) is not
* used.
*
* @author Gary Russell
* @since 4.2
*
*/
public class GatewayCompletableFutureProxyFactoryBean extends GatewayProxyFactoryBean {

public GatewayCompletableFutureProxyFactoryBean() {
super();
}

public GatewayCompletableFutureProxyFactoryBean(Class<?> serviceInterface) {
super(serviceInterface);
}

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
final Class<?> returnType = invocation.getMethod().getReturnType();
if (CompletableFuture.class.equals(returnType)) { // exact
AsyncTaskExecutor asyncExecutor = getAsyncExecutor();
if (asyncExecutor != null) {
return CompletableFuture.supplyAsync(new Invoker(invocation), asyncExecutor);
}
}
return super.invoke(invocation);
}

private class Invoker implements Supplier<Object> {

private final MethodInvocation invocation;

public Invoker(MethodInvocation methodInvocation) {
this.invocation = methodInvocation;
}

@Override
public Object get() {
try {
return doInvoke(invocation, false);
}
catch (Error e) {//NOSONAR
throw e;
}
catch (Throwable t) {//NOSONAR
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
throw new MessagingException("asynchronous gateway invocation failed", t);
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@
import org.springframework.util.StringUtils;

import reactor.Environment;
import reactor.fn.Functions;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.fn.Functions;

/**
* Generates a proxy for the provided service interface to enable interaction
Expand Down Expand Up @@ -279,6 +279,10 @@ public final void setMapper(MethodArgsMessageMapper mapper) {
this.argsMapper = mapper;
}

protected AsyncTaskExecutor getAsyncExecutor() {
return asyncExecutor;
}

@Override
protected void onInit() {
synchronized (this.initializationMonitor) {
Expand Down Expand Up @@ -366,32 +370,32 @@ else if (Future.class.isAssignableFrom(returnType)) {
return Promises.<Object>task((Environment) this.reactorEnvironment,
Functions.supplier(new AsyncInvocationTask(invocation)));
}
return this.doInvoke(invocation);
return this.doInvoke(invocation, true);
}

private Object doInvoke(MethodInvocation invocation) throws Throwable {
protected Object doInvoke(MethodInvocation invocation, boolean runningOnCallerThread) throws Throwable {
Method method = invocation.getMethod();
if (AopUtils.isToStringMethod(method)) {
return "gateway proxy for service interface [" + this.serviceInterface + "]";
}
try {
return this.invokeGatewayMethod(invocation);
return this.invokeGatewayMethod(invocation, runningOnCallerThread);
}
catch (Throwable e) {//NOSONAR - ok to catch, rethrown below
this.rethrowExceptionCauseIfPossible(e, invocation.getMethod());
return null; // preceding call should always throw something
}
}

private Object invokeGatewayMethod(MethodInvocation invocation) throws Exception {
private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningOnCallerThread) throws Exception {
if (!this.initialized) {
this.afterPropertiesSet();
}
Method method = invocation.getMethod();
MethodInvocationGateway gateway = this.gatewayMap.get(method);
Class<?> returnType = method.getReturnType();
boolean shouldReturnMessage = Message.class.isAssignableFrom(returnType)
|| hasReturnParameterizedWithMessage(method);
|| hasReturnParameterizedWithMessage(method, runningOnCallerThread);
boolean shouldReply = returnType != void.class;
int paramCount = method.getParameterTypes().length;
Object response = null;
Expand Down Expand Up @@ -593,9 +597,10 @@ private <T> T convert(Object source, Class<T> expectedReturnType) {
}
}

private static boolean hasReturnParameterizedWithMessage(Method method) {
if (Future.class.isAssignableFrom(method.getReturnType())
|| (reactorPresent && Promise.class.isAssignableFrom(method.getReturnType()))) {
private static boolean hasReturnParameterizedWithMessage(Method method, boolean runningOnCallerThread) {
if (!runningOnCallerThread &&
(Future.class.isAssignableFrom(method.getReturnType())
|| (reactorPresent && Promise.class.isAssignableFrom(method.getReturnType())))) {
Type returnType = method.getGenericReturnType();
if (returnType instanceof ParameterizedType) {
Type[] typeArgs = ((ParameterizedType) returnType).getActualTypeArguments();
Expand Down Expand Up @@ -634,7 +639,7 @@ private AsyncInvocationTask(MethodInvocation invocation) {
@Override
public Object call() throws Exception {
try {
return doInvoke(this.invocation);
return doInvoke(this.invocation, false);
}
catch (Error e) {//NOSONAR
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@
default-reply-channel="replyChannel"
reactor-environment="reactorEnvironment"/>

<gateway id="asyncCompletable"
service-interface="org.springframework.integration.gateway.TestService"
default-request-channel="requestChannel"
default-reply-channel="replyChannel"
async-executor="testExecutor"/>

<gateway id="completableNoAsync"
service-interface="org.springframework.integration.gateway.TestService"
default-request-channel="requestChannel"
default-reply-channel="replyChannel"
async-executor=""/>

<gateway id="customCompletable"
service-interface="org.springframework.integration.gateway.TestService"
default-request-channel="requestChannel"
default-reply-channel="replyChannel"
async-executor=""/>

<gateway id="customCompletableAttemptAsync"
service-interface="org.springframework.integration.gateway.TestService"
default-request-channel="requestChannel"
default-reply-channel="replyChannel"
async-executor="testExecutor"/>

<!-- no assertions for this. The fact that this config does not result in error is sufficient -->
<gateway id="defaultConfig" default-request-channel="nullChannel"/>

Expand Down
Loading