Skip to content

Respect CancellationToken in HubConnection.StartAsync() #10140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ private async Task StartAsyncInner(CancellationToken cancellationToken = default
throw new InvalidOperationException($"The {nameof(HubConnection)} cannot be started while {nameof(StopAsync)} is running.");
}

await StartAsyncCore(cancellationToken);
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _state.StopCts.Token))
{
await StartAsyncCore(cancellationToken);
}

_state.ChangeState(HubConnectionState.Connecting, HubConnectionState.Connected);
}
Expand Down Expand Up @@ -422,7 +425,7 @@ private async Task StartAsyncCore(CancellationToken cancellationToken)
Log.Starting(_logger);

// Start the connection
var connection = await _connectionFactory.ConnectAsync(_protocol.TransferFormat);
var connection = await _connectionFactory.ConnectAsync(_protocol.TransferFormat, cancellationToken);
var startingConnectionState = new ConnectionState(connection, this);

// From here on, if an error occurs we need to shut down the connection because
Expand Down Expand Up @@ -1023,7 +1026,8 @@ private async Task HandshakeAsync(ConnectionState startingConnectionState, Cance
try
{
using (var handshakeCts = new CancellationTokenSource(HandshakeTimeout))
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, handshakeCts.Token, _state.StopCts.Token))
// cancellationToken already contains _state.StopCts.Token, so we don't have to link it again
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, handshakeCts.Token))
{
while (true)
{
Expand Down Expand Up @@ -1287,7 +1291,7 @@ private async Task ReconnectAsync(Exception closeException)
var reconnectStartTime = DateTime.UtcNow;
var retryReason = closeException;
var nextRetryDelay = GetNextRetryDelay(previousReconnectAttempts++, TimeSpan.Zero, retryReason);

// We still have the connection lock from the caller, HandleConnectionClose.
_state.AssertInConnectionLock();

Expand Down Expand Up @@ -1347,8 +1351,7 @@ private async Task ReconnectAsync(Exception closeException)
SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, null),
"Someone other than Reconnect set the connection state!");

// HandshakeAsync already checks ReconnectingConnectionState.StopCts.Token.
await StartAsyncCore(CancellationToken.None);
await StartAsyncCore(_state.StopCts.Token);

Log.Reconnected(_logger, previousReconnectAttempts, DateTime.UtcNow - reconnectStartTime);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
Expand Down Expand Up @@ -29,13 +30,13 @@ public TestTransport(Func<Task> onTransportStop = null, Func<Task> onTransportSt
Format = transferFormat;
}

public async Task StartAsync(Uri url, TransferFormat transferFormat)
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
if ((Format & transferFormat) == 0)
{
throw new InvalidOperationException($"The '{transferFormat}' transfer format is not supported by this transport.");
}

