Skip to content

Commit 5b84d4c

Browse files
[SignalR] Unblock user callbacks when waiting for client result (#44014)
Co-authored-by: Brennan Conroy <[email protected]>
1 parent f2ba446 commit 5b84d4c

File tree

4 files changed

+137
-49
lines changed

4 files changed

+137
-49
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1281,7 +1281,14 @@ async Task StartProcessingInvocationMessages(ChannelReader<InvocationMessage> in
12811281
{
12821282
while (invocationMessageChannelReader.TryRead(out var invocationMessage))
12831283
{
1284-
await DispatchInvocationAsync(invocationMessage, connectionState).ConfigureAwait(false);
1284+
var invokeTask = DispatchInvocationAsync(invocationMessage, connectionState);
1285+
// If a client result is expected we shouldn't block on user code as that could potentially permanently block the application
1286+
// Even if it doesn't permanently block, it would be better if non-client result handlers could still be called while waiting for a result
1287+
// e.g. chat while waiting for user input for a turn in a game
1288+
if (string.IsNullOrEmpty(invocationMessage.InvocationId))
1289+
{
1290+
await invokeTask.ConfigureAwait(false);
1291+
}
12851292
}
12861293
}
12871294
}

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Protocol.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,5 +876,36 @@ public async Task ClientResultCanReturnNullResult()
876876
await connection.DisposeAsync().DefaultTimeout();
877877
}
878878
}
879+
880+
[Fact]
881+
public async Task ClientResultHandlerDoesNotBlockOtherHandlers()
882+
{
883+
var connection = new TestConnection();
884+
var hubConnection = CreateHubConnection(connection);
885+
try
886+
{
887+
await hubConnection.StartAsync().DefaultTimeout();
888+
889+
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
890+
hubConnection.On("Result", async () =>
891+
{
892+
await tcs.Task.DefaultTimeout();
893+
return 1;
894+
});
895+
hubConnection.On("Other", () => tcs.SetResult());
896+
897+
await connection.ReceiveTextAsync("{\"type\":1,\"invocationId\":\"1\",\"target\":\"Result\",\"arguments\":[]}\u001e").DefaultTimeout();
898+
await connection.ReceiveTextAsync("{\"type\":1,\"target\":\"Other\",\"arguments\":[]}\u001e").DefaultTimeout();
899+
900+
var invokeMessage = await connection.ReadSentTextMessageAsync().DefaultTimeout();
901+
902+
Assert.Equal("{\"type\":3,\"invocationId\":\"1\",\"result\":1}", invokeMessage);
903+
}
904+
finally
905+
{
906+
await hubConnection.DisposeAsync().DefaultTimeout();
907+
await connection.DisposeAsync().DefaultTimeout();
908+
}
909+
}
879910
}
880911
}

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 64 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,7 @@ private final class ConnectionState implements InvocationBinder {
13751375
private Boolean handshakeReceived = false;
13761376
private ScheduledExecutorService handshakeTimeout = null;
13771377
private BehaviorSubject<InvocationMessage> messages = BehaviorSubject.create();
1378+
private ExecutorService resultInvocationPool = null;
13781379

13791380
public final Lock lock = new ReentrantLock();
13801381
public final CompletableSubject handshakeResponseSubject = CompletableSubject.create();
@@ -1506,7 +1507,7 @@ public void handleHandshake(ByteBuffer payload) {
15061507
}
15071508
handshakeReceived = true;
15081509
handshakeResponseSubject.onComplete();
1509-
handleInvocations();
1510+
startInvocationProcessing();
15101511
}
15111512
}
15121513

@@ -1528,66 +1529,81 @@ public void close() {
15281529
if (this.handshakeTimeout != null) {
15291530
this.handshakeTimeout.shutdownNow();
15301531
}
1532+
1533+
if (this.resultInvocationPool != null) {
1534+
this.resultInvocationPool.shutdownNow();
1535+
}
15311536
}
15321537

