Skip to content

Commit 75c7b35

Browse files
authored
Prometheus add nodes gauge for SQS mode (#1083)
* Prometheus add nodes gauge for SQS mode
1 parent e191577 commit 75c7b35

File tree

8 files changed

+818
-15
lines changed

8 files changed

+818
-15
lines changed

cmd/node-termination-handler.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,10 @@ func main() {
120120
log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,")
121121
}
122122

123-
metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort)
124-
if err != nil {
123+
metrics, initMetricsErr := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort)
124+
if initMetricsErr != nil {
125125
nthConfig.Print()
126-
log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,")
126+
log.Fatal().Err(initMetricsErr).Msg("Unable to instantiate observability metrics,")
127127
}
128128

129129
err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint)
@@ -215,6 +215,12 @@ func main() {
215215
}
216216
log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName)
217217

218+
ec2Client := ec2.New(sess)
219+
220+
if initMetricsErr == nil && nthConfig.EnablePrometheus {
221+
go metrics.InitNodeMetrics(nthConfig, node, ec2Client)
222+
}
223+
218224
completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second
219225
sqsMonitor := sqsevent.SQSMonitor{
220226
CheckIfManaged: nthConfig.CheckTagBeforeDraining,
@@ -224,7 +230,7 @@ func main() {
224230
CancelChan: cancelChan,
225231
SQS: sqsevent.GetSqsClient(sess),
226232
ASG: autoscaling.New(sess),
227-
EC2: ec2.New(sess),
233+
EC2: ec2Client,
228234
BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) },
229235
}
230236
monitoringFns[sqsEvents] = sqsMonitor

pkg/ec2helper/ec2helper.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package ec2helper
15+
16+
import (
17+
"fmt"
18+
19+
"github.com/aws/aws-sdk-go/aws"
20+
"github.com/aws/aws-sdk-go/service/ec2"
21+
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
22+
)
23+
24+
// IEC2Helper is the lookup contract of this package: resolving EC2 instance
// IDs from a tag key. EC2Helper in this file provides a matching method.
type IEC2Helper interface {
25+
// GetInstanceIdsMapByTagKey returns a map keyed by instance ID for the
// given tag key, or an error if the lookup fails.
GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error)
26+
}
27+
28+
// EC2Helper wraps an EC2 API client and offers tag-based instance lookups.
// Construct it with New; the client field is unexported.
type EC2Helper struct {
29+
// ec2ServiceClient is the EC2 API implementation every lookup is sent through.
ec2ServiceClient ec2iface.EC2API
30+
}
31+
32+
func New(ec2 ec2iface.EC2API) EC2Helper {
33+
return EC2Helper{
34+
ec2ServiceClient: ec2,
35+
}
36+
}
37+
38+
func (h EC2Helper) GetInstanceIdsByTagKey(tag string) ([]string, error) {
39+
ids := []string{}
40+
var nextToken string
41+
42+
for {
43+
result, err := h.ec2ServiceClient.DescribeInstances(&ec2.DescribeInstancesInput{
44+
Filters: []*ec2.Filter{
45+
{
46+
Name: aws.String("tag-key"),
47+
Values: []*string{aws.String(tag)},
48+
},
49+
},
50+
NextToken: &nextToken,
51+
})
52+
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
if result == nil || result.Reservations == nil {
58+
return nil, fmt.Errorf("describe instances success but return empty response for tag key: %s", tag)
59+
}
60+
61+
for _, reservation := range result.Reservations {
62+
if reservation.Instances == nil {
63+
continue
64+
}
65+
for _, instance := range reservation.Instances {
66+
if instance == nil || instance.InstanceId == nil {
67+
continue
68+
}
69+
ids = append(ids, *instance.InstanceId)
70+
}
71+
}
72+
73+
if result.NextToken == nil {
74+
break
75+
}
76+
nextToken = *result.NextToken
77+
}
78+
79+
return ids, nil
80+
}
81+
82+
func (h EC2Helper) GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error) {
83+
idMap := map[string]bool{}
84+
ids, err := h.GetInstanceIdsByTagKey(tag)
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
if ids == nil {
90+
return nil, fmt.Errorf("get instance ids success but return empty response for tag key: %s", tag)
91+
}
92+
93+
for _, id := range ids {
94+
idMap[id] = true
95+
}
96+
97+
return idMap, nil
98+
}

