diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs index 4b277f1dc7..32d8bd52ed 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs @@ -62,6 +62,9 @@ internal static class SocketClientLoggerExtensions private static readonly Action _closingWebSocketFailed = LoggerMessage.Define(LogLevel.Information, 16, "{time}: Connection Id {connectionId}: Closing webSocket failed."); + private static readonly Action _cancelMessage = + LoggerMessage.Define(LogLevel.Debug, 17, "{time}: Connection Id {connectionId}: Canceled passing message to application."); + // Category: ServerSentEventsTransport and LongPollingTransport private static readonly Action _sendingMessages = LoggerMessage.Define(LogLevel.Debug, 9, "{time}: Connection Id {connectionId}: Sending {count} message(s) to the server using url: {url}."); @@ -283,6 +286,14 @@ public static void ClosingWebSocketFailed(this ILogger logger, string connection } } + public static void CancelMessage(this ILogger logger, string connectionId) + { + if (logger.IsEnabled(LogLevel.Debug)) + { + _cancelMessage(logger, DateTime.Now, connectionId, null); + } + } + public static void SendingMessages(this ILogger logger, string connectionId, int count, Uri url) { if (logger.IsEnabled(LogLevel.Debug)) diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs index 7177993346..2249c44bc9 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs @@ -19,6 +19,7 @@ public class WebSocketsTransport : ITransport private readonly ClientWebSocket _webSocket = new ClientWebSocket(); private Channel _application; private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); + private readonly CancellationTokenSource _receiveCts = new CancellationTokenSource(); private readonly ILogger _logger; private string _connectionId; @@ -80,7 +81,7 @@ private async Task ReceiveMessages(Uri pollUrl) try { - while (!_transportCts.Token.IsCancellationRequested) + while (!_receiveCts.Token.IsCancellationRequested) { const int bufferSize = 4096; var totalBytes = 0; @@ -91,7 +92,7 @@ private async Task ReceiveMessages(Uri pollUrl) var buffer = new ArraySegment(new byte[bufferSize]); //Exceptions are handled above where the send and receive tasks are being run. - receiveResult = await _webSocket.ReceiveAsync(buffer, _transportCts.Token); + receiveResult = await _webSocket.ReceiveAsync(buffer, _receiveCts.Token); if (receiveResult.MessageType == WebSocketMessageType.Close) { _logger.WebSocketClosed(_connectionId, receiveResult.CloseStatus); @@ -129,15 +130,25 @@ private async Task ReceiveMessages(Uri pollUrl) Buffer.BlockCopy(incomingMessage[0].Array, incomingMessage[0].Offset, messageBuffer, 0, incomingMessage[0].Count); } - _logger.MessageToApp(_connectionId, messageBuffer.Length); - while (await _application.Out.WaitToWriteAsync(_transportCts.Token)) + try { - if (_application.Out.TryWrite(messageBuffer)) + if (!_transportCts.Token.IsCancellationRequested) { - incomingMessage.Clear(); - break; + _logger.MessageToApp(_connectionId, messageBuffer.Length); + while (await _application.Out.WaitToWriteAsync(_transportCts.Token)) + { + if (_application.Out.TryWrite(messageBuffer)) + { + incomingMessage.Clear(); + break; + } + } } } + catch (OperationCanceledException) + { + _logger.CancelMessage(_connectionId); + } } } catch (OperationCanceledException) @@ -198,7 +209,7 @@ private async Task SendMessages(Uri sendUrl) finally { _logger.SendStopped(_connectionId); - _transportCts.Cancel(); + TriggerCancel(); } } @@ -248,7 +259,7 @@ private async Task CloseWebSocket() await _webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None); // shutdown the transport after a timeout in case the server does not send close frame - _transportCts.CancelAfter(TimeSpan.FromSeconds(5)); + TriggerCancel(); } } catch (Exception ex) @@ -258,5 +269,12 @@ private async Task CloseWebSocket() _logger.ClosingWebSocketFailed(_connectionId, ex); } } + + private void TriggerCancel() + { + // Give server 5 seconds to respond with a close frame for graceful close. + _receiveCts.CancelAfter(TimeSpan.FromSeconds(5)); + _transportCts.Cancel(); + } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs index 627b03242b..62ef0b3e91 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs @@ -62,7 +62,7 @@ public async Task WebSocketsTransportStopsWhenConnectionChannelClosed() await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection, TransferMode.Binary, connectionId: string.Empty); connectionToTransport.Out.TryComplete(); - await webSocketsTransport.Running.OrTimeout(); + await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10)); } }