Merged
Changes from 6 commits
5 changes: 4 additions & 1 deletion cmd/epp/main.go
@@ -40,6 +40,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
@@ -227,6 +228,8 @@ func run() error {

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)

director := requestcontrol.NewDirector(datastore, scheduler, saturationDetector) // can call "director.WithPostResponsePlugins" to add post response plugins
Contributor
nit: I prefer adding WithPostResponsePlugins as an "Option" object and adding it as an optional argument to NewDirector(). This is more discoverable and removes the need for this comment.

Contributor Author
We don't really need this comment :)
It was added just until we add the WithPostResponsePlugins usage in main.go.

I implemented it this way to keep it consistent with how scheduler plugins are defined.
In general, both patterns are commonly used in Go, and more specifically in Kubernetes, but personally I prefer the With... approach, which reads more clearly to me and also allows adding the plugins only when they are used.

Contributor
The option pattern is optional as well; you only use it when you need it. But it's more discoverable in the function signature.

Contributor Author
> The option pattern is optional as well; you only use it when you need it.

Right, I was not trying to say otherwise :)

I was just making the point that both are common patterns that are widely used in the community, and personally I prefer the With... approach, which is also aligned with what was done in the Scheduler, so it keeps the plugin setup consistent across the layers.
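For reference, here is a minimal sketch of the functional-options alternative discussed in this thread. The DirectorOption type and this variant of NewDirector are illustrative assumptions, not code from this PR; the sketch assumes the surrounding requestcontrol package types and would replace the method-style WithPostResponsePlugins introduced below.

// Hypothetical option type; not part of this PR.
type DirectorOption func(*Director)

// An option-style WithPostResponsePlugins, replacing the method variant.
func WithPostResponsePlugins(plugins ...PostResponsePlugin) DirectorOption {
	return func(d *Director) {
		d.postResponsePlugins = plugins
	}
}

// NewDirector would accept options variadically, so the extra argument is
// only written out when the caller actually needs it.
func NewDirector(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector, opts ...DirectorOption) *Director {
	d := &Director{datastore: datastore, scheduler: scheduler, saturationDetector: saturationDetector}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

// Usage without plugins stays unchanged:
//   director := requestcontrol.NewDirector(datastore, scheduler, detector)
// and with plugins:
//   director := requestcontrol.NewDirector(datastore, scheduler, detector, requestcontrol.WithPostResponsePlugins(plugin1, plugin2))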


// --- Setup ExtProc Server Runner ---
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
@@ -237,7 +240,7 @@
SecureServing: *secureServing,
CertPath: *certPath,
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
Scheduler: scheduler,
Director: director,
SaturationDetector: saturationDetector,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
5 changes: 4 additions & 1 deletion pkg/epp/handlers/server.go
@@ -32,6 +32,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
@@ -79,7 +80,7 @@ type StreamingServer struct {
// Specifically, there are fields related to the ext-proc protocol, and then fields related to the lifecycle of the request.
// We should split these apart as this monolithic object exposes too much data to too many layers.
type RequestContext struct {
TargetPod string
TargetPod *backend.Pod
TargetEndpoint string
Model string
ResolvedTargetModel string
@@ -93,6 +94,8 @@ type RequestContext struct {
RequestRunning bool
Request *Request

SchedulingRequest *schedulingtypes.LLMRequest
Contributor
There are many duplicated fields in LLMRequest and RequestContext. Initially, LLMRequest was scoped to the scheduling package only.

Can we move LLMRequest out of the scheduling package now that it has a wider scope? And consolidate duplicated fields such as ResolvedTargetModel?

Contributor Author
This conflicts with some of the comments on #845, where the discussion went in the direction that the scheduler shouldn't rely on structs outside of the scheduling package.

Yes, I agree there are duplicate fields.
We should probably converge so that those fields are kept in the scheduling request only and removed from RequestContext.

If it's not a hard issue from your PoV, I suggest deferring it to a follow-up PR, since this hasn't changed in this PR (this was the situation before this PR as well).
I like to keep PRs tightly scoped (the scope of this PR is just moving PostResponse out of the scheduler).

Contributor
We can add the scheduling pkg type as a parameter here so that we don't duplicate the parameters.
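A rough sketch of the consolidation being discussed, assuming RequestContext keeps only the embedded scheduling request and drops the duplicated fields; this is illustrative, not part of this PR:

// Illustrative only: duplicated fields such as Model / ResolvedTargetModel
// would live solely on the scheduling request.
type RequestContext struct {
	TargetPod      *backend.Pod
	TargetEndpoint string

	// Callers read Model and ResolvedTargetModel from here instead of from
	// duplicated RequestContext fields.
	SchedulingRequest *schedulingtypes.LLMRequest

	// ... remaining ext-proc and request-lifecycle fields unchanged ...
}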


RequestState StreamRequestState
modelServerStreaming bool

19 changes: 19 additions & 0 deletions pkg/epp/metrics/metrics.go
@@ -202,6 +202,18 @@ var (
[]string{"plugin_type", "plugin_name"},
)

RequestControlPluginProcessingLatencies = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: InferenceExtension,
Name: "request_control_plugin_duration_seconds",
Help: metricsutil.HelpMsgWithStability("RequestControl plugin processing latency distribution in seconds for each plugin type and plugin name.", compbasemetrics.ALPHA),
Buckets: []float64{
0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
},
},
[]string{"plugin_type", "plugin_name"},
)

// Prefix indexer Metrics
PrefixCacheSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@@ -263,6 +275,7 @@ func Register(customCollectors ...prometheus.Collector) {
metrics.Registry.MustRegister(inferencePoolReadyPods)
metrics.Registry.MustRegister(SchedulerPluginProcessingLatencies)
metrics.Registry.MustRegister(SchedulerE2ELatency)
metrics.Registry.MustRegister(RequestControlPluginProcessingLatencies)
metrics.Registry.MustRegister(InferenceExtensionInfo)
metrics.Registry.MustRegister(PrefixCacheSize)
metrics.Registry.MustRegister(PrefixCacheHitRatio)
@@ -289,6 +302,7 @@ func Reset() {
inferencePoolReadyPods.Reset()
SchedulerPluginProcessingLatencies.Reset()
SchedulerE2ELatency.Reset()
RequestControlPluginProcessingLatencies.Reset()
InferenceExtensionInfo.Reset()
PrefixCacheSize.Reset()
PrefixCacheHitRatio.Reset()
@@ -400,6 +414,11 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
}

// RecordRequestControlPluginProcessingLatency records the processing latency for a request-control plugin.
func RecordRequestControlPluginProcessingLatency(pluginType, pluginName string, duration time.Duration) {
RequestControlPluginProcessingLatencies.WithLabelValues(pluginType, pluginName).Observe(duration.Seconds())
}

// RecordPrefixCacheSize records the size of the prefix indexer in megabytes.
func RecordPrefixCacheSize(size int64) {
PrefixCacheSize.WithLabelValues().Set(float64(size))
48 changes: 32 additions & 16 deletions pkg/epp/requestcontrol/director.go
@@ -23,13 +23,15 @@ import (
"fmt"
"math/rand"
"strconv"
"time"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -39,24 +41,32 @@ import (
// Scheduler defines the interface required by the Director for scheduling.
type Scheduler interface {
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result map[string]*schedulingtypes.Result, err error)
OnResponse(ctx context.Context, resp *schedulingtypes.LLMResponse, targetPodName string)
}

// SaturationDetector provides a signal indicating whether the backends are considered saturated.
type SaturationDetector interface {
IsSaturated(ctx context.Context) bool
}

// NewDirector creates a new Director instance with all dependencies.
// postResponsePlugins remains nil as this is an optional field that can be set using the "WithPostResponsePlugins" function.
func NewDirector(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector) *Director {
return &Director{datastore: datastore, scheduler: scheduler, saturationDetector: saturationDetector}
}

// Director orchestrates the request handling flow, including scheduling.
type Director struct {
datastore datastore.Datastore
scheduler Scheduler
saturationDetector SaturationDetector
datastore datastore.Datastore
scheduler Scheduler
saturationDetector SaturationDetector
postResponsePlugins []PostResponsePlugin
}

// NewDirector creates a new Director instance with all dependencies.
func NewDirector(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector) *Director {
return &Director{datastore, scheduler, saturationDetector}
// WithPostResponsePlugins sets the given plugins as the PostResponse plugins.
// If the Director has PostResponse plugins already, this call replaces the existing plugins with the given ones.
Contributor
Adding to the list might be more appropriate?

Contributor Author (@nirrozenbaum, Jun 3, 2025)
I intentionally left it out of the argument list, since this is an optional field.
I would like to avoid creating an empty slice (or passing nil) when the caller doesn't need any PostResponsePlugins.
If I add it as an argument, the code will look like:

director := requestcontrol.NewDirector(datastore, scheduler, detector, []PostResponsePlugin{})

OR

director := requestcontrol.NewDirector(datastore, scheduler, detector, nil)

On the other hand, since this field is optional, it is possible to initialize the director with or without it, like this:
without:

director := requestcontrol.NewDirector(datastore, scheduler, detector)

with:

director := requestcontrol.NewDirector(datastore, scheduler, detector).
    WithPostResponsePlugins(plugin1, plugin2, ...)

The latter also has the same feel as the Scheduler plugins.

func (d *Director) WithPostResponsePlugins(plugins ...PostResponsePlugin) *Director {
d.postResponsePlugins = plugins
return d
}

// HandleRequest orchestrates the request lifecycle:
@@ -104,7 +114,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext
}

// Prepare LLMRequest (needed for both saturation detection and Scheduler)
llmReq := &schedulingtypes.LLMRequest{
reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{
TargetModel: reqCtx.ResolvedTargetModel,
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
Critical: requestCriticality == v1alpha2.Critical,
@@ -113,7 +123,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext
}
logger = logger.WithValues(
"model", reqCtx.Model,
"resolvedTargetModel", llmReq.TargetModel,
"resolvedTargetModel", reqCtx.ResolvedTargetModel,
"criticality", requestCriticality,
)
ctx = log.IntoContext(ctx, logger)
@@ -126,7 +136,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext
}

// --- 3. Dispatch (Calls Scheduler) ---
results, dispatchErr := d.Dispatch(ctx, llmReq)
results, dispatchErr := d.Dispatch(ctx, reqCtx.SchedulingRequest)
if dispatchErr != nil {
return reqCtx, dispatchErr
}
@@ -193,22 +203,19 @@ func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestContext
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)

reqCtx.TargetPod = targetPod.NamespacedName.String()
reqCtx.TargetPod = targetPod
reqCtx.TargetEndpoint = endpoint

return reqCtx, nil
}

func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
logger := log.FromContext(ctx)

llmResp := &schedulingtypes.LLMResponse{
response := &Response{
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
Headers: reqCtx.Response.Headers,
}
logger.V(logutil.DEBUG).Info("LLM response assembled", "response", llmResp)

d.scheduler.OnResponse(ctx, llmResp, reqCtx.TargetPod)
d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)

return reqCtx, nil
}
@@ -253,3 +260,12 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
}
return ""
}

func (d *Director) runPostResponsePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
for _, plugin := range d.postResponsePlugins {
log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Name())
before := time.Now()
plugin.PostResponse(ctx, request, response, targetPod)
metrics.RecordRequestControlPluginProcessingLatency(PostResponsePluginType, plugin.Name(), time.Since(before))
Contributor
This metric is tracking post-response only; why is it called request_control_plugin_duration_seconds?

Contributor Author
This PR moves the PostResponse plugin from the Scheduler to the RequestControl layer. It is the first plugin in this layer out of the ones that appear in the northstar doc.

More plugins in request control are expected to be added.

}
}
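The PostResponsePlugin contract itself is not part of this diff; inferred from its usage in runPostResponsePlugins above and the test plugin in director_test.go below, it presumably looks roughly like this:

// Presumed shape of the interface, inferred from usage; the actual definition
// lives elsewhere in the requestcontrol package.
type PostResponsePlugin interface {
	Name() string
	PostResponse(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod)
}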
88 changes: 54 additions & 34 deletions pkg/epp/requestcontrol/director_test.go
@@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8stypes "k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -53,24 +54,14 @@ func (m *mockSaturationDetector) IsSaturated(_ context.Context) bool {
}

type mockScheduler struct {
scheduleResults map[string]*schedulingtypes.Result
scheduleErr error
lastRespOnResponse *schedulingtypes.LLMResponse
lastTargetPodOnResponse string
scheduleResults map[string]*schedulingtypes.Result
scheduleErr error
}

func (m *mockScheduler) Schedule(
ctx context.Context,
req *schedulingtypes.LLMRequest,
) (map[string]*schedulingtypes.Result, error) {
func (m *mockScheduler) Schedule(ctx context.Context, req *schedulingtypes.LLMRequest) (map[string]*schedulingtypes.Result, error) {
return m.scheduleResults, m.scheduleErr
}

func (m *mockScheduler) OnResponse(ctx context.Context, resp *schedulingtypes.LLMResponse, targetPodName string) {
m.lastRespOnResponse = resp
m.lastTargetPodOnResponse = targetPodName
}

func TestDirector_HandleRequest(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())

@@ -170,8 +161,11 @@ func TestDirector_HandleRequest(t *testing.T) {
wantReqCtx: &handlers.RequestContext{
Model: model,
ResolvedTargetModel: model,
TargetPod: "default/pod1",
TargetEndpoint: "192.168.1.100:8000",
TargetPod: &backend.Pod{
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
Address: "192.168.1.100",
},
TargetEndpoint: "192.168.1.100:8000",
},
wantMutatedBodyModel: model,
},
@@ -192,8 +186,11 @@
wantReqCtx: &handlers.RequestContext{
Model: model,
ResolvedTargetModel: model,
TargetPod: "default/pod1",
TargetEndpoint: "192.168.1.100:8000",
TargetPod: &backend.Pod{
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
Address: "192.168.1.100",
},
TargetEndpoint: "192.168.1.100:8000",
},
wantMutatedBodyModel: model,
},
@@ -218,8 +215,11 @@
wantReqCtx: &handlers.RequestContext{
Model: model,
ResolvedTargetModel: model,
TargetPod: "default/pod1",
TargetEndpoint: "192.168.1.100:8000",
TargetPod: &backend.Pod{
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
Address: "192.168.1.100",
},
TargetEndpoint: "192.168.1.100:8000",
},
wantMutatedBodyModel: model,
},
@@ -236,8 +236,11 @@
wantReqCtx: &handlers.RequestContext{
Model: modelSheddable,
ResolvedTargetModel: modelSheddable,
TargetPod: "default/pod1",
TargetEndpoint: "192.168.1.100:8000",
TargetPod: &backend.Pod{
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
Address: "192.168.1.100",
},
TargetEndpoint: "192.168.1.100:8000",
},
wantMutatedBodyModel: modelSheddable,
},
@@ -254,8 +257,11 @@
wantReqCtx: &handlers.RequestContext{
Model: modelWithResolvedTarget,
ResolvedTargetModel: "resolved-target-model-A",
TargetPod: "default/pod1",
TargetEndpoint: "192.168.1.100:8000",
TargetPod: &backend.Pod{
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
Address: "192.168.1.100",
},
TargetEndpoint: "192.168.1.100:8000",
},
wantMutatedBodyModel: "resolved-target-model-A",
},
@@ -338,12 +344,7 @@ func TestDirector_HandleRequest(t *testing.T) {
if test.schedulerMockSetup != nil {
test.schedulerMockSetup(mockSched)
}

var sd SaturationDetector
if test.mockSaturationDetector != nil {
sd = test.mockSaturationDetector
}
director := NewDirector(ds, mockSched, sd)
director := NewDirector(ds, mockSched, test.mockSaturationDetector)

reqCtx := &handlers.RequestContext{
Request: &handlers.Request{
@@ -513,10 +514,15 @@ func pointer(v int32) *int32 {
}

func TestDirector_HandleResponse(t *testing.T) {
pr1 := &testPostResponse{
NameRes: "pr1",
}

ctx := logutil.NewTestLoggerIntoContext(context.Background())
ds := datastore.NewDatastore(t.Context(), nil)
mockSched := &mockScheduler{}
director := NewDirector(ds, mockSched, nil)
director := NewDirector(ds, mockSched, nil).
WithPostResponsePlugins(pr1)

reqCtx := &handlers.RequestContext{
Request: &handlers.Request{
Expand All @@ -527,21 +533,35 @@ func TestDirector_HandleResponse(t *testing.T) {
Response: &handlers.Response{ // Simulate some response headers
Headers: map[string]string{"X-Test-Response-Header": "TestValue"},
},
TargetPod: "namespace1/test-pod-name",

TargetPod: &backend.Pod{NamespacedName: types.NamespacedName{Namespace: "namespace1", Name: "test-pod-name"}},
}

_, err := director.HandleResponse(ctx, reqCtx)
if err != nil {
t.Fatalf("HandleResponse() returned unexpected error: %v", err)
}

if diff := cmp.Diff("test-req-id-for-response", mockSched.lastRespOnResponse.RequestId); diff != "" {
if diff := cmp.Diff("test-req-id-for-response", pr1.lastRespOnResponse.RequestId); diff != "" {
t.Errorf("Scheduler.OnResponse RequestId mismatch (-want +got):\n%s", diff)
}
if diff := cmp.Diff(reqCtx.Response.Headers, mockSched.lastRespOnResponse.Headers); diff != "" {
if diff := cmp.Diff(reqCtx.Response.Headers, pr1.lastRespOnResponse.Headers); diff != "" {
t.Errorf("Scheduler.OnResponse Headers mismatch (-want +got):\n%s", diff)
}
if diff := cmp.Diff("namespace1/test-pod-name", mockSched.lastTargetPodOnResponse); diff != "" {
if diff := cmp.Diff("namespace1/test-pod-name", pr1.lastTargetPodOnResponse); diff != "" {
t.Errorf("Scheduler.OnResponse TargetPodName mismatch (-want +got):\n%s", diff)
}
}

type testPostResponse struct {
NameRes string
lastRespOnResponse *Response
lastTargetPodOnResponse string
}

func (p *testPostResponse) Name() string { return p.NameRes }

func (p *testPostResponse) PostResponse(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
p.lastRespOnResponse = response
p.lastTargetPodOnResponse = targetPod.NamespacedName.String()
}