From dc174c2a9c32faa8377e88cf5fd9f76b9f999973 Mon Sep 17 00:00:00 2001 From: Maroon Ayoub Date: Wed, 30 Apr 2025 17:05:41 +0300 Subject: [PATCH 1/6] - implemented kvcache-aware-scorer - added configuration Signed-off-by: Maroon Ayoub --- Dockerfile | 23 ++- Makefile | 7 +- README.md | 17 +++ cmd/epp/main.go | 1 - go.mod | 19 ++- go.sum | 36 +++-- pkg/epp/backend/metrics/fake.go | 2 + pkg/epp/backend/metrics/metrics.go | 1 - pkg/epp/backend/metrics/metrics_test.go | 3 - pkg/epp/backend/metrics/pod_metrics_test.go | 2 + .../inferencemodel_reconciler_test.go | 1 - pkg/epp/datastore/datastore.go | 4 +- pkg/epp/datastore/datastore_test.go | 2 - pkg/epp/handlers/request.go | 7 + pkg/epp/scheduling/local_config.go | 76 +++++++++- pkg/epp/scheduling/plugins/filter/filter.go | 1 - .../plugins/scorer/kvcache-aware-scorer.go | 141 ++++++++++++++++++ .../{scorers => scorer}/load_based_scorer.go | 11 +- pkg/epp/scheduling/scheduler.go | 1 + pkg/epp/scheduling/scheduler_test.go | 1 - pkg/epp/scheduling/scorers_test.go | 4 +- test/e2e/epp/e2e_test.go | 1 - test/integration/bbr/hermetic_test.go | 6 +- test/integration/epp/hermetic_test.go | 45 ++++-- test/utils/utils.go | 1 - 25 files changed, 348 insertions(+), 65 deletions(-) create mode 100644 pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go rename pkg/epp/scheduling/plugins/{scorers => scorer}/load_based_scorer.go (88%) diff --git a/Dockerfile b/Dockerfile index a92cbb71..5f7631ee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,15 +3,26 @@ FROM quay.io/projectquay/golang:1.24 AS builder ARG TARGETOS ARG TARGETARCH -# ENV GOPROXY=https://goproxy.io,direct +# Install build tools +RUN dnf install -y gcc-c++ libstdc++ libstdc++-devel && dnf clean all WORKDIR /workspace + +## NeuralMagic internal repos pull config +ARG GIT_NM_USER +ARG NM_TOKEN +### use git token +RUN echo -e "machine github.com\n\tlogin ${GIT_NM_USER}\n\tpassword ${NM_TOKEN}" >> ~/.netrc +ENV GOPRIVATE=github.com/neuralmagic +ENV GIT_TERMINAL_PROMPT=1 + # 
Copy the Go Modules manifests COPY go.mod go.mod COPY go.sum go.sum # cache deps before building and copying source so that we don't need to re-download as much # and so that source changes don't invalidate our downloaded layer RUN go mod download +RUN rm -rf ~/.netrc # remove git token # Copy the go source COPY cmd ./cmd @@ -19,12 +30,20 @@ COPY pkg ./pkg COPY internal ./internal COPY api ./api +# HuggingFace tokenizer bindings +RUN mkdir -p lib +RUN curl -L https://github.com/daulet/tokenizers/releases/download/v1.20.2/libtokenizers.${TARGETOS}-${TARGETARCH}.tar.gz | tar -xz -C lib +RUN ranlib lib/*.a + # Build # the GOARCH has not a default value to allow the binary be built according to the host where the command # was called. For example, if we call make image-build in a local env which has the Apple Silicon M1 SO # the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore, # by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. -RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -o bin/epp cmd/epp/main.go cmd/epp/health.go +ENV CGO_ENABLED=1 +ENV GOOS=${TARGETOS:-linux} +ENV GOARCH=${TARGETARCH} +RUN go build -o bin/epp -ldflags="-extldflags '-L$(pwd)/lib'" cmd/epp/main.go cmd/epp/health.go # Use distroless as minimal base image to package the manager binary # Refer to https://github.com/GoogleContainerTools/distroless for more details diff --git a/Makefile b/Makefile index 0bfb19fc..bb4f078d 100644 --- a/Makefile +++ b/Makefile @@ -489,7 +489,12 @@ buildah-build: check-builder load-version-json ## Build and push image (multi-ar .PHONY: image-build image-build: check-container-tool load-version-json ## Build container image using $(CONTAINER_TOOL) @printf "\033[33;1m==== Building container image $(IMG) ====\033[0m\n" - $(CONTAINER_TOOL) build --build-arg TARGETOS=$(TARGETOS) --build-arg TARGETARCH=$(TARGETARCH) -t $(IMG) . 
+ $(CONTAINER_TOOL) build --platform=$(TARGETOS)/$(TARGETARCH) \ + --build-arg TARGETOS=$(TARGETOS) \ + --build-arg TARGETARCH=$(TARGETARCH) \ + --build-arg GIT_NM_USER=$(GIT_NM_USER)\ + --build-arg NM_TOKEN=$(NM_TOKEN) \ + -t $(IMG) . .PHONY: image-push image-push: check-container-tool load-version-json ## Push container image $(IMG) to registry diff --git a/README.md b/README.md index 4cdb1781..12d4186e 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,23 @@ This project offers tools for AI Inference, enabling developers to build [Inference Gateways]. +--- +## Temporary Fork Configuration + +To enable KVCacheAwareScorer, the following env vars must be configured: +``` +export ENABLE_KVCACHE_AWARE_SCORER=true +export KVCACHE_AWARE_SCORER_WEIGHT=1 +export KVCACHE_INDEXER_REDIS_ADDR= +export HF_TOKEN= +``` + +To enable LoadAwareScorer, the following env vars must be configured: +``` +export ENABLE_LOAD_AWARE_SCORER=true +export LOAD_AWARE_SCORER_WEIGHT=1 +``` +--- [Inference Gateways]:#concepts-and-definitions ## Concepts and Definitions diff --git a/cmd/epp/main.go b/cmd/epp/main.go index c0a87e62..3c383225 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -314,5 +314,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge if mapping.LoraRequestInfo == nil { logger.Info("Not scraping metric: LoraRequestInfo") } - } diff --git a/go.mod b/go.mod index 7da23767..dff0542e 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,15 @@ module sigs.k8s.io/gateway-api-inference-extension -go 1.24.0 +go 1.24.1 + +toolchain go1.24.2 require ( github.com/elastic/crd-ref-docs v0.1.0 github.com/envoyproxy/go-control-plane/envoy v1.32.4 github.com/go-logr/logr v1.4.2 github.com/google/go-cmp v0.7.0 + github.com/neuralmagic/llm-d-kv-cache-manager v0.0.0-20250430102735-86595011431d github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.37.0 github.com/prometheus/client_golang v1.22.0 @@ -41,7 +44,9 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 
// indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 // indirect + github.com/daulet/tokenizers v1.20.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect @@ -69,6 +74,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huandu/xstrings v1.3.3 // indirect github.com/imdario/mergo v0.3.11 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -90,6 +96,7 @@ require ( github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/redis/go-redis/v9 v9.7.3 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect @@ -104,15 +111,15 @@ require ( go.opentelemetry.io/otel/trace v1.34.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/automaxprocs v1.6.0 // indirect - golang.org/x/crypto v0.36.0 // indirect + golang.org/x/crypto v0.37.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.24.0 // indirect - golang.org/x/net v0.38.0 // indirect + golang.org/x/net v0.39.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect - golang.org/x/sync v0.12.0 // indirect + golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect - golang.org/x/term v0.30.0 // indirect - golang.org/x/text v0.23.0 // indirect + 
golang.org/x/term v0.31.0 // indirect + golang.org/x/text v0.24.0 // indirect golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.31.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect diff --git a/go.sum b/go.sum index 11c244d4..ea299e2f 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -24,10 +28,14 @@ github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 h1:boJj011Hh+874zpIySe github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/daulet/tokenizers v1.20.2 h1:tlq/vIOiBTKDPets3596aFvmJYLn3XI6LFKq4q9LKhQ= +github.com/daulet/tokenizers v1.20.2/go.mod h1:tGnMdZthXdcWY6DGD07IygpwJqiPvG85FQUnhs/wSCs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/elastic/crd-ref-docs v0.1.0 h1:Cr5kz89QB3Iuuj7dhAfLMApCrChEGAaIBTxGk/xuRKw= github.com/elastic/crd-ref-docs v0.1.0/go.mod h1:X83mMBdJt05heJUYiS3T0yJ/JkCuliuhSUNav5Gjo/U= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= @@ -100,6 +108,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4= github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= @@ -147,6 +157,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate 
v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/neuralmagic/llm-d-kv-cache-manager v0.0.0-20250430102735-86595011431d h1:6YSxvAG4ve5jy0nTLs509OMU5fuiQ3JNQdZxqiu8PgQ= +github.com/neuralmagic/llm-d-kv-cache-manager v0.0.0-20250430102735-86595011431d/go.mod h1:VB+KcEemkO1ZKpz/hgUPQMU9oSLv2uCLW6y6c+r8jkQ= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= @@ -172,6 +184,8 @@ github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= +github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -226,8 +240,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= -golang.org/x/crypto v0.36.0/go.mod 
h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -238,17 +252,15 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= -golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70= -golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= -golang.org/x/sync v0.12.0/go.mod 
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -256,13 +268,13 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= -golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o= +golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff 
--git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index ec97c6de..d1b373fd 100644 --- a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -40,9 +40,11 @@ func (fpm *FakePodMetrics) String() string { func (fpm *FakePodMetrics) GetPod() *Pod { return fpm.Pod } + func (fpm *FakePodMetrics) GetMetrics() *Metrics { return fpm.Metrics } + func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) { fpm.Pod = toInternalPod(pod) } diff --git a/pkg/epp/backend/metrics/metrics.go b/pkg/epp/backend/metrics/metrics.go index 96814b4b..efe847dd 100644 --- a/pkg/epp/backend/metrics/metrics.go +++ b/pkg/epp/backend/metrics/metrics.go @@ -47,7 +47,6 @@ func (p *PodMetricsClientImpl) FetchMetrics( existing *Metrics, port int32, ) (*Metrics, error) { - // Currently the metrics endpoint is hard-coded, which works with vLLM. // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config. url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics" diff --git a/pkg/epp/backend/metrics/metrics_test.go b/pkg/epp/backend/metrics/metrics_test.go index e3b45b94..c69f2c67 100644 --- a/pkg/epp/backend/metrics/metrics_test.go +++ b/pkg/epp/backend/metrics/metrics_test.go @@ -58,7 +58,6 @@ func makeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { // --- Tests --- func TestGetMetric(t *testing.T) { - metricFamilies := map[string]*dto.MetricFamily{ "metric1": makeMetricFamily("metric1", makeMetric(map[string]string{"label1": "value1"}, 1.0, 1000), @@ -166,7 +165,6 @@ func TestGetMetric(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotMetric, err := p.getMetric(metricFamilies, tt.spec) if tt.wantError { @@ -240,7 +238,6 @@ func TestLabelsMatch(t *testing.T) { } func TestGetLatestLoraMetric(t *testing.T) { - testCases := []struct { name string metricFamilies map[string]*dto.MetricFamily diff --git 
a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index e79c1bf0..8d5f064a 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -88,10 +88,12 @@ type fakeDataStore struct{} func (f *fakeDataStore) PoolGet() (*v1alpha2.InferencePool, error) { return &v1alpha2.InferencePool{Spec: v1alpha2.InferencePoolSpec{TargetPortNumber: 8000}}, nil } + func (f *fakeDataStore) PodGetAll() []PodMetrics { // Not implemented. return nil } + func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics { // Not implemented. return nil diff --git a/pkg/epp/controller/inferencemodel_reconciler_test.go b/pkg/epp/controller/inferencemodel_reconciler_test.go index 80c30e19..024b6901 100644 --- a/pkg/epp/controller/inferencemodel_reconciler_test.go +++ b/pkg/epp/controller/inferencemodel_reconciler_test.go @@ -227,7 +227,6 @@ func TestInferenceModelReconciler(t *testing.T) { if diff := diffStore(ds, diffStoreParams{wantPool: pool, wantModels: test.wantModels}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } - }) } } diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 288c4d7b..630b7119 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -38,9 +38,7 @@ const ( ModelNameIndexKey = "spec.modelName" ) -var ( - errPoolNotSynced = errors.New("InferencePool is not initialized in data store") -) +var errPoolNotSynced = errors.New("InferencePool is not initialized in data store") // The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api) type Datastore interface { diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index b6466e6b..248e95b4 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -204,7 +204,6 @@ func TestModel(t *testing.T) { existing := 
ds.ModelDelete(types.NamespacedName{Name: model1ts.Name, Namespace: model1ts.Namespace}) got := ds.ModelGet(tsModel) return existing != nil && got == nil - }, wantOpResult: true, wantModels: []*v1alpha2.InferenceModel{model2chat}, @@ -226,7 +225,6 @@ func TestModel(t *testing.T) { if diff := testutil.DiffModelLists(test.wantModels, ds.ModelGetAll()); diff != "" { t.Errorf("Unexpected models diff: %s", diff) } - }) } } diff --git a/pkg/epp/handlers/request.go b/pkg/epp/handlers/request.go index 203afc2f..4997a8b3 100644 --- a/pkg/epp/handlers/request.go +++ b/pkg/epp/handlers/request.go @@ -31,6 +31,8 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) +const emptyPrompt = "" + // HandleRequestBody always returns the requestContext even in the error case, as the request context is used in error handling. func (s *StreamingServer) HandleRequestBody( ctx context.Context, @@ -68,6 +70,7 @@ func (s *StreamingServer) HandleRequestBody( Headers: reqCtx.RequestHeaders, ResolvedTargetModel: modelName, Critical: modelObj.Spec.Criticality != nil && *modelObj.Spec.Criticality == v1alpha2.Critical, + Prompt: emptyPrompt, } logger.V(logutil.DEBUG).Info("LLM request assembled", "request", llmReq) @@ -76,6 +79,10 @@ func (s *StreamingServer) HandleRequestBody( if llmReq.Model != llmReq.ResolvedTargetModel { requestBodyMap["model"] = llmReq.ResolvedTargetModel } + // Extract prompt from the request body. + if prompt, ok := requestBodyMap["prompt"].(string); ok { + llmReq.Prompt = prompt + } requestBodyBytes, err = json.Marshal(requestBodyMap) if err != nil { diff --git a/pkg/epp/scheduling/local_config.go b/pkg/epp/scheduling/local_config.go index 87098ae0..931c2848 100644 --- a/pkg/epp/scheduling/local_config.go +++ b/pkg/epp/scheduling/local_config.go @@ -17,12 +17,78 @@ limitations under the License. 
package scheduling
 
 import (
-	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorers"
+	"context"
+	"os"
+	"strconv"
+
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
+	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+)
+
+const (
+	kvCacheScorerEnablementEnvVar   = "ENABLE_KVCACHE_AWARE_SCORER"
+	loadAwareScorerEnablementEnvVar = "ENABLE_LOAD_AWARE_SCORER"
+
+	kvCacheScorerWeightEnvVar   = "KVCACHE_AWARE_SCORER_WEIGHT"
+	loadAwareScorerWeightEnvVar = "LOAD_AWARE_SCORER_WEIGHT"
 )
 
