Commit 93882e5

Auto-configure prefix-cache-scorer parameters from engine metrics
1 parent ef666c1 commit 93882e5

9 files changed: +110 -14 lines changed

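For context on the end-to-end flow: the scraped engine metric is a label-only info gauge, and the scorer's block size comes from its block_size label. Below is a minimal, self-contained sketch (not part of the commit) of reading that label from Prometheus exposition text; the sample value and the num_gpu_blocks label are illustrative, while the metric name and the block_size label match the defaults introduced in this commit.

package main

import (
    "fmt"
    "strconv"
    "strings"

    "github.com/prometheus/common/expfmt"
)

// Illustrative exposition text in the shape of vllm:cache_config_info.
const sample = `# TYPE vllm:cache_config_info gauge
vllm:cache_config_info{block_size="16",num_gpu_blocks="8192"} 1
`

func main() {
    var parser expfmt.TextParser
    families, err := parser.TextToMetricFamilies(strings.NewReader(sample))
    if err != nil {
        panic(err)
    }
    for _, m := range families["vllm:cache_config_info"].GetMetric() {
        for _, label := range m.GetLabel() {
            if label.GetName() == "block_size" { // CacheConfigBlockSizeInfoMetricName
                if blockSize, err := strconv.Atoi(label.GetValue()); err == nil {
                    fmt.Println("cache block size (tokens):", blockSize) // prints 16
                }
            }
        }
    }
}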

cmd/epp/runner/runner.go

Lines changed: 7 additions & 1 deletion
@@ -109,6 +109,8 @@ var (
    kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
    // LoRA metrics
    loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
+   // Cache info metrics
+   cacheInfoMetric = flag.String("cache-info-metric", runserver.DefaultCacheInfoMetric, "Prometheus metric for the cache info metrics.")
    // metrics related flags
    refreshMetricsInterval = flag.Duration("refresh-metrics-interval", runserver.DefaultRefreshMetricsInterval, "interval to refresh metrics")
    refreshPrometheusMetricsInterval = flag.Duration("refresh-prometheus-metrics-interval", runserver.DefaultRefreshPrometheusMetricsInterval, "interval to flush prometheus metrics")
@@ -433,6 +435,7 @@ func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
        *totalQueuedRequestsMetric,
        *kvCacheUsagePercentageMetric,
        *loraInfoMetric,
+       *cacheInfoMetric,
    )
    if err != nil {
        setupLog.Error(err, "Failed to create metric mapping from flags.")
@@ -476,7 +479,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
        nil)
    extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
        *kvCacheUsagePercentageMetric,
-       *loraInfoMetric)
+       *loraInfoMetric, *cacheInfoMetric)

    if err != nil {
        return nil, err
@@ -561,6 +564,9 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
    if mapping.LoraRequestInfo == nil {
        logger.Info("Not scraping metric: LoraRequestInfo")
    }
+   if mapping.CacheConfigInfo == nil {
+       logger.Info("Not scraping metric: CacheConfigInfo")
+   }
}

// setupPprofHandlers only implements the pre-defined profiles:

pkg/epp/backend/metrics/metrics.go

Lines changed: 20 additions & 0 deletions
@@ -36,6 +36,8 @@ const (
    LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
    LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
    LoraInfoMaxAdaptersMetricName     = "max_lora"
+
+   CacheConfigBlockSizeInfoMetricName = "block_size"
)

type PodMetricsClientImpl struct {
@@ -144,6 +146,24 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
        }
    }

+   if p.MetricMapping.CacheConfigInfo != nil {
+       cacheMetrics, err := p.getMetric(metricFamilies, *p.MetricMapping.CacheConfigInfo)
+       if err != nil {
+           errs = multierr.Append(errs, err)
+       } else {
+           for _, v := range cacheMetrics.GetLabel() {
+               if v.GetName() == CacheConfigBlockSizeInfoMetricName {
+                   updated.CacheBlockSize, err = strconv.Atoi(v.GetValue())
+                   if err != nil {
+                       errs = multierr.Append(errs, err)
+                   } else {
+                       break
+                   }
+               }
+           }
+       }
+   }
+
    return updated, errs
}

pkg/epp/backend/metrics/metrics_spec.go

Lines changed: 9 additions & 1 deletion
@@ -32,6 +32,7 @@ type MetricMapping struct {
    TotalQueuedRequests *MetricSpec
    KVCacheUtilization  *MetricSpec
    LoraRequestInfo     *MetricSpec
+   CacheConfigInfo     *MetricSpec
}

// stringToMetricSpec converts a string to a MetricSpec.
@@ -93,7 +94,7 @@ func stringToMetricSpec(specStr string) (*MetricSpec, error) {
}

// NewMetricMapping creates a MetricMapping from string values.
-func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr string) (*MetricMapping, error) {
+func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr, cacheInfoMetric string) (*MetricMapping, error) {
    queuedSpec, err := stringToMetricSpec(queuedStr)
    if err != nil {
        return nil, fmt.Errorf("error parsing WaitingRequests: %w", err)
@@ -106,10 +107,17 @@ func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr string) (*MetricMapp
    if err != nil {
        return nil, fmt.Errorf("error parsing loraReqInfoStr: %w", err)
    }
+
+   cacheInfoSpec, err := stringToMetricSpec(cacheInfoMetric)
+   if err != nil {
+       return nil, fmt.Errorf("error parsing cacheInfoMetric: %w", err)
+   }
+
    mapping := &MetricMapping{
        TotalQueuedRequests: queuedSpec,
        KVCacheUtilization:  kvUsageSpec,
        LoraRequestInfo:     loraReqInfoSpec,
+       CacheConfigInfo:     cacheInfoSpec,
    }

    return mapping, nil
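A minimal usage sketch (assumed wiring, not code from this commit) of the widened NewMetricMapping signature, using the default metric names defined in pkg/epp/server/runserver.go:

package main

import (
    "fmt"

    backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

func main() {
    // The fourth argument is new; an empty string is expected to leave
    // CacheConfigInfo nil, in which case verifyMetricMapping logs
    // "Not scraping metric: CacheConfigInfo".
    mapping, err := backendmetrics.NewMetricMapping(
        "vllm:num_requests_waiting",
        "vllm:gpu_cache_usage_perc",
        "vllm:lora_requests_info",
        "vllm:cache_config_info",
    )
    if err != nil {
        panic(err)
    }
    fmt.Println("cache info spec configured:", mapping.CacheConfigInfo != nil)
}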

pkg/epp/datalayer/metrics.go

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,7 @@ type Metrics struct {
    WaitingQueueSize        int
    KVCacheUsagePercent     float64
    KvCacheMaxTokenCapacity int
+   CacheBlockSize          int

    // UpdateTime records the last time when the metrics were updated.
    UpdateTime time.Time
@@ -75,6 +76,7 @@ func (m *Metrics) Clone() *Metrics {
        WaitingQueueSize:        m.WaitingQueueSize,
        KVCacheUsagePercent:     m.KVCacheUsagePercent,
        KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,
+       CacheBlockSize:          m.CacheBlockSize,
        UpdateTime:              m.UpdateTime,
    }
}

pkg/epp/datalayer/metrics/extractor.go

Lines changed: 31 additions & 2 deletions
@@ -37,6 +37,8 @@ const (
    LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
    LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
    LoraInfoMaxAdaptersMetricName     = "max_lora"
+
+   CacheConfigBlockSizeInfoMetricName = "block_size"
)

// Extractor implements the metrics extraction based on the model
@@ -49,8 +51,8 @@ type Extractor struct {
// configured with the given metrics' specifications.
// These are mandatory metrics per the MSP specification, and are used
// as the basis for the built-in scheduling plugins.
-func NewExtractor(queueSpec, kvusageSpec, loraSpec string) (*Extractor, error) {
-   mapping, err := NewMapping(queueSpec, kvusageSpec, loraSpec)
+func NewExtractor(queueSpec, kvusageSpec, loraSpec, cacheInfoSpec string) (*Extractor, error) {
+   mapping, err := NewMapping(queueSpec, kvusageSpec, loraSpec, cacheInfoSpec)
    if err != nil {
        return nil, fmt.Errorf("failed to create extractor metrics Mapping - %w", err)
    }
@@ -111,6 +113,16 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi
        }
    }

+   if spec := ext.mapping.CacheInfo; spec != nil { // extract CacheInfo-specific metrics
+       metric, err := spec.getLatestMetric(families)
+       if err != nil {
+           errs = append(errs, err)
+       } else if metric != nil {
+           populateCacheInfoMetrics(clone, metric, &errs)
+           updated = true
+       }
+   }
+
    if updated {
        clone.UpdateTime = time.Now()
        ep.UpdateMetrics(clone)
@@ -145,6 +157,23 @@ func populateLoRAMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]e
    }
}

+// populateCacheInfoMetrics updates the metrics with cache info from the metric labels.
+func populateCacheInfoMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]error) {
+   clone.CacheBlockSize = 0
+   for _, label := range metric.GetLabel() {
+       if label.GetName() == CacheConfigBlockSizeInfoMetricName {
+           if label.GetValue() != "" {
+               if val, err := strconv.Atoi(label.GetValue()); err == nil {
+                   clone.CacheBlockSize = val
+                   break
+               } else {
+                   *errs = append(*errs, err)
+               }
+           }
+       }
+   }
+}
+
// addAdapters splits a comma-separated adapter list and stores keys with default value 0.
func addAdapters(m map[string]int, csv string) {
    for _, name := range strings.Split(csv, ",") {
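The v2 datalayer path mirrors the v1 backend/metrics change: NewExtractor forwards the new spec string to NewMapping, which parses it into Mapping.CacheInfo. A usage sketch under the same assumptions as above (not code from this commit):

package main

import (
    "fmt"

    dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
)

func main() {
    extractor, err := dlmetrics.NewExtractor(
        "vllm:num_requests_waiting",
        "vllm:gpu_cache_usage_perc",
        "vllm:lora_requests_info",
        "vllm:cache_config_info", // new cache-info spec, parsed into Mapping.CacheInfo
    )
    if err != nil {
        panic(err)
    }
    fmt.Printf("extractor ready: %T\n", extractor)
}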

pkg/epp/datalayer/metrics/mapping.go

Lines changed: 9 additions & 1 deletion
@@ -26,10 +26,11 @@ type Mapping struct {
    TotalQueuedRequests *Spec
    KVCacheUtilization  *Spec
    LoraRequestInfo     *LoRASpec
+   CacheInfo           *Spec
}

// NewMapping creates a metrics.Mapping from the input specification strings.
-func NewMapping(queue, kvusage, lora string) (*Mapping, error) {
+func NewMapping(queue, kvusage, lora, cacheInfo string) (*Mapping, error) {
    var errs []error

    queueSpec, err := parseStringToSpec(queue)
@@ -44,12 +45,19 @@ func NewMapping(queue, kvusage, lora string) (*Mapping, error) {
    if err != nil {
        errs = append(errs, err)
    }
+
+   cacheInfoSpec, err := parseStringToSpec(cacheInfo)
+   if err != nil {
+       errs = append(errs, err)
+   }
+
    if len(errs) != 0 {
        return nil, errors.Join(errs...)
    }
    return &Mapping{
        TotalQueuedRequests: queueSpec,
        KVCacheUtilization:  kvusageSpec,
        LoraRequestInfo:     loraSpec,
+       CacheInfo:           cacheInfoSpec,
    }, nil
}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 25 additions & 4 deletions
@@ -61,18 +61,21 @@

const (
    PodActiveCheckInterval = 2 * time.Minute
+
+   // An estimated average characters per token, used since the request we cached is not tokenized.
+   averageCharactersPerToken = 4
)

var DefaultConfig = Config{
-   BlockSize:              DefaultBlockSize,
+   DefaultBlockSize:       DefaultBlockSize,
    MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
    LRUCapacityPerServer:   DefaultLRUCapacityPerServer,
}

type Config struct {
    // The input prompt is broken into sizes of BlockSize to calculate block hashes . Requests
    // with length shorter than the block size will be ignored.
-   BlockSize int `json:"blockSize"`
+   DefaultBlockSize int `json:"defaultBlockSize"`
    // MaxPrefixBlocksToMatch is the maximum number of prefix blocks to match. Input beyond this limit will
    // be ignored.
    MaxPrefixBlocksToMatch int `json:"maxPrefixBlocksToMatch"`
@@ -190,7 +193,7 @@ func (p *Plugin) WithName(name string) *Plugin {
// Score returns the scoring result for the given list of pods based on context.
func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, request *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 {
    // pre score step, hashing prompt and find longest prefix match.
-   hashes := hashPrompt(ctx, request, p.config.BlockSize, p.config.MaxPrefixBlocksToMatch)
+   hashes := hashPrompt(ctx, request, getBlockSize(pods, p.config.DefaultBlockSize), p.config.MaxPrefixBlocksToMatch)
    state := &SchedulingContextState{
        PrefixHashes:       hashes,
        PrefixCacheServers: p.matchLongestPrefix(ctx, hashes),
@@ -241,7 +244,9 @@ func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, sche

    total := len(state.PrefixHashes)
    matchLen := state.PrefixCacheServers[ServerID(targetPod.NamespacedName)]
-   metrics.RecordPrefixCacheMatch(matchLen*p.config.BlockSize, total*p.config.BlockSize)
+
+   blockSize := getBlockSize(primaryProfileResult.TargetPods, p.config.DefaultBlockSize)
+   metrics.RecordPrefixCacheMatch(matchLen*blockSize, total*p.config.DefaultBlockSize)
}

// matchLongestPrefix returns a map of servers and length of prefix that each server caches.
@@ -353,3 +358,19 @@ func getUserInputBytes(request *types.LLMRequest) ([]byte, error) {
    // must be chat-completions request at this point, return bytes of entire messages
    return json.Marshal(request.Body.ChatCompletions.Messages)
}
+
+func getBlockSize(pods []types.Pod, defaultBlockSize int) int {
+   if pods == nil || len(pods) == 0 {
+       return defaultBlockSize
+   }
+
+   // Since all PODs originate from the same inference pool, they are considered to have identical configurations.
+   // Therefore, using the CacheBlockSize value from the first POD suffices.
+   if pod := pods[0]; pod.GetMetrics() != nil {
+       cacheBlockSize := pod.GetMetrics().CacheBlockSize
+       if cacheBlockSize > 0 {
+           return cacheBlockSize * averageCharactersPerToken
+       }
+   }
+   return defaultBlockSize
+}
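Net effect on the scorer's chunking, as a worked example (illustrative numbers; the configured default below is hypothetical, and only the 4-characters-per-token conversion comes from the constant added above):

package main

import "fmt"

func main() {
    const averageCharactersPerToken = 4 // mirrors the constant added in plugin.go

    defaultBlockSize := 256 // hypothetical configured default, in characters
    engineBlockSize := 16   // engine-reported tokens per KV-cache block, e.g. block_size="16"

    // Same fallback logic as getBlockSize: prefer the engine-reported value,
    // converted from tokens to an approximate character count.
    blockSize := defaultBlockSize
    if engineBlockSize > 0 {
        blockSize = engineBlockSize * averageCharactersPerToken
    }
    fmt.Println("prompt hashing block size (characters):", blockSize) // prints 64
}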

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go

Lines changed: 6 additions & 5 deletions
@@ -29,6 +29,7 @@ import (
    k8stypes "k8s.io/apimachinery/pkg/types"

    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
+   backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)
@@ -41,8 +42,8 @@ func TestPrefixPluginCompletion(t *testing.T) {
    }
    plugin := New(context.Background(), config)

-   pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
-   pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}
+   pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: backendmetrics.NewMetricsState()}
+   pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: backendmetrics.NewMetricsState()}
    pods := []types.Pod{pod1, pod2}

    // First request.
@@ -207,7 +208,7 @@ func TestPrefixPluginChatCompletions(t *testing.T) {
    }
    plugin := New(context.Background(), config)

-   pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
+   pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: &backendmetrics.MetricsState{}}
    pods := []types.Pod{pod1}

    // Test with chat completions request
@@ -241,8 +242,8 @@ func TestPrefixPluginChatCompletionsGrowth(t *testing.T) {
    }
    plugin := New(context.Background(), config)

-   pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
-   pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}
+   pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: &backendmetrics.MetricsState{}}
+   pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: &backendmetrics.MetricsState{}}
    pods := []types.Pod{pod1, pod2}

    // First request with initial conversation

pkg/epp/server/runserver.go

Lines changed: 1 addition & 0 deletions
@@ -80,6 +80,7 @@ const (
    DefaultTotalQueuedRequestsMetric    = "vllm:num_requests_waiting" // default for --total-queued-requests-metric
    DefaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" // default for --kv-cache-usage-percentage-metric
    DefaultLoraInfoMetric               = "vllm:lora_requests_info"   // default for --lora-info-metric
+   DefaultCacheInfoMetric              = "vllm:cache_config_info"    // default for --cache-info-metric
    DefaultCertPath                     = ""                          // default for --cert-path
    DefaultConfigFile                   = ""                          // default for --config-file
    DefaultConfigText                   = ""                          // default for --config-text
