3
3
import com .google .common .io .ByteStreams ;
4
4
import com .google .common .io .CharStreams ;
5
5
import graphql .ExecutionResult ;
6
+ import graphql .GraphQL ;
7
+ import graphql .execution .reactive .SingleSubscriberPublisher ;
6
8
import graphql .introspection .IntrospectionQuery ;
7
9
import graphql .schema .GraphQLFieldDefinition ;
8
10
import graphql .servlet .config .GraphQLConfiguration ;
13
15
import graphql .servlet .core .GraphQLServletListener ;
14
16
import graphql .servlet .core .internal .GraphQLRequest ;
15
17
import graphql .servlet .core .internal .VariableMapper ;
16
- import graphql .servlet .input .BatchInputPreProcessResult ;
17
- import graphql .servlet .input .BatchInputPreProcessor ;
18
- import graphql .servlet .input .GraphQLBatchedInvocationInput ;
19
- import graphql .servlet .input .GraphQLInvocationInputFactory ;
20
- import graphql .servlet .input .GraphQLSingleInvocationInput ;
18
+ import graphql .servlet .input .*;
21
19
import org .reactivestreams .Publisher ;
22
20
import org .reactivestreams .Subscriber ;
23
21
import org .reactivestreams .Subscription ;
28
26
import javax .servlet .AsyncEvent ;
29
27
import javax .servlet .AsyncListener ;
30
28
import javax .servlet .Servlet ;
31
- import javax .servlet .ServletException ;
32
29
import javax .servlet .http .HttpServlet ;
33
30
import javax .servlet .http .HttpServletRequest ;
34
31
import javax .servlet .http .HttpServletResponse ;
35
32
import javax .servlet .http .Part ;
36
- import java .io .BufferedInputStream ;
37
- import java .io .ByteArrayOutputStream ;
38
- import java .io .IOException ;
39
- import java .io .InputStream ;
40
- import java .io .Writer ;
41
- import java .util .ArrayList ;
42
- import java .util .Arrays ;
43
- import java .util .HashMap ;
44
- import java .util .Iterator ;
45
- import java .util .List ;
46
- import java .util .Map ;
47
- import java .util .Objects ;
48
- import java .util .Optional ;
33
+ import java .io .*;
34
+ import java .util .*;
49
35
import java .util .concurrent .CountDownLatch ;
50
36
import java .util .concurrent .atomic .AtomicReference ;
51
37
import java .util .function .BiConsumer ;
@@ -354,13 +340,13 @@ private void doRequest(HttpServletRequest request, HttpServletResponse response,
354
340
}
355
341
356
342
@ Override
357
- protected void doGet (HttpServletRequest req , HttpServletResponse resp ) throws ServletException , IOException {
343
+ protected void doGet (HttpServletRequest req , HttpServletResponse resp ) {
358
344
init ();
359
345
doRequestAsync (req , resp , getHandler );
360
346
}
361
347
362
348
@ Override
363
- protected void doPost (HttpServletRequest req , HttpServletResponse resp ) throws ServletException , IOException {
349
+ protected void doPost (HttpServletRequest req , HttpServletResponse resp ) {
364
350
init ();
365
351
doRequestAsync (req , resp , postHandler );
366
352
}
@@ -373,7 +359,9 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
373
359
HttpServletRequest req , HttpServletResponse resp ) throws IOException {
374
360
ExecutionResult result = queryInvoker .query (invocationInput );
375
361
376
- if (!(result .getData () instanceof Publisher )) {
362
+ boolean isDeferred = Objects .nonNull (result .getExtensions ()) && result .getExtensions ().containsKey (GraphQL .DEFERRED_RESULTS );
363
+
364
+ if (!(result .getData () instanceof Publisher || isDeferred )) {
377
365
resp .setContentType (APPLICATION_JSON_UTF8 );
378
366
resp .setStatus (STATUS_OK );
379
367
resp .getWriter ().write (graphQLObjectMapper .serializeResultAsJson (result ));
@@ -390,7 +378,16 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
390
378
AtomicReference <Subscription > subscriptionRef = new AtomicReference <>();
391
379
asyncContext .addListener (new SubscriptionAsyncListener (subscriptionRef ));
392
380
ExecutionResultSubscriber subscriber = new ExecutionResultSubscriber (subscriptionRef , asyncContext , graphQLObjectMapper );
393
- ((Publisher <ExecutionResult >) result .getData ()).subscribe (subscriber );
381
+ List <Publisher <ExecutionResult >> publishers = new ArrayList <>();
382
+ if (result .getData () instanceof Publisher ) {
383
+ publishers .add (result .getData ());
384
+ } else {
385
+ publishers .add (new StaticDataPublisher <>(result ));
386
+ final Publisher <ExecutionResult > deferredResultsPublisher = (Publisher <ExecutionResult >) result .getExtensions ().get (GraphQL .DEFERRED_RESULTS );
387
+ publishers .add (deferredResultsPublisher );
388
+ }
389
+ publishers .forEach (it -> it .subscribe (subscriber ));
390
+
394
391
if (isInAsyncThread ) {
395
392
// We need to delay the completion of async context until after the subscription has terminated, otherwise the AsyncContext is prematurely closed.
396
393
try {
@@ -537,7 +534,6 @@ public void onStartAsync(AsyncEvent event) {
537
534
}
538
535
}
539
536
540
-
541
537
private static class ExecutionResultSubscriber implements Subscriber <ExecutionResult > {
542
538
543
539
private final AtomicReference <Subscription > subscriptionRef ;
@@ -584,4 +580,13 @@ public void await() throws InterruptedException {
584
580
completedLatch .await ();
585
581
}
586
582
}
583
+
584
+ private static class StaticDataPublisher <T > extends SingleSubscriberPublisher <T > implements Publisher <T > {
585
+ StaticDataPublisher (T data ) {
586
+ super ();
587
+ super .offer (data );
588
+ super .noMoreData ();
589
+ }
590
+ }
591
+
587
592
}
0 commit comments