@@ -28,6 +28,7 @@ import (
2828 healthPb "google.golang.org/grpc/health/grpc_health_v1"
2929 "k8s.io/apimachinery/pkg/types"
3030 ctrl "sigs.k8s.io/controller-runtime"
31+ "sigs.k8s.io/controller-runtime/pkg/log"
3132 "sigs.k8s.io/controller-runtime/pkg/log/zap"
3233 "sigs.k8s.io/controller-runtime/pkg/manager"
3334 "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -38,7 +39,13 @@ import (
3839 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3940 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4041 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
42+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
43+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
44+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
45+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
46+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
4147 runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
48+ envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
4249 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4350)
4451
@@ -97,8 +104,22 @@ var (
97104 "Prometheus metric for the LoRA info metrics (must be in vLLM label format)." )
98105
99106 setupLog = ctrl .Log .WithName ("setup" )
107+
108+ // Environment variables
109+ schedulerV2 = envutil .GetEnvString ("EXPERIMENTAL_USE_SCHEDULER_V2" , "false" , setupLog )
110+ prefixCacheScheduling = envutil .GetEnvString ("ENABLE_PREFIX_CACHE_SCHEDULING" , "false" , setupLog )
100111)
101112
113+ func loadPrefixCacheConfig () prefix.Config {
114+ baseLogger := log .Log .WithName ("env-config" )
115+
116+ return prefix.Config {
117+ HashBlockSize : envutil .GetEnvInt ("PREFIX_CACHE_HASH_BLOCK_SIZE" , prefix .DefaultHashBlockSize , baseLogger ),
118+ MaxPrefixBlocksToMatch : envutil .GetEnvInt ("PREFIX_CACHE_MAX_PREFIX_BLOCKS" , prefix .DefaultMaxPrefixBlocks , baseLogger ),
119+ LRUIndexerCapacity : envutil .GetEnvInt ("PREFIX_CACHE_LRU_CAPACITY" , prefix .DefaultLRUIndexerCapacity , baseLogger ),
120+ }
121+ }
122+
102123func main () {
103124 if err := run (); err != nil {
104125 os .Exit (1 )
@@ -173,6 +194,27 @@ func run() error {
173194 datastore := datastore .NewDatastore (ctx , pmf )
174195
175196 scheduler := scheduling .NewScheduler (datastore )
197+ if schedulerV2 == "true" {
198+ queueScorerWeight := envutil .GetEnvInt ("QUEUE_SCORE_WEIGHT" , scorer .DefaultQueueScorerWeight , setupLog )
199+ kvCacheScorerWeight := envutil .GetEnvInt ("KV_CACHE_SCORE_WEIGHT" , scorer .DefaultKVCacheScorerWeight , setupLog )
200+ scorers := map [plugins.Scorer ]int {
201+ & scorer.QueueScorer {}: queueScorerWeight ,
202+ & scorer.KVCacheScorer {}: kvCacheScorerWeight ,
203+ }
204+ schedConfigOpts := []scheduling.ConfigOption {}
205+ if prefixCacheScheduling == "true" {
206+ prefixScorerWeight := envutil .GetEnvInt ("PREFIX_CACHE_SCORE_WEIGHT" , prefix .DefaultScorerWeight , setupLog )
207+ schedConfigOpts = append (schedConfigOpts , scheduling .AddPrefixPlugin (loadPrefixCacheConfig (), prefixScorerWeight ))
208+ }
209+ schedulerConfig := scheduling .NewSchedulerConfig (
210+ []plugins.PreSchedule {},
211+ []plugins.Filter {filter .NewSheddableCapacityFilter ()},
212+ scorers ,
213+ picker .NewMaxScorePicker (),
214+ []plugins.PostSchedule {},
215+ schedConfigOpts ... )
216+ scheduler = scheduling .NewSchedulerWithConfig (datastore , schedulerConfig )
217+ }
176218 serverRunner := & runserver.ExtProcServerRunner {
177219 GrpcPort : * grpcPort ,
178220 DestinationEndpointHintMetadataNamespace : * destinationEndpointHintMetadataNamespace ,
0 commit comments