Skip to content

Commit 4fe8d3d

Browse files
authored
balancer: fix tests not properly updating subconn states (#6501)
1 parent 8ebe462 commit 4fe8d3d

File tree

8 files changed

+156
-111
lines changed

8 files changed

+156
-111
lines changed

balancer/weightedtarget/weightedtarget_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1211,7 +1211,11 @@ var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
12111211
func init() {
12121212
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
12131213
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
1214-
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
1214+
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
1215+
if err != nil {
1216+
return err
1217+
}
1218+
sc.Connect()
12151219
return nil
12161220
},
12171221
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {

internal/balancer/gracefulswitch/gracefulswitch_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -953,6 +953,11 @@ func (mb1 *mockBalancer) newSubConn(addrs []resolver.Address, opts balancer.NewS
953953
if opts.StateListener == nil {
954954
opts.StateListener = func(state balancer.SubConnState) { mb1.UpdateSubConnState(sc, state) }
955955
}
956+
defer func() {
957+
if sc != nil {
958+
sc.Connect()
959+
}
960+
}()
956961
return mb1.cc.NewSubConn(addrs, opts)
957962
}
958963

@@ -1023,6 +1028,7 @@ func (vb *verifyBalancer) newSubConn(addrs []resolver.Address, opts balancer.New
10231028
if opts.StateListener == nil {
10241029
opts.StateListener = func(state balancer.SubConnState) { vb.UpdateSubConnState(sc, state) }
10251030
}
1031+
defer func() { sc.Connect() }()
10261032
return vb.cc.NewSubConn(addrs, opts)
10271033
}
10281034

@@ -1076,6 +1082,11 @@ func (bcb *buildCallbackBal) newSubConn(addrs []resolver.Address, opts balancer.
10761082
if opts.StateListener == nil {
10771083
opts.StateListener = func(state balancer.SubConnState) { bcb.UpdateSubConnState(sc, state) }
10781084
}
1085+
defer func() {
1086+
if sc != nil {
1087+
sc.Connect()
1088+
}
1089+
}()
10791090
return bcb.cc.NewSubConn(addrs, opts)
10801091
}
10811092

internal/testutils/balancer.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"google.golang.org/grpc/balancer"
2828
"google.golang.org/grpc/connectivity"
29+
"google.golang.org/grpc/internal/grpcsync"
2930
"google.golang.org/grpc/resolver"
3031
)
3132

@@ -40,13 +41,26 @@ type TestSubConn struct {
4041
id string
4142
ConnectCh chan struct{}
4243
stateListener func(balancer.SubConnState)
44+
connectCalled *grpcsync.Event
45+
}
46+
47+
// NewTestSubConn returns a newly initialized SubConn. Typically, subconns
48+
// should be created via TestClientConn.NewSubConn instead, but can be useful
49+
// for some tests.
50+
func NewTestSubConn(id string) *TestSubConn {
51+
return &TestSubConn{
52+
ConnectCh: make(chan struct{}, 1),
53+
connectCalled: grpcsync.NewEvent(),
54+
id: id,
55+
}
4356
}
4457

4558
// UpdateAddresses is a no-op.
4659
func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) {}
4760

4861
// Connect is a no-op.
4962
func (tsc *TestSubConn) Connect() {
63+
tsc.connectCalled.Fire()
5064
select {
5165
case tsc.ConnectCh <- struct{}{}:
5266
default:
@@ -60,6 +74,7 @@ func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.P
6074

6175
// UpdateState pushes the state to the listener, if one is registered.
6276
func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
77+
<-tsc.connectCalled.Done()
6378
if tsc.stateListener != nil {
6479
tsc.stateListener(state)
6580
return
@@ -109,6 +124,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon
109124
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
110125
ConnectCh: make(chan struct{}, 1),
111126
stateListener: o.StateListener,
127+
connectCalled: grpcsync.NewEvent(),
112128
}
113129
tcc.subConnIdx++
114130
tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)

xds/internal/balancer/clusterimpl/balancer_test.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,13 @@ func (s) TestDropByCategory(t *testing.T) {
128128
}
129129

130130
sc1 := <-cc.NewSubConnCh
131-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
131+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
132132
// This should get the connecting picker.
133133
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
134134
t.Fatal(err.Error())
135135
}
136136

137-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
137+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
138138
// Test pick with one backend.
139139

140140
const rpcCount = 20
@@ -283,13 +283,13 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
283283
}
284284

285285
sc1 := <-cc.NewSubConnCh
286-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
286+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
287287
// This should get the connecting picker.
288288
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
289289
t.Fatal(err.Error())
290290
}
291291

292-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
292+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
293293
// Test pick with one backend.
294294
const rpcCount = 100
295295
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
@@ -375,7 +375,11 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
375375
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
376376
// Create a subConn which will be used later on to test the race
377377
// between UpdateSubConnState() and Close().
378-
bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
378+
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
379+
if err != nil {
380+
return err
381+
}
382+
sc.Connect()
379383
return nil
380384
},
381385
UpdateSubConnState: func(bd *stub.BalancerData, _ balancer.SubConn, _ balancer.SubConnState) {
@@ -410,7 +414,7 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
410414
// that we use as the child policy will not send a picker update until the
411415
// parent policy is closed.
412416
sc1 := <-cc.NewSubConnCh
413-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
417+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
414418
b.Close()
415419
close(closeCh)
416420

@@ -449,7 +453,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
449453
}
450454

