Skip to content

Commit f121405

Browse files
[SignalR] Avoid blocking common InvokeAsync usage (#42796)
* [SignalR] Avoid blocking common InvokeAsync usage * channel * fixup test * fb * sealed * crazy
1 parent e9577fb commit f121405

9 files changed

+248
-126
lines changed

src/SignalR/server/Core/src/HubConnectionContext.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,8 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo
7979
_systemClock = contextOptions.SystemClock ?? new SystemClock();
8080
_lastSendTick = _systemClock.CurrentTicks;
8181

82-
// We'll be avoiding using the semaphore when the limit is set to 1, so no need to allocate it
8382
var maxInvokeLimit = contextOptions.MaximumParallelInvocations;
84-
if (maxInvokeLimit != 1)
85-
{
86-
ActiveInvocationLimit = new SemaphoreSlim(maxInvokeLimit, maxInvokeLimit);
87-
}
83+
ActiveInvocationLimit = new ChannelBasedSemaphore(maxInvokeLimit);
8884
}
8985

9086
internal StreamTracker StreamTracker
@@ -102,11 +98,10 @@ internal StreamTracker StreamTracker
10298
}
10399

104100
internal HubCallerContext HubCallerContext { get; }
105-
internal HubCallerClients HubCallerClients { get; set; } = null!;
106101

107102
internal Exception? CloseException { get; private set; }
108103

109-
internal SemaphoreSlim? ActiveInvocationLimit { get; }
104+
internal ChannelBasedSemaphore ActiveInvocationLimit { get; }
110105

111106
/// <summary>
112107
/// Gets a <see cref="CancellationToken"/> that notifies when the connection is aborted.

