Skip to content

Commit e2fea37

Browse files
authored
Check for ready endpoints (#3983)
* Refactor check for ready endpoints
1 parent f3abd93 commit e2fea37

File tree

2 files changed

+182
-59
lines changed

2 files changed

+182
-59
lines changed

internal/k8s/controller.go

Lines changed: 73 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"github.com/nginxinc/kubernetes-ingress/pkg/apis/dos/v1beta1"
29+
"golang.org/x/exp/maps"
2930

3031
"github.com/nginxinc/kubernetes-ingress/internal/k8s/appprotect"
3132
"github.com/nginxinc/kubernetes-ingress/internal/k8s/appprotectcommon"
@@ -3596,46 +3597,66 @@ func (lbc *LoadBalancerController) getEndpointsForServiceWithSubselector(targetP
35963597
return endps, nil
35973598
}
35983599

3600+
// selectEndpointSlicesForPort returns EndpointSlices that match given targetPort.
3601+
func selectEndpointSlicesForPort(targetPort int32, esx []discovery_v1.EndpointSlice) []discovery_v1.EndpointSlice {
3602+
eps := make([]discovery_v1.EndpointSlice, 0, len(esx))
3603+
for _, es := range esx {
3604+
for _, p := range es.Ports {
3605+
if p.Port == nil {
3606+
continue
3607+
}
3608+
if *p.Port == targetPort {
3609+
eps = append(eps, es)
3610+
}
3611+
}
3612+
}
3613+
return eps
3614+
}
3615+
3616+
// filterReadyEndpoinsFrom returns ready Endpoints from given EndpointSlices.
3617+
func filterReadyEndpointsFrom(esx []discovery_v1.EndpointSlice) []discovery_v1.Endpoint {
3618+
epx := make([]discovery_v1.Endpoint, 0, len(esx))
3619+
for _, es := range esx {
3620+
for _, e := range es.Endpoints {
3621+
if e.Conditions.Ready == nil {
3622+
continue
3623+
}
3624+
if *e.Conditions.Ready {
3625+
epx = append(epx, e)
3626+
}
3627+
}
3628+
}
3629+
return epx
3630+
}
3631+
35993632
func getEndpointsFromEndpointSlicesForSubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEndpointSlices []discovery_v1.EndpointSlice) (podEndpoints []podEndpoint) {
3600-
endpointSet := make(map[podEndpoint]struct{})
3601-
for _, pod := range pods {
3602-
for _, endpointSlice := range svcEndpointSlices {
3603-
for _, port := range endpointSlice.Ports {
3604-
if *port.Port != targetPort {
3605-
continue
3606-
}
3607-
for _, endpoint := range endpointSlice.Endpoints {
3608-
if !*endpoint.Conditions.Ready {
3609-
continue
3610-
}
3611-
for _, address := range endpoint.Addresses {
3612-
if pod.Status.PodIP == address {
3613-
addr := ipv6SafeAddrPort(pod.Status.PodIP, targetPort)
3614-
ownerType, ownerName := getPodOwnerTypeAndName(pod)
3615-
podEndpoint := podEndpoint{
3616-
Address: addr,
3617-
PodName: getPodName(endpoint.TargetRef),
3618-
MeshPodOwner: configs.MeshPodOwner{
3619-
OwnerType: ownerType,
3620-
OwnerName: ownerName,
3621-
},
3622-
}
3623-
endpointSet[podEndpoint] = struct{}{}
3624-
podEndpoints = append(podEndpoints, podEndpoint)
3633+
// Match ready endpoints IP ddresses with Pod's IP. If they match create a new podEnpoint.
3634+
makePodEndpoints := func(pods []*api_v1.Pod, endpoints []discovery_v1.Endpoint) []podEndpoint {
3635+
endpointSet := make(map[podEndpoint]struct{})
3636+
3637+
for _, pod := range pods {
3638+
for _, endpoint := range endpoints {
3639+
for _, address := range endpoint.Addresses {
3640+
if pod.Status.PodIP == address {
3641+
addr := ipv6SafeAddrPort(pod.Status.PodIP, targetPort)
3642+
ownerType, ownerName := getPodOwnerTypeAndName(pod)
3643+
podEndpoint := podEndpoint{
3644+
Address: addr,
3645+
PodName: getPodName(endpoint.TargetRef),
3646+
MeshPodOwner: configs.MeshPodOwner{
3647+
OwnerType: ownerType,
3648+
OwnerName: ownerName,
3649+
},
36253650
}
3651+
endpointSet[podEndpoint] = struct{}{}
36263652
}
36273653
}
36283654
}
36293655
}
3656+
return maps.Keys(endpointSet)
36303657
}
3631-
if len(endpointSet) == 0 {
3632-
return nil
3633-
}
3634-
endpoints := make([]podEndpoint, 0, len(endpointSet))
3635-
for ep := range endpointSet {
3636-
endpoints = append(endpoints, ep)
3637-
}
3638-
return endpoints
3658+
3659+
return makePodEndpoints(pods, filterReadyEndpointsFrom(selectEndpointSlicesForPort(targetPort, svcEndpointSlices)))
36393660
}
36403661

36413662
func ipv6SafeAddrPort(addr string, port int32) string {
@@ -3754,38 +3775,31 @@ func (lbc *LoadBalancerController) getEndpointsForPortFromEndpointSlices(endpoin
37543775
return nil, fmt.Errorf("no port %v in service %s", backendPort, svc.Name)
37553776
}
37563777

3757-
endpointSet := make(map[podEndpoint]struct{})
3758-
for _, endpointSlice := range endpointSlices {
3759-
for _, endpointSlicePort := range endpointSlice.Ports {
3760-
if *endpointSlicePort.Port == targetPort {
3761-
for _, endpoint := range endpointSlice.Endpoints {
3762-
if !*endpoint.Conditions.Ready {
3763-
continue
3764-
}
3765-
for _, endpointAddress := range endpoint.Addresses {
3766-
address := ipv6SafeAddrPort(endpointAddress, *endpointSlicePort.Port)
3767-
podEndpoint := podEndpoint{
3768-
Address: address,
3769-
}
3770-
if endpoint.TargetRef != nil {
3771-
parentType, parentName := lbc.getPodOwnerTypeAndNameFromAddress(endpoint.TargetRef.Namespace, endpoint.TargetRef.Name)
3772-
podEndpoint.OwnerType = parentType
3773-
podEndpoint.OwnerName = parentName
3774-
podEndpoint.PodName = endpoint.TargetRef.Name
3775-
}
3776-
endpointSet[podEndpoint] = struct{}{}
3777-
}
3778+
makePodEndpoints := func(port int32, epx []discovery_v1.Endpoint) []podEndpoint {
3779+
endpointSet := make(map[podEndpoint]struct{})
3780+
3781+
for _, ep := range epx {
3782+
for _, addr := range ep.Addresses {
3783+
address := ipv6SafeAddrPort(addr, port)
3784+
podEndpoint := podEndpoint{
3785+
Address: address,
3786+
}
3787+
if ep.TargetRef != nil {
3788+
parentType, parentName := lbc.getPodOwnerTypeAndNameFromAddress(ep.TargetRef.Namespace, ep.TargetRef.Name)
3789+
podEndpoint.OwnerType = parentType
3790+
podEndpoint.OwnerName = parentName
3791+
podEndpoint.PodName = ep.TargetRef.Name
37783792
}
3793+
endpointSet[podEndpoint] = struct{}{}
37793794
}
37803795
}
3796+
return maps.Keys(endpointSet)
37813797
}
3782-
if len(endpointSet) == 0 {
3798+
3799+
endpoints := makePodEndpoints(targetPort, filterReadyEndpointsFrom(selectEndpointSlicesForPort(targetPort, endpointSlices)))
3800+
if len(endpoints) == 0 {
37833801
return nil, fmt.Errorf("no endpointslices for target port %v in service %s", targetPort, svc.Name)
37843802
}
3785-
endpoints := make([]podEndpoint, 0, len(endpointSet))
3786-
for ep := range endpointSet {
3787-
endpoints = append(endpoints, ep)
3788-
}
37893803
return endpoints, nil
37903804
}
37913805

internal/k8s/controller_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,6 +1161,115 @@ func TestGetEndpointSlicesBySubselectedPods_FindOnePodInOneEndpointSlice(t *test
11611161
}
11621162
}
11631163

1164+
func TestGetEndpointSlicesBySubselectedPods_GetsEndpointsOnNilValues(t *testing.T) {
1165+
t.Parallel()
1166+
endpointPort := int32(8080)
1167+
endpointReady := true
1168+
boolPointer := func(b bool) *bool { return &b }
1169+
1170+
tests := []struct {
1171+
desc string
1172+
targetPort int32
1173+
svcEndpointSlices []discovery_v1.EndpointSlice
1174+
pods []*api_v1.Pod
1175+
want []podEndpoint
1176+
}{
1177+
{
1178+
desc: "no endpoints selected on nil endpoint port",
1179+
targetPort: 8080,
1180+
want: []podEndpoint{},
1181+
pods: []*api_v1.Pod{
1182+
{
1183+
ObjectMeta: meta_v1.ObjectMeta{
1184+
OwnerReferences: []meta_v1.OwnerReference{
1185+
{
1186+
Kind: "Deployment",
1187+
Name: "deploy-1",
1188+
Controller: boolPointer(true),
1189+
},
1190+
},
1191+
},
1192+
Status: api_v1.PodStatus{
1193+
PodIP: "1.2.3.4",
1194+
},
1195+
},
1196+
},
1197+
svcEndpointSlices: []discovery_v1.EndpointSlice{
1198+
{
1199+
Ports: []discovery_v1.EndpointPort{
1200+
{
1201+
Port: nil,
1202+
},
1203+
},
1204+
Endpoints: []discovery_v1.Endpoint{
1205+
{
1206+
Addresses: []string{
1207+
"1.2.3.4",
1208+
},
1209+
Conditions: discovery_v1.EndpointConditions{
1210+
Ready: &endpointReady,
1211+
},
1212+
},
1213+
},
1214+
},
1215+
},
1216+
},
1217+
{
1218+
desc: "no endpoints selected on nil endpoint condition",
1219+
targetPort: 8080,
1220+
want: []podEndpoint{},
1221+
pods: []*api_v1.Pod{
1222+
{
1223+
ObjectMeta: meta_v1.ObjectMeta{
1224+
OwnerReferences: []meta_v1.OwnerReference{
1225+
{
1226+
Kind: "Deployment",
1227+
Name: "deploy-1",
1228+
Controller: boolPointer(true),
1229+
},
1230+
},
1231+
},
1232+
Status: api_v1.PodStatus{
1233+
PodIP: "1.2.3.4",
1234+
},
1235+
},
1236+
},
1237+
svcEndpointSlices: []discovery_v1.EndpointSlice{
1238+
{
1239+
Ports: []discovery_v1.EndpointPort{
1240+
{
1241+
Port: &endpointPort,
1242+
},
1243+
},
1244+
Endpoints: []discovery_v1.Endpoint{
1245+
{
1246+
Addresses: []string{
1247+
"1.2.3.4",
1248+
},
1249+
Conditions: discovery_v1.EndpointConditions{
1250+
Ready: nil,
1251+
},
1252+
},
1253+
},
1254+
},
1255+
},
1256+
},
1257+
}
1258+
1259+
for _, test := range tests {
1260+
t.Run(test.desc, func(t *testing.T) {
1261+
got := getEndpointsFromEndpointSlicesForSubselectedPods(test.targetPort, test.pods, test.svcEndpointSlices)
1262+
if !cmp.Equal(got, test.want) {
1263+
t.Error(cmp.Diff(got, test.want))
1264+
}
1265+
})
1266+
}
1267+
}
1268+
1269+
func TestFilterReadyEndpoints(t *testing.T) {
1270+
t.Parallel()
1271+
}
1272+
11641273
func TestGetEndpointSlicesBySubselectedPods_FindOnePodInTwoEndpointSlicesWithDuplicateEndpoints(t *testing.T) {
11651274
t.Parallel()
11661275
endpointPort := int32(8080)

0 commit comments

Comments
 (0)