diff --git a/CHANGELOG.md b/CHANGELOG.md index 69e49d7ed4e..91ab74622d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [ENHANCEMENT] Emit querier `max_concurrent` as a metric. #5362 * [ENHANCEMENT] Do not resync blocks in running store gateways during rollout deployment and container restart. #5363 * [ENHANCEMENT] Store Gateway: Add new metrics `cortex_bucket_store_sent_chunk_size_bytes`, `cortex_bucket_store_postings_size_bytes` and `cortex_bucket_store_empty_postings_total`. #5397 +* [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/pkg/ring/basic_lifecycler.go b/pkg/ring/basic_lifecycler.go index e8bcf8914bb..015e31d96a0 100644 --- a/pkg/ring/basic_lifecycler.go +++ b/pkg/ring/basic_lifecycler.go @@ -3,6 +3,7 @@ package ring import ( "context" "fmt" + mathrand "math/rand" "sort" "sync" "time" @@ -185,8 +186,18 @@ func (l *BasicLifecycler) starting(ctx context.Context) error { } func (l *BasicLifecycler) running(ctx context.Context) error { - heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(l.cfg.HeartbeatPeriod) - defer heartbeatTickerStop() + var heartbeatTickerChan <-chan time.Time + if l.cfg.HeartbeatPeriod > 0 { + heartbeatTicker := time.NewTicker(l.cfg.HeartbeatPeriod) + heartbeatTicker.Stop() + time.AfterFunc(time.Duration(mathrand.Int63n(int64(l.cfg.HeartbeatPeriod))), func() { + l.heartbeat(ctx) + heartbeatTicker.Reset(l.cfg.HeartbeatPeriod) + }) + defer heartbeatTicker.Stop() + + heartbeatTickerChan = heartbeatTicker.C + } for { select { diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 9bf69bff736..44185ac2210 --- 
a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + mathrand "math/rand" "os" "sort" "sync" @@ -429,8 +430,18 @@ func (i *Lifecycler) loop(ctx context.Context) error { autoJoinAfter = time.After(i.cfg.JoinAfter) } - heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod) - defer heartbeatTickerStop() + var heartbeatTickerChan <-chan time.Time + if i.cfg.HeartbeatPeriod > 0 { + heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod) + heartbeatTicker.Stop() + time.AfterFunc(time.Duration(mathrand.Int63n(int64(i.cfg.HeartbeatPeriod))), func() { + i.heartbeat() + heartbeatTicker.Reset(i.cfg.HeartbeatPeriod) + }) + defer heartbeatTicker.Stop() + + heartbeatTickerChan = heartbeatTicker.C + } for { select { @@ -486,11 +497,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { } case <-heartbeatTickerChan: - i.lifecyclerMetrics.consulHeartbeats.Inc() - if err := i.updateConsul(context.Background()); err != nil { - level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) - } - + i.heartbeat() case f := <-i.actorChan: f() @@ -501,6 +508,13 @@ func (i *Lifecycler) loop(ctx context.Context) error { } } +func (i *Lifecycler) heartbeat() { + i.lifecyclerMetrics.consulHeartbeats.Inc() + if err := i.updateConsul(context.Background()); err != nil { + level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) + } +} + // Shutdown the lifecycle. It will: // - send chunks to another ingester, if it can. // - otherwise, flush chunks to the chunk store.