Skip to content

Commit f51bdef

Browse files
Pagination support for ListRules
Signed-off-by: Anand Rajagopal <[email protected]>
1 parent 275a5bf commit f51bdef

File tree

10 files changed

+885
-179
lines changed

10 files changed

+885
-179
lines changed

integration/e2ecortex/client.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,8 @@ type RuleFilter struct {
601601
RuleNames []string
602602
RuleType string
603603
ExcludeAlerts string
604+
MaxRuleGroup int
605+
NextToken string
604606
}
605607

606608
func addQueryParams(urlValues url.Values, paramName string, params ...string) {
@@ -612,12 +614,12 @@ func addQueryParams(urlValues url.Values, paramName string, params ...string) {
612614
}
613615

614616
// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
615-
func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, error) {
617+
func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, string, error) {
616618
// Create HTTP request
617619

618620
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
619621
if err != nil {
620-
return nil, err
622+
return nil, "", err
621623
}
622624
req.Header.Set("X-Scope-OrgID", c.orgID)
623625

@@ -627,6 +629,12 @@ func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, erro
627629
addQueryParams(urlValues, "rule_group[]", filter.RuleGroupNames...)
628630
addQueryParams(urlValues, "type", filter.RuleType)
629631
addQueryParams(urlValues, "exclude_alerts", filter.ExcludeAlerts)
632+
if filter.MaxRuleGroup > 0 {
633+
addQueryParams(urlValues, "group_limit", strconv.Itoa(filter.MaxRuleGroup))
634+
}
635+
if filter.NextToken != "" {
636+
addQueryParams(urlValues, "group_next_token", filter.NextToken)
637+
}
630638
req.URL.RawQuery = urlValues.Encode()
631639

632640
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
@@ -635,13 +643,13 @@ func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, erro
635643
// Execute HTTP request
636644
res, err := c.httpClient.Do(req.WithContext(ctx))
637645
if err != nil {
638-
return nil, err
646+
return nil, "", err
639647
}
640648
defer res.Body.Close()
641649

642650
body, err := io.ReadAll(res.Body)
643651
if err != nil {
644-
return nil, err
652+
return nil, "", err
645653
}
646654

647655
// Decode the response.
@@ -652,14 +660,14 @@ func (c *Client) GetPrometheusRules(filter RuleFilter) ([]*ruler.RuleGroup, erro
652660

653661
decoded := &response{}
654662
if err := json.Unmarshal(body, decoded); err != nil {
655-
return nil, err
663+
return nil, "", err
656664
}
657665

658666
if decoded.Status != "success" {
659-
return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
667+
return nil, "", fmt.Errorf("unexpected response status '%s'", decoded.Status)
660668
}
661669

662-
return decoded.Data.RuleGroups, nil
670+
return decoded.Data.RuleGroups, decoded.Data.GroupNextToken, nil
663671
}
664672

665673
// GetRuleGroups gets the configured rule groups from the ruler.

integration/ruler_test.go

Lines changed: 246 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func TestRulerSharding(t *testing.T) {
278278
require.NoError(t, ruler2.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))
279279

280280
// Fetch the rules and ensure they match the configured ones.
281-
actualGroups, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)
281+
actualGroups, _, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)
282282
require.NoError(t, err)
283283

284284
var actualNames []string
@@ -493,13 +493,252 @@ func testRulerAPIWithSharding(t *testing.T, enableRulesBackup bool) {
493493
}
494494
for name, tc := range testCases {
495495
t.Run(name, func(t *testing.T) {
496-
actualGroups, err := c.GetPrometheusRules(tc.filter)
496+
actualGroups, _, err := c.GetPrometheusRules(tc.filter)
497497
require.NoError(t, err)
498498
tc.resultCheckFn(t, actualGroups)
499499
})
500500
}
501501
}
502502

