diff --git a/.gitignore b/.gitignore index b5af05dca49..87cae3d4f16 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,6 @@ docker-images/ website/public website/resources website/content/en/docs -e2e_integration_test* \ No newline at end of file +e2e_integration_test* +active-query-tracker + diff --git a/CHANGELOG.md b/CHANGELOG.md index 187a03913d3..899afef7c67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ * [CHANGE] Experimental Memberlist KV store can now be used in single-binary Cortex. Attempts to use it previously would fail with panic. This change also breaks existing binary protocol used to exchange gossip messages, so this version will not be able to understand gossiped Ring when used in combination with the previous version of Cortex. Easiest way to upgrade is to shutdown old Cortex installation, and restart it with new version. Incremental rollout works too, but with reduced functionality until all components run the same version. #2016 * [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140 * [CHANGE] Removed unused /validate_expr endpoint. #2152 +* [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947 * `--experimental.distributor.user-subring-size` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index d8c8f4a6da8..9121873ca83 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -569,6 +569,13 @@ The `querier_config` configures the Cortex querier. # The default evaluation interval or step size for subqueries. # CLI flag: -querier.default-evaluation-interval [defaultevaluationinterval: | default = 1m0s] + +# Active query tracker monitors active queries, and writes them to the file in +# given directory. If Cortex discovers any queries in this log during startup, +# it will log them to the log file. Setting to empty value disables active query +# tracker, which also disables -querier.max-concurrent option. +# CLI flag: -querier.active-query-tracker-dir +[active_query_tracker_dir: | default = "./active-query-tracker"] ``` ## `query_frontend_config` diff --git a/go.mod b/go.mod index b2b6e652785..cad5e9973a3 100644 --- a/go.mod +++ b/go.mod @@ -52,8 +52,8 @@ require ( github.com/prometheus/alertmanager v0.19.0 github.com/prometheus/client_golang v1.2.1 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 - github.com/prometheus/common v0.7.0 - github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef + github.com/prometheus/common v0.8.0 + github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33 github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e github.com/spf13/afero v1.2.2 @@ -71,7 +71,7 @@ require ( golang.org/x/time v0.0.0-20191024005414-555d28b269f0 google.golang.org/api v0.14.0 google.golang.org/grpc v1.25.1 - gopkg.in/yaml.v2 v2.2.5 + gopkg.in/yaml.v2 v2.2.7 sigs.k8s.io/yaml v1.1.0 ) diff --git a/go.sum b/go.sum index f69ed698bd6..36a45a3bcbc 100644 --- a/go.sum +++ b/go.sum @@ -671,6 +671,8 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= +github.com/prometheus/common v0.8.0 h1:bLkjvFe2ZRX1DpcgZcdf7j/+MnusEps5hktST/FHA34= +github.com/prometheus/common v0.8.0/go.mod h1:PC/OgXc+UN7B4ALwvn1yzVZmVwvhXp5JsbBv6wSv6i0= github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -686,6 +688,8 @@ github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oA github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f/go.mod h1:rMTlmxGCvukf2KMu3fClMDKLLoJ5hl61MhcJ7xKakf0= github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef h1:pYYKXo/zGx25kyViw+Gdbxd0ItIg+vkVKpwgWUEyIc4= github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI= +github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33 h1:HBYrMJj5iosUjUkAK9L5GO+5eEQXbcrzdjkqY9HV5W4= +github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33/go.mod h1:fkIPPkuZnkXyopYHmXPxf9rgiPkVgZCN8w9o8+UgBlY= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 h1:+kGqA4dNN5hn7WwvKdzHl0rdN5AEkbNZd0VjRltAiZg= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -1064,6 +1068,10 @@ gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2 h1:XZx7nhd5GMaZpmDaEHFVafUZC7ya0fuo7cSJ3UCKYmM= +gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/configs/configs.go b/pkg/configs/configs.go index 9f9514a4fcb..00554b07a9b 100644 --- a/pkg/configs/configs.go +++ b/pkg/configs/configs.go @@ -8,11 +8,11 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/rules" legacy_promql "github.com/cortexproject/cortex/pkg/configs/legacy_promql" + legacy_rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" "github.com/cortexproject/cortex/pkg/util" ) @@ -185,7 +185,7 @@ func (c RulesConfig) Parse() (map[string][]rules.Rule, error) { // ParseFormatted returns the rulefmt map of a users rules configs. It allows // for rules to be mapped to disk and read by the prometheus rules manager. -func (c RulesConfig) ParseFormatted() (map[string]rulefmt.RuleGroups, error) { +func (c RulesConfig) ParseFormatted() (map[string]legacy_rulefmt.RuleGroups, error) { switch c.FormatVersion { case RuleFormatV1: return c.parseV1Formatted() @@ -198,11 +198,11 @@ func (c RulesConfig) ParseFormatted() (map[string]rulefmt.RuleGroups, error) { // parseV2 parses and validates the content of the rule files in a RulesConfig // according to the Prometheus 2.x rule format. -func (c RulesConfig) parseV2Formatted() (map[string]rulefmt.RuleGroups, error) { - ruleMap := map[string]rulefmt.RuleGroups{} +func (c RulesConfig) parseV2Formatted() (map[string]legacy_rulefmt.RuleGroups, error) { + ruleMap := map[string]legacy_rulefmt.RuleGroups{} for fn, content := range c.Files { - rgs, errs := rulefmt.Parse([]byte(content)) + rgs, errs := legacy_rulefmt.Parse([]byte(content)) for _, err := range errs { // return just the first error, if any return nil, err } @@ -214,17 +214,17 @@ func (c RulesConfig) parseV2Formatted() (map[string]rulefmt.RuleGroups, error) { // parseV1 parses and validates the content of the rule files in a RulesConfig // according to the Prometheus 1.x rule format. -func (c RulesConfig) parseV1Formatted() (map[string]rulefmt.RuleGroups, error) { - result := map[string]rulefmt.RuleGroups{} +func (c RulesConfig) parseV1Formatted() (map[string]legacy_rulefmt.RuleGroups, error) { + result := map[string]legacy_rulefmt.RuleGroups{} for fn, content := range c.Files { stmts, err := legacy_promql.ParseStmts(content) if err != nil { return nil, fmt.Errorf("error parsing %s: %s", fn, err) } - ra := []rulefmt.Rule{} + ra := []legacy_rulefmt.Rule{} for _, stmt := range stmts { - var rule rulefmt.Rule + var rule legacy_rulefmt.Rule switch r := stmt.(type) { case *legacy_promql.AlertStmt: _, err := promql.ParseExpr(r.Expr.String()) @@ -232,7 +232,7 @@ func (c RulesConfig) parseV1Formatted() (map[string]rulefmt.RuleGroups, error) { return nil, err } - rule = rulefmt.Rule{ + rule = legacy_rulefmt.Rule{ Alert: r.Name, Expr: r.Expr.String(), For: model.Duration(r.Duration), @@ -246,7 +246,7 @@ func (c RulesConfig) parseV1Formatted() (map[string]rulefmt.RuleGroups, error) { return nil, err } - rule = rulefmt.Rule{ + rule = legacy_rulefmt.Rule{ Record: r.Name, Expr: r.Expr.String(), Labels: r.Labels.Map(), @@ -257,8 +257,8 @@ func (c RulesConfig) parseV1Formatted() (map[string]rulefmt.RuleGroups, error) { } ra = append(ra, rule) } - result[fn] = rulefmt.RuleGroups{ - Groups: []rulefmt.RuleGroup{ + result[fn] = legacy_rulefmt.RuleGroups{ + Groups: []legacy_rulefmt.RuleGroup{ { Name: "rg:" + fn, Rules: ra, @@ -286,7 +286,7 @@ func (c RulesConfig) parseV2() (map[string][]rules.Rule, error) { groups := map[string][]rules.Rule{} for fn, content := range c.Files { - rgs, errs := rulefmt.Parse([]byte(content)) + rgs, errs := legacy_rulefmt.Parse([]byte(content)) if len(errs) > 0 { return nil, fmt.Errorf("error parsing %s: %v", fn, errs[0]) } diff --git a/pkg/configs/configs_test.go b/pkg/configs/configs_test.go index c4eba82abc5..32c19bb90a6 100644 --- a/pkg/configs/configs_test.go +++ b/pkg/configs/configs_test.go @@ -9,12 +9,12 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/rules" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + legacy_rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" "github.com/cortexproject/cortex/pkg/util" ) @@ -130,7 +130,7 @@ func TestParseFormatted(t *testing.T) { dur, err := model.ParseDuration("5m") require.NoError(t, err) - rules := []rulefmt.Rule{ + rules := []legacy_rulefmt.Rule{ { Alert: "TestAlert", Expr: "up == 0", @@ -146,7 +146,7 @@ func TestParseFormatted(t *testing.T) { for i, tc := range []struct { cfg RulesConfig - expected map[string]rulefmt.RuleGroups + expected map[string]legacy_rulefmt.RuleGroups }{ { cfg: RulesConfig{ @@ -155,9 +155,9 @@ func TestParseFormatted(t *testing.T) { "legacy.rules": legacyRulesFile, }, }, - expected: map[string]rulefmt.RuleGroups{ + expected: map[string]legacy_rulefmt.RuleGroups{ "legacy.rules": { - Groups: []rulefmt.RuleGroup{ + Groups: []legacy_rulefmt.RuleGroup{ { Name: "rg:legacy.rules", Rules: rules, @@ -173,9 +173,9 @@ func TestParseFormatted(t *testing.T) { "alerts.yaml": ruleFile, }, }, - expected: map[string]rulefmt.RuleGroups{ + expected: map[string]legacy_rulefmt.RuleGroups{ "alerts.yaml": { - Groups: []rulefmt.RuleGroup{ + Groups: []legacy_rulefmt.RuleGroup{ { Name: "example", Rules: rules, diff --git a/pkg/configs/legacy_promql/engine_test.go b/pkg/configs/legacy_promql/engine_test.go index deceaba5d0d..6d06b7729f0 100644 --- a/pkg/configs/legacy_promql/engine_test.go +++ b/pkg/configs/legacy_promql/engine_test.go @@ -132,6 +132,9 @@ type errQuerier struct { func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return errSeriesSet{err: q.err}, nil, q.err } +func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return errSeriesSet{err: q.err}, nil, q.err +} func (q *errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, q.err } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 7a71018740a..c3c894fb188 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -3,7 +3,6 @@ package cortex import ( "errors" "fmt" - "math" "net/http" "os" "regexp" @@ -262,7 +261,7 @@ func (t *Cortex) initQuerier(cfg *Config) (err error) { api.Register(promRouter) subrouter := t.server.HTTP.PathPrefix("/api/prom").Subrouter() - subrouter.PathPrefix("/api/v1").Handler(t.httpAuthMiddleware.Wrap(promRouter)) + subrouter.PathPrefix("/api/v1").Handler(fakeRemoteAddr(t.httpAuthMiddleware.Wrap(promRouter))) subrouter.Path("/read").Handler(t.httpAuthMiddleware.Wrap(querier.RemoteReadHandler(queryable))) subrouter.Path("/chunks").Handler(t.httpAuthMiddleware.Wrap(querier.ChunksHandler(queryable))) subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(t.distributor.UserStatsHandler))) @@ -283,6 +282,20 @@ func (t *Cortex) initQuerier(cfg *Config) (err error) { return } +// Latest Prometheus requires r.RemoteAddr to be set to addr:port, otherwise it reject the request. +// Requests to Querier sometimes doesn't have that (if they are fetched from Query-Frontend). +// Prometheus uses this when logging queries to QueryLogger, but Cortex doesn't call engine.SetQueryLogger to set one. +// +// Can be removed when (if) https://github.com/prometheus/prometheus/pull/6840 is merged. +func fakeRemoteAddr(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.RemoteAddr == "" { + r.RemoteAddr = "127.0.0.1:8888" + } + handler.ServeHTTP(w, r) + }) +} + func (t *Cortex) stopQuerier() error { t.worker.Stop() return nil @@ -374,11 +387,10 @@ func (t *Cortex) initQueryFrontend(cfg *Config) (err error) { queryrange.PrometheusResponseExtractor, cfg.Schema, promql.EngineOpts{ - Logger: util.Logger, - Reg: prometheus.DefaultRegisterer, - MaxConcurrent: int(math.MaxInt64), // the frontend's promql engine should not set any concurrency controls (these are handled by middleware) - MaxSamples: cfg.Querier.MaxSamples, - Timeout: cfg.Querier.Timeout, + Logger: util.Logger, + Reg: prometheus.DefaultRegisterer, + MaxSamples: cfg.Querier.MaxSamples, + Timeout: cfg.Querier.Timeout, }, cfg.Querier.QueryIngestersWithin, ) diff --git a/pkg/querier/astmapper/astmapper_test.go b/pkg/querier/astmapper/astmapper_test.go index ca90026da91..1caed998fbc 100644 --- a/pkg/querier/astmapper/astmapper_test.go +++ b/pkg/querier/astmapper/astmapper_test.go @@ -24,8 +24,8 @@ func TestCloneNode(t *testing.T) { }, &promql.BinaryExpr{ Op: promql.ADD, - LHS: &promql.NumberLiteral{Val: 1}, - RHS: &promql.NumberLiteral{Val: 1}, + LHS: &promql.NumberLiteral{Val: 1, PosRange: promql.PositionRange{Start: 0, End: 1}}, + RHS: &promql.NumberLiteral{Val: 1, PosRange: promql.PositionRange{Start: 4, End: 5}}, }, }, { @@ -48,8 +48,16 @@ func TestCloneNode(t *testing.T) { LabelMatchers: []*labels.Matcher{ mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "some_metric"), }, + PosRange: promql.PositionRange{ + Start: 18, + End: 29, + }, }, Grouping: []string{"foo"}, + PosRange: promql.PositionRange{ + Start: 0, + End: 30, + }, }, }, } diff --git a/pkg/querier/astmapper/shard_summer.go b/pkg/querier/astmapper/shard_summer.go index c437f0ca231..a2f12ab34ef 100644 --- a/pkg/querier/astmapper/shard_summer.go +++ b/pkg/querier/astmapper/shard_summer.go @@ -222,15 +222,23 @@ func shardMatrixSelector(curshard, shards int, selector *promql.MatrixSelector) return nil, err } - return &promql.MatrixSelector{ - Name: selector.Name, - Range: selector.Range, - Offset: selector.Offset, - LabelMatchers: append( - []*labels.Matcher{shardMatcher}, - selector.LabelMatchers..., - ), - }, nil + if vs, ok := selector.VectorSelector.(*promql.VectorSelector); ok { + return &promql.MatrixSelector{ + VectorSelector: &promql.VectorSelector{ + Name: vs.Name, + Offset: vs.Offset, + LabelMatchers: append( + []*labels.Matcher{shardMatcher}, + vs.LabelMatchers..., + ), + PosRange: vs.PosRange, + }, + Range: selector.Range, + EndPos: selector.EndPos, + }, nil + } + + return nil, fmt.Errorf("invalid selector type: %T", selector.VectorSelector) } // ParseShard will extract the shard information encoded in ShardLabelFmt diff --git a/pkg/querier/astmapper/subtree_folder.go b/pkg/querier/astmapper/subtree_folder.go index 5d142bea116..bdc1819bd0f 100644 --- a/pkg/querier/astmapper/subtree_folder.go +++ b/pkg/querier/astmapper/subtree_folder.go @@ -47,10 +47,7 @@ func isEmbedded(node promql.Node) (bool, error) { } case *promql.MatrixSelector: - if n.Name == EmbeddedQueriesMetricName { - return true, nil - } - + return isEmbedded(n.VectorSelector) } return false, nil } diff --git a/pkg/querier/block.go b/pkg/querier/block.go index 7a56813ea95..1f687f9742e 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -78,6 +78,10 @@ func (b *blocksQuerier) addUserToContext(ctx context.Context) context.Context { } func (b *blocksQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return b.SelectSorted(sp, matchers...) +} + +func (b *blocksQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { log, ctx := spanlogger.New(b.ctx, "blocksQuerier.Select") defer log.Span.Finish() diff --git a/pkg/querier/chunk_store_queryable.go b/pkg/querier/chunk_store_queryable.go index 1c1922ff7d8..af3e587578d 100644 --- a/pkg/querier/chunk_store_queryable.go +++ b/pkg/querier/chunk_store_queryable.go @@ -36,7 +36,7 @@ type chunkStoreQuerier struct { mint, maxt int64 } -func (q *chunkStoreQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *chunkStoreQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { userID, err := user.ExtractOrgID(q.ctx) if err != nil { return nil, nil, err @@ -49,6 +49,10 @@ func (q *chunkStoreQuerier) Select(sp *storage.SelectParams, matchers ...*labels return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc), nil, nil } +func (q *chunkStoreQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, matchers...) +} + // Series in the returned set are sorted alphabetically by labels. func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet { chunksBySeries := map[model.Fingerprint][]chunk.Chunk{} diff --git a/pkg/querier/chunk_tar_test.go b/pkg/querier/chunk_tar_test.go index bfc2a974af7..5c7882ef6d3 100644 --- a/pkg/querier/chunk_tar_test.go +++ b/pkg/querier/chunk_tar_test.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "context" "io" + "io/ioutil" "math" "os" "strconv" @@ -13,6 +14,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/util/testutil" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" @@ -50,15 +52,20 @@ func getTarDataFromEnv(t testing.TB) (query string, from, through time.Time, ste } func runRangeQuery(t testing.TB, query string, from, through time.Time, step time.Duration, store chunkstore.ChunkStore) { + dir, err := ioutil.TempDir("", t.Name()) + testutil.Ok(t, err) + defer os.RemoveAll(dir) + queryTracker := promql.NewActiveQueryTracker(dir, 1, util.Logger) + if len(query) == 0 || store == nil { return } queryable := newChunkStoreQueryable(store, batch.NewChunkMergeIterator) engine := promql.NewEngine(promql.EngineOpts{ - Logger: util.Logger, - MaxConcurrent: 1, - MaxSamples: math.MaxInt32, - Timeout: 10 * time.Minute, + Logger: util.Logger, + ActiveQueryTracker: queryTracker, + MaxSamples: math.MaxInt32, + Timeout: 10 * time.Minute, }) rangeQuery, err := engine.NewRangeQuery(queryable, query, from, through, step) require.NoError(t, err) diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 25961025912..36758651281 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -47,7 +47,7 @@ type distributorQuerier struct { chunkIterFn chunkIteratorFunc } -func (q *distributorQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *distributorQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { // Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, // which needs only metadata. if sp == nil { @@ -69,9 +69,14 @@ func (q *distributorQuerier) Select(sp *storage.SelectParams, matchers ...*label return nil, nil, promql.ErrStorage{Err: err} } + // Using MatrixToSeriesSet (and in turn NewConcreteSeriesSet), sorts the series. return series.MatrixToSeriesSet(matrix), nil, nil } +func (q *distributorQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, matchers...) +} + func (q *distributorQuerier) streamingSelect(sp storage.SelectParams, matchers []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { userID, err := user.ExtractOrgID(q.ctx) if err != nil { diff --git a/pkg/querier/lazyquery/lazyquery.go b/pkg/querier/lazyquery/lazyquery.go index ba6531f74f0..691179ebc5c 100644 --- a/pkg/querier/lazyquery/lazyquery.go +++ b/pkg/querier/lazyquery/lazyquery.go @@ -17,7 +17,7 @@ type LazyQueryable struct { q storage.Queryable } -// Querier impls storage.Queryable +// Querier implements storage.Queryable func (lq LazyQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { q, err := lq.q.Querier(ctx, mint, maxt) if err != nil { @@ -43,35 +43,48 @@ func NewLazyQuerier(next storage.Querier) storage.Querier { return LazyQuerier{next} } -// Select impls Storage.Querier -func (l LazyQuerier) Select(params *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (l LazyQuerier) createSeriesSet(params *storage.SelectParams, matchers []*labels.Matcher, selectFunc func(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error)) chan storage.SeriesSet { // make sure there is space in the buffer, to unblock the goroutine and let it die even if nobody is // waiting for the result yet (or anymore). future := make(chan storage.SeriesSet, 1) go func() { - set, _, err := l.next.Select(params, matchers...) + set, _, err := selectFunc(params, matchers...) if err != nil { future <- errSeriesSet{err} } else { future <- set } }() + return future +} + +// Select implements Storage.Querier +func (l LazyQuerier) Select(params *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + future := l.createSeriesSet(params, matchers, l.next.Select) + return &lazySeriesSet{ + future: future, + }, nil, nil +} + +// SelectSorted implements Storage.Querier +func (l LazyQuerier) SelectSorted(params *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + future := l.createSeriesSet(params, matchers, l.next.SelectSorted) return &lazySeriesSet{ future: future, }, nil, nil } -// LabelValues impls Storage.Querier +// LabelValues implements Storage.Querier func (l LazyQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return l.next.LabelValues(name) } -// LabelNames impls Storage.Querier +// LabelNames implements Storage.Querier func (l LazyQuerier) LabelNames() ([]string, storage.Warnings, error) { return l.next.LabelNames() } -// Close impls Storage.Querier +// Close implements Storage.Querier func (l LazyQuerier) Close() error { return l.next.Close() } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 63120f442ea..7e6f508cc23 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -38,6 +38,12 @@ type Config struct { // step if not specified. DefaultEvaluationInterval time.Duration + // Directory for ActiveQueryTracker. If empty, ActiveQueryTracker will be disabled and MaxConcurrent will not be applied (!). + // ActiveQueryTracker logs queries that were active during the last crash, but logs them on the next startup. + // However, we need to use active query tracker, otherwise we cannot limit Max Concurrent queries in the PromQL + // engine. + ActiveQueryTrackerDir string `yaml:"active_query_tracker_dir"` + // For testing, to prevent re-registration of metrics in the promql engine. metricsRegisterer prometheus.Registerer `yaml:"-"` } @@ -60,6 +66,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store.") + f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.") cfg.metricsRegisterer = prometheus.DefaultRegisterer } @@ -107,15 +114,25 @@ func New(cfg Config, distributor Distributor, storeQueryable storage.Queryable) promql.SetDefaultEvaluationInterval(cfg.DefaultEvaluationInterval) engine := promql.NewEngine(promql.EngineOpts{ - Logger: util.Logger, - Reg: cfg.metricsRegisterer, - MaxConcurrent: cfg.MaxConcurrent, - MaxSamples: cfg.MaxSamples, - Timeout: cfg.Timeout, + Logger: util.Logger, + Reg: cfg.metricsRegisterer, + ActiveQueryTracker: createActiveQueryTracker(cfg), + MaxSamples: cfg.MaxSamples, + Timeout: cfg.Timeout, }) return lazyQueryable, engine } +func createActiveQueryTracker(cfg Config) *promql.ActiveQueryTracker { + dir := cfg.ActiveQueryTrackerDir + + if dir != "" { + return promql.NewActiveQueryTracker(dir, cfg.MaxConcurrent, util.Logger) + } + + return nil +} + // NewQueryable creates a new Queryable for cortex. func NewQueryable(distributor, store storage.Queryable, chunkIterFn chunkIteratorFunc, cfg Config) storage.Queryable { return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { @@ -165,8 +182,8 @@ type querier struct { mint, maxt int64 } -// Select implements storage.Querier. -func (q querier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +// SelectSorted implements storage.Querier. +func (q querier) SelectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { // Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, // which needs only metadata. Here we expect that metadataQuerier querier will handle that. // In Cortex it is not feasible to query entire history (with no mint/maxt), so we only ask ingesters and skip @@ -206,9 +223,15 @@ func (q querier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) ( // we have all the sets from different sources (chunk from store, chunks from ingesters, // time series from store and time series from ingesters). + // mergeSeriesSets will return sorted set. return q.mergeSeriesSets(result), nil, nil } +// Select implements storage.Querier. +func (q querier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, matchers...) +} + // LabelsValue implements storage.Querier. func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { return q.metadataQuerier.LabelValues(name) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index e544495b6f3..891ee58aeb3 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -3,6 +3,8 @@ package querier import ( "context" "fmt" + "io/ioutil" + "os" "strconv" "sync" "testing" @@ -12,6 +14,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/testutil" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" @@ -192,11 +195,16 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { }, } + dir, err := ioutil.TempDir("", t.Name()) + testutil.Ok(t, err) + defer os.RemoveAll(dir) + queryTracker := promql.NewActiveQueryTracker(dir, 10, util.Logger) + engine := promql.NewEngine(promql.EngineOpts{ - Logger: util.Logger, - MaxConcurrent: 10, - MaxSamples: 1e6, - Timeout: 1 * time.Minute, + Logger: util.Logger, + ActiveQueryTracker: queryTracker, + MaxSamples: 1e6, + Timeout: 1 * time.Minute, }) cfg := Config{} for _, ingesterStreaming := range []bool{true, false} { @@ -249,13 +257,18 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc return result } -func testQuery(t require.TestingT, queryable storage.Queryable, end model.Time, q query) *promql.Result { +func testQuery(t testing.TB, queryable storage.Queryable, end model.Time, q query) *promql.Result { + dir, err := ioutil.TempDir("", "test_query") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + queryTracker := promql.NewActiveQueryTracker(dir, 10, util.Logger) + from, through, step := time.Unix(0, 0), end.Time(), q.step engine := promql.NewEngine(promql.EngineOpts{ - Logger: util.Logger, - MaxConcurrent: 10, - MaxSamples: 1e6, - Timeout: 1 * time.Minute, + Logger: util.Logger, + ActiveQueryTracker: queryTracker, + MaxSamples: 1e6, + Timeout: 1 * time.Minute, }) query, err := engine.NewRangeQuery(queryable, q.query, from, through, step) require.NoError(t, err) @@ -358,11 +371,16 @@ func TestShortTermQueryToLTS(t *testing.T) { }, } + dir, err := ioutil.TempDir("", t.Name()) + testutil.Ok(t, err) + defer os.RemoveAll(dir) + queryTracker := promql.NewActiveQueryTracker(dir, 10, util.Logger) + engine := promql.NewEngine(promql.EngineOpts{ - Logger: util.Logger, - MaxConcurrent: 10, - MaxSamples: 1e6, - Timeout: 1 * time.Minute, + Logger: util.Logger, + ActiveQueryTracker: queryTracker, + MaxSamples: 1e6, + Timeout: 1 * time.Minute, }) cfg := Config{} for _, ingesterStreaming := range []bool{true, false} { diff --git a/pkg/querier/queryrange/promql_test.go b/pkg/querier/queryrange/promql_test.go index ded2cc12416..be3b4707ae1 100644 --- a/pkg/querier/queryrange/promql_test.go +++ b/pkg/querier/queryrange/promql_test.go @@ -2,6 +2,7 @@ package queryrange import ( "context" + "errors" "fmt" "math" "sort" @@ -26,7 +27,6 @@ var ( ctx = context.Background() engine = promql.NewEngine(promql.EngineOpts{ Reg: prometheus.DefaultRegisterer, - MaxConcurrent: 1000, Logger: util.Logger, Timeout: 1 * time.Hour, MaxSamples: 10e6, @@ -526,6 +526,10 @@ func (m *testMatrix) At() storage.Series { func (m *testMatrix) Err() error { return nil } +func (m *testMatrix) SelectSorted(selectParams *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return nil, nil, errors.New("not implemented") +} + func (m *testMatrix) Select(selectParams *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { s, _, err := astmapper.ShardFromMatchers(matchers) if err != nil { diff --git a/pkg/querier/queryrange/queryable.go b/pkg/querier/queryrange/queryable.go index 01ebe6b6865..d9a10d0c08b 100644 --- a/pkg/querier/queryrange/queryable.go +++ b/pkg/querier/queryrange/queryable.go @@ -33,11 +33,12 @@ type ShardedQuerier struct { Handler Handler } +func (q *ShardedQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, matchers...) +} + // Select returns a set of series that matches the given label matchers. -func (q *ShardedQuerier) Select( - _ *storage.SelectParams, - matchers ...*labels.Matcher, -) (storage.SeriesSet, storage.Warnings, error) { +func (q *ShardedQuerier) SelectSorted(_ *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { var embeddedQuery string var isEmbedded bool for _, matcher := range matchers { diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index d57c7b0e969..cd14f17d993 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -75,11 +75,10 @@ func TestQueryshardingMiddleware(t *testing.T) { for _, c := range testExpr { t.Run(c.name, func(t *testing.T) { engine := promql.NewEngine(promql.EngineOpts{ - Logger: util.Logger, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 1000, - Timeout: time.Minute, + Logger: util.Logger, + Reg: nil, + MaxSamples: 1000, + Timeout: time.Minute, }) handler := NewQueryShardMiddleware( @@ -519,12 +518,10 @@ func BenchmarkQuerySharding(b *testing.B) { time.Millisecond / 10, } { engine := promql.NewEngine(promql.EngineOpts{ - Logger: util.Logger, - Reg: nil, - // set high concurrency so we're not bottlenecked here - MaxConcurrent: 100000, - MaxSamples: 100000000, - Timeout: time.Minute, + Logger: util.Logger, + Reg: nil, + MaxSamples: 100000000, + Timeout: time.Minute, }) queryable := NewMockShardedQueryable( diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index e9423983c38..2ae24df2d90 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -53,11 +53,10 @@ func TestRoundTrip(t *testing.T) { nil, chunk.SchemaConfig{}, promql.EngineOpts{ - Logger: util.Logger, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 1000, - Timeout: time.Minute, + Logger: util.Logger, + Reg: nil, + MaxSamples: 1000, + Timeout: time.Minute, }, 0, ) diff --git a/pkg/querier/queryrange/test_utils.go b/pkg/querier/queryrange/test_utils.go index 400b0eef71a..71e7985aeef 100644 --- a/pkg/querier/queryrange/test_utils.go +++ b/pkg/querier/queryrange/test_utils.go @@ -82,11 +82,12 @@ func (q *MockShardedQueryable) Querier(ctx context.Context, mint, maxt int64) (s return q, nil } +func (q *MockShardedQueryable) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, matchers...) +} + // Select impls storage.Querier -func (q *MockShardedQueryable) Select( - _ *storage.SelectParams, - matchers ...*labels.Matcher, -) (storage.SeriesSet, storage.Warnings, error) { +func (q *MockShardedQueryable) SelectSorted(_ *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { tStart := time.Now() shard, _, err := astmapper.ShardFromMatchers(matchers) @@ -142,8 +143,8 @@ func (q *MockShardedQueryable) Select( time.Sleep(remaining) } + // sorted return series.NewConcreteSeriesSet(results), nil, nil - } // ShardLabelSeries allows extending a Series with new labels. This is helpful for adding cortex shard labels diff --git a/pkg/querier/queryrange/value.go b/pkg/querier/queryrange/value.go index e8b8095ee2a..c4924464ffb 100644 --- a/pkg/querier/queryrange/value.go +++ b/pkg/querier/queryrange/value.go @@ -99,6 +99,7 @@ func ResponseToSamples(resp Response) ([]SampleStream, error) { } // NewSeriesSet returns an in memory storage.SeriesSet from a []SampleStream +// As NewSeriesSet uses NewConcreteSeriesSet to implement SeriesSet, result will be sorted by label names. func NewSeriesSet(results []SampleStream) storage.SeriesSet { set := make([]storage.Series, 0, len(results)) diff --git a/pkg/querier/remote_read_test.go b/pkg/querier/remote_read_test.go index f12c2460d89..d02fef0d2e8 100644 --- a/pkg/querier/remote_read_test.go +++ b/pkg/querier/remote_read_test.go @@ -87,13 +87,17 @@ type mockQuerier struct { matrix model.Matrix } -func (m mockQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (m mockQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { if sp == nil { panic(fmt.Errorf("select params must be set")) } return series.MatrixToSeriesSet(m.matrix), nil, nil } +func (m mockQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return m.SelectSorted(sp, matchers...) +} + func (m mockQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil } diff --git a/pkg/querier/series/series_set.go b/pkg/querier/series/series_set.go index 4f11dde09ca..fc0f42efc96 100644 --- a/pkg/querier/series/series_set.go +++ b/pkg/querier/series/series_set.go @@ -33,6 +33,7 @@ type ConcreteSeriesSet struct { } // NewConcreteSeriesSet instantiates an in-memory series set from a series +// Series will be sorted by labels. func NewConcreteSeriesSet(series []storage.Series) storage.SeriesSet { sort.Sort(byLabels(series)) return &ConcreteSeriesSet{ @@ -143,6 +144,7 @@ func (e errIterator) Err() error { } // MatrixToSeriesSet creates a storage.SeriesSet from a model.Matrix +// Series will be sorted by labels. func MatrixToSeriesSet(m model.Matrix) storage.SeriesSet { series := make([]storage.Series, 0, len(m)) for _, ss := range m { diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index e6e00478758..91ecdec595e 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -15,7 +15,8 @@ func TestRuler_rules(t *testing.T) { cfg, cleanup := defaultRulerConfig(newMockRuleStore(mockRules)) defer cleanup() - r := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg) + defer rcleanup() defer r.Stop() req := httptest.NewRequest("GET", "https://localhost:8080/api/prom/api/v1/rules", nil) @@ -70,7 +71,8 @@ func TestRuler_alerts(t *testing.T) { cfg, cleanup := defaultRulerConfig(newMockRuleStore(mockRules)) defer cleanup() - r := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg) + defer rcleanup() defer r.Stop() req := httptest.NewRequest("GET", "https://localhost:8080/api/prom/api/v1/alerts", nil) diff --git a/pkg/ruler/legacy_rulefmt/rulefmt.go b/pkg/ruler/legacy_rulefmt/rulefmt.go new file mode 100644 index 00000000000..501079862c7 --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/rulefmt.go @@ -0,0 +1,216 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rulefmt + +import ( + "context" + "io/ioutil" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + yaml "gopkg.in/yaml.v2" + + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/template" +) + +// Error represents semantic errors on parsing rule groups. +type Error struct { + Group string + Rule int + RuleName string + Err error +} + +func (err *Error) Error() string { + return errors.Wrapf(err.Err, "group %q, rule %d, %q", err.Group, err.Rule, err.RuleName).Error() +} + +// RuleGroups is a set of rule groups that are typically exposed in a file. +type RuleGroups struct { + Groups []RuleGroup `yaml:"groups"` +} + +// Validate validates all rules in the rule groups. +func (g *RuleGroups) Validate() (errs []error) { + set := map[string]struct{}{} + + for _, g := range g.Groups { + if g.Name == "" { + errs = append(errs, errors.Errorf("Groupname should not be empty")) + } + + if _, ok := set[g.Name]; ok { + errs = append( + errs, + errors.Errorf("groupname: \"%s\" is repeated in the same file", g.Name), + ) + } + + set[g.Name] = struct{}{} + + for i, r := range g.Rules { + for _, err := range r.Validate() { + var ruleName string + if r.Alert != "" { + ruleName = r.Alert + } else { + ruleName = r.Record + } + errs = append(errs, &Error{ + Group: g.Name, + Rule: i, + RuleName: ruleName, + Err: err, + }) + } + } + } + + return errs +} + +// RuleGroup is a list of sequentially evaluated recording and alerting rules. +type RuleGroup struct { + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Rules []Rule `yaml:"rules"` +} + +// Rule describes an alerting or recording rule. +type Rule struct { + Record string `yaml:"record,omitempty"` + Alert string `yaml:"alert,omitempty"` + Expr string `yaml:"expr"` + For model.Duration `yaml:"for,omitempty"` + Labels map[string]string `yaml:"labels,omitempty"` + Annotations map[string]string `yaml:"annotations,omitempty"` +} + +// Validate the rule and return a list of encountered errors. +func (r *Rule) Validate() (errs []error) { + if r.Record != "" && r.Alert != "" { + errs = append(errs, errors.Errorf("only one of 'record' and 'alert' must be set")) + } + if r.Record == "" && r.Alert == "" { + errs = append(errs, errors.Errorf("one of 'record' or 'alert' must be set")) + } + + if r.Expr == "" { + errs = append(errs, errors.Errorf("field 'expr' must be set in rule")) + } else if _, err := promql.ParseExpr(r.Expr); err != nil { + errs = append(errs, errors.Wrap(err, "could not parse expression")) + } + if r.Record != "" { + if len(r.Annotations) > 0 { + errs = append(errs, errors.Errorf("invalid field 'annotations' in recording rule")) + } + if r.For != 0 { + errs = append(errs, errors.Errorf("invalid field 'for' in recording rule")) + } + if !model.IsValidMetricName(model.LabelValue(r.Record)) { + errs = append(errs, errors.Errorf("invalid recording rule name: %s", r.Record)) + } + } + + for k, v := range r.Labels { + if !model.LabelName(k).IsValid() { + errs = append(errs, errors.Errorf("invalid label name: %s", k)) + } + + if !model.LabelValue(v).IsValid() { + errs = append(errs, errors.Errorf("invalid label value: %s", v)) + } + } + + for k := range r.Annotations { + if !model.LabelName(k).IsValid() { + errs = append(errs, errors.Errorf("invalid annotation name: %s", k)) + } + } + + return append(errs, testTemplateParsing(r)...) +} + +// testTemplateParsing checks if the templates used in labels and annotations +// of the alerting rules are parsed correctly. +func testTemplateParsing(rl *Rule) (errs []error) { + if rl.Alert == "" { + // Not an alerting rule. + return errs + } + + // Trying to parse templates. + tmplData := template.AlertTemplateData(map[string]string{}, map[string]string{}, 0) + defs := []string{ + "{{$labels := .Labels}}", + "{{$externalLabels := .ExternalLabels}}", + "{{$value := .Value}}", + } + parseTest := func(text string) error { + tmpl := template.NewTemplateExpander( + context.TODO(), + strings.Join(append(defs, text), ""), + "__alert_"+rl.Alert, + tmplData, + model.Time(timestamp.FromTime(time.Now())), + nil, + nil, + ) + return tmpl.ParseTest() + } + + // Parsing Labels. + for k, val := range rl.Labels { + err := parseTest(val) + if err != nil { + errs = append(errs, errors.Wrapf(err, "label %q", k)) + } + } + + // Parsing Annotations. + for k, val := range rl.Annotations { + err := parseTest(val) + if err != nil { + errs = append(errs, errors.Wrapf(err, "annotation %q", k)) + } + } + + return errs +} + +// Parse parses and validates a set of rules. +func Parse(content []byte) (*RuleGroups, []error) { + var groups RuleGroups + if err := yaml.UnmarshalStrict(content, &groups); err != nil { + return nil, []error{err} + } + return &groups, groups.Validate() +} + +// ParseFile reads and parses rules from a file. +func ParseFile(file string) (*RuleGroups, []error) { + b, err := ioutil.ReadFile(file) + if err != nil { + return nil, []error{errors.Wrap(err, file)} + } + rgs, errs := Parse(b) + for i := range errs { + errs[i] = errors.Wrap(errs[i], file) + } + return rgs, errs +} diff --git a/pkg/ruler/legacy_rulefmt/rulefmt_test.go b/pkg/ruler/legacy_rulefmt/rulefmt_test.go new file mode 100644 index 00000000000..26151c8c11c --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/rulefmt_test.go @@ -0,0 +1,163 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rulefmt + +import ( + "path/filepath" + "strings" + "testing" + + "github.com/prometheus/prometheus/util/testutil" +) + +func TestParseFileSuccess(t *testing.T) { + if _, errs := ParseFile("testdata/test.yaml"); len(errs) > 0 { + t.Errorf("unexpected errors parsing file") + for _, err := range errs { + t.Error(err) + } + } +} + +func TestParseFileFailure(t *testing.T) { + table := []struct { + filename string + errMsg string + }{ + { + filename: "duplicate_grp.bad.yaml", + errMsg: "groupname: \"yolo\" is repeated in the same file", + }, + { + filename: "bad_expr.bad.yaml", + errMsg: "parse error", + }, + { + filename: "record_and_alert.bad.yaml", + errMsg: "only one of 'record' and 'alert' must be set", + }, + { + filename: "no_rec_alert.bad.yaml", + errMsg: "one of 'record' or 'alert' must be set", + }, + { + filename: "noexpr.bad.yaml", + errMsg: "field 'expr' must be set in rule", + }, + { + filename: "bad_lname.bad.yaml", + errMsg: "invalid label name", + }, + { + filename: "bad_annotation.bad.yaml", + errMsg: "invalid annotation name", + }, + { + filename: "invalid_record_name.bad.yaml", + errMsg: "invalid recording rule name", + }, + } + + for _, c := range table { + _, errs := ParseFile(filepath.Join("testdata", c.filename)) + if errs == nil { + t.Errorf("Expected error parsing %s but got none", c.filename) + continue + } + if !strings.Contains(errs[0].Error(), c.errMsg) { + t.Errorf("Expected error for %s to contain %q but got: %s", c.filename, c.errMsg, errs) + } + } + +} + +func TestTemplateParsing(t *testing.T) { + tests := []struct { + ruleString string + shouldPass bool + }{ + { + ruleString: ` +groups: +- name: example + rules: + - alert: InstanceDown + expr: up == 0 + for: 5m + labels: + severity: "page" + annotations: + summary: "Instance {{ $labels.instance }} down" +`, + shouldPass: true, + }, + { + // `$label` instead of `$labels`. + ruleString: ` +groups: +- name: example + rules: + - alert: InstanceDown + expr: up == 0 + for: 5m + labels: + severity: "page" + annotations: + summary: "Instance {{ $label.instance }} down" +`, + shouldPass: false, + }, + { + // `$this_is_wrong`. + ruleString: ` +groups: +- name: example + rules: + - alert: InstanceDown + expr: up == 0 + for: 5m + labels: + severity: "{{$this_is_wrong}}" + annotations: + summary: "Instance {{ $labels.instance }} down" +`, + shouldPass: false, + }, + { + // `$labels.quantile * 100`. + ruleString: ` +groups: +- name: example + rules: + - alert: InstanceDown + expr: up == 0 + for: 5m + labels: + severity: "page" + annotations: + summary: "Instance {{ $labels.instance }} down" + description: "{{$labels.quantile * 100}}" +`, + shouldPass: false, + }, + } + + for _, tst := range tests { + rgs, errs := Parse([]byte(tst.ruleString)) + testutil.Assert(t, rgs != nil, "Rule parsing, rule=\n"+tst.ruleString) + passed := (tst.shouldPass && len(errs) == 0) || (!tst.shouldPass && len(errs) > 0) + testutil.Assert(t, passed, "Rule validation failed, rule=\n"+tst.ruleString) + } + +} diff --git a/pkg/ruler/legacy_rulefmt/testdata/bad_annotation.bad.yaml b/pkg/ruler/legacy_rulefmt/testdata/bad_annotation.bad.yaml new file mode 100644 index 00000000000..b59c41a6326 --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/bad_annotation.bad.yaml @@ -0,0 +1,7 @@ +groups: + - name: yolo + rules: + - alert: hola + expr: 1 + annotations: + ins-tance: localhost diff --git a/pkg/ruler/legacy_rulefmt/testdata/bad_expr.bad.yaml b/pkg/ruler/legacy_rulefmt/testdata/bad_expr.bad.yaml new file mode 100644 index 00000000000..f9a029ccfd3 --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/bad_expr.bad.yaml @@ -0,0 +1,5 @@ +groups: +- name: yolo + rules: + - record: yolo + expr: rate(hi) diff --git a/pkg/ruler/legacy_rulefmt/testdata/bad_lname.bad.yaml b/pkg/ruler/legacy_rulefmt/testdata/bad_lname.bad.yaml new file mode 100644 index 00000000000..7153f3ba50e --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/bad_lname.bad.yaml @@ -0,0 +1,7 @@ +groups: + - name: yolo + rules: + - record: hola + expr: 1 + labels: + ins-tance: localhost diff --git a/pkg/ruler/legacy_rulefmt/testdata/duplicate_grp.bad.yaml b/pkg/ruler/legacy_rulefmt/testdata/duplicate_grp.bad.yaml new file mode 100644 index 00000000000..97d453429e9 --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/duplicate_grp.bad.yaml @@ -0,0 +1,3 @@ +groups: +- name: yolo +- name: yolo diff --git a/pkg/ruler/legacy_rulefmt/testdata/invalid_record_name.bad.yaml b/pkg/ruler/legacy_rulefmt/testdata/invalid_record_name.bad.yaml new file mode 100644 index 00000000000..bda5f4970be --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/invalid_record_name.bad.yaml @@ -0,0 +1,5 @@ +groups: + - name: yolo + rules: + - record: strawberry{flavor="sweet"} + expr: 1 \ No newline at end of file diff --git a/pkg/ruler/legacy_rulefmt/testdata/no_rec_alert.bad.yaml b/pkg/ruler/legacy_rulefmt/testdata/no_rec_alert.bad.yaml new file mode 100644 index 00000000000..64d2e8f20d9 --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/no_rec_alert.bad.yaml @@ -0,0 +1,4 @@ +groups: + - name: yolo + rules: + - expr: 1 diff --git a/pkg/ruler/legacy_rulefmt/testdata/noexpr.bad.yaml b/pkg/ruler/legacy_rulefmt/testdata/noexpr.bad.yaml new file mode 100644 index 00000000000..ad0c29e4cdd --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/noexpr.bad.yaml @@ -0,0 +1,4 @@ +groups: + - name: yolo + rules: + - record: ylo diff --git a/pkg/ruler/legacy_rulefmt/testdata/record_and_alert.bad.yaml b/pkg/ruler/legacy_rulefmt/testdata/record_and_alert.bad.yaml new file mode 100644 index 00000000000..0ba81b74234 --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/record_and_alert.bad.yaml @@ -0,0 +1,6 @@ +groups: +- name: yolo + rules: + - record: Hi + alert: Hello + expr: 1 diff --git a/pkg/ruler/legacy_rulefmt/testdata/test.yaml b/pkg/ruler/legacy_rulefmt/testdata/test.yaml new file mode 100644 index 00000000000..a3127426d8a --- /dev/null +++ b/pkg/ruler/legacy_rulefmt/testdata/test.yaml @@ -0,0 +1,64 @@ +groups: +- name: my-group-name + interval: 30s # defaults to global interval + rules: + - alert: HighErrors + expr: | + sum without(instance) (rate(errors_total[5m])) + / + sum without(instance) (rate(requests_total[5m])) + for: 5m + labels: + severity: critical + annotations: + description: "stuff's happening with {{ $.labels.service }}" + + # Mix recording rules in the same list + - record: "new_metric" + expr: | + sum without(instance) (rate(errors_total[5m])) + / + sum without(instance) (rate(requests_total[5m])) + labels: + abc: edf + uvw: xyz + + - alert: HighErrors + expr: | + sum without(instance) (rate(errors_total[5m])) + / + sum without(instance) (rate(requests_total[5m])) + for: 5m + labels: + severity: critical + annotations: + description: "stuff's happening with {{ $.labels.service }}" + +- name: my-another-name + interval: 30s # defaults to global interval + rules: + - alert: HighErrors + expr: | + sum without(instance) (rate(errors_total[5m])) + / + sum without(instance) (rate(requests_total[5m])) + for: 5m + labels: + severity: critical + + - record: "new_metric" + expr: | + sum without(instance) (rate(errors_total[5m])) + / + sum without(instance) (rate(requests_total[5m])) + + - alert: HighErrors + expr: | + sum without(instance) (rate(errors_total[5m])) + / + sum without(instance) (rate(requests_total[5m])) + for: 5m + labels: + severity: critical + annotations: + description: "stuff's happening with {{ $.labels.service }}" diff --git a/pkg/ruler/lifecycle_test.go b/pkg/ruler/lifecycle_test.go index 60aad08c4ea..62abbbb4d7b 100644 --- a/pkg/ruler/lifecycle_test.go +++ b/pkg/ruler/lifecycle_test.go @@ -18,7 +18,8 @@ func TestRulerShutdown(t *testing.T) { config.Ring.SkipUnregister = false defer cleanup() - r := newTestRuler(t, config) + r, rcleanup := newTestRuler(t, config) + defer rcleanup() // Wait until the tokens are registered in the ring test.Poll(t, 100*time.Millisecond, config.Ring.NumTokens, func() interface{} { @@ -40,7 +41,8 @@ func TestRulerRestart(t *testing.T) { config.EnableSharding = true defer cleanup() - r := newTestRuler(t, config) + r, rcleanup := newTestRuler(t, config) + defer rcleanup() // Wait until the tokens are registered in the ring test.Poll(t, 100*time.Millisecond, config.Ring.NumTokens, func() interface{} { @@ -54,7 +56,8 @@ func TestRulerRestart(t *testing.T) { assert.Equal(t, config.Ring.NumTokens, testutils.NumTokens(config.Ring.KVStore.Mock, "localhost", ring.RulerRingKey)) // Create a new ruler which is expected to pick up tokens from the ring. - r = newTestRuler(t, config) + r, rcleanup = newTestRuler(t, config) + defer rcleanup() defer r.Stop() // Wait until the ruler is ACTIVE in the ring. diff --git a/pkg/ruler/mapper.go b/pkg/ruler/mapper.go index 36d1f60f4b3..de3d100699e 100644 --- a/pkg/ruler/mapper.go +++ b/pkg/ruler/mapper.go @@ -6,9 +6,10 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/spf13/afero" "gopkg.in/yaml.v2" + + legacy_rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" ) // mapper is designed to enusre the provided rule sets are identical @@ -28,7 +29,7 @@ func newMapper(path string, logger log.Logger) *mapper { } } -func (m *mapper) MapRules(user string, ruleConfigs map[string][]rulefmt.RuleGroup) (bool, []string, error) { +func (m *mapper) MapRules(user string, ruleConfigs map[string][]legacy_rulefmt.RuleGroup) (bool, []string, error) { anyUpdated := false filenames := []string{} @@ -73,12 +74,12 @@ func (m *mapper) MapRules(user string, ruleConfigs map[string][]rulefmt.RuleGrou return anyUpdated, filenames, nil } -func (m *mapper) writeRuleGroupsIfNewer(groups []rulefmt.RuleGroup, filename string) (bool, error) { +func (m *mapper) writeRuleGroupsIfNewer(groups []legacy_rulefmt.RuleGroup, filename string) (bool, error) { sort.Slice(groups, func(i, j int) bool { return groups[i].Name > groups[j].Name }) - rgs := rulefmt.RuleGroups{Groups: groups} + rgs := legacy_rulefmt.RuleGroups{Groups: groups} d, err := yaml.Marshal(&rgs) if err != nil { diff --git a/pkg/ruler/mapper_test.go b/pkg/ruler/mapper_test.go index 31a972e969c..b08f591aa0b 100644 --- a/pkg/ruler/mapper_test.go +++ b/pkg/ruler/mapper_test.go @@ -6,19 +6,20 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/spf13/afero" "github.com/stretchr/testify/require" + + legacy_rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" ) var ( testUser = "user1" - initialRuleSet = map[string][]rulefmt.RuleGroup{ + initialRuleSet = map[string][]legacy_rulefmt.RuleGroup{ "file_one": { { Name: "rulegroup_one", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -27,7 +28,7 @@ var ( }, { Name: "rulegroup_two", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -37,11 +38,11 @@ var ( }, } - outOfOrderRuleSet = map[string][]rulefmt.RuleGroup{ + outOfOrderRuleSet = map[string][]legacy_rulefmt.RuleGroup{ "file_one": { { Name: "rulegroup_two", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -50,7 +51,7 @@ var ( }, { Name: "rulegroup_one", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -60,11 +61,11 @@ var ( }, } - updatedRuleSet = map[string][]rulefmt.RuleGroup{ + updatedRuleSet = map[string][]legacy_rulefmt.RuleGroup{ "file_one": { { Name: "rulegroup_one", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -73,7 +74,7 @@ var ( }, { Name: "rulegroup_two", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -82,7 +83,7 @@ var ( }, { Name: "rulegroup_three", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -150,11 +151,11 @@ func Test_mapper_MapRules(t *testing.T) { } var ( - twoFilesRuleSet = map[string][]rulefmt.RuleGroup{ + twoFilesRuleSet = map[string][]legacy_rulefmt.RuleGroup{ "file_one": { { Name: "rulegroup_one", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -163,7 +164,7 @@ var ( }, { Name: "rulegroup_two", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -174,7 +175,7 @@ var ( "file_two": { { Name: "rulegroup_one", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -184,11 +185,11 @@ var ( }, } - twoFilesUpdatedRuleSet = map[string][]rulefmt.RuleGroup{ + twoFilesUpdatedRuleSet = map[string][]legacy_rulefmt.RuleGroup{ "file_one": { { Name: "rulegroup_one", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -197,7 +198,7 @@ var ( }, { Name: "rulegroup_two", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -208,7 +209,7 @@ var ( "file_two": { { Name: "rulegroup_one", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_ruleupdated", Expr: "example_exprupdated", @@ -218,11 +219,11 @@ var ( }, } - twoFilesDeletedRuleSet = map[string][]rulefmt.RuleGroup{ + twoFilesDeletedRuleSet = map[string][]legacy_rulefmt.RuleGroup{ "file_one": { { Name: "rulegroup_one", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", @@ -231,7 +232,7 @@ var ( }, { Name: "rulegroup_two", - Rules: []rulefmt.Rule{ + Rules: []legacy_rulefmt.Rule{ { Record: "example_rule", Expr: "example_expr", diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 3ff61f1b527..a679a885b00 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -12,20 +12,21 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/stretchr/testify/require" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/ruler/rules" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" ) @@ -53,11 +54,19 @@ func defaultRulerConfig(store rules.RuleStore) (Config, func()) { return cfg, cleanup } -func newTestRuler(t *testing.T, cfg Config) *Ruler { +func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) { + dir, err := ioutil.TempDir("", t.Name()) + testutil.Ok(t, err) + cleanup := func() { + os.RemoveAll(dir) + } + + tracker := promql.NewActiveQueryTracker(dir, 20, util.Logger) + engine := promql.NewEngine(promql.EngineOpts{ - MaxSamples: 1e6, - MaxConcurrent: 20, - Timeout: 2 * time.Minute, + MaxSamples: 1e6, + ActiveQueryTracker: tracker, + Timeout: 2 * time.Minute, }) noopQueryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { @@ -76,7 +85,7 @@ func newTestRuler(t *testing.T, cfg Config) *Ruler { // Ensure all rules are loaded before usage ruler.loadRules(context.Background()) - return ruler + return ruler, cleanup } func TestNotifierSendsUserIDHeader(t *testing.T) { @@ -100,7 +109,8 @@ func TestNotifierSendsUserIDHeader(t *testing.T) { require.NoError(t, err) cfg.AlertmanagerDiscovery = false - r := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg) + defer rcleanup() defer r.Stop() n, err := r.getOrCreateNotifier("1") require.NoError(t, err) @@ -123,7 +133,8 @@ func TestRuler_Rules(t *testing.T) { cfg, cleanup := defaultRulerConfig(newMockRuleStore(mockRules)) defer cleanup() - r := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg) + defer rcleanup() defer r.Stop() // test user1 diff --git a/pkg/ruler/rules/compat.go b/pkg/ruler/rules/compat.go index 693157d15d8..fed3c549fee 100644 --- a/pkg/ruler/rules/compat.go +++ b/pkg/ruler/rules/compat.go @@ -7,11 +7,12 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/rulefmt" + + legacy_rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" ) // ToProto transforms a formatted prometheus rulegroup to a rule group protobuf -func ToProto(user string, namespace string, rl rulefmt.RuleGroup) *RuleGroupDesc { +func ToProto(user string, namespace string, rl legacy_rulefmt.RuleGroup) *RuleGroupDesc { rg := RuleGroupDesc{ Name: rl.Name, Namespace: namespace, @@ -22,7 +23,7 @@ func ToProto(user string, namespace string, rl rulefmt.RuleGroup) *RuleGroupDesc return &rg } -func formattedRuleToProto(rls []rulefmt.Rule) []*RuleDesc { +func formattedRuleToProto(rls []legacy_rulefmt.Rule) []*RuleDesc { rules := make([]*RuleDesc, len(rls)) for i := range rls { rules[i] = &RuleDesc{ @@ -39,15 +40,15 @@ func formattedRuleToProto(rls []rulefmt.Rule) []*RuleDesc { } // FromProto generates a rulefmt RuleGroup -func FromProto(rg *RuleGroupDesc) rulefmt.RuleGroup { - formattedRuleGroup := rulefmt.RuleGroup{ +func FromProto(rg *RuleGroupDesc) legacy_rulefmt.RuleGroup { + formattedRuleGroup := legacy_rulefmt.RuleGroup{ Name: rg.GetName(), Interval: model.Duration(rg.Interval), - Rules: make([]rulefmt.Rule, len(rg.GetRules())), + Rules: make([]legacy_rulefmt.Rule, len(rg.GetRules())), } for i, rl := range rg.GetRules() { - newRule := rulefmt.Rule{ + newRule := legacy_rulefmt.Rule{ Record: rl.GetRecord(), Alert: rl.GetAlert(), Expr: rl.GetExpr(), diff --git a/pkg/ruler/rules/store.go b/pkg/ruler/rules/store.go index 4e921cee0f1..11584ea5d88 100644 --- a/pkg/ruler/rules/store.go +++ b/pkg/ruler/rules/store.go @@ -7,7 +7,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs" "github.com/cortexproject/cortex/pkg/configs/client" - "github.com/prometheus/prometheus/pkg/rulefmt" + legacy_rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" ) var ( @@ -29,11 +29,11 @@ type RuleGroupList []*RuleGroupDesc // Formatted returns the rule group list as a set of formatted rule groups mapped // by namespace -func (l RuleGroupList) Formatted() map[string][]rulefmt.RuleGroup { - ruleMap := map[string][]rulefmt.RuleGroup{} +func (l RuleGroupList) Formatted() map[string][]legacy_rulefmt.RuleGroup { + ruleMap := map[string][]legacy_rulefmt.RuleGroup{} for _, g := range l { if _, exists := ruleMap[g.Namespace]; !exists { - ruleMap[g.Namespace] = []rulefmt.RuleGroup{FromProto(g)} + ruleMap[g.Namespace] = []legacy_rulefmt.RuleGroup{FromProto(g)} continue } ruleMap[g.Namespace] = append(ruleMap[g.Namespace], FromProto(g)) diff --git a/vendor/github.com/prometheus/common/route/route.go b/vendor/github.com/prometheus/common/route/route.go index 1bd0a1edd3e..cc05516c801 100644 --- a/vendor/github.com/prometheus/common/route/route.go +++ b/vendor/github.com/prometheus/common/route/route.go @@ -53,6 +53,12 @@ func New() *Router { // WithInstrumentation returns a router with instrumentation support. func (r *Router) WithInstrumentation(instrh func(handlerName string, handler http.HandlerFunc) http.HandlerFunc) *Router { + if r.instrh != nil { + newInstrh := instrh + instrh = func(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + return newInstrh(handlerName, r.instrh(handlerName, handler)) + } + } return &Router{rtr: r.rtr, prefix: r.prefix, instrh: instrh} } diff --git a/vendor/github.com/prometheus/common/version/info.go b/vendor/github.com/prometheus/common/version/info.go index 84489a51044..ac9af1febd9 100644 --- a/vendor/github.com/prometheus/common/version/info.go +++ b/vendor/github.com/prometheus/common/version/info.go @@ -33,9 +33,10 @@ var ( GoVersion = runtime.Version() ) -// NewCollector returns a collector which exports metrics about current version information. -func NewCollector(program string) *prometheus.GaugeVec { - buildInfo := prometheus.NewGaugeVec( +// NewCollector returns a collector that exports metrics about current version +// information. +func NewCollector(program string) prometheus.Collector { + return prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: program, Name: "build_info", @@ -43,11 +44,15 @@ func NewCollector(program string) *prometheus.GaugeVec { "A metric with a constant '1' value labeled by version, revision, branch, and goversion from which %s was built.", program, ), + ConstLabels: prometheus.Labels{ + "version": Version, + "revision": Revision, + "branch": Branch, + "goversion": GoVersion, + }, }, - []string{"version", "revision", "branch", "goversion"}, + func() float64 { return 1 }, ) - buildInfo.WithLabelValues(Version, Revision, Branch, GoVersion).Set(1) - return buildInfo } // versionInfoTmpl contains the template used by Info. diff --git a/vendor/github.com/prometheus/prometheus/config/config.go b/vendor/github.com/prometheus/prometheus/config/config.go index 794ccc0ee68..11667a8268a 100644 --- a/vendor/github.com/prometheus/prometheus/config/config.go +++ b/vendor/github.com/prometheus/prometheus/config/config.go @@ -299,6 +299,8 @@ type GlobalConfig struct { ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"` // How frequently to evaluate rules by default. EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"` + // File to which PromQL queries are logged. + QueryLogFile string `yaml:"query_log_file,omitempty"` // The labels to add to any timeseries that this Prometheus instance scrapes. ExternalLabels labels.Labels `yaml:"external_labels,omitempty"` } @@ -349,7 +351,8 @@ func (c *GlobalConfig) isZero() bool { return c.ExternalLabels == nil && c.ScrapeInterval == 0 && c.ScrapeTimeout == 0 && - c.EvaluationInterval == 0 + c.EvaluationInterval == 0 && + c.QueryLogFile == "" } // ScrapeConfig configures a scraping unit for Prometheus. diff --git a/vendor/github.com/prometheus/prometheus/discovery/manager.go b/vendor/github.com/prometheus/prometheus/discovery/manager.go index 5457bd9b2e7..d135cd54e70 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/manager.go +++ b/vendor/github.com/prometheus/prometheus/discovery/manager.go @@ -304,8 +304,8 @@ func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { } func (m *Manager) allGroups() map[string][]*targetgroup.Group { - m.mtx.Lock() - defer m.mtx.Unlock() + m.mtx.RLock() + defer m.mtx.RUnlock() tSets := map[string][]*targetgroup.Group{} for pkey, tsets := range m.targets { diff --git a/vendor/github.com/prometheus/prometheus/pkg/labels/labels.go b/vendor/github.com/prometheus/prometheus/pkg/labels/labels.go index 3b462e68d71..b919408ed53 100644 --- a/vendor/github.com/prometheus/prometheus/pkg/labels/labels.go +++ b/vendor/github.com/prometheus/prometheus/pkg/labels/labels.go @@ -200,6 +200,20 @@ func (ls Labels) Has(name string) bool { return false } +// HasDuplicateLabels returns whether ls has duplicate label names. +// It assumes that the labelset is sorted. +func (ls Labels) HasDuplicateLabelNames() (string, bool) { + for i, l := range ls { + if i == 0 { + continue + } + if l.Name == ls[i-1].Name { + return l.Name, true + } + } + return "", false +} + // WithoutEmpty returns the labelset without empty labels. // May return the same labelset. func (ls Labels) WithoutEmpty() Labels { diff --git a/vendor/github.com/prometheus/prometheus/pkg/logging/dedupe.go b/vendor/github.com/prometheus/prometheus/pkg/logging/dedupe.go index f040b2f23d6..1d911ca2f6e 100644 --- a/vendor/github.com/prometheus/prometheus/pkg/logging/dedupe.go +++ b/vendor/github.com/prometheus/prometheus/pkg/logging/dedupe.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package logging import ( diff --git a/vendor/github.com/prometheus/prometheus/pkg/logging/file.go b/vendor/github.com/prometheus/prometheus/pkg/logging/file.go new file mode 100644 index 00000000000..be118fad051 --- /dev/null +++ b/vendor/github.com/prometheus/prometheus/pkg/logging/file.go @@ -0,0 +1,62 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logging + +import ( + "os" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" +) + +var ( + timestampFormat = log.TimestampFormat( + func() time.Time { return time.Now().UTC() }, + "2006-01-02T15:04:05.000Z07:00", + ) +) + +// JSONFileLogger represents a logger that writes JSON to a file. +type JSONFileLogger struct { + logger log.Logger + file *os.File +} + +// NewJSONFileLogger returns a new JSONFileLogger. +func NewJSONFileLogger(s string) (*JSONFileLogger, error) { + if s == "" { + return nil, nil + } + + f, err := os.OpenFile(s, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return nil, errors.Wrap(err, "can't create json logger") + } + + return &JSONFileLogger{ + logger: log.With(log.NewJSONLogger(f), "ts", timestampFormat), + file: f, + }, nil +} + +// Close closes the underlying file. +func (l *JSONFileLogger) Close() error { + return l.file.Close() +} + +// Log calls the Log function of the underlying logger. +func (l *JSONFileLogger) Log(i ...interface{}) error { + return l.logger.Log(i...) +} diff --git a/vendor/github.com/prometheus/prometheus/pkg/logging/ratelimit.go b/vendor/github.com/prometheus/prometheus/pkg/logging/ratelimit.go index bf4d9dbda82..d3567eaa0e1 100644 --- a/vendor/github.com/prometheus/prometheus/pkg/logging/ratelimit.go +++ b/vendor/github.com/prometheus/prometheus/pkg/logging/ratelimit.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package logging import ( diff --git a/vendor/github.com/prometheus/prometheus/pkg/rulefmt/rulefmt.go b/vendor/github.com/prometheus/prometheus/pkg/rulefmt/rulefmt.go index 501079862c7..6971657ba89 100644 --- a/vendor/github.com/prometheus/prometheus/pkg/rulefmt/rulefmt.go +++ b/vendor/github.com/prometheus/prometheus/pkg/rulefmt/rulefmt.go @@ -21,7 +21,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" - yaml "gopkg.in/yaml.v2" + yaml "gopkg.in/yaml.v3" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" @@ -33,11 +33,24 @@ type Error struct { Group string Rule int RuleName string - Err error + Err WrappedError +} + +// WrappedError wraps error with the yaml node which can be used to represent +// the line and column numbers of the error. +type WrappedError struct { + err error + node *yaml.Node + nodeAlt *yaml.Node } func (err *Error) Error() string { - return errors.Wrapf(err.Err, "group %q, rule %d, %q", err.Group, err.Rule, err.RuleName).Error() + if err.Err.nodeAlt != nil { + return errors.Wrapf(err.Err.err, "%d:%d: %d:%d: group %q, rule %d, %q", err.Err.node.Line, err.Err.node.Column, err.Err.nodeAlt.Line, err.Err.nodeAlt.Column, err.Group, err.Rule, err.RuleName).Error() + } else if err.Err.node != nil { + return errors.Wrapf(err.Err.err, "%d:%d: group %q, rule %d, %q", err.Err.node.Line, err.Err.node.Column, err.Group, err.Rule, err.RuleName).Error() + } + return errors.Wrapf(err.Err.err, "group %q, rule %d, %q", err.Group, err.Rule, err.RuleName).Error() } // RuleGroups is a set of rule groups that are typically exposed in a file. @@ -45,28 +58,32 @@ type RuleGroups struct { Groups []RuleGroup `yaml:"groups"` } +type ruleGroups struct { + Groups []yaml.Node `yaml:"groups"` +} + // Validate validates all rules in the rule groups. -func (g *RuleGroups) Validate() (errs []error) { +func (g *RuleGroups) Validate(node ruleGroups) (errs []error) { set := map[string]struct{}{} - for _, g := range g.Groups { + for j, g := range g.Groups { if g.Name == "" { - errs = append(errs, errors.Errorf("Groupname should not be empty")) + errs = append(errs, errors.Errorf("%d:%d: Groupname should not be empty", node.Groups[j].Line, node.Groups[j].Column)) } if _, ok := set[g.Name]; ok { errs = append( errs, - errors.Errorf("groupname: \"%s\" is repeated in the same file", g.Name), + errors.Errorf("%d:%d: groupname: \"%s\" is repeated in the same file", node.Groups[j].Line, node.Groups[j].Column, g.Name), ) } set[g.Name] = struct{}{} for i, r := range g.Rules { - for _, err := range r.Validate() { - var ruleName string - if r.Alert != "" { + for _, node := range r.Validate() { + var ruleName yaml.Node + if r.Alert.Value != "" { ruleName = r.Alert } else { ruleName = r.Record @@ -74,8 +91,8 @@ func (g *RuleGroups) Validate() (errs []error) { errs = append(errs, &Error{ Group: g.Name, Rule: i, - RuleName: ruleName, - Err: err, + RuleName: ruleName.Value, + Err: node, }) } } @@ -88,7 +105,7 @@ func (g *RuleGroups) Validate() (errs []error) { type RuleGroup struct { Name string `yaml:"name"` Interval model.Duration `yaml:"interval,omitempty"` - Rules []Rule `yaml:"rules"` + Rules []RuleNode `yaml:"rules"` } // Rule describes an alerting or recording rule. @@ -101,55 +118,104 @@ type Rule struct { Annotations map[string]string `yaml:"annotations,omitempty"` } +// RuleNode adds yaml.v3 layer to support line and column outputs for invalid rules. +type RuleNode struct { + Record yaml.Node `yaml:"record,omitempty"` + Alert yaml.Node `yaml:"alert,omitempty"` + Expr yaml.Node `yaml:"expr"` + For model.Duration `yaml:"for,omitempty"` + Labels map[string]string `yaml:"labels,omitempty"` + Annotations map[string]string `yaml:"annotations,omitempty"` +} + // Validate the rule and return a list of encountered errors. -func (r *Rule) Validate() (errs []error) { - if r.Record != "" && r.Alert != "" { - errs = append(errs, errors.Errorf("only one of 'record' and 'alert' must be set")) +func (r *RuleNode) Validate() (nodes []WrappedError) { + if r.Record.Value != "" && r.Alert.Value != "" { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("only one of 'record' and 'alert' must be set"), + node: &r.Record, + nodeAlt: &r.Alert, + }) } - if r.Record == "" && r.Alert == "" { - errs = append(errs, errors.Errorf("one of 'record' or 'alert' must be set")) + if r.Record.Value == "" && r.Alert.Value == "" { + if r.Record.Value == "0" { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("one of 'record' or 'alert' must be set"), + node: &r.Alert, + }) + } else { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("one of 'record' or 'alert' must be set"), + node: &r.Record, + }) + } } - if r.Expr == "" { - errs = append(errs, errors.Errorf("field 'expr' must be set in rule")) - } else if _, err := promql.ParseExpr(r.Expr); err != nil { - errs = append(errs, errors.Wrap(err, "could not parse expression")) + if r.Expr.Value == "" { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("field 'expr' must be set in rule"), + node: &r.Expr, + }) + } else if _, err := promql.ParseExpr(r.Expr.Value); err != nil { + nodes = append(nodes, WrappedError{ + err: errors.Wrapf(err, "could not parse expression"), + node: &r.Expr, + }) } - if r.Record != "" { + if r.Record.Value != "" { if len(r.Annotations) > 0 { - errs = append(errs, errors.Errorf("invalid field 'annotations' in recording rule")) + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid field 'annotations' in recording rule"), + node: &r.Record, + }) } if r.For != 0 { - errs = append(errs, errors.Errorf("invalid field 'for' in recording rule")) + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid field 'for' in recording rule"), + node: &r.Record, + }) } - if !model.IsValidMetricName(model.LabelValue(r.Record)) { - errs = append(errs, errors.Errorf("invalid recording rule name: %s", r.Record)) + if !model.IsValidMetricName(model.LabelValue(r.Record.Value)) { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid recording rule name: %s", r.Record.Value), + node: &r.Record, + }) } } for k, v := range r.Labels { if !model.LabelName(k).IsValid() { - errs = append(errs, errors.Errorf("invalid label name: %s", k)) + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid label name: %s", k), + }) } if !model.LabelValue(v).IsValid() { - errs = append(errs, errors.Errorf("invalid label value: %s", v)) + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid label value: %s", v), + }) } } for k := range r.Annotations { if !model.LabelName(k).IsValid() { - errs = append(errs, errors.Errorf("invalid annotation name: %s", k)) + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid annotation name: %s", k), + }) } } - return append(errs, testTemplateParsing(r)...) + for _, err := range testTemplateParsing(r) { + nodes = append(nodes, WrappedError{err: err}) + } + + return } // testTemplateParsing checks if the templates used in labels and annotations // of the alerting rules are parsed correctly. -func testTemplateParsing(rl *Rule) (errs []error) { - if rl.Alert == "" { +func testTemplateParsing(rl *RuleNode) (errs []error) { + if rl.Alert.Value == "" { // Not an alerting rule. return errs } @@ -165,7 +231,7 @@ func testTemplateParsing(rl *Rule) (errs []error) { tmpl := template.NewTemplateExpander( context.TODO(), strings.Join(append(defs, text), ""), - "__alert_"+rl.Alert, + "__alert_"+rl.Alert.Value, tmplData, model.Time(timestamp.FromTime(time.Now())), nil, @@ -195,11 +261,16 @@ func testTemplateParsing(rl *Rule) (errs []error) { // Parse parses and validates a set of rules. func Parse(content []byte) (*RuleGroups, []error) { - var groups RuleGroups - if err := yaml.UnmarshalStrict(content, &groups); err != nil { + var ( + groups RuleGroups + node ruleGroups + ) + err := yaml.Unmarshal(content, &groups) + _err := yaml.Unmarshal(content, &node) + if err != nil || _err != nil { return nil, []error{err} } - return &groups, groups.Validate() + return &groups, groups.Validate(node) } // ParseFile reads and parses rules from a file. diff --git a/vendor/github.com/prometheus/prometheus/pkg/textparse/openmetricsparse.go b/vendor/github.com/prometheus/prometheus/pkg/textparse/openmetricsparse.go index 647223a3b95..35ef1d5c466 100644 --- a/vendor/github.com/prometheus/prometheus/pkg/textparse/openmetricsparse.go +++ b/vendor/github.com/prometheus/prometheus/pkg/textparse/openmetricsparse.go @@ -90,7 +90,6 @@ type OpenMetricsParser struct { hasTS bool start int offsets []int - prev token eOffsets []int exemplar []byte @@ -233,19 +232,14 @@ func (p *OpenMetricsParser) Next() (Entry, error) { p.exemplarVal = 0 p.hasExemplarTs = false - t := p.nextToken() - defer func() { p.prev = t }() - switch t { + switch t := p.nextToken(); t { case tEofWord: if t := p.nextToken(); t != tEOF { return EntryInvalid, errors.New("unexpected data after # EOF") } return EntryInvalid, io.EOF case tEOF: - if p.prev != tEofWord { - return EntryInvalid, errors.New("data does not end with # EOF") - } - return EntryInvalid, io.EOF + return EntryInvalid, errors.New("data does not end with # EOF") case tHelp, tType, tUnit: switch t := p.nextToken(); t { case tMName: diff --git a/vendor/github.com/prometheus/prometheus/prompb/remote.pb.go b/vendor/github.com/prometheus/prometheus/prompb/remote.pb.go index 5dcb254f2c7..56fd03e9477 100644 --- a/vendor/github.com/prometheus/prometheus/prompb/remote.pb.go +++ b/vendor/github.com/prometheus/prometheus/prompb/remote.pb.go @@ -340,7 +340,7 @@ func (m *QueryResult) GetTimeseries() []*TimeSeries { // ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. // We strictly stream full series after series, optionally split by time. This means that a single frame can contain // partition of the single series, but once a new series is started to be streamed it means that no more chunks will -// be sent for previous one. +// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. type ChunkedReadResponse struct { ChunkedSeries []*ChunkedSeries `protobuf:"bytes,1,rep,name=chunked_series,json=chunkedSeries,proto3" json:"chunked_series,omitempty"` // query_index represents an index of the query from ReadRequest.queries these chunks relates to. diff --git a/vendor/github.com/prometheus/prometheus/prompb/remote.proto b/vendor/github.com/prometheus/prometheus/prompb/remote.proto index da2b06f2977..ecd8f0bb198 100644 --- a/vendor/github.com/prometheus/prometheus/prompb/remote.proto +++ b/vendor/github.com/prometheus/prometheus/prompb/remote.proto @@ -73,7 +73,7 @@ message QueryResult { // ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. // We strictly stream full series after series, optionally split by time. This means that a single frame can contain // partition of the single series, but once a new series is started to be streamed it means that no more chunks will -// be sent for previous one. +// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. message ChunkedReadResponse { repeated prometheus.ChunkedSeries chunked_series = 1; diff --git a/vendor/github.com/prometheus/prometheus/promql/ast.go b/vendor/github.com/prometheus/prometheus/promql/ast.go index 973971d2929..803c9892eb1 100644 --- a/vendor/github.com/prometheus/prometheus/promql/ast.go +++ b/vendor/github.com/prometheus/prometheus/promql/ast.go @@ -39,6 +39,9 @@ type Node interface { // String representation of the node that returns the given node when parsed // as part of a valid query. String() string + + // PositionRange returns the position of the AST Node in the query string. + PositionRange() PositionRange } // Statement is a generic interface for all statements. @@ -84,6 +87,7 @@ type AggregateExpr struct { Param Expr // Parameter used by some aggregators. Grouping []string // The labels by which to group the Vector. Without bool // Whether to drop the given labels rather than keep them. + PosRange PositionRange } // BinaryExpr represents a binary expression between two child expressions. @@ -103,18 +107,18 @@ type BinaryExpr struct { type Call struct { Func *Function // The function that was called. Args Expressions // Arguments used in the call. + + PosRange PositionRange } // MatrixSelector represents a Matrix selection. type MatrixSelector struct { - Name string - Range time.Duration - Offset time.Duration - LabelMatchers []*labels.Matcher + // It is safe to assume that this is an VectorSelector + // if the parser hasn't returned an error. + VectorSelector Expr + Range time.Duration - // The unexpanded seriesSet populated at query preparation time. - unexpandedSeriesSet storage.SeriesSet - series []storage.Series + EndPos Pos } // SubqueryExpr represents a subquery. @@ -123,22 +127,28 @@ type SubqueryExpr struct { Range time.Duration Offset time.Duration Step time.Duration + + EndPos Pos } // NumberLiteral represents a number. type NumberLiteral struct { Val float64 + + PosRange PositionRange } // ParenExpr wraps an expression so it cannot be disassembled as a consequence // of operator precedence. type ParenExpr struct { - Expr Expr + Expr Expr + PosRange PositionRange } // StringLiteral represents a string. type StringLiteral struct { - Val string + Val string + PosRange PositionRange } // UnaryExpr represents a unary operation on another expression. @@ -146,6 +156,8 @@ type StringLiteral struct { type UnaryExpr struct { Op ItemType Expr Expr + + StartPos Pos } // VectorSelector represents a Vector selection. @@ -157,6 +169,8 @@ type VectorSelector struct { // The unexpanded seriesSet populated at query preparation time. unexpandedSeriesSet storage.SeriesSet series []storage.Series + + PosRange PositionRange } func (e *AggregateExpr) Type() ValueType { return ValueTypeVector } @@ -316,10 +330,91 @@ func Children(node Node) []Node { return []Node{n.Expr} case *UnaryExpr: return []Node{n.Expr} - case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector: + case *MatrixSelector: + return []Node{n.VectorSelector} + case *NumberLiteral, *StringLiteral, *VectorSelector: // nothing to do return []Node{} default: panic(errors.Errorf("promql.Children: unhandled node type %T", node)) } } + +// PositionRange describes a position in the input string of the parser. +type PositionRange struct { + Start Pos + End Pos +} + +// mergeRanges is a helper function to merge the PositionRanges of two Nodes. +// Note that the arguments must be in the same order as they +// occur in the input string. +func mergeRanges(first Node, last Node) PositionRange { + return PositionRange{ + Start: first.PositionRange().Start, + End: last.PositionRange().End, + } +} + +// Item implements the Node interface. +// This makes it possible to call mergeRanges on them. +func (i *Item) PositionRange() PositionRange { + return PositionRange{ + Start: i.Pos, + End: i.Pos + Pos(len(i.Val)), + } +} + +func (e *AggregateExpr) PositionRange() PositionRange { + return e.PosRange +} +func (e *BinaryExpr) PositionRange() PositionRange { + return mergeRanges(e.LHS, e.RHS) +} +func (e *Call) PositionRange() PositionRange { + return e.PosRange +} +func (e *EvalStmt) PositionRange() PositionRange { + return e.Expr.PositionRange() +} +func (e Expressions) PositionRange() PositionRange { + if len(e) == 0 { + // Position undefined. + return PositionRange{ + Start: -1, + End: -1, + } + } else { + return mergeRanges(e[0], e[len(e)-1]) + } +} +func (e *MatrixSelector) PositionRange() PositionRange { + return PositionRange{ + Start: e.VectorSelector.PositionRange().Start, + End: e.EndPos, + } +} +func (e *SubqueryExpr) PositionRange() PositionRange { + return PositionRange{ + Start: e.Expr.PositionRange().Start, + End: e.EndPos, + } +} +func (e *NumberLiteral) PositionRange() PositionRange { + return e.PosRange +} +func (e *ParenExpr) PositionRange() PositionRange { + return e.PosRange +} +func (e *StringLiteral) PositionRange() PositionRange { + return e.PosRange +} +func (e *UnaryExpr) PositionRange() PositionRange { + return PositionRange{ + Start: e.StartPos, + End: e.Expr.PositionRange().End, + } +} +func (e *VectorSelector) PositionRange() PositionRange { + return e.PosRange +} diff --git a/vendor/github.com/prometheus/prometheus/promql/engine.go b/vendor/github.com/prometheus/prometheus/promql/engine.go index 6afb3d4fbde..a3321fc87b5 100644 --- a/vendor/github.com/prometheus/prometheus/promql/engine.go +++ b/vendor/github.com/prometheus/prometheus/promql/engine.go @@ -33,7 +33,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" @@ -76,6 +75,8 @@ func GetDefaultEvaluationInterval() int64 { type engineMetrics struct { currentQueries prometheus.Gauge maxConcurrentQueries prometheus.Gauge + queryLogEnabled prometheus.Gauge + queryLogFailures prometheus.Counter queryQueueTime prometheus.Summary queryPrepareTime prometheus.Summary queryInnerEval prometheus.Summary @@ -112,6 +113,13 @@ func (e ErrStorage) Error() string { return e.Err.Error() } +// QueryLogger is an interface that can be used to log all the queries logged +// by the engine. +type QueryLogger interface { + Log(...interface{}) error + Close() error +} + // A Query is derived from an a raw query string and can be run against an engine // it is associated with. type Query interface { @@ -146,6 +154,10 @@ type query struct { ng *Engine } +type queryCtx int + +var queryOrigin queryCtx + // Statement implements the Query interface. func (q *query) Statement() Statement { return q.stmt @@ -176,19 +188,9 @@ func (q *query) Exec(ctx context.Context) *Result { span.SetTag(queryTag, q.stmt.String()) } - // Log query in active log. - var queryIndex int - if q.ng.activeQueryTracker != nil { - queryIndex = q.ng.activeQueryTracker.Insert(q.q) - } - // Exec query. res, warnings, err := q.ng.exec(ctx, q) - // Delete query from active log. - if q.ng.activeQueryTracker != nil { - q.ng.activeQueryTracker.Delete(queryIndex) - } return &Result{Err: err, Value: res, Warnings: warnings} } @@ -215,7 +217,6 @@ func contextErr(err error, env string) error { type EngineOpts struct { Logger log.Logger Reg prometheus.Registerer - MaxConcurrent int MaxSamples int Timeout time.Duration ActiveQueryTracker *ActiveQueryTracker @@ -227,9 +228,10 @@ type Engine struct { logger log.Logger metrics *engineMetrics timeout time.Duration - gate *gate.Gate maxSamplesPerQuery int activeQueryTracker *ActiveQueryTracker + queryLogger QueryLogger + queryLoggerLock sync.RWMutex } // NewEngine returns a new engine. @@ -245,6 +247,18 @@ func NewEngine(opts EngineOpts) *Engine { Name: "queries", Help: "The current number of queries being executed or waiting.", }), + queryLogEnabled: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "query_log_enabled", + Help: "State of the query log.", + }), + queryLogFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "query_log_failures_total", + Help: "The number of query log failures.", + }), maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, @@ -284,12 +298,19 @@ func NewEngine(opts EngineOpts) *Engine { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), } - metrics.maxConcurrentQueries.Set(float64(opts.MaxConcurrent)) + + if t := opts.ActiveQueryTracker; t != nil { + metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent())) + } else { + metrics.maxConcurrentQueries.Set(-1) + } if opts.Reg != nil { opts.Reg.MustRegister( metrics.currentQueries, metrics.maxConcurrentQueries, + metrics.queryLogEnabled, + metrics.queryLogFailures, metrics.queryQueueTime, metrics.queryPrepareTime, metrics.queryInnerEval, @@ -298,7 +319,6 @@ func NewEngine(opts EngineOpts) *Engine { } return &Engine{ - gate: gate.New(opts.MaxConcurrent), timeout: opts.Timeout, logger: opts.Logger, metrics: metrics, @@ -307,6 +327,29 @@ func NewEngine(opts EngineOpts) *Engine { } } +// SetQueryLogger sets the query logger. +func (ng *Engine) SetQueryLogger(l QueryLogger) { + ng.queryLoggerLock.Lock() + defer ng.queryLoggerLock.Unlock() + + if ng.queryLogger != nil { + // An error closing the old file descriptor should + // not make reload fail; only log a warning. + err := ng.queryLogger.Close() + if err != nil { + level.Warn(ng.logger).Log("msg", "error while closing the previous query log file", "err", err) + } + } + + ng.queryLogger = l + + if l != nil { + ng.metrics.queryLogEnabled.Set(1) + } else { + ng.metrics.queryLogEnabled.Set(0) + } +} + // NewInstantQuery returns an evaluation query for the given expression at the given time. func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) { expr, err := ParseExpr(qs) @@ -358,6 +401,13 @@ type testStmt func(context.Context) error func (testStmt) String() string { return "test statement" } func (testStmt) stmt() {} +func (testStmt) PositionRange() PositionRange { + return PositionRange{ + Start: -1, + End: -1, + } +} + func (ng *Engine) newTestQuery(f func(context.Context) error) Query { qry := &query{ q: "test statement", @@ -372,23 +422,56 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query { // // At this point per query only one EvalStmt is evaluated. Alert and record // statements are not handled by the Engine. -func (ng *Engine) exec(ctx context.Context, q *query) (Value, storage.Warnings, error) { +func (ng *Engine) exec(ctx context.Context, q *query) (v Value, w storage.Warnings, err error) { ng.metrics.currentQueries.Inc() defer ng.metrics.currentQueries.Dec() ctx, cancel := context.WithTimeout(ctx, ng.timeout) q.cancel = cancel + defer func() { + ng.queryLoggerLock.RLock() + if l := ng.queryLogger; l != nil { + params := make(map[string]interface{}, 4) + params["query"] = q.q + if eq, ok := q.Statement().(*EvalStmt); ok { + params["start"] = formatDate(eq.Start) + params["end"] = formatDate(eq.End) + // The step provided by the user is in seconds. + params["step"] = int64(eq.Interval / (time.Second / time.Nanosecond)) + } + f := []interface{}{"params", params} + if err != nil { + f = append(f, "error", err) + } + f = append(f, "stats", stats.NewQueryStats(q.Stats())) + if origin := ctx.Value(queryOrigin); origin != nil { + for k, v := range origin.(map[string]interface{}) { + f = append(f, k, v) + } + } + if err := l.Log(f...); err != nil { + ng.metrics.queryLogFailures.Inc() + level.Error(ng.logger).Log("msg", "can't log query", "err", err) + } + } + ng.queryLoggerLock.RUnlock() + }() + execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime) defer execSpanTimer.Finish() queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) - - if err := ng.gate.Start(ctx); err != nil { - return nil, nil, contextErr(err, "query queue") + // Log query in active log. The active log guarantees that we don't run over + // MaxConcurrent queries. + if ng.activeQueryTracker != nil { + queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q) + if err != nil { + queueSpanTimer.Finish() + return nil, nil, contextErr(err, "query queue") + } + defer ng.activeQueryTracker.Delete(queryIndex) } - defer ng.gate.Done() - queueSpanTimer.Finish() // Cancel when execution is done or an error was raised. @@ -452,6 +535,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( defaultEvalInterval: GetDefaultEvaluationInterval(), logger: ng.logger, } + val, err := evaluator.Eval(s.Expr) if err != nil { return nil, warnings, err @@ -459,10 +543,17 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( evalSpanTimer.Finish() - mat, ok := val.(Matrix) - if !ok { + var mat Matrix + + switch result := val.(type) { + case Matrix: + mat = result + case String: + return result, warnings, nil + default: panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } + query.matrix = mat switch s.Expr.Type() { case ValueTypeVector: @@ -510,7 +601,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return nil, warnings, err } - // TODO(fabxc): order ensured by storage? // TODO(fabxc): where to ensure metric labels are a copy from the storage internals. sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort) sort.Sort(mat) @@ -547,8 +637,8 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev if maxOffset < n.Range+subqOffset { maxOffset = n.Range + subqOffset } - if n.Offset+n.Range+subqOffset > maxOffset { - maxOffset = n.Offset + n.Range + subqOffset + if m := n.VectorSelector.(*VectorSelector).Offset + n.Range + subqOffset; m > maxOffset { + maxOffset = m } } return nil @@ -563,6 +653,11 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev var warnings storage.Warnings + // Whenever a MatrixSelector is evaluated this variable is set to the corresponding range. + // The evaluation of the VectorSelector inside then evaluates the given range and unsets + // the variable. + var evalRange time.Duration + Inspect(s.Expr, func(node Node, path []Node) error { var set storage.SeriesSet var wrn storage.Warnings @@ -582,7 +677,16 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev switch n := node.(type) { case *VectorSelector: - params.Start = params.Start - durationMilliseconds(LookbackDelta) + if evalRange == 0 { + params.Start = params.Start - durationMilliseconds(LookbackDelta) + } else { + params.Range = durationMilliseconds(evalRange) + // For all matrix queries we want to ensure that we have (end-start) + range selected + // this way we have `range` data before the start time + params.Start = params.Start - durationMilliseconds(evalRange) + evalRange = 0 + } + params.Func = extractFuncFromPath(path) params.By, params.Grouping = extractGroupsFromPath(path) if n.Offset > 0 { @@ -600,24 +704,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev n.unexpandedSeriesSet = set case *MatrixSelector: - params.Func = extractFuncFromPath(path) - params.Range = durationMilliseconds(n.Range) - // For all matrix queries we want to ensure that we have (end-start) + range selected - // this way we have `range` data before the start time - params.Start = params.Start - durationMilliseconds(n.Range) - if n.Offset > 0 { - offsetMilliseconds := durationMilliseconds(n.Offset) - params.Start = params.Start - offsetMilliseconds - params.End = params.End - offsetMilliseconds - } - - set, wrn, err = querier.Select(params, n.LabelMatchers...) - warnings = append(warnings, wrn...) - if err != nil { - level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) - return err - } - n.unexpandedSeriesSet = set + evalRange = n.Range } return nil }) @@ -658,14 +745,7 @@ func extractGroupsFromPath(p []Node) (bool, []string) { func checkForSeriesSetExpansion(ctx context.Context, expr Expr) { switch e := expr.(type) { case *MatrixSelector: - if e.series == nil { - series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet) - if err != nil { - panic(err) - } else { - e.series = series - } - } + checkForSeriesSetExpansion(ctx, e.VectorSelector) case *VectorSelector: if e.series == nil { series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet) @@ -923,13 +1003,16 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs .. // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. func (ev *evaluator) evalSubquery(subq *SubqueryExpr) *MatrixSelector { val := ev.eval(subq).(Matrix) - ms := &MatrixSelector{ - Range: subq.Range, + vs := &VectorSelector{ Offset: subq.Offset, series: make([]storage.Series, 0, len(val)), } + ms := &MatrixSelector{ + Range: subq.Range, + VectorSelector: vs, + } for _, s := range val { - ms.series = append(ms.series, NewStorageSeries(s)) + vs.series = append(vs.series, NewStorageSeries(s)) } return ms } @@ -945,6 +1028,7 @@ func (ev *evaluator) eval(expr Expr) Value { switch e := expr.(type) { case *AggregateExpr: + unwrapParenExpr(&e.Param) if s, ok := e.Param.(*StringLiteral); ok { return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh) @@ -974,7 +1058,9 @@ func (ev *evaluator) eval(expr Expr) Value { // Check if the function has a matrix argument. var matrixArgIndex int var matrixArg bool - for i, a := range e.Args { + for i := range e.Args { + unwrapParenExpr(&e.Args[i]) + a := e.Args[i] if _, ok := a.(*MatrixSelector); ok { matrixArgIndex = i matrixArg = true @@ -1009,9 +1095,11 @@ func (ev *evaluator) eval(expr Expr) Value { } sel := e.Args[matrixArgIndex].(*MatrixSelector) + selVS := sel.VectorSelector.(*VectorSelector) + checkForSeriesSetExpansion(ev.ctx, sel) - mat := make(Matrix, 0, len(sel.series)) // Output matrix. - offset := durationMilliseconds(sel.Offset) + mat := make(Matrix, 0, len(selVS.series)) // Output matrix. + offset := durationMilliseconds(selVS.Offset) selRange := durationMilliseconds(sel.Range) stepRange := selRange if stepRange > ev.interval { @@ -1024,17 +1112,17 @@ func (ev *evaluator) eval(expr Expr) Value { enh := &EvalNodeHelper{out: make(Vector, 0, 1)} // Process all the calls for one time series at a time. it := storage.NewBuffer(selRange) - for i, s := range sel.series { + for i, s := range selVS.series { points = points[:0] it.Reset(s.Iterator()) ss := Series{ // For all range vector functions, the only change to the // output labels is dropping the metric name so just do // it once here. - Metric: dropMetricName(sel.series[i].Labels()), + Metric: dropMetricName(selVS.series[i].Labels()), Points: getPointSlice(numSteps), } - inMatrix[0].Metric = sel.series[i].Labels() + inMatrix[0].Metric = selVS.series[i].Labels() for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { step++ // Set the non-matrix arguments. @@ -1247,6 +1335,8 @@ func (ev *evaluator) eval(expr Expr) Value { res := newEv.eval(e.Expr) ev.currentSamples = newEv.currentSamples return res + case *StringLiteral: + return String{V: e.Val, T: ev.startTimestamp} } panic(errors.Errorf("unhandled expression of type: %T", expr)) @@ -1332,21 +1422,25 @@ func putPointSlice(p []Point) { func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { checkForSeriesSetExpansion(ev.ctx, node) + vs := node.VectorSelector.(*VectorSelector) + var ( - offset = durationMilliseconds(node.Offset) + offset = durationMilliseconds(vs.Offset) maxt = ev.startTimestamp - offset mint = maxt - durationMilliseconds(node.Range) - matrix = make(Matrix, 0, len(node.series)) + matrix = make(Matrix, 0, len(vs.series)) ) it := storage.NewBuffer(durationMilliseconds(node.Range)) - for i, s := range node.series { + series := vs.series + + for i, s := range series { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } it.Reset(s.Iterator()) ss := Series{ - Metric: node.series[i].Labels(), + Metric: series[i].Labels(), } ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16)) @@ -2000,6 +2094,11 @@ func shouldDropMetricName(op ItemType) bool { } } +// NewOriginContext returns a new context with data about the origin attached. +func NewOriginContext(ctx context.Context, data map[string]interface{}) context.Context { + return context.WithValue(ctx, queryOrigin, data) +} + // documentedType returns the internal type to the equivalent // user facing terminology as defined in the documentation. func documentedType(t ValueType) string { @@ -2012,3 +2111,18 @@ func documentedType(t ValueType) string { return string(t) } } + +func formatDate(t time.Time) string { + return t.UTC().Format("2006-01-02T15:04:05.000Z07:00") +} + +// unwrapParenExpr does the AST equivalent of removing parentheses around a expression. +func unwrapParenExpr(e *Expr) { + for { + if p, ok := (*e).(*ParenExpr); ok { + *e = p.Expr + } else { + break + } + } +} diff --git a/vendor/github.com/prometheus/prometheus/promql/functions.go b/vendor/github.com/prometheus/prometheus/promql/functions.go index 1475bb73869..4765965ab3e 100644 --- a/vendor/github.com/prometheus/prometheus/promql/functions.go +++ b/vendor/github.com/prometheus/prometheus/promql/functions.go @@ -64,11 +64,12 @@ func funcTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { // the result as either per-second (if isRate is true) or overall. func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector { ms := args[0].(*MatrixSelector) + vs := ms.VectorSelector.(*VectorSelector) var ( samples = vals[0].(Matrix)[0] - rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset) - rangeEnd = enh.ts - durationMilliseconds(ms.Offset) + rangeStart = enh.ts - durationMilliseconds(ms.Range+vs.Offset) + rangeEnd = enh.ts - durationMilliseconds(vs.Offset) ) // No sense in trying to compute a rate without at least two points. Drop @@ -1243,7 +1244,7 @@ func createLabelsForAbsentFunction(expr Expr) labels.Labels { case *VectorSelector: lm = n.LabelMatchers case *MatrixSelector: - lm = n.LabelMatchers + lm = n.VectorSelector.(*VectorSelector).LabelMatchers default: return m } diff --git a/vendor/github.com/prometheus/prometheus/promql/generated_parser.y b/vendor/github.com/prometheus/prometheus/promql/generated_parser.y index aae970d799b..0cb98df7553 100644 --- a/vendor/github.com/prometheus/prometheus/promql/generated_parser.y +++ b/vendor/github.com/prometheus/prometheus/promql/generated_parser.y @@ -18,6 +18,7 @@ import ( "math" "sort" "strconv" + "time" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/value" @@ -35,209 +36,265 @@ import ( series []sequenceValue uint uint64 float float64 + duration time.Duration } -%token ERROR -%token EOF -%token COMMENT -%token IDENTIFIER -%token METRIC_IDENTIFIER -%token LEFT_PAREN -%token RIGHT_PAREN -%token LEFT_BRACE -%token RIGHT_BRACE -%token LEFT_BRACKET -%token RIGHT_BRACKET -%token COMMA -%token ASSIGN -%token COLON -%token SEMICOLON -%token STRING -%token NUMBER -%token DURATION -%token BLANK -%token TIMES -%token SPACE +%token +ASSIGN +BLANK +COLON +COMMA +COMMENT +DURATION +EOF +ERROR +IDENTIFIER +LEFT_BRACE +LEFT_BRACKET +LEFT_PAREN +METRIC_IDENTIFIER +NUMBER +RIGHT_BRACE +RIGHT_BRACKET +RIGHT_PAREN +SEMICOLON +SPACE +STRING +TIMES -%token operatorsStart // Operators. -%token SUB -%token ADD -%token MUL -%token MOD -%token DIV -%token LAND -%token LOR -%token LUNLESS -%token EQL -%token NEQ -%token LTE -%token LSS -%token GTE -%token GTR -%token EQL_REGEX -%token NEQ_REGEX -%token POW +%token operatorsStart +%token +ADD +DIV +EQL +EQL_REGEX +GTE +GTR +LAND +LOR +LSS +LTE +LUNLESS +MOD +MUL +NEQ +NEQ_REGEX +POW +SUB %token operatorsEnd -%token aggregatorsStart // Aggregators. -%token AVG -%token COUNT -%token SUM -%token MIN -%token MAX -%token STDDEV -%token STDVAR -%token TOPK -%token BOTTOMK -%token COUNT_VALUES -%token QUANTILE +%token aggregatorsStart +%token +AVG +BOTTOMK +COUNT +COUNT_VALUES +MAX +MIN +QUANTILE +STDDEV +STDVAR +SUM +TOPK %token aggregatorsEnd -%token keywordsStart // Keywords. -%token OFFSET -%token BY -%token WITHOUT -%token ON -%token IGNORING -%token GROUP_LEFT -%token GROUP_RIGHT -%token BOOL - +%token keywordsStart +%token +BOOL +BY +GROUP_LEFT +GROUP_RIGHT +IGNORING +OFFSET +ON +WITHOUT %token keywordsEnd -%token startSymbolsStart // Start symbols for the generated parser. -%token START_LABELS -%token START_METRIC -%token START_GROUPING_LABELS -%token START_SERIES_DESCRIPTION +%token startSymbolsStart +%token +START_METRIC +START_SERIES_DESCRIPTION +START_EXPRESSION +START_METRIC_SELECTOR %token startSymbolsEnd -%type label_matchers label_match_list + +// Type definitions for grammar rules. +%type label_match_list %type label_matcher -%type match_op metric_identifier grouping_label maybe_label +%type aggregate_op grouping_label match_op maybe_label metric_identifier unary_op -%type label_set_list label_set metric -%type