Skip to content

Commit 228dcf7

Browse files
authored
core: Alternate ipV4 and ipV6 addresses for Happy Eyeballs in PickFirstLeafLoadBalancer (#11624)
* Interweave ipV4 and ipV6 addresses as per gRFC.
1 parent 7162d2d commit 228dcf7

File tree

2 files changed

+199
-47
lines changed

2 files changed

+199
-47
lines changed

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

Lines changed: 116 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import io.grpc.LoadBalancer;
3535
import io.grpc.Status;
3636
import io.grpc.SynchronizationContext.ScheduledHandle;
37+
import java.net.Inet4Address;
38+
import java.net.InetSocketAddress;
3739
import java.net.SocketAddress;
3840
import java.util.ArrayList;
3941
import java.util.Collections;
@@ -58,17 +60,17 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
5860
private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
5961
@VisibleForTesting
6062
static final int CONNECTION_DELAY_INTERVAL_MS = 250;
63+
private final boolean enableHappyEyeballs = !isSerializingRetries()
64+
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
6165
private final Helper helper;
6266
private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
63-
private final Index addressIndex = new Index(ImmutableList.of());
67+
private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs);
6468
private int numTf = 0;
6569
private boolean firstPass = true;
6670
@Nullable
6771
private ScheduledHandle scheduleConnectionTask = null;
6872
private ConnectivityState rawConnectivityState = IDLE;
6973
private ConnectivityState concludedState = IDLE;
70-
private final boolean enableHappyEyeballs = !isSerializingRetries()
71-
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
7274
private boolean notAPetiolePolicy = true; // means not under a petiole policy
7375
private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
7476
private BackoffPolicy reconnectPolicy;
@@ -610,27 +612,26 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
610612
}
611613

