diff --git a/PROJECT b/PROJECT index 75c9c9cca..c049fc8e1 100644 --- a/PROJECT +++ b/PROJECT @@ -24,4 +24,12 @@ resources: kind: InferenceModel path: sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1 version: v1alpha1 +- api: + crdVersion: v1 + namespaced: true + domain: x-k8s.io + group: inference + kind: EndpointPickerConfig + path: sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1 + version: v1alpha1 version: "3" diff --git a/api/config/v1alpha1/defaults.go b/api/config/v1alpha1/defaults.go new file mode 100644 index 000000000..f59bc7c3c --- /dev/null +++ b/api/config/v1alpha1/defaults.go @@ -0,0 +1,29 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +// SetDefaults_EndpointPickerConfig sets default values in a +// EndpointPickerConfig struct. +// +// This naming convension is required by the defalter-gen code. +func SetDefaults_EndpointPickerConfig(cfg *EndpointPickerConfig) { + for idx, pluginConfig := range cfg.Plugins { + if pluginConfig.Name == "" { + cfg.Plugins[idx].Name = pluginConfig.PluginName + } + } +} diff --git a/api/config/v1alpha1/doc.go b/api/config/v1alpha1/doc.go new file mode 100644 index 000000000..122c3b952 --- /dev/null +++ b/api/config/v1alpha1/doc.go @@ -0,0 +1,22 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1alpha1 contains API Schema definitions for the +// inference.networking.x-k8s.io API group. +// +// +kubebuilder:object:generate=true +// +groupName=inference.networking.x-k8s.io +package v1alpha1 diff --git a/api/config/v1alpha1/endpointpickerconfig_types.go b/api/config/v1alpha1/endpointpickerconfig_types.go new file mode 100644 index 000000000..86f5ab7ec --- /dev/null +++ b/api/config/v1alpha1/endpointpickerconfig_types.go @@ -0,0 +1,92 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "encoding/json" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +k8s:defaulter-gen=true +// +kubebuilder:object:root=true + +// EndpointPickerConfig is the Schema for the endpointpickerconfigs API +type EndpointPickerConfig struct { + metav1.TypeMeta `json:",inline"` + + // +required + // +kubebuilder:validation:Required + // Plugins is the list of plugins that will be instantiated. + Plugins []PluginSpec `json:"plugins"` + + // +required + // +kubebuilder:validation:Required + // SchedulingProfiles is the list of named SchedulingProfiles + // that will be created. + SchedulingProfiles []SchedulingProfile `json:"schedulingProfiles"` +} + +// PluginSpec contains the information that describes a plugin that +// will be instantiated. +type PluginSpec struct { + // +optional + // Name provides a name for plugin entries to reference. If + // omitted, the value of the PluginName field will be used. + Name string `json:"name"` + + // +required + // +kubebuilder:validation:Required + // PluginName specifies the plugin to be instantiated. + PluginName string `json:"pluginName"` + + // +optional + // Parameters are the set of parameters to be passed to the plugin's + // factory function. The factory function is responsible + // to parse the parameters. + Parameters json.RawMessage `json:"parameters"` +} + +// SchedulingProfile contains the information to create a SchedulingProfile +// entry to be used by the scheduler. +type SchedulingProfile struct { + // +kubebuilder:validation:Required + // Name specifies the name of this SchedulingProfile + Name string `json:"name"` + + // +required + // +kubebuilder:validation:Required + // Plugins is the list of plugins for this SchedulingProfile. They are assigned + // to the appropriate "slots" based on their type. + Plugins []SchedulingPlugin `json:"plugins"` +} + +// SchedulingPlugin describes a plugin that will be associated with a +// SchedulingProfile entry. +type SchedulingPlugin struct { + // +required + // +kubebuilder:validation:Required + // PluginRef specifies a partiular Plugin instance to be associated with + // this SchedulingProfile. The reference is to the name of an + // entry of the Plugins defined in the configuration's Plugins + // section + PluginRef string `json:"pluginRef"` + + // +optional + // Weight is the weight fo be used if this plugin is a Scorer. + Weight *int `json:"weight"` +} diff --git a/api/config/v1alpha1/zz_generated.deepcopy.go b/api/config/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 000000000..1326f357b --- /dev/null +++ b/api/config/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,126 @@ +//go:build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by controller-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "encoding/json" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EndpointPickerConfig) DeepCopyInto(out *EndpointPickerConfig) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Plugins != nil { + in, out := &in.Plugins, &out.Plugins + *out = make([]PluginSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.SchedulingProfiles != nil { + in, out := &in.SchedulingProfiles, &out.SchedulingProfiles + *out = make([]SchedulingProfile, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EndpointPickerConfig. +func (in *EndpointPickerConfig) DeepCopy() *EndpointPickerConfig { + if in == nil { + return nil + } + out := new(EndpointPickerConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *EndpointPickerConfig) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PluginSpec) DeepCopyInto(out *PluginSpec) { + *out = *in + if in.Parameters != nil { + in, out := &in.Parameters, &out.Parameters + *out = make(json.RawMessage, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginSpec. +func (in *PluginSpec) DeepCopy() *PluginSpec { + if in == nil { + return nil + } + out := new(PluginSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchedulingPlugin) DeepCopyInto(out *SchedulingPlugin) { + *out = *in + if in.Weight != nil { + in, out := &in.Weight, &out.Weight + *out = new(int) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingPlugin. +func (in *SchedulingPlugin) DeepCopy() *SchedulingPlugin { + if in == nil { + return nil + } + out := new(SchedulingPlugin) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchedulingProfile) DeepCopyInto(out *SchedulingProfile) { + *out = *in + if in.Plugins != nil { + in, out := &in.Plugins, &out.Plugins + *out = make([]SchedulingPlugin, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingProfile. +func (in *SchedulingProfile) DeepCopy() *SchedulingProfile { + if in == nil { + return nil + } + out := new(SchedulingProfile) + in.DeepCopyInto(out) + return out +} diff --git a/api/config/v1alpha1/zz_generated.defaults.go b/api/config/v1alpha1/zz_generated.defaults.go new file mode 100644 index 000000000..1cf0c4275 --- /dev/null +++ b/api/config/v1alpha1/zz_generated.defaults.go @@ -0,0 +1,38 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by defaulter-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// RegisterDefaults adds defaulters functions to the given scheme. +// Public to allow building arbitrary schemes. +// All generated defaulters are covering - they call all nested defaulters. +func RegisterDefaults(scheme *runtime.Scheme) error { + scheme.AddTypeDefaultingFunc(&EndpointPickerConfig{}, func(obj interface{}) { SetObjectDefaults_EndpointPickerConfig(obj.(*EndpointPickerConfig)) }) + return nil +} + +func SetObjectDefaults_EndpointPickerConfig(in *EndpointPickerConfig) { + SetDefaults_EndpointPickerConfig(in) +} diff --git a/api/config/v1alpha1/zz_generated.register.go b/api/config/v1alpha1/zz_generated.register.go new file mode 100644 index 000000000..3c48d0ce2 --- /dev/null +++ b/api/config/v1alpha1/zz_generated.register.go @@ -0,0 +1,69 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by register-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + schema "k8s.io/apimachinery/pkg/runtime/schema" +) + +// GroupName specifies the group name used to register the objects. +const GroupName = "inference.networking.x-k8s.io" + +// GroupVersion specifies the group and the version used to register the objects. +var GroupVersion = v1.GroupVersion{Group: GroupName, Version: "v1alpha1"} + +// SchemeGroupVersion is group version used to register these objects +// Deprecated: use GroupVersion instead. +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"} + +// Resource takes an unqualified resource and returns a Group qualified GroupResource +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +var ( + // localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes. + SchemeBuilder runtime.SchemeBuilder + localSchemeBuilder = &SchemeBuilder + // Deprecated: use Install instead + AddToScheme = localSchemeBuilder.AddToScheme + Install = localSchemeBuilder.AddToScheme +) + +func init() { + // We only register manually written functions here. The registration of the + // generated functions takes place in the generated files. The separation + // makes the code compile even when the generated files are missing. + localSchemeBuilder.Register(addKnownTypes) +} + +// Adds the list of known types to Scheme. +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &EndpointPickerConfig{}, + ) + // AddToGroupVersion allows the serialization of client types like ListOptions. + v1.AddToGroupVersion(scheme, SchemeGroupVersion) + return nil +} diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 609a1e5d5..5d5b2c57a 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -25,6 +25,9 @@ import ( ) func main() { + // Register all known plugin factories + runner.RegisterAllPlgugins() + if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil { os.Exit(1) } diff --git a/cmd/epp/runner/register.go b/cmd/epp/runner/register.go new file mode 100644 index 000000000..ab525a2f8 --- /dev/null +++ b/cmd/epp/runner/register.go @@ -0,0 +1,44 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runner + +import ( + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer" +) + +// RegisterAllPlgugins registers the factory functions of all known plugins +func RegisterAllPlgugins() { + plugins.Register(filter.LeastKVCacheFilterName, filter.LeastKVCacheFilterFactory) + plugins.Register(filter.LeastQueueFilterName, filter.LeastQueueFilterFactory) + plugins.Register(filter.LoraAffinityFilterName, filter.LoraAffinityFilterFactory) + plugins.Register(filter.LowQueueFilterName, filter.LowQueueFilterFactory) + plugins.Register(prefix.PrefixCachePluginName, prefix.PrefixCachePluginFactory) + plugins.Register(picker.MaxScorePickerName, picker.MaxScorePickerFactory) + plugins.Register(picker.RandomPickerName, picker.RandomPickerFactory) + plugins.Register(profile.SingleProfileHandlerName, profile.SingleProfileHandlerFactory) + plugins.Register(scorer.KvCacheScorerName, scorer.KvCacheScorerFactory) + plugins.Register(scorer.QueueScorerName, scorer.QueueScorerFactory) +} + +// eppHandle is a temporary implementation of the interface plugins.Handle +type eppHandle struct { +} diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 194ad9d02..ea4fba0a3 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -35,12 +35,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1" conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" @@ -109,6 +112,9 @@ var ( loraInfoMetric = flag.String("loraInfoMetric", "vllm:lora_requests_info", "Prometheus metric for the LoRA info metrics (must be in vLLM label format).") + // configuration flags + configFile = flag.String("configFile", "", "The path to the configuration file") + configText = flag.String("configText", "", "The configuration specified as text, in lieu of a file") setupLog = ctrl.Log.WithName("setup") @@ -211,6 +217,30 @@ func (r *Runner) Run(ctx context.Context) error { return err } + var theConfig *v1alpha1.EndpointPickerConfig + var instantiatedPlugins map[string]plugins.Plugin + + if len(*configText) != 0 || len(*configFile) != 0 { + theConfig, err = config.LoadConfig([]byte(*configText), *configFile) + if err != nil { + setupLog.Error(err, "Failed to load the configuration") + return err + } + + epp := eppHandle{} + instantiatedPlugins, err = config.LoadPluginReferences(theConfig.Plugins, epp) + if err != nil { + setupLog.Error(err, "Failed to instantiate the plugins") + return err + } + + r.schedulerConfig, err = scheduling.LoadSchedulerConfig(theConfig.SchedulingProfiles, instantiatedPlugins, setupLog) + if err != nil { + setupLog.Error(err, "Failed to create Scheduler configuration") + return err + } + } + // --- Initialize Core EPP Components --- scheduler, err := r.initializeScheduler(datastore) if err != nil { @@ -220,6 +250,10 @@ func (r *Runner) Run(ctx context.Context) error { saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log) + // Add requestControl plugins + if instantiatedPlugins != nil { + r.requestControlConfig.AddPlugins(instantiatedPlugins) + } director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig) // --- Setup ExtProc Server Runner --- @@ -354,6 +388,9 @@ func validateFlags() error { if *poolName == "" { return fmt.Errorf("required %q flag not set", "poolName") } + if len(*configText) != 0 && len(*configFile) != 0 { + return fmt.Errorf("both the %s and %s flags can not be set at the same time", "configText", "configFile") + } return nil } diff --git a/pkg/epp/common/config/configloader.go b/pkg/epp/common/config/configloader.go new file mode 100644 index 000000000..7db82a0c3 --- /dev/null +++ b/pkg/epp/common/config/configloader.go @@ -0,0 +1,144 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "errors" + "fmt" + "os" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + configapi "sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" +) + +var scheme = runtime.NewScheme() + +func init() { + configapi.SchemeBuilder.Register(configapi.RegisterDefaults) + utilruntime.Must(configapi.Install(scheme)) +} + +// Load config either from supplied text or from a file +func LoadConfig(configText []byte, fileName string) (*configapi.EndpointPickerConfig, error) { + var err error + if len(configText) == 0 { + configText, err = os.ReadFile(fileName) + if err != nil { + return nil, fmt.Errorf("failed to load config file. Error: %s", err) + } + } + + theConfig := &configapi.EndpointPickerConfig{} + + codecs := serializer.NewCodecFactory(scheme, serializer.EnableStrict) + err = runtime.DecodeInto(codecs.UniversalDecoder(), configText, theConfig) + if err != nil { + return nil, fmt.Errorf("the configuration is invalid. Error: %s", err) + } + + // Validate loaded configuration + err = validateConfiguration(theConfig) + if err != nil { + return nil, fmt.Errorf("the configuration is invalid. error: %s", err) + } + return theConfig, nil +} + +func LoadPluginReferences(thePlugins []configapi.PluginSpec, handle plugins.Handle) (map[string]plugins.Plugin, error) { + references := map[string]plugins.Plugin{} + for _, pluginConfig := range thePlugins { + thePlugin, err := InstantiatePlugin(pluginConfig, handle) + if err != nil { + return nil, err + } + references[pluginConfig.Name] = thePlugin + } + return references, nil +} + +func InstantiatePlugin(pluginSpec configapi.PluginSpec, handle plugins.Handle) (plugins.Plugin, error) { + factory, ok := plugins.Registry[pluginSpec.PluginName] + if !ok { + return nil, fmt.Errorf("failed to instantiate the plugin. plugin %s not found", pluginSpec.PluginName) + } + thePlugin, err := factory(pluginSpec.Name, pluginSpec.Parameters, handle) + if err != nil { + return nil, fmt.Errorf("failed to instantiate the plugin %s. Error: %s", pluginSpec.PluginName, err) + } + return thePlugin, err +} + +func validateConfiguration(theConfig *configapi.EndpointPickerConfig) error { + names := make(map[string]struct{}) + + for _, pluginConfig := range theConfig.Plugins { + if pluginConfig.PluginName == "" { + return errors.New("plugin reference definition missing a plugin name") + } + + if _, ok := names[pluginConfig.Name]; ok { + return fmt.Errorf("the name %s has been specified for more than one plugin", pluginConfig.Name) + } + names[pluginConfig.Name] = struct{}{} + + _, ok := plugins.Registry[pluginConfig.PluginName] + if !ok { + return fmt.Errorf("plugin %s is not found", pluginConfig.PluginName) + } + } + + if len(theConfig.SchedulingProfiles) == 0 { + return errors.New("there must be at least one scheduling profile in the configuration") + } + + names = map[string]struct{}{} + for _, profile := range theConfig.SchedulingProfiles { + if profile.Name == "" { + return errors.New("SchedulingProfiles need a name") + } + + if _, ok := names[profile.Name]; ok { + return fmt.Errorf("the name %s has been specified for more than one SchedulingProfile", profile.Name) + } + names[profile.Name] = struct{}{} + + if len(profile.Plugins) == 0 { + return errors.New("SchedulingProfiles need at least one plugin") + } + for _, plugin := range profile.Plugins { + if len(plugin.PluginRef) == 0 { + return errors.New("SchedulingProfile's plugins need a plugin reference") + } + + notFound := true + for _, pluginConfig := range theConfig.Plugins { + if plugin.PluginRef == pluginConfig.Name { + notFound = false + break + } + } + if notFound { + return errors.New(plugin.PluginRef + " is a reference to an undefined Plugin") + } + } + } + return nil +} diff --git a/pkg/epp/common/config/configloader_test.go b/pkg/epp/common/config/configloader_test.go new file mode 100644 index 000000000..7600ba5ed --- /dev/null +++ b/pkg/epp/common/config/configloader_test.go @@ -0,0 +1,548 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "context" + "encoding/json" + "testing" + + "github.com/google/go-cmp/cmp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + configapi "sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +const ( + testProfileHandlerName = "test-profile-handler" + test1Name = "test-one" + test2Name = "test-two" + testPickerName = "test-picker" +) + +func TestLoadConfiguration(t *testing.T) { + test2Weight := 50 + + registerTestPlugins() + + goodConfig := &configapi.EndpointPickerConfig{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointPickerConfig", + APIVersion: "inference.networking.x-k8s.io/v1alpha1", + }, + Plugins: []configapi.PluginSpec{ + { + Name: "test1", + PluginName: test1Name, + Parameters: json.RawMessage("{\"threshold\":10}"), + }, + { + Name: "profileHandler", + PluginName: "test-profile-handler", + }, + { + Name: test2Name, + PluginName: test2Name, + Parameters: json.RawMessage("{\"hashBlockSize\":32}"), + }, + { + Name: "testPicker", + PluginName: testPickerName, + }, + }, + SchedulingProfiles: []configapi.SchedulingProfile{ + { + Name: "default", + Plugins: []configapi.SchedulingPlugin{ + { + PluginRef: "test1", + }, + { + PluginRef: "test-two", + Weight: &test2Weight, + }, + { + PluginRef: "testPicker", + }, + }, + }, + }, + } + + tests := []struct { + name string + configText string + configFile string + want *configapi.EndpointPickerConfig + wantErr bool + }{ + { + name: "success", + configText: successConfigText, + configFile: "", + want: goodConfig, + wantErr: false, + }, + { + name: "errorBadYaml", + configText: errorBadYamlText, + configFile: "", + wantErr: true, + }, + { + name: "errorNoProfileHandler", + configText: errorNoProfileHandlerText, + configFile: "", + wantErr: true, + }, + { + name: "errorBadPluginReferenceText", + configText: errorBadPluginReferenceText, + configFile: "", + wantErr: true, + }, + { + name: "errorBadPluginReferencePluginText", + configText: errorBadPluginReferencePluginText, + configFile: "", + wantErr: true, + }, + { + name: "errorNoProfiles", + configText: errorNoProfilesText, + configFile: "", + wantErr: true, + }, + { + name: "errorNoProfileName", + configText: errorNoProfileNameText, + configFile: "", + wantErr: true, + }, + { + name: "errorNoProfilePlugins", + configText: errorNoProfilePluginsText, + configFile: "", + wantErr: true, + }, + { + name: "errorBadProfilePlugin", + configText: errorBadProfilePluginText, + configFile: "", + wantErr: true, + }, + { + name: "errorBadProfilePluginRef", + configText: errorBadProfilePluginRefText, + configFile: "", + wantErr: true, + }, + { + name: "errorBadProfilePluginName", + configText: errorBadProfilePluginNameText, + configFile: "", + wantErr: true, + }, + { + name: "errorDuplicatePlugin", + configText: errorDuplicatePluginText, + configFile: "", + wantErr: true, + }, + { + name: "errorDuplicateProfile", + configText: errorDuplicateProfileText, + configFile: "", + wantErr: true, + }, + { + name: "successFromFile", + configText: "", + configFile: "../../../../test/testdata/configloader_1_test.yaml", + want: goodConfig, + wantErr: false, + }, + { + name: "noSuchFile", + configText: "", + configFile: "../../../../test/testdata/configloader_error_test.yaml", + wantErr: true, + }, + } + + for _, test := range tests { + got, err := LoadConfig([]byte(test.configText), test.configFile) + if err != nil { + if !test.wantErr { + t.Fatalf("In test %s LoadConfig returned unexpected error: %v, want %v", test.name, err, test.wantErr) + } + t.Logf("error was %s", err) + } else { + if test.wantErr { + t.Fatalf("In test %s LoadConfig did not return an expected error", test.name) + } + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("In test %s LoadConfig returned unexpected response, diff(-want, +got): %v", test.name, diff) + } + } + } +} + +func TestLoadPluginReferences(t *testing.T) { + theConfig, err := LoadConfig([]byte(successConfigText), "") + if err != nil { + t.Fatalf("LoadConfig returned unexpected error: %v", err) + } + references, err := LoadPluginReferences(theConfig.Plugins, testHandle{}) + if err != nil { + t.Fatalf("LoadPluginReferences returned unexpected error: %v", err) + } + if len(references) == 0 { + t.Fatalf("LoadPluginReferences returned an empty set of references") + } + if t1, ok := references["test1"]; !ok { + t.Fatalf("LoadPluginReferences returned references did not contain test1") + } else if _, ok := t1.(*test1); !ok { + t.Fatalf("LoadPluginReferences returned references value for test1 has the wrong type %#v", t1) + } + + theConfig, err = LoadConfig([]byte(errorBadPluginReferenceParametersText), "") + if err != nil { + t.Fatalf("LoadConfig returned unexpected error: %v", err) + } + _, err = LoadPluginReferences(theConfig.Plugins, testHandle{}) + if err == nil { + t.Fatalf("LoadPluginReferences did not return the expected error") + } +} + +func TestInstantiatePlugin(t *testing.T) { + plugSpec := configapi.PluginSpec{PluginName: "plover"} + _, err := InstantiatePlugin(plugSpec, testHandle{}) + if err == nil { + t.Fatalf("InstantiatePlugin did not return the expected error") + } +} + +type testHandle struct { +} + +// The following multi-line string constants, cause false positive lint errors (dupword) + +//nolint:dupword +const successConfigText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: 10 +- name: profileHandler + pluginName: test-profile-handler +- pluginName: test-two + parameters: + hashBlockSize: 32 +- name: testPicker + pluginName: test-picker +schedulingProfiles: +- name: default + plugins: + - pluginRef: test1 + - pluginRef: test-two + weight: 50 + - pluginRef: testPicker +` + +//nolint:dupword +const errorBadYamlText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- testing 1 2 3 +` + +//nolint:dupword +const errorBadPluginReferenceText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- parameters: + a: 1234 +` + +//nolint:dupword +const errorBadPluginReferencePluginText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: testx + pluginName: test-x +- name: profileHandler + pluginName: test-profile-handler +` + +//nolint:dupword +const errorNoProfileHandlerText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: 10 +schedulingProfiles: +- name: default +` + +//nolint:dupword +const errorNoProfilesText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: 10 +- name: profileHandler + pluginName: test-profile-handler +` + +//nolint:dupword +const errorNoProfileNameText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: 10 +- name: profileHandler + pluginName: test-profile-handler +schedulingProfiles: +- plugins: + - pluginRef: test1 +` + +//nolint:dupword +const errorNoProfilePluginsText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: 10 +- name: profileHandler + pluginName: test-profile-handler +schedulingProfiles: +- name: default +` + +//nolint:dupword +const errorBadProfilePluginText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: profileHandler + pluginName: test-profile-handler +schedulingProfiles: +- name: default + plugins: + - weight: 10 +` + +//nolint:dupword +const errorBadProfilePluginRefText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: profileHandler + pluginName: test-profile-handler +schedulingProfiles: +- name: default + plugins: + - pluginRef: plover +` + +//nolint:dupword +const errorBadProfilePluginNameText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- pluginName: test-profile-handler +schedulingProfiles: +- name: default + plugins: + - pluginRef: plover +` + +//nolint:dupword +const errorBadPluginReferenceParametersText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: asdf +- name: profileHandler + pluginName: test-profile-handler +schedulingProfiles: +- name: default + plugins: + - pluginRef: test1 +` + +//nolint:dupword +const errorDuplicatePluginText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: 10 +- name: test1 + pluginName: test-one + parameters: + threshold: 20 +- name: profileHandler + pluginName: test-profile-handler +schedulingProfiles: +- name: default + plugins: + - pluginRef: test1 +` + +//nolint:dupword +const errorDuplicateProfileText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: 10 +- name: test2 + pluginName: test-one + parameters: + threshold: 20 +- name: profileHandler + pluginName: test-profile-handler +schedulingProfiles: +- name: default + plugins: + - pluginRef: test1 +- name: default + plugins: + - pluginRef: test2 +` + +// compile-time type validation +var _ framework.Filter = &test1{} + +type test1 struct { + Threshold int `json:"threshold"` +} + +func (f *test1) Name() string { + return test1Name +} + +// Filter filters out pods that doesn't meet the filter criteria. +func (f *test1) Filter(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, pods []types.Pod) []types.Pod { + return pods +} + +// compile-time type validation +var _ framework.Scorer = &test2{} +var _ framework.PostCycle = &test2{} + +type test2 struct{} + +func (f *test2) Name() string { + return test2Name +} + +func (m *test2) Score(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, pods []types.Pod) map[types.Pod]float64 { + return map[types.Pod]float64{} +} + +func (m *test2) PostCycle(ctx context.Context, cycleState *types.CycleState, res *types.ProfileRunResult) { +} + +// compile-time type validation +var _ framework.Picker = &testPicker{} + +type testPicker struct{} + +func (p *testPicker) Name() string { + return testPickerName +} + +func (p *testPicker) Pick(ctx context.Context, cycleState *types.CycleState, scoredPods []*types.ScoredPod) *types.ProfileRunResult { + return nil +} + +// compile-time type validation +var _ framework.ProfileHandler = &testProfileHandler{} + +type testProfileHandler struct{} + +func (p *testProfileHandler) Name() string { + return testProfileHandlerName +} + +func (p *testProfileHandler) Pick(ctx context.Context, request *types.LLMRequest, profiles map[string]*framework.SchedulerProfile, executionResults map[string]*types.ProfileRunResult) map[string]*framework.SchedulerProfile { + return nil +} + +func (p *testProfileHandler) ProcessResults(ctx context.Context, request *types.LLMRequest, profileResults map[string]*types.ProfileRunResult) *types.SchedulingResult { + return nil +} + +func registerTestPlugins() { + plugins.Register(test1Name, + func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { + result := test1{} + err := json.Unmarshal(parameters, &result) + return &result, err + }, + ) + + plugins.Register(test2Name, + func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { + return &test2{}, nil + }, + ) + + plugins.Register(testPickerName, + func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { + return &testPicker{}, nil + }, + ) + + plugins.Register(testProfileHandlerName, + func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { + return &testProfileHandler{}, nil + }, + ) +} diff --git a/pkg/epp/plugins/plugins.go b/pkg/epp/plugins/plugins.go index 5dd8d87bf..fd0f64ce7 100644 --- a/pkg/epp/plugins/plugins.go +++ b/pkg/epp/plugins/plugins.go @@ -22,3 +22,7 @@ type Plugin interface { // Name returns the name of the plugin. Name() string } + +// Handle provides plugins set of standard data and tools to work with +type Handle interface { +} diff --git a/pkg/epp/plugins/registry.go b/pkg/epp/plugins/registry.go new file mode 100644 index 000000000..d33ce99b3 --- /dev/null +++ b/pkg/epp/plugins/registry.go @@ -0,0 +1,33 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugins + +import ( + "encoding/json" +) + +// Factory is the definition of the factory functions that are used to instantiate plugins +// specified in a configuration. +type Factory func(name string, parameters json.RawMessage, handle Handle) (Plugin, error) + +// Register is a static function that can be called to register plugin factory functions. +func Register(name string, factory Factory) { + Registry[name] = factory +} + +// Registry is a mapping from plugin name to Factory function +var Registry map[string]Factory = map[string]Factory{} diff --git a/pkg/epp/registry/registry.go b/pkg/epp/registry/registry.go new file mode 100644 index 000000000..8ea49dac2 --- /dev/null +++ b/pkg/epp/registry/registry.go @@ -0,0 +1,35 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "encoding/json" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" +) + +// Factory is the definition of the factory functions that are used to instantiate plugins +// specified in a configuration. +type Factory func(parameters json.RawMessage) (plugins.Plugin, error) + +// Register is a static function that can be called to register plugin factory functions. +func Register(name string, factory Factory) { + Registry[name] = factory +} + +// Registry is a mapping from plugin name to Factory function +var Registry map[string]Factory = map[string]Factory{} diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index e5f2cf7fe..a64b2b209 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -16,6 +16,8 @@ limitations under the License. package requestcontrol +import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + // NewConfig creates a new Config object and returns its pointer. func NewConfig() *Config { return &Config{ @@ -34,3 +36,11 @@ func (c *Config) WithPostResponsePlugins(plugins ...PostResponse) *Config { c.postResponsePlugins = plugins return c } + +func (c *Config) AddPlugins(instances map[string]plugins.Plugin) { + for _, plugin := range instances { + if postResponse, ok := plugin.(PostResponse); ok { + c.postResponsePlugins = append(c.postResponsePlugins, postResponse) + } + } +} diff --git a/pkg/epp/scheduling/config/config.go b/pkg/epp/scheduling/config/config.go index 80efaaad6..e7fd0a3f4 100644 --- a/pkg/epp/scheduling/config/config.go +++ b/pkg/epp/scheduling/config/config.go @@ -33,8 +33,8 @@ type Config struct { const ( // Default values for LoRA specific thresholds - defaultQueueingThresholdLoRA = 128 - defaultLoraAffinityThreshold = 0.999 + DefaultQueueingThresholdLoRA = 128 + DefaultLoraAffinityThreshold = 0.999 ) // LoadConfig loads configuration from environment variables @@ -45,8 +45,8 @@ func LoadConfig() Config { config := Config{ KVCacheThreshold: envutil.GetEnvFloat("KV_CACHE_THRESHOLD", commonconfig.DefaultKVCacheThreshold, baseLogger), QueueThresholdCritical: envutil.GetEnvInt("QUEUE_THRESHOLD_CRITICAL", commonconfig.DefaultQueueThresholdCritical, baseLogger), - QueueingThresholdLoRA: envutil.GetEnvInt("QUEUING_THRESHOLD_LORA", defaultQueueingThresholdLoRA, baseLogger), - LoraAffinityThreshold: envutil.GetEnvFloat("LORA_AFFINITY_THRESHOLD", defaultLoraAffinityThreshold, baseLogger), + QueueingThresholdLoRA: envutil.GetEnvInt("QUEUING_THRESHOLD_LORA", DefaultQueueingThresholdLoRA, baseLogger), + LoraAffinityThreshold: envutil.GetEnvFloat("LORA_AFFINITY_THRESHOLD", DefaultLoraAffinityThreshold, baseLogger), } baseLogger.V(logutil.DEFAULT).Info("Scheduler configuration loaded", "config", config) diff --git a/pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go b/pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go index fe18395fb..c8f34ea02 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go @@ -18,15 +18,24 @@ package filter import ( "context" + "encoding/json" "math" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) -// compile-time type assertion +const LeastKVCacheFilterName = "least-KV-cache" + +// compile-time type validation var _ framework.Filter = &LeastKVCacheFilter{} +// LeastKVCacheFilterFactory is the plugin factory function for the Least KV Cache filter +func LeastKVCacheFilterFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return NewLeastKVCacheFilter(), nil +} + // NewLeastKVCacheFilter initializes a new LeastKVCacheFilter and returns its pointer. func NewLeastKVCacheFilter() *LeastKVCacheFilter { return &LeastKVCacheFilter{} @@ -41,7 +50,7 @@ type LeastKVCacheFilter struct{} // Name returns the name of the filter. func (f *LeastKVCacheFilter) Name() string { - return "least-KV-cache" + return LeastKVCacheFilterName } // Filter filters out pods that doesn't meet the filter criteria. diff --git a/pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go b/pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go index 8a43ce72a..1a30d1272 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go @@ -18,15 +18,24 @@ package filter import ( "context" + "encoding/json" "math" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) -// compile-time type assertion +const LeastQueueFilterName = "least-queue" + +// compile-time type validation var _ framework.Filter = &LeastQueueFilter{} +// LeastQueueFilterFactory is the plugin factory function for the Least Queue filter +func LeastQueueFilterFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return NewLeastQueueFilter(), nil +} + // NewLeastQueueFilter initializes a new LeastQueueFilter and returns its pointer. func NewLeastQueueFilter() *LeastQueueFilter { return &LeastQueueFilter{} @@ -41,7 +50,7 @@ type LeastQueueFilter struct{} // Name returns the name of the filter. func (f *LeastQueueFilter) Name() string { - return "least-queue" + return LeastQueueFilterName } // Filter filters out pods that doesn't meet the filter criteria. diff --git a/pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go b/pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go index 04c0107f3..1d89e488d 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go @@ -18,17 +18,35 @@ package filter import ( "context" + "encoding/json" + "fmt" "math/rand" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) -// compile-time type assertion +const LoraAffinityFilterName = "lora-affinity" + +type loraAffinityFilterParameters struct { + Threshold float64 `json:"threshold"` +} + +// compile-time type validation var _ framework.Filter = &LoraAffinityFilter{} +// LoraAffinityFilterFactory is the factory function for the LoraAffinity filter +func LoraAffinityFilterFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + parameters := loraAffinityFilterParameters{Threshold: config.DefaultLoraAffinityThreshold} + if err := json.Unmarshal(rawParameters, ¶meters); err != nil { + return nil, fmt.Errorf("failed to parse the parameters of the %s filter. Error: %s", LoraAffinityFilterName, err) + } + return &LoraAffinityFilter{loraAffinityThreshold: parameters.Threshold}, nil +} + // NewLoraAffinityFilter initializes a new LoraAffinityFilter and returns its pointer. func NewLoraAffinityFilter() *LoraAffinityFilter { return &LoraAffinityFilter{ @@ -49,7 +67,7 @@ type LoraAffinityFilter struct { // Name returns the name of the filter. func (f *LoraAffinityFilter) Name() string { - return "lora-affinity" + return LoraAffinityFilterName } // Filter filters out pods that doesn't meet the filter criteria. diff --git a/pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go b/pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go index 0192c3280..9ab0717e1 100644 --- a/pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go +++ b/pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go @@ -18,15 +18,35 @@ package filter import ( "context" + "fmt" + "encoding/json" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) -// compile-time type assertion +const LowQueueFilterName = "low-queue" + +type lowQueueFilterParameters struct { + Threshold int `json:"threshold"` +} + +// compile-time type validation var _ framework.Filter = &LowQueueFilter{} +// LowQueueFilterFactory is the factory function for the LowQueue filter +func LowQueueFilterFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + parameters := lowQueueFilterParameters{Threshold: config.DefaultQueueingThresholdLoRA} + if err := json.Unmarshal(rawParameters, ¶meters); err != nil { + return nil, fmt.Errorf("failed to parse the parameters of the %s filter. Error: %s", LowQueueFilterName, err) + } + + return &LowQueueFilter{queueingThresholdLoRA: parameters.Threshold}, nil +} + // NewLowQueueFilter initializes a new LowQueueFilter and returns its pointer. func NewLowQueueFilter() *LowQueueFilter { return &LowQueueFilter{ @@ -41,7 +61,7 @@ type LowQueueFilter struct { // Name returns the name of the filter. func (f *LowQueueFilter) Name() string { - return "low-queue" + return LowQueueFilterName } // Filter filters out pods that doesn't meet the filter criteria. diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index 0d40746f3..6ee825373 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -19,12 +19,14 @@ package prefix import ( "context" "encoding/binary" + "encoding/json" "fmt" "github.com/cespare/xxhash/v2" k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" @@ -49,17 +51,19 @@ const ( // token is about 128KB in size, so we can cache 500K tokens. Using the default block size of 16 // in vLLM, we will have 250K / 16 = 31.25K blocks. DefaultLRUCapacityPerServer = 31250 + + PrefixCachePluginName = "prefix-cache" ) type Config struct { // The input prompt is broken into sizes of HashBlockSize to calculate block hashes . Requests // with length shorter than the block size will be ignored. - HashBlockSize int + HashBlockSize int `json:"hashBlockSize"` // MaxPrefixBlocksToMatch is the maximum number of prefix blocks to match. Input beyond this limit will // be ignored. - MaxPrefixBlocksToMatch int + MaxPrefixBlocksToMatch int `json:"maxPrefixBlocksToMatch"` // Max capacity size of the LRU indexer in number of entries per server (pod). - LRUCapacityPerServer int + LRUCapacityPerServer int `json:"lruCapacityPerServer"` } type Plugin struct { @@ -84,7 +88,24 @@ func (s ServerID) String() string { return k8stypes.NamespacedName(s).String() } -// compile-time type assertion +// PrefixCachePluginFactory is the factory for the PrefixCache plugin +func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + parameters := Config{ + HashBlockSize: DefaultHashBlockSize, + MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks, + LRUCapacityPerServer: DefaultLRUCapacityPerServer, + } + if err := json.Unmarshal(rawParameters, ¶meters); err != nil { + return nil, fmt.Errorf("failed to parse the parameters of the %s plugin. Error: %s", PrefixCachePluginName, err) + } + + return &Plugin{ + Config: parameters, + indexer: newIndexer(parameters.LRUCapacityPerServer), + }, nil +} + +// compile-time type validation var _ types.StateData = &schedulingContextState{} // This is the state of this plugin to be used during a scheduling cycle. @@ -133,7 +154,7 @@ func New(config Config) *Plugin { // Name returns the name of the plugin. func (m *Plugin) Name() string { - return "prefix-cache" + return PrefixCachePluginName } // Score returns the scoring result for the given list of pods based on context. diff --git a/pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go b/pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go index 1a859f51e..f358c16e8 100644 --- a/pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go +++ b/pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go @@ -18,17 +18,26 @@ package picker import ( "context" + "encoding/json" "fmt" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -// compile-time type assertion +const MaxScorePickerName = "max-score" + +// compile-time type validation var _ framework.Picker = &MaxScorePicker{} +// MaxScorePickerFactory is the factory for the MaxScore picker +func MaxScorePickerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return &MaxScorePicker{random: NewRandomPicker()}, nil +} + // NewMaxScorePicker initializes a new MaxScorePicker and returns its pointer. func NewMaxScorePicker() *MaxScorePicker { return &MaxScorePicker{ @@ -43,7 +52,7 @@ type MaxScorePicker struct { // Name returns the name of the picker. func (p *MaxScorePicker) Name() string { - return "max_score" + return MaxScorePickerName } // Pick selects the pod with the maximum score from the list of candidates. diff --git a/pkg/epp/scheduling/framework/plugins/picker/random_picker.go b/pkg/epp/scheduling/framework/plugins/picker/random_picker.go index 4f585f535..8dec5e26f 100644 --- a/pkg/epp/scheduling/framework/plugins/picker/random_picker.go +++ b/pkg/epp/scheduling/framework/plugins/picker/random_picker.go @@ -18,18 +18,27 @@ package picker import ( "context" + "encoding/json" "fmt" "math/rand" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -// compile-time type assertion +const RandomPickerName = "random" + +// compile-time type validation var _ framework.Picker = &RandomPicker{} +// RandomPickerFactory is the factory for the Random picker +func RandomPickerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return NewRandomPicker(), nil +} + // NewRandomPicker initializes a new RandomPicker and returns its pointer. func NewRandomPicker() *RandomPicker { return &RandomPicker{} @@ -40,7 +49,7 @@ type RandomPicker struct{} // Name returns the name of the picker. func (p *RandomPicker) Name() string { - return "random" + return RandomPickerName } // Pick selects a random pod from the list of candidates. diff --git a/pkg/epp/scheduling/framework/plugins/profile/single_profile_handler.go b/pkg/epp/scheduling/framework/plugins/profile/single_profile_handler.go index 43bfd621c..f31811adc 100644 --- a/pkg/epp/scheduling/framework/plugins/profile/single_profile_handler.go +++ b/pkg/epp/scheduling/framework/plugins/profile/single_profile_handler.go @@ -18,14 +18,22 @@ package profile import ( "context" + "encoding/json" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) +const SingleProfileHandlerName = "single-profile" + // compile-time type assertion var _ framework.ProfileHandler = &SingleProfileHandler{} +func SingleProfileHandlerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return NewSingleProfileHandler(), nil +} + // NewSingleProfileHandler initializes a new SingleProfileHandler and returns its pointer. func NewSingleProfileHandler() *SingleProfileHandler { return &SingleProfileHandler{} @@ -36,7 +44,7 @@ type SingleProfileHandler struct{} // Name returns the name of the Profiles Picker. func (h *SingleProfileHandler) Name() string { - return "single-profile" + return SingleProfileHandlerName } // Pick selects the SchedulingProfiles to run from the list of candidate profiles, while taking into consideration the request properties and the diff --git a/pkg/epp/scheduling/framework/plugins/scorer/kvcache.go b/pkg/epp/scheduling/framework/plugins/scorer/kvcache.go index 1e536604c..50e64853c 100644 --- a/pkg/epp/scheduling/framework/plugins/scorer/kvcache.go +++ b/pkg/epp/scheduling/framework/plugins/scorer/kvcache.go @@ -18,18 +18,26 @@ package scorer import ( "context" + "encoding/json" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) const ( DefaultKVCacheScorerWeight = 1 + KvCacheScorerName = "kv-cache" ) // compile-time type assertion var _ framework.Scorer = &KVCacheScorer{} +// KvCacheScorerFactory is the factory for the KV-Cache scorer +func KvCacheScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return &KVCacheScorer{}, nil +} + // NewKVCacheScorer initializes a new KVCacheScorer and returns its pointer. func NewKVCacheScorer() *KVCacheScorer { return &KVCacheScorer{} @@ -40,7 +48,7 @@ type KVCacheScorer struct{} // Name returns the name of the scorer. func (s *KVCacheScorer) Name() string { - return "kv-cache" + return KvCacheScorerName } // Score returns the scoring result for the given list of pods based on context. diff --git a/pkg/epp/scheduling/framework/plugins/scorer/queue.go b/pkg/epp/scheduling/framework/plugins/scorer/queue.go index dd311909f..0a769b84e 100644 --- a/pkg/epp/scheduling/framework/plugins/scorer/queue.go +++ b/pkg/epp/scheduling/framework/plugins/scorer/queue.go @@ -18,19 +18,27 @@ package scorer import ( "context" + "encoding/json" "math" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) const ( DefaultQueueScorerWeight = 1 + QueueScorerName = "queue" ) // compile-time type assertion var _ framework.Scorer = &QueueScorer{} +// QueueScorerFactory is the factory for the Queue based scorer +func QueueScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return &QueueScorer{}, nil +} + // NewQueueScorer initializes a new QueueScorer and returns its pointer. func NewQueueScorer() *QueueScorer { return &QueueScorer{} @@ -42,7 +50,7 @@ type QueueScorer struct{} // Name returns the name of the scorer. func (s *QueueScorer) Name() string { - return "queue" + return QueueScorerName } // Score returns the scoring result for the given list of pods based on context. diff --git a/pkg/epp/scheduling/scheduler.go b/pkg/epp/scheduling/scheduler.go index e9536ba8a..82c8e9999 100644 --- a/pkg/epp/scheduling/scheduler.go +++ b/pkg/epp/scheduling/scheduler.go @@ -28,7 +28,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" - profilepicker "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -68,9 +68,9 @@ func NewScheduler(datastore Datastore) *Scheduler { WithFilters(lowLatencyFilter). WithPicker(&picker.RandomPicker{}) - profilePicker := profilepicker.NewSingleProfileHandler() + profileHandler := profile.NewSingleProfileHandler() - return NewSchedulerWithConfig(datastore, NewSchedulerConfig(profilePicker, map[string]*framework.SchedulerProfile{"default": defaultProfile})) + return NewSchedulerWithConfig(datastore, NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})) } // NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration. diff --git a/pkg/epp/scheduling/scheduler_config.go b/pkg/epp/scheduling/scheduler_config.go index 9549d04f6..4e83aa812 100644 --- a/pkg/epp/scheduling/scheduler_config.go +++ b/pkg/epp/scheduling/scheduler_config.go @@ -16,7 +16,16 @@ limitations under the License. package scheduling -import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" +import ( + "errors" + "fmt" + + "github.com/go-logr/logr" + + "sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" +) // NewSchedulerConfig creates a new SchedulerConfig object and returns its pointer. func NewSchedulerConfig(profileHandler framework.ProfileHandler, profiles map[string]*framework.SchedulerProfile) *SchedulerConfig { @@ -31,3 +40,48 @@ type SchedulerConfig struct { profileHandler framework.ProfileHandler profiles map[string]*framework.SchedulerProfile } + +func LoadSchedulerConfig(configProfiles []v1alpha1.SchedulingProfile, references map[string]plugins.Plugin, + log logr.Logger) (*SchedulerConfig, error) { + + var profiles = map[string]*framework.SchedulerProfile{} + + for _, configProfile := range configProfiles { + profile := framework.SchedulerProfile{} + + for _, plugin := range configProfile.Plugins { + var err error + thePlugin := references[plugin.PluginRef] + if theScorer, ok := thePlugin.(framework.Scorer); ok { + if plugin.Weight == nil { + err = fmt.Errorf("scorer %s is missing a weight", plugin.PluginRef) + log.Error(err, "failed to instantiate scheduler profile") + return nil, err + } + thePlugin = framework.NewWeightedScorer(theScorer, *plugin.Weight) + } + err = profile.AddPlugins(thePlugin) + if err != nil { + return nil, err + } + } + profiles[configProfile.Name] = &profile + } + + var profileHandler framework.ProfileHandler + var profileHandlerName string + + for pluginName, thePlugin := range references { + if theProfileHandler, ok := thePlugin.(framework.ProfileHandler); ok { + if profileHandler != nil { + return nil, fmt.Errorf("only one profile handler is allowed. Both %s and %s are profile handlers", profileHandlerName, pluginName) + } + profileHandler = theProfileHandler + profileHandlerName = pluginName + } + } + if profileHandler != nil { + return NewSchedulerConfig(profileHandler, profiles), nil + } + return nil, errors.New("no profile handler was specified") +} diff --git a/pkg/epp/scheduling/scheduler_config_test.go b/pkg/epp/scheduling/scheduler_config_test.go new file mode 100644 index 000000000..495df31c7 --- /dev/null +++ b/pkg/epp/scheduling/scheduler_config_test.go @@ -0,0 +1,263 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduling + +import ( + "testing" + + commonconfig "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +func TestLoadSchedulerConfig(t *testing.T) { + log := logutil.NewTestLogger() + + tests := []struct { + name string + configText string + wantErr bool + }{ + { + name: "success", + configText: successConfigText, + wantErr: false, + }, + { + name: "errorBadPluginJson", + configText: errorBadPluginJsonText, + wantErr: true, + }, + { + name: "errorBadReferenceNoWeight", + configText: errorBadReferenceNoWeightText, + wantErr: true, + }, + { + name: "errorPluginReferenceJson", + configText: errorPluginReferenceJsonText, + wantErr: true, + }, + { + name: "errorTwoPickers", + configText: errorTwoPickersText, + wantErr: true, + }, + { + name: "errorConfig", + configText: errorConfigText, + wantErr: true, + }, + { + name: "errorTwoProfileHandlers", + configText: errorTwoProfileHandlersText, + wantErr: true, + }, + { + name: "errorNoProfileHandlers", + configText: errorNoProfileHandlersText, + wantErr: true, + }, + } + + registerNeededPlgugins() + + for _, test := range tests { + theConfig, err := commonconfig.LoadConfig([]byte(test.configText), "") + if err != nil { + if test.wantErr { + continue + } + t.Fatalf("LoadConfig returned unexpected error: %v", err) + } + instantiatedPlugins, err := commonconfig.LoadPluginReferences(theConfig.Plugins, testHandle{}) + if err != nil { + if test.wantErr { + continue + } + t.Fatalf("LoadPluginReferences returned unexpected error: %v", err) + } + + _, err = LoadSchedulerConfig(theConfig.SchedulingProfiles, instantiatedPlugins, log) + if err != nil { + if !test.wantErr { + t.Errorf("LoadSchedulerConfig returned an unexpected error. error %v", err) + } + } else if test.wantErr { + t.Errorf("LoadSchedulerConfig did not return an expected error (%s)", test.name) + } + } +} + +type testHandle struct { +} + +func registerNeededPlgugins() { + allPlugins := map[string]plugins.Factory{ + filter.LowQueueFilterName: filter.LowQueueFilterFactory, + prefix.PrefixCachePluginName: prefix.PrefixCachePluginFactory, + picker.MaxScorePickerName: picker.MaxScorePickerFactory, + picker.RandomPickerName: picker.RandomPickerFactory, + profile.SingleProfileHandlerName: profile.SingleProfileHandlerFactory, + } + for name, factory := range allPlugins { + plugins.Register(name, factory) + } +} + +// The following multi-line string constants, cause false positive lint errors (dupword) + +//nolint:dupword +const successConfigText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: lowQueue + pluginName: low-queue + parameters: + threshold: 10 +- name: prefixCache + pluginName: prefix-cache + parameters: + hashBlockSize: 32 +- name: maxScore + pluginName: max-score +- name: profileHandler + pluginName: single-profile +schedulingProfiles: +- name: default + plugins: + - pluginRef: lowQueue + - pluginRef: prefixCache + weight: 50 + - pluginRef: maxScore +` + +//nolint:dupword +const errorBadPluginJsonText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name:profileHandler + pluginName: single-profile +- name: prefixCache + pluginName: prefix-cache + parameters: + hashBlockSize: asdf +schedulingProfiles: +- name: default + plugins: + - pluginRef: prefixCache + weight: 50 +` + +//nolint:dupword +const errorBadReferenceNoWeightText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: profileHandler + pluginName: single-profile +- name: prefixCache + pluginName: prefix-cache + parameters: + hashBlockSize: 32 +schedulingProfiles: +- name: default + plugins: + - pluginRef: prefixCache +` + +//nolint:dupword +const errorPluginReferenceJsonText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: lowQueue + pluginName: low-queue + parameters: + threshold: qwer +- name: profileHandler + pluginName: single-profile +schedulingProfiles: +- name: default + plugins: + - pluginRef: lowQueue +` + +//nolint:dupword +const errorTwoPickersText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: profileHandler + pluginName: single-profile +- name: maxScore + pluginName: max-score +- name: random + pluginName: random +schedulingProfiles: +- name: default + plugins: + - pluginRef: maxScore + - pluginRef: random +` + +//nolint:dupword +const errorConfigText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: lowQueue + pluginName: low-queue + parameters: + threshold: 10 +` + +//nolint:dupword +const errorTwoProfileHandlersText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: profileHandler + pluginName: single-profile +- name: secondProfileHandler + pluginName: single-profile +- name: maxScore + pluginName: max-score +schedulingProfiles: +- name: default + plugins: + - pluginRef: maxScore +` + +//nolint:dupword +const errorNoProfileHandlersText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: maxScore + pluginName: max-score +schedulingProfiles: +- name: default + plugins: + - pluginRef: maxScore +` diff --git a/test/testdata/configloader_1_test.yaml b/test/testdata/configloader_1_test.yaml new file mode 100644 index 000000000..bd66b4590 --- /dev/null +++ b/test/testdata/configloader_1_test.yaml @@ -0,0 +1,22 @@ +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + pluginName: test-one + parameters: + threshold: 10 +- name: profileHandler + pluginName: test-profile-handler +- pluginName: test-two + parameters: + hashBlockSize: 32 +- name: testPicker + pluginName: test-picker + +schedulingProfiles: +- name: default + plugins: + - pluginRef: test1 + - pluginRef: test-two + weight: 50 + - pluginRef: testPicker