From e31817d37e39bdf787f9efdb707553c96a7ce5e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Wed, 9 Aug 2023 14:48:31 -0700 Subject: [PATCH] Batch adding series to query limiter to optimize locks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 1 + pkg/distributor/distributor.go | 20 +++--- pkg/distributor/query.go | 19 +++--- pkg/util/limiter/query_limiter.go | 13 ++-- pkg/util/limiter/query_limiter_test.go | 88 ++++++++++++++++++++++---- 5 files changed, 106 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 571177527f8..93b67b9eb72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ * [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476 * [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481 * [ENHANCEMENT] Compactor: allow unregisteronshutdown to be configurable. #5503 +* [ENHANCEMENT] Querier: Batch adding series to query limiter to optimize locking. #5505 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 7ce4179e54a..16eee1f4448 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1093,17 +1093,18 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through if err := queryLimiter.AddDataBytes(resp.Size()); err != nil { return nil, validation.LimitError(err.Error()) } + s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric)) for _, m := range resp.Metric { - if err := queryLimiter.AddSeries(m.Labels); err != nil { - return nil, validation.LimitError(err.Error()) - } + s = append(s, m.Labels) m := cortexpb.FromLabelAdaptersToMetric(m.Labels) fingerprint := m.Fingerprint() mutex.Lock() (*metrics)[fingerprint] = m mutex.Unlock() } - + if err := queryLimiter.AddSeries(s...); err != nil { + return nil, validation.LimitError(err.Error()) + } return nil, nil }) @@ -1130,19 +1131,18 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t } else if err != nil { return nil, err } - + s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric)) for _, metric := range resp.Metric { m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels) - - if err := queryLimiter.AddSeries(metric.Labels); err != nil { - return nil, validation.LimitError(err.Error()) - } - + s = append(s, metric.Labels) fingerprint := m.Fingerprint() mutex.Lock() (*metrics)[fingerprint] = m mutex.Unlock() } + if err := queryLimiter.AddSeries(s...); err != nil { + return nil, validation.LimitError(err.Error()) + } } return nil, nil diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index f54bb737fa6..21aa3419ac2 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -326,10 +326,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return nil, validation.LimitError(chunkLimitErr.Error()) } + s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)+len(resp.Timeseries)) for 
_, series := range resp.Chunkseries { - if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { - return nil, validation.LimitError(limitErr.Error()) - } + s = append(s, series.Labels) + } + + for _, series := range resp.Timeseries { + s = append(s, series.Labels) + } + + if limitErr := queryLimiter.AddSeries(s...); limitErr != nil { + return nil, validation.LimitError(limitErr.Error()) } if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil { @@ -340,12 +347,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return nil, validation.LimitError(dataBytesLimitErr.Error()) } - for _, series := range resp.Timeseries { - if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { - return nil, validation.LimitError(limitErr.Error()) - } - } - result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) result.Timeseries = append(result.Timeseries, resp.Timeseries...) } diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index 84031711e1b..aa2261f7e6b 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -65,18 +65,23 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { return ql } -// AddSeries adds the input series and returns an error if the limit is reached. -func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error { +// AddSeriesBatch adds the batch of input series and returns an error if the limit is reached. +func (ql *QueryLimiter) AddSeries(series ...[]cortexpb.LabelAdapter) error { // If the max series is unlimited just return without managing map if ql.maxSeriesPerQuery == 0 { return nil } - fingerprint := client.FastFingerprint(seriesLabels) + fps := make([]model.Fingerprint, 0, len(series)) + for _, s := range series { + fps = append(fps, client.FastFingerprint(s)) + } ql.uniqueSeriesMx.Lock() defer ql.uniqueSeriesMx.Unlock() + for _, fp := range fps { + ql.uniqueSeries[fp] = struct{}{} + } - ql.uniqueSeries[fingerprint] = struct{}{} if len(ql.uniqueSeries) > ql.maxSeriesPerQuery { // Format error with max limit return fmt.Errorf(ErrMaxSeriesHit, ql.maxSeriesPerQuery) diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go index 02b1fc9f73c..699adccd32e 100644 --- a/pkg/util/limiter/query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -2,6 +2,7 @@ package limiter import ( "fmt" + "sync" "testing" "github.com/prometheus/prometheus/model/labels" @@ -87,6 +88,37 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) require.Error(t, err) } +func TestQueryLimiter_AddSeriesBatch_ShouldReturnErrorOnLimitExceeded(t *testing.T) { + const ( + metricName = "test_metric" + ) + + limiter := NewQueryLimiter(10, 0, 0, 0) + series := make([][]cortexpb.LabelAdapter, 0, 10) + + for i := 0; i < 10; i++ { + s := []cortexpb.LabelAdapter{ + { + Name: labels.MetricName, + Value: fmt.Sprintf("%v_%v", metricName, i), + }, + } + series = append(series, s) + } + err := limiter.AddSeries(series...) 
+ require.NoError(t, err) + + series1 := []cortexpb.LabelAdapter{ + { + Name: labels.MetricName, + Value: metricName + "_11", + }, + } + + err = limiter.AddSeries(series1) + require.Error(t, err) +} + func TestQueryLimiter_AddChunkBytes(t *testing.T) { var limiter = NewQueryLimiter(0, 100, 0, 0) @@ -106,23 +138,55 @@ func TestQueryLimiter_AddDataBytes(t *testing.T) { } func BenchmarkQueryLimiter_AddSeries(b *testing.B) { + AddSeriesConcurrentBench(b, 1) +} + +func BenchmarkQueryLimiter_AddSeriesBatch(b *testing.B) { + AddSeriesConcurrentBench(b, 128) +} + +func AddSeriesConcurrentBench(b *testing.B, batchSize int) { + b.ResetTimer() const ( metricName = "test_metric" ) - var series []labels.Labels - for i := 0; i < b.N; i++ { - series = append(series, - labels.FromMap(map[string]string{ - labels.MetricName: metricName + "_1", - "series1": fmt.Sprint(i), - })) - } - b.ResetTimer() limiter := NewQueryLimiter(b.N+1, 0, 0, 0) - for _, s := range series { - err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s)) - assert.NoError(b, err) + + // Concurrent goroutines trying to add duplicated series + const numWorkers = 100 + var wg sync.WaitGroup + + worker := func(w int) { + defer wg.Done() + var series []labels.Labels + for i := 0; i < b.N; i++ { + series = append(series, + labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": fmt.Sprint(i), + })) + } + + for i := 0; i < len(series); i += batchSize { + s := make([][]cortexpb.LabelAdapter, 0, batchSize) + j := i + batchSize + if j > len(series) { + j = len(series) + } + for k := i; k < j; k++ { + s = append(s, cortexpb.FromLabelsToLabelAdapters(series[k])) + } + + err := limiter.AddSeries(s...) + assert.NoError(b, err) + } + } + + for w := 1; w <= numWorkers; w++ { + wg.Add(1) + go worker(w) } + wg.Wait() }
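
Reviewer note, not part of the patch: below is a minimal, self-contained Go sketch of the locking pattern this change moves to, where fingerprints are computed outside the critical section and the per-query mutex is taken once per batch instead of once per series. Everything in the sketch (the seriesLimiter type, fnv hashing, plain string series names) is an illustrative stand-in rather than Cortex code; the real implementation operates on []cortexpb.LabelAdapter with client.FastFingerprint inside QueryLimiter.AddSeries, as shown in the diff above.

package main

import (
	"errors"
	"fmt"
	"hash/fnv"
	"sync"
)

// seriesLimiter is a toy stand-in for QueryLimiter: it tracks unique series
// fingerprints for one query and enforces a maximum count.
type seriesLimiter struct {
	mx        sync.Mutex
	unique    map[uint64]struct{}
	maxSeries int
}

func newSeriesLimiter(maxSeries int) *seriesLimiter {
	return &seriesLimiter{unique: map[uint64]struct{}{}, maxSeries: maxSeries}
}

// Add accepts a whole batch of series. Hashing happens before the lock is
// taken, and the mutex is acquired once per batch rather than once per
// series, which is the lock-contention cost the patch is trying to cut.
func (l *seriesLimiter) Add(series ...string) error {
	if l.maxSeries == 0 {
		return nil // unlimited, skip tracking entirely
	}

	// Pre-compute fingerprints without holding the lock.
	fps := make([]uint64, 0, len(series))
	for _, s := range series {
		h := fnv.New64a()
		h.Write([]byte(s))
		fps = append(fps, h.Sum64())
	}

	l.mx.Lock()
	defer l.mx.Unlock()
	for _, fp := range fps {
		l.unique[fp] = struct{}{}
	}
	if len(l.unique) > l.maxSeries {
		return errors.New("max series limit reached")
	}
	return nil
}

func main() {
	limiter := newSeriesLimiter(2)
	fmt.Println(limiter.Add("metric_a", "metric_b")) // <nil>
	fmt.Println(limiter.Add("metric_c"))             // max series limit reached
}

As in the patch, the sketch inserts the whole batch and only then checks the limit, so a batch that crosses the threshold is still counted before the error is returned. The benefit is that the concurrent per-ingester goroutines feeding a single query limiter (modeled by the 100 workers in the benchmark above) contend for the mutex once per response instead of once per series.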