Skip to content

Commit 311a30f

Browse files
committed
Add an experimental flag to enable experimental promQL functions
Signed-off-by: SungJin1212 <[email protected]>
1 parent fb0561e commit 311a30f

File tree

12 files changed

+181
-9
lines changed

12 files changed

+181
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1717
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
1818
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
19+
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
1920
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
2021
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
2122
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255

docs/blocks-storage/querier.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,10 @@ querier:
253253
# evaluation like at Query Frontend or Ruler.
254254
# CLI flag: -querier.ignore-max-query-length
255255
[ignore_max_query_length: <boolean> | default = false]
256+
257+
# [Experimental] If true, experimental promQL functions are enabled.
258+
# CLI flag: -querier.enable-promql-experimental-functions
259+
[enable_promql_experimental_functions: <boolean> | default = false]
256260
```
257261
258262
### `blocks_storage_config`

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3997,6 +3997,10 @@ store_gateway_client:
39973997
# like at Query Frontend or Ruler.
39983998
# CLI flag: -querier.ignore-max-query-length
39993999
[ignore_max_query_length: <boolean> | default = false]
4000+
4001+
# [Experimental] If true, experimental promQL functions are enabled.
4002+
# CLI flag: -querier.enable-promql-experimental-functions
4003+
[enable_promql_experimental_functions: <boolean> | default = false]
40004004
```
40014005
40024006
### `query_frontend_config`

integration/query_fuzz_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,123 @@ func init() {
5252
}
5353
}
5454