-func init() {
-	defaultConfig.scorers[&scorers.LoadBasedScorer{}] = 1.0
+func setDefaultConfig() {
+	// Adds the env-var-enabled scorers to the global defaultConfig; kept as a function to minimize
+	// rebase conflicts. NOTE(review): each call registers new scorer instances - confirm it runs once.
+	setLoadBasedScorer()
+	setKVCacheAwareScorer()
+}
+
+// setLoadBasedScorer registers a LoadAwareScorer in defaultConfig when ENABLE_LOAD_AWARE_SCORER is
+// "true". Its weight defaults to 1 and can be overridden with an integer LOAD_AWARE_SCORER_WEIGHT.
+func setLoadBasedScorer() {
+	logger := log.FromContext(context.Background()).WithName("scheduler_config").V(logutil.DEBUG)
+
+	if os.Getenv(loadAwareScorerEnablementEnvVar) != "true" {
+		logger.Info("Skipping LoadAwareScorer creation as it is not enabled")
+		return
+	}
+
+	weight := 1
+	if weightStr := os.Getenv(loadAwareScorerWeightEnvVar); weightStr != "" {
+		if parsed, parseErr := strconv.ParseInt(weightStr, 10, 32); parseErr != nil {
+			// Keep the default: ParseInt's result is not usable on failure (0 for a syntax error).
+			logger.Error(parseErr, "Failed to parse scorer weight, using default", "envVar", loadAwareScorerWeightEnvVar, "value", weightStr)
+		} else {
+			weight = int(parsed)
+		}
+	}
+
+	defaultConfig.scorers[&scorer.LoadAwareScorer{}] = weight
+}
+
+// setKVCacheAwareScorer registers a KVCacheAwareScorer (default weight 1) when ENABLE_KVCACHE_AWARE_SCORER is "true".
+func setKVCacheAwareScorer() {
+	ctx := context.Background()
+	logger := log.FromContext(ctx).WithName("scheduler_config").V(logutil.DEBUG)
+
+	if os.Getenv(kvCacheScorerEnablementEnvVar) != "true" {
+		logger.Info("Skipping KVCacheAwareScorer creation as it is not enabled")
+		return
+	}
+
+	kvCacheScorer, err := scorer.NewKVCacheAwareScorer(ctx)
+	if err != nil {
+		logger.Error(err, "Failed to create KVCacheAwareScorer")
+		return
+	}
+
+	weight := 1
+	if weightStr := os.Getenv(kvCacheScorerWeightEnvVar); weightStr != "" {
+		if parsed, parseErr := strconv.ParseInt(weightStr, 10, 32); parseErr != nil {
+			// Keep the default: ParseInt's result is not usable on failure (0 for a syntax error).
+			logger.Error(parseErr, "Failed to parse scorer weight, using default", "envVar", kvCacheScorerWeightEnvVar, "value", weightStr)
+		} else {
+			weight = int(parsed)
+		}
+	}
 
-	// Added as a reference
-	// defaultConfig.filters = []plugins.Filter{filter.PDFilter}
+	defaultConfig.scorers[kvCacheScorer] = weight
 }
diff --git a/pkg/epp/scheduling/plugins/filter/filter.go b/pkg/epp/scheduling/plugins/filter/filter.go
index 86620aa9..a8c68ea9 100644
--- a/pkg/epp/scheduling/plugins/filter/filter.go
+++ b/pkg/epp/scheduling/plugins/filter/filter.go
@@ -214,7 +214,6 @@ var LoRAAffinityFilter = &baseFilter{
 //   - Filtered slice of pod metrics based on affinity and availability
 //   - Error if any issues occur during filtering
 func loRASoftAffinityFilterFunc(ctx *types.SchedulingContext, pods []types.Pod) []types.Pod {
-
 	// Pre-allocate slices with estimated capacity
 	filtered_affinity := make([]types.Pod, 0, len(pods))
 	filtered_available := make([]types.Pod, 0, len(pods))
diff --git a/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go b/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go
new file mode 100644
index 00000000..ce23e2c7
--- /dev/null
+++ b/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go
@@ -0,0 +1,141 @@
+/*
+Copyright 2025 The Neural Magic Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scorer
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	kvcache "github.com/neuralmagic/llm-d-kv-cache-manager/pkg/kv-cache"
+
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
+
+	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+)
+
+const (
+	kvCacheAwareScorerName = "kvcache-aware-scorer"
+	kvCacheRedisEnvVar     = "KVCACHE_INDEXER_REDIS_ADDR"
+	huggingFaceTokenEnvVar = "HF_TOKEN"
+)
+
+// KVCacheAwareScorer is a concrete implementation of the Scorer interface.
+// It uses the KVCacheIndexer to score pods based on KVCache awareness.
+type KVCacheAwareScorer struct {
+	kvCacheIndexer *kvcache.Indexer
+}
+
+// NewKVCacheAwareScorer creates a new KVCacheAwareScorer instance.
+// It starts from the default KVCacheIndexer configuration and fills in the Redis
+// address and HuggingFace token from the KVCACHE_INDEXER_REDIS_ADDR and HF_TOKEN
+// environment variables; an error is returned if either variable is unset. The
+// indexer is then started in a background goroutine with the given context.
+func NewKVCacheAwareScorer(ctx context.Context) (plugins.Scorer, error) {
+	config := kvcache.NewDefaultConfig()
+
+	redisAddr := os.Getenv(kvCacheRedisEnvVar)
+	if redisAddr == "" {
+		return nil, fmt.Errorf("environment variable %s is not set", kvCacheRedisEnvVar)
+	}
+	config.KVBlockIndexerConfig.RedisKVBlockIndexerConfig.RedisAddr = redisAddr
+
+	hfToken := os.Getenv(huggingFaceTokenEnvVar)
+	if hfToken == "" {
+		return nil, fmt.Errorf("environment variable %s is not set", huggingFaceTokenEnvVar)
+	}
+	config.TokenizersPoolConfig.HFTokenizerConfig.HuggingFaceToken = hfToken
+
+	kvCacheIndexer, err := kvcache.NewKVCacheIndexer(config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create KVCacheIndexer: %w", err)
+	}
+
+	// Run is started in its own goroutine and only receives ctx.
+	// NOTE(review): presumably it stops when ctx is cancelled - confirm against kvcache.Indexer.
+	go kvCacheIndexer.Run(ctx)
+
+	return &KVCacheAwareScorer{
+		kvCacheIndexer: kvCacheIndexer,
+	}, nil
+}
+
+// Name returns the name of the scorer.
+func (s *KVCacheAwareScorer) Name() string {
+	return kvCacheAwareScorerName
+}
+
+// Score scores the provided pod based on the KVCache index state.
+// This function is not concurrent-safe and should be called in a
+// single-threaded manner.
+func (s *KVCacheAwareScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
+	debugLog := log.FromContext(ctx).WithName(kvCacheAwareScorerName).V(logutil.DEBUG)
+
+	if ctx.Req == nil {
+		debugLog.Info("Request is nil, skipping scoring")
+		return nil
+	}
+
+	// Look up per-pod scores for the request's prompt and model; on failure nothing is scored.
+	podScores, err := s.kvCacheIndexer.GetPodScores(ctx.Context, ctx.Req.Prompt, ctx.Req.Model, nil)
+	if err != nil {
+		debugLog.Error(err, "Failed to get pod scores")
+		return nil
+	}
+	return indexerScoresToNormalizedScoredPods(pods, podScores)
+}
+
+// getMinMax returns the smallest and the largest value found in scores.
+// An empty map yields the initial sentinels: the maximum int as the minimum
+// and -1 as the maximum.
+func getMinMax(scores map[string]int) (int, int) {
+	lowest := int(^uint(0) >> 1) // max int
+	highest := -1
+	for _, value := range scores {
+		lowest = min(lowest, value)
+		highest = max(highest, value)
+	}
+	return lowest, highest
+}
+
+// indexerScoresToNormalizedScoredPods min-max normalizes the indexer's scores, which are looked up
+// by pod address, into [0, 1]. Pods absent from scores get 0, every scored pod gets 1 when all
+// scores are equal, and pods whose GetPod() returns nil are left out of the result.
+func indexerScoresToNormalizedScoredPods(pods []types.Pod, scores map[string]int) map[types.Pod]float64 {
+	minScore, maxScore := getMinMax(scores)
+	normalized := make(map[types.Pod]float64)
+
+	for _, pod := range pods {
+		info := pod.GetPod()
+		if info == nil {
+			continue
+		}
+
+		raw, found := scores[info.Address]
+		switch {
+		case !found:
+			normalized[pod] = 0.0
+		case minScore == maxScore:
+			normalized[pod] = 1.0
+		default:
+			normalized[pod] = float64(raw-minScore) / float64(maxScore-minScore)
+		}
+	}
+	return normalized
+}
diff --git a/pkg/epp/scheduling/plugins/scorers/load_based_scorer.go b/pkg/epp/scheduling/plugins/scorer/load_based_scorer.go
similarity index 88%
rename from pkg/epp/scheduling/plugins/scorers/load_based_scorer.go
rename to pkg/epp/scheduling/plugins/scorer/load_based_scorer.go
index 5bea87c9..d24f49b3 100644
--- a/pkg/epp/scheduling/plugins/scorers/load_based_scorer.go
+++ b/pkg/epp/scheduling/plugins/scorer/load_based_scorer.go
@@ -13,17 +13,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License. */ -package scorers + +package scorer import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) -type LoadBasedScorer struct{} +type LoadAwareScorer struct{} -func (s LoadBasedScorer) Name() string { - return "load based scorer" +func (s *LoadAwareScorer) Name() string { + return "load-aware-scorer" } // Score scores the given pod in range of 0-1 @@ -33,7 +34,7 @@ func (s LoadBasedScorer) Name() string { // Pod with requests in the queue will get score between 0.5 and 0. // Score 0 will get pod with number of requests in the queue equal to the threshold used in load-based filter (QueueingThresholdLoRA) // In future pods with additional capacity will get score higher than 0.5 -func (s LoadBasedScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 { +func (s *LoadAwareScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 { scoredPods := make(map[types.Pod]float64) for _, pod := range pods { diff --git a/pkg/epp/scheduling/scheduler.go b/pkg/epp/scheduling/scheduler.go index 9bad6131..f4e1714d 100644 --- a/pkg/epp/scheduling/scheduler.go +++ b/pkg/epp/scheduling/scheduler.go @@ -69,6 +69,7 @@ var ( ) func NewScheduler(datastore Datastore) *Scheduler { + setDefaultConfig() return NewSchedulerWithConfig(datastore, defaultConfig) } diff --git a/pkg/epp/scheduling/scheduler_test.go b/pkg/epp/scheduling/scheduler_test.go index e6d229ae..cda65496 100644 --- a/pkg/epp/scheduling/scheduler_test.go +++ b/pkg/epp/scheduling/scheduler_test.go @@ -536,7 +536,6 @@ func (tp *TestPlugin) Filter(ctx *types.SchedulingContext, pods []types.Pod) []t tp.ReceivedRequestHeaders[key] = value } return findPods(ctx, tp.FilterRes...) 
- } func (tp *TestPlugin) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 { diff --git a/pkg/epp/scheduling/scorers_test.go b/pkg/epp/scheduling/scorers_test.go index 365b2375..a98a838b 100644 --- a/pkg/epp/scheduling/scorers_test.go +++ b/pkg/epp/scheduling/scorers_test.go @@ -25,7 +25,7 @@ import ( backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorers" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) @@ -40,7 +40,7 @@ func TestScorers(t *testing.T) { }{ { name: "load based scorer", - scorer: &scorers.LoadBasedScorer{}, + scorer: &scorer.LoadAwareScorer{}, req: &types.LLMRequest{ Model: "critical", ResolvedTargetModel: "critical", diff --git a/test/e2e/epp/e2e_test.go b/test/e2e/epp/e2e_test.go index 7240cebc..f0220b30 100644 --- a/test/e2e/epp/e2e_test.go +++ b/test/e2e/epp/e2e_test.go @@ -87,7 +87,6 @@ var _ = ginkgo.Describe("InferencePool", func() { return nil }, readyTimeout, curlInterval).Should(gomega.Succeed()) - }) }) }) diff --git a/test/integration/bbr/hermetic_test.go b/test/integration/bbr/hermetic_test.go index b99186db..07b846de 100644 --- a/test/integration/bbr/hermetic_test.go +++ b/test/integration/bbr/hermetic_test.go @@ -122,7 +122,8 @@ func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) { RawValue: []byte("foo"), }, }, - }}, + }, + }, }, }, }, @@ -187,7 +188,8 @@ func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) { RawValue: []byte("sql-lora-sheddable"), }, }, - }}, + }, + }, }, }, }, diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 79b619fd..45e99dec 100644 
--- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -121,7 +121,8 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { KVCacheUsagePercent: 0.2, }, }, - wantMetrics: map[string]string{`inference_model_request_total`: ` + wantMetrics: map[string]string{ + `inference_model_request_total`: ` # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model. # TYPE inference_model_request_total counter inference_model_request_total{model_name="my-model",target_model_name="my-model-12345"} 1 @@ -153,7 +154,8 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }}, + }, + }, }, }, }, @@ -237,7 +239,8 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }}, + }, + }, }, }, }, @@ -321,7 +324,8 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }}, + }, + }, }, }, }, @@ -454,7 +458,8 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }}, + }, + }, }, }, }, @@ -565,7 +570,8 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }}, + }, + }, }, }, }, @@ -676,7 +682,8 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(74)), }, }, - }}, + }, + }, }, }, }, @@ -924,35 +931,40 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"NEVER","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: 
false}, + EndOfStream: false, + }, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"GONNA","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, + EndOfStream: false, + }, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"GIVE","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, + EndOfStream: false, + }, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"YOU","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, + EndOfStream: false, + }, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"UP","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, + EndOfStream: false, + }, }, }, { @@ -961,14 +973,16 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}} data: [DONE]`, ), - EndOfStream: false}, + 
EndOfStream: false, + }, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(""), - EndOfStream: true}, + EndOfStream: true, + }, }, }, }, @@ -1172,7 +1186,8 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte("192.168.1.1:8000"), }, }, - }}, + }, + }, }, }, }, diff --git a/test/utils/utils.go b/test/utils/utils.go index 1ec0fbaa..e6add0b6 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -240,7 +240,6 @@ func ExecCommandInPod( podNamespace, podName, containerName string, cmd []string, ) (string, error) { - parameterCodec := runtime.NewParameterCodec(scheme) req := kubeClient.CoreV1().RESTClient(). From 3476f5949cbdcf5329d161f363d27718f683c83e Mon Sep 17 00:00:00 2001 From: Maroon Ayoub Date: Wed, 30 Apr 2025 17:13:42 +0300 Subject: [PATCH 2/6] undo gofumpt Signed-off-by: Maroon Ayoub --- cmd/epp/main.go | 1 + pkg/epp/backend/metrics/fake.go | 2 - pkg/epp/backend/metrics/metrics.go | 1 + pkg/epp/backend/metrics/metrics_test.go | 3 ++ pkg/epp/backend/metrics/pod_metrics_test.go | 2 - .../inferencemodel_reconciler_test.go | 1 + pkg/epp/datastore/datastore.go | 4 +- pkg/epp/datastore/datastore_test.go | 2 + pkg/epp/scheduling/plugins/filter/filter.go | 1 + pkg/epp/scheduling/scheduler_test.go | 1 + test/e2e/epp/e2e_test.go | 1 + test/integration/bbr/hermetic_test.go | 6 +-- test/integration/epp/hermetic_test.go | 45 +++++++------------ test/utils/utils.go | 1 + 14 files changed, 32 insertions(+), 39 deletions(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 3c383225..c0a87e62 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -314,4 +314,5 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge if mapping.LoraRequestInfo == nil { logger.Info("Not scraping metric: LoraRequestInfo") } + } diff --git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index d1b373fd..ec97c6de 100644 --- 
a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -40,11 +40,9 @@ func (fpm *FakePodMetrics) String() string { func (fpm *FakePodMetrics) GetPod() *Pod { return fpm.Pod } - func (fpm *FakePodMetrics) GetMetrics() *Metrics { return fpm.Metrics } - func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) { fpm.Pod = toInternalPod(pod) } diff --git a/pkg/epp/backend/metrics/metrics.go b/pkg/epp/backend/metrics/metrics.go index efe847dd..96814b4b 100644 --- a/pkg/epp/backend/metrics/metrics.go +++ b/pkg/epp/backend/metrics/metrics.go @@ -47,6 +47,7 @@ func (p *PodMetricsClientImpl) FetchMetrics( existing *Metrics, port int32, ) (*Metrics, error) { + // Currently the metrics endpoint is hard-coded, which works with vLLM. // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config. url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics" diff --git a/pkg/epp/backend/metrics/metrics_test.go b/pkg/epp/backend/metrics/metrics_test.go index c69f2c67..e3b45b94 100644 --- a/pkg/epp/backend/metrics/metrics_test.go +++ b/pkg/epp/backend/metrics/metrics_test.go @@ -58,6 +58,7 @@ func makeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { // --- Tests --- func TestGetMetric(t *testing.T) { + metricFamilies := map[string]*dto.MetricFamily{ "metric1": makeMetricFamily("metric1", makeMetric(map[string]string{"label1": "value1"}, 1.0, 1000), @@ -165,6 +166,7 @@ func TestGetMetric(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + gotMetric, err := p.getMetric(metricFamilies, tt.spec) if tt.wantError { @@ -238,6 +240,7 @@ func TestLabelsMatch(t *testing.T) { } func TestGetLatestLoraMetric(t *testing.T) { + testCases := []struct { name string metricFamilies map[string]*dto.MetricFamily diff --git a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index 8d5f064a..e79c1bf0 100644 --- 
a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -88,12 +88,10 @@ type fakeDataStore struct{} func (f *fakeDataStore) PoolGet() (*v1alpha2.InferencePool, error) { return &v1alpha2.InferencePool{Spec: v1alpha2.InferencePoolSpec{TargetPortNumber: 8000}}, nil } - func (f *fakeDataStore) PodGetAll() []PodMetrics { // Not implemented. return nil } - func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics { // Not implemented. return nil diff --git a/pkg/epp/controller/inferencemodel_reconciler_test.go b/pkg/epp/controller/inferencemodel_reconciler_test.go index 024b6901..80c30e19 100644 --- a/pkg/epp/controller/inferencemodel_reconciler_test.go +++ b/pkg/epp/controller/inferencemodel_reconciler_test.go @@ -227,6 +227,7 @@ func TestInferenceModelReconciler(t *testing.T) { if diff := diffStore(ds, diffStoreParams{wantPool: pool, wantModels: test.wantModels}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } + }) } } diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 630b7119..288c4d7b 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -38,7 +38,9 @@ const ( ModelNameIndexKey = "spec.modelName" ) -var errPoolNotSynced = errors.New("InferencePool is not initialized in data store") +var ( + errPoolNotSynced = errors.New("InferencePool is not initialized in data store") +) // The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api) type Datastore interface { diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 248e95b4..b6466e6b 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -204,6 +204,7 @@ func TestModel(t *testing.T) { existing := ds.ModelDelete(types.NamespacedName{Name: model1ts.Name, Namespace: model1ts.Namespace}) got := ds.ModelGet(tsModel) return existing != nil && got == nil + }, 
wantOpResult: true, wantModels: []*v1alpha2.InferenceModel{model2chat}, @@ -225,6 +226,7 @@ func TestModel(t *testing.T) { if diff := testutil.DiffModelLists(test.wantModels, ds.ModelGetAll()); diff != "" { t.Errorf("Unexpected models diff: %s", diff) } + }) } } diff --git a/pkg/epp/scheduling/plugins/filter/filter.go b/pkg/epp/scheduling/plugins/filter/filter.go index a8c68ea9..86620aa9 100644 --- a/pkg/epp/scheduling/plugins/filter/filter.go +++ b/pkg/epp/scheduling/plugins/filter/filter.go @@ -214,6 +214,7 @@ var LoRAAffinityFilter = &baseFilter{ // - Filtered slice of pod metrics based on affinity and availability // - Error if any issues occur during filtering func loRASoftAffinityFilterFunc(ctx *types.SchedulingContext, pods []types.Pod) []types.Pod { + // Pre-allocate slices with estimated capacity filtered_affinity := make([]types.Pod, 0, len(pods)) filtered_available := make([]types.Pod, 0, len(pods)) diff --git a/pkg/epp/scheduling/scheduler_test.go b/pkg/epp/scheduling/scheduler_test.go index cda65496..e6d229ae 100644 --- a/pkg/epp/scheduling/scheduler_test.go +++ b/pkg/epp/scheduling/scheduler_test.go @@ -536,6 +536,7 @@ func (tp *TestPlugin) Filter(ctx *types.SchedulingContext, pods []types.Pod) []t tp.ReceivedRequestHeaders[key] = value } return findPods(ctx, tp.FilterRes...) 
+ } func (tp *TestPlugin) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 { diff --git a/test/e2e/epp/e2e_test.go b/test/e2e/epp/e2e_test.go index f0220b30..7240cebc 100644 --- a/test/e2e/epp/e2e_test.go +++ b/test/e2e/epp/e2e_test.go @@ -87,6 +87,7 @@ var _ = ginkgo.Describe("InferencePool", func() { return nil }, readyTimeout, curlInterval).Should(gomega.Succeed()) + }) }) }) diff --git a/test/integration/bbr/hermetic_test.go b/test/integration/bbr/hermetic_test.go index 07b846de..b99186db 100644 --- a/test/integration/bbr/hermetic_test.go +++ b/test/integration/bbr/hermetic_test.go @@ -122,8 +122,7 @@ func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) { RawValue: []byte("foo"), }, }, - }, - }, + }}, }, }, }, @@ -188,8 +187,7 @@ func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) { RawValue: []byte("sql-lora-sheddable"), }, }, - }, - }, + }}, }, }, }, diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 45e99dec..79b619fd 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -121,8 +121,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { KVCacheUsagePercent: 0.2, }, }, - wantMetrics: map[string]string{ - `inference_model_request_total`: ` + wantMetrics: map[string]string{`inference_model_request_total`: ` # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model. 
# TYPE inference_model_request_total counter inference_model_request_total{model_name="my-model",target_model_name="my-model-12345"} 1 @@ -154,8 +153,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }, - }, + }}, }, }, }, @@ -239,8 +237,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }, - }, + }}, }, }, }, @@ -324,8 +321,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }, - }, + }}, }, }, }, @@ -458,8 +454,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }, - }, + }}, }, }, }, @@ -570,8 +565,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(76)), }, }, - }, - }, + }}, }, }, }, @@ -682,8 +676,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte(strconv.Itoa(74)), }, }, - }, - }, + }}, }, }, }, @@ -931,40 +924,35 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"NEVER","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false, - }, + EndOfStream: false}, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"GONNA","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false, - }, + EndOfStream: false}, }, }, { Request: 
&extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"GIVE","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false, - }, + EndOfStream: false}, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"YOU","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false, - }, + EndOfStream: false}, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"UP","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false, - }, + EndOfStream: false}, }, }, { @@ -973,16 +961,14 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}} data: [DONE]`, ), - EndOfStream: false, - }, + EndOfStream: false}, }, }, { Request: &extProcPb.ProcessingRequest_ResponseBody{ ResponseBody: &extProcPb.HttpBody{ Body: []byte(""), - EndOfStream: true, - }, + EndOfStream: true}, }, }, }, @@ -1186,8 +1172,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { RawValue: []byte("192.168.1.1:8000"), }, }, - }, - }, + }}, }, }, }, diff --git a/test/utils/utils.go b/test/utils/utils.go index e6add0b6..1ec0fbaa 100644 --- 
a/test/utils/utils.go +++ b/test/utils/utils.go @@ -240,6 +240,7 @@ func ExecCommandInPod( podNamespace, podName, containerName string, cmd []string, ) (string, error) { + parameterCodec := runtime.NewParameterCodec(scheme) req := kubeClient.CoreV1().RESTClient(). From e6ca553362e642375215a09e19406161dd0c5bfd Mon Sep 17 00:00:00 2001 From: Maroon Ayoub Date: Wed, 30 Apr 2025 21:43:59 +0300 Subject: [PATCH 3/6] added scorer initialization debug msg Signed-off-by: Maroon Ayoub --- pkg/epp/scheduling/local_config.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/epp/scheduling/local_config.go b/pkg/epp/scheduling/local_config.go index 931c2848..ccb61510 100644 --- a/pkg/epp/scheduling/local_config.go +++ b/pkg/epp/scheduling/local_config.go @@ -37,11 +37,11 @@ const ( func setDefaultConfig() { // since the default config is a global variable, we add this function to minimize rebase conflicts. // this configuration is a temporary state, it should be better streamlined. 
- setLoadBasedScorer() + setLoadAwareScorer() setKVCacheAwareScorer() } -func setLoadBasedScorer() { +func setLoadAwareScorer() { ctx := context.Background() loggerDebug := log.FromContext(ctx).WithName("scheduler_config").V(logutil.DEBUG) @@ -61,6 +61,7 @@ func setLoadBasedScorer() { loadBasedScorerWeight = int(loadBasedScorerWeightInt64) } + loggerDebug.Info("Initialized LoadAwareScorer", "weight", loadBasedScorerWeight) defaultConfig.scorers[&scorer.LoadAwareScorer{}] = loadBasedScorerWeight } @@ -90,5 +91,6 @@ func setKVCacheAwareScorer() { kvCacheScorerWeight = int(kvCacheScorerWeightInt64) } + loggerDebug.Info("Initialized KVCacheAwareScorer", "weight", kvCacheScorerWeight) defaultConfig.scorers[kvCacheScorer] = kvCacheScorerWeight } From 388a7db26f917b61090ac45ae0def1089c23d837 Mon Sep 17 00:00:00 2001 From: Maroon Ayoub Date: Wed, 30 Apr 2025 22:41:32 +0300 Subject: [PATCH 4/6] - added debug logging - configured maxscorepicker as default Signed-off-by: Maroon Ayoub --- pkg/epp/scheduling/local_config.go | 3 +++ pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/pkg/epp/scheduling/local_config.go b/pkg/epp/scheduling/local_config.go index ccb61510..25c2cb24 100644 --- a/pkg/epp/scheduling/local_config.go +++ b/pkg/epp/scheduling/local_config.go @@ -19,6 +19,7 @@ package scheduling import ( "context" "os" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker" "strconv" "sigs.k8s.io/controller-runtime/pkg/log" @@ -39,6 +40,8 @@ func setDefaultConfig() { // this configuration is a temporary state, it should be better streamlined. 
setLoadAwareScorer() setKVCacheAwareScorer() + + defaultConfig.picker = picker.NewMaxScorePicker() } func setLoadAwareScorer() { diff --git a/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go b/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go index ce23e2c7..171967ef 100644 --- a/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go +++ b/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go @@ -96,6 +96,7 @@ func (s *KVCacheAwareScorer) Score(ctx *types.SchedulingContext, pods []types.Po loggerDebug.Error(err, "Failed to get pod scores") return nil } + loggerDebug.Info("Got pod scores", "scores", scores) return indexerScoresToNormalizedScoredPods(pods, scores) } @@ -137,5 +138,6 @@ func indexerScoresToNormalizedScoredPods(pods []types.Pod, scores map[string]int scoredPods[pod] = 0.0 } } + return scoredPods } From 01f019d640b3d05a06bfe774e318434195ed9904 Mon Sep 17 00:00:00 2001 From: Maroon Ayoub Date: Wed, 30 Apr 2025 22:48:17 +0300 Subject: [PATCH 5/6] updated KVCacheAwareScorer comments Signed-off-by: Maroon Ayoub --- .../plugins/scorer/kvcache-aware-scorer.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go b/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go index 171967ef..bc025751 100644 --- a/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go +++ b/pkg/epp/scheduling/plugins/scorer/kvcache-aware-scorer.go @@ -36,17 +36,17 @@ const ( huggingFaceTokenEnvVar = "HF_TOKEN" ) -// KVCacheAwareScorer is a concrete implementation of the Scorer interface. -// It uses the KVCacheIndexer to score pods based on KVCache awareness. +// KVCacheAwareScorer uses the KVCacheIndexer to score pods based on KVCache +// awareness. type KVCacheAwareScorer struct { kvCacheIndexer *kvcache.Indexer } // NewKVCacheAwareScorer creates a new KVCacheAwareScorer instance. 
-// It initializes the KVCacheIndexer with the provided configuration, -// and runs it with the given context. +// It initializes the KVCacheIndexer from environment variables. // -// If the configuration is nil, it uses the default configuration. +// If the environment variables are not set, or if the indexer +// fails to initialize, an error is returned. func NewKVCacheAwareScorer(ctx context.Context) (plugins.Scorer, error) { config := kvcache.NewDefaultConfig() @@ -82,8 +82,7 @@ func (s *KVCacheAwareScorer) Name() string { } // Score scores the provided pod based on the KVCache index state. -// This function is not concurrent-safe and should be called in a -// single-threaded manner. +// The returned scores are normalized to a range of 0-1. func (s *KVCacheAwareScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 { loggerDebug := log.FromContext(ctx).WithName(kvCacheAwareScorerName).V(logutil.DEBUG) if ctx.Req == nil { From bc2fee3a61911462ee26af80af1678c6d913d56a Mon Sep 17 00:00:00 2001 From: Maroon Ayoub Date: Wed, 30 Apr 2025 23:02:45 +0300 Subject: [PATCH 6/6] reused envutils (review comment) Signed-off-by: Maroon Ayoub --- pkg/epp/scheduling/local_config.go | 38 +++++++----------------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/pkg/epp/scheduling/local_config.go b/pkg/epp/scheduling/local_config.go index 25c2cb24..2e261a87 100644 --- a/pkg/epp/scheduling/local_config.go +++ b/pkg/epp/scheduling/local_config.go @@ -18,12 +18,10 @@ package scheduling import ( "context" - "os" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker" - "strconv" - "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer" + envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env" logutil 
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -48,31 +46,21 @@ func setLoadAwareScorer() { ctx := context.Background() loggerDebug := log.FromContext(ctx).WithName("scheduler_config").V(logutil.DEBUG) - if os.Getenv(loadAwareScorerEnablementEnvVar) != "true" { + if envutil.GetEnvString(loadAwareScorerEnablementEnvVar, "false", loggerDebug) != "true" { loggerDebug.Info("Skipping LoadAwareScorer creation as it is not enabled") return } - loadBasedScorerWeight := 1 - if weightStr := os.Getenv(loadAwareScorerWeightEnvVar); weightStr != "" { - var err error - loadBasedScorerWeightInt64, err := strconv.ParseInt(weightStr, 10, 32) - if err != nil { - loggerDebug.Error(err, "Failed to parse LOAD_BASED_SCORER_WEIGHT") - } - - loadBasedScorerWeight = int(loadBasedScorerWeightInt64) - } - - loggerDebug.Info("Initialized LoadAwareScorer", "weight", loadBasedScorerWeight) + loadBasedScorerWeight := envutil.GetEnvInt(loadAwareScorerWeightEnvVar, 1, loggerDebug) defaultConfig.scorers[&scorer.LoadAwareScorer{}] = loadBasedScorerWeight + loggerDebug.Info("Initialized LoadAwareScorer", "weight", loadBasedScorerWeight) } func setKVCacheAwareScorer() { ctx := context.Background() loggerDebug := log.FromContext(ctx).WithName("scheduler_config").V(logutil.DEBUG) - if os.Getenv(kvCacheScorerEnablementEnvVar) != "true" { + if envutil.GetEnvString(kvCacheScorerEnablementEnvVar, "false", loggerDebug) != "true" { loggerDebug.Info("Skipping KVCacheAwareScorer creation as it is not enabled") return } @@ -83,17 +71,7 @@ func setKVCacheAwareScorer() { return } - kvCacheScorerWeight := 1 - if weightStr := os.Getenv(kvCacheScorerWeightEnvVar); weightStr != "" { - var err error - kvCacheScorerWeightInt64, err := strconv.ParseInt(weightStr, 10, 32) - if err != nil { - loggerDebug.Error(err, "Failed to parse KVCACHE_SCORER_WEIGHT") - } - - kvCacheScorerWeight = int(kvCacheScorerWeightInt64) - } - - loggerDebug.Info("Initialized KVCacheAwareScorer", "weight", 
kvCacheScorerWeight) + kvCacheScorerWeight := envutil.GetEnvInt(kvCacheScorerWeightEnvVar, 1, loggerDebug) defaultConfig.scorers[kvCacheScorer] = kvCacheScorerWeight + loggerDebug.Info("Initialized KVCacheAwareScorer", "weight", kvCacheScorerWeight) }