diff --git a/CHANGELOG.md b/CHANGELOG.md
index e72902ffd09..fa7e11df703 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -61,6 +61,8 @@
 * [ENHANCEMENT] Blocksconvert – Builder: retry block upload before giving up. #3245
 * [ENHANCEMENT] Hash ring: added instance registered timestamp to the ring. #3248
 * [ENHANCEMENT] Reduce tail latency by smoothing out spikes in rate of chunk flush operations. #3191
+* [ENHANCEMENT] Experimental Ruler API: Fetch rule groups from object storage in parallel. #3218
+* [ENHANCEMENT] Chunks GCS object storage client uses the `fields` selector to limit the payload size when listing objects in the bucket. #3218
 * [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
 * [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
 * [BUGFIX] Handle hash-collisions in the query path. #3192
diff --git a/pkg/chunk/gcp/gcs_object_client.go b/pkg/chunk/gcp/gcs_object_client.go
index 28574373f8a..11421077019 100644
--- a/pkg/chunk/gcp/gcs_object_client.go
+++ b/pkg/chunk/gcp/gcs_object_client.go
@@ -109,8 +109,13 @@ func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, objec
 func (s *GCSObjectClient) List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
 	var storageObjects []chunk.StorageObject
 	var commonPrefixes []chunk.StorageCommonPrefix
+	q := &storage.Query{Prefix: prefix, Delimiter: delimiter}
+	err := q.SetAttrSelection([]string{"Name", "Updated"})
+	if err != nil {
+		return nil, nil, err
+	}
 
-	iter := s.bucket.Objects(ctx, &storage.Query{Prefix: prefix, Delimiter: delimiter})
+	iter := s.bucket.Objects(ctx, q)
 	for {
 		if ctx.Err() != nil {
 			return nil, nil, ctx.Err()
diff --git a/pkg/ruler/rules/objectclient/rule_store.go b/pkg/ruler/rules/objectclient/rule_store.go
index bbe04235242..834f82d7806 100644
--- a/pkg/ruler/rules/objectclient/rule_store.go
+++ b/pkg/ruler/rules/objectclient/rule_store.go
@@ -6,9 +6,11 @@ import (
 	"encoding/base64"
 	"io/ioutil"
 	strings "strings"
+	"sync"
 
 	"github.com/go-kit/kit/log/level"
 	proto "github.com/gogo/protobuf/proto"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/ruler/rules"
@@ -26,6 +28,8 @@ import (
 
 const (
 	rulePrefix = "rules/"
+
+	loadRuleGroupsConcurrency = 4
 )
 
 // RuleStore allows cortex rules to be stored using an object store backend.
@@ -74,26 +78,11 @@ func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.Rul
 		return nil, err
 	}
 
-	userGroupMap := map[string]rules.RuleGroupList{}
-	for _, obj := range ruleGroupObjects {
-
-		user := decomposeRuleObjectKey(obj.Key)
-		if user == "" {
-			continue
-		}
-
-		rg, err := o.getRuleGroup(ctx, obj.Key)
-		if err != nil {
-			return nil, err
-		}
-
-		if _, exists := userGroupMap[user]; !exists {
-			userGroupMap[user] = rules.RuleGroupList{}
-		}
-		userGroupMap[user] = append(userGroupMap[user], rg)
+	if len(ruleGroupObjects) == 0 {
+		return map[string]rules.RuleGroupList{}, nil
 	}
 
-	return userGroupMap, nil
+	return o.loadRuleGroupsConcurrently(ctx, ruleGroupObjects)
 }
 
 // ListRuleGroups returns all the active rule groups for a user
@@ -103,18 +92,12 @@ func (o *RuleStore) ListRuleGroups(ctx context.Context, userID, namespace strin
 		return nil, err
 	}
 
-	groups := []*rules.RuleGroupDesc{}
-	for _, obj := range ruleGroupObjects {
-		level.Debug(util.Logger).Log("msg", "listing rule group", "key", obj.Key)
-
-		rg, err := o.getRuleGroup(ctx, obj.Key)
-		if err != nil {
-			level.Error(util.Logger).Log("msg", "unable to retrieve rule group", "err", err, "key", obj.Key)
-			return nil, err
-		}
-		groups = append(groups, rg)
+	if len(ruleGroupObjects) == 0 {
+		return rules.RuleGroupList{}, nil
 	}
-	return groups, nil
+
+	groups, err := o.loadRuleGroupsConcurrently(ctx, ruleGroupObjects)
+	return groups[userID], err
 }
 
 // GetRuleGroup returns the requested rule group
@@ -172,6 +155,54 @@ func (o *RuleStore) DeleteNamespace(ctx context.Context, userID, namespace strin
 	return nil
 }
 
+func (o *RuleStore) loadRuleGroupsConcurrently(ctx context.Context, rgObjects []chunk.StorageObject) (map[string]rules.RuleGroupList, error) {
+	ch := make(chan string, len(rgObjects))
+
+	for _, obj := range rgObjects {
+		ch <- obj.Key
+	}
+	close(ch)
+
+	mtx := sync.Mutex{}
+	result := map[string]rules.RuleGroupList{}
+
+	concurrency := loadRuleGroupsConcurrency
+	if len(rgObjects) < loadRuleGroupsConcurrency {
+		concurrency = len(rgObjects)
+	}
+
+	// We store one file per rule group, so we spin up a pool of workers that
+	// download rule groups in parallel. The pool size is capped so that a user
+	// with many rule groups cannot get us rate limited by the object storage.
+	g, gCtx := errgroup.WithContext(ctx)
+	for i := 0; i < concurrency; i++ {
+		g.Go(func() error {
+			for key := range ch {
+				user := decomposeRuleObjectKey(key)
+				if user == "" {
+					continue
+				}
+
+				level.Debug(util.Logger).Log("msg", "loading rule group", "key", key, "user", user)
+				rg, err := o.getRuleGroup(gCtx, key)
+				if err != nil {
+					level.Error(util.Logger).Log("msg", "failed to get rule group", "err", err, "key", key, "user", user)
+					return err
+				}
+
+				mtx.Lock()
+				result[user] = append(result[user], rg)
+				mtx.Unlock()
+			}
+
+			return nil
+		})
+	}
+
+	err := g.Wait()
+	return result, err
+}
+
 func generateRuleObjectKey(id, namespace, name string) string {
 	if id == "" {
 		return rulePrefix
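
For context on the gcs_object_client.go hunk: `SetAttrSelection` restricts which JSON fields the GCS list API returns (roughly `items(name,updated)` on the wire), which is what shrinks the listing payload. Below is a minimal standalone sketch (not part of the patch) of the same pattern; the bucket name is a placeholder and application default credentials are assumed.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()

	client, err := storage.NewClient(ctx) // uses application default credentials
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	q := &storage.Query{Prefix: "rules/"}
	// Only fetch the object name and last-update time for each listed entry.
	if err := q.SetAttrSelection([]string{"Name", "Updated"}); err != nil {
		log.Fatal(err)
	}

	it := client.Bucket("example-bucket").Objects(ctx, q) // hypothetical bucket
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(attrs.Name, attrs.Updated)
	}
}
```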
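And a self-contained sketch of the bounded worker-pool pattern that `loadRuleGroupsConcurrently` uses: keys are fanned out over a closed buffered channel and a fixed number of goroutines drain it, failing fast via errgroup. Here `fetch` is a hypothetical stand-in for `getRuleGroup`.

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// fetch is a hypothetical stand-in for a per-key object download;
// it honors cancellation like the real download would.
func fetch(ctx context.Context, key string) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	return "contents of " + key, nil
}

func main() {
	keys := []string{"rules/user1/ns1/g1", "rules/user1/ns1/g2", "rules/user2/ns1/g1"}

	// The buffered channel doubles as the work queue; closing it lets the
	// workers' range loops terminate once the queue is drained.
	ch := make(chan string, len(keys))
	for _, k := range keys {
		ch <- k
	}
	close(ch)

	// Cap the pool size, but never start more workers than there is work.
	concurrency := 4
	if len(keys) < concurrency {
		concurrency = len(keys)
	}

	var mtx sync.Mutex
	result := map[string]string{}

	g, gCtx := errgroup.WithContext(context.Background())
	for i := 0; i < concurrency; i++ {
		g.Go(func() error {
			for key := range ch {
				v, err := fetch(gCtx, key)
				if err != nil {
					// Returning an error cancels gCtx, so the remaining
					// workers stop on their next fetch.
					return err
				}
				mtx.Lock()
				result[key] = v
				mtx.Unlock()
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("failed:", err)
		return
	}
	fmt.Println(result)
}
```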