Skip to content

Commit 3a6f761

Browse files
committed
Speed up the listing of rule groups on the Ruler Configuration API
Signed-off-by: gotjosh <[email protected]>
1 parent 56aa40c commit 3a6f761

File tree

3 files changed

+68
-30
lines changed

3 files changed

+68
-30
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
- `POST <legacy-http-prefix>/push`
4141
- `POST /push`
4242
- `POST /ingester/push`
43+
* [CHANGE] Chunks GCS object storage client uses the `fields` selector to limit the payload size. #3218
4344
* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-user` globally, or using per-user limit `max_queriers_per_user`), each user's requests will be handled by different set of queriers. #3113
4445
* [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
4546
* [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flags are listed below: #3244
@@ -61,6 +62,7 @@
6162
* [ENHANCEMENT] Blocksconvert – Builder: retry block upload before giving up. #3245
6263
* [ENHANCEMENT] Hash ring: added instance registered timestamp to the ring. #3248
6364
* [ENHANCEMENT] Reduce tail latency by smoothing out spikes in rate of chunk flush operations. #3191
65+
* [ENHANCEMENT] Experimental Ruler API: Fetch rule groups from object storage in parallel. #3218
6466
* [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
6567
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
6668
* [BUGFIX] Handle hash-collisions in the query path. #3192

pkg/chunk/gcp/gcs_object_client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,13 @@ func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, objec
109109
func (s *GCSObjectClient) List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
110110
var storageObjects []chunk.StorageObject
111111
var commonPrefixes []chunk.StorageCommonPrefix
112+
q := &storage.Query{Prefix: prefix, Delimiter: delimiter}
113+
err := q.SetAttrSelection([]string{"Name", "Updated"})
114+
if err != nil {
115+
return nil, nil, err
116+
}
112117

113-
iter := s.bucket.Objects(ctx, &storage.Query{Prefix: prefix, Delimiter: delimiter})
118+
iter := s.bucket.Objects(ctx, q)
114119
for {
115120
if ctx.Err() != nil {
116121
return nil, nil, ctx.Err()

pkg/ruler/rules/objectclient/rule_store.go

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"encoding/base64"
77
"io/ioutil"
88
strings "strings"
9+
"sync"
910

1011
"github.com/go-kit/kit/log/level"
1112
proto "github.com/gogo/protobuf/proto"
13+
"golang.org/x/sync/errgroup"
1214

1315
"github.com/cortexproject/cortex/pkg/chunk"
1416
"github.com/cortexproject/cortex/pkg/ruler/rules"
@@ -26,6 +28,8 @@ import (
2628

2729
const (
2830
rulePrefix = "rules/"
31+
32+
loadRuleGroupsConcurrency = 4
2933
)
3034

3135
// RuleStore allows cortex rules to be stored using an object store backend.
@@ -74,26 +78,11 @@ func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.Rul
7478
return nil, err
7579
}
7680

77-
userGroupMap := map[string]rules.RuleGroupList{}
78-
for _, obj := range ruleGroupObjects {
79-
80-
user := decomposeRuleObjectKey(obj.Key)
81-
if user == "" {
82-
continue
83-
}
84-
85-
rg, err := o.getRuleGroup(ctx, obj.Key)
86-
if err != nil {
87-
return nil, err
88-
}
89-
90-
if _, exists := userGroupMap[user]; !exists {
91-
userGroupMap[user] = rules.RuleGroupList{}
92-
}
93-
userGroupMap[user] = append(userGroupMap[user], rg)
81+
if len(ruleGroupObjects) == 0 {
82+
return map[string]rules.RuleGroupList{}, nil
9483
}
9584

96-
return userGroupMap, nil
85+
return o.loadRuleGroupsConcurrently(ctx, ruleGroupObjects)
9786
}
9887

9988
// ListRuleGroups returns all the active rule groups for a user
@@ -103,18 +92,12 @@ func (o *RuleStore) ListRuleGroups(ctx context.Context, userID, namespace string
10392
return nil, err
10493
}
10594

106-
groups := []*rules.RuleGroupDesc{}
107-
for _, obj := range ruleGroupObjects {
108-
level.Debug(util.Logger).Log("msg", "listing rule group", "key", obj.Key)
109-
110-
rg, err := o.getRuleGroup(ctx, obj.Key)
111-
if err != nil {
112-
level.Error(util.Logger).Log("msg", "unable to retrieve rule group", "err", err, "key", obj.Key)
113-
return nil, err
114-
}
115-
groups = append(groups, rg)
95+
if len(ruleGroupObjects) == 0 {
96+
return rules.RuleGroupList{}, nil
11697
}
117-
return groups, nil
98+
99+
groups, err := o.loadRuleGroupsConcurrently(ctx, ruleGroupObjects)
100+
return groups[userID], err
118101
}
119102

120103
// GetRuleGroup returns the requested rule group
@@ -172,6 +155,54 @@ func (o *RuleStore) DeleteNamespace(ctx context.Context, userID, namespace strin
172155
return nil
173156
}
174157

158+
func (o *RuleStore) loadRuleGroupsConcurrently(ctx context.Context, rgObjects []chunk.StorageObject) (map[string]rules.RuleGroupList, error) {
159+
ch := make(chan string, len(rgObjects))
160+
161+
for _, obj := range rgObjects {
162+
ch <- obj.Key
163+
}
164+
close(ch)
165+
166+
mtx := sync.Mutex{}
167+
result := map[string]rules.RuleGroupList{}
168+
169+
concurrency := loadRuleGroupsConcurrency
170+
if loadRuleGroupsConcurrency < len(rgObjects) {
171+
concurrency = len(rgObjects)
172+
}
173+
174+
// Given we store one file per rule group. With this, we create a pool of workers that will
175+
// download all rule groups in parallel. We limit the number of workers to avoid a
176+
// particular user having too many rule groups rate limiting us with the object storage.
177+
g, gCtx := errgroup.WithContext(ctx)
178+
for i := 0; i < concurrency; i++ {
179+
g.Go(func() error {
180+
for key := range ch {
181+
user := decomposeRuleObjectKey(key)
182+
if user == "" {
183+
continue
184+
}
185+
186+
level.Debug(util.Logger).Log("msg", "listing rule group", "key", key, "user", user)
187+
rg, err := o.getRuleGroup(gCtx, key)
188+
if err != nil {
189+
level.Error(util.Logger).Log("msg", "failed to get rule group", "key", key, "user", user)
190+
return err
191+
}
192+
193+
mtx.Lock()
194+
result[user] = append(result[user], rg)
195+
mtx.Unlock()
196+
}
197+
198+
return nil
199+
})
200+
}
201+
202+
err := g.Wait()
203+
return result, err
204+
}
205+
175206
func generateRuleObjectKey(id, namespace, name string) string {
176207
if id == "" {
177208
return rulePrefix

0 commit comments

Comments
 (0)