
Commit 0e39bd9

Adding support to filter rules
Signed-off-by: Anand Rajagopal <[email protected]>
1 parent 03ce53f commit 0e39bd9

7 files changed, +278 -36 lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
@@ -18,6 +18,7 @@
 * [FEATURE] Querier/StoreGateway: Allow the tenant shard sizes to be a percent of total instances. #5393
 * [FEATURE] Added the flag `-alertmanager.api-concurrency` to configure alert manager api concurrency limit. #5412
 * [FEATURE] Store Gateway: Add `-store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown` to skip unregistering instance from the ring in shutdown. #5421
+* [FEATURE] Ruler: Support for filtering rules in the API. #5417
 * [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
 * [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
 * [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
@@ -32,7 +33,6 @@
 * [ENHANCEMENT] Store Gateway: Add new metrics `cortex_bucket_store_sent_chunk_size_bytes`, `cortex_bucket_store_postings_size_bytes` and `cortex_bucket_store_empty_postings_total`. #5397
 * [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404
 * [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
-* [ENHANCEMENT] Ruler: Support for filtering rules. #5417
 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
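
For context, the changelog entry above refers to new query parameters on the ruler's Prometheus-compatible rules endpoint. The sketch below shows one way to exercise them, using the endpoint path and parameter names that appear in the e2e client changes in this commit (`file[]`, `rule_group[]`, `rule_name[]`, `type`); the ruler address, port and tenant ID are placeholders, not values taken from this commit.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "net/url"
)

func main() {
    // Build the filter query string. The list-valued parameters
    // (file[], rule_group[], rule_name[]) may be repeated.
    params := url.Values{}
    params.Add("file[]", "test3")       // namespace filter
    params.Add("rule_name[]", "rule_3") // rule name filter
    params.Add("type", "alert")         // only alerting rules

    // Placeholder ruler address; the path matches the one used by the e2e client.
    req, err := http.NewRequest("GET", "http://localhost:8080/api/prom/api/v1/rules?"+params.Encode(), nil)
    if err != nil {
        panic(err)
    }
    req.Header.Set("X-Scope-OrgID", "user-1") // placeholder tenant ID

    res, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer res.Body.Close()

    body, _ := io.ReadAll(res.Body)
    fmt.Println(res.Status, string(body))
}
```

The integration test added below (`TestRulerAPISharding`) exercises the same parameters through the e2e client.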

docs/contributing/how-integration-tests-work.md

Lines changed: 4 additions & 4 deletions
@@ -20,13 +20,13 @@ This will locally build the `quay.io/cortexproject/cortex:latest` image used by
 Once the Docker image is built, you can run integration tests:
 
 ```
-go test -v -tags=integration,requires_docker,integration_alertmanager,integration_memberlist,integration_querier,integration_ruler,integration_query_fuzz ./integration/...
+go test -v -tags=requires_docker ./integration/...
 ```
 
-If you want to run a single test you can use a filter. For example, to only run `TestRulerMetricsWhenIngesterFails`:
+If you want to run a single test you can use a filter. For example, to only run `TestChunksStorageAllIndexBackends`:
 
 ```
-go test -v -tags=integration,requires_docker,integration_ruler ./integration/ -run "^TestRulerMetricsWhenIngesterFails$" -count=1
+go test -v -tags=requires_docker ./integration -run "^TestChunksStorageAllIndexBackends$"
 ```
 
 ### Supported environment variables
@@ -46,4 +46,4 @@ Integration tests have `requires_docker` tag (`// +build requires_docker` line f
 
 ## Isolation
 
-Each integration test runs in isolation. For each integration test, we do create a Docker network, start Cortex and its dependencies containers, push/query series to/from Cortex and run assertions on it. Once the test has done, both the Docker network and containers are terminated and deleted.
+Each integration test runs in isolation. For each integration test, we do create a Docker network, start Cortex and its dependencies containers, push/query series to/from Cortex and run assertions on it. Once the test has done, both the Docker network and containers are terminated and deleted.

integration/e2ecortex/client.go

Lines changed: 63 additions & 0 deletions
@@ -361,6 +361,69 @@ type ServerStatus struct {
     } `json:"data"`
 }
 
+type RuleFilter struct {
+    Namespaces     []string
+    RuleGroupNames []string
+    RuleNames      []string
+    RuleType       string
+}
+
+func addQueryParams(urlValues url.Values, paramName string, params ...string) {
+    for _, paramValue := range params {
+        urlValues.Add(paramName, paramValue)
+    }
+}
+
+// GetPrometheusRulesWithFilter fetches the rules from the Prometheus endpoint /api/v1/rules.
+func (c *Client) GetPrometheusRulesWithFilter(filter RuleFilter) ([]*ruler.RuleGroup, error) {
+    // Create HTTP request
+
+    req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
+    if err != nil {
+        return nil, err
+    }
+    req.Header.Set("X-Scope-OrgID", c.orgID)
+
+    urlValues := req.URL.Query()
+    addQueryParams(urlValues, "file[]", filter.Namespaces...)
+    addQueryParams(urlValues, "rule_name[]", filter.RuleNames...)
+    addQueryParams(urlValues, "rule_group[]", filter.RuleGroupNames...)
+    addQueryParams(urlValues, "type", filter.RuleType)
+    req.URL.RawQuery = urlValues.Encode()
+
+    ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+    defer cancel()
+
+    // Execute HTTP request
+    res, err := c.httpClient.Do(req.WithContext(ctx))
+    if err != nil {
+        return nil, err
+    }
+    defer res.Body.Close()
+
+    body, err := io.ReadAll(res.Body)
+    if err != nil {
+        return nil, err
+    }
+
+    // Decode the response.
+    type response struct {
+        Status string              `json:"status"`
+        Data   ruler.RuleDiscovery `json:"data"`
+    }
+
+    decoded := &response{}
+    if err := json.Unmarshal(body, decoded); err != nil {
+        return nil, err
+    }
+
+    if decoded.Status != "success" {
+        return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
+    }
+
+    return decoded.Data.RuleGroups, nil
+}
+
 // GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
 func (c *Client) GetPrometheusRules() ([]*ruler.RuleGroup, error) {
     // Create HTTP request
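
A minimal sketch of how the new `GetPrometheusRulesWithFilter` helper might be called from an integration test, assuming a ruler started through the e2e scenario as in `TestRulerAPISharding` below; the ruler endpoint string here is a placeholder.

```go
package integration

import (
    "testing"

    "github.com/stretchr/testify/require"

    "github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestRulesFilteringSketch(t *testing.T) {
    // In the real tests the ruler address comes from the scenario
    // (e.g. ruler1.HTTPEndpoint()); "localhost:8080" is a placeholder.
    c, err := e2ecortex.NewClient("", "", "", "localhost:8080", "user-1")
    require.NoError(t, err)

    // Only alerting rules from the "test3" namespace.
    groups, err := c.GetPrometheusRulesWithFilter(e2ecortex.RuleFilter{
        Namespaces: []string{"test3"},
        RuleType:   "alert",
    })
    require.NoError(t, err)
    t.Logf("matched %d rule groups", len(groups))
}
```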

integration/ruler_test.go

Lines changed: 160 additions & 0 deletions
@@ -9,6 +9,7 @@ import (
     "crypto/x509/pkix"
     "fmt"
     "math"
+    "math/rand"
     "net/http"
     "os"
     "path/filepath"
@@ -17,6 +18,8 @@ import (
     "testing"
     "time"
 
+    "github.com/cortexproject/cortex/pkg/ruler"
+
     "github.com/cortexproject/cortex/pkg/storage/tsdb"
 
     "github.com/prometheus/common/model"
@@ -389,6 +392,163 @@ func TestRulerSharding(t *testing.T) {
     assert.ElementsMatch(t, expectedNames, actualNames)
 }
 
+func TestRulerAPISharding(t *testing.T) {
+    const numRulesGroups = 100
+
+    random := rand.New(rand.NewSource(time.Now().UnixNano()))
+    s, err := e2e.NewScenario(networkName)
+    require.NoError(t, err)
+    defer s.Close()
+
+    // Generate multiple rule groups, with 1 rule each.
+    ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
+    expectedNames := make([]string, numRulesGroups)
+    alertCount := 0
+    for i := 0; i < numRulesGroups; i++ {
+        num := random.Intn(100)
+        var ruleNode yaml.Node
+        var exprNode yaml.Node
+
+        ruleNode.SetString(fmt.Sprintf("rule_%d", i))
+        exprNode.SetString(strconv.Itoa(i))
+        ruleName := fmt.Sprintf("test_%d", i)
+
+        expectedNames[i] = ruleName
+        if num%2 == 0 {
+            alertCount++
+            ruleGroups[i] = rulefmt.RuleGroup{
+                Name:     ruleName,
+                Interval: 60,
+                Rules: []rulefmt.RuleNode{{
+                    Alert: ruleNode,
+                    Expr:  exprNode,
+                }},
+            }
+        } else {
+            ruleGroups[i] = rulefmt.RuleGroup{
+                Name:     ruleName,
+                Interval: 60,
+                Rules: []rulefmt.RuleNode{{
+                    Record: ruleNode,
+                    Expr:   exprNode,
+                }},
+            }
+        }
+    }
+
+    // Start dependencies.
+    consul := e2edb.NewConsul()
+    minio := e2edb.NewMinio(9000, rulestoreBucketName)
+    require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+    // Configure the ruler.
+    rulerFlags := mergeFlags(
+        BlocksStorageFlags(),
+        RulerFlags(),
+        RulerShardingFlags(consul.NetworkHTTPEndpoint()),
+        map[string]string{
+            // Since we're not going to run any rule, we don't need the
+            // store-gateway to be configured to a valid address.
+            "-querier.store-gateway-addresses": "localhost:12345",
+            // Enable the bucket index so we can skip the initial bucket scan.
+            "-blocks-storage.bucket-store.bucket-index.enabled": "true",
+        },
+    )
+
+    // Start rulers.
+    ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
+    ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
+    rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
+    require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))
+
+    // Upload rule groups to one of the rulers.
+    c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
+    require.NoError(t, err)
+
+    namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
+    namespaceNameCount := make([]int, 5)
+    nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
+    for _, ruleGroup := range ruleGroups {
+        index := nsRand.Intn(len(namespaceNames))
+        namespaceNameCount[index] = namespaceNameCount[index] + 1
+        require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
+    }
+
+    // Wait until rulers have loaded all rules.
+    require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
+
+    // Since rulers have loaded all rules, we expect that rules have been sharded
+    // between the two rulers.
+    require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
+    require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
+
+    testCases := map[string]struct {
+        filter        e2ecortex.RuleFilter
+        resultCheckFn func(assert.TestingT, []*ruler.RuleGroup)
+    }{
+        "Filter for Alert Rules": {
+            filter: e2ecortex.RuleFilter{
+                RuleType: "alert",
+            },
+            resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
+                assert.Len(t, ruleGroups, alertCount, "Expected %d rules but got %d", alertCount, len(ruleGroups))
+            },
+        },
+        "Filter for Recording Rules": {
+            filter: e2ecortex.RuleFilter{
+                RuleType: "record",
+            },
+            resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
+                assert.Len(t, ruleGroups, numRulesGroups-alertCount, "Expected %d rules but got %d", numRulesGroups-alertCount, len(ruleGroups))
+            },
+        },
+        "Filter by Namespace Name": {
+            filter: e2ecortex.RuleFilter{
+                Namespaces: []string{namespaceNames[2]},
+            },
+            resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
+                assert.Len(t, ruleGroups, namespaceNameCount[2], "Expected %d rules but got %d", namespaceNameCount[2], len(ruleGroups))
+            },
+        },
+        "Filter by Namespace Name and Alert Rules": {
+            filter: e2ecortex.RuleFilter{
+                RuleType:   "alert",
+                Namespaces: []string{namespaceNames[2]},
+            },
+            resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
+                for _, ruleGroup := range ruleGroups {
+                    rule := ruleGroup.Rules[0].(map[string]interface{})
+                    ruleType := rule["type"]
+                    assert.Equal(t, "alerting", ruleType, "Expected 'alerting' rule type but got %s", ruleType)
+                }
+            },
+        },
+        "Filter by Rule Names": {
+            filter: e2ecortex.RuleFilter{
+                RuleNames: []string{"rule_3", "rule_64", "rule_99"},
+            },
+            resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
+                ruleNames := []string{}
+                for _, ruleGroup := range ruleGroups {
+                    rule := ruleGroup.Rules[0].(map[string]interface{})
+                    ruleName := rule["name"]
+                    ruleNames = append(ruleNames, ruleName.(string))
+
+                }
+                assert.Len(t, ruleNames, 3, "Expected %d rules but got %d", 3, len(ruleNames))
+            },
+        },
+    }
+    // For each test case, fetch the rules with configured filters, and ensure the results match.
+    for name, tc := range testCases {
+        t.Run(name, func(t *testing.T) {
+            actualGroups, err := c.GetPrometheusRulesWithFilter(tc.filter)
+            require.NoError(t, err)
+            tc.resultCheckFn(t, actualGroups)
+        })
+    }
+}
+
 func TestRulerAlertmanager(t *testing.T) {
     var namespaceOne = "test_/encoded_+namespace/?"
     ruleGroup := createTestRuleGroup(t)

pkg/ruler/api.go

Lines changed: 6 additions & 11 deletions
@@ -100,11 +100,6 @@ type recordingRule struct {
     EvaluationTime float64 `json:"evaluationTime"`
 }
 
-const (
-    AlertingRuleFilter  string = "alert"
-    RecordingRuleFilter string = "record"
-)
-
 func respondError(logger log.Logger, w http.ResponseWriter, msg string) {
     b, err := json.Marshal(&response{
         Status: "error",
@@ -125,10 +120,10 @@ func respondError(logger log.Logger, w http.ResponseWriter, msg string) {
     }
 }
 
-func respondClientError(logger log.Logger, w http.ResponseWriter, msg string) {
+func respondBadRequest(logger log.Logger, w http.ResponseWriter, msg string) {
     b, err := json.Marshal(&response{
         Status:    "error",
-        ErrorType: v1.ErrServer,
+        ErrorType: v1.ErrBadData,
         Error:     msg,
         Data:      nil,
     })
@@ -173,13 +168,13 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
 
     if err := req.ParseForm(); err != nil {
         level.Error(logger).Log("msg", "error parsing form/query params", "err", err)
-        respondClientError(logger, w, err.Error())
+        respondBadRequest(logger, w, "error parsing form/query params")
         return
     }
 
     typ := strings.ToLower(req.URL.Query().Get("type"))
-    if typ != "" && typ != AlertingRuleFilter && typ != RecordingRuleFilter {
-        respondClientError(logger, w, fmt.Sprintf("not supported value %q", typ))
+    if typ != "" && typ != alertingRuleFilter && typ != recordingRuleFilter {
+        respondBadRequest(logger, w, fmt.Sprintf("unsupported rule type %q", typ))
         return
     }
 
@@ -284,7 +279,7 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
 
     w.Header().Set("Content-Type", "application/json")
     rulesRequest := RulesRequest{
-        Type: AlertingRuleFilter,
+        Type: alertingRuleFilter,
     }
     rgs, err := a.ruler.GetRules(req.Context(), rulesRequest)
 
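The switch from `v1.ErrServer` to `v1.ErrBadData` means a malformed request, such as an unrecognised `type` value, is now reported as a client error rather than a server error. The sketch below decodes that error envelope; the JSON field names follow the usual Prometheus `status`/`errorType`/`error` convention and are an assumption here, as are the ruler address and tenant ID.

```go
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

// errorEnvelope mirrors the Prometheus-style error body written by
// respondBadRequest above; the exact field names are an assumption.
type errorEnvelope struct {
    Status    string `json:"status"`
    ErrorType string `json:"errorType"`
    Error     string `json:"error"`
}

func main() {
    // Placeholder ruler address and tenant; an unsupported type value should
    // now yield errorType "bad_data" instead of a server error.
    req, err := http.NewRequest("GET", "http://localhost:8080/api/prom/api/v1/rules?type=foo", nil)
    if err != nil {
        panic(err)
    }
    req.Header.Set("X-Scope-OrgID", "user-1")

    res, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer res.Body.Close()

    var e errorEnvelope
    if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
        panic(err)
    }
    fmt.Printf("status=%s errorType=%s error=%s\n", e.Status, e.ErrorType, e.Error)
}
```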
pkg/ruler/ruler.go

Lines changed: 11 additions & 9 deletions
@@ -66,6 +66,9 @@ const (
 
     // errors
     errListAllUser = "unable to list the ruler users"
+
+    alertingRuleFilter  string = "alert"
+    recordingRuleFilter string = "record"
 )
 
 // Config is the configuration for the recording rules server.
@@ -677,12 +680,17 @@ func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*Grou
     fileSet := sliceToSet(rulesRequest.Files)
     ruleType := rulesRequest.Type
 
-    returnAlerts := ruleType == "" || ruleType == "alert"
-    returnRecording := ruleType == "" || ruleType == "record"
+    returnAlerts := ruleType == "" || ruleType == alertingRuleFilter
+    returnRecording := ruleType == "" || ruleType == recordingRuleFilter
 
     for _, group := range groups {
+        // The mapped filename is url path escaped encoded to make handling `/` characters easier
+        decodedNamespace, err := url.PathUnescape(strings.TrimPrefix(group.File(), prefix))
+        if err != nil {
+            return nil, errors.Wrap(err, "unable to decode rule filename")
+        }
         if len(fileSet) > 0 {
-            if _, OK := fileSet[group.File()]; !OK {
+            if _, OK := fileSet[decodedNamespace]; !OK {
                 continue
             }
         }
@@ -694,12 +702,6 @@
            }
        }
        interval := group.Interval()
-       // The mapped filename is url path escaped encoded to make handling `/` characters easier
-       decodedNamespace, err := url.PathUnescape(strings.TrimPrefix(group.File(), prefix))
-       if err != nil {
-           return nil, errors.Wrap(err, "unable to decode rule filename")
-       }
-
        groupDesc := &GroupStateDesc{
            Group: &rulespb.RuleGroupDesc{
                Name: group.Name(),

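The namespace decoding moved to the top of the loop matters because rule group namespaces are stored path-escaped in the mapped rule filename (as the comment in the diff notes), so the `file[]` filter has to be compared against the decoded name rather than the raw `group.File()` value. A small standalone sketch of that escaping round trip, using the encoded namespace string from `TestRulerAlertmanager` above:

```go
package main

import (
    "fmt"
    "net/url"
)

func main() {
    // Namespaces may contain '/', so the ruler keeps them path-escaped in the
    // mapped rule filename. This namespace comes from ruler_test.go.
    namespace := "test_/encoded_+namespace/?"

    escaped := url.PathEscape(namespace)
    fmt.Println("escaped: ", escaped) // e.g. test_%2Fencoded_+namespace%2F%3F

    decoded, err := url.PathUnescape(escaped)
    if err != nil {
        panic(err)
    }
    fmt.Println("decoded: ", decoded)
    fmt.Println("round-trip ok:", decoded == namespace)
}
```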