612614
/**
613-
* Index as in 'i', the pointer to an entry. Not a "search index."
615+
* This contains both an ordered list of addresses and a pointer(i.e. index) to the current entry.
614616
* All updates should be done in a synchronization context.
615617
*/
616618
@VisibleForTesting
617619
static final class Index {
618-
private List<EquivalentAddressGroup> addressGroups;
619-
private int size;
620-
private int groupIndex;
621-
private int addressIndex;
620+
private List<UnwrappedEag> orderedAddresses;
621+
private int activeElement = 0;
622+
private boolean enableHappyEyeballs;
622623

623-
public Index(List<EquivalentAddressGroup> groups) {
624+
Index(List<EquivalentAddressGroup> groups, boolean enableHappyEyeballs) {
625+
this.enableHappyEyeballs = enableHappyEyeballs;
624626
updateGroups(groups);
625627
}
626628

627629
public boolean isValid() {
628-
// Is invalid if empty or has incremented off the end
629-
return groupIndex < addressGroups.size();
630+
return activeElement < orderedAddresses.size();
630631
}
631632

632633
public boolean isAtBeginning() {
633-
return groupIndex == 0 && addressIndex == 0;
634+
return activeElement == 0;
634635
}
635636

636637
/**
@@ -642,79 +643,150 @@ public boolean increment() {
642643
return false;
643644
}
644645

645-
EquivalentAddressGroup group = addressGroups.get(groupIndex);
646-
addressIndex++;
647-
if (addressIndex >= group.getAddresses().size()) {
648-
groupIndex++;
649-
addressIndex = 0;
650-
return groupIndex < addressGroups.size();
651-
}
646+
activeElement++;
652647

653-
return true;
648+
return isValid();
654649
}
655650

656651
public void reset() {
657-
groupIndex = 0;
658-
addressIndex = 0;
652+
activeElement = 0;
659653
}
660654

661655
public SocketAddress getCurrentAddress() {
662656
if (!isValid()) {
663657
throw new IllegalStateException("Index is past the end of the address group list");
664658
}
665-
return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
659+
return orderedAddresses.get(activeElement).address;
666660
}
667661

668662
public Attributes getCurrentEagAttributes() {
669663
if (!isValid()) {
670664
throw new IllegalStateException("Index is off the end of the address group list");
671665
}
672-
return addressGroups.get(groupIndex).getAttributes();
666+
return orderedAddresses.get(activeElement).attributes;
673667
}
674668

675669
public List<EquivalentAddressGroup> getCurrentEagAsList() {
676-
return Collections.singletonList(
677-
new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes()));
670+
return Collections.singletonList(getCurrentEag());
671+
}
672+
673+
private EquivalentAddressGroup getCurrentEag() {
674+
if (!isValid()) {
675+
throw new IllegalStateException("Index is past the end of the address group list");
676+
}
677+
return orderedAddresses.get(activeElement).asEag();
678678
}
679679

680680
/**
681681
* Update to new groups, resetting the current index.
682682
*/
683683
public void updateGroups(List<EquivalentAddressGroup> newGroups) {
684-
addressGroups = checkNotNull(newGroups, "newGroups");
684+
checkNotNull(newGroups, "newGroups");
685+
orderedAddresses = enableHappyEyeballs
686+
? updateGroupsHE(newGroups)
687+
: updateGroupsNonHE(newGroups);
685688
reset();
686-
int size = 0;
687-
for (EquivalentAddressGroup eag : newGroups) {
688-
size += eag.getAddresses().size();
689-
}
690-
this.size = size;
691689
}
692690

693691
/**
694692
* Returns false if the needle was not found and the current index was left unchanged.
695693
*/
696694
public boolean seekTo(SocketAddress needle) {
697-
for (int i = 0; i < addressGroups.size(); i++) {
698-
EquivalentAddressGroup group = addressGroups.get(i);
699-
int j = group.getAddresses().indexOf(needle);
700-
if (j == -1) {
701-
continue;
695+
checkNotNull(needle, "needle");
696+
for (int i = 0; i < orderedAddresses.size(); i++) {
697+
if (orderedAddresses.get(i).address.equals(needle)) {
698+
this.activeElement = i;
699+
return true;
702700
}
703-
this.groupIndex = i;
704-
this.addressIndex = j;
705-
return true;
706701
}
707702
return false;
708703
}
709704

710705
public int size() {
711-
return size;
706+
return orderedAddresses.size();
707+
}
708+
709+
private List<UnwrappedEag> updateGroupsNonHE(List<EquivalentAddressGroup> newGroups) {
710+
List<UnwrappedEag> entries = new ArrayList<>();
711+
for (int g = 0; g < newGroups.size(); g++) {
712+
EquivalentAddressGroup eag = newGroups.get(g);
713+
for (int a = 0; a < eag.getAddresses().size(); a++) {
714+
SocketAddress addr = eag.getAddresses().get(a);
715+
entries.add(new UnwrappedEag(eag.getAttributes(), addr));
716+
}
717+
}
718+
719+
return entries;
720+
}
721+
722+
private List<UnwrappedEag> updateGroupsHE(List<EquivalentAddressGroup> newGroups) {
723+
Boolean firstIsV6 = null;
724+
List<UnwrappedEag> v4Entries = new ArrayList<>();
725+
List<UnwrappedEag> v6Entries = new ArrayList<>();
726+
for (int g = 0; g < newGroups.size(); g++) {
727+
EquivalentAddressGroup eag = newGroups.get(g);
728+
for (int a = 0; a < eag.getAddresses().size(); a++) {
729+
SocketAddress addr = eag.getAddresses().get(a);
730+
boolean isIpV4 = addr instanceof InetSocketAddress
731+
&& ((InetSocketAddress) addr).getAddress() instanceof Inet4Address;
732+
if (isIpV4) {
733+
if (firstIsV6 == null) {
734+
firstIsV6 = false;
735+
}
736+
v4Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
737+
} else {
738+
if (firstIsV6 == null) {
739+
firstIsV6 = true;
740+
}
741+
v6Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
742+
}
743+
}
744+
}
745+
746+
return firstIsV6 != null && firstIsV6
747+
? interleave(v6Entries, v4Entries)
748+
: interleave(v4Entries, v6Entries);
749+
}
750+
751+
private List<UnwrappedEag> interleave(List<UnwrappedEag> firstFamily,
752+
List<UnwrappedEag> secondFamily) {
753+
if (firstFamily.isEmpty()) {
754+
return secondFamily;
755+
}
756+
if (secondFamily.isEmpty()) {
757+
return firstFamily;
758+
}
759+
760+
List<UnwrappedEag> result = new ArrayList<>(firstFamily.size() + secondFamily.size());
761+
for (int i = 0; i < Math.max(firstFamily.size(), secondFamily.size()); i++) {
762+
if (i < firstFamily.size()) {
763+
result.add(firstFamily.get(i));
764+
}
765+
if (i < secondFamily.size()) {
766+
result.add(secondFamily.get(i));
767+
}
768+
}
769+
return result;
770+
}
771+
772+
private static final class UnwrappedEag {
773+
private final Attributes attributes;
774+
private final SocketAddress address;
775+
776+
public UnwrappedEag(Attributes attributes, SocketAddress address) {
777+
this.attributes = attributes;
778+
this.address = address;
779+
}
780+
781+
private EquivalentAddressGroup asEag() {
782+
return new EquivalentAddressGroup(address, attributes);
783+
}
712784
}
713785
}
714786

715787
@VisibleForTesting
716-
int getGroupIndex() {
717-
return addressIndex.groupIndex;
788+
int getIndexLocation() {
789+
return addressIndex.activeElement;
718790
}
719791

720792
@VisibleForTesting
@@ -778,4 +850,5 @@ public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
778850
this.randomSeed = randomSeed;
779851
}
780852
}
853+
781854
}

core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.junit.Assert.assertFalse;
3333
import static org.junit.Assert.assertNotNull;
3434
import static org.junit.Assert.assertNull;
35+
import static org.junit.Assert.assertThrows;
3536
import static org.junit.Assume.assumeTrue;
3637
import static org.mockito.AdditionalAnswers.delegatesTo;
3738
import static org.mockito.ArgumentMatchers.any;
@@ -67,6 +68,7 @@
6768
import io.grpc.Status.Code;
6869
import io.grpc.SynchronizationContext;
6970
import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig;
71+
import java.net.InetSocketAddress;
7072
import java.net.SocketAddress;
7173
import java.util.ArrayList;
7274
import java.util.Arrays;
@@ -2618,7 +2620,7 @@ public void serialized_retries_two_passes() {
26182620
forwardTimeByBackoffDelay(); // should trigger retry again
26192621
for (int i = 0; i < subchannels.length; i++) {
26202622
inOrder.verify(subchannels[i]).requestConnection();
2621-
assertEquals(i, loadBalancer.getGroupIndex());
2623+
assertEquals(i, loadBalancer.getIndexLocation());
26222624
listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); // cascade
26232625
}
26242626
}
@@ -2637,7 +2639,7 @@ public void index_looping() {
26372639
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
26382640
new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1),
26392641
new EquivalentAddressGroup(Arrays.asList(addr3), attr2),
2640-
new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)));
2642+
new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)), enableHappyEyeballs);
26412643
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1);
26422644
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1);
26432645
assertThat(index.isAtBeginning()).isTrue();
@@ -2696,7 +2698,7 @@ public void index_updateGroups_resets() {
26962698
SocketAddress addr3 = new FakeSocketAddress("addr3");
26972699
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
26982700
new EquivalentAddressGroup(Arrays.asList(addr1)),
2699-
new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
2701+
new EquivalentAddressGroup(Arrays.asList(addr2, addr3))), enableHappyEyeballs);
27002702
index.increment();
27012703
index.increment();
27022704
// We want to make sure both groupIndex and addressIndex are reset
@@ -2713,7 +2715,7 @@ public void index_seekTo() {
27132715
SocketAddress addr3 = new FakeSocketAddress("addr3");
27142716
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
27152717
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)),
2716-
new EquivalentAddressGroup(Arrays.asList(addr3))));
2718+
new EquivalentAddressGroup(Arrays.asList(addr3))), enableHappyEyeballs);
27172719
assertThat(index.seekTo(addr3)).isTrue();
27182720
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3);
27192721
assertThat(index.seekTo(addr1)).isTrue();
@@ -2725,6 +2727,83 @@ public void index_seekTo() {
27252727
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
27262728
}
27272729

