
Commit 425faa2

Fix slowdown and stoppage in the main event loop (#499)
* Fix slowdown and stoppage in the main event loop
  - Prevent the event loop from exiting immediately if it receives a stale event marked InProgress.
  - Periodically log the size of the event queue.
  - Periodically "garbage-collect" the event queue to trim already-handled events and slow the rate of its unbounded growth.
* Cancel events which fail to drain or cordon
* Move event store logging and GC into the store itself
* Don't print more than 1 error per second when running out of workers
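The headline fix targets the dispatch loop's condition. Before this change, `GetActiveEvent` could return an event whose drain was already running (or wedged), and the loop condition `ok && !event.InProgress` then terminated the loop outright, so events queued behind the stale one never got a worker. A minimal standalone sketch of the old shape (stand-in types and a hypothetical `getActive` helper, not NTH code):

```go
package main

import "fmt"

// event stands in for monitor.InterruptionEvent; only the relevant field is kept.
type event struct {
	id         string
	inProgress bool
}

// getActive mimics the pre-fix GetActiveEvent: the predicate did not check
// inProgress, so an event already being drained could be handed back.
func getActive(events []*event) (*event, bool) {
	if len(events) == 0 {
		return nil, false
	}
	return events[0], true
}

func main() {
	events := []*event{
		{id: "stale", inProgress: true}, // drain already dispatched (or stuck)
		{id: "ready"},                   // still waiting for a worker
	}

	dispatched := 0
	// Pre-fix loop shape: `ok && !e.inProgress` exits on the stale event,
	// so "ready" is never dispatched on this pass.
	for e, ok := getActive(events); ok && !e.inProgress; e, ok = getActive(events) {
		fmt.Println("dispatching", e.id)
		events = events[1:]
		dispatched++
	}
	fmt.Println("dispatched:", dispatched) // 0
}
```

The fix splits this into two parts, both visible in the diffs below: the in-progress check moves into the store's drainability predicate, so stale events are skipped rather than loop-terminating, and the all-workers-busy exit becomes an explicit labeled break.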
1 parent 6a22485 · commit 425faa2

2 files changed: +68 −4 lines

cmd/node-termination-handler.go

Lines changed: 10 additions & 2 deletions

```diff
@@ -229,16 +229,23 @@ func main() {
 			// Exit interruption loop if a SIGTERM is received or the channel is closed
 			break
 		default:
-			for event, ok := interruptionEventStore.GetActiveEvent(); ok && !event.InProgress; event, ok = interruptionEventStore.GetActiveEvent() {
+		EventLoop:
+			for event, ok := interruptionEventStore.GetActiveEvent(); ok; event, ok = interruptionEventStore.GetActiveEvent() {
 				select {
 				case interruptionEventStore.Workers <- 1:
+					log.Info().
+						Str("event-id", event.EventID).
+						Str("kind", event.Kind).
+						Str("node-name", event.NodeName).
+						Str("instance-id", event.InstanceID).
+						Msg("Requesting instance drain")
 					event.InProgress = true
 					wg.Add(1)
 					recorder.Emit(event.NodeName, observability.Normal, observability.GetReasonForKind(event.Kind), event.Description)
 					go drainOrCordonIfNecessary(interruptionEventStore, event, *node, nthConfig, nodeMetadata, metrics, recorder, &wg)
 				default:
 					log.Warn().Msg("all workers busy, waiting")
-					break
+					break EventLoop
 				}
 			}
 		}
```
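A note on the labeled break: in Go, a bare `break` inside a `select` terminates only the `select` statement, not the enclosing `for`. The old `break` in the `default` case was therefore a no-op as a loop exit: with all workers busy, the loop immediately polled the store again, spinning hot and repeating the warning. `break EventLoop` actually leaves the dispatch loop and lets the outer loop's pacing take over. A standalone demonstration of the semantics (not NTH code):

```go
package main

import "fmt"

func main() {
	ch := make(chan int) // never ready, so the select always falls to default

	spins := 0
Loop:
	for {
		select {
		case <-ch:
		default:
			spins++
			if spins == 3 {
				// A bare `break` here would exit only the select,
				// and this loop would spin forever.
				break Loop
			}
		}
	}
	fmt.Println("spins:", spins) // 3
}
```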
```diff
@@ -339,6 +346,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
 	}
 
 	if err != nil {
+		interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
 		<-interruptionEventStore.Workers
 	} else {
 		interruptionEventStore.MarkAllAsProcessed(nodeName)
```
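`Workers` is a buffered channel used as a counting semaphore: the dispatch loop acquires a slot with the non-blocking send `interruptionEventStore.Workers <- 1`, and the drain goroutine releases it with `<-interruptionEventStore.Workers`. The new `CancelInterruptionEvent` call makes failure symmetric: an event whose drain or cordon failed is removed from the store instead of staying parked as InProgress forever, presumably so the monitor can re-queue it, while the worker slot is still returned. A minimal sketch of the semaphore pattern in isolation (blocking acquire for brevity; NTH acquires non-blockingly via select):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	workers := make(chan int, 2) // capacity = max concurrent drains
	var wg sync.WaitGroup

	for task := 0; task < 5; task++ {
		workers <- 1 // acquire a slot; blocks while 2 drains are in flight
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer func() { <-workers }() // release the slot on success or failure
			fmt.Println("draining task", id)
		}(task)
	}
	wg.Wait()
}
```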

pkg/interruptioneventstore/interruption-event-store.go

Lines changed: 58 additions & 2 deletions

```diff
@@ -31,16 +31,26 @@ type Store struct {
 	ignoredEvents          map[string]struct{}
 	atLeastOneEvent        bool
 	Workers                chan int
+	callsSinceLastClean    int
+	callsSinceLastLog      int
+	cleaningPeriod         int
+	loggingPeriod          int
 }
 
 // New Creates a new interruption event store
 func New(nthConfig config.Config) *Store {
-	return &Store{
+	store := &Store{
 		NthConfig:              nthConfig,
 		interruptionEventStore: make(map[string]*monitor.InterruptionEvent),
 		ignoredEvents:          make(map[string]struct{}),
 		Workers:                make(chan int, nthConfig.Workers),
+		cleaningPeriod:         7200,
+		loggingPeriod:          1800,
 	}
+	if nthConfig.LogLevel == "debug" {
+		store.loggingPeriod = 60
+	}
+	return store
 }
 
 // CancelInterruptionEvent removes an interruption event from the internal store
```
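`cleaningPeriod` and `loggingPeriod` are call counts, not durations: they gate counters that are bumped on every `GetActiveEvent` call. If the main loop polls the store roughly once per second (an assumption about the caller; the poll pacing lives in `main`), the defaults work out to a GC pass about every 2 hours and a stats line about every 30 minutes, or every minute at debug level. A throwaway sketch of the arithmetic (hypothetical helper, not part of the PR):

```go
package main

import (
	"fmt"
	"time"
)

// callsForInterval converts a wall-clock interval into a call-count period,
// assuming the store is polled at a fixed rate.
func callsForInterval(interval, pollEvery time.Duration) int {
	return int(interval / pollEvery)
}

func main() {
	fmt.Println(callsForInterval(2*time.Hour, time.Second))    // 7200 -> cleaningPeriod
	fmt.Println(callsForInterval(30*time.Minute, time.Second)) // 1800 -> loggingPeriod
	fmt.Println(callsForInterval(time.Minute, time.Second))    // 60   -> debug loggingPeriod
}
```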
```diff
@@ -70,6 +80,8 @@ func (s *Store) AddInterruptionEvent(interruptionEvent *monitor.InterruptionEven
 
 // GetActiveEvent returns true if there are interruption events in the internal store
 func (s *Store) GetActiveEvent() (*monitor.InterruptionEvent, bool) {
+	s.cleanPeriodically()
+	s.logPeriodically()
 	s.RLock()
 	defer s.RUnlock()
 	for _, interruptionEvent := range s.interruptionEventStore {
```
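Both helpers take the store's write lock internally, so they must run and return before `GetActiveEvent` takes its read lock: Go's `sync.RWMutex` is not reentrant, and calling `Lock` while the same goroutine holds `RLock` deadlocks. A minimal sketch of the safe ordering (generic example, not NTH code):

```go
package main

import (
	"fmt"
	"sync"
)

type store struct {
	sync.RWMutex
	n int
}

// housekeeping needs the write lock, so the caller must not already hold
// the read lock on the same RWMutex.
func (s *store) housekeeping() {
	s.Lock()
	defer s.Unlock()
	s.n++
}

func (s *store) get() int {
	s.housekeeping() // safe: write lock is acquired and released here
	s.RLock()
	defer s.RUnlock()
	// Calling s.housekeeping() at this point would deadlock.
	return s.n
}

func main() {
	s := &store{}
	fmt.Println(s.get()) // 1
}
```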
```diff
@@ -94,7 +106,7 @@ func (s *Store) ShouldDrainNode() bool {
 
 func (s *Store) shouldEventDrain(interruptionEvent *monitor.InterruptionEvent) bool {
 	_, ignored := s.ignoredEvents[interruptionEvent.EventID]
-	if !ignored && !interruptionEvent.NodeProcessed && s.TimeUntilDrain(interruptionEvent) <= 0 {
+	if !ignored && !interruptionEvent.InProgress && !interruptionEvent.NodeProcessed && s.TimeUntilDrain(interruptionEvent) <= 0 {
 		return true
 	}
 	return false
```
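This is the counterpart of the loop-condition fix sketched under the commit message: with `!interruptionEvent.InProgress` inside the predicate, the store's scan simply skips events that are already being drained and keeps looking, rather than handing the caller an event that terminates dispatch. A simplified sketch of the new scan behavior (stand-in types, not NTH code):

```go
package main

import "fmt"

type event struct {
	id         string
	inProgress bool
}

// firstDrainable mirrors the fixed GetActiveEvent scan: in-progress events
// are filtered by the predicate instead of being returned to the caller.
func firstDrainable(events map[string]*event) (*event, bool) {
	for _, e := range events {
		if !e.inProgress { // stand-in for the full shouldEventDrain check
			return e, true
		}
	}
	return nil, false
}

func main() {
	events := map[string]*event{
		"a": {id: "stale", inProgress: true}, // skipped, no longer loop-terminating
		"b": {id: "ready"},
	}
	if e, ok := firstDrainable(events); ok {
		fmt.Println("dispatching", e.id) // dispatching ready
	}
}
```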
```diff
@@ -148,3 +160,47 @@ func (s *Store) ShouldUncordonNode(nodeName string) bool {
 
 	return true
 }
+
+// cleanPeriodically removes old events from the store every N times it is called
+//
+// Cleaning consists of removing events with NodeProcessed=true
+func (s *Store) cleanPeriodically() {
+	s.Lock()
+	defer s.Unlock()
+	s.callsSinceLastClean++
+	if s.callsSinceLastClean < s.cleaningPeriod {
+		return
+	}
+	log.Info().Msg("Garbage-collecting the interruption event store")
+	toDelete := []string{}
+	for _, e := range s.interruptionEventStore {
+		if e.NodeProcessed {
+			toDelete = append(toDelete, e.EventID)
+		}
+	}
+	for _, id := range toDelete {
+		delete(s.interruptionEventStore, id)
+	}
+	s.callsSinceLastClean = 0
+}
+
+// logPeriodically logs statistics about the store every N times it is called.
+func (s *Store) logPeriodically() {
+	s.Lock()
+	defer s.Unlock()
+	s.callsSinceLastLog++
+	if s.callsSinceLastLog < s.loggingPeriod {
+		return
+	}
+	drainableEventCount := 0
+	for _, interruptionEvent := range s.interruptionEventStore {
+		if s.shouldEventDrain(interruptionEvent) {
+			drainableEventCount += 1
+		}
+	}
+	log.Info().
+		Int("size", len(s.interruptionEventStore)).
+		Int("drainable-events", drainableEventCount).
+		Msg("event store statistics")
+	s.callsSinceLastLog = 0
+}
```
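A regression test for the GC could look roughly like the sketch below: an in-package test that shrinks `cleaningPeriod`, marks an event processed, and drives `GetActiveEvent` until the clean fires. The import paths and the `monitor.InterruptionEvent` fields match what the diff shows, but the exact behavior of `AddInterruptionEvent` and `MarkAllAsProcessed` is assumed, so treat this as a sketch rather than a drop-in test:

```go
package interruptioneventstore

import (
	"testing"

	"github.com/aws/aws-node-termination-handler/pkg/config"
	"github.com/aws/aws-node-termination-handler/pkg/monitor"
)

func TestCleanPeriodicallyTrimsProcessedEvents(t *testing.T) {
	s := New(config.Config{})
	s.cleaningPeriod = 3 // shrink the period so the test runs instantly

	// Assumes AddInterruptionEvent stores the event keyed by EventID and
	// MarkAllAsProcessed flips NodeProcessed for that node's events.
	s.AddInterruptionEvent(&monitor.InterruptionEvent{EventID: "e-1", NodeName: "node-1"})
	s.MarkAllAsProcessed("node-1")

	for i := 0; i < 3; i++ {
		s.GetActiveEvent() // each call bumps callsSinceLastClean
	}
	if got := len(s.interruptionEventStore); got != 0 {
		t.Fatalf("store size = %d after GC, want 0", got)
	}
}
```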
