From 6ef1f7b4421d0130ced5d9761ca37d395742143f Mon Sep 17 00:00:00 2001 From: Ondrej Kupka Date: Thu, 13 Feb 2025 13:24:59 +0100 Subject: [PATCH] Move manager from runserver to main The manager setup logic is now moved to main. runserver package does not manage the manager any more. This establishes a clear separation of concerns. --- pkg/ext-proc/main.go | 34 +++++++++++++++++++---------- pkg/ext-proc/server/runserver.go | 36 +++---------------------------- test/integration/hermetic_test.go | 13 +++++++---- 3 files changed, 35 insertions(+), 48 deletions(-) diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index d51435ac5..7021fab43 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -103,11 +103,6 @@ func run() error { flag.Parse() initLogging(&opts) - cfg, err := ctrl.GetConfig() - if err != nil { - klog.ErrorS(err, "Failed to get rest config") - return err - } // Validate flags if err := validateFlags(); err != nil { klog.ErrorS(err, "Failed to validate flags") @@ -123,6 +118,20 @@ func run() error { datastore := backend.NewK8sDataStore() + // Init runtime. + cfg, err := ctrl.GetConfig() + if err != nil { + klog.ErrorS(err, "Failed to get rest config") + return err + } + + mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme}) + if err != nil { + klog.ErrorS(err, "Failed to create controller manager", "config", cfg) + return err + } + + // Setup runner. serverRunner := &runserver.ExtProcServerRunner{ GrpcPort: *grpcPort, TargetEndpointKey: *targetEndpointKey, @@ -133,15 +142,12 @@ func run() error { RefreshPodsInterval: *refreshPodsInterval, RefreshMetricsInterval: *refreshMetricsInterval, RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval, - Scheme: scheme, - Config: ctrl.GetConfigOrDie(), Datastore: datastore, } - if err := serverRunner.Setup(); err != nil { + if err := serverRunner.SetupWithManager(mgr); err != nil { klog.ErrorS(err, "Failed to setup ext-proc server") return err } - mgr := serverRunner.Manager // Register health server. if err := registerHealthServer(mgr, datastore, *grpcHealthPort); err != nil { @@ -159,8 +165,14 @@ func run() error { return err } - // Start the manager. - return serverRunner.StartManager(ctrl.SetupSignalHandler()) + // Start the manager. This blocks until a signal is received. + klog.InfoS("Controller manager starting") + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + klog.ErrorS(err, "Error starting controller manager") + return err + } + klog.InfoS("Controller manager terminated") + return nil } func initLogging(opts *zap.Options) { diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go index 71499e8f5..38965a69b 100644 --- a/pkg/ext-proc/server/runserver.go +++ b/pkg/ext-proc/server/runserver.go @@ -2,15 +2,12 @@ package server import ( "context" - "errors" "fmt" "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "google.golang.org/grpc" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" klog "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -31,10 +28,7 @@ type ExtProcServerRunner struct { RefreshPodsInterval time.Duration RefreshMetricsInterval time.Duration RefreshPrometheusMetricsInterval time.Duration - Scheme *runtime.Scheme - Config *rest.Config Datastore *backend.K8sDatastore - Manager ctrl.Manager } // Default values for CLI flags in main @@ -61,19 +55,12 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner { RefreshPodsInterval: DefaultRefreshPodsInterval, RefreshMetricsInterval: DefaultRefreshMetricsInterval, RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, - // Scheme, Config, and Datastore can be assigned later. + // Datastore can be assigned later. } } -// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager. -func (r *ExtProcServerRunner) Setup() error { - // Create a new manager to manage controllers - mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme}) - if err != nil { - return fmt.Errorf("failed to create controller manager: %w", err) - } - r.Manager = mgr - +// SetupWithManager sets up the runner with the given manager. +func (r *ExtProcServerRunner) SetupWithManager(mgr ctrl.Manager) error { // Create the controllers and register them with the manager if err := (&backend.InferencePoolReconciler{ Datastore: r.Datastore, @@ -139,20 +126,3 @@ func (r *ExtProcServerRunner) AsRunnable( return runnable.GRPCServer("ext-proc", srv, r.GrpcPort).Start(ctx) })) } - -func (r *ExtProcServerRunner) StartManager(ctx context.Context) error { - if r.Manager == nil { - err := errors.New("runner manager is not set") - klog.ErrorS(err, "Runner has no manager setup to run") - return err - } - - // Start the controller manager. Blocking and will return when shutdown is complete. - klog.InfoS("Controller manager starting") - if err := r.Manager.Start(ctx); err != nil { - klog.ErrorS(err, "Error starting controller manager") - return err - } - klog.InfoS("Controller manager terminated") - return nil -} diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 74c9f0495..ff018f286 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -468,20 +468,25 @@ func BeforeSuit() { log.Fatalf("No error, but returned kubernetes client is nil, cfg: %v", cfg) } + // Init runtime. + mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme}) + if err != nil { + klog.ErrorS(err, "Failed to create controller manager") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + serverRunner = runserver.NewDefaultExtProcServerRunner() // Adjust from defaults serverRunner.PoolName = "vllm-llama2-7b-pool" - serverRunner.Scheme = scheme - serverRunner.Config = cfg serverRunner.Datastore = backend.NewK8sDataStore() - if err := serverRunner.Setup(); err != nil { + if err := serverRunner.SetupWithManager(mgr); err != nil { log.Fatalf("Failed to start server runner: %v", err) } // Start the controller manager in go routine, not blocking go func() { - if err := serverRunner.StartManager(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { log.Fatalf("Failed to start manager: %v", err) } }()