1
1
import { devAssert } from '../jsutils/devAssert' ;
2
2
import { inspect } from '../jsutils/inspect' ;
3
3
import { invariant } from '../jsutils/invariant' ;
4
+ import { isAsyncIterable } from '../jsutils/isAsyncIterable' ;
4
5
import { isIterableObject } from '../jsutils/isIterableObject' ;
5
6
import { isObjectLike } from '../jsutils/isObjectLike' ;
6
7
import { isPromise } from '../jsutils/isPromise' ;
@@ -51,6 +52,7 @@ import {
51
52
collectFields ,
52
53
collectSubfields as _collectSubfields ,
53
54
} from './collectFields' ;
55
+ import { mapAsyncIterator } from './mapAsyncIterator' ;
54
56
import { getArgumentValues , getVariableValues } from './values' ;
55
57
56
58
/**
@@ -234,10 +236,8 @@ function buildResponse(
234
236
/**
235
237
* Essential assertions before executing to provide developer feedback for
236
238
* improper use of the GraphQL library.
237
- *
238
- * @internal
239
239
*/
240
- export function assertValidExecutionArguments (
240
+ function assertValidExecutionArguments (
241
241
schema : GraphQLSchema ,
242
242
document : DocumentNode ,
243
243
rawVariableValues : Maybe < { readonly [ variable : string ] : unknown } > ,
@@ -259,10 +259,8 @@ export function assertValidExecutionArguments(
259
259
* execute, which we will pass throughout the other execution methods.
260
260
*
261
261
* Throws a GraphQLError if a valid execution context cannot be created.
262
- *
263
- * @internal
264
262
*/
265
- export function buildExecutionContext (
263
+ function buildExecutionContext (
266
264
args : ExecutionArgs ,
267
265
) : ReadonlyArray < GraphQLError > | ExecutionContext {
268
266
const {
@@ -542,10 +540,7 @@ function executeField(
542
540
}
543
541
}
544
542
545
- /**
546
- * @internal
547
- */
548
- export function buildResolveInfo (
543
+ function buildResolveInfo (
549
544
exeContext : ExecutionContext ,
550
545
fieldDef : GraphQLField < unknown , unknown > ,
551
546
fieldNodes : ReadonlyArray < FieldNode > ,
@@ -1009,3 +1004,225 @@ export const defaultFieldResolver: GraphQLFieldResolver<unknown, unknown> =
1009
1004
return property ;
1010
1005
}
1011
1006
} ;
1007
+
1008
+ /**
1009
+ * Implements the "Subscribe" algorithm described in the GraphQL specification.
1010
+ *
1011
+ * Returns a Promise which resolves to either an AsyncIterator (if successful)
1012
+ * or an ExecutionResult (error). The promise will be rejected if the schema or
1013
+ * other arguments to this function are invalid, or if the resolved event stream
1014
+ * is not an async iterable.
1015
+ *
1016
+ * If the client-provided arguments to this function do not result in a
1017
+ * compliant subscription, a GraphQL Response (ExecutionResult) with
1018
+ * descriptive errors and no data will be returned.
1019
+ *
1020
+ * If the source stream could not be created due to faulty subscription
1021
+ * resolver logic or underlying systems, the promise will resolve to a single
1022
+ * ExecutionResult containing `errors` and no `data`.
1023
+ *
1024
+ * If the operation succeeded, the promise resolves to an AsyncIterator, which
1025
+ * yields a stream of ExecutionResults representing the response stream.
1026
+ *
1027
+ * Accepts either an object with named arguments, or individual arguments.
1028
+ */
1029
+ export function subscribe (
1030
+ args : ExecutionArgs ,
1031
+ ) : PromiseOrValue <
1032
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
1033
+ > {
1034
+ const resultOrStream = createSourceEventStream ( args ) ;
1035
+
1036
+ if ( isPromise ( resultOrStream ) ) {
1037
+ return resultOrStream . then ( ( resolvedResultOrStream ) =>
1038
+ mapSourceToResponse ( resolvedResultOrStream , args ) ,
1039
+ ) ;
1040
+ }
1041
+
1042
+ return mapSourceToResponse ( resultOrStream , args ) ;
1043
+ }
1044
+
1045
+ function mapSourceToResponse (
1046
+ resultOrStream : ExecutionResult | AsyncIterable < unknown > ,
1047
+ args : ExecutionArgs ,
1048
+ ) : PromiseOrValue <
1049
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
1050
+ > {
1051
+ if ( ! isAsyncIterable ( resultOrStream ) ) {
1052
+ return resultOrStream ;
1053
+ }
1054
+
1055
+ // For each payload yielded from a subscription, map it over the normal
1056
+ // GraphQL `execute` function, with `payload` as the rootValue.
1057
+ // This implements the "MapSourceToResponseEvent" algorithm described in
1058
+ // the GraphQL specification. The `execute` function provides the
1059
+ // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
1060
+ // "ExecuteQuery" algorithm, for which `execute` is also used.
1061
+ return mapAsyncIterator ( resultOrStream , ( payload : unknown ) =>
1062
+ execute ( {
1063
+ ...args ,
1064
+ rootValue : payload ,
1065
+ } ) ,
1066
+ ) ;
1067
+ }
1068
+
1069
+ /**
1070
+ * Implements the "CreateSourceEventStream" algorithm described in the
1071
+ * GraphQL specification, resolving the subscription source event stream.
1072
+ *
1073
+ * Returns a Promise which resolves to either an AsyncIterable (if successful)
1074
+ * or an ExecutionResult (error). The promise will be rejected if the schema or
1075
+ * other arguments to this function are invalid, or if the resolved event stream
1076
+ * is not an async iterable.
1077
+ *
1078
+ * If the client-provided arguments to this function do not result in a
1079
+ * compliant subscription, a GraphQL Response (ExecutionResult) with
1080
+ * descriptive errors and no data will be returned.
1081
+ *
1082
+ * If the the source stream could not be created due to faulty subscription
1083
+ * resolver logic or underlying systems, the promise will resolve to a single
1084
+ * ExecutionResult containing `errors` and no `data`.
1085
+ *
1086
+ * If the operation succeeded, the promise resolves to the AsyncIterable for the
1087
+ * event stream returned by the resolver.
1088
+ *
1089
+ * A Source Event Stream represents a sequence of events, each of which triggers
1090
+ * a GraphQL execution for that event.
1091
+ *
1092
+ * This may be useful when hosting the stateful subscription service in a
1093
+ * different process or machine than the stateless GraphQL execution engine,
1094
+ * or otherwise separating these two steps. For more on this, see the
1095
+ * "Supporting Subscriptions at Scale" information in the GraphQL specification.
1096
+ */
1097
+ export function createSourceEventStream (
1098
+ args : ExecutionArgs ,
1099
+ ) : PromiseOrValue < AsyncIterable < unknown > | ExecutionResult > {
1100
+ const {
1101
+ schema,
1102
+ document,
1103
+ rootValue,
1104
+ contextValue,
1105
+ variableValues,
1106
+ operationName,
1107
+ subscribeFieldResolver,
1108
+ } = args ;
1109
+
1110
+ // If arguments are missing or incorrectly typed, this is an internal
1111
+ // developer mistake which should throw an early error.
1112
+ assertValidExecutionArguments ( schema , document , variableValues ) ;
1113
+
1114
+ // If a valid execution context cannot be created due to incorrect arguments,
1115
+ // a "Response" with only errors is returned.
1116
+ const exeContext = buildExecutionContext ( {
1117
+ schema,
1118
+ document,
1119
+ rootValue,
1120
+ contextValue,
1121
+ variableValues,
1122
+ operationName,
1123
+ subscribeFieldResolver,
1124
+ } ) ;
1125
+
1126
+ // Return early errors if execution context failed.
1127
+ if ( ! ( 'schema' in exeContext ) ) {
1128
+ return { errors : exeContext } ;
1129
+ }
1130
+
1131
+ try {
1132
+ const eventStream = executeSubscription ( exeContext ) ;
1133
+ if ( isPromise ( eventStream ) ) {
1134
+ return eventStream . then ( undefined , ( error ) => ( { errors : [ error ] } ) ) ;
1135
+ }
1136
+
1137
+ return eventStream ;
1138
+ } catch ( error ) {
1139
+ return { errors : [ error ] } ;
1140
+ }
1141
+ }
1142
+
1143
+ function executeSubscription (
1144
+ exeContext : ExecutionContext ,
1145
+ ) : PromiseOrValue < AsyncIterable < unknown > > {
1146
+ const { schema, fragments, operation, variableValues, rootValue } =
1147
+ exeContext ;
1148
+
1149
+ const rootType = schema . getSubscriptionType ( ) ;
1150
+ if ( rootType == null ) {
1151
+ throw new GraphQLError (
1152
+ 'Schema is not configured to execute subscription operation.' ,
1153
+ { nodes : operation } ,
1154
+ ) ;
1155
+ }
1156
+
1157
+ const rootFields = collectFields (
1158
+ schema ,
1159
+ fragments ,
1160
+ variableValues ,
1161
+ rootType ,
1162
+ operation . selectionSet ,
1163
+ ) ;
1164
+ const [ responseName , fieldNodes ] = [ ...rootFields . entries ( ) ] [ 0 ] ;
1165
+ const fieldName = fieldNodes [ 0 ] . name . value ;
1166
+ const fieldDef = schema . getField ( rootType , fieldName ) ;
1167
+
1168
+ if ( ! fieldDef ) {
1169
+ throw new GraphQLError (
1170
+ `The subscription field "${ fieldName } " is not defined.` ,
1171
+ { nodes : fieldNodes } ,
1172
+ ) ;
1173
+ }
1174
+
1175
+ const path = addPath ( undefined , responseName , rootType . name ) ;
1176
+ const info = buildResolveInfo (
1177
+ exeContext ,
1178
+ fieldDef ,
1179
+ fieldNodes ,
1180
+ rootType ,
1181
+ path ,
1182
+ ) ;
1183
+
1184
+ try {
1185
+ // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
1186
+ // It differs from "ResolveFieldValue" due to providing a different `resolveFn`.
1187
+
1188
+ // Build a JS object of arguments from the field.arguments AST, using the
1189
+ // variables scope to fulfill any variable references.
1190
+ const args = getArgumentValues ( fieldDef , fieldNodes [ 0 ] , variableValues ) ;
1191
+
1192
+ // The resolve function's optional third argument is a context value that
1193
+ // is provided to every resolve function within an execution. It is commonly
1194
+ // used to represent an authenticated user, or request-specific caches.
1195
+ const contextValue = exeContext . contextValue ;
1196
+
1197
+ // Call the `subscribe()` resolver or the default resolver to produce an
1198
+ // AsyncIterable yielding raw payloads.
1199
+ const resolveFn = fieldDef . subscribe ?? exeContext . subscribeFieldResolver ;
1200
+ const result = resolveFn ( rootValue , args , contextValue , info ) ;
1201
+
1202
+ if ( isPromise ( result ) ) {
1203
+ return result . then ( assertEventStream ) . then ( undefined , ( error ) => {
1204
+ throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
1205
+ } ) ;
1206
+ }
1207
+
1208
+ return assertEventStream ( result ) ;
1209
+ } catch ( error ) {
1210
+ throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
1211
+ }
1212
+ }
1213
+
1214
+ function assertEventStream ( result : unknown ) : AsyncIterable < unknown > {
1215
+ if ( result instanceof Error ) {
1216
+ throw result ;
1217
+ }
1218
+
1219
+ // Assert field returned an event stream, otherwise yield an error.
1220
+ if ( ! isAsyncIterable ( result ) ) {
1221
+ throw new GraphQLError (
1222
+ 'Subscription field must return Async Iterable. ' +
1223
+ `Received: ${ inspect ( result ) } .` ,
1224
+ ) ;
1225
+ }
1226
+
1227
+ return result ;
1228
+ }
0 commit comments