From dbf9649d7380a92d6ffc32a25ffa38c87f155f35 Mon Sep 17 00:00:00 2001
From: "Michael F. Booth"
Date: Mon, 11 Oct 2021 16:22:40 -0400
Subject: [PATCH 1/4] Fix slowdown and stoppage in the main event loop

- Prevent the event loop from exiting immediately if it receives a stale
  event marked InProgress
- Periodically log the size of the event queue.
- Periodically "garbage-collect" the event queue to trim already-handled
  events and slow the rate of its unbounded growth.
---
 cmd/node-termination-handler.go               | 30 ++++++++++++++-
 .../interruption-event-store.go               | 37 ++++++++++++++++++-
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go
index 932809e7..3dca94da 100644
--- a/cmd/node-termination-handler.go
+++ b/cmd/node-termination-handler.go
@@ -222,16 +222,28 @@ func main() {
 	log.Info().Msg("Started watching for event cancellations")
 
 	var wg sync.WaitGroup
-
+	eventStoreGCInterval := 0
+	eventStoreGCPeriod := 7200
+	eventStoreLogInterval := 0
+	eventStoreLogPeriod := 1800
+	if nthConfig.LogLevel == "debug" {
+		eventStoreLogPeriod = 60
+	}
 	for range time.NewTicker(1 * time.Second).C {
 		select {
 		case <-signalChan:
 			// Exit interruption loop if a SIGTERM is received or the channel is closed
 			break
 		default:
-			for event, ok := interruptionEventStore.GetActiveEvent(); ok && !event.InProgress; event, ok = interruptionEventStore.GetActiveEvent() {
+			for event, ok := interruptionEventStore.GetActiveEvent(); ok; event, ok = interruptionEventStore.GetActiveEvent() {
 				select {
 				case interruptionEventStore.Workers <- 1:
+					log.Info().
+						Str("event-id", event.EventID).
+						Str("kind", event.Kind).
+						Str("node-name", event.NodeName).
+						Str("instance-id", event.InstanceID).
+						Msg("Requesting instance drain")
 					event.InProgress = true
 					wg.Add(1)
 					recorder.Emit(event.NodeName, observability.Normal, observability.GetReasonForKind(event.Kind), event.Description)
@@ -242,6 +254,20 @@ func main() {
 				}
 			}
 		}
+		if eventStoreGCInterval >= eventStoreGCPeriod {
+			log.Info().Msg("Garbage-collecting the interruption event store")
+			interruptionEventStore.GC()
+			eventStoreGCInterval = 0
+		}
+		if eventStoreLogInterval >= eventStoreLogPeriod {
+			log.Info().
+				Int("size", interruptionEventStore.Size()).
+				Int("drainable-events", interruptionEventStore.CountDrainableEvents()).
+				Msg("event store statistics")
+			eventStoreLogInterval = 0
+		}
+		eventStoreGCInterval++
+		eventStoreLogInterval++
 	}
 	log.Info().Msg("AWS Node Termination Handler is shutting down")
 	wg.Wait()
diff --git a/pkg/interruptioneventstore/interruption-event-store.go b/pkg/interruptioneventstore/interruption-event-store.go
index 8a85c2d0..2e9063e5 100644
--- a/pkg/interruptioneventstore/interruption-event-store.go
+++ b/pkg/interruptioneventstore/interruption-event-store.go
@@ -68,6 +68,13 @@ func (s *Store) AddInterruptionEvent(interruptionEvent *monitor.InterruptionEven
 	}
 }
 
+// Size returns the total number of events in the internal store
+func (s *Store) Size() int {
+	s.RLock()
+	defer s.RUnlock()
+	return len(s.interruptionEventStore)
+}
+
 // GetActiveEvent returns true if there are interruption events in the internal store
 func (s *Store) GetActiveEvent() (*monitor.InterruptionEvent, bool) {
 	s.RLock()
@@ -80,6 +87,19 @@ func (s *Store) GetActiveEvent() (*monitor.InterruptionEvent, bool) {
 	return &monitor.InterruptionEvent{}, false
 }
 
+// CountDrainableEvents returns the number of drainable events in the internal store
+func (s *Store) CountDrainableEvents() int {
+	s.RLock()
+	defer s.RUnlock()
+	n := 0
+	for _, interruptionEvent := range s.interruptionEventStore {
+		if s.shouldEventDrain(interruptionEvent) {
+			n += 1
+		}
+	}
+	return n
+}
+
 // ShouldDrainNode returns true if there are drainable events in the internal store
 func (s *Store) ShouldDrainNode() bool {
 	s.RLock()
@@ -94,7 +114,7 @@ func (s *Store) ShouldDrainNode() bool {
 
 func (s *Store) shouldEventDrain(interruptionEvent *monitor.InterruptionEvent) bool {
 	_, ignored := s.ignoredEvents[interruptionEvent.EventID]
-	if !ignored && !interruptionEvent.NodeProcessed && s.TimeUntilDrain(interruptionEvent) <= 0 {
+	if !ignored && !interruptionEvent.InProgress && !interruptionEvent.NodeProcessed && s.TimeUntilDrain(interruptionEvent) <= 0 {
 		return true
 	}
 	return false
@@ -148,3 +168,18 @@ func (s *Store) ShouldUncordonNode(nodeName string) bool {
 
 	return true
 }
+
+// GC garbage-collects the store by removing events with NodeProcessed=true
+func (s *Store) GC() {
+	s.Lock()
+	defer s.Unlock()
+	toDelete := []string{}
+	for _, e := range s.interruptionEventStore {
+		if e.NodeProcessed {
+			toDelete = append(toDelete, e.EventID)
+		}
+	}
+	for _, id := range toDelete {
+		delete(s.interruptionEventStore, id)
+	}
+}

From 495d08fe9c98134b46e92fc53319bbdd3e515a2f Mon Sep 17 00:00:00 2001
From: "Michael F. Booth"
Date: Tue, 12 Oct 2021 13:21:18 -0400
Subject: [PATCH 2/4] Cancel events which fail to drain or cordon

---
 cmd/node-termination-handler.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go
index 3dca94da..c9291118 100644
--- a/cmd/node-termination-handler.go
+++ b/cmd/node-termination-handler.go
@@ -365,6 +365,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
 	}
 
 	if err != nil {
+		interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
 		<-interruptionEventStore.Workers
 	} else {
 		interruptionEventStore.MarkAllAsProcessed(nodeName)

From b6735c745ab2ea55d61bc509647b30c3cf773176 Mon Sep 17 00:00:00 2001
From: "Michael F. Booth"
Date: Tue, 12 Oct 2021 13:50:44 -0400
Subject: [PATCH 3/4] Move event store logging and GC into the store itself

---
 cmd/node-termination-handler.go               | 21 ------
 .../interruption-event-store.go               | 67 ++++++++++++-------
 2 files changed, 44 insertions(+), 44 deletions(-)

diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go
index c9291118..33d46e51 100644
--- a/cmd/node-termination-handler.go
+++ b/cmd/node-termination-handler.go
@@ -222,13 +222,6 @@ func main() {
 	log.Info().Msg("Started watching for event cancellations")
 
 	var wg sync.WaitGroup
-	eventStoreGCInterval := 0
-	eventStoreGCPeriod := 7200
-	eventStoreLogInterval := 0
-	eventStoreLogPeriod := 1800
-	if nthConfig.LogLevel == "debug" {
-		eventStoreLogPeriod = 60
-	}
 	for range time.NewTicker(1 * time.Second).C {
 		select {
 		case <-signalChan:
@@ -254,20 +247,6 @@ func main() {
 				}
 			}
 		}
-		if eventStoreGCInterval >= eventStoreGCPeriod {
-			log.Info().Msg("Garbage-collecting the interruption event store")
-			interruptionEventStore.GC()
-			eventStoreGCInterval = 0
-		}
-		if eventStoreLogInterval >= eventStoreLogPeriod {
-			log.Info().
-				Int("size", interruptionEventStore.Size()).
-				Int("drainable-events", interruptionEventStore.CountDrainableEvents()).
-				Msg("event store statistics")
-			eventStoreLogInterval = 0
-		}
-		eventStoreGCInterval++
-		eventStoreLogInterval++
 	}
 	log.Info().Msg("AWS Node Termination Handler is shutting down")
 	wg.Wait()
diff --git a/pkg/interruptioneventstore/interruption-event-store.go b/pkg/interruptioneventstore/interruption-event-store.go
index 2e9063e5..2226c334 100644
--- a/pkg/interruptioneventstore/interruption-event-store.go
+++ b/pkg/interruptioneventstore/interruption-event-store.go
@@ -31,16 +31,26 @@ type Store struct {
 	ignoredEvents          map[string]struct{}
 	atLeastOneEvent        bool
 	Workers                chan int
+	callsSinceLastClean    int
+	callsSinceLastLog      int
+	cleaningPeriod         int
+	loggingPeriod          int
 }
 
 // New Creates a new interruption event store
 func New(nthConfig config.Config) *Store {
-	return &Store{
+	store := &Store{
 		NthConfig:              nthConfig,
 		interruptionEventStore: make(map[string]*monitor.InterruptionEvent),
 		ignoredEvents:          make(map[string]struct{}),
 		Workers:                make(chan int, nthConfig.Workers),
+		cleaningPeriod:         7200,
+		loggingPeriod:          1800,
 	}
+	if nthConfig.LogLevel == "debug" {
+		store.loggingPeriod = 60
+	}
+	return store
 }
 
 // CancelInterruptionEvent removes an interruption event from the internal store
@@ -68,15 +78,10 @@ func (s *Store) AddInterruptionEvent(interruptionEven
 	}
 }
 
-// Size returns the total number of events in the internal store
-func (s *Store) Size() int {
-	s.RLock()
-	defer s.RUnlock()
-	return len(s.interruptionEventStore)
-}
-
 // GetActiveEvent returns true if there are interruption events in the internal store
 func (s *Store) GetActiveEvent() (*monitor.InterruptionEvent, bool) {
+	s.cleanPeriodically()
+	s.logPeriodically()
 	s.RLock()
 	defer s.RUnlock()
 	for _, interruptionEvent := range s.interruptionEventStore {
@@ -87,19 +92,6 @@ func (s *Store) GetActiveEvent() (*monitor.InterruptionEvent, bool) {
 	return &monitor.InterruptionEvent{}, false
 }
 
-// CountDrainableEvents returns the number of drainable events in the internal store
-func (s *Store) CountDrainableEvents() int {
-	s.RLock()
-	defer s.RUnlock()
-	n := 0
-	for _, interruptionEvent := range s.interruptionEventStore {
-		if s.shouldEventDrain(interruptionEvent) {
-			n += 1
-		}
-	}
-	return n
-}
-
 // ShouldDrainNode returns true if there are drainable events in the internal store
 func (s *Store) ShouldDrainNode() bool {
 	s.RLock()
@@ -169,10 +161,17 @@ func (s *Store) ShouldUncordonNode(nodeName string) bool {
 	return true
 }
 
-// GC garbage-collects the store by removing events with NodeProcessed=true
-func (s *Store) GC() {
+// cleanPeriodically removes old events from the store every N times it is called
+//
+// Cleaning consists of removing events with NodeProcessed=true
+func (s *Store) cleanPeriodically() {
 	s.Lock()
 	defer s.Unlock()
+	s.callsSinceLastClean++
+	if s.callsSinceLastClean < s.cleaningPeriod {
+		return
+	}
+	log.Info().Msg("Garbage-collecting the interruption event store")
 	toDelete := []string{}
 	for _, e := range s.interruptionEventStore {
 		if e.NodeProcessed {
@@ -182,4 +181,26 @@ func (s *Store) GC() {
 	for _, id := range toDelete {
 		delete(s.interruptionEventStore, id)
 	}
+	s.callsSinceLastClean = 0
+}
+
+// logPeriodically logs statistics about the store every N times it is called.
+func (s *Store) logPeriodically() {
+	s.Lock()
+	defer s.Unlock()
+	s.callsSinceLastLog++
+	if s.callsSinceLastLog < s.loggingPeriod {
+		return
+	}
+	drainableEventCount := 0
+	for _, interruptionEvent := range s.interruptionEventStore {
+		if s.shouldEventDrain(interruptionEvent) {
+			drainableEventCount += 1
+		}
+	}
+	log.Info().
+		Int("size", len(s.interruptionEventStore)).
+		Int("drainable-events", drainableEventCount).
+		Msg("event store statistics")
+	s.callsSinceLastLog = 0
 }

From 3697b833664bedecbd3e91ab020695e17e00b5d9 Mon Sep 17 00:00:00 2001
From: "Michael F. Booth"
Date: Tue, 12 Oct 2021 18:53:08 -0400
Subject: [PATCH 4/4] Don't print more than 1 error per second when running out of workers

---
 cmd/node-termination-handler.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go
index 33d46e51..21eb5a68 100644
--- a/cmd/node-termination-handler.go
+++ b/cmd/node-termination-handler.go
@@ -222,12 +222,14 @@ func main() {
 	log.Info().Msg("Started watching for event cancellations")
 
 	var wg sync.WaitGroup
+
 	for range time.NewTicker(1 * time.Second).C {
 		select {
 		case <-signalChan:
 			// Exit interruption loop if a SIGTERM is received or the channel is closed
 			break
 		default:
+		EventLoop:
			for event, ok := interruptionEventStore.GetActiveEvent(); ok; event, ok = interruptionEventStore.GetActiveEvent() {
 				select {
 				case interruptionEventStore.Workers <- 1:
@@ -243,7 +245,7 @@ func main() {
 					go drainOrCordonIfNecessary(interruptionEventStore, event, *node, nthConfig, nodeMetadata, metrics, recorder, &wg)
 				default:
 					log.Warn().Msg("all workers busy, waiting")
-					break
+					break EventLoop
 				}
 			}
 		}
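
Background on the labeled break in PATCH 4/4: in Go, a bare "break" inside a
"select" statement only exits the select, not the enclosing "for" loop, so the
pre-patch code fell straight back into the event loop and logged "all workers
busy, waiting" once per queued event on every tick. Breaking to the EventLoop
label exits the loop itself, so the warning appears at most once per one-second
tick. The sketch below is a minimal, standalone illustration of that control
flow; the channel capacity and loop bound are made-up values for the example,
not taken from the handler.

package main

import "fmt"

func main() {
	workers := make(chan int, 1) // illustrative capacity, not the handler's Workers channel
	workers <- 1                 // fill the channel so the send in the select cannot proceed

EventLoop:
	for i := 0; i < 5; i++ {
		select {
		case workers <- 1:
			fmt.Println("dispatched event", i)
		default:
			fmt.Println("all workers busy, waiting")
			// A bare "break" here would only leave the select and the loop
			// would run again immediately; the labeled break leaves the loop.
			break EventLoop
		}
	}
	fmt.Println("left the event loop after a single warning")
}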