Skip to content

Commit 55d8e99

Browse files
committed
Make telemetry job a generic runnable (nginx#1529)
Problem: Multiple reporting systems need to use the telemetry job, and duplicated code would like to be avoided. Solution: Turn the telemetry job into a generic job that runs a "worker" function, provided by the caller.
1 parent 4622790 commit 55d8e99

File tree

5 files changed

+131
-139
lines changed

5 files changed

+131
-139
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package runnables
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/go-logr/logr"
8+
"k8s.io/apimachinery/pkg/util/wait"
9+
"sigs.k8s.io/controller-runtime/pkg/manager"
10+
)
11+
12+
// CronJobConfig is the configuration for a cronjob.
13+
type CronJobConfig struct {
14+
// Worker is the function that will be run for every cronjob iteration.
15+
Worker func(context.Context)
16+
// Logger is the logger.
17+
Logger logr.Logger
18+
// Period defines the period of the cronjob. The cronjob will run every Period.
19+
Period time.Duration
20+
// JitterFactor sets the jitter for the cronjob. If positive, the period is jittered before every
21+
// run of the worker. If jitterFactor is not positive, the period is unchanged and not jittered.
22+
JitterFactor float64
23+
}
24+
25+
// CronJob periodically runs a worker function.
26+
type CronJob struct {
27+
cfg CronJobConfig
28+
}
29+
30+
// NewCronJob creates a new cronjob.
31+
func NewCronJob(cfg CronJobConfig) *CronJob {
32+
return &CronJob{
33+
cfg: cfg,
34+
}
35+
}
36+
37+
// Start starts the cronjob.
38+
// Implements controller-runtime manager.Runnable
39+
func (j *CronJob) Start(ctx context.Context) error {
40+
j.cfg.Logger.Info("Starting cronjob")
41+
42+
sliding := true // This means the period with jitter will be calculated after each worker call.
43+
44+
wait.JitterUntilWithContext(ctx, j.cfg.Worker, j.cfg.Period, j.cfg.JitterFactor, sliding)
45+
46+
j.cfg.Logger.Info("Stopping cronjob")
47+
return nil
48+
}
49+
50+
var _ manager.Runnable = &CronJob{}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package runnables
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
. "github.com/onsi/gomega"
9+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
10+
)
11+
12+
func TestCronJob(t *testing.T) {
13+
g := NewWithT(t)
14+
15+
timeout := 10 * time.Second
16+
var callCount int
17+
18+
valCh := make(chan int, 128)
19+
worker := func(context.Context) {
20+
callCount++
21+
valCh <- callCount
22+
}
23+
24+
cfg := CronJobConfig{
25+
Worker: worker,
26+
Logger: zap.New(),
27+
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
28+
}
29+
job := NewCronJob(cfg)
30+
31+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
32+
33+
errCh := make(chan error)
34+
go func() {
35+
errCh <- job.Start(ctx)
36+
close(errCh)
37+
}()
38+
39+
minReports := 2 // ensure that the CronJob reports more than once: it doesn't exit after the first run
40+
41+
g.Eventually(valCh).Should(Receive(BeNumerically(">=", minReports)))
42+
43+
cancel()
44+
g.Eventually(errCh).Should(Receive(BeNil()))
45+
g.Eventually(errCh).Should(BeClosed())
46+
}

internal/mode/static/manager.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,7 @@ func StartManager(cfg config.Config) error {
213213
return fmt.Errorf("cannot register status updater: %w", err)
214214
}
215215

216-
telemetryJob := &runnables.Leader{
217-
Runnable: telemetry.NewJob(
218-
telemetry.JobConfig{
219-
Exporter: telemetry.NewLoggingExporter(cfg.Logger.WithName("telemetryExporter").V(1 /* debug */)),
220-
Logger: cfg.Logger.WithName("telemetryJob"),
221-
Period: cfg.TelemetryReportPeriod,
222-
},
223-
),
224-
}
225-
226-
if err = mgr.Add(telemetryJob); err != nil {
216+
if err = mgr.Add(createTelemetryJob(cfg)); err != nil {
227217
return fmt.Errorf("cannot register telemetry job: %w", err)
228218
}
229219

@@ -448,6 +438,40 @@ func prepareFirstEventBatchPreparerArgs(
448438
return objects, objectLists
449439
}
450440

441+
func createTelemetryJob(cfg config.Config) *runnables.Leader {
442+
logger := cfg.Logger.WithName("telemetryJob")
443+
exporter := telemetry.NewLoggingExporter(cfg.Logger.WithName("telemetryExporter").V(1 /* debug */))
444+
445+
worker := func(ctx context.Context) {
446+
// Gather telemetry
447+
logger.V(1).Info("Gathering telemetry")
448+
449+
// We will need to gather data as defined in https://github.com/nginxinc/nginx-gateway-fabric/issues/793
450+
data := telemetry.Data{}
451+
452+
// Export telemetry
453+
logger.V(1).Info("Exporting telemetry")
454+
455+
if err := exporter.Export(ctx, data); err != nil {
456+
logger.Error(err, "Failed to export telemetry")
457+
}
458+
}
459+
// 10 min jitter is enough per telemetry destination recommendation
460+
// For the default period of 24 hours, jitter will be 10min /(24*60)min = 0.0069
461+
jitterFactor := 10.0 / (24 * 60) // added jitter is bound by jitterFactor * period
462+
463+
return &runnables.Leader{
464+
Runnable: runnables.NewCronJob(
465+
runnables.CronJobConfig{
466+
Worker: worker,
467+
Logger: logger,
468+
Period: cfg.TelemetryReportPeriod,
469+
JitterFactor: jitterFactor,
470+
},
471+
),
472+
}
473+
}
474+
451475
func setInitialConfig(
452476
reader client.Reader,
453477
logger logr.Logger,

internal/mode/static/telemetry/job.go

Lines changed: 0 additions & 67 deletions
This file was deleted.

internal/mode/static/telemetry/job_test.go

Lines changed: 0 additions & 61 deletions
This file was deleted.

0 commit comments

Comments
 (0)