From 199e3cb52c4aa67d3d9a85def281431fa55f1fb2 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Tue, 9 Aug 2022 16:15:29 -0600 Subject: [PATCH 1/4] Process all relevant cluster resources at start Problem: At start, the Gateway generates the first NGINX configuration (and a few subsequent ones) before it processed all relevant cluster resources. Such configuration is incomplete and should not be generated. For clients it means they can see transient 404 errors when NGINX is configured with such configuration. Solution: When the Gateway starts, it will generate the first NGINX configuration based on processing all relevant cluster resources. --- .../events/eventsfakes/fake_cached_reader.go | 270 ++++++++++++++++++ .../fake_first_event_batch_preparer.go | 117 ++++++++ internal/events/first_eventbatch_preparer.go | 105 +++++++ .../events/first_eventbatch_preparer_test.go | 157 ++++++++++ internal/events/handler.go | 1 + internal/events/loop.go | 43 ++- internal/events/loop_test.go | 150 ++++++---- internal/manager/manager.go | 5 +- internal/state/change_processor.go | 5 +- 9 files changed, 786 insertions(+), 67 deletions(-) create mode 100644 internal/events/eventsfakes/fake_cached_reader.go create mode 100644 internal/events/eventsfakes/fake_first_event_batch_preparer.go create mode 100644 internal/events/first_eventbatch_preparer.go create mode 100644 internal/events/first_eventbatch_preparer_test.go diff --git a/internal/events/eventsfakes/fake_cached_reader.go b/internal/events/eventsfakes/fake_cached_reader.go new file mode 100644 index 0000000000..68df58036e --- /dev/null +++ b/internal/events/eventsfakes/fake_cached_reader.go @@ -0,0 +1,270 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package eventsfakes + +import ( + "context" + "sync" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type FakeCachedReader struct { + GetStub func(context.Context, types.NamespacedName, client.Object) error + getMutex sync.RWMutex + getArgsForCall []struct { + arg1 context.Context + arg2 types.NamespacedName + arg3 client.Object + } + getReturns struct { + result1 error + } + getReturnsOnCall map[int]struct { + result1 error + } + ListStub func(context.Context, client.ObjectList, ...client.ListOption) error + listMutex sync.RWMutex + listArgsForCall []struct { + arg1 context.Context + arg2 client.ObjectList + arg3 []client.ListOption + } + listReturns struct { + result1 error + } + listReturnsOnCall map[int]struct { + result1 error + } + WaitForCacheSyncStub func(context.Context) bool + waitForCacheSyncMutex sync.RWMutex + waitForCacheSyncArgsForCall []struct { + arg1 context.Context + } + waitForCacheSyncReturns struct { + result1 bool + } + waitForCacheSyncReturnsOnCall map[int]struct { + result1 bool + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeCachedReader) Get(arg1 context.Context, arg2 types.NamespacedName, arg3 client.Object) error { + fake.getMutex.Lock() + ret, specificReturn := fake.getReturnsOnCall[len(fake.getArgsForCall)] + fake.getArgsForCall = append(fake.getArgsForCall, struct { + arg1 context.Context + arg2 types.NamespacedName + arg3 client.Object + }{arg1, arg2, arg3}) + stub := fake.GetStub + fakeReturns := fake.getReturns + fake.recordInvocation("Get", []interface{}{arg1, arg2, arg3}) + fake.getMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeCachedReader) GetCallCount() int { + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + return len(fake.getArgsForCall) +} + +func (fake *FakeCachedReader) GetCalls(stub func(context.Context, types.NamespacedName, client.Object) error) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = stub +} + +func (fake *FakeCachedReader) GetArgsForCall(i int) (context.Context, types.NamespacedName, client.Object) { + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + argsForCall := fake.getArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeCachedReader) GetReturns(result1 error) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = nil + fake.getReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeCachedReader) GetReturnsOnCall(i int, result1 error) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = nil + if fake.getReturnsOnCall == nil { + fake.getReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.getReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeCachedReader) List(arg1 context.Context, arg2 client.ObjectList, arg3 ...client.ListOption) error { + fake.listMutex.Lock() + ret, specificReturn := fake.listReturnsOnCall[len(fake.listArgsForCall)] + fake.listArgsForCall = append(fake.listArgsForCall, struct { + arg1 context.Context + arg2 client.ObjectList + arg3 []client.ListOption + }{arg1, arg2, arg3}) + stub := fake.ListStub + fakeReturns := fake.listReturns + fake.recordInvocation("List", []interface{}{arg1, arg2, arg3}) + fake.listMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeCachedReader) ListCallCount() int { + fake.listMutex.RLock() + defer fake.listMutex.RUnlock() + return len(fake.listArgsForCall) +} + +func (fake *FakeCachedReader) ListCalls(stub func(context.Context, client.ObjectList, ...client.ListOption) error) { + fake.listMutex.Lock() + defer fake.listMutex.Unlock() + fake.ListStub = stub +} + +func (fake *FakeCachedReader) ListArgsForCall(i int) (context.Context, client.ObjectList, []client.ListOption) { + fake.listMutex.RLock() + defer fake.listMutex.RUnlock() + argsForCall := fake.listArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeCachedReader) ListReturns(result1 error) { + fake.listMutex.Lock() + defer fake.listMutex.Unlock() + fake.ListStub = nil + fake.listReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeCachedReader) ListReturnsOnCall(i int, result1 error) { + fake.listMutex.Lock() + defer fake.listMutex.Unlock() + fake.ListStub = nil + if fake.listReturnsOnCall == nil { + fake.listReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.listReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeCachedReader) WaitForCacheSync(arg1 context.Context) bool { + fake.waitForCacheSyncMutex.Lock() + ret, specificReturn := fake.waitForCacheSyncReturnsOnCall[len(fake.waitForCacheSyncArgsForCall)] + fake.waitForCacheSyncArgsForCall = append(fake.waitForCacheSyncArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.WaitForCacheSyncStub + fakeReturns := fake.waitForCacheSyncReturns + fake.recordInvocation("WaitForCacheSync", []interface{}{arg1}) + fake.waitForCacheSyncMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeCachedReader) WaitForCacheSyncCallCount() int { + fake.waitForCacheSyncMutex.RLock() + defer fake.waitForCacheSyncMutex.RUnlock() + return len(fake.waitForCacheSyncArgsForCall) +} + +func (fake *FakeCachedReader) WaitForCacheSyncCalls(stub func(context.Context) bool) { + fake.waitForCacheSyncMutex.Lock() + defer fake.waitForCacheSyncMutex.Unlock() + fake.WaitForCacheSyncStub = stub +} + +func (fake *FakeCachedReader) WaitForCacheSyncArgsForCall(i int) context.Context { + fake.waitForCacheSyncMutex.RLock() + defer fake.waitForCacheSyncMutex.RUnlock() + argsForCall := fake.waitForCacheSyncArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeCachedReader) WaitForCacheSyncReturns(result1 bool) { + fake.waitForCacheSyncMutex.Lock() + defer fake.waitForCacheSyncMutex.Unlock() + fake.WaitForCacheSyncStub = nil + fake.waitForCacheSyncReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeCachedReader) WaitForCacheSyncReturnsOnCall(i int, result1 bool) { + fake.waitForCacheSyncMutex.Lock() + defer fake.waitForCacheSyncMutex.Unlock() + fake.WaitForCacheSyncStub = nil + if fake.waitForCacheSyncReturnsOnCall == nil { + fake.waitForCacheSyncReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.waitForCacheSyncReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + +func (fake *FakeCachedReader) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + fake.listMutex.RLock() + defer fake.listMutex.RUnlock() + fake.waitForCacheSyncMutex.RLock() + defer fake.waitForCacheSyncMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeCachedReader) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ events.CachedReader = new(FakeCachedReader) diff --git a/internal/events/eventsfakes/fake_first_event_batch_preparer.go b/internal/events/eventsfakes/fake_first_event_batch_preparer.go new file mode 100644 index 0000000000..e35585ddac --- /dev/null +++ b/internal/events/eventsfakes/fake_first_event_batch_preparer.go @@ -0,0 +1,117 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package eventsfakes + +import ( + "context" + "sync" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" +) + +type FakeFirstEventBatchPreparer struct { + PrepareStub func(context.Context) (events.EventBatch, error) + prepareMutex sync.RWMutex + prepareArgsForCall []struct { + arg1 context.Context + } + prepareReturns struct { + result1 events.EventBatch + result2 error + } + prepareReturnsOnCall map[int]struct { + result1 events.EventBatch + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeFirstEventBatchPreparer) Prepare(arg1 context.Context) (events.EventBatch, error) { + fake.prepareMutex.Lock() + ret, specificReturn := fake.prepareReturnsOnCall[len(fake.prepareArgsForCall)] + fake.prepareArgsForCall = append(fake.prepareArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.PrepareStub + fakeReturns := fake.prepareReturns + fake.recordInvocation("Prepare", []interface{}{arg1}) + fake.prepareMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeFirstEventBatchPreparer) PrepareCallCount() int { + fake.prepareMutex.RLock() + defer fake.prepareMutex.RUnlock() + return len(fake.prepareArgsForCall) +} + +func (fake *FakeFirstEventBatchPreparer) PrepareCalls(stub func(context.Context) (events.EventBatch, error)) { + fake.prepareMutex.Lock() + defer fake.prepareMutex.Unlock() + fake.PrepareStub = stub +} + +func (fake *FakeFirstEventBatchPreparer) PrepareArgsForCall(i int) context.Context { + fake.prepareMutex.RLock() + defer fake.prepareMutex.RUnlock() + argsForCall := fake.prepareArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeFirstEventBatchPreparer) PrepareReturns(result1 events.EventBatch, result2 error) { + fake.prepareMutex.Lock() + defer fake.prepareMutex.Unlock() + fake.PrepareStub = nil + fake.prepareReturns = struct { + result1 events.EventBatch + result2 error + }{result1, result2} +} + +func (fake *FakeFirstEventBatchPreparer) PrepareReturnsOnCall(i int, result1 events.EventBatch, result2 error) { + fake.prepareMutex.Lock() + defer fake.prepareMutex.Unlock() + fake.PrepareStub = nil + if fake.prepareReturnsOnCall == nil { + fake.prepareReturnsOnCall = make(map[int]struct { + result1 events.EventBatch + result2 error + }) + } + fake.prepareReturnsOnCall[i] = struct { + result1 events.EventBatch + result2 error + }{result1, result2} +} + +func (fake *FakeFirstEventBatchPreparer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.prepareMutex.RLock() + defer fake.prepareMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeFirstEventBatchPreparer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ events.FirstEventBatchPreparer = new(FakeFirstEventBatchPreparer) diff --git a/internal/events/first_eventbatch_preparer.go b/internal/events/first_eventbatch_preparer.go new file mode 100644 index 0000000000..6e0196ca96 --- /dev/null +++ b/internal/events/first_eventbatch_preparer.go @@ -0,0 +1,105 @@ +package events + +import ( + "context" + "fmt" + + apiv1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/gateway-api/apis/v1beta1" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . FirstEventBatchPreparer + +// FirstEventBatchPreparer prepares the first batch of events to be processed by the EventHandler. +// The first batch includes the UpsertEvents for all relevant resources in the cluster. +type FirstEventBatchPreparer interface { + // Prepare prepares the first event batch. + Prepare(ctx context.Context) (EventBatch, error) +} + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . CachedReader + +// CachedReader allows getting and listing resources from a cache. +// This interface is introduced for testing to mock a subset of methods from +// sigs.k8s.io/controller-runtime/pkg/cache.Cache. +type CachedReader interface { + WaitForCacheSync(ctx context.Context) bool + Get(ctx context.Context, key client.ObjectKey, obj client.Object) error + List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error +} + +// FirstEventBatchPreparerImpl is an implementation of FirstEventBatchPreparer. +type FirstEventBatchPreparerImpl struct { + reader CachedReader + gcName string +} + +// NewFirstEventBatchPreparerImpl creates a new FirstEventBatchPreparerImpl. +func NewFirstEventBatchPreparerImpl(reader CachedReader, gcName string) *FirstEventBatchPreparerImpl { + return &FirstEventBatchPreparerImpl{ + reader: reader, + gcName: gcName, + } +} + +func (p *FirstEventBatchPreparerImpl) Prepare(ctx context.Context) (EventBatch, error) { + synced := p.reader.WaitForCacheSync(ctx) + if !synced { + return nil, fmt.Errorf("cache is not synced") + } + + var gc v1beta1.GatewayClass + gcExist := true + err := p.reader.Get(ctx, types.NamespacedName{Name: p.gcName}, &gc) + if err != nil { + if apierrors.IsNotFound(err) { + gcExist = false + } else { + return nil, err + } + } + + var svcList apiv1.ServiceList + var secretList apiv1.SecretList + var gwList v1beta1.GatewayList + var hrList v1beta1.HTTPRouteList + + objLists := []client.ObjectList{&svcList, &secretList, &gwList, &hrList} + for _, list := range objLists { + err := p.reader.List(ctx, list) + if err != nil { + return nil, err + } + } + + gcCount := 0 + if gcExist { + gcCount = 1 + } + + batch := make([]interface{}, 0, gcCount+len(svcList.Items)+len(secretList.Items)+len(gwList.Items)+len(hrList.Items)) + + // Note: the order of the events doesn't matter. + + if gcExist { + batch = append(batch, &UpsertEvent{Resource: &gc}) + } + + for i := range svcList.Items { + batch = append(batch, &UpsertEvent{Resource: &svcList.Items[i]}) + } + for i := range secretList.Items { + batch = append(batch, &UpsertEvent{Resource: &secretList.Items[i]}) + } + for i := range gwList.Items { + batch = append(batch, &UpsertEvent{Resource: &gwList.Items[i]}) + } + for i := range hrList.Items { + batch = append(batch, &UpsertEvent{Resource: &hrList.Items[i]}) + } + + return batch, nil +} diff --git a/internal/events/first_eventbatch_preparer_test.go b/internal/events/first_eventbatch_preparer_test.go new file mode 100644 index 0000000000..eed125ed5a --- /dev/null +++ b/internal/events/first_eventbatch_preparer_test.go @@ -0,0 +1,157 @@ +package events_test + +import ( + "context" + "errors" + "fmt" + "reflect" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + apiv1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events/eventsfakes" +) + +var _ = Describe("FirstEventBatchPreparer", func() { + var ( + fakeReader *eventsfakes.FakeCachedReader + preparer *events.FirstEventBatchPreparerImpl + ) + + const gcName = "my-class" + + BeforeEach(func() { + fakeReader = &eventsfakes.FakeCachedReader{} + preparer = events.NewFirstEventBatchPreparerImpl(fakeReader, gcName) + }) + + Describe("Normal cases", func() { + BeforeEach(func() { + fakeReader.WaitForCacheSyncReturns(true) + }) + + AfterEach(func() { + Expect(fakeReader.GetCallCount()).Should(Equal(1)) + Expect(fakeReader.ListCallCount()).Should(Equal(4)) + }) + + It("should prepare zero events when resources don't exist", func() { + fakeReader.GetCalls(func(ctx context.Context, name types.NamespacedName, object client.Object) error { + Expect(name).Should(Equal(types.NamespacedName{Name: gcName})) + Expect(object).Should(BeAssignableToTypeOf(&v1beta1.GatewayClass{})) + + return apierrors.NewNotFound(schema.GroupResource{}, "test") + }) + fakeReader.ListReturns(nil) + + batch, err := preparer.Prepare(context.Background()) + + Expect(batch).Should(BeEmpty()) + Expect(err).Should(BeNil()) + }) + + It("should prepare one event for each resource type", func() { + const resourceName = "resource" + + gatewayClass := v1beta1.GatewayClass{ObjectMeta: metav1.ObjectMeta{Name: gcName}} + + fakeReader.GetCalls(func(ctx context.Context, name types.NamespacedName, object client.Object) error { + Expect(name).Should(Equal(types.NamespacedName{Name: gcName})) + Expect(object).Should(BeAssignableToTypeOf(&v1beta1.GatewayClass{})) + + reflect.Indirect(reflect.ValueOf(object)).Set(reflect.Indirect(reflect.ValueOf(&gatewayClass))) + return nil + }) + + service := apiv1.Service{ObjectMeta: metav1.ObjectMeta{Name: resourceName}} + secret := apiv1.Secret{ObjectMeta: metav1.ObjectMeta{Name: resourceName}} + gateway := v1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: resourceName}} + httpRoute := v1beta1.HTTPRoute{ObjectMeta: metav1.ObjectMeta{Name: resourceName}} + + fakeReader.ListCalls(func(ctx context.Context, list client.ObjectList, option ...client.ListOption) error { + Expect(option).To(BeEmpty()) + + switch typedList := list.(type) { + case *apiv1.ServiceList: + typedList.Items = append(typedList.Items, service) + case *apiv1.SecretList: + typedList.Items = append(typedList.Items, secret) + case *v1beta1.GatewayList: + typedList.Items = append(typedList.Items, gateway) + case *v1beta1.HTTPRouteList: + typedList.Items = append(typedList.Items, httpRoute) + default: + Fail(fmt.Sprintf("unknown type: %T", typedList)) + } + + return nil + }) + + expectedBatch := events.EventBatch{ + &events.UpsertEvent{Resource: &gatewayClass}, + &events.UpsertEvent{Resource: &service}, + &events.UpsertEvent{Resource: &secret}, + &events.UpsertEvent{Resource: &gateway}, + &events.UpsertEvent{Resource: &httpRoute}, + } + + batch, err := preparer.Prepare(context.Background()) + + Expect(batch).Should(Equal(expectedBatch)) + Expect(err).Should(BeNil()) + }) + }) + + Describe("Edge cases", func() { + It("should fail when cache is not synced", func() { + fakeReader.WaitForCacheSyncReturns(false) + + batch, err := preparer.Prepare(context.Background()) + Expect(batch).To(BeNil()) + Expect(err).To(MatchError("cache is not synced")) + }) + + DescribeTable("CachedReader returns errors", + func(obj client.Object) { + fakeReader.WaitForCacheSyncReturns(true) + + readerError := errors.New("test") + + fakeReader.GetReturns(nil) + fakeReader.ListReturns(nil) + + switch obj.(type) { + case *v1beta1.GatewayClass: + fakeReader.GetReturns(readerError) + case *apiv1.Service: + fakeReader.ListReturnsOnCall(0, readerError) + case *apiv1.Secret: + fakeReader.ListReturnsOnCall(1, readerError) + case *v1beta1.Gateway: + fakeReader.ListReturnsOnCall(2, readerError) + case *v1beta1.HTTPRoute: + fakeReader.ListReturnsOnCall(3, readerError) + default: + Fail(fmt.Sprintf("Unknown type: %T", obj)) + } + + batch, err := preparer.Prepare(context.Background()) + Expect(batch).To(BeNil()) + Expect(err).To(MatchError(readerError)) + }, + Entry("Service", &apiv1.Service{}), + Entry("Secret", &apiv1.Secret{}), + Entry("GatewayClass", &v1beta1.GatewayClass{}), + Entry("Gateway", &v1beta1.Gateway{}), + Entry("HTTPRoute", &v1beta1.HTTPRoute{}), + ) + }) +}) diff --git a/internal/events/handler.go b/internal/events/handler.go index a3a40499a1..8abc617104 100644 --- a/internal/events/handler.go +++ b/internal/events/handler.go @@ -20,6 +20,7 @@ import ( // EventHandler handle events. type EventHandler interface { // HandleEventBatch handles a batch of events. + // EventBatch can include duplicated events. HandleEventBatch(ctx context.Context, batch EventBatch) } diff --git a/internal/events/loop.go b/internal/events/loop.go index 5c43d1d6cf..0215cea61a 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -25,24 +25,27 @@ type EventLoop struct { eventCh <-chan interface{} logger logr.Logger handler EventHandler + + preparer FirstEventBatchPreparer } // NewEventLoop creates a new EventLoop. -func NewEventLoop(eventCh <-chan interface{}, logger logr.Logger, handler EventHandler) *EventLoop { +func NewEventLoop( + eventCh <-chan interface{}, + logger logr.Logger, + handler EventHandler, + preparer FirstEventBatchPreparer, +) *EventLoop { return &EventLoop{ - eventCh: eventCh, - logger: logger, - handler: handler, + eventCh: eventCh, + logger: logger, + handler: handler, + preparer: preparer, } } // Start starts the EventLoop. // This method will block until the EventLoop stops, which will happen after the ctx is closed. -// -// FIXME(pleshakov). Ensure that when the Gateway starts, the first time it generates configuration for NGINX, -// it has a complete view of the cluster resources. For example, when the Gateway processes a Gateway resource -// with a listener with TLS termination enabled (the listener references a TLS Secret), the Gateway knows about the secret. -// This way the Gateway will not produce any incomplete transient configuration at the start. func (el *EventLoop) Start(ctx context.Context) error { // The current batch. var batch EventBatch @@ -61,6 +64,28 @@ func (el *EventLoop) Start(ctx context.Context) error { handlingDone <- struct{}{} } + // Prepare the fist event batch, which includes the UpsertEvents for all relevant cluster resources. + // This is necessary so that the first time the EventHandler generates NGINX configuration, it derives it from + // a complete view of the cluster. Otherwise, the handler would generate incomplete configuration, which can lead + // to clients seeing transient 404 errors from NGINX and incorrect statuses of the resources updated by the Gateway. + // + // Note: + // After the handler goroutine handles the first batch, the loop will start receiving events from + // the controllers, which at the beginning will be UpsertEvents with the relevant cluster resources - i.e. they + // will be duplicates of the events in the first batch. This is OK, because it is expected that the EventHandler will + // not trigger any reconfiguration after receiving an upsert for an existing resource with the same Generation. + + var err error + batch, err = el.preparer.Prepare(ctx) + if err != nil { + return fmt.Errorf("failed to prepare the first batch: %w", err) + } + + // Handle the first batch + go handle(ctx, batch) + batch = make([]interface{}, 0) + handling = true + // Note: at any point of time, no more than one batch is currently being handled. // The event loop diff --git a/internal/events/loop_test.go b/internal/events/loop_test.go index 0135ab93e9..b6ae87fb9c 100644 --- a/internal/events/loop_test.go +++ b/internal/events/loop_test.go @@ -2,6 +2,7 @@ package events_test import ( "context" + "errors" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -13,89 +14,130 @@ import ( var _ = Describe("EventLoop", func() { var ( - fakeHandler *eventsfakes.FakeEventHandler - eventCh chan interface{} - eventLoop *events.EventLoop - cancel context.CancelFunc - errorCh chan error + fakeHandler *eventsfakes.FakeEventHandler + eventCh chan interface{} + fakePreparer *eventsfakes.FakeFirstEventBatchPreparer + eventLoop *events.EventLoop + ctx context.Context + cancel context.CancelFunc + errorCh chan error ) BeforeEach(func() { fakeHandler = &eventsfakes.FakeEventHandler{} eventCh = make(chan interface{}) + fakePreparer = &eventsfakes.FakeFirstEventBatchPreparer{} - eventLoop = events.NewEventLoop(eventCh, zap.New(), fakeHandler) + eventLoop = events.NewEventLoop(eventCh, zap.New(), fakeHandler, fakePreparer) - var ctx context.Context ctx, cancel = context.WithCancel(context.Background()) errorCh = make(chan error) - - go func() { - errorCh <- eventLoop.Start(ctx) - }() }) - AfterEach(func() { - cancel() + Describe("Normal processing", func() { + BeforeEach(func() { + batch := events.EventBatch{ + "event0", + } + fakePreparer.PrepareReturns(batch, nil) - var err error - Eventually(errorCh).Should(Receive(&err)) - Expect(err).To(BeNil()) - }) + go func() { + errorCh <- eventLoop.Start(ctx) + }() - It("should process a single event", func() { - e := "event" + // Ensure the first batch is handled + Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1)) + _, batch = fakeHandler.HandleEventBatchArgsForCall(0) - eventCh <- e + var expectedBatch events.EventBatch = []interface{}{"event0"} + Expect(batch).Should(Equal(expectedBatch)) + }) - Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1)) - _, batch := fakeHandler.HandleEventBatchArgsForCall(0) + AfterEach(func() { + cancel() - var expectedBatch events.EventBatch = []interface{}{e} - Expect(batch).Should(Equal(expectedBatch)) - }) + var err error + Eventually(errorCh).Should(Receive(&err)) + Expect(err).To(BeNil()) + }) - It("should batch multiple events", func() { - firstHandleEventBatchCallInProgress := make(chan struct{}) - sentSecondAndThirdEvents := make(chan struct{}) + // Because BeforeEach() creates the first batch and waits for it to be handled, in the tests below + // HandleEventBatchCallCount() is already 1. - // The func below will pause the handler goroutine while it is processing the batch with e1 until - // sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime. - fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) { - close(firstHandleEventBatchCallInProgress) - <-sentSecondAndThirdEvents + It("should process a single event", func() { + e := "event" + + eventCh <- e + + Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2)) + _, batch := fakeHandler.HandleEventBatchArgsForCall(1) + + var expectedBatch events.EventBatch = []interface{}{e} + Expect(batch).Should(Equal(expectedBatch)) }) - e1 := "event1" - e2 := "event2" - e3 := "event3" + It("should batch multiple events", func() { + firstHandleEventBatchCallInProgress := make(chan struct{}) + sentSecondAndThirdEvents := make(chan struct{}) + + // The func below will pause the handler goroutine while it is processing the batch with e1 until + // sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime. + fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) { + close(firstHandleEventBatchCallInProgress) + <-sentSecondAndThirdEvents + }) + + e1 := "event1" + e2 := "event2" + e3 := "event3" - eventCh <- e1 + eventCh <- e1 - // Making sure the handler goroutine started handling the batch with e1. - <-firstHandleEventBatchCallInProgress + // Making sure the handler goroutine started handling the batch with e1. + <-firstHandleEventBatchCallInProgress - eventCh <- e2 - eventCh <- e3 - // The event loop will add the e2 and e3 event to current batch before starting another handler goroutine. + eventCh <- e2 + eventCh <- e3 + // The event loop will add the e2 and e3 event to current batch before starting another handler goroutine. - fakeHandler.HandleEventBatchCalls(nil) + fakeHandler.HandleEventBatchCalls(nil) - // Unpause the handler goroutine so that it can handle the current batch. - close(sentSecondAndThirdEvents) + // Unpause the handler goroutine so that it can handle the current batch. + close(sentSecondAndThirdEvents) - Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2)) - _, batch := fakeHandler.HandleEventBatchArgsForCall(0) + Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(3)) + _, batch := fakeHandler.HandleEventBatchArgsForCall(1) - var expectedBatch events.EventBatch = []interface{}{e1} + var expectedBatch events.EventBatch = []interface{}{e1} - // the first HandleEventBatch() call must have handled a batch with e1 - Expect(batch).Should(Equal(expectedBatch)) + // the first HandleEventBatch() call must have handled a batch with e1 + Expect(batch).Should(Equal(expectedBatch)) - _, batch = fakeHandler.HandleEventBatchArgsForCall(1) + _, batch = fakeHandler.HandleEventBatchArgsForCall(2) - expectedBatch = []interface{}{e2, e3} - // the second HandleEventBatch() call must have handled a batch with e2 and e3 - Expect(batch).Should(Equal(expectedBatch)) + expectedBatch = []interface{}{e2, e3} + // the second HandleEventBatch() call must have handled a batch with e2 and e3 + Expect(batch).Should(Equal(expectedBatch)) + }) + }) + + Describe("Edge cases", func() { + It("should return error when preparer returns error without blocking", func() { + preparerError := errors.New("test") + fakePreparer.PrepareReturns(events.EventBatch{}, preparerError) + + err := eventLoop.Start(ctx) + + Expect(err).Should(MatchError(preparerError)) + }) + + It("should return nil when started with canceled context without blocking", func() { + fakePreparer.PrepareReturns(events.EventBatch{}, nil) + + cancel() + err := eventLoop.Start(ctx) + + Expect(err).Should(BeNil()) + }) }) }) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 8c24c8beae..e8ba48dbc4 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -115,10 +115,13 @@ func Start(cfg config.Config) error { StatusUpdater: statusUpdater, }) + firstBatchPreparer := events.NewFirstEventBatchPreparerImpl(mgr.GetCache(), cfg.GatewayClassName) + eventLoop := events.NewEventLoop( eventCh, cfg.Logger.WithName("eventLoop"), - eventHandler) + eventHandler, + firstBatchPreparer) err = mgr.Add(eventLoop) if err != nil { diff --git a/internal/state/change_processor.go b/internal/state/change_processor.go index 18040b9c7e..3b83b44956 100644 --- a/internal/state/change_processor.go +++ b/internal/state/change_processor.go @@ -12,7 +12,7 @@ import ( //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ChangeProcessor // ChangeProcessor processes the changes to resources producing the internal representation of the Gateway configuration. -// ChangeProcessor only supports one Gateway resource. +// ChangeProcessor only supports one GatewayClass resource. type ChangeProcessor interface { // CaptureUpsertChange captures an upsert change to a resource. // It panics if the resource is of unsupported type or if the passed Gateway is different from the one this ChangeProcessor @@ -31,8 +31,6 @@ type ChangeProcessor interface { // ChangeProcessorConfig holds configuration parameters for ChangeProcessorImpl. type ChangeProcessorConfig struct { - // GatewayNsName is the namespaced name of the Gateway resource. - GatewayNsName types.NamespacedName // GatewayCtlrName is the name of the Gateway controller. GatewayCtlrName string // GatewayClassName is the name of the GatewayClass resource. @@ -41,6 +39,7 @@ type ChangeProcessorConfig struct { SecretMemoryManager SecretDiskMemoryManager } +// ChangeProcessorImpl is an implementation of ChangeProcessor. type ChangeProcessorImpl struct { store *store // storeChanged tells if the store is changed. From 83ff4be36c25cf5d710a22a06d197a4674972762 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Tue, 16 Aug 2022 14:59:12 -0600 Subject: [PATCH 2/4] Introduce a function --- internal/events/loop.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/internal/events/loop.go b/internal/events/loop.go index 0215cea61a..23df1b866b 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -54,14 +54,20 @@ func (el *EventLoop) Start(ctx context.Context) error { // handlingDone is used to signal the completion of handling a batch. handlingDone := make(chan struct{}) - handle := func(ctx context.Context, batch EventBatch) { - el.logger.Info("Handling events from the batch", "total", len(batch)) + handleAndResetBatch := func() { + go func(batch EventBatch) { + el.logger.Info("Handling events from the batch", "total", len(batch)) - el.handler.HandleEventBatch(ctx, batch) + el.handler.HandleEventBatch(ctx, batch) - el.logger.Info("Finished handling the batch") + el.logger.Info("Finished handling the batch") + handlingDone <- struct{}{} + }(batch) - handlingDone <- struct{}{} + // FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations. + // Use a double-buffer approach - create two buffers and exchange them between the producer and consumer + // routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity. + batch = make([]interface{}, 0) } // Prepare the fist event batch, which includes the UpsertEvents for all relevant cluster resources. @@ -82,8 +88,7 @@ func (el *EventLoop) Start(ctx context.Context) error { } // Handle the first batch - go handle(ctx, batch) - batch = make([]interface{}, 0) + handleAndResetBatch() handling = true // Note: at any point of time, no more than one batch is currently being handled. @@ -110,11 +115,7 @@ func (el *EventLoop) Start(ctx context.Context) error { // Handle the current batch if no batch is being handled. if !handling { - go handle(ctx, batch) - // FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations. - // Use a double-buffer approach - create two buffers and exchange them between the producer and consumer - // routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity. - batch = make([]interface{}, 0) + handleAndResetBatch() handling = true } case <-handlingDone: @@ -122,8 +123,7 @@ func (el *EventLoop) Start(ctx context.Context) error { // Handle the current batch if it has at least one event. if len(batch) > 0 { - go handle(ctx, batch) - batch = make([]interface{}, 0) + handleAndResetBatch() handling = true } } From fe8baab438bde193e83f35b40b3294c2f1bc389a Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Wed, 17 Aug 2022 11:04:23 -0600 Subject: [PATCH 3/4] Remove unnecessary WaitForCacheSyncReturns --- .../{fake_cached_reader.go => fake_reader.go} | 106 +++--------------- internal/events/first_eventbatch_preparer.go | 24 ++-- .../events/first_eventbatch_preparer_test.go | 18 +-- 3 files changed, 26 insertions(+), 122 deletions(-) rename internal/events/eventsfakes/{fake_cached_reader.go => fake_reader.go} (53%) diff --git a/internal/events/eventsfakes/fake_cached_reader.go b/internal/events/eventsfakes/fake_reader.go similarity index 53% rename from internal/events/eventsfakes/fake_cached_reader.go rename to internal/events/eventsfakes/fake_reader.go index 68df58036e..6341c894fd 100644 --- a/internal/events/eventsfakes/fake_cached_reader.go +++ b/internal/events/eventsfakes/fake_reader.go @@ -10,7 +10,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -type FakeCachedReader struct { +type FakeReader struct { GetStub func(context.Context, types.NamespacedName, client.Object) error getMutex sync.RWMutex getArgsForCall []struct { @@ -37,22 +37,11 @@ type FakeCachedReader struct { listReturnsOnCall map[int]struct { result1 error } - WaitForCacheSyncStub func(context.Context) bool - waitForCacheSyncMutex sync.RWMutex - waitForCacheSyncArgsForCall []struct { - arg1 context.Context - } - waitForCacheSyncReturns struct { - result1 bool - } - waitForCacheSyncReturnsOnCall map[int]struct { - result1 bool - } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *FakeCachedReader) Get(arg1 context.Context, arg2 types.NamespacedName, arg3 client.Object) error { +func (fake *FakeReader) Get(arg1 context.Context, arg2 types.NamespacedName, arg3 client.Object) error { fake.getMutex.Lock() ret, specificReturn := fake.getReturnsOnCall[len(fake.getArgsForCall)] fake.getArgsForCall = append(fake.getArgsForCall, struct { @@ -73,26 +62,26 @@ func (fake *FakeCachedReader) Get(arg1 context.Context, arg2 types.NamespacedNam return fakeReturns.result1 } -func (fake *FakeCachedReader) GetCallCount() int { +func (fake *FakeReader) GetCallCount() int { fake.getMutex.RLock() defer fake.getMutex.RUnlock() return len(fake.getArgsForCall) } -func (fake *FakeCachedReader) GetCalls(stub func(context.Context, types.NamespacedName, client.Object) error) { +func (fake *FakeReader) GetCalls(stub func(context.Context, types.NamespacedName, client.Object) error) { fake.getMutex.Lock() defer fake.getMutex.Unlock() fake.GetStub = stub } -func (fake *FakeCachedReader) GetArgsForCall(i int) (context.Context, types.NamespacedName, client.Object) { +func (fake *FakeReader) GetArgsForCall(i int) (context.Context, types.NamespacedName, client.Object) { fake.getMutex.RLock() defer fake.getMutex.RUnlock() argsForCall := fake.getArgsForCall[i] return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeCachedReader) GetReturns(result1 error) { +func (fake *FakeReader) GetReturns(result1 error) { fake.getMutex.Lock() defer fake.getMutex.Unlock() fake.GetStub = nil @@ -101,7 +90,7 @@ func (fake *FakeCachedReader) GetReturns(result1 error) { }{result1} } -func (fake *FakeCachedReader) GetReturnsOnCall(i int, result1 error) { +func (fake *FakeReader) GetReturnsOnCall(i int, result1 error) { fake.getMutex.Lock() defer fake.getMutex.Unlock() fake.GetStub = nil @@ -115,7 +104,7 @@ func (fake *FakeCachedReader) GetReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeCachedReader) List(arg1 context.Context, arg2 client.ObjectList, arg3 ...client.ListOption) error { +func (fake *FakeReader) List(arg1 context.Context, arg2 client.ObjectList, arg3 ...client.ListOption) error { fake.listMutex.Lock() ret, specificReturn := fake.listReturnsOnCall[len(fake.listArgsForCall)] fake.listArgsForCall = append(fake.listArgsForCall, struct { @@ -136,26 +125,26 @@ func (fake *FakeCachedReader) List(arg1 context.Context, arg2 client.ObjectList, return fakeReturns.result1 } -func (fake *FakeCachedReader) ListCallCount() int { +func (fake *FakeReader) ListCallCount() int { fake.listMutex.RLock() defer fake.listMutex.RUnlock() return len(fake.listArgsForCall) } -func (fake *FakeCachedReader) ListCalls(stub func(context.Context, client.ObjectList, ...client.ListOption) error) { +func (fake *FakeReader) ListCalls(stub func(context.Context, client.ObjectList, ...client.ListOption) error) { fake.listMutex.Lock() defer fake.listMutex.Unlock() fake.ListStub = stub } -func (fake *FakeCachedReader) ListArgsForCall(i int) (context.Context, client.ObjectList, []client.ListOption) { +func (fake *FakeReader) ListArgsForCall(i int) (context.Context, client.ObjectList, []client.ListOption) { fake.listMutex.RLock() defer fake.listMutex.RUnlock() argsForCall := fake.listArgsForCall[i] return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeCachedReader) ListReturns(result1 error) { +func (fake *FakeReader) ListReturns(result1 error) { fake.listMutex.Lock() defer fake.listMutex.Unlock() fake.ListStub = nil @@ -164,7 +153,7 @@ func (fake *FakeCachedReader) ListReturns(result1 error) { }{result1} } -func (fake *FakeCachedReader) ListReturnsOnCall(i int, result1 error) { +func (fake *FakeReader) ListReturnsOnCall(i int, result1 error) { fake.listMutex.Lock() defer fake.listMutex.Unlock() fake.ListStub = nil @@ -178,76 +167,13 @@ func (fake *FakeCachedReader) ListReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeCachedReader) WaitForCacheSync(arg1 context.Context) bool { - fake.waitForCacheSyncMutex.Lock() - ret, specificReturn := fake.waitForCacheSyncReturnsOnCall[len(fake.waitForCacheSyncArgsForCall)] - fake.waitForCacheSyncArgsForCall = append(fake.waitForCacheSyncArgsForCall, struct { - arg1 context.Context - }{arg1}) - stub := fake.WaitForCacheSyncStub - fakeReturns := fake.waitForCacheSyncReturns - fake.recordInvocation("WaitForCacheSync", []interface{}{arg1}) - fake.waitForCacheSyncMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeCachedReader) WaitForCacheSyncCallCount() int { - fake.waitForCacheSyncMutex.RLock() - defer fake.waitForCacheSyncMutex.RUnlock() - return len(fake.waitForCacheSyncArgsForCall) -} - -func (fake *FakeCachedReader) WaitForCacheSyncCalls(stub func(context.Context) bool) { - fake.waitForCacheSyncMutex.Lock() - defer fake.waitForCacheSyncMutex.Unlock() - fake.WaitForCacheSyncStub = stub -} - -func (fake *FakeCachedReader) WaitForCacheSyncArgsForCall(i int) context.Context { - fake.waitForCacheSyncMutex.RLock() - defer fake.waitForCacheSyncMutex.RUnlock() - argsForCall := fake.waitForCacheSyncArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeCachedReader) WaitForCacheSyncReturns(result1 bool) { - fake.waitForCacheSyncMutex.Lock() - defer fake.waitForCacheSyncMutex.Unlock() - fake.WaitForCacheSyncStub = nil - fake.waitForCacheSyncReturns = struct { - result1 bool - }{result1} -} - -func (fake *FakeCachedReader) WaitForCacheSyncReturnsOnCall(i int, result1 bool) { - fake.waitForCacheSyncMutex.Lock() - defer fake.waitForCacheSyncMutex.Unlock() - fake.WaitForCacheSyncStub = nil - if fake.waitForCacheSyncReturnsOnCall == nil { - fake.waitForCacheSyncReturnsOnCall = make(map[int]struct { - result1 bool - }) - } - fake.waitForCacheSyncReturnsOnCall[i] = struct { - result1 bool - }{result1} -} - -func (fake *FakeCachedReader) Invocations() map[string][][]interface{} { +func (fake *FakeReader) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.getMutex.RLock() defer fake.getMutex.RUnlock() fake.listMutex.RLock() defer fake.listMutex.RUnlock() - fake.waitForCacheSyncMutex.RLock() - defer fake.waitForCacheSyncMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value @@ -255,7 +181,7 @@ func (fake *FakeCachedReader) Invocations() map[string][][]interface{} { return copiedInvocations } -func (fake *FakeCachedReader) recordInvocation(key string, args []interface{}) { +func (fake *FakeReader) recordInvocation(key string, args []interface{}) { fake.invocationsMutex.Lock() defer fake.invocationsMutex.Unlock() if fake.invocations == nil { @@ -267,4 +193,4 @@ func (fake *FakeCachedReader) recordInvocation(key string, args []interface{}) { fake.invocations[key] = append(fake.invocations[key], args) } -var _ events.CachedReader = new(FakeCachedReader) +var _ events.Reader = new(FakeReader) diff --git a/internal/events/first_eventbatch_preparer.go b/internal/events/first_eventbatch_preparer.go index 6e0196ca96..e011f8030c 100644 --- a/internal/events/first_eventbatch_preparer.go +++ b/internal/events/first_eventbatch_preparer.go @@ -2,7 +2,6 @@ package events import ( "context" - "fmt" apiv1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -20,25 +19,23 @@ type FirstEventBatchPreparer interface { Prepare(ctx context.Context) (EventBatch, error) } -//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . CachedReader +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Reader -// CachedReader allows getting and listing resources from a cache. -// This interface is introduced for testing to mock a subset of methods from -// sigs.k8s.io/controller-runtime/pkg/cache.Cache. -type CachedReader interface { - WaitForCacheSync(ctx context.Context) bool - Get(ctx context.Context, key client.ObjectKey, obj client.Object) error - List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error +// Reader allows getting and listing resources from a cache. +// This interface is introduced for testing to mock the methods from +// sigs.k8s.io/controller-runtime/pkg/client.Reader. +type Reader interface { + client.Reader } // FirstEventBatchPreparerImpl is an implementation of FirstEventBatchPreparer. type FirstEventBatchPreparerImpl struct { - reader CachedReader + reader Reader gcName string } // NewFirstEventBatchPreparerImpl creates a new FirstEventBatchPreparerImpl. -func NewFirstEventBatchPreparerImpl(reader CachedReader, gcName string) *FirstEventBatchPreparerImpl { +func NewFirstEventBatchPreparerImpl(reader Reader, gcName string) *FirstEventBatchPreparerImpl { return &FirstEventBatchPreparerImpl{ reader: reader, gcName: gcName, @@ -46,11 +43,6 @@ func NewFirstEventBatchPreparerImpl(reader CachedReader, gcName string) *FirstEv } func (p *FirstEventBatchPreparerImpl) Prepare(ctx context.Context) (EventBatch, error) { - synced := p.reader.WaitForCacheSync(ctx) - if !synced { - return nil, fmt.Errorf("cache is not synced") - } - var gc v1beta1.GatewayClass gcExist := true err := p.reader.Get(ctx, types.NamespacedName{Name: p.gcName}, &gc) diff --git a/internal/events/first_eventbatch_preparer_test.go b/internal/events/first_eventbatch_preparer_test.go index eed125ed5a..02e1581ca2 100644 --- a/internal/events/first_eventbatch_preparer_test.go +++ b/internal/events/first_eventbatch_preparer_test.go @@ -22,22 +22,18 @@ import ( var _ = Describe("FirstEventBatchPreparer", func() { var ( - fakeReader *eventsfakes.FakeCachedReader + fakeReader *eventsfakes.FakeReader preparer *events.FirstEventBatchPreparerImpl ) const gcName = "my-class" BeforeEach(func() { - fakeReader = &eventsfakes.FakeCachedReader{} + fakeReader = &eventsfakes.FakeReader{} preparer = events.NewFirstEventBatchPreparerImpl(fakeReader, gcName) }) Describe("Normal cases", func() { - BeforeEach(func() { - fakeReader.WaitForCacheSyncReturns(true) - }) - AfterEach(func() { Expect(fakeReader.GetCallCount()).Should(Equal(1)) Expect(fakeReader.ListCallCount()).Should(Equal(4)) @@ -111,18 +107,8 @@ var _ = Describe("FirstEventBatchPreparer", func() { }) Describe("Edge cases", func() { - It("should fail when cache is not synced", func() { - fakeReader.WaitForCacheSyncReturns(false) - - batch, err := preparer.Prepare(context.Background()) - Expect(batch).To(BeNil()) - Expect(err).To(MatchError("cache is not synced")) - }) - DescribeTable("CachedReader returns errors", func(obj client.Object) { - fakeReader.WaitForCacheSyncReturns(true) - readerError := errors.New("test") fakeReader.GetReturns(nil) From 82414cbf274585caae66223bf33409f35df6e4d3 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Wed, 17 Aug 2022 12:59:44 -0600 Subject: [PATCH 4/4] Improve FirstEventBatchPrepareImpl --- internal/events/first_eventbatch_preparer.go | 100 ++++++++++-------- .../events/first_eventbatch_preparer_test.go | 85 ++++++++++----- internal/manager/manager.go | 15 ++- 3 files changed, 127 insertions(+), 73 deletions(-) diff --git a/internal/events/first_eventbatch_preparer.go b/internal/events/first_eventbatch_preparer.go index e011f8030c..e91eeed5db 100644 --- a/internal/events/first_eventbatch_preparer.go +++ b/internal/events/first_eventbatch_preparer.go @@ -2,12 +2,13 @@ package events import ( "context" + "fmt" - apiv1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/gateway-api/apis/v1beta1" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . FirstEventBatchPreparer @@ -28,69 +29,82 @@ type Reader interface { client.Reader } +// EachListItemFunc lists each item of a client.ObjectList. +// It is from k8s.io/apimachinery/pkg/api/meta. +type EachListItemFunc func(obj runtime.Object, fn func(runtime.Object) error) error + // FirstEventBatchPreparerImpl is an implementation of FirstEventBatchPreparer. type FirstEventBatchPreparerImpl struct { - reader Reader - gcName string + reader Reader + objects []client.Object + objectLists []client.ObjectList + eachListItem EachListItemFunc } // NewFirstEventBatchPreparerImpl creates a new FirstEventBatchPreparerImpl. -func NewFirstEventBatchPreparerImpl(reader Reader, gcName string) *FirstEventBatchPreparerImpl { +// objects and objectList specify which resources will be included in the first batch. +// For each object from objects, FirstEventBatchPreparerImpl will get the corresponding resource from the reader. +// The object must specify its namespace (if any) and name. +// For each list from objectLists, FirstEventBatchPreparerImpl will list the resources of the corresponding type from +// the reader. +func NewFirstEventBatchPreparerImpl(reader Reader, objects []client.Object, objectLists []client.ObjectList) *FirstEventBatchPreparerImpl { return &FirstEventBatchPreparerImpl{ - reader: reader, - gcName: gcName, + reader: reader, + objects: objects, + objectLists: objectLists, + eachListItem: meta.EachListItem, } } -func (p *FirstEventBatchPreparerImpl) Prepare(ctx context.Context) (EventBatch, error) { - var gc v1beta1.GatewayClass - gcExist := true - err := p.reader.Get(ctx, types.NamespacedName{Name: p.gcName}, &gc) - if err != nil { - if apierrors.IsNotFound(err) { - gcExist = false - } else { - return nil, err - } - } +// SetEachListItem sets the EachListItemFunc function. +// Used for unit testing. +func (p *FirstEventBatchPreparerImpl) SetEachListItem(eachListItem EachListItemFunc) { + p.eachListItem = eachListItem +} - var svcList apiv1.ServiceList - var secretList apiv1.SecretList - var gwList v1beta1.GatewayList - var hrList v1beta1.HTTPRouteList +func (p *FirstEventBatchPreparerImpl) Prepare(ctx context.Context) (EventBatch, error) { + total := 0 - objLists := []client.ObjectList{&svcList, &secretList, &gwList, &hrList} - for _, list := range objLists { + for _, list := range p.objectLists { err := p.reader.List(ctx, list) if err != nil { return nil, err } - } - gcCount := 0 - if gcExist { - gcCount = 1 + total += meta.LenList(list) } - batch := make([]interface{}, 0, gcCount+len(svcList.Items)+len(secretList.Items)+len(gwList.Items)+len(hrList.Items)) + // If some of p.objects don't exist, they will not be added to the batch. In that case, the capacity will be greater + // than the length, but it is OK, because len(p.objects) is small. + batch := make([]interface{}, 0, total+len(p.objects)) - // Note: the order of the events doesn't matter. + for _, obj := range p.objects { + key := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()} - if gcExist { - batch = append(batch, &UpsertEvent{Resource: &gc}) + err := p.reader.Get(ctx, key, obj) + if err != nil { + if !apierrors.IsNotFound(err) { + return nil, err + } + } else { + batch = append(batch, &UpsertEvent{Resource: obj}) + } } - for i := range svcList.Items { - batch = append(batch, &UpsertEvent{Resource: &svcList.Items[i]}) - } - for i := range secretList.Items { - batch = append(batch, &UpsertEvent{Resource: &secretList.Items[i]}) - } - for i := range gwList.Items { - batch = append(batch, &UpsertEvent{Resource: &gwList.Items[i]}) - } - for i := range hrList.Items { - batch = append(batch, &UpsertEvent{Resource: &hrList.Items[i]}) + // Note: the order of the events doesn't matter. + + for _, list := range p.objectLists { + err := p.eachListItem(list, func(object runtime.Object) error { + clientObj, ok := object.(client.Object) + if !ok { + return fmt.Errorf("cannot cast %T to client.Object", object) + } + batch = append(batch, &UpsertEvent{Resource: clientObj}) + return nil + }) + if err != nil { + return nil, err + } } return batch, nil diff --git a/internal/events/first_eventbatch_preparer_test.go b/internal/events/first_eventbatch_preparer_test.go index 02e1581ca2..7005e05fdc 100644 --- a/internal/events/first_eventbatch_preparer_test.go +++ b/internal/events/first_eventbatch_preparer_test.go @@ -8,9 +8,9 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - apiv1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,13 +30,18 @@ var _ = Describe("FirstEventBatchPreparer", func() { BeforeEach(func() { fakeReader = &eventsfakes.FakeReader{} - preparer = events.NewFirstEventBatchPreparerImpl(fakeReader, gcName) + preparer = events.NewFirstEventBatchPreparerImpl( + fakeReader, + []client.Object{&v1beta1.GatewayClass{ObjectMeta: metav1.ObjectMeta{Name: gcName}}}, + []client.ObjectList{ + &v1beta1.HTTPRouteList{}, + }) }) Describe("Normal cases", func() { AfterEach(func() { Expect(fakeReader.GetCallCount()).Should(Equal(1)) - Expect(fakeReader.ListCallCount()).Should(Equal(4)) + Expect(fakeReader.ListCallCount()).Should(Equal(1)) }) It("should prepare zero events when resources don't exist", func() { @@ -55,8 +60,6 @@ var _ = Describe("FirstEventBatchPreparer", func() { }) It("should prepare one event for each resource type", func() { - const resourceName = "resource" - gatewayClass := v1beta1.GatewayClass{ObjectMeta: metav1.ObjectMeta{Name: gcName}} fakeReader.GetCalls(func(ctx context.Context, name types.NamespacedName, object client.Object) error { @@ -67,21 +70,12 @@ var _ = Describe("FirstEventBatchPreparer", func() { return nil }) - service := apiv1.Service{ObjectMeta: metav1.ObjectMeta{Name: resourceName}} - secret := apiv1.Secret{ObjectMeta: metav1.ObjectMeta{Name: resourceName}} - gateway := v1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: resourceName}} - httpRoute := v1beta1.HTTPRoute{ObjectMeta: metav1.ObjectMeta{Name: resourceName}} + httpRoute := v1beta1.HTTPRoute{ObjectMeta: metav1.ObjectMeta{Name: "test"}} fakeReader.ListCalls(func(ctx context.Context, list client.ObjectList, option ...client.ListOption) error { Expect(option).To(BeEmpty()) switch typedList := list.(type) { - case *apiv1.ServiceList: - typedList.Items = append(typedList.Items, service) - case *apiv1.SecretList: - typedList.Items = append(typedList.Items, secret) - case *v1beta1.GatewayList: - typedList.Items = append(typedList.Items, gateway) case *v1beta1.HTTPRouteList: typedList.Items = append(typedList.Items, httpRoute) default: @@ -93,9 +87,6 @@ var _ = Describe("FirstEventBatchPreparer", func() { expectedBatch := events.EventBatch{ &events.UpsertEvent{Resource: &gatewayClass}, - &events.UpsertEvent{Resource: &service}, - &events.UpsertEvent{Resource: &secret}, - &events.UpsertEvent{Resource: &gateway}, &events.UpsertEvent{Resource: &httpRoute}, } @@ -107,7 +98,42 @@ var _ = Describe("FirstEventBatchPreparer", func() { }) Describe("Edge cases", func() { - DescribeTable("CachedReader returns errors", + Describe("EachListItem cases", func() { + BeforeEach(func() { + fakeReader.GetReturns(apierrors.NewNotFound(schema.GroupResource{}, "test")) + fakeReader.ListCalls(func(ctx context.Context, list client.ObjectList, option ...client.ListOption) error { + httpRoute := v1beta1.HTTPRoute{ObjectMeta: metav1.ObjectMeta{Name: "test"}} + typedList := list.(*v1beta1.HTTPRouteList) + typedList.Items = append(typedList.Items, httpRoute) + + return nil + }) + }) + + It("should return error if EachListItem passes a wrong object type", func() { + preparer.SetEachListItem(func(obj runtime.Object, fn func(runtime.Object) error) error { + return fn(&fakeRuntimeObject{}) + }) + + batch, err := preparer.Prepare(context.Background()) + Expect(batch).To(BeNil()) + Expect(err).To(MatchError("cannot cast *events_test.fakeRuntimeObject to client.Object")) + }) + + It("should return error if EachListItem returns an error", func() { + testError := errors.New("test") + + preparer.SetEachListItem(func(obj runtime.Object, fn func(runtime.Object) error) error { + return testError + }) + + batch, err := preparer.Prepare(context.Background()) + Expect(batch).To(BeNil()) + Expect(err).To(MatchError(testError)) + }) + }) + + DescribeTable("Reader returns errors", func(obj client.Object) { readerError := errors.New("test") @@ -117,14 +143,8 @@ var _ = Describe("FirstEventBatchPreparer", func() { switch obj.(type) { case *v1beta1.GatewayClass: fakeReader.GetReturns(readerError) - case *apiv1.Service: - fakeReader.ListReturnsOnCall(0, readerError) - case *apiv1.Secret: - fakeReader.ListReturnsOnCall(1, readerError) - case *v1beta1.Gateway: - fakeReader.ListReturnsOnCall(2, readerError) case *v1beta1.HTTPRoute: - fakeReader.ListReturnsOnCall(3, readerError) + fakeReader.ListReturnsOnCall(0, readerError) default: Fail(fmt.Sprintf("Unknown type: %T", obj)) } @@ -133,11 +153,18 @@ var _ = Describe("FirstEventBatchPreparer", func() { Expect(batch).To(BeNil()) Expect(err).To(MatchError(readerError)) }, - Entry("Service", &apiv1.Service{}), - Entry("Secret", &apiv1.Secret{}), Entry("GatewayClass", &v1beta1.GatewayClass{}), - Entry("Gateway", &v1beta1.Gateway{}), Entry("HTTPRoute", &v1beta1.HTTPRoute{}), ) }) }) + +type fakeRuntimeObject struct{} + +func (f *fakeRuntimeObject) GetObjectKind() schema.ObjectKind { + return nil +} + +func (f *fakeRuntimeObject) DeepCopyObject() runtime.Object { + return nil +} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index e8ba48dbc4..14c5150c48 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -5,8 +5,10 @@ import ( "time" apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctlr "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -115,7 +117,18 @@ func Start(cfg config.Config) error { StatusUpdater: statusUpdater, }) - firstBatchPreparer := events.NewFirstEventBatchPreparerImpl(mgr.GetCache(), cfg.GatewayClassName) + firstBatchPreparer := events.NewFirstEventBatchPreparerImpl( + mgr.GetCache(), + []client.Object{ + &gatewayv1beta1.GatewayClass{ObjectMeta: metav1.ObjectMeta{Name: cfg.GatewayClassName}}, + }, + []client.ObjectList{ + &apiv1.ServiceList{}, + &apiv1.SecretList{}, + &gatewayv1beta1.GatewayList{}, + &gatewayv1beta1.HTTPRouteList{}, + }, + ) eventLoop := events.NewEventLoop( eventCh,