diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index f48bf9bf162..247f0901560 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -694,6 +694,10 @@ public Subchannel createSubchannel(List addrs, Attribute * Equivalent to {@link #updateSubchannelAddresses(io.grpc.LoadBalancer.Subchannel, List)} with * the given single {@code EquivalentAddressGroup}. * + *

It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. + * * @since 1.4.0 */ public final void updateSubchannelAddresses( @@ -707,6 +711,10 @@ public final void updateSubchannelAddresses( * {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can * continue using an existing connection. * + *

It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. + * * @throws IllegalArgumentException if {@code subchannel} was not returned from {@link * #createSubchannel} or {@code addrs} is empty * @since 1.14.0 @@ -776,6 +784,10 @@ public ManagedChannel createResolvingOobChannel(String target) { * updateBalancingState()} has never been called, the channel will buffer all RPCs until a * picker is provided. * + *

It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. + * *

The passed state will be the channel's new state. The SHUTDOWN state should not be passed * and its behavior is undefined. * @@ -787,6 +799,10 @@ public abstract void updateBalancingState( /** * Call {@link NameResolver#refresh} on the channel's resolver. * + *

It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. + * * @since 1.18.0 */ public void refreshNameResolution() { @@ -884,6 +900,10 @@ public abstract static class Subchannel { * Shuts down the Subchannel. After this method is called, this Subchannel should no longer * be returned by the latest {@link SubchannelPicker picker}, and can be safely discarded. * + *

It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. + * * @since 1.2.0 */ public abstract void shutdown(); @@ -891,6 +911,10 @@ public abstract static class Subchannel { /** * Asks the Subchannel to create a connection (aka transport), if there isn't an active one. * + *

It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. + * * @since 1.2.0 */ public abstract void requestConnection(); @@ -900,6 +924,10 @@ public abstract static class Subchannel { * the Subchannel has only one {@link EquivalentAddressGroup}. Under the hood it calls * {@link #getAllAddresses}. * + *

It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. + * * @throws IllegalStateException if this subchannel has more than one EquivalentAddressGroup. * Use {@link #getAllAddresses} instead * @since 1.2.0 @@ -913,6 +941,10 @@ public final EquivalentAddressGroup getAddresses() { /** * Returns the addresses that this Subchannel is bound to. The returned list will not be empty. * + *

It should be called from the Synchronization Context. Currently will log a warning if + * violated. It will become an exception eventually. See #5015 for the background. + * * @since 1.14.0 */ public List getAllAddresses() { diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index c372511cb25..58e8808292b 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1045,14 +1045,7 @@ private void handleInternalSubchannelState(ConnectivityStateInfo newState) { @Override public AbstractSubchannel createSubchannel( List addressGroups, Attributes attrs) { - try { - syncContext.throwIfNotInThisSynchronizationContext(); - } catch (IllegalStateException e) { - logger.log(Level.WARNING, - "We sugguest you call createSubchannel() from SynchronizationContext." - + " Otherwise, it may race with handleSubchannelState()." - + " See https://github.com/grpc/grpc-java/issues/5015", e); - } + logWarningIfNotInSyncContext("createSubchannel()"); checkNotNull(addressGroups, "addressGroups"); checkNotNull(attrs, "attrs"); // TODO(ejona): can we be even stricter? Like loadBalancer == null? @@ -1146,6 +1139,7 @@ public void updateBalancingState( final ConnectivityState newState, final SubchannelPicker newPicker) { checkNotNull(newState, "newState"); checkNotNull(newPicker, "newPicker"); + logWarningIfNotInSyncContext("updateBalancingState()"); final class UpdateBalancingState implements Runnable { @Override public void run() { @@ -1167,6 +1161,7 @@ public void run() { @Override public void refreshNameResolution() { + logWarningIfNotInSyncContext("refreshNameResolution()"); final class LoadBalancerRefreshNameResolution implements Runnable { @Override public void run() { @@ -1182,6 +1177,7 @@ public void updateSubchannelAddresses( LoadBalancer.Subchannel subchannel, List addrs) { checkArgument(subchannel instanceof SubchannelImpl, "subchannel must have been returned from createSubchannel"); + logWarningIfNotInSyncContext("updateSubchannelAddresses()"); ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs); } @@ -1465,6 +1461,7 @@ InternalInstrumented getInternalSubchannel() { @Override public void shutdown() { + logWarningIfNotInSyncContext("Subchannel.shutdown()"); synchronized (shutdownLock) { if (shutdownRequested) { if (terminating && delayedShutdownTask != null) { @@ -1508,11 +1505,13 @@ public void run() { @Override public void requestConnection() { + logWarningIfNotInSyncContext("Subchannel.requestConnection()"); subchannel.obtainActiveTransport(); } @Override public List getAllAddresses() { + logWarningIfNotInSyncContext("Subchannel.getAllAddresses()"); return subchannel.getAddressGroups(); } @@ -1771,4 +1770,15 @@ public ConfigOrError parseServiceConfig(Map rawServiceConfig) { } } } + + private void logWarningIfNotInSyncContext(String method) { + try { + syncContext.throwIfNotInThisSynchronizationContext(); + } catch (IllegalStateException e) { + logger.log(Level.WARNING, + method + " should be called from SynchronizationContext. " + + "This warning will become an exception in a future release. " + + "See https://github.com/grpc/grpc-java/issues/5015 for more details", e); + } + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index d7ba03a9fc8..f1f0d4e0d76 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -38,6 +38,7 @@ import io.grpc.ChannelLogger; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.IntegerMarshaller; import io.grpc.LoadBalancer; @@ -310,14 +311,14 @@ public void realTransportsHoldsOffIdleness() throws Exception { // Assume LoadBalancer has received an address, then create a subchannel. Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo t0 = newTransports.poll(); t0.listener.transportReady(); SubchannelPicker mockPicker = mock(SubchannelPicker.class); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); // Delayed transport creates real streams in the app executor executor.runDueTasks(); @@ -350,13 +351,13 @@ public void updateSubchannelAddresses_newAddressConnects() { Helper helper = helperCaptor.getValue(); Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo t0 = newTransports.poll(); t0.listener.transportReady(); - helper.updateSubchannelAddresses(subchannel, servers.get(1)); + updateSubchannelAddressesSafely(helper, subchannel, servers.get(1)); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo t1 = newTransports.poll(); t1.listener.transportReady(); } @@ -370,15 +371,15 @@ public void updateSubchannelAddresses_existingAddressDoesNotConnect() { Helper helper = helperCaptor.getValue(); Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo t0 = newTransports.poll(); t0.listener.transportReady(); List changedList = new ArrayList<>(servers.get(0).getAddresses()); changedList.add(new FakeSocketAddress("aDifferentServer")); - helper.updateSubchannelAddresses(subchannel, new EquivalentAddressGroup(changedList)); + updateSubchannelAddressesSafely(helper, subchannel, new EquivalentAddressGroup(changedList)); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); assertNull(newTransports.poll()); } @@ -397,7 +398,7 @@ public void oobTransportDoesNotAffectIdleness() { SubchannelPicker failingPicker = mock(SubchannelPicker.class); when(failingPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withError(Status.UNAVAILABLE)); - helper.updateBalancingState(TRANSIENT_FAILURE, failingPicker); + updateBalancingStateSafely(helper, TRANSIENT_FAILURE, failingPicker); executor.runDueTasks(); verify(mockCallListener).onClose(same(Status.UNAVAILABLE), any(Metadata.class)); @@ -499,7 +500,7 @@ public String toString() { } } - // We need this because createSubchannel() should be called from the SynchronizationContext + // Helper methods to call methods from SynchronizationContext private static Subchannel createSubchannelSafely( final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs) { final AtomicReference resultCapture = new AtomicReference<>(); @@ -512,4 +513,36 @@ public void run() { }); return resultCapture.get(); } + + private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) { + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + subchannel.requestConnection(); + } + }); + } + + private static void updateBalancingStateSafely( + final Helper helper, final ConnectivityState state, final SubchannelPicker picker) { + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + helper.updateBalancingState(state, picker); + } + }); + } + + private static void updateSubchannelAddressesSafely( + final Helper helper, final Subchannel subchannel, final EquivalentAddressGroup addrs) { + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + helper.updateSubchannelAddresses(subchannel, addrs); + } + }); + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index b933d726321..ee738bd0f43 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -359,7 +359,7 @@ public void close() throws SecurityException { LogRecord record = logRef.get(); assertThat(record.getLevel()).isEqualTo(Level.WARNING); assertThat(record.getMessage()).contains( - "We sugguest you call createSubchannel() from SynchronizationContext"); + "createSubchannel() should be called from SynchronizationContext"); assertThat(record.getThrown()).isInstanceOf(IllegalStateException.class); } finally { logger.removeHandler(handler); @@ -433,7 +433,7 @@ public void channelzMembership_subchannel() throws Exception { assertThat(getStats(channel).subchannels) .containsExactly(subchannel.getInternalSubchannel()); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); assertNotNull(transportInfo); assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); @@ -444,7 +444,7 @@ public void channelzMembership_subchannel() throws Exception { // terminate subchannel assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); - subchannel.shutdown(); + shutdownSafely(helper, subchannel); timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); timer.runDueTasks(); assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); @@ -519,7 +519,7 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to // real transport. Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -538,7 +538,7 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft when(mockPicker.pickSubchannel( new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn( PickResult.withSubchannel(subchannel)); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); // First RPC, will be pending ClientCall call = channel.newCall(method, CallOptions.DEFAULT); @@ -597,7 +597,7 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft SubchannelPicker picker2 = mock(SubchannelPicker.class); when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))) .thenReturn(PickResult.withSubchannel(subchannel)); - helper.updateBalancingState(READY, picker2); + updateBalancingStateSafely(helper, READY, picker2); executor.runDueTasks(); verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT)); verify(mockStream).start(any(ClientStreamListener.class)); @@ -615,7 +615,7 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft verify(mockTransport, never()).shutdownNow(any(Status.class)); } // LoadBalancer should shutdown the subchannel - subchannel.shutdown(); + shutdownSafely(helper, subchannel); if (shutdownNow) { verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS)); } else { @@ -659,8 +659,8 @@ public void noMoreCallbackAfterLoadBalancerShutdown() { Subchannel subchannel1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel1.requestConnection(); - subchannel2.requestConnection(); + requestConnectionSafely(helper, subchannel1); + requestConnectionSafely(helper, subchannel2); verify(mockTransportFactory, times(2)) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -728,7 +728,7 @@ public void callOptionsExecutor() { verify(mockTransportFactory, never()) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -741,7 +741,7 @@ public void callOptionsExecutor() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); assertEquals(0, callExecutor.numPendingTasks()); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); // Real streams are started in the call executor if they were previously buffered. assertEquals(1, callExecutor.runDueTasks()); @@ -762,7 +762,7 @@ public void callOptionsExecutor() { transportListener.transportTerminated(); // Clean up as much as possible to allow the channel to terminate. - subchannel.shutdown(); + shutdownSafely(helper, subchannel); timer.forwardNanos( TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS)); } @@ -842,7 +842,7 @@ public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() { Status status = Status.UNAVAILABLE.withDescription("for test"); when(picker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withDrop(status)); - helper.updateBalancingState(READY, picker); + updateBalancingStateSafely(helper, READY, picker); executor.runDueTasks(); verify(mockCallListener).onClose(same(status), any(Metadata.class)); @@ -986,7 +986,7 @@ public void firstResolvedServerFailedToConnect() throws Exception { Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); inOrder.verify(mockLoadBalancer).handleSubchannelState( same(subchannel), stateInfoCaptor.capture()); assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); @@ -1019,7 +1019,7 @@ public void firstResolvedServerFailedToConnect() throws Exception { assertEquals(READY, stateInfoCaptor.getValue().getState()); // A typical LoadBalancer will call this once the subchannel becomes READY - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); // Delayed transport uses the app executor to create real streams. executor.runDueTasks(); @@ -1068,7 +1068,7 @@ private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, bool when(picker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status)); - helper.updateBalancingState(READY, picker); + updateBalancingStateSafely(helper, READY, picker); executor.runDueTasks(); if (shouldFail) { @@ -1136,7 +1136,7 @@ public void allServersFailedToConnect() throws Exception { Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); inOrder.verify(mockLoadBalancer).handleSubchannelState( same(subchannel), stateInfoCaptor.capture()); @@ -1171,7 +1171,7 @@ public void allServersFailedToConnect() throws Exception { SubchannelPicker picker2 = mock(SubchannelPicker.class); when(picker2.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withError(server2Error)); - helper.updateBalancingState(TRANSIENT_FAILURE, picker2); + updateBalancingStateSafely(helper, TRANSIENT_FAILURE, picker2); executor.runDueTasks(); // ... which fails the fail-fast call @@ -1192,14 +1192,24 @@ public void subchannels() { // createSubchannel() always return a new Subchannel Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build(); Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build(); - Subchannel sub1 = createSubchannelSafely(helper, addressGroup, attrs1); - Subchannel sub2 = createSubchannelSafely(helper, addressGroup, attrs2); + final Subchannel sub1 = createSubchannelSafely(helper, addressGroup, attrs1); + final Subchannel sub2 = createSubchannelSafely(helper, addressGroup, attrs2); assertNotSame(sub1, sub2); assertNotSame(attrs1, attrs2); assertSame(attrs1, sub1.getAttributes()); assertSame(attrs2, sub2.getAttributes()); - assertSame(addressGroup, sub1.getAddresses()); - assertSame(addressGroup, sub2.getAddresses()); + + final AtomicBoolean snippetPassed = new AtomicBoolean(false); + helper.getSynchronizationContext().execute(new Runnable() { + @Override + public void run() { + // getAddresses() must be called from sync context + assertSame(addressGroup, sub1.getAddresses()); + assertSame(addressGroup, sub2.getAddresses()); + snippetPassed.set(true); + } + }); + assertThat(snippetPassed.get()).isTrue(); // requestConnection() verify(mockTransportFactory, never()) @@ -1207,7 +1217,7 @@ public void subchannels() { any(SocketAddress.class), any(ClientTransportOptions.class), any(TransportLogger.class)); - sub1.requestConnection(); + requestConnectionSafely(helper, sub1); verify(mockTransportFactory) .newClientTransport( eq(socketAddress), @@ -1216,7 +1226,7 @@ public void subchannels() { MockClientTransportInfo transportInfo1 = transports.poll(); assertNotNull(transportInfo1); - sub2.requestConnection(); + requestConnectionSafely(helper, sub2); verify(mockTransportFactory, times(2)) .newClientTransport( eq(socketAddress), @@ -1225,17 +1235,17 @@ public void subchannels() { MockClientTransportInfo transportInfo2 = transports.poll(); assertNotNull(transportInfo2); - sub1.requestConnection(); - sub2.requestConnection(); + requestConnectionSafely(helper, sub1); + requestConnectionSafely(helper, sub2); // The subchannel doesn't matter since this isn't called verify(mockTransportFactory, times(2)) .newClientTransport( eq(socketAddress), eq(clientTransportOptions), isA(TransportLogger.class)); // shutdown() has a delay - sub1.shutdown(); + shutdownSafely(helper, sub1); timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS); - sub1.shutdown(); + shutdownSafely(helper, sub1); verify(transportInfo1.transport, never()).shutdown(any(Status.class)); timer.forwardTime(1, TimeUnit.SECONDS); verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS)); @@ -1246,7 +1256,7 @@ public void subchannels() { verify(mockLoadBalancer).shutdown(); verify(transportInfo2.transport, never()).shutdown(any(Status.class)); - sub2.shutdown(); + shutdownSafely(helper, sub2); verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); // Cleanup @@ -1262,8 +1272,8 @@ public void subchannelsWhenChannelShutdownNow() { createChannel(); Subchannel sub1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel sub2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - sub1.requestConnection(); - sub2.requestConnection(); + requestConnectionSafely(helper, sub1); + requestConnectionSafely(helper, sub2); assertThat(transports).hasSize(2); MockClientTransportInfo ti1 = transports.poll(); @@ -1293,9 +1303,9 @@ public void subchannelsNoConnectionShutdown() { channel.shutdown(); verify(mockLoadBalancer).shutdown(); - sub1.shutdown(); + shutdownSafely(helper, sub1); assertFalse(channel.isTerminated()); - sub2.shutdown(); + shutdownSafely(helper, sub2); assertTrue(channel.isTerminated()); verify(mockTransportFactory, never()) .newClientTransport( @@ -1498,7 +1508,7 @@ public void subchannelChannel_normalUsage() { CallOptions callOptions = CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS); // Subchannel must be READY when creating the RPC. - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -1523,7 +1533,7 @@ public void subchannelChannel_failWhenNotReady() { Channel sChannel = subchannel.asChannel(); Metadata headers = new Metadata(); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -1552,7 +1562,7 @@ public void subchannelChannel_failWaitForReady() { Metadata headers = new Metadata(); // Subchannel must be READY when creating the RPC. - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); verify(mockTransportFactory) .newClientTransport( any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); @@ -1635,7 +1645,7 @@ private void subtestNameResolutionRefreshWhenConnectionFailed( oobChannel.getSubchannel().requestConnection(); } else { Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); } MockClientTransportInfo transportInfo = transports.poll(); @@ -1722,7 +1732,7 @@ public Void answer(InvocationOnMock in) throws Throwable { // Simulate name resolution results EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); verify(mockTransportFactory) .newClientTransport( same(socketAddress), eq(clientTransportOptions), any(ChannelLogger.class)); @@ -1745,7 +1755,7 @@ public ClientStream answer(InvocationOnMock in) throws Throwable { transportInfo.listener.transportReady(); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); executor.runDueTasks(); ArgumentCaptor infoCaptor = ArgumentCaptor.forClass(null); ArgumentCaptor applierCaptor = ArgumentCaptor.forClass(null); @@ -1795,7 +1805,7 @@ public void pickerReturnsStreamTracer_noDelay() { ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class); createChannel(); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); transportInfo.listener.transportReady(); ClientTransport mockTransport = transportInfo.transport; @@ -1805,7 +1815,7 @@ public void pickerReturnsStreamTracer_noDelay() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel, factory2)); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1); ClientCall call = channel.newCall(method, callOptions); @@ -1833,7 +1843,7 @@ public void pickerReturnsStreamTracer_delayed() { call.start(mockCallListener, new Metadata()); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); transportInfo.listener.transportReady(); ClientTransport mockTransport = transportInfo.transport; @@ -1843,7 +1853,7 @@ public void pickerReturnsStreamTracer_delayed() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel, factory2)); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); assertEquals(1, executor.runDueTasks()); verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); @@ -1863,7 +1873,7 @@ public void getState_loadBalancerSupportsChannelState() { createChannel(); assertEquals(IDLE, channel.getState(false)); - helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker); + updateBalancingStateSafely(helper, TRANSIENT_FAILURE, mockPicker); assertEquals(TRANSIENT_FAILURE, channel.getState(false)); } @@ -1883,7 +1893,7 @@ public void getState_withRequestConnect() { verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); helper = helperCaptor.getValue(); - helper.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper, CONNECTING, mockPicker); assertEquals(CONNECTING, channel.getState(false)); assertEquals(CONNECTING, channel.getState(true)); verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); @@ -1896,7 +1906,7 @@ public void getState_withRequestConnect_IdleWithLbRunning() { createChannel(); verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); - helper.updateBalancingState(IDLE, mockPicker); + updateBalancingStateSafely(helper, IDLE, mockPicker); assertEquals(IDLE, channel.getState(true)); verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); @@ -1923,7 +1933,7 @@ public void run() { assertFalse(stateChanged.get()); // state change from IDLE to CONNECTING - helper.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper, CONNECTING, mockPicker); // onStateChanged callback should run executor.runDueTasks(); assertTrue(stateChanged.get()); @@ -1961,7 +1971,7 @@ public void run() { stateChanged.set(false); channel.notifyWhenStateChanged(SHUTDOWN, onStateChanged); - helper.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper, CONNECTING, mockPicker); assertEquals(SHUTDOWN, channel.getState(false)); executor.runDueTasks(); @@ -1975,7 +1985,7 @@ public void stateIsIdleOnIdleTimeout() { createChannel(); assertEquals(IDLE, channel.getState(false)); - helper.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper, CONNECTING, mockPicker); assertEquals(CONNECTING, channel.getState(false)); timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); @@ -2019,7 +2029,7 @@ private void subtestPanic(ConnectivityState initialState) { if (initialState == IDLE) { timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); } else { - helper.updateBalancingState(initialState, mockPicker); + updateBalancingStateSafely(helper, initialState, mockPicker); } assertEquals(initialState, channel.getState(false)); @@ -2062,7 +2072,7 @@ public void run() { // A misbehaving balancer that calls updateBalancingState() after it's shut down will not be // able to revive it. - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); verifyPanicMode(panicReason); // Cannot be revived by exitIdleMode() @@ -2085,7 +2095,7 @@ public void panic_bufferedCallsWillFail() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withNoResult()); - helper.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper, CONNECTING, mockPicker); // Start RPCs that will be buffered in delayedTransport ClientCall call = @@ -2159,10 +2169,10 @@ public void idleTimeoutAndReconnect() { // Updating on the old helper (whose balancer has been shutdown) does not change the channel // state. - helper.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper, CONNECTING, mockPicker); assertEquals(IDLE, channel.getState(false)); - helper2.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper2, CONNECTING, mockPicker); assertEquals(CONNECTING, channel.getState(false)); } @@ -2186,7 +2196,7 @@ public void idleMode_resetsDelayedTransportPicker() { // Move channel into TRANSIENT_FAILURE, which will fail the pending call when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withError(pickError)); - helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker); + updateBalancingStateSafely(helper, TRANSIENT_FAILURE, mockPicker); assertEquals(TRANSIENT_FAILURE, channel.getState(false)); executor.runDueTasks(); verify(mockCallListener).onClose(same(pickError), any(Metadata.class)); @@ -2208,7 +2218,7 @@ public void idleMode_resetsDelayedTransportPicker() { // Establish a connection Subchannel subchannel = createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; ManagedClientTransport.Listener transportListener = transportInfo.listener; @@ -2218,7 +2228,7 @@ public void idleMode_resetsDelayedTransportPicker() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - helper2.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper2, READY, mockPicker); assertEquals(READY, channel.getState(false)); executor.runDueTasks(); @@ -2230,7 +2240,7 @@ public void idleMode_resetsDelayedTransportPicker() { @Test public void enterIdleEntersIdle() { createChannel(); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); assertEquals(READY, channel.getState(false)); channel.enterIdle(); @@ -2276,7 +2286,7 @@ public void enterIdle_exitsIdleIfDelayedStreamPending() { // Establish a connection Subchannel subchannel = createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); ClientStream mockStream = mock(ClientStream.class); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; @@ -2286,7 +2296,7 @@ public void enterIdle_exitsIdleIfDelayedStreamPending() { transportListener.transportReady(); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - helper2.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper2, READY, mockPicker); assertEquals(READY, channel.getState(false)); // Verify the original call was drained @@ -2306,7 +2316,7 @@ public void updateBalancingStateDoesUpdatePicker() { // Make the transport available with subchannel2 Subchannel subchannel1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel2.requestConnection(); + requestConnectionSafely(helper, subchannel2); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; @@ -2317,7 +2327,7 @@ public void updateBalancingStateDoesUpdatePicker() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel1)); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); executor.runDueTasks(); verify(mockTransport, never()) @@ -2327,7 +2337,7 @@ public void updateBalancingStateDoesUpdatePicker() { when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel2)); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); executor.runDueTasks(); verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); @@ -2344,7 +2354,7 @@ public void updateBalancingStateWithShutdownShouldBeIgnored() { Runnable onStateChanged = mock(Runnable.class); channel.notifyWhenStateChanged(IDLE, onStateChanged); - helper.updateBalancingState(SHUTDOWN, mockPicker); + updateBalancingStateSafely(helper, SHUTDOWN, mockPicker); assertEquals(IDLE, channel.getState(false)); executor.runDueTasks(); @@ -2360,7 +2370,7 @@ public void balancerRefreshNameResolution() { FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); int initialRefreshCount = resolver.refreshCalled; - helper.refreshNameResolution(); + refreshNameResolutionSafely(helper); assertEquals(initialRefreshCount + 1, resolver.refreshCalled); } @@ -2627,7 +2637,7 @@ public void channelTracing_stateChangeEvent() throws Exception { channelBuilder.maxTraceEvents(10); createChannel(); timer.forwardNanos(1234); - helper.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper, CONNECTING, mockPicker); assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() .setDescription("Entering CONNECTING state") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) @@ -2699,14 +2709,14 @@ public void channelsAndSubchannels_instrumented_state() throws Exception { helper = helperCaptor.getValue(); assertEquals(IDLE, getStats(channel).state); - helper.updateBalancingState(CONNECTING, mockPicker); + updateBalancingStateSafely(helper, CONNECTING, mockPicker); assertEquals(CONNECTING, getStats(channel).state); AbstractSubchannel subchannel = (AbstractSubchannel) createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); assertEquals(IDLE, getStats(subchannel).state); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); assertEquals(CONNECTING, getStats(subchannel).state); MockClientTransportInfo transportInfo = transports.poll(); @@ -2716,7 +2726,7 @@ public void channelsAndSubchannels_instrumented_state() throws Exception { assertEquals(READY, getStats(subchannel).state); assertEquals(CONNECTING, getStats(channel).state); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); assertEquals(READY, getStats(channel).state); channel.shutdownNow(); @@ -2758,7 +2768,7 @@ private void channelsAndSubchannels_instrumented0(boolean success) throws Except ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class); AbstractSubchannel subchannel = (AbstractSubchannel) createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); transportInfo.listener.transportReady(); ClientTransport mockTransport = transportInfo.transport; @@ -2770,7 +2780,7 @@ private void channelsAndSubchannels_instrumented0(boolean success) throws Except // subchannel stat bumped when call gets assigned to it assertEquals(0, getStats(subchannel).callsStarted); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); assertEquals(1, executor.runDueTasks()); verify(mockStream).start(streamListenerCaptor.capture()); assertEquals(1, getStats(subchannel).callsStarted); @@ -2997,7 +3007,7 @@ public double nextDouble() { Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; ClientStream mockStream = mock(ClientStream.class); @@ -3005,7 +3015,7 @@ public double nextDouble() { when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) .thenReturn(mockStream).thenReturn(mockStream2); transportInfo.listener.transportReady(); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); ArgumentCaptor streamListenerCaptor = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -3046,7 +3056,7 @@ public double nextDouble() { streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata()); verify(mockLoadBalancer).shutdown(); // simulating the shutdown of load balancer triggers the shutdown of subchannel - subchannel.shutdown(); + shutdownSafely(helper, subchannel); transportInfo.listener.transportTerminated(); // simulating transport terminated assertTrue( "channel.isTerminated() is expected to be true but was false", @@ -3093,10 +3103,10 @@ public void hedgingScheduledThenChannelShutdown_hedgeShouldStillHappen_newCallSh .build()); // simulating request connection and then transport ready after resolved address - Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); - subchannel.requestConnection(); + requestConnectionSafely(helper, subchannel); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; ClientStream mockStream = mock(ClientStream.class); @@ -3104,7 +3114,7 @@ public void hedgingScheduledThenChannelShutdown_hedgeShouldStillHappen_newCallSh when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) .thenReturn(mockStream).thenReturn(mockStream2); transportInfo.listener.transportReady(); - helper.updateBalancingState(READY, mockPicker); + updateBalancingStateSafely(helper, READY, mockPicker); ArgumentCaptor streamListenerCaptor = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -3144,7 +3154,7 @@ public void hedgingScheduledThenChannelShutdown_hedgeShouldStillHappen_newCallSh assertThat(timer.numPendingTasks()).isEqualTo(0); verify(mockLoadBalancer).shutdown(); // simulating the shutdown of load balancer triggers the shutdown of subchannel - subchannel.shutdown(); + shutdownSafely(helper, subchannel); transportInfo.listener.transportTerminated(); // simulating transport terminated assertTrue( "channel.isTerminated() is expected to be true but was false", @@ -3878,7 +3888,7 @@ private FakeClock.ScheduledTask getNameResolverRefresh() { return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null); } - // We need this because createSubchannel() should be called from the SynchronizationContext + // Helper methods to call methods from SynchronizationContext private static Subchannel createSubchannelSafely( final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs) { final AtomicReference resultCapture = new AtomicReference<>(); @@ -3892,6 +3902,48 @@ public void run() { return resultCapture.get(); } + private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) { + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + subchannel.requestConnection(); + } + }); + } + + private static void updateBalancingStateSafely( + final Helper helper, final ConnectivityState state, final SubchannelPicker picker) { + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + helper.updateBalancingState(state, picker); + } + }); + } + + private static void refreshNameResolutionSafely(final Helper helper) { + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + helper.refreshNameResolution(); + } + }); + } + + private static void shutdownSafely( + final Helper helper, final Subchannel subchannel) { + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + subchannel.shutdown(); + } + }); + } + @SuppressWarnings("unchecked") private static Map parseConfig(String json) throws Exception { return (Map) JsonParser.parse(json);