2730+
@Test
2731+
public void index_interleaving() {
2732+
InetSocketAddress addr1_6 = new InetSocketAddress("f38:1:1", 1234);
2733+
InetSocketAddress addr1_4 = new InetSocketAddress("10.1.1.1", 1234);
2734+
InetSocketAddress addr2_4 = new InetSocketAddress("10.1.1.2", 1234);
2735+
InetSocketAddress addr3_4 = new InetSocketAddress("10.1.1.3", 1234);
2736+
InetSocketAddress addr4_4 = new InetSocketAddress("10.1.1.4", 1234);
2737+
InetSocketAddress addr4_6 = new InetSocketAddress("f38:1:4", 1234);
2738+
2739+
Attributes attrs1 = Attributes.newBuilder().build();
2740+
Attributes attrs2 = Attributes.newBuilder().build();
2741+
Attributes attrs3 = Attributes.newBuilder().build();
2742+
Attributes attrs4 = Attributes.newBuilder().build();
2743+
2744+
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
2745+
new EquivalentAddressGroup(Arrays.asList(addr1_4, addr1_6), attrs1),
2746+
new EquivalentAddressGroup(Arrays.asList(addr2_4), attrs2),
2747+
new EquivalentAddressGroup(Arrays.asList(addr3_4), attrs3),
2748+
new EquivalentAddressGroup(Arrays.asList(addr4_4, addr4_6), attrs4)), enableHappyEyeballs);
2749+
2750+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_4);
2751+
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs1);
2752+
assertThat(index.isAtBeginning()).isTrue();
2753+
2754+
index.increment();
2755+
assertThat(index.isValid()).isTrue();
2756+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_6);
2757+
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs1);
2758+
assertThat(index.isAtBeginning()).isFalse();
2759+
2760+
index.increment();
2761+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2_4);
2762+
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs2);
2763+
2764+
index.increment();
2765+
if (enableHappyEyeballs) {
2766+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_6);
2767+
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs4);
2768+
} else {
2769+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3_4);
2770+
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs3);
2771+
}
2772+
2773+
index.increment();
2774+
if (enableHappyEyeballs) {
2775+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3_4);
2776+
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs3);
2777+
} else {
2778+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
2779+
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs4);
2780+
}
2781+
2782+
// Move to last entry
2783+
assertThat(index.increment()).isTrue();
2784+
assertThat(index.isValid()).isTrue();
2785+
if (enableHappyEyeballs) {
2786+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
2787+
} else {
2788+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_6);
2789+
}
2790+
2791+
// Move off of the end
2792+
assertThat(index.increment()).isFalse();
2793+
assertThat(index.isValid()).isFalse();
2794+
assertThrows(IllegalStateException.class, index::getCurrentAddress);
2795+
2796+
// Reset
2797+
index.reset();
2798+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_4);
2799+
assertThat(index.isAtBeginning()).isTrue();
2800+
assertThat(index.isValid()).isTrue();
2801+
2802+
// Seek to an address
2803+
assertThat(index.seekTo(addr4_4)).isTrue();
2804+
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
2805+
}
2806+
27282807
private static class FakeSocketAddress extends SocketAddress {
27292808
final String name;
27302809

0 commit comments

Comments
 (0)