Skip to content

Commit 76024dd

Browse files
committed
fix(race): concurrent next calls with defer/stream (#2975)
* fix(race): concurrent next calls * refactor test * use invariant * disable eslint error * fix
1 parent 0d1a37f commit 76024dd

File tree

2 files changed

+80
-0
lines changed

2 files changed

+80
-0
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,22 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
186186
return result;
187187
}
188188

189+
async function completeAsync(document: DocumentNode, numCalls: number) {
190+
const schema = new GraphQLSchema({ query });
191+
192+
const result = await execute({ schema, document, rootValue: {} });
193+
194+
invariant(isAsyncIterable(result));
195+
196+
const iterator = result[Symbol.asyncIterator]();
197+
198+
const promises = [];
199+
for (let i = 0; i < numCalls; i++) {
200+
promises.push(iterator.next());
201+
}
202+
return Promise.all(promises);
203+
}
204+
189205
describe('Execute: stream directive', () => {
190206
it('Can stream a list field', async () => {
191207
const document = parse('{ scalarList @stream(initialCount: 1) }');
@@ -582,6 +598,58 @@ describe('Execute: stream directive', () => {
582598
},
583599
});
584600
});
601+
it('Can handle concurrent calls to .next() without waiting', async () => {
602+
const document = parse(`
603+
query {
604+
asyncIterableList @stream(initialCount: 2) {
605+
name
606+
id
607+
}
608+
}
609+
`);
610+
const result = await completeAsync(document, 4);
611+
expectJSON(result).toDeepEqual([
612+
{
613+
done: false,
614+
value: {
615+
data: {
616+
asyncIterableList: [
617+
{
618+
name: 'Luke',
619+
id: '1',
620+
},
621+
{
622+
name: 'Han',
623+
id: '2',
624+
},
625+
],
626+
},
627+
hasNext: true,
628+
},
629+
},
630+
{
631+
done: false,
632+
value: {
633+
data: {
634+
name: 'Leia',
635+
id: '3',
636+
},
637+
path: ['asyncIterableList', 2],
638+
hasNext: true,
639+
},
640+
},
641+
{
642+
done: false,
643+
value: {
644+
hasNext: false,
645+
},
646+
},
647+
{
648+
done: true,
649+
value: undefined,
650+
},
651+
]);
652+
});
585653
it('Handles error thrown in async iterable before initialCount is reached', async () => {
586654
const document = parse(`
587655
query {

src/execution/execute.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,8 +1585,20 @@ function yieldSubsequentPayloads(
15851585

15861586
resolved = true;
15871587

1588+
if (exeContext.subsequentPayloads.length === 0) {
1589+
// a different call to next has exhausted all payloads
1590+
resolve({ value: undefined, done: true });
1591+
return;
1592+
}
15881593
const index =
15891594
exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
1595+
1596+
if (index === -1) {
1597+
// a different call to next has consumed this payload
1598+
resolve(race());
1599+
return;
1600+
}
1601+
15901602
exeContext.subsequentPayloads.splice(index, 1);
15911603

15921604
if (asyncPayloadRecord.isCompletedIterator) {

0 commit comments

Comments
 (0)