Skip to content

Commit dd1f3d3

Browse files
committed
Smooth out spikes in rate of chunk flush ops
Ingester chunk flushes run periodically, by default every minute. Add a rate-limiter so we spread out calls to the DB across the period, avoiding spikes of intense activity which can slow down other operations such as incoming Push() calls.

Signed-off-by: Bryan Boreham <[email protected]>
1 parent dd5e14d commit dd1f3d3

File tree

3 files changed

+42
-11
lines changed

3 files changed

+42
-11
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## master / unreleased
44

5+
* [ENHANCEMENT] Smooth out spikes in rate of chunk flush operations. #3191
6+
57
## 1.4.0-rc.0 in progress
68

79
* [CHANGE] Cassandra backend support is now GA (stable). #3180

pkg/ingester/flush.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
ot "github.com/opentracing/opentracing-go"
1111
"github.com/prometheus/common/model"
1212
"github.com/prometheus/prometheus/pkg/labels"
13+
"golang.org/x/time/rate"
1314

1415
"github.com/cortexproject/cortex/pkg/chunk"
1516
"github.com/cortexproject/cortex/pkg/util"
@@ -19,6 +20,8 @@ const (
1920
// Backoff for retrying 'immediate' flushes. Only counts for queue
2021
// position, not wallclock time.
2122
flushBackoff = 1 * time.Second
23+
// Slowest per-second rate to which we will rate-limit flush calls to the DB
24+
minFlushRate = 10
2225
)
2326

2427
// Flush triggers a flush of all the chunks and closes the flush queues.
@@ -94,6 +97,25 @@ func (i *Ingester) sweepUsers(immediate bool) {
9497
}
9598

9699
i.metrics.oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
100+
i.setFlushRate()
101+
}
102+
103+
// Compute a rate such as to spread DB calls over nearly all of the flush period,
104+
// for example if we have 600 items in the queue and period 1 min we will send 10.5 per second.
105+
// Note if the DB can't keep up with this rate then it doesn't make any difference.
106+
func (i *Ingester) setFlushRate() {
107+
totalQueueLength := 0
108+
for _, q := range i.flushQueues {
109+
totalQueueLength += q.Length()
110+
}
111+
const fudge = 1.05 // aim to finish a little bit before the end of the period
112+
flushesPerSecond := float64(totalQueueLength) / i.cfg.FlushCheckPeriod.Seconds() * fudge
113+
// Avoid going very slowly with tiny queues
114+
if flushesPerSecond < minFlushRate {
115+
flushesPerSecond = minFlushRate
116+
}
117+
level.Debug(util.Logger).Log("msg", "computed flush rate", "rate", flushesPerSecond)
118+
i.flushRateLimiter.SetLimit(rate.Limit(flushesPerSecond))
97119
}
98120

99121
type flushReason int8
@@ -229,6 +251,7 @@ func (i *Ingester) flushLoop(j int) {
229251
}()
230252

231253
for {
254+
_ = i.flushRateLimiter.Wait(context.Background())
232255
o := i.flushQueues[j].Dequeue()
233256
if o == nil {
234257
return

pkg/ingester/ingester.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
1919
"github.com/weaveworks/common/httpgrpc"
2020
"github.com/weaveworks/common/user"
21+
"golang.org/x/time/rate"
2122
"google.golang.org/grpc/codes"
2223

2324
cortex_chunk "github.com/cortexproject/cortex/pkg/chunk"
@@ -134,6 +135,9 @@ type Ingester struct {
134135
flushQueues []*util.PriorityQueue
135136
flushQueuesDone sync.WaitGroup
136137

138+
// Spread out calls to the DB over the flush period
139+
flushRateLimiter *rate.Limiter
140+
137141
// This should never be nil.
138142
wal WAL
139143
// To be passed to the WAL.
@@ -189,11 +193,12 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
189193
clientConfig: clientConfig,
190194
metrics: newIngesterMetrics(registerer, true),
191195

192-
limits: limits,
193-
chunkStore: chunkStore,
194-
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
195-
usersMetadata: map[string]*userMetricsMetadata{},
196-
registerer: registerer,
196+
limits: limits,
197+
chunkStore: chunkStore,
198+
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
199+
flushRateLimiter: rate.NewLimiter(minFlushRate, 1),
200+
usersMetadata: map[string]*userMetricsMetadata{},
201+
registerer: registerer,
197202
}
198203

199204
var err error
@@ -268,12 +273,13 @@ func NewForFlusher(cfg Config, chunkStore ChunkStore, limits *validation.Overrid
268273
}
269274

270275
i := &Ingester{
271-
cfg: cfg,
272-
metrics: newIngesterMetrics(registerer, true),
273-
chunkStore: chunkStore,
274-
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
275-
wal: &noopWAL{},
276-
limits: limits,
276+
cfg: cfg,
277+
metrics: newIngesterMetrics(registerer, true),
278+
chunkStore: chunkStore,
279+
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
280+
flushRateLimiter: rate.NewLimiter(rate.Inf, 1),
281+
wal: &noopWAL{},
282+
limits: limits,
277283
}
278284

279285
i.BasicService = services.NewBasicService(i.startingForFlusher, i.loopForFlusher, i.stopping)

0 commit comments

Comments (0)