1
1
import { inspect } from '../jsutils/inspect' ;
2
2
import { isAsyncIterable } from '../jsutils/isAsyncIterable' ;
3
+ import { isPromise } from '../jsutils/isPromise' ;
3
4
import type { Maybe } from '../jsutils/Maybe' ;
4
5
import { addPath , pathToArray } from '../jsutils/Path' ;
6
+ import type { PromiseOrValue } from '../jsutils/PromiseOrValue' ;
5
7
6
8
import { GraphQLError } from '../error/GraphQLError' ;
7
9
import { locatedError } from '../error/locatedError' ;
8
10
9
- import type { DocumentNode } from '../language/ast' ;
11
+ import type { DocumentNode , FieldNode } from '../language/ast' ;
10
12
11
13
import type { GraphQLFieldResolver } from '../type/definition' ;
12
14
import type { GraphQLSchema } from '../type/schema' ;
@@ -47,9 +49,11 @@ import { getArgumentValues } from './values';
47
49
*
48
50
* Accepts either an object with named arguments, or individual arguments.
49
51
*/
50
- export async function subscribe (
52
+ export function subscribe (
51
53
args : ExecutionArgs ,
52
- ) : Promise < AsyncGenerator < ExecutionResult , void , void > | ExecutionResult > {
54
+ ) : PromiseOrValue <
55
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
56
+ > {
53
57
const {
54
58
schema,
55
59
document,
@@ -61,7 +65,7 @@ export async function subscribe(
61
65
subscribeFieldResolver,
62
66
} = args ;
63
67
64
- const resultOrStream = await createSourceEventStream (
68
+ const resultOrStream = createSourceEventStream (
65
69
schema ,
66
70
document ,
67
71
rootValue ,
@@ -71,6 +75,42 @@ export async function subscribe(
71
75
subscribeFieldResolver ,
72
76
) ;
73
77
78
+ if ( isPromise ( resultOrStream ) ) {
79
+ return resultOrStream . then ( ( resolvedResultOrStream ) =>
80
+ mapSourceToResponse (
81
+ schema ,
82
+ document ,
83
+ resolvedResultOrStream ,
84
+ contextValue ,
85
+ variableValues ,
86
+ operationName ,
87
+ fieldResolver ,
88
+ ) ,
89
+ ) ;
90
+ }
91
+
92
+ return mapSourceToResponse (
93
+ schema ,
94
+ document ,
95
+ resultOrStream ,
96
+ contextValue ,
97
+ variableValues ,
98
+ operationName ,
99
+ fieldResolver ,
100
+ ) ;
101
+ }
102
+
103
+ function mapSourceToResponse (
104
+ schema : GraphQLSchema ,
105
+ document : DocumentNode ,
106
+ resultOrStream : ExecutionResult | AsyncIterable < unknown > ,
107
+ contextValue ?: unknown ,
108
+ variableValues ?: Maybe < { readonly [ variable : string ] : unknown } > ,
109
+ operationName ?: Maybe < string > ,
110
+ fieldResolver ?: Maybe < GraphQLFieldResolver < any , any > > ,
111
+ ) : PromiseOrValue <
112
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
113
+ > {
74
114
if ( ! isAsyncIterable ( resultOrStream ) ) {
75
115
return resultOrStream ;
76
116
}
@@ -81,7 +121,7 @@ export async function subscribe(
81
121
// the GraphQL specification. The `execute` function provides the
82
122
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
83
123
// "ExecuteQuery" algorithm, for which `execute` is also used.
84
- const mapSourceToResponse = ( payload : unknown ) =>
124
+ return mapAsyncIterator ( resultOrStream , ( payload : unknown ) =>
85
125
execute ( {
86
126
schema,
87
127
document,
@@ -90,10 +130,8 @@ export async function subscribe(
90
130
variableValues,
91
131
operationName,
92
132
fieldResolver,
93
- } ) ;
94
-
95
- // Map every source value to a ExecutionResult value as described above.
96
- return mapAsyncIterator ( resultOrStream , mapSourceToResponse ) ;
133
+ } ) ,
134
+ ) ;
97
135
}
98
136
99
137
/**
@@ -124,15 +162,15 @@ export async function subscribe(
124
162
* or otherwise separating these two steps. For more on this, see the
125
163
* "Supporting Subscriptions at Scale" information in the GraphQL specification.
126
164
*/
127
- export async function createSourceEventStream (
165
+ export function createSourceEventStream (
128
166
schema : GraphQLSchema ,
129
167
document : DocumentNode ,
130
168
rootValue ?: unknown ,
131
169
contextValue ?: unknown ,
132
170
variableValues ?: Maybe < { readonly [ variable : string ] : unknown } > ,
133
171
operationName ?: Maybe < string > ,
134
172
subscribeFieldResolver ?: Maybe < GraphQLFieldResolver < any , any > > ,
135
- ) : Promise < AsyncIterable < unknown > | ExecutionResult > {
173
+ ) : PromiseOrValue < AsyncIterable < unknown > | ExecutionResult > {
136
174
// If arguments are missing or incorrectly typed, this is an internal
137
175
// developer mistake which should throw an early error.
138
176
assertValidExecutionArguments ( schema , document , variableValues ) ;
@@ -155,30 +193,43 @@ export async function createSourceEventStream(
155
193
}
156
194
157
195
try {
158
- const eventStream = await executeSubscription ( exeContext ) ;
196
+ const eventStream = executeSubscription ( exeContext ) ;
159
197
160
- // Assert field returned an event stream, otherwise yield an error.
161
- if ( ! isAsyncIterable ( eventStream ) ) {
162
- throw new Error (
163
- 'Subscription field must return Async Iterable. ' +
164
- `Received: ${ inspect ( eventStream ) } .` ,
198
+ if ( isPromise ( eventStream ) ) {
199
+ return eventStream . then (
200
+ ( resolvedEventStream ) => ensureAsyncIterable ( resolvedEventStream ) ,
201
+ ( error ) => handleRawError ( error ) ,
165
202
) ;
166
203
}
167
204
168
- return eventStream ;
205
+ return ensureAsyncIterable ( eventStream ) ;
169
206
} catch ( error ) {
170
- // If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
171
- // Otherwise treat the error as a system-class error and re-throw it.
172
- if ( error instanceof GraphQLError ) {
173
- return { errors : [ error ] } ;
174
- }
175
- throw error ;
207
+ return handleRawError ( error ) ;
176
208
}
177
209
}
178
210
179
- async function executeSubscription (
180
- exeContext : ExecutionContext ,
181
- ) : Promise < unknown > {
211
+ function ensureAsyncIterable ( eventStream : unknown ) : AsyncIterable < unknown > {
212
+ // Assert field returned an event stream, otherwise yield an error.
213
+ if ( ! isAsyncIterable ( eventStream ) ) {
214
+ throw new Error (
215
+ 'Subscription field must return Async Iterable. ' +
216
+ `Received: ${ inspect ( eventStream ) } .` ,
217
+ ) ;
218
+ }
219
+
220
+ return eventStream ;
221
+ }
222
+
223
+ function handleRawError ( error : unknown ) : ExecutionResult {
224
+ // If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
225
+ // Otherwise treat the error as a system-class error and re-throw it.
226
+ if ( error instanceof GraphQLError ) {
227
+ return { errors : [ error ] } ;
228
+ }
229
+ throw error ;
230
+ }
231
+
232
+ function executeSubscription ( exeContext : ExecutionContext ) : unknown {
182
233
const { schema, fragments, operation, variableValues, rootValue } =
183
234
exeContext ;
184
235
@@ -233,13 +284,36 @@ async function executeSubscription(
233
284
// Call the `subscribe()` resolver or the default resolver to produce an
234
285
// AsyncIterable yielding raw payloads.
235
286
const resolveFn = fieldDef . subscribe ?? exeContext . subscribeFieldResolver ;
236
- const eventStream = await resolveFn ( rootValue , args , contextValue , info ) ;
237
287
238
- if ( eventStream instanceof Error ) {
239
- throw eventStream ;
288
+ const eventStream = resolveFn ( rootValue , args , contextValue , info ) ;
289
+
290
+ if ( isPromise ( eventStream ) ) {
291
+ return eventStream . then (
292
+ ( resolvedEventStream ) =>
293
+ throwReturnedError (
294
+ resolvedEventStream ,
295
+ fieldNodes ,
296
+ pathToArray ( path ) ,
297
+ ) ,
298
+ ( error ) => {
299
+ throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
300
+ } ,
301
+ ) ;
240
302
}
241
- return eventStream ;
303
+
304
+ return throwReturnedError ( eventStream , fieldNodes , pathToArray ( path ) ) ;
242
305
} catch ( error ) {
243
306
throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
244
307
}
245
308
}
309
+
310
+ function throwReturnedError (
311
+ eventStream : unknown ,
312
+ fieldNodes : ReadonlyArray < FieldNode > ,
313
+ path : ReadonlyArray < string | number > ,
314
+ ) : unknown {
315
+ if ( eventStream instanceof Error ) {
316
+ throw locatedError ( eventStream , fieldNodes , path ) ;
317
+ }
318
+ return eventStream ;
319
+ }
0 commit comments