Improve GetRules performance #5805

Merged 2 commits on Apr 1, 2024
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -30,6 +30,7 @@
* [ENHANCEMENT] Query: Added `query_storage_wall_time` to Query Frontend and Ruler query stats log for wall time spent on fetching data from storage. Query evaluation is not included. #5799
* [ENHANCEMENT] Query: Added additional max query length check at Query Frontend and Ruler. Added `-querier.ignore-max-query-length` flag to disable max query length check at Querier. #5808
* [ENHANCEMENT] Querier: Add context error check when converting Metrics to SeriesSet for GetSeries on distributorQuerier. #5827
* [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
@@ -38,7 +39,6 @@
* [BUGFIX] Ring DDB: Fix lifecycle for ring counting unhealthy pods as healthy. #5838
* [BUGFIX] Ring DDB: Fix region assignment. #5842


## 1.16.0 2023-11-20

* [CHANGE] AlertManager: include reason label in `cortex_alertmanager_notifications_failed_total`. #5409
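The changelog entry above summarizes the idea: while a potentially slow rule sync is running, `GetRules` is answered from a short-lived per-tenant cache guarded by its own lock, so callers do not queue behind the sync. Below is a minimal, self-contained sketch of that pattern under simplified assumptions; the names (`tempCache`, `Put`, `Drop`, `Get`) and the `[]string` payload are hypothetical and not Cortex APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// tempCache is a sketch of a temporary, per-key cache guarded by an RWMutex:
// a writer stores a snapshot before starting a slow update and drops it once
// the update finishes, while readers fall back to the live source of truth
// whenever no snapshot exists. Names are illustrative, not Cortex code.
type tempCache struct {
	mtx   sync.RWMutex
	items map[string][]string
}

func newTempCache() *tempCache {
	return &tempCache{items: map[string][]string{}}
}

// Put stores a snapshot for key while a slow update is in progress.
func (c *tempCache) Put(key string, snapshot []string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.items[key] = snapshot
}

// Drop removes the snapshot once the update has completed.
func (c *tempCache) Drop(key string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	delete(c.items, key)
}

// Get returns the cached snapshot, if any. Multiple readers may hold the
// read lock at the same time, so lookups do not serialize behind each other.
func (c *tempCache) Get(key string) ([]string, bool) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	v, ok := c.items[key]
	return v, ok
}

func main() {
	c := newTempCache()

	// Writer side: snapshot the current state, run the slow update, drop the snapshot.
	c.Put("tenant-1", []string{"group1"})
	if v, ok := c.Get("tenant-1"); ok {
		fmt.Println("served from cache while the update runs:", v)
	}
	c.Drop("tenant-1")

	// Reader side after the update: cache miss, read the live state instead.
	if _, ok := c.Get("tenant-1"); !ok {
		fmt.Println("cache dropped; read from the rules manager instead")
	}
}
```

This mirrors the ordering in `pkg/ruler/manager.go` below: the cache is populated only when an existing manager is about to be updated, and the entry is removed as soon as `Update` returns, so stale snapshots never outlive the reload.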
pkg/ruler/manager.go (100 changes: 77 additions & 23 deletions)
@@ -35,7 +35,7 @@ type DefaultMultiTenantManager struct {

// Structs for holding per-user Prometheus rules Managers
// and a corresponding metrics struct
userManagerMtx sync.Mutex
userManagerMtx sync.RWMutex
userManagers map[string]RulesManager
userManagerMetrics *ManagerMetrics

@@ -50,6 +50,10 @@ type DefaultMultiTenantManager struct {
configUpdatesTotal *prometheus.CounterVec
registry prometheus.Registerer
logger log.Logger

ruleCache map[string][]*promRules.Group
ruleCacheMtx sync.RWMutex
syncRuleMtx sync.Mutex
}

func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
@@ -85,6 +89,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
mapper: newMapper(cfg.RulePath, logger),
userManagers: map[string]RulesManager{},
userManagerMetrics: userManagerMetrics,
ruleCache: map[string][]*promRules.Group{},
managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "ruler_managers_total",
@@ -111,15 +116,17 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
}

func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
// A lock is taken to ensure if this function is called concurrently, then each call
// returns after the call map files and check for updates
r.userManagerMtx.Lock()
defer r.userManagerMtx.Unlock()
// this is a safety lock to ensure this method is executed sequentially
r.syncRuleMtx.Lock()
defer r.syncRuleMtx.Unlock()

for userID, ruleGroup := range ruleGroups {
r.syncRulesToManager(ctx, userID, ruleGroup)
}

r.userManagerMtx.Lock()
defer r.userManagerMtx.Unlock()

// Check for deleted users and remove them
for userID, mngr := range r.userManagers {
if _, exists := ruleGroups[userID]; !exists {
@@ -142,6 +149,18 @@
r.managersTotal.Set(float64(len(r.userManagers)))
}

func (r *DefaultMultiTenantManager) updateRuleCache(user string, rules []*promRules.Group) {
r.ruleCacheMtx.Lock()
defer r.ruleCacheMtx.Unlock()
r.ruleCache[user] = rules
}

func (r *DefaultMultiTenantManager) deleteRuleCache(user string) {
r.ruleCacheMtx.Lock()
defer r.ruleCacheMtx.Unlock()
delete(r.ruleCache, user)
}

// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
// the users Prometheus Rules Manager.
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
@@ -154,25 +173,25 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
return
}

manager, exists := r.userManagers[user]
if !exists || update {
existing := true
manager := r.getRulesManager(user, ctx)
if manager == nil {
existing = false
manager = r.createRulesManager(user, ctx)
}

if manager == nil {
return
}

if !existing || update {
level.Debug(r.logger).Log("msg", "updating rules", "user", user)
r.configUpdatesTotal.WithLabelValues(user).Inc()
if !exists {
level.Debug(r.logger).Log("msg", "creating rule manager for user", "user", user)
manager, err = r.newManager(ctx, user)
if err != nil {
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
return
}
// manager.Run() starts running the manager and blocks until Stop() is called.
// Hence run it as another goroutine.
go manager.Run()
r.userManagers[user] = manager
if update && existing {
r.updateRuleCache(user, manager.RuleGroups())
}

err = manager.Update(r.cfg.EvaluationInterval, files, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
r.deleteRuleCache(user)
if err != nil {
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
@@ -184,6 +203,29 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
}
}

func (r *DefaultMultiTenantManager) getRulesManager(user string, ctx context.Context) RulesManager {
r.userManagerMtx.RLock()
defer r.userManagerMtx.RUnlock()
return r.userManagers[user]
}

func (r *DefaultMultiTenantManager) createRulesManager(user string, ctx context.Context) RulesManager {
r.userManagerMtx.Lock()
defer r.userManagerMtx.Unlock()

manager, err := r.newManager(ctx, user)
if err != nil {
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
return nil
}
// manager.Run() starts running the manager and blocks until Stop() is called.
// Hence run it as another goroutine.
go manager.Run()
r.userManagers[user] = manager
return manager
}

func ruleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
logMessage := []interface{}{
"msg", "evaluating rule group",
@@ -269,13 +311,25 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
return n.notifier, nil
}

func (r *DefaultMultiTenantManager) getCachedRules(userID string) ([]*promRules.Group, bool) {
r.ruleCacheMtx.RLock()
defer r.ruleCacheMtx.RUnlock()
groups, exists := r.ruleCache[userID]
return groups, exists
}

func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
var groups []*promRules.Group
r.userManagerMtx.Lock()
if mngr, exists := r.userManagers[userID]; exists {
groups, cached := r.getCachedRules(userID)
if cached {
return groups
}
r.userManagerMtx.RLock()
mngr, exists := r.userManagers[userID]
r.userManagerMtx.RUnlock()
if exists {
groups = mngr.RuleGroups()
}
r.userManagerMtx.Unlock()
return groups
}

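For intuition on why `userManagerMtx` moves from `sync.Mutex` to `sync.RWMutex` in the diff above: the `GetRules` lookup now takes only a read lock, so concurrent callers overlap instead of queueing, and they block only while a writer (manager creation or removal during a sync) holds the full lock. The following is a small, self-contained illustration of that effect, not Cortex code; the 50ms sleep stands in for whatever work a reader does while holding the lock.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mtx sync.RWMutex
	managers := map[string]string{"tenant-1": "manager"}

	var wg sync.WaitGroup
	start := time.Now()

	// Eight concurrent "GetRules"-style readers: each holds only a read lock,
	// so they overlap instead of queueing behind one another.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mtx.RLock()
			_ = managers["tenant-1"]
			time.Sleep(50 * time.Millisecond) // simulate per-call work
			mtx.RUnlock()
		}()
	}
	wg.Wait()

	// With sync.RWMutex the readers overlap, so the total wall time stays close
	// to a single reader's 50ms; with sync.Mutex they would serialize to ~400ms.
	fmt.Printf("8 concurrent readers finished in %v\n", time.Since(start))
}
```

With the old `sync.Mutex` the eight simulated readers would run one after another (roughly 400ms of wall time); with `sync.RWMutex` they finish in roughly the time of a single reader, which is the behaviour the cached `GetRules` path above relies on.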
pkg/ruler/manager_test.go (172 changes: 156 additions & 16 deletions)
@@ -2,6 +2,7 @@ package ruler

import (
"context"
"sync"
"testing"
"time"

@@ -21,7 +22,14 @@
func TestSyncRuleGroups(t *testing.T) {
dir := t.TempDir()

m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, nil, log.NewNopLogger())
waitDurations := []time.Duration{
1 * time.Millisecond,
1 * time.Millisecond,
}

ruleManagerFactory := RuleManagerFactory(nil, waitDurations)

m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"
@@ -97,11 +105,118 @@
})
}

func TestSlowRuleGroupSyncDoesNotSlowdownListRules(t *testing.T) {
dir := t.TempDir()
const user = "testUser"
userRules := map[string]rulespb.RuleGroupList{
user: {
&rulespb.RuleGroupDesc{
Name: "group1",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user,
},
},
}

groupsToReturn := [][]*promRules.Group{
{
promRules.NewGroup(promRules.GroupOptions{
Name: "group1",
File: "ns",
Interval: 60,
Limit: 0,
Opts: &promRules.ManagerOptions{},
}),
},
{
promRules.NewGroup(promRules.GroupOptions{
Name: "group1",
File: "ns",
Interval: 60,
Limit: 0,
Opts: &promRules.ManagerOptions{},
}),
promRules.NewGroup(promRules.GroupOptions{
Name: "group2",
File: "ns",
Interval: 60,
Limit: 0,
Opts: &promRules.ManagerOptions{},
}),
},
}

waitDurations := []time.Duration{
5 * time.Millisecond,
1 * time.Second,
}

ruleManagerFactory := RuleManagerFactory(groupsToReturn, waitDurations)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
require.NoError(t, err)

m.SyncRuleGroups(context.Background(), userRules)
mgr := getManager(m, user)
require.NotNil(t, mgr)

test.Poll(t, 1*time.Second, true, func() interface{} {
return mgr.(*mockRulesManager).running.Load()
})
groups := m.GetRules(user)
require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))

// update rules and call list rules concurrently
userRules = map[string]rulespb.RuleGroupList{
user: {
&rulespb.RuleGroupDesc{
Name: "group1",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user,
},
&rulespb.RuleGroupDesc{
Name: "group2",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user,
},
},
}
go m.SyncRuleGroups(context.Background(), userRules)

groups = m.GetRules(user)

require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
test.Poll(t, 5*time.Second, len(groupsToReturn[1]), func() interface{} {
groups = m.GetRules(user)
return len(groups)
})

test.Poll(t, 1*time.Second, true, func() interface{} {
return mgr.(*mockRulesManager).running.Load()
})

m.Stop()

test.Poll(t, 1*time.Second, false, func() interface{} {
return mgr.(*mockRulesManager).running.Load()
})
}

func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
dir := t.TempDir()
reg := prometheus.NewPedanticRegistry()
evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, evalMetrics, reg, log.NewNopLogger())

waitDurations := []time.Duration{
1 * time.Millisecond,
1 * time.Millisecond,
}

ruleManagerFactory := RuleManagerFactory(nil, waitDurations)

m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"
@@ -139,19 +254,52 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.Lock()
defer m.userManagerMtx.Unlock()
m.userManagerMtx.RLock()
defer m.userManagerMtx.RUnlock()

return m.userManagers[user]
}

func factory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
return &mockRulesManager{done: make(chan struct{})}
func RuleManagerFactory(groupsToReturn [][]*promRules.Group, waitDurations []time.Duration) ManagerFactory {
return func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
return &mockRulesManager{
done: make(chan struct{}),
groupsToReturn: groupsToReturn,
waitDurations: waitDurations,
iteration: -1,
}
}
}

type mockRulesManager struct {
running atomic.Bool
done chan struct{}
mtx sync.Mutex
groupsToReturn [][]*promRules.Group
iteration int
waitDurations []time.Duration
running atomic.Bool
done chan struct{}
}

func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
m.mtx.Lock()
defer m.mtx.Unlock()
ticker := time.NewTicker(m.waitDurations[m.iteration+1])
select {
case <-ticker.C:
m.iteration = m.iteration + 1
return nil
case <-m.done:
return nil
}
}

func (m *mockRulesManager) RuleGroups() []*promRules.Group {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.iteration < 0 {
return nil
}
return m.groupsToReturn[m.iteration]
}

func (m *mockRulesManager) Run() {
@@ -163,11 +311,3 @@ func (m *mockRulesManager) Stop() {
m.running.Store(false)
close(m.done)
}

func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
return nil
}

func (m *mockRulesManager) RuleGroups() []*promRules.Group {
return nil
}