Skip to content

Commit 8c0b9b3

Browse files
committed
crazy
1 parent 863cd48 commit 8c0b9b3

File tree

8 files changed

+135
-73
lines changed

8 files changed

+135
-73
lines changed

src/SignalR/server/Core/src/HubConnectionContext.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ internal StreamTracker StreamTracker
9898
}
9999

100100
internal HubCallerContext HubCallerContext { get; }
101-
internal HubCallerClients HubCallerClients { get; set; } = null!;
102101

103102
internal Exception? CloseException { get; private set; }
104103

src/SignalR/server/Core/src/HubConnectionHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,8 @@ private async Task HubOnDisconnectedAsync(HubConnectionContext connection, Excep
200200
// Ensure the connection is aborted before firing disconnect
201201
await connection.AbortAsync();
202202

203-
// If a client result is requested in OnDisconnectedAsync we want to make sure it isn't blocked by the ActiveInvocationLimit
204-
_ = connection.ActiveInvocationLimit.AttemptWait();
203+
// If a client result is requested in OnDisconnectedAsync we want to avoid the SemaphoreFullException and get the better connection disconnected IOException
204+
_ = connection.ActiveInvocationLimit.TryAcquire();
205205

206206
try
207207
{
Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Diagnostics;
45
using System.Threading.Channels;
56

67
namespace Microsoft.AspNetCore.SignalR.Internal;
@@ -9,7 +10,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal;
910
// Additionally initial perf results show faster RPS when using Channel instead of SemaphoreSlim
1011
internal sealed class ChannelBasedSemaphore
1112
{
12-
internal readonly Channel<int> _channel;
13+
private readonly Channel<int> _channel;
1314

1415
public ChannelBasedSemaphore(int maxCapacity)
1516
{
@@ -20,18 +21,57 @@ public ChannelBasedSemaphore(int maxCapacity)
2021
}
2122
}
2223

23-
public bool AttemptWait()
24+
public bool TryAcquire()
2425
{
2526
return _channel.Reader.TryRead(out _);
2627
}
2728

29+
// The int result isn't important, only reason it's exposed is because ValueTask<T> doesn't implement ValueTask so we can't cast like we could with Task<T> to Task
2830
public ValueTask<int> WaitAsync(CancellationToken cancellationToken = default)
2931
{
3032
return _channel.Reader.ReadAsync(cancellationToken);
3133
}
3234

33-
public ValueTask ReleaseAsync(CancellationToken cancellationToken = default)
35+
public void Release()
3436
{
35-
return _channel.Writer.WriteAsync(1, cancellationToken);
37+
if (!_channel.Writer.TryWrite(1))
38+
{
39+
throw new SemaphoreFullException();
40+
}
41+
}
42+
43+
public ValueTask RunAsync<TState>(Func<TState, Task<bool>> callback, TState state)
44+
{
45+
if (TryAcquire())
46+
{
47+
_ = RunTask(callback, state);
48+
return ValueTask.CompletedTask;
49+
}
50+
51+
return RunSlowAsync(callback, state);
52+
}
53+
54+
private async ValueTask RunSlowAsync<TState>(Func<TState, Task<bool>> callback, TState state)
55+
{
56+
_ = await WaitAsync();
57+
_ = RunTask(callback, state);
58+
}
59+
60+
private async Task RunTask<TState>(Func<TState, Task<bool>> callback, TState state)
61+
{
62+
try
63+
{
64+
var shouldRelease = await callback(state);
65+
if (shouldRelease)
66+
{
67+
Release();
68+
}
69+
}
70+
catch
71+
{
72+
// DefaultHubDispatcher catches and handles exceptions
73+
// It does write to the connection in exception cases which also can't throw because we catch and log in HubConnectionContext
74+
Debug.Assert(false);
75+
}
3676
}
3777
}

src/SignalR/server/Core/src/Internal/ChannelExtensions.cs

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,13 @@ public DefaultHubDispatcher(IServiceScopeFactory serviceScopeFactory, IHubContex
7373
public override async Task OnConnectedAsync(HubConnectionContext connection)
7474
{
7575
await using var scope = _serviceScopeFactory.CreateAsyncScope();
76-
connection.HubCallerClients = new HubCallerClients(_hubContext.Clients, connection.ConnectionId, connection.ActiveInvocationLimit);
7776

7877
var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub>>();
7978
var hub = hubActivator.Create();
8079
try
8180
{
82-
InitializeHub(hub, connection);
81+
// OnConnectedAsync won't work with client results (ISingleClientProxy.InvokeAsync)
82+
InitializeHub(hub, connection, invokeAllowed: false);
8383

8484
if (_onConnectedMiddleware != null)
8585
{
@@ -90,9 +90,6 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)
9090
{
9191
await hub.OnConnectedAsync();
9292
}
93-
94-
// OnConnectedAsync is finished, allow hub methods to use client results (ISingleClientProxy.InvokeAsync)
95-
connection.HubCallerClients.InvokeAllowed = true;
9693
}
9794
finally
9895
{
@@ -271,11 +268,12 @@ private Task ProcessInvocation(HubConnectionContext connection,
271268
}
272269
}
273270

274-
private async Task Invoke(HubMethodDescriptor descriptor, HubConnectionContext connection,
271+
private async Task<bool> Invoke(HubMethodDescriptor descriptor, HubConnectionContext connection,
275272
HubMethodInvocationMessage hubMethodInvocationMessage, bool isStreamResponse, bool isStreamCall)
276273
{
277274
var methodExecutor = descriptor.MethodExecutor;
278275

276+
var wasSemaphoreReleased = false;
279277
var disposeScope = true;
280278
var scope = _serviceScopeFactory.CreateAsyncScope();
281279
IHubActivator<THub>? hubActivator = null;
@@ -290,12 +288,12 @@ private async Task Invoke(HubMethodDescriptor descriptor, HubConnectionContext c
290288
Log.HubMethodNotAuthorized(_logger, hubMethodInvocationMessage.Target);
291289
await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
292290
$"Failed to invoke '{hubMethodInvocationMessage.Target}' because user is unauthorized");
293-
return;
291+
return true;
294292
}
295293

296294
if (!await ValidateInvocationMode(descriptor, isStreamResponse, hubMethodInvocationMessage, connection))
297295
{
298-
return;
296+
return true;
299297
}
300298

301299
try
@@ -308,7 +306,7 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
308306
Log.InvalidHubParameters(_logger, hubMethodInvocationMessage.Target, ex);
309307
await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
310308
ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors));
311-
return;
309+
return true;
312310
}
313311

314312
InitializeHub(hub, connection);
@@ -404,9 +402,15 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
404402
{
405403
if (disposeScope)
406404
{
405+
if (hub?.Clients is HubCallerClients hubCallerClients)
406+
{
407+
wasSemaphoreReleased = Interlocked.CompareExchange(ref hubCallerClients.ShouldReleaseSemaphore, 0, 1) == 0;
408+
}
407409
await CleanupInvocation(connection, hubMethodInvocationMessage, hubActivator, hub, scope);
408410
}
409411
}
412+
413+
return !wasSemaphoreReleased;
410414
}
411415

