Skip to content

Commit f0fb4b7

Browse files
authored
Skip hitting the store if the query is in future. (#1929)
Signed-off-by: Goutham Veeramachaneni <[email protected]>
1 parent 1a9d546 commit f0fb4b7

File tree

6 files changed

+141
-12
lines changed

6 files changed

+141
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
1515
* [CHANGE] The frontend http server will now send 502 in case of deadline exceeded and 499 if the user requested cancellation. #2156
1616
* [CHANGE] Config file changed to remove top level `config_store` field in favor of a nested `configdb` field. #2125
17+
* [CHANGE] We now enforce queries to be up to `-querier.max-query-into-future` into the future (defaults to 10m). #1929
1718
* [CHANGE] Removed unnecessary `frontend.cache-split-interval` in favor of `querier.split-queries-by-interval` both to reduce configuration complexity and guarantee alignment of these two configs. Starting from now, `-querier.cache-results` may only be enabled in conjunction with `-querier.split-queries-by-interval` (previously the cache interval default was `24h` so if you want to preserve the same behaviour you should set `-querier.split-queries-by-interval=24h`). #2040
1819
* [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034
1920
* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,10 @@ The `querier_config` configures the Cortex querier.
562562
# CLI flag: -querier.query-store-after
563563
[query_store_after: <duration> | default = 0s]
564564
565+
# Maximum duration into the future you can query. 0 to disable.
566+
# CLI flag: -querier.max-query-into-future
567+
[max_query_into_future: <duration> | default = 10m0s]
568+
565569
# The default evaluation interval or step size for subqueries.
566570
# CLI flag: -querier.default-evaluation-interval
567571
[defaultevaluationinterval: <duration> | default = 1m0s]

pkg/querier/querier.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"time"
88

99
"github.com/prometheus/client_golang/prometheus"
10-
"github.com/prometheus/common/model"
1110
"github.com/prometheus/prometheus/pkg/labels"
1211
"github.com/prometheus/prometheus/promql"
1312
"github.com/prometheus/prometheus/storage"
@@ -31,7 +30,8 @@ type Config struct {
3130
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
3231

3332
// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
34-
QueryStoreAfter time.Duration `yaml:"query_store_after"`
33+
QueryStoreAfter time.Duration `yaml:"query_store_after"`
34+
MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future"`
3535

3636
// The default evaluation interval for the promql engine.
3737
// Needs to be configured for subqueries to work as it is the default
@@ -64,6 +64,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6464
f.BoolVar(&cfg.IngesterStreaming, "querier.ingester-streaming", false, "Use streaming RPCs to query ingester.")
6565
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
6666
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
67+
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
6768
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
6869
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store.")
6970
f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.")
@@ -136,6 +137,19 @@ func createActiveQueryTracker(cfg Config) *promql.ActiveQueryTracker {
136137
// NewQueryable creates a new Queryable for cortex.
137138
func NewQueryable(distributor, store storage.Queryable, chunkIterFn chunkIteratorFunc, cfg Config) storage.Queryable {
138139
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
140+
now := time.Now()
141+
142+
if cfg.MaxQueryIntoFuture > 0 {
143+
maxQueryTime := util.TimeMilliseconds(now.Add(cfg.MaxQueryIntoFuture))
144+
145+
if mint > maxQueryTime {
146+
return storage.NoopQuerier(), nil
147+
}
148+
if maxt > maxQueryTime {
149+
maxt = maxQueryTime
150+
}
151+
}
152+
139153
q := querier{
140154
ctx: ctx,
141155
mint: mint,
@@ -150,14 +164,13 @@ func NewQueryable(distributor, store storage.Queryable, chunkIterFn chunkIterato
150164

151165
q.metadataQuerier = dqr
152166

153-
// Include ingester only if maxt is within queryIngestersWithin w.r.t. current time.
154-
now := model.Now()
155-
if cfg.QueryIngestersWithin == 0 || maxt >= int64(now.Add(-cfg.QueryIngestersWithin)) {
167+
// Include ingester only if maxt is within QueryIngestersWithin w.r.t. current time.
168+
if cfg.QueryIngestersWithin == 0 || maxt >= util.TimeMilliseconds(now.Add(-cfg.QueryIngestersWithin)) {
156169
q.queriers = append(q.queriers, dqr)
157170
}
158171

159172
// Include store only if mint is within QueryStoreAfter w.r.t current time.
160-
if cfg.QueryStoreAfter == 0 || mint <= int64(now.Add(-cfg.QueryStoreAfter)) {
173+
if cfg.QueryStoreAfter == 0 || mint <= util.TimeMilliseconds(now.Add(-cfg.QueryStoreAfter)) {
161174
cqr, err := store.Querier(ctx, mint, maxt)
162175
if err != nil {
163176
return nil, err

pkg/querier/querier_test.go

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,96 @@ func TestNoHistoricalQueryToIngester(t *testing.T) {
237237

238238
}
239239

240+
func TestNoFutureQueries(t *testing.T) {
241+
testCases := []struct {
242+
name string
243+
mint, maxt time.Time
244+
hitStores bool
245+
maxQueryIntoFuture time.Duration
246+
}{
247+
{
248+
name: "hit-test1",
249+
mint: time.Now().Add(-5 * time.Hour),
250+
maxt: time.Now().Add(1 * time.Hour),
251+
hitStores: true,
252+
maxQueryIntoFuture: 10 * time.Minute,
253+
},
254+
{
255+
name: "hit-test2",
256+
mint: time.Now().Add(-5 * time.Hour),
257+
maxt: time.Now().Add(-59 * time.Minute),
258+
hitStores: true,
259+
maxQueryIntoFuture: 10 * time.Minute,
260+
},
261+
{ // Skipping stores is disabled.
262+
name: "max-query-into-future-disabled",
263+
mint: time.Now().Add(500 * time.Hour),
264+
maxt: time.Now().Add(5000 * time.Hour),
265+
hitStores: true,
266+
maxQueryIntoFuture: 0,
267+
},
268+
{ // Still hit because of staleness.
269+
name: "hit-test3",
270+
mint: time.Now().Add(12 * time.Minute),
271+
maxt: time.Now().Add(60 * time.Minute),
272+
hitStores: true,
273+
maxQueryIntoFuture: 10 * time.Minute,
274+
},
275+
{
276+
name: "dont-hit-test1",
277+
mint: time.Now().Add(100 * time.Minute),
278+
maxt: time.Now().Add(5 * time.Hour),
279+
hitStores: false,
280+
maxQueryIntoFuture: 10 * time.Minute,
281+
},
282+
{
283+
name: "dont-hit-test2",
284+
mint: time.Now().Add(16 * time.Minute),
285+
maxt: time.Now().Add(60 * time.Minute),
286+
hitStores: false,
287+
maxQueryIntoFuture: 10 * time.Minute,
288+
},
289+
}
290+
291+
engine := promql.NewEngine(promql.EngineOpts{
292+
Logger: util.Logger,
293+
MaxSamples: 1e6,
294+
Timeout: 1 * time.Minute,
295+
})
296+
297+
cfg := Config{}
298+
flagext.DefaultValues(&cfg)
299+
cfg.metricsRegisterer = nil
300+
301+
for _, ingesterStreaming := range []bool{true, false} {
302+
cfg.IngesterStreaming = ingesterStreaming
303+
for _, c := range testCases {
304+
cfg.MaxQueryIntoFuture = c.maxQueryIntoFuture
305+
t.Run(fmt.Sprintf("IngesterStreaming=%t,test=%s", cfg.IngesterStreaming, c.name), func(t *testing.T) {
306+
chunkStore := &errChunkStore{}
307+
distributor := &errDistributor{}
308+
309+
queryable, _ := New(cfg, distributor, chunkStore)
310+
query, err := engine.NewRangeQuery(queryable, "dummy", c.mint, c.maxt, 1*time.Minute)
311+
require.NoError(t, err)
312+
313+
ctx := user.InjectOrgID(context.Background(), "0")
314+
r := query.Exec(ctx)
315+
_, err = r.Matrix()
316+
317+
if c.hitStores {
318+
// If the ingester was hit, the distributor always returns errDistributorError.
319+
require.Error(t, err)
320+
require.Equal(t, errDistributorError.Error(), err.Error())
321+
} else {
322+
// If the ingester was hit, there would have been an error from errDistributor.
323+
require.NoError(t, err)
324+
}
325+
})
326+
}
327+
}
328+
}
329+
240330
// mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor
241331
// so we can test everything is dedupe correctly.
242332
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *mockDistributor {
@@ -292,18 +382,27 @@ func testQuery(t testing.TB, queryable storage.Queryable, end model.Time, q quer
292382
return r
293383
}
294384

385+
type errChunkStore struct {
386+
}
387+
388+
func (m *errChunkStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
389+
return nil, errDistributorError
390+
}
391+
392+
func (m *errChunkStore) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
393+
return storage.NoopQuerier(), errDistributorError
394+
}
395+
295396
type errDistributor struct {
296-
m model.Matrix
297-
r []client.TimeSeriesChunk
298397
}
299398

300399
var errDistributorError = fmt.Errorf("errDistributorError")
301400

302401
func (m *errDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
303-
return m.m, errDistributorError
402+
return nil, errDistributorError
304403
}
305404
func (m *errDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]client.TimeSeriesChunk, error) {
306-
return m.r, errDistributorError
405+
return nil, errDistributorError
307406
}
308407
func (m *errDistributor) LabelValuesForLabelName(context.Context, model.LabelName) ([]string, error) {
309408
return nil, errDistributorError
@@ -382,7 +481,11 @@ func TestShortTermQueryToLTS(t *testing.T) {
382481
MaxSamples: 1e6,
383482
Timeout: 1 * time.Minute,
384483
})
484+
385485
cfg := Config{}
486+
flagext.DefaultValues(&cfg)
487+
cfg.metricsRegisterer = nil
488+
386489
for _, ingesterStreaming := range []bool{true, false} {
387490
cfg.IngesterStreaming = ingesterStreaming
388491
for _, c := range testCases {

pkg/querier/queryrange/query_range.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/weaveworks/common/httpgrpc"
2121

2222
"github.com/cortexproject/cortex/pkg/ingester/client"
23+
"github.com/cortexproject/cortex/pkg/util"
2324
)
2425

2526
// StatusSuccess Prometheus success result.
@@ -335,10 +336,10 @@ func ParseTime(s string) (int64, error) {
335336
if t, err := strconv.ParseFloat(s, 64); err == nil {
336337
s, ns := math.Modf(t)
337338
tm := time.Unix(int64(s), int64(ns*float64(time.Second)))
338-
return tm.UnixNano() / int64(time.Millisecond/time.Nanosecond), nil
339+
return util.TimeMilliseconds(tm), nil
339340
}
340341
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
341-
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond), nil
342+
return util.TimeMilliseconds(t), nil
342343
}
343344
return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s)
344345
}

pkg/util/time.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package util
2+
3+
import "time"
4+
5+
func TimeMilliseconds(t time.Time) int64 {
6+
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
7+
}

0 commit comments

Comments
 (0)