Skip to content

Commit ce3b538

Browse files
authored
client: simplify initialization and cleanup a bit (#6798)
1 parent b98104e commit ce3b538

File tree

2 files changed

+57
-70
lines changed

2 files changed

+57
-70
lines changed

balancer_conn_wrappers.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,14 @@ type ccBalancerWrapper struct {
7676
mode ccbMode // Tracks the current mode of the wrapper.
7777
}
7878

79-
// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
80-
// is not created until the switchTo() method is invoked.
79+
// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
80+
// underlying balancer is not created until the switchTo() method is invoked.
8181
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
82-
ctx, cancel := context.WithCancel(context.Background())
8382
ccb := &ccBalancerWrapper{
84-
cc: cc,
85-
opts: bopts,
86-
serializer: grpcsync.NewCallbackSerializer(ctx),
87-
serializerCancel: cancel,
83+
cc: cc,
84+
opts: bopts,
85+
mode: ccbModeIdle,
8886
}
89-
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
9087
return ccb
9188
}
9289

@@ -258,7 +255,7 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {
258255
// exitIdleMode(), and since we just created a new serializer, we can be
259256
// sure that the below function will be scheduled.
260257
done := make(chan struct{})
261-
ccb.serializer.Schedule(func(_ context.Context) {
258+
ccb.serializer.Schedule(func(context.Context) {
262259
defer close(done)
263260

264261
ccb.mu.Lock()
@@ -271,7 +268,11 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {
271268

272269
// Gracefulswitch balancer does not support a switchTo operation after
273270
// being closed. Hence we need to create a new one here.
274-
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
271+
opts := ccb.opts
272+
if c := opts.DialCreds; c != nil {
273+
opts.DialCreds = c.Clone()
274+
}
275+
ccb.balancer = gracefulswitch.NewBalancer(ccb, opts)
275276
ccb.mode = ccbModeActive
276277
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
277278

@@ -337,7 +338,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
337338
// case where we wait for ready and then perform an RPC. If the picker is
338339
// updated later, we could call the "connecting" picker when the state is
339340
// updated, and then call the "ready" picker after the picker gets updated.
340-
ccb.cc.blockingpicker.updatePicker(s.Picker)
341+
ccb.cc.pickerWrapper.updatePicker(s.Picker)
341342
ccb.cc.csMgr.updateState(s.ConnectivityState)
342343
}
343344

clientconn.go

Lines changed: 45 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
160160
cc.ctx, cc.cancel = context.WithCancel(context.Background())
161161
cc.exitIdleCond = sync.NewCond(&cc.mu)
162162

163+
// Apply dial options.
163164
disableGlobalOpts := false
164165
for _, opt := range opts {
165166
if _, ok := opt.(*disableGlobalDialOptions); ok {
@@ -177,21 +178,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
177178
for _, opt := range opts {
178179
opt.apply(&cc.dopts)
179180
}
180-
181181
chainUnaryClientInterceptors(cc)
182182
chainStreamClientInterceptors(cc)
183183

184-
defer func() {
185-
if err != nil {
186-
cc.Close()
187-
}
188-
}()
189-
190-
// Register ClientConn with channelz.
191-
cc.channelzRegistration(target)
192-
193-
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
194-
195184
if err := cc.validateTransportCredentials(); err != nil {
196185
return nil, err
197186
}
@@ -211,6 +200,37 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
211200
cc.dopts.copts.UserAgent = grpcUA
212201
}
213202

203+
// Register ClientConn with channelz.
204+
cc.channelzRegistration(target)
205+
206+
// Determine the resolver to use.
207+
if err := cc.parseTargetAndFindResolver(); err != nil {
208+
channelz.RemoveEntry(cc.channelzID)
209+
return nil, err
210+
}
211+
if err = cc.determineAuthority(); err != nil {
212+
channelz.RemoveEntry(cc.channelzID)
213+
return nil, err
214+
}
215+
216+
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
217+
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
218+
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
219+
DialCreds: cc.dopts.copts.TransportCredentials,
220+
CredsBundle: cc.dopts.copts.CredsBundle,
221+
Dialer: cc.dopts.copts.Dialer,
222+
Authority: cc.authority,
223+
CustomUserAgent: cc.dopts.copts.UserAgent,
224+
ChannelzParentID: cc.channelzID,
225+
Target: cc.parsedTarget,
226+
})
227+
228+
defer func() {
229+
if err != nil {
230+
cc.Close()
231+
}
232+
}()
233+
214234
if cc.dopts.timeout > 0 {
215235
var cancel context.CancelFunc
216236
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
@@ -235,14 +255,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
235255
cc.dopts.bs = backoff.DefaultExponential
236256
}
237257

238-
// Determine the resolver to use.
239-
if err := cc.parseTargetAndFindResolver(); err != nil {
240-
return nil, err
241-
}
242-
if err = cc.determineAuthority(); err != nil {
243-
return nil, err
244-
}
245-
246258
if cc.dopts.scChan != nil {
247259
// Blocking wait for the initial service config.
248260
select {
@@ -359,31 +371,13 @@ func (cc *ClientConn) exitIdleMode() error {
359371
}()
360372

361373
cc.idlenessState = ccIdlenessStateExitingIdle
362-
exitedIdle := false
363-
if cc.blockingpicker == nil {
364-
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
365-
} else {
366-
cc.blockingpicker.exitIdleMode()
367-
exitedIdle = true
368-
}
374+
cc.pickerWrapper.exitIdleMode()
369375

370376
var credsClone credentials.TransportCredentials
371377
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
372378
credsClone = creds.Clone()
373379
}
374-
if cc.balancerWrapper == nil {
375-
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
376-
DialCreds: credsClone,
377-
CredsBundle: cc.dopts.copts.CredsBundle,
378-
Dialer: cc.dopts.copts.Dialer,
379-
Authority: cc.authority,
380-
CustomUserAgent: cc.dopts.copts.UserAgent,
381-
ChannelzParentID: cc.channelzID,
382-
Target: cc.parsedTarget,
383-
})
384-
} else {
385-
cc.balancerWrapper.exitIdleMode()
386-
}
380+
cc.balancerWrapper.exitIdleMode()
387381
cc.firstResolveEvent = grpcsync.NewEvent()
388382
cc.mu.Unlock()
389383

@@ -394,9 +388,7 @@ func (cc *ClientConn) exitIdleMode() error {
394388
return err
395389
}
396390

397-
if exitedIdle {
398-
cc.addTraceEvent("exiting idle mode")
399-
}
391+
cc.addTraceEvent("exiting idle mode")
400392
return nil
401393
}
402394

@@ -427,7 +419,7 @@ func (cc *ClientConn) enterIdleMode() error {
427419
// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
428420
// try to do the same for the balancer and picker wrappers too.
429421
cc.resolverWrapper.close()
430-
cc.blockingpicker.enterIdleMode()
422+
cc.pickerWrapper.enterIdleMode()
431423
cc.balancerWrapper.enterIdleMode()
432424
cc.csMgr.updateState(connectivity.Idle)
433425
cc.idlenessState = ccIdlenessStateIdle
@@ -655,7 +647,7 @@ type ClientConn struct {
655647
// The following provide their own synchronization, and therefore don't
656648
// require cc.mu to be held to access them.
657649
csMgr *connectivityStateManager
658-
blockingpicker *pickerWrapper
650+
pickerWrapper *pickerWrapper
659651
safeConfigSelector iresolver.SafeConfigSelector
660652
czData *channelzData
661653
retryThrottler atomic.Value // Updated from service config.
@@ -910,7 +902,7 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
910902
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
911903
}
912904
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
913-
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
905+
cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
914906
cc.csMgr.updateState(connectivity.TransientFailure)
915907
}
916908

@@ -1174,7 +1166,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
11741166
}
11751167

11761168
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
1177-
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
1169+
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
11781170
Ctx: ctx,
11791171
FullMethodName: method,
11801172
})
@@ -1267,24 +1259,18 @@ func (cc *ClientConn) Close() error {
12671259
cc.conns = nil
12681260
cc.csMgr.updateState(connectivity.Shutdown)
12691261

1270-
pWrapper := cc.blockingpicker
1271-
rWrapper := cc.resolverWrapper
1272-
bWrapper := cc.balancerWrapper
1273-
idlenessMgr := cc.idlenessMgr
1262+
// We can safely unlock and continue to access all fields now as
1263+
// cc.conns==nil, preventing any further operations on cc.
12741264
cc.mu.Unlock()
12751265

12761266
// The order of closing matters here since the balancer wrapper assumes the
12771267
// picker is closed before it is closed.
1278-
if pWrapper != nil {
1279-
pWrapper.close()
1280-
}
1281-
if bWrapper != nil {
1282-
bWrapper.close()
1283-
}
1284-
if rWrapper != nil {
1268+
cc.pickerWrapper.close()
1269+
cc.balancerWrapper.close()
1270+
if rWrapper := cc.resolverWrapper; rWrapper != nil {
12851271
rWrapper.close()
12861272
}
1287-
if idlenessMgr != nil {
1273+
if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil {
12881274
idlenessMgr.Close()
12891275
}
12901276

0 commit comments

Comments
 (0)