Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,6 @@ public NameResolverRegistry getNameResolverRegistry() {
*
* @since 1.2.0
*/
@ThreadSafe
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public abstract static class Subchannel {
/**
Expand All @@ -930,9 +929,9 @@ public abstract static class Subchannel {
/**
* Asks the Subchannel to create a connection (aka transport), if there isn't an active one.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
* <p>This method is safe to be called concurrently and can be called any time. If the
* Subchannel is unable to fulfill the request (e.g., because it's already shut down), this
* method has no effect.
*
* @since 1.2.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,6 @@ public void run() {

@Override
public void requestConnection() {
logWarningIfNotInSyncContext("Subchannel.requestConnection()");
subchannel.obtainActiveTransport();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public void realTransportsHoldsOffIdleness() throws Exception {

// Assume LoadBalancer has received an address, then create a subchannel.
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();

Expand Down Expand Up @@ -351,13 +351,13 @@ public void updateSubchannelAddresses_newAddressConnects() {
Helper helper = helperCaptor.getValue();
Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY);

requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();

updateSubchannelAddressesSafely(helper, subchannel, servers.get(1));

requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo t1 = newTransports.poll();
t1.listener.transportReady();
}
Expand All @@ -371,15 +371,15 @@ public void updateSubchannelAddresses_existingAddressDoesNotConnect() {
Helper helper = helperCaptor.getValue();
Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY);

requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();

List<SocketAddress> changedList = new ArrayList<>(servers.get(0).getAddresses());
changedList.add(new FakeSocketAddress("aDifferentServer"));
updateSubchannelAddressesSafely(helper, subchannel, new EquivalentAddressGroup(changedList));

requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
assertNull(newTransports.poll());
}

Expand Down Expand Up @@ -514,16 +514,6 @@ public void run() {
return resultCapture.get();
}

private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
subchannel.requestConnection();
}
});
}

private static void updateBalancingStateSafely(
final Helper helper, final ConnectivityState state, final SubchannelPicker picker) {
helper.getSynchronizationContext().execute(
Expand Down
64 changes: 27 additions & 37 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public void channelzMembership_subchannel() throws Exception {
assertThat(getStats(channel).subchannels)
.containsExactly(subchannel.getInternalSubchannel());

requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
Expand Down Expand Up @@ -520,7 +520,7 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft
// Configure the picker so that first RPC goes to delayed transport, and second RPC goes to
// real transport.
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
Expand Down Expand Up @@ -660,8 +660,8 @@ public void noMoreCallbackAfterLoadBalancerShutdown() {

Subchannel subchannel1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
Subchannel subchannel2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel1);
requestConnectionSafely(helper, subchannel2);
subchannel1.requestConnection();
subchannel2.requestConnection();
verify(mockTransportFactory, times(2))
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
Expand Down Expand Up @@ -729,7 +729,7 @@ public void callOptionsExecutor() {
verify(mockTransportFactory, never())
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
Expand Down Expand Up @@ -987,7 +987,7 @@ public void firstResolvedServerFailedToConnect() throws Exception {
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
inOrder.verify(mockLoadBalancer).handleSubchannelState(
same(subchannel), stateInfoCaptor.capture());
assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
Expand Down Expand Up @@ -1137,7 +1137,7 @@ public void allServersFailedToConnect() throws Exception {
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();

inOrder.verify(mockLoadBalancer).handleSubchannelState(
same(subchannel), stateInfoCaptor.capture());
Expand Down Expand Up @@ -1218,7 +1218,7 @@ public void run() {
any(SocketAddress.class),
any(ClientTransportOptions.class),
any(TransportLogger.class));
requestConnectionSafely(helper, sub1);
sub1.requestConnection();
verify(mockTransportFactory)
.newClientTransport(
eq(socketAddress),
Expand All @@ -1227,7 +1227,7 @@ public void run() {
MockClientTransportInfo transportInfo1 = transports.poll();
assertNotNull(transportInfo1);

requestConnectionSafely(helper, sub2);
sub2.requestConnection();
verify(mockTransportFactory, times(2))
.newClientTransport(
eq(socketAddress),
Expand All @@ -1236,8 +1236,8 @@ public void run() {
MockClientTransportInfo transportInfo2 = transports.poll();
assertNotNull(transportInfo2);

requestConnectionSafely(helper, sub1);
requestConnectionSafely(helper, sub2);
sub1.requestConnection();
sub2.requestConnection();
// The subchannel doesn't matter since this isn't called
verify(mockTransportFactory, times(2))
.newClientTransport(
Expand Down Expand Up @@ -1273,8 +1273,8 @@ public void subchannelsWhenChannelShutdownNow() {
createChannel();
Subchannel sub1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
Subchannel sub2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, sub1);
requestConnectionSafely(helper, sub2);
sub1.requestConnection();
sub2.requestConnection();

assertThat(transports).hasSize(2);
MockClientTransportInfo ti1 = transports.poll();
Expand Down Expand Up @@ -1509,7 +1509,7 @@ public void subchannelChannel_normalUsage() {
CallOptions callOptions = CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS);

// Subchannel must be READY when creating the RPC.
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
Expand All @@ -1534,7 +1534,7 @@ public void subchannelChannel_failWhenNotReady() {
Channel sChannel = subchannel.asChannel();
Metadata headers = new Metadata();

requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
Expand Down Expand Up @@ -1563,7 +1563,7 @@ public void subchannelChannel_failWaitForReady() {
Metadata headers = new Metadata();

// Subchannel must be READY when creating the RPC.
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
Expand Down Expand Up @@ -1666,7 +1666,7 @@ private void subtestNameResolutionRefreshWhenConnectionFailed(
oobChannel.getSubchannel().requestConnection();
} else {
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
}

MockClientTransportInfo transportInfo = transports.poll();
Expand Down Expand Up @@ -1753,7 +1753,7 @@ public Void answer(InvocationOnMock in) throws Throwable {
// Simulate name resolution results
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(
same(socketAddress), eq(clientTransportOptions), any(ChannelLogger.class));
Expand Down Expand Up @@ -1826,7 +1826,7 @@ public void pickerReturnsStreamTracer_noDelay() {
ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
createChannel();
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
Expand Down Expand Up @@ -1864,7 +1864,7 @@ public void pickerReturnsStreamTracer_delayed() {
call.start(mockCallListener, new Metadata());

Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
Expand Down Expand Up @@ -2239,7 +2239,7 @@ public void idleMode_resetsDelayedTransportPicker() {

// Establish a connection
Subchannel subchannel = createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
Expand Down Expand Up @@ -2307,7 +2307,7 @@ public void enterIdle_exitsIdleIfDelayedStreamPending() {

// Establish a connection
Subchannel subchannel = createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
ClientStream mockStream = mock(ClientStream.class);
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
Expand Down Expand Up @@ -2337,7 +2337,7 @@ public void updateBalancingStateDoesUpdatePicker() {
// Make the transport available with subchannel2
Subchannel subchannel1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
Subchannel subchannel2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel2);
subchannel2.requestConnection();

MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
Expand Down Expand Up @@ -2737,7 +2737,7 @@ public void channelsAndSubchannels_instrumented_state() throws Exception {
(AbstractSubchannel) createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);

assertEquals(IDLE, getStats(subchannel).state);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
assertEquals(CONNECTING, getStats(subchannel).state);

MockClientTransportInfo transportInfo = transports.poll();
Expand Down Expand Up @@ -2789,7 +2789,7 @@ private void channelsAndSubchannels_instrumented0(boolean success) throws Except
ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class);
AbstractSubchannel subchannel =
(AbstractSubchannel) createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
Expand Down Expand Up @@ -3028,7 +3028,7 @@ public double nextDouble() {
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ClientStream mockStream = mock(ClientStream.class);
Expand Down Expand Up @@ -3127,7 +3127,7 @@ public void hedgingScheduledThenChannelShutdown_hedgeShouldStillHappen_newCallSh
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
requestConnectionSafely(helper, subchannel);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ClientStream mockStream = mock(ClientStream.class);
Expand Down Expand Up @@ -3923,16 +3923,6 @@ public void run() {
return resultCapture.get();
}

private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
subchannel.requestConnection();
}
});
}

private static void updateBalancingStateSafely(
final Helper helper, final ConnectivityState state, final SubchannelPicker picker) {
helper.getSynchronizationContext().execute(
Expand Down