Skip to content

Reject small subquery step size in query frontend #5323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [CHANGE] Ingester: Creating label `native-histogram-sample` on the `cortex_discarded_samples_total` to keep track of discarded native histogram samples. #5289
* [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
25 changes: 25 additions & 0 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
Expand All @@ -33,6 +34,7 @@ type queryFrontendTestConfig struct {
querySchedulerEnabled bool
queryStatsEnabled bool
remoteReadEnabled bool
testSubQueryStepSize bool
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
}

Expand Down Expand Up @@ -209,6 +211,19 @@ func TestQueryFrontendRemoteRead(t *testing.T) {
})
}

func TestQueryFrontendSubQueryStepSize(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testSubQueryStepSize: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))

minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))
return cortexConfigFile, flags
},
})
}

func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
const numUsers = 10
const numQueriesPerUser = 10
Expand Down Expand Up @@ -334,6 +349,12 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
require.True(t, len(res.Results[0].Timeseries[0].Labels) > 0)
}

// No need to repeat the test on subquery step size.
if userID == 0 && cfg.testSubQueryStepSize {
resp, _, _ := c.QueryRaw(`up[30d:1m]`, now)
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

// In this test we do ensure that the /series start/end time is ignored and Cortex
// always returns series in ingesters memory. No need to repeat it for each user.
if userID == 0 {
Expand Down Expand Up @@ -386,6 +407,10 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
extra++
}

if cfg.testSubQueryStepSize {
extra++
}

require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total"))

// The number of received request is greater than the query requests because include
Expand Down
11 changes: 10 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,12 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
// to optimize Prometheus query requests.
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
queryAnalyzer := querysharding.NewQueryAnalyzer()
defaultSubQueryInterval := t.Cfg.Querier.DefaultEvaluationInterval
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval)
// ShardedPrometheusCodec is the same as PrometheusCodec but is used for sharded queries (it sums up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
util_log.Logger,
Expand All @@ -458,6 +464,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheus.DefaultRegisterer,
t.TombstonesLoader,
queryAnalyzer,
prometheusCodec,
shardedPrometheusCodec,
)
if err != nil {
return nil, err
Expand All @@ -473,10 +481,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
t.Cfg.QueryRange.ForwardHeaders,
queryRangeMiddlewares,
instantQueryMiddlewares,
queryrange.PrometheusCodec,
prometheusCodec,
instantquery.InstantQueryCodec,
t.Overrides,
queryAnalyzer,
defaultSubQueryInterval,
)

return services.NewIdleService(nil, func(_ error) error {
Expand Down
10 changes: 7 additions & 3 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
promqlparser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/status"

promqlparser "github.com/prometheus/prometheus/promql/parser"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
Expand Down Expand Up @@ -109,7 +108,8 @@ func (r *PrometheusRequest) WithStats(stats string) tripperware.Request {

// instantQueryCodec embeds tripperware.Codec and carries the extra state used
// when decoding instant query requests.
// NOTE(review): the `now` field is listed twice below; this looks like the
// removed and added sides of a whitespace-only diff line — confirm that only
// one declaration is intended.
type instantQueryCodec struct {
tripperware.Codec
// now returns the current time; presumably injectable so tests can pin the
// clock — confirm against callers.
now func() time.Time
now func() time.Time
// noStepSubQueryInterval is handed to tripperware.SubQueryStepSizeCheck by
// DecodeRequest; presumably the step assumed for subqueries that omit one.
noStepSubQueryInterval time.Duration
}

func newInstantQueryCodec() instantQueryCodec {
Expand Down Expand Up @@ -139,6 +139,10 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
}

result.Query = r.FormValue("query")
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
return nil, err
}

result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

Expand Down
4 changes: 1 addition & 3 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
func TestRequest(t *testing.T) {
t.Parallel()
now := time.Now()
codec := instantQueryCodec{now: func() time.Time {
return now
}}
codec := InstantQueryCodec

for _, tc := range []struct {
url string
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/tripperware/instantquery/shard_by_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package instantquery

import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

// Test_shardQuery delegates to tripperware.TestQueryShardQuery, passing the
// package's InstantQueryCodec together with a sharded Prometheus codec.
// NOTE(review): the two TestQueryShardQuery calls below look like the removed
// and added sides of a diff (package-level codec vs. NewPrometheusCodec) —
// confirm that only the NewPrometheusCodec variant should remain.
func Test_shardQuery(t *testing.T) {
t.Parallel()
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.ShardedPrometheusCodec)
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, time.Minute))
}
20 changes: 14 additions & 6 deletions pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,21 @@ var (
errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")

// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
PrometheusCodec tripperware.Codec = &prometheusCodec{sharded: false}
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
ShardedPrometheusCodec tripperware.Codec = &prometheusCodec{sharded: true}

// Name of the cache control header.
cacheControlHeader = "Cache-Control"
)

// prometheusCodec holds the settings used when encoding and decoding
// Prometheus query range requests and responses.
type prometheusCodec struct {
	// sharded records whether this codec instance was created for sharded queries.
	sharded bool
	// noStepSubQueryInterval is stored as given to the constructor; presumably
	// the step assumed for subqueries that do not specify one — confirm against
	// the subquery step size check.
	noStepSubQueryInterval time.Duration
}

// NewPrometheusCodec allocates a prometheusCodec and stores the given sharded
// flag and subquery interval on it unchanged. Every call returns a new instance.
func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { //nolint:revive
	codec := new(prometheusCodec)
	codec.sharded = sharded
	codec.noStepSubQueryInterval = noStepSubQueryInterval
	return codec
}

// WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp.
Expand Down Expand Up @@ -166,7 +170,7 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques
return &response, nil
}