503+
func TestRulesPaginationAPISharding(t *testing.T) {
504+
testRulesPaginationAPIWithSharding(t, false)
505+
}
506+
507+
func TestRulesPaginationAPIShardingWithAPIRulesBackupEnabled(t *testing.T) {
508+
testRulesPaginationAPIWithSharding(t, true)
509+
}
510+
511+
func testRulesPaginationAPIWithSharding(t *testing.T, enableRulesBackup bool) {
512+
const numRulesGroups = 100
513+
514+
random := rand.New(rand.NewSource(time.Now().UnixNano()))
515+
s, err := e2e.NewScenario(networkName)
516+
require.NoError(t, err)
517+
defer s.Close()
518+
519+
// Generate multiple rule groups, with 1 rule each.
520+
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
521+
expectedNames := make([]string, numRulesGroups)
522+
alertCount := 0
523+
evalInterval, _ := model.ParseDuration("1s")
524+
for i := 0; i < numRulesGroups; i++ {
525+
num := random.Intn(100)
526+
var ruleNode yaml.Node
527+
var exprNode yaml.Node
528+
529+
ruleNode.SetString(fmt.Sprintf("rule_%d", i))
530+
exprNode.SetString(strconv.Itoa(i))
531+
ruleName := fmt.Sprintf("test_%d", i)
532+
533+
expectedNames[i] = ruleName
534+
if num%2 == 0 {
535+
alertCount++
536+
ruleGroups[i] = rulefmt.RuleGroup{
537+
Name: ruleName,
538+
Interval: evalInterval,
539+
Rules: []rulefmt.RuleNode{{
540+
Alert: ruleNode,
541+
Expr: exprNode,
542+
}},
543+
}
544+
} else {
545+
ruleGroups[i] = rulefmt.RuleGroup{
546+
Name: ruleName,
547+
Interval: evalInterval,
548+
Rules: []rulefmt.RuleNode{{
549+
Record: ruleNode,
550+
Expr: exprNode,
551+
}},
552+
}
553+
}
554+
}
555+
556+
// Start dependencies.
557+
consul := e2edb.NewConsul()
558+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
559+
require.NoError(t, s.StartAndWaitReady(consul, minio))
560+
561+
// Configure the ruler.
562+
overrides := map[string]string{
563+
// Since we're not going to run any rule, we don't need the
564+
// store-gateway to be configured to a valid address.
565+
"-querier.store-gateway-addresses": "localhost:12345",
566+
// Enable the bucket index so we can skip the initial bucket scan.
567+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
568+
"-ruler.poll-interval": "5s",
569+
}
570+
if enableRulesBackup {
571+
overrides["-ruler.ring.replication-factor"] = "2"
572+
}
573+
rulerFlags := mergeFlags(
574+
BlocksStorageFlags(),
575+
RulerFlags(),
576+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
577+
overrides,
578+
)
579+
580+
// Start rulers.
581+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
582+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
583+
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
584+
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
585+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))
586+
587+
// Upload rule groups to one of the rulers.
588+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
589+
require.NoError(t, err)
590+
591+
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
592+
namespaceNameCount := make([]int, len(namespaceNames))
593+
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
594+
ruleGroupToNSMap := map[string]string{}
595+
for _, ruleGroup := range ruleGroups {
596+
index := nsRand.Intn(len(namespaceNames))
597+
namespaceNameCount[index] = namespaceNameCount[index] + 1
598+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
599+
ruleGroupToNSMap[ruleGroup.Name] = namespaceNames[index]
600+
}
601+
602+
// Wait until rulers have loaded all rules.
603+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
604+
605+
// Since rulers have loaded all rules, we expect that rules have been sharded
606+
// between the two rulers.
607+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
608+
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
609+
610+
testCases := map[string]struct {
611+
filter e2ecortex.RuleFilter
612+
resultCheckFn func(assert.TestingT, []*ruler.RuleGroup, string, int)
613+
iterations int
614+
}{
615+
"List Rule Groups - Equal number of rule groups per page": {
616+
filter: e2ecortex.RuleFilter{
617+
MaxRuleGroup: 20,
618+
},
619+
resultCheckFn: func(t assert.TestingT, resultGroups []*ruler.RuleGroup, token string, iteration int) {
620+
assert.Len(t, resultGroups, 20, "Expected %d rules but got %d", 20, len(resultGroups))
621+
if iteration < 4 {
622+
assert.NotEmpty(t, token)
623+
return
624+
}
625+
assert.Empty(t, token)
626+
},
627+
iterations: 5,
628+
},
629+
"List Rule Groups - Last page unequal": {
630+
filter: e2ecortex.RuleFilter{
631+
MaxRuleGroup: 72,
632+
},
633+
resultCheckFn: func(t assert.TestingT, resultGroups []*ruler.RuleGroup, token string, iteration int) {
634+
if iteration == 0 {
635+
assert.Len(t, resultGroups, 72, "Expected %d rules but got %d", 72, len(resultGroups))
636+
assert.NotEmpty(t, token)
637+
return
638+
}
639+
assert.Len(t, resultGroups, 28, "Expected %d rules but got %d", 28, len(resultGroups))
640+
assert.Empty(t, token)
641+
},
642+
iterations: 2,
643+
},
644+
"List all rule groups": {
645+
filter: e2ecortex.RuleFilter{},
646+
resultCheckFn: func(t assert.TestingT, resultGroups []*ruler.RuleGroup, token string, iteration int) {
647+
assert.Len(t, resultGroups, 100, "Expected %d rules but got %d", 100, len(resultGroups))
648+
assert.Empty(t, token)
649+
},
650+
iterations: 1,
651+
},
652+
"List all rule groups - Max Rule Groups > Actual": {
653+
filter: e2ecortex.RuleFilter{
654+
MaxRuleGroup: 200,
655+
},
656+
resultCheckFn: func(t assert.TestingT, resultGroups []*ruler.RuleGroup, token string, iteration int) {
657+
assert.Len(t, resultGroups, 100, "Expected %d rules but got %d", 100, len(resultGroups))
658+
assert.Empty(t, token)
659+
},
660+
iterations: 1,
661+
},
662+
}
663+
664+
// For each test case, fetch the rules with configured filters, and ensure the results match.
665+
if enableRulesBackup {
666+
err := ruler2.Kill() // if rules backup is enabled the APIs should be able to handle a ruler going down
667+
require.NoError(t, err)
668+
}
669+
for name, tc := range testCases {
670+
t.Run(name, func(t *testing.T) {
671+
filter := tc.filter
672+
for i := 0; i < tc.iterations; i++ {
673+
actualGroups, token, err := c.GetPrometheusRules(filter)
674+
require.NoError(t, err)
675+
tc.resultCheckFn(t, actualGroups, token, i)
676+
filter.NextToken = token
677+
}
678+
})
679+
}
680+
}
681+
682+
func TestRulesAPIWithNoRules(t *testing.T) {
683+
s, err := e2e.NewScenario(networkName)
684+
require.NoError(t, err)
685+
defer s.Close()
686+
687+
// Start dependencies.
688+
consul := e2edb.NewConsul()
689+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
690+
require.NoError(t, s.StartAndWaitReady(consul, minio))
691+
692+
// Configure the ruler.
693+
overrides := map[string]string{
694+
// Since we're not going to run any rule, we don't need the
695+
// store-gateway to be configured to a valid address.
696+
"-querier.store-gateway-addresses": "localhost:12345",
697+
// Enable the bucket index so we can skip the initial bucket scan.
698+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
699+
"-ruler.poll-interval": "5s",
700+
}
701+
702+
rulerFlags := mergeFlags(
703+
BlocksStorageFlags(),
704+
RulerFlags(),
705+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
706+
overrides,
707+
)
708+
709+
// Start rulers.
710+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
711+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
712+
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
713+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))
714+
715+
time.Sleep(5 * time.Second)
716+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
717+
require.NoError(t, err)
718+
719+
testCases := map[string]struct {
720+
filter e2ecortex.RuleFilter
721+
}{
722+
"List Rule Groups With Filter": {
723+
filter: e2ecortex.RuleFilter{
724+
MaxRuleGroup: 20,
725+
},
726+
},
727+
"List All Rule Groups With No Filter": {
728+
filter: e2ecortex.RuleFilter{},
729+
},
730+
}
731+
732+
for name, tc := range testCases {
733+
t.Run(name, func(t *testing.T) {
734+
actualGroups, token, err := c.GetPrometheusRules(tc.filter)
735+
require.NoError(t, err)
736+
assert.Empty(t, actualGroups)
737+
assert.Empty(t, token)
738+
})
739+
}
740+
}
741+
503742
func TestRulerAlertmanager(t *testing.T) {
504743
var namespaceOne = "test_/encoded_+namespace/?"
505744
ruleGroup := createTestRuleGroup(t)
@@ -979,7 +1218,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
9791218
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m1), e2e.WaitMissingMetrics))
9801219

