Skip to content

Commit 50b5a0d

Browse files
committed
channel
1 parent 3c3f957 commit 50b5a0d

File tree

7 files changed

+60
-124
lines changed

7 files changed

+60
-124
lines changed

src/SignalR/server/Core/src/HubConnectionContext.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Diagnostics.CodeAnalysis;
88
using System.IO.Pipelines;
99
using System.Security.Claims;
10+
using System.Threading.Channels;
1011
using Microsoft.AspNetCore.Connections;
1112
using Microsoft.AspNetCore.Connections.Features;
1213
using Microsoft.AspNetCore.Http.Features;
@@ -79,12 +80,11 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo
7980
_systemClock = contextOptions.SystemClock ?? new SystemClock();
8081
_lastSendTick = _systemClock.CurrentTicks;
8182

82-
// We'll be avoiding using the semaphore when the limit is set to 1, so no need to allocate it
8383
var maxInvokeLimit = contextOptions.MaximumParallelInvocations;
84-
if (maxInvokeLimit != 1)
84+
ActiveInvocationLimit = Channel.CreateBounded<int>(maxInvokeLimit);
85+
for (var i = 0; i < maxInvokeLimit; i++)
8586
{
86-
// Don't specify max count, this is so InvokeAsync inside hub methods will not be able to soft-lock a connection if it's run on a separate thread from the hub method, or just not awaited
87-
ActiveInvocationLimit = new SemaphoreSlim(maxInvokeLimit);
87+
ActiveInvocationLimit.Writer.TryWrite(1);
8888
}
8989
}
9090

@@ -107,7 +107,7 @@ internal StreamTracker StreamTracker
107107

108108
internal Exception? CloseException { get; private set; }
109109

110-
internal SemaphoreSlim? ActiveInvocationLimit { get; }
110+
internal Channel<int> ActiveInvocationLimit { get; }
111111

112112
/// <summary>
113113
/// Gets a <see cref="CancellationToken"/> that notifies when the connection is aborted.

