diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c03a76983f..71cd8163b43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [CHANGE] Ingester: Remove `-querier.query-store-for-labels-enabled` flag. Querying long-term store for labels is always enabled. #5984 * [CHANGE] Server: Instrument `cortex_request_duration_seconds` metric with native histogram. If `native-histograms` feature is enabled in monitoring Prometheus then the metric name needs to be updated in your dashboards. #6056 * [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 +* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005 * [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987 * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 6bea83aa01b..24240c36bf7 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3285,6 +3285,20 @@ query_priority: # List of priority definitions. [priorities: | default = []] +# Configuration for query rejection. +query_rejection: + # Whether query rejection is enabled. + # CLI flag: -frontend.query-rejection.enabled + [enabled: | default = false] + + # List of query_attributes to match and reject queries. A query is rejected if + # it matches any query_attribute in this list. Each query_attribute has + # several properties (e.g., regex, time_window, user_agent), and all specified + # properties must match for a query_attribute to be considered a match. Only + # the specified properties are checked, and an AND operator is applied to + # them. 
+ [query_attributes: | default = []] + # Duration to delay the evaluation of rules to ensure the underlying metrics # have been pushed to Cortex. # CLI flag: -ruler.evaluation-delay-duration @@ -5345,14 +5359,24 @@ limits: # priority level. Value between 0 and 1 will be used as a percentage. [reserved_queriers: | default = 0] -# List of query attributes to assign the priority. +# List of query_attributes to match and assign priority to queries. A query is +# assigned to this priority if it matches any query_attribute in this list. Each +# query_attribute has several properties (e.g., regex, time_window, user_agent), +# and all specified properties must match for a query_attribute to be considered +# a match. Only the specified properties are checked, and an AND operator is +# applied to them. [query_attributes: | default = []] ``` ### `QueryAttribute` ```yaml -# Regex that the query string should match. If not set, it won't be checked. +# API type for the query. Should be one of the query, query_range, series, +# labels, label_values. If not set, it won't be checked. +[api_type: | default = ""] + +# Regex that the query string (or at least one of the matchers in metadata +# query) should match. If not set, it won't be checked. [regex: | default = ""] # Overall data select time window (including range selectors, modifiers and @@ -5368,6 +5392,49 @@ time_window: # lookback delta) that the query should be within. If set to 0, it won't be # checked. [end: | default = 0] + +# Query time range should be within this limit to match. Depending on where it +# was used, in most of the use-cases, either min or max value will be used. If +# not set, it won't be checked. +time_range_limit: + # This will be duration (12h, 1d, 15d etc.). Query time range should be above + # or equal to this value to match. Ex: if this value is 20d, then queries + # whose range is bigger than or equal to 20d will match. If set to 0, it won't + # be checked. 
+ [min: | default = 0] + + # This will be duration (12h, 1d, 15d etc.). Query time range should be below + # or equal to this value to match. Ex: if this value is 24h, then queries + # whose range is smaller than or equal to 24h will match.If set to 0, it won't + # be checked. + [max: | default = 0] + +# If query step provided should be within this limit to match. If not set, it +# won't be checked. This property only applied to range queries and ignored for +# other types of queries. +query_step_limit: + # Query step should be above or equal to this value to match. If set to 0, it + # won't be checked. + [min: | default = 0] + + # Query step should be below or equal to this value to match. If set to 0, it + # won't be checked. + [max: | default = 0] + +# Regex that User-Agent header of the request should match. If not set, it won't +# be checked. +[user_agent_regex: | default = ""] + +# Grafana includes X-Dashboard-Uid header in query requests. If this field is +# provided then X-Dashboard-Uid header of request should match this value. If +# not set, it won't be checked. This property won't be applied to metadata +# queries. +[dashboard_uid: | default = ""] + +# Grafana includes X-Panel-Id header in query requests. If this field is +# provided then X-Panel-Id header of request should match this value. If not +# set, it won't be checked. This property won't be applied to metadata queries. +[panel_id: | default = ""] ``` ### `DisabledRuleGroup` diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index fd6bbe66412..7c3691d99d8 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -115,3 +115,4 @@ Currently experimental features are: - Ingestion can be enabled by setting `-blocks-storage.tsdb.enable-native-histograms=true` on Ingester. - String interning for metrics labels - Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester. 
+- Query-frontend: query rejection (`-frontend.query-rejection.enabled`) diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 003cef426de..a993816f0a0 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -234,7 +234,7 @@ func (c *Client) QueryRange(query string, start, end time.Time, step time.Durati } // QueryRangeRaw runs a ranged query directly against the querier API. -func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Duration) (*http.Response, []byte, error) { +func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Duration, headers map[string]string) (*http.Response, []byte, error) { addr := fmt.Sprintf( "http://%s/api/prom/api/v1/query_range?query=%s&start=%s&end=%s&step=%s", c.querierAddress, @@ -244,11 +244,11 @@ func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Dur strconv.FormatFloat(step.Seconds(), 'f', -1, 64), ) - return c.query(addr) + return c.query(addr, headers) } // QueryRaw runs a query directly against the querier API. -func (c *Client) QueryRaw(query string, ts time.Time) (*http.Response, []byte, error) { +func (c *Client) QueryRaw(query string, ts time.Time, headers map[string]string) (*http.Response, []byte, error) { u := &url.URL{ Scheme: "http", Path: fmt.Sprintf("%s/api/prom/api/v1/query", c.querierAddress), @@ -260,11 +260,11 @@ func (c *Client) QueryRaw(query string, ts time.Time) (*http.Response, []byte, e q.Set("time", FormatTime(ts)) } u.RawQuery = q.Encode() - return c.query(u.String()) + return c.query(u.String(), headers) } // SeriesRaw runs a series request directly against the querier API. 
-func (c *Client) SeriesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { +func (c *Client) SeriesRaw(matches []string, startTime, endTime time.Time, headers map[string]string) (*http.Response, []byte, error) { u := &url.URL{ Scheme: "http", Path: fmt.Sprintf("%s/api/prom/api/v1/series", c.querierAddress), @@ -283,11 +283,11 @@ func (c *Client) SeriesRaw(matches []string, startTime, endTime time.Time) (*htt } u.RawQuery = q.Encode() - return c.query(u.String()) + return c.query(u.String(), headers) } // LabelNamesRaw runs a label names request directly against the querier API. -func (c *Client) LabelNamesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { +func (c *Client) LabelNamesRaw(matches []string, startTime, endTime time.Time, headers map[string]string) (*http.Response, []byte, error) { u := &url.URL{ Scheme: "http", Path: fmt.Sprintf("%s/api/prom/api/v1/labels", c.querierAddress), @@ -306,11 +306,11 @@ func (c *Client) LabelNamesRaw(matches []string, startTime, endTime time.Time) ( } u.RawQuery = q.Encode() - return c.query(u.String()) + return c.query(u.String(), headers) } // LabelValuesRaw runs a label values request directly against the querier API. -func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { +func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTime time.Time, headers map[string]string) (*http.Response, []byte, error) { u := &url.URL{ Scheme: "http", Path: fmt.Sprintf("%s/api/prom/api/v1/label/%s/values", c.querierAddress, label), @@ -329,7 +329,7 @@ func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTi } u.RawQuery = q.Encode() - return c.query(u.String()) + return c.query(u.String(), headers) } // RemoteRead runs a remote read query. 
@@ -398,7 +398,7 @@ func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, st return &resp, nil } -func (c *Client) query(addr string) (*http.Response, []byte, error) { +func (c *Client) query(addr string, headers map[string]string) (*http.Response, []byte, error) { ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() @@ -409,6 +409,10 @@ func (c *Client) query(addr string) (*http.Response, []byte, error) { req.Header.Set("X-Scope-OrgID", c.orgID) + for key, value := range headers { + req.Header.Set(key, value) + } + retries := backoff.New(ctx, backoff.Config{ MinBackoff: 1 * time.Second, MaxBackoff: 2 * time.Second, diff --git a/integration/querier_test.go b/integration/querier_test.go index 733252e3315..a5e92e299e7 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -901,22 +901,22 @@ func TestQuerierWithBlocksStorageLimits(t *testing.T) { require.NoError(t, err) // We expect all queries hitting 422 exceeded series limit on store gateway. 
- resp, body, err := c.QueryRangeRaw(`{job="test"}`, seriesTimestamp.Add(-time.Second), seriesTimestamp, time.Second) + resp, body, err := c.QueryRangeRaw(`{job="test"}`, seriesTimestamp.Add(-time.Second), seriesTimestamp, time.Second, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) require.Contains(t, string(body), "exceeded series limit") - resp, body, err = c.SeriesRaw([]string{`{job="test"}`}, seriesTimestamp.Add(-time.Second), seriesTimestamp) + resp, body, err = c.SeriesRaw([]string{`{job="test"}`}, seriesTimestamp.Add(-time.Second), seriesTimestamp, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) require.Contains(t, string(body), "exceeded series limit") - resp, body, err = c.LabelNamesRaw([]string{`{job="test"}`}, seriesTimestamp.Add(-time.Second), seriesTimestamp) + resp, body, err = c.LabelNamesRaw([]string{`{job="test"}`}, seriesTimestamp.Add(-time.Second), seriesTimestamp, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) require.Contains(t, string(body), "exceeded series limit") - resp, body, err = c.LabelValuesRaw("job", []string{`{job="test"}`}, seriesTimestamp.Add(-time.Second), seriesTimestamp) + resp, body, err = c.LabelValuesRaw("job", []string{`{job="test"}`}, seriesTimestamp.Add(-time.Second), seriesTimestamp, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) require.Contains(t, string(body), "exceeded series limit") @@ -994,7 +994,7 @@ func TestQuerierWithStoreGatewayDataBytesLimits(t *testing.T) { require.NoError(t, err) // We expect all queries hitting 422 exceeded series limit - resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp) + resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp, map[string]string{}) require.NoError(t, err) require.Equal(t, 
http.StatusUnprocessableEntity, resp.StatusCode) require.Contains(t, string(body), "exceeded bytes limit") @@ -1245,7 +1245,7 @@ func TestQuerierMaxSamplesLimit(t *testing.T) { var body []byte for retries.Ongoing() { // We expect request to hit max samples limit. - res, body, err = c.QueryRaw(`sum({job="test"})`, series1Timestamp) + res, body, err = c.QueryRaw(`sum({job="test"})`, series1Timestamp, map[string]string{}) if err == nil { break } diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 8703e8f777f..0af72024c40 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -4,6 +4,8 @@ package integration import ( + "bytes" + "context" "crypto/x509" "crypto/x509/pkix" "fmt" @@ -21,6 +23,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/thanos/pkg/pool" "github.com/cortexproject/cortex/integration/ca" @@ -28,6 +31,7 @@ import ( e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/querier/tripperware" ) type queryFrontendTestConfig struct { @@ -314,7 +318,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on missing metric name for each user. 
if userID == 0 && cfg.testMissingMetricName { - res, body, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now()) + res, body, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now(), map[string]string{}) require.NoError(t, err) require.Equal(t, 422, res.StatusCode) require.Contains(t, string(body), "query must contain metric name") @@ -338,7 +342,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on Server-Timing header for each user. if userID == 0 && cfg.queryStatsEnabled { - res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now()) + res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now(), map[string]string{}) require.NoError(t, err) require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0]) } @@ -357,7 +361,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on subquery step size. if userID == 0 && cfg.testSubQueryStepSize { - resp, _, _ := c.QueryRaw(`up[30d:1m]`, now) + resp, _, _ := c.QueryRaw(`up[30d:1m]`, now, map[string]string{}) require.Equal(t, http.StatusBadRequest, resp.StatusCode) } @@ -511,7 +515,7 @@ func TestQueryFrontendNoRetryChunkPool(t *testing.T) { require.NoError(t, err) // We expect request to hit chunk pool exhaustion. - resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp) + resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusInternalServerError, resp.StatusCode) require.Contains(t, string(body), pool.ErrPoolExhausted.Error()) @@ -562,26 +566,236 @@ func TestQueryFrontendMaxQueryLengthLimits(t *testing.T) { now := time.Now() // We expect request to hit max query length limit. 
- resp, body, err := c.QueryRangeRaw(`rate(test[1m])`, now.Add(-90*time.Hour*24), now, 10*time.Hour) + resp, body, err := c.QueryRangeRaw(`rate(test[1m])`, now.Add(-90*time.Hour*24), now, 10*time.Hour, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusBadRequest, resp.StatusCode) require.Contains(t, string(body), "the query time range exceeds the limit") // We expect request to hit max query length limit. - resp, body, err = cSharding.QueryRangeRaw(`rate(test[1m])`, now.Add(-90*time.Hour*24), now, 10*time.Hour) + resp, body, err = cSharding.QueryRangeRaw(`rate(test[1m])`, now.Add(-90*time.Hour*24), now, 10*time.Hour, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusBadRequest, resp.StatusCode) require.Contains(t, string(body), "the query time range exceeds the limit") // We expect request to hit max query length limit. - resp, body, err = c.QueryRaw(`rate(test[90d])`, now) + resp, body, err = c.QueryRaw(`rate(test[90d])`, now, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusBadRequest, resp.StatusCode) require.Contains(t, string(body), "the query time range exceeds the limit") // We expect request to hit max query length limit. - resp, body, err = cSharding.QueryRaw(`rate(test[90d])`, now) + resp, body, err = cSharding.QueryRaw(`rate(test[90d])`, now, map[string]string{}) require.NoError(t, err) require.Equal(t, http.StatusBadRequest, resp.StatusCode) require.Contains(t, string(body), "the query time range exceeds the limit") } + +func TestQueryFrontendQueryRejection(t *testing.T) { + configFileName := "runtime-config.yaml" + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. 
+ flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-runtime-config.backend": "s3", + "-runtime-config.s3.access-key-id": e2edb.MinioAccessKey, + "-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey, + "-runtime-config.s3.bucket-name": bucketName, + "-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName), + "-runtime-config.s3.insecure": "true", + "-runtime-config.file": configFileName, + "-runtime-config.reload-period": "1s", + }) + + client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: bucketName, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "runtime-config-test") + + require.NoError(t, err) + + // update runtime config + newRuntimeConfig := []byte(`overrides: + user-1: + query_rejection: + enabled: true + query_attributes: + - api_type: "query" + regex: .*rate.* + query_step_limit: + min: 6s + max: 20m + dashboard_uid: "dash123" +`) + require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig))) + time.Sleep(2 * time.Second) + + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint() + + // Start all other services. + ingester := e2ecortex.NewIngesterWithConfigFile("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "") + querier := e2ecortex.NewQuerierWithConfigFile("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "") + + require.NoError(t, s.StartAndWaitReady(querier, ingester)) + require.NoError(t, s.WaitReady(queryFrontend)) + + // Wait until querier have updated the ring. 
+ require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + // We expect request to be rejected, as it matches query_attribute of query_rejection (contains rate, contains dashboard header dash123). step limit is ignored for instant queries + // Query shouldn't be checked against attributes that is not provided in query_attribute config(time_window, time_range_limit, user_agent_regex, panel_id) + resp, body, err := c.QueryRaw(`min_over_time( rate(http_requests_total[5m])[30m:5s] )`, now, map[string]string{"X-Dashboard-Uid": "dash123", "User-Agent": "grafana"}) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected, as it doesn't match api_type + resp, body, err = c.QueryRangeRaw(`min_over_time( rate(http_requests_total[5m])[30m:5s] )`, now.Add(-11*time.Hour), now.Add(-8*time.Hour), 25*time.Minute, map[string]string{"X-Dashboard-Uid": "dash123", "User-Agent": "grafana"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // update runtime config + newRuntimeConfig = []byte(`overrides: + user-1: + query_rejection: + enabled: true + query_attributes: + - regex: .*rate.* + time_window: + start: 12h + end: 0h + time_range_limit: + min: 2h + max: 6h + query_step_limit: + min: 22m + dashboard_uid: "dash123" + user_agent_regex: "grafana.*" +`) + require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig))) + time.Sleep(2 * time.Second) + + // We expect request to be rejected, as it matches query_attribute (contains 'rate', within time_window(11h-8h), within time range(3h), within step limit(25m>22m), contains 
dashboard header(dash123) and user-agent matches regex). + resp, body, err = c.QueryRangeRaw(`rate(test[1m])`, now.Add(-11*time.Hour), now.Add(-8*time.Hour), 25*time.Minute, map[string]string{"X-Dashboard-Uid": "dash123", "User-Agent": "grafana"}) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected, as it doesn't match query step limit (min is 22m, and actual step is 20m) + resp, body, err = c.QueryRangeRaw(`rate(test[1m])`, now.Add(-11*time.Hour), now.Add(-8*time.Hour), 20*time.Minute, map[string]string{"X-Dashboard-Uid": "dash123", "User-Agent": "grafana"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected, as it goes beyond time_window(-15h is outside of 12h-0h window) + resp, body, err = c.QueryRangeRaw(`rate(test[1m])`, now.Add(-15*time.Hour), now.Add(-8*time.Hour), 25*time.Minute, map[string]string{"X-Dashboard-Uid": "dash123", "User-Agent": "grafana"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected as it goes beyond time-range(9h is bigger than max time range of 6h) + resp, body, err = c.QueryRangeRaw(`rate(test[1m])`, now.Add(-11*time.Hour), now.Add(-2*time.Hour), 25*time.Minute, map[string]string{"X-Dashboard-Uid": "dash123", "User-Agent": "grafana"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected, as it doesn't match regex (doesn't contain 'rate') + resp, body, err = c.QueryRangeRaw(`increase(test[1m])`, now.Add(-11*time.Hour), now.Add(-8*time.Hour), 
25*time.Minute, map[string]string{"X-Dashboard-Uid": "dash123", "User-Agent": "grafana"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected, as it doesn't match user-agent regex (doesn't contain 'grafana') + resp, body, err = c.QueryRangeRaw(`rate(test[1m])`, now.Add(-11*time.Hour), now.Add(-8*time.Hour), 25*time.Minute, map[string]string{"X-Dashboard-Uid": "dash123", "User-Agent": "go-client/v0.19.0"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected, as it doesn't match grafana dashboard uid ('dash123' != 'new-dashboard') + resp, body, err = c.QueryRangeRaw(`rate(test[1m])`, now.Add(-11*time.Hour), now.Add(-8*time.Hour), 25*time.Minute, map[string]string{"X-Dashboard-Uid": "new-dashboard", "User-Agent": "grafana"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request to be rejected for series request, as it has at least one matcher matching regex(contains 'rate'), within time_window(11h-8h), within time_range(3h). + // query_step_limit, dashboard_uid, panel_id fields are ignored for metadata queries. 
+ resp, body, err = c.SeriesRaw([]string{`http_requests_rate_total{job="prometheus"}`}, now.Add(-11*time.Hour), now.Add(-8*time.Hour), map[string]string{"User-Agent": "grafana-agent/v0.19.0"}) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected for series request as it does not have at least one matcher matching regex + resp, body, err = c.SeriesRaw([]string{`http_requests_total{job="prometheus"}`}, now.Add(-11*time.Hour), now.Add(-8*time.Hour), map[string]string{"User-Agent": "grafana-agent/v0.19.0"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request to be rejected for labels request, as it has at least one matcher matching regex(contains 'rate'), within time_window(11h-8h), within time_range(3h). + // query_step_limit, dashboard_uid, panel_id properties are ignored for metadata queries. + resp, body, err = c.LabelNamesRaw([]string{`http_requests_rate_total{job="prometheus"}`}, now.Add(-11*time.Hour), now.Add(-8*time.Hour), map[string]string{"User-Agent": "grafana-agent/v0.19.0"}) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected if label/label_values request has no matcher, but rejection query_attribute has regex property specified. 
+ // All the provided query_attributes fields that can be applied to metadata queries should match + resp, body, err = c.LabelNamesRaw([]string{}, now.Add(-11*time.Hour), now.Add(-8*time.Hour), map[string]string{"User-Agent": "grafana-agent/v0.19.0"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected if label/label_values request doesn't provide time but rejection query_attribute has time_window property + resp, body, err = c.LabelNamesRaw([]string{`http_requests_rate_total{job="prometheus"}`}, time.Time{}, time.Time{}, map[string]string{"User-Agent": "grafana-agent/v0.19.0"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // We expect request not to be rejected if label/label_values request doesn't provide one of the time(startTime or endTime) but rejection query_attribute has both time_window limits + resp, body, err = c.LabelNamesRaw([]string{`http_requests_rate_total{job="prometheus"}`}, now.Add(-11*time.Hour), time.Time{}, map[string]string{"User-Agent": "grafana-agent/v0.19.0"}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NotContains(t, string(body), tripperware.QueryRejectErrorMessage) + + // update runtime config + newRuntimeConfig = []byte(`overrides: + user-1: + query_rejection: + enabled: true + query_attributes: + - regex: .*rate.* + time_window: + start: 12h + end: 0h + - user_agent_regex: "grafana.*" +`) + require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig))) + time.Sleep(2 * time.Second) + + // We expect request to be rejected if any of the listed query_attributes configuration matches. Two query_attributes provided here, and doesn't match regex from first one, but matches second one. 
+ // query rejection should consider only provided attributes. + // There is no regex, time_window, time_range_limit on second query_attribute. Only user_agent_regex provided so any query with this agent should be rejected + resp, body, err = c.LabelValuesRaw("cluster", []string{}, time.Time{}, time.Time{}, map[string]string{"User-Agent": "grafana-agent/v0.19.0"}) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), tripperware.QueryRejectErrorMessage) + +} diff --git a/integration/zone_aware_test.go b/integration/zone_aware_test.go index a2e983eb9b9..215d488ec3f 100644 --- a/integration/zone_aware_test.go +++ b/integration/zone_aware_test.go @@ -135,7 +135,7 @@ func TestZoneAwareReplication(t *testing.T) { require.NoError(t, ingester3.Kill()) // Query back any series => fail (either because of a timeout or 500) - result, _, err := client.QueryRaw("series_1", time.Now()) + result, _, err := client.QueryRaw("series_1", time.Now(), map[string]string{}) if !errors.Is(err, context.DeadlineExceeded) { require.NoError(t, err) require.Equal(t, 500, result.StatusCode) diff --git a/pkg/querier/tripperware/limits.go b/pkg/querier/tripperware/limits.go index 815693b3c1d..54d712842f1 100644 --- a/pkg/querier/tripperware/limits.go +++ b/pkg/querier/tripperware/limits.go @@ -28,4 +28,6 @@ type Limits interface { // QueryPriority returns the query priority config for the tenant, including different priorities and their attributes. 
QueryPriority(userID string) validation.QueryPriority + + QueryRejection(userID string) validation.QueryRejection } diff --git a/pkg/querier/tripperware/priority.go b/pkg/querier/tripperware/priority.go deleted file mode 100644 index 07326d22d50..00000000000 --- a/pkg/querier/tripperware/priority.go +++ /dev/null @@ -1,51 +0,0 @@ -package tripperware - -import ( - "time" - - "github.com/cortexproject/cortex/pkg/util/validation" -) - -func GetPriority(query string, minTime, maxTime int64, now time.Time, queryPriority validation.QueryPriority) int64 { - if !queryPriority.Enabled || query == "" || len(queryPriority.Priorities) == 0 { - return queryPriority.DefaultPriority - } - - for _, priority := range queryPriority.Priorities { - for _, attribute := range priority.QueryAttributes { - if attribute.Regex != "" && attribute.Regex != ".*" && attribute.Regex != ".+" { - if attribute.CompiledRegex != nil && !attribute.CompiledRegex.MatchString(query) { - continue - } - } - - if isWithinTimeAttributes(attribute.TimeWindow, now, minTime, maxTime) { - return priority.Priority - } - } - } - - return queryPriority.DefaultPriority -} - -func isWithinTimeAttributes(timeWindow validation.TimeWindow, now time.Time, startTime, endTime int64) bool { - if timeWindow.Start == 0 && timeWindow.End == 0 { - return true - } - - if timeWindow.Start != 0 { - startTimeThreshold := now.Add(-1 * time.Duration(timeWindow.Start).Abs()).Add(-1 * time.Minute).Truncate(time.Minute).UnixMilli() - if startTime < startTimeThreshold { - return false - } - } - - if timeWindow.End != 0 { - endTimeThreshold := now.Add(-1 * time.Duration(timeWindow.End).Abs()).Add(1 * time.Minute).Truncate(time.Minute).UnixMilli() - if endTime > endTimeThreshold { - return false - } - } - - return true -} diff --git a/pkg/querier/tripperware/priority_test.go b/pkg/querier/tripperware/priority_test.go deleted file mode 100644 index 12c1243ed61..00000000000 --- a/pkg/querier/tripperware/priority_test.go +++ /dev/null 
@@ -1,222 +0,0 @@ -package tripperware - -import ( - "regexp" - "testing" - "time" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - - "github.com/cortexproject/cortex/pkg/util/validation" -) - -func Test_GetPriorityShouldReturnDefaultPriorityIfNotEnabledOrInvalidQueryString(t *testing.T) { - now := time.Now() - limits := mockLimits{queryPriority: validation.QueryPriority{ - Priorities: []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - { - Regex: ".*", - CompiledRegex: regexp.MustCompile(".*"), - }, - }, - }, - }, - }} - - type testCase struct { - query string - queryPriorityEnabled bool - } - - tests := map[string]testCase{ - "should miss if query priority not enabled": { - query: "up", - }, - "should miss if query string empty": { - query: "", - queryPriorityEnabled: true, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - limits.queryPriority.Enabled = testData.queryPriorityEnabled - priority := GetPriority(testData.query, 0, 0, now, limits.queryPriority) - assert.Equal(t, int64(0), priority) - }) - } -} - -func Test_GetPriorityShouldConsiderRegex(t *testing.T) { - now := time.Now() - limits := mockLimits{queryPriority: validation.QueryPriority{ - Enabled: true, - Priorities: []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - {}, - }, - }, - }, - }} - - type testCase struct { - regex string - query string - expectedPriority int - } - - tests := map[string]testCase{ - "should hit if regex matches": { - regex: "(^sum|c(.+)t)", - query: "sum(up)", - expectedPriority: 1, - }, - "should miss if regex doesn't match": { - regex: "(^sum|c(.+)t)", - query: "min(up)", - expectedPriority: 0, - }, - "should hit if regex matches - .*": { - regex: ".*", - query: "count(sum(up))", - expectedPriority: 1, - }, - "should hit if regex matches - .+": { - regex: ".+", - query: "count(sum(up))", - 
expectedPriority: 1, - }, - "should hit if regex is an empty string": { - regex: "", - query: "sum(up)", - expectedPriority: 1, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - limits.queryPriority.Priorities[0].QueryAttributes[0].Regex = testData.regex - limits.queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile(testData.regex) - priority := GetPriority(testData.query, 0, 0, now, limits.queryPriority) - assert.Equal(t, int64(testData.expectedPriority), priority) - }) - } -} - -func Test_GetPriorityShouldConsiderStartAndEndTime(t *testing.T) { - now := time.Now() - limits := mockLimits{queryPriority: validation.QueryPriority{ - Enabled: true, - Priorities: []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - { - Regex: ".*", - CompiledRegex: regexp.MustCompile(".*"), - TimeWindow: validation.TimeWindow{ - Start: model.Duration(45 * time.Minute), - End: model.Duration(15 * time.Minute), - }, - }, - }, - }, - }, - }} - - type testCase struct { - start time.Time - end time.Time - expectedPriority int - } - - tests := map[string]testCase{ - "should hit between start and end time": { - start: now.Add(-40 * time.Minute), - end: now.Add(-20 * time.Minute), - expectedPriority: 1, - }, - "should hit equal to start and end time": { - start: now.Add(-45 * time.Minute), - end: now.Add(-15 * time.Minute), - expectedPriority: 1, - }, - "should miss outside of start time": { - start: now.Add(-50 * time.Minute), - end: now.Add(-15 * time.Minute), - expectedPriority: 0, - }, - "should miss completely outside of start time": { - start: now.Add(-50 * time.Minute), - end: now.Add(-45 * time.Minute), - expectedPriority: 0, - }, - "should miss outside of end time": { - start: now.Add(-45 * time.Minute), - end: now.Add(-10 * time.Minute), - expectedPriority: 0, - }, - "should miss completely outside of end time": { - start: now.Add(-15 * time.Minute), - end: now.Add(-10 * 
time.Minute), - expectedPriority: 0, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - priority := GetPriority("sum(up)", testData.start.UnixMilli(), testData.end.UnixMilli(), now, limits.queryPriority) - assert.Equal(t, int64(testData.expectedPriority), priority) - }) - } -} - -func Test_GetPriorityShouldNotConsiderStartAndEndTimeIfEmpty(t *testing.T) { - now := time.Now() - limits := mockLimits{queryPriority: validation.QueryPriority{ - Enabled: true, - Priorities: []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - { - Regex: "^sum\\(up\\)$", - }, - }, - }, - }, - }} - - type testCase struct { - start time.Time - end time.Time - } - - tests := map[string]testCase{ - "should hit with future time": { - start: now, - end: now.Add(1000000 * time.Hour), - }, - "should hit with very old time": { - start: now.Add(-1000000 * time.Hour), - end: now, - }, - "should hit with very wide time window": { - start: now.Add(-1000000 * time.Hour), - end: now.Add(1000000 * time.Hour), - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - priority := GetPriority("sum(up)", testData.start.Unix(), testData.end.Unix(), now, limits.queryPriority) - assert.Equal(t, int64(1), priority) - }) - } -} diff --git a/pkg/querier/tripperware/query_attribute_matcher.go b/pkg/querier/tripperware/query_attribute_matcher.go new file mode 100644 index 00000000000..de92d5e2c65 --- /dev/null +++ b/pkg/querier/tripperware/query_attribute_matcher.go @@ -0,0 +1,228 @@ +package tripperware + +import ( + "net/http" + "strings" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/httpgrpc" + + "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +const 
QueryRejectErrorMessage = "This query has been rejected by the service operator." + +func rejectQueryOrSetPriority(r *http.Request, now time.Time, lookbackDelta time.Duration, limits Limits, userStr string, rejectedQueriesPerTenant *prometheus.CounterVec) error { + if limits == nil || !(limits.QueryPriority(userStr).Enabled || limits.QueryRejection(userStr).Enabled) { + return nil + } + op := getOperation(r) + + if op == "query" || op == "query_range" { + query := r.FormValue("query") + expr, err := parser.ParseExpr(query) + if err != nil { + return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + minTime, maxTime := util.FindMinMaxTime(r, expr, lookbackDelta, now) + + if queryReject := limits.QueryRejection(userStr); queryReject.Enabled && query != "" { + for _, attribute := range queryReject.QueryAttributes { + if matchAttributeForExpressionQuery(attribute, op, r, query, now, minTime, maxTime) { + rejectedQueriesPerTenant.WithLabelValues(op, userStr).Inc() + return httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage) + } + } + } + + reqStats := stats.FromContext(r.Context()) + reqStats.SetDataSelectMaxTime(maxTime) + reqStats.SetDataSelectMinTime(minTime) + + if queryPriority := limits.QueryPriority(userStr); queryPriority.Enabled && len(queryPriority.Priorities) != 0 && query != "" { + for _, priority := range queryPriority.Priorities { + for _, attribute := range priority.QueryAttributes { + if matchAttributeForExpressionQuery(attribute, op, r, query, now, minTime, maxTime) { + reqStats.SetPriority(priority.Priority) + return nil + } + } + } + reqStats.SetPriority(queryPriority.DefaultPriority) + } + } + + if queryReject := limits.QueryRejection(userStr); queryReject.Enabled && (op == "series" || op == "labels" || op == "label_values") { + for _, attribute := range queryReject.QueryAttributes { + if matchAttributeForMetadataQuery(attribute, op, r, now) { + rejectedQueriesPerTenant.WithLabelValues(op, userStr).Inc() + return 
httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage) + } + } + } + + return nil +} + +func getOperation(r *http.Request) string { + switch { + case strings.HasSuffix(r.URL.Path, "/query"): + return "query" + case strings.HasSuffix(r.URL.Path, "/query_range"): + return "query_range" + case strings.HasSuffix(r.URL.Path, "/series"): + return "series" + case strings.HasSuffix(r.URL.Path, "/labels"): + return "labels" + case strings.HasSuffix(r.URL.Path, "/values"): + return "label_values" + default: + return "other" + } +} + +func matchAttributeForExpressionQuery(attribute validation.QueryAttribute, op string, r *http.Request, query string, now time.Time, minTime, maxTime int64) bool { + if attribute.ApiType != "" && attribute.ApiType != op { + return false + } + if attribute.Regex != "" && attribute.Regex != ".*" && attribute.Regex != ".+" { + if attribute.CompiledRegex != nil && !attribute.CompiledRegex.MatchString(query) { + return false + } + } + + if !isWithinTimeAttributes(attribute.TimeWindow, now, minTime, maxTime) { + return false + } + + if !isWithinTimeRangeAttribute(attribute.TimeRangeLimit, minTime, maxTime) { + return false + } + + if op == "query_range" && !isWithinQueryStepLimit(attribute.QueryStepLimit, r) { + return false + } + + if attribute.UserAgentRegex != "" && attribute.UserAgentRegex != ".*" && attribute.CompiledUserAgentRegex != nil { + if !attribute.CompiledUserAgentRegex.MatchString(r.Header.Get("User-Agent")) { + return false + } + } + + if attribute.DashboardUID != "" && attribute.DashboardUID != r.Header.Get("X-Dashboard-Uid") { + return false + } + + if attribute.PanelID != "" && attribute.PanelID != r.Header.Get("X-Panel-Id") { + return false + } + + return true +} + +func matchAttributeForMetadataQuery(attribute validation.QueryAttribute, op string, r *http.Request, now time.Time) bool { + if attribute.ApiType != "" && attribute.ApiType != op { + return false + } + if err := r.ParseForm(); err != nil { + return 
false + } + if attribute.Regex != "" && attribute.Regex != ".*" && attribute.CompiledRegex != nil { + atLeastOneMatched := false + for _, matcher := range r.Form["match[]"] { + if attribute.CompiledRegex.MatchString(matcher) { + atLeastOneMatched = true + break + } + } + if !atLeastOneMatched { + return false + } + } + + startTime, _ := util.ParseTime(r.FormValue("start")) + endTime, _ := util.ParseTime(r.FormValue("end")) + + if !isWithinTimeAttributes(attribute.TimeWindow, now, startTime, endTime) { + return false + } + + if !isWithinTimeRangeAttribute(attribute.TimeRangeLimit, startTime, endTime) { + return false + } + + if attribute.UserAgentRegex != "" && attribute.UserAgentRegex != ".*" && attribute.CompiledUserAgentRegex != nil { + if !attribute.CompiledUserAgentRegex.MatchString(r.Header.Get("User-Agent")) { + return false + } + } + + return true +} + +func isWithinTimeAttributes(timeWindow validation.TimeWindow, now time.Time, startTime, endTime int64) bool { + if timeWindow.Start == 0 && timeWindow.End == 0 { + return true + } + + if timeWindow.Start != 0 { + startTimeThreshold := now.Add(-1 * time.Duration(timeWindow.Start).Abs()).Add(-1 * time.Minute).Truncate(time.Minute).UnixMilli() + if startTime == 0 || startTime < startTimeThreshold { + return false + } + } + + if timeWindow.End != 0 { + endTimeThreshold := now.Add(-1 * time.Duration(timeWindow.End).Abs()).Add(1 * time.Minute).Truncate(time.Minute).UnixMilli() + if endTime == 0 || endTime > endTimeThreshold { + return false + } + } + + return true +} + +func isWithinTimeRangeAttribute(limit validation.TimeRangeLimit, startTime, endTime int64) bool { + if limit.Min == 0 && limit.Max == 0 { + return true + } + + if startTime == 0 || endTime == 0 { + return false + } + + timeRangeInMillis := endTime - startTime + + if limit.Min != 0 && time.Duration(limit.Min).Milliseconds() > timeRangeInMillis { + return false + } + if limit.Max != 0 && time.Duration(limit.Max).Milliseconds() < timeRangeInMillis { + 
return false + } + + return true +} + +func isWithinQueryStepLimit(queryStepLimit validation.QueryStepLimit, r *http.Request) bool { + if queryStepLimit.Min == 0 && queryStepLimit.Max == 0 { + return true + } + + step, err := util.ParseDurationMs(r.FormValue("step")) + if err != nil { + return false + } + + if queryStepLimit.Min != 0 && time.Duration(queryStepLimit.Min).Milliseconds() > step { + return false + } + if queryStepLimit.Max != 0 && time.Duration(queryStepLimit.Max).Milliseconds() < step { + return false + } + + return true +} diff --git a/pkg/querier/tripperware/query_attribute_matcher_test.go b/pkg/querier/tripperware/query_attribute_matcher_test.go new file mode 100644 index 00000000000..dc3c204dd9c --- /dev/null +++ b/pkg/querier/tripperware/query_attribute_matcher_test.go @@ -0,0 +1,647 @@ +package tripperware + +import ( + "context" + "fmt" + "net/http" + "net/url" + "regexp" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + + "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +var rejectedQueriesPerTenant = prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"op", "user"}) + +func Test_rejectQueryOrSetPriorityShouldReturnDefaultPriorityIfNotEnabledOrInvalidQueryString(t *testing.T) { + + limits := mockLimits{queryPriority: validation.QueryPriority{ + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + { + Regex: ".*", + CompiledRegex: regexp.MustCompile(".*"), + }, + }, + }, + }, + }, + queryRejection: validation.QueryRejection{ + Enabled: false, + }, + } + + type testCase struct { + queryPriorityEnabled bool + queryRejectionEnabled bool + path string + expectedError error + expectedPriority int64 + } + + 
tests := map[string]testCase{ + "should miss if query priority/rejection not enabled": { + path: "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29", + }, + "should throw parse error if query string empty": { + queryPriorityEnabled: true, + queryRejectionEnabled: true, + path: "/api/v1/query?time=1536716898&query=", + expectedError: httpgrpc.Errorf(http.StatusBadRequest, "unknown position: parse error: no expression found in input"), + }, + "should miss if it's metadata query and only priority is enabled": { + queryPriorityEnabled: true, + queryRejectionEnabled: false, + path: "/api/v1/labels?match[]", + }, + "should set priority if regex match and rejection disabled": { + queryPriorityEnabled: true, + queryRejectionEnabled: false, + path: "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29", + expectedPriority: int64(1), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + req, err := http.NewRequest("GET", testData.path, http.NoBody) + require.NoError(t, err) + reqStats, ctx := stats.ContextWithEmptyStats(context.Background()) + req = req.WithContext(ctx) + limits.queryPriority.Enabled = testData.queryPriorityEnabled + limits.queryRejection.Enabled = testData.queryRejectionEnabled + resultErr := rejectQueryOrSetPriority(req, time.Now(), time.Duration(1), limits, "", rejectedQueriesPerTenant) + assert.Equal(t, testData.expectedError, resultErr) + assert.Equal(t, testData.expectedPriority, reqStats.Priority) + }) + } +} + +func Test_rejectQueryOrSetPriorityShouldRejectIfMatches(t *testing.T) { + now := time.Now() + limits := mockLimits{ + queryRejection: validation.QueryRejection{ + Enabled: false, + QueryAttributes: []validation.QueryAttribute{}, + }, + } + + type testCase struct { + queryRejectionEnabled bool + path string + expectedError error + expectedPriority int64 + rejectQueryAttribute validation.QueryAttribute + } + + tests := map[string]testCase{ 
+ + "should not reject if query rejection not enabled": { + queryRejectionEnabled: false, + path: "/api/v1/query_range?start=1536716898&end=1536729898&step=7s&query=avg_over_time%28rate%28node_cpu_seconds_total%5B1m%5D%29%5B10m%3A5s%5D%29", + expectedError: nil, + rejectQueryAttribute: validation.QueryAttribute{ + Regex: ".*", + CompiledRegex: regexp.MustCompile(".*"), + }, + }, + + "should reject if query rejection enabled with all query match regex": { + queryRejectionEnabled: true, + path: "/api/v1/query_range?start=1536716898&end=1536729898&step=7s&query=avg_over_time%28rate%28node_cpu_seconds_total%5B1m%5D%29%5B10m%3A5s%5D%29", + expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage), + rejectQueryAttribute: validation.QueryAttribute{ + Regex: ".*", + CompiledRegex: regexp.MustCompile(".*"), + }, + }, + + "should reject if query rejection enabled with step limit and query match": { + queryRejectionEnabled: true, + path: "/api/v1/query_range?start=1536716898&end=1536729898&step=7s&query=count%28sum%28up%29%29", //count(sum(up)) + expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage), + rejectQueryAttribute: validation.QueryAttribute{ + QueryStepLimit: validation.QueryStepLimit{ + Min: model.Duration(time.Second * 5), + Max: model.Duration(time.Minute * 2), + }, + }, + }, + + "should reject if query rejection enabled with min step limit and query match": { + queryRejectionEnabled: true, + path: "/api/v1/query_range?start=1536716898&end=1536729898&step=7m&query=count%28sum%28up%29%29", //count(sum(up)) + expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage), + rejectQueryAttribute: validation.QueryAttribute{ + QueryStepLimit: validation.QueryStepLimit{ + Min: model.Duration(time.Minute * 5), + }, + }, + }, + + "should reject if query rejection enabled with step limit and subQuery step match": { + queryRejectionEnabled: true, + path: 
"/api/v1/query?time=1536716898&query=avg_over_time%28rate%28node_cpu_seconds_total%5B1m%5D%29%5B10m%3A5s%5D%29", //avg_over_time(rate(node_cpu_seconds_total[1m])[10m:5s]) + expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage), + rejectQueryAttribute: validation.QueryAttribute{ + QueryStepLimit: validation.QueryStepLimit{ + Min: model.Duration(time.Second * 5), + Max: model.Duration(time.Minute * 2), + }, + }, + }, + + "should ignore step limit for instant query, and reject if other properties of query_attribute matches": { + queryRejectionEnabled: true, + path: "/api/v1/query?time=1536716898&query=avg_over_time%28rate%28node_cpu_seconds_total%5B1m%5D%29%5B10m%3A5s%5D%29", //avg_over_time(rate(node_cpu_seconds_total[1m])[10m:5s]) + expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage), + rejectQueryAttribute: validation.QueryAttribute{ + Regex: ".*over_time.*", + QueryStepLimit: validation.QueryStepLimit{ + Min: model.Duration(time.Second * 6), + Max: model.Duration(time.Minute * 2), + }, + }, + }, + + "should reject if query rejection enabled with time window matching": { + queryRejectionEnabled: true, + path: fmt.Sprintf("/api/v1/query_range?start=%d&end=%d&step=7s&query=%s", now.Add(-30*time.Minute).UnixMilli()/1000, now.Add(-20*time.Minute).UnixMilli()/1000, url.QueryEscape("count(sum(up))")), + expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage), + rejectQueryAttribute: validation.QueryAttribute{ + TimeWindow: validation.TimeWindow{ + Start: model.Duration(45 * time.Minute), + End: model.Duration(15 * time.Minute), + }, + }, + }, + + "should reject if query rejection api matches and regex matches match[] of series request and time window": { + queryRejectionEnabled: true, + path: fmt.Sprintf("/api/v1/series?start=%d&end=%d&step=7s&match[]=%s", now.Add(-30*time.Minute).UnixMilli()/1000, now.Add(-20*time.Minute).UnixMilli()/1000, 
url.QueryEscape("count(sum(up))")), + expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, QueryRejectErrorMessage), + rejectQueryAttribute: validation.QueryAttribute{ + ApiType: "series", + Regex: ".*sum.*", + CompiledRegex: regexp.MustCompile(".*sum.*"), + TimeWindow: validation.TimeWindow{ + Start: model.Duration(45 * time.Minute), + End: model.Duration(15 * time.Minute), + }, + }, + }, + + "should not reject if query api_type doesn't match matches": { + queryRejectionEnabled: true, + path: fmt.Sprintf("/api/v1/series?start=%d&end=%d&step=7s&match[]=%s", now.Add(-30*time.Minute).UnixMilli()/1000, now.Add(-20*time.Minute).UnixMilli()/1000, url.QueryEscape("count(sum(up))")), + expectedError: nil, + rejectQueryAttribute: validation.QueryAttribute{ + ApiType: "query", + Regex: ".*sum.*", + CompiledRegex: regexp.MustCompile(".*sum.*"), + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + req, err := http.NewRequest("GET", testData.path, http.NoBody) + require.NoError(t, err) + reqStats, ctx := stats.ContextWithEmptyStats(context.Background()) + req = req.WithContext(ctx) + limits.queryRejection.Enabled = testData.queryRejectionEnabled + limits.queryRejection.QueryAttributes = []validation.QueryAttribute{testData.rejectQueryAttribute} + resultErr := rejectQueryOrSetPriority(req, time.Now(), time.Duration(1), limits, "", rejectedQueriesPerTenant) + assert.Equal(t, testData.expectedError, resultErr) + assert.Equal(t, testData.expectedPriority, reqStats.Priority) + }) + } +} + +func Test_matchAttributeForExpressionQueryShouldMatchRegex(t *testing.T) { + queryAttribute := validation.QueryAttribute{} + + type testCase struct { + regex string + query string + result bool + } + + tests := map[string]testCase{ + "should hit if regex matches": { + regex: "(^sum|c(.+)t)", + query: "sum(up)", + result: true, + }, + "should miss if regex doesn't match": { + regex: "(^sum|c(.+)t)", + query: "min(up)", + }, + "should hit if 
regex matches - .*": { + regex: ".*", + query: "count(sum(up))", + result: true, + }, + "should hit if regex matches - .+": { + regex: ".+", + query: "count(sum(up))", + result: true, + }, + "should hit if regex is an empty string": { + regex: "", + query: "sum(up)", + result: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + queryAttribute.Regex = testData.regex + queryAttribute.CompiledRegex = regexp.MustCompile(testData.regex) + priority := matchAttributeForExpressionQuery(queryAttribute, "query_range", &http.Request{}, testData.query, time.Time{}, 0, 0) + assert.Equal(t, testData.result, priority) + }) + } + +} + +func Test_isWithinTimeAttributes(t *testing.T) { + now := time.Now() + + timeWindow := validation.TimeWindow{ + Start: model.Duration(45 * time.Minute), + End: model.Duration(15 * time.Minute), + } + + type testCase struct { + timeWindow validation.TimeWindow + start time.Time + end time.Time + expectedResult bool + } + + tests := map[string]testCase{ + "should hit between start and end time": { + timeWindow: timeWindow, + start: now.Add(-40 * time.Minute), + end: now.Add(-20 * time.Minute), + expectedResult: true, + }, + "should hit equal to start and end time": { + timeWindow: timeWindow, + start: now.Add(-45 * time.Minute), + end: now.Add(-15 * time.Minute), + expectedResult: true, + }, + "should miss outside of start time": { + timeWindow: timeWindow, + start: now.Add(-50 * time.Minute), + end: now.Add(-15 * time.Minute), + expectedResult: false, + }, + "should miss completely outside of start time": { + timeWindow: timeWindow, + start: now.Add(-50 * time.Minute), + end: now.Add(-45 * time.Minute), + expectedResult: false, + }, + "should miss outside of end time": { + timeWindow: timeWindow, + start: now.Add(-45 * time.Minute), + end: now.Add(-10 * time.Minute), + expectedResult: false, + }, + "should miss completely outside of end time": { + timeWindow: timeWindow, + start: now.Add(-15 * 
time.Minute), + end: now.Add(-10 * time.Minute), + expectedResult: false, + }, + "should not consider on empty start and end time": { + start: now.Add(-15 * time.Minute), + end: now.Add(-10 * time.Minute), + expectedResult: true, + }, + "should not consider start on empty start limit": { + timeWindow: validation.TimeWindow{ + End: model.Duration(15 * time.Minute), + }, + start: now.Add(-50 * time.Minute), + end: now.Add(-20 * time.Minute), + expectedResult: true, + }, + "should not consider end on empty end limit": { + timeWindow: validation.TimeWindow{ + Start: model.Duration(45 * time.Minute), + }, + start: now.Add(-40 * time.Minute), + end: now.Add(-10 * time.Minute), + expectedResult: true, + }, + "should miss if start time for query is missing but start for limits exists": { + timeWindow: validation.TimeWindow{ + Start: model.Duration(45 * time.Minute), + End: model.Duration(15 * time.Minute), + }, + start: time.UnixMilli(0), + end: now.Add(-20 * time.Minute), + expectedResult: false, + }, + "should miss if end time for query is missing but end for limits exists": { + timeWindow: validation.TimeWindow{ + Start: model.Duration(45 * time.Minute), + End: model.Duration(15 * time.Minute), + }, + start: now.Add(-40 * time.Minute), + end: time.UnixMilli(0), + expectedResult: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + priority := isWithinTimeAttributes(testData.timeWindow, now, testData.start.UnixMilli(), testData.end.UnixMilli()) + assert.Equal(t, testData.expectedResult, priority) + }) + } +} + +func Test_isWithinTimeRangeAttribute(t *testing.T) { + now := time.Now().UnixMilli() + + timeRangeLimit := validation.TimeRangeLimit{ + Min: model.Duration(12 * time.Hour), + Max: model.Duration(15 * 24 * time.Hour), + } + + type testCase struct { + timeRangeLimit validation.TimeRangeLimit + startTime int64 + endTime int64 + expectedResult bool + } + + tests := map[string]testCase{ + "valid if within timeRange": { + 
timeRangeLimit: timeRangeLimit, + startTime: now - 20*time.Hour.Milliseconds(), + endTime: now, + expectedResult: true, + }, + "valid if queryTimeRange is equal to the limit": { + timeRangeLimit: timeRangeLimit, + startTime: now - 12*time.Hour.Milliseconds(), + endTime: now, + expectedResult: true, + }, + "not valid if queryTimeRange is smaller than the limit min": { + timeRangeLimit: timeRangeLimit, + startTime: now - 11*time.Hour.Milliseconds(), + endTime: now, + expectedResult: false, + }, + "not valid if queryTimeRange is bigger than the limit max": { + timeRangeLimit: timeRangeLimit, + startTime: now - 35*24*time.Hour.Milliseconds(), + endTime: now, + expectedResult: false, + }, + "valid if max is not provided and queryRange is bigger than min": { + timeRangeLimit: validation.TimeRangeLimit{ + Min: model.Duration(12 * time.Hour), + }, + startTime: now - 35*24*time.Hour.Milliseconds(), + endTime: now, + expectedResult: true, + }, + "valid if min is not provided and queryRange is smaller than max": { + timeRangeLimit: validation.TimeRangeLimit{ + Max: model.Duration(15 * 24 * time.Hour), + }, + startTime: now - 14*24*time.Hour.Milliseconds(), + endTime: now, + expectedResult: true, + }, + "not valid if limit provided but query doesn't have range (missing startTime or endTime)": { + timeRangeLimit: validation.TimeRangeLimit{ + Max: model.Duration(15 * 24 * time.Hour), + }, + startTime: now, + expectedResult: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + priority := isWithinTimeRangeAttribute(testData.timeRangeLimit, testData.startTime, testData.endTime) + assert.Equal(t, testData.expectedResult, priority) + }) + } +} + +func Test_isWithinQueryStepLimit(t *testing.T) { + + queryStepLimit := validation.QueryStepLimit{ + Min: model.Duration(time.Second * 5), + Max: model.Duration(time.Minute * 2), + } + + type testCase struct { + step string + queryString string + queryStepLimit validation.QueryStepLimit + 
expectedResult bool + } + + tests := map[string]testCase{ + "query within the limits if no limits was defined for steps": { + queryString: "count(sum(up))", + step: "15s", + expectedResult: true, + }, + "query should be considered outside of the step limit if query doesn't have steps": { + queryString: "count(sum(up))", + step: "not_parseable", + queryStepLimit: queryStepLimit, + }, + "should match if step limit set and step is within the range": { + step: "15s", + queryString: "count(sum(up))", + queryStepLimit: queryStepLimit, + expectedResult: true, + }, + "should not match if min step limit set and step is size is smaller": { + step: "4s", + queryString: "count(sum(up))", + queryStepLimit: queryStepLimit, + expectedResult: false, + }, + "should not match if max step limit set and step is size is bigger": { + step: "3m", + queryString: "count(sum(up))", + queryStepLimit: queryStepLimit, + expectedResult: false, + }, + "should match if step limit set is within the range and should ignore subquery step even it's outside the range": { + step: "15s", + queryString: "up[60m:5m]", + queryStepLimit: queryStepLimit, + expectedResult: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + params := url.Values{} + if testData.step != "" { + params.Add("step", testData.step) + } + req, err := http.NewRequest("POST", "/query?"+params.Encode(), http.NoBody) + require.NoError(t, err) + + require.NoError(t, err) + + result := isWithinQueryStepLimit(testData.queryStepLimit, req) + assert.Equal(t, testData.expectedResult, result) + }) + } +} + +func Test_matchAttributeForExpressionQueryHeadersShouldBeCheckedIfSet(t *testing.T) { + + type testCase struct { + headers http.Header + queryAttribute validation.QueryAttribute + expectedResult bool + } + + tests := map[string]testCase{ + "should not check any of them if attributes are empty (match)": { + expectedResult: true, + }, + "should not check if attributes are empty even corresponding 
headers exist (match)": { + headers: http.Header{ + "X-Dashboard-Uid": {"dashboard-uid"}, + "X-Panel-Id": {"panel-id"}, + }, + expectedResult: true, + }, + "should match all attributes if all set and all headers provided": { + headers: http.Header{ + "X-Dashboard-Uid": {"dashboard-uid"}, + "X-Panel-Id": {"panel-id"}, + }, + queryAttribute: validation.QueryAttribute{ + DashboardUID: "dashboard-uid", + PanelID: "panel-id", + }, + expectedResult: true, + }, + "should not match if headers are missing for provided attributes ": { + headers: http.Header{ + "X-Dashboard-Uid": {"dashboard-uid"}, + }, + queryAttribute: validation.QueryAttribute{ + DashboardUID: "dashboard-uid", + PanelID: "panel-id", + }, + }, + "should not match if both attribute and header is set but does not match ": { + headers: http.Header{ + "X-Panel-Id": {"panel123"}, + }, + queryAttribute: validation.QueryAttribute{ + PanelID: "panel-id", + }, + }, + "should not compare if values are empty (match)": { + headers: http.Header{ + "X-Panel-Id": {""}, + }, + queryAttribute: validation.QueryAttribute{ + PanelID: "", + }, + expectedResult: true, + }, + "should match if headers match provided attributes ": { + headers: http.Header{ + "X-Dashboard-Uid": {"dashboard-uid"}, + "X-Panel-Id": {"pane"}, + }, + queryAttribute: validation.QueryAttribute{ + PanelID: "pane", + }, + expectedResult: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + req, err := http.NewRequest("GET", "/", http.NoBody) + require.NoError(t, err) + req.Header = testData.headers + + result := matchAttributeForExpressionQuery(testData.queryAttribute, "query_range", req, "", time.Time{}, 0, 0) + assert.Equal(t, testData.expectedResult, result) + }) + } +} + +func Test_matchAttributeForExpressionQueryShouldMatchUserAgentRegex(t *testing.T) { + + type testCase struct { + userAgentRegex string + userAgentHeader string + result bool + } + + tests := map[string]testCase{ + "should hit if regex 
matches": { + userAgentRegex: "(^grafana-agent|prometheus-(.*)client(.+))", + userAgentHeader: "prometheus-client-go/v0.9.3", + result: true, + }, + "should miss if regex doesn't match": { + userAgentRegex: "(^grafana-agent|prometheus-(.*)client(.+))", + userAgentHeader: "loki", + }, + "should hit if regex matches - .*": { + userAgentRegex: ".*", + userAgentHeader: "grafana-agent/v0.19.0", + result: true, + }, + "should hit if regex is an empty string": { + userAgentRegex: "", + userAgentHeader: "grafana-agent/v0.19.0", + result: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + req, err := http.NewRequest("GET", "/", http.NoBody) + require.NoError(t, err) + req.Header = http.Header{ + "User-Agent": {testData.userAgentHeader}, + } + queryAttribute := validation.QueryAttribute{ + UserAgentRegex: testData.userAgentRegex, + CompiledUserAgentRegex: regexp.MustCompile(testData.userAgentRegex), + } + + result := matchAttributeForExpressionQuery(queryAttribute, "query_range", req, "", time.Time{}, 0, 0) + assert.Equal(t, testData.result, result) + }) + } + +} diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index f20db72ae5f..5ee78fbf0bb 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -262,6 +262,10 @@ func (m mockLimits) QueryPriority(userID string) validation.QueryPriority { return validation.QueryPriority{} } +func (m mockLimits) QueryRejection(userID string) validation.QueryRejection { + return validation.QueryRejection{} +} + type mockHandler struct { mock.Mock } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 00e7ea2928d..82eb0c12fc6 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "io" - "math" 
"net/http" "net/url" "sort" @@ -181,7 +180,7 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa return nil, errEndBeforeStart } - result.Step, err = parseDurationMs(r.FormValue("step")) + result.Step, err = util.ParseDurationMs(r.FormValue("step")) if err != nil { return nil, decorateWithParamName(err, "step") } @@ -376,20 +375,6 @@ func matrixMerge(ctx context.Context, resps []*PrometheusResponse) ([]tripperwar return result, nil } -func parseDurationMs(s string) (int64, error) { - if d, err := strconv.ParseFloat(s, 64); err == nil { - ts := d * float64(time.Second/time.Millisecond) - if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) { - return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration. It overflows int64", s) - } - return int64(ts), nil - } - if d, err := model.ParseDuration(s); err == nil { - return int64(d) / int64(time.Millisecond/time.Nanosecond), nil - } - return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration", s) -} - func encodeDurationMs(d int64) string { return strconv.FormatFloat(float64(d)/float64(time.Second/time.Millisecond), 'f', -1, 64) } diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index fc2501b303c..4edcd51cc9b 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -27,12 +27,10 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" - "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" util_log 
"github.com/cortexproject/cortex/pkg/util/log" @@ -113,6 +111,11 @@ func NewQueryTripperware( Help: "Total queries sent per tenant.", }, []string{"op", "user"}) + rejectedQueriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_frontend_rejected_queries_total", + Help: "Total rejected queries per tenant.", + }, []string{"op", "user"}) + activeUsers := util.NewActiveUsersCleanupWithDefaultValues(func(user string) { err := util.DeleteMatchingLabels(queriesPerTenant, map[string]string{"user": user}) if err != nil { @@ -149,30 +152,16 @@ func NewQueryTripperware( activeUsers.UpdateUserTimestamp(userStr, now) queriesPerTenant.WithLabelValues(op, userStr).Inc() - if isQuery || isQueryRange { + if maxSubQuerySteps > 0 && (isQuery || isQueryRange) { query := r.FormValue("query") - - if maxSubQuerySteps > 0 { - // Check subquery step size. - if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, maxSubQuerySteps); err != nil { - return nil, err - } - } - - expr, err := parser.ParseExpr(query) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + // Check subquery step size. 
+ if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, maxSubQuerySteps); err != nil { + return nil, err } + } - reqStats := stats.FromContext(r.Context()) - minTime, maxTime := util.FindMinMaxTime(r, expr, lookbackDelta, now) - reqStats.SetDataSelectMaxTime(maxTime) - reqStats.SetDataSelectMinTime(minTime) - - if limits != nil && limits.QueryPriority(userStr).Enabled { - priority := GetPriority(query, minTime, maxTime, now, limits.QueryPriority(userStr)) - reqStats.SetPriority(priority) - } + if err := rejectQueryOrSetPriority(r, now, lookbackDelta, limits, userStr, rejectedQueriesPerTenant); err != nil { + return nil, err } if isQueryRange { diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index e8c93709927..2fbd2a48c57 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -478,6 +478,7 @@ type mockLimits struct { maxCacheFreshness time.Duration shardSize int queryPriority validation.QueryPriority + queryRejection validation.QueryRejection } func (m mockLimits) MaxQueryLookback(string) time.Duration { @@ -504,6 +505,10 @@ func (m mockLimits) QueryPriority(userID string) validation.QueryPriority { return m.queryPriority } +func (m mockLimits) QueryRejection(userID string) validation.QueryRejection { + return m.queryRejection +} + type singleHostRoundTripper struct { host string next http.RoundTripper diff --git a/pkg/util/time.go b/pkg/util/time.go index bb8acc9fc98..3f19a71da9e 100644 --- a/pkg/util/time.go +++ b/pkg/util/time.go @@ -226,3 +226,17 @@ func (t *SlottedTicker) nextInterval() time.Duration { slotSize := t.d / time.Duration(totalSlots) return time.Until(lastStartTime) + PositiveJitter(slotSize, t.slotJitter) } + +func ParseDurationMs(s string) (int64, error) { + if d, err := strconv.ParseFloat(s, 64); err == nil { + ts := d * float64(time.Second/time.Millisecond) + if ts > float64(math.MaxInt64) 
|| ts < float64(math.MinInt64) { + return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration. It overflows int64", s) + } + return int64(ts), nil + } + if d, err := model.ParseDuration(s); err == nil { + return int64(d) / int64(time.Millisecond/time.Nanosecond), nil + } + return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration", s) +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index a0f53aaabcb..4586a24622d 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -63,13 +63,25 @@ type QueryPriority struct { type PriorityDef struct { Priority int64 `yaml:"priority" json:"priority" doc:"nocli|description=Priority level. Must be a unique value.|default=0"` ReservedQueriers float64 `yaml:"reserved_queriers" json:"reserved_queriers" doc:"nocli|description=Number of reserved queriers to handle priorities higher or equal to the priority level. Value between 0 and 1 will be used as a percentage.|default=0"` - QueryAttributes []QueryAttribute `yaml:"query_attributes" json:"query_attributes" doc:"nocli|description=List of query attributes to assign the priority."` + QueryAttributes []QueryAttribute `yaml:"query_attributes" json:"query_attributes" doc:"nocli|description=List of query_attributes to match and assign priority to queries. A query is assigned to this priority if it matches any query_attribute in this list. Each query_attribute has several properties (e.g., regex, time_window, user_agent), and all specified properties must match for a query_attribute to be considered a match. Only the specified properties are checked, and an AND operator is applied to them."` +} + +type QueryRejection struct { + Enabled bool `yaml:"enabled" json:"enabled"` + QueryAttributes []QueryAttribute `yaml:"query_attributes" json:"query_attributes" doc:"nocli|description=List of query_attributes to match and reject queries. 
A query is rejected if it matches any query_attribute in this list. Each query_attribute has several properties (e.g., regex, time_window, user_agent), and all specified properties must match for a query_attribute to be considered a match. Only the specified properties are checked, and an AND operator is applied to them."` } type QueryAttribute struct { - Regex string `yaml:"regex" json:"regex" doc:"nocli|description=Regex that the query string should match. If not set, it won't be checked."` - TimeWindow TimeWindow `yaml:"time_window" json:"time_window" doc:"nocli|description=Overall data select time window (including range selectors, modifiers and lookback delta) that the query should be within. If not set, it won't be checked."` - CompiledRegex *regexp.Regexp + ApiType string `yaml:"api_type" json:"api_type" doc:"nocli|description=API type for the query. Should be one of the query, query_range, series, labels, label_values. If not set, it won't be checked."` + Regex string `yaml:"regex" json:"regex" doc:"nocli|description=Regex that the query string (or at least one of the matchers in metadata query) should match. If not set, it won't be checked."` + TimeWindow TimeWindow `yaml:"time_window" json:"time_window" doc:"nocli|description=Overall data select time window (including range selectors, modifiers and lookback delta) that the query should be within. If not set, it won't be checked."` + TimeRangeLimit TimeRangeLimit `yaml:"time_range_limit" json:"time_range_limit" doc:"nocli|description=Query time range should be within this limit to match. Depending on where it was used, in most of the use-cases, either min or max value will be used. If not set, it won't be checked."` + QueryStepLimit QueryStepLimit `yaml:"query_step_limit" json:"query_step_limit" doc:"nocli|description=If query step provided should be within this limit to match. If not set, it won't be checked. 
This property only applied to range queries and ignored for other types of queries."` + UserAgentRegex string `yaml:"user_agent_regex" json:"user_agent_regex" doc:"nocli|description=Regex that User-Agent header of the request should match. If not set, it won't be checked."` + DashboardUID string `yaml:"dashboard_uid" json:"dashboard_uid" doc:"nocli|description=Grafana includes X-Dashboard-Uid header in query requests. If this field is provided then X-Dashboard-Uid header of request should match this value. If not set, it won't be checked. This property won't be applied to metadata queries."` + PanelID string `yaml:"panel_id" json:"panel_id" doc:"nocli|description=Grafana includes X-Panel-Id header in query requests. If this field is provided then X-Panel-Id header of request should match this value. If not set, it won't be checked. This property won't be applied to metadata queries."` + CompiledRegex *regexp.Regexp + CompiledUserAgentRegex *regexp.Regexp } type TimeWindow struct { @@ -77,6 +89,16 @@ type TimeWindow struct { End model.Duration `yaml:"end" json:"end" doc:"nocli|description=End of the data select time window (including range selectors, modifiers and lookback delta) that the query should be within. If set to 0, it won't be checked.|default=0"` } +type TimeRangeLimit struct { + Min model.Duration `yaml:"min" json:"min" doc:"nocli|description=This will be duration (12h, 1d, 15d etc.). Query time range should be above or equal to this value to match. Ex: if this value is 20d, then queries whose range is bigger than or equal to 20d will match. If set to 0, it won't be checked.|default=0"` + Max model.Duration `yaml:"max" json:"max" doc:"nocli|description=This will be duration (12h, 1d, 15d etc.). Query time range should be below or equal to this value to match. 
Ex: if this value is 24h, then queries whose range is smaller than or equal to 24h will match. If set to 0, it won't be checked.|default=0"`
+}
+
+type QueryStepLimit struct {
+	Min model.Duration `yaml:"min" json:"min" doc:"nocli|description=Query step should be above or equal to this value to match. If set to 0, it won't be checked.|default=0"`
+	Max model.Duration `yaml:"max" json:"max" doc:"nocli|description=Query step should be below or equal to this value to match. If set to 0, it won't be checked.|default=0"`
+}
+
 type LimitsPerLabelSetEntry struct {
 	MaxSeries int `yaml:"max_series" json:"max_series" doc:"nocli|description=The maximum number of active series per LabelSet, across the cluster before replication. Setting the value 0 will enable the monitoring (metrics) but would not enforce any limits."`
 }
@@ -143,10 +165,11 @@ type Limits struct {
 	QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size" doc:"hidden"`
 
 	// Query Frontend / Scheduler enforced limits.
-	MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"`
-	QueryPriority QueryPriority `yaml:"query_priority" json:"query_priority" doc:"nocli|description=Configuration for query priority."`
-	queryPriorityRegexHash uint64
-	queryPriorityCompiledRegex map[string]*regexp.Regexp
+	MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"`
+	QueryPriority QueryPriority `yaml:"query_priority" json:"query_priority" doc:"nocli|description=Configuration for query priority."`
+	queryAttributeRegexHash uint64
+	queryAttributeCompiledRegex map[string]*regexp.Regexp
+	QueryRejection QueryRejection `yaml:"query_rejection" json:"query_rejection" doc:"nocli|description=Configuration for query rejection."`
 
 	// Ruler defaults and limits.
 	
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` @@ -234,6 +257,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.QueryVerticalShardSize, "frontend.query-vertical-shard-size", 0, "[Experimental] Number of shards to use when distributing shardable PromQL queries.") f.BoolVar(&l.QueryPriority.Enabled, "frontend.query-priority.enabled", false, "Whether queries are assigned with priorities.") f.Int64Var(&l.QueryPriority.DefaultPriority, "frontend.query-priority.default-priority", 0, "Priority assigned to all queries by default. Must be a unique value. Use this as a baseline to make certain queries higher/lower priority.") + f.BoolVar(&l.QueryRejection.Enabled, "frontend.query-rejection.enabled", false, "Whether query rejection is enabled.") f.IntVar(&l.MaxOutstandingPerTenant, "frontend.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per request queue (either query frontend or query scheduler); requests beyond this error with HTTP 429.") @@ -296,7 +320,7 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { return err } - if err := l.compileQueryPriorityRegex(); err != nil { + if err := l.compileQueryAttributeRegex(); err != nil { return err } @@ -326,7 +350,7 @@ func (l *Limits) UnmarshalJSON(data []byte) error { return err } - if err := l.compileQueryPriorityRegex(); err != nil { + if err := l.compileQueryAttributeRegex(); err != nil { return err } @@ -360,31 +384,51 @@ func (l *Limits) copyNotificationIntegrationLimits(defaults NotificationRateLimi } } -func (l *Limits) hasQueryPriorityRegexChanged() bool { +func (l *Limits) hasQueryAttributeRegexChanged() bool { var newHash uint64 - - var seps = []byte{'\xff'} h := xxhash.New() - for _, priority := range l.QueryPriority.Priorities { - for _, attribute := range priority.QueryAttributes { - _, _ = h.WriteString(attribute.Regex) - _, _ = h.Write(seps) + + if 
l.QueryPriority.Enabled { + for _, priority := range l.QueryPriority.Priorities { + for _, attribute := range priority.QueryAttributes { + addToHash(h, attribute.Regex) + addToHash(h, attribute.UserAgentRegex) + } + } + } + if l.QueryRejection.Enabled { + for _, attribute := range l.QueryRejection.QueryAttributes { + addToHash(h, attribute.Regex) + addToHash(h, attribute.UserAgentRegex) } } + newHash = h.Sum64() - if newHash != l.queryPriorityRegexHash { - l.queryPriorityRegexHash = newHash + if newHash != l.queryAttributeRegexHash { + l.queryAttributeRegexHash = newHash return true } return false } -func (l *Limits) compileQueryPriorityRegex() error { +func addToHash(h *xxhash.Digest, regex string) { + if regex == "" { + return + } + _, _ = h.WriteString(regex) + _, _ = h.Write([]byte{'\xff'}) +} + +func (l *Limits) compileQueryAttributeRegex() error { + if !l.QueryPriority.Enabled && !l.QueryRejection.Enabled { + return nil + } + regexChanged := l.hasQueryAttributeRegexChanged() + newCompiledRegex := map[string]*regexp.Regexp{} + if l.QueryPriority.Enabled { - hasQueryPriorityRegexChanged := l.hasQueryPriorityRegexChanged() prioritySet := map[int64]struct{}{} - newQueryPriorityCompiledRegex := map[string]*regexp.Regexp{} for i, priority := range l.QueryPriority.Priorities { // Check for duplicate priority entry @@ -393,25 +437,48 @@ func (l *Limits) compileQueryPriorityRegex() error { } prioritySet[priority.Priority] = struct{}{} - for j, attribute := range priority.QueryAttributes { - if hasQueryPriorityRegexChanged { - compiledRegex, err := regexp.Compile(attribute.Regex) - if err != nil { - return errors.Join(errCompilingQueryPriorityRegex, err) - } - newQueryPriorityCompiledRegex[attribute.Regex] = compiledRegex - l.QueryPriority.Priorities[i].QueryAttributes[j].CompiledRegex = compiledRegex - } else { - l.QueryPriority.Priorities[i].QueryAttributes[j].CompiledRegex = l.queryPriorityCompiledRegex[attribute.Regex] - } + err := 
l.compileQueryAttributeRegexes(l.QueryPriority.Priorities[i].QueryAttributes, regexChanged, newCompiledRegex) + if err != nil { + return err } } + } - if hasQueryPriorityRegexChanged { - l.queryPriorityCompiledRegex = newQueryPriorityCompiledRegex + if l.QueryRejection.Enabled { + err := l.compileQueryAttributeRegexes(l.QueryRejection.QueryAttributes, regexChanged, newCompiledRegex) + if err != nil { + return err } } + if regexChanged { + l.queryAttributeCompiledRegex = newCompiledRegex + } + + return nil +} + +func (l *Limits) compileQueryAttributeRegexes(queryAttributes []QueryAttribute, regexChanged bool, newCompiledRegex map[string]*regexp.Regexp) error { + for j, attribute := range queryAttributes { + if regexChanged { + compiledRegex, err := regexp.Compile(attribute.Regex) + if err != nil { + return errors.Join(errCompilingQueryPriorityRegex, err) + } + newCompiledRegex[attribute.Regex] = compiledRegex + queryAttributes[j].CompiledRegex = compiledRegex + + compiledUserAgentRegex, err := regexp.Compile(attribute.UserAgentRegex) + if err != nil { + return errors.Join(errCompilingQueryPriorityRegex, err) + } + newCompiledRegex[attribute.UserAgentRegex] = compiledUserAgentRegex + queryAttributes[j].CompiledUserAgentRegex = compiledUserAgentRegex + } else { + queryAttributes[j].CompiledRegex = l.queryAttributeCompiledRegex[attribute.Regex] + queryAttributes[j].CompiledUserAgentRegex = l.queryAttributeCompiledRegex[attribute.UserAgentRegex] + } + } return nil } @@ -640,6 +707,11 @@ func (o *Overrides) QueryPriority(userID string) QueryPriority { return o.GetOverridesForUser(userID).QueryPriority } +// QueryRejection returns the query reject config for the tenant +func (o *Overrides) QueryRejection(userID string) QueryRejection { + return o.GetOverridesForUser(userID).QueryRejection +} + // EnforceMetricName whether to enforce the presence of a metric name. 
func (o *Overrides) EnforceMetricName(userID string) bool { return o.GetOverridesForUser(userID).EnforceMetricName diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index a104c2e7b07..05807f63c9e 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -629,9 +629,10 @@ tenant2: require.Equal(t, 5, ov.MaxDownloadedBytesPerRequest("tenant3")) } -func TestHasQueryPriorityRegexChanged(t *testing.T) { +func TestHasQueryAttributeRegexChanged(t *testing.T) { l := Limits{ QueryPriority: QueryPriority{ + Enabled: true, Priorities: []PriorityDef{ { Priority: 1, @@ -643,25 +644,53 @@ func TestHasQueryPriorityRegexChanged(t *testing.T) { }, }, }, + QueryRejection: QueryRejection{ + Enabled: true, + QueryAttributes: []QueryAttribute{ + { + Regex: "testRejection", + }, + }, + }, } - require.True(t, l.hasQueryPriorityRegexChanged()) + require.True(t, l.hasQueryAttributeRegexChanged()) l.QueryPriority.Priorities[0].QueryAttributes[0].Regex = "new" - require.True(t, l.hasQueryPriorityRegexChanged()) + require.True(t, l.hasQueryAttributeRegexChanged()) l.QueryPriority.Priorities[0].QueryAttributes[0].TimeWindow.Start = model.Duration(2 * time.Hour) - require.False(t, l.hasQueryPriorityRegexChanged()) + require.False(t, l.hasQueryAttributeRegexChanged()) l.QueryPriority.Priorities[0].QueryAttributes = append(l.QueryPriority.Priorities[0].QueryAttributes, QueryAttribute{Regex: "hi"}) - require.True(t, l.hasQueryPriorityRegexChanged()) + require.True(t, l.hasQueryAttributeRegexChanged()) l.QueryPriority.Priorities[0].QueryAttributes = l.QueryPriority.Priorities[0].QueryAttributes[:1] - require.True(t, l.hasQueryPriorityRegexChanged()) + require.True(t, l.hasQueryAttributeRegexChanged()) + + l.QueryRejection.QueryAttributes[0].Regex = "newRejectionRegex" + + require.True(t, l.hasQueryAttributeRegexChanged()) + + l.QueryRejection.QueryAttributes = append(l.QueryRejection.QueryAttributes, QueryAttribute{Regex: "new 
element"}) + + require.True(t, l.hasQueryAttributeRegexChanged()) + + l.QueryRejection.QueryAttributes[1].UserAgentRegex = "New User agent regex" + + require.True(t, l.hasQueryAttributeRegexChanged()) + + l.QueryRejection.QueryAttributes[1].DashboardUID = "New Dashboard Uid" + + require.False(t, l.hasQueryAttributeRegexChanged()) + + l.QueryPriority.Enabled = false + + require.True(t, l.hasQueryAttributeRegexChanged()) } func TestCompileQueryPriorityRegex(t *testing.T) { @@ -679,30 +708,59 @@ func TestCompileQueryPriorityRegex(t *testing.T) { }, }, }, + QueryRejection: QueryRejection{ + Enabled: false, + QueryAttributes: []QueryAttribute{ + { + Regex: "testRejection", + }, + }, + }, } require.Nil(t, l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) - err := l.compileQueryPriorityRegex() + err := l.compileQueryAttributeRegex() require.NoError(t, err) require.Equal(t, regexp.MustCompile("test"), l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) l.QueryPriority.Priorities[0].QueryAttributes[0].Regex = "new" - err = l.compileQueryPriorityRegex() + err = l.compileQueryAttributeRegex() require.NoError(t, err) require.Equal(t, regexp.MustCompile("new"), l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = nil - err = l.compileQueryPriorityRegex() + err = l.compileQueryAttributeRegex() require.NoError(t, err) require.Equal(t, regexp.MustCompile("new"), l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) l.QueryPriority.Enabled = false l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = nil - err = l.compileQueryPriorityRegex() + err = l.compileQueryAttributeRegex() + require.NoError(t, err) + require.Nil(t, l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) + + require.Nil(t, l.QueryRejection.QueryAttributes[0].CompiledRegex) + + l.QueryRejection.Enabled = true + + err = l.compileQueryAttributeRegex() + require.NoError(t, err) + 
require.Equal(t, regexp.MustCompile("testRejection"), l.QueryRejection.QueryAttributes[0].CompiledRegex) + require.Equal(t, regexp.MustCompile(""), l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledUserAgentRegex) + + l.QueryRejection.QueryAttributes[0].UserAgentRegex = "User agent added" + + err = l.compileQueryAttributeRegex() + require.NoError(t, err) + require.Equal(t, regexp.MustCompile("User agent added"), l.QueryRejection.QueryAttributes[0].CompiledUserAgentRegex) + + l.QueryRejection.QueryAttributes[0].Regex = "" + + err = l.compileQueryAttributeRegex() require.NoError(t, err) require.Nil(t, l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) } diff --git a/tools/doc-generator/main.go b/tools/doc-generator/main.go index 1fc1cf4701c..58a1787ee27 100644 --- a/tools/doc-generator/main.go +++ b/tools/doc-generator/main.go @@ -330,7 +330,7 @@ func main() { flags := parseFlags(cfg) // Parse the config, mapping each config field with the related CLI flag. - blocks, err := parseConfig(nil, cfg, flags) + blocks, err := parseConfig(nil, cfg, flags, map[string]struct{}{}) if err != nil { fmt.Fprintf(os.Stderr, "An error occurred while generating the doc: %s\n", err.Error()) os.Exit(1) diff --git a/tools/doc-generator/parser.go b/tools/doc-generator/parser.go index 762d5e10ecd..178799eefe3 100644 --- a/tools/doc-generator/parser.go +++ b/tools/doc-generator/parser.go @@ -73,7 +73,7 @@ func parseFlags(cfg flagext.Registerer) map[uintptr]*flag.Flag { return flags } -func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Flag) ([]*configBlock, error) { +func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Flag, addedRootBlocks map[string]struct{}) ([]*configBlock, error) { blocks := []*configBlock{} // If the input block is nil it means we're generating the doc for the top-level block @@ -175,7 +175,7 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl } // Recursively 
generate the doc for the sub-block - otherBlocks, err := parseConfig(subBlock, fieldValue.Addr().Interface(), flags) + otherBlocks, err := parseConfig(subBlock, fieldValue.Addr().Interface(), flags, addedRootBlocks) if err != nil { return nil, err } @@ -193,18 +193,20 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl if field.Type.Kind() == reflect.Slice { sliceElementType := field.Type.Elem() if sliceElementType.Kind() == reflect.Struct { - if field.Type.String() != "labels.Labels" { + fieldTypeName := field.Type.Elem().Name() + if _, exists := addedRootBlocks[fieldTypeName]; !exists && field.Type.String() != "labels.Labels" { rootBlocks = append(rootBlocks, rootBlock{ - name: field.Type.Elem().Name(), + name: fieldTypeName, structType: field.Type.Elem(), }) + addedRootBlocks[fieldTypeName] = struct{}{} } sliceElementBlock := &configBlock{ - name: field.Type.Elem().Name(), + name: fieldTypeName, desc: "", } sliceElementCfg := reflect.New(sliceElementType).Interface() - otherBlocks, err := parseConfig(sliceElementBlock, sliceElementCfg, flags) + otherBlocks, err := parseConfig(sliceElementBlock, sliceElementCfg, flags, addedRootBlocks) if err != nil { return nil, err