Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,23 @@ func main() {
// Exit interruption loop if a SIGTERM is received or the channel is closed
break
default:
for event, ok := interruptionEventStore.GetActiveEvent(); ok && !event.InProgress; event, ok = interruptionEventStore.GetActiveEvent() {
EventLoop:
for event, ok := interruptionEventStore.GetActiveEvent(); ok; event, ok = interruptionEventStore.GetActiveEvent() {
select {
case interruptionEventStore.Workers <- 1:
log.Info().
Str("event-id", event.EventID).
Str("kind", event.Kind).
Str("node-name", event.NodeName).
Str("instance-id", event.InstanceID).
Msg("Requesting instance drain")
event.InProgress = true
wg.Add(1)
recorder.Emit(event.NodeName, observability.Normal, observability.GetReasonForKind(event.Kind), event.Description)
go drainOrCordonIfNecessary(interruptionEventStore, event, *node, nthConfig, nodeMetadata, metrics, recorder, &wg)
default:
log.Warn().Msg("all workers busy, waiting")
break
break EventLoop
}
}
}
Expand Down Expand Up @@ -339,6 +346,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
}

if err != nil {
interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
<-interruptionEventStore.Workers
} else {
interruptionEventStore.MarkAllAsProcessed(nodeName)
Expand Down
60 changes: 58 additions & 2 deletions pkg/interruptioneventstore/interruption-event-store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,26 @@ type Store struct {
ignoredEvents map[string]struct{}
atLeastOneEvent bool
Workers chan int
callsSinceLastClean int
callsSinceLastLog int
cleaningPeriod int
loggingPeriod int
}

// New Creates a new interruption event store
func New(nthConfig config.Config) *Store {
return &Store{
store := &Store{
NthConfig: nthConfig,
interruptionEventStore: make(map[string]*monitor.InterruptionEvent),
ignoredEvents: make(map[string]struct{}),
Workers: make(chan int, nthConfig.Workers),
cleaningPeriod: 7200,
loggingPeriod: 1800,
}
if nthConfig.LogLevel == "debug" {
store.loggingPeriod = 60
}
return store
}

// CancelInterruptionEvent removes an interruption event from the internal store
Expand Down Expand Up @@ -70,6 +80,8 @@ func (s *Store) AddInterruptionEvent(interruptionEvent *monitor.InterruptionEven

// GetActiveEvent returns an active interruption event from the internal store; the boolean reports whether such an event was found
func (s *Store) GetActiveEvent() (*monitor.InterruptionEvent, bool) {
s.cleanPeriodically()
s.logPeriodically()
s.RLock()
defer s.RUnlock()
for _, interruptionEvent := range s.interruptionEventStore {
Expand All @@ -94,7 +106,7 @@ func (s *Store) ShouldDrainNode() bool {

func (s *Store) shouldEventDrain(interruptionEvent *monitor.InterruptionEvent) bool {
_, ignored := s.ignoredEvents[interruptionEvent.EventID]
if !ignored && !interruptionEvent.NodeProcessed && s.TimeUntilDrain(interruptionEvent) <= 0 {
if !ignored && !interruptionEvent.InProgress && !interruptionEvent.NodeProcessed && s.TimeUntilDrain(interruptionEvent) <= 0 {
return true
}
return false
Expand Down Expand Up @@ -148,3 +160,47 @@ func (s *Store) ShouldUncordonNode(nodeName string) bool {

return true
}

// cleanPeriodically garbage-collects the store once every cleaningPeriod calls.
//
// Every call bumps an internal counter under the write lock. When the counter
// reaches cleaningPeriod, all events with NodeProcessed=true are removed from
// the store and the counter starts over at zero.
func (s *Store) cleanPeriodically() {
	s.Lock()
	defer s.Unlock()

	s.callsSinceLastClean++
	if s.callsSinceLastClean >= s.cleaningPeriod {
		log.Info().Msg("Garbage-collecting the interruption event store")

		// First pass: gather the IDs of every processed event.
		var processedIDs []string
		for _, event := range s.interruptionEventStore {
			if !event.NodeProcessed {
				continue
			}
			processedIDs = append(processedIDs, event.EventID)
		}

		// Second pass: drop the gathered events from the store.
		for i := range processedIDs {
			delete(s.interruptionEventStore, processedIDs[i])
		}

		s.callsSinceLastClean = 0
	}
}

// logPeriodically emits statistics about the store once every loggingPeriod calls.
//
// Every call bumps an internal counter under the write lock. When the counter
// reaches loggingPeriod, the total number of stored events and the number of
// events for which shouldEventDrain reports true are logged, and the counter
// starts over at zero.
func (s *Store) logPeriodically() {
	s.Lock()
	defer s.Unlock()

	s.callsSinceLastLog++
	if s.callsSinceLastLog >= s.loggingPeriod {
		drainable := 0
		for _, event := range s.interruptionEventStore {
			if !s.shouldEventDrain(event) {
				continue
			}
			drainable++
		}

		stats := log.Info()
		stats = stats.Int("size", len(s.interruptionEventStore))
		stats = stats.Int("drainable-events", drainable)
		stats.Msg("event store statistics")

		s.callsSinceLastLog = 0
	}
}