diff --git a/CHANGELOG.md b/CHANGELOG.md
index e3798a71fc1..a8919815fbc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
 ## master / unreleased
 
 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
+* [ENHANCEMENT] Querier: limit series queries to only the ingesters if the `start` param is not specified. #4976
 
 ## 1.14.0 in progress
 
diff --git a/docs/api/_index.md b/docs/api/_index.md
index 32dce280eb5..695e8252e30 100644
--- a/docs/api/_index.md
+++ b/docs/api/_index.md
@@ -341,7 +341,9 @@ GET,POST /api/v1/series
 GET,POST /api/v1/series
 ```
 
-Find series by label matchers. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the series from in-memory data stored in the ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set.
+Find series by label matchers. Unlike Prometheus, and for scalability and performance reasons, Cortex currently fetches series only from data stored in the ingesters if `-querier.query-store-for-labels-enabled` is not set or the `start` param is not specified.
+
+If `-querier.query-store-for-labels-enabled` is set and the `start` param is specified, Cortex also queries the long-term store with the *blocks* storage engine.
 
 _For more information, please check out the Prometheus [series endpoint](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers) documentation._
 
diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go
index 2db6919a4f3..1d89dd064e3 100644
--- a/pkg/querier/distributor_queryable.go
+++ b/pkg/querier/distributor_queryable.go
@@ -108,9 +108,9 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma
     )
 
     if q.streamingMetadata {
-        ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
+        ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
     } else {
-        ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
+        ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...)
     }
 
     if err != nil {
diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go
index 5e1a2b85aee..1481d719825 100644
--- a/pkg/querier/distributor_queryable_test.go
+++ b/pkg/querier/distributor_queryable_test.go
@@ -21,6 +21,7 @@ import (
     "github.com/cortexproject/cortex/pkg/prom1/storage/metric"
     "github.com/cortexproject/cortex/pkg/util"
     "github.com/cortexproject/cortex/pkg/util/chunkcompat"
+    "github.com/cortexproject/cortex/pkg/util/validation"
 )
 
 const (
@@ -127,10 +128,20 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
             querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
             require.NoError(t, err)
 
+            limits := DefaultLimitsConfig()
+            overrides, err := validation.NewOverrides(limits, nil)
+            require.NoError(t, err)
+
+            start, end, err := validateQueryTimeRange(ctx, "test", testData.queryMinT, testData.queryMaxT, overrides, 0)
+            require.NoError(t, err)
             // Select hints are passed by Prometheus when querying /series.
             var hints *storage.SelectHints
             if testData.querySeries {
-                hints = &storage.SelectHints{Func: "series"}
+                hints = &storage.SelectHints{
+                    Start: start,
+                    End:   end,
+                    Func:  "series",
+                }
             }
 
             seriesSet := querier.Select(true, hints)
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 2c950f28d42..705bec628db 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -335,6 +335,12 @@ func (q querier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*l
     sp.Start = startMs
     sp.End = endMs
 
+    // For series queries without a specified start time, we prefer to only query
+    // the ingesters and not enforce the max query length, to avoid an OOM kill.
+    if sp.Func == "series" && startMs == 0 {
+        return q.metadataQuerier.Select(true, sp, matchers...)
+    }
+
     startTime := model.Time(startMs)
     endTime := model.Time(endMs)
 
@@ -390,7 +396,7 @@ func (q querier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*l
     return seriesSet
 }
 
-// LabelsValue implements storage.Querier.
+// LabelValues implements storage.Querier.
 func (q querier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
     if !q.queryStoreForLabels {
         return q.metadataQuerier.LabelValues(name, matchers...)
@@ -639,6 +645,11 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i
         }
     }
 
+    // The start time should be non-negative to avoid int64 overflow.
+    if startTime < 0 {
+        startTime = 0
+    }
+
     return int64(startTime), int64(endTime), nil
 }
 
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index da75a81bc38..b8cfe922570 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -8,35 +8,32 @@ import (
     "testing"
     "time"
 
-    "github.com/cortexproject/cortex/pkg/tenant"
-
     "github.com/go-kit/log"
     "github.com/pkg/errors"
-    "github.com/prometheus/prometheus/tsdb"
-    "github.com/stretchr/testify/mock"
-
-    "github.com/cortexproject/cortex/pkg/cortexpb"
-    "github.com/cortexproject/cortex/pkg/purger"
-    "github.com/cortexproject/cortex/pkg/util/validation"
-
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/promql"
     "github.com/prometheus/prometheus/scrape"
     "github.com/prometheus/prometheus/storage"
+    "github.com/prometheus/prometheus/tsdb"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
     "github.com/weaveworks/common/user"
 
     "github.com/cortexproject/cortex/pkg/chunk"
     promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
+    "github.com/cortexproject/cortex/pkg/cortexpb"
     "github.com/cortexproject/cortex/pkg/ingester/client"
     "github.com/cortexproject/cortex/pkg/prom1/storage/metric"
+    "github.com/cortexproject/cortex/pkg/purger"
     "github.com/cortexproject/cortex/pkg/querier/batch"
     "github.com/cortexproject/cortex/pkg/querier/iterators"
+    "github.com/cortexproject/cortex/pkg/tenant"
     "github.com/cortexproject/cortex/pkg/util"
     "github.com/cortexproject/cortex/pkg/util/chunkcompat"
     "github.com/cortexproject/cortex/pkg/util/flagext"
+    "github.com/cortexproject/cortex/pkg/util/validation"
 )
 
 const (
@@ -693,6 +690,19 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
             queryEndTime:    now.Add(-thirtyDays).Add(-90 * time.Hour),
             expectedSkipped: true,
         },
+        "negative start time with max query lookback": {
+            maxQueryLookback:          model.Duration(thirtyDays),
+            queryStartTime:            time.Unix(-1000, 0),
+            queryEndTime:              now,
+            expectedMetadataStartTime: now.Add(-thirtyDays),
+            expectedMetadataEndTime:   now,
+        },
+        "negative start time without max query lookback": {
+            queryStartTime:            time.Unix(-1000, 0),
+            queryEndTime:              now,
+            expectedMetadataStartTime: time.Unix(0, 0),
+            expectedMetadataEndTime:   now,
+        },
     }
 
     // Create the PromQL engine to execute the queries.
@@ -733,6 +743,9 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
         queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}
 
         t.Run("query range", func(t *testing.T) {
+            if testData.query == "" {
+                return
+            }
             distributor := &MockDistributor{}
             distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
             distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
@@ -756,7 +769,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
                 assert.InDelta(t, util.TimeToMillis(testData.expectedQueryStartTime), int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), delta)
                 assert.InDelta(t, util.TimeToMillis(testData.expectedQueryEndTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta)
             } else {
-                // Ensure no query has been executed executed (because skipped).
+                // Ensure no query has been executed (because skipped).
                 assert.Len(t, distributor.Calls, 0)
             }
         })
@@ -770,9 +783,17 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
             q, err := queryable.Querier(ctx, util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime))
             require.NoError(t, err)
 
+            // We apply the validation again here because initializing the querier adjusts the start/end time,
+            // but querying series doesn't validate again, so we need to pass the adjusted hints.
+            start, end, err := validateQueryTimeRange(ctx, "test", util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime), overrides, 0)
+            // A skipped query hits errEmptyTimeRange during validation.
+            if !testData.expectedSkipped {
+                require.NoError(t, err)
+            }
+
             hints := &storage.SelectHints{
-                Start: util.TimeToMillis(testData.queryStartTime),
-                End:   util.TimeToMillis(testData.queryEndTime),
+                Start: start,
+                End:   end,
                 Func:  "series",
             }
             matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test")
@@ -789,7 +810,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
                 assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataStartTime), int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), delta)
                 assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataEndTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta)
             } else {
-                // Ensure no query has been executed executed (because skipped).
+                // Ensure no query has been executed (because skipped).
                 assert.Len(t, distributor.Calls, 0)
             }
         })
@@ -814,7 +835,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
                 assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataStartTime), int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), delta)
                 assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataEndTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta)
             } else {
-                // Ensure no query has been executed executed (because skipped).
+                // Ensure no query has been executed (because skipped).
                 assert.Len(t, distributor.Calls, 0)
             }
         })
@@ -878,6 +899,62 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
     }
 }
 
+// Test that the max query length limit works with the new validateQueryTimeRange function.
+func TestValidateMaxQueryLength(t *testing.T) {
+    ctx := context.Background()
+    now := time.Now()
+    for _, tc := range []struct {
+        name              string
+        start             time.Time
+        end               time.Time
+        expectedStartMs   int64
+        expectedEndMs     int64
+        maxQueryLength    time.Duration
+        exceedQueryLength bool
+    }{
+        {
+            name:              "normal params, not hit max query length",
+            start:             now.Add(-time.Hour),
+            end:               now,
+            expectedStartMs:   util.TimeToMillis(now.Add(-time.Hour)),
+            expectedEndMs:     util.TimeToMillis(now),
+            maxQueryLength:    24 * time.Hour,
+            exceedQueryLength: false,
+        },
+        {
+            name:              "normal params, hit max query length",
+            start:             now.Add(-100 * time.Hour),
+            end:               now,
+            expectedStartMs:   util.TimeToMillis(now.Add(-100 * time.Hour)),
+            expectedEndMs:     util.TimeToMillis(now),
+            maxQueryLength:    24 * time.Hour,
+            exceedQueryLength: true,
+        },
+        {
+            name:              "negative start",
+            start:             time.Unix(-1000, 0),
+            end:               now,
+            expectedStartMs:   0,
+            expectedEndMs:     util.TimeToMillis(now),
+            maxQueryLength:    24 * time.Hour,
+            exceedQueryLength: true,
+        },
+    } {
+        t.Run(tc.name, func(t *testing.T) {
+            limits := DefaultLimitsConfig()
+            overrides, err := validation.NewOverrides(limits, nil)
+            require.NoError(t, err)
+            startMs, endMs, err := validateQueryTimeRange(ctx, "test", util.TimeToMillis(tc.start), util.TimeToMillis(tc.end), overrides, 0)
+            require.NoError(t, err)
+            startTime := model.Time(startMs)
+            endTime := model.Time(endMs)
+            if tc.maxQueryLength > 0 {
+                require.Equal(t, tc.exceedQueryLength, endTime.Sub(startTime) > tc.maxQueryLength)
+            }
+        })
+    }
+}
+
 // mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor
 // so we can test everything is dedupe correctly.
 func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *MockDistributor {