var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
public partial interface ITransport : System.IO.Pipelines.IDuplexPipe
{
System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat);
System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
System.Threading.Tasks.Task StopAsync();
}
public partial interface ITransportFactory
Expand All @@ -65,7 +65,7 @@ public LongPollingTransport(System.Net.Http.HttpClient httpClient, Microsoft.Ext
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
Expand All @@ -76,7 +76,7 @@ public ServerSentEventsTransport(System.Net.Http.HttpClient httpClient, Microsof
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
Expand All @@ -86,7 +86,7 @@ public WebSocketsTransport(Microsoft.AspNetCore.Http.Connections.Client.HttpConn
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
public partial interface ITransport : System.IO.Pipelines.IDuplexPipe
{
System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat);
System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
System.Threading.Tasks.Task StopAsync();
}
public partial interface ITransportFactory
Expand All @@ -65,7 +65,7 @@ public LongPollingTransport(System.Net.Http.HttpClient httpClient, Microsoft.Ext
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
Expand All @@ -76,7 +76,7 @@ public ServerSentEventsTransport(System.Net.Http.HttpClient httpClient, Microsof
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
Expand All @@ -86,7 +86,7 @@ public WebSocketsTransport(Microsoft.AspNetCore.Http.Connections.Client.HttpConn
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ public async Task StartAsync(TransferFormat transferFormat, CancellationToken ca
{
using (_logger.BeginScope(_logScope))
{
await StartAsyncCore(transferFormat).ForceAsync();
await StartAsyncCore(transferFormat, cancellationToken).ForceAsync();
}
}

private async Task StartAsyncCore(TransferFormat transferFormat)
private async Task StartAsyncCore(TransferFormat transferFormat, CancellationToken cancellationToken)
{
CheckDisposed();

Expand All @@ -215,7 +215,7 @@ private async Task StartAsyncCore(TransferFormat transferFormat)

Log.Starting(_logger);

await SelectAndStartTransport(transferFormat);
await SelectAndStartTransport(transferFormat, cancellationToken);

_started = true;
Log.Started(_logger);
Expand Down Expand Up @@ -288,7 +288,7 @@ private async Task DisposeAsyncCore()
}
}

private async Task SelectAndStartTransport(TransferFormat transferFormat)
private async Task SelectAndStartTransport(TransferFormat transferFormat, CancellationToken cancellationToken)
{
var uri = _httpConnectionOptions.Url;
// Set the initial access token provider back to the original one from options
Expand All @@ -301,7 +301,7 @@ private async Task SelectAndStartTransport(TransferFormat transferFormat)
if (_httpConnectionOptions.Transports == HttpTransportType.WebSockets)
{
Log.StartingTransport(_logger, _httpConnectionOptions.Transports, uri);
await StartTransport(uri, _httpConnectionOptions.Transports, transferFormat);
await StartTransport(uri, _httpConnectionOptions.Transports, transferFormat, cancellationToken);
}
else
{
Expand All @@ -315,7 +315,7 @@ private async Task SelectAndStartTransport(TransferFormat transferFormat)

do
{
negotiationResponse = await GetNegotiationResponseAsync(uri);
negotiationResponse = await GetNegotiationResponseAsync(uri, cancellationToken);

if (negotiationResponse.Url != null)
{
Expand Down Expand Up @@ -379,12 +379,12 @@ private async Task SelectAndStartTransport(TransferFormat transferFormat)
// The negotiation response gets cleared in the fallback scenario.
if (negotiationResponse == null)
{
negotiationResponse = await GetNegotiationResponseAsync(uri);
negotiationResponse = await GetNegotiationResponseAsync(uri, cancellationToken);
connectUrl = CreateConnectUrl(uri, negotiationResponse.ConnectionId);
}

Log.StartingTransport(_logger, transportType, connectUrl);
await StartTransport(connectUrl, transportType, transferFormat);
await StartTransport(connectUrl, transportType, transferFormat, cancellationToken);
break;
}
}
Expand Down Expand Up @@ -414,7 +414,7 @@ private async Task SelectAndStartTransport(TransferFormat transferFormat)
}
}

private async Task<NegotiationResponse> NegotiateAsync(Uri url, HttpClient httpClient, ILogger logger)
private async Task<NegotiationResponse> NegotiateAsync(Uri url, HttpClient httpClient, ILogger logger, CancellationToken cancellationToken)
{
try
{
Expand All @@ -436,7 +436,7 @@ private async Task<NegotiationResponse> NegotiateAsync(Uri url, HttpClient httpC
// rather than buffer the entire response. This gives a small perf boost.
// Note that it is important to dispose of the response when doing this to
// avoid leaving the connection open.
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken))
{
response.EnsureSuccessStatusCode();
var responseBuffer = await response.Content.ReadAsByteArrayAsync();
Expand Down Expand Up @@ -467,15 +467,15 @@ private static Uri CreateConnectUrl(Uri url, string connectionId)
return Utils.AppendQueryString(url, "id=" + connectionId);
}

private async Task StartTransport(Uri connectUrl, HttpTransportType transportType, TransferFormat transferFormat)
private async Task StartTransport(Uri connectUrl, HttpTransportType transportType, TransferFormat transferFormat, CancellationToken cancellationToken)
{
// Construct the transport
var transport = _transportFactory.CreateTransport(transportType);

// Start the transport, giving it one end of the pipe
try
{
await transport.StartAsync(connectUrl, transferFormat);
await transport.StartAsync(connectUrl, transferFormat, cancellationToken);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -604,9 +604,9 @@ private static bool IsWebSocketsSupported()
#endif
}

private async Task<NegotiationResponse> GetNegotiationResponseAsync(Uri uri)
private async Task<NegotiationResponse> GetNegotiationResponseAsync(Uri uri, CancellationToken cancellationToken)
{
var negotiationResponse = await NegotiateAsync(uri, _httpClient, _logger);
var negotiationResponse = await NegotiateAsync(uri, _httpClient, _logger, cancellationToken);
_connectionId = negotiationResponse.ConnectionId;
_logScope.ConnectionId = _connectionId;
return negotiationResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@

using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;

namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
public interface ITransport : IDuplexPipe
{
Task StartAsync(Uri url, TransferFormat transferFormat);
Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default);
Task StopAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public LongPollingTransport(HttpClient httpClient, ILoggerFactory loggerFactory)
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
}

public async Task StartAsync(Uri url, TransferFormat transferFormat)
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
if (transferFormat != TransferFormat.Binary && transferFormat != TransferFormat.Text)
{
Expand All @@ -52,7 +52,7 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat)
// Make initial long polling request
// Server uses first long polling request to finish initializing connection and it returns without data
var request = new HttpRequestMessage(HttpMethod.Get, url);
using (var response = await _httpClient.SendAsync(request))
using (var response = await _httpClient.SendAsync(request, cancellationToken))
{
response.EnsureSuccessStatusCode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ServerSentEventsTransport(HttpClient httpClient, ILoggerFactory loggerFac
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
}

public async Task StartAsync(Uri url, TransferFormat transferFormat)
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
if (transferFormat != TransferFormat.Text)
{
Expand All @@ -62,7 +62,7 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat)

try
{
response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, CancellationToken.None);
response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
response.EnsureSuccessStatusCode();
}
catch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public WebSocketsTransport(HttpConnectionOptions httpConnectionOptions, ILoggerF
_accessTokenProvider = accessTokenProvider;
}

public async Task StartAsync(Uri url, TransferFormat transferFormat)
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
if (url == null)
{
Expand Down Expand Up @@ -121,7 +121,7 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat)

try
{
await _webSocket.ConnectAsync(resolvedUrl, CancellationToken.None);
await _webSocket.ConnectAsync(resolvedUrl, cancellationToken);
}
catch
{
Expand Down
2 changes: 1 addition & 1 deletion src/SignalR/server/SignalR/test/EndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public FakeTransport()
}
}

public Task StartAsync(Uri url, TransferFormat transferFormat)
public Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);
Expand Down