From ae395cbce1f398855790acd93947502c1f599a12 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 16 Jan 2020 12:13:16 +0100 Subject: [PATCH 1/8] Allow to configure TSDB blocks sync interval Signed-off-by: Marco Pracucci --- docs/operations/blocks-storage.md | 5 +++++ pkg/querier/block.go | 2 +- pkg/storage/tsdb/config.go | 14 ++++++++------ 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index 8b02d770135..5c3c28e0846 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -115,6 +115,11 @@ tsdb: # CLI flag: -experimental.tsdb.bucket-store.sync-dir [sync_dir: | default = "tsdb-sync"] + # How frequently scan the bucket to look for changes (new blocks shipped by + # ingesters and blocks removed by retention or compaction). + # CLI flag: -experimental.tsdb.bucket-store.sync-interval + [sync_interval: | default = 5m] + # Size - in bytes - of a per-tenant in-memory index cache used to speed up # blocks index lookups. # CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes diff --git a/pkg/querier/block.go b/pkg/querier/block.go index 7246b6f4eaf..e23359ce285 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -57,7 +57,7 @@ func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, r prometheus.Regis level.Info(util.Logger).Log("msg", "successfully synchronized TSDB blocks for all users") stopc := make(chan struct{}) - go runutil.Repeat(30*time.Second, stopc, func() error { + go runutil.Repeat(cfg.BucketStore.SyncInterval, stopc, func() error { ts := time.Now() if err := us.SyncStores(context.Background()); err != nil && err != io.EOF { level.Warn(util.Logger).Log("msg", "sync stores failed", "err", err) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index b887039a945..bc9cd014d29 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -104,17 +104,19 @@ func (cfg *Config) Validate() error { // BucketStoreConfig holds the config information for Bucket Stores used by the querier type BucketStoreConfig struct { - SyncDir string `yaml:"sync_dir"` - IndexCacheSizeBytes uint64 `yaml:"index_cache_size_bytes"` - MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` - MaxSampleCount uint64 `yaml:"max_sample_count"` - MaxConcurrent int `yaml:"max_concurrent"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + SyncDir string `yaml:"sync_dir"` + SyncInterval time.Duration `yaml:"sync_interval"` + IndexCacheSizeBytes uint64 `yaml:"index_cache_size_bytes"` + MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` + MaxSampleCount uint64 `yaml:"max_sample_count"` + MaxConcurrent int `yaml:"max_concurrent"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` } // RegisterFlags registers the BucketStore flags func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.SyncDir, "experimental.tsdb.bucket-store.sync-dir", "tsdb-sync", "Directory to place synced tsdb indicies.") + f.DurationVar(&cfg.SyncInterval, "experimental.tsdb.bucket-store.sync-interval", 5*time.Minute, "How frequently scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction).") f.Uint64Var(&cfg.IndexCacheSizeBytes, "experimental.tsdb.bucket-store.index-cache-size-bytes", uint64(250*units.Mebibyte), "Size of index cache in bytes per tenant.") f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size of chunk pool in bytes per tenant.") f.Uint64Var(&cfg.MaxSampleCount, "experimental.tsdb.bucket-store.max-sample-count", 0, "Max number of samples (0 is no limit) per query when loading series from storage.") From c9e8863d94d34d8a04859c6051a9f06276ac4551 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 16 Jan 2020 12:16:44 +0100 Subject: [PATCH 2/8] Updated changelog Signed-off-by: Marco Pracucci --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0823a44944e..f06f8bdabd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ instructions below to upgrade your Postgres. * `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup` * [ENHANCEMENT] Experimental TSDB: Added `cortex_ingester_shipper_dir_syncs_total`, `cortex_ingester_shipper_dir_sync_failures_total`, `cortex_ingester_shipper_uploads_total` and `cortex_ingester_shipper_upload_failures_total` metrics from TSDB shipper component. #1983 * [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996 +* [ENHANCEMENT] Experimental TSDB: Allow to configure the blocks sync interval via `-experimental.tsdb.bucket-store.sync-interval`. #1991 * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861 * [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921 * [BUGFIX] Reduce memory usage when ingester Push() errors. #1922 From 2fcce4a67ab9ef7276518298b58de898de7b3ec6 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 16 Jan 2020 12:30:08 +0100 Subject: [PATCH 3/8] Added sync interval config validation Signed-off-by: Marco Pracucci --- pkg/storage/tsdb/config.go | 14 ++++++++++++-- pkg/storage/tsdb/config_test.go | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index bc9cd014d29..42910eabb1d 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -25,7 +25,8 @@ const ( // Validation errors var ( - errUnsupportedBackend = errors.New("unsupported TSDB storage backend") + errUnsupportedBackend = errors.New("unsupported TSDB storage backend") + errInvalidSyncInterval = errors.New("the sync interval should be > 0") ) // Config holds the config information for TSDB storage @@ -99,7 +100,7 @@ func (cfg *Config) Validate() error { return errUnsupportedBackend } - return nil + return cfg.BucketStore.Validate() } // BucketStoreConfig holds the config information for Bucket Stores used by the querier @@ -129,3 +130,12 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) BlocksDir(userID string) string { return filepath.Join(cfg.Dir, userID) } + +// Validate the config +func (cfg *BucketStoreConfig) Validate() error { + if cfg.SyncInterval <= 0 { + return errInvalidSyncInterval + } + + return nil +} diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 5aeeeba3a60..a72a329172a 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -18,21 +18,39 @@ func TestConfig_Validate(t *testing.T) { "should pass on S3 backend": { config: Config{ Backend: "s3", + BucketStore: BucketStoreConfig{ + SyncInterval: 5 * time.Minute, + }, }, expectedErr: nil, }, "should pass on GCS backend": { config: Config{ Backend: "gcs", + BucketStore: BucketStoreConfig{ + SyncInterval: 5 * time.Minute, + }, }, expectedErr: nil, }, "should pass on unknown backend": { config: Config{ Backend: "unknown", + BucketStore: BucketStoreConfig{ + SyncInterval: 5 * time.Minute, + }, }, expectedErr: errUnsupportedBackend, }, + "should fail on invalid sync interval": { + config: Config{ + Backend: "s3", + BucketStore: BucketStoreConfig{ + SyncInterval: 0, + }, + }, + expectedErr: errInvalidSyncInterval, + }, } for testName, testData := range tests { From ca4dbaeb2e17cacda32f8e7f9cf11a0c3ef6ab39 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 16 Jan 2020 12:56:16 +0100 Subject: [PATCH 4/8] Blocks sync loop refactoring and renamed cortex_querier_sync_seconds metric to cortex_querier_blocks_sync_seconds Signed-off-by: Marco Pracucci --- pkg/querier/block.go | 39 +++--------------------- pkg/querier/block_store.go | 62 +++++++++++++++++++++++++++++++++++--- 2 files changed, 62 insertions(+), 39 deletions(-) diff --git a/pkg/querier/block.go b/pkg/querier/block.go index e23359ce285..2dc3a1de978 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "time" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/encoding" @@ -17,7 +16,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/logging" "google.golang.org/grpc/metadata" @@ -25,46 +23,19 @@ import ( // BlockQuerier is a querier of thanos blocks type BlockQuerier struct { - syncTimes prometheus.Histogram - us *UserStore + us *UserStore } // NewBlockQuerier returns a client to query a block store -func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, r prometheus.Registerer) (*BlockQuerier, error) { - b := &BlockQuerier{ - syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_querier_sync_seconds", - Help: "The total time it takes to perform a sync stores", - Buckets: prometheus.DefBuckets, - }), - } - - us, err := NewUserStore(cfg, logLevel, util.Logger) +func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, registerer prometheus.Registerer) (*BlockQuerier, error) { + us, err := NewUserStore(cfg, logLevel, util.Logger, registerer) if err != nil { return nil, err } - b.us = us - - if r != nil { - r.MustRegister(b.syncTimes, us.tsdbMetrics) - } - level.Info(util.Logger).Log("msg", "synchronizing TSDB blocks for all users") - if err := us.InitialSync(context.Background()); err != nil { - level.Warn(util.Logger).Log("msg", "failed to synchronize TSDB blocks", "err", err) - return nil, err + b := &BlockQuerier{ + us: us, } - level.Info(util.Logger).Log("msg", "successfully synchronized TSDB blocks for all users") - - stopc := make(chan struct{}) - go runutil.Repeat(cfg.BucketStore.SyncInterval, stopc, func() error { - ts := time.Now() - if err := us.SyncStores(context.Background()); err != nil && err != io.EOF { - level.Warn(util.Logger).Log("msg", "sync stores failed", "err", err) - } - b.syncTimes.Observe(time.Since(ts).Seconds()) - return nil - }) return b, nil } diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index 2e5942703a5..0783fd0ca44 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -3,10 +3,12 @@ package querier import ( "context" "fmt" + "io" "net" "path/filepath" "strings" "sync" + "time" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -15,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -32,10 +35,13 @@ type UserStore struct { client storepb.StoreClient logLevel logging.Level tsdbMetrics *tsdbBucketStoreMetrics + + // Metrics. + syncTimes prometheus.Histogram } // NewUserStore returns a new UserStore -func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger) (*UserStore, error) { +func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) { bkt, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-userstore", logger) if err != nil { return nil, err @@ -48,6 +54,15 @@ func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger) (* stores: map[string]*store.BucketStore{}, logLevel: logLevel, tsdbMetrics: newTSDBBucketStoreMetrics(), + syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_querier_blocks_sync_seconds", + Help: "The total time it takes to perform a sync stores", + Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900}, + }), + } + + if registerer != nil { + registerer.MustRegister(u.syncTimes, u.tsdbMetrics) } serv := grpc.NewServer() @@ -65,11 +80,22 @@ func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger) (* u.client = storepb.NewStoreClient(cc) + // Run an initial blocks sync, required in order to be able to serve queries. + level.Info(logger).Log("msg", "synchronizing TSDB blocks for all users") + if err := u.initialSync(context.Background()); err != nil { + level.Warn(logger).Log("msg", "failed to synchronize TSDB blocks", "err", err) + return nil, err + } + level.Info(logger).Log("msg", "successfully synchronized TSDB blocks for all users") + + // Periodically sync the blocks. + go u.syncStoresLoop() + return u, nil } -// InitialSync iterates over the storage bucket creating user bucket stores, and calling InitialSync on each of them -func (u *UserStore) InitialSync(ctx context.Context) error { +// initialSync iterates over the storage bucket creating user bucket stores, and calling initialSync on each of them +func (u *UserStore) initialSync(ctx context.Context) error { if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error { return s.InitialSync(ctx) }); err != nil { @@ -79,8 +105,34 @@ func (u *UserStore) InitialSync(ctx context.Context) error { return nil } -// SyncStores iterates over the storage bucket creating user bucket stores -func (u *UserStore) SyncStores(ctx context.Context) error { +// syncStoresLoop periodically calls syncStores() to synchronize the blocks for all tenants. +func (u *UserStore) syncStoresLoop() { + stopc := make(chan struct{}) + + err := runutil.Repeat(u.cfg.BucketStore.SyncInterval, stopc, func() error { + ts := time.Now() + + level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users") + if err := u.syncStores(context.Background()); err != nil && err != io.EOF { + level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err) + } else { + level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users") + } + + u.syncTimes.Observe(time.Since(ts).Seconds()) + return nil + }) + + if err != nil { + // This should never occur because the rununtil.Repeat() returns error + // only if the callback function returns error (which doesn't), but since + // we have to handle the error because of the linter, it's better to log it. + level.Error(u.logger).Log("msg", "blocks synchronization has been halted due to an unexpected error", "err", err) + } +} + +// syncStores iterates over the storage bucket creating user bucket stores +func (u *UserStore) syncStores(ctx context.Context) error { if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error { return s.SyncBlocks(ctx) }); err != nil { From 9dbad11dc530ecf8d4c7787e3f5111c4a72d228d Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 17 Jan 2020 06:01:30 +0100 Subject: [PATCH 5/8] Improved UserStore Signed-off-by: Marco Pracucci --- CHANGELOG.md | 7 +- pkg/querier/block.go | 7 +- pkg/querier/block_store.go | 270 ++++++++++++++++--------- pkg/querier/block_store_test.go | 106 ++++++++++ pkg/storage/tsdb/bucket_client_mock.go | 98 +++++++++ pkg/storage/tsdb/config.go | 13 +- 6 files changed, 390 insertions(+), 111 deletions(-) create mode 100644 pkg/querier/block_store_test.go create mode 100644 pkg/storage/tsdb/bucket_client_mock.go diff --git a/CHANGELOG.md b/CHANGELOG.md index f06f8bdabd9..00ef0552776 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,7 +34,12 @@ instructions below to upgrade your Postgres. * `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup` * [ENHANCEMENT] Experimental TSDB: Added `cortex_ingester_shipper_dir_syncs_total`, `cortex_ingester_shipper_dir_sync_failures_total`, `cortex_ingester_shipper_uploads_total` and `cortex_ingester_shipper_upload_failures_total` metrics from TSDB shipper component. #1983 * [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996 -* [ENHANCEMENT] Experimental TSDB: Allow to configure the blocks sync interval via `-experimental.tsdb.bucket-store.sync-interval`. #1991 +* [ENHANCEMENT] Experimental TSDB: Improved multi-tenant bucket store. #1991 + * Allowed to configure the blocks sync interval via `-experimental.tsdb.bucket-store.sync-interval` (0 disables the sync) + * Limited the number of tenants concurrently synched by `-experimental.tsdb.bucket-store.block-sync-concurrency` + * Renamed `cortex_querier_sync_seconds` metric to `cortex_querier_blocks_sync_seconds` + * Track `cortex_querier_blocks_sync_seconds` metric for the initial sync too + * Fixed race condition * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861 * [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921 * [BUGFIX] Reduce memory usage when ingester Push() errors. #1922 diff --git a/pkg/querier/block.go b/pkg/querier/block.go index 2dc3a1de978..76c5d132100 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -28,7 +28,12 @@ type BlockQuerier struct { // NewBlockQuerier returns a client to query a block store func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, registerer prometheus.Registerer) (*BlockQuerier, error) { - us, err := NewUserStore(cfg, logLevel, util.Logger, registerer) + bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-userstore", util.Logger) + if err != nil { + return nil, err + } + + us, err := NewUserStore(cfg, bucketClient, logLevel, util.Logger, registerer) if err != nil { return nil, err } diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index 0783fd0ca44..89a1b9d80cc 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -30,30 +30,35 @@ import ( type UserStore struct { logger log.Logger cfg tsdb.Config - bucket objstore.BucketReader - stores map[string]*store.BucketStore + bucket objstore.Bucket client storepb.StoreClient logLevel logging.Level tsdbMetrics *tsdbBucketStoreMetrics + // Keeps a bucket store for each tenant. + stores map[string]*store.BucketStore + storesMu sync.RWMutex + + // Used to cancel workers and wait until done. + workers sync.WaitGroup + workersCancel context.CancelFunc + // Metrics. syncTimes prometheus.Histogram } // NewUserStore returns a new UserStore -func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) { - bkt, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-userstore", logger) - if err != nil { - return nil, err - } +func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) { + workersCtx, workersCancel := context.WithCancel(context.Background()) u := &UserStore{ - logger: logger, - cfg: cfg, - bucket: bkt, - stores: map[string]*store.BucketStore{}, - logLevel: logLevel, - tsdbMetrics: newTSDBBucketStoreMetrics(), + logger: logger, + cfg: cfg, + bucket: bucketClient, + stores: map[string]*store.BucketStore{}, + logLevel: logLevel, + tsdbMetrics: newTSDBBucketStoreMetrics(), + workersCancel: workersCancel, syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "cortex_querier_blocks_sync_seconds", Help: "The total time it takes to perform a sync stores", @@ -80,46 +85,65 @@ func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger, re u.client = storepb.NewStoreClient(cc) - // Run an initial blocks sync, required in order to be able to serve queries. - level.Info(logger).Log("msg", "synchronizing TSDB blocks for all users") - if err := u.initialSync(context.Background()); err != nil { - level.Warn(logger).Log("msg", "failed to synchronize TSDB blocks", "err", err) - return nil, err - } - level.Info(logger).Log("msg", "successfully synchronized TSDB blocks for all users") + // If the sync is disabled we never sync blocks, which means the bucket store + // will be empty and no series will be returned once queried. + if u.cfg.BucketStore.SyncInterval > 0 { + // Run an initial blocks sync, required in order to be able to serve queries. + if err := u.initialSync(workersCtx); err != nil { + return nil, err + } - // Periodically sync the blocks. - go u.syncStoresLoop() + // Periodically sync the blocks. + u.workers.Add(1) + go u.syncStoresLoop(workersCtx) + } return u, nil } +// Stop the blocks sync and waits until done. +func (u *UserStore) Stop() { + u.workersCancel() + u.workers.Wait() +} + // initialSync iterates over the storage bucket creating user bucket stores, and calling initialSync on each of them func (u *UserStore) initialSync(ctx context.Context) error { + level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users") + if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error { return s.InitialSync(ctx) }); err != nil { + level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err) return err } + level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users") return nil } // syncStoresLoop periodically calls syncStores() to synchronize the blocks for all tenants. -func (u *UserStore) syncStoresLoop() { - stopc := make(chan struct{}) +func (u *UserStore) syncStoresLoop(ctx context.Context) { + defer u.workers.Done() - err := runutil.Repeat(u.cfg.BucketStore.SyncInterval, stopc, func() error { - ts := time.Now() + syncInterval := u.cfg.BucketStore.SyncInterval + // Since we've just run the initial sync, we should wait the next + // sync interval before resynching. + select { + case <-ctx.Done(): + return + case <-time.After(syncInterval): + } + + err := runutil.Repeat(syncInterval, ctx.Done(), func() error { level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users") - if err := u.syncStores(context.Background()); err != nil && err != io.EOF { + if err := u.syncStores(ctx); err != nil && err != io.EOF { level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err) } else { level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users") } - u.syncTimes.Observe(time.Since(ts).Seconds()) return nil }) @@ -143,81 +167,56 @@ func (u *UserStore) syncStores(ctx context.Context) error { } func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { - mint, maxt := &model.TimeOrDurationValue{}, &model.TimeOrDurationValue{} - mint.Set("0000-01-01T00:00:00Z") - maxt.Set("9999-12-31T23:59:59Z") + start := time.Now() + defer func() { + u.syncTimes.Observe(time.Since(start).Seconds()) + }() wg := &sync.WaitGroup{} - err := u.bucket.Iter(ctx, "", func(s string) error { - user := strings.TrimSuffix(s, "/") + jobs := make(chan struct { + userID string + store *store.BucketStore + }) - var bs *store.BucketStore - var ok bool - if bs, ok = u.stores[user]; !ok { - level.Info(u.logger).Log("msg", "creating user bucket store", "user", user) - - // Instance a new bucket used by this tenant's shipper. We're going - // to instance a new context instead of reusing the one of this function, - // because the bucket client's lifespan is longer. - bkt, err := tsdb.NewBucketClient(context.Background(), u.cfg, fmt.Sprintf("cortex-%s", user), u.logger) - if err != nil { - return err - } + // Create a pool of workers which will synchronize blocks. The pool size + // is limited in order to avoid to concurrently sync a lot of tenants in + // a large cluster. + for i := 0; i < u.cfg.BucketStore.BlockSyncConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() - // Bucket with the user wrapper - userBkt := &ingester.Bucket{ - UserID: user, - Bucket: bkt, + for job := range jobs { + if err := f(ctx, job.store); err != nil { + level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks for user", "user", job.userID, "err", err) + } } + }() + } - reg := prometheus.NewRegistry() - - indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes - maxItemSizeBytes := indexCacheSizeBytes / 2 - indexCache, err := storecache.NewInMemoryIndexCache(u.logger, reg, storecache.Opts{ - MaxSizeBytes: indexCacheSizeBytes, - MaxItemSizeBytes: maxItemSizeBytes, - }) - if err != nil { - return err - } - bs, err = store.NewBucketStore( - u.logger, - reg, - userBkt, - filepath.Join(u.cfg.BucketStore.SyncDir, user), - indexCache, - uint64(u.cfg.BucketStore.MaxChunkPoolBytes), - u.cfg.BucketStore.MaxSampleCount, - u.cfg.BucketStore.MaxConcurrent, - u.logLevel.String() == "debug", // Turn on debug logging, if the log level is set to debug - u.cfg.BucketStore.BlockSyncConcurrency, - &store.FilterConfig{ - MinTime: *mint, - MaxTime: *maxt, - }, - nil, // No relabelling config - false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers - ) - if err != nil { - return err - } + // Iterate the bucket, lazily create a bucket store for each new user found + // and submit a sync job for each user. + err := u.bucket.Iter(ctx, "", func(s string) error { + user := strings.TrimSuffix(s, "/") - u.stores[user] = bs - u.tsdbMetrics.addUserRegistry(user, reg) + bs, err := u.getOrCreateStore(user) + if err != nil { + return err } - wg.Add(1) - go func(userID string, s *store.BucketStore) { - defer wg.Done() - if err := f(ctx, s); err != nil { - level.Warn(u.logger).Log("msg", "user sync failed", "user", userID) - } - }(user, bs) + jobs <- struct { + userID string + store *store.BucketStore + }{ + userID: user, + store: bs, + } return nil }) + // Wait until all workers completed. + close(jobs) wg.Wait() return err @@ -235,8 +234,8 @@ func (u *UserStore) Info(ctx context.Context, req *storepb.InfoRequest) (*storep return nil, fmt.Errorf("no userID") } - store, ok := u.stores[v[0]] - if !ok { + store := u.getStore(v[0]) + if store == nil { return nil, nil } @@ -255,8 +254,8 @@ func (u *UserStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesS return fmt.Errorf("no userID") } - store, ok := u.stores[v[0]] - if !ok { + store := u.getStore(v[0]) + if store == nil { return nil } @@ -275,8 +274,8 @@ func (u *UserStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReque return nil, fmt.Errorf("no userID") } - store, ok := u.stores[v[0]] - if !ok { + store := u.getStore(v[0]) + if store == nil { return nil, nil } @@ -295,10 +294,85 @@ func (u *UserStore) LabelValues(ctx context.Context, req *storepb.LabelValuesReq return nil, fmt.Errorf("no userID") } - store, ok := u.stores[v[0]] - if !ok { + store := u.getStore(v[0]) + if store == nil { return nil, nil } return store.LabelValues(ctx, req) } + +func (u *UserStore) getStore(userID string) *store.BucketStore { + u.storesMu.RLock() + store, _ := u.stores[userID] + u.storesMu.RUnlock() + + return store +} + +func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) { + // Check if the store already exists. + bs := u.getStore(userID) + if bs != nil { + return bs, nil + } + + u.storesMu.Lock() + defer u.storesMu.Unlock() + + // Check again for the store in the event it was created in-between locks. + bs, _ = u.stores[userID] + if bs != nil { + return bs, nil + } + + level.Info(u.logger).Log("msg", "creating user bucket store", "user", userID) + + // Bucket with the user wrapper + userBkt := &ingester.Bucket{ + UserID: userID, + Bucket: u.bucket, + } + + reg := prometheus.NewRegistry() + indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes + maxItemSizeBytes := indexCacheSizeBytes / 2 + indexCache, err := storecache.NewInMemoryIndexCache(u.logger, reg, storecache.Opts{ + MaxSizeBytes: indexCacheSizeBytes, + MaxItemSizeBytes: maxItemSizeBytes, + }) + if err != nil { + return nil, err + } + + mint, maxt := &model.TimeOrDurationValue{}, &model.TimeOrDurationValue{} + mint.Set("0000-01-01T00:00:00Z") + maxt.Set("9999-12-31T23:59:59Z") + + bs, err = store.NewBucketStore( + u.logger, + reg, + userBkt, + filepath.Join(u.cfg.BucketStore.SyncDir, userID), + indexCache, + uint64(u.cfg.BucketStore.MaxChunkPoolBytes), + u.cfg.BucketStore.MaxSampleCount, + u.cfg.BucketStore.MaxConcurrent, + u.logLevel.String() == "debug", // Turn on debug logging, if the log level is set to debug + u.cfg.BucketStore.BlockSyncConcurrency, + &store.FilterConfig{ + MinTime: *mint, + MaxTime: *maxt, + }, + nil, // No relabelling config + false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers + ) + if err != nil { + return nil, err + } + + u.stores[userID] = bs + u.tsdbMetrics.addUserRegistry(userID, reg) + + return bs, nil +} diff --git a/pkg/querier/block_store_test.go b/pkg/querier/block_store_test.go new file mode 100644 index 00000000000..b798f90d86f --- /dev/null +++ b/pkg/querier/block_store_test.go @@ -0,0 +1,106 @@ +package querier + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store" + "github.com/weaveworks/common/logging" +) + +func TestUserStore_InitialSync(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + setup func(bucketClient *tsdb.BucketClientMock) + syncInterval time.Duration + expectedIter int + expectedErr error + }{ + "should sync blocks for all tenants": { + setup: func(bucketClient *tsdb.BucketClientMock) { + bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("user-1/", []string{}, nil) + bucketClient.MockIter("user-2/", []string{}, nil) + }, + syncInterval: time.Minute, + expectedIter: 3, + }, + "should not sync blocks if sync interval is 0": { + setup: func(bucketClient *tsdb.BucketClientMock) { + bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("user-1/", []string{}, nil) + bucketClient.MockIter("user-2/", []string{}, nil) + }, + syncInterval: 0, + expectedIter: 0, + }, + "should return error on initial sync failed": { + setup: func(bucketClient *tsdb.BucketClientMock) { + bucketClient.MockIter("", nil, errors.New("mocked error")) + }, + syncInterval: time.Minute, + expectedIter: 1, + expectedErr: errors.New("mocked error"), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + cfg := tsdb.Config{} + flagext.DefaultValues(&cfg) + cfg.BucketStore.SyncInterval = testData.syncInterval + + bucketClient := &tsdb.BucketClientMock{} + testData.setup(bucketClient) + + us, err := NewUserStore(cfg, bucketClient, mockLoggingLevel(), log.NewNopLogger(), nil) + if us != nil { + defer us.Stop() + } + + require.Equal(t, err, testData.expectedErr) + bucketClient.AssertNumberOfCalls(t, "Iter", testData.expectedIter) + }) + } +} + +func TestUserStore_syncUserStores(t *testing.T) { + cfg := tsdb.Config{} + flagext.DefaultValues(&cfg) + cfg.BucketStore.BlockSyncConcurrency = 2 + + // Disable the sync interval so that there will be no initial sync. + cfg.BucketStore.SyncInterval = 0 + + bucketClient := &tsdb.BucketClientMock{} + bucketClient.MockIter("", []string{"user-1", "user-2", "user-3"}, nil) + + us, err := NewUserStore(cfg, bucketClient, mockLoggingLevel(), log.NewNopLogger(), nil) + require.NoError(t, err) + defer us.Stop() + + // Sync user stores and count the number of times the callback is called. + storesCount := int32(0) + us.syncUserStores(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { + atomic.AddInt32(&storesCount, 1) + return nil + }) + + bucketClient.AssertNumberOfCalls(t, "Iter", 1) + assert.Equal(t, storesCount, int32(3)) +} + +func mockLoggingLevel() logging.Level { + level := logging.Level{} + level.Set("info") + return level +} diff --git a/pkg/storage/tsdb/bucket_client_mock.go b/pkg/storage/tsdb/bucket_client_mock.go new file mode 100644 index 00000000000..9d9b34c23b9 --- /dev/null +++ b/pkg/storage/tsdb/bucket_client_mock.go @@ -0,0 +1,98 @@ +package tsdb + +import ( + "bytes" + "context" + "errors" + "io" + "io/ioutil" + + "github.com/stretchr/testify/mock" +) + +// BucketClientMock mocks objstore.Bucket +type BucketClientMock struct { + mock.Mock +} + +// Upload mocks objstore.Bucket.Upload() +func (m *BucketClientMock) Upload(ctx context.Context, name string, r io.Reader) error { + args := m.Called(ctx, name, r) + return args.Error(0) +} + +// Delete mocks objstore.Bucket.Delete() +func (m *BucketClientMock) Delete(ctx context.Context, name string) error { + args := m.Called(ctx, name) + return args.Error(0) +} + +// Name mocks objstore.Bucket.Name() +func (m *BucketClientMock) Name() string { + return "mock" +} + +// Iter mocks objstore.Bucket.Iter() +func (m *BucketClientMock) Iter(ctx context.Context, dir string, f func(string) error) error { + args := m.Called(ctx, dir, f) + return args.Error(0) +} + +// MockIter is a convenient method to mock Iter() +func (m *BucketClientMock) MockIter(prefix string, objects []string, err error) { + m.On("Iter", mock.Anything, prefix, mock.Anything).Return(err).Run(func(args mock.Arguments) { + f := args.Get(2).(func(string) error) + + for _, o := range objects { + if f(o) != nil { + break + } + } + }) +} + +// Get mocks objstore.Bucket.Get() +func (m *BucketClientMock) Get(ctx context.Context, name string) (io.ReadCloser, error) { + args := m.Called(ctx, name) + return args.Get(0).(io.ReadCloser), args.Error(1) +} + +// MockGet is a convenient method to mock Get() and Exists() +func (m *BucketClientMock) MockGet(name, content string, err error) { + if content != "" { + m.On("Exists", mock.Anything, name).Return(true, err) + m.On("Get", mock.Anything, name).Return(ioutil.NopCloser(bytes.NewReader([]byte(content))), err) + } else { + m.On("Exists", mock.Anything, name).Return(false, err) + m.On("Get", mock.Anything, name).Return(nil, errors.New("object does not exist")) + } +} + +// GetRange mocks objstore.Bucket.GetRange() +func (m *BucketClientMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + args := m.Called(ctx, name, off, length) + return args.Get(0).(io.ReadCloser), args.Error(1) +} + +// Exists mocks objstore.Bucket.Exists() +func (m *BucketClientMock) Exists(ctx context.Context, name string) (bool, error) { + args := m.Called(ctx, name) + return args.Bool(0), args.Error(1) +} + +// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr() +func (m *BucketClientMock) IsObjNotFoundErr(err error) bool { + args := m.Called(err) + return args.Bool(0) +} + +// ObjectSize mocks objstore.Bucket.ObjectSize() +func (m *BucketClientMock) ObjectSize(ctx context.Context, name string) (uint64, error) { + args := m.Called(ctx, name) + return args.Get(0).(uint64), args.Error(1) +} + +// Close mocks objstore.Bucket.Close() +func (m *BucketClientMock) Close() error { + return nil +} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 42910eabb1d..59c50ff08e0 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -100,7 +100,7 @@ func (cfg *Config) Validate() error { return errUnsupportedBackend } - return cfg.BucketStore.Validate() + return nil } // BucketStoreConfig holds the config information for Bucket Stores used by the querier @@ -117,7 +117,7 @@ type BucketStoreConfig struct { // RegisterFlags registers the BucketStore flags func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.SyncDir, "experimental.tsdb.bucket-store.sync-dir", "tsdb-sync", "Directory to place synced tsdb indicies.") - f.DurationVar(&cfg.SyncInterval, "experimental.tsdb.bucket-store.sync-interval", 5*time.Minute, "How frequently scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction).") + f.DurationVar(&cfg.SyncInterval, "experimental.tsdb.bucket-store.sync-interval", 5*time.Minute, "How frequently scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 0 disables it.") f.Uint64Var(&cfg.IndexCacheSizeBytes, "experimental.tsdb.bucket-store.index-cache-size-bytes", uint64(250*units.Mebibyte), "Size of index cache in bytes per tenant.") f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size of chunk pool in bytes per tenant.") f.Uint64Var(&cfg.MaxSampleCount, "experimental.tsdb.bucket-store.max-sample-count", 0, "Max number of samples (0 is no limit) per query when loading series from storage.") @@ -130,12 +130,3 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) BlocksDir(userID string) string { return filepath.Join(cfg.Dir, userID) } - -// Validate the config -func (cfg *BucketStoreConfig) Validate() error { - if cfg.SyncInterval <= 0 { - return errInvalidSyncInterval - } - - return nil -} From 6633477e987818cb1be7e9dd879c003f92471f96 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 17 Jan 2020 06:13:36 +0100 Subject: [PATCH 6/8] Fixed linter and tests Signed-off-by: Marco Pracucci --- pkg/querier/block_store.go | 23 +++++++++++++++-------- pkg/querier/block_store_test.go | 9 +++++++-- pkg/storage/tsdb/config_test.go | 18 ------------------ 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index 89a1b9d80cc..1bf643951e1 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -35,6 +35,9 @@ type UserStore struct { logLevel logging.Level tsdbMetrics *tsdbBucketStoreMetrics + syncMint model.TimeOrDurationValue + syncMaxt model.TimeOrDurationValue + // Keeps a bucket store for each tenant. stores map[string]*store.BucketStore storesMu sync.RWMutex @@ -66,6 +69,14 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin }), } + // Configure the time range to sync all blocks. + if err := u.syncMint.Set("0000-01-01T00:00:00Z"); err != nil { + return nil, err + } + if err := u.syncMaxt.Set("9999-12-31T23:59:59Z"); err != nil { + return nil, err + } + if registerer != nil { registerer.MustRegister(u.syncTimes, u.tsdbMetrics) } @@ -304,7 +315,7 @@ func (u *UserStore) LabelValues(ctx context.Context, req *storepb.LabelValuesReq func (u *UserStore) getStore(userID string) *store.BucketStore { u.storesMu.RLock() - store, _ := u.stores[userID] + store := u.stores[userID] u.storesMu.RUnlock() return store @@ -321,7 +332,7 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) defer u.storesMu.Unlock() // Check again for the store in the event it was created in-between locks. - bs, _ = u.stores[userID] + bs = u.stores[userID] if bs != nil { return bs, nil } @@ -345,10 +356,6 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) return nil, err } - mint, maxt := &model.TimeOrDurationValue{}, &model.TimeOrDurationValue{} - mint.Set("0000-01-01T00:00:00Z") - maxt.Set("9999-12-31T23:59:59Z") - bs, err = store.NewBucketStore( u.logger, reg, @@ -361,8 +368,8 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) u.logLevel.String() == "debug", // Turn on debug logging, if the log level is set to debug u.cfg.BucketStore.BlockSyncConcurrency, &store.FilterConfig{ - MinTime: *mint, - MaxTime: *maxt, + MinTime: u.syncMint, + MaxTime: u.syncMaxt, }, nil, // No relabelling config false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers diff --git a/pkg/querier/block_store_test.go b/pkg/querier/block_store_test.go index b798f90d86f..87f5741789f 100644 --- a/pkg/querier/block_store_test.go +++ b/pkg/querier/block_store_test.go @@ -90,17 +90,22 @@ func TestUserStore_syncUserStores(t *testing.T) { // Sync user stores and count the number of times the callback is called. storesCount := int32(0) - us.syncUserStores(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { + err = us.syncUserStores(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { atomic.AddInt32(&storesCount, 1) return nil }) + assert.NoError(t, err) bucketClient.AssertNumberOfCalls(t, "Iter", 1) assert.Equal(t, storesCount, int32(3)) } func mockLoggingLevel() logging.Level { level := logging.Level{} - level.Set("info") + err := level.Set("info") + if err != nil { + panic(err) + } + return level } diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index a72a329172a..5aeeeba3a60 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -18,39 +18,21 @@ func TestConfig_Validate(t *testing.T) { "should pass on S3 backend": { config: Config{ Backend: "s3", - BucketStore: BucketStoreConfig{ - SyncInterval: 5 * time.Minute, - }, }, expectedErr: nil, }, "should pass on GCS backend": { config: Config{ Backend: "gcs", - BucketStore: BucketStoreConfig{ - SyncInterval: 5 * time.Minute, - }, }, expectedErr: nil, }, "should pass on unknown backend": { config: Config{ Backend: "unknown", - BucketStore: BucketStoreConfig{ - SyncInterval: 5 * time.Minute, - }, }, expectedErr: errUnsupportedBackend, }, - "should fail on invalid sync interval": { - config: Config{ - Backend: "s3", - BucketStore: BucketStoreConfig{ - SyncInterval: 0, - }, - }, - expectedErr: errInvalidSyncInterval, - }, } for testName, testData := range tests { From 21ca21b38a4fc4d5ca2d79b381d14aacd502cdc3 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 17 Jan 2020 06:17:50 +0100 Subject: [PATCH 7/8] Fixed linter Signed-off-by: Marco Pracucci --- pkg/storage/tsdb/config.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 59c50ff08e0..55d3f96c7d3 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -25,8 +25,7 @@ const ( // Validation errors var ( - errUnsupportedBackend = errors.New("unsupported TSDB storage backend") - errInvalidSyncInterval = errors.New("the sync interval should be > 0") + errUnsupportedBackend = errors.New("unsupported TSDB storage backend") ) // Config holds the config information for TSDB storage From 175783c95bec136b1e0f5a9981037c01e63e2657 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 17 Jan 2020 09:09:50 +0100 Subject: [PATCH 8/8] Code cleanup after review Signed-off-by: Marco Pracucci --- pkg/querier/block_store.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index 1bf643951e1..898b44cebd6 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -39,8 +39,8 @@ type UserStore struct { syncMaxt model.TimeOrDurationValue // Keeps a bucket store for each tenant. - stores map[string]*store.BucketStore storesMu sync.RWMutex + stores map[string]*store.BucketStore // Used to cancel workers and wait until done. workers sync.WaitGroup @@ -178,16 +178,17 @@ func (u *UserStore) syncStores(ctx context.Context) error { } func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { - start := time.Now() - defer func() { + defer func(start time.Time) { u.syncTimes.Observe(time.Since(start).Seconds()) - }() + }(time.Now()) - wg := &sync.WaitGroup{} - jobs := make(chan struct { + type job struct { userID string store *store.BucketStore - }) + } + + wg := &sync.WaitGroup{} + jobs := make(chan job) // Create a pool of workers which will synchronize blocks. The pool size // is limited in order to avoid to concurrently sync a lot of tenants in @@ -215,10 +216,7 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, return err } - jobs <- struct { - userID string - store *store.BucketStore - }{ + jobs <- job{ userID: user, store: bs, }