Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
* [CHANGE] The frontend http server will now send 502 in case of deadline exceeded and 499 if the user requested cancellation. #2156
* [CHANGE] Config file changed to remove top level `config_store` field in favor of a nested `configdb` field. #2125
* [CHANGE] Queries are now limited to look at most `-querier.max-query-into-future` into the future (defaults to 10m). #1929
* [CHANGE] Removed unnecessary `frontend.cache-split-interval` in favor of `querier.split-queries-by-interval` both to reduce configuration complexity and guarantee alignment of these two configs. Starting from now, `-querier.cache-results` may only be enabled in conjunction with `-querier.split-queries-by-interval` (previously the cache interval default was `24h` so if you want to preserve the same behaviour you should set `-querier.split-queries-by-interval=24h`). #2040
* [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034
* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.query-store-after
[query_store_after: <duration> | default = 0s]

# Maximum duration into the future you can query. 0 to disable.
# CLI flag: -querier.max-query-into-future
[max_query_into_future: <duration> | default = 10m0s]

# The default evaluation interval or step size for subqueries.
# CLI flag: -querier.default-evaluation-interval
[defaultevaluationinterval: <duration> | default = 1m0s]
Expand Down
25 changes: 19 additions & 6 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
Expand All @@ -31,7 +30,8 @@ type Config struct {
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`

// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
QueryStoreAfter time.Duration `yaml:"query_store_after"`
QueryStoreAfter time.Duration `yaml:"query_store_after"`
MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future"`

// The default evaluation interval for the promql engine.
// Needs to be configured for subqueries to work as it is the default
Expand Down Expand Up @@ -64,6 +64,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IngesterStreaming, "querier.ingester-streaming", false, "Use streaming RPCs to query ingester.")
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store.")
f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.")
Expand Down Expand Up @@ -136,6 +137,19 @@ func createActiveQueryTracker(cfg Config) *promql.ActiveQueryTracker {
// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor, store storage.Queryable, chunkIterFn chunkIteratorFunc, cfg Config) storage.Queryable {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
now := time.Now()

if cfg.MaxQueryIntoFuture > 0 {
maxQueryTime := util.TimeMilliseconds(now.Add(cfg.MaxQueryIntoFuture))

if mint > maxQueryTime {
return storage.NoopQuerier(), nil
}
if maxt > maxQueryTime {
maxt = maxQueryTime
}
}

q := querier{
ctx: ctx,
mint: mint,
Expand All @@ -150,14 +164,13 @@ func NewQueryable(distributor, store storage.Queryable, chunkIterFn chunkIterato

q.metadataQuerier = dqr

// Include ingester only if maxt is within queryIngestersWithin w.r.t. current time.
now := model.Now()
if cfg.QueryIngestersWithin == 0 || maxt >= int64(now.Add(-cfg.QueryIngestersWithin)) {
// Include ingester only if maxt is within QueryIngestersWithin w.r.t. current time.
if cfg.QueryIngestersWithin == 0 || maxt >= util.TimeMilliseconds(now.Add(-cfg.QueryIngestersWithin)) {
q.queriers = append(q.queriers, dqr)
}

// Include store only if mint is within QueryStoreAfter w.r.t current time.
if cfg.QueryStoreAfter == 0 || mint <= int64(now.Add(-cfg.QueryStoreAfter)) {
if cfg.QueryStoreAfter == 0 || mint <= util.TimeMilliseconds(now.Add(-cfg.QueryStoreAfter)) {
cqr, err := store.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
Expand Down
111 changes: 107 additions & 4 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,96 @@ func TestNoHistoricalQueryToIngester(t *testing.T) {

}

// TestNoFutureQueries verifies the -querier.max-query-into-future clamping in
// NewQueryable: ranges lying entirely beyond now+MaxQueryIntoFuture must not
// reach the stores at all, while ranges that merely extend into the future are
// clamped and still served. The errDistributor/errChunkStore stubs always fail
// with errDistributorError, so "the stores were hit" is detected by the query
// failing with exactly that error.
func TestNoFutureQueries(t *testing.T) {
	testCases := []struct {
		name               string
		mint, maxt         time.Time     // requested query range
		hitStores          bool          // whether the query is expected to reach the stores
		maxQueryIntoFuture time.Duration // value for -querier.max-query-into-future
	}{
		{
			name:               "hit-test1",
			mint:               time.Now().Add(-5 * time.Hour),
			maxt:               time.Now().Add(1 * time.Hour),
			hitStores:          true,
			maxQueryIntoFuture: 10 * time.Minute,
		},
		{
			name:               "hit-test2",
			mint:               time.Now().Add(-5 * time.Hour),
			maxt:               time.Now().Add(-59 * time.Minute),
			hitStores:          true,
			maxQueryIntoFuture: 10 * time.Minute,
		},
		{ // Skipping stores is disabled: with the limit set to 0, even a far-future range hits the stores.
			name:               "max-query-into-future-disabled",
			mint:               time.Now().Add(500 * time.Hour),
			maxt:               time.Now().Add(5000 * time.Hour),
			hitStores:          true,
			maxQueryIntoFuture: 0,
		},
		{ // Still hit because of staleness: mint is 12m into the future, beyond the 10m limit,
			// but the PromQL lookback delta presumably pulls the effective start within it —
			// NOTE(review): depends on the engine's lookback setting; confirm.
			name:               "hit-test3",
			mint:               time.Now().Add(12 * time.Minute),
			maxt:               time.Now().Add(60 * time.Minute),
			hitStores:          true,
			maxQueryIntoFuture: 10 * time.Minute,
		},
		{
			name:               "dont-hit-test1",
			mint:               time.Now().Add(100 * time.Minute),
			maxt:               time.Now().Add(5 * time.Hour),
			hitStores:          false,
			maxQueryIntoFuture: 10 * time.Minute,
		},
		{
			name:               "dont-hit-test2",
			mint:               time.Now().Add(16 * time.Minute),
			maxt:               time.Now().Add(60 * time.Minute),
			hitStores:          false,
			maxQueryIntoFuture: 10 * time.Minute,
		},
	}

	engine := promql.NewEngine(promql.EngineOpts{
		Logger:     util.Logger,
		MaxSamples: 1e6,
		Timeout:    1 * time.Minute,
	})

	cfg := Config{}
	flagext.DefaultValues(&cfg)
	cfg.metricsRegisterer = nil

	// Run every case with both the streaming and non-streaming ingester paths.
	for _, ingesterStreaming := range []bool{true, false} {
		cfg.IngesterStreaming = ingesterStreaming
		for _, c := range testCases {
			cfg.MaxQueryIntoFuture = c.maxQueryIntoFuture
			t.Run(fmt.Sprintf("IngesterStreaming=%t,test=%s", cfg.IngesterStreaming, c.name), func(t *testing.T) {
				chunkStore := &errChunkStore{}
				distributor := &errDistributor{}

				// NOTE(review): error from New is discarded — consider require.NoError here.
				queryable, _ := New(cfg, distributor, chunkStore)
				query, err := engine.NewRangeQuery(queryable, "dummy", c.mint, c.maxt, 1*time.Minute)
				require.NoError(t, err)

				ctx := user.InjectOrgID(context.Background(), "0")
				r := query.Exec(ctx)
				_, err = r.Matrix()

				if c.hitStores {
					// If the ingester was hit, the distributor always returns errDistributorError.
					require.Error(t, err)
					require.Equal(t, errDistributorError.Error(), err.Error())
				} else {
					// If the ingester had been hit, there would have been an error from errDistributor.
					require.NoError(t, err)
				}
			})
		}
	}
}

// mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor
// so we can test that everything is deduped correctly.
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *mockDistributor {
Expand Down Expand Up @@ -292,18 +382,27 @@ func testQuery(t testing.TB, queryable storage.Queryable, end model.Time, q quer
return r
}

// errChunkStore is a chunk-store stub whose methods always fail with
// errDistributorError, letting tests detect whether the store was queried.
type errChunkStore struct {
}

// Get always fails with errDistributorError, so any store access surfaces as that error.
func (m *errChunkStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
	return nil, errDistributorError
}

// Querier returns a no-op querier together with errDistributorError.
func (m *errChunkStore) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
	return storage.NoopQuerier(), errDistributorError
}

type errDistributor struct {
m model.Matrix
r []client.TimeSeriesChunk
}

var errDistributorError = fmt.Errorf("errDistributorError")

func (m *errDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
return m.m, errDistributorError
return nil, errDistributorError
}
func (m *errDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]client.TimeSeriesChunk, error) {
return m.r, errDistributorError
return nil, errDistributorError
}
func (m *errDistributor) LabelValuesForLabelName(context.Context, model.LabelName) ([]string, error) {
return nil, errDistributorError
Expand Down Expand Up @@ -382,7 +481,11 @@ func TestShortTermQueryToLTS(t *testing.T) {
MaxSamples: 1e6,
Timeout: 1 * time.Minute,
})

cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.metricsRegisterer = nil

for _, ingesterStreaming := range []bool{true, false} {
cfg.IngesterStreaming = ingesterStreaming
for _, c := range testCases {
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)

// StatusSuccess Prometheus success result.
Expand Down Expand Up @@ -335,10 +336,10 @@ func ParseTime(s string) (int64, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
tm := time.Unix(int64(s), int64(ns*float64(time.Second)))
return tm.UnixNano() / int64(time.Millisecond/time.Nanosecond), nil
return util.TimeMilliseconds(tm), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond), nil
return util.TimeMilliseconds(t), nil
}
return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package util

import "time"

func TimeMilliseconds(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
}