From 03babe2260dfb6224c14b86aed626bb532e0b42c Mon Sep 17 00:00:00 2001 From: Alex Le Date: Mon, 19 Dec 2022 12:00:23 -0800 Subject: [PATCH 1/2] Fixed no compact block got grouped in shuffle sharding grouper Signed-off-by: Alex Le --- pkg/compactor/compactor.go | 10 ++++---- pkg/compactor/shuffle_sharding_grouper.go | 11 +++++++-- .../shuffle_sharding_grouper_test.go | 24 +++++++++++++++++-- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 75732e22000..e665d3ec0db 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -52,7 +52,7 @@ var ( errInvalidShardingStrategy = errors.New("invalid sharding strategy") errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -67,7 +67,7 @@ var ( cfg.BlocksFetchConcurrency) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions 
prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { return NewShuffleShardingGrouper( ctx, logger, @@ -91,7 +91,8 @@ var ( cfg.CompactionConcurrency, cfg.BlockVisitMarkerTimeout, blockVisitMarkerReadFailed, - blockVisitMarkerWriteFailed) + blockVisitMarkerWriteFailed, + noCompactionMarkFilter.NoCompactMarkedBlocks) } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { @@ -138,6 +139,7 @@ type BlocksGrouperFactory func( ringLifecycler *ring.Lifecycler, limit Limits, userID string, + noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ) compact.Grouper // BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks. 
@@ -814,7 +816,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID), + c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter), c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed), c.blocksCompactor, path.Join(c.compactorCfg.DataDir, "compact"), diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 3ed5c3bdeee..291dd655f86 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -53,6 +53,8 @@ type ShuffleShardingGrouper struct { blockVisitMarkerTimeout time.Duration blockVisitMarkerReadFailed prometheus.Counter blockVisitMarkerWriteFailed prometheus.Counter + + noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark } func NewShuffleShardingGrouper( @@ -79,6 +81,7 @@ func NewShuffleShardingGrouper( blockVisitMarkerTimeout time.Duration, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, + noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, ) *ShuffleShardingGrouper { if logger == nil { logger = log.NewNopLogger() @@ -129,17 +132,21 @@ func NewShuffleShardingGrouper( blockVisitMarkerTimeout: blockVisitMarkerTimeout, blockVisitMarkerReadFailed: 
blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed: blockVisitMarkerWriteFailed, + noCompBlocksFunc: noCompBlocksFunc, } } // Groups function modified from https://github.com/cortexproject/cortex/pull/2616 func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) { + noCompactMarked := g.noCompBlocksFunc() // First of all we have to group blocks using the Thanos default // grouping (based on downsample resolution + external labels). mainGroups := map[string][]*metadata.Meta{} for _, b := range blocks { - key := b.Thanos.GroupKey() - mainGroups[key] = append(mainGroups[key], b) + if _, excluded := noCompactMarked[b.ULID]; !excluded { + key := b.Thanos.GroupKey() + mainGroups[key] = append(mainGroups[key], b) + } } // For each group, we have to further split it into set of blocks diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index d74224ed831..a791f2eaa01 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -127,8 +127,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { compactorID string isExpired bool } - expected [][]ulid.ULID - metrics string + expected [][]ulid.ULID + metrics string + noCompactBlocks map[ulid.ULID]*metadata.NoCompactMark }{ "test basic grouping": { concurrency: 3, @@ -306,6 +307,20 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { cortex_compactor_remaining_planned_compactions 2 `, }, + "test should skip block with no compact marker": { + concurrency: 2, + ranges: []time.Duration{4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid]}, + expected: [][]ulid.ULID{ + {block1hto2hExt2Ulid, 
block0hto1hExt2Ulid}, + {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, + }, + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + # TYPE cortex_compactor_remaining_planned_compactions gauge + cortex_compactor_remaining_planned_compactions 2 +`, + noCompactBlocks: map[ulid.ULID]*metadata.NoCompactMark{block2hto3hExt1Ulid: {}}, + }, } for testName, testData := range tests { @@ -364,6 +379,10 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { bkt.MockUpload(mock.Anything, nil) bkt.MockGet(mock.Anything, "", nil) + noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark { + return testData.noCompactBlocks + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() g := NewShuffleShardingGrouper( @@ -390,6 +409,7 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { blockVisitMarkerTimeout, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed, + noCompactFilter, ) actual, err := g.Groups(testData.blocks) require.NoError(t, err) From 9e0d830e06b387a1735a317fa0cd4e5ba118e217 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Mon, 19 Dec 2022 12:29:51 -0800 Subject: [PATCH 2/2] Updated change log Signed-off-by: Alex Le --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e50a507f75..a6e70fb805c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [FEATURE] Build ARM docker images. #5041 * [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008 * [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044 +* [BUGFIX] Fixed blocks marked for no compaction being grouped in shuffle sharding grouper. #5055 ## 1.14.0 2022-12-02