Skip to content

Store Gateway: Add pre add block ownership check #6483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,11 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
store.WithLazyExpandedPostings(u.cfg.BucketStore.LazyExpandedPostingsEnabled),
store.WithPostingGroupMaxKeySeriesRatio(u.cfg.BucketStore.LazyExpandedPostingGroupMaxKeySeriesRatio),
store.WithDontResort(true), // Cortex doesn't need to resort series in store gateway.
store.WithBlockLifecycleCallback(&shardingBlockLifecycleCallbackAdapter{
userID: userID,
strategy: u.shardingStrategy,
logger: userLogger,
}),
}
if u.logLevel.String() == "debug" {
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
Expand Down
8 changes: 8 additions & 0 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,14 @@ func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string,
return nil
}

func (u *userShardingStrategy) OwnBlock(userID string, _ thanos_metadata.Meta) (bool, error) {
if util.StringsContain(u.users, userID) {
return true, nil
}

return false, nil
}

// failFirstGetBucket is an objstore.Bucket wrapper which fails the first Get() request with a mocked error.
type failFirstGetBucket struct {
objstore.Bucket
Expand Down
5 changes: 5 additions & 0 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,11 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string,
return args.Error(0)
}

func (m *mockShardingStrategy) OwnBlock(userID string, meta metadata.Meta) (bool, error) {
args := m.Called(userID, meta)
return args.Bool(0), args.Error(1)
}

func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index {
updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger())
idx, _, _, err := updater.UpdateIndex(context.Background(), nil)
Expand Down
52 changes: 52 additions & 0 deletions pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storegateway

import (
"context"
"errors"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -19,6 +20,10 @@ const (
shardExcludedMeta = "shard-excluded"
)

var (
errBlockNotOwned = errors.New("block not owned")
)

type ShardingStrategy interface {
// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
// that should be synced by the store-gateway.
Expand All @@ -28,6 +33,9 @@ type ShardingStrategy interface {
// The provided loaded map contains blocks which have been previously returned by this function and
// are now loaded or loading in the store-gateway.
FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error

// OwnBlock checks if the block is owned by the current instance.
OwnBlock(userID string, meta metadata.Meta) (bool, error)
}

// ShardingLimits is the interface that should be implemented by the limits provider,
Expand Down Expand Up @@ -71,6 +79,10 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli
return nil
}

func (s *NoShardingStrategy) OwnBlock(_ string, meta metadata.Meta) (bool, error) {
return true, nil
}

// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways.
// Not go-routine safe.
type DefaultShardingStrategy struct {
Expand Down Expand Up @@ -102,6 +114,17 @@ func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, meta
return nil
}

func (s *DefaultShardingStrategy) OwnBlock(_ string, meta metadata.Meta) (bool, error) {
key := cortex_tsdb.HashBlockID(meta.ULID)

// Check if the block is owned by the store-gateway
set, err := s.r.Get(key, BlocksOwnerSync, nil, nil, nil)
if err != nil {
return false, err
}
return set.Includes(s.instanceAddr), nil
}

// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways,
// where each tenant blocks are sharded across a subset of store-gateway instances.
type ShuffleShardingStrategy struct {
Expand Down Expand Up @@ -151,6 +174,18 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string,
return nil
}

func (s *ShuffleShardingStrategy) OwnBlock(userID string, meta metadata.Meta) (bool, error) {
subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding)
key := cortex_tsdb.HashBlockID(meta.ULID)

// Check if the block is owned by the store-gateway
set, err := subRing.Get(key, BlocksOwnerSync, nil, nil, nil)
if err != nil {
return false, err
}
return set.Includes(s.instanceAddr), nil
}

func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec, logger log.Logger) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

Expand Down Expand Up @@ -275,3 +310,20 @@ func (a *shardingBucketReaderAdapter) Iter(ctx context.Context, dir string, f fu

return a.InstrumentedBucketReader.Iter(ctx, dir, f, options...)
}

type shardingBlockLifecycleCallbackAdapter struct {
userID string
strategy ShardingStrategy
logger log.Logger
}

func (a *shardingBlockLifecycleCallbackAdapter) PreAdd(meta metadata.Meta) error {
own, err := a.strategy.OwnBlock(a.userID, meta)
// If unable to check if block is owned or not because of ring error, mark it as owned
// and ignore the error.
if err != nil || own {
return nil
}
level.Info(a.logger).Log("msg", "block not owned from pre check", "block", meta.ULID.String())
return errBlockNotOwned
}
169 changes: 169 additions & 0 deletions pkg/storegateway/sharding_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storegateway

import (
"context"
"errors"
"fmt"
"strconv"
"testing"
Expand All @@ -11,7 +12,9 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -272,6 +275,11 @@ func TestDefaultShardingStrategy(t *testing.T) {

for instanceAddr, expectedBlocks := range testData.expectedBlocks {
filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger(), nil)
for _, block := range expectedBlocks {
owned, err := filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}})
require.NoError(t, err)
require.True(t, owned)
}
synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})
synced.WithLabelValues(shardExcludedMeta).Set(0)