412416
private static ValueTask CleanupInvocation(HubConnectionContext connection, HubMethodInvocationMessage hubMessage, IHubActivator<THub>? hubActivator,
@@ -553,9 +557,9 @@ private static async Task SendInvocationError(string? invocationId,
553557
await connection.WriteAsync(CompletionMessage.WithError(invocationId, errorMessage));
554558
}
555559

556-
private void InitializeHub(THub hub, HubConnectionContext connection)
560+
private void InitializeHub(THub hub, HubConnectionContext connection, bool invokeAllowed = true)
557561
{
558-
hub.Clients = connection.HubCallerClients;
562+
hub.Clients = new HubCallerClients(_hubContext.Clients, connection.ConnectionId, connection.ActiveInvocationLimit) { InvokeAllowed = invokeAllowed };
559563
hub.Context = connection.HubCallerContext;
560564
hub.Groups = _hubContext.Groups;
561565
}

src/SignalR/server/Core/src/Internal/HubCallerClients.cs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ internal sealed class HubCallerClients : IHubCallerClients
77
{
88
private readonly string _connectionId;
99
private readonly IHubClients _hubClients;
10-
private readonly string[] _currentConnectionId;
11-
private readonly ChannelBasedSemaphore _parallelInvokes;
10+
internal readonly ChannelBasedSemaphore _parallelInvokes;
11+
12+
internal int ShouldReleaseSemaphore = 1;
1213

1314
// Client results don't work in OnConnectedAsync
1415
// This property is set by the hub dispatcher when those methods are being called
@@ -19,7 +20,6 @@ public HubCallerClients(IHubClients hubClients, string connectionId, ChannelBase
1920
{
2021
_connectionId = connectionId;
2122
_hubClients = hubClients;
22-
_currentConnectionId = new[] { _connectionId };
2323
_parallelInvokes = parallelInvokes;
2424
}
2525

@@ -32,11 +32,11 @@ public ISingleClientProxy Caller
3232
{
3333
return new NoInvokeSingleClientProxy(_hubClients.Client(_connectionId));
3434
}
35-
return new SingleClientProxy(_hubClients.Client(_connectionId), _parallelInvokes);
35+
return new SingleClientProxy(_hubClients.Client(_connectionId), this);
3636
}
3737
}
3838

39-
public IClientProxy Others => _hubClients.AllExcept(_currentConnectionId);
39+
public IClientProxy Others => _hubClients.AllExcept(new[] { _connectionId });
4040

4141
public IClientProxy All => _hubClients.All;
4242

@@ -52,7 +52,7 @@ public ISingleClientProxy Client(string connectionId)
5252
{
5353
return new NoInvokeSingleClientProxy(_hubClients.Client(_connectionId));
5454
}
55-
return new SingleClientProxy(_hubClients.Client(connectionId), _parallelInvokes);
55+
return new SingleClientProxy(_hubClients.Client(connectionId), this);
5656
}
5757

