diff --git a/CHANGELOG.md b/CHANGELOG.md index d51a4ef054b..616d445e238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * [FEATURE] Added zstd as an option for grpc compression #5092 * [FEATURE] Ring: Add new kv store option `dynamodb`. #5026 * [FEATURE] Cache: Support redis as backend for caching bucket and index cache. #5057 +* [FEATURE] Querier/Store-Gateway: Added `-blocks-storage.bucket-store.ignore-blocks-within` allowing to filter out the recently created blocks from being synced by queriers and store-gateways. #5166 * [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008 * [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044 * [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 7dee1d32fa7..89546149346 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1084,6 +1084,15 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay [ignore_deletion_mark_delay: | default = 6h] + # The blocks created since `now() - ignore_blocks_within` will not be + # synced. This should be used together with `-querier.query-store-after` to + # filter out the blocks that are too new to be queried. A reasonable value + # for this flag would be `-querier.query-store-after - + # blocks-storage.bucket-store.bucket-index.max-stale-period` to give some + # buffer. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.ignore-blocks-within + [ignore_blocks_within: | default = 0s] + bucket_index: # True to enable querier and store-gateway to discover blocks in the # storage via bucket index instead of bucket scanning. diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 23b8295f35a..907e7cbbf7f 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1161,6 +1161,15 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay [ignore_deletion_mark_delay: | default = 6h] + # The blocks created since `now() - ignore_blocks_within` will not be + # synced. This should be used together with `-querier.query-store-after` to + # filter out the blocks that are too new to be queried. A reasonable value + # for this flag would be `-querier.query-store-after - + # blocks-storage.bucket-store.bucket-index.max-stale-period` to give some + # buffer. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.ignore-blocks-within + [ignore_blocks_within: | default = 0s] + bucket_index: # True to enable querier and store-gateway to discover blocks in the # storage via bucket index instead of bucket scanning. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a85cc9957fe..3076d1320b6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4024,6 +4024,15 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay [ignore_deletion_mark_delay: | default = 6h] + # The blocks created since `now() - ignore_blocks_within` will not be synced. + # This should be used together with `-querier.query-store-after` to filter out + # the blocks that are too new to be queried. A reasonable value for this flag + # would be `-querier.query-store-after - + # blocks-storage.bucket-store.bucket-index.max-stale-period` to give some + # buffer. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.ignore-blocks-within + [ignore_blocks_within: | default = 0s] + bucket_index: # True to enable querier and store-gateway to discover blocks in the storage # via bucket index instead of bucket scanning. diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index 551582c21cf..31686fcb7ba 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -24,6 +24,7 @@ type BucketIndexBlocksFinderConfig struct { IndexLoader bucketindex.LoaderConfig MaxStalePeriod time.Duration IgnoreDeletionMarksDelay time.Duration + IgnoreBlocksWithin time.Duration } // BucketIndexBlocksFinder implements BlocksFinder interface and find blocks in the bucket diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index 70fd84e0f9c..9f574e0b10e 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -25,17 +25,19 @@ func TestBucketIndexBlocksFinder_GetBlocks(t *testing.T) { bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) // Mock a bucket index. + now := time.Now() block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 15} block2 := &bucketindex.Block{ID: ulid.MustNew(2, nil), MinTime: 12, MaxTime: 20} block3 := &bucketindex.Block{ID: ulid.MustNew(3, nil), MinTime: 20, MaxTime: 30} block4 := &bucketindex.Block{ID: ulid.MustNew(4, nil), MinTime: 30, MaxTime: 40} - block5 := &bucketindex.Block{ID: ulid.MustNew(5, nil), MinTime: 30, MaxTime: 40} // Time range overlaps with block4, but this block deletion mark is above the threshold. + block5 := &bucketindex.Block{ID: ulid.MustNew(5, nil), MinTime: 30, MaxTime: 40} // Time range overlaps with block4, but this block deletion mark is above the threshold. + block6 := &bucketindex.Block{ID: ulid.MustNew(6, nil), MinTime: now.Add(-2 * time.Hour).UnixMilli(), MaxTime: now.UnixMilli()} // This block is within ignoreBlocksWithin and shouldn't be loaded. mark3 := &bucketindex.BlockDeletionMark{ID: block3.ID, DeletionTime: time.Now().Unix()} mark5 := &bucketindex.BlockDeletionMark{ID: block5.ID, DeletionTime: time.Now().Add(-2 * time.Hour).Unix()} require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{ Version: bucketindex.IndexVersion1, - Blocks: bucketindex.Blocks{block1, block2, block3, block4, block5}, + Blocks: bucketindex.Blocks{block1, block2, block3, block4, block5, block6}, BlockDeletionMarks: bucketindex.BlockDeletionMarks{mark3, mark5}, UpdatedAt: time.Now().Unix(), })) @@ -102,6 +104,14 @@ func TestBucketIndexBlocksFinder_GetBlocks(t *testing.T) { block3.ID: mark3, }, }, + "query range matching all blocks but should ignore non-queryable block": { + minT: 0, + maxT: block5.MaxTime, + expectedBlocks: bucketindex.Blocks{block4, block3, block2, block1}, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ + block3.ID: mark3, + }, + }, } for testName, testData := range tests { @@ -209,6 +219,7 @@ func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIn }, MaxStalePeriod: time.Hour, IgnoreDeletionMarksDelay: time.Hour, + IgnoreBlocksWithin: 10 * time.Hour, } finder := NewBucketIndexBlocksFinder(cfg, bkt, nil, log.NewNopLogger(), nil) diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index 0012a6beb10..43b0b12f8c4 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -42,6 +42,7 @@ type BucketScanBlocksFinderConfig struct { CacheDir string ConsistencyDelay time.Duration IgnoreDeletionMarksDelay time.Duration + IgnoreBlocksWithin time.Duration } // BucketScanBlocksFinder is a BlocksFinder implementation periodically scanning the bucket to discover blocks. @@ -378,6 +379,11 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat deletionMarkFilter := block.NewIgnoreDeletionMarkFilter(userLogger, userBucket, d.cfg.IgnoreDeletionMarksDelay, d.cfg.MetasConcurrency) filters := []block.MetadataFilter{deletionMarkFilter} + // Here we filter out the blocks that are too new to query. + if d.cfg.IgnoreBlocksWithin > 0 { + filters = append(filters, storegateway.NewIgnoreNonQueryableBlocksFilter(d.logger, d.cfg.IgnoreBlocksWithin)) + } + f, err := block.NewMetaFetcher( userLogger, d.cfg.MetasConcurrency, diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index b05452eb619..748a0976db8 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -383,10 +383,12 @@ func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) { ctx := context.Background() s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) + now := time.Now() block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 15) block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 12, 20) block3 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30) block4 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 30, 40) + block5 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", now.Add(-2*time.Hour).UnixMilli(), now.UnixMilli()) // This block is within ignoreBlocksWithin mark3 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-1", block3)) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) @@ -451,6 +453,14 @@ func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) { block3.ULID: mark3, }, }, + "query range matching all blocks but should ignore non-queryable block": { + minT: 0, + maxT: block5.MaxTime, + expectedMetas: []tsdb.BlockMeta{block4, block3, block2, block1}, + expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ + block3.ULID: mark3, + }, + }, } for testName, testData := range tests { @@ -488,5 +498,6 @@ func prepareBucketScanBlocksFinderConfig() BucketScanBlocksFinderConfig { TenantsConcurrency: 10, MetasConcurrency: 10, IgnoreDeletionMarksDelay: time.Hour, + IgnoreBlocksWithin: 10 * time.Hour, // All blocks created in the last 10 hour shoudn't be scanned. } } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 95f69bda97b..841d7fdf6a3 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -198,6 +198,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa }, MaxStalePeriod: storageCfg.BucketStore.BucketIndex.MaxStalePeriod, IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay, + IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin, }, bucketClient, limits, logger, reg) } else { finder = NewBucketScanBlocksFinder(BucketScanBlocksFinderConfig{ @@ -206,6 +207,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa MetasConcurrency: storageCfg.BucketStore.MetaSyncConcurrency, CacheDir: storageCfg.BucketStore.SyncDir, IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay, + IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin, }, bucketClient, limits, logger, reg) } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index ed57dc5ba30..2e030220a3e 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -239,6 +239,7 @@ type BucketStoreConfig struct { ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` + IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` BucketIndex BucketIndexConfig `yaml:"bucket_index"` // Chunk pool. @@ -282,6 +283,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.IgnoreDeletionMarksDelay, "blocks-storage.bucket-store.ignore-deletion-marks-delay", time.Hour*6, "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+ "The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+ "Default is 6h, half of the default value for -compactor.deletion-delay.") + f.DurationVar(&cfg.IgnoreBlocksWithin, "blocks-storage.bucket-store.ignore-blocks-within", 0, "The blocks created since `now() - ignore_blocks_within` will not be synced. This should be used together with `-querier.query-store-after` to filter out the blocks that are too new to be queried. A reasonable value for this flag would be `-querier.query-store-after - blocks-storage.bucket-store.bucket-index.max-stale-period` to give some buffer. 0 to disable.") f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", store.DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.") f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazily memory-map an index-header only once required by a query.") f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will release memory-mapped index-headers after 'idle timeout' inactivity.") diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 680ad43a6e0..ac3fe6a2b07 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -435,6 +435,11 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro // Remove Cortex external labels so that they're not injected when querying blocks. }...) + if u.cfg.BucketStore.IgnoreBlocksWithin > 0 { + // Filter out blocks that are too new to be queried. + filters = append(filters, NewIgnoreNonQueryableBlocksFilter(userLogger, u.cfg.BucketStore.IgnoreBlocksWithin)) + } + // Instantiate a different blocks metadata fetcher based on whether bucket index is enabled or not. var fetcher block.MetadataFetcher if u.cfg.BucketStore.BucketIndex.Enabled { diff --git a/pkg/storegateway/metadata_fetcher_filters.go b/pkg/storegateway/metadata_fetcher_filters.go index efbca69d0d5..3bc35b88cac 100644 --- a/pkg/storegateway/metadata_fetcher_filters.go +++ b/pkg/storegateway/metadata_fetcher_filters.go @@ -5,6 +5,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" @@ -75,3 +76,32 @@ func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, meta return nil } + +func NewIgnoreNonQueryableBlocksFilter(logger log.Logger, ignoreWithin time.Duration) *IgnoreNonQueryableBlocksFilter { + return &IgnoreNonQueryableBlocksFilter{ + logger: logger, + ignoreWithin: ignoreWithin, + } +} + +// IgnoreNonQueryableBlocksFilter ignores blocks that are too new be queried. +// This has be used in conjunction with `-querier.query-store-after` with some buffer. +type IgnoreNonQueryableBlocksFilter struct { + // Blocks that were created since `now() - ignoreWithin` will not be synced. + ignoreWithin time.Duration + logger log.Logger +} + +// Filter implements block.MetadataFilter. +func (f *IgnoreNonQueryableBlocksFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error { + ignoreWithin := time.Now().Add(-f.ignoreWithin).UnixMilli() + + for id, m := range metas { + if m.MinTime > ignoreWithin { + level.Debug(f.logger).Log("msg", "ignoring block because it won't be queried", "id", id) + delete(metas, id) + } + } + + return nil +} diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go index 0d988a99210..12a8a0f9619 100644 --- a/pkg/storegateway/metadata_fetcher_filters_test.go +++ b/pkg/storegateway/metadata_fetcher_filters_test.go @@ -19,6 +19,8 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/prometheus/prometheus/tsdb" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" @@ -106,3 +108,65 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) { assert.Equal(t, expectedMetas, inputMetas) assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks()) } + +func TestIgnoreNonQueryableBlocksFilter(t *testing.T) { + now := time.Now() + ctx := context.Background() + logger := log.NewNopLogger() + + inputMetas := map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(1, nil): { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-2 * time.Hour).UnixMilli(), + MaxTime: now.Add(-0 * time.Hour).UnixMilli(), + }, + }, + ulid.MustNew(2, nil): { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-4 * time.Hour).UnixMilli(), + MaxTime: now.Add(-2 * time.Hour).UnixMilli(), + }, + }, + ulid.MustNew(3, nil): { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-6 * time.Hour).UnixMilli(), + MaxTime: now.Add(-4 * time.Hour).UnixMilli(), + }, + }, + ulid.MustNew(4, nil): { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-8 * time.Hour).UnixMilli(), + MaxTime: now.Add(-6 * time.Hour).UnixMilli(), + }, + }, + } + + expectedMetas := map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(2, nil): { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-4 * time.Hour).UnixMilli(), + MaxTime: now.Add(-2 * time.Hour).UnixMilli(), + }, + }, + ulid.MustNew(3, nil): { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-6 * time.Hour).UnixMilli(), + MaxTime: now.Add(-4 * time.Hour).UnixMilli(), + }, + }, + ulid.MustNew(4, nil): { + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-8 * time.Hour).UnixMilli(), + MaxTime: now.Add(-6 * time.Hour).UnixMilli(), + }, + }, + } + + synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "synced"}, []string{"state"}) + modified := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "modified"}, []string{"state"}) + + f := NewIgnoreNonQueryableBlocksFilter(logger, 3*time.Hour) + + require.NoError(t, f.Filter(ctx, inputMetas, synced, modified)) + assert.Equal(t, expectedMetas, inputMetas) +}