Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,10 @@ public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attribute
* Equivalent to {@link #updateSubchannelAddresses(io.grpc.LoadBalancer.Subchannel, List)} with
* the given single {@code EquivalentAddressGroup}.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @since 1.4.0
*/
public final void updateSubchannelAddresses(
Expand All @@ -707,6 +711,10 @@ public final void updateSubchannelAddresses(
* {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can
* continue using an existing connection.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @throws IllegalArgumentException if {@code subchannel} was not returned from {@link
* #createSubchannel} or {@code addrs} is empty
* @since 1.14.0
Expand Down Expand Up @@ -776,6 +784,10 @@ public ManagedChannel createResolvingOobChannel(String target) {
* updateBalancingState()} has never been called, the channel will buffer all RPCs until a
* picker is provided.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* <p>The passed state will be the channel's new state. The SHUTDOWN state should not be passed
* and its behavior is undefined.
*
Expand All @@ -787,6 +799,10 @@ public abstract void updateBalancingState(
/**
* Call {@link NameResolver#refresh} on the channel's resolver.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @since 1.18.0
*/
public void refreshNameResolution() {
Expand Down Expand Up @@ -884,13 +900,21 @@ public abstract static class Subchannel {
* Shuts down the Subchannel. After this method is called, this Subchannel should no longer
* be returned by the latest {@link SubchannelPicker picker}, and can be safely discarded.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @since 1.2.0
*/
public abstract void shutdown();

/**
* Asks the Subchannel to create a connection (aka transport), if there isn't an active one.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @since 1.2.0
*/
public abstract void requestConnection();
Expand All @@ -900,6 +924,10 @@ public abstract static class Subchannel {
* the Subchannel has only one {@link EquivalentAddressGroup}. Under the hood it calls
* {@link #getAllAddresses}.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @throws IllegalStateException if this subchannel has more than one EquivalentAddressGroup.
* Use {@link #getAllAddresses} instead
* @since 1.2.0
Expand All @@ -913,6 +941,10 @@ public final EquivalentAddressGroup getAddresses() {
/**
* Returns the addresses that this Subchannel is bound to. The returned list will not be empty.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @since 1.14.0
*/
public List<EquivalentAddressGroup> getAllAddresses() {
Expand Down
26 changes: 18 additions & 8 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1045,14 +1045,7 @@ private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
@Override
public AbstractSubchannel createSubchannel(
List<EquivalentAddressGroup> addressGroups, Attributes attrs) {
try {
syncContext.throwIfNotInThisSynchronizationContext();
} catch (IllegalStateException e) {
logger.log(Level.WARNING,
"We sugguest you call createSubchannel() from SynchronizationContext."
+ " Otherwise, it may race with handleSubchannelState()."
+ " See https://github.com/grpc/grpc-java/issues/5015", e);
}
logWarningIfNotInSyncContext("createSubchannel()");
checkNotNull(addressGroups, "addressGroups");
checkNotNull(attrs, "attrs");
// TODO(ejona): can we be even stricter? Like loadBalancer == null?
Expand Down Expand Up @@ -1146,6 +1139,7 @@ public void updateBalancingState(
final ConnectivityState newState, final SubchannelPicker newPicker) {
checkNotNull(newState, "newState");
checkNotNull(newPicker, "newPicker");
logWarningIfNotInSyncContext("updateBalancingState()");
final class UpdateBalancingState implements Runnable {
@Override
public void run() {
Expand All @@ -1167,6 +1161,7 @@ public void run() {

@Override
public void refreshNameResolution() {
logWarningIfNotInSyncContext("refreshNameResolution()");
final class LoadBalancerRefreshNameResolution implements Runnable {
@Override
public void run() {
Expand All @@ -1182,6 +1177,7 @@ public void updateSubchannelAddresses(
LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
checkArgument(subchannel instanceof SubchannelImpl,
"subchannel must have been returned from createSubchannel");
logWarningIfNotInSyncContext("updateSubchannelAddresses()");
((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs);
}

Expand Down Expand Up @@ -1465,6 +1461,7 @@ InternalInstrumented<ChannelStats> getInternalSubchannel() {

@Override
public void shutdown() {
logWarningIfNotInSyncContext("Subchannel.shutdown()");
synchronized (shutdownLock) {
if (shutdownRequested) {
if (terminating && delayedShutdownTask != null) {
Expand Down Expand Up @@ -1508,11 +1505,13 @@ public void run() {

@Override
public void requestConnection() {
logWarningIfNotInSyncContext("Subchannel.requestConnection()");
subchannel.obtainActiveTransport();
}

@Override
public List<EquivalentAddressGroup> getAllAddresses() {
logWarningIfNotInSyncContext("Subchannel.getAllAddresses()");
return subchannel.getAddressGroups();
}

Expand Down Expand Up @@ -1771,4 +1770,15 @@ public ConfigOrError parseServiceConfig(Map<String, ?> rawServiceConfig) {
}
}
}

private void logWarningIfNotInSyncContext(String method) {
try {
syncContext.throwIfNotInThisSynchronizationContext();
} catch (IllegalStateException e) {
logger.log(Level.WARNING,
method + " should be called from SynchronizationContext. "
+ "This warning will become an exception in a future release. "
+ "See https://github.com/grpc/grpc-java/issues/5015 for more details", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.grpc.ChannelLogger;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -310,14 +311,14 @@ public void realTransportsHoldsOffIdleness() throws Exception {

// Assume LoadBalancer has received an address, then create a subchannel.
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
subchannel.requestConnection();
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();

SubchannelPicker mockPicker = mock(SubchannelPicker.class);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
helper.updateBalancingState(READY, mockPicker);
updateBalancingStateSafely(helper, READY, mockPicker);
// Delayed transport creates real streams in the app executor
executor.runDueTasks();

Expand Down Expand Up @@ -350,13 +351,13 @@ public void updateSubchannelAddresses_newAddressConnects() {
Helper helper = helperCaptor.getValue();
Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY);

subchannel.requestConnection();
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();

helper.updateSubchannelAddresses(subchannel, servers.get(1));
updateSubchannelAddressesSafely(helper, subchannel, servers.get(1));

subchannel.requestConnection();
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo t1 = newTransports.poll();
t1.listener.transportReady();
}
Expand All @@ -370,15 +371,15 @@ public void updateSubchannelAddresses_existingAddressDoesNotConnect() {
Helper helper = helperCaptor.getValue();
Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY);

subchannel.requestConnection();
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();

List<SocketAddress> changedList = new ArrayList<>(servers.get(0).getAddresses());
changedList.add(new FakeSocketAddress("aDifferentServer"));
helper.updateSubchannelAddresses(subchannel, new EquivalentAddressGroup(changedList));
updateSubchannelAddressesSafely(helper, subchannel, new EquivalentAddressGroup(changedList));

subchannel.requestConnection();
requestConnectionSafely(helper, subchannel);
assertNull(newTransports.poll());
}

Expand All @@ -397,7 +398,7 @@ public void oobTransportDoesNotAffectIdleness() {
SubchannelPicker failingPicker = mock(SubchannelPicker.class);
when(failingPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withError(Status.UNAVAILABLE));
helper.updateBalancingState(TRANSIENT_FAILURE, failingPicker);
updateBalancingStateSafely(helper, TRANSIENT_FAILURE, failingPicker);
executor.runDueTasks();
verify(mockCallListener).onClose(same(Status.UNAVAILABLE), any(Metadata.class));

Expand Down Expand Up @@ -499,7 +500,7 @@ public String toString() {
}
}

// We need this because createSubchannel() should be called from the SynchronizationContext
// Helper methods to call methods from SynchronizationContext
private static Subchannel createSubchannelSafely(
final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs) {
final AtomicReference<Subchannel> resultCapture = new AtomicReference<>();
Expand All @@ -512,4 +513,36 @@ public void run() {
});
return resultCapture.get();
}

private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
subchannel.requestConnection();
}
});
}

private static void updateBalancingStateSafely(
final Helper helper, final ConnectivityState state, final SubchannelPicker picker) {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
helper.updateBalancingState(state, picker);
}
});
}

private static void updateSubchannelAddressesSafely(
final Helper helper, final Subchannel subchannel, final EquivalentAddressGroup addrs) {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
helper.updateSubchannelAddresses(subchannel, addrs);
}
});
}
}
Loading