From c181bbcda0006aed5a89328b4b22882e512eb805 Mon Sep 17 00:00:00 2001 From: zhengkezhou1 Date: Tue, 10 Jun 2025 16:10:59 +0800 Subject: [PATCH] refactor: Move scheduling parameter updates to main Signed-off-by: zhengkezhou1 --- cmd/epp/main.go | 56 ++++++++++++++++--- pkg/epp/saturationdetector/config.go | 37 ------------ .../saturationdetector_test.go | 32 +---------- 3 files changed, 48 insertions(+), 77 deletions(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index cef2649d0..2f8c39d94 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -20,6 +20,9 @@ import ( "flag" "fmt" "os" + "time" + + commonconfig "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" @@ -35,7 +38,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" @@ -112,11 +114,51 @@ var ( setupLog = ctrl.Log.WithName("setup") // Environment variables - schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog) - prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog) - reqHeaderBasedSchedulerForTesting = envutil.GetEnvBool("ENABLE_REQ_HEADER_BASED_SCHEDULER_FOR_TESTING", false, setupLog) + schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog) + prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog) +) + +// Default saturationdetector configuration values +const ( + DefaultQueueDepthThreshold = commonconfig.DefaultQueueThresholdCritical + DefaultKVCacheUtilThreshold = commonconfig.DefaultKVCacheThreshold + // DefaultMetricsStalenessThreshold defines how old metrics can be before they + // are considered stale. + // Given the pod metrics refresh interval is 50ms, a threshold slightly above + // that should be fine. + DefaultMetricsStalenessThreshold = 200 * time.Millisecond + + // Environment variable names for SaturationDetector configuration + EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD" + EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD" + EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD" ) +func loadSaturationDetectorConfig() *saturationdetector.Config { + logger := log.Log.WithName("saturation-detector-config") + + cfg := &saturationdetector.Config{} + + cfg.QueueDepthThreshold = envutil.GetEnvInt(EnvSdQueueDepthThreshold, DefaultQueueDepthThreshold, logger) + if cfg.QueueDepthThreshold <= 0 { + cfg.QueueDepthThreshold = DefaultQueueDepthThreshold + } + + cfg.KVCacheUtilThreshold = envutil.GetEnvFloat(EnvSdKVCacheUtilThreshold, DefaultKVCacheUtilThreshold, logger) + if cfg.KVCacheUtilThreshold <= 0 || cfg.KVCacheUtilThreshold >= 1 { + cfg.KVCacheUtilThreshold = DefaultKVCacheUtilThreshold + } + + cfg.MetricsStalenessThreshold = envutil.GetEnvDuration(EnvSdMetricsStalenessThreshold, DefaultMetricsStalenessThreshold, logger) + if cfg.MetricsStalenessThreshold <= 0 { + cfg.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold + } + + // NewDetector validates the config and assigns defaults. + logger.Info("SaturationDetector configuration loaded from env", "config", fmt.Sprintf("%+v", cfg)) + return cfg +} + func loadPrefixCacheConfig() prefix.Config { baseLogger := log.Log.WithName("env-config") @@ -155,7 +197,7 @@ func run() error { setupLog.Info("Flags processed", "flags", flags) // --- Load Configurations from Environment Variables --- - sdConfig := saturationdetector.LoadConfigFromEnv() + sdConfig := loadSaturationDetectorConfig() // --- Get Kubernetes Config --- cfg, err := ctrl.GetConfig() @@ -226,10 +268,6 @@ func run() error { scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig) } - if reqHeaderBasedSchedulerForTesting { - scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore) - } - saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log) director := requestcontrol.NewDirector(datastore, scheduler, saturationDetector) // can call "director.WithPostResponsePlugins" to add post response plugins diff --git a/pkg/epp/saturationdetector/config.go b/pkg/epp/saturationdetector/config.go index 78a5833e4..dea215e4a 100644 --- a/pkg/epp/saturationdetector/config.go +++ b/pkg/epp/saturationdetector/config.go @@ -16,12 +16,9 @@ limitations under the License. package saturationdetector import ( - "fmt" "time" - "sigs.k8s.io/controller-runtime/pkg/log" commonconfig "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config" - envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env" ) // Default configuration values @@ -34,37 +31,3 @@ const ( // that should be fine. DefaultMetricsStalenessThreshold = 200 * time.Millisecond ) - -// Environment variable names for SaturationDetector configuration -const ( - EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD" - EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD" - EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD" -) - -// LoadConfigFromEnv loads SaturationDetector Config from environment variables. -func LoadConfigFromEnv() *Config { - // Use a default logger for initial configuration loading. - logger := log.Log.WithName("saturation-detector-config") - - cfg := &Config{} - - cfg.QueueDepthThreshold = envutil.GetEnvInt(EnvSdQueueDepthThreshold, DefaultQueueDepthThreshold, logger) - if cfg.QueueDepthThreshold <= 0 { - cfg.QueueDepthThreshold = DefaultQueueDepthThreshold - } - - cfg.KVCacheUtilThreshold = envutil.GetEnvFloat(EnvSdKVCacheUtilThreshold, DefaultKVCacheUtilThreshold, logger) - if cfg.KVCacheUtilThreshold <= 0 || cfg.KVCacheUtilThreshold >= 1 { - cfg.KVCacheUtilThreshold = DefaultKVCacheUtilThreshold - } - - cfg.MetricsStalenessThreshold = envutil.GetEnvDuration(EnvSdMetricsStalenessThreshold, DefaultMetricsStalenessThreshold, logger) - if cfg.MetricsStalenessThreshold <= 0 { - cfg.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold - } - - // NewDetector validates the config and assigns defaults. - logger.Info("SaturationDetector configuration loaded from env", "config", fmt.Sprintf("%+v", cfg)) - return cfg -} diff --git a/pkg/epp/saturationdetector/saturationdetector_test.go b/pkg/epp/saturationdetector/saturationdetector_test.go index 42e81b5fd..92d8728f1 100644 --- a/pkg/epp/saturationdetector/saturationdetector_test.go +++ b/pkg/epp/saturationdetector/saturationdetector_test.go @@ -18,9 +18,6 @@ package saturationdetector import ( "context" - "fmt" - "os" - "strconv" "testing" "time" @@ -77,40 +74,13 @@ func TestNewDetector(t *testing.T) { expectedKVCacheUtilThreshold: 0.8, expectedStalenessThreshold: 100 * time.Millisecond, }, - { - name: "invalid thresholds, fallback to default", - config: &Config{ - QueueDepthThreshold: -1, - KVCacheUtilThreshold: -5, - MetricsStalenessThreshold: 0, - }, - datastore: &mockDatastore{}, - expectedQueueDepthThreshold: DefaultQueueDepthThreshold, - expectedKVCacheUtilThreshold: DefaultKVCacheUtilThreshold, - expectedStalenessThreshold: DefaultMetricsStalenessThreshold, - }, - { - name: "kv cache threshold above range, fallback to default", - config: &Config{ - QueueDepthThreshold: 10, - KVCacheUtilThreshold: 1.5, - MetricsStalenessThreshold: 100 * time.Millisecond, - }, - datastore: &mockDatastore{}, - expectedQueueDepthThreshold: 10, - expectedKVCacheUtilThreshold: DefaultKVCacheUtilThreshold, - expectedStalenessThreshold: 100 * time.Millisecond, - }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { // validate configuration values are loaded from env vars properly, including the use of default values when provided value is invalid. - os.Setenv(EnvSdQueueDepthThreshold, strconv.Itoa(test.config.QueueDepthThreshold)) - os.Setenv(EnvSdKVCacheUtilThreshold, fmt.Sprintf("%v", test.config.KVCacheUtilThreshold)) - os.Setenv(EnvSdMetricsStalenessThreshold, test.config.MetricsStalenessThreshold.String()) - detector := NewDetector(LoadConfigFromEnv(), test.datastore, logr.Discard()) + detector := NewDetector(test.config, test.datastore, logr.Discard()) if detector == nil { t.Fatalf("NewDetector() returned nil detector for valid config") }