From 949192b5bb504263272e8e38d990e1bee01a4096 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 21 Jul 2020 11:43:03 +0200 Subject: [PATCH] Manipulate ingesters query min time when -querier.query-ingesters-within is set Signed-off-by: Marco Pracucci --- CHANGELOG.md | 1 + pkg/querier/blocks_store_queryable.go | 2 +- pkg/querier/distributor_queryable.go | 83 ++++++++++------- pkg/querier/distributor_queryable_test.go | 104 ++++++++++++++++++---- pkg/querier/metadata_handler_test.go | 13 +-- pkg/querier/querier_test.go | 8 +- 6 files changed, 153 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 984bb80b92d..a6379df8279 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ * [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778 * [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826 * [CHANGE] Add `component` label to metrics exposed by chunk, delete and index store clients. #2774 +* [CHANGE] Querier: when `-querier.query-ingesters-within` is configured, the time range of the query sent to ingesters is now manipulated to ensure the query start time is not older than 'now - query-ingesters-within'. #2904 * [CHANGE] KV: The `role` label which was a label of `multi` KV store client only has been added to metrics of every KV store client. If KV store client is not `multi`, then the value of `role` label is `primary`. #2837 * [CHANGE] Added the `engine` label to the metrics exposed by the Prometheus query engine, to distinguish between `ruler` and `querier` metrics. #2854 * [CHANGE] Added ruler to the single binary when started with `-target=all` (default). 
#2854 diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 6c8e392cacb..70cd4ae085d 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -334,7 +334,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* maxT = util.Min64(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter))) if origMaxT != maxT { - level.Debug(spanLog).Log("msg", "query max time has been manipulated", "original", origMaxT, "updated", maxT) + level.Debug(spanLog).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT) } if maxT < minT { diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 675946f0e4c..9f397b896c1 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -5,6 +5,7 @@ import ( "sort" "time" + "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" @@ -31,44 +32,46 @@ type Distributor interface { MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) } -func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc, queryIngesterWithin time.Duration) QueryableWithFilter { +func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter { return distributorQueryable{ - distributor: distributor, - streaming: streaming, - iteratorFn: iteratorFn, - queryIngesterWithin: queryIngesterWithin, + distributor: distributor, + streaming: streaming, + iteratorFn: iteratorFn, + queryIngestersWithin: queryIngestersWithin, } } type distributorQueryable struct { - distributor Distributor - streaming bool - iteratorFn chunkIteratorFunc - queryIngesterWithin time.Duration + distributor Distributor + 
streaming bool + iteratorFn chunkIteratorFunc + queryIngestersWithin time.Duration } func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { return &distributorQuerier{ - distributor: d.distributor, - ctx: ctx, - mint: mint, - maxt: maxt, - streaming: d.streaming, - chunkIterFn: d.iteratorFn, + distributor: d.distributor, + ctx: ctx, + mint: mint, + maxt: maxt, + streaming: d.streaming, + chunkIterFn: d.iteratorFn, + queryIngestersWithin: d.queryIngestersWithin, }, nil } func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bool { // Include ingester only if maxt is within QueryIngestersWithin w.r.t. current time. - return d.queryIngesterWithin == 0 || queryMaxT >= util.TimeToMillis(now.Add(-d.queryIngesterWithin)) + return d.queryIngestersWithin == 0 || queryMaxT >= util.TimeToMillis(now.Add(-d.queryIngestersWithin)) } type distributorQuerier struct { - distributor Distributor - ctx context.Context - mint, maxt int64 - streaming bool - chunkIterFn chunkIteratorFunc + distributor Distributor + ctx context.Context + mint, maxt int64 + streaming bool + chunkIterFn chunkIteratorFunc + queryIngestersWithin time.Duration } // Select implements storage.Querier interface. @@ -77,23 +80,45 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. log, ctx := spanlogger.New(q.ctx, "distributorQuerier.Select") defer log.Span.Finish() + minT, maxT := q.mint, q.maxt + if sp != nil { + minT, maxT = sp.Start, sp.End + } + + // If queryIngestersWithin is enabled, we clamp the query min time so that it is never older than + // now - queryIngestersWithin, because older time ranges are covered by the storage. This + // optimization is particularly important for the blocks storage, where the blocks retention in the + // ingesters could be much longer than queryIngestersWithin.
+ if q.queryIngestersWithin > 0 { + now := time.Now() + origMinT := minT + minT = util.Max64(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin))) + + if origMinT != minT { + level.Debug(log).Log("msg", "the min time of the query to ingesters has been manipulated", "original", origMinT, "updated", minT) + } + + if minT > maxT { + level.Debug(log).Log("msg", "empty query time range after min time manipulation") + return storage.EmptySeriesSet() + } + } + // Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, // which needs only metadata. if sp == nil { - ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...) + ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...) if err != nil { return storage.ErrSeriesSet(err) } return series.MetricsToSeriesSet(ms) } - mint, maxt := sp.Start, sp.End - if q.streaming { - return q.streamingSelect(*sp, matchers) + return q.streamingSelect(minT, maxT, matchers) } - matrix, err := q.distributor.Query(ctx, model.Time(mint), model.Time(maxt), matchers...) + matrix, err := q.distributor.Query(ctx, model.Time(minT), model.Time(maxT), matchers...) if err != nil { return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) } @@ -102,15 +127,13 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. return series.MatrixToSeriesSet(matrix) } -func (q *distributorQuerier) streamingSelect(sp storage.SelectHints, matchers []*labels.Matcher) storage.SeriesSet { +func (q *distributorQuerier) streamingSelect(minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet { userID, err := user.ExtractOrgID(q.ctx) if err != nil { return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) } - mint, maxt := sp.Start, sp.End - - results, err := q.distributor.QueryStream(q.ctx, model.Time(mint), model.Time(maxt), matchers...) 
+ results, err := q.distributor.QueryStream(q.ctx, model.Time(minT), model.Time(maxT), matchers...) if err != nil { return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) } diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 58a461541ef..8df7bf56557 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -2,6 +2,7 @@ package querier import ( "context" + "fmt" "testing" "time" @@ -9,6 +10,8 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" @@ -25,8 +28,9 @@ const ( ) func TestDistributorQuerier(t *testing.T) { - d := &mockDistributor{ - m: model.Matrix{ + d := &mockDistributor{} + d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + model.Matrix{ // Matrixes are unsorted, so this tests that the labels get sorted. 
&model.SampleStream{ Metric: model.Metric{ @@ -39,7 +43,8 @@ func TestDistributorQuerier(t *testing.T) { }, }, }, - } + nil) + queryable := newDistributorQueryable(d, false, nil, 0) querier, err := queryable.Querier(context.Background(), mint, maxt) require.NoError(t, err) @@ -59,6 +64,73 @@ func TestDistributorQuerier(t *testing.T) { require.NoError(t, seriesSet.Err()) } +func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) { + now := time.Now() + + tests := map[string]struct { + queryIngestersWithin time.Duration + queryMinT int64 + queryMaxT int64 + expectedMinT int64 + expectedMaxT int64 + }{ + "should not manipulate query time range if queryIngestersWithin is disabled": { + queryIngestersWithin: 0, + queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)), + queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + expectedMinT: util.TimeToMillis(now.Add(-100 * time.Minute)), + expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + }, + "should not manipulate query time range if queryIngestersWithin is enabled but query min time is newer": { + queryIngestersWithin: time.Hour, + queryMinT: util.TimeToMillis(now.Add(-50 * time.Minute)), + queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + expectedMinT: util.TimeToMillis(now.Add(-50 * time.Minute)), + expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + }, + "should manipulate query time range if queryIngestersWithin is enabled and query min time is older": { + queryIngestersWithin: time.Hour, + queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)), + queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + expectedMinT: util.TimeToMillis(now.Add(-60 * time.Minute)), + expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + }, + "should skip the query if the query max time is older than queryIngestersWithin": { + queryIngestersWithin: time.Hour, + queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)), + queryMaxT: 
util.TimeToMillis(now.Add(-90 * time.Minute)), + expectedMinT: 0, + expectedMaxT: 0, + }, + } + + for _, streamingEnabled := range []bool{false, true} { + for testName, testData := range tests { + t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) { + distributor := &mockDistributor{} + distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) + distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) + + ctx := user.InjectOrgID(context.Background(), "test") + queryable := newDistributorQueryable(distributor, streamingEnabled, nil, testData.queryIngestersWithin) + querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT) + require.NoError(t, err) + + seriesSet := querier.Select(true, &storage.SelectHints{Start: testData.queryMinT, End: testData.queryMaxT}) + require.NoError(t, seriesSet.Err()) + + if testData.expectedMinT == 0 && testData.expectedMaxT == 0 { + assert.Len(t, distributor.Calls, 0) + } else { + require.Len(t, distributor.Calls, 1) + assert.InDelta(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), float64(5*time.Second.Milliseconds())) + assert.Equal(t, testData.expectedMaxT, int64(distributor.Calls[0].Arguments.Get(2).(model.Time))) + } + }) + } + } +} + func TestDistributorQueryableFilter(t *testing.T) { d := &mockDistributor{} dq := newDistributorQueryable(d, false, nil, 1*time.Hour) @@ -86,8 +158,9 @@ func TestIngesterStreaming(t *testing.T) { }) require.NoError(t, err) - d := &mockDistributor{ - r: &client.QueryStreamResponse{ + d := &mockDistributor{} + d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + &client.QueryStreamResponse{ Chunkseries: []client.TimeSeriesChunk{ { Labels: []client.LabelAdapter{ @@ -103,7 +176,8 @@ func TestIngesterStreaming(t *testing.T) { }, }, }, - } + 
nil) + ctx := user.InjectOrgID(context.Background(), "0") queryable := newDistributorQueryable(d, true, mergeChunks, 0) querier, err := queryable.Querier(ctx, mint, maxt) @@ -125,17 +199,16 @@ func TestIngesterStreaming(t *testing.T) { } type mockDistributor struct { - metadata []scrape.MetricMetadata - metadataError error - m model.Matrix - r *client.QueryStreamResponse + mock.Mock } func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { - return m.m, nil + args := m.Called(ctx, from, to, matchers) + return args.Get(0).(model.Matrix), args.Error(1) } func (m *mockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { - return m.r, nil + args := m.Called(ctx, from, to, matchers) + return args.Get(0).(*client.QueryStreamResponse), args.Error(1) } func (m *mockDistributor) LabelValuesForLabelName(context.Context, model.LabelName) ([]string, error) { return nil, nil @@ -148,9 +221,6 @@ func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, thr } func (m *mockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { - if m.metadataError != nil { - return nil, m.metadataError - } - - return m.metadata, nil + args := m.Called(ctx) + return args.Get(0).([]scrape.MetricMetadata), args.Error(1) } diff --git a/pkg/querier/metadata_handler_test.go b/pkg/querier/metadata_handler_test.go index 774a4460209..56c92641a11 100644 --- a/pkg/querier/metadata_handler_test.go +++ b/pkg/querier/metadata_handler_test.go @@ -8,15 +8,17 @@ import ( "testing" "github.com/prometheus/prometheus/scrape" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) func TestMetadataHandler_Success(t *testing.T) { - d := &mockDistributor{ - metadata: []scrape.MetricMetadata{ + d := &mockDistributor{} + d.On("MetricsMetadata", mock.Anything).Return( + []scrape.MetricMetadata{ {Metric: 
"alertmanager_dispatcher_aggregation_groups", Help: "Number of active aggregation groups", Type: "gauge", Unit: ""}, }, - } + nil) handler := MetadataHandler(d) @@ -49,9 +51,8 @@ func TestMetadataHandler_Success(t *testing.T) { } func TestMetadataHandler_Error(t *testing.T) { - d := &mockDistributor{ - metadataError: fmt.Errorf("no user id"), - } + d := &mockDistributor{} + d.On("MetricsMetadata", mock.Anything).Return([]scrape.MetricMetadata{}, fmt.Errorf("no user id")) handler := MetadataHandler(d) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 7a3686a92a5..e5438edf888 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/mock" "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/util/validation" @@ -486,10 +487,9 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through) require.NoError(t, err) - result := &mockDistributor{ - m: matrix, - r: &client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, - } + result := &mockDistributor{} + result.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(matrix, nil) + result.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, nil) return result }