81 changes: 45 additions & 36 deletions cmd/node-termination-handler.go
@@ -22,9 +22,9 @@ import (
     "time"

     "github.com/aws/aws-node-termination-handler/pkg/config"
-    "github.com/aws/aws-node-termination-handler/pkg/drainevent"
-    "github.com/aws/aws-node-termination-handler/pkg/draineventstore"
     "github.com/aws/aws-node-termination-handler/pkg/ec2metadata"
+    "github.com/aws/aws-node-termination-handler/pkg/interruptionevent"
+    "github.com/aws/aws-node-termination-handler/pkg/interruptioneventstore"
     "github.com/aws/aws-node-termination-handler/pkg/node"
     "github.com/aws/aws-node-termination-handler/pkg/webhook"
 )
@@ -34,7 +34,7 @@ const (
     spotITN = "Spot ITN"
 )

-type monitorFunc func(chan<- drainevent.DrainEvent, chan<- drainevent.DrainEvent, *ec2metadata.Service) error
+type monitorFunc func(chan<- interruptionevent.InterruptionEvent, chan<- interruptionevent.InterruptionEvent, *ec2metadata.Service) error

 func main() {
     signalChan := make(chan os.Signal, 1)
@@ -57,61 +57,61 @@ func main() {

     imds := ec2metadata.New(nthConfig.MetadataURL, nthConfig.MetadataTries)

-    drainEventStore := draineventstore.New(nthConfig)
+    interruptionEventStore := interruptioneventstore.New(nthConfig)
     nodeMetadata := imds.GetNodeMetadata()

     if nthConfig.EnableScheduledEventDraining {
-        err = handleRebootUncordon(drainEventStore, *node)
+        err = handleRebootUncordon(interruptionEventStore, *node)
         if err != nil {
             log.Printf("Unable to complete the uncordon after reboot workflow on startup: %v\n", err)
         }
     }

-    drainChan := make(chan drainevent.DrainEvent)
-    defer close(drainChan)
-    cancelChan := make(chan drainevent.DrainEvent)
+    interruptionChan := make(chan interruptionevent.InterruptionEvent)
+    defer close(interruptionChan)
+    cancelChan := make(chan interruptionevent.InterruptionEvent)
     defer close(cancelChan)

     monitoringFns := map[string]monitorFunc{}
     if nthConfig.EnableSpotInterruptionDraining {
-        monitoringFns[spotITN] = drainevent.MonitorForSpotITNEvents
+        monitoringFns[spotITN] = interruptionevent.MonitorForSpotITNEvents
     }
     if nthConfig.EnableScheduledEventDraining {
-        monitoringFns[scheduledMaintenance] = drainevent.MonitorForScheduledEvents
+        monitoringFns[scheduledMaintenance] = interruptionevent.MonitorForScheduledEvents
     }

     for eventType, fn := range monitoringFns {
         go func(monitorFn monitorFunc, eventType string) {
             log.Printf("Started monitoring for %s events", eventType)
             for range time.Tick(time.Second * 2) {
-                err := monitorFn(drainChan, cancelChan, imds)
+                err := monitorFn(interruptionChan, cancelChan, imds)
                 if err != nil {
                     log.Printf("There was a problem monitoring for %s events: %v", eventType, err)
                 }
             }
         }(fn, eventType)
     }

-    go watchForDrainEvents(drainChan, drainEventStore, nodeMetadata)
-    log.Println("Started watching for drain events")
+    go watchForInterruptionEvents(interruptionChan, interruptionEventStore, nodeMetadata)
+    log.Println("Started watching for interruption events")
     log.Println("Kubernetes AWS Node Termination Handler has started successfully!")

-    go watchForCancellationEvents(cancelChan, drainEventStore, node, nodeMetadata)
+    go watchForCancellationEvents(cancelChan, interruptionEventStore, node, nodeMetadata)
     log.Println("Started watching for event cancellations")

     for range time.NewTicker(1 * time.Second).C {
         select {
         case _ = <-signalChan:
-            // Exit drain loop if a SIGTERM is received or the channel is closed
+            // Exit interruption loop if a SIGTERM is received or the channel is closed
             break
         default:
-            drainIfNecessary(drainEventStore, *node, nthConfig, nodeMetadata)
+            drainOrCordonIfNecessary(interruptionEventStore, *node, nthConfig, nodeMetadata)
         }
     }
     log.Println("AWS Node Termination Handler is shutting down")
 }

-func handleRebootUncordon(drainEventStore *draineventstore.Store, node node.Node) error {
+func handleRebootUncordon(interruptionEventStore *interruptioneventstore.Store, node node.Node) error {
     isLabeled, err := node.IsLabeledWithAction()
     if err != nil {
         return err
@@ -127,51 +127,60 @@ func handleRebootUncordon(drainEventStore *draineventstore.Store, node node.Node
     if err != nil {
         return fmt.Errorf("Unable to complete node label actions: %w", err)
     }
-    drainEventStore.IgnoreEvent(eventID)
+    interruptionEventStore.IgnoreEvent(eventID)
     return nil
 }

-func watchForDrainEvents(drainChan <-chan drainevent.DrainEvent, drainEventStore *draineventstore.Store, nodeMetadata ec2metadata.NodeMetadata) {
+func watchForInterruptionEvents(interruptionChan <-chan interruptionevent.InterruptionEvent, interruptionEventStore *interruptioneventstore.Store, nodeMetadata ec2metadata.NodeMetadata) {
     for {
-        drainEvent := <-drainChan
-        log.Printf("Got drain event from channel %+v %+v\n", nodeMetadata, drainEvent)
-        drainEventStore.AddDrainEvent(&drainEvent)
+        interruptionEvent := <-interruptionChan
+        log.Printf("Got interruption event from channel %+v %+v\n", nodeMetadata, interruptionEvent)
+        interruptionEventStore.AddInterruptionEvent(&interruptionEvent)
     }
 }

-func watchForCancellationEvents(cancelChan <-chan drainevent.DrainEvent, drainEventStore *draineventstore.Store, node *node.Node, nodeMetadata ec2metadata.NodeMetadata) {
+func watchForCancellationEvents(cancelChan <-chan interruptionevent.InterruptionEvent, interruptionEventStore *interruptioneventstore.Store, node *node.Node, nodeMetadata ec2metadata.NodeMetadata) {
     for {
-        drainEvent := <-cancelChan
-        log.Printf("Got cancel event from channel %+v %+v\n", nodeMetadata, drainEvent)
-        drainEventStore.CancelDrainEvent(drainEvent.EventID)
-        if drainEventStore.ShouldUncordonNode() {
+        interruptionEvent := <-cancelChan
+        log.Printf("Got cancel event from channel %+v %+v\n", nodeMetadata, interruptionEvent)
+        interruptionEventStore.CancelInterruptionEvent(interruptionEvent.EventID)
+        if interruptionEventStore.ShouldUncordonNode() {
             log.Println("Uncordoning the node due to a cancellation event")
             err := node.Uncordon()
             if err != nil {
                 log.Printf("Uncordoning the node failed: %v", err)
             }
             node.RemoveNTHLabels()
         } else {
-            log.Println("Another drain event is active, not uncordoning the node")
+            log.Println("Another interruption event is active, not uncordoning the node")
         }
     }
 }

-func drainIfNecessary(drainEventStore *draineventstore.Store, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata) {
-    if drainEvent, ok := drainEventStore.GetActiveEvent(); ok {
+func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata) {
+    if drainEvent, ok := interruptionEventStore.GetActiveEvent(); ok {
         if drainEvent.PreDrainTask != nil {
             err := drainEvent.PreDrainTask(*drainEvent, node)
             if err != nil {
                 log.Println("There was a problem executing the pre-drain task: ", err)
             }
         }
-        err := node.Drain()
-        if err != nil {
-            log.Println("There was a problem while trying to drain the node: ", err)
-            os.Exit(1)
+        if nthConfig.CordonOnly {
+            err := node.Cordon()
+            if err != nil {
+                log.Println("There was a problem while trying to cordon the node: ", err)
+                os.Exit(1)
+            }
+            log.Printf("Node %q successfully cordoned.\n", nthConfig.NodeName)
+        } else {
+            err := node.CordonAndDrain()
+            if err != nil {
+                log.Println("There was a problem while trying to cordon and drain the node: ", err)
+                os.Exit(1)
+            }
+            log.Printf("Node %q successfully cordoned and drained.\n", nthConfig.NodeName)
         }
-        drainEventStore.MarkAllAsDrained()
-        log.Printf("Node %q successfully drained.\n", nthConfig.NodeName)
+        interruptionEventStore.MarkAllAsDrained()
         if nthConfig.WebhookURL != "" {
             webhook.Post(nodeMetadata, drainEvent, nthConfig)
         }
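Note: the monitorFunc type and the channel hand-off in main() are the core pattern this PR renames. Below is a minimal, self-contained sketch of that pattern for readers skimming the diff; every type and the fake monitor are local stand-ins for illustration, not the real pkg/interruptionevent API (only EventID and PreDrainTask are visible in this diff).

// Minimal, self-contained sketch of the monitor -> channel -> watcher
// hand-off renamed in this PR. All types here are local stand-ins.
package main

import (
    "fmt"
    "time"
)

// InterruptionEvent stands in for interruptionevent.InterruptionEvent;
// only EventID is suggested by this diff, Description is assumed.
type InterruptionEvent struct {
    EventID     string
    Description string
}

// monitorFunc mirrors the renamed type: two send-only channels, one for
// new events and one for cancellations.
type monitorFunc func(chan<- InterruptionEvent, chan<- InterruptionEvent) error

// monitorForExampleEvents emits one fake event per tick, standing in for
// an IMDS poller such as MonitorForSpotITNEvents.
func monitorForExampleEvents(interruptionChan chan<- InterruptionEvent, _ chan<- InterruptionEvent) error {
    interruptionChan <- InterruptionEvent{EventID: "example-id", Description: "fake spot ITN"}
    return nil
}

func main() {
    interruptionChan := make(chan InterruptionEvent)
    cancelChan := make(chan InterruptionEvent)

    // Watcher: in the real handler this adds events to the interruption
    // event store; here it just logs them.
    go func() {
        for ev := range interruptionChan {
            fmt.Printf("Got interruption event from channel %+v\n", ev)
        }
    }()

    // Monitor loop, mirroring the 2-second tick in main().
    var monitor monitorFunc = monitorForExampleEvents
    go func() {
        for range time.Tick(2 * time.Second) {
            if err := monitor(interruptionChan, cancelChan); err != nil {
                fmt.Println("monitor error:", err)
            }
        }
    }()

    time.Sleep(5 * time.Second) // let a couple of ticks fire, then exit
}

This keeps monitors decoupled from the store: a monitor only ever sees two send-only channels, so adding a new event source means adding one entry to monitoringFns.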
2 changes: 1 addition & 1 deletion config/helm/aws-node-termination-handler/Chart.yaml
@@ -1,7 +1,7 @@
 apiVersion: v1
 name: aws-node-termination-handler
 description: A Helm chart for the AWS Node Termination Handler
-version: 0.7.4
+version: 0.7.5
 appVersion: 1.3.1
 home: https://github.com/aws/eks-charts
 icon: https://raw.githubusercontent.com/aws/eks-charts/master/docs/logo/aws.png
5 changes: 5 additions & 0 deletions config/helm/aws-node-termination-handler/README.md
@@ -62,6 +62,11 @@ Parameter | Description | Default
 `webhookURL` | Posts event data to URL upon instance interruption action | ``
 `webhookHeaders` | Replaces the default webhook headers. | `{"Content-type":"application/json"}`
 `webhookTemplate` | Replaces the default webhook message template. | `{"text":"[NTH][Instance Interruption] EventID: {{ .EventID }} - Kind: {{ .Kind }} - Description: {{ .Description }} - State: {{ .State }} - Start Time: {{ .StartTime }}"}`
+`dryRun` | If true, only log if a node would be drained | `false`
+`enableScheduledEventDraining` | [EXPERIMENTAL] If true, drain nodes before the maintenance window starts for an EC2 instance scheduled event | `false`
+`enableSpotInterruptionDraining` | If true, drain nodes when the spot interruption termination notice is received | `true`
+`metadataTries` | The number of times to try requesting metadata. If you would like 2 retries, set metadata-tries to 3. | `3`
+`cordonOnly` | If true, nodes will be cordoned but not drained when an interruption event occurs. | `false`
 `affinity` | node/pod affinities | None
 `podAnnotations` | annotations to add to each pod | `{}`
 `priorityClassName` | Name of the priorityClass | `system-node-critical`
@@ -105,6 +105,10 @@ spec:
             value: {{ .Values.enableSpotInterruptionDraining | quote }}
           - name: ENABLE_SCHEDULED_EVENT_DRAINING
             value: {{ .Values.enableScheduledEventDraining | quote }}
+          - name: METADATA_TRIES
+            value: {{ .Values.metadataTries | quote }}
+          - name: CORDON_ONLY
+            value: {{ .Values.cordonOnly | quote }}
           resources:
             {{- toYaml .Values.resources | nindent 12 }}
           {{- with .Values.nodeSelector }}
9 changes: 7 additions & 2 deletions pkg/config/config.go
@@ -48,6 +48,7 @@ const (
     enableSpotInterruptionDrainingDefault = true
     metadataTriesConfigKey = "METADATA_TRIES"
     metadataTriesDefault = 3
+    cordonOnly = "CORDON_ONLY"
 )

 //Config arguments set via CLI, environment variables, or defaults
@@ -67,6 +68,7 @@ type Config struct {
     EnableScheduledEventDraining bool
     EnableSpotInterruptionDraining bool
     MetadataTries int
+    CordonOnly bool
 }

 //ParseCliArgs parses cli arguments and uses environment variables as fallback values
@@ -94,8 +96,9 @@ func ParseCliArgs() (config Config, err error) {
     flag.StringVar(&config.WebhookHeaders, "webhook-headers", getEnv(webhookHeadersConfigKey, webhookHeadersDefault), "If specified, replaces the default webhook headers.")
     flag.StringVar(&config.WebhookTemplate, "webhook-template", getEnv(webhookTemplateConfigKey, webhookTemplateDefault), "If specified, replaces the default webhook message template.")
     flag.BoolVar(&config.EnableScheduledEventDraining, "enable-scheduled-event-draining", getBoolEnv(enableScheduledEventDrainingConfigKey, enableScheduledEventDrainingDefault), "[EXPERIMENTAL] If true, drain nodes before the maintenance window starts for an EC2 instance scheduled event")
-    flag.BoolVar(&config.EnableSpotInterruptionDraining, "enable-spot-interruption-draining", getBoolEnv(enableSpotInterruptionDrainingConfigKey, enableSpotInterruptionDrainingDefault), "If true, drain nodes when the spot interruption termination notice is receieved")
+    flag.BoolVar(&config.EnableSpotInterruptionDraining, "enable-spot-interruption-draining", getBoolEnv(enableSpotInterruptionDrainingConfigKey, enableSpotInterruptionDrainingDefault), "If true, drain nodes when the spot interruption termination notice is received")
     flag.IntVar(&config.MetadataTries, "metadata-tries", getIntEnv(metadataTriesConfigKey, metadataTriesDefault), "The number of times to try requesting metadata. If you would like 2 retries, set metadata-tries to 3.")
+    flag.BoolVar(&config.CordonOnly, "cordon-only", getBoolEnv(cordonOnly, false), "If true, nodes will be cordoned but not drained when an interruption event occurs.")

     flag.Parse()

@@ -128,7 +131,8 @@ func ParseCliArgs() (config Config, err error) {
         "\tnode-termination-grace-period: %d,\n"+
         "\tenable-scheduled-event-draining: %t,\n"+
         "\tenable-spot-interruption-draining: %t,\n"+
-        "\tmetadata-tries: %d,\n",
+        "\tmetadata-tries: %d,\n"+
+        "\tcordon-only: %t,\n",

         config.DryRun,
         config.NodeName,
@@ -142,6 +146,7 @@ func ParseCliArgs() (config Config, err error) {
         config.EnableScheduledEventDraining,
         config.EnableSpotInterruptionDraining,
         config.MetadataTries,
+        config.CordonOnly,
     )

     return config, err
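Note: the new --cordon-only flag reuses the same getBoolEnv(key, fallback) helper the existing boolean flags use. Its implementation is not part of this diff; the sketch below shows the env-fallback pattern it implies, under the assumption of strconv.ParseBool-style parsing, so the real unexported helper in pkg/config may differ in detail.

// Sketch of the env-fallback helper implied by getEnv/getBoolEnv/getIntEnv.
package main

import (
    "fmt"
    "os"
    "strconv"
)

// getBoolEnv returns the parsed value of the environment variable named key,
// or fallback when the variable is unset or unparsable. Because the result
// only seeds the flag's default, an explicit CLI flag still overrides it.
func getBoolEnv(key string, fallback bool) bool {
    v, ok := os.LookupEnv(key)
    if !ok {
        return fallback
    }
    b, err := strconv.ParseBool(v)
    if err != nil {
        return fallback
    }
    return b
}

func main() {
    os.Setenv("CORDON_ONLY", "true")
    fmt.Println(getBoolEnv("CORDON_ONLY", false)) // true
    fmt.Println(getBoolEnv("UNSET_KEY", false))   // false (fallback)
}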
7 changes: 7 additions & 0 deletions pkg/config/config_test.go
@@ -45,6 +45,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) {
     os.Setenv("WEBHOOK_HEADERS", "WEBHOOK_HEADERS")
     os.Setenv("WEBHOOK_TEMPLATE", "WEBHOOK_TEMPLATE")
     os.Setenv("METADATA_TRIES", "100")
+    os.Setenv("CORDON_ONLY", "false")
     nthConfig, err := config.ParseCliArgs()
     h.Ok(t, err)

@@ -64,6 +65,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) {
     h.Equals(t, "WEBHOOK_HEADERS", nthConfig.WebhookHeaders)
     h.Equals(t, "WEBHOOK_TEMPLATE", nthConfig.WebhookTemplate)
     h.Equals(t, 100, nthConfig.MetadataTries)
+    h.Equals(t, false, nthConfig.CordonOnly)

     // Check that env vars were set
     value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
@@ -94,6 +96,7 @@ func TestParseCliArgsSuccess(t *testing.T) {
         "--webhook-headers=WEBHOOK_HEADERS",
         "--webhook-template=WEBHOOK_TEMPLATE",
         "--metadata-tries=100",
+        "--cordon-only=false",
     }
     nthConfig, err := config.ParseCliArgs()
     h.Ok(t, err)
@@ -114,6 +117,7 @@ func TestParseCliArgsSuccess(t *testing.T) {
     h.Equals(t, "WEBHOOK_HEADERS", nthConfig.WebhookHeaders)
     h.Equals(t, "WEBHOOK_TEMPLATE", nthConfig.WebhookTemplate)
     h.Equals(t, 100, nthConfig.MetadataTries)
+    h.Equals(t, false, nthConfig.CordonOnly)

     // Check that env vars were set
     value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
@@ -139,6 +143,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
     os.Setenv("WEBHOOK_HEADERS", "no")
     os.Setenv("WEBHOOK_TEMPLATE", "no")
     os.Setenv("METADATA_TRIES", "100")
+    os.Setenv("CORDON_ONLY", "true")
     os.Args = []string{
         "cmd",
         "--delete-local-data=false",
@@ -156,6 +161,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
         "--webhook-headers=WEBHOOK_HEADERS",
         "--webhook-template=WEBHOOK_TEMPLATE",
         "--metadata-tries=101",
+        "--cordon-only=false",
     }
     nthConfig, err := config.ParseCliArgs()
     h.Ok(t, err)
@@ -176,6 +182,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
     h.Equals(t, "WEBHOOK_HEADERS", nthConfig.WebhookHeaders)
     h.Equals(t, "WEBHOOK_TEMPLATE", nthConfig.WebhookTemplate)
     h.Equals(t, 101, nthConfig.MetadataTries)
+    h.Equals(t, false, nthConfig.CordonOnly)

     // Check that env vars were set
     value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
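Note: TestParseCliArgsOverrides works because environment variables only seed the flag defaults, so an explicit CLI flag always wins (CORDON_ONLY=true is overridden by --cordon-only=false above). A self-contained sketch of that precedence; the flag name mirrors the real one, while the flag set and the getBoolEnv stand-in are illustrative assumptions:

// Sketch of the env-default / CLI-override precedence the tests exercise.
package main

import (
    "flag"
    "fmt"
    "os"
    "strconv"
)

// getBoolEnv is a local stand-in for the unexported helper in pkg/config.
func getBoolEnv(key string, fallback bool) bool {
    if v, ok := os.LookupEnv(key); ok {
        if b, err := strconv.ParseBool(v); err == nil {
            return b
        }
    }
    return fallback
}

func main() {
    os.Setenv("CORDON_ONLY", "true") // env var asks for cordon-only

    fs := flag.NewFlagSet("node-termination-handler", flag.ExitOnError)
    cordonOnly := fs.Bool("cordon-only", getBoolEnv("CORDON_ONLY", false),
        "If true, nodes will be cordoned but not drained when an interruption event occurs.")

    // Simulate passing --cordon-only=false on the command line.
    _ = fs.Parse([]string{"--cordon-only=false"})

    fmt.Println(*cordonOnly) // false: the CLI flag wins over CORDON_ONLY=true
}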