diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 4e13d240996..1b390bf268d 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -963,7 +963,9 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) { * href="https://github.com/grpc/grpc-java/issues/5015">#5015 for the background. * * @since 1.4.0 + * @deprecated use {@link Subchannel#updateAddresses} instead */ + @Deprecated public final void updateSubchannelAddresses( Subchannel subchannel, EquivalentAddressGroup addrs) { checkNotNull(addrs, "addrs"); @@ -982,7 +984,9 @@ public final void updateSubchannelAddresses( * @throws IllegalArgumentException if {@code subchannel} was not returned from {@link * #createSubchannel} or {@code addrs} is empty * @since 1.14.0 + * @deprecated use {@link Subchannel#updateAddresses} instead */ + @Deprecated public void updateSubchannelAddresses( Subchannel subchannel, List addrs) { throw new UnsupportedOperationException(); @@ -1301,6 +1305,19 @@ public ChannelLogger getChannelLogger() { throw new UnsupportedOperationException(); } + /** + * Replaces the existing addresses used with this {@code Subchannel}. If the new and old + * addresses overlap, the Subchannel can continue using an existing connection. + * + *

It must be called from the Synchronization Context or will throw. + * + * @throws IllegalArgumentException if {@code addrs} is empty + * @since 1.22.0 + */ + public void updateAddresses(List addrs) { + throw new UnsupportedOperationException(); + } + /** * (Internal use only) returns an object that represents the underlying subchannel that is used * by the Channel for sending RPCs when this {@link Subchannel} is picked. This is an opaque diff --git a/api/src/test/java/io/grpc/LoadBalancerTest.java b/api/src/test/java/io/grpc/LoadBalancerTest.java index 9492d2dcf7d..0b90f467ec3 100644 --- a/api/src/test/java/io/grpc/LoadBalancerTest.java +++ b/api/src/test/java/io/grpc/LoadBalancerTest.java @@ -170,6 +170,7 @@ public void helper_createSubchannelList_throws() { } } + @Deprecated @Test public void helper_updateSubchannelAddresses_delegates() { class OverrideUpdateSubchannel extends NoopHelper { @@ -190,6 +191,7 @@ public void updateSubchannelAddresses( assertThat(helper.ran).isTrue(); } + @Deprecated @Test(expected = UnsupportedOperationException.class) public void helper_updateSubchannelAddressesList_throws() { new NoopHelper().updateSubchannelAddresses(null, Arrays.asList(eag)); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index b75b9e305d8..4bf1106990f 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1132,6 +1132,7 @@ public void run() { syncContext.execute(new LoadBalancerRefreshNameResolution()); } + @Deprecated @Override public void updateSubchannelAddresses( LoadBalancer.Subchannel subchannel, List addrs) { @@ -1606,6 +1607,12 @@ public Object getInternalSubchannel() { public ChannelLogger getChannelLogger() { return subchannel.getChannelLogger(); } + + @Override + public void updateAddresses(List addrs) { + syncContext.throwIfNotInThisSynchronizationContext(); + subchannel.updateAddresses(addrs); + } } @Override diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index 20565438246..acf9b88324a 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -64,7 +64,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel))); subchannel.requestConnection(); } else { - helper.updateSubchannelAddresses(subchannel, servers); + subchannel.updateAddresses(servers); } } diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java index c1c516d06c9..95bf66c0191 100644 --- a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java +++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java @@ -51,6 +51,7 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) { return delegate().createSubchannel(args); } + @Deprecated @Override public void updateSubchannelAddresses( Subchannel subchannel, List addrs) { diff --git a/core/src/main/java/io/grpc/util/ForwardingSubchannel.java b/core/src/main/java/io/grpc/util/ForwardingSubchannel.java index de9a5112327..51f2583186e 100644 --- a/core/src/main/java/io/grpc/util/ForwardingSubchannel.java +++ b/core/src/main/java/io/grpc/util/ForwardingSubchannel.java @@ -74,6 +74,11 @@ public Object getInternalSubchannel() { return delegate().getInternalSubchannel(); } + @Override + public void updateAddresses(List addrs) { + delegate().updateAddresses(addrs); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index 8b21d25ccc8..409d03c5c03 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -549,7 +549,7 @@ private static void updateSubchannelAddressesSafely( new Runnable() { @Override public void run() { - helper.updateSubchannelAddresses(subchannel, addrs); + subchannel.updateAddresses(Collections.singletonList(addrs)); } }); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 2af73972071..5bf5f1134a3 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -183,7 +183,16 @@ public String toString() { return "test-addr"; } }; + private final SocketAddress socketAddress2 = + new SocketAddress() { + @Override + public String toString() { + return "test-addr"; + } + }; private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); + private final EquivalentAddressGroup addressGroup2 = + new EquivalentAddressGroup(Arrays.asList(socketAddress, socketAddress2)); private final FakeClock timer = new FakeClock(); private final FakeClock executor = new FakeClock(); private final FakeClock balancerRpcExecutor = new FakeClock(); @@ -1301,6 +1310,11 @@ public void run() { .newClientTransport( eq(socketAddress), eq(clientTransportOptions), isA(TransportLogger.class)); + // updateAddresses() + updateAddressesSafely(helper, sub1, Collections.singletonList(addressGroup2)); + assertThat(((InternalSubchannel) sub1.getInternalSubchannel()).getAddressGroups()) + .isEqualTo(Collections.singletonList(addressGroup2)); + // shutdown() has a delay shutdownSafely(helper, sub1); timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS); @@ -4030,6 +4044,17 @@ public void run() { }); } + private static void updateAddressesSafely( + Helper helper, final Subchannel subchannel, final List addrs) { + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + subchannel.updateAddresses(addrs); + } + }); + } + private static void shutdownSafely( final Helper helper, final Subchannel subchannel) { helper.getSynchronizationContext().execute( diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 4b08b911695..5abe28cec22 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -172,6 +172,7 @@ public void pickAfterResolvedAndUnchanged() throws Exception { verify(mockSubchannel).requestConnection(); loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + verify(mockSubchannel).updateAddresses(eq(servers)); verifyNoMoreInteractions(mockSubchannel); verify(mockHelper).createSubchannel(createArgsCaptor.capture()); @@ -179,8 +180,7 @@ public void pickAfterResolvedAndUnchanged() throws Exception { verify(mockHelper) .updateBalancingState(isA(ConnectivityState.class), isA(SubchannelPicker.class)); // Updating the subchannel addresses is unnecessary, but doesn't hurt anything - verify(mockHelper).updateSubchannelAddresses( - eq(mockSubchannel), ArgumentMatchers.anyList()); + verify(mockSubchannel).updateAddresses(ArgumentMatchers.anyList()); verifyNoMoreInteractions(mockHelper); } @@ -191,7 +191,7 @@ public void pickAfterResolvedAndChanged() throws Exception { List newServers = Lists.newArrayList(new EquivalentAddressGroup(socketAddr)); - InOrder inOrder = inOrder(mockHelper); + InOrder inOrder = inOrder(mockHelper, mockSubchannel); loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); @@ -205,7 +205,7 @@ public void pickAfterResolvedAndChanged() throws Exception { loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(newServers)); + inOrder.verify(mockSubchannel).updateAddresses(eq(newServers)); verifyNoMoreInteractions(mockSubchannel); verifyNoMoreInteractions(mockHelper); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index a383d4643c6..260b90912f0 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -438,7 +438,7 @@ private void useRoundRobinLists( } else { checkState(subchannels.size() == 1, "Unexpected Subchannel count: %s", subchannels); subchannel = subchannels.values().iterator().next(); - helper.updateSubchannelAddresses(subchannel, eagList); + subchannel.updateAddresses(eagList); } subchannels = Collections.singletonMap(eagList, subchannel); newBackendList.add( diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 8d64e4df8ba..db0c37c8777 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1776,8 +1776,7 @@ public void grpclbWorking_pickFirstMode() throws Exception { // createSubchannel() has ever been called only once verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class)); assertThat(mockSubchannels).isEmpty(); - inOrder.verify(helper).updateSubchannelAddresses( - same(subchannel), + verify(subchannel).updateAddresses( eq(Arrays.asList( new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), new EquivalentAddressGroup(backends2.get(2).addr, @@ -1874,8 +1873,7 @@ public void pickFirstMode_fallback() throws Exception { // createSubchannel() has ever been called only once verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class)); assertThat(mockSubchannels).isEmpty(); - inOrder.verify(helper).updateSubchannelAddresses( - same(subchannel), + verify(subchannel).updateAddresses( eq(Arrays.asList( new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), new EquivalentAddressGroup(backends1.get(1).addr,