diff --git a/cmd/epp/main.go b/cmd/epp/main.go index ab270c49a..5b350bb28 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -30,16 +30,12 @@ import ( "go.uber.org/zap/zapcore" "google.golang.org/grpc" healthPb "google.golang.org/grpc/health/grpc_health_v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/component-base/metrics/legacyregistry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" - "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/vllm" @@ -97,15 +93,9 @@ var ( "are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+ "then a self-signed certificate is used.") - scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") ) -func init() { - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(v1alpha2.AddToScheme(scheme)) -} - func main() { if err := run(); err != nil { os.Exit(1) @@ -140,9 +130,9 @@ func run() error { return err } - mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme}) + mgr, err := runserver.NewDefaultManager(*poolNamespace, *poolName, cfg) if err != nil { - setupLog.Error(err, "Failed to create controller manager", "config", cfg) + setupLog.Error(err, "Failed to create controller manager") return err } diff --git a/pkg/epp/controller/inferencemodel_reconciler.go b/pkg/epp/controller/inferencemodel_reconciler.go index 8318324ff..a7f365b79 100644 --- a/pkg/epp/controller/inferencemodel_reconciler.go +++ b/pkg/epp/controller/inferencemodel_reconciler.go @@ -21,7 +21,6 @@ import ( "fmt" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" @@ -36,7 +35,6 @@ import ( type InferenceModelReconciler struct { client.Client - Scheme *runtime.Scheme Record record.EventRecorder Datastore datastore.Datastore PoolNamespacedName types.NamespacedName @@ -128,5 +126,5 @@ func (c *InferenceModelReconciler) SetupWithManager(ctx context.Context, mgr ctr } func (c *InferenceModelReconciler) eventPredicate(infModel *v1alpha2.InferenceModel) bool { - return (infModel.Spec.PoolRef.Name == v1alpha2.ObjectName(c.PoolNamespacedName.Name)) && (infModel.GetNamespace() == c.PoolNamespacedName.Namespace) + return string(infModel.Spec.PoolRef.Name) == c.PoolNamespacedName.Name } diff --git a/pkg/epp/controller/inferencemodel_reconciler_test.go b/pkg/epp/controller/inferencemodel_reconciler_test.go index d52779197..2ac5bb1e4 100644 --- a/pkg/epp/controller/inferencemodel_reconciler_test.go +++ b/pkg/epp/controller/inferencemodel_reconciler_test.go @@ -193,7 +193,6 @@ func TestInferenceModelReconciler(t *testing.T) { datastore := datastore.NewFakeDatastore(nil, test.modelsInStore, pool) reconciler := &InferenceModelReconciler{ Client: fakeClient, - Scheme: scheme, Record: record.NewFakeRecorder(10), Datastore: datastore, PoolNamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace}, diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 2ad7d2bbc..880aec8c8 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -21,13 +21,11 @@ import ( "reflect" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" @@ -38,7 +36,6 @@ import ( // will have the proper controller that will create/manage objects on behalf of the server pool. type InferencePoolReconciler struct { client.Client - Scheme *runtime.Scheme Record record.EventRecorder PoolNamespacedName types.NamespacedName Datastore datastore.Datastore @@ -90,8 +87,5 @@ func (c *InferencePoolReconciler) updateDatastore(ctx context.Context, newPool * func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha2.InferencePool{}). - WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool { - return (object.GetNamespace() == c.PoolNamespacedName.Namespace) && (object.GetName() == c.PoolNamespacedName.Name) - })). Complete(c) } diff --git a/pkg/epp/controller/pod_reconciler.go b/pkg/epp/controller/pod_reconciler.go index 717d9f60e..a6c897c2f 100644 --- a/pkg/epp/controller/pod_reconciler.go +++ b/pkg/epp/controller/pod_reconciler.go @@ -22,7 +22,6 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" @@ -35,19 +34,15 @@ import ( type PodReconciler struct { client.Client Datastore datastore.Datastore - Scheme *runtime.Scheme Record record.EventRecorder } func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) - inferencePool, err := c.Datastore.PoolGet() - if err != nil { - logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the InferencePool is not available yet", "error", err) + if !c.Datastore.PoolHasSynced() { + logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the InferencePool is not available yet") // When the inferencePool is initialized it lists the appropriate pods and populates the datastore, so no need to requeue. return ctrl.Result{}, nil - } else if inferencePool.Namespace != req.Namespace { - return ctrl.Result{}, nil } logger.V(logutil.VERBOSE).Info("Pod being reconciled", "name", req.NamespacedName) diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go new file mode 100644 index 000000000..fd505d002 --- /dev/null +++ b/pkg/epp/server/controller_manager.go @@ -0,0 +1,73 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" +) + +var scheme = runtime.NewScheme() + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(v1alpha2.AddToScheme(scheme)) +} + +// NewDefaultManager creates a new controller manager with default configuration. +func NewDefaultManager(namespace, name string, restConfig *rest.Config) (ctrl.Manager, error) { + manager, err := ctrl.NewManager(restConfig, ctrl.Options{ + Scheme: scheme, + Cache: cache.Options{ + ByObject: map[client.Object]cache.ByObject{ + &corev1.Pod{}: { + Namespaces: map[string]cache.Config{ + namespace: {}, + }, + }, + &v1alpha2.InferencePool{}: { + Namespaces: map[string]cache.Config{ + namespace: { + FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": name, + }), + }, + }, + }, + &v1alpha2.InferenceModel{}: { + Namespaces: map[string]cache.Config{ + namespace: {}, + }, + }, + }, + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to create controller manager: %v", err) + } + return manager, nil +} diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index f3d9b6ac0..4c0a7e533 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -89,7 +89,6 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man // Create the controllers and register them with the manager if err := (&controller.InferencePoolReconciler{ Datastore: r.Datastore, - Scheme: mgr.GetScheme(), Client: mgr.GetClient(), PoolNamespacedName: types.NamespacedName{ Name: r.PoolName, @@ -102,7 +101,6 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man if err := (&controller.InferenceModelReconciler{ Datastore: r.Datastore, - Scheme: mgr.GetScheme(), Client: mgr.GetClient(), PoolNamespacedName: types.NamespacedName{ Name: r.PoolName, @@ -115,7 +113,6 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man if err := (&controller.PodReconciler{ Datastore: r.Datastore, - Scheme: mgr.GetScheme(), Client: mgr.GetClient(), Record: mgr.GetEventRecorderFor("pod"), }).SetupWithManager(mgr); err != nil { diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 7755795b0..4454733c7 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -58,6 +58,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" extprocutils "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/test" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" @@ -490,7 +491,8 @@ func BeforeSuit(t *testing.T) func() { // Init runtime. ctrl.SetLogger(logger) - mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme}) + + mgr, err := server.NewDefaultManager("default", "vllm-llama2-7b-pool", cfg) if err != nil { logutil.Fatal(logger, err, "Failed to create controller manager") }