src/SignalR/server/Core/src/HubConnectionHandler.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ private async Task HubOnDisconnectedAsync(HubConnectionContext connection, Excep
200200
// Ensure the connection is aborted before firing disconnect
201201
await connection.AbortAsync();
202202

203+
// If a client result is requested in OnDisconnectedAsync we want to make sure it isn't blocked by the ActiveInvocationLimit
204+
_ = connection.ActiveInvocationLimit.Reader.TryRead(out _);
205+
203206
try
204207
{
205208
await _dispatcher.OnDisconnectedAsync(connection, exception);
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Threading.Channels;
5+
6+
namespace Microsoft.AspNetCore.SignalR.Internal;
7+
8+
internal static class ChannelExtensions
9+
{
10+
public static ValueTask RunAsync<TState>(this Channel<int> semaphoreSlim, Func<TState, Task> callback, TState state)
11+
{
12+
if (semaphoreSlim.Reader.TryRead(out _))
13+
{
14+
_ = RunTask(callback, semaphoreSlim, state);
15+
return ValueTask.CompletedTask;
16+
}
17+
18+
return RunSlowAsync(semaphoreSlim, callback, state);
19+
}
20+
21+
private static async ValueTask RunSlowAsync<TState>(this Channel<int> semaphoreSlim, Func<TState, Task> callback, TState state)
22+
{
23+
_ = await semaphoreSlim.Reader.ReadAsync();
24+
_ = RunTask(callback, semaphoreSlim, state);
25+
}
26+
27+
static async Task RunTask<TState>(Func<TState, Task> callback, Channel<int> semaphoreSlim, TState state)
28+
{
29+
try
30+
{
31+
await callback(state);
32+
}
33+
finally
34+
{
35+
await semaphoreSlim.Writer.WriteAsync(1);
36+
}
37+
}
38+
}

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,13 @@ private Task ProcessInvocation(HubConnectionContext connection,
256256
else
257257
{
258258
bool isStreamCall = descriptor.StreamingParameters != null;
259-
if (connection.ActiveInvocationLimit != null && !isStreamCall && !isStreamResponse)
259+
if (!isStreamCall && !isStreamResponse)
260260
{
261261
return connection.ActiveInvocationLimit.RunAsync(static state =>
262262
{
263263
var (dispatcher, descriptor, connection, invocationMessage) = state;
264264
return dispatcher.Invoke(descriptor, connection, invocationMessage, isStreamResponse: false, isStreamCall: false);
265-
}, (this, descriptor, connection, hubMethodInvocationMessage));
265+
}, (this, descriptor, connection, hubMethodInvocationMessage)).AsTask();
266266
}
267267
else
268268
{

src/SignalR/server/Core/src/Internal/HubCallerClients.cs

Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Threading.Channels;
5+
46
namespace Microsoft.AspNetCore.SignalR.Internal;
57

68
internal sealed class HubCallerClients : IHubCallerClients
79
{
810
private readonly string _connectionId;
911
private readonly IHubClients _hubClients;
1012
private readonly string[] _currentConnectionId;
11-
private readonly SemaphoreSlim? _parallelInvokes;
13+
private readonly Channel<int> _parallelInvokes;
1214

1315
// Client results don't work in OnConnectedAsync
1416
// This property is set by the hub dispatcher when those methods are being called
1517
// so we can prevent users from making blocking client calls by returning a custom ISingleClientProxy instance
1618
internal bool InvokeAllowed { get; set; }
1719

18-
public HubCallerClients(IHubClients hubClients, string connectionId, SemaphoreSlim? parallelInvokes)
20+
public HubCallerClients(IHubClients hubClients, string connectionId, Channel<int> parallelInvokes)
1921
{
2022
_connectionId = connectionId;
2123
_hubClients = hubClients;
@@ -28,10 +30,6 @@ public ISingleClientProxy Caller
2830
{
2931
get
3032
{
31-
if (_parallelInvokes is null)
32-
{
33-
return new NotParallelSingleClientProxy(_hubClients.Client(_connectionId));
34-
}
3533
if (!InvokeAllowed)
3634
{
3735
return new NoInvokeSingleClientProxy(_hubClients.Client(_connectionId));
@@ -52,10 +50,6 @@ public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds)
5250
IClientProxy IHubClients<IClientProxy>.Client(string connectionId) => Client(connectionId);
5351
public ISingleClientProxy Client(string connectionId)
5452
{
55-
if (_parallelInvokes is null)
56-
{
57-
return new NotParallelSingleClientProxy(_hubClients.Client(connectionId));
58-
}
5953
if (!InvokeAllowed)
6054
{
6155
return new NoInvokeSingleClientProxy(_hubClients.Client(_connectionId));
@@ -98,26 +92,6 @@ public IClientProxy Users(IReadOnlyList<string> userIds)
9892
return _hubClients.Users(userIds);
9993
}
10094

101-
private sealed class NotParallelSingleClientProxy : ISingleClientProxy
102-
{
103-
private readonly ISingleClientProxy _proxy;
104-
105-
public NotParallelSingleClientProxy(ISingleClientProxy hubClients)
106-
{
107-
_proxy = hubClients;
108-
}
109-
110-
public Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default)
111-
{
112-
throw new InvalidOperationException("Client results inside a Hub method requires HubOptions.MaximumParallelInvocationsPerClient to be greater than 1.");
113-
}
114-
115-
public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
116-
{
117-
return _proxy.SendCoreAsync(method, args, cancellationToken);
118-
}
119-
}
120-
12195
private sealed class NoInvokeSingleClientProxy : ISingleClientProxy
12296
{
12397
private readonly ISingleClientProxy _proxy;
@@ -141,28 +115,28 @@ public Task SendCoreAsync(string method, object?[] args, CancellationToken cance
141115
private sealed class SingleClientProxy : ISingleClientProxy
142116
{
143117
private readonly ISingleClientProxy _proxy;
144-
private readonly SemaphoreSlim _parallelInvokes;
118+
private readonly Channel<int> _parallelInvokes;
145119

146-
public SingleClientProxy(ISingleClientProxy hubClients, SemaphoreSlim parallelInvokes)
120+
public SingleClientProxy(ISingleClientProxy hubClients, Channel<int> parallelInvokes)
147121
{
148122
_proxy = hubClients;
149123
_parallelInvokes = parallelInvokes;
150124
}
151125

152126
public async Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default)
153127
{
154-
// Releases the SemaphoreSlim that is blocking pending invokes, which in turn can block the receive loop.
128+
// Releases the Channel that is blocking pending invokes, which in turn can block the receive loop.
155129
// Because we are waiting for a result from the client we need to let the receive loop run otherwise we'll be blocked forever
156-
_parallelInvokes.Release();
130+
await _parallelInvokes.Writer.WriteAsync(1, cancellationToken);
157131
try
158132
{
159133
var result = await _proxy.InvokeCoreAsync<T>(method, args, cancellationToken);
160134
return result;
161135
}
162136
finally
163137
{
164-
// Re-acquire the SemaphoreSlim, this is because when the hub method completes it will call release
165-
await _parallelInvokes.WaitAsync(CancellationToken.None);
138+
// Re-read from the channel, this is because when the hub method completes it will release (write an entry) which we already did above, so we need to reset the state
139+
_ = await _parallelInvokes.Reader.ReadAsync(CancellationToken.None);
166140
}
167141
}
168142

src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/SignalR/server/SignalR/test/HubConnectionHandlerTests.ClientResult.cs

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,7 @@ public async Task CanReturnClientResultToHub()
1414
{
1515
using (StartVerifiableLog())
1616
{
17-
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
18-
{
19-
// Waiting for a client result requires multiple invocations enabled
20-
builder.AddSignalR(o => o.MaximumParallelInvocationsPerClient = 2);
21-
}, LoggerFactory);
17+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => { }, LoggerFactory);
2218
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<MethodHub>>();
2319

2420
using (var client = new TestClient())
@@ -47,10 +43,8 @@ public async Task CanReturnClientResultErrorToHub()
4743
{
4844
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
4945
{
50-
// Waiting for a client result requires multiple invocations enabled
5146
builder.AddSignalR(o =>
5247
{
53-
o.MaximumParallelInvocationsPerClient = 2;
5448
o.EnableDetailedErrors = true;
5549
});
5650
}, LoggerFactory);
@@ -74,36 +68,6 @@ public async Task CanReturnClientResultErrorToHub()
7468
}
7569
}
7670

77-
[Fact]
78-
public async Task ThrowsWhenParallelHubInvokesNotEnabled()
79-
{
80-
using (StartVerifiableLog(write => write.EventId.Name == "FailedInvokingHubMethod"))
81-
{
82-
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
83-
{
84-
builder.AddSignalR(o =>
85-
{
86-
o.MaximumParallelInvocationsPerClient = 1;
87-
o.EnableDetailedErrors = true;
88-
});
89-
}, LoggerFactory);
90-
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<MethodHub>>();
91-
92-
using (var client = new TestClient())
93-
{
94-
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).DefaultTimeout();
95-
96-
var invocationId = await client.SendHubMessageAsync(new InvocationMessage("1", nameof(MethodHub.GetClientResult), new object[] { 5 })).DefaultTimeout();
97-
98-
// Hub asks client for a result, this is an invocation message with an ID
99-
var completionMessage = Assert.IsType<CompletionMessage>(await client.ReadAsync().DefaultTimeout());
100-
Assert.Equal(invocationId, completionMessage.InvocationId);
101-
Assert.Equal("An unexpected error occurred invoking 'GetClientResult' on the server. InvalidOperationException: Client results inside a Hub method requires HubOptions.MaximumParallelInvocationsPerClient to be greater than 1.",
102-
completionMessage.Error);
103-
}
104-
}
105-
}
106-
10771
[Fact]
10872
public async Task ThrowsWhenUsedInOnConnectedAsync()
10973
{
@@ -113,7 +77,6 @@ public async Task ThrowsWhenUsedInOnConnectedAsync()
11377
{
11478
builder.AddSignalR(o =>
11579
{
116-
o.MaximumParallelInvocationsPerClient = 2;
11780
o.EnableDetailedErrors = true;
11881
});
11982
}, LoggerFactory);
@@ -141,7 +104,6 @@ public async Task ThrowsWhenUsedInOnDisconnectedAsync()
141104
{
142105
builder.AddSignalR(o =>
143106
{
144-
o.MaximumParallelInvocationsPerClient = 2;
145107
o.EnableDetailedErrors = true;
146108
});
147109
}, LoggerFactory);
@@ -235,11 +197,7 @@ public async Task CanReturnClientResultToTypedHubTwoWays()
235197
{
236198
using (StartVerifiableLog())
237199
{
238-
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
239-
{
240-
// Waiting for a client result requires multiple invocations enabled
241-
builder.AddSignalR(o => o.MaximumParallelInvocationsPerClient = 2);
242-
}, LoggerFactory);
200+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => { }, LoggerFactory);
243201
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<HubT>>();
244202

245203
using var client = new TestClient(invocationBinder: new GetClientResultThreeWaysInvocationBinder());
@@ -273,7 +231,6 @@ public async Task ClientResultFromHubDoesNotBlockReceiveLoop()
273231
{
274232
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
275233
{
276-
// Waiting for a client result requires multiple invocations enabled
277234
builder.AddSignalR(o => o.MaximumParallelInvocationsPerClient = 2);
278235
}, LoggerFactory);
279236
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<MethodHub>>();

0 commit comments

Comments
 (0)