451455
sc1 := <-cc.NewSubConnCh
452-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
456+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
453457
// This should get the connecting picker.
454458
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
455459
t.Fatal(err.Error())
@@ -464,7 +468,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
464468
t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName)
465469
}
466470

467-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
471+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
468472
// Test pick with one backend.
469473
if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
470474
t.Fatal(err.Error())
@@ -524,13 +528,13 @@ func (s) TestReResolution(t *testing.T) {
524528
}
525529

526530
sc1 := <-cc.NewSubConnCh
527-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
531+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
528532
// This should get the connecting picker.
529533
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
530534
t.Fatal(err.Error())
531535
}
532536

533-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
537+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
534538
// This should get the transient failure picker.
535539
if err := cc.WaitForErrPicker(ctx); err != nil {
536540
t.Fatal(err.Error())
@@ -543,13 +547,13 @@ func (s) TestReResolution(t *testing.T) {
543547
t.Fatalf("timeout waiting for ResolveNow()")
544548
}
545549

546-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
550+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
547551
// Test pick with one backend.
548552
if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
549553
t.Fatal(err.Error())
550554
}
551555

552-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
556+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
553557
// This should get the transient failure picker.
554558
if err := cc.WaitForErrPicker(ctx); err != nil {
555559
t.Fatal(err.Error())
@@ -608,13 +612,13 @@ func (s) TestLoadReporting(t *testing.T) {
608612
}
609613

610614
sc1 := <-cc.NewSubConnCh
611-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
615+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
612616
// This should get the connecting picker.
613617
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
614618
t.Fatal(err.Error())
615619
}
616620

617-
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
621+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
618622
// Test pick with one backend.
619623
const successCount = 5
620624
const errorCount = 5

xds/internal/balancer/clustermanager/clustermanager_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ func TestClusterPicks(t *testing.T) {
169169
// Clear the attributes before adding to map.
170170
addrs[0].BalancerAttributes = nil
171171
m1[addrs[0]] = sc
172-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
173-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
172+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
173+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
174174
}
175175

176176
p1 := <-cc.NewPickerCh
@@ -247,8 +247,8 @@ func TestConfigUpdateAddCluster(t *testing.T) {
247247
// Clear the attributes before adding to map.
248248
addrs[0].BalancerAttributes = nil
249249
m1[addrs[0]] = sc
250-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
251-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
250+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
251+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
252252
}
253253

254254
p1 := <-cc.NewPickerCh
@@ -313,8 +313,8 @@ func TestConfigUpdateAddCluster(t *testing.T) {
313313
// Clear the attributes before adding to map.
314314
addrs[0].BalancerAttributes = nil
315315
m1[addrs[0]] = sc
316-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
317-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
316+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
317+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
318318

319319
// Should have no more newSubConn.
320320
select {
@@ -404,8 +404,8 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
404404
// Clear the attributes before adding to map.
405405
addrs[0].BalancerAttributes = nil
406406
m1[addrs[0]] = sc
407-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
408-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
407+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
408+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
409409
}
410410

411411
p1 := <-cc.NewPickerCh
@@ -488,8 +488,8 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
488488
// Clear the attributes before adding to map.
489489
addrs[0].BalancerAttributes = nil
490490
m2[addrs[0]] = sc
491-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
492-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
491+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
492+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
493493
}
494494

495495
p3 := <-cc.NewPickerCh
@@ -582,7 +582,11 @@ var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
582582
func init() {
583583
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
584584
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
585-
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
585+
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
586+
if err != nil {
587+
return err
588+
}
589+
sc.Connect()
586590
return nil
587591
},
588592
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
@@ -632,7 +636,7 @@ func TestInitialIdle(t *testing.T) {
632636
// in the address is cleared.
633637
for range wantAddrs {
634638
sc := <-cc.NewSubConnCh
635-
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
639+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
636640
}
637641

638642
if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
@@ -673,8 +677,8 @@ func TestClusterGracefulSwitch(t *testing.T) {
673677
}
674678

675679
sc1 := <-cc.NewSubConnCh
676-
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
677-
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
680+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
681+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
678682
p1 := <-cc.NewPickerCh
679683
pi := balancer.PickInfo{
680684
Ctx: SetPickedCluster(context.Background(), "csp:cluster"),
@@ -703,7 +707,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
703707
// Update the pick first balancers SubConn as CONNECTING. This will cause
704708
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
705709
// a Picker update back, as the Graceful Switch process is not complete.
706-
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
710+
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
707711
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
708712
defer cancel()
709713
select {
@@ -716,7 +720,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
716720
// the pick first balancer to UpdateState() with READY, which should send a
717721
// Picker update back, as the Graceful Switch process is complete. This
718722
// Picker should always pick the pick first's created SubConn.
719-
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
723+
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
720724
p2 := <-cc.NewPickerCh
721725
testPick(t, p2, pi, sc2, nil)
722726
// The Graceful Switch process completing for the child should cause the

0 commit comments

Comments
 (0)