Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,9 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @since 1.4.0
* @deprecated use {@link Subchannel#updateAddresses} instead
*/
@Deprecated
public final void updateSubchannelAddresses(
Subchannel subchannel, EquivalentAddressGroup addrs) {
checkNotNull(addrs, "addrs");
Expand All @@ -982,7 +984,9 @@ public final void updateSubchannelAddresses(
* @throws IllegalArgumentException if {@code subchannel} was not returned from {@link
* #createSubchannel} or {@code addrs} is empty
* @since 1.14.0
* @deprecated use {@link Subchannel#updateAddresses} instead
*/
@Deprecated
public void updateSubchannelAddresses(
Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -1301,6 +1305,19 @@ public ChannelLogger getChannelLogger() {
throw new UnsupportedOperationException();
}

/**
* Replaces the existing addresses used with this {@code Subchannel}. If the new and old
* addresses overlap, the Subchannel can continue using an existing connection.
*
* <p>It must be called from the Synchronization Context or will throw.
*
* @throws IllegalArgumentException if {@code addrs} is empty
* @since 1.22.0
*/
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
throw new UnsupportedOperationException();
}

/**
* (Internal use only) returns an object that represents the underlying subchannel that is used
* by the Channel for sending RPCs when this {@link Subchannel} is picked. This is an opaque
Expand Down
2 changes: 2 additions & 0 deletions api/src/test/java/io/grpc/LoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public void helper_createSubchannelList_throws() {
}
}

@Deprecated
@Test
public void helper_updateSubchannelAddresses_delegates() {
class OverrideUpdateSubchannel extends NoopHelper {
Expand All @@ -190,6 +191,7 @@ public void updateSubchannelAddresses(
assertThat(helper.ran).isTrue();
}

@Deprecated
@Test(expected = UnsupportedOperationException.class)
public void helper_updateSubchannelAddressesList_throws() {
new NoopHelper().updateSubchannelAddresses(null, Arrays.asList(eag));
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,7 @@ public void run() {
syncContext.execute(new LoadBalancerRefreshNameResolution());
}

@Deprecated
@Override
public void updateSubchannelAddresses(
LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
Expand Down Expand Up @@ -1606,6 +1607,12 @@ public Object getInternalSubchannel() {
public ChannelLogger getChannelLogger() {
return subchannel.getChannelLogger();
}

@Override
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
syncContext.throwIfNotInThisSynchronizationContext();
subchannel.updateAddresses(addrs);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
subchannel.requestConnection();
} else {
helper.updateSubchannelAddresses(subchannel, servers);
subchannel.updateAddresses(servers);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
return delegate().createSubchannel(args);
}

@Deprecated
@Override
public void updateSubchannelAddresses(
Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/util/ForwardingSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public Object getInternalSubchannel() {
return delegate().getInternalSubchannel();
}

@Override
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
delegate().updateAddresses(addrs);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ private static void updateSubchannelAddressesSafely(
new Runnable() {
@Override
public void run() {
helper.updateSubchannelAddresses(subchannel, addrs);
subchannel.updateAddresses(Collections.singletonList(addrs));
}
});
}
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,16 @@ public String toString() {
return "test-addr";
}
};
private final SocketAddress socketAddress2 =
new SocketAddress() {
@Override
public String toString() {
return "test-addr";
}
};
private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
private final EquivalentAddressGroup addressGroup2 =
new EquivalentAddressGroup(Arrays.asList(socketAddress, socketAddress2));
private final FakeClock timer = new FakeClock();
private final FakeClock executor = new FakeClock();
private final FakeClock balancerRpcExecutor = new FakeClock();
Expand Down Expand Up @@ -1301,6 +1310,11 @@ public void run() {
.newClientTransport(
eq(socketAddress), eq(clientTransportOptions), isA(TransportLogger.class));

// updateAddresses()
updateAddressesSafely(helper, sub1, Collections.singletonList(addressGroup2));
assertThat(((InternalSubchannel) sub1.getInternalSubchannel()).getAddressGroups())
.isEqualTo(Collections.singletonList(addressGroup2));

// shutdown() has a delay
shutdownSafely(helper, sub1);
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
Expand Down Expand Up @@ -4030,6 +4044,17 @@ public void run() {
});
}

private static void updateAddressesSafely(
Helper helper, final Subchannel subchannel, final List<EquivalentAddressGroup> addrs) {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
subchannel.updateAddresses(addrs);
}
});
}

private static void shutdownSafely(
final Helper helper, final Subchannel subchannel) {
helper.getSynchronizationContext().execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ public void pickAfterResolvedAndUnchanged() throws Exception {
verify(mockSubchannel).requestConnection();
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockSubchannel).updateAddresses(eq(servers));
verifyNoMoreInteractions(mockSubchannel);

verify(mockHelper).createSubchannel(createArgsCaptor.capture());
assertThat(createArgsCaptor.getValue()).isNotNull();
verify(mockHelper)
.updateBalancingState(isA(ConnectivityState.class), isA(SubchannelPicker.class));
// Updating the subchannel addresses is unnecessary, but doesn't hurt anything
verify(mockHelper).updateSubchannelAddresses(
eq(mockSubchannel), ArgumentMatchers.<EquivalentAddressGroup>anyList());
verify(mockSubchannel).updateAddresses(ArgumentMatchers.<EquivalentAddressGroup>anyList());

verifyNoMoreInteractions(mockHelper);
}
Expand All @@ -191,7 +191,7 @@ public void pickAfterResolvedAndChanged() throws Exception {
List<EquivalentAddressGroup> newServers =
Lists.newArrayList(new EquivalentAddressGroup(socketAddr));

InOrder inOrder = inOrder(mockHelper);
InOrder inOrder = inOrder(mockHelper, mockSubchannel);

loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
Expand All @@ -205,7 +205,7 @@ public void pickAfterResolvedAndChanged() throws Exception {

loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(newServers));
inOrder.verify(mockSubchannel).updateAddresses(eq(newServers));

verifyNoMoreInteractions(mockSubchannel);
verifyNoMoreInteractions(mockHelper);
Expand Down
2 changes: 1 addition & 1 deletion grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ private void useRoundRobinLists(
} else {
checkState(subchannels.size() == 1, "Unexpected Subchannel count: %s", subchannels);
subchannel = subchannels.values().iterator().next();
helper.updateSubchannelAddresses(subchannel, eagList);
subchannel.updateAddresses(eagList);
}
subchannels = Collections.singletonMap(eagList, subchannel);
newBackendList.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1776,8 +1776,7 @@ public void grpclbWorking_pickFirstMode() throws Exception {
// createSubchannel() has ever been called only once
verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class));
assertThat(mockSubchannels).isEmpty();
inOrder.verify(helper).updateSubchannelAddresses(
same(subchannel),
verify(subchannel).updateAddresses(
eq(Arrays.asList(
new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends2.get(2).addr,
Expand Down Expand Up @@ -1874,8 +1873,7 @@ public void pickFirstMode_fallback() throws Exception {
// createSubchannel() has ever been called only once
verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class));
assertThat(mockSubchannels).isEmpty();
inOrder.verify(helper).updateSubchannelAddresses(
same(subchannel),
verify(subchannel).updateAddresses(
eq(Arrays.asList(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr,
Expand Down