Commit c822adf

balancergroup: add a ParseConfig API and remove the UpdateBuilder API (#7232)

1 parent a75dfa6 commit c822adf
5 files changed: +457 -164 lines changed

internal/balancergroup/balancergroup.go

Lines changed: 15 additions & 34 deletions
@@ -19,6 +19,7 @@
 package balancergroup
 
 import (
+	"encoding/json"
 	"fmt"
 	"sync"
 	"time"
@@ -29,6 +30,7 @@ import (
 	"google.golang.org/grpc/internal/cache"
 	"google.golang.org/grpc/internal/grpclog"
 	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/serviceconfig"
 )
 
 // subBalancerWrapper is used to keep the configurations that will be used to start
@@ -148,20 +150,6 @@ func (sbc *subBalancerWrapper) resolverError(err error) {
 	b.ResolverError(err)
 }
 
-func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) {
-	sbc.builder = builder
-	b := sbc.balancer
-	// Even if you get an add and it persists builder but doesn't start
-	// balancer, this would leave graceful switch being nil, in which we are
-	// correctly overwriting with the recent builder here as well to use later.
-	// The graceful switch balancer's presence is an invariant of whether the
-	// balancer group is closed or not (if closed, nil, if started, present).
-	if sbc.balancer != nil {
-		sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name())
-		b.SwitchTo(sbc.builder)
-	}
-}
-
 func (sbc *subBalancerWrapper) stopBalancer() {
 	if sbc.balancer == nil {
 		return
@@ -170,7 +158,8 @@ func (sbc *subBalancerWrapper) stopBalancer() {
 	sbc.balancer = nil
 }
 
-// BalancerGroup takes a list of balancers, and make them into one balancer.
+// BalancerGroup takes a list of balancers, each behind a gracefulswitch
+// balancer, and makes them into one balancer.
 //
 // Note that this struct doesn't implement balancer.Balancer, because it's not
 // intended to be used directly as a balancer. It's expected to be used as a
@@ -377,25 +366,6 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
 	bg.AddWithClientConn(id, builder.Name(), bg.cc)
 }
 
-// UpdateBuilder updates the builder for a current child, starting the Graceful
-// Switch process for that child.
-//
-// TODO: update this API to take the name of the new builder instead.
-func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
-	bg.outgoingMu.Lock()
-	// This does not deal with the balancer cache because this call should come
-	// after an Add call for a given child balancer. If the child is removed,
-	// the caller will call Add if the child balancer comes back which would
-	// then deal with the balancer cache.
-	sbc := bg.idToBalancerConfig[id]
-	if sbc == nil {
-		// simply ignore it if not present, don't error
-		return
-	}
-	sbc.gracefulSwitch(builder)
-	bg.outgoingMu.Unlock()
-}
-
 // Remove removes the balancer with id from the group.
 //
 // But doesn't close the balancer. The balancer is kept in a cache, and will be
@@ -636,3 +606,14 @@ func (bg *BalancerGroup) ExitIdleOne(id string) {
 	}
 	bg.outgoingMu.Unlock()
 }
+
+// ParseConfig parses a child config list and returns an LB config for the
+// gracefulswitch balancer.
+//
+// cfg is expected to be a json.RawMessage containing a JSON array of LB policy
+// names and configs, in the format of the "loadBalancingConfig" field in
+// ServiceConfig. It returns a type that should be passed to
+// UpdateClientConnState in the BalancerConfig field.
+func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	return gracefulswitch.ParseConfig(cfg)
+}
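The new ParseConfig plus UpdateClientConnState pair replaces the deleted UpdateBuilder flow: instead of handing the group a builder, callers send the child a parsed gracefulswitch config. Below is a minimal sketch of that calling pattern, modeled on the test change in this commit; the switchChild helper, child ID, and policy name are illustrative and not part of the commit, and the import only resolves inside the grpc-go module since balancergroup is internal.

package example

import (
	"encoding/json"
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/balancergroup"
	"google.golang.org/grpc/resolver"
)

// switchChild is a hypothetical helper showing how a parent policy might
// switch a child's LB policy after this commit: build a one-element
// "loadBalancingConfig"-style JSON list, parse it with the new ParseConfig,
// and deliver it through UpdateClientConnState.
func switchChild(bg *balancergroup.BalancerGroup, id, policy string, addrs []resolver.Address) error {
	cfgJSON := json.RawMessage(fmt.Sprintf(`[{%q: {}}]`, policy))
	lbCfg, err := balancergroup.ParseConfig(cfgJSON)
	if err != nil {
		return fmt.Errorf("ParseConfig(%s) failed: %v", cfgJSON, err)
	}
	// The parsed config rides in BalancerConfig; the gracefulswitch balancer
	// in front of the child performs the actual policy swap, which is what
	// UpdateBuilder used to trigger directly.
	return bg.UpdateClientConnState(id, balancer.ClientConnState{
		ResolverState:  resolver.State{Addresses: addrs},
		BalancerConfig: lbCfg,
	})
}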

internal/balancergroup/balancergroup_test.go

Lines changed: 10 additions & 3 deletions
@@ -18,6 +18,7 @@ package balancergroup
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"testing"
 	"time"
@@ -609,9 +610,15 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
 			return bal.UpdateClientConnState(ccs)
 		},
 	})
-	builder := balancer.Get(childPolicyName)
-	bg.UpdateBuilder(testBalancerIDs[0], builder)
-	if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
+	cfgJSON := json.RawMessage(fmt.Sprintf(`[{%q: {}}]`, t.Name()))
+	lbCfg, err := ParseConfig(cfgJSON)
+	if err != nil {
+		t.Fatalf("ParseConfig(%s) failed: %v", string(cfgJSON), err)
+	}
+	if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{
+		ResolverState:  resolver.State{Addresses: testBackendAddrs[2:4]},
+		BalancerConfig: lbCfg,
+	}); err != nil {
 		t.Fatalf("error updating ClientConn state: %v", err)
 	}
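Passing t.Name() as the policy name works because the test registers a stub child policy under that name; the stub.Register call in the context lines above shows its tail. A rough sketch of that registration pattern follows — the policy name and the Init wiring are illustrative, while stub is grpc-go's internal/balancer/stub test helper.

// Sketch: registering a stub LB policy so ParseConfig can resolve its name.
stub.Register("test-child-policy", stub.BalancerFuncs{
	Init: func(bd *stub.BalancerData) {
		// Store a real delegate balancer in bd.Data at build time
		// (illustrative; the actual test may wire this differently).
		bd.Data = balancer.Get("pick_first").Build(bd.ClientConn, bd.BuildOptions)
	},
	UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
		// Forward the update to the delegate, as in the test's context lines.
		bal := bd.Data.(balancer.Balancer)
		return bal.UpdateClientConnState(ccs)
	},
})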

xds/internal/balancer/clustermanager/balancerstateaggregator.go

Lines changed: 31 additions & 95 deletions
@@ -45,17 +45,14 @@ func (s *subBalancerState) String() string {
 type balancerStateAggregator struct {
 	cc     balancer.ClientConn
 	logger *grpclog.PrefixLogger
+	csEval *balancer.ConnectivityStateEvaluator
 
 	mu sync.Mutex
-	// If started is false, no updates should be sent to the parent cc. A closed
-	// sub-balancer could still send pickers to this aggregator. This makes sure
-	// that no updates will be forwarded to parent when the whole balancer group
-	// and states aggregator is closed.
-	started bool
-	// All balancer IDs exist as keys in this map, even if balancer group is not
-	// started.
-	//
-	// If an ID is not in map, it's either removed or never added.
+	// This field is used to ensure that no updates are forwarded to the parent
+	// CC once the aggregator is closed. A closed sub-balancer could still send
+	// pickers to this aggregator.
+	closed bool
+	// Map from child policy name to last reported state.
 	idToPickerState map[string]*subBalancerState
 	// Set when UpdateState call propagation is paused.
 	pauseUpdateState bool
@@ -68,34 +65,24 @@ func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLo
 	return &balancerStateAggregator{
 		cc:              cc,
 		logger:          logger,
+		csEval:          &balancer.ConnectivityStateEvaluator{},
 		idToPickerState: make(map[string]*subBalancerState),
 	}
 }
 
-// Start starts the aggregator. It can be called after Close to restart the
-// aggregator.
-func (bsa *balancerStateAggregator) start() {
-	bsa.mu.Lock()
-	defer bsa.mu.Unlock()
-	bsa.started = true
-}
-
-// Close closes the aggregator. When the aggregator is closed, it won't call
-// parent ClientConn to update balancer state.
 func (bsa *balancerStateAggregator) close() {
 	bsa.mu.Lock()
 	defer bsa.mu.Unlock()
-	bsa.started = false
-	bsa.clearStates()
+	bsa.closed = true
 }
 
-// add adds a sub-balancer state with weight. It adds a placeholder, and waits
-// for the real sub-balancer to update state.
+// add adds a sub-balancer in CONNECTING state.
 //
 // This is called when there's a new child.
 func (bsa *balancerStateAggregator) add(id string) {
 	bsa.mu.Lock()
 	defer bsa.mu.Unlock()
+
 	bsa.idToPickerState[id] = &subBalancerState{
 		// Start everything in CONNECTING, so if one of the sub-balancers
 		// reports TransientFailure, the RPCs will still wait for the other
@@ -106,6 +93,8 @@ func (bsa *balancerStateAggregator) add(id string) {
 		},
 		stateToAggregate: connectivity.Connecting,
 	}
+	bsa.csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
+	bsa.buildAndUpdateLocked()
 }
 
 // remove removes the sub-balancer state. Future updates from this sub-balancer,
@@ -118,9 +107,15 @@ func (bsa *balancerStateAggregator) remove(id string) {
 	if _, ok := bsa.idToPickerState[id]; !ok {
 		return
 	}
+	// Setting the state of the deleted sub-balancer to Shutdown will get
+	// csEval to remove the previous state for any aggregated state
+	// evaluations. Transitions to and from connectivity.Shutdown are ignored
+	// by csEval.
+	bsa.csEval.RecordTransition(bsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
 	// Remove id and picker from picker map. This also results in future updates
 	// for this ID to be ignored.
 	delete(bsa.idToPickerState, id)
+	bsa.buildAndUpdateLocked()
 }
 
 // pauseStateUpdates causes UpdateState calls to not propagate to the parent
@@ -140,7 +135,7 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() {
 	defer bsa.mu.Unlock()
 	bsa.pauseUpdateState = false
 	if bsa.needUpdateStateOnResume {
-		bsa.cc.UpdateState(bsa.build())
+		bsa.cc.UpdateState(bsa.buildLocked())
 	}
 }
 
@@ -149,6 +144,8 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() {
 //
 // It calls parent ClientConn's UpdateState with the new aggregated state.
 func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
+	bsa.logger.Infof("State update from sub-balancer %q: %+v", id, state)
+
 	bsa.mu.Lock()
 	defer bsa.mu.Unlock()
 	pickerSt, ok := bsa.idToPickerState[id]
@@ -162,42 +159,17 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State)
 		// update the state, to prevent the aggregated state from being always
 		// CONNECTING. Otherwise, stateToAggregate is the same as
 		// state.ConnectivityState.
+		bsa.csEval.RecordTransition(pickerSt.stateToAggregate, state.ConnectivityState)
 		pickerSt.stateToAggregate = state.ConnectivityState
 	}
 	pickerSt.state = state
-
-	if !bsa.started {
-		return
-	}
-	if bsa.pauseUpdateState {
-		// If updates are paused, do not call UpdateState, but remember that we
-		// need to call it when they are resumed.
-		bsa.needUpdateStateOnResume = true
-		return
-	}
-	bsa.cc.UpdateState(bsa.build())
-}
-
-// clearStates resets everything to the init state (Connecting) but keeps the
-// entry in the map (to keep the weight).
-//
-// Caller must hold bsa.mu.
-func (bsa *balancerStateAggregator) clearStates() {
-	for _, pState := range bsa.idToPickerState {
-		pState.state = balancer.State{
-			ConnectivityState: connectivity.Connecting,
-			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
-		}
-		pState.stateToAggregate = connectivity.Connecting
-	}
+	bsa.buildAndUpdateLocked()
 }
 
-// buildAndUpdate combines the sub-state from each sub-balancer into one state,
-// and updates it to the parent ClientConn.
-func (bsa *balancerStateAggregator) buildAndUpdate() {
-	bsa.mu.Lock()
-	defer bsa.mu.Unlock()
-	if !bsa.started {
+// buildAndUpdateLocked combines the sub-state from each sub-balancer into one
+// state, and sends a picker update to the parent ClientConn.
+func (bsa *balancerStateAggregator) buildAndUpdateLocked() {
+	if bsa.closed {
 		return
 	}
 	if bsa.pauseUpdateState {
@@ -206,55 +178,19 @@ func (bsa *balancerStateAggregator) buildAndUpdate() {
 		bsa.needUpdateStateOnResume = true
 		return
 	}
-	bsa.cc.UpdateState(bsa.build())
+	bsa.cc.UpdateState(bsa.buildLocked())
 }
 
-// build combines sub-states into one. The picker will do a child pick.
-//
-// Caller must hold bsa.mu.
-func (bsa *balancerStateAggregator) build() balancer.State {
-	// TODO: the majority of this function (and UpdateState) is exactly the same
-	// as weighted_target's state aggregator. Try to make a general utility
-	// function/struct to handle the logic.
-	//
-	// One option: make a SubBalancerState that handles Update(State), including
-	// handling the special connecting after ready, as in UpdateState(). Then a
-	// function to calculate the aggregated connectivity state as in this
-	// function.
-	//
-	// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
-	// state.
-	var readyN, connectingN, idleN int
-	for _, ps := range bsa.idToPickerState {
-		switch ps.stateToAggregate {
-		case connectivity.Ready:
-			readyN++
-		case connectivity.Connecting:
-			connectingN++
-		case connectivity.Idle:
-			idleN++
-		}
-	}
-	var aggregatedState connectivity.State
-	switch {
-	case readyN > 0:
-		aggregatedState = connectivity.Ready
-	case connectingN > 0:
-		aggregatedState = connectivity.Connecting
-	case idleN > 0:
-		aggregatedState = connectivity.Idle
-	default:
-		aggregatedState = connectivity.TransientFailure
-	}
-
+// buildLocked combines sub-states into one.
+func (bsa *balancerStateAggregator) buildLocked() balancer.State {
 	// The picker's return error might not be consistent with the
 	// aggregatedState. Because for this LB policy, we want to always build
 	// picker with all sub-pickers (not only ready sub-pickers), so even if the
 	// overall state is Ready, pick for certain RPCs can behave like Connecting
 	// or TransientFailure.
	bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState)
 	return balancer.State{
-		ConnectivityState: aggregatedState,
+		ConnectivityState: bsa.csEval.CurrentState(),
 		Picker:            newPickerGroup(bsa.idToPickerState),
 	}
 }
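The hand-rolled counting loop deleted from build() is exactly what balancer.ConnectivityStateEvaluator implements: Ready wins over Connecting, Connecting over Idle, with TransientFailure as the fallback, and a transition to or from Shutdown only adjusts the counter of the non-Shutdown state involved. Here is a small self-contained sketch of those semantics; the child lifecycle shown is illustrative, but RecordTransition and CurrentState are the same calls the aggregator now uses.

package main

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)

func main() {
	csEval := &balancer.ConnectivityStateEvaluator{}

	// Two children start in Connecting, as in add(): the Shutdown side of
	// each transition is ignored, so only the Connecting count grows.
	csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
	csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
	fmt.Println(csEval.CurrentState()) // CONNECTING

	// One child reports Ready; Ready dominates the aggregate.
	csEval.RecordTransition(connectivity.Connecting, connectivity.Ready)
	fmt.Println(csEval.CurrentState()) // READY

	// That child is removed, as in remove(): its Ready count is dropped and
	// the aggregate falls back to the remaining child's state.
	csEval.RecordTransition(connectivity.Ready, connectivity.Shutdown)
	fmt.Println(csEval.CurrentState()) // CONNECTING
}

This is why add() records a Shutdown-to-Connecting transition and remove() records one in the opposite direction: they register and unregister a child's contribution to the aggregated state.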
