Skip to content

Commit 9ef6bdd

Browse files
committed
[usage] Ensure controlle ticks are not concurrent
1 parent 65384db commit 9ef6bdd

File tree

2 files changed

+44
-8
lines changed

2 files changed

+44
-8
lines changed

components/usage/pkg/controller/controller.go

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,38 @@ type Controller struct {
2626

2727
scheduler *cron.Cron
2828

29+
jobs chan struct{}
2930
runningJobs sync.WaitGroup
3031
}
3132

3233
func (c *Controller) Start() error {
3334
log.Info("Starting usage controller.")
35+
// Using channel of size 1 ensures we don't queue up overly many runs when there is already 1 queued up.
36+
c.jobs = make(chan struct{}, 1)
37+
38+
go func() {
39+
// Here, we guarantee we're only ever executing 1 job at a time - in other words we always wait for the previous job to finish.
40+
for range c.jobs {
41+
c.runningJobs.Add(1)
42+
defer c.runningJobs.Done()
43+
44+
err := c.reconciler.Reconcile()
45+
if err != nil {
46+
log.WithError(err).Errorf("Reconciliation run failed.")
47+
} else {
48+
log.Info("Completed usage reconciliation run without errors.")
49+
}
50+
}
51+
}()
3452

3553
err := c.scheduler.AddFunc(fmt.Sprintf("@every %s", c.schedule.String()), cron.FuncJob(func() {
3654
log.Info("Starting usage reconciliation.")
3755

38-
c.runningJobs.Add(1)
39-
defer c.runningJobs.Done()
40-
41-
err := c.reconciler.Reconcile()
42-
if err != nil {
43-
log.WithError(err).Errorf("Reconciliation run failed.")
44-
} else {
45-
log.Info("Completed usage reconciliation run without errors.")
56+
select {
57+
case c.jobs <- struct{}{}:
58+
log.Info("Triggered next reconciliation.")
59+
default:
60+
log.Info("Previous reconciliation loop is still running, skipping.")
4661
}
4762
}))
4863
if err != nil {
@@ -60,7 +75,10 @@ func (c *Controller) Stop() {
6075
// Stop any new jobs from running
6176
c.scheduler.Stop()
6277

78+
close(c.jobs)
79+
6380
log.Info("Awaiting existing reconciliation runs to complete..")
6481
// Wait for existing jobs to finish
6582
c.runningJobs.Wait()
83+
6684
}

components/usage/pkg/controller/controller_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package controller
66

77
import (
88
"github.com/stretchr/testify/require"
9+
"sync/atomic"
910
"testing"
1011
"time"
1112
)
@@ -26,6 +27,23 @@ func TestController(t *testing.T) {
2627
ctrl.Stop()
2728
}
2829

30+
func TestController_PreventsConcurrentRunsOfReconcilerFunc(t *testing.T) {
31+
schedule := 1 * time.Second
32+
count := int32(0)
33+
34+
ctrl, err := New(schedule, ReconcilerFunc(func() error {
35+
atomic.AddInt32(&count, 1)
36+
time.Sleep(3 * time.Second)
37+
return nil
38+
}))
39+
require.NoError(t, err)
40+
41+
require.NoError(t, ctrl.Start())
42+
time.Sleep(schedule + 2*time.Second)
43+
require.Equal(t, int32(1), count, "must trigger reconciler function exactly once")
44+
ctrl.Stop()
45+
}
46+
2947
func TestController_GracefullyHandlesPanic(t *testing.T) {
3048
ctrl, err := New(20*time.Millisecond, ReconcilerFunc(func() error {
3149
panic("pls help")

0 commit comments

Comments
 (0)