Skip to content
Merged
130 changes: 26 additions & 104 deletions pkg/ext-proc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@ import (
"net"
"net/http"
"strconv"
"time"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
healthPb "google.golang.org/grpc/health/grpc_health_v1"
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -37,7 +33,7 @@ const (
var (
grpcPort = flag.Int(
"grpcPort",
9002,
runserver.DefaultGrpcPort,
"The gRPC port used for communicating with Envoy proxy")
grpcHealthPort = flag.Int(
"grpcHealthPort",
Expand All @@ -47,31 +43,31 @@ var (
"metricsPort", 9090, "The metrics port")
targetPodHeader = flag.String(
"targetPodHeader",
"target-pod",
runserver.DefaultTargetPodHeader,
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
poolName = flag.String(
"poolName",
"",
runserver.DefaultPoolName,
"Name of the InferencePool this Endpoint Picker is associated with.")
poolNamespace = flag.String(
"poolNamespace",
"default",
runserver.DefaultPoolNamespace,
"Namespace of the InferencePool this Endpoint Picker is associated with.")
serviceName = flag.String(
"serviceName",
"",
runserver.DefaultServiceName,
"Name of the Service that will be used to read EndpointSlices from")
zone = flag.String(
"zone",
"",
runserver.DefaultZone,
"The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ")
refreshPodsInterval = flag.Duration(
"refreshPodsInterval",
10*time.Second,
runserver.DefaultRefreshPodsInterval,
"interval to refresh pods")
refreshMetricsInterval = flag.Duration(
"refreshMetricsInterval",
50*time.Millisecond,
runserver.DefaultRefreshMetricsInterval,
"interval to refresh metrics")

scheme = runtime.NewScheme()
Expand Down Expand Up @@ -103,71 +99,34 @@ func main() {
})
klog.Info(flags)

// Create a new manager to manage controllers
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
if err != nil {
klog.Fatalf("Failed to create controller manager: %v", err)
}

// Create the data store used to cache watched resources
datastore := backend.NewK8sDataStore()

// Create the controllers and register them with the manager
if err := (&backend.InferencePoolReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: *poolName,
Namespace: *poolNamespace,
},
Record: mgr.GetEventRecorderFor("InferencePool"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
}

if err := (&backend.InferenceModelReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: *poolName,
Namespace: *poolNamespace,
},
Record: mgr.GetEventRecorderFor("InferenceModel"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
}

if err := (&backend.EndpointSliceReconciler{
Datastore: datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Record: mgr.GetEventRecorderFor("endpointslice"),
ServiceName: *serviceName,
Zone: *zone,
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
TargetPodHeader: *targetPodHeader,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
ServiceName: *serviceName,
Zone: *zone,
RefreshPodsInterval: *refreshPodsInterval,
RefreshMetricsInterval: *refreshMetricsInterval,
Scheme: scheme,
Config: ctrl.GetConfigOrDie(),
Datastore: datastore,
}
serverRunner.Setup()

// Start health and ext-proc servers in goroutines
healthSvr := startHealthServer(datastore, *grpcHealthPort)
extProcSvr := startExternalProcessorServer(
extProcSvr := serverRunner.Start(
datastore,
*grpcPort,
*refreshPodsInterval,
*refreshMetricsInterval,
*targetPodHeader,
&vllm.PodMetricsClientImpl{},
)
// Start metrics handler
metricsSvr := startMetricsHandler(*metricsPort, cfg)

// Start the controller manager. Blocking and will return when shutdown is complete.
klog.Infof("Starting controller manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
klog.Fatalf("Error starting controller manager: %v", err)
}
klog.Info("Controller manager shutting down")
// Start manager, blocking
serverRunner.StartManager()

// Gracefully shutdown servers
if healthSvr != nil {
Expand Down Expand Up @@ -209,43 +168,6 @@ func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
return svr
}

// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
func startExternalProcessorServer(
datastore *backend.K8sDatastore,
port int,
refreshPodsInterval, refreshMetricsInterval time.Duration,
targetPodHeader string,
) *grpc.Server {
svr := grpc.NewServer()

go func() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
klog.Fatalf("Ext-proc server failed to listen: %v", err)
}
klog.Infof("Ext-proc server listening on port: %d", port)

// Initialize backend provider
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
klog.Fatalf("Failed to initialize backend provider: %v", err)
}

// Register ext_proc handlers
extProcPb.RegisterExternalProcessorServer(
svr,
handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
)

// Blocking and will return when shutdown is complete.
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
klog.Fatalf("Ext-proc server failed: %v", err)
}
klog.Info("Ext-proc server shutting down")
}()
return svr
}

func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
metrics.Register()

Expand Down
156 changes: 156 additions & 0 deletions pkg/ext-proc/server/runserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package server

import (
"fmt"
"net"
"time"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/grpc"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
klog "k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
)

// ExtProcServerRunner provides methods to manage an external process server.
type ExtProcServerRunner struct {
GrpcPort int
TargetPodHeader string
PoolName string
PoolNamespace string
ServiceName string
Zone string
RefreshPodsInterval time.Duration
RefreshMetricsInterval time.Duration
Scheme *runtime.Scheme
Config *rest.Config
Datastore *backend.K8sDatastore
manager ctrl.Manager
}

// Default values for CLI flags in main
const (
DefaultGrpcPort = 9002 // default for --grpcPort
DefaultTargetPodHeader = "target-pod" // default for --targetPodHeader
DefaultPoolName = "" // required but no default
DefaultPoolNamespace = "default" // default for --poolNamespace
DefaultServiceName = "" // required but no default
DefaultZone = "" // default for --zone
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
)

func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
return &ExtProcServerRunner{
GrpcPort: DefaultGrpcPort,
TargetPodHeader: DefaultTargetPodHeader,
PoolName: DefaultPoolName,
PoolNamespace: DefaultPoolNamespace,
ServiceName: DefaultServiceName,
Zone: DefaultZone,
RefreshPodsInterval: DefaultRefreshPodsInterval,
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
// Scheme, Config, and Datastore can be assigned later.
}
}

// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
func (r *ExtProcServerRunner) Setup() {
// Create a new manager to manage controllers
mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
if err != nil {
klog.Fatalf("Failed to create controller manager: %v", err)
}
r.manager = mgr

// Create the controllers and register them with the manager
if err := (&backend.InferencePoolReconciler{
Datastore: r.Datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: r.PoolName,
Namespace: r.PoolNamespace,
},
Record: mgr.GetEventRecorderFor("InferencePool"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
}

if err := (&backend.InferenceModelReconciler{
Datastore: r.Datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
PoolNamespacedName: types.NamespacedName{
Name: r.PoolName,
Namespace: r.PoolNamespace,
},
Record: mgr.GetEventRecorderFor("InferenceModel"),
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
}

if err := (&backend.EndpointSliceReconciler{
Datastore: r.Datastore,
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Record: mgr.GetEventRecorderFor("endpointslice"),
ServiceName: r.ServiceName,
Zone: r.Zone,
}).SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
}
}

// Start starts the Envoy external processor server in a goroutine.
func (r *ExtProcServerRunner) Start(
podDatastore *backend.K8sDatastore,
podMetricsClient backend.PodMetricsClient,
) *grpc.Server {
svr := grpc.NewServer()

go func() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
if err != nil {
klog.Fatalf("Ext-proc server failed to listen: %v", err)
}
klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)

// Initialize backend provider
pp := backend.NewProvider(podMetricsClient, podDatastore)
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
klog.Fatalf("Failed to initialize backend provider: %v", err)
}

// Register ext_proc handlers
extProcPb.RegisterExternalProcessorServer(
svr,
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
)

// Blocking and will return when shutdown is complete.
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
klog.Fatalf("Ext-proc server failed: %v", err)
}
klog.Info("Ext-proc server shutting down")
}()
return svr
}

func (r *ExtProcServerRunner) StartManager() {
if r.manager == nil {
klog.Fatalf("Runner has no manager setup to run: %v", r)
}
// Start the controller manager. Blocking and will return when shutdown is complete.
klog.Infof("Starting controller manager")
mgr := r.manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
klog.Fatalf("Error starting controller manager: %v", err)
}
klog.Info("Controller manager shutting down")
}
Loading