Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
)

Expand Down Expand Up @@ -267,23 +268,33 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
if nthConfig.CordonOnly || drainEvent.IsRebalanceRecommendation() {
err := node.Cordon(nodeName)
if err != nil {
log.Log().Err(err).Msg("There was a problem while trying to cordon the node")
os.Exit(1)
}
log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned")
err = node.LogPods(nodeName)
if err != nil {
log.Log().Err(err).Msg("There was a problem while trying to log all pod names on the node")
if errors.IsNotFound(err) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@haugenj found another situation where events for other nodes that do not belong to this cluster could bubble up an error - I think for this case we can only do best effort to ignore such cases without removing all the safety guards

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the right approach. Originally I was thinking we would totally remove the os.Exit(1) here, so your solution feels like a better middle-ground

log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Log().Err(err).Msg("There was a problem while trying to cordon the node")
os.Exit(1)
}
} else {
log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned")
err = node.LogPods(nodeName)
if err != nil {
log.Log().Err(err).Msg("There was a problem while trying to log all pod names on the node")
}
metrics.NodeActionsInc("cordon", nodeName, err)
}
metrics.NodeActionsInc("cordon", nodeName, err)
} else {
err := node.CordonAndDrain(nodeName)
if err != nil {
log.Log().Err(err).Msg("There was a problem while trying to cordon and drain the node")
os.Exit(1)
if errors.IsNotFound(err) {
log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Log().Err(err).Msg("There was a problem while trying to cordon and drain the node")
os.Exit(1)
}
} else {
log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned and drained")
metrics.NodeActionsInc("cordon-and-drain", nodeName, err)
}
log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned and drained")
metrics.NodeActionsInc("cordon-and-drain", nodeName, err)
}

interruptionEventStore.MarkAllAsDrained(nodeName)
Expand Down
26 changes: 24 additions & 2 deletions pkg/monitor/sqsevent/sqs-monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package sqsevent

import (
"encoding/json"
"errors"
"fmt"

"github.com/aws/aws-node-termination-handler/pkg/monitor"
Expand All @@ -33,6 +34,9 @@ const (
SQSTerminateKind = "SQS_TERMINATE"
)

// ErrNodeStateNotRunning forwards condition that the instance is terminated thus metadata missing
var ErrNodeStateNotRunning = errors.New("node metadata unavailable")

// SQSMonitor is a struct definition that knows how to process events from Amazon EventBridge
type SQSMonitor struct {
InterruptionChan chan<- monitor.InterruptionEvent
Expand All @@ -54,6 +58,10 @@ func (m SQSMonitor) Kind() string {
func (m SQSMonitor) Monitor() error {
interruptionEvent, err := m.checkForSQSMessage()
if err != nil {
if errors.Is(err, ErrNodeStateNotRunning) {
log.Warn().Err(err).Msg("dropping event for an already terminated node")
return nil
}
return err
}
if interruptionEvent != nil && interruptionEvent.Kind == SQSTerminateKind {
Expand Down Expand Up @@ -175,10 +183,24 @@ func (m SQSMonitor) retrieveNodeName(instanceID string) (string, error) {
}

instance := result.Reservations[0].Instances[0]
log.Debug().Msgf("Got nodename from private ip %s", *instance.PrivateDnsName)
nodeName := *instance.PrivateDnsName
log.Debug().Msgf("Got nodename from private ip %s", nodeName)
instanceJSON, _ := json.MarshalIndent(*instance, " ", " ")
log.Debug().Msgf("Got nodename from ec2 describe call: %s", instanceJSON)
return *instance.PrivateDnsName, nil

if nodeName == "" {
state := "unknown"
// safe access instance.State potentially being nil
if instance.State != nil {
state = *instance.State.Name
}
// anything except running might not contain PrivateDnsName
if state != ec2.InstanceStateNameRunning {
return "", ErrNodeStateNotRunning
}
return "", fmt.Errorf("unable to retrieve PrivateDnsName name for '%s' in state '%s'", instanceID, state)
}
return nodeName, nil
}

// isInstanceManaged returns whether the instance specified should be managed by node termination handler
Expand Down
36 changes: 36 additions & 0 deletions pkg/monitor/sqsevent/sqs-monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,42 @@ func TestMonitor_EC2NoDNSName(t *testing.T) {
h.Ok(t, err)
}

func TestMonitor_EC2NoDNSNameOnTerminatedInstance(t *testing.T) {
msg, err := getSQSMessageFromEvent(asgLifecycleEvent)
h.Ok(t, err)
messages := []*sqs.Message{
&msg,
}
sqsMock := h.MockedSQS{
ReceiveMessageResp: sqs.ReceiveMessageOutput{Messages: messages},
ReceiveMessageErr: nil,
DeleteMessageResp: sqs.DeleteMessageOutput{},
}
ec2Mock := h.MockedEC2{
DescribeInstancesResp: getDescribeInstancesResp(""),
}
ec2Mock.DescribeInstancesResp.Reservations[0].Instances[0].State = &ec2.InstanceState{
Name: aws.String("running"),
}
drainChan := make(chan monitor.InterruptionEvent)

sqsMonitor := sqsevent.SQSMonitor{
SQS: sqsMock,
EC2: ec2Mock,
ASG: mockIsManagedTrue(nil),
CheckIfManaged: true,
QueueURL: "https://test-queue",
InterruptionChan: drainChan,
}
go func() {
result := <-drainChan
h.Equals(t, result.Kind, sqsevent.SQSTerminateKind)
}()

err = sqsMonitor.Monitor()
h.Nok(t, err)
}

func TestMonitor_SQSDeleteFailure(t *testing.T) {
msg, err := getSQSMessageFromEvent(asgLifecycleEvent)
h.Ok(t, err)
Expand Down