Skip to content

Commit 46e7185

Browse files
disable rule groups
Signed-off-by: Anand Rajagopal <[email protected]>
1 parent 526a6d9 commit 46e7185

File tree

3 files changed

+117
-8
lines changed

3 files changed

+117
-8
lines changed

pkg/ruler/ruler.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ const (
7171
recordingRuleFilter string = "record"
7272
)
7373

74+
// DisabledRuleGroupErr is an error value that carries a plain-text Message.
// In this file instanceOwnsRuleGroup returns it when a rule group's ring token
// equals one of the configured disabled rule group tokens, and filterRuleGroups
// logs its Message at info level instead of counting it as a ring check error.
type DisabledRuleGroupErr struct {
75+
// Message is the text returned verbatim by Error.
Message string
76+
}
77+
78+
// Error implements the error interface by returning Message unchanged.
func (e *DisabledRuleGroupErr) Error() string {
79+
return e.Message
80+
}
81+
7482
// Config is the configuration for the recording rules server.
7583
type Config struct {
7684
// This is used for template expansion in alerts; must be a valid URL.
@@ -126,6 +134,8 @@ type Config struct {
126134

127135
EnableQueryStats bool `yaml:"query_stats_enabled"`
128136
DisableRuleGroupLabel bool `yaml:"disable_rule_group_label"`
137+
138+
DisabledRuleGroups flagext.IntSliceCSV `yaml:"disabled_rule_groups"`
129139
}
130140

131141
// Validate config and returns error on failure
@@ -187,6 +197,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
187197
f.BoolVar(&cfg.EnableQueryStats, "ruler.query-stats-enabled", false, "Report the wall time for ruler queries to complete as a per user metric and as an info level log message.")
188198
f.BoolVar(&cfg.DisableRuleGroupLabel, "ruler.disable-rule-group-label", false, "Disable the rule_group label on exported metrics")
189199

200+
f.Var(&cfg.DisabledRuleGroups, "ruler.disabled-rule-groups", "Comma separated list of keys of rule groups this ruler cannot evaluate. If specified, a ruler that would normally pick the specified rule groups for processing will ignore them instead.")
201+
190202
cfg.RingCheckPeriod = 5 * time.Second
191203
}
192204

@@ -415,9 +427,14 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
415427
return ringHasher.Sum32()
416428
}
417429

418-
func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAddr string) (bool, error) {
430+
func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups flagext.IntSliceCSV, instanceAddr string) (bool, error) {
419431
hash := tokenForGroup(g)
420432

433+
for _, disabledGroupToken := range disabledRuleGroups {
434+
if hash == uint32(disabledGroupToken) {
435+
return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("Skipping rule group %s, within namespace %s, owned by %s", g.Name, g.Namespace, g.User)}
436+
}
437+
}
421438
rlrs, err := r.Get(hash, RingOp, nil, nil, nil)
422439
if err != nil {
423440
return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
@@ -544,7 +561,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
544561

545562
filteredConfigs := make(map[string]rulespb.RuleGroupList)
546563
for userID, groups := range configs {
547-
filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
564+
filtered := filterRuleGroups(userID, groups, r.cfg.DisabledRuleGroups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
548565
if len(filtered) > 0 {
549566
filteredConfigs[userID] = filtered
550567
}
@@ -602,7 +619,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
602619
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
603620
}
604621

605-
filtered := filterRuleGroups(userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
622+
filtered := filterRuleGroups(userID, groups, r.cfg.DisabledRuleGroups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
606623
if len(filtered) == 0 {
607624
continue
608625
}
@@ -624,15 +641,20 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
624641
//
625642
// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring,
626643
// but only ring passed as parameter.
627-
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
644+
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups flagext.IntSliceCSV, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
628645
// Prune the rule group to only contain rules that this ruler is responsible for, based on ring.
629646
var result []*rulespb.RuleGroupDesc
630647
for _, g := range ruleGroups {
631-
owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr)
648+
owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr)
632649
if err != nil {
633-
ringCheckErrors.Inc()
634-
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
635-
continue
650+
switch e := err.(type) {
651+
case *DisabledRuleGroupErr:
652+
level.Info(log).Log("msg", e.Message)
653+
default:
654+
ringCheckErrors.Inc()
655+
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
656+
continue
657+
}
636658
}
637659

638660
if owned {

pkg/util/flagext/intslicecsv.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package flagext
2+
3+
import (
4+
"strconv"
5+
"strings"
6+
)
7+
8+
// IntSliceCSV is a slice of ints that is parsed from a comma-separated string
9+
// It implements flag.Value and yaml Marshalers
10+
type IntSliceCSV []int
11+
12+
// String implements flag.Value
13+
func (v IntSliceCSV) String() string {
14+
var result strings.Builder
15+
for i, value := range v {
16+
if i != 0 {
17+
result.WriteString(",")
18+
}
19+
result.WriteString(strconv.Itoa(value))
20+
}
21+
return result.String()
22+
}
23+
24+
// Set implements flag.Value
25+
func (v *IntSliceCSV) Set(s string) error {
26+
if len(strings.TrimSpace(s)) == 0 {
27+
return nil
28+
}
29+
str := strings.Split(s, ",")
30+
for _, sv := range str {
31+
val, err := strconv.Atoi(strings.TrimSpace(sv))
32+
if err != nil {
33+
return err
34+
}
35+
*v = append(*v, val)
36+
}
37+
return nil
38+
}
39+
40+
// UnmarshalYAML implements yaml.Unmarshaler.
41+
func (v *IntSliceCSV) UnmarshalYAML(unmarshal func(interface{}) error) error {
42+
var s string
43+
if err := unmarshal(&s); err != nil {
44+
return err
45+
}
46+
return v.Set(s)
47+
}
48+
49+
// MarshalYAML implements yaml.Marshaler.
50+
func (v IntSliceCSV) MarshalYAML() (interface{}, error) {
51+
result := v.String()
52+
return result, nil
53+
}

pkg/util/flagext/intslicecsv_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package flagext
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"gopkg.in/yaml.v2"
8+
)
9+
10+
func Test_IntSliceCSV(t *testing.T) {
11+
type TestStruct struct {
12+
CSV IntSliceCSV `yaml:"csv"`
13+
}
14+
15+
var testStruct TestStruct
16+
s := "4,5,6,7"
17+
assert.Nil(t, testStruct.CSV.Set(s))
18+
19+
assert.Equal(t, []int{4, 5, 6, 7}, []int(testStruct.CSV))
20+
assert.Equal(t, s, testStruct.CSV.String())
21+
22+
expected := []byte(`csv: 4,5,6,7
23+
`)
24+
25+
actual, err := yaml.Marshal(testStruct)
26+
assert.Nil(t, err)
27+
assert.Equal(t, expected, actual)
28+
29+
var testStruct2 TestStruct
30+
31+
err = yaml.Unmarshal(expected, &testStruct2)
32+
assert.Nil(t, err)
33+
assert.Equal(t, testStruct, testStruct2)
34+
}

0 commit comments

Comments
 (0)