diff --git a/components/usage/pkg/controller/controller.go b/components/usage/pkg/controller/controller.go index 8a6ae3189e0213..6ecbcffba41dcb 100644 --- a/components/usage/pkg/controller/controller.go +++ b/components/usage/pkg/controller/controller.go @@ -26,23 +26,38 @@ type Controller struct { scheduler *cron.Cron + jobs chan struct{} runningJobs sync.WaitGroup } func (c *Controller) Start() error { log.Info("Starting usage controller.") + // Using channel of size 1 ensures we don't queue up overly many runs when there is already 1 queued up. + c.jobs = make(chan struct{}, 1) + + go func() { + // Here, we guarantee we're only ever executing 1 job at a time - in other words we always wait for the previous job to finish. + for range c.jobs { + c.runningJobs.Add(1) + defer c.runningJobs.Done() + + err := c.reconciler.Reconcile() + if err != nil { + log.WithError(err).Errorf("Reconciliation run failed.") + } else { + log.Info("Completed usage reconciliation run without errors.") + } + } + }() err := c.scheduler.AddFunc(fmt.Sprintf("@every %s", c.schedule.String()), cron.FuncJob(func() { log.Info("Starting usage reconciliation.") - c.runningJobs.Add(1) - defer c.runningJobs.Done() - - err := c.reconciler.Reconcile() - if err != nil { - log.WithError(err).Errorf("Reconciliation run failed.") - } else { - log.Info("Completed usage reconciliation run without errors.") + select { + case c.jobs <- struct{}{}: + log.Info("Triggered next reconciliation.") + default: + log.Info("Previous reconciliation loop is still running, skipping.") } })) if err != nil { @@ -60,7 +75,10 @@ func (c *Controller) Stop() { // Stop any new jobs from running c.scheduler.Stop() + close(c.jobs) + log.Info("Awaiting existing reconciliation runs to complete..") // Wait for existing jobs to finish c.runningJobs.Wait() + } diff --git a/components/usage/pkg/controller/controller_test.go b/components/usage/pkg/controller/controller_test.go index b639dfa5abdcdd..12026aeaa92130 100644 --- a/components/usage/pkg/controller/controller_test.go +++ b/components/usage/pkg/controller/controller_test.go @@ -6,6 +6,7 @@ package controller import ( "github.com/stretchr/testify/require" + "sync/atomic" "testing" "time" ) @@ -26,6 +27,23 @@ func TestController(t *testing.T) { ctrl.Stop() } +func TestController_PreventsConcurrentRunsOfReconcilerFunc(t *testing.T) { + schedule := 1 * time.Second + count := int32(0) + + ctrl, err := New(schedule, ReconcilerFunc(func() error { + atomic.AddInt32(&count, 1) + time.Sleep(3 * time.Second) + return nil + })) + require.NoError(t, err) + + require.NoError(t, ctrl.Start()) + time.Sleep(schedule + 2*time.Second) + require.Equal(t, int32(1), count, "must trigger reconciler function exactly once") + ctrl.Stop() +} + func TestController_GracefullyHandlesPanic(t *testing.T) { ctrl, err := New(20*time.Millisecond, ReconcilerFunc(func() error { panic("pls help")