@@ -13,11 +13,12 @@ public static class GraphQLHttpWebsocketHelpers {
1313 internal static IObservable < GraphQLResponse < TResponse > > CreateSubscriptionStream < TResponse > (
1414 this GraphQLHttpWebSocket graphQlHttpWebSocket ,
1515 GraphQLRequest request ,
16- GraphQLHttpClientOptions options ,
16+ GraphQLHttpClient client ,
1717 Action < Exception > exceptionHandler = null ,
1818 CancellationToken cancellationToken = default ) {
1919 return Observable . Defer ( ( ) =>
2020 Observable . Create < GraphQLResponse < TResponse > > ( async observer => {
21+ await client . Options . PreprocessRequest ( request , client ) ;
2122 var startRequest = new GraphQLWebSocketRequest {
2223 Id = Guid . NewGuid ( ) . ToString ( "N" ) ,
2324 Type = GraphQLWebSocketMessageType . GQL_START ,
@@ -27,34 +28,38 @@ internal static IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream
2728 Id = startRequest . Id ,
2829 Type = GraphQLWebSocketMessageType . GQL_STOP
2930 } ;
31+ var initRequest = new GraphQLWebSocketRequest {
32+ Id = startRequest . Id ,
33+ Type = GraphQLWebSocketMessageType . GQL_CONNECTION_INIT ,
34+ } ;
3035
3136 var observable = Observable . Create < GraphQLResponse < TResponse > > ( o =>
3237 graphQlHttpWebSocket . ResponseStream
3338 // ignore null values and messages for other requests
3439 . Where ( response => response != null && response . Id == startRequest . Id )
3540 . Subscribe ( response => {
36- // terminate the sequence when a 'complete' message is received
37- if ( response . Type == GraphQLWebSocketMessageType . GQL_COMPLETE ) {
38- Debug . WriteLine ( $ "received 'complete' message on subscription { startRequest . Id } ") ;
39- o . OnCompleted ( ) ;
40- return ;
41- }
42-
43- // post the GraphQLResponse to the stream (even if a GraphQL error occurred)
44- Debug . WriteLine ( $ "received payload on subscription { startRequest . Id } ") ;
45- var typedResponse =
46- JsonSerializer . Deserialize < GraphQLWebSocketResponse < TResponse > > ( response . MessageBytes ,
47- options . JsonSerializerOptions ) ;
48- o . OnNext ( typedResponse . Payload ) ;
49-
50- // in case of a GraphQL error, terminate the sequence after the response has been posted
51- if ( response . Type == GraphQLWebSocketMessageType . GQL_ERROR ) {
52- Debug . WriteLine ( $ "terminating subscription { startRequest . Id } because of a GraphQL error") ;
53- o . OnCompleted ( ) ;
54- }
55- } ,
56- o . OnError ,
57- o . OnCompleted )
41+ // terminate the sequence when a 'complete' message is received
42+ if ( response . Type == GraphQLWebSocketMessageType . GQL_COMPLETE ) {
43+ Debug . WriteLine ( $ "received 'complete' message on subscription { startRequest . Id } ") ;
44+ o . OnCompleted ( ) ;
45+ return ;
46+ }
47+
48+ // post the GraphQLResponse to the stream (even if a GraphQL error occurred)
49+ Debug . WriteLine ( $ "received payload on subscription { startRequest . Id } ") ;
50+ var typedResponse =
51+ JsonSerializer . Deserialize < GraphQLWebSocketResponse < TResponse > > ( response . MessageBytes ,
52+ client . Options . JsonSerializerOptions ) ;
53+ o . OnNext ( typedResponse . Payload ) ;
54+
55+ // in case of a GraphQL error, terminate the sequence after the response has been posted
56+ if ( response . Type == GraphQLWebSocketMessageType . GQL_ERROR ) {
57+ Debug . WriteLine ( $ "terminating subscription { startRequest . Id } because of a GraphQL error") ;
58+ o . OnCompleted ( ) ;
59+ }
60+ } ,
61+ o . OnError ,
62+ o . OnCompleted )
5863 ) ;
5964
6065 try {
@@ -81,6 +86,16 @@ internal static IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream
8186 } )
8287 ) ;
8388
89+ // send connection init
90+ Debug . WriteLine ( $ "sending connection init on subscription { startRequest . Id } ") ;
91+ try {
92+ await graphQlHttpWebSocket . SendWebSocketRequest ( initRequest ) . ConfigureAwait ( false ) ;
93+ }
94+ catch ( Exception e ) {
95+ Console . WriteLine ( e ) ;
96+ throw ;
97+ }
98+
8499 Debug . WriteLine ( $ "sending initial message on subscription { startRequest . Id } ") ;
85100 // send subscription request
86101 try {
@@ -137,53 +152,54 @@ internal static IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream
137152 . Publish ( ) . RefCount ( ) ;
138153 }
139154
140- internal static Task < GraphQLResponse < TResponse > > Request < TResponse > (
155+ internal static Task < GraphQLResponse < TResponse > > SendRequest < TResponse > (
141156 this GraphQLHttpWebSocket graphQlHttpWebSocket ,
142157 GraphQLRequest request ,
143- GraphQLHttpClientOptions options ,
158+ GraphQLHttpClient client ,
144159 CancellationToken cancellationToken = default ) {
145160 return Observable . Create < GraphQLResponse < TResponse > > ( async observer => {
146- var websocketRequest = new GraphQLWebSocketRequest {
147- Id = Guid . NewGuid ( ) . ToString ( "N" ) ,
148- Type = GraphQLWebSocketMessageType . GQL_START ,
149- Payload = request
150- } ;
151- var observable = graphQlHttpWebSocket . ResponseStream
152- . Where ( response => response != null && response . Id == websocketRequest . Id )
153- . TakeUntil ( response => response . Type == GraphQLWebSocketMessageType . GQL_COMPLETE )
154- . Select ( response => {
155- Debug . WriteLine ( $ "received response for request { websocketRequest . Id } ") ;
156- var typedResponse =
157- JsonSerializer . Deserialize < GraphQLWebSocketResponse < TResponse > > ( response . MessageBytes ,
158- options . JsonSerializerOptions ) ;
159- return typedResponse . Payload ;
160- } ) ;
161-
162- try {
163- // intialize websocket (completes immediately if socket is already open)
164- await graphQlHttpWebSocket . InitializeWebSocket ( ) . ConfigureAwait ( false ) ;
165- }
166- catch ( Exception e ) {
167- // subscribe observer to failed observable
168- return Observable . Throw < GraphQLResponse < TResponse > > ( e ) . Subscribe ( observer ) ;
169- }
170-
171- var disposable = new CompositeDisposable (
172- observable . Subscribe ( observer )
173- ) ;
174-
175- Debug . WriteLine ( $ "submitting request { websocketRequest . Id } ") ;
176- // send request
177- try {
178- await graphQlHttpWebSocket . SendWebSocketRequest ( websocketRequest ) . ConfigureAwait ( false ) ;
179- }
180- catch ( Exception e ) {
181- Console . WriteLine ( e ) ;
182- throw ;
183- }
184-
185- return disposable ;
186- } )
161+ await client . Options . PreprocessRequest ( request , client ) ;
162+ var websocketRequest = new GraphQLWebSocketRequest {
163+ Id = Guid . NewGuid ( ) . ToString ( "N" ) ,
164+ Type = GraphQLWebSocketMessageType . GQL_START ,
165+ Payload = request
166+ } ;
167+ var observable = graphQlHttpWebSocket . ResponseStream
168+ . Where ( response => response != null && response . Id == websocketRequest . Id )
169+ . TakeUntil ( response => response . Type == GraphQLWebSocketMessageType . GQL_COMPLETE )
170+ . Select ( response => {
171+ Debug . WriteLine ( $ "received response for request { websocketRequest . Id } ") ;
172+ var typedResponse =
173+ JsonSerializer . Deserialize < GraphQLWebSocketResponse < TResponse > > ( response . MessageBytes ,
174+ client . Options . JsonSerializerOptions ) ;
175+ return typedResponse . Payload ;
176+ } ) ;
177+
178+ try {
179+ // intialize websocket (completes immediately if socket is already open)
180+ await graphQlHttpWebSocket . InitializeWebSocket ( ) . ConfigureAwait ( false ) ;
181+ }
182+ catch ( Exception e ) {
183+ // subscribe observer to failed observable
184+ return Observable . Throw < GraphQLResponse < TResponse > > ( e ) . Subscribe ( observer ) ;
185+ }
186+
187+ var disposable = new CompositeDisposable (
188+ observable . Subscribe ( observer )
189+ ) ;
190+
191+ Debug . WriteLine ( $ "submitting request { websocketRequest . Id } ") ;
192+ // send request
193+ try {
194+ await graphQlHttpWebSocket . SendWebSocketRequest ( websocketRequest ) . ConfigureAwait ( false ) ;
195+ }
196+ catch ( Exception e ) {
197+ Console . WriteLine ( e ) ;
198+ throw ;
199+ }
200+
201+ return disposable ;
202+ } )
187203 // complete sequence on OperationCanceledException, this is triggered by the cancellation token
188204 . Catch < GraphQLResponse < TResponse > , OperationCanceledException > ( exception =>
189205 Observable . Empty < GraphQLResponse < TResponse > > ( ) )
0 commit comments