Skip to content

Commit e710de2

Browse files
Pagination support for ListRules
Signed-off-by: Anand Rajagopal <[email protected]>
1 parent 275a5bf commit e710de2

File tree

11 files changed

+886
-179
lines changed

11 files changed

+886
-179
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.memcached.max-async-concurrency` from `50` to `3` #6265
66
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
7+
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
78
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
89
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
910
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129

integration/e2ecortex/client.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,8 @@ type RuleFilter struct {
601601
RuleNames []string
602602
RuleType string
603603
ExcludeAlerts string
604+
MaxRuleGroup int
605+
NextToken string
604606
}
605607

606608
func addQueryParams(urlValues url.Values, paramName string, params ...string) {
@@ -612,12 +614,12 @@ func addQueryParams(urlValues url.Values, paramName string, params ...string) {
612614
}
613615

614616
// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
615-
func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, error) {
617+
func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, string, error) {
616618
// Create HTTP request
617619

618620
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
619621
if err != nil {
620-
return nil, err
622+
return nil, "", err
621623
}
622624
req.Header.Set("X-Scope-OrgID", c.orgID)
623625

@@ -627,6 +629,12 @@ func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, erro
627629
addQueryParams(urlValues, "rule_group[]", filter.RuleGroupNames...)
628630
addQueryParams(urlValues, "type", filter.RuleType)
629631
addQueryParams(urlValues, "exclude_alerts", filter.ExcludeAlerts)
632+
if filter.MaxRuleGroup > 0 {
633+
addQueryParams(urlValues, "group_limit", strconv.Itoa(filter.MaxRuleGroup))
634+
}
635+
if filter.NextToken != "" {
636+
addQueryParams(urlValues, "group_next_token", filter.NextToken)
637+
}
630638
req.URL.RawQuery = urlValues.Encode()
631639

632640
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
@@ -635,13 +643,13 @@ func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, erro
635643
// Execute HTTP request
636644
res, err := c.httpClient.Do(req.WithContext(ctx))
637645
if err != nil {
638-
return nil, err
646+
return nil, "", err
639647
}
640648
defer res.Body.Close()
641649

642650
body, err := io.ReadAll(res.Body)
643651
if err != nil {
644-
return nil, err
652+
return nil, "", err
645653
}
646654

647655
// Decode the response.
@@ -652,14 +660,14 @@ func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, erro
652660

653661
decoded := &response{}
654662
if err := json.Unmarshal(body, decoded); err != nil {
655-
return nil, err
663+
return nil, "", err
656664
}
657665

658666
if decoded.Status != "success" {
659-
return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
667+
return nil, "", fmt.Errorf("unexpected response status '%s'", decoded.Status)
660668
}
661669

662-
return decoded.Data.RuleGroups, nil
670+
return decoded.Data.RuleGroups, decoded.Data.GroupNextToken, nil
663671
}
664672

665673
// GetRuleGroups gets the configured rule groups from the ruler.

integration/ruler_test.go

Lines changed: 246 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func TestRulerSharding(t *testing.T) {
278278
require.NoError(t, ruler2.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))
279279

280280
// Fetch the rules and ensure they match the configured ones.
281-
actualGroups, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)
281+
actualGroups, _, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)
282282
require.NoError(t, err)
283283

284284
var actualNames []string
@@ -493,13 +493,252 @@ func testRulerAPIWithSharding(t *testing.T, enableRulesBackup bool) {
493493
}
494494
for name, tc := range testCases {
495495
t.Run(name, func(t *testing.T) {
496-
actualGroups, err := c.GetPrometheusRules(tc.filter)
496+
actualGroups, _, err := c.GetPrometheusRules(tc.filter)
497497
require.NoError(t, err)
498498
tc.resultCheckFn(t, actualGroups)
499499
})
500500
}
501501
}
502502

503+
func TestRulesPaginationAPISharding(t *testing.T) {
504+
testRulesPaginationAPIWithSharding(t, false)
505+
}
506+
507+
func TestRulesPaginationAPIShardingWithAPIRulesBackupEnabled(t *testing.T) {
508+
testRulesPaginationAPIWithSharding(t, true)
509+
}
510+
511+
func testRulesPaginationAPIWithSharding(t *testing.T, enableRulesBackup bool) {
512+
const numRulesGroups = 100
513+
514+
random := rand.New(rand.NewSource(time.Now().UnixNano()))
515+
s, err := e2e.NewScenario(networkName)
516+
require.NoError(t, err)
517+
defer s.Close()
518+
519+
// Generate multiple rule groups, with 1 rule each.
520+
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
521+
expectedNames := make([]string, numRulesGroups)
522+
alertCount := 0
523+
evalInterval, _ := model.ParseDuration("1s")
524+
for i := 0; i < numRulesGroups; i++ {
525+
num := random.Intn(100)
526+
var ruleNode yaml.Node
527+
var exprNode yaml.Node
528+
529+
ruleNode.SetString(fmt.Sprintf("rule_%d", i))
530+
exprNode.SetString(strconv.Itoa(i))
531+
ruleName := fmt.Sprintf("test_%d", i)
532+
533+
expectedNames[i] = ruleName
534+
if num%2 == 0 {
535+
alertCount++
536+
ruleGroups[i] = rulefmt.RuleGroup{
537+
Name: ruleName,
538+
Interval: evalInterval,
539+
Rules: []rulefmt.RuleNode{{
540+
Alert: ruleNode,
541+
Expr: exprNode,
542+
}},
543+
}
544+
} else {
545+
ruleGroups[i] = rulefmt.RuleGroup{
546+
Name: ruleName,
547+
Interval: evalInterval,
548+
Rules: []rulefmt.RuleNode{{
549+
Record: ruleNode,
550+
Expr: exprNode,
551+
}},
552+
}
553+
}
554+
}
555+
556+
// Start dependencies.
557+
consul := e2edb.NewConsul()
558+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
559+
require.NoError(t, s.StartAndWaitReady(consul, minio))
560+
561+
// Configure the ruler.
562+
overrides := map[string]string{
563+
// Since we're not going to run any rule, we don't need the
564+
// store-gateway to be configured to a valid address.
565+
"-querier.store-gateway-addresses": "localhost:12345",
566+
// Enable the bucket index so we can skip the initial bucket scan.
567+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
568+
"-ruler.poll-interval": "5s",
569+
}
570+
if enableRulesBackup {
571+
overrides["-ruler.ring.replication-factor"] = "2"
572+
}
573+
rulerFlags := mergeFlags(
574+
BlocksStorageFlags(),
575+
RulerFlags(),
576+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
577+
overrides,
578+
)
579+
580+
// Start rulers.
581+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
582+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
583+
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
584+
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
585+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))
586+
587+
// Upload rule groups to one of the rulers.
588+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
589+
require.NoError(t, err)
590+
591+
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
592+
namespaceNameCount := make([]int, len(namespaceNames))
593+
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
594+
ruleGroupToNSMap := map[string]string{}
595+
for _, ruleGroup := range ruleGroups {
596+
index := nsRand.Intn(len(namespaceNames))
597+
namespaceNameCount[index] = namespaceNameCount[index] + 1
598+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
599+
ruleGroupToNSMap[ruleGroup.Name] = namespaceNames[index]
600+
}
601+
602+
// Wait until rulers have loaded all rules.
603+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
604+
605+
// Since rulers have loaded all rules, we expect that rules have been sharded
606+
// between the two rulers.
607+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
608+
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
609+
610+
testCases := map[string]struct {
611+
filter e2ecortex.RuleFilter
612+
resultCheckFn func(assert.TestingT, []*ruler.RuleGroup, string, int)
613+
iterations int
614+
}{
615+
"List Rule Groups - Equal number of rule groups per page": {
616+
filter: e2ecortex.RuleFilter{
617+
MaxRuleGroup: 20,
618+
},
619+
resultCheckFn: func(t assert.TestingT, resultGroups []*ruler.RuleGroup, token string, iteration int) {
620+
assert.Len(t, resultGroups, 20, "Expected %d rules but got %d", 20, len(resultGroups))
621+
if iteration < 4 {
622+
assert.NotEmpty(t, token)
623+
return
624+
}
625+
assert.Empty(t, token)
626+
},
627+
iterations: 5,
628+
},
629+
"List Rule Groups - Last page unequal": {
630+
filter: e2ecortex.RuleFilter{
631+
MaxRuleGroup: 72,
632+
},
633+
resultCheckFn: func(t assert.TestingT, resultGroups []*ruler.RuleGroup, token string, iteration int) {
634+
if iteration == 0 {
635+
assert.Len(t, resultGroups, 72, "Expected %d rules but got %d", 72, len(resultGroups))
636+
assert.NotEmpty(t, token)
637+
return
638+
}
639+
assert.Len(t, resultGroups, 28, "Expected %d rules but got %d", 28, len(resultGroups))
640+
assert.Empty(t, token)
641+
},
642+
iterations: 2,
643+
},
644+
"List all rule groups": {
645+
filter: e2ecortex.RuleFilter{},
646+
resultCheckFn: func(t assert.TestingT, resultGroups []*ruler.RuleGroup, token string, iteration int) {
647+
assert.Len(t, resultGroups, 100, "Expected %d rules but got %d", 100, len(resultGroups))
648+
assert.Empty(t, token)
649+
},
650+
iterations: 1,
651+
},
652+
"List all rule groups - Max Rule Groups > Actual": {
653+
filter: e2ecortex.RuleFilter{
654+
MaxRuleGroup: 200,
655+
},
656+
resultCheckFn: func(t assert.TestingT, resultGroups []*ruler.RuleGroup, token string, iteration int) {
657+
assert.Len(t, resultGroups, 100, "Expected %d rules but got %d", 100, len(resultGroups))
658+
assert.Empty(t, token)
659+
},
660+
iterations: 1,
661+
},
662+
}
663+
664+
// For each test case, fetch the rules with configured filters, and ensure the results match.
665+
if enableRulesBackup {
666+
err := ruler2.Kill() // if rules backup is enabled the APIs should be able to handle a ruler going down
667+
require.NoError(t, err)
668+
}
669+
for name, tc := range testCases {
670+
t.Run(name, func(t *testing.T) {
671+
filter := tc.filter
672+
for i := 0; i < tc.iterations; i++ {
673+
actualGroups, token, err := c.GetPrometheusRules(filter)
674+
require.NoError(t, err)
675+
tc.resultCheckFn(t, actualGroups, token, i)
676+
filter.NextToken = token
677+
}
678+
})
679+
}
680+
}
681+
682+
func TestRulesAPIWithNoRules(t *testing.T) {
683+
s, err := e2e.NewScenario(networkName)
684+
require.NoError(t, err)
685+
defer s.Close()
686+
687+
// Start dependencies.
688+
consul := e2edb.NewConsul()
689+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
690+
require.NoError(t, s.StartAndWaitReady(consul, minio))
691+
692+
// Configure the ruler.
693+
overrides := map[string]string{
694+
// Since we're not going to run any rule, we don't need the
695+
// store-gateway to be configured to a valid address.
696+
"-querier.store-gateway-addresses": "localhost:12345",
697+
// Enable the bucket index so we can skip the initial bucket scan.
698+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
699+
"-ruler.poll-interval": "5s",
700+
}
701+
702+
rulerFlags := mergeFlags(
703+
BlocksStorageFlags(),
704+
RulerFlags(),
705+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
706+
overrides,
707+
)
708+
709+
// Start rulers.
710+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
711+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
712+
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
713+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))
714+
715+
time.Sleep(5 * time.Second)
716+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
717+
require.NoError(t, err)
718+
719+
testCases := map[string]struct {
720+
filter e2ecortex.RuleFilter
721+
}{
722+
"List Rule Groups With Filter": {
723+
filter: e2ecortex.RuleFilter{
724+
MaxRuleGroup: 20,
725+
},
726+
},
727+
"List All Rule Groups With No Filter": {
728+
filter: e2ecortex.RuleFilter{},
729+
},
730+
}
731+
732+
for name, tc := range testCases {
733+
t.Run(name, func(t *testing.T) {
734+
actualGroups, token, err := c.GetPrometheusRules(tc.filter)
735+
require.NoError(t, err)
736+
assert.Empty(t, actualGroups)
737+
assert.Empty(t, token)
738+
})
739+
}
740+
}
741+
503742
func TestRulerAlertmanager(t *testing.T) {
504743
var namespaceOne = "test_/encoded_+namespace/?"
505744
ruleGroup := createTestRuleGroup(t)
@@ -979,7 +1218,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
9791218
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m1), e2e.WaitMissingMetrics))
9801219

9811220
filter := e2ecortex.RuleFilter{}
982-
actualGroups, err := c.GetPrometheusRules(filter)
1221+
actualGroups, _, err := c.GetPrometheusRules(filter)
9831222
require.NoError(t, err)
9841223
assert.Equal(t, 1, len(actualGroups))
9851224
assert.Equal(t, "good_rule", actualGroups[0].Name)
@@ -1125,7 +1364,7 @@ func TestRulerHAEvaluation(t *testing.T) {
11251364
// assumes ownership, it might not immediately evaluate until it's time to evaluate. The following sleep is to ensure the
11261365
// rulers have evaluated the rule groups
11271366
time.Sleep(2100 * time.Millisecond)
1128-
results, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
1367+
results, _, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
11291368
require.NoError(t, err)
11301369
require.Equal(t, numRulesGroups, len(results))
11311370
for _, v := range results {
@@ -1199,7 +1438,7 @@ func TestRulerKeepFiring(t *testing.T) {
11991438
// Wait until rule group has tried to evaluate the rule.
12001439
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
12011440

1202-
groups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
1441+
groups, _, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
12031442
RuleNames: []string{ruleName},
12041443
})
12051444
require.NoError(t, err)
@@ -1216,7 +1455,7 @@ func TestRulerKeepFiring(t *testing.T) {
12161455
// Wait until rule group has tried to evaluate the rule.
12171456
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(5), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
12181457

1219-
updatedGroups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
1458+
updatedGroups, _, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
12201459
RuleNames: []string{ruleName},
12211460
})
12221461
require.NoError(t, err)
@@ -1231,7 +1470,7 @@ func TestRulerKeepFiring(t *testing.T) {
12311470
require.Greater(t, alert.Alerts[0].KeepFiringSince.UnixNano(), ts.UnixNano(), "KeepFiringSince value should be after expression is resolved")
12321471

12331472
time.Sleep(10 * time.Second) // Sleep beyond keepFiringFor time
1234-
updatedGroups, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
1473+
updatedGroups, _, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
12351474
RuleNames: []string{ruleName},
12361475
})
12371476
require.NoError(t, err)

0 commit comments

Comments
 (0)