Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Give Client a chance to receive Close Frame from Server #730

Merged
merged 6 commits into from
Aug 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ internal static class SocketClientLoggerExtensions
private static readonly Action<ILogger, DateTime, string, Exception> _closingWebSocketFailed =
LoggerMessage.Define<DateTime, string>(LogLevel.Information, 16, "{time}: Connection Id {connectionId}: Closing webSocket failed.");

private static readonly Action<ILogger, DateTime, string, Exception> _cancelMessage =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, 17, "{time}: Connection Id {connectionId}: Canceled passing message to application.");

// Category: ServerSentEventsTransport and LongPollingTransport
private static readonly Action<ILogger, DateTime, string, int, Uri, Exception> _sendingMessages =
LoggerMessage.Define<DateTime, string, int, Uri>(LogLevel.Debug, 9, "{time}: Connection Id {connectionId}: Sending {count} message(s) to the server using url: {url}.");
Expand Down Expand Up @@ -283,6 +286,14 @@ public static void ClosingWebSocketFailed(this ILogger logger, string connection
}
}

public static void CancelMessage(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_cancelMessage(logger, DateTime.Now, connectionId, null);
}
}

public static void SendingMessages(this ILogger logger, string connectionId, int count, Uri url)
{
if (logger.IsEnabled(LogLevel.Debug))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class WebSocketsTransport : ITransport
private readonly ClientWebSocket _webSocket = new ClientWebSocket();
private Channel<byte[], SendMessage> _application;
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
private readonly CancellationTokenSource _receiveCts = new CancellationTokenSource();
private readonly ILogger _logger;
private string _connectionId;

Expand Down Expand Up @@ -80,7 +81,7 @@ private async Task ReceiveMessages(Uri pollUrl)

try
{
while (!_transportCts.Token.IsCancellationRequested)
while (!_receiveCts.Token.IsCancellationRequested)
{
const int bufferSize = 4096;
var totalBytes = 0;
Expand All @@ -91,7 +92,7 @@ private async Task ReceiveMessages(Uri pollUrl)
var buffer = new ArraySegment<byte>(new byte[bufferSize]);

//Exceptions are handled above where the send and receive tasks are being run.
receiveResult = await _webSocket.ReceiveAsync(buffer, _transportCts.Token);
receiveResult = await _webSocket.ReceiveAsync(buffer, _receiveCts.Token);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
_logger.WebSocketClosed(_connectionId, receiveResult.CloseStatus);
Expand Down Expand Up @@ -129,15 +130,25 @@ private async Task ReceiveMessages(Uri pollUrl)
Buffer.BlockCopy(incomingMessage[0].Array, incomingMessage[0].Offset, messageBuffer, 0, incomingMessage[0].Count);
}

_logger.MessageToApp(_connectionId, messageBuffer.Length);
while (await _application.Out.WaitToWriteAsync(_transportCts.Token))
try
{
if (_application.Out.TryWrite(messageBuffer))
if (!_transportCts.Token.IsCancellationRequested)
{
incomingMessage.Clear();
break;
_logger.MessageToApp(_connectionId, messageBuffer.Length);
while (await _application.Out.WaitToWriteAsync(_transportCts.Token))
{
if (_application.Out.TryWrite(messageBuffer))
{
incomingMessage.Clear();
break;
}
}
}
}
catch (OperationCanceledException)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth logging?

_logger.CancelMessage(_connectionId);
}
}
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -198,7 +209,7 @@ private async Task SendMessages(Uri sendUrl)
finally
{
_logger.SendStopped(_connectionId);
_transportCts.Cancel();
TriggerCancel();
Copy link
Contributor

@moozzyk moozzyk Aug 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? if you get here it means that receiveCts has been cancelled or there was an exception. I think we only need _transportCts.Cancel() here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that - I did not expand and thought it was in the Receive loop.

}
}

Expand Down Expand Up @@ -248,7 +259,7 @@ private async Task CloseWebSocket()
await _webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);

// shutdown the transport after a timeout in case the server does not send close frame
_transportCts.CancelAfter(TimeSpan.FromSeconds(5));
TriggerCancel();
}
}
catch (Exception ex)
Expand All @@ -258,5 +269,12 @@ private async Task CloseWebSocket()
_logger.ClosingWebSocketFailed(_connectionId, ex);
}
}

private void TriggerCancel()
{
// Give server 5 seconds to respond with a close frame for graceful close.
_receiveCts.CancelAfter(TimeSpan.FromSeconds(5));
_transportCts.Cancel();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task WebSocketsTransportStopsWhenConnectionChannelClosed()
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection,
TransferMode.Binary, connectionId: string.Empty);
connectionToTransport.Out.TryComplete();
await webSocketsTransport.Running.OrTimeout();
await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan... can we add an option or something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you need to extend this timeout?
Also, this test failed on AppVeyor so it does not seem increasing this timeout helped.

Copy link
Member Author

@BrennanConroy BrennanConroy Aug 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The known flaky SSE test failed in AppVeyor. And the timeout is because we wait 5 seconds in WebSockets before canceling the ReceiveAsync

}
}

Expand Down