5858
public IClientProxy Group(string groupName)
@@ -67,7 +67,7 @@ public IClientProxy Groups(IReadOnlyList<string> groupNames)
6767

6868
public IClientProxy OthersInGroup(string groupName)
6969
{
70-
return _hubClients.GroupExcept(groupName, _currentConnectionId);
70+
return _hubClients.GroupExcept(groupName, new[] { _connectionId });
7171
}
7272

7373
public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds)
@@ -113,29 +113,26 @@ public Task SendCoreAsync(string method, object?[] args, CancellationToken cance
113113
private sealed class SingleClientProxy : ISingleClientProxy
114114
{
115115
private readonly ISingleClientProxy _proxy;
116-
private readonly ChannelBasedSemaphore _parallelInvokes;
116+
private readonly HubCallerClients _hubCallerClients;
117117

118-
public SingleClientProxy(ISingleClientProxy hubClients, ChannelBasedSemaphore parallelInvokes)
118+
public SingleClientProxy(ISingleClientProxy hubClients, HubCallerClients hubCallerClients)
119119
{
120120
_proxy = hubClients;
121-
_parallelInvokes = parallelInvokes;
121+
_hubCallerClients = hubCallerClients;
122122
}
123123

124124
public async Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default)
125125
{
126126
// Releases the Channel that is blocking pending invokes, which in turn can block the receive loop.
127127
// Because we are waiting for a result from the client we need to let the receive loop run otherwise we'll be blocked forever
128-
await _parallelInvokes.ReleaseAsync(cancellationToken);
129-
try
130-
{
131-
var result = await _proxy.InvokeCoreAsync<T>(method, args, cancellationToken);
132-
return result;
133-
}
134-
finally
128+
var value = Interlocked.CompareExchange(ref _hubCallerClients.ShouldReleaseSemaphore, 0, 1);
129+
// Only release once, and we set ShouldReleaseSemaphore to 0 so the DefaultHubDispatcher knows not to call Release again
130+
if (value == 1)
135131
{
136-
// Re-wait, this is because when the hub method completes it will release which we already did above, so we need to reset the state
137-
_ = await _parallelInvokes.WaitAsync(CancellationToken.None);
132+
_hubCallerClients._parallelInvokes.Release();
138133
}
134+
var result = await _proxy.InvokeCoreAsync<T>(method, args, cancellationToken);
135+
return result;
139136
}
140137

141138
public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)

