Skip to content

Commit cff7f5f

Browse files
committed
fix test case 'CanReconnectWithSameObservable'
1 parent 2fcfc08 commit cff7f5f

File tree

4 files changed

+26
-36
lines changed

4 files changed

+26
-36
lines changed

src/GraphQL.Client/GraphQLHttpClient.cs

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Linq;
66
using System.Net.Http;
77
using System.Net.Http.Headers;
8+
using System.Runtime.CompilerServices;
89
using System.Threading;
910
using System.Threading.Tasks;
1011
using GraphQL.Client.Abstractions;
@@ -97,34 +98,15 @@ public Task<GraphQLResponse<TResponse>> SendMutationAsync<TResponse>(GraphQLRequ
9798

9899
/// <inheritdoc />
99100
public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TResponse>(GraphQLRequest request)
100-
{
101-
if (_disposed)
102-
throw new ObjectDisposedException(nameof(GraphQLHttpClient));
103-
104-
var key = new Tuple<GraphQLRequest, Type>(request, typeof(TResponse));
105-
106-
if (_subscriptionStreams.ContainsKey(key))
107-
return (IObservable<GraphQLResponse<TResponse>>)_subscriptionStreams[key];
108-
109-
var observable = GraphQlHttpWebSocket.CreateSubscriptionStream<TResponse>(request);
110-
111-
_subscriptionStreams.TryAdd(key, observable);
112-
return observable;
113-
}
101+
=> CreateSubscriptionStream<TResponse>(request, null);
114102

115103
/// <inheritdoc />
116-
public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TResponse>(GraphQLRequest request, Action<Exception> exceptionHandler)
104+
public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TResponse>(GraphQLRequest request, Action<Exception>? exceptionHandler)
117105
{
118106
if (_disposed)
119107
throw new ObjectDisposedException(nameof(GraphQLHttpClient));
120-
121-
var key = new Tuple<GraphQLRequest, Type>(request, typeof(TResponse));
122-
123-
if (_subscriptionStreams.ContainsKey(key))
124-
return (IObservable<GraphQLResponse<TResponse>>)_subscriptionStreams[key];
125-
108+
126109
var observable = GraphQlHttpWebSocket.CreateSubscriptionStream<TResponse>(request, exceptionHandler);
127-
_subscriptionStreams.TryAdd(key, observable);
128110
return observable;
129111
}
130112

src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClient client)
9494
/// <param name="request">the <see cref="GraphQLRequest"/> to start the subscription</param>
9595
/// <param name="exceptionHandler">Optional: exception handler for handling exceptions within the receive pipeline</param>
9696
/// <returns>a <see cref="IObservable{TResponse}"/> which represents the subscription</returns>
97-
public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TResponse>(GraphQLRequest request, Action<Exception> exceptionHandler = null) =>
97+
public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TResponse>(GraphQLRequest request, Action<Exception>? exceptionHandler = null) =>
9898
Observable.Defer(() =>
9999
Observable.Create<GraphQLResponse<TResponse>>(async observer =>
100100
{
@@ -169,6 +169,7 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
169169
observable.Subscribe(observer),
170170
Disposable.Create(async () =>
171171
{
172+
Debug.WriteLine($"disposing subscription {startRequest.Id}, websocket state is '{WebSocketState}'");
172173
// only try to send close request on open websocket
173174
if (WebSocketState != WebSocketState.Open)
174175
return;
@@ -252,10 +253,7 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
252253
return Observable.Empty<GraphQLResponse<TResponse>>();
253254
}
254255
return Observable.Return(t.Item1);
255-
})
256-
// transform to hot observable and auto-connect
257-
.Publish().RefCount();
258-
256+
});
259257
/// <summary>
260258
/// Send a regular GraphQL request (query, mutation) via websocket
261259
/// </summary>

tests/GraphQL.Client.Tests.Common/Chat/Schema/IChat.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
using System;
22
using System.Collections.Concurrent;
3+
using System.Diagnostics;
4+
using System.Reactive.Disposables;
35
using System.Reactive.Linq;
46
using System.Reactive.Subjects;
7+
using System.Threading;
58

69
namespace GraphQL.Client.Tests.Common.Chat.Schema
710
{
@@ -82,13 +85,20 @@ public MessageFrom Join(string userId)
8285
}
8386

8487
public IObservable<Message> Messages(string user) =>
85-
_messageStream
86-
.Select(message =>
88+
Observable.Create<Message>(observer =>
89+
{
90+
Debug.WriteLine($"creating messages stream for user '{user}' on thread {Thread.CurrentThread.ManagedThreadId}");
91+
return new CompositeDisposable
8792
{
88-
message.Sub = user;
89-
return message;
90-
})
91-
.AsObservable();
93+
_messageStream.Select(message =>
94+
{
95+
message.Sub = user;
96+
return message;
97+
})
98+
.Subscribe(observer),
99+
Disposable.Create(() => Debug.WriteLine($"disposing messages stream for user '{user}' on thread {Thread.CurrentThread.ManagedThreadId}"))
100+
};
101+
});
92102

93103
public void AddError(Exception exception) => _messageStream.OnError(exception);
94104

tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,13 +226,13 @@ public async void CanReconnectWithSameObservable()
226226

227227
const string message1 = "Hello World";
228228
Debug.WriteLine($"adding message {message1}");
229-
var response = await ChatClient.AddMessageAsync(message1).ConfigureAwait(true);
229+
var response = await ChatClient.AddMessageAsync(message1);
230230
response.Data.AddMessage.Content.Should().Be(message1);
231231
await observer.Should().PushAsync(2);
232232
observer.RecordedMessages.Last().Data.MessageAdded.Content.Should().Be(message1);
233233

234234
const string message2 = "How are you?";
235-
response = await ChatClient.AddMessageAsync(message2).ConfigureAwait(true);
235+
response = await ChatClient.AddMessageAsync(message2);
236236
response.Data.AddMessage.Content.Should().Be(message2);
237237
await observer.Should().PushAsync(3);
238238
observer.RecordedMessages.Last().Data.MessageAdded.Content.Should().Be(message2);
@@ -247,7 +247,7 @@ public async void CanReconnectWithSameObservable()
247247
observer2.RecordedMessages.Last().Data.MessageAdded.Content.Should().Be(message2);
248248

249249
const string message3 = "lorem ipsum dolor si amet";
250-
response = await ChatClient.AddMessageAsync(message3).ConfigureAwait(true);
250+
response = await ChatClient.AddMessageAsync(message3);
251251
response.Data.AddMessage.Content.Should().Be(message3);
252252
await observer2.Should().PushAsync(2);
253253
observer2.RecordedMessages.Last().Data.MessageAdded.Content.Should().Be(message3);

0 commit comments

Comments
 (0)