disable rule groups #5521


Merged (1 commit, Sep 8, 2023)
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [FEATURE] Ruler: Add support for disabling rule groups. #5521
* [FEATURE] Added the flag `-alertmanager.alerts-gc-interval` to configure alert manager alerts Garbage collection interval. #5550
* [FEATURE] Ruler: Add support for Limit field on RuleGroup. #5528
* [FEATURE] AlertManager: Add support for Webex, Discord and Telegram Receiver. #5493
3 changes: 3 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3097,6 +3097,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# alerts will fail with a log message and metric increment. 0 = no limit.
# CLI flag: -alertmanager.max-alerts-size-bytes
[alertmanager_max_alerts_size_bytes: <int> | default = 0]

# list of rule groups to disable
[disabled_rule_groups: <list of rule groups to disable> | default = ]
Contributor

should default be []?

Member

Can we document this format somehow?

We have other "complex configs" like https://cortexmetrics.io/docs/configuration/configuration-file/#memcached_config

Contributor Author

Will submit another PR for this. The existing doc generator does not handle slices of structs well.
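Until that follow-up lands, here is a minimal sketch of the shape the override plausibly takes, inferred from the runtime-config YAML used in the integration test below and from the fields compared in `ruleGroupDisabled`; the struct name, YAML tags, and package are assumptions, and the real definition in `pkg/util/validation` may differ:

```go
// Hypothetical sketch only; field names are taken from the runtime-config YAML
// in the integration test (name, namespace) and from ruleGroupDisabled (user).
package validation

// DisabledRuleGroup identifies one rule group to skip during rule syncing.
type DisabledRuleGroup struct {
	Namespace string `yaml:"namespace"`
	Name      string `yaml:"name"`
	User      string `yaml:"user,omitempty"` // typically filled in from the tenant ID rather than set by hand
}

// DisabledRuleGroups is the list form returned by the RulesLimits interface.
type DisabledRuleGroups []DisabledRuleGroup
```

In the runtime config this sits under a tenant in `overrides`, exactly as the integration test below uploads it for user `user`.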

```

### `memberlist_config`
123 changes: 123 additions & 0 deletions integration/ruler_test.go
@@ -4,6 +4,7 @@
package integration

import (
"bytes"
"context"
"crypto/x509"
"crypto/x509/pkix"
@@ -29,6 +30,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore/providers/s3"
"gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/integration/ca"
@@ -915,6 +917,127 @@ func TestRulerMetricsWhenIngesterFails(t *testing.T) {
})
}

func TestRulerDisablesRuleGroups(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

const blockRangePeriod = 2 * time.Second
// Configure the ruler.
flags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),

// Disable the bucket index; the store-gateway discovers blocks by scanning the bucket.
"-blocks-storage.bucket-store.bucket-index.enabled": "false",
// Evaluate rules often, so that we don't need to wait for metrics to show up.
"-ruler.evaluation-interval": "2s",
"-ruler.poll-interval": "2s",
// No delay
"-ruler.evaluation-delay-duration": "0",

// We run single ingester only, no replication.
"-distributor.replication-factor": "1",

// Very low limit so that ruler hits it.
"-querier.max-fetched-chunks-per-query": "15",
"-querier.query-store-after": (1 * time.Second).String(),
"-querier.query-ingesters-within": (2 * time.Second).String(),
},
)

const namespace = "test"
const user = "user"
configFileName := "runtime-config.yaml"
bucketName := "cortex"

storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

flags = mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
"-runtime-config.backend": "s3",
"-runtime-config.s3.access-key-id": e2edb.MinioAccessKey,
"-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey,
"-runtime-config.s3.bucket-name": bucketName,
"-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-runtime-config.s3.insecure": "true",
"-runtime-config.file": configFileName,
"-runtime-config.reload-period": "2s",
})

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

client, err := s3.NewBucketWithConfig(nil, s3.Config{
Endpoint: minio.HTTPEndpoint(),
Insecure: true,
Bucket: bucketName,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
}, "runtime-config-test")

require.NoError(t, err)

// update runtime config
newRuntimeConfig := []byte(`overrides:
user:
disabled_rule_groups:
- name: bad_rule
namespace: test`)
require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig)))
time.Sleep(2 * time.Second)

ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")

ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway))

// Wait until both the distributor and ruler have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

expression := "absent(sum_over_time(metric{}[2s] offset 1h))"

t.Run("disable_rule_group", func(t *testing.T) {

ruleGroup := ruleGroupWithRule("bad_rule", "rule", expression)
ruleGroup.Interval = 2
require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))

ruleGroup = ruleGroupWithRule("good_rule", "rule", expression)
ruleGroup.Interval = 2
require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))

m1 := ruleGroupMatcher(user, namespace, "good_rule")

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_sync_rules_total"}, e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m1), e2e.WaitMissingMetrics))

filter := e2ecortex.RuleFilter{}
actualGroups, err := c.GetPrometheusRules(filter)
require.NoError(t, err)
assert.Equal(t, 1, len(actualGroups))
assert.Equal(t, "good_rule", actualGroups[0].Name)
assert.Equal(t, "test", actualGroups[0].File)
})
}

func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
}
3 changes: 3 additions & 0 deletions pkg/ruler/compat.go
@@ -5,6 +5,8 @@ import (
"errors"
"time"

"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@@ -142,6 +144,7 @@ type RulesLimits interface {
RulerTenantShardSize(userID string) int
RulerMaxRuleGroupsPerTenant(userID string) int
RulerMaxRulesPerRuleGroup(userID string) int
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
}

// EngineQueryFunc returns a new engine query function by passing an altered timestamp.
70 changes: 60 additions & 10 deletions pkg/ruler/ruler.go
Expand Up @@ -71,6 +71,14 @@ const (
recordingRuleFilter string = "record"
)

type DisabledRuleGroupErr struct {
Message string
}

func (e *DisabledRuleGroupErr) Error() string {
return e.Message
}

// Config is the configuration for the recording rules server.
type Config struct {
// This is used for template expansion in alerts; must be a valid URL.
@@ -400,6 +408,17 @@ func SendAlerts(n sender, externalURL string) promRules.NotifyFunc {
}
}

func ruleGroupDisabled(ruleGroup *rulespb.RuleGroupDesc, disabledRuleGroupsForUser validation.DisabledRuleGroups) bool {
for _, disabledRuleGroupForUser := range disabledRuleGroupsForUser {
if ruleGroup.Namespace == disabledRuleGroupForUser.Namespace &&
ruleGroup.Name == disabledRuleGroupForUser.Name &&
ruleGroup.User == disabledRuleGroupForUser.User {
return true
}
}
return false
}

var sep = []byte("/")

func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
@@ -415,15 +434,21 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
return ringHasher.Sum32()
}

func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAddr string) (bool, error) {
func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string) (bool, error) {

hash := tokenForGroup(g)

rlrs, err := r.Get(hash, RingOp, nil, nil, nil)
if err != nil {
return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
}

return rlrs.Instances[0].Addr == instanceAddr, nil
ownsRuleGroup := rlrs.Instances[0].Addr == instanceAddr
if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) {
return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)}
Member

nit: Should this return an error? I see why you did it (you want to catch it at the caller layer), but it feels odd to return an error for something expected.

I'm OK with it either way.

Contributor

Agree this is a little bit weird... but it seems like a workaround, as Go doesn't have a better way to signal this to the caller.
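As a side note on that workaround: the sentinel type does compose with the standard errors package. Below is a self-contained sketch (not the code in this PR; checkOwnership is a made-up stand-in for instanceOwnsRuleGroup) showing how a caller can branch on the disabled case with errors.As instead of a type switch, which also keeps working if the error gets wrapped along the way:

```go
package main

import (
	"errors"
	"fmt"
)

// DisabledRuleGroupErr mirrors the sentinel error type added in this PR.
type DisabledRuleGroupErr struct {
	Message string
}

func (e *DisabledRuleGroupErr) Error() string { return e.Message }

// checkOwnership is a hypothetical stand-in for instanceOwnsRuleGroup: it
// reports ownership and returns the sentinel error when the group is disabled.
func checkOwnership(owned, disabled bool) (bool, error) {
	if owned && disabled {
		return false, &DisabledRuleGroupErr{Message: "rule group bad_rule, namespace test is disabled"}
	}
	return owned, nil
}

func main() {
	_, err := checkOwnership(true, true)

	// errors.As walks the wrap chain, so this still matches if some caller
	// wraps the error with fmt.Errorf("...: %w", err) in between.
	var disabledErr *DisabledRuleGroupErr
	switch {
	case errors.As(err, &disabledErr):
		fmt.Println("skipping group:", disabledErr.Message)
	case err != nil:
		fmt.Println("ownership check failed:", err)
	}
}
```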

}

return ownsRuleGroup, nil
}

func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -533,7 +558,26 @@ func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGr
}

func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
return r.store.ListAllRuleGroups(ctx)
allRuleGroups, err := r.store.ListAllRuleGroups(ctx)
if err != nil {
return nil, err
}
for userID, groups := range allRuleGroups {
disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID)
if len(disabledRuleGroupsForUser) == 0 {
continue
}
Comment on lines +567 to +569
Member

We could do this check right at the beginning of the method and return the original allRuleGroups value if the length is 0, right?

Contributor Author

We have to loop through and check whether there are overrides for each user, correct?

Member

Oh, my bad... this is for all users! OK.

filteredGroupsForUser := rulespb.RuleGroupList{}
Contributor

Nit: should we use make here, since we have an idea of how many elements there will be?

Contributor Author

We won't know how many rule groups are disabled until the loop is complete.
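To illustrate the trade-off being discussed (a sketch with simplified, made-up types, not the code in this PR): the final length is only known after the loop, but len(groups) is an upper bound, so a capacity hint avoids re-allocation in the common case where nothing is disabled:

```go
package main

import "fmt"

type ruleGroup struct {
	Name, Namespace string
}

// filterDisabled keeps only groups that are not in the disabled set. The slice
// is pre-allocated with capacity len(groups) (an upper bound) and length 0,
// so append never re-allocates even though the final count is unknown up front.
func filterDisabled(groups []ruleGroup, disabled map[string]bool) []ruleGroup {
	filtered := make([]ruleGroup, 0, len(groups))
	for _, g := range groups {
		if !disabled[g.Namespace+"/"+g.Name] {
			filtered = append(filtered, g)
		}
	}
	return filtered
}

func main() {
	groups := []ruleGroup{{Name: "bad_rule", Namespace: "test"}, {Name: "good_rule", Namespace: "test"}}
	disabled := map[string]bool{"test/bad_rule": true}
	fmt.Println(filterDisabled(groups, disabled)) // [{good_rule test}]
}
```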

for _, group := range groups {
if !ruleGroupDisabled(group, disabledRuleGroupsForUser) {
filteredGroupsForUser = append(filteredGroupsForUser, group)
} else {
level.Info(r.logger).Log("msg", "rule group disabled", "name", group.Name, "namespace", group.Namespace, "user", group.User)
}
}
allRuleGroups[userID] = filteredGroupsForUser
}
return allRuleGroups, nil
}

func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
@@ -544,7 +588,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp

filteredConfigs := make(map[string]rulespb.RuleGroupList)
for userID, groups := range configs {
filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) > 0 {
filteredConfigs[userID] = filtered
}
@@ -602,7 +646,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}

filtered := filterRuleGroups(userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) == 0 {
continue
}
@@ -624,15 +668,21 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
//
// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring,
// but only ring passed as parameter.
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
// Prune the rule group to only contain rules that this ruler is responsible for, based on ring.
var result []*rulespb.RuleGroupDesc
for _, g := range ruleGroups {
owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr)
owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr)
if err != nil {
ringCheckErrors.Inc()
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
continue
switch e := err.(type) {
case *DisabledRuleGroupErr:
level.Info(log).Log("msg", e.Message)
Contributor

Is it too verbose to log this every time during filtering? It also doesn't feel like an error to me.

Contributor Author

To give visibility, I think it needs to be either Info or Warn.

Contributor

In this case, all the instances will log this even if the instance doesn't own the group, right? I think this will produce too many logs for large clusters.

Contributor

Doesn't seem like an error to me, +1.

continue
default:
ringCheckErrors.Inc()
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
continue
}
}

if owned {
Expand Down