Skip to content

Commit 28bb394

Browse files
Restructure mutex such that manager is not holding one mutex for the entirety of sync rules
Signed-off-by: Anand Rajagopal <[email protected]>
1 parent d0db754 commit 28bb394

File tree

2 files changed

+228
-39
lines changed

2 files changed

+228
-39
lines changed

pkg/ruler/manager.go

Lines changed: 72 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@ type DefaultMultiTenantManager struct {
3535

3636
// Structs for holding per-user Prometheus rules Managers
3737
// and a corresponding metrics struct
38-
userManagerMtx sync.Mutex
38+
userManagerMtx sync.RWMutex
3939
userManagers map[string]RulesManager
4040
userManagerMetrics *ManagerMetrics
4141

@@ -50,6 +50,10 @@ type DefaultMultiTenantManager struct {
5050
configUpdatesTotal *prometheus.CounterVec
5151
registry prometheus.Registerer
5252
logger log.Logger
53+
54+
ruleCache map[string][]*promRules.Group
55+
ruleCacheMtx sync.RWMutex
56+
syncRuleMtx sync.Mutex
5357
}
5458

5559
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
@@ -85,6 +89,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
8589
mapper: newMapper(cfg.RulePath, logger),
8690
userManagers: map[string]RulesManager{},
8791
userManagerMetrics: userManagerMetrics,
92+
ruleCache: map[string][]*promRules.Group{},
8893
managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
8994
Namespace: "cortex",
9095
Name: "ruler_managers_total",
@@ -111,15 +116,17 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
111116
}
112117

113118
func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
114-
// A lock is taken to ensure if this function is called concurrently, then each call
115-
// returns after the call map files and check for updates
116-
r.userManagerMtx.Lock()
117-
defer r.userManagerMtx.Unlock()
119+
// this is a safety lock to ensure this method is executed sequentially
120+
r.syncRuleMtx.Lock()
121+
defer r.syncRuleMtx.Unlock()
118122

119123
for userID, ruleGroup := range ruleGroups {
120124
r.syncRulesToManager(ctx, userID, ruleGroup)
121125
}
122126

127+
r.userManagerMtx.Lock()
128+
defer r.userManagerMtx.Unlock()
129+
123130
// Check for deleted users and remove them
124131
for userID, mngr := range r.userManagers {
125132
if _, exists := ruleGroups[userID]; !exists {
@@ -142,6 +149,18 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
142149
r.managersTotal.Set(float64(len(r.userManagers)))
143150
}
144151

152+
func (r *DefaultMultiTenantManager) updateRuleCache(user string, rules []*promRules.Group) {
153+
r.ruleCacheMtx.Lock()
154+
defer r.ruleCacheMtx.Unlock()
155+
r.ruleCache[user] = rules
156+
}
157+
158+
func (r *DefaultMultiTenantManager) deleteRuleCache(user string) {
159+
r.ruleCacheMtx.Lock()
160+
defer r.ruleCacheMtx.Unlock()
161+
delete(r.ruleCache, user)
162+
}
163+
145164
// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
146165
// user's Prometheus Rules Manager.
147166
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
@@ -154,25 +173,20 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
154173
return
155174
}
156175

