Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/Grpc.Net.Client/Balancer/Subchannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public void UpdateAddresses(IReadOnlyList<BalancerAddress> addresses)
/// </summary>
public void RequestConnection()
{
var connectionRequested = false;
lock (Lock)
{
switch (_state)
Expand All @@ -245,7 +246,8 @@ public void RequestConnection()
SubchannelLog.ConnectionRequested(_logger, Id);

// Only start connecting underlying transport if in an idle state.
UpdateConnectivityState(ConnectivityState.Connecting, "Connection requested.");
// Update connectivity state outside of subchannel lock to avoid deadlock.
connectionRequested = true;
break;
case ConnectivityState.Connecting:
case ConnectivityState.Ready:
Expand All @@ -264,6 +266,11 @@ public void RequestConnection()
}
}

if (connectionRequested)
{
UpdateConnectivityState(ConnectivityState.Connecting, "Connection requested.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this now going to cause races where someone else comes along and modifies the Subchannel state and we end up updating it to Connecting when it should stay as TransientFailure or Ready?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is fine. The logic in load balancing is designed that state can change at any point, and other parts of the system then react to it. All other places update the connectivity state outside the subchannel lock, this one is the outlier.

}

// Don't capture the current ExecutionContext and its AsyncLocals onto the connect
var restoreFlow = false;
if (!ExecutionContext.IsFlowSuppressed())
Expand Down Expand Up @@ -448,6 +455,8 @@ internal bool UpdateConnectivityState(ConnectivityState state, string successDet

internal bool UpdateConnectivityState(ConnectivityState state, Status status)
{
Debug.Assert(!Monitor.IsEntered(Lock), "Ensure the subchannel lock isn't held here. Updating channel state with the subchannel lock can cause a deadlock.");

lock (Lock)
{
// Don't update subchannel state if the state is the same or the subchannel has been shutdown.
Expand All @@ -462,7 +471,7 @@ internal bool UpdateConnectivityState(ConnectivityState state, Status status)
}
_state = state;
}

// Notify channel outside of lock to avoid deadlocks.
_manager.OnSubchannelStateChange(this, state, status);
return true;
Expand Down
85 changes: 85 additions & 0 deletions test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#endregion

#if SUPPORT_LOAD_BALANCING
using System.Diagnostics;
using System.Net;
using System.Threading.Channels;
using Greet;
Expand All @@ -29,6 +30,7 @@
using Grpc.Tests.Shared;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Logging.Testing;
using NUnit.Framework;
using ChannelState = Grpc.Net.Client.Balancer.ChannelState;
Expand Down Expand Up @@ -535,6 +537,89 @@ public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithZeroAddressResolve
await pickTask.DefaultTimeout();
}

[Test]
public async Task PickAsync_UpdateAddressesWhileRequestingConnection_DoesNotDeadlock()
{
var services = new ServiceCollection();
services.AddNUnitLogger();

var testSink = new TestSink();
var testProvider = new TestLoggerProvider(testSink);

services.AddLogging(b =>
{
b.AddProvider(testProvider);
});

await using var serviceProvider = services.BuildServiceProvider();
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();

var resolver = new TestResolver(loggerFactory);
resolver.UpdateAddresses(new List<BalancerAddress>
{
new BalancerAddress("localhost", 80)
});

var channelOptions = new GrpcChannelOptions();

var transportFactory = new TestSubchannelTransportFactory();
var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory, new[] { new PickFirstBalancerFactory() });
// Configure balancer similar to how GrpcChannel constructor does it
clientChannel.ConfigureBalancer(c => new ChildHandlerLoadBalancer(
c,
channelOptions.ServiceConfig,
clientChannel));

await clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None);

transportFactory.Transports.ForEach(t => t.Disconnect());

var requestConnectionSyncPoint = new SyncPoint(runContinuationsAsynchronously: true);
testSink.MessageLogged += (w) =>
{
if (w.EventId.Name == "ConnectionRequested")
{
requestConnectionSyncPoint.WaitToContinue().Wait();
}
};

// Task should pause when requesting connection because of the logger sink.
var pickTask = Task.Run(() => clientChannel.PickAsync(
new PickContext { Request = new HttpRequestMessage() },
waitForReady: true,
CancellationToken.None).AsTask());

// Wait until we're paused on requesting a connection.
await requestConnectionSyncPoint.WaitForSyncPoint().DefaultTimeout();

// Update addresses while requesting a connection.
var updateAddressesTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
var updateAddressesTask = Task.Run(() =>
{
updateAddressesTcs.TrySetResult(null);
resolver.UpdateAddresses(new List<BalancerAddress>
{
new BalancerAddress("localhost", 81)
});
});

// There isn't a clean way to wait for UpdateAddresses to be waiting for the subchannel lock.
// Use a long delay to ensure we're waiting for the lock and are in the right state.
await updateAddressesTcs.Task.DefaultTimeout();
await Task.Delay(500);
requestConnectionSyncPoint.Continue();

// Ensure the pick completes without deadlock.
try
{
await pickTask.DefaultTimeout();
}
catch (TimeoutException ex)
{
throw new InvalidOperationException("Likely deadlock when picking subchannel.", ex);
}
}

[Test]
public async Task PickAsync_ExecutionContext_DoesNotCaptureAsyncLocalsInConnect()
{
Expand Down