src/SignalR/server/Core/src/HubConnectionHandler.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ private async Task HubOnDisconnectedAsync(HubConnectionContext connection, Excep
200200
// Ensure the connection is aborted before firing disconnect
201201
await connection.AbortAsync();
202202

203+
// If a client result is requested in OnDisconnectedAsync we want to avoid the SemaphoreFullException and get the better connection disconnected IOException
204+
_ = connection.ActiveInvocationLimit.TryAcquire();
205+
203206
try
204207
{
205208
await _dispatcher.OnDisconnectedAsync(connection, exception);
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Diagnostics;
5+
using System.Threading.Channels;
6+
7+
namespace Microsoft.AspNetCore.SignalR.Internal;
8+
9+
// Use a Channel instead of a SemaphoreSlim so that we can potentially save task allocations (ValueTask!)
10+
// Additionally initial perf results show faster RPS when using Channel instead of SemaphoreSlim
11+
internal sealed class ChannelBasedSemaphore
12+
{
13+
private readonly Channel<int> _channel;
14+
15+
public ChannelBasedSemaphore(int maxCapacity)
16+
{
17+
_channel = Channel.CreateBounded<int>(maxCapacity);
18+
for (var i = 0; i < maxCapacity; i++)
19+
{
20+
_channel.Writer.TryWrite(1);
21+
}
22+
}
23+
24+
public bool TryAcquire()
25+
{
26+
return _channel.Reader.TryRead(out _);
27+
}
28+
29+
// The int result isn't important, only reason it's exposed is because ValueTask<T> doesn't implement ValueTask so we can't cast like we could with Task<T> to Task
30+
public ValueTask<int> WaitAsync(CancellationToken cancellationToken = default)
31+
{
32+
return _channel.Reader.ReadAsync(cancellationToken);
33+
}
34+
35+
public void Release()
36+
{
37+
if (!_channel.Writer.TryWrite(1))
38+
{
39+
throw new SemaphoreFullException();
40+
}
41+
}
42+
43+
public ValueTask RunAsync<TState>(Func<TState, Task<bool>> callback, TState state)
44+
{
45+
if (TryAcquire())
46+
{
47+
_ = RunTask(callback, state);
48+
return ValueTask.CompletedTask;
49+
}
50+
51+
return RunSlowAsync(callback, state);
52+
}
53+
54+
private async ValueTask RunSlowAsync<TState>(Func<TState, Task<bool>> callback, TState state)
55+
{
56+
_ = await WaitAsync();
57+
_ = RunTask(callback, state);
58+
}
59+
60+
private async Task RunTask<TState>(Func<TState, Task<bool>> callback, TState state)
61+
{
62+
try
63+
{
64+
var shouldRelease = await callback(state);
65+
if (shouldRelease)
66+
{
67+
Release();
68+
}
69+
}
70+
catch
71+
{
72+
// DefaultHubDispatcher catches and handles exceptions
73+
// It does write to the connection in exception cases which also can't throw because we catch and log in HubConnectionContext
74+
Debug.Assert(false);
75+
}
76+
}
77+
}

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,13 @@ public DefaultHubDispatcher(IServiceScopeFactory serviceScopeFactory, IHubContex
7373
public override async Task OnConnectedAsync(HubConnectionContext connection)
7474
{
7575
await using var scope = _serviceScopeFactory.CreateAsyncScope();
76-
connection.HubCallerClients = new HubCallerClients(_hubContext.Clients, connection.ConnectionId, connection.ActiveInvocationLimit is not null);
7776

7877
var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub>>();
7978
var hub = hubActivator.Create();
8079
try
8180
{
82-
InitializeHub(hub, connection);
81+
// OnConnectedAsync won't work with client results (ISingleClientProxy.InvokeAsync)
82+
InitializeHub(hub, connection, invokeAllowed: false);
8383

8484
if (_onConnectedMiddleware != null)
8585
{
@@ -90,9 +90,6 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)
9090
{
9191
await hub.OnConnectedAsync();
9292
}
93-
94-
// OnConnectedAsync is finished, allow hub methods to use client results (ISingleClientProxy.InvokeAsync)
95-
connection.HubCallerClients.InvokeAllowed = true;
9693
}
9794
finally
9895
{
@@ -256,13 +253,13 @@ private Task ProcessInvocation(HubConnectionContext connection,
256253
else
257254
{
258255
bool isStreamCall = descriptor.StreamingParameters != null;
259-
if (connection.ActiveInvocationLimit != null && !isStreamCall && !isStreamResponse)
256+
if (!isStreamCall && !isStreamResponse)
260257
{
261258
return connection.ActiveInvocationLimit.RunAsync(static state =>
262259
{
263260
var (dispatcher, descriptor, connection, invocationMessage) = state;
264261
return dispatcher.Invoke(descriptor, connection, invocationMessage, isStreamResponse: false, isStreamCall: false);
265-
}, (this, descriptor, connection, hubMethodInvocationMessage));
262+
}, (this, descriptor, connection, hubMethodInvocationMessage)).AsTask();
266263
}
267264
else
268265
{
@@ -271,11 +268,12 @@ private Task ProcessInvocation(HubConnectionContext connection,
271268
}
272269
}
273270

274-
private async Task Invoke(HubMethodDescriptor descriptor, HubConnectionContext connection,
271+
private async Task<bool> Invoke(HubMethodDescriptor descriptor, HubConnectionContext connection,
275272
HubMethodInvocationMessage hubMethodInvocationMessage, bool isStreamResponse, bool isStreamCall)
276273
{
277274
var methodExecutor = descriptor.MethodExecutor;
278275

276+
var wasSemaphoreReleased = false;
279277
var disposeScope = true;
280278
var scope = _serviceScopeFactory.CreateAsyncScope();
281279
IHubActivator<THub>? hubActivator = null;
@@ -290,12 +288,12 @@ private async Task Invoke(HubMethodDescriptor descriptor, HubConnectionContext c
290288
Log.HubMethodNotAuthorized(_logger, hubMethodInvocationMessage.Target);
291289
await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
292290
$"Failed to invoke '{hubMethodInvocationMessage.Target}' because user is unauthorized");
293-
return;
291+
return true;
294292
}
295293

296294
if (!await ValidateInvocationMode(descriptor, isStreamResponse, hubMethodInvocationMessage, connection))
297295
{
298-
return;
296+
return true;
299297
}
300298

301299
try
@@ -308,7 +306,7 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
308306
Log.InvalidHubParameters(_logger, hubMethodInvocationMessage.Target, ex);
309307
await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
310308
ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors));
311-
return;
309+
return true;
312310
}
313311

314312
InitializeHub(hub, connection);
@@ -404,9 +402,15 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
404402
{
405403
if (disposeScope)
406404
{
405+
if (hub?.Clients is HubCallerClients hubCallerClients)
406+
{
407+
wasSemaphoreReleased = Interlocked.CompareExchange(ref hubCallerClients.ShouldReleaseSemaphore, 0, 1) == 0;
408+
}
407409
await CleanupInvocation(connection, hubMethodInvocationMessage, hubActivator, hub, scope);
408410
}
409411
}
412+
413+
return !wasSemaphoreReleased;
410414
}
411415

412416
private static ValueTask CleanupInvocation(HubConnectionContext connection, HubMethodInvocationMessage hubMessage, IHubActivator<THub>? hubActivator,
@@ -553,9 +557,9 @@ private static async Task SendInvocationError(string? invocationId,
553557
await connection.WriteAsync(CompletionMessage.WithError(invocationId, errorMessage));
554558
}
555559

556-
private void InitializeHub(THub hub, HubConnectionContext connection)
560+
private void InitializeHub(THub hub, HubConnectionContext connection, bool invokeAllowed = true)
557561
{
558-
hub.Clients = connection.HubCallerClients;
562+
hub.Clients = new HubCallerClients(_hubContext.Clients, connection.ConnectionId, connection.ActiveInvocationLimit) { InvokeAllowed = invokeAllowed };
559563
hub.Context = connection.HubCallerContext;
560564
hub.Groups = _hubContext.Groups;
561565
}

src/SignalR/server/Core/src/Internal/HubCallerClients.cs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,40 +7,36 @@ internal sealed class HubCallerClients : IHubCallerClients
77
{
88
private readonly string _connectionId;
99
private readonly IHubClients _hubClients;
10-
private readonly string[] _currentConnectionId;
11-
private readonly bool _parallelEnabled;
10+
internal readonly ChannelBasedSemaphore _parallelInvokes;
11+
12+
internal int ShouldReleaseSemaphore = 1;
1213

1314
// Client results don't work in OnConnectedAsync
1415
// This property is set by the hub dispatcher when those methods are being called
1516
// so we can prevent users from making blocking client calls by returning a custom ISingleClientProxy instance
1617
internal bool InvokeAllowed { get; set; }
1718

18-
public HubCallerClients(IHubClients hubClients, string connectionId, bool parallelEnabled)
19+
public HubCallerClients(IHubClients hubClients, string connectionId, ChannelBasedSemaphore parallelInvokes)
1920
{
2021
_connectionId = connectionId;
2122
_hubClients = hubClients;
22-
_currentConnectionId = new[] { _connectionId };
23-
_parallelEnabled = parallelEnabled;
23+
_parallelInvokes = parallelInvokes;
2424
}
2525

2626
IClientProxy IHubCallerClients<IClientProxy>.Caller => Caller;
2727
public ISingleClientProxy Caller
2828
{
2929
get
3030
{
31-
if (!_parallelEnabled)
32-
{
33-
return new NotParallelSingleClientProxy(_hubClients.Client(_connectionId));
34-
}
3531
if (!InvokeAllowed)
3632
{
3733
return new NoInvokeSingleClientProxy(_hubClients.Client(_connectionId));
3834
}
39-
return _hubClients.Client(_connectionId);
35+
return new SingleClientProxy(_hubClients.Client(_connectionId), this);
4036
}
4137
}
4238

43-
public IClientProxy Others => _hubClients.AllExcept(_currentConnectionId);
39+
public IClientProxy Others => _hubClients.AllExcept(new[] { _connectionId });
4440

4541
public IClientProxy All => _hubClients.All;
4642

@@ -52,15 +48,11 @@ public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds)
5248
IClientProxy IHubClients<IClientProxy>.Client(string connectionId) => Client(connectionId);
5349
public ISingleClientProxy Client(string connectionId)
5450
{
55-
if (!_parallelEnabled)
56-
{
57-
return new NotParallelSingleClientProxy(_hubClients.Client(connectionId));
58-
}
5951
if (!InvokeAllowed)
6052
{
6153
return new NoInvokeSingleClientProxy(_hubClients.Client(_connectionId));
6254
}
63-
return _hubClients.Client(connectionId);
55+
return new SingleClientProxy(_hubClients.Client(connectionId), this);
6456
}
6557

6658
public IClientProxy Group(string groupName)
@@ -75,7 +67,7 @@ public IClientProxy Groups(IReadOnlyList<string> groupNames)
7567

7668
public IClientProxy OthersInGroup(string groupName)
7769
{
78-
return _hubClients.GroupExcept(groupName, _currentConnectionId);
70+
return _hubClients.GroupExcept(groupName, new[] { _connectionId });
7971
}
8072

8173
public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds)
@@ -98,18 +90,18 @@ public IClientProxy Users(IReadOnlyList<string> userIds)
9890
return _hubClients.Users(userIds);
9991
}
10092

101-
private sealed class NotParallelSingleClientProxy : ISingleClientProxy
93+
private sealed class NoInvokeSingleClientProxy : ISingleClientProxy
10294
{
10395
private readonly ISingleClientProxy _proxy;
10496

105-
public NotParallelSingleClientProxy(ISingleClientProxy hubClients)
97+
public NoInvokeSingleClientProxy(ISingleClientProxy hubClients)
10698
{
10799
_proxy = hubClients;
108100
}
109101

110102
public Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default)
111103
{
112-
throw new InvalidOperationException("Client results inside a Hub method requires HubOptions.MaximumParallelInvocationsPerClient to be greater than 1.");
104+
throw new InvalidOperationException("Client results inside OnConnectedAsync Hub methods are not allowed.");
113105
}
114106

115107
public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
@@ -118,18 +110,29 @@ public Task SendCoreAsync(string method, object?[] args, CancellationToken cance
118110
}
119111
}
120112

