From bfb09757c11417af79e15d92056b853d80e712e4 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 16 Sep 2020 10:42:22 +0000 Subject: [PATCH 1/3] Smooth out spikes in rate of chunk flush ops Ingester chunk flushes run periodically, by default every minute. Add a rate-limiter so we spread out calls to the DB across the period, avoiding spikes of intense activity which can slow down other operations such as incoming Push() calls. Signed-off-by: Bryan Boreham --- CHANGELOG.md | 1 + pkg/ingester/flush.go | 23 +++++++++++++++++++++++ pkg/ingester/ingester.go | 28 +++++++++++++++++----------- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19cfe87e297..e36961bef4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ * [ENHANCEMENT] Blocksconvert – Scanner: support for scanning specific date-range only. #3222 * [ENHANCEMENT] Blocksconvert – Scanner: metrics for tracking progress. #3222 * [ENHANCEMENT] Blocksconvert – Builder: retry block upload before giving up. #3245 +* [ENHANCEMENT] Reduce tail latency by smoothing out spikes in rate of chunk flush operations. #3191 * [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178 * [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195 * [BUGFIX] Handle hash-collisions in the query path. #3192 diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 1b3d0806a2d..d0521e82493 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -10,6 +10,7 @@ import ( ot "github.com/opentracing/opentracing-go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "golang.org/x/time/rate" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/util" @@ -19,6 +20,8 @@ const ( // Backoff for retrying 'immediate' flushes. Only counts for queue // position, not wallclock time. flushBackoff = 1 * time.Second + // Slowest per-second we will rate-limit flush calls to the chunk store + minFlushRate = 10 ) // Flush triggers a flush of all the chunks and closes the flush queues. @@ -94,6 +97,25 @@ func (i *Ingester) sweepUsers(immediate bool) { } i.metrics.oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix())) + i.setFlushRate() +} + +// Compute a rate such to spread calls to the store over nearly all of the flush period, +// for example if we have 600 items in the queue and period 1 min we will send 10.5 per second. +// Note if the store can't keep up with this rate then it doesn't make any difference. +func (i *Ingester) setFlushRate() { + totalQueueLength := 0 + for _, q := range i.flushQueues { + totalQueueLength += q.Length() + } + const fudge = 1.05 // aim to finish a little bit before the end of the period + flushesPerSecond := float64(totalQueueLength) / i.cfg.FlushCheckPeriod.Seconds() * fudge + // Avoid going very slowly with tiny queues + if flushesPerSecond < minFlushRate { + flushesPerSecond = minFlushRate + } + level.Debug(util.Logger).Log("msg", "computed flush rate", "rate", flushesPerSecond) + i.flushRateLimiter.SetLimit(rate.Limit(flushesPerSecond)) } type flushReason int8 @@ -229,6 +251,7 @@ func (i *Ingester) flushLoop(j int) { }() for { + _ = i.flushRateLimiter.Wait(context.Background()) o := i.flushQueues[j].Dequeue() if o == nil { return diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 10ee8853169..9932112db60 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -18,6 +18,7 @@ import ( tsdb_record "github.com/prometheus/prometheus/tsdb/record" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + "golang.org/x/time/rate" "google.golang.org/grpc/codes" cortex_chunk "github.com/cortexproject/cortex/pkg/chunk" @@ -141,6 +142,9 @@ type Ingester struct { flushQueues []*util.PriorityQueue flushQueuesDone sync.WaitGroup + // Spread out calls to the chunk store over the flush period + flushRateLimiter *rate.Limiter + // This should never be nil. wal WAL // To be passed to the WAL. @@ -196,11 +200,12 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c clientConfig: clientConfig, metrics: newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled), - limits: limits, - chunkStore: chunkStore, - flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), - usersMetadata: map[string]*userMetricsMetadata{}, - registerer: registerer, + limits: limits, + chunkStore: chunkStore, + flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), + flushRateLimiter: rate.NewLimiter(minFlushRate, 1), + usersMetadata: map[string]*userMetricsMetadata{}, + registerer: registerer, } var err error @@ -275,12 +280,13 @@ func NewForFlusher(cfg Config, chunkStore ChunkStore, limits *validation.Overrid } i := &Ingester{ - cfg: cfg, - metrics: newIngesterMetrics(registerer, true, false), - chunkStore: chunkStore, - flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), - wal: &noopWAL{}, - limits: limits, + cfg: cfg, + metrics: newIngesterMetrics(registerer, true, false), + chunkStore: chunkStore, + flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), + flushRateLimiter: rate.NewLimiter(rate.Inf, 1), + wal: &noopWAL{}, + limits: limits, } i.BasicService = services.NewBasicService(i.startingForFlusher, i.loopForFlusher, i.stopping) From 4ba98fc615aedb109d8b377ecf732eeb88eae0c1 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 27 Sep 2020 16:53:20 +0000 Subject: [PATCH 2/3] Wait after dequeuing, in flush rate-limiter This means we can check if it's an immediate flush, and also has better behaviour when transitioning from a fast to a slow rate, or vice-versa. Signed-off-by: Bryan Boreham --- pkg/ingester/flush.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index d0521e82493..9522854d422 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -251,13 +251,15 @@ func (i *Ingester) flushLoop(j int) { }() for { - _ = i.flushRateLimiter.Wait(context.Background()) o := i.flushQueues[j].Dequeue() if o == nil { return } op := o.(*flushOp) + if !op.immediate { + _ = i.flushRateLimiter.Wait(context.Background()) + } outcome, err := i.flushUserSeries(j, op.userID, op.fp, op.immediate) i.metrics.seriesDequeuedOutcome.WithLabelValues(outcome.String()).Inc() if err != nil { From 2f1812f428f6581b056cee6d436dd3534ecbb653 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 28 Sep 2020 08:58:22 +0000 Subject: [PATCH 3/3] Change min ratelimit calculation to per period So when you have a short flush period it will go faster Also initialize the rate-limit to "Inf" or no limit, so we start out fast and slow down once we know what the queue is like. Signed-off-by: Bryan Boreham --- pkg/ingester/flush.go | 8 ++++---- pkg/ingester/ingester.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 9522854d422..fd26a08da5e 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -20,8 +20,8 @@ const ( // Backoff for retrying 'immediate' flushes. Only counts for queue // position, not wallclock time. flushBackoff = 1 * time.Second - // Slowest per-second we will rate-limit flush calls to the chunk store - minFlushRate = 10 + // Lower bound on flushes per check period for rate-limiter + minFlushes = 100 ) // Flush triggers a flush of all the chunks and closes the flush queues. @@ -111,8 +111,8 @@ func (i *Ingester) setFlushRate() { const fudge = 1.05 // aim to finish a little bit before the end of the period flushesPerSecond := float64(totalQueueLength) / i.cfg.FlushCheckPeriod.Seconds() * fudge // Avoid going very slowly with tiny queues - if flushesPerSecond < minFlushRate { - flushesPerSecond = minFlushRate + if flushesPerSecond*i.cfg.FlushCheckPeriod.Seconds() < minFlushes { + flushesPerSecond = minFlushes / i.cfg.FlushCheckPeriod.Seconds() } level.Debug(util.Logger).Log("msg", "computed flush rate", "rate", flushesPerSecond) i.flushRateLimiter.SetLimit(rate.Limit(flushesPerSecond)) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9932112db60..aaff13ffaa8 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -203,7 +203,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c limits: limits, chunkStore: chunkStore, flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), - flushRateLimiter: rate.NewLimiter(minFlushRate, 1), + flushRateLimiter: rate.NewLimiter(rate.Inf, 1), usersMetadata: map[string]*userMetricsMetadata{}, registerer: registerer, }