Skip to content

Commit a31c181

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent c6b4ba4 commit a31c181

File tree

2 files changed

+225
-6
lines changed

2 files changed

+225
-6
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { describe, it } from 'mocha';
22

33
import { expectJSON } from '../../__testUtils__/expectJSON';
44

5+
import { invariant } from '../../jsutils/invariant';
56
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
67

78
import type { DocumentNode } from '../../language/ast';
@@ -111,6 +112,37 @@ const query = new GraphQLObjectType({
111112
yield await Promise.resolve({});
112113
},
113114
},
115+
asyncIterableListDelayed: {
116+
type: new GraphQLList(friendType),
117+
async *resolve() {
118+
for (const friend of friends) {
119+
// pause an additional ms before yielding to allow time
120+
// for tests to return or throw before next value is processed.
121+
// eslint-disable-next-line no-await-in-loop
122+
await new Promise((r) => setTimeout(r, 1));
123+
yield friend; /* c8 ignore start */
124+
// Not reachable, early return
125+
}
126+
} /* c8 ignore stop */,
127+
},
128+
asyncIterableListNoReturn: {
129+
type: new GraphQLList(friendType),
130+
resolve() {
131+
let i = 0;
132+
return {
133+
[Symbol.asyncIterator]: () => ({
134+
async next() {
135+
const friend = friends[i++];
136+
if (friend) {
137+
await new Promise((r) => setTimeout(r, 1));
138+
return { value: friend, done: false };
139+
}
140+
return { value: undefined, done: true };
141+
},
142+
}),
143+
};
144+
},
145+
},
114146
asyncIterableListDelayedClose: {
115147
type: new GraphQLList(friendType),
116148
async *resolve() {
@@ -1011,4 +1043,175 @@ describe('Execute: stream directive', () => {
10111043
},
10121044
]);
10131045
});
1046+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1047+
const document = parse(`
1048+
query {
1049+
asyncIterableListDelayed @stream(initialCount: 1) {
1050+
name
1051+
id
1052+
}
1053+
}
1054+
`);
1055+
const schema = new GraphQLSchema({ query });
1056+
1057+
const executeResult = await execute({ schema, document, rootValue: {} });
1058+
invariant(isAsyncIterable(executeResult));
1059+
const iterator = executeResult[Symbol.asyncIterator]();
1060+
1061+
const result1 = await iterator.next();
1062+
expectJSON(result1).toDeepEqual({
1063+
done: false,
1064+
value: {
1065+
data: {
1066+
asyncIterableListDelayed: [
1067+
{
1068+
id: '1',
1069+
name: 'Luke',
1070+
},
1071+
],
1072+
},
1073+
hasNext: true,
1074+
},
1075+
});
1076+
1077+
const returnPromise = iterator.return();
1078+
1079+
// this result had started processing before return was called
1080+
const result2 = await iterator.next();
1081+
expectJSON(result2).toDeepEqual({
1082+
done: false,
1083+
value: {
1084+
data: {
1085+
id: '2',
1086+
name: 'Han',
1087+
},
1088+
hasNext: true,
1089+
path: ['asyncIterableListDelayed', 1],
1090+
},
1091+
});
1092+
1093+
// third result is not returned because async iterator has returned
1094+
const result3 = await iterator.next();
1095+
expectJSON(result3).toDeepEqual({
1096+
done: true,
1097+
value: undefined,
1098+
});
1099+
await returnPromise;
1100+
});
1101+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1102+
const document = parse(`
1103+
query {
1104+
asyncIterableListNoReturn @stream(initialCount: 1) {
1105+
name
1106+
id
1107+
}
1108+
}
1109+
`);
1110+
const schema = new GraphQLSchema({ query });
1111+
1112+
const executeResult = await execute({ schema, document, rootValue: {} });
1113+
invariant(isAsyncIterable(executeResult));
1114+
const iterator = executeResult[Symbol.asyncIterator]();
1115+
1116+
const result1 = await iterator.next();
1117+
expectJSON(result1).toDeepEqual({
1118+
done: false,
1119+
value: {
1120+
data: {
1121+
asyncIterableListNoReturn: [
1122+
{
1123+
id: '1',
1124+
name: 'Luke',
1125+
},
1126+
],
1127+
},
1128+
hasNext: true,
1129+
},
1130+
});
1131+
1132+
const returnPromise = iterator.return();
1133+
1134+
// this result had started processing before return was called
1135+
const result2 = await iterator.next();
1136+
expectJSON(result2).toDeepEqual({
1137+
done: false,
1138+
value: {
1139+
data: {
1140+
id: '2',
1141+
name: 'Han',
1142+
},
1143+
hasNext: true,
1144+
path: ['asyncIterableListNoReturn', 1],
1145+
},
1146+
});
1147+
1148+
// third result is not returned because async iterator has returned
1149+
const result3 = await iterator.next();
1150+
expectJSON(result3).toDeepEqual({
1151+
done: true,
1152+
value: undefined,
1153+
});
1154+
await returnPromise;
1155+
});
1156+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1157+
const document = parse(`
1158+
query {
1159+
asyncIterableListDelayed @stream(initialCount: 1) {
1160+
name
1161+
id
1162+
}
1163+
}
1164+
`);
1165+
const schema = new GraphQLSchema({ query });
1166+
1167+
const executeResult = await execute({ schema, document, rootValue: {} });
1168+
invariant(isAsyncIterable(executeResult));
1169+
const iterator = executeResult[Symbol.asyncIterator]();
1170+
1171+
const result1 = await iterator.next();
1172+
expectJSON(result1).toDeepEqual({
1173+
done: false,
1174+
value: {
1175+
data: {
1176+
asyncIterableListDelayed: [
1177+
{
1178+
id: '1',
1179+
name: 'Luke',
1180+
},
1181+
],
1182+
},
1183+
hasNext: true,
1184+
},
1185+
});
1186+
1187+
const throwPromise = iterator.throw(new Error('bad'));
1188+
1189+
// this result had started processing before return was called
1190+
const result2 = await iterator.next();
1191+
expectJSON(result2).toDeepEqual({
1192+
done: false,
1193+
value: {
1194+
data: {
1195+
id: '2',
1196+
name: 'Han',
1197+
},
1198+
hasNext: true,
1199+
path: ['asyncIterableListDelayed', 1],
1200+
},
1201+
});
1202+
1203+
// third result is not returned because async iterator has returned
1204+
const result3 = await iterator.next();
1205+
expectJSON(result3).toDeepEqual({
1206+
done: true,
1207+
value: undefined,
1208+
});
1209+
try {
1210+
await throwPromise; /* c8 ignore start */
1211+
// Not reachable, always throws
1212+
/* c8 ignore stop */
1213+
} catch (e) {
1214+
// ignore error
1215+
}
1216+
});
10141217
});

