Skip to content

Commit 35c0a6b

Browse files
Merge pull request #149 from graph-quilt/defer-split
Emit initial ei before splitting deferred ei
2 parents 63dc972 + 5b5fc12 commit 35c0a6b

File tree

6 files changed

+231
-44
lines changed

6 files changed

+231
-44
lines changed

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<graphql-sdl-version>3.0.1</graphql-sdl-version>
4040
<xtextVersion>2.26.0</xtextVersion>
4141
<graphQLVersion>17.4</graphQLVersion>
42+
<reactorVersion>3.4.24</reactorVersion>
4243
</properties>
4344

4445
<profiles>
@@ -162,7 +163,13 @@
162163
<dependency>
163164
<groupId>io.projectreactor</groupId>
164165
<artifactId>reactor-core</artifactId>
165-
<version>3.4.24</version>
166+
<version>${reactorVersion}</version>
167+
</dependency>
168+
<dependency>
169+
<groupId>io.projectreactor</groupId>
170+
<artifactId>reactor-test</artifactId>
171+
<version>${reactorVersion}</version>
172+
<scope>test</scope>
166173
</dependency>
167174

168175
<!-- TEST DEPENDENCIES -->

src/main/java/com/intuit/graphql/orchestrator/GraphQLOrchestrator.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.intuit.graphql.orchestrator.deferDirective.DeferDirectiveInstrumentation;
44
import com.intuit.graphql.orchestrator.schema.RuntimeGraph;
5-
import com.intuit.graphql.orchestrator.utils.MultipartUtil;
5+
import com.intuit.graphql.orchestrator.utils.MultiEIGenerator;
66
import graphql.ExecutionInput;
77
import graphql.ExecutionResult;
88
import graphql.ExecutionResultImpl;
@@ -102,41 +102,48 @@ public CompletableFuture<ExecutionResult> execute(ExecutionInput executionInput,
102102
}
103103

104104
private CompletableFuture<ExecutionResult> executeWithDefer(ExecutionInput executionInput) {
105-
List<ExecutionInput> splitExecutionInputs = MultipartUtil.splitMultipartExecutionInput(executionInput).stream()
106-
.map(ei -> {
107-
DataLoaderRegistry registry = buildNewDataLoaderRegistry();
108-
109-
GraphQLContext graphqlContext = GraphQLContext.newContext()
110-
.of((GraphQLContext)executionInput.getContext())
111-
.put(DATA_LOADER_REGISTRY_CONTEXT_KEY, registry)
112-
.put(USE_DEFER, true)
113-
.build();
114-
return ei.transform(builder -> {
115-
builder.dataLoaderRegistry(registry);
116-
builder.context(graphqlContext);
117-
});
118-
})
119-
.collect(Collectors.toList());
120-
121-
int expectedResponses = splitExecutionInputs.size();
122105
AtomicInteger responses = new AtomicInteger(0);
106+
MultiEIGenerator eiGenerator = new MultiEIGenerator(executionInput);
123107

124-
Flux<Object> executionResultPublisher = Flux.fromIterable(splitExecutionInputs)
108+
Flux<Object> executionResultPublisher = eiGenerator.generateEIs()
109+
.filter(ei -> !ei.getQuery().equals(""))
125110
.publishOn(Schedulers.elastic())
111+
.map(ei -> {
112+
log.error("Timestamp processing emittedValue: {}", System.currentTimeMillis());
113+
return this.generateEIWIthNewContext(ei);
114+
})
126115
.map(constructGraphQL()::executeAsync)
127116
.map(CompletableFuture::join)
128117
.doOnNext(executionResult -> responses.getAndIncrement())
129118
.map(ExecutionResultImpl.newExecutionResult()::from)
130-
.map(builder -> builder.addExtension("hasMoreData", (expectedResponses != responses.get())))
119+
.map(builder -> builder.addExtension("hasMoreData", hasMoreData(eiGenerator.getNumOfEIs(), responses.get())))
131120
.map(ExecutionResultImpl.Builder::build)
132121
.map(Object.class::cast)
133-
.take(expectedResponses);
122+
.takeUntil(object -> eiGenerator.getNumOfEIs() != null && !hasMoreData(eiGenerator.getNumOfEIs(), responses.get()));
134123

135124
SubscriptionPublisher multiResultPublisher = new SubscriptionPublisher(executionResultPublisher, null);
136125

137126
return CompletableFuture.completedFuture(ExecutionResultImpl.newExecutionResult().data(multiResultPublisher).build());
138127
}
139128