9811220
filter := e2ecortex.RuleFilter{}
982-
actualGroups, err := c.GetPrometheusRules(filter)
1221+
actualGroups, _, err := c.GetPrometheusRules(filter)
9831222
require.NoError(t, err)
9841223
assert.Equal(t, 1, len(actualGroups))
9851224
assert.Equal(t, "good_rule", actualGroups[0].Name)
@@ -1125,7 +1364,7 @@ func TestRulerHAEvaluation(t *testing.T) {
11251364
// assumes ownership, it might not immediately evaluate until it's time to evaluate. The following sleep is to ensure the
11261365
// rulers have evaluated the rule groups
11271366
time.Sleep(2100 * time.Millisecond)
1128-
results, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
1367+
results, _, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
11291368
require.NoError(t, err)
11301369
require.Equal(t, numRulesGroups, len(results))
11311370
for _, v := range results {
@@ -1199,7 +1438,7 @@ func TestRulerKeepFiring(t *testing.T) {
11991438
// Wait until rule group has tried to evaluate the rule.
12001439
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
12011440

1202-
groups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
1441+
groups, _, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
12031442
RuleNames: []string{ruleName},
12041443
})
12051444
require.NoError(t, err)
@@ -1216,7 +1455,7 @@ func TestRulerKeepFiring(t *testing.T) {
12161455
// Wait until rule group has tried to evaluate the rule.
12171456
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(5), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
12181457

1219-
updatedGroups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
1458+
updatedGroups, _, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
12201459
RuleNames: []string{ruleName},
12211460
})
12221461
require.NoError(t, err)
@@ -1231,7 +1470,7 @@ func TestRulerKeepFiring(t *testing.T) {
12311470
require.Greater(t, alert.Alerts[0].KeepFiringSince.UnixNano(), ts.UnixNano(), "KeepFiringSince value should be after expression is resolved")
12321471

12331472
time.Sleep(10 * time.Second) // Sleep beyond keepFiringFor time
1234-
updatedGroups, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
1473+
updatedGroups, _, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
12351474
RuleNames: []string{ruleName},
12361475
})
12371476
require.NoError(t, err)

0 commit comments

Comments
 (0)