Skip to content

Commit bb7ea6c

Browse files
authored
Merge pull request #266 from graphql-dotnet/fix-260
Fix #260
2 parents 5d3b4a0 + bcbb26f commit bb7ea6c

File tree

3 files changed

+30
-18
lines changed

3 files changed

+30
-18
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
bin/
55
obj/
66
*.user
7+
nuget/

src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Net.Http;
55
using System.Net.WebSockets;
66
using System.Reactive;
7-
using System.Reactive.Concurrency;
87
using System.Reactive.Disposables;
98
using System.Reactive.Linq;
109
using System.Reactive.Subjects;
@@ -30,8 +29,6 @@ internal class GraphQLHttpWebSocket : IDisposable
3029
private readonly BehaviorSubject<GraphQLWebsocketConnectionState> _stateSubject =
3130
new BehaviorSubject<GraphQLWebsocketConnectionState>(GraphQLWebsocketConnectionState.Disconnected);
3231
private readonly IDisposable _requestSubscription;
33-
private readonly EventLoopScheduler _receiveLoopScheduler = new EventLoopScheduler();
34-
private readonly EventLoopScheduler _sendLoopScheduler = new EventLoopScheduler();
3532

3633
private int _connectionAttempt = 0;
3734
private IConnectableObservable<WebsocketMessageWrapper> _incomingMessages;
@@ -80,12 +77,10 @@ public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClient client)
8077
_client = client;
8178
_buffer = new ArraySegment<byte>(new byte[8192]);
8279
IncomingMessageStream = GetMessageStream();
83-
_receiveLoopScheduler.Schedule(() =>
84-
Debug.WriteLine($"receive loop scheduler thread id: {Thread.CurrentThread.ManagedThreadId}"));
8580

8681
_requestSubscription = _requestSubject
87-
.ObserveOn(_sendLoopScheduler)
88-
.SelectMany(SendWebSocketRequestAsync)
82+
.Select(SendWebSocketRequestAsync)
83+
.Concat()
8984
.Subscribe();
9085
}
9186

@@ -436,7 +431,7 @@ private async Task ConnectAsync(CancellationToken token)
436431

437432
// create receiving observable
438433
_incomingMessages = Observable
439-
.Defer(() => GetReceiveTask().ToObservable().ObserveOn(_receiveLoopScheduler))
434+
.Defer(() => GetReceiveTask().ToObservable())
440435
.Repeat()
441436
// complete sequence on OperationCanceledException, this is triggered by the cancellation token on disposal
442437
.Catch<WebsocketMessageWrapper, OperationCanceledException>(exception => Observable.Empty<WebsocketMessageWrapper>())
@@ -489,13 +484,13 @@ private Task BackOff()
489484
}
490485

491486
private IObservable<WebsocketMessageWrapper> GetMessageStream() =>
492-
Observable.Using(() => new EventLoopScheduler(), scheduler =>
493-
Observable.Create<WebsocketMessageWrapper>(async observer =>
487+
Observable.Create<WebsocketMessageWrapper>(async observer =>
494488
{
495489
// make sure the websocket is connected
496490
await InitializeWebSocket();
497491
// subscribe observer to message stream
498-
var subscription = new CompositeDisposable(_incomingMessages.ObserveOn(scheduler).Subscribe(observer))
492+
var subscription = new CompositeDisposable(_incomingMessages
493+
.Subscribe(observer))
499494
{
500495
// register the observer's OnCompleted method with the cancellation token to complete the sequence on disposal
501496
_internalCancellationTokenSource.Token.Register(observer.OnCompleted)
@@ -507,7 +502,7 @@ private IObservable<WebsocketMessageWrapper> GetMessageStream() =>
507502
Debug.WriteLine($"new incoming message subscription {hashCode} created");
508503

509504
return subscription;
510-
}));
505+
});
511506

512507
private Task<WebsocketMessageWrapper> _receiveAsyncTask = null;
513508
private readonly object _receiveTaskLocker = new object();
@@ -634,10 +629,7 @@ private async Task CompleteAsync()
634629
_exceptionSubject?.OnCompleted();
635630
_exceptionSubject?.Dispose();
636631
_internalCancellationTokenSource.Dispose();
637-
638-
_sendLoopScheduler?.Dispose();
639-
_receiveLoopScheduler?.Dispose();
640-
632+
641633
Debug.WriteLine("GraphQLHttpWebSocket disposed");
642634
}
643635

tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
34
using System.Diagnostics;
45
using System.Linq;
56
using System.Net.WebSockets;
@@ -257,8 +258,26 @@ public async void CanConnectTwoSubscriptionsSimultaneously()
257258
var observable2 = ChatClient.CreateSubscriptionStream<UserJoinedSubscriptionResult>(_subscriptionRequest2, callbackTester2.Invoke);
258259

259260
Debug.WriteLine("subscribing...");
260-
var messagesMonitor = observable1.Observe();
261-
var joinedMonitor = observable2.Observe();
261+
var blocker = new ManualResetEventSlim(false);
262+
FluentTestObserver<GraphQLResponse<MessageAddedSubscriptionResult>> messagesMonitor = null;
263+
FluentTestObserver<GraphQLResponse<UserJoinedSubscriptionResult>> joinedMonitor = null;
264+
265+
var tasks = new List<Task>
266+
{
267+
Task.Run(() =>
268+
{
269+
blocker.Wait();
270+
messagesMonitor = observable1.Observe();
271+
}),
272+
Task.Run(() =>
273+
{
274+
blocker.Wait();
275+
joinedMonitor = observable2.Observe();
276+
})
277+
};
278+
279+
blocker.Set();
280+
await Task.WhenAll(tasks);
262281

263282
await messagesMonitor.Should().PushAsync(1);
264283
messagesMonitor.RecordedMessages.Last().Data.MessageAdded.Content.Should().Be(InitialMessage.Content);

0 commit comments

Comments
 (0)