Skip to content

Commit 56aa40c

Browse files
authored
Smooth out spikes in rate of chunk flush ops (#3191)
Smooth out spikes in rate of chunk flush ops. Ingester chunk flushes run periodically, by default every minute. Add a rate-limiter so we spread out calls to the chunk store across the period, avoiding spikes of intense activity which can slow down other operations such as incoming Push() calls. Signed-off-by: Bryan Boreham <[email protected]>
1 parent e319a66 commit 56aa40c

File tree

3 files changed

+43
-11
lines changed

3 files changed

+43
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* [ENHANCEMENT] Blocksconvert – Scanner: metrics for tracking progress. #3222
6161
* [ENHANCEMENT] Blocksconvert – Builder: retry block upload before giving up. #3245
6262
* [ENHANCEMENT] Hash ring: added instance registered timestamp to the ring. #3248
63+
* [ENHANCEMENT] Reduce tail latency by smoothing out spikes in rate of chunk flush operations. #3191
6364
* [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
6465
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
6566
* [BUGFIX] Handle hash-collisions in the query path. #3192

pkg/ingester/flush.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
ot "github.com/opentracing/opentracing-go"
1111
"github.com/prometheus/common/model"
1212
"github.com/prometheus/prometheus/pkg/labels"
13+
"golang.org/x/time/rate"
1314

1415
"github.com/cortexproject/cortex/pkg/chunk"
1516
"github.com/cortexproject/cortex/pkg/util"
@@ -19,6 +20,8 @@ const (
1920
// Backoff for retrying 'immediate' flushes. Only counts for queue
2021
// position, not wallclock time.
2122
flushBackoff = 1 * time.Second
23+
// Lower bound on flushes per check period for rate-limiter
24+
minFlushes = 100
2225
)
2326

2427
// Flush triggers a flush of all the chunks and closes the flush queues.
@@ -94,6 +97,25 @@ func (i *Ingester) sweepUsers(immediate bool) {
9497
}
9598

9699
i.metrics.oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
100+
i.setFlushRate()
101+
}
102+
103+
// Compute a rate such that calls to the store are spread over nearly all of the flush period,
104+
// for example if we have 600 items in the queue and period 1 min we will send 10.5 per second.
105+
// Note that if the store can't keep up with this rate then the limiter doesn't make any difference.
106+
func (i *Ingester) setFlushRate() {
107+
totalQueueLength := 0
108+
for _, q := range i.flushQueues {
109+
totalQueueLength += q.Length()
110+
}
111+
const fudge = 1.05 // aim to finish a little bit before the end of the period
112+
flushesPerSecond := float64(totalQueueLength) / i.cfg.FlushCheckPeriod.Seconds() * fudge
113+
// Avoid going very slowly with tiny queues
114+
if flushesPerSecond*i.cfg.FlushCheckPeriod.Seconds() < minFlushes {
115+
flushesPerSecond = minFlushes / i.cfg.FlushCheckPeriod.Seconds()
116+
}
117+
level.Debug(util.Logger).Log("msg", "computed flush rate", "rate", flushesPerSecond)
118+
i.flushRateLimiter.SetLimit(rate.Limit(flushesPerSecond))
97119
}
98120

99121
type flushReason int8
@@ -235,6 +257,9 @@ func (i *Ingester) flushLoop(j int) {
235257
}
236258
op := o.(*flushOp)
237259

260+
if !op.immediate {
261+
_ = i.flushRateLimiter.Wait(context.Background())
262+
}
238263
outcome, err := i.flushUserSeries(j, op.userID, op.fp, op.immediate)
239264
i.metrics.seriesDequeuedOutcome.WithLabelValues(outcome.String()).Inc()
240265
if err != nil {

pkg/ingester/ingester.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
1919
"github.com/weaveworks/common/httpgrpc"
2020
"github.com/weaveworks/common/user"
21+
"golang.org/x/time/rate"
2122
"google.golang.org/grpc/codes"
2223

2324
cortex_chunk "github.com/cortexproject/cortex/pkg/chunk"
@@ -141,6 +142,9 @@ type Ingester struct {
141142
flushQueues []*util.PriorityQueue
142143
flushQueuesDone sync.WaitGroup
143144

145+
// Spread out calls to the chunk store over the flush period
146+
flushRateLimiter *rate.Limiter
147+
144148
// This should never be nil.
145149
wal WAL
146150
// To be passed to the WAL.
@@ -196,11 +200,12 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
196200
clientConfig: clientConfig,
197201
metrics: newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled),
198202

199-
limits: limits,
200-
chunkStore: chunkStore,
201-
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
202-
usersMetadata: map[string]*userMetricsMetadata{},
203-
registerer: registerer,
203+
limits: limits,
204+
chunkStore: chunkStore,
205+
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
206+
flushRateLimiter: rate.NewLimiter(rate.Inf, 1),
207+
usersMetadata: map[string]*userMetricsMetadata{},
208+
registerer: registerer,
204209
}
205210

206211
var err error
@@ -275,12 +280,13 @@ func NewForFlusher(cfg Config, chunkStore ChunkStore, limits *validation.Overrid
275280
}
276281

277282
i := &Ingester{
278-
cfg: cfg,
279-
metrics: newIngesterMetrics(registerer, true, false),
280-
chunkStore: chunkStore,
281-
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
282-
wal: &noopWAL{},
283-
limits: limits,
283+
cfg: cfg,
284+
metrics: newIngesterMetrics(registerer, true, false),
285+
chunkStore: chunkStore,
286+
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
287+
flushRateLimiter: rate.NewLimiter(rate.Inf, 1),
288+
wal: &noopWAL{},
289+
limits: limits,
284290
}
285291

286292
i.BasicService = services.NewBasicService(i.startingForFlusher, i.loopForFlusher, i.stopping)

0 commit comments

Comments
 (0)