157-
manager, exists := r.userManagers[user]
158-
if !exists || update {
176+
manager, existing := r.getRulesManager(user, ctx)
177+
178+
if manager == nil {
179+
return
180+
}
181+
182+
if !existing || update {
159183
level.Debug(r.logger).Log("msg", "updating rules", "user", user)
160184
r.configUpdatesTotal.WithLabelValues(user).Inc()
161-
if !exists {
162-
level.Debug(r.logger).Log("msg", "creating rule manager for user", "user", user)
163-
manager, err = r.newManager(ctx, user)
164-
if err != nil {
165-
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
166-
level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
167-
return
168-
}
169-
// manager.Run() starts running the manager and blocks until Stop() is called.
170-
// Hence run it as another goroutine.
171-
go manager.Run()
172-
r.userManagers[user] = manager
185+
if update && existing {
186+
r.updateRuleCache(user, manager.RuleGroups())
173187
}
174-
175188
err = manager.Update(r.cfg.EvaluationInterval, files, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
189+
r.deleteRuleCache(user)
176190
if err != nil {
177191
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
178192
level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
@@ -184,6 +198,29 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
184198
}
185199
}
186200

201+
func (r *DefaultMultiTenantManager) getRulesManager(user string, ctx context.Context) (RulesManager, bool) {
202+
r.userManagerMtx.RLock()
203+
manager, exists := r.userManagers[user]
204+
r.userManagerMtx.RUnlock()
205+
if exists {
206+
return manager, true
207+
}
208+
r.userManagerMtx.Lock()
209+
defer r.userManagerMtx.Unlock()
210+
211+
manager, err := r.newManager(ctx, user)
212+
if err != nil {
213+
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
214+
level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
215+
return nil, false
216+
}
217+
// manager.Run() starts running the manager and blocks until Stop() is called.
218+
// Hence run it as another goroutine.
219+
go manager.Run()
220+
r.userManagers[user] = manager
221+
return manager, false
222+
}
223+
187224
func ruleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
188225
logMessage := []interface{}{
189226
"msg", "evaluating rule group",
@@ -269,13 +306,25 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
269306
return n.notifier, nil
270307
}
271308

309+
func (r *DefaultMultiTenantManager) getCachedRules(userID string) ([]*promRules.Group, bool) {
310+
r.ruleCacheMtx.RLock()
311+
defer r.ruleCacheMtx.RUnlock()
312+
groups, exists := r.ruleCache[userID]
313+
return groups, exists
314+
}
315+
272316
func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
273317
var groups []*promRules.Group
274-
r.userManagerMtx.Lock()
275-
if mngr, exists := r.userManagers[userID]; exists {
318+
groups, cached := r.getCachedRules(userID)
319+
if cached {
320+
return groups
321+
}
322+
r.userManagerMtx.RLock()
323+
mngr, exists := r.userManagers[userID]
324+
r.userManagerMtx.RUnlock()
325+
if exists {
276326
groups = mngr.RuleGroups()
277327
}
278-
r.userManagerMtx.Unlock()
279328
return groups
280329
}
281330

pkg/ruler/manager_test.go

Lines changed: 156 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package ruler
22

33
import (
44
"context"
5+
"sync"
56
"testing"
67
"time"
78

@@ -21,7 +22,14 @@ import (
2122
func TestSyncRuleGroups(t *testing.T) {
2223
dir := t.TempDir()
2324

24-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, nil, log.NewNopLogger())
25+
waitDurations := []time.Duration{
26+
1 * time.Millisecond,
27+
1 * time.Millisecond,
28+
}
29+
30+
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
31+
32+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
2533
require.NoError(t, err)
2634

2735
const user = "testUser"
@@ -97,11 +105,118 @@ func TestSyncRuleGroups(t *testing.T) {
97105
})
98106
}
99107

108+
func TestSlowRuleGroupSyncDoesNotSlowdownListRules(t *testing.T) {
109+
dir := t.TempDir()
110+
const user = "testUser"
111+
userRules := map[string]rulespb.RuleGroupList{
112+
user: {
113+
&rulespb.RuleGroupDesc{
114+
Name: "group1",
115+
Namespace: "ns",
116+
Interval: 1 * time.Minute,
117+
User: user,
118+
},
119+
},
120+
}
121+
122+
groupsToReturn := [][]*promRules.Group{
123+
{
124+
promRules.NewGroup(promRules.GroupOptions{
125+
Name: "group1",
126+
File: "ns",
127+
Interval: 60,
128+
Limit: 0,
129+
Opts: &promRules.ManagerOptions{},
130+
}),
131+
},
132+
{
133+
promRules.NewGroup(promRules.GroupOptions{
134+
Name: "group1",
135+
File: "ns",
136+
Interval: 60,
137+
Limit: 0,
138+
Opts: &promRules.ManagerOptions{},
139+
}),
140+
promRules.NewGroup(promRules.GroupOptions{
141+
Name: "group2",
142+
File: "ns",
143+
Interval: 60,
144+
Limit: 0,
145+
Opts: &promRules.ManagerOptions{},
146+
}),
147+
},
148+
}
149+
150+
waitDurations := []time.Duration{
151+
5 * time.Millisecond,
152+
1 * time.Second,
153+
}
154+
155+
ruleManagerFactory := RuleManagerFactory(groupsToReturn, waitDurations)
156+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
157+
require.NoError(t, err)
158+
159+
m.SyncRuleGroups(context.Background(), userRules)
160+
mgr := getManager(m, user)
161+
require.NotNil(t, mgr)
162+
163+
test.Poll(t, 1*time.Second, true, func() interface{} {
164+
return mgr.(*mockRulesManager).running.Load()
165+
})
166+
groups := m.GetRules(user)
167+
require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
168+
169+
// update rules and call list rules concurrently
170+
userRules = map[string]rulespb.RuleGroupList{
171+
user: {
172+
&rulespb.RuleGroupDesc{
173+
Name: "group1",
174+
Namespace: "ns",
175+
Interval: 1 * time.Minute,
176+
User: user,
177+
},
178+
&rulespb.RuleGroupDesc{
179+
Name: "group2",
180+
Namespace: "ns",
181+
Interval: 1 * time.Minute,
182+
User: user,
183+
},
184+
},
185+
}
186+
go m.SyncRuleGroups(context.Background(), userRules)
187+
188+
groups = m.GetRules(user)
189+
190+
require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
191+
test.Poll(t, 5*time.Second, len(groupsToReturn[1]), func() interface{} {
192+
groups = m.GetRules(user)
193+
return len(groups)
194+
})
195+
196+
test.Poll(t, 1*time.Second, true, func() interface{} {
197+
return mgr.(*mockRulesManager).running.Load()
198+
})
199+
200+
m.Stop()
201+
202+
test.Poll(t, 1*time.Second, false, func() interface{} {
203+
return mgr.(*mockRulesManager).running.Load()
204+
})
205+
}
206+
100207
func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
101208
dir := t.TempDir()
102209
reg := prometheus.NewPedanticRegistry()
103210
evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
104-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, evalMetrics, reg, log.NewNopLogger())
211+
212+
waitDurations := []time.Duration{
213+
1 * time.Millisecond,
214+
1 * time.Millisecond,
215+
}
216+
217+
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
218+
219+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
105220
require.NoError(t, err)
106221

107222
const user = "testUser"
@@ -139,19 +254,52 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
139254
}
140255

141256
func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
142-
m.userManagerMtx.Lock()
143-
defer m.userManagerMtx.Unlock()
257+
m.userManagerMtx.RLock()
258+
defer m.userManagerMtx.RUnlock()
144259

145260
return m.userManagers[user]
146261
}
147262

