Skip to content

Commit 24330c0

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
1 parent 4c262a2 commit 24330c0

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed

src/execution/__tests__/stream-test.js

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { expect } from 'chai';
22
import { describe, it } from 'mocha';
33

4+
import { invariant } from '../../jsutils/invariant';
45
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
56
import { parse } from '../../language/parser';
67

@@ -72,6 +73,36 @@ const query = new GraphQLObjectType({
7273
yield {};
7374
},
7475
},
76+
asyncIterableListDelayed: {
77+
type: new GraphQLList(friendType),
78+
async *resolve() {
79+
for (const friend of friends) {
80+
// pause an additional ms before yielding to allow time
81+
// for tests to return or throw before next value is processed.
82+
// eslint-disable-next-line no-await-in-loop
83+
await new Promise((r) => setTimeout(r, 1));
84+
yield friend;
85+
}
86+
},
87+
},
88+
asyncIterableListNoReturn: {
89+
type: new GraphQLList(friendType),
90+
resolve() {
91+
let i = 0;
92+
return {
93+
[Symbol.asyncIterator]: () => ({
94+
async next() {
95+
const friend = friends[i++];
96+
if (friend) {
97+
await new Promise((r) => setTimeout(r, 1));
98+
return { value: friend, done: false };
99+
}
100+
return { value: undefined, done: true };
101+
},
102+
}),
103+
};
104+
},
105+
},
75106
asyncIterableListDelayedClose: {
76107
type: new GraphQLList(friendType),
77108
async *resolve() {
@@ -649,4 +680,114 @@ describe('Execute: stream directive', () => {
649680
},
650681
]);
651682
});
683+
it('Returns underlying async iterables when dispatcher is returned', async () => {
684+
const document = parse(`
685+
query {
686+
asyncIterableListDelayed @stream(initialCount: 1) {
687+
name
688+
id
689+
}
690+
}
691+
`);
692+
const schema = new GraphQLSchema({ query });
693+
694+
const executeResult = await execute({ schema, document, rootValue: {} });
695+
invariant(isAsyncIterable(executeResult));
696+
697+
const result1 = await executeResult.next();
698+
expect(result1).to.deep.equal({
699+
done: false,
700+
value: {
701+
data: {
702+
asyncIterableListDelayed: [
703+
{
704+
id: '1',
705+
name: 'Luke',
706+
},
707+
],
708+
},
709+
hasNext: true,
710+
},
711+
});
712+
713+
executeResult.return();
714+
715+
// this result had started processing before return was called
716+
const result2 = await executeResult.next();
717+
expect(result2).to.deep.equal({
718+
done: false,
719+
value: {
720+
data: {
721+
id: '2',
722+
name: 'Han',
723+
},
724+
hasNext: true,
725+
path: ['asyncIterableListDelayed', 1],
726+
},
727+
});
728+
729+
// third result is not returned because async iterator has returned
730+
const result3 = await executeResult.next();
731+
expect(result3).to.deep.equal({
732+
done: false,
733+
value: {
734+
hasNext: false,
735+
},
736+
});
737+
});
738+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
739+
const document = parse(`
740+
query {
741+
asyncIterableListNoReturn @stream(initialCount: 1) {
742+
name
743+
id
744+
}
745+
}
746+
`);
747+
const schema = new GraphQLSchema({ query });
748+
749+
const executeResult = await execute({ schema, document, rootValue: {} });
750+
invariant(isAsyncIterable(executeResult));
751+
752+
const result1 = await executeResult.next();
753+
expect(result1).to.deep.equal({
754+
done: false,
755+
value: {
756+
data: {
757+
asyncIterableListNoReturn: [
758+
{
759+
id: '1',
760+
name: 'Luke',
761+
},
762+
],
763+
},
764+
hasNext: true,
765+
},
766+
});
767+
768+
executeResult.return();
769+
770+
// this result had started processing before return was called
771+
const result2 = await executeResult.next();
772+
expect(result2).to.deep.equal({
773+
done: false,
774+
value: {
775+
data: {
776+
id: '2',
777+
name: 'Han',
778+
},
779+
hasNext: true,
780+
path: ['asyncIterableListNoReturn', 1],
781+
},
782+
});
783+
784+
// third result is not returned because async iterator has returned
785+
const result3 = await executeResult.next();
786+
expect(result3).to.deep.equal({
787+
done: false,
788+
value: {
789+
hasNext: false,
790+
},
791+
});
792+
});
652793
});

src/execution/execute.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,11 +1608,15 @@ type DispatcherResult = {|
16081608
*/
16091609
export class Dispatcher {
16101610
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
1611+
_iterators: Array<AsyncIterator<mixed>>;
1612+
_isDone: boolean;
16111613
_initialResult: ?ExecutionResult;
16121614
_hasReturnedInitialResult: boolean;
16131615

16141616
constructor() {
16151617
this._subsequentPayloads = [];
1618+
this._iterators = [];
1619+
this._isDone = false;
16161620
this._hasReturnedInitialResult = false;
16171621
}
16181622

@@ -1681,13 +1685,16 @@ export class Dispatcher {
16811685
itemType: GraphQLOutputType,
16821686
): void {
16831687
const subsequentPayloads = this._subsequentPayloads;
1688+
const iterators = this._iterators;
1689+
iterators.push(iterator);
16841690
function next(index) {
16851691
const fieldPath = addPath(path, index);
16861692
const patchErrors = [];
16871693
subsequentPayloads.push(
16881694
iterator.next().then(
16891695
({ value: data, done }) => {
16901696
if (done) {
1697+
iterators.splice(iterators.indexOf(iterator), 1);
16911698
return { value: undefined, done: true };
16921699
}
16931700

@@ -1758,6 +1765,14 @@ export class Dispatcher {
17581765
}
17591766

17601767
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
1768+
if (this._isDone) {
1769+
return Promise.resolve({
1770+
value: {
1771+
hasNext: false,
1772+
},
1773+
done: false,
1774+
});
1775+
}
17611776
return new Promise((resolve) => {
17621777
this._subsequentPayloads.forEach((promise) => {
17631778
promise.then(() => {
@@ -1814,13 +1829,24 @@ export class Dispatcher {
18141829
return this._race();
18151830
}
18161831

1832+
_return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1833+
return Promise.all(
1834+
// $FlowFixMe[prop-missing]
1835+
this._iterators.map((iterator) => iterator.return?.()),
1836+
).then(() => {
1837+
this._isDone = true;
1838+
return { value: undefined, done: true };
1839+
});
1840+
}
1841+
18171842
get(initialResult: ExecutionResult): AsyncIterable<AsyncExecutionResult> {
18181843
this._initialResult = initialResult;
18191844
return ({
18201845
[Symbol.asyncIterator]() {
18211846
return this;
18221847
},
18231848
next: () => this._next(),
1849+
return: () => this._return(),
18241850
}: any);
18251851
}
18261852
}

0 commit comments

Comments
 (0)