129+
private boolean hasMoreData(Integer expectedNumOfEIs, Integer numOfResponses) {
130+
return expectedNumOfEIs == null || expectedNumOfEIs.intValue() != numOfResponses.intValue();
131+
}
132+
private ExecutionInput generateEIWIthNewContext(ExecutionInput ei) {
133+
DataLoaderRegistry registry = buildNewDataLoaderRegistry();
134+
135+
GraphQLContext graphqlContext = GraphQLContext.newContext()
136+
.of((GraphQLContext)ei.getContext())
137+
.put(DATA_LOADER_REGISTRY_CONTEXT_KEY, registry)
138+
.put(USE_DEFER, true)
139+
.build();
140+
141+
return ei.transform(builder -> {
142+
builder.dataLoaderRegistry(registry);
143+
builder.context(graphqlContext);
144+
});
145+
}
146+
140147
private GraphQL constructGraphQL() {
141148
final GraphQL.Builder graphqlBuilder = GraphQL.newGraphQL(runtimeGraph.getExecutableSchema())
142149
.instrumentation(new ChainedInstrumentation(instrumentations))
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.intuit.graphql.orchestrator.utils;
2+
3+
import com.google.common.annotations.VisibleForTesting;
4+
import graphql.ExecutionInput;
5+
import lombok.extern.slf4j.Slf4j;
6+
import reactor.core.publisher.Flux;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
11+
@Slf4j
12+
public class MultiEIGenerator {
13+
14+
private List<ExecutionInput> eis = new ArrayList<>();
15+
private Integer numOfEIs = null;
16+
private final static String EMPTY_QUERY ="";
17+
18+
@VisibleForTesting
19+
private long timeProcessedSplit = 0;
20+
21+
public MultiEIGenerator(ExecutionInput ei) {
22+
this.eis.add(ei);
23+
}
24+
25+
public Flux<ExecutionInput> generateEIs() {
26+
return Flux.generate(() -> 0, (indexToProcess, sink) -> {
27+
ExecutionInput emittedEI = null;
28+
int nextIndex = indexToProcess + 1;
29+
30+
//Only emit if it is the initial index or a valid index
31+
if(this.numOfEIs == null || nextIndex <= this.numOfEIs) {
32+
emittedEI = this.eis.get(indexToProcess);
33+
//emit the index that needs to be processed
34+
sink.next(emittedEI);
35+
}
36+
37+
//if null/first iteration then proceed to split ei and add to list of eis needing to be emitted
38+
if(this.numOfEIs == null) {
39+
this.timeProcessedSplit = System.currentTimeMillis();
40+
//Adds elements to list of eis that need to be processed
41+
try {
42+
this.eis.addAll(MultipartUtil.splitMultipartExecutionInput(emittedEI));
43+
}
44+
catch (Exception ex) {
45+
sink.error(ex);
46+
sink.complete();
47+
}
48+
//sets the number of expected eis which is also the number of responses expected
49+
this.numOfEIs = this.eis.size();
50+
} else if(nextIndex > this.numOfEIs) {
51+
//index reached the end of all the eis that need to be processed
52+
sink.complete();
53+
}
54+
55+
//Call generator with the next index to process
56+
return nextIndex;
57+
});
58+
}
59+
60+
public Integer getNumOfEIs() {
61+
return this.numOfEIs;
62+
}
63+
}

src/main/java/com/intuit/graphql/orchestrator/utils/MultipartUtil.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ public static List<ExecutionInput> splitMultipartExecutionInput(ExecutionInput o
5454
}
5555
});
5656

57-
//add original ei back into list
58-
eiList.add(0, originalEI);
5957
return eiList;
6058
}
6159

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.intuit.graphql.orchestrator.utils
2+
3+
import graphql.ExecutionInput
4+
import graphql.parser.InvalidSyntaxException
5+
import reactor.test.StepVerifier
6+
import spock.lang.Specification
7+
8+
class MultiEIGeneratorSpec extends Specification {
9+
10+
MultiEIGenerator multiEIGenerator
11+
12+
def "Generator split query correctly"() {
13+
given:
14+
String query = '''
15+
query getPetsDeferred {
16+
pets {
17+
id
18+
name
19+
type @defer
20+
}
21+
}
22+
'''
23+
24+
String deferredQuery = "query getPetsDeferred {\n" +
25+
" pets {\n" +
26+
" type\n" +
27+
" __typename\n" +
28+
" }\n" +
29+
"}\n"
30+
31+
ExecutionInput ei = ExecutionInput.newExecutionInput(query).build()
32+
33+
when:
34+
multiEIGenerator = new MultiEIGenerator(ei)
35+
36+
then:
37+
StepVerifier.create(multiEIGenerator.generateEIs())
38+
.expectNextMatches({ response -> response.getQuery() == query })
39+
.expectNextMatches({ response -> response.getQuery() == deferredQuery })
40+
.verifyComplete()
41+
}
42+
43+
def "Initial EI is processed before splitting query"(){
44+
given:
45+
String query = '''
46+
query getPetsDeferred {
47+
pets {
48+
id
49+
name
50+
type @defer
51+
}
52+
}
53+
'''
54+
55+
String deferredQuery = "query getPetsDeferred {\n" +
56+
" pets {\n" +
57+
" type\n" +
58+
" __typename\n" +
59+
" }\n" +
60+
"}\n"
61+
62+
ExecutionInput ei = ExecutionInput.newExecutionInput(query).build()
63+
64+
long timeEmitted = 0;
65+
66+
when:
67+
multiEIGenerator = new MultiEIGenerator(ei)
68+
69+
then:
70+
StepVerifier.create(multiEIGenerator.generateEIs())
71+
.expectNextMatches({ response ->
72+
timeEmitted = System.currentTimeMillis()
73+
return response.query == query && response
74+
})
75+
.expectAccessibleContext()
76+
.assertThat({ e -> this.multiEIGenerator.timeProcessedSplit > timeEmitted })
77+
.then()
78+
.expectNextMatches({ response -> response.getQuery() == deferredQuery })
79+
.verifyComplete()
80+
}
81+
82+
def "EI w/o defer flux completes and only emits 1 object"(){
83+
given:
84+
String query = '''
85+
query getPetsDeferred {
86+
pets {
87+
id
88+
name
89+
type
90+
}
91+
}
92+
'''
93+
94+
ExecutionInput ei = ExecutionInput.newExecutionInput(query).build()
95+
96+
when:
97+
multiEIGenerator = new MultiEIGenerator(ei)
98+
99+
then:
100+
StepVerifier.create(multiEIGenerator.generateEIs())
101+
.expectNextMatches({ response -> response.query == query})
102+
.verifyComplete()
103+
}
104+
105+
def "emits error if it throws error when trying to split ei"(){
106+
given:
107+
String query = ""
108+
109+
ExecutionInput ei = ExecutionInput.newExecutionInput(query).build()
110+
111+
when:
112+
multiEIGenerator = new MultiEIGenerator(ei)
113+
114+
then:
115+
StepVerifier.create(multiEIGenerator.generateEIs())
116+
.expectNextMatches( {emptyEi -> emptyEi.getQuery() == query})
117+
.expectError(InvalidSyntaxException.class)
118+
.verify()
119+
}
120+
}

