From 183ed5d1e043e7904025a81e2782e04b9e9077aa Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Mon, 27 Mar 2023 12:57:36 -0700 Subject: [PATCH 1/5] Refactor Webhook validation code - Move webhook validation code to the state package so that the package is responsible for enforcing it. As a result, it will not need require the users of that package to validate the supplied Gateway API resources, which will make it easier to use the package and reduce the number of possible bugs. Fixes https://github.com/nginxinc/nginx-kubernetes-gateway/issues/416 - Fix Webhook Validation is bypassed for existing resources when NKG starts. Fixes https://github.com/nginxinc/nginx-kubernetes-gateway/issues/433 --- internal/manager/controllers.go | 10 - internal/manager/controllers_test.go | 12 - internal/manager/manager.go | 19 +- internal/manager/validators.go | 27 -- internal/manager/validators_test.go | 77 ---- internal/reconciler/implementation.go | 28 +- internal/reconciler/implementation_test.go | 63 +--- .../reconcilerfakes/fake_event_recorder.go | 85 ----- internal/reconciler/recorder.go | 12 - internal/state/change_processor.go | 171 +++++---- internal/state/change_processor_test.go | 348 ++++++++++++++++-- internal/state/graph/graph.go | 22 +- internal/state/graph/graph_test.go | 32 +- internal/state/store.go | 299 ++++++++++++--- 14 files changed, 723 insertions(+), 482 deletions(-) delete mode 100644 internal/manager/validators.go delete mode 100644 internal/manager/validators_test.go delete mode 100644 internal/reconciler/reconcilerfakes/fake_event_recorder.go delete mode 100644 internal/reconciler/recorder.go diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index ddb6ad8b65..5a39e00447 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -26,7 +26,6 @@ type controllerConfig struct { k8sPredicate predicate.Predicate fieldIndices index.FieldIndices newReconciler newReconcilerFunc - webhookValidator reconciler.ValidatorFunc } type controllerOption func(*controllerConfig) @@ -56,12 +55,6 @@ func withNewReconciler(newReconciler newReconcilerFunc) controllerOption { } } -func withWebhookValidator(validator reconciler.ValidatorFunc) controllerOption { - return func(cfg *controllerConfig) { - cfg.webhookValidator = validator - } -} - func defaultControllerConfig() controllerConfig { return controllerConfig{ newReconciler: reconciler.NewImplementation, @@ -73,7 +66,6 @@ func registerController( objectType client.Object, mgr manager.Manager, eventCh chan<- interface{}, - recorder reconciler.EventRecorder, options ...controllerOption, ) error { cfg := defaultControllerConfig() @@ -100,8 +92,6 @@ func registerController( ObjectType: objectType, EventCh: eventCh, NamespacedNameFilter: cfg.namespacedNameFilter, - WebhookValidator: cfg.webhookValidator, - EventRecorder: recorder, } err := builder.Complete(cfg.newReconciler(recCfg)) diff --git a/internal/manager/controllers_test.go b/internal/manager/controllers_test.go index 75a6cecf67..dc1367dcc8 100644 --- a/internal/manager/controllers_test.go +++ b/internal/manager/controllers_test.go @@ -11,7 +11,6 @@ import ( "github.com/onsi/gomega/types" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/validation/field" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -21,7 +20,6 @@ import ( "github.com/nginxinc/nginx-kubernetes-gateway/internal/manager/managerfakes" "github.com/nginxinc/nginx-kubernetes-gateway/internal/manager/predicate" "github.com/nginxinc/nginx-kubernetes-gateway/internal/reconciler" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/reconciler/reconcilerfakes" ) func TestRegisterController(t *testing.T) { @@ -86,12 +84,6 @@ func TestRegisterController(t *testing.T) { namespacedNameFilter := filter.CreateFilterForGatewayClass("test") fieldIndexes := index.CreateEndpointSliceFieldIndices() - webhookValidator := createValidator(func(_ *v1beta1.HTTPRoute) field.ErrorList { - return nil - }) - - eventRecorder := &reconcilerfakes.FakeEventRecorder{} - eventCh := make(chan<- interface{}) beSameFunctionPointer := func(expected interface{}) types.GomegaMatcher { @@ -109,8 +101,6 @@ func TestRegisterController(t *testing.T) { g.Expect(c.Getter).To(BeIdenticalTo(test.fakes.mgr.GetClient())) g.Expect(c.ObjectType).To(BeIdenticalTo(objectType)) g.Expect(c.EventCh).To(BeIdenticalTo(eventCh)) - g.Expect(c.EventRecorder).To(BeIdenticalTo(eventRecorder)) - g.Expect(c.WebhookValidator).Should(beSameFunctionPointer(webhookValidator)) g.Expect(c.NamespacedNameFilter).Should(beSameFunctionPointer(namespacedNameFilter)) return reconciler.NewImplementation(c) @@ -121,12 +111,10 @@ func TestRegisterController(t *testing.T) { objectType, test.fakes.mgr, eventCh, - eventRecorder, withNamespacedNameFilter(namespacedNameFilter), withK8sPredicate(predicate.ServicePortsChangedPredicate{}), withFieldIndices(fieldIndexes), withNewReconciler(newReconciler), - withWebhookValidator(webhookValidator), ) if test.expectedErr == nil { diff --git a/internal/manager/manager.go b/internal/manager/manager.go index aa5d8f1568..79cd8c11e2 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -14,7 +14,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" k8spredicate "sigs.k8s.io/controller-runtime/pkg/predicate" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" - gwapivalidation "sigs.k8s.io/gateway-api/apis/v1beta1/validation" "github.com/nginxinc/nginx-kubernetes-gateway/internal/config" "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" @@ -75,21 +74,13 @@ func Start(cfg config.Config) error { objectType: &gatewayv1beta1.GatewayClass{}, options: []controllerOption{ withNamespacedNameFilter(filter.CreateFilterForGatewayClass(cfg.GatewayClassName)), - // as of v0.6.2, the Gateway API Webhook doesn't include a validation function - // for the GatewayClass resource }, }, { objectType: &gatewayv1beta1.Gateway{}, - options: []controllerOption{ - withWebhookValidator(createValidator(gwapivalidation.ValidateGateway)), - }, }, { objectType: &gatewayv1beta1.HTTPRoute{}, - options: []controllerOption{ - withWebhookValidator(createValidator(gwapivalidation.ValidateHTTPRoute)), - }, }, { objectType: &apiv1.Service{}, @@ -111,11 +102,8 @@ func Start(cfg config.Config) error { ctx := ctlr.SetupSignalHandler() - recorderName := fmt.Sprintf("nginx-kubernetes-gateway-%s", cfg.GatewayClassName) - recorder := mgr.GetEventRecorderFor(recorderName) - for _, regCfg := range controllerRegCfgs { - err := registerController(ctx, regCfg.objectType, mgr, eventCh, recorder, regCfg.options...) + err := registerController(ctx, regCfg.objectType, mgr, eventCh, regCfg.options...) if err != nil { return fmt.Errorf("cannot register controller for %T: %w", regCfg.objectType, err) } @@ -124,6 +112,9 @@ func Start(cfg config.Config) error { secretStore := secrets.NewSecretStore() secretMemoryMgr := secrets.NewSecretDiskMemoryManager(secretsFolder, secretStore) + recorderName := fmt.Sprintf("nginx-kubernetes-gateway-%s", cfg.GatewayClassName) + recorder := mgr.GetEventRecorderFor(recorderName) + processor := state.NewChangeProcessorImpl(state.ChangeProcessorConfig{ GatewayCtlrName: cfg.GatewayCtlrName, GatewayClassName: cfg.GatewayClassName, @@ -134,6 +125,8 @@ func Start(cfg config.Config) error { Validators: validation.Validators{ HTTPFieldsValidator: ngxvalidation.HTTPValidator{}, }, + EventRecorder: recorder, + Scheme: scheme, }) configGenerator := ngxcfg.NewGeneratorImpl() diff --git a/internal/manager/validators.go b/internal/manager/validators.go deleted file mode 100644 index 425bcefd2a..0000000000 --- a/internal/manager/validators.go +++ /dev/null @@ -1,27 +0,0 @@ -package manager - -import ( - "errors" - "fmt" - - "k8s.io/apimachinery/pkg/util/validation/field" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/nginxinc/nginx-kubernetes-gateway/internal/reconciler" -) - -// createValidator creates a reconciler.ValidatorFunc from a function that validates a resource of type R. -func createValidator[R client.Object](validate func(R) field.ErrorList) reconciler.ValidatorFunc { - return func(obj client.Object) error { - if obj == nil { - panic(errors.New("obj is nil")) - } - - r, ok := obj.(R) - if !ok { - panic(fmt.Errorf("obj type mismatch: got %T, expected %T", obj, r)) - } - - return validate(r).ToAggregate() - } -} diff --git a/internal/manager/validators_test.go b/internal/manager/validators_test.go deleted file mode 100644 index f6423244fd..0000000000 --- a/internal/manager/validators_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package manager - -import ( - "testing" - - . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/util/validation/field" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/gateway-api/apis/v1beta1" -) - -func TestCreateTypedValidator(t *testing.T) { - tests := []struct { - name string - obj client.Object - errorList field.ErrorList - expectPanic bool - expectErr bool - }{ - { - obj: &v1beta1.HTTPRoute{}, - errorList: field.ErrorList{}, - expectPanic: false, - expectErr: false, - name: "no errors", - }, - { - obj: &v1beta1.HTTPRoute{}, - errorList: []*field.Error{{Detail: "test"}}, - expectPanic: false, - expectErr: true, - name: "one error", - }, - { - obj: nil, - errorList: field.ErrorList{}, - expectPanic: true, - expectErr: false, - name: "nil object", - }, - { - obj: &v1beta1.Gateway{}, - errorList: field.ErrorList{}, - expectPanic: true, - expectErr: false, - name: "wrong object type", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - g := NewGomegaWithT(t) - - v := createValidator(createValidateHTTPRouteThatReturns(test.errorList)) - - if test.expectPanic { - g.Expect(func() { _ = v(test.obj) }).To(Panic()) - return - } - - result := v(test.obj) - - if test.expectErr { - g.Expect(result).ToNot(BeNil()) - return - } - - g.Expect(result).To(BeNil()) - }) - } -} - -func createValidateHTTPRouteThatReturns(errorList field.ErrorList) func(*v1beta1.HTTPRoute) field.ErrorList { - return func(*v1beta1.HTTPRoute) field.ErrorList { - return errorList - } -} diff --git a/internal/reconciler/implementation.go b/internal/reconciler/implementation.go index 38a568e8fa..44a9d1b1b7 100644 --- a/internal/reconciler/implementation.go +++ b/internal/reconciler/implementation.go @@ -5,7 +5,6 @@ import ( "fmt" "reflect" - apiv1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -19,9 +18,6 @@ import ( // If the function returns false, the reconciler will log the returned string. type NamespacedNameFilterFunc func(nsname types.NamespacedName) (bool, string) -// ValidatorFunc validates a Kubernetes resource. -type ValidatorFunc func(object client.Object) error - // Config contains the configuration for the Implementation. type Config struct { // Getter gets a resource from the k8s API. @@ -32,10 +28,6 @@ type Config struct { EventCh chan<- interface{} // NamespacedNameFilter filters resources the controller will process. Can be nil. NamespacedNameFilter NamespacedNameFilterFunc - // WebhookValidator validates a resource using the same rules as in the Gateway API Webhook. Can be nil. - WebhookValidator ValidatorFunc - // EventRecorder records event about resources. - EventRecorder EventRecorder } // Implementation is a reconciler for Kubernetes resources. @@ -66,12 +58,6 @@ func newObject(objectType client.Object) client.Object { return reflect.New(t).Interface().(client.Object) } -const ( - webhookValidationErrorLogMsg = "Rejected the resource because the Gateway API webhook failed to reject it with " + - "a validation error; make sure the webhook is installed and running correctly; " + - "NKG will delete any existing NGINX configuration that corresponds to the resource" -) - // Reconcile implements the reconcile.Reconciler Reconcile method. func (r *Implementation) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { logger := log.FromContext(ctx) @@ -98,22 +84,10 @@ func (r *Implementation) Reconcile(ctx context.Context, req reconcile.Request) ( obj = nil } - var validationError error - if obj != nil && r.cfg.WebhookValidator != nil { - validationError = r.cfg.WebhookValidator(obj) - } - - if validationError != nil { - logger.Error(validationError, webhookValidationErrorLogMsg) - r.cfg.EventRecorder.Eventf(obj, apiv1.EventTypeWarning, "Rejected", - webhookValidationErrorLogMsg+"; validation error: %v", validationError) - } - var e interface{} var op string - if obj == nil || validationError != nil { - // In case of a validation error, we handle the resource as if it was deleted. + if obj == nil { e = &events.DeleteEvent{ Type: r.cfg.ObjectType, NamespacedName: req.NamespacedName, diff --git a/internal/reconciler/implementation_test.go b/internal/reconciler/implementation_test.go index 952c07e703..e96b0f2927 100644 --- a/internal/reconciler/implementation_test.go +++ b/internal/reconciler/implementation_test.go @@ -56,13 +56,6 @@ var _ = Describe("Reconciler", func() { Name: hr2NsName.Name, }, } - - hr2IsInvalidValidator = func(obj client.Object) error { - if client.ObjectKeyFromObject(obj) == hr2NsName { - return errors.New("test") - } - return nil - } ) getReturnsHRForHR := func(hr *v1beta1.HTTPRoute) getFunc { @@ -206,63 +199,14 @@ var _ = Describe("Reconciler", func() { }) }) }) - - When("Reconciler includes a Webhook Validator", func() { - var fakeRecorder *reconcilerfakes.FakeEventRecorder - - BeforeEach(func() { - fakeRecorder = &reconcilerfakes.FakeEventRecorder{} - - rec = reconciler.NewImplementation(reconciler.Config{ - Getter: fakeGetter, - ObjectType: &v1beta1.HTTPRoute{}, - EventCh: eventCh, - WebhookValidator: hr2IsInvalidValidator, - EventRecorder: fakeRecorder, - }) - }) - - It("should upsert valid HTTPRoute", func() { - testUpsert(hr1) - Expect(fakeRecorder.EventfCallCount()).To(Equal(0)) - }) - - It("should reject invalid HTTPRoute", func() { - fakeGetter.GetCalls(getReturnsHRForHR(hr2)) - - resultCh := startReconciling(client.ObjectKeyFromObject(hr2)) - - Eventually(eventCh).Should(Receive(Equal(&events.DeleteEvent{ - NamespacedName: client.ObjectKeyFromObject(hr2), - Type: &v1beta1.HTTPRoute{}, - }))) - Eventually(resultCh).Should(Receive(Equal(result{err: nil, reconcileResult: reconcile.Result{}}))) - - Expect(fakeRecorder.EventfCallCount()).To(Equal(1)) - obj, _, _, _, _ := fakeRecorder.EventfArgsForCall(0) - Expect(obj).To(Equal(hr2)) - }) - - It("should delete HTTPRoutes", func() { - testDelete(hr1) - testDelete(hr2) - Expect(fakeRecorder.EventfCallCount()).To(Equal(0)) - }) - }) }) Describe("Edge cases", func() { - var fakeRecorder *reconcilerfakes.FakeEventRecorder - BeforeEach(func() { - fakeRecorder = &reconcilerfakes.FakeEventRecorder{} - rec = reconciler.NewImplementation(reconciler.Config{ - Getter: fakeGetter, - ObjectType: &v1beta1.HTTPRoute{}, - EventCh: eventCh, - WebhookValidator: hr2IsInvalidValidator, - EventRecorder: fakeRecorder, + Getter: fakeGetter, + ObjectType: &v1beta1.HTTPRoute{}, + EventCh: eventCh, }) }) @@ -287,7 +231,6 @@ var _ = Describe("Reconciler", func() { Consistently(eventCh).ShouldNot(Receive()) Expect(resultCh).To(Receive(Equal(result{err: nil, reconcileResult: reconcile.Result{}}))) - Expect(fakeRecorder.EventfCallCount()).To(Equal(invalidResourceEventCount)) }, Entry("Upserting valid HTTPRoute", getReturnsHRForHR(hr1), 0, hr1NsName), Entry("Deleting valid HTTPRoute", getReturnsNotFoundErrorForHR(hr1), 0, hr1NsName), diff --git a/internal/reconciler/reconcilerfakes/fake_event_recorder.go b/internal/reconciler/reconcilerfakes/fake_event_recorder.go deleted file mode 100644 index 33f459eb85..0000000000 --- a/internal/reconciler/reconcilerfakes/fake_event_recorder.go +++ /dev/null @@ -1,85 +0,0 @@ -// Code generated by counterfeiter. DO NOT EDIT. -package reconcilerfakes - -import ( - "sync" - - "github.com/nginxinc/nginx-kubernetes-gateway/internal/reconciler" - "k8s.io/apimachinery/pkg/runtime" -) - -type FakeEventRecorder struct { - EventfStub func(runtime.Object, string, string, string, ...interface{}) - eventfMutex sync.RWMutex - eventfArgsForCall []struct { - arg1 runtime.Object - arg2 string - arg3 string - arg4 string - arg5 []interface{} - } - invocations map[string][][]interface{} - invocationsMutex sync.RWMutex -} - -func (fake *FakeEventRecorder) Eventf(arg1 runtime.Object, arg2 string, arg3 string, arg4 string, arg5 ...interface{}) { - fake.eventfMutex.Lock() - fake.eventfArgsForCall = append(fake.eventfArgsForCall, struct { - arg1 runtime.Object - arg2 string - arg3 string - arg4 string - arg5 []interface{} - }{arg1, arg2, arg3, arg4, arg5}) - stub := fake.EventfStub - fake.recordInvocation("Eventf", []interface{}{arg1, arg2, arg3, arg4, arg5}) - fake.eventfMutex.Unlock() - if stub != nil { - fake.EventfStub(arg1, arg2, arg3, arg4, arg5...) - } -} - -func (fake *FakeEventRecorder) EventfCallCount() int { - fake.eventfMutex.RLock() - defer fake.eventfMutex.RUnlock() - return len(fake.eventfArgsForCall) -} - -func (fake *FakeEventRecorder) EventfCalls(stub func(runtime.Object, string, string, string, ...interface{})) { - fake.eventfMutex.Lock() - defer fake.eventfMutex.Unlock() - fake.EventfStub = stub -} - -func (fake *FakeEventRecorder) EventfArgsForCall(i int) (runtime.Object, string, string, string, []interface{}) { - fake.eventfMutex.RLock() - defer fake.eventfMutex.RUnlock() - argsForCall := fake.eventfArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 -} - -func (fake *FakeEventRecorder) Invocations() map[string][][]interface{} { - fake.invocationsMutex.RLock() - defer fake.invocationsMutex.RUnlock() - fake.eventfMutex.RLock() - defer fake.eventfMutex.RUnlock() - copiedInvocations := map[string][][]interface{}{} - for key, value := range fake.invocations { - copiedInvocations[key] = value - } - return copiedInvocations -} - -func (fake *FakeEventRecorder) recordInvocation(key string, args []interface{}) { - fake.invocationsMutex.Lock() - defer fake.invocationsMutex.Unlock() - if fake.invocations == nil { - fake.invocations = map[string][][]interface{}{} - } - if fake.invocations[key] == nil { - fake.invocations[key] = [][]interface{}{} - } - fake.invocations[key] = append(fake.invocations[key], args) -} - -var _ reconciler.EventRecorder = new(FakeEventRecorder) diff --git a/internal/reconciler/recorder.go b/internal/reconciler/recorder.go deleted file mode 100644 index 954124a1eb..0000000000 --- a/internal/reconciler/recorder.go +++ /dev/null @@ -1,12 +0,0 @@ -package reconciler - -import "k8s.io/apimachinery/pkg/runtime" - -//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . EventRecorder - -// EventRecorder records events for a resource. -// It allows us to mock the record.EventRecorder.Eventf method. -type EventRecorder interface { - // Eventf is a method of k8s.io/client-go/tools/record.EventRecorder - Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) -} diff --git a/internal/state/change_processor.go b/internal/state/change_processor.go index 9961f7820a..5a07d14d39 100644 --- a/internal/state/change_processor.go +++ b/internal/state/change_processor.go @@ -6,12 +6,18 @@ import ( "sync" "github.com/go-logr/logr" - v1 "k8s.io/api/core/v1" + apiv1 "k8s.io/api/core/v1" discoveryV1 "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/gateway-api/apis/v1beta1" + gwapivalidation "sigs.k8s.io/gateway-api/apis/v1beta1/validation" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/state/dataplane" "github.com/nginxinc/nginx-kubernetes-gateway/internal/state/graph" "github.com/nginxinc/nginx-kubernetes-gateway/internal/state/relationship" @@ -20,8 +26,15 @@ import ( "github.com/nginxinc/nginx-kubernetes-gateway/internal/state/validation" ) +const ( + webhookValidationErrorLogMsg = "the resource failed webhook validation, however the Gateway API webhook " + + "failed to reject it with the error; make sure the webhook is installed and running correctly" +) + //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ChangeProcessor +type extractGVKFunc func(obj client.Object) schema.GroupVersionKind + // ChangeProcessor processes the changes to resources producing the internal representation // of the Gateway configuration. It only supports one GatewayClass resource. type ChangeProcessor interface { @@ -52,6 +65,10 @@ type ChangeProcessorConfig struct { Validators validation.Validators // Logger is the logger for this Change Processor. Logger logr.Logger + // EventRecorder records events for Kubernetes resources. + EventRecorder record.EventRecorder + // Scheme is the a Kubernetes scheme. + Scheme *runtime.Scheme // GatewayCtlrName is the name of the Gateway controller. GatewayCtlrName string // GatewayClassName is the name of the GatewayClass resource. @@ -60,23 +77,93 @@ type ChangeProcessorConfig struct { // ChangeProcessorImpl is an implementation of ChangeProcessor. type ChangeProcessorImpl struct { - store *store - cfg ChangeProcessorConfig + // clusterState holds the current state of the cluster + clusterState graph.ClusterState + // updater acts upon the cluster state. + updater Updater + // getAndResetClusterStateChanged tells if the cluster state has changed. + getAndResetClusterStateChanged func() bool - // changed is true if any changes that were captured require an update to nginx. - // It is true if the store changed, or if a Kubernetes resource (e.g. - // Service, EndpointSlice) that is related to a Gateway API resource (e.g. Gateway, HTTPRoute) changed. - // It is reset to false after Process is called. - changed bool + cfg ChangeProcessorConfig lock sync.Mutex } // NewChangeProcessorImpl creates a new ChangeProcessorImpl for the Gateway resource with the configured namespace name. func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl { + clusterStore := graph.ClusterState{ + GatewayClasses: make(map[types.NamespacedName]*v1beta1.GatewayClass), + Gateways: make(map[types.NamespacedName]*v1beta1.Gateway), + HTTPRoutes: make(map[types.NamespacedName]*v1beta1.HTTPRoute), + Services: make(map[types.NamespacedName]*apiv1.Service), + } + + extractGVK := func(obj client.Object) schema.GroupVersionKind { + gvk, err := apiutil.GVKForObject(obj, cfg.Scheme) + if err != nil { + panic(fmt.Errorf("failed to get GVK for object %T: %w", obj, err)) + } + return gvk + } + + trackingUpdater := newChangeTrackingUpdater( + cfg.RelationshipCapturer, + extractGVK, + []changeTrackingUpdaterObjectTypeCfg{ + { + gvk: extractGVK(&v1beta1.GatewayClass{}), + store: newObjectStoreMapAdapter(clusterStore.GatewayClasses), + trackUpsertDelete: true, + }, + { + gvk: extractGVK(&v1beta1.Gateway{}), + store: newObjectStoreMapAdapter(clusterStore.Gateways), + trackUpsertDelete: true, + }, + { + gvk: extractGVK(&v1beta1.HTTPRoute{}), + store: newObjectStoreMapAdapter(clusterStore.HTTPRoutes), + trackUpsertDelete: true, + }, + { + gvk: extractGVK(&apiv1.Service{}), + store: newObjectStoreMapAdapter(clusterStore.Services), + trackUpsertDelete: false, + }, + { + gvk: extractGVK(&discoveryV1.EndpointSlice{}), + store: nil, + trackUpsertDelete: false, + }, + }, + ) + + updater := newValidatingUpsertUpdater( + trackingUpdater, + cfg.EventRecorder, + []upsertValidatorFunc{ + func(obj client.Object) error { + var err error + switch o := obj.(type) { + case *v1beta1.Gateway: + err = gwapivalidation.ValidateGateway(o).ToAggregate() + case *v1beta1.HTTPRoute: + err = gwapivalidation.ValidateHTTPRoute(o).ToAggregate() + } + + if err != nil { + return fmt.Errorf(webhookValidationErrorLogMsg+"; validation error: %w", err) + } + + return nil + }, + }) + return &ChangeProcessorImpl{ - store: newStore(), - cfg: cfg, + cfg: cfg, + getAndResetClusterStateChanged: trackingUpdater.getAndResetChangedStatus, + updater: updater, + clusterState: clusterStore, } } @@ -86,60 +173,24 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl { // belong to the NGINX Gateway or an HTTPRoute that doesn't belong to any of the Gateways of the NGINX Gateway. // Find a way to ignore changes that don't affect the configuration and/or statuses of the resources. +// FIXME(pleshakov) +// Remove CaptureUpsertChange() and CaptureDeleteChange() from ChangeProcessor and pass all changes directly to +// Process() instead. As a result, the clients will only need to call Process(), which will simplify them. +// Now the clients make a combination of CaptureUpsertChange() and CaptureDeleteChange() calls followed by a call to +// Process(). + func (c *ChangeProcessorImpl) CaptureUpsertChange(obj client.Object) { c.lock.Lock() defer c.lock.Unlock() - c.cfg.RelationshipCapturer.Capture(obj) - - switch o := obj.(type) { - case *v1beta1.GatewayClass: - c.store.captureGatewayClassChange(o, c.cfg.GatewayClassName) - case *v1beta1.Gateway: - c.store.captureGatewayChange(o) - case *v1beta1.HTTPRoute: - c.store.captureHTTPRouteChange(o) - case *v1.Service: - c.store.captureServiceChange(o) - case *discoveryV1.EndpointSlice: - break - default: - panic(fmt.Errorf("ChangeProcessor doesn't support %T", obj)) - } - - c.changed = c.changed || c.store.changed || c.cfg.RelationshipCapturer.Exists(obj, client.ObjectKeyFromObject(obj)) + c.updater.Upsert(obj) } func (c *ChangeProcessorImpl) CaptureDeleteChange(resourceType client.Object, nsname types.NamespacedName) { c.lock.Lock() defer c.lock.Unlock() - switch resourceType.(type) { - case *v1beta1.GatewayClass: - if nsname.Name != c.cfg.GatewayClassName { - panic(fmt.Errorf("gatewayclass resource must be %s, got %s", c.cfg.GatewayClassName, nsname.Name)) - } - if c.store.gc != nil { - c.store.changed = true - } - c.store.gc = nil - case *v1beta1.Gateway: - _, c.store.changed = c.store.gateways[nsname] - delete(c.store.gateways, nsname) - case *v1beta1.HTTPRoute: - _, c.store.changed = c.store.httpRoutes[nsname] - delete(c.store.httpRoutes, nsname) - case *v1.Service: - delete(c.store.services, nsname) - case *discoveryV1.EndpointSlice: - break - default: - panic(fmt.Errorf("ChangeProcessor doesn't support %T", resourceType)) - } - - c.changed = c.changed || c.store.changed || c.cfg.RelationshipCapturer.Exists(resourceType, nsname) - - c.cfg.RelationshipCapturer.Remove(resourceType, nsname) + c.updater.Delete(resourceType, nsname) } func (c *ChangeProcessorImpl) Process( @@ -148,20 +199,12 @@ func (c *ChangeProcessorImpl) Process( c.lock.Lock() defer c.lock.Unlock() - if !c.changed { + if !c.getAndResetClusterStateChanged() { return false, conf, statuses } - c.store.changed = false - c.changed = false - g := graph.BuildGraph( - graph.ClusterStore{ - GatewayClass: c.store.gc, - Gateways: c.store.gateways, - HTTPRoutes: c.store.httpRoutes, - Services: c.store.services, - }, + c.clusterState, c.cfg.GatewayCtlrName, c.cfg.GatewayClassName, c.cfg.SecretMemoryManager, diff --git a/internal/state/change_processor_test.go b/internal/state/change_processor_test.go index 4312187d44..00a383bb0e 100644 --- a/internal/state/change_processor_test.go +++ b/internal/state/change_processor_test.go @@ -9,7 +9,10 @@ import ( apiv1 "k8s.io/api/core/v1" discoveryV1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/gateway-api/apis/v1alpha2" @@ -145,6 +148,7 @@ func createHTTPRule(path string, backendRefs ...v1beta1.HTTPBackendRef) v1beta1. Matches: []v1beta1.HTTPRouteMatch{ { Path: &v1beta1.HTTPPathMatch{ + Type: helpers.GetPointer(v1beta1.PathMatchPathPrefix), Value: &path, }, }, @@ -164,6 +168,7 @@ func createBackendRef( Kind: kind, Name: name, Namespace: namespace, + Port: helpers.GetPointer[v1beta1.PortNumber](80), }, }, } @@ -177,6 +182,33 @@ func createAlwaysValidValidators() validation.Validators { } } +func createScheme() *runtime.Scheme { + scheme := runtime.NewScheme() + + utilruntime.Must(v1beta1.AddToScheme(scheme)) + utilruntime.Must(apiv1.AddToScheme(scheme)) + utilruntime.Must(discoveryV1.AddToScheme(scheme)) + + return scheme +} + +func assertStatuses(expected, result state.Statuses) { + sortConditions := func(statuses state.HTTPRouteStatuses) { + for _, status := range statuses { + for _, ps := range status.ParentStatuses { + sort.Slice(ps.Conditions, func(i, j int) bool { + return ps.Conditions[i].Type < ps.Conditions[j].Type + }) + } + } + } + + sortConditions(expected.HTTPRouteStatuses) + sortConditions(result.HTTPRouteStatuses) + + ExpectWithOffset(1, helpers.Diff(expected, result)).To(BeEmpty()) +} + // FIXME(kate-osborn): Consider refactoring these tests to reduce code duplication. var _ = Describe("ChangeProcessor", func() { Describe("Normal cases of processing changes", func() { @@ -204,6 +236,7 @@ var _ = Describe("ChangeProcessor", func() { RelationshipCapturer: relationship.NewCapturerImpl(), Logger: zap.New(), Validators: createAlwaysValidValidators(), + Scheme: createScheme(), }) fakeSecretMemoryMgr.RequestReturns(certificatePath, nil) @@ -245,23 +278,6 @@ var _ = Describe("ChangeProcessor", func() { gw2 = createGatewayWithTLSListener("gateway-2") }) - assertStatuses := func(expected, result state.Statuses) { - sortConditions := func(statuses state.HTTPRouteStatuses) { - for _, status := range statuses { - for _, ps := range status.ParentStatuses { - sort.Slice(ps.Conditions, func(i, j int) bool { - return ps.Conditions[i].Type < ps.Conditions[j].Type - }) - } - } - } - - sortConditions(expected.HTTPRouteStatuses) - sortConditions(result.HTTPRouteStatuses) - - ExpectWithOffset(1, helpers.Diff(expected, result)).To(BeEmpty()) - } - When("no upsert has occurred", func() { It("returns empty configuration and statuses", func() { changed, conf, statuses := processor.Process(context.TODO()) @@ -1590,6 +1606,7 @@ var _ = Describe("ChangeProcessor", func() { SecretMemoryManager: fakeSecretMemoryMgr, RelationshipCapturer: fakeRelationshipCapturer, Validators: createAlwaysValidValidators(), + Scheme: createScheme(), }) gcNsName = types.NamespacedName{Name: "my-class"} @@ -1876,6 +1893,292 @@ var _ = Describe("ChangeProcessor", func() { }) }) + Describe("Webhook validation cases", Ordered, func() { + var ( + processor state.ChangeProcessor + fakeEventRecorder *record.FakeRecorder + fakeSecretMemoryMgr *secretsfakes.FakeSecretDiskMemoryManager + + gc *v1beta1.GatewayClass + + gwNsName, hrNsName types.NamespacedName + gw, gwInvalid *v1beta1.Gateway + hr, hrInvalid *v1beta1.HTTPRoute + ) + BeforeAll(func() { + fakeSecretMemoryMgr = &secretsfakes.FakeSecretDiskMemoryManager{} + fakeEventRecorder = record.NewFakeRecorder(2 /* number of buffered events */) + + processor = state.NewChangeProcessorImpl(state.ChangeProcessorConfig{ + GatewayCtlrName: controllerName, + GatewayClassName: gcName, + SecretMemoryManager: fakeSecretMemoryMgr, + RelationshipCapturer: relationship.NewCapturerImpl(), + Logger: zap.New(), + Validators: createAlwaysValidValidators(), + EventRecorder: fakeEventRecorder, + Scheme: createScheme(), + }) + + gc = &v1beta1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: gcName, + Generation: 1, + }, + Spec: v1beta1.GatewayClassSpec{ + ControllerName: controllerName, + }, + } + + gwNsName = types.NamespacedName{Namespace: "test", Name: "gateway"} + hrNsName = types.NamespacedName{Namespace: "test", Name: "hr"} + + gw = &v1beta1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: gwNsName.Namespace, + Name: gwNsName.Name, + }, + Spec: v1beta1.GatewaySpec{ + GatewayClassName: gcName, + Listeners: []v1beta1.Listener{ + { + Name: "listener-80-1", + Hostname: helpers.GetPointer[v1beta1.Hostname]("foo.example.com"), + Port: 80, + Protocol: v1beta1.HTTPProtocolType, + }, + }, + }, + } + + gwInvalid = gw.DeepCopy() + // cannot have hostname for TCP protocol + gwInvalid.Spec.Listeners[0].Protocol = v1beta1.TCPProtocolType + + hr = &v1beta1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: hrNsName.Namespace, + Name: hrNsName.Name, + }, + Spec: v1beta1.HTTPRouteSpec{ + CommonRouteSpec: v1beta1.CommonRouteSpec{ + ParentRefs: []v1beta1.ParentReference{ + { + Namespace: (*v1beta1.Namespace)(&gw.Namespace), + Name: v1beta1.ObjectName(gw.Name), + SectionName: (*v1beta1.SectionName)( + helpers.GetStringPointer("listener-80-1"), + ), + }, + }, + }, + Hostnames: []v1beta1.Hostname{ + "foo.example.com", + }, + Rules: []v1beta1.HTTPRouteRule{ + { + Matches: []v1beta1.HTTPRouteMatch{ + { + Path: &v1beta1.HTTPPathMatch{ + Type: (*v1beta1.PathMatchType)(helpers.GetStringPointer(string(v1beta1.PathMatchPathPrefix))), + Value: helpers.GetStringPointer("/"), + }, + }, + }, + }, + }, + }, + } + + hrInvalid = hr.DeepCopy() + hrInvalid.Spec.Rules[0].Matches[0].Path.Type = nil // cannot be nil + }) + + assertHREvent := func() { + e := <-fakeEventRecorder.Events + ExpectWithOffset(1, e).To(ContainSubstring("Rejected")) + ExpectWithOffset(1, e).To(ContainSubstring("spec.rules[0].matches[0].path.type")) + } + + assertGwEvent := func() { + e := <-fakeEventRecorder.Events + ExpectWithOffset(1, e).To(ContainSubstring("Rejected")) + ExpectWithOffset(1, e).To(ContainSubstring("spec.listeners[0].hostname")) + } + + It("should process GatewayClass", func() { + processor.CaptureUpsertChange(gc) + + expectedConf := dataplane.Configuration{} + expectedStatuses := state.Statuses{ + GatewayClassStatus: &state.GatewayClassStatus{ + ObservedGeneration: gc.Generation, + Conditions: conditions.NewDefaultGatewayClassConditions(), + }, + IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, + HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{}, + } + + changed, conf, statuses := processor.Process(context.TODO()) + Expect(changed).To(BeTrue()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) + Expect(fakeEventRecorder.Events).To(HaveLen(0)) + }) + + It("should ignore invalid resources", func() { + processor.CaptureUpsertChange(gwInvalid) + processor.CaptureUpsertChange(hrInvalid) + + expectedConf := dataplane.Configuration{} + expectedStatuses := state.Statuses{} + + changed, conf, statuses := processor.Process(context.TODO()) + + Expect(changed).To(BeFalse()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) + + Expect(fakeEventRecorder.Events).To(HaveLen(2)) + assertGwEvent() + assertHREvent() + }) + + It("should process valid resources", func() { + processor.CaptureUpsertChange(gw) + processor.CaptureUpsertChange(hr) + + bg := graph.BackendGroup{ + Source: types.NamespacedName{Namespace: hr.Namespace, Name: hr.Name}, + RuleIdx: 0, + } + + expectedConf := dataplane.Configuration{ + HTTPServers: []dataplane.VirtualServer{ + { + IsDefault: true, + }, + { + Hostname: "foo.example.com", + PathRules: []dataplane.PathRule{ + { + Path: "/", + MatchRules: []dataplane.MatchRule{ + { + MatchIdx: 0, + RuleIdx: 0, + BackendGroup: bg, + Source: hr, + }, + }, + }, + }, + }, + }, + SSLServers: []dataplane.VirtualServer{}, + BackendGroups: []graph.BackendGroup{bg}, + } + expectedStatuses := state.Statuses{ + GatewayClassStatus: &state.GatewayClassStatus{ + ObservedGeneration: gc.Generation, + Conditions: conditions.NewDefaultGatewayClassConditions(), + }, + GatewayStatus: &state.GatewayStatus{ + NsName: gwNsName, + ObservedGeneration: gw.Generation, + ListenerStatuses: map[string]state.ListenerStatus{ + "listener-80-1": { + AttachedRoutes: 1, + Conditions: conditions.NewDefaultListenerConditions(), + }, + }, + }, + IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, + HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{ + hrNsName: { + ObservedGeneration: hr.Generation, + ParentStatuses: map[string]state.ParentStatus{ + "listener-80-1": { + Conditions: conditions.NewDefaultRouteConditions(), + }, + }, + }, + }, + } + + changed, conf, statuses := processor.Process(context.TODO()) + + Expect(changed).To(BeTrue()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) + + Expect(fakeEventRecorder.Events).To(HaveLen(0)) + }) + + It("should delete invalid HTTPRoute", func() { + processor.CaptureUpsertChange(hrInvalid) + + expectedConf := dataplane.Configuration{ + HTTPServers: []dataplane.VirtualServer{ + { + IsDefault: true, + }, + }, + SSLServers: []dataplane.VirtualServer{}, + } + expectedStatuses := state.Statuses{ + GatewayClassStatus: &state.GatewayClassStatus{ + ObservedGeneration: gc.Generation, + Conditions: conditions.NewDefaultGatewayClassConditions(), + }, + GatewayStatus: &state.GatewayStatus{ + NsName: gwNsName, + ObservedGeneration: gw.Generation, + ListenerStatuses: map[string]state.ListenerStatus{ + "listener-80-1": { + AttachedRoutes: 0, + Conditions: conditions.NewDefaultListenerConditions(), + }, + }, + }, + IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, + HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{}, + } + + changed, conf, statuses := processor.Process(context.TODO()) + + Expect(changed).To(BeTrue()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) + + Expect(fakeEventRecorder.Events).To(HaveLen(1)) + assertHREvent() + }) + + It("should delete invalid Gateway", func() { + processor.CaptureUpsertChange(gwInvalid) + + expectedConf := dataplane.Configuration{} + expectedStatuses := state.Statuses{ + GatewayClassStatus: &state.GatewayClassStatus{ + ObservedGeneration: gc.Generation, + Conditions: conditions.NewDefaultGatewayClassConditions(), + }, + IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, + HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{}, + } + + changed, conf, statuses := processor.Process(context.TODO()) + + Expect(changed).To(BeTrue()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) + + Expect(fakeEventRecorder.Events).To(HaveLen(1)) + assertGwEvent() + }) + }) + Describe("Edge cases with panic", func() { var ( processor state.ChangeProcessor @@ -1893,6 +2196,7 @@ var _ = Describe("ChangeProcessor", func() { SecretMemoryManager: fakeSecretMemoryMgr, RelationshipCapturer: fakeRelationshipCapturer, Validators: createAlwaysValidValidators(), + Scheme: createScheme(), }) }) @@ -1908,8 +2212,8 @@ var _ = Describe("ChangeProcessor", func() { &v1alpha2.TCPRoute{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "tcp"}}, ), Entry( - "a wrong gatewayclass", - &v1beta1.GatewayClass{ObjectMeta: metav1.ObjectMeta{Name: "wrong-class"}}, + "nil resource", + nil, ), ) @@ -1927,9 +2231,9 @@ var _ = Describe("ChangeProcessor", func() { types.NamespacedName{Namespace: "test", Name: "tcp"}, ), Entry( - "a wrong gatewayclass", - &v1beta1.GatewayClass{}, - types.NamespacedName{Name: "wrong-class"}, + "nil resource type", + nil, + types.NamespacedName{Namespace: "test", Name: "resource"}, ), ) }) diff --git a/internal/state/graph/graph.go b/internal/state/graph/graph.go index bed5138477..981ac3332b 100644 --- a/internal/state/graph/graph.go +++ b/internal/state/graph/graph.go @@ -9,12 +9,12 @@ import ( "github.com/nginxinc/nginx-kubernetes-gateway/internal/state/validation" ) -// ClusterStore includes cluster resources necessary to build the Graph. -type ClusterStore struct { - GatewayClass *v1beta1.GatewayClass - Gateways map[types.NamespacedName]*v1beta1.Gateway - HTTPRoutes map[types.NamespacedName]*v1beta1.HTTPRoute - Services map[types.NamespacedName]*v1.Service +// ClusterState includes cluster resources necessary to build the Graph. +type ClusterState struct { + GatewayClasses map[types.NamespacedName]*v1beta1.GatewayClass + Gateways map[types.NamespacedName]*v1beta1.Gateway + HTTPRoutes map[types.NamespacedName]*v1beta1.HTTPRoute + Services map[types.NamespacedName]*v1.Service } // Graph is a Graph-like representation of Gateway API resources. @@ -31,19 +31,21 @@ type Graph struct { Routes map[types.NamespacedName]*Route } -// BuildGraph builds a Graph from a store. +// BuildGraph builds a Graph from a state. func BuildGraph( - store ClusterStore, + store ClusterState, controllerName string, gcName string, secretMemoryMgr secrets.SecretDiskMemoryManager, validators validation.Validators, ) *Graph { - if !gatewayClassBelongsToController(store.GatewayClass, controllerName) { + gatewayClass := store.GatewayClasses[types.NamespacedName{Name: gcName}] + + if !gatewayClassBelongsToController(gatewayClass, controllerName) { return &Graph{} } - gc := buildGatewayClass(store.GatewayClass) + gc := buildGatewayClass(gatewayClass) processedGws := processGateways(store.Gateways, gcName) diff --git a/internal/state/graph/graph_test.go b/internal/state/graph/graph_test.go index 67bd2883e0..277f2ea619 100644 --- a/internal/state/graph/graph_test.go +++ b/internal/state/graph/graph_test.go @@ -156,20 +156,22 @@ func TestBuildGraph(t *testing.T) { svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"}} - createStoreWithGatewayClass := func(gc *v1beta1.GatewayClass) ClusterStore { - return ClusterStore{ - GatewayClass: gc, + createStateWithGatewayClass := func(gc *v1beta1.GatewayClass) ClusterState { + return ClusterState{ + GatewayClasses: map[types.NamespacedName]*v1beta1.GatewayClass{ + client.ObjectKeyFromObject(gc): gc, + }, Gateways: map[types.NamespacedName]*v1beta1.Gateway{ - {Namespace: "test", Name: "gateway-1"}: gw1, - {Namespace: "test", Name: "gateway-2"}: gw2, + client.ObjectKeyFromObject(gw1): gw1, + client.ObjectKeyFromObject(gw2): gw2, }, HTTPRoutes: map[types.NamespacedName]*v1beta1.HTTPRoute{ - {Namespace: "test", Name: "hr-1"}: hr1, - {Namespace: "test", Name: "hr-2"}: hr2, - {Namespace: "test", Name: "hr-3"}: hr3, + client.ObjectKeyFromObject(hr1): hr1, + client.ObjectKeyFromObject(hr2): hr2, + client.ObjectKeyFromObject(hr3): hr3, }, Services: map[types.NamespacedName]*v1.Service{ - {Namespace: "test", Name: "foo"}: svc, + client.ObjectKeyFromObject(svc): svc, }, } } @@ -251,28 +253,34 @@ func TestBuildGraph(t *testing.T) { } normalGC := &v1beta1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: gcName, + }, Spec: v1beta1.GatewayClassSpec{ ControllerName: controllerName, }, } differentControllerGC := &v1beta1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: gcName, + }, Spec: v1beta1.GatewayClassSpec{ ControllerName: "different-controller", }, } tests := []struct { - store ClusterStore + store ClusterState expected *Graph name string }{ { - store: createStoreWithGatewayClass(normalGC), + store: createStateWithGatewayClass(normalGC), expected: createExpectedGraphWithGatewayClass(normalGC), name: "normal case", }, { - store: createStoreWithGatewayClass(differentControllerGC), + store: createStateWithGatewayClass(differentControllerGC), expected: &Graph{}, name: "gatewayclass belongs to a different controller", }, diff --git a/internal/state/store.go b/internal/state/store.go index 89fa369a26..33e8eaefe9 100644 --- a/internal/state/store.go +++ b/internal/state/store.go @@ -3,83 +3,280 @@ package state import ( "fmt" - v1 "k8s.io/api/core/v1" + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/state/relationship" ) -// store contains the resources that represent the state of the Gateway. -type store struct { - gc *v1beta1.GatewayClass - gateways map[types.NamespacedName]*v1beta1.Gateway - httpRoutes map[types.NamespacedName]*v1beta1.HTTPRoute - services map[types.NamespacedName]*v1.Service - - // changed tells if the store is changed. - // The store is considered changed if: - // (1) Any of its resources was deleted. - // (2) A new resource was upserted. - // (3) An existing resource with the updated Generation was upserted. +// Updater updates the cluster state. +type Updater interface { + Upsert(obj client.Object) + Delete(objType client.Object, nsname types.NamespacedName) +} + +// objectStore is a store of client.Object +type objectStore interface { + get(nsname types.NamespacedName) (client.Object, bool) + upsert(obj client.Object) + delete(nsname types.NamespacedName) +} + +// objectStoreMapAdapter wraps maps of types.NamespacedName to Kubernetes resources +// (e.g. map[types.NamespacedName]*v1beta1.Gateway) so that they can be used through objectStore interface. +type objectStoreMapAdapter[T client.Object] struct { + objects map[types.NamespacedName]T +} + +func newObjectStoreMapAdapter[T client.Object](objects map[types.NamespacedName]T) *objectStoreMapAdapter[T] { + return &objectStoreMapAdapter[T]{ + objects: objects, + } +} + +func (m *objectStoreMapAdapter[T]) get(nsname types.NamespacedName) (client.Object, bool) { + obj, exist := m.objects[nsname] + return obj, exist +} + +func (m *objectStoreMapAdapter[T]) upsert(obj client.Object) { + t, ok := obj.(T) + if !ok { + panic(fmt.Errorf("obj type mismatch: got %T, expected %T", obj, t)) + } + m.objects[client.ObjectKeyFromObject(obj)] = t +} + +func (m *objectStoreMapAdapter[T]) delete(nsname types.NamespacedName) { + delete(m.objects, nsname) +} + +type gvkList []schema.GroupVersionKind + +func (list gvkList) contains(gvk schema.GroupVersionKind) bool { + for _, g := range list { + if gvk == g { + return true + } + } + + return false +} + +type multiObjectStore struct { + stores map[schema.GroupVersionKind]objectStore + extractGVK extractGVKFunc +} + +func newMultiObjectStore( + stores map[schema.GroupVersionKind]objectStore, + extractGVK extractGVKFunc, +) *multiObjectStore { + return &multiObjectStore{ + stores: stores, + extractGVK: extractGVK, + } +} + +func (m *multiObjectStore) mustFindStoreForObj(obj client.Object) objectStore { + objGVK := m.extractGVK(obj) + + store, exist := m.stores[objGVK] + if !exist { + panic(fmt.Errorf("object store for %T %v not found", obj, client.ObjectKeyFromObject(obj))) + } + + return store +} + +func (m *multiObjectStore) get(objType client.Object, nsname types.NamespacedName) (client.Object, bool) { + return m.mustFindStoreForObj(objType).get(nsname) +} + +func (m *multiObjectStore) upsert(obj client.Object) { + m.mustFindStoreForObj(obj).upsert(obj) +} + +func (m *multiObjectStore) delete(objType client.Object, nsname types.NamespacedName) { + m.mustFindStoreForObj(objType).delete(nsname) +} + +type changeTrackingUpdaterObjectTypeCfg struct { + // store holds the objects of the gvk. If the store is nil, the objects of the gvk are not persisted. + store objectStore + gvk schema.GroupVersionKind + // trackUpsertDelete indicates whether an upsert or delete of an object with the gvk results into a change to + // the changeTrackingUpdater's store. Note that for an upsert, the generation of a new object must be different + // from the generation of the previous version, otherwise such an upsert is not considered a change. + trackUpsertDelete bool +} + +// changeTrackingUpdater is an Updater that tracks changes to the cluster state in the multiObjectStore. +// +// It only works with objects with the GVKs registered in changeTrackingUpdaterObjectTypeCfg. Otherwise, it panics. +// +// A change is tracked when: +// - An object with a GVK with a non-nil store and trackUpsertDelete set to 'true' is upserted or deleted, provided +// that its generation changed. +// - An object is upserted or deleted, and it is related to another object, based on the decision by +// the relationship capturer. +type changeTrackingUpdater struct { + store *multiObjectStore + capturer relationship.Capturer + + extractGVK extractGVKFunc + supportedGKVs gvkList + trackedUpsertDeleteGKVs gvkList + persistedGKVs gvkList + changed bool } -func newStore() *store { - return &store{ - gateways: make(map[types.NamespacedName]*v1beta1.Gateway), - httpRoutes: make(map[types.NamespacedName]*v1beta1.HTTPRoute), - services: make(map[types.NamespacedName]*v1.Service), +func newChangeTrackingUpdater( + capturer relationship.Capturer, + extractGVK extractGVKFunc, + objectTypeCfgs []changeTrackingUpdaterObjectTypeCfg, +) *changeTrackingUpdater { + var ( + supportedGKVs gvkList + trackedUpsertDeleteGKVs gvkList + persistedGKVs gvkList + + stores = make(map[schema.GroupVersionKind]objectStore) + ) + + for _, cfg := range objectTypeCfgs { + supportedGKVs = append(supportedGKVs, cfg.gvk) + + if cfg.trackUpsertDelete { + trackedUpsertDeleteGKVs = append(trackedUpsertDeleteGKVs, cfg.gvk) + } + + if cfg.store != nil { + persistedGKVs = append(persistedGKVs, cfg.gvk) + stores[cfg.gvk] = cfg.store + } + } + + return &changeTrackingUpdater{ + store: newMultiObjectStore(stores, extractGVK), + extractGVK: extractGVK, + supportedGKVs: supportedGKVs, + trackedUpsertDeleteGKVs: trackedUpsertDeleteGKVs, + persistedGKVs: persistedGKVs, + capturer: capturer, } } -func (s *store) captureGatewayClassChange(gc *v1beta1.GatewayClass, gwClassName string) { - resourceChanged := true +func (s *changeTrackingUpdater) assertSupportedGVK(gvk schema.GroupVersionKind) { + if !s.supportedGKVs.contains(gvk) { + panic(fmt.Errorf("unsupported GVK %v", gvk)) + } +} - if gc.Name != gwClassName { - panic(fmt.Errorf("gatewayclass resource must be %s, got %s", gwClassName, gc.Name)) +func (s *changeTrackingUpdater) upsert(obj client.Object) (changed bool) { + if !s.persistedGKVs.contains(s.extractGVK(obj)) { + return false } - // if the resource spec hasn't changed (its generation is the same), ignore the upsert - if s.gc != nil && s.gc.Generation == gc.Generation { - resourceChanged = false + oldObj, exist := s.store.get(obj, client.ObjectKeyFromObject(obj)) + s.store.upsert(obj) + + if !s.trackedUpsertDeleteGKVs.contains(s.extractGVK(obj)) { + return false } - s.gc = gc + return !exist || obj.GetGeneration() != oldObj.GetGeneration() +} + +func (s *changeTrackingUpdater) Upsert(obj client.Object) { + s.assertSupportedGVK(s.extractGVK(obj)) - s.changed = s.changed || resourceChanged + changingUpsert := s.upsert(obj) + s.capturer.Capture(obj) + + s.changed = s.changed || changingUpsert || s.capturer.Exists(obj, client.ObjectKeyFromObject(obj)) } -func (s *store) captureGatewayChange(gw *v1beta1.Gateway) { - resourceChanged := true +func (s *changeTrackingUpdater) delete(objType client.Object, nsname types.NamespacedName) (changed bool) { + objTypeGVK := s.extractGVK(objType) + + if !s.persistedGKVs.contains(objTypeGVK) { + return false + } - // if the resource spec hasn't changed (its generation is the same), ignore the upsert - prev, exist := s.gateways[client.ObjectKeyFromObject(gw)] - if exist && gw.Generation == prev.Generation { - resourceChanged = false + _, exist := s.store.get(objType, nsname) + if !exist { + return false } + s.store.delete(objType, nsname) + + return s.trackedUpsertDeleteGKVs.contains(objTypeGVK) +} - s.gateways[client.ObjectKeyFromObject(gw)] = gw +func (s *changeTrackingUpdater) Delete(objType client.Object, nsname types.NamespacedName) { + s.assertSupportedGVK(s.extractGVK(objType)) + + changingDelete := s.delete(objType, nsname) + + s.changed = s.changed || changingDelete || s.capturer.Exists(objType, nsname) + + s.capturer.Remove(objType, nsname) +} - s.changed = s.changed || resourceChanged +// getAndResetChangedStatus returns true if the previous updates (Upserts/Deletes) require an update of +// the configuration of the data plane. It also resets the changed status to false. +func (s *changeTrackingUpdater) getAndResetChangedStatus() bool { + changed := s.changed + s.changed = false + return changed } -func (s *store) captureHTTPRouteChange(hr *v1beta1.HTTPRoute) { - resourceChanged := true - // if the resource spec hasn't changed (its generation is the same), ignore the upsert - prev, exist := s.httpRoutes[client.ObjectKeyFromObject(hr)] - if exist && hr.Generation == prev.Generation { - resourceChanged = false +type upsertValidatorFunc func(obj client.Object) error + +// validatingUpsertUpdater is an Updater that validates an object before upserting it. +// If the validation fails, it deletes the object and records an event with the validation error. +type validatingUpsertUpdater struct { + updater Updater + eventRecorder record.EventRecorder + upsertValidators []upsertValidatorFunc +} + +func newValidatingUpsertUpdater( + updater Updater, + eventRecorder record.EventRecorder, + upsertValidators []upsertValidatorFunc, +) *validatingUpsertUpdater { + return &validatingUpsertUpdater{ + updater: updater, + eventRecorder: eventRecorder, + upsertValidators: upsertValidators, + } +} + +func (u *validatingUpsertUpdater) Upsert(obj client.Object) { + for _, validator := range u.upsertValidators { + if err := validator(obj); err != nil { + u.updater.Delete(obj, client.ObjectKeyFromObject(obj)) + + u.eventRecorder.Eventf( + obj, + apiv1.EventTypeWarning, + "Rejected", + "%s; NKG will delete any existing NGINX configuration that corresponds to the resource", + err.Error(), + ) + + return + } } - s.httpRoutes[client.ObjectKeyFromObject(hr)] = hr - s.changed = s.changed || resourceChanged + u.updater.Upsert(obj) } -// Service changes are treated differently than Gateway API resource changes in the following ways: -// (1) We don't check generation here because services do not use generation, and Service Controller filters upsert -// events based on the Service ports. This means we will only receive upsert events for Services with port changes. -// (2) We don't set the store's changed value to true because we don't want to trigger a reload on every Service change. -// We will rely on the relationship.Capturer to trigger a reload when necessary. -func (s *store) captureServiceChange(svc *v1.Service) { - s.services[client.ObjectKeyFromObject(svc)] = svc +func (u *validatingUpsertUpdater) Delete(objType client.Object, nsname types.NamespacedName) { + u.updater.Delete(objType, nsname) } From c6fae80ba4e4285e8c7ecada19a8a607f9e9ff91 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Tue, 4 Apr 2023 10:57:47 -0700 Subject: [PATCH 2/5] Add a note here about the GatewayClass not being covered --- internal/state/change_processor.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/state/change_processor.go b/internal/state/change_processor.go index 5a07d14d39..c0db307812 100644 --- a/internal/state/change_processor.go +++ b/internal/state/change_processor.go @@ -143,8 +143,14 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl { cfg.EventRecorder, []upsertValidatorFunc{ func(obj client.Object) error { + // Add the validation for Gateway API resources which the webhook validates + var err error switch o := obj.(type) { + // We don't validate GatewayClass, because as of 0.6.2, the webhook doesn't validate it (it only + // validates an update that requires the previous version of the resource, + // which NKG cannot reliably provide - for example, after NKG restarts). + // https://github.com/kubernetes-sigs/gateway-api/blob/v0.6.2/apis/v1beta1/validation/gatewayclass.go#L28 case *v1beta1.Gateway: err = gwapivalidation.ValidateGateway(o).ToAggregate() case *v1beta1.HTTPRoute: From 073dd263ab82d38457cbf0488e665a699541e15e Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Tue, 4 Apr 2023 11:11:51 -0700 Subject: [PATCH 3/5] Improve descriptions in the unit tests --- internal/state/change_processor_test.go | 236 ++++++++++++------------ 1 file changed, 122 insertions(+), 114 deletions(-) diff --git a/internal/state/change_processor_test.go b/internal/state/change_processor_test.go index 00a383bb0e..28a5226ffe 100644 --- a/internal/state/change_processor_test.go +++ b/internal/state/change_processor_test.go @@ -2026,156 +2026,164 @@ var _ = Describe("ChangeProcessor", func() { Expect(fakeEventRecorder.Events).To(HaveLen(0)) }) - It("should ignore invalid resources", func() { - processor.CaptureUpsertChange(gwInvalid) - processor.CaptureUpsertChange(hrInvalid) + When("resources are invalid", func() { + It("should not process them", func() { + processor.CaptureUpsertChange(gwInvalid) + processor.CaptureUpsertChange(hrInvalid) - expectedConf := dataplane.Configuration{} - expectedStatuses := state.Statuses{} + expectedConf := dataplane.Configuration{} + expectedStatuses := state.Statuses{} - changed, conf, statuses := processor.Process(context.TODO()) + changed, conf, statuses := processor.Process(context.TODO()) - Expect(changed).To(BeFalse()) - Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) - assertStatuses(expectedStatuses, statuses) + Expect(changed).To(BeFalse()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) - Expect(fakeEventRecorder.Events).To(HaveLen(2)) - assertGwEvent() - assertHREvent() + Expect(fakeEventRecorder.Events).To(HaveLen(2)) + assertGwEvent() + assertHREvent() + }) }) - It("should process valid resources", func() { - processor.CaptureUpsertChange(gw) - processor.CaptureUpsertChange(hr) + When("resources are valid", func() { + It("should process them", func() { + processor.CaptureUpsertChange(gw) + processor.CaptureUpsertChange(hr) - bg := graph.BackendGroup{ - Source: types.NamespacedName{Namespace: hr.Namespace, Name: hr.Name}, - RuleIdx: 0, - } + bg := graph.BackendGroup{ + Source: types.NamespacedName{Namespace: hr.Namespace, Name: hr.Name}, + RuleIdx: 0, + } - expectedConf := dataplane.Configuration{ - HTTPServers: []dataplane.VirtualServer{ - { - IsDefault: true, - }, - { - Hostname: "foo.example.com", - PathRules: []dataplane.PathRule{ - { - Path: "/", - MatchRules: []dataplane.MatchRule{ - { - MatchIdx: 0, - RuleIdx: 0, - BackendGroup: bg, - Source: hr, + expectedConf := dataplane.Configuration{ + HTTPServers: []dataplane.VirtualServer{ + { + IsDefault: true, + }, + { + Hostname: "foo.example.com", + PathRules: []dataplane.PathRule{ + { + Path: "/", + MatchRules: []dataplane.MatchRule{ + { + MatchIdx: 0, + RuleIdx: 0, + BackendGroup: bg, + Source: hr, + }, }, }, }, }, }, - }, - SSLServers: []dataplane.VirtualServer{}, - BackendGroups: []graph.BackendGroup{bg}, - } - expectedStatuses := state.Statuses{ - GatewayClassStatus: &state.GatewayClassStatus{ - ObservedGeneration: gc.Generation, - Conditions: conditions.NewDefaultGatewayClassConditions(), - }, - GatewayStatus: &state.GatewayStatus{ - NsName: gwNsName, - ObservedGeneration: gw.Generation, - ListenerStatuses: map[string]state.ListenerStatus{ - "listener-80-1": { - AttachedRoutes: 1, - Conditions: conditions.NewDefaultListenerConditions(), - }, + SSLServers: []dataplane.VirtualServer{}, + BackendGroups: []graph.BackendGroup{bg}, + } + expectedStatuses := state.Statuses{ + GatewayClassStatus: &state.GatewayClassStatus{ + ObservedGeneration: gc.Generation, + Conditions: conditions.NewDefaultGatewayClassConditions(), }, - }, - IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, - HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{ - hrNsName: { - ObservedGeneration: hr.Generation, - ParentStatuses: map[string]state.ParentStatus{ + GatewayStatus: &state.GatewayStatus{ + NsName: gwNsName, + ObservedGeneration: gw.Generation, + ListenerStatuses: map[string]state.ListenerStatus{ "listener-80-1": { - Conditions: conditions.NewDefaultRouteConditions(), + AttachedRoutes: 1, + Conditions: conditions.NewDefaultListenerConditions(), }, }, }, - }, - } + IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, + HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{ + hrNsName: { + ObservedGeneration: hr.Generation, + ParentStatuses: map[string]state.ParentStatus{ + "listener-80-1": { + Conditions: conditions.NewDefaultRouteConditions(), + }, + }, + }, + }, + } - changed, conf, statuses := processor.Process(context.TODO()) + changed, conf, statuses := processor.Process(context.TODO()) - Expect(changed).To(BeTrue()) - Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) - assertStatuses(expectedStatuses, statuses) + Expect(changed).To(BeTrue()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) - Expect(fakeEventRecorder.Events).To(HaveLen(0)) + Expect(fakeEventRecorder.Events).To(HaveLen(0)) + }) }) - It("should delete invalid HTTPRoute", func() { - processor.CaptureUpsertChange(hrInvalid) + When("a new version of HTTPRoute is invalid", func() { + It("it should delete the configuration for the old one and not process the new one", func() { + processor.CaptureUpsertChange(hrInvalid) - expectedConf := dataplane.Configuration{ - HTTPServers: []dataplane.VirtualServer{ - { - IsDefault: true, + expectedConf := dataplane.Configuration{ + HTTPServers: []dataplane.VirtualServer{ + { + IsDefault: true, + }, }, - }, - SSLServers: []dataplane.VirtualServer{}, - } - expectedStatuses := state.Statuses{ - GatewayClassStatus: &state.GatewayClassStatus{ - ObservedGeneration: gc.Generation, - Conditions: conditions.NewDefaultGatewayClassConditions(), - }, - GatewayStatus: &state.GatewayStatus{ - NsName: gwNsName, - ObservedGeneration: gw.Generation, - ListenerStatuses: map[string]state.ListenerStatus{ - "listener-80-1": { - AttachedRoutes: 0, - Conditions: conditions.NewDefaultListenerConditions(), + SSLServers: []dataplane.VirtualServer{}, + } + expectedStatuses := state.Statuses{ + GatewayClassStatus: &state.GatewayClassStatus{ + ObservedGeneration: gc.Generation, + Conditions: conditions.NewDefaultGatewayClassConditions(), + }, + GatewayStatus: &state.GatewayStatus{ + NsName: gwNsName, + ObservedGeneration: gw.Generation, + ListenerStatuses: map[string]state.ListenerStatus{ + "listener-80-1": { + AttachedRoutes: 0, + Conditions: conditions.NewDefaultListenerConditions(), + }, }, }, - }, - IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, - HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{}, - } + IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, + HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{}, + } - changed, conf, statuses := processor.Process(context.TODO()) + changed, conf, statuses := processor.Process(context.TODO()) - Expect(changed).To(BeTrue()) - Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) - assertStatuses(expectedStatuses, statuses) + Expect(changed).To(BeTrue()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) - Expect(fakeEventRecorder.Events).To(HaveLen(1)) - assertHREvent() + Expect(fakeEventRecorder.Events).To(HaveLen(1)) + assertHREvent() + }) }) - It("should delete invalid Gateway", func() { - processor.CaptureUpsertChange(gwInvalid) + When("a new version of Gateway is invalid", func() { + It("it should delete the configuration for the old one and not process the new one", func() { + processor.CaptureUpsertChange(gwInvalid) - expectedConf := dataplane.Configuration{} - expectedStatuses := state.Statuses{ - GatewayClassStatus: &state.GatewayClassStatus{ - ObservedGeneration: gc.Generation, - Conditions: conditions.NewDefaultGatewayClassConditions(), - }, - IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, - HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{}, - } + expectedConf := dataplane.Configuration{} + expectedStatuses := state.Statuses{ + GatewayClassStatus: &state.GatewayClassStatus{ + ObservedGeneration: gc.Generation, + Conditions: conditions.NewDefaultGatewayClassConditions(), + }, + IgnoredGatewayStatuses: map[types.NamespacedName]state.IgnoredGatewayStatus{}, + HTTPRouteStatuses: map[types.NamespacedName]state.HTTPRouteStatus{}, + } - changed, conf, statuses := processor.Process(context.TODO()) + changed, conf, statuses := processor.Process(context.TODO()) - Expect(changed).To(BeTrue()) - Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) - assertStatuses(expectedStatuses, statuses) + Expect(changed).To(BeTrue()) + Expect(helpers.Diff(expectedConf, conf)).To(BeEmpty()) + assertStatuses(expectedStatuses, statuses) - Expect(fakeEventRecorder.Events).To(HaveLen(1)) - assertGwEvent() + Expect(fakeEventRecorder.Events).To(HaveLen(1)) + assertGwEvent() + }) }) }) From a15a24607c2656797f53d1ee07b6b1a0ddcb09b6 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Tue, 4 Apr 2023 11:13:33 -0700 Subject: [PATCH 4/5] store -> state --- internal/state/graph/graph.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/state/graph/graph.go b/internal/state/graph/graph.go index 981ac3332b..4098401f88 100644 --- a/internal/state/graph/graph.go +++ b/internal/state/graph/graph.go @@ -33,13 +33,13 @@ type Graph struct { // BuildGraph builds a Graph from a state. func BuildGraph( - store ClusterState, + state ClusterState, controllerName string, gcName string, secretMemoryMgr secrets.SecretDiskMemoryManager, validators validation.Validators, ) *Graph { - gatewayClass := store.GatewayClasses[types.NamespacedName{Name: gcName}] + gatewayClass := state.GatewayClasses[types.NamespacedName{Name: gcName}] if !gatewayClassBelongsToController(gatewayClass, controllerName) { return &Graph{} @@ -47,13 +47,13 @@ func BuildGraph( gc := buildGatewayClass(gatewayClass) - processedGws := processGateways(store.Gateways, gcName) + processedGws := processGateways(state.Gateways, gcName) gw := buildGateway(processedGws.Winner, secretMemoryMgr) - routes := buildRoutesForGateways(validators.HTTPFieldsValidator, store.HTTPRoutes, processedGws.GetAllNsNames()) + routes := buildRoutesForGateways(validators.HTTPFieldsValidator, state.HTTPRoutes, processedGws.GetAllNsNames()) bindRoutesToListeners(routes, gw) - addBackendGroupsToRoutes(routes, store.Services) + addBackendGroupsToRoutes(routes, state.Services) g := &Graph{ GatewayClass: gc, From 33765c344b066a9b708b2ed0e9605faa99c55c5e Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Tue, 4 Apr 2023 13:34:56 -0700 Subject: [PATCH 5/5] Simplify validatingUpsertUpdater --- internal/state/change_processor.go | 45 +++++++++++++++--------------- internal/state/store.go | 40 +++++++++++++------------- 2 files changed, 41 insertions(+), 44 deletions(-) diff --git a/internal/state/change_processor.go b/internal/state/change_processor.go index c0db307812..911d8061fa 100644 --- a/internal/state/change_processor.go +++ b/internal/state/change_processor.go @@ -141,29 +141,28 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl { updater := newValidatingUpsertUpdater( trackingUpdater, cfg.EventRecorder, - []upsertValidatorFunc{ - func(obj client.Object) error { - // Add the validation for Gateway API resources which the webhook validates - - var err error - switch o := obj.(type) { - // We don't validate GatewayClass, because as of 0.6.2, the webhook doesn't validate it (it only - // validates an update that requires the previous version of the resource, - // which NKG cannot reliably provide - for example, after NKG restarts). - // https://github.com/kubernetes-sigs/gateway-api/blob/v0.6.2/apis/v1beta1/validation/gatewayclass.go#L28 - case *v1beta1.Gateway: - err = gwapivalidation.ValidateGateway(o).ToAggregate() - case *v1beta1.HTTPRoute: - err = gwapivalidation.ValidateHTTPRoute(o).ToAggregate() - } - - if err != nil { - return fmt.Errorf(webhookValidationErrorLogMsg+"; validation error: %w", err) - } - - return nil - }, - }) + func(obj client.Object) error { + // Add the validation for Gateway API resources which the webhook validates + + var err error + switch o := obj.(type) { + // We don't validate GatewayClass, because as of 0.6.2, the webhook doesn't validate it (it only + // validates an update that requires the previous version of the resource, + // which NKG cannot reliably provide - for example, after NKG restarts). + // https://github.com/kubernetes-sigs/gateway-api/blob/v0.6.2/apis/v1beta1/validation/gatewayclass.go#L28 + case *v1beta1.Gateway: + err = gwapivalidation.ValidateGateway(o).ToAggregate() + case *v1beta1.HTTPRoute: + err = gwapivalidation.ValidateHTTPRoute(o).ToAggregate() + } + + if err != nil { + return fmt.Errorf(webhookValidationErrorLogMsg+"; validation error: %w", err) + } + + return nil + }, + ) return &ChangeProcessorImpl{ cfg: cfg, diff --git a/internal/state/store.go b/internal/state/store.go index 33e8eaefe9..2f2db86acb 100644 --- a/internal/state/store.go +++ b/internal/state/store.go @@ -240,38 +240,36 @@ type upsertValidatorFunc func(obj client.Object) error // validatingUpsertUpdater is an Updater that validates an object before upserting it. // If the validation fails, it deletes the object and records an event with the validation error. type validatingUpsertUpdater struct { - updater Updater - eventRecorder record.EventRecorder - upsertValidators []upsertValidatorFunc + updater Updater + eventRecorder record.EventRecorder + validator upsertValidatorFunc } func newValidatingUpsertUpdater( updater Updater, eventRecorder record.EventRecorder, - upsertValidators []upsertValidatorFunc, + validator upsertValidatorFunc, ) *validatingUpsertUpdater { return &validatingUpsertUpdater{ - updater: updater, - eventRecorder: eventRecorder, - upsertValidators: upsertValidators, + updater: updater, + eventRecorder: eventRecorder, + validator: validator, } } func (u *validatingUpsertUpdater) Upsert(obj client.Object) { - for _, validator := range u.upsertValidators { - if err := validator(obj); err != nil { - u.updater.Delete(obj, client.ObjectKeyFromObject(obj)) - - u.eventRecorder.Eventf( - obj, - apiv1.EventTypeWarning, - "Rejected", - "%s; NKG will delete any existing NGINX configuration that corresponds to the resource", - err.Error(), - ) - - return - } + if err := u.validator(obj); err != nil { + u.updater.Delete(obj, client.ObjectKeyFromObject(obj)) + + u.eventRecorder.Eventf( + obj, + apiv1.EventTypeWarning, + "Rejected", + "%s; NKG will delete any existing NGINX configuration that corresponds to the resource", + err.Error(), + ) + + return } u.updater.Upsert(obj)