55+
func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
56+
prometheusLatestImage := "quay.io/prometheus/prometheus:v2.55.1"
57+
s, err := e2e.NewScenario(networkName)
58+
require.NoError(t, err)
59+
defer s.Close()
60+
61+
// Start dependencies.
62+
consul := e2edb.NewConsulWithName("consul")
63+
require.NoError(t, s.StartAndWaitReady(consul))
64+
65+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
66+
flags := mergeFlags(
67+
baseFlags,
68+
map[string]string{
69+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
70+
"-blocks-storage.tsdb.block-ranges-period": "2h",
71+
"-blocks-storage.tsdb.ship-interval": "1h",
72+
"-blocks-storage.bucket-store.sync-interval": "1s",
73+
"-blocks-storage.tsdb.retention-period": "24h",
74+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
75+
"-querier.query-store-for-labels-enabled": "true",
76+
// Ingester.
77+
"-ring.store": "consul",
78+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
79+
// Distributor.
80+
"-distributor.replication-factor": "1",
81+
// Store-gateway.
82+
"-store-gateway.sharding-enabled": "false",
83+
// alert manager
84+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
85+
"-frontend.query-vertical-shard-size": "1",
86+
"-frontend.max-cache-freshness": "1m",
87+
// enable experimental promQL funcs
88+
"-querier.enable-promql-experimental-functions": "true",
89+
},
90+
)
91+
// make alert manager config dir
92+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
93+
94+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
95+
require.NoError(t, s.StartAndWaitReady(minio))
96+
97+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
98+
require.NoError(t, s.StartAndWaitReady(cortex))
99+
100+
// Wait until Cortex replicas have updated the ring state.
101+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
102+
103+
now := time.Now()
104+
start := now.Add(-time.Hour * 2)
105+
end := now.Add(-time.Hour)
106+
numSeries := 10
107+
numSamples := 60
108+
lbls := make([]labels.Labels, 0, numSeries*2)
109+
scrapeInterval := time.Minute
110+
statusCodes := []string{"200", "400", "404", "500", "502"}
111+
for i := 0; i < numSeries; i++ {
112+
lbls = append(lbls, labels.Labels{
113+
{Name: labels.MetricName, Value: "test_series_a"},
114+
{Name: "job", Value: "test"},
115+
{Name: "series", Value: strconv.Itoa(i % 3)},
116+
{Name: "status_code", Value: statusCodes[i%5]},
117+
})
118+
119+
lbls = append(lbls, labels.Labels{
120+
{Name: labels.MetricName, Value: "test_series_b"},
121+
{Name: "job", Value: "test"},
122+
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
123+
{Name: "status_code", Value: statusCodes[(i+1)%5]},
124+
})
125+
}
126+
127+
ctx := context.Background()
128+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
129+
130+
dir := filepath.Join(s.SharedDir(), "data")
131+
err = os.MkdirAll(dir, os.ModePerm)
132+
require.NoError(t, err)
133+
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
134+
require.NoError(t, err)
135+
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
136+
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
137+
require.NoError(t, err)
138+
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
139+
require.NoError(t, err)
140+
141+
// Wait for querier and store to sync blocks.
142+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
143+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
144+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics))
145+
146+
c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
147+
require.NoError(t, err)
148+
149+
err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
150+
require.NoError(t, err)
151+
prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{
152+
"--enable-feature": "promql-experimental-functions",
153+
})
154+
require.NoError(t, s.StartAndWaitReady(prom))
155+
156+
c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
157+
require.NoError(t, err)
158+
159+
waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)
160+
161+
opts := []promqlsmith.Option{
162+
promqlsmith.WithEnableOffset(true),
163+
promqlsmith.WithEnableAtModifier(true),
164+
promqlsmith.WithEnabledFunctions(enabledFunctions),
165+
promqlsmith.WithEnableExperimentalPromQLFunctions(true),
166+
}
167+
ps := promqlsmith.New(rnd, lbls, opts...)
168+
169+
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000)
170+
}
171+
55172
func TestDisableChunkTrimmingFuzz(t *testing.T) {
56173
s, err := e2e.NewScenario(networkName)
57174
require.NoError(t, err)
@@ -1410,6 +1527,15 @@ func runQueryFuzzTestCases(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2
14101527
func isValidQuery(generatedQuery parser.Expr, maxDepth int) bool {
14111528
isValid := true
14121529
currentDepth := 0
1530+
// TODO(SungJin1212): Test limitk, limit_ratio
1531+
if strings.Contains(generatedQuery.String(), "limitk") {
1532+
// current skip the limitk
1533+
return false
1534+
}
1535+
if strings.Contains(generatedQuery.String(), "limit_ratio") {
1536+
// current skip the limit_ratio
1537+
return false
1538+
}
14131539
parser.Inspect(generatedQuery, func(node parser.Node, path []parser.Node) error {
14141540
if currentDepth > maxDepth {
14151541
isValid = false

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
508508
t.Cfg.Querier.DefaultEvaluationInterval,
509509
t.Cfg.Querier.MaxSubQuerySteps,
510510
t.Cfg.Querier.LookbackDelta,
511+
t.Cfg.Querier.EnablePromQLExperimentalFunctions,
511512
)
512513

513514
return services.NewIdleService(nil, func(_ error) error {

pkg/querier/querier.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/prometheus/common/model"
1717
"github.com/prometheus/prometheus/model/labels"
1818
"github.com/prometheus/prometheus/promql"
19+
"github.com/prometheus/prometheus/promql/parser"
1920
"github.com/prometheus/prometheus/storage"
2021
"github.com/prometheus/prometheus/tsdb/chunkenc"
2122
"github.com/prometheus/prometheus/util/annotations"
@@ -89,7 +90,8 @@ type Config struct {
8990
ThanosEngine bool `yaml:"thanos_engine"`
9091

9192
// Ignore max query length check at Querier.
92-
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
93+
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
94+
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`
9395
}
9496

9597
var (
@@ -132,6 +134,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
132134
f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
133135
f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
134136
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
137+
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
135138
}
136139

137140
// Validate the config
@@ -204,6 +207,9 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
204207
})
205208
maxConcurrentMetric.Set(float64(cfg.MaxConcurrent))
206209

210+
// set EnableExperimentalFunctions
211+
parser.EnableExperimentalFunctions = cfg.EnablePromQLExperimentalFunctions
212+
207213
var queryEngine promql.QueryEngine
208214
opts := promql.EngineOpts{
209215
Logger: logger,

pkg/querier/querier_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,6 @@ func TestLimits(t *testing.T) {
542542
}
543543

544544
func TestQuerier(t *testing.T) {
545-
t.Parallel()
546545
var cfg Config
547546
flagext.DefaultValues(&cfg)
548547
const chunks = 24
@@ -610,7 +609,6 @@ func TestQuerierMetric(t *testing.T) {
610609
}
611610

612611
func TestNoHistoricalQueryToIngester(t *testing.T) {
613-
t.Parallel()
614612
testCases := []struct {
615613
name string
616614
mint, maxt time.Time
@@ -711,7 +709,6 @@ func TestNoHistoricalQueryToIngester(t *testing.T) {
711709
}
712710

713711
func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) {
714-
t.Parallel()
715712
const engineLookbackDelta = 5 * time.Minute
716713

717714
now := time.Now()
@@ -929,7 +926,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Series(t *testing.T) {
929926
}
930927

931928
func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Labels(t *testing.T) {
932-
t.Parallel()
933929
const maxQueryLength = 30 * 24 * time.Hour
934930
tests := map[string]struct {
935931
startTime time.Time
@@ -1002,7 +998,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Labels(t *testing.T) {
1002998
}
1003999

10041000
func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
1005-
t.Parallel()
10061001
const (
10071002
engineLookbackDelta = 5 * time.Minute
10081003
thirtyDays = 30 * 24 * time.Hour
@@ -1511,7 +1506,6 @@ func (q *mockStoreQuerier) Close() error {
15111506
}
15121507

15131508
func TestShortTermQueryToLTS(t *testing.T) {
1514-
t.Parallel()
15151509
testCases := []struct {
15161510
name string
15171511
mint, maxt time.Time

pkg/querier/tripperware/merge.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,9 +288,9 @@ func sortPlanForQuery(q string) (sortPlan, error) {
288288
if err != nil {
289289
return 0, err
290290
}
291-
// Check if the root expression is topk or bottomk
291+
// Check if the root expression is topk, bottomk, limitk, or limit_ratio
292292
if aggr, ok := expr.(*promqlparser.AggregateExpr); ok {
293-
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK {
293+
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK || aggr.Op == promqlparser.LIMITK || aggr.Op == promqlparser.LIMIT_RATIO {
294294
return mergeOnly, nil
295295
}
296296
}
@@ -303,6 +303,12 @@ func sortPlanForQuery(q string) (sortPlan, error) {
303303
if n.Func.Name == "sort_desc" {
304304
sortDesc = true
305305
}
306+
if n.Func.Name == "sort_by_label" {
307+
sortAsc = true
308+
}
309+
if n.Func.Name == "sort_by_label_desc" {
310+
sortDesc = true
311+
}
306312
}
307313
}
308314
return sortAsc, sortDesc

pkg/querier/tripperware/merge_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"testing"
55

66
"github.com/prometheus/prometheus/model/labels"
7+
promqlparser "github.com/prometheus/prometheus/promql/parser"
78
"github.com/stretchr/testify/assert"
89

910
"github.com/cortexproject/cortex/pkg/cortexpb"
@@ -618,6 +619,16 @@ func Test_sortPlanForQuery(t *testing.T) {
618619
expectedPlan: mergeOnly,
619620
err: false,
620621
},
622+
{
623+
query: "limitk(10, up)",
624+
expectedPlan: mergeOnly,
625+
err: false,
626+
},
627+
{
628+
query: "limit_ratio(0.1, up)",
629+
expectedPlan: mergeOnly,
630+
err: false,
631+
},
621632
{
622633
query: "1 + topk(10, up)",
623634
expectedPlan: sortByLabels,
@@ -633,6 +644,16 @@ func Test_sortPlanForQuery(t *testing.T) {
633644
expectedPlan: sortByValuesAsc,
634645
err: false,
635646
},
647+
{
648+
query: "1 + sort_by_label_desc(sum by (job) (up) )",
649+
expectedPlan: sortByValuesDesc,
650+
err: false,
651+
},
652+
{
653+
query: "sort_by_label(topk by (job) (10, up))",
654+
expectedPlan: sortByValuesAsc,
655+
err: false,
656+
},
636657
{
637658
query: "topk(5, up) by (job) + sort_desc(up)",
638659
expectedPlan: sortByValuesDesc,
@@ -652,6 +673,7 @@ func Test_sortPlanForQuery(t *testing.T) {
652673

653674
for _, tc := range tc {
654675
t.Run(tc.query, func(t *testing.T) {
676+
promqlparser.EnableExperimentalFunctions = true
655677
p, err := sortPlanForQuery(tc.query)
656678
if tc.err {
657679
assert.Error(t, err)

pkg/querier/tripperware/queryrange/query_range_middlewares_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func TestRoundTrip(t *testing.T) {
8080
time.Minute,
8181
0,
8282
0,
83+
false,
8384
)
8485

8586
for i, tc := range []struct {

pkg/querier/tripperware/roundtrip.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/opentracing/opentracing-go"
2828
"github.com/prometheus/client_golang/prometheus"
2929
"github.com/prometheus/client_golang/prometheus/promauto"
30+
"github.com/prometheus/prometheus/promql/parser"
3031
"github.com/thanos-io/thanos/pkg/querysharding"
3132
"github.com/weaveworks/common/httpgrpc"
3233
"github.com/weaveworks/common/user"
@@ -104,7 +105,12 @@ func NewQueryTripperware(
104105
defaultSubQueryInterval time.Duration,
105106
maxSubQuerySteps int64,
106107
lookbackDelta time.Duration,
108+
enablePromQLExperimentalFunctions bool,
107109
) Tripperware {
110+
111+
// set EnableExperimentalFunctions
112+
parser.EnableExperimentalFunctions = enablePromQLExperimentalFunctions
113+
108114
// Per tenant query metrics.
109115
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
110116
Name: "cortex_query_frontend_queries_total",

pkg/querier/tripperware/roundtrip_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ func TestRoundTrip(t *testing.T) {
221221
time.Minute,
222222
tc.maxSubQuerySteps,
223223
0,
224+
false,
224225
)
225226
resp, err := tw(downstream).RoundTrip(req)
226227
if tc.expectedErr == nil {

0 commit comments

Comments
 (0)