121-
private sealed class NoInvokeSingleClientProxy : ISingleClientProxy
113+
private sealed class SingleClientProxy : ISingleClientProxy
122114
{
123115
private readonly ISingleClientProxy _proxy;
116+
private readonly HubCallerClients _hubCallerClients;
124117

125-
public NoInvokeSingleClientProxy(ISingleClientProxy hubClients)
118+
public SingleClientProxy(ISingleClientProxy hubClients, HubCallerClients hubCallerClients)
126119
{
127120
_proxy = hubClients;
121+
_hubCallerClients = hubCallerClients;
128122
}
129123

130-
public Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default)
124+
public async Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default)
131125
{
132-
throw new InvalidOperationException("Client results inside OnConnectedAsync Hub methods are not allowed.");
126+
// Releases the Channel that is blocking pending invokes, which in turn can block the receive loop.
127+
// Because we are waiting for a result from the client we need to let the receive loop run otherwise we'll be blocked forever
128+
var value = Interlocked.CompareExchange(ref _hubCallerClients.ShouldReleaseSemaphore, 0, 1);
129+
// Only release once, and we set ShouldReleaseSemaphore to 0 so the DefaultHubDispatcher knows not to call Release again
130+
if (value == 1)
131+
{
132+
_hubCallerClients._parallelInvokes.Release();
133+
}
134+
var result = await _proxy.InvokeCoreAsync<T>(method, args, cancellationToken);
135+
return result;
133136
}
134137

135138
public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)

src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs

Lines changed: 0 additions & 36 deletions
This file was deleted.

0 commit comments

Comments
 (0)