Skip to content

limit get series API call time range when start param is not specified #4976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [ENHANCEMENT] Querier: limit series queries to ingesters only if the `start` param is not specified. #4976

## 1.14.0 in progress

Expand Down
4 changes: 3 additions & 1 deletion docs/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ GET,POST <prometheus-http-prefix>/api/v1/series
GET,POST <legacy-http-prefix>/api/v1/series
```

Find series by label matchers. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the series from in-memory data stored in the ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set.
Find series by label matchers. Unlike Prometheus, and for scalability and performance reasons, if `-querier.query-store-for-labels-enabled` is not set or the `start` param is not specified, Cortex always fetches series from data stored in the ingesters.

If `-querier.query-store-for-labels-enabled` is configured, Cortex also queries the long-term store with the *blocks* storage engine.

_For more information, please check out the Prometheus [series endpoint](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers) documentation._

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma
)

if q.streamingMetadata {
ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
} else {
ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...)
}

if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
Expand Down Expand Up @@ -127,10 +128,20 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

limits := DefaultLimitsConfig()
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

start, end, err := validateQueryTimeRange(ctx, "test", testData.queryMinT, testData.queryMaxT, overrides, 0)
require.NoError(t, err)
// Select hints are passed by Prometheus when querying /series.
var hints *storage.SelectHints
if testData.querySeries {
hints = &storage.SelectHints{Func: "series"}
hints = &storage.SelectHints{
Start: start,
End: end,
Func: "series",
}
}

seriesSet := querier.Select(true, hints)
Expand Down
13 changes: 12 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ func (q querier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*l
sp.Start = startMs
sp.End = endMs

// For series queries without specifying the start time, we prefer to
// only query ingesters and not to query maxQueryLength to avoid OOM kill.
if sp.Func == "series" && startMs == 0 {
Copy link
Contributor

@alvinlin123 alvinlin123 Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better if we merge this to line 307 above. Because what they are doing are pretty much the same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I wanted to merge the two but to do this we need to do L318 ~ L336 first before L307.
This is probably a behavior change because we will do the time range validation first and then send to the metadata querier.

Copy link
Contributor

@alvinlin123 alvinlin123 Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking we can do something along the line

userID, err := tenant.TenantID(ctx)
if err != nil {
  return storage.ErrSeriesSet(err)
}
startMs, endMs, err := validateQueryTimeRange(ctx, userID, sp.Start, sp.End, q.limits, q.maxQueryIntoFuture)

if sp == nil {
  sp = &storage.SelectHints{Start: q.mint, End: q.maxt}
} else if sp.Func == "series" && (!q.queryStoreForLabels ||  (err != nil && startMs == 0)) {	
   return q.metadataQuerier.Select(true, sp, matchers...)
} 

if err == errEmptyTimeRange {
    return storage.NoopSeriesSet()
} else if err != nil {
   return storage.ErrSeriesSet(err)
}

Essentially if start is 0 we treat it as if queryStoreForLabels is false. I also think that validateQueryTimeRange should return -1 for startMs and endMs when err != nil to make our life easier.

There is an edge case where user actually specifies "0" for "start", and Cortex would still return last 24 hours data, but I think this behaviour is ok for the edge case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I think this could work...
But is it worth? For readability I prefer to separate them. It is just 1 duplicated line.
This is not very clear tbh.
if sp.Func == "series" && (!q.queryStoreForLabels || (err != nil && startMs == 0))

Copy link
Contributor

@alvinlin123 alvinlin123 Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that's why I was thinking maybe validateQueryTimeRange should return -1s when err is not nil. But this is fine, I don't think we need to do it change anything for now; it's up to you.

I think this PR is ok, but I will let @alanprot have a chance to voice any other concerns based on what I said :)

return q.metadataQuerier.Select(true, sp, matchers...)
}

startTime := model.Time(startMs)
endTime := model.Time(endMs)

Expand Down Expand Up @@ -390,7 +396,7 @@ func (q querier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*l
return seriesSet
}

// LabelsValue implements storage.Querier.
// LabelValues implements storage.Querier.
func (q querier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
if !q.queryStoreForLabels {
return q.metadataQuerier.LabelValues(name, matchers...)
Expand Down Expand Up @@ -639,6 +645,11 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i
}
}

// start time should be at least non-negative to avoid int64 overflow.
if startTime < 0 {
startTime = 0
}

return int64(startTime), int64(endTime), nil
}

Expand Down
105 changes: 91 additions & 14 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,32 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/tenant"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/mock"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/purger"
"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
"github.com/cortexproject/cortex/pkg/purger"
"github.com/cortexproject/cortex/pkg/querier/batch"
"github.com/cortexproject/cortex/pkg/querier/iterators"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
Expand Down Expand Up @@ -693,6 +690,19 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
queryEndTime: now.Add(-thirtyDays).Add(-90 * time.Hour),
expectedSkipped: true,
},
"negative start time with max query lookback": {
maxQueryLookback: model.Duration(thirtyDays),
queryStartTime: time.Unix(-1000, 0),
queryEndTime: now,
expectedMetadataStartTime: now.Add(-thirtyDays),
expectedMetadataEndTime: now,
},
"negative start time without max query lookback": {
queryStartTime: time.Unix(-1000, 0),
queryEndTime: now,
expectedMetadataStartTime: time.Unix(0, 0),
expectedMetadataEndTime: now,
},
}

// Create the PromQL engine to execute the queries.
Expand Down Expand Up @@ -733,6 +743,9 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}

t.Run("query range", func(t *testing.T) {
if testData.query == "" {
return
}
distributor := &MockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
Expand All @@ -756,7 +769,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
assert.InDelta(t, util.TimeToMillis(testData.expectedQueryStartTime), int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), delta)
assert.InDelta(t, util.TimeToMillis(testData.expectedQueryEndTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta)
} else {
// Ensure no query has been executed executed (because skipped).
// Ensure no query has been executed (because skipped).
assert.Len(t, distributor.Calls, 0)
}
})
Expand All @@ -770,9 +783,17 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
q, err := queryable.Querier(ctx, util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime))
require.NoError(t, err)

// We apply the validation here again since when initializing querier we change the start/end time,
// but when querying series we don't validate again. So we should pass correct hints here.
start, end, err := validateQueryTimeRange(ctx, "test", util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime), overrides, 0)
// Skipped query will hit errEmptyTimeRange during validation.
if !testData.expectedSkipped {
require.NoError(t, err)
}

hints := &storage.SelectHints{
Start: util.TimeToMillis(testData.queryStartTime),
End: util.TimeToMillis(testData.queryEndTime),
Start: start,
End: end,
Func: "series",
}
matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test")
Expand All @@ -789,7 +810,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataStartTime), int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), delta)
assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataEndTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta)
} else {
// Ensure no query has been executed executed (because skipped).
// Ensure no query has been executed (because skipped).
assert.Len(t, distributor.Calls, 0)
}
})
Expand All @@ -814,7 +835,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataStartTime), int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), delta)
assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataEndTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta)
} else {
// Ensure no query has been executed executed (because skipped).
// Ensure no query has been executed (because skipped).
assert.Len(t, distributor.Calls, 0)
}
})
Expand Down Expand Up @@ -878,6 +899,62 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
}
}

// TestValidateMaxQueryLength verifies that the max query length limit interacts
// correctly with the validateQueryTimeRange function, including the clamping of
// negative start timestamps to zero.
func TestValidateMaxQueryLength(t *testing.T) {
	ctx := context.Background()
	now := time.Now()

	testCases := []struct {
		name              string
		start             time.Time
		end               time.Time
		expectedStartMs   int64
		expectedEndMs     int64
		maxQueryLength    time.Duration
		exceedQueryLength bool
	}{
		{
			name:              "normal params, not hit max query length",
			start:             now.Add(-time.Hour),
			end:               now,
			expectedStartMs:   util.TimeToMillis(now.Add(-time.Hour)),
			expectedEndMs:     util.TimeToMillis(now),
			maxQueryLength:    24 * time.Hour,
			exceedQueryLength: false,
		},
		{
			name:              "normal params, hit max query length",
			start:             now.Add(-100 * time.Hour),
			end:               now,
			expectedStartMs:   util.TimeToMillis(now.Add(-100 * time.Hour)),
			expectedEndMs:     util.TimeToMillis(now),
			maxQueryLength:    24 * time.Hour,
			exceedQueryLength: true,
		},
		{
			name:              "negative start",
			start:             time.Unix(-1000, 0),
			end:               now,
			expectedStartMs:   0,
			expectedEndMs:     util.TimeToMillis(now),
			maxQueryLength:    24 * time.Hour,
			exceedQueryLength: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Use the default limits; the per-case maxQueryLength is only used
			// for the local range comparison below.
			overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
			require.NoError(t, err)

			startMs, endMs, err := validateQueryTimeRange(ctx, "test", util.TimeToMillis(tc.start), util.TimeToMillis(tc.end), overrides, 0)
			require.NoError(t, err)

			if tc.maxQueryLength > 0 {
				queryRange := model.Time(endMs).Sub(model.Time(startMs))
				require.Equal(t, tc.exceedQueryLength, queryRange > tc.maxQueryLength)
			}
		})
	}
}

// mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor
// so we can test everything is deduped correctly.
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *MockDistributor {
Expand Down