diff --git a/cmd/gateway/commands.go b/cmd/gateway/commands.go index d4eaa3ccc5..29c729f436 100644 --- a/cmd/gateway/commands.go +++ b/cmd/gateway/commands.go @@ -170,6 +170,7 @@ func createStaticModeCommand() *cobra.Command { }, Plus: plus, TelemetryReportPeriod: period, + Version: version, } if err := static.StartManager(conf); err != nil { diff --git a/deploy/helm-chart/templates/rbac.yaml b/deploy/helm-chart/templates/rbac.yaml index ac49069f32..97963ed1bd 100644 --- a/deploy/helm-chart/templates/rbac.yaml +++ b/deploy/helm-chart/templates/rbac.yaml @@ -21,6 +21,9 @@ rules: - namespaces - services - secrets + # FIXME(bjee19): make nodes permission dependent on telemetry being enabled. + # https://github.com/nginxinc/nginx-gateway-fabric/issues/1317. + - nodes verbs: - list - watch diff --git a/deploy/manifests/nginx-gateway.yaml b/deploy/manifests/nginx-gateway.yaml index 80fc304c05..de2c2c1b11 100644 --- a/deploy/manifests/nginx-gateway.yaml +++ b/deploy/manifests/nginx-gateway.yaml @@ -32,6 +32,9 @@ rules: - namespaces - services - secrets + # FIXME(bjee19): make nodes permission dependent on telemetry being enabled. + # https://github.com/nginxinc/nginx-gateway-fabric/issues/1317. + - nodes verbs: - list - watch diff --git a/internal/framework/runnables/cronjob.go b/internal/framework/runnables/cronjob.go index cc48bda91b..3bd4f32eaf 100644 --- a/internal/framework/runnables/cronjob.go +++ b/internal/framework/runnables/cronjob.go @@ -13,6 +13,8 @@ import ( type CronJobConfig struct { // Worker is the function that will be run for every cronjob iteration. Worker func(context.Context) + // ReadyCh delays the start of the job until the channel is closed. + ReadyCh <-chan struct{} // Logger is the logger. Logger logr.Logger // Period defines the period of the cronjob. The cronjob will run every Period. @@ -37,6 +39,13 @@ func NewCronJob(cfg CronJobConfig) *CronJob { // Start starts the cronjob. // Implements controller-runtime manager.Runnable func (j *CronJob) Start(ctx context.Context) error { + select { + case <-j.cfg.ReadyCh: + case <-ctx.Done(): + j.cfg.Logger.Info("Context canceled, failed to start cronjob") + return ctx.Err() + } + j.cfg.Logger.Info("Starting cronjob") sliding := true // This means the period with jitter will be calculated after each worker call. 
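For context on the change above: the new ReadyCh field lets a CronJob stay parked until a readiness condition is signaled (in this PR, until NGF has written its initial NGINX configuration). Below is a minimal usage sketch, not part of the patch, assuming only the exported names shown in internal/framework/runnables; the worker body and period are placeholders:

```go
package main

import (
	"context"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/log/zap"

	"github.com/nginxinc/nginx-gateway-fabric/internal/framework/runnables"
)

func main() {
	// Closed by whoever decides the system is ready, e.g. after the initial config is written.
	readyCh := make(chan struct{})

	job := runnables.NewCronJob(runnables.CronJobConfig{
		Worker:  func(ctx context.Context) { /* placeholder: collect and export telemetry */ },
		Logger:  zap.New(),
		Period:  24 * time.Hour, // placeholder period
		ReadyCh: readyCh,
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start blocks in a select on ReadyCh and ctx.Done(); no work happens yet.
	go func() { _ = job.Start(ctx) }()

	// ... later, once the precondition holds:
	close(readyCh) // unblocks Start; the worker now runs every Period (with jitter)
}
```

Canceling ctx before readyCh is closed makes Start return ctx.Err(), which is what TestCronJob_ContextCanceled below verifies.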
diff --git a/internal/framework/runnables/cronjob_test.go b/internal/framework/runnables/cronjob_test.go index 3faca2e160..9a3a6bbf72 100644 --- a/internal/framework/runnables/cronjob_test.go +++ b/internal/framework/runnables/cronjob_test.go @@ -12,6 +12,8 @@ import ( func TestCronJob(t *testing.T) { g := NewWithT(t) + readyChannel := make(chan struct{}) + timeout := 10 * time.Second var callCount int @@ -22,9 +24,10 @@ func TestCronJob(t *testing.T) { } cfg := CronJobConfig{ - Worker: worker, - Logger: zap.New(), - Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times + Worker: worker, + Logger: zap.New(), + Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times + ReadyCh: readyChannel, } job := NewCronJob(cfg) @@ -35,6 +38,7 @@ func TestCronJob(t *testing.T) { errCh <- job.Start(ctx) close(errCh) }() + close(readyChannel) minReports := 2 // ensure that the CronJob reports more than once: it doesn't exit after the first run @@ -44,3 +48,29 @@ g.Eventually(errCh).Should(Receive(BeNil())) g.Eventually(errCh).Should(BeClosed()) } + +func TestCronJob_ContextCanceled(t *testing.T) { + g := NewWithT(t) + + readyChannel := make(chan struct{}) + + cfg := CronJobConfig{ + Worker: func(ctx context.Context) {}, + Logger: zap.New(), + Period: 1 * time.Millisecond, // the period is irrelevant here: the context is canceled before the job starts + ReadyCh: readyChannel, + } + job := NewCronJob(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + + errCh := make(chan error) + go func() { + errCh <- job.Start(ctx) + close(errCh) + }() + + cancel() + g.Eventually(errCh).Should(Receive(MatchError(context.Canceled))) + g.Eventually(errCh).Should(BeClosed()) +} diff --git a/internal/mode/static/config/config.go b/internal/mode/static/config/config.go index d434fce894..abbec1faa6 100644 --- a/internal/mode/static/config/config.go +++ b/internal/mode/static/config/config.go @@ -9,6 +9,8 @@ import ( ) type Config struct { + // Version is the running NGF version. + Version string // AtomicLevel is an atomically changeable, dynamic logging level. AtomicLevel zap.AtomicLevel // GatewayNsName is the namespaced name of a Gateway resource that the Gateway will use. diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go index d0621e2fa0..74828760b3 100644 --- a/internal/mode/static/handler.go +++ b/internal/mode/static/handler.go @@ -3,6 +3,7 @@ package static import ( "context" "fmt" + "sync" "time" "github.com/go-logr/logr" @@ -58,8 +59,8 @@ type eventHandlerConfig struct { logLevelSetter logLevelSetter // metricsCollector collects metrics for this controller. metricsCollector handlerMetricsCollector - // healthChecker sets the health of the Pod to Ready once we've written out our initial config - healthChecker *healthChecker + // nginxConfiguredOnStartChecker sets the health of the Pod to Ready once we've written out our initial config. + nginxConfiguredOnStartChecker *nginxConfiguredOnStartChecker // controlConfigNSName is the NamespacedName of the NginxGateway config for this controller. controlConfigNSName types.NamespacedName // version is the current version number of the nginx config. @@ -72,7 +73,10 @@ type eventHandlerConfig struct { // (2) Keeping the statuses of the Gateway API resources updated. // (3) Updating control plane configuration. 
type eventHandlerImpl struct { - cfg eventHandlerConfig + // latestConfiguration is the latest generated dataplane Configuration. + latestConfiguration *dataplane.Configuration + cfg eventHandlerConfig + lock sync.Mutex } // newEventHandlerImpl creates a new eventHandlerImpl. @@ -105,22 +109,30 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log switch changeType { case state.NoChange: logger.Info("Handling events didn't result into NGINX configuration changes") - if !h.cfg.healthChecker.ready && h.cfg.healthChecker.firstBatchError == nil { - h.cfg.healthChecker.setAsReady() + if !h.cfg.nginxConfiguredOnStartChecker.ready && h.cfg.nginxConfiguredOnStartChecker.firstBatchError == nil { + h.cfg.nginxConfiguredOnStartChecker.setAsReady() } return case state.EndpointsOnlyChange: h.cfg.version++ + cfg := dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version) + + h.setLatestConfiguration(&cfg) + err = h.updateUpstreamServers( ctx, logger, - dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version), + cfg, ) case state.ClusterStateChange: h.cfg.version++ + cfg := dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version) + + h.setLatestConfiguration(&cfg) + err = h.updateNginxConf( ctx, - dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version), + cfg, ) } @@ -128,13 +140,13 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log if err != nil { logger.Error(err, "Failed to update NGINX configuration") nginxReloadRes.error = err - if !h.cfg.healthChecker.ready { - h.cfg.healthChecker.firstBatchError = err + if !h.cfg.nginxConfiguredOnStartChecker.ready { + h.cfg.nginxConfiguredOnStartChecker.firstBatchError = err } } else { logger.Info("NGINX configuration was successfully updated") - if !h.cfg.healthChecker.ready { - h.cfg.healthChecker.setAsReady() + if !h.cfg.nginxConfiguredOnStartChecker.ready { + h.cfg.nginxConfiguredOnStartChecker.setAsReady() } } @@ -384,3 +396,19 @@ func getGatewayAddresses( return gwAddresses, nil } + +// GetLatestConfiguration gets the latest configuration. +func (h *eventHandlerImpl) GetLatestConfiguration() *dataplane.Configuration { + h.lock.Lock() + defer h.lock.Unlock() + + return h.latestConfiguration +} + +// setLatestConfiguration sets the latest configuration. 
+func (h *eventHandlerImpl) setLatestConfiguration(cfg *dataplane.Configuration) { + h.lock.Lock() + defer h.lock.Unlock() + + h.latestConfiguration = cfg +} diff --git a/internal/mode/static/handler_test.go b/internal/mode/static/handler_test.go index 9adbf6aeca..682c2bc48c 100644 --- a/internal/mode/static/handler_test.go +++ b/internal/mode/static/handler_test.go @@ -76,23 +76,23 @@ var _ = Describe("eventHandler", func() { zapLogLevelSetter = newZapLogLevelSetter(zap.NewAtomicLevel()) handler = newEventHandlerImpl(eventHandlerConfig{ - k8sClient: fake.NewFakeClient(), - processor: fakeProcessor, - generator: fakeGenerator, - logLevelSetter: zapLogLevelSetter, - nginxFileMgr: fakeNginxFileMgr, - nginxRuntimeMgr: fakeNginxRuntimeMgr, - statusUpdater: fakeStatusUpdater, - eventRecorder: fakeEventRecorder, - healthChecker: &healthChecker{}, - controlConfigNSName: types.NamespacedName{Namespace: namespace, Name: configName}, + k8sClient: fake.NewFakeClient(), + processor: fakeProcessor, + generator: fakeGenerator, + logLevelSetter: zapLogLevelSetter, + nginxFileMgr: fakeNginxFileMgr, + nginxRuntimeMgr: fakeNginxRuntimeMgr, + statusUpdater: fakeStatusUpdater, + eventRecorder: fakeEventRecorder, + nginxConfiguredOnStartChecker: newNginxConfiguredOnStartChecker(), + controlConfigNSName: types.NamespacedName{Namespace: namespace, Name: configName}, gatewayPodConfig: config.GatewayPodConfig{ ServiceName: "nginx-gateway", Namespace: "nginx-gateway", }, metricsCollector: collectors.NewControllerNoopCollector(), }) - Expect(handler.cfg.healthChecker.ready).To(BeFalse()) + Expect(handler.cfg.nginxConfiguredOnStartChecker.ready).To(BeFalse()) }) Describe("Process the Gateway API resources events", func() { @@ -122,7 +122,7 @@ var _ = Describe("eventHandler", func() { }) AfterEach(func() { - Expect(handler.cfg.healthChecker.ready).To(BeTrue()) + Expect(handler.cfg.nginxConfiguredOnStartChecker.ready).To(BeTrue()) }) When("a batch has one event", func() { @@ -132,8 +132,11 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + dcfg := &dataplane.Configuration{Version: 1} + checkUpsertEventExpectations(e) - expectReconfig(dataplane.Configuration{Version: 1}, fakeCfgFiles) + expectReconfig(*dcfg, fakeCfgFiles) + Expect(helpers.Diff(handler.GetLatestConfiguration(), dcfg)).To(BeEmpty()) }) It("should process Delete", func() { @@ -145,8 +148,11 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + dcfg := &dataplane.Configuration{Version: 1} + checkDeleteEventExpectations(e) - expectReconfig(dataplane.Configuration{Version: 1}, fakeCfgFiles) + expectReconfig(*dcfg, fakeCfgFiles) + Expect(helpers.Diff(handler.GetLatestConfiguration(), dcfg)).To(BeEmpty()) }) }) @@ -165,6 +171,7 @@ var _ = Describe("eventHandler", func() { checkDeleteEventExpectations(deleteEvent) handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + Expect(helpers.Diff(handler.GetLatestConfiguration(), &dataplane.Configuration{Version: 2})).To(BeEmpty()) }) }) }) @@ -199,6 +206,8 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevelError)}} handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1)) _, statuses := fakeStatusUpdater.UpdateArgsForCall(0) 
Expect(statuses).To(Equal(expStatuses(staticConds.NewNginxGatewayValid()))) @@ -210,6 +219,8 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevel("invalid"))}} handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1)) _, statuses := fakeStatusUpdater.UpdateArgsForCall(0) cond := staticConds.NewNginxGatewayInvalid( @@ -228,6 +239,9 @@ var _ = Describe("eventHandler", func() { It("handles a deleted config", func() { batch := []interface{}{&events.DeleteEvent{Type: &ngfAPI.NginxGateway{}}} handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + + Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(len(fakeEventRecorder.Events)).To(Equal(1)) event := <-fakeEventRecorder.Events Expect(event).To(Equal("Warning ResourceDeleted NginxGateway configuration was deleted; using defaults")) @@ -253,6 +267,8 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(fakeStatusUpdater.UpdateAddressesCallCount()).To(BeZero()) }) @@ -266,6 +282,8 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{e} handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + + Expect(handler.GetLatestConfiguration()).To(BeNil()) Expect(fakeStatusUpdater.UpdateAddressesCallCount()).ToNot(BeZero()) }) @@ -280,6 +298,8 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{e} handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + + Expect(handler.GetLatestConfiguration()).To(BeNil()) Expect(fakeStatusUpdater.UpdateAddressesCallCount()).ToNot(BeZero()) }) }) @@ -310,6 +330,8 @@ var _ = Describe("eventHandler", func() { fakeNginxRuntimeMgr.IsPlusReturns(true) handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + Expect(helpers.Diff(handler.GetLatestConfiguration(), &dataplane.Configuration{Version: 1})).To(BeEmpty()) + Expect(fakeGenerator.GenerateCallCount()).To(Equal(1)) Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1)) Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(1)) @@ -319,6 +341,8 @@ var _ = Describe("eventHandler", func() { When("not running NGINX Plus", func() { It("should not call the NGINX Plus API", func() { handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + Expect(helpers.Diff(handler.GetLatestConfiguration(), &dataplane.Configuration{Version: 1})).To(BeEmpty()) + Expect(fakeGenerator.GenerateCallCount()).To(Equal(1)) Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1)) Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(0)) @@ -405,40 +429,53 @@ var _ = Describe("eventHandler", func() { It("should set the health checker status properly when there are changes", func() { e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}} batch := []interface{}{e} + readyChannel := handler.cfg.nginxConfiguredOnStartChecker.getReadyCh() fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{}) - Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed()) + Expect(handler.cfg.nginxConfiguredOnStartChecker.readyCheck(nil)).ToNot(Succeed()) handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) - Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed()) + + Expect(helpers.Diff(handler.GetLatestConfiguration(), 
&dataplane.Configuration{Version: 1})).To(BeEmpty()) + + Expect(readyChannel).To(BeClosed()) + + Expect(handler.cfg.nginxConfiguredOnStartChecker.readyCheck(nil)).To(Succeed()) }) It("should set the health checker status properly when there are no changes or errors", func() { e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}} batch := []interface{}{e} + readyChannel := handler.cfg.nginxConfiguredOnStartChecker.getReadyCh() - Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed()) + Expect(handler.cfg.nginxConfiguredOnStartChecker.readyCheck(nil)).ToNot(Succeed()) handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) - Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed()) + + Expect(handler.GetLatestConfiguration()).To(BeNil()) + + Expect(readyChannel).To(BeClosed()) + + Expect(handler.cfg.nginxConfiguredOnStartChecker.readyCheck(nil)).To(Succeed()) }) It("should set the health checker status properly when there is an error", func() { e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}} batch := []interface{}{e} + readyChannel := handler.cfg.nginxConfiguredOnStartChecker.getReadyCh() fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{}) fakeNginxRuntimeMgr.ReloadReturns(errors.New("reload error")) handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) - Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed()) + Expect(handler.cfg.nginxConfiguredOnStartChecker.readyCheck(nil)).ToNot(Succeed()) // now send an update with no changes; should still return an error fakeProcessor.ProcessReturns(state.NoChange, &graph.Graph{}) handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) - Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed()) + Expect(handler.cfg.nginxConfiguredOnStartChecker.readyCheck(nil)).ToNot(Succeed()) // error goes away fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{}) @@ -446,7 +483,11 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) - Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed()) + Expect(helpers.Diff(handler.GetLatestConfiguration(), &dataplane.Configuration{Version: 2})).To(BeEmpty()) + + Expect(readyChannel).To(BeClosed()) + + Expect(handler.cfg.nginxConfiguredOnStartChecker.readyCheck(nil)).To(Succeed()) }) It("should panic for an unknown event type", func() { @@ -458,6 +499,8 @@ var _ = Describe("eventHandler", func() { } Expect(handle).Should(Panic()) + + Expect(handler.GetLatestConfiguration()).To(BeNil()) }) }) diff --git a/internal/mode/static/health.go b/internal/mode/static/health.go index 3a764c347b..180c67d643 100644 --- a/internal/mode/static/health.go +++ b/internal/mode/static/health.go @@ -6,18 +6,28 @@ import ( "sync" ) -type healthChecker struct { +// newNginxConfiguredOnStartChecker creates a new nginxConfiguredOnStartChecker. +func newNginxConfiguredOnStartChecker() *nginxConfiguredOnStartChecker { + return &nginxConfiguredOnStartChecker{ + readyCh: make(chan struct{}), + } +} + +// nginxConfiguredOnStartChecker is used to check if nginx is successfully configured and if the NGF Pod is ready. 
+type nginxConfiguredOnStartChecker struct { // firstBatchError is set when the first batch fails to configure nginx // and we don't want to set ourselves as ready on the next batch if nothing changes firstBatchError error - lock sync.RWMutex - ready bool + // readyCh is initialized in newNginxConfiguredOnStartChecker and is closed once the NGF Pod is ready. + readyCh chan struct{} + lock sync.RWMutex + ready bool } // readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type. // We are considered ready after the handler processed the first batch. In case there is NGINX configuration // to write, it must be written and NGINX must be reloaded successfully. -func (h *healthChecker) readyCheck(_ *http.Request) error { +func (h *nginxConfiguredOnStartChecker) readyCheck(_ *http.Request) error { h.lock.RLock() defer h.lock.RUnlock() @@ -29,10 +39,16 @@ func (h *healthChecker) readyCheck(_ *http.Request) error { } // setAsReady marks the health check as ready. -func (h *healthChecker) setAsReady() { +func (h *nginxConfiguredOnStartChecker) setAsReady() { h.lock.Lock() defer h.lock.Unlock() h.ready = true h.firstBatchError = nil + close(h.readyCh) +} + +// getReadyCh returns a read-only channel that is closed once the NGF Pod is ready. +func (h *nginxConfiguredOnStartChecker) getReadyCh() <-chan struct{} { + return h.readyCh } diff --git a/internal/mode/static/health_test.go b/internal/mode/static/health_test.go index 77dfa30ed9..352cc50b3c 100644 --- a/internal/mode/static/health_test.go +++ b/internal/mode/static/health_test.go @@ -8,9 +8,9 @@ import ( func TestReadyCheck(t *testing.T) { g := NewWithT(t) - hc := healthChecker{} - g.Expect(hc.readyCheck(nil)).ToNot(Succeed()) + nginxChecker := newNginxConfiguredOnStartChecker() + g.Expect(nginxChecker.readyCheck(nil)).ToNot(Succeed()) - hc.ready = true - g.Expect(hc.readyCheck(nil)).To(Succeed()) + nginxChecker.ready = true + g.Expect(nginxChecker.readyCheck(nil)).To(Succeed()) } diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index 445db3b289..712bb6c46f 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -69,8 +69,8 @@ func init() { // nolint:gocyclo func StartManager(cfg config.Config) error { - hc := &healthChecker{} - mgr, err := createManager(cfg, hc) + nginxChecker := newNginxConfiguredOnStartChecker() + mgr, err := createManager(cfg, nginxChecker) if err != nil { return fmt.Errorf("cannot build runtime manager: %w", err) } @@ -188,12 +188,12 @@ func StartManager(cfg config.Config) error { ngxruntimeCollector, cfg.Logger.WithName("nginxRuntimeManager"), ), - statusUpdater: statusUpdater, - eventRecorder: recorder, - healthChecker: hc, - controlConfigNSName: controlConfigNSName, - gatewayPodConfig: cfg.GatewayPodConfig, - metricsCollector: handlerCollector, + statusUpdater: statusUpdater, + eventRecorder: recorder, + nginxConfiguredOnStartChecker: nginxChecker, + controlConfigNSName: controlConfigNSName, + gatewayPodConfig: cfg.GatewayPodConfig, + metricsCollector: handlerCollector, }) objects, objectLists := prepareFirstEventBatchPreparerArgs(cfg.GatewayClassName, cfg.GatewayNsName) @@ -213,7 +213,13 @@ func StartManager(cfg config.Config) error { return fmt.Errorf("cannot register status updater: %w", err) } - if err = mgr.Add(createTelemetryJob(cfg)); err != nil { + dataCollector := telemetry.NewDataCollectorImpl(telemetry.DataCollectorConfig{ + K8sClientReader: mgr.GetClient(), + GraphGetter: processor, + 
ConfigurationGetter: eventHandler, + Version: cfg.Version, + }) + if err = mgr.Add(createTelemetryJob(cfg, dataCollector, nginxChecker.getReadyCh())); err != nil { return fmt.Errorf("cannot register telemetry job: %w", err) } @@ -221,7 +227,7 @@ func StartManager(cfg config.Config) error { return mgr.Start(ctx) } -func createManager(cfg config.Config, hc *healthChecker) (manager.Manager, error) { +func createManager(cfg config.Config, nginxChecker *nginxConfiguredOnStartChecker) (manager.Manager, error) { options := manager.Options{ Scheme: scheme, Logger: cfg.Logger, @@ -256,7 +262,7 @@ func createManager(cfg config.Config, hc *healthChecker) (manager.Manager, error } if cfg.HealthConfig.Enabled { - if err := mgr.AddReadyzCheck("readyz", hc.readyCheck); err != nil { + if err := mgr.AddReadyzCheck("readyz", nginxChecker.readyCheck); err != nil { return nil, fmt.Errorf("error adding ready check: %w", err) } } @@ -399,6 +405,33 @@ func registerControllers( return nil } +func createTelemetryJob( + cfg config.Config, + dataCollector telemetry.DataCollector, + readyCh <-chan struct{}, +) *runnables.Leader { + logger := cfg.Logger.WithName("telemetryJob") + exporter := telemetry.NewLoggingExporter(cfg.Logger.WithName("telemetryExporter").V(1 /* debug */)) + + worker := telemetry.CreateTelemetryJobWorker(logger, exporter, dataCollector) + + // 10 min jitter is enough per telemetry destination recommendation + // For the default period of 24 hours, jitter will be 10min /(24*60)min = 0.0069 + jitterFactor := 10.0 / (24 * 60) // added jitter is bound by jitterFactor * period + + return &runnables.Leader{ + Runnable: runnables.NewCronJob( + runnables.CronJobConfig{ + Worker: worker, + Logger: logger, + Period: cfg.TelemetryReportPeriod, + JitterFactor: jitterFactor, + ReadyCh: readyCh, + }, + ), + } +} + func prepareFirstEventBatchPreparerArgs( gcName string, gwNsName *types.NamespacedName, @@ -438,40 +471,6 @@ func prepareFirstEventBatchPreparerArgs( return objects, objectLists } -func createTelemetryJob(cfg config.Config) *runnables.Leader { - logger := cfg.Logger.WithName("telemetryJob") - exporter := telemetry.NewLoggingExporter(cfg.Logger.WithName("telemetryExporter").V(1 /* debug */)) - - worker := func(ctx context.Context) { - // Gather telemetry - logger.V(1).Info("Gathering telemetry") - - // We will need to gather data as defined in https://github.com/nginxinc/nginx-gateway-fabric/issues/793 - data := telemetry.Data{} - - // Export telemetry - logger.V(1).Info("Exporting telemetry") - - if err := exporter.Export(ctx, data); err != nil { - logger.Error(err, "Failed to export telemetry") - } - } - // 10 min jitter is enough per telemetry destination recommendation - // For the default period of 24 hours, jitter will be 10min /(24*60)min = 0.0069 - jitterFactor := 10.0 / (24 * 60) // added jitter is bound by jitterFactor * period - - return &runnables.Leader{ - Runnable: runnables.NewCronJob( - runnables.CronJobConfig{ - Worker: worker, - Logger: logger, - Period: cfg.TelemetryReportPeriod, - JitterFactor: jitterFactor, - }, - ), - } -} - func setInitialConfig( reader client.Reader, logger logr.Logger, diff --git a/internal/mode/static/state/change_processor.go b/internal/mode/static/state/change_processor.go index 93229e0d45..385ae0e938 100644 --- a/internal/mode/static/state/change_processor.go +++ b/internal/mode/static/state/change_processor.go @@ -62,6 +62,8 @@ type ChangeProcessor interface { // Process produces a graph-like representation of GatewayAPI resources. 
// If no changes were captured, the changed return argument will be NoChange and graph will be empty. Process() (changeType ChangeType, graphCfg *graph.Graph) + // GetLatestGraph returns the latest Graph. + GetLatestGraph() *graph.Graph } // ChangeProcessorConfig holds configuration parameters for ChangeProcessorImpl. @@ -256,3 +258,10 @@ func (c *ChangeProcessorImpl) Process() (ChangeType, *graph.Graph) { return changeType, c.latestGraph } + +func (c *ChangeProcessorImpl) GetLatestGraph() *graph.Graph { + c.lock.Lock() + defer c.lock.Unlock() + + return c.latestGraph +} diff --git a/internal/mode/static/state/change_processor_test.go b/internal/mode/static/state/change_processor_test.go index 8c009c20d4..dba175af51 100644 --- a/internal/mode/static/state/change_processor_test.go +++ b/internal/mode/static/state/change_processor_test.go @@ -520,6 +520,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.NoChange)) Expect(graphCfg).To(BeNil()) + Expect(processor.GetLatestGraph()).To(BeNil()) }) }) When("GatewayClass doesn't exist", func() { @@ -530,6 +531,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(&graph.Graph{}, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(&graph.Graph{}, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("Gateways don't exist", func() { @@ -540,6 +542,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(&graph.Graph{}, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(&graph.Graph{}, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the different namespace TLS Secret is upserted", func() { @@ -549,6 +552,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.NoChange)) Expect(graphCfg).To(BeNil()) + Expect(helpers.Diff(&graph.Graph{}, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the first Gateway is upserted", func() { @@ -584,6 +588,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) }) @@ -637,6 +642,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the ReferenceGrant allowing the Gateway to reference its Secret is upserted", func() { @@ -659,6 +665,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the ReferenceGrant allowing the hr1 to reference the Service in different ns is upserted", func() { @@ -672,6 +679,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the Gateway API CRD 
with bundle version annotation change is processed", func() { @@ -689,6 +697,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the Gateway API CRD without bundle version annotation change is processed", func() { @@ -697,9 +706,18 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(gatewayAPICRDSameVersion) + expGraph.ReferencedSecrets[client.ObjectKeyFromObject(diffNsTLSSecret)] = &graph.Secret{ + Source: diffNsTLSSecret, + } + + expGraph.GatewayClass.Conditions = conditions.NewGatewayClassSupportedVersionBestEffort( + gatewayclass.SupportedVersion, + ) + changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.NoChange)) Expect(graphCfg).To(BeNil()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the Gateway API CRD with bundle version annotation change is processed", func() { @@ -714,6 +732,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the first HTTPRoute update with a generation changed is processed", func() { @@ -732,6 +751,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }, ) }) @@ -747,6 +767,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the GatewayClass update with generation change is processed", func() { @@ -761,6 +782,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the different namespace TLS secret is upserted again", func() { @@ -774,24 +796,33 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("no changes are captured", func() { It("returns nil graph", func() { - changed, graphCfg := processor.Process() + expGraph.ReferencedSecrets[client.ObjectKeyFromObject(diffNsTLSSecret)] = &graph.Secret{ + Source: diffNsTLSSecret, + } + changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.NoChange)) Expect(graphCfg).To(BeNil()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the same namespace TLS Secret is upserted", func() { It("returns nil graph", func() { processor.CaptureUpsertChange(sameNsTLSSecret) - changed, graphCfg := processor.Process() + expGraph.ReferencedSecrets[client.ObjectKeyFromObject(diffNsTLSSecret)] = &graph.Secret{ + Source: diffNsTLSSecret, + } + 
changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.NoChange)) Expect(graphCfg).To(BeNil()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the second Gateway is upserted", func() { @@ -808,6 +839,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the second HTTPRoute is upserted", func() { @@ -833,6 +865,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the first Gateway is deleted", func() { @@ -868,6 +901,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the second HTTPRoute is deleted", func() { @@ -900,6 +934,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the GatewayClass is deleted", func() { @@ -923,6 +958,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the second Gateway is deleted", func() { @@ -938,6 +974,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(&graph.Graph{}, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(&graph.Graph{}, processor.GetLatestGraph())).To(BeEmpty()) }) }) When("the first HTTPRoute is deleted", func() { @@ -953,6 +990,7 @@ var _ = Describe("ChangeProcessor", func() { changed, graphCfg := processor.Process() Expect(changed).To(Equal(state.ClusterStateChange)) Expect(helpers.Diff(&graph.Graph{}, graphCfg)).To(BeEmpty()) + Expect(helpers.Diff(&graph.Graph{}, processor.GetLatestGraph())).To(BeEmpty()) }) }) }) diff --git a/internal/mode/static/state/graph/service_test.go b/internal/mode/static/state/graph/service_test.go index 82cf2960e0..2eb0aaf646 100644 --- a/internal/mode/static/state/graph/service_test.go +++ b/internal/mode/static/state/graph/service_test.go @@ -4,7 +4,6 @@ import ( "testing" . 
"github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/types" ) diff --git a/internal/mode/static/state/statefakes/fake_change_processor.go b/internal/mode/static/state/statefakes/fake_change_processor.go index e9a37bbd68..1b710d4148 100644 --- a/internal/mode/static/state/statefakes/fake_change_processor.go +++ b/internal/mode/static/state/statefakes/fake_change_processor.go @@ -22,6 +22,16 @@ type FakeChangeProcessor struct { captureUpsertChangeArgsForCall []struct { arg1 client.Object } + GetLatestGraphStub func() *graph.Graph + getLatestGraphMutex sync.RWMutex + getLatestGraphArgsForCall []struct { + } + getLatestGraphReturns struct { + result1 *graph.Graph + } + getLatestGraphReturnsOnCall map[int]struct { + result1 *graph.Graph + } ProcessStub func() (state.ChangeType, *graph.Graph) processMutex sync.RWMutex processArgsForCall []struct { @@ -103,6 +113,59 @@ func (fake *FakeChangeProcessor) CaptureUpsertChangeArgsForCall(i int) client.Ob return argsForCall.arg1 } +func (fake *FakeChangeProcessor) GetLatestGraph() *graph.Graph { + fake.getLatestGraphMutex.Lock() + ret, specificReturn := fake.getLatestGraphReturnsOnCall[len(fake.getLatestGraphArgsForCall)] + fake.getLatestGraphArgsForCall = append(fake.getLatestGraphArgsForCall, struct { + }{}) + stub := fake.GetLatestGraphStub + fakeReturns := fake.getLatestGraphReturns + fake.recordInvocation("GetLatestGraph", []interface{}{}) + fake.getLatestGraphMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeChangeProcessor) GetLatestGraphCallCount() int { + fake.getLatestGraphMutex.RLock() + defer fake.getLatestGraphMutex.RUnlock() + return len(fake.getLatestGraphArgsForCall) +} + +func (fake *FakeChangeProcessor) GetLatestGraphCalls(stub func() *graph.Graph) { + fake.getLatestGraphMutex.Lock() + defer fake.getLatestGraphMutex.Unlock() + fake.GetLatestGraphStub = stub +} + +func (fake *FakeChangeProcessor) GetLatestGraphReturns(result1 *graph.Graph) { + fake.getLatestGraphMutex.Lock() + defer fake.getLatestGraphMutex.Unlock() + fake.GetLatestGraphStub = nil + fake.getLatestGraphReturns = struct { + result1 *graph.Graph + }{result1} +} + +func (fake *FakeChangeProcessor) GetLatestGraphReturnsOnCall(i int, result1 *graph.Graph) { + fake.getLatestGraphMutex.Lock() + defer fake.getLatestGraphMutex.Unlock() + fake.GetLatestGraphStub = nil + if fake.getLatestGraphReturnsOnCall == nil { + fake.getLatestGraphReturnsOnCall = make(map[int]struct { + result1 *graph.Graph + }) + } + fake.getLatestGraphReturnsOnCall[i] = struct { + result1 *graph.Graph + }{result1} +} + func (fake *FakeChangeProcessor) Process() (state.ChangeType, *graph.Graph) { fake.processMutex.Lock() ret, specificReturn := fake.processReturnsOnCall[len(fake.processArgsForCall)] @@ -166,6 +229,8 @@ func (fake *FakeChangeProcessor) Invocations() map[string][][]interface{} { defer fake.captureDeleteChangeMutex.RUnlock() fake.captureUpsertChangeMutex.RLock() defer fake.captureUpsertChangeMutex.RUnlock() + fake.getLatestGraphMutex.RLock() + defer fake.getLatestGraphMutex.RUnlock() fake.processMutex.RLock() defer fake.processMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/internal/mode/static/telemetry/collector.go b/internal/mode/static/telemetry/collector.go new file mode 100644 index 0000000000..416b004eef --- /dev/null +++ b/internal/mode/static/telemetry/collector.go @@ -0,0 +1,149 @@ +package telemetry + +import ( + "context" + "errors" + "fmt" + + v1 
"k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/graph" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . GraphGetter + +// GraphGetter gets the latest Graph. +type GraphGetter interface { + GetLatestGraph() *graph.Graph +} + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ConfigurationGetter + +// ConfigurationGetter gets the latest Configuration. +type ConfigurationGetter interface { + GetLatestConfiguration() *dataplane.Configuration +} + +// NGFResourceCounts stores the counts of all relevant resources that NGF processes and generates configuration from. +type NGFResourceCounts struct { + Gateways int + GatewayClasses int + HTTPRoutes int + Secrets int + Services int + // Endpoints include the total count of Endpoints(IP:port) across all referenced services. + Endpoints int +} + +// ProjectMetadata stores the name of the project and the current version. +type ProjectMetadata struct { + Name string + Version string +} + +// Data is telemetry data. +// Note: this type might change once https://github.com/nginxinc/nginx-gateway-fabric/issues/1318 is implemented. +type Data struct { + ProjectMetadata ProjectMetadata + NodeCount int + NGFResourceCounts NGFResourceCounts +} + +// DataCollectorConfig holds configuration parameters for DataCollectorImpl. +type DataCollectorConfig struct { + // K8sClientReader is a Kubernetes API client Reader. + K8sClientReader client.Reader + // GraphGetter allows us to get the Graph. + GraphGetter GraphGetter + // ConfigurationGetter allows us to get the Configuration. + ConfigurationGetter ConfigurationGetter + // Version is the NGF version. + Version string +} + +// DataCollectorImpl is am implementation of DataCollector. +type DataCollectorImpl struct { + cfg DataCollectorConfig +} + +// NewDataCollectorImpl creates a new DataCollectorImpl for a telemetry Job. +func NewDataCollectorImpl( + cfg DataCollectorConfig, +) *DataCollectorImpl { + return &DataCollectorImpl{ + cfg: cfg, + } +} + +// Collect collects and returns telemetry Data. 
+func (c DataCollectorImpl) Collect(ctx context.Context) (Data, error) { + nodeCount, err := collectNodeCount(ctx, c.cfg.K8sClientReader) + if err != nil { + return Data{}, fmt.Errorf("failed to collect node count: %w", err) + } + + graphResourceCount, err := collectGraphResourceCount(c.cfg.GraphGetter, c.cfg.ConfigurationGetter) + if err != nil { + return Data{}, fmt.Errorf("failed to collect NGF resource counts: %w", err) + } + + data := Data{ + NodeCount: nodeCount, + NGFResourceCounts: graphResourceCount, + ProjectMetadata: ProjectMetadata{ + Name: "NGF", + Version: c.cfg.Version, + }, + } + + return data, nil +} + +func collectNodeCount(ctx context.Context, k8sClient client.Reader) (int, error) { + var nodes v1.NodeList + if err := k8sClient.List(ctx, &nodes); err != nil { + return 0, err + } + + return len(nodes.Items), nil +} + +func collectGraphResourceCount( + graphGetter GraphGetter, + configurationGetter ConfigurationGetter, +) (NGFResourceCounts, error) { + ngfResourceCounts := NGFResourceCounts{} + g := graphGetter.GetLatestGraph() + cfg := configurationGetter.GetLatestConfiguration() + + if g == nil { + return ngfResourceCounts, errors.New("latest graph cannot be nil") + } + if cfg == nil { + return ngfResourceCounts, errors.New("latest configuration cannot be nil") + } + + ngfResourceCounts.GatewayClasses = len(g.IgnoredGatewayClasses) + if g.GatewayClass != nil { + ngfResourceCounts.GatewayClasses++ + } + + ngfResourceCounts.Gateways = len(g.IgnoredGateways) + if g.Gateway != nil { + ngfResourceCounts.Gateways++ + } + + ngfResourceCounts.HTTPRoutes = len(g.Routes) + ngfResourceCounts.Secrets = len(g.ReferencedSecrets) + ngfResourceCounts.Services = len(g.ReferencedServices) + + for _, upstream := range cfg.Upstreams { + if upstream.ErrorMsg == "" { + ngfResourceCounts.Endpoints += len(upstream.Endpoints) + } + } + + return ngfResourceCounts, nil +} diff --git a/internal/mode/static/telemetry/collector_test.go b/internal/mode/static/telemetry/collector_test.go new file mode 100644 index 0000000000..54a7cf8539 --- /dev/null +++ b/internal/mode/static/telemetry/collector_test.go @@ -0,0 +1,366 @@ +package telemetry_test + +import ( + "context" + "errors" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/events/eventsfakes" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/graph" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry/telemetryfakes" +) + +func createListCallsFunc(nodes []v1.Node) func( + ctx context.Context, + list client.ObjectList, + option ...client.ListOption, +) error { + return func(ctx context.Context, list client.ObjectList, option ...client.ListOption) error { + Expect(option).To(BeEmpty()) + + switch typedList := list.(type) { + case *v1.NodeList: + typedList.Items = append(typedList.Items, nodes...) 
+ default: + Fail(fmt.Sprintf("unknown type: %T", typedList)) + } + return nil + } +} + +var _ = Describe("Collector", Ordered, func() { + var ( + k8sClientReader *eventsfakes.FakeReader + fakeGraphGetter *telemetryfakes.FakeGraphGetter + fakeConfigurationGetter *telemetryfakes.FakeConfigurationGetter + dataCollector telemetry.DataCollector + version string + expData telemetry.Data + ctx context.Context + ) + + BeforeAll(func() { + ctx = context.Background() + version = "1.1" + }) + + BeforeEach(func() { + expData = telemetry.Data{ + ProjectMetadata: telemetry.ProjectMetadata{Name: "NGF", Version: version}, + NodeCount: 0, + NGFResourceCounts: telemetry.NGFResourceCounts{}, + } + + k8sClientReader = &eventsfakes.FakeReader{} + fakeGraphGetter = &telemetryfakes.FakeGraphGetter{} + fakeConfigurationGetter = &telemetryfakes.FakeConfigurationGetter{} + + fakeGraphGetter.GetLatestGraphReturns(&graph.Graph{}) + fakeConfigurationGetter.GetLatestConfigurationReturns(&dataplane.Configuration{}) + + dataCollector = telemetry.NewDataCollectorImpl(telemetry.DataCollectorConfig{ + K8sClientReader: k8sClientReader, + GraphGetter: fakeGraphGetter, + ConfigurationGetter: fakeConfigurationGetter, + Version: version, + }) + }) + + Describe("Normal case", func() { + When("collecting telemetry data", func() { + It("collects all fields", func() { + nodes := []v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node3"}, + }, + } + + k8sClientReader.ListCalls(createListCallsFunc(nodes)) + + secret1 := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret1"}} + secret2 := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret2"}} + nilsecret := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "nilsecret"}} + + svc1 := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1"}} + svc2 := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc2"}} + nilsvc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "nilsvc"}} + + graph := &graph.Graph{ + GatewayClass: &graph.GatewayClass{}, + Gateway: &graph.Gateway{}, + IgnoredGatewayClasses: map[types.NamespacedName]*gatewayv1.GatewayClass{ + {Name: "ignoredGC1"}: {}, + {Name: "ignoredGC2"}: {}, + }, + IgnoredGateways: map[types.NamespacedName]*gatewayv1.Gateway{ + {Name: "ignoredGw1"}: {}, + {Name: "ignoredGw2"}: {}, + }, + Routes: map[types.NamespacedName]*graph.Route{ + {Namespace: "test", Name: "hr-1"}: {}, + {Namespace: "test", Name: "hr-2"}: {}, + {Namespace: "test", Name: "hr-3"}: {}, + }, + ReferencedSecrets: map[types.NamespacedName]*graph.Secret{ + client.ObjectKeyFromObject(secret1): { + Source: secret1, + }, + client.ObjectKeyFromObject(secret2): { + Source: secret2, + }, + client.ObjectKeyFromObject(nilsecret): nil, + }, + ReferencedServices: map[types.NamespacedName]struct{}{ + client.ObjectKeyFromObject(svc1): {}, + client.ObjectKeyFromObject(svc2): {}, + client.ObjectKeyFromObject(nilsvc): {}, + }, + } + + config := &dataplane.Configuration{ + Upstreams: []dataplane.Upstream{ + { + Name: "upstream1", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint1", + Port: 80, + }, { + Address: "endpoint2", + Port: 80, + }, { + Address: "endpoint3", + Port: 80, + }, + }, + }, + { + Name: "upstream2", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint1", + Port: 80, + }, + }, + }, + }, + } + + fakeGraphGetter.GetLatestGraphReturns(graph) + fakeConfigurationGetter.GetLatestConfigurationReturns(config) + + 
expData.NodeCount = 3 + expData.NGFResourceCounts = telemetry.NGFResourceCounts{ + Gateways: 3, + GatewayClasses: 3, + HTTPRoutes: 3, + Secrets: 3, + Services: 3, + Endpoints: 4, + } + + data, err := dataCollector.Collect(ctx) + + Expect(err).To(BeNil()) + Expect(expData).To(Equal(data)) + }) + }) + }) + + Describe("node count collector", func() { + When("collecting node count data", func() { + It("collects correct data for no nodes", func() { + k8sClientReader.ListCalls(createListCallsFunc(nil)) + + data, err := dataCollector.Collect(ctx) + + Expect(err).To(BeNil()) + Expect(expData).To(Equal(data)) + }) + + It("collects correct data for one node", func() { + nodes := []v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + }, + } + + k8sClientReader.ListCalls(createListCallsFunc(nodes)) + + expData.NodeCount = 1 + + data, err := dataCollector.Collect(ctx) + + Expect(err).To(BeNil()) + Expect(expData).To(Equal(data)) + }) + }) + When("it encounters an error while collecting data", func() { + It("should error on kubernetes client api errors", func() { + k8sClientReader.ListReturns(errors.New("there was an error")) + + _, err := dataCollector.Collect(ctx) + Expect(err).To(HaveOccurred()) + }) + }) + }) + + Describe("NGF resource count collector", func() { + var ( + graph1 *graph.Graph + config1, invalidUpstreamsConfig *dataplane.Configuration + ) + + BeforeAll(func() { + secret := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret1"}} + svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1"}} + + graph1 = &graph.Graph{ + GatewayClass: &graph.GatewayClass{}, + Gateway: &graph.Gateway{}, + Routes: map[types.NamespacedName]*graph.Route{ + {Namespace: "test", Name: "hr-1"}: {}, + }, + ReferencedSecrets: map[types.NamespacedName]*graph.Secret{ + client.ObjectKeyFromObject(secret): { + Source: secret, + }, + }, + ReferencedServices: map[types.NamespacedName]struct{}{ + client.ObjectKeyFromObject(svc): {}, + }, + } + + config1 = &dataplane.Configuration{ + Upstreams: []dataplane.Upstream{ + { + Name: "upstream1", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint1", + Port: 80, + }, + }, + }, + }, + } + + invalidUpstreamsConfig = &dataplane.Configuration{ + Upstreams: []dataplane.Upstream{ + { + Name: "invalidUpstream", + ErrorMsg: "there is an error here", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint1", + Port: 80, + }, { + Address: "endpoint2", + Port: 80, + }, { + Address: "endpoint3", + Port: 80, + }, + }, + }, + { + Name: "emptyUpstream", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{}, + }, + }, + } + }) + + When("collecting NGF resource counts", func() { + It("collects correct data for graph with no resources", func() { + fakeGraphGetter.GetLatestGraphReturns(&graph.Graph{}) + fakeConfigurationGetter.GetLatestConfigurationReturns(&dataplane.Configuration{}) + + expData.NGFResourceCounts = telemetry.NGFResourceCounts{} + + data, err := dataCollector.Collect(ctx) + + Expect(err).To(BeNil()) + Expect(expData).To(Equal(data)) + }) + + It("collects correct data for graph with one of each resource", func() { + fakeGraphGetter.GetLatestGraphReturns(graph1) + fakeConfigurationGetter.GetLatestConfigurationReturns(config1) + + expData.NGFResourceCounts = telemetry.NGFResourceCounts{ + Gateways: 1, + GatewayClasses: 1, + HTTPRoutes: 1, + Secrets: 1, + Services: 1, + Endpoints: 1, + } + + data, err := dataCollector.Collect(ctx) + + Expect(err).To(BeNil()) + Expect(expData).To(Equal(data)) + }) + + It("ignores invalid and empty upstreams", 
func() { + fakeGraphGetter.GetLatestGraphReturns(&graph.Graph{}) + fakeConfigurationGetter.GetLatestConfigurationReturns(invalidUpstreamsConfig) + expData.NGFResourceCounts = telemetry.NGFResourceCounts{ + Gateways: 0, + GatewayClasses: 0, + HTTPRoutes: 0, + Secrets: 0, + Services: 0, + Endpoints: 0, + } + + data, err := dataCollector.Collect(ctx) + + Expect(err).To(BeNil()) + Expect(expData).To(Equal(data)) + }) + + When("it encounters an error while collecting data", func() { + BeforeEach(func() { + fakeGraphGetter.GetLatestGraphReturns(&graph.Graph{}) + fakeConfigurationGetter.GetLatestConfigurationReturns(&dataplane.Configuration{}) + }) + It("should error on nil latest graph", func() { + fakeGraphGetter.GetLatestGraphReturns(nil) + + _, err := dataCollector.Collect(ctx) + Expect(err).To(HaveOccurred()) + }) + + It("should error on nil latest configuration", func() { + fakeConfigurationGetter.GetLatestConfigurationReturns(nil) + + _, err := dataCollector.Collect(ctx) + Expect(err).To(HaveOccurred()) + }) + }) + }) + }) +}) diff --git a/internal/mode/static/telemetry/exporter.go b/internal/mode/static/telemetry/exporter.go index c010510926..55bee6f2be 100644 --- a/internal/mode/static/telemetry/exporter.go +++ b/internal/mode/static/telemetry/exporter.go @@ -6,10 +6,6 @@ import ( "github.com/go-logr/logr" ) -// Data is telemetry data. -// Note: this type might change once https://github.com/nginxinc/nginx-gateway-fabric/issues/1318 is implemented. -type Data struct{} - //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Exporter // Exporter exports telemetry data to some destination. diff --git a/internal/mode/static/telemetry/job_worker.go b/internal/mode/static/telemetry/job_worker.go new file mode 100644 index 0000000000..a4dc81932a --- /dev/null +++ b/internal/mode/static/telemetry/job_worker.go @@ -0,0 +1,40 @@ +package telemetry + +import ( + "context" + + "github.com/go-logr/logr" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . DataCollector + +// DataCollector collects telemetry data. +type DataCollector interface { + // Collect collects and returns telemetry Data. + Collect(ctx context.Context) (Data, error) } + +// CreateTelemetryJobWorker returns a worker function that gathers and exports telemetry data. +func CreateTelemetryJobWorker( + logger logr.Logger, + exporter Exporter, + dataCollector DataCollector, +) func(ctx context.Context) { + return func(ctx context.Context) { + // Gather telemetry + logger.V(1).Info("Gathering telemetry data") + + // The data gathered is defined in https://github.com/nginxinc/nginx-gateway-fabric/issues/793 + data, err := dataCollector.Collect(ctx) + if err != nil { + logger.Error(err, "Failed to collect telemetry data") + } + + // Export telemetry + logger.V(1).Info("Exporting telemetry data") + + if err := exporter.Export(ctx, data); err != nil { + logger.Error(err, "Failed to export telemetry data") + } + } +} diff --git a/internal/mode/static/telemetry/job_worker_test.go b/internal/mode/static/telemetry/job_worker_test.go new file mode 100644 index 0000000000..4e804ae26d --- /dev/null +++ b/internal/mode/static/telemetry/job_worker_test.go @@ -0,0 +1,44 @@ +package telemetry_test + +import ( + "context" + "testing" + "time" + + . 
"github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry/telemetryfakes" +) + +func TestCreateTelemetryJobWorker(t *testing.T) { + g := NewWithT(t) + + exporter := &telemetryfakes.FakeExporter{} + dataCollector := &telemetryfakes.FakeDataCollector{} + + worker := telemetry.CreateTelemetryJobWorker(zap.New(), exporter, dataCollector) + + expData := telemetry.Data{ + ProjectMetadata: telemetry.ProjectMetadata{Name: "NGF", Version: "1.1"}, + NodeCount: 3, + NGFResourceCounts: telemetry.NGFResourceCounts{ + Gateways: 1, + GatewayClasses: 1, + HTTPRoutes: 1, + Secrets: 1, + Services: 1, + Endpoints: 1, + }, + } + dataCollector.CollectReturns(expData, nil) + + timeout := 10 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + worker(ctx) + _, data := exporter.ExportArgsForCall(0) + g.Expect(data).To(Equal(expData)) +} diff --git a/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go b/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go new file mode 100644 index 0000000000..841fff7679 --- /dev/null +++ b/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go @@ -0,0 +1,103 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package telemetryfakes + +import ( + "sync" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry" +) + +type FakeConfigurationGetter struct { + GetLatestConfigurationStub func() *dataplane.Configuration + getLatestConfigurationMutex sync.RWMutex + getLatestConfigurationArgsForCall []struct { + } + getLatestConfigurationReturns struct { + result1 *dataplane.Configuration + } + getLatestConfigurationReturnsOnCall map[int]struct { + result1 *dataplane.Configuration + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeConfigurationGetter) GetLatestConfiguration() *dataplane.Configuration { + fake.getLatestConfigurationMutex.Lock() + ret, specificReturn := fake.getLatestConfigurationReturnsOnCall[len(fake.getLatestConfigurationArgsForCall)] + fake.getLatestConfigurationArgsForCall = append(fake.getLatestConfigurationArgsForCall, struct { + }{}) + stub := fake.GetLatestConfigurationStub + fakeReturns := fake.getLatestConfigurationReturns + fake.recordInvocation("GetLatestConfiguration", []interface{}{}) + fake.getLatestConfigurationMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeConfigurationGetter) GetLatestConfigurationCallCount() int { + fake.getLatestConfigurationMutex.RLock() + defer fake.getLatestConfigurationMutex.RUnlock() + return len(fake.getLatestConfigurationArgsForCall) +} + +func (fake *FakeConfigurationGetter) GetLatestConfigurationCalls(stub func() *dataplane.Configuration) { + fake.getLatestConfigurationMutex.Lock() + defer fake.getLatestConfigurationMutex.Unlock() + fake.GetLatestConfigurationStub = stub +} + +func (fake *FakeConfigurationGetter) GetLatestConfigurationReturns(result1 *dataplane.Configuration) { + fake.getLatestConfigurationMutex.Lock() + defer fake.getLatestConfigurationMutex.Unlock() + fake.GetLatestConfigurationStub = nil + fake.getLatestConfigurationReturns = struct { + result1 
diff --git a/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go b/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go
new file mode 100644
index 0000000000..841fff7679
--- /dev/null
+++ b/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go
@@ -0,0 +1,103 @@
+// Code generated by counterfeiter. DO NOT EDIT.
+package telemetryfakes
+
+import (
+	"sync"
+
+	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane"
+	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry"
+)
+
+type FakeConfigurationGetter struct {
+	GetLatestConfigurationStub        func() *dataplane.Configuration
+	getLatestConfigurationMutex       sync.RWMutex
+	getLatestConfigurationArgsForCall []struct {
+	}
+	getLatestConfigurationReturns struct {
+		result1 *dataplane.Configuration
+	}
+	getLatestConfigurationReturnsOnCall map[int]struct {
+		result1 *dataplane.Configuration
+	}
+	invocations      map[string][][]interface{}
+	invocationsMutex sync.RWMutex
+}
+
+func (fake *FakeConfigurationGetter) GetLatestConfiguration() *dataplane.Configuration {
+	fake.getLatestConfigurationMutex.Lock()
+	ret, specificReturn := fake.getLatestConfigurationReturnsOnCall[len(fake.getLatestConfigurationArgsForCall)]
+	fake.getLatestConfigurationArgsForCall = append(fake.getLatestConfigurationArgsForCall, struct {
+	}{})
+	stub := fake.GetLatestConfigurationStub
+	fakeReturns := fake.getLatestConfigurationReturns
+	fake.recordInvocation("GetLatestConfiguration", []interface{}{})
+	fake.getLatestConfigurationMutex.Unlock()
+	if stub != nil {
+		return stub()
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	return fakeReturns.result1
+}
+
+func (fake *FakeConfigurationGetter) GetLatestConfigurationCallCount() int {
+	fake.getLatestConfigurationMutex.RLock()
+	defer fake.getLatestConfigurationMutex.RUnlock()
+	return len(fake.getLatestConfigurationArgsForCall)
+}
+
+func (fake *FakeConfigurationGetter) GetLatestConfigurationCalls(stub func() *dataplane.Configuration) {
+	fake.getLatestConfigurationMutex.Lock()
+	defer fake.getLatestConfigurationMutex.Unlock()
+	fake.GetLatestConfigurationStub = stub
+}
+
+func (fake *FakeConfigurationGetter) GetLatestConfigurationReturns(result1 *dataplane.Configuration) {
+	fake.getLatestConfigurationMutex.Lock()
+	defer fake.getLatestConfigurationMutex.Unlock()
+	fake.GetLatestConfigurationStub = nil
+	fake.getLatestConfigurationReturns = struct {
+		result1 *dataplane.Configuration
+	}{result1}
+}
+
+func (fake *FakeConfigurationGetter) GetLatestConfigurationReturnsOnCall(i int, result1 *dataplane.Configuration) {
+	fake.getLatestConfigurationMutex.Lock()
+	defer fake.getLatestConfigurationMutex.Unlock()
+	fake.GetLatestConfigurationStub = nil
+	if fake.getLatestConfigurationReturnsOnCall == nil {
+		fake.getLatestConfigurationReturnsOnCall = make(map[int]struct {
+			result1 *dataplane.Configuration
+		})
+	}
+	fake.getLatestConfigurationReturnsOnCall[i] = struct {
+		result1 *dataplane.Configuration
+	}{result1}
+}
+
+func (fake *FakeConfigurationGetter) Invocations() map[string][][]interface{} {
+	fake.invocationsMutex.RLock()
+	defer fake.invocationsMutex.RUnlock()
+	fake.getLatestConfigurationMutex.RLock()
+	defer fake.getLatestConfigurationMutex.RUnlock()
+	copiedInvocations := map[string][][]interface{}{}
+	for key, value := range fake.invocations {
+		copiedInvocations[key] = value
+	}
+	return copiedInvocations
+}
+
+func (fake *FakeConfigurationGetter) recordInvocation(key string, args []interface{}) {
+	fake.invocationsMutex.Lock()
+	defer fake.invocationsMutex.Unlock()
+	if fake.invocations == nil {
+		fake.invocations = map[string][][]interface{}{}
+	}
+	if fake.invocations[key] == nil {
+		fake.invocations[key] = [][]interface{}{}
+	}
+	fake.invocations[key] = append(fake.invocations[key], args)
+}
+
+var _ telemetry.ConfigurationGetter = new(FakeConfigurationGetter)
diff --git a/internal/mode/static/telemetry/telemetryfakes/fake_data_collector.go b/internal/mode/static/telemetry/telemetryfakes/fake_data_collector.go
new file mode 100644
index 0000000000..0bce02c41c
--- /dev/null
+++ b/internal/mode/static/telemetry/telemetryfakes/fake_data_collector.go
@@ -0,0 +1,117 @@
+// Code generated by counterfeiter. DO NOT EDIT.
+package telemetryfakes
+
+import (
+	"context"
+	"sync"
+
+	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry"
+)
+
+type FakeDataCollector struct {
+	CollectStub        func(context.Context) (telemetry.Data, error)
+	collectMutex       sync.RWMutex
+	collectArgsForCall []struct {
+		arg1 context.Context
+	}
+	collectReturns struct {
+		result1 telemetry.Data
+		result2 error
+	}
+	collectReturnsOnCall map[int]struct {
+		result1 telemetry.Data
+		result2 error
+	}
+	invocations      map[string][][]interface{}
+	invocationsMutex sync.RWMutex
+}
+
+func (fake *FakeDataCollector) Collect(arg1 context.Context) (telemetry.Data, error) {
+	fake.collectMutex.Lock()
+	ret, specificReturn := fake.collectReturnsOnCall[len(fake.collectArgsForCall)]
+	fake.collectArgsForCall = append(fake.collectArgsForCall, struct {
+		arg1 context.Context
+	}{arg1})
+	stub := fake.CollectStub
+	fakeReturns := fake.collectReturns
+	fake.recordInvocation("Collect", []interface{}{arg1})
+	fake.collectMutex.Unlock()
+	if stub != nil {
+		return stub(arg1)
+	}
+	if specificReturn {
+		return ret.result1, ret.result2
+	}
+	return fakeReturns.result1, fakeReturns.result2
+}
+
+func (fake *FakeDataCollector) CollectCallCount() int {
+	fake.collectMutex.RLock()
+	defer fake.collectMutex.RUnlock()
+	return len(fake.collectArgsForCall)
+}
+
+func (fake *FakeDataCollector) CollectCalls(stub func(context.Context) (telemetry.Data, error)) {
+	fake.collectMutex.Lock()
+	defer fake.collectMutex.Unlock()
+	fake.CollectStub = stub
+}
+
+func (fake *FakeDataCollector) CollectArgsForCall(i int) context.Context {
+	fake.collectMutex.RLock()
+	defer fake.collectMutex.RUnlock()
+	argsForCall := fake.collectArgsForCall[i]
+	return argsForCall.arg1
+}
+
+func (fake *FakeDataCollector) CollectReturns(result1 telemetry.Data, result2 error) {
+	fake.collectMutex.Lock()
+	defer fake.collectMutex.Unlock()
+	fake.CollectStub = nil
+	fake.collectReturns = struct {
+		result1 telemetry.Data
+		result2 error
+	}{result1, result2}
+}
+
+func (fake *FakeDataCollector) CollectReturnsOnCall(i int, result1 telemetry.Data, result2 error) {
+	fake.collectMutex.Lock()
+	defer fake.collectMutex.Unlock()
+	fake.CollectStub = nil
+	if fake.collectReturnsOnCall == nil {
+		fake.collectReturnsOnCall = make(map[int]struct {
+			result1 telemetry.Data
+			result2 error
+		})
+	}
+	fake.collectReturnsOnCall[i] = struct {
+		result1 telemetry.Data
+		result2 error
+	}{result1, result2}
+}
+
+func (fake *FakeDataCollector) Invocations() map[string][][]interface{} {
+	fake.invocationsMutex.RLock()
+	defer fake.invocationsMutex.RUnlock()
+	fake.collectMutex.RLock()
+	defer fake.collectMutex.RUnlock()
+	copiedInvocations := map[string][][]interface{}{}
+	for key, value := range fake.invocations {
+		copiedInvocations[key] = value
+	}
+	return copiedInvocations
+}
+
+func (fake *FakeDataCollector) recordInvocation(key string, args []interface{}) {
+	fake.invocationsMutex.Lock()
+	defer fake.invocationsMutex.Unlock()
+	if fake.invocations == nil {
+		fake.invocations = map[string][][]interface{}{}
+	}
+	if fake.invocations[key] == nil {
+		fake.invocations[key] = [][]interface{}{}
+	}
+	fake.invocations[key] = append(fake.invocations[key], args)
+}
+
+var _ telemetry.DataCollector = new(FakeDataCollector)
diff --git a/internal/mode/static/telemetry/telemetryfakes/fake_graph_getter.go b/internal/mode/static/telemetry/telemetryfakes/fake_graph_getter.go
new file mode 100644
index 0000000000..2b7c5b2e13
--- /dev/null
+++ b/internal/mode/static/telemetry/telemetryfakes/fake_graph_getter.go
@@ -0,0 +1,103 @@
+// Code generated by counterfeiter. DO NOT EDIT.
+package telemetryfakes
+
+import (
+	"sync"
+
+	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/graph"
+	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry"
+)
+
+type FakeGraphGetter struct {
+	GetLatestGraphStub        func() *graph.Graph
+	getLatestGraphMutex       sync.RWMutex
+	getLatestGraphArgsForCall []struct {
+	}
+	getLatestGraphReturns struct {
+		result1 *graph.Graph
+	}
+	getLatestGraphReturnsOnCall map[int]struct {
+		result1 *graph.Graph
+	}
+	invocations      map[string][][]interface{}
+	invocationsMutex sync.RWMutex
+}
+
+func (fake *FakeGraphGetter) GetLatestGraph() *graph.Graph {
+	fake.getLatestGraphMutex.Lock()
+	ret, specificReturn := fake.getLatestGraphReturnsOnCall[len(fake.getLatestGraphArgsForCall)]
+	fake.getLatestGraphArgsForCall = append(fake.getLatestGraphArgsForCall, struct {
+	}{})
+	stub := fake.GetLatestGraphStub
+	fakeReturns := fake.getLatestGraphReturns
+	fake.recordInvocation("GetLatestGraph", []interface{}{})
+	fake.getLatestGraphMutex.Unlock()
+	if stub != nil {
+		return stub()
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	return fakeReturns.result1
+}
+
+func (fake *FakeGraphGetter) GetLatestGraphCallCount() int {
+	fake.getLatestGraphMutex.RLock()
+	defer fake.getLatestGraphMutex.RUnlock()
+	return len(fake.getLatestGraphArgsForCall)
+}
+
+func (fake *FakeGraphGetter) GetLatestGraphCalls(stub func() *graph.Graph) {
+	fake.getLatestGraphMutex.Lock()
+	defer fake.getLatestGraphMutex.Unlock()
+	fake.GetLatestGraphStub = stub
+}
+
+func (fake *FakeGraphGetter) GetLatestGraphReturns(result1 *graph.Graph) {
+	fake.getLatestGraphMutex.Lock()
+	defer fake.getLatestGraphMutex.Unlock()
+	fake.GetLatestGraphStub = nil
+	fake.getLatestGraphReturns = struct {
+		result1 *graph.Graph
+	}{result1}
+}
+
+func (fake *FakeGraphGetter) GetLatestGraphReturnsOnCall(i int, result1 *graph.Graph) {
+	fake.getLatestGraphMutex.Lock()
+	defer fake.getLatestGraphMutex.Unlock()
+	fake.GetLatestGraphStub = nil
+	if fake.getLatestGraphReturnsOnCall == nil {
+		fake.getLatestGraphReturnsOnCall = make(map[int]struct {
+			result1 *graph.Graph
+		})
+	}
+	fake.getLatestGraphReturnsOnCall[i] = struct {
+		result1 *graph.Graph
+	}{result1}
+}
+
+func (fake *FakeGraphGetter) Invocations() map[string][][]interface{} {
+	fake.invocationsMutex.RLock()
+	defer fake.invocationsMutex.RUnlock()
+	fake.getLatestGraphMutex.RLock()
+	defer fake.getLatestGraphMutex.RUnlock()
+	copiedInvocations := map[string][][]interface{}{}
+	for key, value := range fake.invocations {
+		copiedInvocations[key] = value
+	}
+	return copiedInvocations
+}
+
+func (fake *FakeGraphGetter) recordInvocation(key string, args []interface{}) {
+	fake.invocationsMutex.Lock()
+	defer fake.invocationsMutex.Unlock()
+	if fake.invocations == nil {
+		fake.invocations = map[string][][]interface{}{}
+	}
+	if fake.invocations[key] == nil {
+		fake.invocations[key] = [][]interface{}{}
+	}
+	fake.invocations[key] = append(fake.invocations[key], args)
+}
+
+var _ telemetry.GraphGetter = new(FakeGraphGetter)
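All three fakes above are counterfeiter output (note the DO NOT EDIT headers) driven by the //go:generate directives on the ConfigurationGetter, DataCollector, and GraphGetter interfaces. If those interfaces change, refreshing the fakes should just be a matter of rerunning the generate step over the package, along the lines of:

go generate ./internal/mode/static/telemetry/...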