diff --git a/pkg/ext-proc/backend/fake.go b/pkg/ext-proc/backend/fake.go index dfb520eff..2ddf2932a 100644 --- a/pkg/ext-proc/backend/fake.go +++ b/pkg/ext-proc/backend/fake.go @@ -6,15 +6,16 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) type FakePodMetricsClient struct { Err map[types.NamespacedName]error - Res map[types.NamespacedName]*PodMetrics + Res map[types.NamespacedName]*datastore.PodMetrics } -func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *PodMetrics) (*PodMetrics, error) { +func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error) { if err, ok := f.Err[existing.NamespacedName]; ok { return nil, err } diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go index bb575d191..103659dbd 100644 --- a/pkg/ext-proc/backend/provider.go +++ b/pkg/ext-proc/backend/provider.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" "go.uber.org/multierr" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -17,7 +18,7 @@ const ( fetchMetricsTimeout = 5 * time.Second ) -func NewProvider(pmc PodMetricsClient, datastore Datastore) *Provider { +func NewProvider(pmc PodMetricsClient, datastore datastore.Datastore) *Provider { p := &Provider{ pmc: pmc, datastore: datastore, @@ -28,11 +29,11 @@ func NewProvider(pmc PodMetricsClient, datastore Datastore) *Provider { // Provider provides backend pods and information such as metrics. 
type Provider struct { pmc PodMetricsClient - datastore Datastore + datastore datastore.Datastore } type PodMetricsClient interface { - FetchMetrics(ctx context.Context, existing *PodMetrics) (*PodMetrics, error) + FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error) } func (p *Provider) Init(ctx context.Context, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error { @@ -100,7 +101,7 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error { errCh := make(chan error) processOnePod := func(key, value any) bool { loggerTrace.Info("Pod and metric being processed", "pod", key, "metric", value) - existing := value.(*PodMetrics) + existing := value.(*datastore.PodMetrics) wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go index 2aa2c2139..95936f7e9 100644 --- a/pkg/ext-proc/backend/provider_test.go +++ b/pkg/ext-proc/backend/provider_test.go @@ -11,16 +11,17 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" ) var ( - pod1 = &PodMetrics{ - Pod: Pod{ + pod1 = &datastore.PodMetrics{ + Pod: datastore.Pod{ NamespacedName: types.NamespacedName{ Name: "pod1", }, }, - Metrics: Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -30,13 +31,13 @@ var ( }, }, } - pod2 = &PodMetrics{ - Pod: Pod{ + pod2 = &datastore.PodMetrics{ + Pod: datastore.Pod{ NamespacedName: types.NamespacedName{ Name: "pod2", }, }, - Metrics: Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 1, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -52,21 +53,19 @@ func TestProvider(t *testing.T) { tests := []struct { name string pmc PodMetricsClient - datastore Datastore - want []*PodMetrics + datastore datastore.Datastore + want []*datastore.PodMetrics }{ { name: "Probing metrics success", pmc: &FakePodMetricsClient{ - Res: map[types.NamespacedName]*PodMetrics{ + Res: map[types.NamespacedName]*datastore.PodMetrics{ pod1.NamespacedName: pod1, pod2.NamespacedName: pod2, }, }, - datastore: &datastore{ - pods: populateMap(pod1, pod2), - }, - want: []*PodMetrics{ + datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil), + want: []*datastore.PodMetrics{ pod1, pod2, }, @@ -74,15 +73,13 @@ func TestProvider(t *testing.T) { { name: "Only pods in the datastore are probed", pmc: &FakePodMetricsClient{ - Res: map[types.NamespacedName]*PodMetrics{ + Res: map[types.NamespacedName]*datastore.PodMetrics{ pod1.NamespacedName: pod1, pod2.NamespacedName: pod2, }, }, - datastore: &datastore{ - pods: populateMap(pod1), - }, - want: []*PodMetrics{ + datastore: datastore.NewFakeDatastore(populateMap(pod1), nil, nil), + want: []*datastore.PodMetrics{ pod1, }, }, @@ -92,19 +89,18 @@ func TestProvider(t *testing.T) { Err: map[types.NamespacedName]error{ pod2.NamespacedName: errors.New("injected error"), }, - Res: map[types.NamespacedName]*PodMetrics{ + Res: map[types.NamespacedName]*datastore.PodMetrics{ pod1.NamespacedName: pod1, }, }, - datastore: &datastore{ - pods: populateMap(pod1, pod2), - }, - want: []*PodMetrics{ + datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil), + + want: []*datastore.PodMetrics{ pod1, // Failed to fetch pod2 metrics so it retains the default values.
{ - Pod: Pod{NamespacedName: pod2.NamespacedName}, - Metrics: Metrics{ + Pod: datastore.Pod{NamespacedName: pod2.NamespacedName}, + Metrics: datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0, MaxActiveModels: 0, @@ -122,7 +118,7 @@ func TestProvider(t *testing.T) { _ = p.Init(ctx, time.Millisecond, time.Millisecond) assert.EventuallyWithT(t, func(t *assert.CollectT) { metrics := test.datastore.PodGetAll() - diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *PodMetrics) bool { + diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *datastore.PodMetrics) bool { return a.String() < b.String() })) assert.Equal(t, "", diff, "Unexpected diff (+got/-want)") @@ -131,10 +127,10 @@ func TestProvider(t *testing.T) { } } -func populateMap(pods ...*PodMetrics) *sync.Map { +func populateMap(pods ...*datastore.PodMetrics) *sync.Map { newMap := &sync.Map{} for _, pod := range pods { - newMap.Store(pod.NamespacedName, &PodMetrics{Pod: Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}}) + newMap.Store(pod.NamespacedName, &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}}) } return newMap } diff --git a/pkg/ext-proc/backend/vllm/metrics.go b/pkg/ext-proc/backend/vllm/metrics.go index 3737425dd..4785e4840 100644 --- a/pkg/ext-proc/backend/vllm/metrics.go +++ b/pkg/ext-proc/backend/vllm/metrics.go @@ -14,7 +14,7 @@ import ( "github.com/prometheus/common/expfmt" "go.uber.org/multierr" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -38,8 +38,8 @@ type PodMetricsClientImpl struct{} // FetchMetrics fetches metrics from a given pod. 
func (p *PodMetricsClientImpl) FetchMetrics( ctx context.Context, - existing *backend.PodMetrics, -) (*backend.PodMetrics, error) { + existing *datastore.PodMetrics, +) (*datastore.PodMetrics, error) { logger := log.FromContext(ctx) loggerDefault := logger.V(logutil.DEFAULT) @@ -79,8 +79,8 @@ func (p *PodMetricsClientImpl) FetchMetrics( func promToPodMetrics( logger logr.Logger, metricFamilies map[string]*dto.MetricFamily, - existing *backend.PodMetrics, -) (*backend.PodMetrics, error) { + existing *datastore.PodMetrics, +) (*datastore.PodMetrics, error) { var errs error updated := existing.Clone() runningQueueSize, err := getLatestMetric(logger, metricFamilies, RunningQueueSizeMetricName) diff --git a/pkg/ext-proc/backend/vllm/metrics_test.go b/pkg/ext-proc/backend/vllm/metrics_test.go index 0a718cd79..23121ad5c 100644 --- a/pkg/ext-proc/backend/vllm/metrics_test.go +++ b/pkg/ext-proc/backend/vllm/metrics_test.go @@ -7,7 +7,7 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" - "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -17,9 +17,9 @@ func TestPromToPodMetrics(t *testing.T) { testCases := []struct { name string metricFamilies map[string]*dto.MetricFamily - expectedMetrics *backend.Metrics + expectedMetrics *datastore.Metrics expectedErr error - initialPodMetrics *backend.PodMetrics + initialPodMetrics *datastore.PodMetrics }{ { name: "all metrics available", @@ -107,7 +107,7 @@ func TestPromToPodMetrics(t *testing.T) { }, }, }, - expectedMetrics: &backend.Metrics{ + expectedMetrics: &datastore.Metrics{ RunningQueueSize: 15, WaitingQueueSize: 25, KVCacheUsagePercent: 0.9, @@ -117,7 +117,7 @@ func TestPromToPodMetrics(t *testing.T) { }, MaxActiveModels: 2, }, - initialPodMetrics: &backend.PodMetrics{}, + initialPodMetrics: &datastore.PodMetrics{}, expectedErr: nil, }, { @@ -206,7 +206,7 @@ func TestPromToPodMetrics(t *testing.T) { }, }, }, - expectedMetrics: &backend.Metrics{ + expectedMetrics: &datastore.Metrics{ RunningQueueSize: 15, WaitingQueueSize: 25, KVCacheUsagePercent: 0.9, @@ -216,7 +216,7 @@ func TestPromToPodMetrics(t *testing.T) { }, MaxActiveModels: 0, }, - initialPodMetrics: &backend.PodMetrics{}, + initialPodMetrics: &datastore.PodMetrics{}, expectedErr: errors.New("strconv.Atoi: parsing '2a': invalid syntax"), }, } diff --git a/pkg/ext-proc/backend/inferencemodel_reconciler.go b/pkg/ext-proc/controller/inferencemodel_reconciler.go similarity index 95% rename from pkg/ext-proc/backend/inferencemodel_reconciler.go rename to pkg/ext-proc/controller/inferencemodel_reconciler.go index 884e6b7ed..a46229887 100644 --- a/pkg/ext-proc/backend/inferencemodel_reconciler.go +++ b/pkg/ext-proc/controller/inferencemodel_reconciler.go @@ -1,4 +1,4 @@ -package backend +package controller import ( "context" @@ -12,6 +12,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -19,7 +20,7 @@ type InferenceModelReconciler struct { client.Client Scheme *runtime.Scheme Record record.EventRecorder - Datastore Datastore + Datastore datastore.Datastore PoolNamespacedName 
types.NamespacedName } diff --git a/pkg/ext-proc/backend/inferencemodel_reconciler_test.go b/pkg/ext-proc/controller/inferencemodel_reconciler_test.go similarity index 74% rename from pkg/ext-proc/backend/inferencemodel_reconciler_test.go rename to pkg/ext-proc/controller/inferencemodel_reconciler_test.go index 5afe3b5ab..c3ebb6463 100644 --- a/pkg/ext-proc/backend/inferencemodel_reconciler_test.go +++ b/pkg/ext-proc/controller/inferencemodel_reconciler_test.go @@ -1,4 +1,4 @@ -package backend +package controller import ( "context" @@ -13,6 +13,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -51,58 +52,50 @@ func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) { tests := []struct { name string - datastore *datastore + datastore datastore.Datastore incomingService *v1alpha1.InferenceModel wantInferenceModels *sync.Map }{ { name: "No Services registered; valid, new service incoming.", - datastore: &datastore{ - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "Old and boring", - }, + datastore: datastore.NewFakeDatastore(nil, nil, &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm"}, }, - models: &sync.Map{}, - }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + ResourceVersion: "Old and boring", + }, + }), + incomingService: infModel1, wantInferenceModels: populateServiceMap(infModel1), }, { name: "Removing existing service.", - datastore: &datastore{ - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "Old and boring", - }, + datastore: datastore.NewFakeDatastore(nil, populateServiceMap(infModel1), &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm"}, }, - models: populateServiceMap(infModel1), - }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + ResourceVersion: "Old and boring", + }, + }), incomingService: infModel1Modified, wantInferenceModels: populateServiceMap(), }, { name: "Unrelated service, do nothing.", - datastore: &datastore{ - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "Old and boring", - }, + datastore: datastore.NewFakeDatastore(nil, populateServiceMap(infModel1), &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm"}, }, - models: populateServiceMap(infModel1), - }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + ResourceVersion: "Old and boring", + }, + }), incomingService: &v1alpha1.InferenceModel{ Spec: v1alpha1.InferenceModelSpec{ ModelName: "fake model", @@ -116,33 +109,38 @@ func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) { }, { name: "Add to existing", - datastore: &datastore{ - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - 
Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "Old and boring", - }, + datastore: datastore.NewFakeDatastore(nil, populateServiceMap(infModel1), &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm"}, }, - models: populateServiceMap(infModel1), - }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + ResourceVersion: "Old and boring", + }, + }), incomingService: infModel2, wantInferenceModels: populateServiceMap(infModel1, infModel2), }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + pool, err := test.datastore.PoolGet() + if err != nil { + t.Fatalf("failed to get pool: %v", err) + } reconciler := &InferenceModelReconciler{ Datastore: test.datastore, - PoolNamespacedName: types.NamespacedName{Name: test.datastore.pool.Name}, + PoolNamespacedName: types.NamespacedName{Name: pool.Name}, } reconciler.updateDatastore(logger, test.incomingService) - if ok := mapsEqual(test.datastore.models, test.wantInferenceModels); !ok { - t.Error("Maps are not equal") - } + test.wantInferenceModels.Range(func(k, v any) bool { + _, exist := test.datastore.ModelGet(k.(string)) + if !exist { + t.Fatalf("failed to get model %s", k) + } + return true + }) }) } } @@ -156,12 +154,9 @@ func TestReconcile_ResourceNotFound(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() // Create a minimal datastore. - datastore := &datastore{ - models: &sync.Map{}, - pool: &v1alpha1.InferencePool{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pool"}, - }, - } + datastore := datastore.NewFakeDatastore(nil, nil, &v1alpha1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pool"}, + }) // Create the reconciler. reconciler := &InferenceModelReconciler{ @@ -211,12 +206,9 @@ func TestReconcile_ModelMarkedForDeletion(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existingModel).Build() // Create a minimal datastore. - datastore := &datastore{ - models: &sync.Map{}, - pool: &v1alpha1.InferencePool{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pool"}, - }, - } + datastore := datastore.NewFakeDatastore(nil, nil, &v1alpha1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pool"}, + }) // Create the reconciler. reconciler := &InferenceModelReconciler{ @@ -268,12 +260,9 @@ func TestReconcile_ResourceExists(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existingModel).Build() // Create a minimal datastore. - datastore := &datastore{ - models: &sync.Map{}, - pool: &v1alpha1.InferencePool{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pool"}, - }, - } + datastore := datastore.NewFakeDatastore(nil, nil, &v1alpha1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pool"}, + }) // Create the reconciler. 
reconciler := &InferenceModelReconciler{ @@ -312,24 +301,3 @@ func populateServiceMap(services ...*v1alpha1.InferenceModel) *sync.Map { } return returnVal } - -func mapsEqual(map1, map2 *sync.Map) bool { - equal := true - - map1.Range(func(k, v any) bool { - if _, ok := map2.Load(k); !ok { - equal = false - return false - } - return true - }) - map2.Range(func(k, v any) bool { - if _, ok := map1.Load(k); !ok { - equal = false - return false - } - return true - }) - - return equal -} diff --git a/pkg/ext-proc/backend/inferencepool_reconciler.go b/pkg/ext-proc/controller/inferencepool_reconciler.go similarity index 96% rename from pkg/ext-proc/backend/inferencepool_reconciler.go rename to pkg/ext-proc/controller/inferencepool_reconciler.go index 6f52862e7..5c9e4969d 100644 --- a/pkg/ext-proc/backend/inferencepool_reconciler.go +++ b/pkg/ext-proc/controller/inferencepool_reconciler.go @@ -1,4 +1,4 @@ -package backend +package controller import ( "context" @@ -12,6 +12,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -23,7 +24,7 @@ type InferencePoolReconciler struct { Scheme *runtime.Scheme Record record.EventRecorder PoolNamespacedName types.NamespacedName - Datastore Datastore + Datastore datastore.Datastore } func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { diff --git a/pkg/ext-proc/backend/inferencepool_reconciler_test.go b/pkg/ext-proc/controller/inferencepool_reconciler_test.go similarity index 84% rename from pkg/ext-proc/backend/inferencepool_reconciler_test.go rename to pkg/ext-proc/controller/inferencepool_reconciler_test.go index b6403489b..ec2fdfe1c 100644 --- a/pkg/ext-proc/backend/inferencepool_reconciler_test.go +++ b/pkg/ext-proc/controller/inferencepool_reconciler_test.go @@ -1,4 +1,4 @@ -package backend +package controller import ( "context" @@ -15,19 +15,20 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" utiltesting "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/testing" ) var ( - selector_v1 = map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm_v1"} - selector_v2 = map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm_v2"} + selector_v1 = map[string]string{"app": "vllm_v1"} + selector_v2 = map[string]string{"app": "vllm_v2"} pool1 = &v1alpha1.InferencePool{ ObjectMeta: metav1.ObjectMeta{ Name: "pool1", Namespace: "pool1-ns", }, Spec: v1alpha1.InferencePoolSpec{ - Selector: selector_v1, + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm_v1"}, TargetPortNumber: 8080, }, } @@ -39,14 +40,14 @@ var ( } pods = []corev1.Pod{ // Two ready pods matching pool1 - utiltesting.MakePod("pod1", "pool1-ns").Labels(stripLabelKeyAliasFromLabelMap(selector_v1)).ReadyCondition().Obj(), - utiltesting.MakePod("pod2", "pool1-ns").Labels(stripLabelKeyAliasFromLabelMap(selector_v1)).ReadyCondition().Obj(), + utiltesting.MakePod("pod1", "pool1-ns").Labels(selector_v1).ReadyCondition().Obj(), + utiltesting.MakePod("pod2", "pool1-ns").Labels(selector_v1).ReadyCondition().Obj(), // A not ready pod matching pool1 - utiltesting.MakePod("pod3", 
"pool1-ns").Labels(stripLabelKeyAliasFromLabelMap(selector_v1)).Obj(), + utiltesting.MakePod("pod3", "pool1-ns").Labels(selector_v1).Obj(), // A pod not matching pool1 namespace - utiltesting.MakePod("pod4", "pool2-ns").Labels(stripLabelKeyAliasFromLabelMap(selector_v1)).ReadyCondition().Obj(), + utiltesting.MakePod("pod4", "pool2-ns").Labels(selector_v1).ReadyCondition().Obj(), // A ready pod matching pool1 with a new selector - utiltesting.MakePod("pod5", "pool1-ns").Labels(stripLabelKeyAliasFromLabelMap(selector_v2)).ReadyCondition().Obj(), + utiltesting.MakePod("pod5", "pool1-ns").Labels(selector_v2).ReadyCondition().Obj(), } ) @@ -74,7 +75,7 @@ func TestReconcile_InferencePoolReconciler(t *testing.T) { req := ctrl.Request{NamespacedName: namespacedName} ctx := context.Background() - datastore := NewDatastore() + datastore := datastore.NewDatastore() inferencePoolReconciler := &InferencePoolReconciler{PoolNamespacedName: namespacedName, Client: fakeClient, Datastore: datastore} // Step 1: Inception, only ready pods matching pool1 are added to the store. @@ -98,7 +99,7 @@ func TestReconcile_InferencePoolReconciler(t *testing.T) { if err := fakeClient.Get(ctx, req.NamespacedName, newPool1); err != nil { t.Errorf("Unexpected pool get error: %v", err) } - newPool1.Spec.Selector = selector_v2 + newPool1.Spec.Selector = map[v1alpha1.LabelKey]v1alpha1.LabelValue{"app": "vllm_v2"} if err := fakeClient.Update(ctx, newPool1, &client.UpdateOptions{}); err != nil { t.Errorf("Unexpected pool update error: %v", err) } @@ -140,7 +141,7 @@ func TestReconcile_InferencePoolReconciler(t *testing.T) { } } -func diffPool(datastore Datastore, wantPool *v1alpha1.InferencePool, wantPods []string) string { +func diffPool(datastore datastore.Datastore, wantPool *v1alpha1.InferencePool, wantPods []string) string { gotPool, _ := datastore.PoolGet() if diff := cmp.Diff(wantPool, gotPool); diff != "" { return diff diff --git a/pkg/ext-proc/backend/pod_reconciler.go b/pkg/ext-proc/controller/pod_reconciler.go similarity index 95% rename from pkg/ext-proc/backend/pod_reconciler.go rename to pkg/ext-proc/controller/pod_reconciler.go index 8705ce838..209d2ca71 100644 --- a/pkg/ext-proc/backend/pod_reconciler.go +++ b/pkg/ext-proc/controller/pod_reconciler.go @@ -1,4 +1,4 @@ -package backend +package controller import ( "context" @@ -12,12 +12,13 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) type PodReconciler struct { client.Client - Datastore Datastore + Datastore datastore.Datastore Scheme *runtime.Scheme Record record.EventRecorder } diff --git a/pkg/ext-proc/backend/pod_reconciler_test.go b/pkg/ext-proc/controller/pod_reconciler_test.go similarity index 57% rename from pkg/ext-proc/backend/pod_reconciler_test.go rename to pkg/ext-proc/controller/pod_reconciler_test.go index cc7381f66..b146745a9 100644 --- a/pkg/ext-proc/backend/pod_reconciler_test.go +++ b/pkg/ext-proc/controller/pod_reconciler_test.go @@ -1,7 +1,8 @@ -package backend +package controller import ( "context" + "sync" "testing" "github.com/google/go-cmp/cmp" @@ -15,37 +16,35 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" ) 
var ( - basePod1 = &PodMetrics{Pod: Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-1"}} - basePod2 = &PodMetrics{Pod: Pod{NamespacedName: types.NamespacedName{Name: "pod2"}, Address: "address-2"}} - basePod3 = &PodMetrics{Pod: Pod{NamespacedName: types.NamespacedName{Name: "pod3"}, Address: "address-3"}} - basePod11 = &PodMetrics{Pod: Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-11"}} + basePod1 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-1"}} + basePod2 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}, Address: "address-2"}} + basePod3 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}, Address: "address-3"}} + basePod11 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-11"}} ) func TestUpdateDatastore_PodReconciler(t *testing.T) { now := metav1.Now() tests := []struct { name string - datastore Datastore + datastore datastore.Datastore incomingPod *corev1.Pod - wantPods []Pod + wantPods []datastore.Pod req *ctrl.Request }{ { name: "Add new pod", - datastore: &datastore{ - pods: populateMap(basePod1, basePod2), - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ - "some-key": "some-val", - }, + datastore: datastore.NewFakeDatastore(populateMap(basePod1, basePod2), nil, &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", }, }, - }, + }), incomingPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: basePod3.NamespacedName.Name, @@ -63,21 +62,18 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) { }, }, }, - wantPods: []Pod{basePod1.Pod, basePod2.Pod, basePod3.Pod}, + wantPods: []datastore.Pod{basePod1.Pod, basePod2.Pod, basePod3.Pod}, }, { name: "Update pod1 address", - datastore: &datastore{ - pods: populateMap(basePod1, basePod2), - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ - "some-key": "some-val", - }, + datastore: datastore.NewFakeDatastore(populateMap(basePod1, basePod2), nil, &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", }, }, - }, + }), incomingPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: basePod11.NamespacedName.Name, @@ -95,21 +91,18 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) { }, }, }, - wantPods: []Pod{basePod11.Pod, basePod2.Pod}, + wantPods: []datastore.Pod{basePod11.Pod, basePod2.Pod}, }, { name: "Delete pod with DeletionTimestamp", - datastore: &datastore{ - pods: populateMap(basePod1, basePod2), - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ - "some-key": "some-val", - }, + datastore: datastore.NewFakeDatastore(populateMap(basePod1, basePod2), nil, &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", }, }, - }, + }), incomingPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", 
@@ -128,37 +121,31 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) { }, }, }, - wantPods: []Pod{basePod2.Pod}, + wantPods: []datastore.Pod{basePod2.Pod}, }, { name: "Delete notfound pod", - datastore: &datastore{ - pods: populateMap(basePod1, basePod2), - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ - "some-key": "some-val", - }, + datastore: datastore.NewFakeDatastore(populateMap(basePod1, basePod2), nil, &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", }, }, - }, + }), req: &ctrl.Request{NamespacedName: types.NamespacedName{Name: "pod1"}}, - wantPods: []Pod{basePod2.Pod}, + wantPods: []datastore.Pod{basePod2.Pod}, }, { name: "New pod, not ready, valid selector", - datastore: &datastore{ - pods: populateMap(basePod1, basePod2), - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ - "some-key": "some-val", - }, + datastore: datastore.NewFakeDatastore(populateMap(basePod1, basePod2), nil, &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", }, }, - }, + }), incomingPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod3", @@ -175,21 +162,18 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) { }, }, }, - wantPods: []Pod{basePod1.Pod, basePod2.Pod}, + wantPods: []datastore.Pod{basePod1.Pod, basePod2.Pod}, }, { name: "Remove pod that does not match selector", - datastore: &datastore{ - pods: populateMap(basePod1, basePod2), - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ - "some-key": "some-val", - }, + datastore: datastore.NewFakeDatastore(populateMap(basePod1, basePod2), nil, &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", }, }, - }, + }), incomingPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", @@ -206,21 +190,18 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) { }, }, }, - wantPods: []Pod{basePod2.Pod}, + wantPods: []datastore.Pod{basePod2.Pod}, }, { name: "Remove pod that is not ready", - datastore: &datastore{ - pods: populateMap(basePod1, basePod2), - pool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ - "some-key": "some-val", - }, + datastore: datastore.NewFakeDatastore(populateMap(basePod1, basePod2), nil, &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", }, }, - }, + }), incomingPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", @@ -237,7 +218,7 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) { }, }, }, - wantPods: []Pod{basePod2.Pod}, + wantPods: []datastore.Pod{basePod2.Pod}, }, } for _, test := range tests { @@ -263,17 +244,25 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - var gotPods []Pod + var gotPods []datastore.Pod 
test.datastore.PodRange(func(k, v any) bool { - pod := v.(*PodMetrics) + pod := v.(*datastore.PodMetrics) if v != nil { gotPods = append(gotPods, pod.Pod) } return true }) - if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b Pod) bool { return a.NamespacedName.String() < b.NamespacedName.String() })) { + if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b datastore.Pod) bool { return a.NamespacedName.String() < b.NamespacedName.String() })) { t.Errorf("got (%v) != want (%v);", gotPods, test.wantPods) } }) } } + +func populateMap(pods ...*datastore.PodMetrics) *sync.Map { + newMap := &sync.Map{} + for _, pod := range pods { + newMap.Store(pod.NamespacedName, &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}}) + } + return newMap +} diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/datastore/datastore.go similarity index 91% rename from pkg/ext-proc/backend/datastore.go rename to pkg/ext-proc/datastore/datastore.go index 6b8483d3d..f85f90147 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/datastore/datastore.go @@ -1,4 +1,4 @@ -package backend +package datastore import ( "context" @@ -52,6 +52,21 @@ func NewDatastore() Datastore { return store } +// Used for test only +func NewFakeDatastore(pods, models *sync.Map, pool *v1alpha1.InferencePool) Datastore { + store := NewDatastore() + if pods != nil { + store.(*datastore).pods = pods + } + if models != nil { + store.(*datastore).models = models + } + if pool != nil { + store.(*datastore).pool = pool + } + return store +} + type datastore struct { // poolMu is used to synchronize access to the inferencePool. poolMu sync.RWMutex @@ -249,3 +264,16 @@ func IsCritical(model *v1alpha1.InferenceModel) bool { } return false } + +// TODO: move out to share with pod_reconciler.go +func podIsReady(pod *corev1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady { + if condition.Status == corev1.ConditionTrue { + return true + } + break + } + } + return false +} diff --git a/pkg/ext-proc/backend/datastore_test.go b/pkg/ext-proc/datastore/datastore_test.go similarity index 99% rename from pkg/ext-proc/backend/datastore_test.go rename to pkg/ext-proc/datastore/datastore_test.go index b44de0a54..6c5874df2 100644 --- a/pkg/ext-proc/backend/datastore_test.go +++ b/pkg/ext-proc/datastore/datastore_test.go @@ -1,4 +1,4 @@ -package backend +package datastore import ( "testing" diff --git a/pkg/ext-proc/backend/types.go b/pkg/ext-proc/datastore/types.go similarity index 91% rename from pkg/ext-proc/backend/types.go rename to pkg/ext-proc/datastore/types.go index 0e02fb093..221c66302 100644 --- a/pkg/ext-proc/backend/types.go +++ b/pkg/ext-proc/datastore/types.go @@ -1,5 +1,5 @@ -// Package backend is a library to interact with backend model servers such as probing metrics. -package backend +// Package datastore is a library to interact with backend model servers such as probing metrics. 
+// Package datastore is a library to interact with backend model servers such as probing metrics.
+package datastore import ( "fmt" diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go index 5edb2e777..b3ef08e0d 100644 --- a/pkg/ext-proc/handlers/request.go +++ b/pkg/ext-proc/handlers/request.go @@ -11,7 +11,7 @@ import ( extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "google.golang.org/protobuf/types/known/structpb" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -53,7 +53,7 @@ func (s *Server) HandleRequestBody( return nil, fmt.Errorf("error finding a model object in InferenceModel for input %v", model) } if len(modelObj.Spec.TargetModels) > 0 { - modelName = backend.RandomWeightedDraw(logger, modelObj, 0) + modelName = datastore.RandomWeightedDraw(logger, modelObj, 0) if modelName == "" { return nil, fmt.Errorf("error getting target model name for model %v", modelObj.Name) } @@ -61,7 +61,7 @@ func (s *Server) HandleRequestBody( llmReq := &scheduling.LLMRequest{ Model: model, ResolvedTargetModel: modelName, - Critical: backend.IsCritical(modelObj), + Critical: datastore.IsCritical(modelObj), } loggerVerbose.Info("LLM request assembled", "request", llmReq) diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go index fe00ebeb3..05de0c427 100644 --- a/pkg/ext-proc/handlers/server.go +++ b/pkg/ext-proc/handlers/server.go @@ -11,13 +11,13 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) -func NewServer(scheduler Scheduler, targetEndpointKey string, datastore backend.Datastore) *Server { +func NewServer(scheduler Scheduler, targetEndpointKey string, datastore datastore.Datastore) *Server { return &Server{ scheduler: scheduler, targetEndpointKey: targetEndpointKey, @@ -32,11 +32,11 @@ type Server struct { // The key of the header to specify the target pod address. This value needs to match Envoy // configuration. 
targetEndpointKey string - datastore backend.Datastore + datastore datastore.Datastore } type Scheduler interface { - Schedule(ctx context.Context, b *scheduling.LLMRequest) (targetPod backend.PodMetrics, err error) + Schedule(ctx context.Context, b *scheduling.LLMRequest) (targetPod datastore.PodMetrics, err error) } func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { diff --git a/pkg/ext-proc/health.go b/pkg/ext-proc/health.go index 59aec348c..525440cbf 100644 --- a/pkg/ext-proc/health.go +++ b/pkg/ext-proc/health.go @@ -7,13 +7,13 @@ import ( "google.golang.org/grpc/codes" healthPb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" - "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) type healthServer struct { logger logr.Logger - datastore backend.Datastore + datastore datastore.Datastore } func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index 8e5886739..d43f2c572 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -26,6 +26,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" @@ -125,7 +126,7 @@ func run() error { } // Setup runner. - datastore := backend.NewDatastore() + datastore := datastore.NewDatastore() provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore) serverRunner := &runserver.ExtProcServerRunner{ GrpcPort: *grpcPort, @@ -189,7 +190,7 @@ func initLogging(opts *zap.Options) { } // registerHealthServer adds the Health gRPC server as a Runnable to the given manager. 
-func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds backend.Datastore, port int) error { +func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error { srv := grpc.NewServer() healthPb.RegisterHealthServer(srv, &healthServer{ logger: logger, diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go index e028c59a4..4d53e7206 100644 --- a/pkg/ext-proc/scheduling/filter.go +++ b/pkg/ext-proc/scheduling/filter.go @@ -5,13 +5,13 @@ import ( "math" "github.com/go-logr/logr" - "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) type Filter interface { Name() string - Filter(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) + Filter(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) } // filter applies current filterFunc, and then recursively applies next filters depending success or @@ -41,7 +41,7 @@ func (f *filter) Name() string { return f.name } -func (f *filter) Filter(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { +func (f *filter) Filter(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) { loggerTrace := logger.V(logutil.TRACE) loggerTrace.Info("Running a filter", "name", f.Name(), "podCount", len(pods)) @@ -74,12 +74,12 @@ func (f *filter) Filter(logger logr.Logger, req *LLMRequest, pods []*backend.Pod } // filterFunc filters a set of input pods to a subset. -type filterFunc func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) +type filterFunc func(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) // toFilterFunc is a helper function to convert a per pod filter func to the FilterFunc. func toFilterFunc(pp podPredicate) filterFunc { - return func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { - filtered := []*backend.PodMetrics{} + return func(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) { + filtered := []*datastore.PodMetrics{} for _, pod := range pods { pass := pp(req, pod) if pass { @@ -100,10 +100,10 @@ func toFilterFunc(pp podPredicate) filterFunc { // the least one as it gives more choices for the next filter, which on aggregate gave better // results. // TODO: Compare this strategy with other strategies such as top K. 
-func leastQueuingFilterFunc(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { +func leastQueuingFilterFunc(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) { min := math.MaxInt max := 0 - filtered := []*backend.PodMetrics{} + filtered := []*datastore.PodMetrics{} for _, pod := range pods { if pod.WaitingQueueSize <= min { @@ -122,7 +122,7 @@ func leastQueuingFilterFunc(logger logr.Logger, req *LLMRequest, pods []*backend return filtered, nil } -func lowQueueingPodPredicate(_ *LLMRequest, pod *backend.PodMetrics) bool { +func lowQueueingPodPredicate(_ *LLMRequest, pod *datastore.PodMetrics) bool { return pod.WaitingQueueSize < queueingThresholdLoRA } @@ -132,10 +132,10 @@ func lowQueueingPodPredicate(_ *LLMRequest, pod *backend.PodMetrics) bool { // should consider them all instead of the absolute minimum one. This worked better than picking the // least one as it gives more choices for the next filter, which on aggregate gave better results. // TODO: Compare this strategy with other strategies such as top K. -func leastKVCacheFilterFunc(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { +func leastKVCacheFilterFunc(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) { min := math.MaxFloat64 var max float64 = 0 - filtered := []*backend.PodMetrics{} + filtered := []*datastore.PodMetrics{} for _, pod := range pods { if pod.KVCacheUsagePercent <= min { @@ -155,35 +155,35 @@ func leastKVCacheFilterFunc(logger logr.Logger, req *LLMRequest, pods []*backend } // podPredicate is a filter function to check whether a pod is desired. -type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool +type podPredicate func(req *LLMRequest, pod *datastore.PodMetrics) bool // We consider serving an adapter low cost if the adapter is active in the model server, or the // model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by // spreading the load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to // a single pod. This gave good performance in our initial benchmarking results in the scenario // where # of lora slots > # of lora adapters. -func lowLoRACostPredicate(req *LLMRequest, pod *backend.PodMetrics) bool { +func lowLoRACostPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool { _, ok := pod.ActiveModels[req.ResolvedTargetModel] return ok || len(pod.ActiveModels) < pod.MaxActiveModels } // loRAAffinityPredicate is a filter function to check whether a pod has affinity to the lora requested. -func loRAAffinityPredicate(req *LLMRequest, pod *backend.PodMetrics) bool { +func loRAAffinityPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool { _, ok := pod.ActiveModels[req.ResolvedTargetModel] return ok } // canAcceptNewLoraPredicate is a filter function to check whether a pod has room to load the adapter.
-func canAcceptNewLoraPredicate(req *LLMRequest, pod *backend.PodMetrics) bool { +func canAcceptNewLoraPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool { return len(pod.ActiveModels) < pod.MaxActiveModels } -func criticalRequestPredicate(req *LLMRequest, pod *backend.PodMetrics) bool { +func criticalRequestPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool { return req.Critical } func noQueueAndLessThanKVCacheThresholdPredicate(queueThreshold int, kvCacheThreshold float64) podPredicate { - return func(req *LLMRequest, pod *backend.PodMetrics) bool { + return func(req *LLMRequest, pod *datastore.PodMetrics) bool { return pod.WaitingQueueSize <= queueThreshold && pod.KVCacheUsagePercent <= kvCacheThreshold } } diff --git a/pkg/ext-proc/scheduling/filter_test.go b/pkg/ext-proc/scheduling/filter_test.go index 9ed781c42..b2ae4b893 100644 --- a/pkg/ext-proc/scheduling/filter_test.go +++ b/pkg/ext-proc/scheduling/filter_test.go @@ -7,7 +7,7 @@ import ( "github.com/go-logr/logr" "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -17,14 +17,14 @@ func TestFilter(t *testing.T) { tests := []struct { name string req *LLMRequest - input []*backend.PodMetrics - output []*backend.PodMetrics + input []*datastore.PodMetrics + output []*datastore.PodMetrics err bool filter *filter }{ { name: "simple filter without successor, failure", - filter: &filter{filter: func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { + filter: &filter{filter: func(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) { return nil, errors.New("filter error") }}, err: true, @@ -39,10 +39,10 @@ func TestFilter(t *testing.T) { }, // pod2 will be picked because it has relatively low queue size, with the requested // model being active, and has low KV cache. 
- input: []*backend.PodMetrics{ { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}}, - Metrics: backend.Metrics{ + input: []*datastore.PodMetrics{ + { + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -53,8 +53,8 @@ func TestFilter(t *testing.T) { }, }, { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.1, MaxActiveModels: 2, @@ -65,8 +65,8 @@ func TestFilter(t *testing.T) { }, }, { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -76,10 +76,10 @@ func TestFilter(t *testing.T) { }, }, }, - output: []*backend.PodMetrics{ + output: []*datastore.PodMetrics{ { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.1, MaxActiveModels: 2, @@ -100,10 +100,10 @@ func TestFilter(t *testing.T) { Critical: false, }, // pod1 will be picked because it has capacity for the sheddable request. - input: []*backend.PodMetrics{ + input: []*datastore.PodMetrics{ { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -114,8 +114,8 @@ func TestFilter(t *testing.T) { }, }, { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.1, MaxActiveModels: 2, @@ -126,8 +126,8 @@ func TestFilter(t *testing.T) { }, }, { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -137,10 +137,10 @@ func TestFilter(t *testing.T) { }, }, }, - output: []*backend.PodMetrics{ + output: []*datastore.PodMetrics{ { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -162,10 +162,10 @@ func TestFilter(t *testing.T) { }, // All pods have higher KV cache than the threshold, so the sheddable request will be // dropped.
- input: []*backend.PodMetrics{ + input: []*datastore.PodMetrics{ { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.9, MaxActiveModels: 2, @@ -176,8 +176,8 @@ func TestFilter(t *testing.T) { }, }, { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.85, MaxActiveModels: 2, @@ -188,8 +188,8 @@ func TestFilter(t *testing.T) { }, }, { - Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}}, - Metrics: backend.Metrics{ + Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}}, + Metrics: datastore.Metrics{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.85, MaxActiveModels: 2, @@ -199,7 +199,7 @@ func TestFilter(t *testing.T) { }, }, }, - output: []*backend.PodMetrics{}, + output: []*datastore.PodMetrics{}, err: true, }, } @@ -225,44 +225,44 @@ func TestFilterFunc(t *testing.T) { name string f filterFunc req *LLMRequest - input []*backend.PodMetrics - output []*backend.PodMetrics + input []*datastore.PodMetrics + output []*datastore.PodMetrics err bool }{ { name: "least queuing empty input", f: leastQueuingFilterFunc, - input: []*backend.PodMetrics{}, - output: []*backend.PodMetrics{}, + input: []*datastore.PodMetrics{}, + output: []*datastore.PodMetrics{}, }, { name: "least queuing", f: leastQueuingFilterFunc, - input: []*backend.PodMetrics{ + input: []*datastore.PodMetrics{ { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 0, }, }, { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 3, }, }, { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 10, }, }, }, - output: []*backend.PodMetrics{ + output: []*datastore.PodMetrics{ { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 0, }, }, { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 3, }, }, @@ -271,37 +271,37 @@ func TestFilterFunc(t *testing.T) { { name: "least kv cache empty input", f: leastKVCacheFilterFunc, - input: []*backend.PodMetrics{}, - output: []*backend.PodMetrics{}, + input: []*datastore.PodMetrics{}, + output: []*datastore.PodMetrics{}, }, { name: "least kv cache", f: leastKVCacheFilterFunc, - input: []*backend.PodMetrics{ + input: []*datastore.PodMetrics{ { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ KVCacheUsagePercent: 0, }, }, { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ KVCacheUsagePercent: 0.3, }, }, { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ KVCacheUsagePercent: 1.0, }, }, }, - output: []*backend.PodMetrics{ + output: []*datastore.PodMetrics{ { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ KVCacheUsagePercent: 0, }, }, { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ KVCacheUsagePercent: 0.3, }, }, @@ -310,32 +310,32 @@ func TestFilterFunc(t *testing.T) { { name: "noQueueAndLessThanKVCacheThresholdPredicate", f: toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(0, 0.8)), - input: []*backend.PodMetrics{ + input: []*datastore.PodMetrics{ { // This pod should be returned. 
- Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0, }, }, { // Queue is non zero, despite low kv cache, should not return. - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 1, KVCacheUsagePercent: 0.3, }, }, { // High kv cache despite zero queue, should not return - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 1.0, }, }, }, - output: []*backend.PodMetrics{ + output: []*datastore.PodMetrics{ { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0, }, @@ -349,10 +349,10 @@ func TestFilterFunc(t *testing.T) { Model: "model", ResolvedTargetModel: "model", }, - input: []*backend.PodMetrics{ + input: []*datastore.PodMetrics{ // ActiveModels include input model, should be returned. { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ MaxActiveModels: 2, ActiveModels: map[string]int{ "model": 1, @@ -361,7 +361,7 @@ func TestFilterFunc(t *testing.T) { }, // Input model is not active, however the server has room to load another adapter. { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ MaxActiveModels: 2, ActiveModels: map[string]int{ "another-model": 1, @@ -370,7 +370,7 @@ func TestFilterFunc(t *testing.T) { }, // Input is not active, and the server has reached max active models. { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ MaxActiveModels: 2, ActiveModels: map[string]int{ "foo": 1, @@ -379,9 +379,9 @@ func TestFilterFunc(t *testing.T) { }, }, }, - output: []*backend.PodMetrics{ + output: []*datastore.PodMetrics{ { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ MaxActiveModels: 2, ActiveModels: map[string]int{ "model": 1, @@ -389,7 +389,7 @@ func TestFilterFunc(t *testing.T) { }, }, { - Metrics: backend.Metrics{ + Metrics: datastore.Metrics{ MaxActiveModels: 2, ActiveModels: map[string]int{ "another-model": 1, diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 354bd39cb..1e56fee3a 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -10,7 +10,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -84,16 +84,16 @@ var ( // request to make room for critical requests. 
nextOnFailure: &filter{ name: "drop request", - filter: func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { + filter: func(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) { logger.V(logutil.DEFAULT).Info("Request dropped", "request", req) - return []*backend.PodMetrics{}, status.Errorf( + return []*datastore.PodMetrics{}, status.Errorf( codes.ResourceExhausted, "dropping request due to limited backend resources") }, }, } ) -func NewScheduler(datastore backend.Datastore) *Scheduler { +func NewScheduler(datastore datastore.Datastore) *Scheduler { return &Scheduler{ datastore: datastore, filter: defaultFilter, @@ -101,18 +101,18 @@ func NewScheduler(datastore backend.Datastore) *Scheduler { } type Scheduler struct { - datastore backend.Datastore + datastore datastore.Datastore filter Filter } // Schedule finds the target pod based on metrics and the requested lora adapter. -func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod backend.PodMetrics, err error) { +func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod datastore.PodMetrics, err error) { logger := log.FromContext(ctx).WithValues("request", req) podMetrics := s.datastore.PodGetAll() logger.V(logutil.VERBOSE).Info("Scheduling a request", "metrics", podMetrics) pods, err := s.filter.Filter(logger, req, podMetrics) if err != nil || len(pods) == 0 { - return backend.PodMetrics{}, fmt.Errorf( + return datastore.PodMetrics{}, fmt.Errorf( "failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) } logger.V(logutil.VERBOSE).Info("Selecting a random pod from the candidates", "candidatePods", pods) diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go index 073c30df1..7b0209a68 100644 --- a/pkg/ext-proc/server/runserver.go +++ b/pkg/ext-proc/server/runserver.go @@ -20,6 +20,8 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/controller" + "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" @@ -33,7 +35,7 @@ type ExtProcServerRunner struct { PoolNamespace string RefreshMetricsInterval time.Duration RefreshPrometheusMetricsInterval time.Duration - Datastore backend.Datastore + Datastore datastore.Datastore Provider *backend.Provider SecureServing bool CertPath string @@ -66,7 +68,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner { // SetupWithManager sets up the runner with the given manager. 
diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go
index 073c30df1..7b0209a68 100644
--- a/pkg/ext-proc/server/runserver.go
+++ b/pkg/ext-proc/server/runserver.go
@@ -20,6 +20,8 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/controller"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
@@ -33,7 +35,7 @@ type ExtProcServerRunner struct {
 	PoolNamespace                    string
 	RefreshMetricsInterval           time.Duration
 	RefreshPrometheusMetricsInterval time.Duration
-	Datastore                        backend.Datastore
+	Datastore                        datastore.Datastore
 	Provider                         *backend.Provider
 	SecureServing                    bool
 	CertPath                         string
@@ -66,7 +68,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
 
 // SetupWithManager sets up the runner with the given manager.
 func (r *ExtProcServerRunner) SetupWithManager(mgr ctrl.Manager) error {
 	// Create the controllers and register them with the manager
-	if err := (&backend.InferencePoolReconciler{
+	if err := (&controller.InferencePoolReconciler{
 		Datastore: r.Datastore,
 		Scheme:    mgr.GetScheme(),
 		Client:    mgr.GetClient(),
@@ -79,7 +81,7 @@ func (r *ExtProcServerRunner) SetupWithManager(mgr ctrl.Manager) error {
 		return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err)
 	}
 
-	if err := (&backend.InferenceModelReconciler{
+	if err := (&controller.InferenceModelReconciler{
 		Datastore: r.Datastore,
 		Scheme:    mgr.GetScheme(),
 		Client:    mgr.GetClient(),
@@ -92,7 +94,7 @@ func (r *ExtProcServerRunner) SetupWithManager(mgr ctrl.Manager) error {
 		return fmt.Errorf("failed setting up InferenceModelReconciler: %w", err)
 	}
 
-	if err := (&backend.PodReconciler{
+	if err := (&controller.PodReconciler{
 		Datastore: r.Datastore,
 		Scheme:    mgr.GetScheme(),
 		Client:    mgr.GetClient(),
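
After this change the three reconcilers come from the new controller package, but the wiring a caller does is unchanged. A sketch of the calling side, mirroring what BeforeSuit in the hermetic test at the end of this diff does (manager construction elided):

// Sketch of the calling side after the move; not part of the patch.
func setupRunner(mgr ctrl.Manager) (*runserver.ExtProcServerRunner, error) {
	runner := runserver.NewDefaultExtProcServerRunner()
	runner.Datastore = datastore.NewDatastore() // previously backend.NewDatastore()
	// SetupWithManager now registers controller.InferencePoolReconciler,
	// controller.InferenceModelReconciler, and controller.PodReconciler.
	if err := runner.SetupWithManager(mgr); err != nil {
		return nil, err
	}
	return runner, nil
}
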
diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go
index a48f0465b..3820998d7 100644
--- a/pkg/ext-proc/test/benchmark/benchmark.go
+++ b/pkg/ext-proc/test/benchmark/benchmark.go
@@ -16,7 +16,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 	"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
-	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
 	runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/test"
 	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
@@ -104,8 +104,8 @@ func fakeModels() map[string]*v1alpha1.InferenceModel {
 	return models
 }
 
-func fakePods() []*backend.PodMetrics {
-	pms := make([]*backend.PodMetrics, 0, *numFakePods)
+func fakePods() []*datastore.PodMetrics {
+	pms := make([]*datastore.PodMetrics, 0, *numFakePods)
 	for i := 0; i < *numFakePods; i++ {
 		pms = append(pms, test.FakePodMetrics(i, fakeMetrics(i)))
 	}
@@ -114,8 +114,8 @@ func fakePods() []*backend.PodMetrics {
 }
 
 // fakeMetrics adds numModelsPerPod number of adapters to the pod metrics.
-func fakeMetrics(podNumber int) backend.Metrics {
-	metrics := backend.Metrics{
+func fakeMetrics(podNumber int) datastore.Metrics {
+	metrics := datastore.Metrics{
 		ActiveModels: make(map[string]int),
 	}
 	for i := 0; i < *numModelsPerPod; i++ {
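
For orientation, here is the approximate shape of one fakePods() entry after this change. The pod name and address come from test.FakePodMetrics in the next file; the adapter key is illustrative, since the generator's naming scheme falls outside these hunks:

// Approximate shape of fakePods()[0]; values are illustrative.
pm := &datastore.PodMetrics{
	Pod: datastore.Pod{
		NamespacedName: types.NamespacedName{Name: "pod-0"},
		Address:        "address-0",
	},
	Metrics: datastore.Metrics{
		ActiveModels: map[string]int{"adapter-0": 0}, // one entry per *numModelsPerPod
	},
}
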
{ name: "select lower queue and kv cache, no active lora", req: extprocutils.GenerateRequest(logger, "test1", "my-model"), // pod-1 will be picked because it has relatively low queue size and low KV cache. - pods: []*backend.PodMetrics{ - extprocutils.FakePodMetrics(0, backend.Metrics{ + pods: []*datastore.PodMetrics{ + extprocutils.FakePodMetrics(0, datastore.Metrics{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.2, }), - extprocutils.FakePodMetrics(1, backend.Metrics{ + extprocutils.FakePodMetrics(1, datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.1, }), - extprocutils.FakePodMetrics(2, backend.Metrics{ + extprocutils.FakePodMetrics(2, datastore.Metrics{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, }), @@ -111,8 +112,8 @@ func TestKubeInferenceModelRequest(t *testing.T) { req: extprocutils.GenerateRequest(logger, "test2", "sql-lora"), // pod-1 will be picked because it has relatively low queue size, with the requested // model being active, and has low KV cache. - pods: []*backend.PodMetrics{ - extprocutils.FakePodMetrics(0, backend.Metrics{ + pods: []*datastore.PodMetrics{ + extprocutils.FakePodMetrics(0, datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ @@ -120,7 +121,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { "bar": 1, }, }), - extprocutils.FakePodMetrics(1, backend.Metrics{ + extprocutils.FakePodMetrics(1, datastore.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.1, ActiveModels: map[string]int{ @@ -128,7 +129,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { "sql-lora-1fdg2": 1, }, }), - extprocutils.FakePodMetrics(2, backend.Metrics{ + extprocutils.FakePodMetrics(2, datastore.Metrics{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ @@ -168,8 +169,8 @@ func TestKubeInferenceModelRequest(t *testing.T) { // pod-2 will be picked despite it NOT having the requested model being active // as it's above the affinity for queue size. Also is critical, so we should // still honor request despite all queues > 5 - pods: []*backend.PodMetrics{ - extprocutils.FakePodMetrics(0, backend.Metrics{ + pods: []*datastore.PodMetrics{ + extprocutils.FakePodMetrics(0, datastore.Metrics{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ @@ -177,7 +178,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { "bar": 1, }, }), - extprocutils.FakePodMetrics(1, backend.Metrics{ + extprocutils.FakePodMetrics(1, datastore.Metrics{ WaitingQueueSize: 50, KVCacheUsagePercent: 0.1, ActiveModels: map[string]int{ @@ -185,7 +186,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { "sql-lora-1fdg2": 1, }, }), - extprocutils.FakePodMetrics(2, backend.Metrics{ + extprocutils.FakePodMetrics(2, datastore.Metrics{ WaitingQueueSize: 6, KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ @@ -224,8 +225,8 @@ func TestKubeInferenceModelRequest(t *testing.T) { req: extprocutils.GenerateRequest(logger, "test4", "sql-lora-sheddable"), // no pods will be picked as all models are either above kv threshold, // queue threshold, or both. 
diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go
index 0e30ac696..89fc02d75 100644
--- a/test/integration/hermetic_test.go
+++ b/test/integration/hermetic_test.go
@@ -32,6 +32,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/envtest"
 	"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
 	runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
 	extprocutils "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/test"
 	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
@@ -55,7 +56,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 	tests := []struct {
 		name         string
 		req          *extProcPb.ProcessingRequest
-		pods         []*backend.PodMetrics
+		pods         []*datastore.PodMetrics
 		wantHeaders  []*configPb.HeaderValueOption
 		wantMetadata *structpb.Struct
 		wantBody     []byte
@@ -66,16 +67,16 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 		{
 			name: "select lower queue and kv cache, no active lora",
 			req:  extprocutils.GenerateRequest(logger, "test1", "my-model"),
 			// pod-1 will be picked because it has relatively low queue size and low KV cache.
-			pods: []*backend.PodMetrics{
-				extprocutils.FakePodMetrics(0, backend.Metrics{
+			pods: []*datastore.PodMetrics{
+				extprocutils.FakePodMetrics(0, datastore.Metrics{
 					WaitingQueueSize:    3,
 					KVCacheUsagePercent: 0.2,
 				}),
-				extprocutils.FakePodMetrics(1, backend.Metrics{
+				extprocutils.FakePodMetrics(1, datastore.Metrics{
 					WaitingQueueSize:    0,
 					KVCacheUsagePercent: 0.1,
 				}),
-				extprocutils.FakePodMetrics(2, backend.Metrics{
+				extprocutils.FakePodMetrics(2, datastore.Metrics{
 					WaitingQueueSize:    10,
 					KVCacheUsagePercent: 0.2,
 				}),
@@ -111,8 +112,8 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 			req: extprocutils.GenerateRequest(logger, "test2", "sql-lora"),
 			// pod-1 will be picked because it has relatively low queue size, with the requested
 			// model being active, and has low KV cache.
-			pods: []*backend.PodMetrics{
-				extprocutils.FakePodMetrics(0, backend.Metrics{
+			pods: []*datastore.PodMetrics{
+				extprocutils.FakePodMetrics(0, datastore.Metrics{
 					WaitingQueueSize:    0,
 					KVCacheUsagePercent: 0.2,
 					ActiveModels: map[string]int{
@@ -120,7 +121,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 						"bar": 1,
 					},
 				}),
-				extprocutils.FakePodMetrics(1, backend.Metrics{
+				extprocutils.FakePodMetrics(1, datastore.Metrics{
 					WaitingQueueSize:    0,
 					KVCacheUsagePercent: 0.1,
 					ActiveModels: map[string]int{
@@ -128,7 +129,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 						"sql-lora-1fdg2": 1,
 					},
 				}),
-				extprocutils.FakePodMetrics(2, backend.Metrics{
+				extprocutils.FakePodMetrics(2, datastore.Metrics{
 					WaitingQueueSize:    10,
 					KVCacheUsagePercent: 0.2,
 					ActiveModels: map[string]int{
@@ -168,8 +169,8 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 			// pod-2 will be picked despite it NOT having the requested model being active
 			// as it's above the affinity for queue size. Also is critical, so we should
 			// still honor request despite all queues > 5
-			pods: []*backend.PodMetrics{
-				extprocutils.FakePodMetrics(0, backend.Metrics{
+			pods: []*datastore.PodMetrics{
+				extprocutils.FakePodMetrics(0, datastore.Metrics{
 					WaitingQueueSize:    10,
 					KVCacheUsagePercent: 0.2,
 					ActiveModels: map[string]int{
@@ -177,7 +178,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 						"bar": 1,
 					},
 				}),
-				extprocutils.FakePodMetrics(1, backend.Metrics{
+				extprocutils.FakePodMetrics(1, datastore.Metrics{
 					WaitingQueueSize:    50,
 					KVCacheUsagePercent: 0.1,
 					ActiveModels: map[string]int{
@@ -185,7 +186,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 						"sql-lora-1fdg2": 1,
 					},
 				}),
-				extprocutils.FakePodMetrics(2, backend.Metrics{
+				extprocutils.FakePodMetrics(2, datastore.Metrics{
 					WaitingQueueSize:    6,
 					KVCacheUsagePercent: 0.2,
 					ActiveModels: map[string]int{
@@ -224,8 +225,8 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 			req: extprocutils.GenerateRequest(logger, "test4", "sql-lora-sheddable"),
 			// no pods will be picked as all models are either above kv threshold,
 			// queue threshold, or both.
-			pods: []*backend.PodMetrics{
-				extprocutils.FakePodMetrics(0, backend.Metrics{
+			pods: []*datastore.PodMetrics{
+				extprocutils.FakePodMetrics(0, datastore.Metrics{
 					WaitingQueueSize:    6,
 					KVCacheUsagePercent: 0.2,
 					ActiveModels: map[string]int{
@@ -234,7 +235,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 						"sql-lora-1fdg3": 1,
 					},
 				}),
-				extprocutils.FakePodMetrics(1, backend.Metrics{
+				extprocutils.FakePodMetrics(1, datastore.Metrics{
 					WaitingQueueSize:    0,
 					KVCacheUsagePercent: 0.85,
 					ActiveModels: map[string]int{
@@ -242,7 +243,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 						"sql-lora-1fdg3": 1,
 					},
 				}),
-				extprocutils.FakePodMetrics(2, backend.Metrics{
+				extprocutils.FakePodMetrics(2, datastore.Metrics{
 					WaitingQueueSize:    10,
 					KVCacheUsagePercent: 0.9,
 					ActiveModels: map[string]int{
@@ -265,8 +266,8 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 			name: "noncritical, but one server has capacity, do not shed",
 			req:  extprocutils.GenerateRequest(logger, "test5", "sql-lora-sheddable"),
 			// pod 0 will be picked as all other models are above threshold
-			pods: []*backend.PodMetrics{
-				extprocutils.FakePodMetrics(0, backend.Metrics{
+			pods: []*datastore.PodMetrics{
+				extprocutils.FakePodMetrics(0, datastore.Metrics{
 					WaitingQueueSize:    4,
 					KVCacheUsagePercent: 0.2,
 					ActiveModels: map[string]int{
@@ -275,7 +276,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 						"sql-lora-1fdg3": 1,
 					},
 				}),
-				extprocutils.FakePodMetrics(1, backend.Metrics{
+				extprocutils.FakePodMetrics(1, datastore.Metrics{
 					WaitingQueueSize:    0,
 					KVCacheUsagePercent: 0.85,
 					ActiveModels: map[string]int{
@@ -283,7 +284,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 						"sql-lora-1fdg3": 1,
 					},
 				}),
-				extprocutils.FakePodMetrics(2, backend.Metrics{
+				extprocutils.FakePodMetrics(2, datastore.Metrics{
 					WaitingQueueSize:    10,
 					KVCacheUsagePercent: 0.9,
 					ActiveModels: map[string]int{
@@ -364,8 +365,8 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 	}
 }
 
-func setUpHermeticServer(podMetrics []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
-	pms := make(map[types.NamespacedName]*backend.PodMetrics)
+func setUpHermeticServer(podMetrics []*datastore.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
+	pms := make(map[types.NamespacedName]*datastore.PodMetrics)
 	for _, pm := range podMetrics {
 		pms[pm.NamespacedName] = pm
 	}
@@ -441,7 +442,7 @@ func BeforeSuit(t *testing.T) func() {
 	serverRunner = runserver.NewDefaultExtProcServerRunner()
 	// Adjust from defaults
 	serverRunner.PoolName = "vllm-llama2-7b-pool"
-	serverRunner.Datastore = backend.NewDatastore()
+	serverRunner.Datastore = datastore.NewDatastore()
 	serverRunner.SecureServing = false
 
 	if err := serverRunner.SetupWithManager(mgr); err != nil {
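
Every hunk above leans on the new package without showing it, so for reference here is the surface of pkg/ext-proc/datastore as reconstructed from these call sites. Treat it as an approximation for the reader, not the package's actual source; field order and the full method set will differ:

// Reconstructed from usage in this diff — approximate, not the real source.
package datastore

import (
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
)

type Pod struct {
	NamespacedName types.NamespacedName
	Address        string
}

type Metrics struct {
	ActiveModels        map[string]int
	MaxActiveModels     int
	WaitingQueueSize    int
	KVCacheUsagePercent float64
}

// PodMetrics embeds both, which is why call sites can write
// pm.NamespacedName and pm.WaitingQueueSize directly.
type PodMetrics struct {
	Pod
	Metrics
}

// Datastore lists only the methods these hunks call (PodGetAll in the
// scheduler and provider, ModelSet in the test utilities); the real
// interface also carries pool and pod mutation methods not visible here,
// and NewDatastore / NewFakeDatastore construct implementations of it.
type Datastore interface {
	PodGetAll() []*PodMetrics
	ModelSet(infModel *v1alpha1.InferenceModel)
}
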