Skip to content

Commit 1d97586

Browse files
committed
Apply ReactiveAdapterRegistry to return values
CoroutinesUtils#invokeSuspendingFunction in adapts return values to Mono and Flux in Kotlin. However, Flow methods don't have to be suspending functions. This change ensures that ReactiveAdapterRegistry is applied wherever return values are handled. Closes gh-988
1 parent a9a8d0b commit 1d97586

File tree

6 files changed

+170
-82
lines changed

6 files changed

+170
-82
lines changed

spring-graphql/src/main/java/org/springframework/graphql/data/federation/EntityHandlerMethod.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818

1919
import java.util.List;
2020
import java.util.Map;
21-
import java.util.concurrent.CompletableFuture;
2221
import java.util.concurrent.Executor;
2322

2423
import graphql.schema.DataFetchingEnvironment;
2524
import reactor.core.publisher.Mono;
2625

2726
import org.springframework.graphql.data.method.HandlerMethodArgumentResolverComposite;
2827
import org.springframework.graphql.data.method.annotation.support.DataFetcherHandlerMethodSupport;
28+
import org.springframework.graphql.execution.ReactiveAdapterRegistryHelper;
2929
import org.springframework.lang.Nullable;
3030

3131
/**
@@ -80,18 +80,8 @@ Mono<Object> getEntities(DataFetchingEnvironment env, List<Map<String, Object>>
8080
}
8181

8282
private Mono<Object> doInvoke(DataFetchingEnvironment env, Object[] args) {
83-
8483
Object result = doInvoke(env.getGraphQlContext(), args);
85-
86-
if (result instanceof Mono<?> mono) {
87-
return mono.cast(Object.class);
88-
}
89-
else if (result instanceof CompletableFuture<?> future) {
90-
return Mono.fromFuture(future);
91-
}
92-
else {
93-
return Mono.justOrEmpty(result);
94-
}
84+
return ReactiveAdapterRegistryHelper.toMono(result);
9585
}
9686

9787
}

spring-graphql/src/main/java/org/springframework/graphql/data/method/annotation/support/AnnotatedControllerConfigurer.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.springframework.graphql.data.pagination.CursorStrategy;
7979
import org.springframework.graphql.data.query.SortStrategy;
8080
import org.springframework.graphql.execution.BatchLoaderRegistry;
81+
import org.springframework.graphql.execution.ReactiveAdapterRegistryHelper;
8182
import org.springframework.graphql.execution.RuntimeWiringConfigurer;
8283
import org.springframework.graphql.execution.SelfDescribingDataFetcher;
8384
import org.springframework.graphql.execution.SubscriptionPublisherException;
@@ -528,18 +529,24 @@ public Object get(DataFetchingEnvironment environment) throws Exception {
528529
}
529530

530531
@SuppressWarnings({"unchecked", "ReactiveStreamsUnusedPublisher"})
532+
@Nullable
531533
private <T> Object applyExceptionHandling(
532534
DataFetchingEnvironment env, DataFetcherHandlerMethod handlerMethod, Object result) {
533535

534-
if (this.subscription && result instanceof Publisher<?> publisher) {
535-
result = Flux.from(publisher).onErrorResume((ex) -> handleSubscriptionError(ex, env, handlerMethod));
536+
if (this.subscription) {
537+
return ReactiveAdapterRegistryHelper.toSubscriptionFlux(result)
538+
.onErrorResume((ex) -> handleSubscriptionError(ex, env, handlerMethod));
536539
}
537-
else if (result instanceof Mono) {
540+
541+
result = ReactiveAdapterRegistryHelper.toMonoOrFluxIfReactive(result);
542+
543+
if (result instanceof Mono) {
538544
result = ((Mono<T>) result).onErrorResume((ex) -> (Mono<T>) handleException(ex, env, handlerMethod));
539545
}
540546
else if (result instanceof Flux<?>) {
541547
result = ((Flux<T>) result).onErrorResume((ex) -> (Mono<T>) handleException(ex, env, handlerMethod));
542548
}
549+
543550
return result;
544551
}
545552

spring-graphql/src/main/java/org/springframework/graphql/data/method/annotation/support/BatchLoaderHandlerMethod.java

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121
import java.util.Collection;
2222
import java.util.Map;
2323
import java.util.concurrent.Callable;
24-
import java.util.concurrent.CompletableFuture;
2524
import java.util.concurrent.Executor;
26-
import java.util.function.Function;
2725

2826
import graphql.GraphQLContext;
2927
import org.dataloader.BatchLoaderEnvironment;
@@ -37,6 +35,7 @@
3735
import org.springframework.graphql.data.method.HandlerMethod;
3836
import org.springframework.graphql.data.method.InvocableHandlerMethodSupport;
3937
import org.springframework.graphql.data.method.annotation.ContextValue;
38+
import org.springframework.graphql.execution.ReactiveAdapterRegistryHelper;
4039
import org.springframework.lang.Nullable;
4140
import org.springframework.util.Assert;
4241
import org.springframework.util.ClassUtils;
@@ -97,11 +96,11 @@ public <K, V> Mono<Map<K, V>> invokeForMap(Collection<K> keys, BatchLoaderEnviro
9796
Object[] args = getMethodArgumentValues(keys, environment);
9897
if (doesNotHaveAsyncArgs(args)) {
9998
Object result = doInvoke(environment.getContext(), args);
100-
return toMonoMap(result);
99+
return ReactiveAdapterRegistryHelper.toMono(result);
101100
}
102101
return toArgsMono(args).flatMap((argValues) -> {
103102
Object result = doInvoke(environment.getContext(), argValues);
104-
return toMonoMap(result);
103+
return ReactiveAdapterRegistryHelper.toMono(result);
105104
});
106105
}
107106

@@ -117,11 +116,11 @@ public <V> Flux<V> invokeForIterable(Collection<?> keys, BatchLoaderEnvironment
117116
Object[] args = getMethodArgumentValues(keys, environment);
118117
if (doesNotHaveAsyncArgs(args)) {
119118
Object result = doInvoke(environment.getContext(), args);
120-
return toFlux(result);
119+
return ReactiveAdapterRegistryHelper.toFluxFromCollection(result);
121120
}
122121
return toArgsMono(args).flatMapMany((resolvedArgs) -> {
123122
Object result = doInvoke(environment.getContext(), resolvedArgs);
124-
return toFlux(result);
123+
return ReactiveAdapterRegistryHelper.toFluxFromCollection(result);
125124
});
126125
}
127126

@@ -185,33 +184,4 @@ private boolean doesNotHaveAsyncArgs(Object[] args) {
185184
return Arrays.stream(args).noneMatch((arg) -> arg instanceof Mono);
186185
}
187186

188-
@SuppressWarnings("unchecked")
189-
private static <K, V> Mono<Map<K, V>> toMonoMap(@Nullable Object result) {
190-
if (result instanceof Map) {
191-
return Mono.just((Map<K, V>) result);
192-
}
193-
else if (result instanceof Mono) {
194-
return (Mono<Map<K, V>>) result;
195-
}
196-
else if (result instanceof CompletableFuture) {
197-
return Mono.fromFuture((CompletableFuture<? extends Map<K, V>>) result);
198-
}
199-
return Mono.error(new IllegalStateException("Unexpected return value: " + result));
200-
}
201-
202-
@SuppressWarnings("unchecked")
203-
private static <V> Flux<V> toFlux(@Nullable Object result) {
204-
if (result instanceof Collection) {
205-
return Flux.fromIterable((Collection<V>) result);
206-
}
207-
else if (result instanceof Flux) {
208-
return (Flux<V>) result;
209-
}
210-
else if (result instanceof CompletableFuture) {
211-
return Mono.fromFuture((CompletableFuture<? extends Collection<V>>) result)
212-
.flatMapIterable(Function.identity());
213-
}
214-
return Flux.error(new IllegalStateException("Unexpected return value: " + result));
215-
}
216-
217187
}

spring-graphql/src/main/java/org/springframework/graphql/data/method/annotation/support/DataFetcherHandlerMethod.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,16 @@
1818

1919
import java.util.Arrays;
2020
import java.util.concurrent.Callable;
21-
import java.util.concurrent.CompletableFuture;
2221
import java.util.concurrent.Executor;
2322
import java.util.function.BiConsumer;
2423

2524
import graphql.schema.DataFetchingEnvironment;
26-
import org.reactivestreams.Publisher;
27-
import reactor.core.publisher.Flux;
2825
import reactor.core.publisher.Mono;
2926

3027
import org.springframework.graphql.data.method.HandlerMethod;
3128
import org.springframework.graphql.data.method.HandlerMethodArgumentResolver;
3229
import org.springframework.graphql.data.method.HandlerMethodArgumentResolverComposite;
30+
import org.springframework.graphql.execution.ReactiveAdapterRegistryHelper;
3331
import org.springframework.lang.Nullable;
3432
import org.springframework.util.Assert;
3533

@@ -112,6 +110,7 @@ public Object invoke(DataFetchingEnvironment environment) {
112110
* @param providedArgs additional arguments to be matched by their type
113111
* @since 1.2.0
114112
*/
113+
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
115114
@Nullable
116115
public Object invoke(DataFetchingEnvironment environment, Object... providedArgs) {
117116
Object[] args;
@@ -129,23 +128,11 @@ public Object invoke(DataFetchingEnvironment environment, Object... providedArgs
129128
return this.subscription ?
130129
toArgsMono(args).flatMapMany((argValues) -> {
131130
Object result = validateAndInvoke(argValues, environment);
132-
Assert.state(result instanceof Publisher, "Expected a Publisher from a Subscription response");
133-
return Flux.from((Publisher<?>) result);
131+
return ReactiveAdapterRegistryHelper.toSubscriptionFlux(result);
134132
}) :
135133
toArgsMono(args).flatMap((argValues) -> {
136134
Object result = validateAndInvoke(argValues, environment);
137-
if (result instanceof Mono<?> mono) {
138-
return mono;
139-
}
140-
else if (result instanceof Flux<?> flux) {
141-
return Flux.from(flux).collectList();
142-
}
143-
else if (result instanceof CompletableFuture<?> future) {
144-
return Mono.fromFuture(future);
145-
}
146-
else {
147-
return Mono.justOrEmpty(result);
148-
}
135+
return ReactiveAdapterRegistryHelper.toMono(result);
149136
});
150137
}
151138

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextDataFetcherDecorator.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ private ContextDataFetcherDecorator(
7373
}
7474

7575

76+
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
7677
@Override
7778
public Object get(DataFetchingEnvironment env) throws Exception {
7879

@@ -86,24 +87,22 @@ public Object get(DataFetchingEnvironment env) throws Exception {
8687
Object value = snapshot.wrap(() -> this.delegate.get(env)).call();
8788

8889
if (this.subscription) {
89-
Assert.state(value instanceof Publisher, "Expected Publisher for a subscription");
90-
Flux<?> flux = Flux.from((Publisher<?>) value).onErrorResume((exception) -> {
91-
// Already handled, e.g. controller methods?
92-
if (exception instanceof SubscriptionPublisherException) {
93-
return Mono.error(exception);
94-
}
95-
return this.subscriptionExceptionResolver.resolveException(exception)
96-
.flatMap((errors) -> Mono.error(new SubscriptionPublisherException(errors, exception)));
97-
});
98-
return flux.contextWrite(snapshot::updateContext);
90+
return ReactiveAdapterRegistryHelper.toSubscriptionFlux(value)
91+
.onErrorResume((exception) -> {
92+
// Already handled, e.g. controller methods?
93+
if (exception instanceof SubscriptionPublisherException) {
94+
return Mono.error(exception);
95+
}
96+
return this.subscriptionExceptionResolver.resolveException(exception)
97+
.flatMap((errors) -> Mono.error(new SubscriptionPublisherException(errors, exception)));
98+
})
99+
.contextWrite(snapshot::updateContext);
99100
}
100101

101-
if (value instanceof Flux) {
102-
value = ((Flux<?>) value).collectList();
103-
}
102+
value = ReactiveAdapterRegistryHelper.toMonoIfReactive(value);
104103

105-
if (value instanceof Mono<?> valueMono) {
106-
value = valueMono.contextWrite(snapshot::updateContext).toFuture();
104+
if (value instanceof Mono<?> mono) {
105+
value = mono.contextWrite(snapshot::updateContext).toFuture();
107106
}
108107

109108
return value;
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2002-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.graphql.execution;
18+
19+
import java.util.Collection;
20+
21+
import io.micrometer.context.Nullable;
22+
import org.reactivestreams.Publisher;
23+
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.Mono;
25+
26+
import org.springframework.core.ReactiveAdapter;
27+
import org.springframework.core.ReactiveAdapterRegistry;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* Helper to adapt a result Object to {@link Mono} or {@link Flux} through
32+
* {@link ReactiveAdapterRegistry}.
33+
*
34+
* @author Rossen Stoyanchev
35+
* @since 1.3.1
36+
*/
37+
@SuppressWarnings({"ReactiveStreamsUnusedPublisher", "unchecked"})
38+
public abstract class ReactiveAdapterRegistryHelper {
39+
40+
private static final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
41+
42+
43+
/**
44+
* Return a {@link Mono} for the given Object by delegating to
45+
* {@link #toMonoIfReactive}, and then applying {@link Mono#justOrEmpty}
46+
* if necessary.
47+
* @param result the result Object to adapt
48+
* @param <T> the type of element in the Mono to cast to
49+
* @return a {@code Mono} that represents the result
50+
*/
51+
public static <T> Mono<T> toMono(@Nullable Object result) {
52+
result = toMonoIfReactive(result);
53+
return (Mono<T>) ((result instanceof Mono<?> mono) ? mono : Mono.justOrEmpty(result));
54+
}
55+
56+
/**
57+
* Return a {@link Mono} for the given result Object if it can be adapted
58+
* to a {@link Publisher} via {@link ReactiveAdapterRegistry}. Multivalued
59+
* publishers are collected to a List.
60+
* @param result the result Object to adapt
61+
* @return the same instance or a {@code Mono} if the object is known to
62+
* {@code ReactiveAdapterRegistry}
63+
*/
64+
@Nullable
65+
public static Object toMonoIfReactive(@Nullable Object result) {
66+
ReactiveAdapter adapter = ((result != null) ? registry.getAdapter(result.getClass()) : null);
67+
if (adapter == null) {
68+
return result;
69+
}
70+
Publisher<?> publisher = adapter.toPublisher(result);
71+
return (adapter.isMultiValue() ? Flux.from(publisher).collectList() : Mono.from(publisher));
72+
}
73+
74+
/**
75+
* Adapt the given result Object to {@link Mono} or {@link Flux} if it can
76+
* be adapted to a single or multi-value {@link Publisher} respectively
77+
* via {@link ReactiveAdapterRegistry}.
78+
* @param result the result Object to adapt
79+
* @return the same instance, a {@code Mono}, or a {@code Flux}
80+
*/
81+
@Nullable
82+
public static Object toMonoOrFluxIfReactive(@Nullable Object result) {
83+
ReactiveAdapter adapter = ((result != null) ? registry.getAdapter(result.getClass()) : null);
84+
if (adapter == null) {
85+
return result;
86+
}
87+
Publisher<Object> publisher = adapter.toPublisher(result);
88+
return (adapter.isMultiValue() ? Flux.from(publisher) : Mono.from(publisher));
89+
}
90+
91+
/**
92+
* Return a {@link Flux} for the given result Object, adapting to a
93+
* {@link Publisher} first if necessary via {@link ReactiveAdapterRegistry}.
94+
* @param result the result Object to adapt
95+
* @return a {@link Flux}, possibly empty if the result is {@code null}
96+
*/
97+
public static Flux<?> toSubscriptionFlux(@Nullable Object result) {
98+
if (result == null) {
99+
return Flux.empty();
100+
}
101+
if (result instanceof Publisher<?> publisher) {
102+
return Flux.from(publisher);
103+
}
104+
ReactiveAdapter adapter = registry.getAdapter(result.getClass());
105+
Assert.state(adapter != null, "Expected Publisher for a subscription");
106+
return Flux.from(adapter.toPublisher(result));
107+
}
108+
109+
/**
110+
* Return a {@link Flux} for the given result Object that represents a
111+
* logical collection of values. The Object must be a {@link Collection}
112+
* or a publisher of a {@code Collection}, which is flattened with
113+
* {@link Flux#fromIterable(Iterable)}, or a multi-value publisher.
114+
* @param result the result Object to adapt
115+
* @param <T> the type of element in the collection to cast to
116+
* @return a {@code Flux} that represents the collection
117+
*/
118+
public static <T> Flux<T> toFluxFromCollection(@Nullable Object result) {
119+
if (result instanceof Collection) {
120+
return Flux.fromIterable((Collection<T>) result);
121+
}
122+
ReactiveAdapter adapter = ((result != null) ? registry.getAdapter(result.getClass()) : null);
123+
if (adapter == null) {
124+
return Flux.error(new IllegalStateException("Unexpected return value: " + result));
125+
}
126+
Publisher<?> publisher = adapter.toPublisher(result);
127+
if (adapter.isMultiValue()) {
128+
return (Flux<T>) Flux.from(publisher);
129+
}
130+
else {
131+
return Mono.from(publisher).flatMapMany((c) -> Flux.fromIterable((Collection<T>) c));
132+
}
133+
}
134+
135+
}

0 commit comments

Comments
 (0)