From 887925f08688def6fc804b0b51e46731036aed7b Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Thu, 3 Sep 2020 11:59:01 -0700 Subject: [PATCH 1/2] Send CloseMessage in more cases --- .../FunctionalTests/HubConnectionTests.cs | 26 ++++++++--- .../server/Core/src/HubConnectionContext.cs | 46 +++++++++++++++++++ .../server/Core/src/HubConnectionHandler.cs | 2 +- .../SignalR/test/HubConnectionHandlerTests.cs | 23 +++++----- 4 files changed, 79 insertions(+), 18 deletions(-) diff --git a/src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs b/src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs index 1e1de9abe406..4c2c48d7f0be 100644 --- a/src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs +++ b/src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs @@ -1349,7 +1349,11 @@ public async Task ServerLogsErrorIfClientInvokeCannotBeSerialized(string protoco }; var protocol = HubProtocols[protocolName]; - await using (var server = await StartServer(write => write.EventId.Name == "FailedWritingMessage")) + await using (var server = await StartServer(write => + { + return write.EventId.Name == "FailedWritingMessage" || write.EventId.Name == "ReceivedCloseWithError" + || write.EventId.Name == "ShutdownWithError"; + })) { var connection = CreateHubConnection(server.Url, "/default", HttpTransportType.WebSockets, protocol, LoggerFactory); var closedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -1361,9 +1365,12 @@ public async Task ServerLogsErrorIfClientInvokeCannotBeSerialized(string protoco var result = connection.InvokeAsync(nameof(TestHub.CallWithUnserializableObject)); // The connection should close. - Assert.Null(await closedTcs.Task.OrTimeout()); + var exception = await closedTcs.Task.OrTimeout(); + Assert.Contains("Connection closed with an error.", exception.Message); - await Assert.ThrowsAsync(() => result).OrTimeout(); + var hubException = await Assert.ThrowsAsync(() => result).OrTimeout(); + Assert.Contains("Connection closed with an error.", hubException.Message); + Assert.Contains(exceptionSubstring, hubException.Message); } catch (Exception ex) { @@ -1396,7 +1403,11 @@ public async Task ServerLogsErrorIfReturnValueCannotBeSerialized(string protocol }; var protocol = HubProtocols[protocolName]; - await using (var server = await StartServer(write => write.EventId.Name == "FailedWritingMessage")) + await using (var server = await StartServer(write => + { + return write.EventId.Name == "FailedWritingMessage" || write.EventId.Name == "ReceivedCloseWithError" + || write.EventId.Name == "ShutdownWithError"; + })) { var connection = CreateHubConnection(server.Url, "/default", HttpTransportType.LongPolling, protocol, LoggerFactory); var closedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -1408,9 +1419,12 @@ public async Task ServerLogsErrorIfReturnValueCannotBeSerialized(string protocol var result = connection.InvokeAsync(nameof(TestHub.GetUnserializableObject)).OrTimeout(); // The connection should close. - Assert.Null(await closedTcs.Task.OrTimeout()); + var exception = await closedTcs.Task.OrTimeout(); + Assert.Contains("Connection closed with an error.", exception.Message); - await Assert.ThrowsAsync(() => result).OrTimeout(); + var hubException = await Assert.ThrowsAsync(() => result).OrTimeout(); + Assert.Contains("Connection closed with an error.", hubException.Message); + Assert.Contains(exceptionSubstring, hubException.Message); } catch (Exception ex) { diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 9f94b6a8b0d4..e54a5f1b4953 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -146,6 +146,52 @@ internal StreamTracker StreamTracker // Currently used only for streaming methods internal ConcurrentDictionary ActiveRequestCancellationSources { get; } = new ConcurrentDictionary(StringComparer.Ordinal); + // Only used when writing CloseMessage, it ignores _connectionAborted to be the final message sent to the client + internal ValueTask WriteCloseAsync(CloseMessage message, CancellationToken cancellationToken = default) + { + // Try to grab the lock synchronously, if we fail, go to the slower path + if (!_writeLock.Wait(0)) + { + return new ValueTask(WriteCloseSlowAsync(message, cancellationToken)); + } + + // This method should never throw synchronously + var task = WriteCore(message, cancellationToken); + + // The write didn't complete synchronously so await completion + if (!task.IsCompletedSuccessfully) + { + return new ValueTask(CompleteWriteAsync(task)); + } + + // Otherwise, release the lock acquired when entering WriteAsync + _writeLock.Release(); + + return default; + } + + private async Task WriteCloseSlowAsync(CloseMessage message, CancellationToken cancellationToken) + { + // Failed to get the lock immediately when entering WriteAsync so await until it is available + await _writeLock.WaitAsync(cancellationToken); + + try + { + + await WriteCore(message, cancellationToken); + } + catch (Exception ex) + { + CloseException = ex; + Log.FailedWritingMessage(_logger, ex); + AbortAllowReconnect(); + } + finally + { + _writeLock.Release(); + } + } + public virtual ValueTask WriteAsync(HubMessage message, CancellationToken cancellationToken = default) { // Try to grab the lock synchronously, if we fail, go to the slower path diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index d2d77e0fe669..2d29817e3c56 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -226,7 +226,7 @@ private async Task SendCloseAsync(HubConnectionContext connection, Exception? ex try { - await connection.WriteAsync(closeMessage); + await connection.WriteCloseAsync(closeMessage); } catch (Exception ex) { diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 862cde22d499..609a2550ae7c 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -155,6 +155,9 @@ public async Task AbortFromHubMethodForcesClientDisconnect() await client.SendInvocationAsync(nameof(AbortHub.Kill)).OrTimeout(); + var close = Assert.IsType(await client.ReadAsync().OrTimeout()); + Assert.False(close.AllowReconnect); + await connectionHandlerTask.OrTimeout(); Assert.Null(client.TryRead()); @@ -955,15 +958,18 @@ public async Task HubMethodListeningToConnectionAbortedClosesOnConnectionContext { var connectionHandlerTask = await client.ConnectAsync(connectionHandler); - var invokeTask = client.InvokeAsync(nameof(MethodHub.BlockingMethod)); + await client.SendInvocationAsync(nameof(MethodHub.BlockingMethod)).OrTimeout(); client.Connection.Abort(); + var closeMessage = Assert.IsType(await client.ReadAsync().OrTimeout()); + Assert.False(closeMessage.AllowReconnect); + // If this completes then the server has completed the connection await connectionHandlerTask.OrTimeout(); // Nothing written to connection because it was closed - Assert.False(invokeTask.IsCompleted); + Assert.Null(client.TryRead()); } } } @@ -1019,16 +1025,11 @@ public async Task HubMethodDoesNotSendResultWhenInvocationIsNonBlocking() // kill the connection client.Dispose(); + var message = Assert.IsType(client.TryRead()); + Assert.True(message.AllowReconnect); + // Ensure the client channel is empty - var message = client.TryRead(); - switch (message) - { - case CloseMessage close: - break; - default: - Assert.Null(message); - break; - } + Assert.Null(client.TryRead()); await connectionHandlerTask.OrTimeout(); } From 5ed2a4b78565ea531c401636db461c80bc2539f9 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 8 Sep 2020 20:59:07 -0700 Subject: [PATCH 2/2] fb --- .../server/Core/src/HubConnectionContext.cs | 57 +++---------------- .../server/Core/src/HubConnectionHandler.cs | 2 +- 2 files changed, 9 insertions(+), 50 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index e54a5f1b4953..732420a0de0c 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -146,61 +146,20 @@ internal StreamTracker StreamTracker // Currently used only for streaming methods internal ConcurrentDictionary ActiveRequestCancellationSources { get; } = new ConcurrentDictionary(StringComparer.Ordinal); - // Only used when writing CloseMessage, it ignores _connectionAborted to be the final message sent to the client - internal ValueTask WriteCloseAsync(CloseMessage message, CancellationToken cancellationToken = default) - { - // Try to grab the lock synchronously, if we fail, go to the slower path - if (!_writeLock.Wait(0)) - { - return new ValueTask(WriteCloseSlowAsync(message, cancellationToken)); - } - - // This method should never throw synchronously - var task = WriteCore(message, cancellationToken); - - // The write didn't complete synchronously so await completion - if (!task.IsCompletedSuccessfully) - { - return new ValueTask(CompleteWriteAsync(task)); - } - - // Otherwise, release the lock acquired when entering WriteAsync - _writeLock.Release(); - - return default; - } - - private async Task WriteCloseSlowAsync(CloseMessage message, CancellationToken cancellationToken) + public virtual ValueTask WriteAsync(HubMessage message, CancellationToken cancellationToken = default) { - // Failed to get the lock immediately when entering WriteAsync so await until it is available - await _writeLock.WaitAsync(cancellationToken); - - try - { - - await WriteCore(message, cancellationToken); - } - catch (Exception ex) - { - CloseException = ex; - Log.FailedWritingMessage(_logger, ex); - AbortAllowReconnect(); - } - finally - { - _writeLock.Release(); - } + return WriteAsync(message, ignoreAbort: false, cancellationToken); } - public virtual ValueTask WriteAsync(HubMessage message, CancellationToken cancellationToken = default) + internal ValueTask WriteAsync(HubMessage message, bool ignoreAbort, CancellationToken cancellationToken = default) { // Try to grab the lock synchronously, if we fail, go to the slower path if (!_writeLock.Wait(0)) { - return new ValueTask(WriteSlowAsync(message, cancellationToken)); + return new ValueTask(WriteSlowAsync(message, ignoreAbort, cancellationToken)); } - if (_connectionAborted) + if (_connectionAborted && !ignoreAbort) { _writeLock.Release(); return default; @@ -318,14 +277,14 @@ private async Task CompleteWriteAsync(ValueTask task) } } - private async Task WriteSlowAsync(HubMessage message, CancellationToken cancellationToken) + private async Task WriteSlowAsync(HubMessage message, bool ignoreAbort, CancellationToken cancellationToken) { // Failed to get the lock immediately when entering WriteAsync so await until it is available await _writeLock.WaitAsync(cancellationToken); try { - if (_connectionAborted) + if (_connectionAborted && !ignoreAbort) { return; } @@ -347,7 +306,7 @@ private async Task WriteSlowAsync(HubMessage message, CancellationToken cancella private async Task WriteSlowAsync(SerializedHubMessage message, CancellationToken cancellationToken) { // Failed to get the lock immediately when entering WriteAsync so await until it is available - await _writeLock.WaitAsync(); + await _writeLock.WaitAsync(cancellationToken); try { diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index 2d29817e3c56..a16f42f490cf 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -226,7 +226,7 @@ private async Task SendCloseAsync(HubConnectionContext connection, Exception? ex try { - await connection.WriteCloseAsync(closeMessage); + await connection.WriteAsync(closeMessage, ignoreAbort: true); } catch (Exception ex) {