Skip to content

Commit 1ede415

Browse files
committed
fix(race): concurrent next calls with defer/stream (#2975)
* fix(race): concurrent next calls * refactor test * use invariant * disable eslint error * fix
1 parent 4456896 commit 1ede415

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,22 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
194194
return result;
195195
}
196196

197+
async function completeAsync(document: DocumentNode, numCalls: number) {
198+
const schema = new GraphQLSchema({ query });
199+
200+
const result = await execute({ schema, document, rootValue: {} });
201+
202+
invariant(isAsyncIterable(result));
203+
204+
const iterator = result[Symbol.asyncIterator]();
205+
206+
const promises = [];
207+
for (let i = 0; i < numCalls; i++) {
208+
promises.push(iterator.next());
209+
}
210+
return Promise.all(promises);
211+
}
212+
197213
describe('Execute: stream directive', () => {
198214
it('Can stream a list field', async () => {
199215
const document = parse('{ scalarList @stream(initialCount: 1) }');
@@ -614,6 +630,58 @@ describe('Execute: stream directive', () => {
614630
},
615631
});
616632
});
633+
it('Can handle concurrent calls to .next() without waiting', async () => {
634+
const document = parse(`
635+
query {
636+
asyncIterableList @stream(initialCount: 2) {
637+
name
638+
id
639+
}
640+
}
641+
`);
642+
const result = await completeAsync(document, 4);
643+
expectJSON(result).toDeepEqual([
644+
{
645+
done: false,
646+
value: {
647+
data: {
648+
asyncIterableList: [
649+
{
650+
name: 'Luke',
651+
id: '1',
652+
},
653+
{
654+
name: 'Han',
655+
id: '2',
656+
},
657+
],
658+
},
659+
hasNext: true,
660+
},
661+
},
662+
{
663+
done: false,
664+
value: {
665+
data: {
666+
name: 'Leia',
667+
id: '3',
668+
},
669+
path: ['asyncIterableList', 2],
670+
hasNext: true,
671+
},
672+
},
673+
{
674+
done: false,
675+
value: {
676+
hasNext: false,
677+
},
678+
},
679+
{
680+
done: true,
681+
value: undefined,
682+
},
683+
]);
684+
});
617685
it('Handles error thrown in async iterable before initialCount is reached', async () => {
618686
const document = parse(`
619687
query {

src/execution/execute.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1594,7 +1594,18 @@ function yieldSubsequentPayloads(
15941594

15951595
const data = await asyncPayloadRecord.dataPromise;
15961596

1597+
if (exeContext.subsequentPayloads.length === 0) {
1598+
// a different call to next has exhausted all payloads
1599+
return { value: undefined, done: true };
1600+
}
1601+
15971602
const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
1603+
1604+
if (index === -1) {
1605+
// a different call to next has consumed this payload
1606+
return race();
1607+
}
1608+
15981609
exeContext.subsequentPayloads.splice(index, 1);
15991610

16001611
if (asyncPayloadRecord.isCompletedIterator) {

0 commit comments

Comments
 (0)