Skip to content

Commit cfec2d8

Browse files
committed
more changes
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 5363d20 commit cfec2d8

File tree

10 files changed

+84
-40
lines changed

10 files changed

+84
-40
lines changed

pkg/epp/backend/metrics/fake_metrics_scraper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (s *FakeMetricsScraper) Scrape(ctx context.Context, pod *backend.Pod, port
6060
return nil, fmt.Errorf("no pod found: %v", pod.NamespacedName)
6161
}
6262
log.FromContext(ctx).V(logutil.VERBOSE).Info("Fetching metrics for pod", "existing", s.existingMetrics, "new", res)
63-
return podinfo.Clone(res), nil
63+
return podinfo.CloneScrapedData(res), nil
6464
}
6565

6666
func (s *FakeMetricsScraper) ProcessResult(ctx context.Context, podinfo podinfo.ScrapedData) {} // noop

pkg/epp/backend/metrics/logger.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const (
3939
type Datastore interface {
4040
PoolGet() (*v1alpha2.InferencePool, error)
4141
// Pod operations
42-
// PodGetAll returns all pods and metrics, including fresh and stale.
42+
// PodGetAll returns all pod info objects.
4343
PodGetAll() []podinfo.PodInfo
4444
PodList(func(podinfo.PodInfo) bool) []podinfo.PodInfo
4545
}
@@ -124,7 +124,7 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) {
124124
}
125125

