diff --git a/CHANGELOG.md b/CHANGELOG.md index 903118c1979..dac257f1a5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [ENHANCEMENT] Query Frontend/Query Scheduler: Increase upper bound to 60s for queue duration histogram metric. #5029 * [ENHANCEMENT] Query Frontend: Log Vertical sharding information when `query_stats_enabled` is enabled. #5037 * [ENHANCEMENT] Ingester: The metadata APIs should honour `querier.query-ingesters-within` when `querier.query-store-for-labels-enabled` is true. #5027 +* [ENHANCEMENT] Query Frontend: Skip instant query roundtripper if sharding is not applicable. #5062 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972 * [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000 diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index c8560f7e3b7..189ee6604b1 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/prometheus/rules" prom_storage "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/querysharding" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/server" @@ -437,6 +438,7 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { // initQueryFrontendTripperware instantiates the tripperware used by the query frontend // to optimize Prometheus query requests. func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) { + queryAnalyzer := querysharding.NewQueryAnalyzer() queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, util_log.Logger, @@ -444,12 +446,13 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryrange.PrometheusResponseExtractor{}, prometheus.DefaultRegisterer, t.TombstonesLoader, + queryAnalyzer, ) if err != nil { return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides) + instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer) if err != nil { return nil, err } @@ -461,6 +464,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro instantQueryMiddlewares, queryrange.PrometheusCodec, instantquery.InstantQueryCodec, + t.Overrides, + queryAnalyzer, ) return services.NewIdleService(nil, func(_ error) error { diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index 62cdf68011a..b88515e6be0 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -2,6 +2,7 @@ package instantquery import ( "github.com/go-kit/log" + "github.com/thanos-io/thanos/pkg/querysharding" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -9,9 +10,10 @@ import ( func Middlewares( log log.Logger, limits tripperware.Limits, + queryAnalyzer querysharding.Analyzer, ) ([]tripperware.Middleware, error) { var m []tripperware.Middleware - m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec)) + m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer)) return m, nil } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index e6f32dd2766..24714c54e1b 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/querysharding" "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/querier" @@ -76,6 +77,7 @@ func Middlewares( cacheExtractor Extractor, registerer prometheus.Registerer, cacheGenNumberLoader CacheGenNumberLoader, + queryAnalyzer querysharding.Analyzer, ) ([]tripperware.Middleware, cache.Cache, error) { // Metric used to keep track of each middleware execution duration. metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer) @@ -109,7 +111,7 @@ func Middlewares( queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) } - queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, ShardedPrometheusCodec)) + queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, ShardedPrometheusCodec, queryAnalyzer)) return queryRangeMiddleware, c, nil } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index ffbb49ca278..cd45dea9df4 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -2,7 +2,7 @@ package queryrange import ( "context" - io "io" + "io" "net/http" "net/http/httptest" "net/url" @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" @@ -43,12 +44,14 @@ func TestRoundTrip(t *testing.T) { next: http.DefaultTransport, } + qa := querysharding.NewQueryAnalyzer() queyrangemiddlewares, _, err := Middlewares(Config{}, log.NewNopLogger(), mockLimits{}, nil, nil, nil, + qa, ) require.NoError(t, err) @@ -59,6 +62,8 @@ func TestRoundTrip(t *testing.T) { nil, PrometheusCodec, nil, + nil, + qa, ) for i, tc := range []struct { diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index a973e3e57d0..39f30a116b9 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -27,12 +27,14 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/validation" ) // HandlerFunc is like http.HandlerFunc, but for Handler. @@ -98,6 +100,8 @@ func NewQueryTripperware( instantRangeMiddleware []Middleware, queryRangeCodec Codec, instantQueryCodec Codec, + limits Limits, + queryAnalyzer querysharding.Analyzer, ) Tripperware { // Per tenant query metrics. queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ @@ -139,7 +143,18 @@ func NewQueryTripperware( if isQueryRange { return queryrange.RoundTrip(r) - } else if isQuery && len(instantRangeMiddleware) > 0 { + } else if isQuery { + // If vertical sharding is not enabled for the tenant, use downstream roundtripper. + numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize) + if numShards <= 1 { + return next.RoundTrip(r) + } + // If the given query is not shardable, use downstream roundtripper. + query := r.FormValue("query") + analysis, err := queryAnalyzer.Analyze(query) + if err != nil || !analysis.IsShardable() { + return next.RoundTrip(r) + } return instantQuery.RoundTrip(r) } return next.RoundTrip(r) diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 9d9456a3b13..66ee2d359b6 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -11,14 +11,19 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/validation" ) const ( - queryRange = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" - query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680" - queryExemplar = "/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'" - responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}` + queryRange = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" + query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680" + queryNonShardable = "/api/v1/query?time=1536716898&query=container_memory_rss&start=1536673680" + queryExemplar = "/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'" + responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}` ) type mockRequest struct { @@ -86,22 +91,54 @@ func TestRoundTrip(t *testing.T) { return mockMiddleware{} }), } - tw := NewQueryTripperware(log.NewNopLogger(), - nil, - nil, - middlewares, - middlewares, - mockCodec{}, - mockCodec{}, - ) + limits := validation.Limits{} + flagext.DefaultValues(&limits) + defaultOverrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + + limitsWithVerticalSharding := validation.Limits{QueryVerticalShardSize: 3} + shardingOverrides, err := validation.NewOverrides(limitsWithVerticalSharding, nil) + require.NoError(t, err) for _, tc := range []struct { path, expectedBody string + limits Limits }{ - {"/foo", "bar"}, - {queryExemplar, "bar"}, - {queryRange, responseBody}, - {query, responseBody}, + { + path: "/foo", + expectedBody: "bar", + limits: defaultOverrides, + }, + { + path: queryExemplar, + expectedBody: "bar", + limits: defaultOverrides, + }, + { + path: queryRange, + expectedBody: responseBody, + limits: defaultOverrides, + }, + { + path: query, + expectedBody: "bar", + limits: defaultOverrides, + }, + { + path: queryNonShardable, + expectedBody: "bar", + limits: defaultOverrides, + }, + { + path: queryNonShardable, + expectedBody: "bar", + limits: shardingOverrides, + }, + { + path: query, + expectedBody: responseBody, + limits: shardingOverrides, + }, } { t.Run(tc.path, func(t *testing.T) { req, err := http.NewRequest("GET", tc.path, http.NoBody) @@ -115,6 +152,16 @@ func TestRoundTrip(t *testing.T) { err = user.InjectOrgIDIntoHTTPRequest(ctx, req) require.NoError(t, err) + tw := NewQueryTripperware(log.NewNopLogger(), + nil, + nil, + middlewares, + middlewares, + mockCodec{}, + mockCodec{}, + tc.limits, + querysharding.NewQueryAnalyzer(), + ) resp, err := tw(downstream).RoundTrip(req) require.NoError(t, err) require.Equal(t, 200, resp.StatusCode) diff --git a/pkg/querier/tripperware/shard_by.go b/pkg/querier/tripperware/shard_by.go index 13d6645612e..c596f40b637 100644 --- a/pkg/querier/tripperware/shard_by.go +++ b/pkg/querier/tripperware/shard_by.go @@ -17,23 +17,24 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -func ShardByMiddleware(logger log.Logger, limits Limits, merger Merger) Middleware { +func ShardByMiddleware(logger log.Logger, limits Limits, merger Merger, queryAnalyzer querysharding.Analyzer) Middleware { return MiddlewareFunc(func(next Handler) Handler { return shardBy{ - next: next, - limits: limits, - merger: merger, - logger: logger, + next: next, + limits: limits, + merger: merger, + logger: logger, + analyzer: queryAnalyzer, } }) } type shardBy struct { - next Handler - limits Limits - logger log.Logger - merger Merger - queryAnalyzer *querysharding.QueryAnalyzer + next Handler + limits Limits + logger log.Logger + merger Merger + analyzer querysharding.Analyzer } func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { @@ -51,7 +52,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { } logger := util_log.WithContext(ctx, s.logger) - analysis, err := s.queryAnalyzer.Analyze(r.GetQuery()) + analysis, err := s.analyzer.Analyze(r.GetQuery()) if err != nil { level.Warn(logger).Log("msg", "error analyzing query", "q", r.GetQuery(), "err", err) } diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index f673fc8e4e6..bb2d32619dd 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" + thanosquerysharding "github.com/thanos-io/thanos/pkg/querysharding" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/user" @@ -406,7 +407,8 @@ http_requests_total`, next: http.DefaultTransport, } - roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec)) + qa := thanosquerysharding.NewQueryAnalyzer() + roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec, qa)) ctx := user.InjectOrgID(context.Background(), "1")