15331538
public void dispatchInvocation(InvocationMessage message) {
15341539
messages.onNext(message);
15351540
}
15361541

1537-
private void handleInvocations() {
1538-
messages.observeOn(Schedulers.io()).subscribe(invocationMessage -> {
1539-
List<InvocationHandler> handlers = this.connection.handlers.get(invocationMessage.getTarget());
1540-
boolean expectsResult = invocationMessage.getInvocationId() != null;
1541-
if (handlers == null) {
1542-
if (expectsResult) {
1543-
logger.warn("Failed to find a value returning handler for '{}' method. Sending error to server.", invocationMessage.getTarget());
1544-
sendHubMessageWithLock(new CompletionMessage(null, invocationMessage.getInvocationId(),
1545-
null, "Client did not provide a result."));
1546-
} else {
1547-
logger.warn("Failed to find handler for '{}' method.", invocationMessage.getTarget());
1548-
}
1549-
return;
1550-
}
1551-
Object result = null;
1552-
Exception resultException = null;
1553-
Boolean hasResult = false;
1554-
for (InvocationHandler handler : handlers) {
1555-
try {
1556-
Object action = handler.getAction();
1557-
if (handler.getHasResult()) {
1558-
FunctionBase function = (FunctionBase)action;
1559-
result = function.invoke(invocationMessage.getArguments()).blockingGet();
1560-
hasResult = true;
1561-
} else {
1562-
((ActionBase)action).invoke(invocationMessage.getArguments()).blockingAwait();
1563-
}
1564-
} catch (Exception e) {
1565-
logger.error("Invoking client side method '{}' failed:", invocationMessage.getTarget(), e);
1566-
if (handler.getHasResult()) {
1567-
resultException = e;
1568-
}
1569-
}
1542+
private void startInvocationProcessing() {
1543+
this.resultInvocationPool = Executors.newCachedThreadPool();
1544+
this.messages.observeOn(Schedulers.io()).subscribe(invocationMessage -> {
1545+
// if client result expected, unblock the invocation processing thread
1546+
if (invocationMessage.getInvocationId() != null) {
1547+
this.resultInvocationPool.submit(() -> handleInvocation(invocationMessage));
1548+
} else {
1549+
handleInvocation(invocationMessage);
15701550
}
1551+
}, (e) -> {
1552+
stop(e.getMessage());
1553+
}, () -> {
1554+
});
1555+
}
15711556

1557+
private void handleInvocation(InvocationMessage invocationMessage)
1558+
{
1559+
List<InvocationHandler> handlers = this.connection.handlers.get(invocationMessage.getTarget());
1560+
boolean expectsResult = invocationMessage.getInvocationId() != null;
1561+
if (handlers == null) {
15721562
if (expectsResult) {
1573-
if (resultException != null) {
1574-
sendHubMessageWithLock(new CompletionMessage(null, invocationMessage.getInvocationId(),
1575-
null, resultException.getMessage()));
1576-
} else if (hasResult) {
1577-
sendHubMessageWithLock(new CompletionMessage(null, invocationMessage.getInvocationId(),
1578-
result, null));
1563+
logger.warn("Failed to find a value returning handler for '{}' method. Sending error to server.", invocationMessage.getTarget());
1564+
sendHubMessageWithLock(new CompletionMessage(null, invocationMessage.getInvocationId(),
1565+
null, "Client did not provide a result."));
1566+
} else {
1567+
logger.warn("Failed to find handler for '{}' method.", invocationMessage.getTarget());
1568+
}
1569+
return;
1570+
}
1571+
Object result = null;
1572+
Exception resultException = null;
1573+
Boolean hasResult = false;
1574+
for (InvocationHandler handler : handlers) {
1575+
try {
1576+
Object action = handler.getAction();
1577+
if (handler.getHasResult()) {
1578+
FunctionBase function = (FunctionBase)action;
1579+
result = function.invoke(invocationMessage.getArguments()).blockingGet();
1580+
hasResult = true;
15791581
} else {
1580-
logger.warn("Failed to find a value returning handler for '{}' method. Sending error to server.", invocationMessage.getTarget());
1581-
sendHubMessageWithLock(new CompletionMessage(null, invocationMessage.getInvocationId(),
1582-
null, "Client did not provide a result."));
1582+
((ActionBase)action).invoke(invocationMessage.getArguments()).blockingAwait();
15831583
}
1584+
} catch (Exception e) {
1585+
logger.error("Invoking client side method '{}' failed:", invocationMessage.getTarget(), e);
1586+
if (handler.getHasResult()) {
1587+
resultException = e;
1588+
}
1589+
}
1590+
}
1591+
1592+
if (expectsResult) {
1593+
if (resultException != null) {
1594+
sendHubMessageWithLock(new CompletionMessage(null, invocationMessage.getInvocationId(),
1595+
null, resultException.getMessage()));
15841596
} else if (hasResult) {
1585-
logger.warn("Result given for '{}' method but server is not expecting a result.", invocationMessage.getTarget());
1597+
sendHubMessageWithLock(new CompletionMessage(null, invocationMessage.getInvocationId(),
1598+
result, null));
1599+
} else {
1600+
logger.warn("Failed to find a value returning handler for '{}' method. Sending error to server.", invocationMessage.getTarget());
1601+
sendHubMessageWithLock(new CompletionMessage(null, invocationMessage.getInvocationId(),
1602+
null, "Client did not provide a result."));
15861603
}
1587-
}, (e) -> {
1588-
stop(e.getMessage());
1589-
}, () -> {
1590-
});
1604+
} else if (hasResult) {
1605+
logger.warn("Result given for '{}' method but server is not expecting a result.", invocationMessage.getTarget());
1606+
}
15911607
}
15921608

15931609
@Override

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnection.ReturnResultTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,4 +395,38 @@ public void returnFromOnHandlerEightParams() {
395395
String expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}" + RECORD_SEPARATOR;
396396
assertEquals(expected, TestUtils.byteBufferToString(message));
397397
}
398+
399+
@Test
400+
public void clientResultHandlerDoesNotBlockOtherHandlers() {
401+
MockTransport mockTransport = new MockTransport();
402+
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
403+
CompletableSubject resultCalled = CompletableSubject.create();
404+
CompletableSubject completeResult = CompletableSubject.create();
405+
CompletableSubject nonResultCalled = CompletableSubject.create();
406+
407+
hubConnection.onWithResult("inc", (i) -> {
408+
resultCalled.onComplete();
409+
completeResult.timeout(30, TimeUnit.SECONDS).blockingAwait();
410+
return Single.just("bob");
411+
}, String.class);
412+
413+
hubConnection.on("inc2", (i) -> {
414+
nonResultCalled.onComplete();
415+
}, String.class);
416+
417+
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
418+
SingleSubject<ByteBuffer> sendTask = mockTransport.getNextSentMessage();
419+
mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\"]}" + RECORD_SEPARATOR);
420+
resultCalled.timeout(30, TimeUnit.SECONDS).blockingAwait();
421+
422+
// Send an non-result invocation and make sure it's processed even with a blocking result invocation
423+
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc2\",\"arguments\":[\"1\"]}" + RECORD_SEPARATOR);
424+
nonResultCalled.timeout(30, TimeUnit.SECONDS).blockingAwait();
425+
426+
completeResult.onComplete();
427+
428+
ByteBuffer message = sendTask.timeout(30, TimeUnit.SECONDS).blockingGet();
429+
String expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}" + RECORD_SEPARATOR;
430+
assertEquals(expected, TestUtils.byteBufferToString(message));
431+
}
398432
}

0 commit comments

Comments
 (0)