Skip to content

Commit ba526ce

Browse files
authored
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. * The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. * Shutdown the shadow level queue once it is empty * Remove bytefifo additional goroutine for readToChan as it can just be run in run * Remove additional removeWorkers goroutine for workers * Simplify the AtShutdown and AtTerminate functions and add Channel Flusher * Add shutdown flusher to CUQ * move persistable channel shutdown stuff to Shutdown Fn * Ensure that UPCQ has the correct config * handle shutdown during the flushing * reduce risk of race between zeroBoost and addWorkers * prevent double shutdown Signed-off-by: Andrew Thornton <[email protected]>
1 parent 9f19c2b commit ba526ce

24 files changed

+598
-412
lines changed

modules/graceful/context.go

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,9 @@ package graceful
66

77
import (
88
"context"
9-
"fmt"
109
"time"
1110
)
1211

13-
// Errors for context.Err()
14-
var (
15-
ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown")
16-
ErrHammer = fmt.Errorf("Graceful Manager called Hammer")
17-
ErrTerminate = fmt.Errorf("Graceful Manager called Terminate")
18-
)
19-
2012
// ChannelContext is a context that wraps a channel and error as a context
2113
type ChannelContext struct {
2214
done <-chan struct{}
@@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} {
6355
// Callers using this context should ensure that they are registered as a running server
6456
// in order that they are waited for.
6557
func (g *Manager) ShutdownContext() context.Context {
66-
return &ChannelContext{
67-
done: g.IsShutdown(),
68-
err: ErrShutdown,
69-
}
58+
return g.shutdownCtx
7059
}
7160

7261
// HammerContext returns a context.Context that is Done at hammer
7362
// Callers using this context should ensure that they are registered as a running server
7463
// in order that they are waited for.
7564
func (g *Manager) HammerContext() context.Context {
76-
return &ChannelContext{
77-
done: g.IsHammer(),
78-
err: ErrHammer,
79-
}
65+
return g.hammerCtx
8066
}
8167

8268
// TerminateContext returns a context.Context that is Done at terminate
8369
// Callers using this context should ensure that they are registered as a terminating server
8470
// in order that they are waited for.
8571
func (g *Manager) TerminateContext() context.Context {
86-
return &ChannelContext{
87-
done: g.IsTerminate(),
88-
err: ErrTerminate,
89-
}
72+
return g.terminateCtx
9073
}

modules/graceful/manager.go

Lines changed: 82 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,16 @@ func InitManager(ctx context.Context) {
5454
})
5555
}
5656

57-
// CallbackWithContext is combined runnable and context to watch to see if the caller has finished
58-
type CallbackWithContext func(ctx context.Context, callback func())
57+
// WithCallback is a runnable to call when the caller has finished
58+
type WithCallback func(callback func())
5959

6060
// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
6161
// After the callback to atShutdown is called and is complete, the main function must return.
6262
// Similarly the callback function provided to atTerminate must return once termination is complete.
6363
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
6464
// - users must therefore be careful to only call these as necessary.
6565
// If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate.
66-
type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func()))
66+
type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
6767

6868
// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
6969
// After the callback to atShutdown is called and is complete, the main function must return.
@@ -80,17 +80,21 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
8080
g.doShutdown()
8181
}
8282
}()
83-
run(func(ctx context.Context, atShutdown func()) {
84-
go func() {
85-
select {
86-
case <-g.IsShutdown():
83+
run(func(atShutdown func()) {
84+
g.lock.Lock()
85+
defer g.lock.Unlock()
86+
g.toRunAtShutdown = append(g.toRunAtShutdown,
87+
func() {
88+
defer func() {
89+
if err := recover(); err != nil {
90+
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
91+
g.doShutdown()
92+
}
93+
}()
8794
atShutdown()
88-
case <-ctx.Done():
89-
return
90-
}
91-
}()
92-
}, func(ctx context.Context, atTerminate func()) {
93-
g.RunAtTerminate(ctx, atTerminate)
95+
})
96+
}, func(atTerminate func()) {
97+
g.RunAtTerminate(atTerminate)
9498
})
9599
}
96100

@@ -99,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
99103
// (Optionally IsHammer may be waited for instead however, this should be avoided if possible.)
100104
// The callback function provided to atTerminate must return once termination is complete.
101105
// Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary.
102-
type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext)
106+
type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback)
103107