148-
func factory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
149-
return &mockRulesManager{done: make(chan struct{})}
263+
func RuleManagerFactory(groupsToReturn [][]*promRules.Group, waitDurations []time.Duration) ManagerFactory {
264+
return func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
265+
return &mockRulesManager{
266+
done: make(chan struct{}),
267+
groupsToReturn: groupsToReturn,
268+
waitDurations: waitDurations,
269+
iteration: -1,
270+
}
271+
}
150272
}
151273

152274
type mockRulesManager struct {
153-
running atomic.Bool
154-
done chan struct{}
275+
mtx sync.Mutex
276+
groupsToReturn [][]*promRules.Group
277+
iteration int
278+
waitDurations []time.Duration
279+
running atomic.Bool
280+
done chan struct{}
281+
}
282+
283+
func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
284+
m.mtx.Lock()
285+
defer m.mtx.Unlock()
286+
ticker := time.NewTicker(m.waitDurations[m.iteration+1])
287+
select {
288+
case <-ticker.C:
289+
m.iteration = m.iteration + 1
290+
return nil
291+
case <-m.done:
292+
return nil
293+
}
294+
}
295+
296+
func (m *mockRulesManager) RuleGroups() []*promRules.Group {
297+
m.mtx.Lock()
298+
defer m.mtx.Unlock()
299+
if m.iteration < 0 {
300+
return nil
301+
}
302+
return m.groupsToReturn[m.iteration]
155303
}
156304

157305
func (m *mockRulesManager) Run() {
@@ -163,11 +311,3 @@ func (m *mockRulesManager) Stop() {
163311
m.running.Store(false)
164312
close(m.done)
165313
}
166-
167-
func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
168-
return nil
169-
}
170-
171-
func (m *mockRulesManager) RuleGroups() []*promRules.Group {
172-
return nil
173-
}

0 commit comments

Comments
 (0)