pkg/ec2helper/ec2helper_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package ec2helper_test
15+
16+
import (
17+
"testing"
18+
19+
"github.com/aws/aws-node-termination-handler/pkg/ec2helper"
20+
h "github.com/aws/aws-node-termination-handler/pkg/test"
21+
"github.com/aws/aws-sdk-go/aws"
22+
"github.com/aws/aws-sdk-go/aws/awserr"
23+
"github.com/aws/aws-sdk-go/service/ec2"
24+
)
25+
26+
// Instance IDs used as fixtures by the tests in this file.
const (
27+
// instanceId1 is the first instance returned by getDescribeInstancesResp.
instanceId1 = "i-1"
28+
// instanceId2 is the second instance returned by getDescribeInstancesResp.
instanceId2 = "i-2"
29+
)
30+
31+
func TestGetInstanceIdsByTagKey(t *testing.T) {
32+
ec2Mock := h.MockedEC2{
33+
DescribeInstancesResp: getDescribeInstancesResp(),
34+
}
35+
ec2Helper := ec2helper.New(ec2Mock)
36+
instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
37+
h.Ok(t, err)
38+
39+
h.Equals(t, 2, len(instanceIds))
40+
h.Equals(t, instanceId1, instanceIds[0])
41+
h.Equals(t, instanceId2, instanceIds[1])
42+
}
43+
44+
func TestGetInstanceIdsByTagKeyAPIError(t *testing.T) {
45+
ec2Mock := h.MockedEC2{
46+
DescribeInstancesResp: getDescribeInstancesResp(),
47+
DescribeInstancesErr: awserr.New("ThrottlingException", "Rate exceeded", nil),
48+
}
49+
ec2Helper := ec2helper.New(ec2Mock)
50+
_, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
51+
h.Nok(t, err)
52+
}
53+
54+
func TestGetInstanceIdsByTagKeyNilResponse(t *testing.T) {
55+
ec2Mock := h.MockedEC2{}
56+
ec2Helper := ec2helper.New(ec2Mock)
57+
_, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
58+
h.Nok(t, err)
59+
}
60+
61+
func TestGetInstanceIdsByTagKeyNilReservations(t *testing.T) {
62+
ec2Mock := h.MockedEC2{
63+
DescribeInstancesResp: ec2.DescribeInstancesOutput{
64+
Reservations: nil,
65+
},
66+
}
67+
ec2Helper := ec2helper.New(ec2Mock)
68+
_, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
69+
h.Nok(t, err)
70+
}
71+
72+
func TestGetInstanceIdsByTagKeyEmptyReservation(t *testing.T) {
73+
ec2Mock := h.MockedEC2{
74+
DescribeInstancesResp: ec2.DescribeInstancesOutput{
75+
Reservations: []*ec2.Reservation{},
76+
},
77+
}
78+
ec2Helper := ec2helper.New(ec2Mock)
79+
instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
80+
h.Ok(t, err)
81+
h.Equals(t, 0, len(instanceIds))
82+
}
83+
84+
func TestGetInstanceIdsByTagKeyEmptyInstances(t *testing.T) {
85+
ec2Mock := h.MockedEC2{
86+
DescribeInstancesResp: ec2.DescribeInstancesOutput{
87+
Reservations: []*ec2.Reservation{
88+
{
89+
Instances: []*ec2.Instance{},
90+
},
91+
},
92+
},
93+
}
94+
ec2Helper := ec2helper.New(ec2Mock)
95+
instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
96+
h.Ok(t, err)
97+
h.Equals(t, 0, len(instanceIds))
98+
}
99+
100+
func TestGetInstanceIdsByTagKeyNilInstancesId(t *testing.T) {
101+
ec2Mock := h.MockedEC2{
102+
DescribeInstancesResp: ec2.DescribeInstancesOutput{
103+
Reservations: []*ec2.Reservation{
104+
{
105+
Instances: []*ec2.Instance{
106+
{
107+
InstanceId: nil,
108+
},
109+
{
110+
InstanceId: aws.String(instanceId1),
111+
},
112+
},
113+
},
114+
},
115+
},
116+
}
117+
ec2Helper := ec2helper.New(ec2Mock)
118+
instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
119+
h.Ok(t, err)
120+
h.Equals(t, 1, len(instanceIds))
121+
}
122+
123+
func TestGetInstanceIdsMapByTagKey(t *testing.T) {
124+
ec2Mock := h.MockedEC2{
125+
DescribeInstancesResp: getDescribeInstancesResp(),
126+
}
127+
ec2Helper := ec2helper.New(ec2Mock)
128+
instanceIdsMap, err := ec2Helper.GetInstanceIdsMapByTagKey("myNTHManagedTag")
129+
h.Ok(t, err)
130+
131+
_, exist := instanceIdsMap[instanceId1]
132+
h.Equals(t, true, exist)
133+
_, exist = instanceIdsMap[instanceId2]
134+
h.Equals(t, true, exist)
135+
_, exist = instanceIdsMap["non-existent instance id"]
136+
h.Equals(t, false, exist)
137+
}
138+
139+
func getDescribeInstancesResp() ec2.DescribeInstancesOutput {
140+
return ec2.DescribeInstancesOutput{
141+
Reservations: []*ec2.Reservation{
142+
{
143+
Instances: []*ec2.Instance{
144+
{
145+
InstanceId: aws.String(instanceId1),
146+
},
147+
{
148+
InstanceId: aws.String(instanceId2),
149+
},
150+
},
151+
},
152+
},
153+
}
154+
}