104108
// RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks
105109
// After the atShutdown channel is closed, the main function must return once shutdown is complete.
@@ -115,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) {
115119
g.doShutdown()
116120
}
117121
}()
118-
run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) {
119-
g.RunAtTerminate(ctx, atTerminate)
122+
run(g.IsShutdown(), func(atTerminate func()) {
123+
g.RunAtTerminate(atTerminate)
120124
})
121125
}
122126

@@ -136,60 +140,65 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) {
136140
}
137141

138142
// RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination
139-
func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) {
143+
func (g *Manager) RunAtTerminate(terminate func()) {
140144
g.terminateWaitGroup.Add(1)
141-
go func() {
142-
defer g.terminateWaitGroup.Done()
143-
defer func() {
144-
if err := recover(); err != nil {
145-
log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
146-
}
147-
}()
148-
select {
149-
case <-g.IsTerminate():
145+
g.lock.Lock()
146+
defer g.lock.Unlock()
147+
g.toRunAtTerminate = append(g.toRunAtTerminate,
148+
func() {
149+
defer g.terminateWaitGroup.Done()
150+
defer func() {
151+
if err := recover(); err != nil {
152+
log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
153+
}
154+
}()
150155
terminate()
151-
case <-ctx.Done():
152-
}
153-
}()
156+
})
154157
}
155158

156159
// RunAtShutdown creates a go-routine to run the provided function at shutdown
157160
func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
158-
go func() {
159-
defer func() {
160-
if err := recover(); err != nil {
161-
log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
161+
g.lock.Lock()
162+
defer g.lock.Unlock()
163+
g.toRunAtShutdown = append(g.toRunAtShutdown,
164+
func() {
165+
defer func() {
166+
if err := recover(); err != nil {
167+
log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
168+
}
169+
}()
170+
select {
171+
case <-ctx.Done():
172+
return
173+
default:
174+
shutdown()
162175
}
163-
}()
164-
select {
165-
case <-g.IsShutdown():
166-
shutdown()
167-
case <-ctx.Done():
168-
}
169-
}()
176+
})
170177
}
171178

172179
// RunAtHammer creates a go-routine to run the provided function at shutdown
173-
func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) {
174-
go func() {
175-
defer func() {
176-
if err := recover(); err != nil {
177-
log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
178-
}
179-
}()
180-
select {
181-
case <-g.IsHammer():
180+
func (g *Manager) RunAtHammer(hammer func()) {
181+
g.lock.Lock()
182+
defer g.lock.Unlock()
183+
g.toRunAtHammer = append(g.toRunAtHammer,
184+
func() {
185+
defer func() {
186+
if err := recover(); err != nil {
187+
log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
188+
}
189+
}()
182190
hammer()
183-
case <-ctx.Done():
184-
}
185-
}()
191+
})
186192
}
187193
func (g *Manager) doShutdown() {
188194
if !g.setStateTransition(stateRunning, stateShuttingDown) {
189195
return
190196
}
191197
g.lock.Lock()
192-
close(g.shutdown)
198+
g.shutdownCtxCancel()
199+
for _, fn := range g.toRunAtShutdown {
200+
go fn()
201+
}
193202
g.lock.Unlock()
194203

195204
if setting.GracefulHammerTime >= 0 {
@@ -203,7 +212,7 @@ func (g *Manager) doShutdown() {
203212
g.doTerminate()
204213
g.WaitForTerminate()
205214
g.lock.Lock()
206-
close(g.done)
215+
g.doneCtxCancel()
207216
g.lock.Unlock()
208217
}()
209218
}
@@ -212,10 +221,13 @@ func (g *Manager) doHammerTime(d time.Duration) {
212221
time.Sleep(d)
213222
g.lock.Lock()
214223
select {
215-
case <-g.hammer:
224+
case <-g.hammerCtx.Done():
216225
default:
217226
log.Warn("Setting Hammer condition")
218-
close(g.hammer)
227+
g.hammerCtxCancel()
228+
for _, fn := range g.toRunAtHammer {
229+
go fn()
230+
}
219231
}
220232
g.lock.Unlock()
221233
}
@@ -226,10 +238,13 @@ func (g *Manager) doTerminate() {
226238
}
227239
g.lock.Lock()
228240
select {
229-
case <-g.terminate:
241+
case <-g.terminateCtx.Done():
230242
default:
231243
log.Warn("Terminating")
232-
close(g.terminate)
244+
g.terminateCtxCancel()
245+
for _, fn := range g.toRunAtTerminate {
246+
go fn()
247+
}
233248
}
234249
g.lock.Unlock()
235250
}
@@ -242,22 +257,22 @@ func (g *Manager) IsChild() bool {
242257
// IsShutdown returns a channel which will be closed at shutdown.
243258
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
244259
func (g *Manager) IsShutdown() <-chan struct{} {
245-
return g.shutdown
260+
return g.shutdownCtx.Done()
246261
}
247262

248263
// IsHammer returns a channel which will be closed at hammer
249264
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
250265
// Servers running within the running server wait group should respond to IsHammer
251266
// if not shutdown already
252267
func (g *Manager) IsHammer() <-chan struct{} {
253-
return g.hammer
268+
return g.hammerCtx.Done()
254269
}
255270

256271
// IsTerminate returns a channel which will be closed at terminate
257272
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
258273
// IsTerminate will only close once all running servers have stopped
259274
func (g *Manager) IsTerminate() <-chan struct{} {
260-
return g.terminate
275+
return g.terminateCtx.Done()
261276
}
262277

263278
// ServerDone declares a running server done and subtracts one from the
@@ -314,25 +329,20 @@ func (g *Manager) InformCleanup() {
314329

315330
// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating
316331
func (g *Manager) Done() <-chan struct{} {
317-
return g.done
332+
return g.doneCtx.Done()
318333
}
319334

320-
// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate
335+
// Err allows the manager to be viewed as a context.Context done at Terminate
321336
func (g *Manager) Err() error {
322-
select {
323-
case <-g.Done():
324-
return ErrTerminate
325-
default:
326-
return nil
327-
}
337+
return g.doneCtx.Err()
328338
}
329339

330-
// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values
340+
// Value allows the manager to be viewed as a context.Context done at Terminate
331341
func (g *Manager) Value(key interface{}) interface{} {
332-
return nil
342+
return g.doneCtx.Value(key)
333343
}
334344

335345
// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context
336346
func (g *Manager) Deadline() (deadline time.Time, ok bool) {
337-
return
347+
return g.doneCtx.Deadline()
338348
}

modules/graceful/manager_unix.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,21 @@ type Manager struct {
2525
forked bool
2626
lock *sync.RWMutex
2727
state state
28-
shutdown chan struct{}
29-
hammer chan struct{}
30-
terminate chan struct{}
31-
done chan struct{}
28+
shutdownCtx context.Context
29+
hammerCtx context.Context
30+
terminateCtx context.Context
31+
doneCtx context.Context
32+
shutdownCtxCancel context.CancelFunc
33+
hammerCtxCancel context.CancelFunc
34+
terminateCtxCancel context.CancelFunc
35+
doneCtxCancel context.CancelFunc
3236
runningServerWaitGroup sync.WaitGroup
3337
createServerWaitGroup sync.WaitGroup
3438
terminateWaitGroup sync.WaitGroup
39+
40+
toRunAtShutdown []func()
41+
toRunAtHammer []func()
42+
toRunAtTerminate []func()
3543
}
3644

3745
func newGracefulManager(ctx context.Context) *Manager {
@@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager {
4553
}
4654

4755
func (g *Manager) start(ctx context.Context) {
48-
// Make channels
49-
g.terminate = make(chan struct{})
50-
g.shutdown = make(chan struct{})
51-
g.hammer = make(chan struct{})
52-
g.done = make(chan struct{})
56+
// Make contexts
57+
g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx)
58+
g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(ctx)
59+
g.hammerCtx, g.hammerCtxCancel = context.WithCancel(ctx)
60+
g.doneCtx, g.doneCtxCancel = context.WithCancel(ctx)
5361

5462
// Set the running state & handle signals
5563
g.setState(stateRunning)

0 commit comments

Comments
 (0)