diff --git a/conformance/testing-epp/plugins/filter/request_header_based_filter.go b/conformance/testing-epp/plugins/filter/request_header_based_filter.go index ee191111e..75039d919 100644 --- a/conformance/testing-epp/plugins/filter/request_header_based_filter.go +++ b/conformance/testing-epp/plugins/filter/request_header_based_filter.go @@ -20,6 +20,7 @@ import ( "context" "strings" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) @@ -38,20 +39,19 @@ var _ framework.Filter = &HeaderBasedTestingFilter{} // NewHeaderBasedTestingFilter initializes a new HeaderBasedTestingFilter. // This should only be used for testing purposes. func NewHeaderBasedTestingFilter() *HeaderBasedTestingFilter { - return &HeaderBasedTestingFilter{} + return &HeaderBasedTestingFilter{ + tn: plugins.TypedName{Type: "header-based-testing", Name: "header-based-testing-filter"}, + } } // HeaderBasedTestingFilter filters Pods based on an address specified in the "test-epp-endpoint-selection" request header. -type HeaderBasedTestingFilter struct{} - -// Type returns the type of the filter. -func (f *HeaderBasedTestingFilter) Type() string { - return "header-based-testing" +type HeaderBasedTestingFilter struct { + tn plugins.TypedName } -// Name returns the type of the filter. -func (f *HeaderBasedTestingFilter) Name() string { - return "header-based-testing-filter" +// TypedName returns the type and name tuple of this plugin instance. +func (f *HeaderBasedTestingFilter) TypedName() plugins.TypedName { + return f.tn } // Filter selects pods that match the IP addresses specified in the request header. diff --git a/pkg/epp/common/config/loader/configloader_test.go b/pkg/epp/common/config/loader/configloader_test.go index a6a982029..1be93ed4e 100644 --- a/pkg/epp/common/config/loader/configloader_test.go +++ b/pkg/epp/common/config/loader/configloader_test.go @@ -555,15 +555,18 @@ schedulingProfiles: var _ framework.Filter = &test1{} type test1 struct { + tn plugins.TypedName Threshold int `json:"threshold"` } -func (f *test1) Type() string { - return test1Type +func newTest1() *test1 { + return &test1{ + tn: plugins.TypedName{Type: test1Type, Name: "test-1"}, + } } -func (f *test1) Name() string { - return "test-1" +func (f *test1) TypedName() plugins.TypedName { + return f.tn } // Filter filters out pods that doesn't meet the filter criteria. @@ -575,14 +578,18 @@ func (f *test1) Filter(_ context.Context, _ *types.CycleState, _ *types.LLMReque var _ framework.Scorer = &test2{} var _ framework.PostCycle = &test2{} -type test2 struct{} +type test2 struct { + tn plugins.TypedName +} -func (f *test2) Type() string { - return test2Type +func newTest2() *test2 { + return &test2{ + tn: plugins.TypedName{Type: test2Type, Name: "test-2"}, + } } -func (f *test2) Name() string { - return "test-2" +func (m *test2) TypedName() plugins.TypedName { + return m.tn } func (m *test2) Score(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, _ []types.Pod) map[types.Pod]float64 { @@ -594,14 +601,18 @@ func (m *test2) PostCycle(_ context.Context, _ *types.CycleState, _ *types.Profi // compile-time type validation var _ framework.Picker = &testPicker{} -type testPicker struct{} +type testPicker struct { + tn plugins.TypedName +} -func (p *testPicker) Type() string { - return testPickerType +func newTestPicker() *testPicker { + return &testPicker{ + tn: plugins.TypedName{Type: testPickerType, Name: "test-picker"}, + } } -func (p *testPicker) Name() string { - return "test-picker" +func (p *testPicker) TypedName() plugins.TypedName { + return p.tn } func (p *testPicker) Pick(_ context.Context, _ *types.CycleState, _ []*types.ScoredPod) *types.ProfileRunResult { @@ -611,14 +622,18 @@ func (p *testPicker) Pick(_ context.Context, _ *types.CycleState, _ []*types.Sco // compile-time type validation var _ framework.ProfileHandler = &testProfileHandler{} -type testProfileHandler struct{} +type testProfileHandler struct { + tn plugins.TypedName +} -func (p *testProfileHandler) Type() string { - return testProfileHandlerType +func newTestProfileHandler() *testProfileHandler { + return &testProfileHandler{ + tn: plugins.TypedName{Type: testProfileHandlerType, Name: "test-profile-handler"}, + } } -func (p *testProfileHandler) Name() string { - return "test-profile-handler" +func (p *testProfileHandler) TypedName() plugins.TypedName { + return p.tn } func (p *testProfileHandler) Pick(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, _ map[string]*framework.SchedulerProfile, _ map[string]*types.ProfileRunResult) map[string]*framework.SchedulerProfile { @@ -631,28 +646,28 @@ func (p *testProfileHandler) ProcessResults(_ context.Context, _ *types.CycleSta func registerTestPlugins() { plugins.Register(test1Type, - func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { - result := test1{} - err := json.Unmarshal(parameters, &result) - return &result, err + func(_ string, parameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + result := newTest1() + err := json.Unmarshal(parameters, result) + return result, err }, ) plugins.Register(test2Type, - func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { - return &test2{}, nil + func(_ string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return newTest2(), nil }, ) plugins.Register(testPickerType, - func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { - return &testPicker{}, nil + func(_ string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return newTestPicker(), nil }, ) plugins.Register(testProfileHandlerType, - func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { - return &testProfileHandler{}, nil + func(_ string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return newTestProfileHandler(), nil }, ) } diff --git a/pkg/epp/plugins/plugins.go b/pkg/epp/plugins/plugins.go index 2875e06da..5d2a5f2d6 100644 --- a/pkg/epp/plugins/plugins.go +++ b/pkg/epp/plugins/plugins.go @@ -24,10 +24,8 @@ import ( // Plugin defines the interface for a plugin. // This interface should be embedded in all plugins across the code. type Plugin interface { - // Type returns the type of the plugin. - Type() string - // Name returns the name of this plugin instance. - Name() string + // TypedName returns the type and name tuple of this plugin instance. + TypedName() TypedName } // Handle provides plugins a set of standard data and tools to work with diff --git a/pkg/epp/plugins/typedname.go b/pkg/epp/plugins/typedname.go new file mode 100644 index 000000000..eee781370 --- /dev/null +++ b/pkg/epp/plugins/typedname.go @@ -0,0 +1,38 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugins + +// TypedName is a utility struct providing a type and a name +// to plugins. +// It implements the Plugin interface and can be embedded in +// plugins across the code to reduce boilerplate. +type TypedName struct { + // Type returns the type of the plugin. + Type string + // Name returns the name of this plugin instance. + Name string +} + +const ( + Separator = "/" +) + +// String returns the type and name rendered as +// "/". +func (tn *TypedName) String() string { + return tn.Type + Separator + tn.Name +} diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 78effeda0..d4a0c0b63 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -312,18 +312,18 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, schedulingResult *schedulingtypes.SchedulingResult, targetPort int) { for _, plugin := range d.preRequestPlugins { - log.FromContext(ctx).V(logutil.DEBUG).Info("Running pre-request plugin", "plugin", plugin.Type()) + log.FromContext(ctx).V(logutil.DEBUG).Info("Running pre-request plugin", "plugin", plugin.TypedName().Type) before := time.Now() plugin.PreRequest(ctx, request, schedulingResult, targetPort) - metrics.RecordRequestControlPluginProcessingLatency(PreRequestPluginType, plugin.Type(), time.Since(before)) + metrics.RecordRequestControlPluginProcessingLatency(PreRequestPluginType, plugin.TypedName().Type, time.Since(before)) } } func (d *Director) runPostResponsePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { for _, plugin := range d.postResponsePlugins { - log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Type()) + log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.TypedName().Type) before := time.Now() plugin.PostResponse(ctx, request, response, targetPod) - metrics.RecordRequestControlPluginProcessingLatency(PostResponsePluginType, plugin.Type(), time.Since(before)) + metrics.RecordRequestControlPluginProcessingLatency(PostResponsePluginType, plugin.TypedName().Type, time.Since(before)) } } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 0f214b830..0c9b080d5 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -37,6 +37,7 @@ import ( backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" @@ -653,9 +654,7 @@ func pointer(v int32) *int32 { } func TestDirector_HandleResponse(t *testing.T) { - pr1 := &testPostResponse{ - TypeRes: "pr1", - } + pr1 := newTestPostResponse("pr1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) @@ -691,14 +690,25 @@ func TestDirector_HandleResponse(t *testing.T) { } } +const ( + testPostResponseType = "test-post-response" +) + type testPostResponse struct { - TypeRes string + tn plugins.TypedName lastRespOnResponse *Response lastTargetPodOnResponse string } -func (p *testPostResponse) Type() string { return p.TypeRes } -func (p *testPostResponse) Name() string { return "test-post-response" } +func newTestPostResponse(name string) *testPostResponse { + return &testPostResponse{ + tn: plugins.TypedName{Type: testPostResponseType, Name: name}, + } +} + +func (p *testPostResponse) TypedName() plugins.TypedName { + return p.tn +} func (p *testPostResponse) PostResponse(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { p.lastRespOnResponse = response diff --git a/pkg/epp/scheduling/framework/plugins/filter/decision_tree_filter.go b/pkg/epp/scheduling/framework/plugins/filter/decision_tree_filter.go index e73d5f921..662107a3b 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/decision_tree_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/decision_tree_filter.go @@ -36,9 +36,11 @@ const ( // compile-time type assertion var _ framework.Filter = &DecisionTreeFilter{} -// DecisionTreeFilter applies current fitler, and then recursively applies next filters +// DecisionTreeFilter applies current filter, and then recursively applies next filters // depending success or failure of the current filter. // It can be used to construct a flow chart algorithm. +// Since a DecisionTreeFilter takes on the type and name of the current filter, +// it is not embedding a fixed plugins.TypeName. type DecisionTreeFilter struct { Current framework.Filter // NextOnSuccess filter will be applied after successfully applying the current filter. @@ -131,20 +133,14 @@ func loadDecisionTreeEntry(entry *decisionTreeFilterEntry, handle plugins.Handle return nil, errors.New("either pluginRef or decisionTree must be specified") } -// Type returns the type of the filter. -func (f *DecisionTreeFilter) Type() string { +func (f *DecisionTreeFilter) TypedName() plugins.TypedName { if f == nil { - return "nil" + // TODO: this keeps the previous behavior ("nil"/"") - not sure + // why done this way. + // Change to empty TypedName or some more meaningful values? + return plugins.TypedName{Type: "nil", Name: ""} } - return f.Current.Type() -} - -// Name returns the name of the filter. -func (f *DecisionTreeFilter) Name() string { - if f == nil { - return "" - } - return f.Current.Name() + return f.Current.TypedName() } // Filter filters out pods that doesn't meet the filter criteria. @@ -161,7 +157,7 @@ func (f *DecisionTreeFilter) Filter(ctx context.Context, cycleState *types.Cycle if f.NextOnSuccess != nil { next = f.NextOnSuccess } - loggerTrace.Info("Filter succeeded", "filter", f.Type(), "next", next.Type(), "filteredPodCount", len(filteredPod)) + loggerTrace.Info("Filter succeeded", "filter", f.TypedName(), "next", next.TypedName(), "filteredPodCount", len(filteredPod)) // On success, pass the filtered result to the next filter. return next.Filter(ctx, cycleState, request, filteredPod) } else { @@ -172,7 +168,7 @@ func (f *DecisionTreeFilter) Filter(ctx context.Context, cycleState *types.Cycle if f.NextOnFailure != nil { next = f.NextOnFailure } - loggerTrace.Info("Filter failed", "filter", f.Type(), "next", next.Type()) + loggerTrace.Info("Filter failed", "filter", f.TypedName(), "next", next.TypedName()) // On failure, pass the initial set of pods to the next filter. return next.Filter(ctx, cycleState, request, pods) } diff --git a/pkg/epp/scheduling/framework/plugins/filter/filter_test.go b/pkg/epp/scheduling/framework/plugins/filter/filter_test.go index 978e91c3e..93fd46c8f 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/filter_test.go +++ b/pkg/epp/scheduling/framework/plugins/filter/filter_test.go @@ -25,8 +25,10 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" k8stypes "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer" @@ -37,14 +39,18 @@ import ( // compile-time type assertion var _ framework.Filter = &filterAll{} -type filterAll struct{} +type filterAll struct { + tn plugins.TypedName +} -func (f *filterAll) Type() string { - return "filter-all" +func (f *filterAll) TypedName() plugins.TypedName { + return f.tn } -func (f *filterAll) Name() string { - return "test-all" +func newFilterAll() *filterAll { + return &filterAll{ + tn: plugins.TypedName{Type: "filter-all", Name: "test-all"}, + } } func (f *filterAll) Filter(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, pods []types.Pod) []types.Pod { @@ -61,7 +67,7 @@ func TestFilter(t *testing.T) { }{ { name: "simple filter filters all pods", - filter: &filterAll{}, + filter: newFilterAll(), output: []types.Pod{}, }, { @@ -359,7 +365,7 @@ func TestDecisionTreeFilterFactory(t *testing.T) { } cmpOptions := cmpopts.IgnoreUnexported(LeastKVCacheFilter{}, LeastQueueFilter{}, - LoraAffinityFilter{}, LowQueueFilter{}, scorer.KVCacheScorer{}) + LoraAffinityFilter{}, LowQueueFilter{}, scorer.KVCacheScorer{}, plugins.TypedName{}) for _, test := range tests { rawParameters := struct { diff --git a/pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go b/pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go index ffb190cb0..3cf9bb6c1 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go @@ -41,7 +41,7 @@ func LeastKVCacheFilterFactory(name string, _ json.RawMessage, _ plugins.Handle) // NewLeastKVCacheFilter initializes a new LeastKVCacheFilter and returns its pointer. func NewLeastKVCacheFilter() *LeastKVCacheFilter { return &LeastKVCacheFilter{ - name: LeastKVCacheFilterType, + tn: plugins.TypedName{Type: LeastKVCacheFilterType, Name: LeastKVCacheFilterType}, } } @@ -51,22 +51,17 @@ func NewLeastKVCacheFilter() *LeastKVCacheFilter { // should consider them all instead of the absolute minimum one. This worked better than picking the // least one as it gives more choices for the next filter, which on aggregate gave better results. type LeastKVCacheFilter struct { - name string + tn plugins.TypedName } -// Type returns the type of the filter. -func (f *LeastKVCacheFilter) Type() string { - return LeastKVCacheFilterType -} - -// Name returns the name of the filter. -func (f *LeastKVCacheFilter) Name() string { - return f.name +// TypedName returns the type and name tuple of this plugin instance. +func (f *LeastKVCacheFilter) TypedName() plugins.TypedName { + return f.tn } // WithName sets the name of the filter. func (f *LeastKVCacheFilter) WithName(name string) *LeastKVCacheFilter { - f.name = name + f.tn.Name = name return f } diff --git a/pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go b/pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go index 54b619c7e..387cc5904 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go @@ -41,7 +41,7 @@ func LeastQueueFilterFactory(name string, _ json.RawMessage, _ plugins.Handle) ( // NewLeastQueueFilter initializes a new LeastQueueFilter and returns its pointer. func NewLeastQueueFilter() *LeastQueueFilter { return &LeastQueueFilter{ - name: LeastQueueFilterType, + tn: plugins.TypedName{Type: LeastQueueFilterType, Name: LeastQueueFilterType}, } } @@ -51,22 +51,17 @@ func NewLeastQueueFilter() *LeastQueueFilter { // we should consider them all instead of the absolute minimum one. This worked better than picking // the least one as it gives more choices for the next filter, which on aggregate gave better results. type LeastQueueFilter struct { - name string + tn plugins.TypedName } -// Type returns the type of the filter. -func (f *LeastQueueFilter) Type() string { - return LeastQueueFilterType -} - -// Name returns the name of the filter. -func (f *LeastQueueFilter) Name() string { - return f.name +// TypedName returns the type and name tuple of this plugin instance. +func (f *LeastQueueFilter) TypedName() plugins.TypedName { + return f.tn } // WithName sets the name of the filter. func (f *LeastQueueFilter) WithName(name string) *LeastQueueFilter { - f.name = name + f.tn.Name = name return f } diff --git a/pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go b/pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go index edb18a9d4..145743f03 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go @@ -52,7 +52,7 @@ func LoraAffinityFilterFactory(name string, rawParameters json.RawMessage, _ plu // NewLoraAffinityFilter initializes a new LoraAffinityFilter and returns its pointer. func NewLoraAffinityFilter(threshold float64) *LoraAffinityFilter { return &LoraAffinityFilter{ - name: LoraAffinityFilterType, + tn: plugins.TypedName{Type: LoraAffinityFilterType, Name: LoraAffinityFilterType}, loraAffinityThreshold: threshold, } } @@ -65,23 +65,18 @@ func NewLoraAffinityFilter(threshold float64) *LoraAffinityFilter { // 2. Using a probability threshold to sometimes select from non-affinity pods to enable load balancing // 3. Falling back to whatever group has pods if one group is empty type LoraAffinityFilter struct { - name string + tn plugins.TypedName loraAffinityThreshold float64 } -// Type returns the type of the filter. -func (f *LoraAffinityFilter) Type() string { - return LoraAffinityFilterType -} - -// Name returns the type of the filter. -func (f *LoraAffinityFilter) Name() string { - return f.name +// TypedName returns the type and name tuple of this plugin instance. +func (f *LoraAffinityFilter) TypedName() plugins.TypedName { + return f.tn } // WithName sets the type of the filter. func (f *LoraAffinityFilter) WithName(name string) *LoraAffinityFilter { - f.name = name + f.tn.Name = name return f } diff --git a/pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go b/pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go index b72ccf857..8b121e726 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go @@ -52,30 +52,25 @@ func LowQueueFilterFactory(name string, rawParameters json.RawMessage, _ plugins // NewLowQueueFilter initializes a new LowQueueFilter and returns its pointer. func NewLowQueueFilter(threshold int) *LowQueueFilter { return &LowQueueFilter{ - name: LowQueueFilterType, + tn: plugins.TypedName{Type: LowQueueFilterType, Name: LowQueueFilterType}, queueingThresholdLoRA: threshold, } } // LowQueueFilter returns pods that their waiting queue size is less than a configured threshold type LowQueueFilter struct { - name string + tn plugins.TypedName queueingThresholdLoRA int } -// Type returns the type of the filter. -func (f *LowQueueFilter) Type() string { - return LowQueueFilterType -} - -// Name returns the name of the filter. -func (f *LowQueueFilter) Name() string { - return f.name +// TypedName returns the type and name tuple of this plugin instance. +func (f *LowQueueFilter) TypedName() plugins.TypedName { + return f.tn } // WithName sets the name of the filter. func (f *LowQueueFilter) WithName(name string) *LowQueueFilter { - f.name = name + f.tn.Name = name return f } diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index d7fe5d190..a291f4761 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -68,7 +68,7 @@ type Config struct { type Plugin struct { Config - name string + tn plugins.TypedName indexer Indexer } @@ -144,25 +144,20 @@ func New(config Config) *Plugin { } return &Plugin{ - name: PrefixCachePluginType, + tn: plugins.TypedName{Type: PrefixCachePluginType, Name: PrefixCachePluginType}, Config: config, indexer: newIndexer(capacity), } } -// Type returns the type of the plugin. -func (m *Plugin) Type() string { - return PrefixCachePluginType -} - -// Name returns the name of the plugin. -func (m *Plugin) Name() string { - return m.name +// TypedName returns the type and name tuple of this plugin instance. +func (m *Plugin) TypedName() plugins.TypedName { + return m.tn } // WithName sets the name of the plugin. func (m *Plugin) WithName(name string) *Plugin { - m.name = name + m.tn.Name = name return m } @@ -176,7 +171,7 @@ func (m *Plugin) Score(ctx context.Context, cycleState *types.CycleState, reques PrefixCacheServers: m.matchLongestPrefix(ctx, hashes), } - cycleState.Write(types.StateKey(m.Type()), state) + cycleState.Write(types.StateKey(m.TypedName().Type), state) loggerTrace.Info(fmt.Sprintf("cached servers: %+v", state.PrefixCacheServers), "hashes", state.PrefixHashes) // calculate the scores of pods scores := make(map[types.Pod]float64, len(pods)) diff --git a/pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go b/pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go index db4e8487c..eea839761 100644 --- a/pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go +++ b/pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go @@ -43,30 +43,25 @@ func MaxScorePickerFactory(name string, _ json.RawMessage, _ plugins.Handle) (pl // NewMaxScorePicker initializes a new MaxScorePicker and returns its pointer. func NewMaxScorePicker() *MaxScorePicker { return &MaxScorePicker{ - name: MaxScorePickerType, + tn: plugins.TypedName{Type: MaxScorePickerType, Name: MaxScorePickerType}, random: NewRandomPicker(), } } // MaxScorePicker picks the pod with the maximum score from the list of candidates. type MaxScorePicker struct { - name string + tn plugins.TypedName random *RandomPicker } -// Type returns the type of the picker. -func (p *MaxScorePicker) Type() string { - return MaxScorePickerType -} - -// Name returns the name of the picker. -func (p *MaxScorePicker) Name() string { - return p.name +// TypedName returns the type and name tuple of this plugin instance. +func (p *MaxScorePicker) TypedName() plugins.TypedName { + return p.tn } // WithName sets the picker's name func (p *MaxScorePicker) WithName(name string) *MaxScorePicker { - p.name = name + p.tn.Name = name return p } diff --git a/pkg/epp/scheduling/framework/plugins/picker/random_picker.go b/pkg/epp/scheduling/framework/plugins/picker/random_picker.go index fb2d44b4f..7c6955a8e 100644 --- a/pkg/epp/scheduling/framework/plugins/picker/random_picker.go +++ b/pkg/epp/scheduling/framework/plugins/picker/random_picker.go @@ -23,6 +23,7 @@ import ( "math/rand" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" @@ -44,28 +45,23 @@ func RandomPickerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plug // NewRandomPicker initializes a new RandomPicker and returns its pointer. func NewRandomPicker() *RandomPicker { return &RandomPicker{ - name: RandomPickerType, + tn: plugins.TypedName{Type: RandomPickerType, Name: RandomPickerType}, } } // RandomPicker picks a random pod from the list of candidates. type RandomPicker struct { - name string -} - -// Type returns the type of the picker. -func (p *RandomPicker) Type() string { - return RandomPickerType + tn plugins.TypedName } -// Name returns the name of the picker. -func (p *RandomPicker) Name() string { - return p.name +// TypedName returns the type and name tuple of this plugin instance. +func (p *RandomPicker) TypedName() plugins.TypedName { + return p.tn } -// WithName sets the picker's name +// WithName sets the name of the picker. func (p *RandomPicker) WithName(name string) *RandomPicker { - p.name = name + p.tn.Name = name return p } diff --git a/pkg/epp/scheduling/framework/plugins/profile/single_profile_handler.go b/pkg/epp/scheduling/framework/plugins/profile/single_profile_handler.go index 48d75a6f6..e8809486e 100644 --- a/pkg/epp/scheduling/framework/plugins/profile/single_profile_handler.go +++ b/pkg/epp/scheduling/framework/plugins/profile/single_profile_handler.go @@ -42,28 +42,23 @@ func SingleProfileHandlerFactory(name string, _ json.RawMessage, _ plugins.Handl // NewSingleProfileHandler initializes a new SingleProfileHandler and returns its pointer. func NewSingleProfileHandler() *SingleProfileHandler { return &SingleProfileHandler{ - name: SingleProfileHandlerType, + tn: plugins.TypedName{Type: SingleProfileHandlerType, Name: SingleProfileHandlerType}, } } // SingleProfileHandler handles a single profile which is always the primary profile. type SingleProfileHandler struct { - name string + tn plugins.TypedName } -// Type returns the type of the Profile Handler. -func (h *SingleProfileHandler) Type() string { - return SingleProfileHandlerType -} - -// Name returns the name of the profile handler. -func (h *SingleProfileHandler) Name() string { - return h.name +// TypedName returns the type and name tuple of this plugin instance. +func (h *SingleProfileHandler) TypedName() plugins.TypedName { + return h.tn } // WithName sets the name of the profile handler. func (h *SingleProfileHandler) WithName(name string) *SingleProfileHandler { - h.name = name + h.tn.Name = name return h } diff --git a/pkg/epp/scheduling/framework/plugins/scorer/kvcache.go b/pkg/epp/scheduling/framework/plugins/scorer/kvcache.go index f0d4af9ae..387ae0bc1 100644 --- a/pkg/epp/scheduling/framework/plugins/scorer/kvcache.go +++ b/pkg/epp/scheduling/framework/plugins/scorer/kvcache.go @@ -41,28 +41,23 @@ func KvCacheScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plu // NewKVCacheScorer initializes a new KVCacheScorer and returns its pointer. func NewKVCacheScorer() *KVCacheScorer { return &KVCacheScorer{ - name: KvCacheScorerType, + tn: plugins.TypedName{Type: KvCacheScorerType, Name: KvCacheScorerType}, } } // KVCacheScorer scores list of candidate pods based on KV cache utilization. type KVCacheScorer struct { - name string + tn plugins.TypedName } -// Type returns the type of the scorer. -func (s *KVCacheScorer) Type() string { - return KvCacheScorerType -} - -// Name returns the name of the scorer. -func (s *KVCacheScorer) Name() string { - return s.name +// TypedName returns the type and name tuple of this plugin instance. +func (s *KVCacheScorer) TypedName() plugins.TypedName { + return s.tn } // WithName sets the name of the scorer. func (s *KVCacheScorer) WithName(name string) *KVCacheScorer { - s.name = name + s.tn.Name = name return s } diff --git a/pkg/epp/scheduling/framework/plugins/scorer/queue.go b/pkg/epp/scheduling/framework/plugins/scorer/queue.go index dad6d0411..c41c1cba5 100644 --- a/pkg/epp/scheduling/framework/plugins/scorer/queue.go +++ b/pkg/epp/scheduling/framework/plugins/scorer/queue.go @@ -42,29 +42,24 @@ func QueueScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugi // NewQueueScorer initializes a new QueueScorer and returns its pointer. func NewQueueScorer() *QueueScorer { return &QueueScorer{ - name: QueueScorerType, + tn: plugins.TypedName{Type: QueueScorerType, Name: QueueScorerType}, } } // QueueScorer scores list of candidate pods based on the pod's waiting queue size. // the less waiting queue size the pod has, the higher score it will get (since it's more available to serve new request). type QueueScorer struct { - name string + tn plugins.TypedName } -// Type returns the type of the scorer. -func (s *QueueScorer) Type() string { - return QueueScorerType -} - -// Name returns the name of the scorer. -func (s *QueueScorer) Name() string { - return s.name +// TypedName returns the type and name tuple of this plugin instance. +func (s *QueueScorer) TypedName() plugins.TypedName { + return s.tn } // WithName sets the name of the scorer. func (s *QueueScorer) WithName(name string) *QueueScorer { - s.name = name + s.tn.Name = name return s } diff --git a/pkg/epp/scheduling/framework/scheduler_profile.go b/pkg/epp/scheduling/framework/scheduler_profile.go index f41a915f0..307f474a6 100644 --- a/pkg/epp/scheduling/framework/scheduler_profile.go +++ b/pkg/epp/scheduling/framework/scheduler_profile.go @@ -86,14 +86,14 @@ func (p *SchedulerProfile) AddPlugins(pluginObjects ...plugins.Plugin) error { p.scorers = append(p.scorers, weightedScorer) plugin = weightedScorer.Scorer // if we got WeightedScorer, unwrap the plugin } else if scorer, ok := plugin.(Scorer); ok { // if we got a Scorer instead of WeightedScorer that's an error. - return fmt.Errorf("failed to register scorer '%s' without a weight. follow function documentation to register a scorer", scorer.Type()) + return fmt.Errorf("failed to register scorer '%s' without a weight. follow function documentation to register a scorer", scorer.TypedName()) } if filter, ok := plugin.(Filter); ok { p.filters = append(p.filters, filter) } if picker, ok := plugin.(Picker); ok { if p.picker != nil { - return fmt.Errorf("failed to set '%s' as picker, already have a registered picker plugin '%s'", picker.Type(), p.picker.Type()) + return fmt.Errorf("failed to set '%s' as picker, already have a registered picker plugin '%s'", picker.TypedName(), p.picker.TypedName()) } p.picker = picker } @@ -127,11 +127,11 @@ func (p *SchedulerProfile) runFilterPlugins(ctx context.Context, request *types. loggerDebug.Info("Before running filter plugins", "pods", filteredPods) for _, filter := range p.filters { - loggerDebug.Info("Running filter plugin", "plugin", filter.Type()) + loggerDebug.Info("Running filter plugin", "plugin", filter.TypedName().Type) before := time.Now() filteredPods = filter.Filter(ctx, cycleState, request, filteredPods) - metrics.RecordSchedulerPluginProcessingLatency(FilterPluginType, filter.Type(), time.Since(before)) - loggerDebug.Info("Filter plugin result", "plugin", filter.Type(), "pods", filteredPods) + metrics.RecordSchedulerPluginProcessingLatency(FilterPluginType, filter.TypedName().Type, time.Since(before)) + loggerDebug.Info("Filter plugin result", "plugin", filter.TypedName().Type, "pods", filteredPods) if len(filteredPods) == 0 { break } @@ -151,14 +151,14 @@ func (p *SchedulerProfile) runScorerPlugins(ctx context.Context, request *types. } // Iterate through each scorer in the chain and accumulate the weighted scores. for _, scorer := range p.scorers { - loggerDebug.Info("Running scorer", "scorer", scorer.Type()) + loggerDebug.Info("Running scorer", "scorer", scorer.TypedName().Type) before := time.Now() scores := scorer.Score(ctx, cycleState, request, pods) - metrics.RecordSchedulerPluginProcessingLatency(ScorerPluginType, scorer.Type(), time.Since(before)) + metrics.RecordSchedulerPluginProcessingLatency(ScorerPluginType, scorer.TypedName().Type, time.Since(before)) for pod, score := range scores { // weight is relative to the sum of weights weightedScorePerPod[pod] += score * float64(scorer.Weight()) } - loggerDebug.Info("After running scorer", "scorer", scorer.Type()) + loggerDebug.Info("After running scorer", "scorer", scorer.TypedName().Type) } loggerDebug.Info("After running scorer plugins") @@ -177,7 +177,7 @@ func (p *SchedulerProfile) runPickerPlugin(ctx context.Context, cycleState *type loggerDebug.Info("Before running picker plugin", "pods weighted score", fmt.Sprint(weightedScorePerPod)) before := time.Now() result := p.picker.Pick(ctx, cycleState, scoredPods) - metrics.RecordSchedulerPluginProcessingLatency(PickerPluginType, p.picker.Type(), time.Since(before)) + metrics.RecordSchedulerPluginProcessingLatency(PickerPluginType, p.picker.TypedName().Type, time.Since(before)) loggerDebug.Info("After running picker plugin", "result", result) return result @@ -185,9 +185,9 @@ func (p *SchedulerProfile) runPickerPlugin(ctx context.Context, cycleState *type func (p *SchedulerProfile) runPostCyclePlugins(ctx context.Context, cycleState *types.CycleState, result *types.ProfileRunResult) { for _, plugin := range p.postCyclePlugins { - log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-cycle plugin", "plugin", plugin.Type()) + log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-cycle plugin", "plugin", plugin.TypedName().Type) before := time.Now() plugin.PostCycle(ctx, cycleState, result) - metrics.RecordSchedulerPluginProcessingLatency(PostCyclePluginType, plugin.Type(), time.Since(before)) + metrics.RecordSchedulerPluginProcessingLatency(PostCyclePluginType, plugin.TypedName().Type, time.Since(before)) } } diff --git a/pkg/epp/scheduling/framework/scheduler_profile_test.go b/pkg/epp/scheduling/framework/scheduler_profile_test.go index d94bb26ca..a2adccaf6 100644 --- a/pkg/epp/scheduling/framework/scheduler_profile_test.go +++ b/pkg/epp/scheduling/framework/scheduler_profile_test.go @@ -23,30 +23,24 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" k8stypes "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) func TestSchedulePlugins(t *testing.T) { - tp1 := &testPlugin{ - TypeRes: "test1", - ScoreRes: 0.3, - FilterRes: []k8stypes.NamespacedName{{Name: "pod1"}, {Name: "pod2"}, {Name: "pod3"}}, - } - tp2 := &testPlugin{ - TypeRes: "test2", - ScoreRes: 0.8, - FilterRes: []k8stypes.NamespacedName{{Name: "pod1"}, {Name: "pod2"}}, - } - tp_filterAll := &testPlugin{ - TypeRes: "filter all", - FilterRes: []k8stypes.NamespacedName{}, - } - pickerPlugin := &testPlugin{ - TypeRes: "picker", - PickRes: k8stypes.NamespacedName{Name: "pod1"}, - } + tp1 := newTestPlugin("test1", 0.3, + []k8stypes.NamespacedName{{Name: "pod1"}, {Name: "pod2"}, {Name: "pod3"}}, + k8stypes.NamespacedName{}) + tp2 := newTestPlugin("test2", 0.8, + []k8stypes.NamespacedName{{Name: "pod1"}, {Name: "pod2"}}, + k8stypes.NamespacedName{}) + tp_filterAll := newTestPlugin("filter all", 0.0, + []k8stypes.NamespacedName{}, k8stypes.NamespacedName{}) + pickerPlugin := newTestPlugin("picker", 0.0, + []k8stypes.NamespacedName{}, k8stypes.NamespacedName{Name: "pod1"}) tests := []struct { name string @@ -155,32 +149,32 @@ func TestSchedulePlugins(t *testing.T) { for _, plugin := range test.profile.filters { tp, _ := plugin.(*testPlugin) if tp.FilterCallCount != 1 { - t.Errorf("Plugin %s Filter() called %d times, expected 1", plugin.Type(), tp.FilterCallCount) + t.Errorf("Plugin %s Filter() called %d times, expected 1", plugin.TypedName(), tp.FilterCallCount) } } for _, plugin := range test.profile.scorers { tp, _ := plugin.Scorer.(*testPlugin) if tp.ScoreCallCount != 1 { - t.Errorf("Plugin %s Score() called %d times, expected 1", plugin.Type(), tp.ScoreCallCount) + t.Errorf("Plugin %s Score() called %d times, expected 1", plugin.TypedName(), tp.ScoreCallCount) } if test.numPodsToScore != tp.NumOfScoredPods { - t.Errorf("Plugin %s Score() called with %d pods, expected %d", plugin.Type(), tp.NumOfScoredPods, test.numPodsToScore) + t.Errorf("Plugin %s Score() called with %d pods, expected %d", plugin.TypedName(), tp.NumOfScoredPods, test.numPodsToScore) } } tp, _ := test.profile.picker.(*testPlugin) if tp.NumOfPickerCandidates != test.numPodsToScore { - t.Errorf("Picker plugin %s Pick() called with %d candidates, expected %d", tp.Type(), tp.NumOfPickerCandidates, tp.NumOfScoredPods) + t.Errorf("Picker plugin %s Pick() called with %d candidates, expected %d", tp.TypedName(), tp.NumOfPickerCandidates, tp.NumOfScoredPods) } if tp.PickCallCount != 1 { - t.Errorf("Picker plugin %s Pick() called %d times, expected 1", tp.Type(), tp.PickCallCount) + t.Errorf("Picker plugin %s Pick() called %d times, expected 1", tp.TypedName(), tp.PickCallCount) } - if tp.WinnderPodScore != test.targetPodScore { - t.Errorf("winnder pod score %v, expected %v", tp.WinnderPodScore, test.targetPodScore) + if tp.WinnerPodScore != test.targetPodScore { + t.Errorf("winner pod score %v, expected %v", tp.WinnerPodScore, test.targetPodScore) } for _, plugin := range test.profile.postCyclePlugins { tp, _ := plugin.(*testPlugin) if tp.PostScheduleCallCount != 1 { - t.Errorf("Plugin %s PostSchedule() called %d times, expected 1", plugin.Type(), tp.PostScheduleCallCount) + t.Errorf("Plugin %s PostSchedule() called %d times, expected 1", plugin.TypedName(), tp.PostScheduleCallCount) } } }) @@ -195,6 +189,7 @@ var _ PostCycle = &testPlugin{} // testPlugin is an implementation useful in unit tests. type testPlugin struct { + tn plugins.TypedName TypeRes string ScoreCallCount int NumOfScoredPods int @@ -205,11 +200,22 @@ type testPlugin struct { PickCallCount int NumOfPickerCandidates int PickRes k8stypes.NamespacedName - WinnderPodScore float64 + WinnerPodScore float64 } -func (tp *testPlugin) Type() string { return tp.TypeRes } -func (tp *testPlugin) Name() string { return "test-plugin" } +func newTestPlugin(typeRes string, score float64, pruned []k8stypes.NamespacedName, + target k8stypes.NamespacedName) *testPlugin { + return &testPlugin{ + tn: plugins.TypedName{Type: typeRes, Name: "test-plugin"}, + ScoreRes: score, + FilterRes: pruned, + PickRes: target, + } +} + +func (tp *testPlugin) TypedName() plugins.TypedName { + return tp.tn +} func (tp *testPlugin) Filter(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, pods []types.Pod) []types.Pod { tp.FilterCallCount++ @@ -235,7 +241,7 @@ func (tp *testPlugin) Pick(_ context.Context, _ *types.CycleState, scoredPods [] for _, scoredPod := range scoredPods { if scoredPod.GetPod().NamespacedName.String() == tp.PickRes.String() { winnerPod = scoredPod.Pod - tp.WinnderPodScore = scoredPod.Score + tp.WinnerPodScore = scoredPod.Score } } diff --git a/pkg/epp/scheduling/scheduler.go b/pkg/epp/scheduling/scheduler.go index b848b26dc..3edc0893e 100644 --- a/pkg/epp/scheduling/scheduler.go +++ b/pkg/epp/scheduling/scheduler.go @@ -107,7 +107,7 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, can for { // get the next set of profiles to run iteratively based on the request and the previous execution results before := time.Now() profiles := s.profileHandler.Pick(ctx, cycleState, request, s.profiles, profileRunResults) - metrics.RecordSchedulerPluginProcessingLatency(framework.ProfilePickerType, s.profileHandler.Type(), time.Since(before)) + metrics.RecordSchedulerPluginProcessingLatency(framework.ProfilePickerType, s.profileHandler.TypedName().Type, time.Since(before)) if len(profiles) == 0 { // profile picker didn't pick any profile to run break } @@ -129,7 +129,7 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, can before := time.Now() result, err := s.profileHandler.ProcessResults(ctx, cycleState, request, profileRunResults) - metrics.RecordSchedulerPluginProcessingLatency(framework.ProcessProfilesResultsType, s.profileHandler.Type(), time.Since(before)) + metrics.RecordSchedulerPluginProcessingLatency(framework.ProcessProfilesResultsType, s.profileHandler.TypedName().Type, time.Since(before)) return result, err }