src/test/groovy/com/intuit/graphql/orchestrator/utils/MultipartUtilSpec.groovy

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ class MultipartUtilSpec extends Specification {
2020
List<ExecutionInput> splitSet = MultipartUtil.splitMultipartExecutionInput(input)
2121

2222
then:
23-
splitSet.size() == 2
24-
splitSet.get(0).query == query
25-
splitSet.get(1).query == "query {\n" +
23+
splitSet.size() == 1
24+
splitSet.get(0).query == "query {\n" +
2625
" queryA {\n" +
2726
" fieldC\n" +
2827
" __typename\n" +
@@ -39,9 +38,8 @@ class MultipartUtilSpec extends Specification {
3938
List<ExecutionInput> splitSet = MultipartUtil.splitMultipartExecutionInput(input)
4039

4140
then:
42-
splitSet.size() == 2
43-
splitSet.get(0).query == query
44-
splitSet.get(1).query == "query {\n" +
41+
splitSet.size() == 1
42+
splitSet.get(0).query == "query {\n" +
4543
" queryA {\n" +
4644
" aliasB: fieldB\n" +
4745
" __typename\n" +
@@ -58,11 +56,8 @@ class MultipartUtilSpec extends Specification {
5856
List<ExecutionInput> splitSet = MultipartUtil.splitMultipartExecutionInput(input)
5957

6058
then:
61-
splitSet.size() == 2
62-
63-
splitSet.get(0).query == query
64-
65-
splitSet.get(1).query == "query {\n" +
59+
splitSet.size() == 1
60+
splitSet.get(0).query == "query {\n" +
6661
" getFoo(id: \"inputA\") {\n" +
6762
" fieldB\n" +
6863
" __typename\n" +
@@ -79,15 +74,14 @@ class MultipartUtilSpec extends Specification {
7974
List<ExecutionInput> splitSet = MultipartUtil.splitMultipartExecutionInput(input)
8075

8176
then:
82-
splitSet.size() == 3
83-
splitSet.get(0).query == query
84-
splitSet.get(1).query == "query {\n" +
77+
splitSet.size() == 2
78+
splitSet.get(0).query == "query {\n" +
8579
" queryA {\n" +
8680
" fieldB\n" +
8781
" __typename\n" +
8882
" }\n" +
8983
"}\n"
90-
splitSet.get(2).query == "query {\n" +
84+
splitSet.get(1).query == "query {\n" +
9185
" queryA {\n" +
9286
" fieldC\n" +
9387
" __typename\n" +
@@ -106,9 +100,8 @@ class MultipartUtilSpec extends Specification {
106100
List<ExecutionInput> splitSet = MultipartUtil.splitMultipartExecutionInput(input)
107101

108102
then:
109-
splitSet.size() == 2
110-
splitSet.get(0).query == query
111-
splitSet.get(1).query == "query {\n" +
103+
splitSet.size() == 1
104+
splitSet.get(0).query == "query {\n" +
112105
" queryA {\n" +
113106
" objectField {\n" +
114107
" fieldC\n" +
@@ -128,8 +121,7 @@ class MultipartUtilSpec extends Specification {
128121
List<ExecutionInput> splitSet = MultipartUtil.splitMultipartExecutionInput(input)
129122

130123
then:
131-
splitSet.size() == 1
132-
splitSet.get(0).query == query
124+
splitSet.size() == 0
133125
}
134126

135127
//todo

0 commit comments

Comments
 (0)