diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 479ca4b2a83..e46da087abc 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -912,7 +912,6 @@ public NameResolverRegistry getNameResolverRegistry() { * * @since 1.2.0 */ - @ThreadSafe @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771") public abstract static class Subchannel { /** @@ -930,9 +929,9 @@ public abstract static class Subchannel { /** * Asks the Subchannel to create a connection (aka transport), if there isn't an active one. * - *
It should be called from the Synchronization Context. Currently will log a warning if - * violated. It will become an exception eventually. See #5015 for the background. + *
This method is safe to be called concurrently and can be called any time. If the + * Subchannel is unable to fulfill the request (e.g., because it's already shut down), this + * method has no effect. * * @since 1.2.0 */ diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index ae7303697a5..7897e1616b5 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1518,7 +1518,6 @@ public void run() { @Override public void requestConnection() { - logWarningIfNotInSyncContext("Subchannel.requestConnection()"); subchannel.obtainActiveTransport(); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index f1f0d4e0d76..12387be0f2e 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -311,7 +311,7 @@ public void realTransportsHoldsOffIdleness() throws Exception { // Assume LoadBalancer has received an address, then create a subchannel. Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo t0 = newTransports.poll(); t0.listener.transportReady(); @@ -351,13 +351,13 @@ public void updateSubchannelAddresses_newAddressConnects() { Helper helper = helperCaptor.getValue(); Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo t0 = newTransports.poll(); t0.listener.transportReady(); updateSubchannelAddressesSafely(helper, subchannel, servers.get(1)); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo t1 = newTransports.poll(); t1.listener.transportReady(); } @@ -371,7 +371,7 @@ public void updateSubchannelAddresses_existingAddressDoesNotConnect() { Helper helper = helperCaptor.getValue(); Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo t0 = newTransports.poll(); t0.listener.transportReady(); @@ -379,7 +379,7 @@ public void updateSubchannelAddresses_existingAddressDoesNotConnect() { changedList.add(new FakeSocketAddress("aDifferentServer")); updateSubchannelAddressesSafely(helper, subchannel, new EquivalentAddressGroup(changedList)); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); assertNull(newTransports.poll()); } @@ -514,16 +514,6 @@ public void run() { return resultCapture.get(); } - private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) { - helper.getSynchronizationContext().execute( - new Runnable() { - @Override - public void run() { - subchannel.requestConnection(); - } - }); - } - private static void updateBalancingStateSafely( final Helper helper, final ConnectivityState state, final SubchannelPicker picker) { helper.getSynchronizationContext().execute( diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 60c13d0b2c9..330244286ec 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -434,7 +434,7 @@ public void channelzMembership_subchannel() throws Exception { assertThat(getStats(channel).subchannels) .containsExactly(subchannel.getInternalSubchannel()); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); assertNotNull(transportInfo); assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); @@ -520,7 +520,7 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to // real transport. Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -660,8 +660,8 @@ public void noMoreCallbackAfterLoadBalancerShutdown() { Subchannel subchannel1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel1); - requestConnectionSafely(helper, subchannel2); + subchannel1.requestConnection(); + subchannel2.requestConnection(); verify(mockTransportFactory, times(2)) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -729,7 +729,7 @@ public void callOptionsExecutor() { verify(mockTransportFactory, never()) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -987,7 +987,7 @@ public void firstResolvedServerFailedToConnect() throws Exception { Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); inOrder.verify(mockLoadBalancer).handleSubchannelState( same(subchannel), stateInfoCaptor.capture()); assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); @@ -1137,7 +1137,7 @@ public void allServersFailedToConnect() throws Exception { Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); inOrder.verify(mockLoadBalancer).handleSubchannelState( same(subchannel), stateInfoCaptor.capture()); @@ -1218,7 +1218,7 @@ public void run() { any(SocketAddress.class), any(ClientTransportOptions.class), any(TransportLogger.class)); - requestConnectionSafely(helper, sub1); + sub1.requestConnection(); verify(mockTransportFactory) .newClientTransport( eq(socketAddress), @@ -1227,7 +1227,7 @@ public void run() { MockClientTransportInfo transportInfo1 = transports.poll(); assertNotNull(transportInfo1); - requestConnectionSafely(helper, sub2); + sub2.requestConnection(); verify(mockTransportFactory, times(2)) .newClientTransport( eq(socketAddress), @@ -1236,8 +1236,8 @@ public void run() { MockClientTransportInfo transportInfo2 = transports.poll(); assertNotNull(transportInfo2); - requestConnectionSafely(helper, sub1); - requestConnectionSafely(helper, sub2); + sub1.requestConnection(); + sub2.requestConnection(); // The subchannel doesn't matter since this isn't called verify(mockTransportFactory, times(2)) .newClientTransport( @@ -1273,8 +1273,8 @@ public void subchannelsWhenChannelShutdownNow() { createChannel(); Subchannel sub1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel sub2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, sub1); - requestConnectionSafely(helper, sub2); + sub1.requestConnection(); + sub2.requestConnection(); assertThat(transports).hasSize(2); MockClientTransportInfo ti1 = transports.poll(); @@ -1509,7 +1509,7 @@ public void subchannelChannel_normalUsage() { CallOptions callOptions = CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS); // Subchannel must be READY when creating the RPC. - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -1534,7 +1534,7 @@ public void subchannelChannel_failWhenNotReady() { Channel sChannel = subchannel.asChannel(); Metadata headers = new Metadata(); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -1563,7 +1563,7 @@ public void subchannelChannel_failWaitForReady() { Metadata headers = new Metadata(); // Subchannel must be READY when creating the RPC. - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -1666,7 +1666,7 @@ private void subtestNameResolutionRefreshWhenConnectionFailed( oobChannel.getSubchannel().requestConnection(); } else { Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); } MockClientTransportInfo transportInfo = transports.poll(); @@ -1753,7 +1753,7 @@ public Void answer(InvocationOnMock in) throws Throwable { // Simulate name resolution results EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); verify(mockTransportFactory) .newClientTransport( same(socketAddress), eq(clientTransportOptions), any(ChannelLogger.class)); @@ -1826,7 +1826,7 @@ public void pickerReturnsStreamTracer_noDelay() { ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class); createChannel(); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); transportInfo.listener.transportReady(); ClientTransport mockTransport = transportInfo.transport; @@ -1864,7 +1864,7 @@ public void pickerReturnsStreamTracer_delayed() { call.start(mockCallListener, new Metadata()); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); transportInfo.listener.transportReady(); ClientTransport mockTransport = transportInfo.transport; @@ -2239,7 +2239,7 @@ public void idleMode_resetsDelayedTransportPicker() { // Establish a connection Subchannel subchannel = createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; ManagedClientTransport.Listener transportListener = transportInfo.listener; @@ -2307,7 +2307,7 @@ public void enterIdle_exitsIdleIfDelayedStreamPending() { // Establish a connection Subchannel subchannel = createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); ClientStream mockStream = mock(ClientStream.class); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; @@ -2337,7 +2337,7 @@ public void updateBalancingStateDoesUpdatePicker() { // Make the transport available with subchannel2 Subchannel subchannel1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel2); + subchannel2.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; @@ -2737,7 +2737,7 @@ public void channelsAndSubchannels_instrumented_state() throws Exception { (AbstractSubchannel) createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); assertEquals(IDLE, getStats(subchannel).state); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); assertEquals(CONNECTING, getStats(subchannel).state); MockClientTransportInfo transportInfo = transports.poll(); @@ -2789,7 +2789,7 @@ private void channelsAndSubchannels_instrumented0(boolean success) throws Except ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class); AbstractSubchannel subchannel = (AbstractSubchannel) createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); transportInfo.listener.transportReady(); ClientTransport mockTransport = transportInfo.transport; @@ -3028,7 +3028,7 @@ public double nextDouble() { Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; ClientStream mockStream = mock(ClientStream.class); @@ -3127,7 +3127,7 @@ public void hedgingScheduledThenChannelShutdown_hedgeShouldStillHappen_newCallSh Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - requestConnectionSafely(helper, subchannel); + subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; ClientStream mockStream = mock(ClientStream.class); @@ -3923,16 +3923,6 @@ public void run() { return resultCapture.get(); } - private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) { - helper.getSynchronizationContext().execute( - new Runnable() { - @Override - public void run() { - subchannel.requestConnection(); - } - }); - } - private static void updateBalancingStateSafely( final Helper helper, final ConnectivityState state, final SubchannelPicker picker) { helper.getSynchronizationContext().execute(