src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,24 @@ public async Task<int> GetClientResult(int num)
338338
var sum = await Clients.Caller.InvokeAsync<int>("Sum", num, cancellationToken: default);
339339
return sum;
340340
}
341+
342+
public void BackgroundClientResult(TcsService tcsService)
343+
{
344+
var caller = Clients.Caller;
345+
_ = Task.Run(async () =>
346+
{
347+
try
348+
{
349+
await tcsService.StartedMethod.Task;
350+
var result = await caller.InvokeAsync<int>("GetResult", 1, CancellationToken.None);
351+
tcsService.EndMethod.SetResult(result);
352+
}
353+
catch (Exception ex)
354+
{
355+
tcsService.EndMethod.SetException(ex);
356+
}
357+
});
358+
}
341359
}
342360

343361
internal class SelfRef

src/SignalR/server/SignalR/test/HubConnectionHandlerTests.ClientResult.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,46 @@ public async Task ClientResultFromHubDoesNotBlockReceiveLoop()
277277
}
278278
}
279279

280+
[Fact]
281+
public async Task ClientResultFromBackgroundThreadInHubMethodWorks()
282+
{
283+
using (StartVerifiableLog())
284+
{
285+
var tcsService = new TcsService();
286+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
287+
{
288+
builder.AddSingleton(tcsService);
289+
}, LoggerFactory);
290+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<MethodHub>>();
291+
292+
using (var client = new TestClient())
293+
{
294+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).DefaultTimeout();
295+
296+
var completionMessage = await client.InvokeAsync(nameof(MethodHub.BackgroundClientResult)).DefaultTimeout();
297+
298+
tcsService.StartedMethod.SetResult(null);
299+
300+
var task = await Task.WhenAny(tcsService.EndMethod.Task, client.ReadAsync()).DefaultTimeout();
301+
if (task == tcsService.EndMethod.Task)
302+
{
303+
await tcsService.EndMethod.Task;
304+
}
305+
// Hub asks client for a result, this is an invocation message with an ID
306+
var invocationMessage = Assert.IsType<InvocationMessage>(await (Task<HubMessage>)task);
307+
Assert.NotNull(invocationMessage.InvocationId);
308+
var res = 4 + ((long)invocationMessage.Arguments[0]);
309+
await client.SendHubMessageAsync(CompletionMessage.WithResult(invocationMessage.InvocationId, res)).DefaultTimeout();
310+
311+
Assert.Equal(5, await tcsService.EndMethod.Task.DefaultTimeout());
312+
313+
// Make sure we can still do a Hub invocation and that the semaphore state didn't get messed up
314+
completionMessage = await client.InvokeAsync(nameof(MethodHub.ValueMethod)).DefaultTimeout();
315+
Assert.Equal(43L, completionMessage.Result);
316+
}
317+
}
318+
}
319+
280320
private class TestBinder : IInvocationBinder
281321
{
282322
public IReadOnlyList<Type> GetParameterTypes(string methodName)

0 commit comments

Comments
 (0)