func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
var result PrometheusRequest
var err error
result.Start, err = util.ParseTime(r.FormValue("start"))
Expand Down Expand Up @@ -199,6 +203,10 @@ func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forward
}

result.Query = r.FormValue("query")
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
return nil, err
}

result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

Expand Down
8 changes: 5 additions & 3 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func Middlewares(
registerer prometheus.Registerer,
cacheGenNumberLoader CacheGenNumberLoader,
queryAnalyzer querysharding.Analyzer,
prometheusCodec tripperware.Codec,
shardedPrometheusCodec tripperware.Codec,
) ([]tripperware.Middleware, cache.Cache, error) {
// Metric used to keep track of each middleware execution duration.
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
Expand All @@ -88,7 +90,7 @@ func Middlewares(
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, PrometheusCodec, registerer))
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
}

var c cache.Cache
Expand All @@ -99,7 +101,7 @@ func Middlewares(
}
return false
}
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, PrometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer)
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer)
if err != nil {
return nil, nil, err
}
Expand All @@ -111,7 +113,7 @@ func Middlewares(
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
}

queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, ShardedPrometheusCodec, queryAnalyzer))
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer))

return queryRangeMiddleware, c, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"strconv"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
Expand All @@ -18,6 +19,11 @@ import (
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

var (
	// PrometheusCodec is the non-sharded query range codec used by the tests,
	// built with a one-minute subquery interval.
	PrometheusCodec = NewPrometheusCodec(false, time.Minute)
	// ShardedPrometheusCodec is the codec passed to Middlewares as the sharded
	// codec. It must be constructed with sharded=true; building it with false
	// made it identical to PrometheusCodec, so the tests never exercised the
	// sharded variant.
	ShardedPrometheusCodec = NewPrometheusCodec(true, time.Minute)
)

func TestRoundTrip(t *testing.T) {
t.Parallel()
s := httptest.NewServer(
Expand Down Expand Up @@ -53,6 +59,8 @@ func TestRoundTrip(t *testing.T) {
nil,
nil,
qa,
PrometheusCodec,
ShardedPrometheusCodec,
)
require.NoError(t, err)

Expand All @@ -65,6 +73,7 @@ func TestRoundTrip(t *testing.T) {
nil,
nil,
qa,
time.Minute,
)

for i, tc := range []struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/querier/tripperware/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"compress/gzip"
"context"
"fmt"
io "io"
"io"
"net/http"
"strconv"
"testing"
Expand Down Expand Up @@ -61,6 +61,10 @@ func TestRequest(t *testing.T) {
url: "api/v1/query_range?start=0&end=11001&step=1",
expectedErr: errStepTooSmall,
},
{
url: "/api/v1/query?query=up%5B30d%3A%5D&start=123&end=456&step=10",
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000),
},
} {
tc := tc
t.Run(tc.url, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/queryrange/step_align.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// StepAlignMiddleware aligns the start and end of request to the step to
// improved the cacheability of the query results.
// improve the cacheability of the query results.
var StepAlignMiddleware = tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
return stepAlign{
next: next,
Expand Down
10 changes: 8 additions & 2 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewQueryTripperware(
instantQueryCodec Codec,
limits Limits,
queryAnalyzer querysharding.Analyzer,
defaultSubQueryInterval time.Duration,
) Tripperware {
// Per tenant query metrics.
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -144,13 +145,18 @@ func NewQueryTripperware(
if isQueryRange {
return queryrange.RoundTrip(r)
} else if isQuery {
// If the given query is not shardable, use downstream roundtripper.
query := r.FormValue("query")
// Check subquery step size.
if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil {
return nil, err
}

// If vertical sharding is not enabled for the tenant, use downstream roundtripper.
numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
if numShards <= 1 {
return next.RoundTrip(r)
}
// If the given query is not shardable, use downstream roundtripper.
query := r.FormValue("query")
analysis, err := queryAnalyzer.Analyze(query)
if err != nil || !analysis.IsShardable() {
return next.RoundTrip(r)
Expand Down
Loading