diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0823a44944e..00ef0552776 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,6 +34,12 @@ instructions below to upgrade your Postgres.
   * `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup`
 * [ENHANCEMENT] Experimental TSDB: Added `cortex_ingester_shipper_dir_syncs_total`, `cortex_ingester_shipper_dir_sync_failures_total`, `cortex_ingester_shipper_uploads_total` and `cortex_ingester_shipper_upload_failures_total` metrics from TSDB shipper component. #1983
 * [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996
+* [ENHANCEMENT] Experimental TSDB: Improved multi-tenant bucket store. #1991
+  * Allowed configuring the blocks sync interval via `-experimental.tsdb.bucket-store.sync-interval` (0 disables the sync)
+  * Limited the number of tenants concurrently synced via `-experimental.tsdb.bucket-store.block-sync-concurrency`
+  * Renamed `cortex_querier_sync_seconds` metric to `cortex_querier_blocks_sync_seconds`
+  * Tracked the `cortex_querier_blocks_sync_seconds` metric for the initial sync too
+  * Fixed a race condition on the per-tenant bucket stores map
 * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
 * [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921
 * [BUGFIX] Reduce memory usage when ingester Push() errors. #1922
diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md
index 8b02d770135..5c3c28e0846 100644
--- a/docs/operations/blocks-storage.md
+++ b/docs/operations/blocks-storage.md
@@ -115,6 +115,11 @@ tsdb:
     # CLI flag: -experimental.tsdb.bucket-store.sync-dir
     [sync_dir: <string> | default = "tsdb-sync"]
 
+    # How frequently to scan the bucket to look for changes (new blocks shipped by
+    # ingesters and blocks removed by retention or compaction).
+    # CLI flag: -experimental.tsdb.bucket-store.sync-interval
+    [sync_interval: <duration> | default = 5m]
+
     # Size - in bytes - of a per-tenant in-memory index cache used to speed up
     # blocks index lookups.
    # CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
diff --git a/pkg/querier/block.go b/pkg/querier/block.go
index 7246b6f4eaf..76c5d132100 100644
--- a/pkg/querier/block.go
+++ b/pkg/querier/block.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"time"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/chunk/encoding"
@@ -17,7 +16,6 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
-	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"github.com/weaveworks/common/logging"
 	"google.golang.org/grpc/metadata"
@@ -25,46 +23,24 @@ import (
 
 // BlockQuerier is a querier of thanos blocks
 type BlockQuerier struct {
-	syncTimes prometheus.Histogram
-	us        *UserStore
+	us *UserStore
 }
 
 // NewBlockQuerier returns a client to query a block store
-func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, r prometheus.Registerer) (*BlockQuerier, error) {
-	b := &BlockQuerier{
-		syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{
-			Name:    "cortex_querier_sync_seconds",
-			Help:    "The total time it takes to perform a sync stores",
-			Buckets: prometheus.DefBuckets,
-		}),
-	}
-
-	us, err := NewUserStore(cfg, logLevel, util.Logger)
+func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, registerer prometheus.Registerer) (*BlockQuerier, error) {
+	bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-userstore", util.Logger)
 	if err != nil {
 		return nil, err
 	}
-	b.us = us
-
-	if r != nil {
-		r.MustRegister(b.syncTimes, us.tsdbMetrics)
-	}
 
-	level.Info(util.Logger).Log("msg", "synchronizing TSDB blocks for all users")
-	if err := us.InitialSync(context.Background()); err != nil {
-		level.Warn(util.Logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
+	us, err := NewUserStore(cfg, bucketClient, logLevel, util.Logger, registerer)
+	if err != nil {
 		return nil, err
 	}
-	level.Info(util.Logger).Log("msg", "successfully synchronized TSDB blocks for all users")
 
-	stopc := make(chan struct{})
-	go runutil.Repeat(30*time.Second, stopc, func() error {
-		ts := time.Now()
-		if err := us.SyncStores(context.Background()); err != nil && err != io.EOF {
-			level.Warn(util.Logger).Log("msg", "sync stores failed", "err", err)
-		}
-		b.syncTimes.Observe(time.Since(ts).Seconds())
-		return nil
-	})
+	b := &BlockQuerier{
+		us: us,
+	}
 
 	return b, nil
 }
diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go
index 2e5942703a5..898b44cebd6 100644
--- a/pkg/querier/block_store.go
+++ b/pkg/querier/block_store.go
@@ -3,10 +3,12 @@ package querier
 import (
 	"context"
 	"fmt"
+	"io"
 	"net"
 	"path/filepath"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/cortexproject/cortex/pkg/ingester"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb"
@@ -15,6 +17,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/thanos/pkg/model"
 	"github.com/thanos-io/thanos/pkg/objstore"
+	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/store"
 	storecache "github.com/thanos-io/thanos/pkg/store/cache"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -27,27 +30,55 @@ import (
 type UserStore struct {
 	logger      log.Logger
 	cfg         tsdb.Config
-	bucket      objstore.BucketReader
-	stores      map[string]*store.BucketStore
+	bucket      objstore.Bucket
 	client      storepb.StoreClient
 	logLevel    logging.Level
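+	// Aggregated per-tenant TSDB metrics, re-exposed under the
+	// cortex_querier_bucket_store_ prefix (see #1996).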
 	tsdbMetrics *tsdbBucketStoreMetrics
+
+	syncMint model.TimeOrDurationValue
+	syncMaxt model.TimeOrDurationValue
+
+	// Keeps a bucket store for each tenant.
+	storesMu sync.RWMutex
+	stores   map[string]*store.BucketStore
+
+	// Used to cancel workers and wait until done.
+	workers       sync.WaitGroup
+	workersCancel context.CancelFunc
+
+	// Metrics.
+	syncTimes prometheus.Histogram
 }
 
 // NewUserStore returns a new UserStore
-func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger) (*UserStore, error) {
-	bkt, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-userstore", logger)
-	if err != nil {
+func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) {
+	workersCtx, workersCancel := context.WithCancel(context.Background())
+
+	u := &UserStore{
+		logger:        logger,
+		cfg:           cfg,
+		bucket:        bucketClient,
+		stores:        map[string]*store.BucketStore{},
+		logLevel:      logLevel,
+		tsdbMetrics:   newTSDBBucketStoreMetrics(),
+		workersCancel: workersCancel,
+		syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name:    "cortex_querier_blocks_sync_seconds",
+			Help:    "The total time it takes to perform a blocks sync",
+			Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900},
+		}),
+	}
+
+	// Configure the time range to sync all blocks.
+	if err := u.syncMint.Set("0000-01-01T00:00:00Z"); err != nil {
+		return nil, err
+	}
+	if err := u.syncMaxt.Set("9999-12-31T23:59:59Z"); err != nil {
 		return nil, err
 	}
 
-	u := &UserStore{
-		logger:      logger,
-		cfg:         cfg,
-		bucket:      bkt,
-		stores:      map[string]*store.BucketStore{},
-		logLevel:    logLevel,
-		tsdbMetrics: newTSDBBucketStoreMetrics(),
+	if registerer != nil {
+		registerer.MustRegister(u.syncTimes, u.tsdbMetrics)
 	}
 
 	serv := grpc.NewServer()
@@ -65,22 +96,78 @@ func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger) (*
 	u.client = storepb.NewStoreClient(cc)
 
+	// If the sync is disabled, we never sync blocks, which means the bucket store
+	// will be empty and no series will be returned once queried.
+	if u.cfg.BucketStore.SyncInterval > 0 {
+		// Run an initial blocks sync, required in order to be able to serve queries.
+		if err := u.initialSync(workersCtx); err != nil {
+			return nil, err
+		}
+
+		// Periodically sync the blocks.
+		u.workers.Add(1)
+		go u.syncStoresLoop(workersCtx)
+	}
+
 	return u, nil
 }
 
-// InitialSync iterates over the storage bucket creating user bucket stores, and calling InitialSync on each of them
-func (u *UserStore) InitialSync(ctx context.Context) error {
+// Stop stops the blocks sync and waits until done.
+func (u *UserStore) Stop() {
+	u.workersCancel()
+	u.workers.Wait()
+}
+
+// initialSync iterates over the storage bucket creating user bucket stores, and calling InitialSync on each of them
+func (u *UserStore) initialSync(ctx context.Context) error {
+	level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
+
 	if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error {
 		return s.InitialSync(ctx)
 	}); err != nil {
+		level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
 		return err
 	}
 
+	level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users")
 	return nil
 }
 
-// SyncStores iterates over the storage bucket creating user bucket stores
-func (u *UserStore) SyncStores(ctx context.Context) error {
+// syncStoresLoop periodically calls syncStores() to synchronize the blocks for all tenants.
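+// The loop runs until the passed context is canceled, which happens when Stop() is called.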
+func (u *UserStore) syncStoresLoop(ctx context.Context) {
+	defer u.workers.Done()
+
+	syncInterval := u.cfg.BucketStore.SyncInterval
+
+	// Since we've just run the initial sync, we should wait for the next
+	// sync interval before resyncing.
+	select {
+	case <-ctx.Done():
+		return
+	case <-time.After(syncInterval):
+	}
+
+	err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
+		level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
+		if err := u.syncStores(ctx); err != nil && err != io.EOF {
+			level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
+		} else {
+			level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users")
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		// This should never occur because runutil.Repeat() returns an error
+		// only if the callback function returns an error (which ours never
+		// does), but since we have to handle the error because of the linter,
+		// it's better to log it.
+		level.Error(u.logger).Log("msg", "blocks synchronization has been halted due to an unexpected error", "err", err)
+	}
+}
+
+// syncStores iterates over the storage bucket creating user bucket stores
+func (u *UserStore) syncStores(ctx context.Context) error {
 	if err := u.syncUserStores(ctx, func(ctx context.Context, s *store.BucketStore) error {
 		return s.SyncBlocks(ctx)
 	}); err != nil {
@@ -91,81 +178,54 @@ func (u *UserStore) SyncStores(ctx context.Context) error {
 }
 
 func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
-	mint, maxt := &model.TimeOrDurationValue{}, &model.TimeOrDurationValue{}
-	mint.Set("0000-01-01T00:00:00Z")
-	maxt.Set("9999-12-31T23:59:59Z")
+	defer func(start time.Time) {
+		u.syncTimes.Observe(time.Since(start).Seconds())
+	}(time.Now())
+
+	type job struct {
+		userID string
+		store  *store.BucketStore
+	}
 
 	wg := &sync.WaitGroup{}
-	err := u.bucket.Iter(ctx, "", func(s string) error {
-		user := strings.TrimSuffix(s, "/")
+	jobs := make(chan job)
 
-		var bs *store.BucketStore
-		var ok bool
-		if bs, ok = u.stores[user]; !ok {
-			level.Info(u.logger).Log("msg", "creating user bucket store", "user", user)
-
-			// Instance a new bucket used by this tenant's shipper. We're going
-			// to instance a new context instead of reusing the one of this function,
-			// because the bucket client's lifespan is longer.
-			bkt, err := tsdb.NewBucketClient(context.Background(), u.cfg, fmt.Sprintf("cortex-%s", user), u.logger)
-			if err != nil {
-				return err
-			}
+	// Create a pool of workers which will synchronize blocks. The pool size
+	// is limited in order to avoid concurrently syncing a lot of tenants in
+	// a large cluster.
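+	// Note: the jobs channel is unbuffered, so submitting a job below blocks
+	// until a worker is free, which keeps at most BlockSyncConcurrency tenants
+	// syncing at any time.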
+	for i := 0; i < u.cfg.BucketStore.BlockSyncConcurrency; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
 
-			// Bucket with the user wrapper
-			userBkt := &ingester.Bucket{
-				UserID: user,
-				Bucket: bkt,
+			for job := range jobs {
+				if err := f(ctx, job.store); err != nil {
+					level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks for user", "user", job.userID, "err", err)
+				}
 			}
+		}()
+	}
 
-			reg := prometheus.NewRegistry()
-
-			indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes
-			maxItemSizeBytes := indexCacheSizeBytes / 2
-			indexCache, err := storecache.NewInMemoryIndexCache(u.logger, reg, storecache.Opts{
-				MaxSizeBytes:     indexCacheSizeBytes,
-				MaxItemSizeBytes: maxItemSizeBytes,
-			})
-			if err != nil {
-				return err
-			}
-			bs, err = store.NewBucketStore(
-				u.logger,
-				reg,
-				userBkt,
-				filepath.Join(u.cfg.BucketStore.SyncDir, user),
-				indexCache,
-				uint64(u.cfg.BucketStore.MaxChunkPoolBytes),
-				u.cfg.BucketStore.MaxSampleCount,
-				u.cfg.BucketStore.MaxConcurrent,
-				u.logLevel.String() == "debug", // Turn on debug logging, if the log level is set to debug
-				u.cfg.BucketStore.BlockSyncConcurrency,
-				&store.FilterConfig{
-					MinTime: *mint,
-					MaxTime: *maxt,
-				},
-				nil,   // No relabelling config
-				false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
-			)
-			if err != nil {
-				return err
-			}
+	// Iterate the bucket, lazily create a bucket store for each new user found
+	// and submit a sync job for each user.
+	err := u.bucket.Iter(ctx, "", func(s string) error {
+		user := strings.TrimSuffix(s, "/")
 
-			u.stores[user] = bs
-			u.tsdbMetrics.addUserRegistry(user, reg)
+		bs, err := u.getOrCreateStore(user)
+		if err != nil {
+			return err
 		}
 
-		wg.Add(1)
-		go func(userID string, s *store.BucketStore) {
-			defer wg.Done()
-			if err := f(ctx, s); err != nil {
-				level.Warn(u.logger).Log("msg", "user sync failed", "user", userID)
-			}
-		}(user, bs)
+		jobs <- job{
+			userID: user,
+			store:  bs,
+		}
 
 		return nil
 	})
 
+	// Wait until all workers completed.
+	close(jobs)
 	wg.Wait()
 
 	return err
@@ -183,8 +243,8 @@ func (u *UserStore) Info(ctx context.Context, req *storepb.InfoRequest) (*storep
 		return nil, fmt.Errorf("no userID")
 	}
 
-	store, ok := u.stores[v[0]]
-	if !ok {
+	store := u.getStore(v[0])
+	if store == nil {
 		return nil, nil
 	}
 
@@ -203,8 +263,8 @@ func (u *UserStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesS
 		return fmt.Errorf("no userID")
 	}
 
-	store, ok := u.stores[v[0]]
-	if !ok {
+	store := u.getStore(v[0])
+	if store == nil {
 		return nil
 	}
 
@@ -223,8 +283,8 @@ func (u *UserStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReque
 		return nil, fmt.Errorf("no userID")
 	}
 
-	store, ok := u.stores[v[0]]
-	if !ok {
+	store := u.getStore(v[0])
+	if store == nil {
 		return nil, nil
 	}
 
@@ -243,10 +303,81 @@ func (u *UserStore) LabelValues(ctx context.Context, req *storepb.LabelValuesReq
 		return nil, fmt.Errorf("no userID")
 	}
 
-	store, ok := u.stores[v[0]]
-	if !ok {
+	store := u.getStore(v[0])
+	if store == nil {
 		return nil, nil
 	}
 
 	return store.LabelValues(ctx, req)
 }
+
+func (u *UserStore) getStore(userID string) *store.BucketStore {
+	u.storesMu.RLock()
+	store := u.stores[userID]
+	u.storesMu.RUnlock()
+
+	return store
+}
+
+func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) {
+	// Check if the store already exists.
+	bs := u.getStore(userID)
+	if bs != nil {
+		return bs, nil
+	}
+
+	u.storesMu.Lock()
+	defer u.storesMu.Unlock()
+
+	// Check again for the store in the event it was created in-between locks.
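+	// (Classic double-checked locking: the first lookup runs under the read
+	// lock in getStore(), this one under the write lock, so two goroutines
+	// can't both create the same store.)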
+	bs = u.stores[userID]
+	if bs != nil {
+		return bs, nil
+	}
+
+	level.Info(u.logger).Log("msg", "creating user bucket store", "user", userID)
+
+	// Bucket with the user wrapper
+	userBkt := &ingester.Bucket{
+		UserID: userID,
+		Bucket: u.bucket,
+	}
+
+	reg := prometheus.NewRegistry()
+	indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes
+	maxItemSizeBytes := indexCacheSizeBytes / 2
+	indexCache, err := storecache.NewInMemoryIndexCache(u.logger, reg, storecache.Opts{
+		MaxSizeBytes:     indexCacheSizeBytes,
+		MaxItemSizeBytes: maxItemSizeBytes,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	bs, err = store.NewBucketStore(
+		u.logger,
+		reg,
+		userBkt,
+		filepath.Join(u.cfg.BucketStore.SyncDir, userID),
+		indexCache,
+		uint64(u.cfg.BucketStore.MaxChunkPoolBytes),
+		u.cfg.BucketStore.MaxSampleCount,
+		u.cfg.BucketStore.MaxConcurrent,
+		u.logLevel.String() == "debug", // Turn on debug logging, if the log level is set to debug
+		u.cfg.BucketStore.BlockSyncConcurrency,
+		&store.FilterConfig{
+			MinTime: u.syncMint,
+			MaxTime: u.syncMaxt,
+		},
+		nil,   // No relabelling config
+		false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	u.stores[userID] = bs
+	u.tsdbMetrics.addUserRegistry(userID, reg)
+
+	return bs, nil
+}
diff --git a/pkg/querier/block_store_test.go b/pkg/querier/block_store_test.go
new file mode 100644
index 00000000000..87f5741789f
--- /dev/null
+++ b/pkg/querier/block_store_test.go
@@ -0,0 +1,111 @@
+package querier
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/go-kit/kit/log"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/thanos/pkg/store"
+	"github.com/weaveworks/common/logging"
+)
+
+func TestUserStore_InitialSync(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		setup        func(bucketClient *tsdb.BucketClientMock)
+		syncInterval time.Duration
+		expectedIter int
+		expectedErr  error
+	}{
+		"should sync blocks for all tenants": {
+			setup: func(bucketClient *tsdb.BucketClientMock) {
+				bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
+				bucketClient.MockIter("user-1/", []string{}, nil)
+				bucketClient.MockIter("user-2/", []string{}, nil)
+			},
+			syncInterval: time.Minute,
+			expectedIter: 3,
+		},
+		"should not sync blocks if sync interval is 0": {
+			setup: func(bucketClient *tsdb.BucketClientMock) {
+				bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
+				bucketClient.MockIter("user-1/", []string{}, nil)
+				bucketClient.MockIter("user-2/", []string{}, nil)
+			},
+			syncInterval: 0,
+			expectedIter: 0,
+		},
+		"should return error on initial sync failed": {
+			setup: func(bucketClient *tsdb.BucketClientMock) {
+				bucketClient.MockIter("", nil, errors.New("mocked error"))
+			},
+			syncInterval: time.Minute,
+			expectedIter: 1,
+			expectedErr:  errors.New("mocked error"),
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			cfg := tsdb.Config{}
+			flagext.DefaultValues(&cfg)
+			cfg.BucketStore.SyncInterval = testData.syncInterval
+
+			bucketClient := &tsdb.BucketClientMock{}
+			testData.setup(bucketClient)
+
+			us, err := NewUserStore(cfg, bucketClient, mockLoggingLevel(), log.NewNopLogger(), nil)
+			if us != nil {
+				defer us.Stop()
+			}
+
+			require.Equal(t, testData.expectedErr, err)
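+			// expectedIter accounts for the root Iter("") call plus any
+			// per-tenant Iter() calls made while each store syncs its blocks.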
+			bucketClient.AssertNumberOfCalls(t, "Iter", testData.expectedIter)
+		})
+	}
+}
+
+func TestUserStore_syncUserStores(t *testing.T) {
+	cfg := tsdb.Config{}
+	flagext.DefaultValues(&cfg)
+	cfg.BucketStore.BlockSyncConcurrency = 2
+
+	// Disable the sync interval so that there will be no initial sync.
+	cfg.BucketStore.SyncInterval = 0
+
+	bucketClient := &tsdb.BucketClientMock{}
+	bucketClient.MockIter("", []string{"user-1", "user-2", "user-3"}, nil)
+
+	us, err := NewUserStore(cfg, bucketClient, mockLoggingLevel(), log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	defer us.Stop()
+
+	// Sync user stores and count the number of times the callback is called.
+	storesCount := int32(0)
+	err = us.syncUserStores(context.Background(), func(ctx context.Context, bs *store.BucketStore) error {
+		atomic.AddInt32(&storesCount, 1)
+		return nil
+	})
+
+	assert.NoError(t, err)
+	bucketClient.AssertNumberOfCalls(t, "Iter", 1)
+	assert.Equal(t, int32(3), storesCount)
+}
+
+func mockLoggingLevel() logging.Level {
+	level := logging.Level{}
+	err := level.Set("info")
+	if err != nil {
+		panic(err)
+	}
+
+	return level
+}
diff --git a/pkg/storage/tsdb/bucket_client_mock.go b/pkg/storage/tsdb/bucket_client_mock.go
new file mode 100644
index 00000000000..9d9b34c23b9
--- /dev/null
+++ b/pkg/storage/tsdb/bucket_client_mock.go
@@ -0,0 +1,98 @@
+package tsdb
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"io"
+	"io/ioutil"
+
+	"github.com/stretchr/testify/mock"
+)
+
+// BucketClientMock mocks objstore.Bucket
+type BucketClientMock struct {
+	mock.Mock
+}
+
+// Upload mocks objstore.Bucket.Upload()
+func (m *BucketClientMock) Upload(ctx context.Context, name string, r io.Reader) error {
+	args := m.Called(ctx, name, r)
+	return args.Error(0)
+}
+
+// Delete mocks objstore.Bucket.Delete()
+func (m *BucketClientMock) Delete(ctx context.Context, name string) error {
+	args := m.Called(ctx, name)
+	return args.Error(0)
+}
+
+// Name mocks objstore.Bucket.Name()
+func (m *BucketClientMock) Name() string {
+	return "mock"
+}
+
+// Iter mocks objstore.Bucket.Iter()
+func (m *BucketClientMock) Iter(ctx context.Context, dir string, f func(string) error) error {
+	args := m.Called(ctx, dir, f)
+	return args.Error(0)
+}
+
+// MockIter is a convenient method to mock Iter()
+func (m *BucketClientMock) MockIter(prefix string, objects []string, err error) {
+	m.On("Iter", mock.Anything, prefix, mock.Anything).Return(err).Run(func(args mock.Arguments) {
+		f := args.Get(2).(func(string) error)
+
+		for _, o := range objects {
+			if f(o) != nil {
+				break
+			}
+		}
+	})
+}
+
+// Get mocks objstore.Bucket.Get()
+func (m *BucketClientMock) Get(ctx context.Context, name string) (io.ReadCloser, error) {
+	args := m.Called(ctx, name)
+	return args.Get(0).(io.ReadCloser), args.Error(1)
+}
+
+// MockGet is a convenient method to mock Get() and Exists()
+func (m *BucketClientMock) MockGet(name, content string, err error) {
+	if content != "" {
+		m.On("Exists", mock.Anything, name).Return(true, err)
+		m.On("Get", mock.Anything, name).Return(ioutil.NopCloser(bytes.NewReader([]byte(content))), err)
+	} else {
+		m.On("Exists", mock.Anything, name).Return(false, err)
+		m.On("Get", mock.Anything, name).Return(nil, errors.New("object does not exist"))
+	}
+}
+
+// GetRange mocks objstore.Bucket.GetRange()
+func (m *BucketClientMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
+	args := m.Called(ctx, name, off, length)
+	return args.Get(0).(io.ReadCloser), args.Error(1)
+}
+
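+// A typical test wires the mock like this (names are illustrative):
+//
+//	bkt := &BucketClientMock{}
+//	bkt.MockIter("", []string{"user-1/"}, nil)
+//	bkt.MockGet("user-1/obj", "content", nil)
+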
+// Exists mocks objstore.Bucket.Exists()
+func (m *BucketClientMock) Exists(ctx context.Context, name string) (bool, error) {
+	args := m.Called(ctx, name)
+	return args.Bool(0), args.Error(1)
+}
+
+// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr()
+func (m *BucketClientMock) IsObjNotFoundErr(err error) bool {
+	args := m.Called(err)
+	return args.Bool(0)
+}
+
+// ObjectSize mocks objstore.Bucket.ObjectSize()
+func (m *BucketClientMock) ObjectSize(ctx context.Context, name string) (uint64, error) {
+	args := m.Called(ctx, name)
+	return args.Get(0).(uint64), args.Error(1)
+}
+
+// Close mocks objstore.Bucket.Close()
+func (m *BucketClientMock) Close() error {
+	return nil
+}
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index b887039a945..55d3f96c7d3 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -104,17 +104,19 @@ func (cfg *Config) Validate() error {
 
 // BucketStoreConfig holds the config information for Bucket Stores used by the querier
 type BucketStoreConfig struct {
-	SyncDir              string `yaml:"sync_dir"`
-	IndexCacheSizeBytes  uint64 `yaml:"index_cache_size_bytes"`
-	MaxChunkPoolBytes    uint64 `yaml:"max_chunk_pool_bytes"`
-	MaxSampleCount       uint64 `yaml:"max_sample_count"`
-	MaxConcurrent        int    `yaml:"max_concurrent"`
-	BlockSyncConcurrency int    `yaml:"block_sync_concurrency"`
+	SyncDir              string        `yaml:"sync_dir"`
+	SyncInterval         time.Duration `yaml:"sync_interval"`
+	IndexCacheSizeBytes  uint64        `yaml:"index_cache_size_bytes"`
+	MaxChunkPoolBytes    uint64        `yaml:"max_chunk_pool_bytes"`
+	MaxSampleCount       uint64        `yaml:"max_sample_count"`
+	MaxConcurrent        int           `yaml:"max_concurrent"`
+	BlockSyncConcurrency int           `yaml:"block_sync_concurrency"`
 }
 
 // RegisterFlags registers the BucketStore flags
 func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.SyncDir, "experimental.tsdb.bucket-store.sync-dir", "tsdb-sync", "Directory to place synced tsdb indicies.")
+	f.DurationVar(&cfg.SyncInterval, "experimental.tsdb.bucket-store.sync-interval", 5*time.Minute, "How frequently to scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 0 disables it.")
 	f.Uint64Var(&cfg.IndexCacheSizeBytes, "experimental.tsdb.bucket-store.index-cache-size-bytes", uint64(250*units.Mebibyte), "Size of index cache in bytes per tenant.")
 	f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size of chunk pool in bytes per tenant.")
 	f.Uint64Var(&cfg.MaxSampleCount, "experimental.tsdb.bucket-store.max-sample-count", 0, "Max number of samples (0 is no limit) per query when loading series from storage.")
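To illustrate how the new options fit together, here is a minimal sketch (not part of the diff); it uses `flagext.DefaultValues` just like the tests above, and the field names come straight from `BucketStoreConfig`:

```go
cfg := tsdb.Config{}
flagext.DefaultValues(&cfg)

// Scan the bucket every 10 minutes. Setting the interval to 0 disables
// both the initial sync and the periodic sync loop.
cfg.BucketStore.SyncInterval = 10 * time.Minute

// Sync at most 8 tenants concurrently; the same value is also passed
// through to each per-tenant Thanos BucketStore.
cfg.BucketStore.BlockSyncConcurrency = 8
```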