diff --git a/src/Grpc.Net.Client/Balancer/Subchannel.cs b/src/Grpc.Net.Client/Balancer/Subchannel.cs index db0a2ee42..d80d5adeb 100644 --- a/src/Grpc.Net.Client/Balancer/Subchannel.cs +++ b/src/Grpc.Net.Client/Balancer/Subchannel.cs @@ -237,6 +237,7 @@ public void UpdateAddresses(IReadOnlyList addresses) /// public void RequestConnection() { + var connectionRequested = false; lock (Lock) { switch (_state) @@ -245,7 +246,8 @@ public void RequestConnection() SubchannelLog.ConnectionRequested(_logger, Id); // Only start connecting underlying transport if in an idle state. - UpdateConnectivityState(ConnectivityState.Connecting, "Connection requested."); + // Update connectivity state outside of subchannel lock to avoid deadlock. + connectionRequested = true; break; case ConnectivityState.Connecting: case ConnectivityState.Ready: @@ -264,6 +266,11 @@ public void RequestConnection() } } + if (connectionRequested) + { + UpdateConnectivityState(ConnectivityState.Connecting, "Connection requested."); + } + // Don't capture the current ExecutionContext and its AsyncLocals onto the connect var restoreFlow = false; if (!ExecutionContext.IsFlowSuppressed()) @@ -448,6 +455,8 @@ internal bool UpdateConnectivityState(ConnectivityState state, string successDet internal bool UpdateConnectivityState(ConnectivityState state, Status status) { + Debug.Assert(!Monitor.IsEntered(Lock), "Ensure the subchannel lock isn't held here. Updating channel state with the subchannel lock can cause a deadlock."); + lock (Lock) { // Don't update subchannel state if the state is the same or the subchannel has been shutdown. @@ -462,7 +471,7 @@ internal bool UpdateConnectivityState(ConnectivityState state, Status status) } _state = state; } - + // Notify channel outside of lock to avoid deadlocks. _manager.OnSubchannelStateChange(this, state, status); return true; diff --git a/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs index 743639886..7a5221728 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs @@ -17,6 +17,7 @@ #endregion #if SUPPORT_LOAD_BALANCING +using System.Diagnostics; using System.Net; using System.Threading.Channels; using Greet; @@ -29,6 +30,7 @@ using Grpc.Tests.Shared; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Logging.Testing; using NUnit.Framework; using ChannelState = Grpc.Net.Client.Balancer.ChannelState; @@ -535,6 +537,89 @@ public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithZeroAddressResolve await pickTask.DefaultTimeout(); } + [Test] + public async Task PickAsync_UpdateAddressesWhileRequestingConnection_DoesNotDeadlock() + { + var services = new ServiceCollection(); + services.AddNUnitLogger(); + + var testSink = new TestSink(); + var testProvider = new TestLoggerProvider(testSink); + + services.AddLogging(b => + { + b.AddProvider(testProvider); + }); + + await using var serviceProvider = services.BuildServiceProvider(); + var loggerFactory = serviceProvider.GetRequiredService(); + + var resolver = new TestResolver(loggerFactory); + resolver.UpdateAddresses(new List + { + new BalancerAddress("localhost", 80) + }); + + var channelOptions = new GrpcChannelOptions(); + + var transportFactory = new TestSubchannelTransportFactory(); + var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory, new[] { new PickFirstBalancerFactory() }); + // Configure balancer similar to how GrpcChannel constructor does it + clientChannel.ConfigureBalancer(c => new ChildHandlerLoadBalancer( + c, + channelOptions.ServiceConfig, + clientChannel)); + + await clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None); + + transportFactory.Transports.ForEach(t => t.Disconnect()); + + var requestConnectionSyncPoint = new SyncPoint(runContinuationsAsynchronously: true); + testSink.MessageLogged += (w) => + { + if (w.EventId.Name == "ConnectionRequested") + { + requestConnectionSyncPoint.WaitToContinue().Wait(); + } + }; + + // Task should pause when requesting connection because of the logger sink. + var pickTask = Task.Run(() => clientChannel.PickAsync( + new PickContext { Request = new HttpRequestMessage() }, + waitForReady: true, + CancellationToken.None).AsTask()); + + // Wait until we're paused on requesting a connection. + await requestConnectionSyncPoint.WaitForSyncPoint().DefaultTimeout(); + + // Update addresses while requesting a connection. + var updateAddressesTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var updateAddressesTask = Task.Run(() => + { + updateAddressesTcs.TrySetResult(null); + resolver.UpdateAddresses(new List + { + new BalancerAddress("localhost", 81) + }); + }); + + // There isn't a clean way to wait for UpdateAddresses to be waiting for the subchannel lock. + // Use a long delay to ensure we're waiting for the lock and are in the right state. + await updateAddressesTcs.Task.DefaultTimeout(); + await Task.Delay(500); + requestConnectionSyncPoint.Continue(); + + // Ensure the pick completes without deadlock. + try + { + await pickTask.DefaultTimeout(); + } + catch (TimeoutException ex) + { + throw new InvalidOperationException("Likely deadlock when picking subchannel.", ex); + } + } + [Test] public async Task PickAsync_ExecutionContext_DoesNotCaptureAsyncLocalsInConnect() {