126126
func getMetricsFromPodInfo(podInfo podinfo.PodInfo) *Metrics {
127-
metrics, ok := podInfo.GetData()[MetricsDataKey]
127+
metrics, ok := podInfo.GetData(MetricsDataKey)
128128
if !ok {
129129
return nil // no entry in the map with metrics key
130130
}

pkg/epp/backend/metrics/metrics_scraper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (s *MetricsScraper) ProcessResult(ctx context.Context, data podinfo.Scraped
105105
// promToPodMetrics updates internal pod metrics with scraped Prometheus metrics.
106106
func (s *MetricsScraper) promToPodMetrics(metricFamilies map[string]*dto.MetricFamily) (podinfo.ScrapedData, error) {
107107
var errs error
108-
updated := podinfo.Clone(s.existingMetrics)
108+
updated := podinfo.CloneScrapedData(s.existingMetrics)
109109

110110
if s.MetricMapping.TotalQueuedRequests != nil {
111111
queued, err := s.getMetric(metricFamilies, *s.MetricMapping.TotalQueuedRequests)

pkg/epp/backend/pod-info/fake_pod_info.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package podinfo
1818

1919
import (
2020
"fmt"
21+
"sync"
2122

2223
corev1 "k8s.io/api/core/v1"
2324
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
@@ -29,19 +30,35 @@ var _ PodInfo = &FakePodInfo{}
2930
type FakePodInfo struct {
3031
Pod *backend.Pod
3132
Data map[string]ScrapedData
33+
Lock sync.RWMutex // Lock is used to synchronize RW access to the Data map.
3234
}
3335

3436
func (fpi *FakePodInfo) GetPod() *backend.Pod {
3537
return fpi.Pod
3638
}
37-
func (fpi *FakePodInfo) GetData() map[string]ScrapedData {
38-
return fpi.Data
39+
func (fpi *FakePodInfo) GetData(key string) (ScrapedData, bool) {
40+
fpi.Lock.RLock()
41+
defer fpi.Lock.RUnlock()
42+
data, ok := fpi.Data[key]
43+
return data, ok
3944
}
45+
46+
func (fpi *FakePodInfo) GetDataKeys() []string {
47+
fpi.Lock.RLock()
48+
defer fpi.Lock.RUnlock()
49+
result := []string{}
50+
for key := range fpi.Data {
51+
result = append(result, key)
52+
}
53+
return result
54+
}
55+
4056
func (fpi *FakePodInfo) UpdatePod(pod *corev1.Pod) {
4157
fpi.Pod = toInternalPod(pod)
4258
}
59+
4360
func (fpi *FakePodInfo) Stop() {} // noop
4461

4562
func (fpi *FakePodInfo) String() string {
46-
return fmt.Sprintf("Pod: %v; Data: %v", fpi.GetPod(), fpi.GetData())
63+
return fmt.Sprintf("Pod: %v; Data: %v", fpi.GetPod(), fpi.Data)
4764
}

pkg/epp/backend/pod-info/pod_info.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,16 @@ var _ PodInfo = &podInfo{}
3535

3636
type PodInfo interface {
3737
GetPod() *backend.Pod
38-
GetData() map[string]ScrapedData
38+
GetData(key string) (ScrapedData, bool)
39+
GetDataKeys() []string
3940
UpdatePod(*corev1.Pod)
4041
Stop()
4142
String() string
4243
}
4344

4445
// data encapsulates all data that is scraped from a pod by the different scrapers.
45-
// Each scraper will have an entry in the map, where the key is the scraper.Name() and the value is the data that was scraped
46-
// by that scraper.
46+
// Each scraper will have an entry in the map, where the key is the scraper.Name()
47+
// and the value is the latest data that was scraped by that scraper.
4748
type data struct {
4849
data map[string]ScrapedData
4950
}
@@ -52,6 +53,7 @@ func newPodInfo(pod *corev1.Pod, scrapers map[Scraper]*ScraperConfig, ds Datasto
5253
info := &podInfo{
5354
scrapers: scrapers,
5455
ds: ds,
56+
dataLock: sync.RWMutex{},
5557
once: sync.Once{},
5658
}
5759
// initialize scrapers data, each scraper has an entry mapped from its name to its data
@@ -72,7 +74,8 @@ type podInfo struct {
7274
scrapers map[Scraper]*ScraperConfig
7375
ds Datastore
7476

75-
once sync.Once // ensure the start function is called only once.
77+
dataLock sync.RWMutex // dataLock is used to synchronize RW access to data map.
78+
once sync.Once // ensure the start function is called only once.
7679
stopFunc context.CancelFunc
7780
logger logr.Logger
7881
}
@@ -81,20 +84,39 @@ func (pi *podInfo) GetPod() *backend.Pod {
8184
return pi.pod.Load()
8285
}
8386

84-
func (pi *podInfo) GetData() map[string]ScrapedData {
85-
if data := pi.data.Load(); data != nil {
86-
return data.data
87+
func (pi *podInfo) GetData(key string) (ScrapedData, bool) {
88+
if data := pi.data.Load(); data != nil && data.data != nil {
89+
pi.dataLock.RLock()
90+
defer pi.dataLock.RUnlock()
91+
value, ok := data.data[key]
92+
return value, ok
8793
}
88-
// if nothing is stored in data field, return empty map
89-
return map[string]ScrapedData{}
94+
// if nothing is stored in data field, return nil
95+
return nil, false
96+
}
97+
98+
func (pi *podInfo) GetDataKeys() []string {
99+
if data := pi.data.Load(); data != nil && data.data != nil {
100+
pi.dataLock.RLock()
101+
defer pi.dataLock.RUnlock()
102+
result := []string{}
103+
for key := range data.data {
104+
result = append(result, key)
105+
}
106+
return result
107+
}
108+
// if nothing is stored in the data field, there are no data keys
109+
return []string{}
90110
}
91111

92112
func (pi *podInfo) UpdatePod(pod *corev1.Pod) {
93113
pi.pod.Store(toInternalPod(pod))
94114
}
95115

96116
func (pi *podInfo) String() string {
97-
return fmt.Sprintf("Pod: %v; Data: %v", pi.GetPod(), pi.GetData())
117+
pi.dataLock.RLock()
118+
defer pi.dataLock.RUnlock()
119+
return fmt.Sprintf("Pod: %v; Data: %v", pi.GetPod(), pi.data.Load().data)
98120
}
99121

100122
func (pi *podInfo) Stop() {
@@ -147,6 +169,8 @@ func (pi *podInfo) startScrapeLoop(ctx context.Context, scraper Scraper, interva
147169

148170
func (pi *podInfo) storeScrapedData(key string, data ScrapedData) {
149171
updated := pi.data.Load()
172+
pi.dataLock.Lock()
173+
defer pi.dataLock.Unlock()
150174
updated.data[key] = data
151175
pi.data.Store(updated)
152176
}

pkg/epp/backend/pod-info/types.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,10 @@ type ScrapedData interface {
6060

6161
type Datastore interface {
6262
PoolGet() (*v1alpha2.InferencePool, error)
63-
// PodMetrics operations
64-
// PodGetAll returns all pods and metrics, including fresh and stale.
65-
// PodGetAll() []PodMetrics
66-
// PodList(func(PodMetrics) bool) []PodMetrics
6763
}
6864

69-
// Clone is a generic function that clones any struct that implements ScrapedData interface.
70-
func Clone[T ScrapedData](base T) T {
65+
// CloneScrapedData is a generic function that clones any struct that implements the ScrapedData interface.
66+
func CloneScrapedData[T ScrapedData](base T) T {
7167
copy := base.Clone()
7268
return copy.(T)
7369
}

pkg/epp/datastore/datastore_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,8 @@ func TestMetrics(t *testing.T) {
353353
got := ds.PodGetAll()
354354
metrics := []*backendmetrics.Metrics{}
355355
for _, one := range got {
356-
if podMetrics, ok := one.GetData()[backendmetrics.MetricsDataKey].(*backendmetrics.Metrics); ok {
357-
metrics = append(metrics, podMetrics)
356+
if podMetrics, ok := one.GetData(backendmetrics.MetricsDataKey); ok {
357+
metrics = append(metrics, podMetrics.(*backendmetrics.Metrics))
358358
}
359359
}
360360
diff := cmp.Diff(test.want, metrics, cmpopts.IgnoreFields(backendmetrics.Metrics{}, "UpdateTime"), cmpopts.SortSlices(func(a, b *backendmetrics.Metrics) bool {

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (c *inferencePoolMetricsCollector) CollectWithStability(ch chan<- metrics.M
8383
}
8484
}
8585
func getMetricsFromPodInfo(pod podinfo.PodInfo) *backendmetrics.Metrics {
86-
podMetrics, ok := pod.GetData()[backendmetrics.MetricsDataKey]
86+
podMetrics, ok := pod.GetData(backendmetrics.MetricsDataKey)
8787
if !ok {
8888
return nil // no entry in the map with metrics key
8989
}

pkg/epp/scheduling/scheduler_test.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package scheduling
1818

1919
import (
2020
"context"
21+
"sync"
2122
"testing"
2223

2324
"github.com/google/go-cmp/cmp"
@@ -72,6 +73,7 @@ func TestSchedule(t *testing.T) {
7273
},
7374
},
7475
},
76+
Lock: sync.RWMutex{},
7577
},
7678
{
7779
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
@@ -143,6 +145,7 @@ func TestSchedule(t *testing.T) {
143145
},
144146
},
145147
},
148+
Lock: sync.RWMutex{},
146149
},
147150
{
148151
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
@@ -215,6 +218,7 @@ func TestSchedule(t *testing.T) {
215218
},
216219
},
217220
},
221+
Lock: sync.RWMutex{},
218222
},
219223
{
220224
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
@@ -307,9 +311,9 @@ func TestSchedulePlugins(t *testing.T) {
307311
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
308312
},
309313
input: []*podinfo.FakePodInfo{
310-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
311-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
312-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
314+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, Lock: sync.RWMutex{}},
315+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, Lock: sync.RWMutex{}},
316+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}, Lock: sync.RWMutex{}},
313317
},
314318
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
315319
targetPodScore: 1.1,
@@ -329,9 +333,9 @@ func TestSchedulePlugins(t *testing.T) {
329333
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
330334
},
331335
input: []*podinfo.FakePodInfo{
332-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
333-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
334-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
336+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, Lock: sync.RWMutex{}},
337+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, Lock: sync.RWMutex{}},
338+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}, Lock: sync.RWMutex{}},
335339
},
336340
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
337341
targetPodScore: 50,
@@ -351,9 +355,9 @@ func TestSchedulePlugins(t *testing.T) {
351355
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
352356
},
353357
input: []*podinfo.FakePodInfo{
354-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
355-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
356-
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
358+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, Lock: sync.RWMutex{}},
359+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, Lock: sync.RWMutex{}},
360+
{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}, Lock: sync.RWMutex{}},
357361
},
358362
numPodsToScore: 0,
359363
err: true, // no available pods to serve after filtering all

pkg/epp/scheduling/types/types.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,18 @@ func (pd *PodData) GetData() map[string]podinfo.ScrapedData {
9696
func ToSchedulerPodData(podsInfo []podinfo.PodInfo) []Pod {
9797
pods := make([]Pod, 0, len(podsInfo))
9898
for _, podInfo := range podsInfo {
99-
pods = append(pods, &PodData{Pod: podInfo.GetPod().Clone(), Data: clonePodData(podInfo.GetData())})
99+
pods = append(pods, &PodData{Pod: podInfo.GetPod().Clone(), Data: clonePodData(podInfo)})
100100
}
101101
return pods
102102
}
103103

104-
func clonePodData(podData map[string]podinfo.ScrapedData) map[string]podinfo.ScrapedData {
105-
copy := make(map[string]podinfo.ScrapedData, len(podData))
106-
for key, data := range podData {
107-
copy[key] = data.Clone()
104+
func clonePodData(podInfo podinfo.PodInfo) map[string]podinfo.ScrapedData {
105+
dataKeys := podInfo.GetDataKeys()
106+
copy := make(map[string]podinfo.ScrapedData, len(dataKeys))
107+
for _, key := range dataKeys {
108+
if data, ok := podInfo.GetData(key); ok {
109+
copy[key] = podinfo.CloneScrapedData(data)
110+
}
108111
}
109112

110113
return copy

0 commit comments

Comments
 (0)