diff --git a/internal/events/event.go b/internal/events/event.go index 5851835c5b..344ef377fb 100644 --- a/internal/events/event.go +++ b/internal/events/event.go @@ -5,6 +5,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +// EventBatch is a batch of events to be handled at once. +// FIXME(pleshakov): think about how to avoid using an interface{} here +type EventBatch []interface{} + // UpsertEvent represents upserting a resource. type UpsertEvent struct { // Resource is the resource that is being upserted. diff --git a/internal/events/eventsfakes/fake_event_handler.go b/internal/events/eventsfakes/fake_event_handler.go new file mode 100644 index 0000000000..06a20b3a12 --- /dev/null +++ b/internal/events/eventsfakes/fake_event_handler.go @@ -0,0 +1,79 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package eventsfakes + +import ( + "context" + "sync" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" +) + +type FakeEventHandler struct { + HandleEventBatchStub func(context.Context, events.EventBatch) + handleEventBatchMutex sync.RWMutex + handleEventBatchArgsForCall []struct { + arg1 context.Context + arg2 events.EventBatch + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeEventHandler) HandleEventBatch(arg1 context.Context, arg2 events.EventBatch) { + fake.handleEventBatchMutex.Lock() + fake.handleEventBatchArgsForCall = append(fake.handleEventBatchArgsForCall, struct { + arg1 context.Context + arg2 events.EventBatch + }{arg1, arg2}) + stub := fake.HandleEventBatchStub + fake.recordInvocation("HandleEventBatch", []interface{}{arg1, arg2}) + fake.handleEventBatchMutex.Unlock() + if stub != nil { + fake.HandleEventBatchStub(arg1, arg2) + } +} + +func (fake *FakeEventHandler) HandleEventBatchCallCount() int { + fake.handleEventBatchMutex.RLock() + defer fake.handleEventBatchMutex.RUnlock() + return len(fake.handleEventBatchArgsForCall) +} + +func (fake *FakeEventHandler) HandleEventBatchCalls(stub func(context.Context, events.EventBatch)) { + fake.handleEventBatchMutex.Lock() + defer fake.handleEventBatchMutex.Unlock() + fake.HandleEventBatchStub = stub +} + +func (fake *FakeEventHandler) HandleEventBatchArgsForCall(i int) (context.Context, events.EventBatch) { + fake.handleEventBatchMutex.RLock() + defer fake.handleEventBatchMutex.RUnlock() + argsForCall := fake.handleEventBatchArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeEventHandler) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.handleEventBatchMutex.RLock() + defer fake.handleEventBatchMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeEventHandler) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ events.EventHandler = new(FakeEventHandler) diff --git a/internal/events/handler.go b/internal/events/handler.go new file mode 100644 index 0000000000..a3a40499a1 --- /dev/null +++ b/internal/events/handler.go @@ -0,0 +1,161 @@ +package events + +import ( + "context" + "fmt" + + 
"github.com/go-logr/logr" + apiv1 "k8s.io/api/core/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/file" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/runtime" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/state" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/status" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . EventHandler + +// EventHandler handle events. +type EventHandler interface { + // HandleEventBatch handles a batch of events. + HandleEventBatch(ctx context.Context, batch EventBatch) +} + +// EventHandlerConfig holds configuration parameters for EventHandlerImpl. +type EventHandlerConfig struct { + // Processor is the state ChangeProcessor. + Processor state.ChangeProcessor + // ServiceStore is the state ServiceStore. + ServiceStore state.ServiceStore + // SecretStore is the state SecretStore. + SecretStore state.SecretStore + // SecretMemoryManager is the state SecretMemoryManager. + SecretMemoryManager state.SecretDiskMemoryManager + // Generator is the nginx config Generator. + Generator config.Generator + // Logger is the logger to be used by the EventHandler. + Logger logr.Logger + // NginxFileMgr is the file Manager for nginx. + NginxFileMgr file.Manager + // NginxRuntimeMgr manages nginx runtime. + NginxRuntimeMgr runtime.Manager + // StatusUpdater updates statuses on Kubernetes resources. + StatusUpdater status.Updater +} + +// EventHandlerImpl implements EventHandler. +// EventHandlerImpl is responsible for: +// (1) Reconciling the Gateway API and Kubernetes built-in resources with the NGINX configuration. +// (2) Keeping the statuses of the Gateway API resources updated. +type EventHandlerImpl struct { + cfg EventHandlerConfig +} + +// NewEventHandlerImpl creates a new EventHandlerImpl. +func NewEventHandlerImpl(cfg EventHandlerConfig) *EventHandlerImpl { + return &EventHandlerImpl{ + cfg: cfg, + } +} + +func (h *EventHandlerImpl) HandleEventBatch(ctx context.Context, batch EventBatch) { + + for _, event := range batch { + switch e := event.(type) { + case *UpsertEvent: + h.propagateUpsert(e) + case *DeleteEvent: + h.propagateDelete(e) + default: + panic(fmt.Errorf("unknown event type %T", e)) + } + } + + changed, conf, statuses := h.cfg.Processor.Process() + if !changed { + h.cfg.Logger.Info("Handling events didn't result into NGINX configuration changes") + return + } + + err := h.updateNginx(ctx, conf) + if err != nil { + h.cfg.Logger.Error(err, "Failed to update NGINX configuration") + } else { + h.cfg.Logger.Info("NGINX configuration was successfully updated") + } + + h.cfg.StatusUpdater.Update(ctx, statuses) +} + +func (h *EventHandlerImpl) updateNginx(ctx context.Context, conf state.Configuration) error { + // Write all secrets (nuke and pave). + // This will remove all secrets in the secrets directory before writing the requested secrets. + // FIXME(kate-osborn): We may want to rethink this approach in the future and write and remove secrets individually. + err := h.cfg.SecretMemoryManager.WriteAllRequestedSecrets() + if err != nil { + return err + } + + cfg, warnings := h.cfg.Generator.Generate(conf) + + // For now, we keep all http servers in one config + // We might rethink that. For example, we can write each server to its file + // or group servers in some way. 
+ err = h.cfg.NginxFileMgr.WriteHTTPServersConfig("http-servers", cfg) + if err != nil { + return err + } + + for obj, objWarnings := range warnings { + for _, w := range objWarnings { + // FIXME(pleshakov): report warnings via Object status + h.cfg.Logger.Info("Got warning while generating config", + "kind", obj.GetObjectKind().GroupVersionKind().Kind, + "namespace", obj.GetNamespace(), + "name", obj.GetName(), + "warning", w) + } + } + + return h.cfg.NginxRuntimeMgr.Reload(ctx) +} + +func (h *EventHandlerImpl) propagateUpsert(e *UpsertEvent) { + switch r := e.Resource.(type) { + case *v1beta1.GatewayClass: + h.cfg.Processor.CaptureUpsertChange(r) + case *v1beta1.Gateway: + h.cfg.Processor.CaptureUpsertChange(r) + case *v1beta1.HTTPRoute: + h.cfg.Processor.CaptureUpsertChange(r) + case *apiv1.Service: + // FIXME(pleshakov): make sure the affected hosts are updated + h.cfg.ServiceStore.Upsert(r) + case *apiv1.Secret: + // FIXME(kate-osborn): need to handle certificate rotation + h.cfg.SecretStore.Upsert(r) + default: + panic(fmt.Errorf("unknown resource type %T", e.Resource)) + } +} + +func (h *EventHandlerImpl) propagateDelete(e *DeleteEvent) { + switch e.Type.(type) { + case *v1beta1.GatewayClass: + h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName) + case *v1beta1.Gateway: + h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName) + case *v1beta1.HTTPRoute: + h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName) + case *apiv1.Service: + // FIXME(pleshakov): make sure the affected hosts are updated + h.cfg.ServiceStore.Delete(e.NamespacedName) + case *apiv1.Secret: + // FIXME(kate-osborn): make sure that affected servers are updated + h.cfg.SecretStore.Delete(e.NamespacedName) + default: + panic(fmt.Errorf("unknown resource type %T", e.Type)) + } +} diff --git a/internal/events/handler_test.go b/internal/events/handler_test.go new file mode 100644 index 0000000000..ed1a15894d --- /dev/null +++ b/internal/events/handler_test.go @@ -0,0 +1,303 @@ +package events_test + +import ( + "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config/configfakes" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/file/filefakes" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/runtime/runtimefakes" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/state" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/state/statefakes" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/status/statusfakes" +) + +type unsupportedResource struct { + metav1.ObjectMeta +} + +func (r *unsupportedResource) GetObjectKind() schema.ObjectKind { + return nil +} + +func (r *unsupportedResource) DeepCopyObject() runtime.Object { + return nil +} + +var _ = Describe("EventHandler", func() { + var ( + handler *events.EventHandlerImpl + fakeProcessor *statefakes.FakeChangeProcessor + fakeServiceStore *statefakes.FakeServiceStore + fakeSecretStore *statefakes.FakeSecretStore + fakeSecretMemoryManager *statefakes.FakeSecretDiskMemoryManager + fakeGenerator *configfakes.FakeGenerator + fakeNginxFimeMgr *filefakes.FakeManager + fakeNginxRuntimeMgr *runtimefakes.FakeManager + fakeStatusUpdater *statusfakes.FakeUpdater + ) + + expectReconfig := func(expectedConf state.Configuration, expectedCfg []byte, expectedStatuses state.Statuses) { + Expect(fakeProcessor.ProcessCallCount()).Should(Equal(1)) + + Expect(fakeGenerator.GenerateCallCount()).Should(Equal(1)) + Expect(fakeGenerator.GenerateArgsForCall(0)).Should(Equal(expectedConf)) + + Expect(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount()).Should(Equal(1)) + name, cfg := fakeNginxFimeMgr.WriteHTTPServersConfigArgsForCall(0) + Expect(name).Should(Equal("http-servers")) + Expect(cfg).Should(Equal(expectedCfg)) + + Expect(fakeNginxRuntimeMgr.ReloadCallCount()).Should(Equal(1)) + + Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1)) + _, statuses := fakeStatusUpdater.UpdateArgsForCall(0) + Expect(statuses).Should(Equal(expectedStatuses)) + } + + BeforeEach(func() { + fakeProcessor = &statefakes.FakeChangeProcessor{} + fakeServiceStore = &statefakes.FakeServiceStore{} + fakeSecretMemoryManager = &statefakes.FakeSecretDiskMemoryManager{} + fakeSecretStore = &statefakes.FakeSecretStore{} + fakeGenerator = &configfakes.FakeGenerator{} + fakeNginxFimeMgr = &filefakes.FakeManager{} + fakeNginxRuntimeMgr = &runtimefakes.FakeManager{} + fakeStatusUpdater = &statusfakes.FakeUpdater{} + + handler = events.NewEventHandlerImpl(events.EventHandlerConfig{ + Processor: fakeProcessor, + ServiceStore: fakeServiceStore, + SecretStore: fakeSecretStore, + SecretMemoryManager: fakeSecretMemoryManager, + Generator: fakeGenerator, + Logger: zap.New(), + NginxFileMgr: fakeNginxFimeMgr, + NginxRuntimeMgr: fakeNginxRuntimeMgr, + StatusUpdater: fakeStatusUpdater, + }) + }) + + Describe("Process the Gateway API resources events", func() { + DescribeTable("A batch with one event", + func(e interface{}) { + fakeConf := state.Configuration{} + fakeStatuses := state.Statuses{} + changed := true + fakeProcessor.ProcessReturns(changed, fakeConf, 
fakeStatuses) + + fakeCfg := []byte("fake") + fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{}) + + batch := []interface{}{e} + + handler.HandleEventBatch(context.TODO(), batch) + + // Check that the events were captured + switch typedEvent := e.(type) { + case *events.UpsertEvent: + Expect(fakeProcessor.CaptureUpsertChangeCallCount()).Should(Equal(1)) + Expect(fakeProcessor.CaptureUpsertChangeArgsForCall(0)).Should(Equal(typedEvent.Resource)) + case *events.DeleteEvent: + Expect(fakeProcessor.CaptureDeleteChangeCallCount()).Should(Equal(1)) + passedObj, passedNsName := fakeProcessor.CaptureDeleteChangeArgsForCall(0) + Expect(passedObj).Should(Equal(typedEvent.Type)) + Expect(passedNsName).Should(Equal(typedEvent.NamespacedName)) + default: + Fail(fmt.Sprintf("unsupported event type %T", e)) + } + + // Check that a reconfig happened + expectReconfig(fakeConf, fakeCfg, fakeStatuses) + + }, + Entry("HTTPRoute upsert", &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}}), + Entry("Gateway upsert", &events.UpsertEvent{Resource: &v1beta1.Gateway{}}), + Entry("GatewayClass upsert", &events.UpsertEvent{Resource: &v1beta1.GatewayClass{}}), + Entry("HTTPRoute delete", &events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}}), + Entry("Gateway delete", &events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}}), + Entry("GatewayClass delete", &events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}}), + ) + }) + + Describe("Process Kubernetes resources events", func() { + expectNoReconfig := func() { + Expect(fakeProcessor.ProcessCallCount()).Should(Equal(1)) + Expect(fakeGenerator.GenerateCallCount()).Should(Equal(0)) + Expect(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount()).Should(Equal(0)) + Expect(fakeNginxRuntimeMgr.ReloadCallCount()).Should(Equal(0)) + Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(0)) + } + + Describe("Process Service events", func() { + It("should process upsert event", func() { + svc := &apiv1.Service{} + + batch := []interface{}{&events.UpsertEvent{ + Resource: svc, + }} + + handler.HandleEventBatch(context.TODO(), batch) + + Expect(fakeServiceStore.UpsertCallCount()).Should(Equal(1)) + Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc)) + + expectNoReconfig() + }) + + It("should process delete event", func() { + nsname := types.NamespacedName{Namespace: "test", Name: "service"} + + batch := []interface{}{&events.DeleteEvent{ + NamespacedName: nsname, + Type: &apiv1.Service{}, + }} + + handler.HandleEventBatch(context.TODO(), batch) + + Expect(fakeServiceStore.DeleteCallCount()).Should(Equal(1)) + Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(nsname)) + + expectNoReconfig() + }) + }) + + Describe("Process Secret events", func() { + It("should process upsert event", func() { + secret := &apiv1.Secret{} + + batch := []interface{}{&events.UpsertEvent{ + Resource: secret, + }} + + handler.HandleEventBatch(context.TODO(), batch) + + Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1)) + Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret)) + + expectNoReconfig() + }) + + It("should process delete event", func() { + nsname := types.NamespacedName{Namespace: "test", Name: "secret"} + + batch := []interface{}{&events.DeleteEvent{ + NamespacedName: nsname, + Type: &apiv1.Secret{}, + }} + + handler.HandleEventBatch(context.TODO(), batch) + + 
Expect(fakeSecretStore.DeleteCallCount()).Should(Equal(1)) + Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(nsname)) + + expectNoReconfig() + }) + }) + }) + + It("should process a batch with upsert and delete events for every supported resource", func() { + svc := &apiv1.Service{} + svcNsName := types.NamespacedName{Namespace: "test", Name: "service"} + secret := &apiv1.Secret{} + secretNsName := types.NamespacedName{Namespace: "test", Name: "secret"} + + upserts := []interface{}{ + &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}}, + &events.UpsertEvent{Resource: &v1beta1.Gateway{}}, + &events.UpsertEvent{Resource: &v1beta1.GatewayClass{}}, + &events.UpsertEvent{Resource: svc}, + &events.UpsertEvent{Resource: secret}, + } + deletes := []interface{}{ + &events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}}, + &events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}}, + &events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}}, + &events.DeleteEvent{Type: &apiv1.Service{}, NamespacedName: svcNsName}, + &events.DeleteEvent{Type: &apiv1.Secret{}, NamespacedName: secretNsName}, + } + + batch := make([]interface{}, 0, len(upserts)+len(deletes)) + batch = append(batch, upserts...) + batch = append(batch, deletes...) + + fakeConf := state.Configuration{} + changed := true + fakeStatuses := state.Statuses{} + fakeProcessor.ProcessReturns(changed, fakeConf, fakeStatuses) + + fakeCfg := []byte("fake") + fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{}) + + handler.HandleEventBatch(context.TODO(), batch) + + // Check that the events for Gateway API resources were captured + + // 3, not 5, because the last 2 do not result into CaptureUpsertChange() call + Expect(fakeProcessor.CaptureUpsertChangeCallCount()).Should(Equal(3)) + for i := 0; i < 3; i++ { + Expect(fakeProcessor.CaptureUpsertChangeArgsForCall(i)).Should(Equal(upserts[i].(*events.UpsertEvent).Resource)) + } + Expect(fakeProcessor.CaptureDeleteChangeCallCount()).Should(Equal(3)) + + // 3, not 5, because the last 2 do not result into CaptureDeleteChange() call + for i := 0; i < 3; i++ { + d := deletes[i].(*events.DeleteEvent) + passedObj, passedNsName := fakeProcessor.CaptureDeleteChangeArgsForCall(i) + Expect(passedObj).Should(Equal(d.Type)) + Expect(passedNsName).Should(Equal(d.NamespacedName)) + } + + // Check Service-related expectations + Expect(fakeServiceStore.UpsertCallCount()).Should(Equal(1)) + Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc)) + + Expect(fakeServiceStore.DeleteCallCount()).Should(Equal(1)) + Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(svcNsName)) + + // Check Secret-related expectations + Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1)) + Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret)) + + Expect(fakeSecretStore.DeleteCallCount()).Should(Equal(1)) + Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(secretNsName)) + + // Check that a reconfig happened + expectReconfig(fakeConf, fakeCfg, fakeStatuses) + }) + + Describe("Edge cases", func() { + DescribeTable("Edge cases for events", + func(e interface{}) { + handle := func() { + batch := []interface{}{e} + handler.HandleEventBatch(context.TODO(), batch) + } + + Expect(handle).Should(Panic()) + }, + Entry("should panic for an unknown event type", + &struct{}{}), + Entry("should panic for an unknown type of resource 
in upsert event", + &events.UpsertEvent{ + Resource: &unsupportedResource{}, + }), + Entry("should panic for an unknown type of resource in delete event", + &events.DeleteEvent{ + Type: &unsupportedResource{}, + }), + ) + }) +}) diff --git a/internal/events/loop.go b/internal/events/loop.go index 33c5531f0a..5c43d1d6cf 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -5,158 +5,102 @@ import ( "fmt" "github.com/go-logr/logr" - apiv1 "k8s.io/api/core/v1" - "sigs.k8s.io/gateway-api/apis/v1beta1" - - "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/file" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/runtime" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/state" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/status" ) -// EventLoopConfig holds configuration parameters for EventLoop. -type EventLoopConfig struct { - // Processor is the state ChangeProcessor. - Processor state.ChangeProcessor - // ServiceStore is the state ServiceStore. - ServiceStore state.ServiceStore - // SecretStore is the state SecretStore. - SecretStore state.SecretStore - // SecretMemoryManager is the state SecretMemoryManager. - SecretMemoryManager state.SecretDiskMemoryManager - // Generator is the nginx config Generator. - Generator config.Generator - // EventCh is a read-only channel for events. - EventCh <-chan interface{} - // Logger is the logger to be used by the EventLoop. - Logger logr.Logger - // NginxFileMgr is the file Manager for nginx. - NginxFileMgr file.Manager - // NginxRuntimeMgr manages nginx runtime. - NginxRuntimeMgr runtime.Manager - // StatusUpdater updates statuses on Kubernetes resources. - StatusUpdater status.Updater -} - -// EventLoop is the main event loop of the Gateway. +// EventLoop is the main event loop of the Gateway. It handles events coming through the event channel. +// +// When a new event comes, there are two cases: +// - If there is no event(s) currently being handled, the new event is handled immediately. +// - Otherwise, the new event will be saved for later handling. All saved events will be handled after the handling of +// the current event(s) finishes. Multiple saved events will be handled at once -- they will be batched. +// +// Batching is needed because, because typically handling an event (or multiple events at once) will result into +// reloading NGINX, which is the operation we want to minimize, for the following reasons: +// (1) A reload takes time - at least 200ms. The time depends on the size of the configuration including the number of +// TLS certs, available CPU cycles. +// (2) A reload can have side-effects for the data plane traffic. +// FIXME(pleshakov): better document the side effects and how to prevent and mitigate them. +// So when the EventLoop have 100 saved events, it is better to process them at once rather than one by one. type EventLoop struct { - cfg EventLoopConfig + eventCh <-chan interface{} + logger logr.Logger + handler EventHandler } // NewEventLoop creates a new EventLoop. -func NewEventLoop(cfg EventLoopConfig) *EventLoop { - return &EventLoop{cfg: cfg} +func NewEventLoop(eventCh <-chan interface{}, logger logr.Logger, handler EventHandler) *EventLoop { + return &EventLoop{ + eventCh: eventCh, + logger: logger, + handler: handler, + } } // Start starts the EventLoop. 
-// The method will block until the EventLoop stops: -// - if it stops because of an error, the Start will return the error. -// - if it stops normally, the Start will return nil. +// This method will block until the EventLoop stops, which will happen after the ctx is closed. +// +// FIXME(pleshakov). Ensure that when the Gateway starts, the first time it generates configuration for NGINX, +// it has a complete view of the cluster resources. For example, when the Gateway processes a Gateway resource +// with a listener with TLS termination enabled (the listener references a TLS Secret), the Gateway knows about the secret. +// This way the Gateway will not produce any incomplete transient configuration at the start. func (el *EventLoop) Start(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - // although we always return nil, Start must return it to satisfy - // "sigs.k8s.io/controller-runtime/pkg/manager".Runnable - return nil - case e := <-el.cfg.EventCh: - el.handleEvent(ctx, e) - } - } -} + // The current batch. + var batch EventBatch + // handling tells if any batch is currently being handled. + var handling bool + // handlingDone is used to signal the completion of handling a batch. + handlingDone := make(chan struct{}) -// FIXME(pleshakov): think about how to avoid using an interface{} here -func (el *EventLoop) handleEvent(ctx context.Context, event interface{}) { - switch e := event.(type) { - case *UpsertEvent: - el.propagateUpsert(e) - case *DeleteEvent: - el.propagateDelete(e) - default: - panic(fmt.Errorf("unknown event type %T", e)) - } + handle := func(ctx context.Context, batch EventBatch) { + el.logger.Info("Handling events from the batch", "total", len(batch)) - changed, conf, statuses := el.cfg.Processor.Process() - if !changed { - return - } + el.handler.HandleEventBatch(ctx, batch) - err := el.updateNginx(ctx, conf) - if err != nil { - el.cfg.Logger.Error(err, "Failed to update NGINX configuration") - } + el.logger.Info("Finished handling the batch") - el.cfg.StatusUpdater.Update(ctx, statuses) -} - -func (el *EventLoop) updateNginx(ctx context.Context, conf state.Configuration) error { - // Write all secrets (nuke and pave). - // This will remove all secrets in the secrets directory before writing the requested secrets. - // FIXME(kate-osborn): We may want to rethink this approach in the future and write and remove secrets individually. - err := el.cfg.SecretMemoryManager.WriteAllRequestedSecrets() - if err != nil { - return err + handlingDone <- struct{}{} } - cfg, warnings := el.cfg.Generator.Generate(conf) - - // For now, we keep all http servers in one config - // We might rethink that. For example, we can write each server to its file - // or group servers in some way. - err = el.cfg.NginxFileMgr.WriteHTTPServersConfig("http-servers", cfg) - if err != nil { - return err - } + // Note: at any point of time, no more than one batch is currently being handled. - for obj, objWarnings := range warnings { - for _, w := range objWarnings { - // FIXME(pleshakov): report warnings via Object status - el.cfg.Logger.Info("got warning while generating config", - "kind", obj.GetObjectKind().GroupVersionKind().Kind, - "namespace", obj.GetNamespace(), - "name", obj.GetName(), - "warning", w) + // The event loop + for { + select { + case <-ctx.Done(): + // Wait for the completion if a batch is being handled. + if handling { + <-handlingDone + } + return nil + case e := <-el.eventCh: + // Add the event to the current batch. 
+ batch = append(batch, e) + + // FIXME(pleshakov): Log more details about the event like resource GVK and ns/name. + el.logger.Info( + "added an event to the current batch", + "type", fmt.Sprintf("%T", e), + "total", len(batch), + ) + + // Handle the current batch if no batch is being handled. + if !handling { + go handle(ctx, batch) + // FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations. + // Use a double-buffer approach - create two buffers and exchange them between the producer and consumer + // routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity. + batch = make([]interface{}, 0) + handling = true + } + case <-handlingDone: + handling = false + + // Handle the current batch if it has at least one event. + if len(batch) > 0 { + go handle(ctx, batch) + batch = make([]interface{}, 0) + handling = true + } } } - - return el.cfg.NginxRuntimeMgr.Reload(ctx) -} - -func (el *EventLoop) propagateUpsert(e *UpsertEvent) { - switch r := e.Resource.(type) { - case *v1beta1.GatewayClass: - el.cfg.Processor.CaptureUpsertChange(r) - case *v1beta1.Gateway: - el.cfg.Processor.CaptureUpsertChange(r) - case *v1beta1.HTTPRoute: - el.cfg.Processor.CaptureUpsertChange(r) - case *apiv1.Service: - // FIXME(pleshakov): make sure the affected hosts are updated - el.cfg.ServiceStore.Upsert(r) - case *apiv1.Secret: - // FIXME(kate-osborn): need to handle certificate rotation - el.cfg.SecretStore.Upsert(r) - default: - panic(fmt.Errorf("unknown resource type %T", e.Resource)) - } -} - -func (el *EventLoop) propagateDelete(e *DeleteEvent) { - switch e.Type.(type) { - case *v1beta1.GatewayClass: - el.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName) - case *v1beta1.Gateway: - el.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName) - case *v1beta1.HTTPRoute: - el.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName) - case *apiv1.Service: - // FIXME(pleshakov): make sure the affected hosts are updated - el.cfg.ServiceStore.Delete(e.NamespacedName) - case *apiv1.Secret: - // FIXME(kate-osborn): make sure that affected servers are updated - el.cfg.SecretStore.Delete(e.NamespacedName) - default: - panic(fmt.Errorf("unknown resource type %T", e.Type)) - } } diff --git a/internal/events/loop_test.go b/internal/events/loop_test.go index f020889d19..0135ab93e9 100644 --- a/internal/events/loop_test.go +++ b/internal/events/loop_test.go @@ -5,268 +5,97 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/gateway-api/apis/v1beta1" "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config/configfakes" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/file/filefakes" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/runtime/runtimefakes" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/state" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/state/statefakes" - "github.com/nginxinc/nginx-kubernetes-gateway/internal/status/statusfakes" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events/eventsfakes" ) -type unsupportedResource struct { - metav1.ObjectMeta -} - -func (r *unsupportedResource) GetObjectKind() schema.ObjectKind { - return nil -} - -func (r *unsupportedResource) DeepCopyObject() runtime.Object { - return nil -} - var _ = Describe("EventLoop", func() { var ( - fakeProcessor *statefakes.FakeChangeProcessor - fakeServiceStore *statefakes.FakeServiceStore - fakeSecretStore *statefakes.FakeSecretStore - fakeSecretMemoryManager *statefakes.FakeSecretDiskMemoryManager - fakeGenerator *configfakes.FakeGenerator - fakeNginxFimeMgr *filefakes.FakeManager - fakeNginxRuntimeMgr *runtimefakes.FakeManager - fakeStatusUpdater *statusfakes.FakeUpdater - cancel context.CancelFunc - eventCh chan interface{} - errorCh chan error - start func() + fakeHandler *eventsfakes.FakeEventHandler + eventCh chan interface{} + eventLoop *events.EventLoop + cancel context.CancelFunc + errorCh chan error ) BeforeEach(func() { - fakeProcessor = &statefakes.FakeChangeProcessor{} + fakeHandler = &eventsfakes.FakeEventHandler{} eventCh = make(chan interface{}) - fakeServiceStore = &statefakes.FakeServiceStore{} - fakeSecretMemoryManager = &statefakes.FakeSecretDiskMemoryManager{} - fakeSecretStore = &statefakes.FakeSecretStore{} - fakeGenerator = &configfakes.FakeGenerator{} - fakeNginxFimeMgr = &filefakes.FakeManager{} - fakeNginxRuntimeMgr = &runtimefakes.FakeManager{} - fakeStatusUpdater = &statusfakes.FakeUpdater{} - ctrl := events.NewEventLoop(events.EventLoopConfig{ - Processor: fakeProcessor, - ServiceStore: fakeServiceStore, - SecretStore: fakeSecretStore, - SecretMemoryManager: fakeSecretMemoryManager, - Generator: fakeGenerator, - EventCh: eventCh, - Logger: zap.New(), - NginxFileMgr: fakeNginxFimeMgr, - NginxRuntimeMgr: fakeNginxRuntimeMgr, - StatusUpdater: fakeStatusUpdater, - }) + eventLoop = events.NewEventLoop(eventCh, zap.New(), fakeHandler) var ctx context.Context ctx, cancel = context.WithCancel(context.Background()) errorCh = make(chan error) - start = func() { - errorCh <- ctrl.Start(ctx) - } - }) - - Describe("Process Gateway API resource events", func() { - BeforeEach(func() { - go start() - }) - - AfterEach(func() { - cancel() - - var err error - Eventually(errorCh).Should(Receive(&err)) - Expect(err).To(BeNil()) - }) - - DescribeTable("Upsert events", - func(e *events.UpsertEvent) { - fakeConf := state.Configuration{} - changed := true - fakeStatuses := state.Statuses{} - fakeProcessor.ProcessReturns(changed, fakeConf, fakeStatuses) - - fakeCfg := []byte("fake") - 
fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{}) - - eventCh <- e - - Eventually(fakeProcessor.CaptureUpsertChangeCallCount).Should(Equal(1)) - Expect(fakeProcessor.CaptureUpsertChangeArgsForCall(0)).Should(Equal(e.Resource)) - Eventually(fakeProcessor.ProcessCallCount).Should(Equal(1)) - - Eventually(fakeGenerator.GenerateCallCount).Should(Equal(1)) - Expect(fakeGenerator.GenerateArgsForCall(0)).Should(Equal(fakeConf)) - - Eventually(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount).Should(Equal(1)) - name, cfg := fakeNginxFimeMgr.WriteHTTPServersConfigArgsForCall(0) - Expect(name).Should(Equal("http-servers")) - Expect(cfg).Should(Equal(fakeCfg)) - - Eventually(fakeNginxRuntimeMgr.ReloadCallCount).Should(Equal(1)) - - Eventually(fakeStatusUpdater.UpdateCallCount).Should(Equal(1)) - _, statuses := fakeStatusUpdater.UpdateArgsForCall(0) - Expect(statuses).Should(Equal(fakeStatuses)) - }, - Entry("HTTPRoute", &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}}), - Entry("Gateway", &events.UpsertEvent{Resource: &v1beta1.Gateway{}}), - Entry("GatewayClass", &events.UpsertEvent{Resource: &v1beta1.GatewayClass{}}), - ) - - DescribeTable("Delete events", - func(e *events.DeleteEvent) { - fakeConf := state.Configuration{} - changed := true - fakeProcessor.ProcessReturns(changed, fakeConf, state.Statuses{}) - - fakeCfg := []byte("fake") - fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{}) - eventCh <- e - - Eventually(fakeProcessor.CaptureDeleteChangeCallCount).Should(Equal(1)) - passedObj, passedNsName := fakeProcessor.CaptureDeleteChangeArgsForCall(0) - Expect(passedObj).Should(Equal(e.Type)) - Expect(passedNsName).Should(Equal(e.NamespacedName)) - - Eventually(fakeProcessor.ProcessCallCount).Should(Equal(1)) - - Eventually(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount).Should(Equal(1)) - name, cfg := fakeNginxFimeMgr.WriteHTTPServersConfigArgsForCall(0) - Expect(name).Should(Equal("http-servers")) - Expect(cfg).Should(Equal(fakeCfg)) - - Eventually(fakeNginxRuntimeMgr.ReloadCallCount).Should(Equal(1)) - }, - Entry("HTTPRoute", &events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}}), - Entry("Gateway", &events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}}), - Entry("GatewayClass", &events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}}), - ) + go func() { + errorCh <- eventLoop.Start(ctx) + }() }) - Describe("Process Service events", func() { - BeforeEach(func() { - go start() - }) - - AfterEach(func() { - cancel() - - var err error - Eventually(errorCh).Should(Receive(&err)) - Expect(err).To(BeNil()) - }) - - It("should process upsert event", func() { - svc := &apiv1.Service{} - - eventCh <- &events.UpsertEvent{ - Resource: svc, - } + AfterEach(func() { + cancel() - Eventually(fakeServiceStore.UpsertCallCount).Should(Equal(1)) - Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc)) - - Eventually(fakeProcessor.ProcessCallCount).Should(Equal(1)) - }) + var err error + Eventually(errorCh).Should(Receive(&err)) + Expect(err).To(BeNil()) + }) - It("should process delete event", func() { - nsname := types.NamespacedName{Namespace: "test", Name: "service"} + It("should process a single event", func() { + e := "event" - eventCh <- &events.DeleteEvent{ - NamespacedName: nsname, - Type: &apiv1.Service{}, - } + eventCh <- e - Eventually(fakeServiceStore.DeleteCallCount).Should(Equal(1)) - 
Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(nsname)) + Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1)) + _, batch := fakeHandler.HandleEventBatchArgsForCall(0) - Eventually(fakeProcessor.ProcessCallCount).Should(Equal(1)) - }) + var expectedBatch events.EventBatch = []interface{}{e} + Expect(batch).Should(Equal(expectedBatch)) }) - Describe("Process Secret events", func() { - BeforeEach(func() { - go start() - }) - - AfterEach(func() { - cancel() + It("should batch multiple events", func() { + firstHandleEventBatchCallInProgress := make(chan struct{}) + sentSecondAndThirdEvents := make(chan struct{}) - var err error - Eventually(errorCh).Should(Receive(&err)) - Expect(err).To(BeNil()) + // The func below will pause the handler goroutine while it is processing the batch with e1 until + // sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime. + fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) { + close(firstHandleEventBatchCallInProgress) + <-sentSecondAndThirdEvents }) - It("should process upsert event", func() { - secret := &apiv1.Secret{} + e1 := "event1" + e2 := "event2" + e3 := "event3" - eventCh <- &events.UpsertEvent{ - Resource: secret, - } + eventCh <- e1 - Eventually(fakeSecretStore.UpsertCallCount).Should(Equal(1)) - Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret)) + // Making sure the handler goroutine started handling the batch with e1. + <-firstHandleEventBatchCallInProgress - Eventually(fakeProcessor.ProcessCallCount).Should(Equal(1)) - }) + eventCh <- e2 + eventCh <- e3 + // The event loop will add the e2 and e3 event to current batch before starting another handler goroutine. - It("should process delete event", func() { - nsname := types.NamespacedName{Namespace: "test", Name: "secret"} + fakeHandler.HandleEventBatchCalls(nil) - eventCh <- &events.DeleteEvent{ - NamespacedName: nsname, - Type: &apiv1.Secret{}, - } + // Unpause the handler goroutine so that it can handle the current batch. 
+ close(sentSecondAndThirdEvents) - Eventually(fakeSecretStore.DeleteCallCount).Should(Equal(1)) - Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(nsname)) + Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2)) + _, batch := fakeHandler.HandleEventBatchArgsForCall(0) - Eventually(fakeProcessor.ProcessCallCount).Should(Equal(1)) - }) - }) + var expectedBatch events.EventBatch = []interface{}{e1} - Describe("Edge cases", func() { - AfterEach(func() { - cancel() - }) + // the first HandleEventBatch() call must have handled a batch with e1 + Expect(batch).Should(Equal(expectedBatch)) - DescribeTable("Edge cases for events", - func(e interface{}) { - go func() { - eventCh <- e - }() + _, batch = fakeHandler.HandleEventBatchArgsForCall(1) - Expect(start).Should(Panic()) - }, - Entry("should panic for an unknown event type", - &struct{}{}), - Entry("should panic for an unknown type of resource in upsert event", - &events.UpsertEvent{ - Resource: &unsupportedResource{}, - }), - Entry("should panic for an unknown type of resource in delete event", - &events.DeleteEvent{ - Type: &unsupportedResource{}, - }), - ) + expectedBatch = []interface{}{e2, e3} + // the second HandleEventBatch() call must have handled a batch with e2 and e3 + Expect(batch).Should(Equal(expectedBatch)) }) }) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 4178523ac6..8c24c8beae 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -103,19 +103,23 @@ func Start(cfg config.Config) error { Clock: status.NewRealClock(), }) - eventLoop := events.NewEventLoop(events.EventLoopConfig{ + eventHandler := events.NewEventHandlerImpl(events.EventHandlerConfig{ Processor: processor, ServiceStore: serviceStore, SecretStore: secretStore, SecretMemoryManager: secretMemoryMgr, Generator: configGenerator, - EventCh: eventCh, - Logger: cfg.Logger.WithName("eventLoop"), + Logger: cfg.Logger.WithName("eventHandler"), NginxFileMgr: nginxFileMgr, NginxRuntimeMgr: nginxRuntimeMgr, StatusUpdater: statusUpdater, }) + eventLoop := events.NewEventLoop( + eventCh, + cfg.Logger.WithName("eventLoop"), + eventHandler) + err = mgr.Add(eventLoop) if err != nil { return fmt.Errorf("cannot register event loop: %w", err) diff --git a/internal/state/change_processor.go b/internal/state/change_processor.go index a266260be6..18040b9c7e 100644 --- a/internal/state/change_processor.go +++ b/internal/state/change_processor.go @@ -42,9 +42,14 @@ type ChangeProcessorConfig struct { } type ChangeProcessorImpl struct { - store *store - changed bool - cfg ChangeProcessorConfig + store *store + // storeChanged tells if the store is changed. + // The store is considered changed if: + // (1) Any of its resources was deleted. + // (2) A new resource was upserted. + // (3) An existing resource with the updated Generation was upserted. 
+ storeChanged bool + cfg ChangeProcessorConfig lock sync.Mutex } @@ -67,7 +72,7 @@ func (c *ChangeProcessorImpl) CaptureUpsertChange(obj client.Object) { c.lock.Lock() defer c.lock.Unlock() - c.changed = true + resourceChanged := true switch o := obj.(type) { case *v1beta1.GatewayClass: @@ -76,33 +81,35 @@ func (c *ChangeProcessorImpl) CaptureUpsertChange(obj client.Object) { } // if the resource spec hasn't changed (its generation is the same), ignore the upsert if c.store.gc != nil && c.store.gc.Generation == o.Generation { - c.changed = false + resourceChanged = false } c.store.gc = o case *v1beta1.Gateway: // if the resource spec hasn't changed (its generation is the same), ignore the upsert prev, exist := c.store.gateways[getNamespacedName(obj)] if exist && o.Generation == prev.Generation { - c.changed = false + resourceChanged = false } c.store.gateways[getNamespacedName(obj)] = o case *v1beta1.HTTPRoute: // if the resource spec hasn't changed (its generation is the same), ignore the upsert prev, exist := c.store.httpRoutes[getNamespacedName(obj)] if exist && o.Generation == prev.Generation { - c.changed = false + resourceChanged = false } c.store.httpRoutes[getNamespacedName(obj)] = o default: panic(fmt.Errorf("ChangeProcessor doesn't support %T", obj)) } + + c.storeChanged = c.storeChanged || resourceChanged } func (c *ChangeProcessorImpl) CaptureDeleteChange(resourceType client.Object, nsname types.NamespacedName) { c.lock.Lock() defer c.lock.Unlock() - c.changed = true + c.storeChanged = true switch resourceType.(type) { case *v1beta1.GatewayClass: @@ -123,11 +130,11 @@ func (c *ChangeProcessorImpl) Process() (changed bool, conf Configuration, statu c.lock.Lock() defer c.lock.Unlock() - if !c.changed { + if !c.storeChanged { return false, conf, statuses } - c.changed = false + c.storeChanged = false graph := buildGraph( c.store, diff --git a/internal/state/change_processor_test.go b/internal/state/change_processor_test.go index de2d757a28..2c9646ce8b 100644 --- a/internal/state/change_processor_test.go +++ b/internal/state/change_processor_test.go @@ -920,6 +920,133 @@ var _ = Describe("ChangeProcessor", func() { }) }) + Describe("Multiple captured changes", func() { + var ( + processor *state.ChangeProcessorImpl + gcNsName, gwNsName, hrNsName types.NamespacedName + gc, gcUpdated *v1beta1.GatewayClass + gw1, gw1Updated, gw2 *v1beta1.Gateway + hr1, hr1Updated, hr2 *v1beta1.HTTPRoute + ) + + BeforeEach(OncePerOrdered, func() { + fakeSecretMemoryMgr := &statefakes.FakeSecretDiskMemoryManager{} + processor = state.NewChangeProcessorImpl(state.ChangeProcessorConfig{ + GatewayCtlrName: "test.controller", + GatewayClassName: "my-class", + SecretMemoryManager: fakeSecretMemoryMgr, + }) + + gcNsName = types.NamespacedName{Name: "my-class"} + + gc = &v1beta1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: gcNsName.Name, + }, + Spec: v1beta1.GatewayClassSpec{ + ControllerName: "test.controller", + }, + } + + gcUpdated = gc.DeepCopy() + gcUpdated.Generation++ + + gwNsName = types.NamespacedName{Namespace: "test", Name: "gw-1"} + + gw1 = &v1beta1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: gwNsName.Namespace, + Name: gwNsName.Name, + }, + } + + gw1Updated = gw1.DeepCopy() + gw1Updated.Generation++ + + gw2 = gw1.DeepCopy() + gw2.Name = "gw-2" + + hrNsName = types.NamespacedName{Namespace: "test", Name: "hr-1"} + + hr1 = &v1beta1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: hrNsName.Namespace, + Name: hrNsName.Name, + }, + } + + hr1Updated = hr1.DeepCopy() + 
hr1Updated.Generation++ + + hr2 = hr1.DeepCopy() + hr2.Name = "hr-2" + }) + + Describe("Ensuring non-changing changes don't override previously changing changes", Ordered, func() { + // Changing change - a change that makes processor.Process() report changed + // Non-changing change - a change that doesn't do that + + // Note: in this test, we deliberately don't fully inspect the returned configuration and statuses + // -- this is done in 'Normal cases of processing changes' + + It("should report changed after multiple Upserts", func() { + processor.CaptureUpsertChange(gc) + processor.CaptureUpsertChange(gw1) + processor.CaptureUpsertChange(hr1) + + changed, _, _ := processor.Process() + Expect(changed).To(BeTrue()) + }) + + It("should report not changed after multiple Upserts of the resource with same generation", func() { + processor.CaptureUpsertChange(gc) + processor.CaptureUpsertChange(gw1) + processor.CaptureUpsertChange(hr1) + + changed, _, _ := processor.Process() + Expect(changed).To(BeFalse()) + }) + + It("should report changed after upserting updated resources followed by upserting same generations", func() { + // these are changing changes + processor.CaptureUpsertChange(gcUpdated) + processor.CaptureUpsertChange(gw1Updated) + processor.CaptureUpsertChange(hr1Updated) + + // there are non-changing changes + processor.CaptureUpsertChange(gcUpdated) + processor.CaptureUpsertChange(gw1Updated) + processor.CaptureUpsertChange(hr1Updated) + + changed, _, _ := processor.Process() + Expect(changed).To(BeTrue()) + }) + + It("should report changed after upserting new resources", func() { + // we can't have a second GatewayClass, so we don't add it + processor.CaptureUpsertChange(gw2) + processor.CaptureUpsertChange(hr2) + + changed, _, _ := processor.Process() + Expect(changed).To(BeTrue()) + }) + + It("should report changed after deleting resources followed by upserting same generations of new resources", func() { + // these are changing changes + processor.CaptureDeleteChange(&v1beta1.GatewayClass{}, gcNsName) + processor.CaptureDeleteChange(&v1beta1.Gateway{}, gwNsName) + processor.CaptureDeleteChange(&v1beta1.HTTPRoute{}, hrNsName) + + // these are non-changing changes + processor.CaptureUpsertChange(gw2) + processor.CaptureUpsertChange(hr2) + + changed, _, _ := processor.Process() + Expect(changed).To(BeTrue()) + }) + }) + }) + Describe("Edge cases with panic", func() { var processor state.ChangeProcessor var fakeSecretMemoryMgr *statefakes.FakeSecretDiskMemoryManager