Expand Down Expand Up @@ -657,6 +665,11 @@ func TestShuffleShardingStrategy(t *testing.T) {
// Assert on filter blocks.
for _, expected := range testData.expectedBlocks {
filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), allowedTenants, zoneStableShuffleSharding) //nolint:govet
for _, block := range expected.blocks {
owned, err := filter.OwnBlock(userID, metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}})
require.NoError(t, err)
require.True(t, owned)
}
synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})
synced.WithLabelValues(shardExcludedMeta).Set(0)

Expand Down Expand Up @@ -693,3 +706,159 @@ type shardingLimitsMock struct {
func (m *shardingLimitsMock) StoreGatewayTenantShardSize(_ string) float64 {
return m.storeGatewayTenantShardSize
}

func TestDefaultShardingStrategy_OwnBlock(t *testing.T) {
t.Parallel()
// The following block IDs have been picked to have increasing hash values
// in order to simplify the tests.
block1 := ulid.MustNew(1, nil) // hash: 283204220
block2 := ulid.MustNew(2, nil)
block1Hash := cortex_tsdb.HashBlockID(block1)
registeredAt := time.Now()
block2Hash := cortex_tsdb.HashBlockID(block2)

ctx := context.Background()
store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

// Initialize the ring state.
require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) {
d := ring.NewDesc()
d.AddIngester("instance-1", "127.0.0.1", "zone-a", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
d.AddIngester("instance-2", "127.0.0.2", "zone-b", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
return d, true, nil
}))

cfg := ring.Config{
ReplicationFactor: 1,
HeartbeatTimeout: time.Minute,
ZoneAwarenessEnabled: true,
}

r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck

// Wait until the ring client has synced.
require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE))
filter := NewDefaultShardingStrategy(r, "127.0.0.1", log.NewNopLogger(), nil)
owned, err := filter.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}})
require.NoError(t, err)
require.True(t, owned)
// Owned by 127.0.0.2
owned, err = filter.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}})
require.NoError(t, err)
require.False(t, owned)

filter2 := NewDefaultShardingStrategy(r, "127.0.0.2", log.NewNopLogger(), nil)
owned, err = filter2.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}})
require.NoError(t, err)
require.True(t, owned)
}

func TestShuffleShardingStrategy_OwnBlock(t *testing.T) {
t.Parallel()
// The following block IDs have been picked to have increasing hash values
// in order to simplify the tests.
block1 := ulid.MustNew(1, nil) // hash: 283204220
block2 := ulid.MustNew(2, nil)
block1Hash := cortex_tsdb.HashBlockID(block1)
registeredAt := time.Now()
block2Hash := cortex_tsdb.HashBlockID(block2)

ctx := context.Background()
store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

// Initialize the ring state.
require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) {
d := ring.NewDesc()
d.AddIngester("instance-1", "127.0.0.1", "zone-a", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
d.AddIngester("instance-2", "127.0.0.2", "zone-b", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
d.AddIngester("instance-3", "127.0.0.3", "zone-c", []uint32{block2Hash + 2}, ring.ACTIVE, registeredAt)
return d, true, nil
}))

cfg := ring.Config{
ReplicationFactor: 1,
HeartbeatTimeout: time.Minute,
ZoneAwarenessEnabled: true,
}
limits := &shardingLimitsMock{storeGatewayTenantShardSize: 2}

r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck

// Wait until the ring client has synced.
require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE))
filter := NewShuffleShardingStrategy(r, "instance-1", "127.0.0.1", limits, log.NewNopLogger(), nil, true)
filter2 := NewShuffleShardingStrategy(r, "instance-2", "127.0.0.2", limits, log.NewNopLogger(), nil, true)

owned, err := filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}})
require.NoError(t, err)
require.True(t, owned)
// Owned by 127.0.0.2
owned, err = filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}})
require.NoError(t, err)
require.False(t, owned)

owned, err = filter2.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}})
require.NoError(t, err)
require.True(t, owned)
}

func TestShardingBlockLifecycleCallbackAdapter(t *testing.T) {
userID := "user-1"
logger := log.NewNopLogger()
block := ulid.MustNew(1, nil)
meta := metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}}

for _, tc := range []struct {
name string
shardingStrategy func() ShardingStrategy
expectErr bool
}{
{
name: "own block",
shardingStrategy: func() ShardingStrategy {
s := &mockShardingStrategy{}
s.On("OwnBlock", mock.Anything, mock.Anything).Return(true, nil)
return s
},
},
{
name: "own block has error, still own block",
shardingStrategy: func() ShardingStrategy {
s := &mockShardingStrategy{}
s.On("OwnBlock", mock.Anything, mock.Anything).Return(false, errors.New("some error"))
return s
},
},
{
name: "not own block",
shardingStrategy: func() ShardingStrategy {
s := &mockShardingStrategy{}
s.On("OwnBlock", mock.Anything, mock.Anything).Return(false, nil)
return s
},
expectErr: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
a := &shardingBlockLifecycleCallbackAdapter{
userID: userID,
logger: logger,
strategy: tc.shardingStrategy(),
}
err := a.PreAdd(meta)
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
Loading