diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go
index 7f6178d6a..34db206de 100644
--- a/pkg/ext-proc/handlers/request.go
+++ b/pkg/ext-proc/handlers/request.go
@@ -19,7 +19,6 @@ package handlers
import (
"context"
"encoding/json"
- "errors"
"fmt"
"strconv"
@@ -29,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
+ errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)
@@ -49,14 +49,14 @@ func (s *Server) HandleRequestBody(
var rb map[string]interface{}
if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil {
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
- return nil, fmt.Errorf("error unmarshaling request body: %v", err)
+ return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("error unmarshaling request body: %v", err)}
}
loggerVerbose.Info("Request body unmarshalled", "body", rb)
// Resolve target models.
model, ok := rb["model"].(string)
if !ok {
- return nil, errors.New("model not found in request")
+ return nil, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
}
loggerVerbose.Info("Model requested", "model", model)
modelName := model
@@ -66,12 +66,12 @@ func (s *Server) HandleRequestBody(
// are able to be requested by using their distinct name.
modelObj, exist := s.datastore.ModelGet(model)
if !exist {
- return nil, fmt.Errorf("error finding a model object in InferenceModel for input %v", model)
+ return nil, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error finding a model object in InferenceModel for input %v", model)}
}
if len(modelObj.Spec.TargetModels) > 0 {
modelName = datastore.RandomWeightedDraw(logger, modelObj, 0)
if modelName == "" {
- return nil, fmt.Errorf("error getting target model name for model %v", modelObj.Name)
+ return nil, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error getting target model name for model %v", modelObj.Name)}
}
}
llmReq := &scheduling.LLMRequest{
@@ -89,14 +89,14 @@ func (s *Server) HandleRequestBody(
requestBody, err = json.Marshal(rb)
if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body")
- return nil, fmt.Errorf("error marshaling request body: %v", err)
+ return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
}
loggerVerbose.Info("Updated request body marshalled", "body", string(requestBody))
}
targetPod, err := s.scheduler.Schedule(ctx, llmReq)
if err != nil {
- return nil, fmt.Errorf("failed to find target pod: %w", err)
+ return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Sprintf("failed to find target pod: %v", err)}
}
logger.V(logutil.DEFAULT).Info("Request handled",
diff --git a/pkg/ext-proc/handlers/response.go b/pkg/ext-proc/handlers/response.go
index afe7549b3..ed3082c51 100644
--- a/pkg/ext-proc/handlers/response.go
+++ b/pkg/ext-proc/handlers/response.go
@@ -24,6 +24,7 @@ import (
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"sigs.k8s.io/controller-runtime/pkg/log"
+ errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)
@@ -38,6 +39,43 @@ func (s *Server) HandleResponseHeaders(
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
loggerVerbose.Info("Headers before", "headers", h)
+ // Example ResponseHeaders message:
+ // {
+ // "ResponseHeaders": {
+ // "headers": [
+ // {
+ // "key": ":status",
+ // "raw_value": "200"
+ // },
+ // {
+ // "key": "date",
+ // "raw_value": "Thu, 30 Jan 2025 18:50:48 GMT"
+ // },
+ // {
+ // "key": "server",
+ // "raw_value": "uvicorn"
+ // },
+ // {
+ // "key": "content-type",
+ // "raw_value": "text/event-stream; charset=utf-8"
+ // },
+ // {
+ // "key": "transfer-encoding",
+ // "raw_value": "chunked"
+ // }
+ // ]
+ // }
+ // }
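+ // A non-200 :status marks the request as a model-server error; the deferred
+ // metric hook in Process records it once per request.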
+ for _, header := range h.ResponseHeaders.Headers.GetHeaders() {
+ if header.Key == ":status" {
+ code := string(header.RawValue)
+ if code != "200" {
+ reqCtx.ResponseStatusCode = errutil.ModelServerError
+ }
+ break
+ }
+ }
+
resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
ResponseHeaders: &extProcPb.HeadersResponse{
@@ -99,7 +137,7 @@ func (s *Server) HandleResponseBody(
res := Response{}
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
- return nil, fmt.Errorf("unmarshaling response body: %v", err)
+ return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
}
reqCtx.Response = res
reqCtx.ResponseSize = len(body.ResponseBody.Body)
diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go
index a52742752..506eaa97d 100644
--- a/pkg/ext-proc/handlers/server.go
+++ b/pkg/ext-proc/handlers/server.go
@@ -30,6 +30,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
+ errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)
@@ -65,6 +66,18 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
// See https://github.com/envoyproxy/envoy/issues/17540.
reqCtx := &RequestContext{}
+ // Create a variable for error handling, as each request should report the error
+ // metric at most once. This doesn't cover the "Cannot receive stream request"
+ // error, because that error can occur even after the response has been processed.
+ var err error
+ defer func() {
+ if reqCtx.ResponseStatusCode != "" {
+ metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode)
+ } else if err != nil {
+ metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err))
+ }
+ }()
+
for {
select {
case <-ctx.Done():
@@ -72,11 +85,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
default:
}
- req, err := srv.Recv()
- if err == io.EOF || errors.Is(err, context.Canceled) {
+ req, recvErr := srv.Recv()
+ if recvErr == io.EOF || errors.Is(recvErr, context.Canceled) {
return nil
}
- if err != nil {
+ if recvErr != nil {
// This error occurs very frequently, though it doesn't seem to have any impact.
// TODO Figure out if we can remove this noise.
loggerVerbose.Error(err, "Cannot receive stream request")
@@ -113,12 +126,13 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
return status.Error(codes.Unknown, "unknown request type")
}
+
if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
- switch status.Code(err) {
+ switch errutil.CanonicalCode(err) {
// This code can be returned by scheduler when there is no capacity for sheddable
// requests.
- case codes.ResourceExhausted:
+ case errutil.InferencePoolResourceExhausted:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
@@ -128,6 +142,38 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
},
},
}
+ // This code can be returned when the EPP processes the request and runs into server-side errors.
+ case errutil.Internal:
+ resp = &extProcPb.ProcessingResponse{
+ Response: &extProcPb.ProcessingResponse_ImmediateResponse{
+ ImmediateResponse: &extProcPb.ImmediateResponse{
+ Status: &envoyTypePb.HttpStatus{
+ Code: envoyTypePb.StatusCode_InternalServerError,
+ },
+ },
+ },
+ }
+ // This code can be returned when users provide an invalid JSON request.
+ case errutil.BadRequest:
+ resp = &extProcPb.ProcessingResponse{
+ Response: &extProcPb.ProcessingResponse_ImmediateResponse{
+ ImmediateResponse: &extProcPb.ImmediateResponse{
+ Status: &envoyTypePb.HttpStatus{
+ Code: envoyTypePb.StatusCode_BadRequest,
+ },
+ },
+ },
+ }
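+ // This code can be returned when the requested model has no matching
+ // InferenceModel configuration; this patch maps it to HTTP 404.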
+ case errutil.BadConfiguration:
+ resp = &extProcPb.ProcessingResponse{
+ Response: &extProcPb.ProcessingResponse_ImmediateResponse{
+ ImmediateResponse: &extProcPb.ImmediateResponse{
+ Status: &envoyTypePb.HttpStatus{
+ Code: envoyTypePb.StatusCode_NotFound,
+ },
+ },
+ },
+ }
default:
return status.Errorf(status.Code(err), "failed to handle request: %v", err)
}
@@ -139,6 +185,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
}
}
}
// RequestContext stores context information during the life time of an HTTP request.
@@ -153,4 +200,5 @@ type RequestContext struct {
Response Response
ResponseSize int
ResponseComplete bool
+ ResponseStatusCode string
}
diff --git a/pkg/ext-proc/metrics/README.md b/pkg/ext-proc/metrics/README.md
index 8adfd94e9..1f68a0bdb 100644
--- a/pkg/ext-proc/metrics/README.md
+++ b/pkg/ext-proc/metrics/README.md
@@ -41,6 +41,7 @@ spec:
| Metric name | Metric Type | Description | Labels | Status |
| ------------|--------------| ----------- | ------ | ------ |
| inference_model_request_total | Counter | The counter of requests broken out for each model. | `model_name`=<model-name> <br> `target_model_name`=<target-model-name> | ALPHA |
+| inference_model_request_error_total | Counter | The counter of request errors broken out for each model. | `model_name`=<model-name> <br> `target_model_name`=<target-model-name> | ALPHA |
| inference_model_request_duration_seconds | Distribution | Distribution of response latency. | `model_name`=<model-name> <br> `target_model_name`=<target-model-name> | ALPHA |
| inference_model_request_sizes | Distribution | Distribution of request size in bytes. | `model_name`=<model-name> <br> `target_model_name`=<target-model-name> | ALPHA |
| inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=<model-name> <br> `target_model_name`=<target-model-name> | ALPHA |
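+
+For example, a per-model error rate can be derived from the new counter (a sketch, assuming the EPP metrics endpoint is scraped by Prometheus):
+
+```
+sum by (model_name, error_code) (rate(inference_model_request_error_total[5m]))
+```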
diff --git a/pkg/ext-proc/metrics/metrics.go b/pkg/ext-proc/metrics/metrics.go
index a396f4aef..cc21d531b 100644
--- a/pkg/ext-proc/metrics/metrics.go
+++ b/pkg/ext-proc/metrics/metrics.go
@@ -44,6 +44,16 @@ var (
[]string{"model_name", "target_model_name"},
)
+ requestErrCounter = compbasemetrics.NewCounterVec(
+ &compbasemetrics.CounterOpts{
+ Subsystem: InferenceModelComponent,
+ Name: "request_error_total",
+ Help: "Counter of inference model requests errors broken out for each model and target model.",
+ StabilityLevel: compbasemetrics.ALPHA,
+ },
+ []string{"model_name", "target_model_name", "error_code"},
+ )
+
requestLatencies = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Subsystem: InferenceModelComponent,
@@ -139,6 +149,7 @@ var registerMetrics sync.Once
func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(requestCounter)
+ legacyregistry.MustRegister(requestErrCounter)
legacyregistry.MustRegister(requestLatencies)
legacyregistry.MustRegister(requestSizes)
legacyregistry.MustRegister(responseSizes)
@@ -155,6 +166,13 @@ func RecordRequestCounter(modelName, targetModelName string) {
requestCounter.WithLabelValues(modelName, targetModelName).Inc()
}
+// RecordRequestErrCounter records the number of request errors.
+func RecordRequestErrCounter(modelName, targetModelName, code string) {
+ if code != "" {
+ requestErrCounter.WithLabelValues(modelName, targetModelName, code).Inc()
+ }
+}
+
// RecordRequestSizes records the request sizes.
func RecordRequestSizes(modelName, targetModelName string, reqSize int) {
requestSizes.WithLabelValues(modelName, targetModelName).Observe(float64(reqSize))
diff --git a/pkg/ext-proc/metrics/metrics_test.go b/pkg/ext-proc/metrics/metrics_test.go
index cf638b93c..2e891066b 100644
--- a/pkg/ext-proc/metrics/metrics_test.go
+++ b/pkg/ext-proc/metrics/metrics_test.go
@@ -24,18 +24,20 @@ import (
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
+ errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)
const (
- RequestTotalMetric = InferenceModelComponent + "_request_total"
- RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds"
- RequestSizesMetric = InferenceModelComponent + "_request_sizes"
- ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
- InputTokensMetric = InferenceModelComponent + "_input_tokens"
- OutputTokensMetric = InferenceModelComponent + "_output_tokens"
- KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
- QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
+ RequestTotalMetric = InferenceModelComponent + "_request_total"
+ RequestErrorTotalMetric = InferenceModelComponent + "_request_error_total"
+ RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds"
+ RequestSizesMetric = InferenceModelComponent + "_request_sizes"
+ ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
+ InputTokensMetric = InferenceModelComponent + "_input_tokens"
+ OutputTokensMetric = InferenceModelComponent + "_output_tokens"
+ KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
+ QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
)
func TestRecordRequestCounterandSizes(t *testing.T) {
@@ -107,6 +109,65 @@ func TestRecordRequestCounterandSizes(t *testing.T) {
}
}
+func TestRecordRequestErrorCounter(t *testing.T) {
+ type requests struct {
+ modelName string
+ targetModelName string
+ error string
+ }
+ scenarios := []struct {
+ name string
+ reqs []requests
+ }{{
+ name: "multiple requests",
+ reqs: []requests{
+ {
+ modelName: "m10",
+ targetModelName: "t10",
+ error: errutil.Internal,
+ },
+ {
+ modelName: "m10",
+ targetModelName: "t10",
+ error: errutil.Internal,
+ },
+ {
+ modelName: "m10",
+ targetModelName: "t11",
+ error: errutil.ModelServerError,
+ },
+ {
+ modelName: "m20",
+ targetModelName: "t20",
+ error: errutil.InferencePoolResourceExhausted,
+ },
+ },
+ },
+ }
+ Register()
+ for _, scenario := range scenarios {
+ t.Run(scenario.name, func(t *testing.T) {
+ for _, req := range scenario.reqs {
+ RecordRequestErrCounter(req.modelName, req.targetModelName, req.error)
+ }
+
+ wantRequestErrorCounter, err := os.Open("testdata/request_error_total_metric")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ if err := wantRequestErrorCounter.Close(); err != nil {
+ t.Error(err)
+ }
+ }()
+ if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestErrorCounter, RequestErrorTotalMetric); err != nil {
+ t.Error(err)
+ }
+ })
+ }
+}
+
func TestRecordRequestLatencies(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
timeBaseline := time.Now()
diff --git a/pkg/ext-proc/metrics/testdata/request_error_total_metric b/pkg/ext-proc/metrics/testdata/request_error_total_metric
new file mode 100644
index 000000000..31036eb60
--- /dev/null
+++ b/pkg/ext-proc/metrics/testdata/request_error_total_metric
@@ -0,0 +1,5 @@
+# HELP inference_model_request_error_total [ALPHA] Counter of inference model request errors broken out for each model and target model.
+# TYPE inference_model_request_error_total counter
+inference_model_request_error_total{error_code="Internal",model_name="m10",target_model_name="t10"} 2
+inference_model_request_error_total{error_code="ModelServerError",model_name="m10",target_model_name="t11"} 1
+inference_model_request_error_total{error_code="InferencePoolResourceExhausted",model_name="m20",target_model_name="t20"} 1
diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go
index 49402fb33..b5f2f4f23 100644
--- a/pkg/ext-proc/scheduling/scheduler.go
+++ b/pkg/ext-proc/scheduling/scheduler.go
@@ -23,10 +23,9 @@ import (
"math/rand"
"github.com/go-logr/logr"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
+ errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)
@@ -102,8 +101,8 @@ var (
name: "drop request",
filter: func(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) {
logger.V(logutil.DEFAULT).Info("Request dropped", "request", req)
- return []*datastore.PodMetrics{}, status.Errorf(
- codes.ResourceExhausted, "dropping request due to limited backend resources")
+ return []*datastore.PodMetrics{}, errutil.Error{
+ Code: errutil.InferencePoolResourceExhausted, Msg: "dropping request due to limited backend resources"}
},
},
}
diff --git a/pkg/ext-proc/util/error/error.go b/pkg/ext-proc/util/error/error.go
new file mode 100644
index 000000000..2f9c992c8
--- /dev/null
+++ b/pkg/ext-proc/util/error/error.go
@@ -0,0 +1,34 @@
+package error
+
+import (
+ "errors"
+ "fmt"
+)
+
+// Error is an error struct for errors returned by the EPP server.
+type Error struct {
+ Code string
+ Msg string
+}
+
+const (
+ Unknown = "Unknown"
+ BadRequest = "BadRequest"
+ Internal = "Internal"
+ ModelServerError = "ModelServerError"
+ BadConfiguration = "BadConfiguration"
+ InferencePoolResourceExhausted = "InferencePoolResourceExhausted"
+)
+
+// Error returns a string version of the error.
+func (e Error) Error() string {
+ return fmt.Sprintf("inference gateway: %s - %s", e.Code, e.Msg)
+}
+
+// CanonicalCode returns the error's code, or Unknown if err is not an Error
+// (including wrapped Errors, via errors.As).
+func CanonicalCode(err error) string {
+ var e Error
+ if errors.As(err, &e) {
+ return e.Code
+ }
+ return Unknown
+}
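+
+// Usage sketch (illustrative): handlers return a typed Error, and the gRPC
+// server maps it back to an HTTP status via CanonicalCode, e.g.:
+//
+//	return errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
+//
+//	switch errutil.CanonicalCode(err) {
+//	case errutil.BadRequest:
+//		// respond with an ImmediateResponse carrying HTTP 400
+//	}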