src/execution/execute.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,6 +1511,7 @@ function executeStreamIterator(
15111511
label,
15121512
path: fieldPath,
15131513
parentContext,
1514+
iterator,
15141515
});
15151516
const dataPromise: Promise<unknown> = iterator
15161517
.next()
@@ -1564,6 +1565,7 @@ function yieldSubsequentPayloads(
15641565
initialResult: ExecutionResult,
15651566
): AsyncGenerator<AsyncExecutionResult, void, void> {
15661567
let _hasReturnedInitialResult = false;
1568+
let isDone = false;
15671569

15681570
async function race(): Promise<IteratorResult<AsyncExecutionResult>> {
15691571
if (exeContext.subsequentPayloads.length === 0) {
@@ -1634,17 +1636,31 @@ function yieldSubsequentPayloads(
16341636
},
16351637
done: false,
16361638
});
1637-
} else if (exeContext.subsequentPayloads.length === 0) {
1639+
} else if (exeContext.subsequentPayloads.length === 0 || isDone) {
16381640
return Promise.resolve({ value: undefined, done: true });
16391641
}
16401642
return race();
16411643
},
1642-
// TODO: implement return & throw
1643-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1644-
Promise.resolve({ value: undefined, done: true }),
1645-
throw: /* istanbul ignore next: will be covered in follow up */ (
1644+
async return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1645+
await Promise.all(
1646+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1647+
asyncPayloadRecord.iterator?.return?.(),
1648+
),
1649+
);
1650+
isDone = true;
1651+
return { value: undefined, done: true };
1652+
},
1653+
async throw(
16461654
error?: unknown,
1647-
) => Promise.reject(error),
1655+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1656+
await Promise.all(
1657+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1658+
asyncPayloadRecord.iterator?.return?.(),
1659+
),
1660+
);
1661+
isDone = true;
1662+
return Promise.reject(error);
1663+
},
16481664
};
16491665
}
16501666

0 commit comments

Comments
 (0)