pkg/node/node.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"encoding/json"
1919
"fmt"
20+
"regexp"
2021
"strconv"
2122
"strings"
2223
"time"
@@ -74,6 +75,7 @@ const (
7475
var (
7576
// maxRetryDeadline is 5 seconds. NOTE(review): presumably the overall time
// budget for a retried operation; its use is outside this view, so confirm.
maxRetryDeadline time.Duration = 5 * time.Second
7677
// conflictRetryInterval is 750 milliseconds. NOTE(review): presumably the
// pause between retries after a conflict; its use is outside this view, so confirm.
conflictRetryInterval time.Duration = 750 * time.Millisecond
78+
// instanceIDRegex matches any string beginning with "i-".
// FetchKubernetesNodeInstanceIds uses it to validate the last segment of a
// node's providerID.
instanceIDRegex = regexp.MustCompile(`^i-.*`)
7779
)
7880

7981
// Node represents a kubernetes node with functions to manipulate its state via the kubernetes api server
@@ -639,6 +641,43 @@ func (n Node) fetchKubernetesNode(nodeName string) (*corev1.Node, error) {
639641
return &matchingNodes.Items[0], nil
640642
}
641643

644+
// fetchKubernetesNode will send an http request to the k8s api server and return list of AWS EC2 instance id
645+
func (n Node) FetchKubernetesNodeInstanceIds() ([]string, error) {
646+
ids := []string{}
647+
648+
if n.nthConfig.DryRun {
649+
log.Info().Msgf("Would have retrieved nodes, but dry-run flag was set")
650+
return ids, nil
651+
}
652+
matchingNodes, err := n.drainHelper.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
653+
if err != nil {
654+
log.Warn().Msgf("Unable to list Nodes")
655+
return nil, err
656+
}
657+
658+
if matchingNodes == nil || matchingNodes.Items == nil {
659+
return nil, fmt.Errorf("list nodes success but return empty response")
660+
}
661+
662+
for _, node := range matchingNodes.Items {
663+
// sample providerID: aws:///us-west-2a/i-0abcd1234efgh5678
664+
parts := strings.Split(node.Spec.ProviderID, "/")
665+
if len(parts) != 5 {
666+
log.Warn().Msgf("Invalid providerID format found for node %s: %s (expected format: aws:///region/instance-id)", node.Name, node.Spec.ProviderID)
667+
continue
668+
}
669+
670+
instanceId := parts[len(parts)-1]
671+
if instanceIDRegex.MatchString(instanceId) {
672+
ids = append(ids, parts[len(parts)-1])
673+
} else {
674+
log.Warn().Msgf("Invalid instance id format found for node %s: %s (expected format: ^i-.*)", node.Name, instanceId)
675+
}
676+
}
677+
678+
return ids, nil
679+
}
680+
642681
func (n Node) fetchAllPods(nodeName string) (*corev1.PodList, error) {
643682
if n.nthConfig.DryRun {
644683
log.Info().Msgf("Would have retrieved running pod list on node %s, but dry-run flag was set", nodeName)

0 commit comments

Comments
 (0)