diff --git a/internal/events/eventsfakes/fake_first_event_batch_preparer.go b/internal/events/eventsfakes/fake_first_event_batch_preparer.go new file mode 100644 index 0000000000..e35585ddac --- /dev/null +++ b/internal/events/eventsfakes/fake_first_event_batch_preparer.go @@ -0,0 +1,117 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package eventsfakes + +import ( + "context" + "sync" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" +) + +type FakeFirstEventBatchPreparer struct { + PrepareStub func(context.Context) (events.EventBatch, error) + prepareMutex sync.RWMutex + prepareArgsForCall []struct { + arg1 context.Context + } + prepareReturns struct { + result1 events.EventBatch + result2 error + } + prepareReturnsOnCall map[int]struct { + result1 events.EventBatch + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeFirstEventBatchPreparer) Prepare(arg1 context.Context) (events.EventBatch, error) { + fake.prepareMutex.Lock() + ret, specificReturn := fake.prepareReturnsOnCall[len(fake.prepareArgsForCall)] + fake.prepareArgsForCall = append(fake.prepareArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.PrepareStub + fakeReturns := fake.prepareReturns + fake.recordInvocation("Prepare", []interface{}{arg1}) + fake.prepareMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeFirstEventBatchPreparer) PrepareCallCount() int { + fake.prepareMutex.RLock() + defer fake.prepareMutex.RUnlock() + return len(fake.prepareArgsForCall) +} + +func (fake *FakeFirstEventBatchPreparer) PrepareCalls(stub func(context.Context) (events.EventBatch, error)) { + fake.prepareMutex.Lock() + defer fake.prepareMutex.Unlock() + fake.PrepareStub = stub +} + +func (fake *FakeFirstEventBatchPreparer) PrepareArgsForCall(i int) context.Context { + 
fake.prepareMutex.RLock() + defer fake.prepareMutex.RUnlock() + argsForCall := fake.prepareArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeFirstEventBatchPreparer) PrepareReturns(result1 events.EventBatch, result2 error) { + fake.prepareMutex.Lock() + defer fake.prepareMutex.Unlock() + fake.PrepareStub = nil + fake.prepareReturns = struct { + result1 events.EventBatch + result2 error + }{result1, result2} +} + +func (fake *FakeFirstEventBatchPreparer) PrepareReturnsOnCall(i int, result1 events.EventBatch, result2 error) { + fake.prepareMutex.Lock() + defer fake.prepareMutex.Unlock() + fake.PrepareStub = nil + if fake.prepareReturnsOnCall == nil { + fake.prepareReturnsOnCall = make(map[int]struct { + result1 events.EventBatch + result2 error + }) + } + fake.prepareReturnsOnCall[i] = struct { + result1 events.EventBatch + result2 error + }{result1, result2} +} + +func (fake *FakeFirstEventBatchPreparer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.prepareMutex.RLock() + defer fake.prepareMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeFirstEventBatchPreparer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ events.FirstEventBatchPreparer = new(FakeFirstEventBatchPreparer) diff --git a/internal/events/eventsfakes/fake_reader.go b/internal/events/eventsfakes/fake_reader.go new file mode 100644 index 0000000000..6341c894fd --- /dev/null +++ b/internal/events/eventsfakes/fake_reader.go @@ -0,0 +1,196 @@ +// Code generated 
by counterfeiter. DO NOT EDIT. +package eventsfakes + +import ( + "context" + "sync" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type FakeReader struct { + GetStub func(context.Context, types.NamespacedName, client.Object) error + getMutex sync.RWMutex + getArgsForCall []struct { + arg1 context.Context + arg2 types.NamespacedName + arg3 client.Object + } + getReturns struct { + result1 error + } + getReturnsOnCall map[int]struct { + result1 error + } + ListStub func(context.Context, client.ObjectList, ...client.ListOption) error + listMutex sync.RWMutex + listArgsForCall []struct { + arg1 context.Context + arg2 client.ObjectList + arg3 []client.ListOption + } + listReturns struct { + result1 error + } + listReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeReader) Get(arg1 context.Context, arg2 types.NamespacedName, arg3 client.Object) error { + fake.getMutex.Lock() + ret, specificReturn := fake.getReturnsOnCall[len(fake.getArgsForCall)] + fake.getArgsForCall = append(fake.getArgsForCall, struct { + arg1 context.Context + arg2 types.NamespacedName + arg3 client.Object + }{arg1, arg2, arg3}) + stub := fake.GetStub + fakeReturns := fake.getReturns + fake.recordInvocation("Get", []interface{}{arg1, arg2, arg3}) + fake.getMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeReader) GetCallCount() int { + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + return len(fake.getArgsForCall) +} + +func (fake *FakeReader) GetCalls(stub func(context.Context, types.NamespacedName, client.Object) error) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = stub +} + +func (fake *FakeReader) GetArgsForCall(i int) (context.Context, 
types.NamespacedName, client.Object) { + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + argsForCall := fake.getArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeReader) GetReturns(result1 error) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = nil + fake.getReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeReader) GetReturnsOnCall(i int, result1 error) { + fake.getMutex.Lock() + defer fake.getMutex.Unlock() + fake.GetStub = nil + if fake.getReturnsOnCall == nil { + fake.getReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.getReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeReader) List(arg1 context.Context, arg2 client.ObjectList, arg3 ...client.ListOption) error { + fake.listMutex.Lock() + ret, specificReturn := fake.listReturnsOnCall[len(fake.listArgsForCall)] + fake.listArgsForCall = append(fake.listArgsForCall, struct { + arg1 context.Context + arg2 client.ObjectList + arg3 []client.ListOption + }{arg1, arg2, arg3}) + stub := fake.ListStub + fakeReturns := fake.listReturns + fake.recordInvocation("List", []interface{}{arg1, arg2, arg3}) + fake.listMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) 
+ } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeReader) ListCallCount() int { + fake.listMutex.RLock() + defer fake.listMutex.RUnlock() + return len(fake.listArgsForCall) +} + +func (fake *FakeReader) ListCalls(stub func(context.Context, client.ObjectList, ...client.ListOption) error) { + fake.listMutex.Lock() + defer fake.listMutex.Unlock() + fake.ListStub = stub +} + +func (fake *FakeReader) ListArgsForCall(i int) (context.Context, client.ObjectList, []client.ListOption) { + fake.listMutex.RLock() + defer fake.listMutex.RUnlock() + argsForCall := fake.listArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeReader) ListReturns(result1 error) { + fake.listMutex.Lock() + defer fake.listMutex.Unlock() + fake.ListStub = nil + fake.listReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeReader) ListReturnsOnCall(i int, result1 error) { + fake.listMutex.Lock() + defer fake.listMutex.Unlock() + fake.ListStub = nil + if fake.listReturnsOnCall == nil { + fake.listReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.listReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeReader) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.getMutex.RLock() + defer fake.getMutex.RUnlock() + fake.listMutex.RLock() + defer fake.listMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeReader) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = 
append(fake.invocations[key], args) +} + +var _ events.Reader = new(FakeReader) diff --git a/internal/events/first_eventbatch_preparer.go b/internal/events/first_eventbatch_preparer.go new file mode 100644 index 0000000000..e91eeed5db --- /dev/null +++ b/internal/events/first_eventbatch_preparer.go @@ -0,0 +1,111 @@ +package events + +import ( + "context" + "fmt" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . FirstEventBatchPreparer + +// FirstEventBatchPreparer prepares the first batch of events to be processed by the EventHandler. +// The first batch includes the UpsertEvents for all relevant resources in the cluster. +type FirstEventBatchPreparer interface { + // Prepare prepares the first event batch. + Prepare(ctx context.Context) (EventBatch, error) +} + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Reader + +// Reader allows getting and listing resources from a cache. +// This interface is introduced for testing to mock the methods from +// sigs.k8s.io/controller-runtime/pkg/client.Reader. +type Reader interface { + client.Reader +} + +// EachListItemFunc lists each item of a client.ObjectList. +// It is from k8s.io/apimachinery/pkg/api/meta. +type EachListItemFunc func(obj runtime.Object, fn func(runtime.Object) error) error + +// FirstEventBatchPreparerImpl is an implementation of FirstEventBatchPreparer. +type FirstEventBatchPreparerImpl struct { + reader Reader + objects []client.Object + objectLists []client.ObjectList + eachListItem EachListItemFunc +} + +// NewFirstEventBatchPreparerImpl creates a new FirstEventBatchPreparerImpl. +// objects and objectList specify which resources will be included in the first batch. 
+// For each object from objects, FirstEventBatchPreparerImpl will get the corresponding resource from the reader. +// The object must specify its namespace (if any) and name. +// For each list from objectLists, FirstEventBatchPreparerImpl will list the resources of the corresponding type from +// the reader. +func NewFirstEventBatchPreparerImpl(reader Reader, objects []client.Object, objectLists []client.ObjectList) *FirstEventBatchPreparerImpl { + return &FirstEventBatchPreparerImpl{ + reader: reader, + objects: objects, + objectLists: objectLists, + eachListItem: meta.EachListItem, + } +} + +// SetEachListItem sets the EachListItemFunc function. +// Used for unit testing. +func (p *FirstEventBatchPreparerImpl) SetEachListItem(eachListItem EachListItemFunc) { + p.eachListItem = eachListItem +} + +func (p *FirstEventBatchPreparerImpl) Prepare(ctx context.Context) (EventBatch, error) { + total := 0 + + for _, list := range p.objectLists { + err := p.reader.List(ctx, list) + if err != nil { + return nil, err + } + + total += meta.LenList(list) + } + + // If some of p.objects don't exist, they will not be added to the batch. In that case, the capacity will be greater + // than the length, but it is OK, because len(p.objects) is small. + batch := make([]interface{}, 0, total+len(p.objects)) + + for _, obj := range p.objects { + key := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()} + + err := p.reader.Get(ctx, key, obj) + if err != nil { + if !apierrors.IsNotFound(err) { + return nil, err + } + } else { + batch = append(batch, &UpsertEvent{Resource: obj}) + } + } + + // Note: the order of the events doesn't matter. 
+ + for _, list := range p.objectLists { + err := p.eachListItem(list, func(object runtime.Object) error { + clientObj, ok := object.(client.Object) + if !ok { + return fmt.Errorf("cannot cast %T to client.Object", object) + } + batch = append(batch, &UpsertEvent{Resource: clientObj}) + return nil + }) + if err != nil { + return nil, err + } + } + + return batch, nil +} diff --git a/internal/events/first_eventbatch_preparer_test.go b/internal/events/first_eventbatch_preparer_test.go new file mode 100644 index 0000000000..7005e05fdc --- /dev/null +++ b/internal/events/first_eventbatch_preparer_test.go @@ -0,0 +1,170 @@ +package events_test + +import ( + "context" + "errors" + "fmt" + "reflect" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events" + "github.com/nginxinc/nginx-kubernetes-gateway/internal/events/eventsfakes" +) + +var _ = Describe("FirstEventBatchPreparer", func() { + var ( + fakeReader *eventsfakes.FakeReader + preparer *events.FirstEventBatchPreparerImpl + ) + + const gcName = "my-class" + + BeforeEach(func() { + fakeReader = &eventsfakes.FakeReader{} + preparer = events.NewFirstEventBatchPreparerImpl( + fakeReader, + []client.Object{&v1beta1.GatewayClass{ObjectMeta: metav1.ObjectMeta{Name: gcName}}}, + []client.ObjectList{ + &v1beta1.HTTPRouteList{}, + }) + }) + + Describe("Normal cases", func() { + AfterEach(func() { + Expect(fakeReader.GetCallCount()).Should(Equal(1)) + Expect(fakeReader.ListCallCount()).Should(Equal(1)) + }) + + It("should prepare zero events when resources don't exist", func() { + fakeReader.GetCalls(func(ctx context.Context, name 
types.NamespacedName, object client.Object) error { + Expect(name).Should(Equal(types.NamespacedName{Name: gcName})) + Expect(object).Should(BeAssignableToTypeOf(&v1beta1.GatewayClass{})) + + return apierrors.NewNotFound(schema.GroupResource{}, "test") + }) + fakeReader.ListReturns(nil) + + batch, err := preparer.Prepare(context.Background()) + + Expect(batch).Should(BeEmpty()) + Expect(err).Should(BeNil()) + }) + + It("should prepare one event for each resource type", func() { + gatewayClass := v1beta1.GatewayClass{ObjectMeta: metav1.ObjectMeta{Name: gcName}} + + fakeReader.GetCalls(func(ctx context.Context, name types.NamespacedName, object client.Object) error { + Expect(name).Should(Equal(types.NamespacedName{Name: gcName})) + Expect(object).Should(BeAssignableToTypeOf(&v1beta1.GatewayClass{})) + + reflect.Indirect(reflect.ValueOf(object)).Set(reflect.Indirect(reflect.ValueOf(&gatewayClass))) + return nil + }) + + httpRoute := v1beta1.HTTPRoute{ObjectMeta: metav1.ObjectMeta{Name: "test"}} + + fakeReader.ListCalls(func(ctx context.Context, list client.ObjectList, option ...client.ListOption) error { + Expect(option).To(BeEmpty()) + + switch typedList := list.(type) { + case *v1beta1.HTTPRouteList: + typedList.Items = append(typedList.Items, httpRoute) + default: + Fail(fmt.Sprintf("unknown type: %T", typedList)) + } + + return nil + }) + + expectedBatch := events.EventBatch{ + &events.UpsertEvent{Resource: &gatewayClass}, + &events.UpsertEvent{Resource: &httpRoute}, + } + + batch, err := preparer.Prepare(context.Background()) + + Expect(batch).Should(Equal(expectedBatch)) + Expect(err).Should(BeNil()) + }) + }) + + Describe("Edge cases", func() { + Describe("EachListItem cases", func() { + BeforeEach(func() { + fakeReader.GetReturns(apierrors.NewNotFound(schema.GroupResource{}, "test")) + fakeReader.ListCalls(func(ctx context.Context, list client.ObjectList, option ...client.ListOption) error { + httpRoute := v1beta1.HTTPRoute{ObjectMeta: metav1.ObjectMeta{Name: 
"test"}} + typedList := list.(*v1beta1.HTTPRouteList) + typedList.Items = append(typedList.Items, httpRoute) + + return nil + }) + }) + + It("should return error if EachListItem passes a wrong object type", func() { + preparer.SetEachListItem(func(obj runtime.Object, fn func(runtime.Object) error) error { + return fn(&fakeRuntimeObject{}) + }) + + batch, err := preparer.Prepare(context.Background()) + Expect(batch).To(BeNil()) + Expect(err).To(MatchError("cannot cast *events_test.fakeRuntimeObject to client.Object")) + }) + + It("should return error if EachListItem returns an error", func() { + testError := errors.New("test") + + preparer.SetEachListItem(func(obj runtime.Object, fn func(runtime.Object) error) error { + return testError + }) + + batch, err := preparer.Prepare(context.Background()) + Expect(batch).To(BeNil()) + Expect(err).To(MatchError(testError)) + }) + }) + + DescribeTable("Reader returns errors", + func(obj client.Object) { + readerError := errors.New("test") + + fakeReader.GetReturns(nil) + fakeReader.ListReturns(nil) + + switch obj.(type) { + case *v1beta1.GatewayClass: + fakeReader.GetReturns(readerError) + case *v1beta1.HTTPRoute: + fakeReader.ListReturnsOnCall(0, readerError) + default: + Fail(fmt.Sprintf("Unknown type: %T", obj)) + } + + batch, err := preparer.Prepare(context.Background()) + Expect(batch).To(BeNil()) + Expect(err).To(MatchError(readerError)) + }, + Entry("GatewayClass", &v1beta1.GatewayClass{}), + Entry("HTTPRoute", &v1beta1.HTTPRoute{}), + ) + }) +}) + +type fakeRuntimeObject struct{} + +func (f *fakeRuntimeObject) GetObjectKind() schema.ObjectKind { + return nil +} + +func (f *fakeRuntimeObject) DeepCopyObject() runtime.Object { + return nil +} diff --git a/internal/events/handler.go b/internal/events/handler.go index a3a40499a1..8abc617104 100644 --- a/internal/events/handler.go +++ b/internal/events/handler.go @@ -20,6 +20,7 @@ import ( // EventHandler handle events. 
type EventHandler interface { // HandleEventBatch handles a batch of events. + // EventBatch can include duplicated events. HandleEventBatch(ctx context.Context, batch EventBatch) } diff --git a/internal/events/loop.go b/internal/events/loop.go index 5c43d1d6cf..23df1b866b 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -25,24 +25,27 @@ type EventLoop struct { eventCh <-chan interface{} logger logr.Logger handler EventHandler + + preparer FirstEventBatchPreparer } // NewEventLoop creates a new EventLoop. -func NewEventLoop(eventCh <-chan interface{}, logger logr.Logger, handler EventHandler) *EventLoop { +func NewEventLoop( + eventCh <-chan interface{}, + logger logr.Logger, + handler EventHandler, + preparer FirstEventBatchPreparer, +) *EventLoop { return &EventLoop{ - eventCh: eventCh, - logger: logger, - handler: handler, + eventCh: eventCh, + logger: logger, + handler: handler, + preparer: preparer, } } // Start starts the EventLoop. // This method will block until the EventLoop stops, which will happen after the ctx is closed. -// -// FIXME(pleshakov). Ensure that when the Gateway starts, the first time it generates configuration for NGINX, -// it has a complete view of the cluster resources. For example, when the Gateway processes a Gateway resource -// with a listener with TLS termination enabled (the listener references a TLS Secret), the Gateway knows about the secret. -// This way the Gateway will not produce any incomplete transient configuration at the start. func (el *EventLoop) Start(ctx context.Context) error { // The current batch. var batch EventBatch @@ -51,16 +54,43 @@ func (el *EventLoop) Start(ctx context.Context) error { // handlingDone is used to signal the completion of handling a batch. 
handlingDone := make(chan struct{}) - handle := func(ctx context.Context, batch EventBatch) { - el.logger.Info("Handling events from the batch", "total", len(batch)) + handleAndResetBatch := func() { + go func(batch EventBatch) { + el.logger.Info("Handling events from the batch", "total", len(batch)) + + el.handler.HandleEventBatch(ctx, batch) - el.handler.HandleEventBatch(ctx, batch) + el.logger.Info("Finished handling the batch") + handlingDone <- struct{}{} + }(batch) - el.logger.Info("Finished handling the batch") + // FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations. + // Use a double-buffer approach - create two buffers and exchange them between the producer and consumer + // routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity. + batch = make([]interface{}, 0) + } + + // Prepare the first event batch, which includes the UpsertEvents for all relevant cluster resources. + // This is necessary so that the first time the EventHandler generates NGINX configuration, it derives it from + // a complete view of the cluster. Otherwise, the handler would generate incomplete configuration, which can lead + // to clients seeing transient 404 errors from NGINX and incorrect statuses of the resources updated by the Gateway. + // + // Note: + // After the handler goroutine handles the first batch, the loop will start receiving events from + // the controllers, which at the beginning will be UpsertEvents with the relevant cluster resources - i.e. they + // will be duplicates of the events in the first batch. This is OK, because it is expected that the EventHandler will + // not trigger any reconfiguration after receiving an upsert for an existing resource with the same Generation. 
- handlingDone <- struct{}{} + var err error + batch, err = el.preparer.Prepare(ctx) + if err != nil { + return fmt.Errorf("failed to prepare the first batch: %w", err) } + // Handle the first batch + handleAndResetBatch() + handling = true + // Note: at any point of time, no more than one batch is currently being handled. // The event loop @@ -85,11 +115,7 @@ func (el *EventLoop) Start(ctx context.Context) error { // Handle the current batch if no batch is being handled. if !handling { - go handle(ctx, batch) - // FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations. - // Use a double-buffer approach - create two buffers and exchange them between the producer and consumer - // routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity. - batch = make([]interface{}, 0) + handleAndResetBatch() handling = true } case <-handlingDone: @@ -97,8 +123,7 @@ func (el *EventLoop) Start(ctx context.Context) error { // Handle the current batch if it has at least one event. if len(batch) > 0 { - go handle(ctx, batch) - batch = make([]interface{}, 0) + handleAndResetBatch() handling = true } } diff --git a/internal/events/loop_test.go b/internal/events/loop_test.go index 0135ab93e9..b6ae87fb9c 100644 --- a/internal/events/loop_test.go +++ b/internal/events/loop_test.go @@ -2,6 +2,7 @@ package events_test import ( "context" + "errors" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" @@ -13,89 +14,130 @@ import ( var _ = Describe("EventLoop", func() { var ( - fakeHandler *eventsfakes.FakeEventHandler - eventCh chan interface{} - eventLoop *events.EventLoop - cancel context.CancelFunc - errorCh chan error + fakeHandler *eventsfakes.FakeEventHandler + eventCh chan interface{} + fakePreparer *eventsfakes.FakeFirstEventBatchPreparer + eventLoop *events.EventLoop + ctx context.Context + cancel context.CancelFunc + errorCh chan error ) BeforeEach(func() { fakeHandler = &eventsfakes.FakeEventHandler{} eventCh = make(chan interface{}) + fakePreparer = &eventsfakes.FakeFirstEventBatchPreparer{} - eventLoop = events.NewEventLoop(eventCh, zap.New(), fakeHandler) + eventLoop = events.NewEventLoop(eventCh, zap.New(), fakeHandler, fakePreparer) - var ctx context.Context ctx, cancel = context.WithCancel(context.Background()) errorCh = make(chan error) - - go func() { - errorCh <- eventLoop.Start(ctx) - }() }) - AfterEach(func() { - cancel() + Describe("Normal processing", func() { + BeforeEach(func() { + batch := events.EventBatch{ + "event0", + } + fakePreparer.PrepareReturns(batch, nil) - var err error - Eventually(errorCh).Should(Receive(&err)) - Expect(err).To(BeNil()) - }) + go func() { + errorCh <- eventLoop.Start(ctx) + }() - It("should process a single event", func() { - e := "event" + // Ensure the first batch is handled + Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1)) + _, batch = fakeHandler.HandleEventBatchArgsForCall(0) - eventCh <- e + var expectedBatch events.EventBatch = []interface{}{"event0"} + Expect(batch).Should(Equal(expectedBatch)) + }) - Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1)) - _, batch := fakeHandler.HandleEventBatchArgsForCall(0) + AfterEach(func() { + cancel() - var expectedBatch events.EventBatch = []interface{}{e} - Expect(batch).Should(Equal(expectedBatch)) - }) + var err error + Eventually(errorCh).Should(Receive(&err)) + Expect(err).To(BeNil()) + }) - 
It("should batch multiple events", func() { - firstHandleEventBatchCallInProgress := make(chan struct{}) - sentSecondAndThirdEvents := make(chan struct{}) + // Because BeforeEach() creates the first batch and waits for it to be handled, in the tests below + // HandleEventBatchCallCount() is already 1. - // The func below will pause the handler goroutine while it is processing the batch with e1 until - // sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime. - fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) { - close(firstHandleEventBatchCallInProgress) - <-sentSecondAndThirdEvents + It("should process a single event", func() { + e := "event" + + eventCh <- e + + Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2)) + _, batch := fakeHandler.HandleEventBatchArgsForCall(1) + + var expectedBatch events.EventBatch = []interface{}{e} + Expect(batch).Should(Equal(expectedBatch)) }) - e1 := "event1" - e2 := "event2" - e3 := "event3" + It("should batch multiple events", func() { + firstHandleEventBatchCallInProgress := make(chan struct{}) + sentSecondAndThirdEvents := make(chan struct{}) + + // The func below will pause the handler goroutine while it is processing the batch with e1 until + // sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime. + fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) { + close(firstHandleEventBatchCallInProgress) + <-sentSecondAndThirdEvents + }) + + e1 := "event1" + e2 := "event2" + e3 := "event3" - eventCh <- e1 + eventCh <- e1 - // Making sure the handler goroutine started handling the batch with e1. - <-firstHandleEventBatchCallInProgress + // Making sure the handler goroutine started handling the batch with e1. 
+ <-firstHandleEventBatchCallInProgress - eventCh <- e2 - eventCh <- e3 - // The event loop will add the e2 and e3 event to current batch before starting another handler goroutine. + eventCh <- e2 + eventCh <- e3 + // The event loop will add the e2 and e3 event to current batch before starting another handler goroutine. - fakeHandler.HandleEventBatchCalls(nil) + fakeHandler.HandleEventBatchCalls(nil) - // Unpause the handler goroutine so that it can handle the current batch. - close(sentSecondAndThirdEvents) + // Unpause the handler goroutine so that it can handle the current batch. + close(sentSecondAndThirdEvents) - Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2)) - _, batch := fakeHandler.HandleEventBatchArgsForCall(0) + Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(3)) + _, batch := fakeHandler.HandleEventBatchArgsForCall(1) - var expectedBatch events.EventBatch = []interface{}{e1} + var expectedBatch events.EventBatch = []interface{}{e1} - // the first HandleEventBatch() call must have handled a batch with e1 - Expect(batch).Should(Equal(expectedBatch)) + // the first HandleEventBatch() call must have handled a batch with e1 + Expect(batch).Should(Equal(expectedBatch)) - _, batch = fakeHandler.HandleEventBatchArgsForCall(1) + _, batch = fakeHandler.HandleEventBatchArgsForCall(2) - expectedBatch = []interface{}{e2, e3} - // the second HandleEventBatch() call must have handled a batch with e2 and e3 - Expect(batch).Should(Equal(expectedBatch)) + expectedBatch = []interface{}{e2, e3} + // the second HandleEventBatch() call must have handled a batch with e2 and e3 + Expect(batch).Should(Equal(expectedBatch)) + }) + }) + + Describe("Edge cases", func() { + It("should return error when preparer returns error without blocking", func() { + preparerError := errors.New("test") + fakePreparer.PrepareReturns(events.EventBatch{}, preparerError) + + err := eventLoop.Start(ctx) + + Expect(err).Should(MatchError(preparerError)) + }) + + 
It("should return nil when started with canceled context without blocking", func() { + fakePreparer.PrepareReturns(events.EventBatch{}, nil) + + cancel() + err := eventLoop.Start(ctx) + + Expect(err).Should(BeNil()) + }) }) }) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 8c24c8beae..14c5150c48 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -5,8 +5,10 @@ import ( "time" apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctlr "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -115,10 +117,24 @@ func Start(cfg config.Config) error { StatusUpdater: statusUpdater, }) + firstBatchPreparer := events.NewFirstEventBatchPreparerImpl( + mgr.GetCache(), + []client.Object{ + &gatewayv1beta1.GatewayClass{ObjectMeta: metav1.ObjectMeta{Name: cfg.GatewayClassName}}, + }, + []client.ObjectList{ + &apiv1.ServiceList{}, + &apiv1.SecretList{}, + &gatewayv1beta1.GatewayList{}, + &gatewayv1beta1.HTTPRouteList{}, + }, + ) + eventLoop := events.NewEventLoop( eventCh, cfg.Logger.WithName("eventLoop"), - eventHandler) + eventHandler, + firstBatchPreparer) err = mgr.Add(eventLoop) if err != nil { diff --git a/internal/state/change_processor.go b/internal/state/change_processor.go index 18040b9c7e..3b83b44956 100644 --- a/internal/state/change_processor.go +++ b/internal/state/change_processor.go @@ -12,7 +12,7 @@ import ( //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ChangeProcessor // ChangeProcessor processes the changes to resources producing the internal representation of the Gateway configuration. -// ChangeProcessor only supports one Gateway resource. +// ChangeProcessor only supports one GatewayClass resource. type ChangeProcessor interface { // CaptureUpsertChange captures an upsert change to a resource. 
// It panics if the resource is of unsupported type or if the passed Gateway is different from the one this ChangeProcessor @@ -31,8 +31,6 @@ type ChangeProcessor interface { // ChangeProcessorConfig holds configuration parameters for ChangeProcessorImpl. type ChangeProcessorConfig struct { - // GatewayNsName is the namespaced name of the Gateway resource. - GatewayNsName types.NamespacedName // GatewayCtlrName is the name of the Gateway controller. GatewayCtlrName string // GatewayClassName is the name of the GatewayClass resource. @@ -41,6 +39,7 @@ type ChangeProcessorConfig struct { SecretMemoryManager SecretDiskMemoryManager } +// ChangeProcessorImpl is an implementation of ChangeProcessor. type ChangeProcessorImpl struct { store *store // storeChanged tells if the store is changed.