diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f46a069559..3fdddaa8188 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,12 @@ ## master / unreleased +* [CHANGE] Improved shuffle sharding support in the write path. This work introduced some config changes: #3090 + * Introduced `-distributor.sharding-strategy` CLI flag (and its respective `sharding_strategy` YAML config option) to explicitly specify which sharding strategy should be used in the write path + * `-experimental.distributor.user-subring-size` flag renamed to `-distributor.ingestion-tenant-shard-size` + * `user_subring_size` limit YAML config option renamed to `ingestion_tenant_shard_size` * [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-user` globally, or using per-user limit `max_queriers_per_user`), each user's requests will be handled by different set of queriers. #3113 +* [ENHANCEMENT] Shuffle sharding: improved shuffle sharding in the write path. Shuffle sharding now should be explicitly enabled via `-distributor.sharding-strategy` CLI flag (or its respective YAML config option) and guarantees stability, consistency, shuffling and balanced zone-awareness properties. #3090 * [ENHANCEMENT] Ingester: added new metric `cortex_ingester_active_series` to track active series more accurately. Also added options to control whether active series tracking is enabled (`-ingester.active-series-enabled`, defaults to false), and how often this metric is updated (`-ingester.active-series-update-period`) and max idle time for series to be considered inactive (`-ingester.active-series-idle-timeout`). #3153 * [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178 * [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 40d2f8ca2dc..75423b645c8 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -392,6 +392,10 @@ ha_tracker: # CLI flag: -distributor.extra-query-delay [extra_queue_delay: | default = 0s] +# The sharding strategy to use. Supported values are: default, shuffle-sharding. +# CLI flag: -distributor.sharding-strategy +[sharding_strategy: | default = "default"] + # Distribute samples based on all labels, as opposed to solely by user and # metric name. # CLI flag: -distributor.shard-by-all-labels @@ -2748,9 +2752,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -validation.enforce-metric-name [enforce_metric_name: | default = true] -# Per-user subring to shard metrics to ingesters. 0 is disabled. -# CLI flag: -experimental.distributor.user-subring-size -[user_subring_size: | default = 0] +# The default tenant's shard size when the shuffle-sharding strategy is used. +# Must be set both on ingesters and distributors. When this setting is specified +# in the per-tenant overrides, a value of 0 disables shuffle sharding for the +# tenant. +# CLI flag: -distributor.ingestion-tenant-shard-size +[ingestion_tenant_shard_size: | default = 0] # The maximum number of series for which a query can fetch samples from each # ingester. 
This limit is enforced only in the ingesters (when querying samples diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 2a6615c6ef6..1e192553c8a 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -37,7 +37,7 @@ Currently experimental features are: - Azure blob storage. - Zone awareness based replication. -- User subrings. +- Shuffle sharding (both read and write path). - Ruler API (to PUT rules). - Alertmanager API - Memcached client DNS-based service discovery. diff --git a/integration/e2e/metrics.go b/integration/e2e/metrics.go index 445cdcd24d8..cdfa799e70b 100644 --- a/integration/e2e/metrics.go +++ b/integration/e2e/metrics.go @@ -73,7 +73,7 @@ func filterMetrics(metrics []*io_prometheus_client.Metric, opts MetricsOptions) return filtered } -func sumValues(values []float64) float64 { +func SumValues(values []float64) float64 { sum := 0.0 for _, v := range values { sum += v diff --git a/integration/e2e/service.go b/integration/e2e/service.go index 5b6fdfb80d3..0dc09c4c7c2 100644 --- a/integration/e2e/service.go +++ b/integration/e2e/service.go @@ -586,7 +586,7 @@ func (s *HTTPService) SumMetrics(metricNames []string, opts ...MetricsOption) ([ return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name) } - sums[i] = sumValues(getValues(metrics, options)) + sums[i] = SumValues(getValues(metrics, options)) } return sums, nil diff --git a/integration/ingester_sharding_test.go b/integration/ingester_sharding_test.go new file mode 100644 index 00000000000..250397bddc2 --- /dev/null +++ b/integration/ingester_sharding_test.go @@ -0,0 +1,110 @@ +// +build requires_docker + +package integration + +import ( + "fmt" + "strconv" + "testing" + "time" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestIngesterSharding(t *testing.T) { + const numSeriesToPush = 1000 + + tests := map[string]struct { + shardingStrategy string + tenantShardSize int + expectedIngestersWithSeries int + }{ + "default sharding strategy should spread series across all ingesters": { + shardingStrategy: "default", + tenantShardSize: 2, // Ignored by default strategy. + expectedIngestersWithSeries: 3, + }, + "shuffle-sharding strategy should spread series across the configured shard size number of ingesters": { + shardingStrategy: "shuffle-sharding", + tenantShardSize: 2, + expectedIngestersWithSeries: 2, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + flags := BlocksStorageFlags + flags["-distributor.sharding-strategy"] = testData.shardingStrategy + flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components. 
+ distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") + ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags, "") + ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags, "") + ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier)) + + // Wait until distributor and queriers have updated the ring. + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + // Push series. + now := time.Now() + + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + for i := 1; i <= numSeriesToPush; i++ { + series, _ := generateSeries(fmt.Sprintf("series_%d", i), now) + res, err := client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + // Extract metrics from ingesters. + numIngestersWithSeries := 0 + totalIngestedSeries := 0 + + for _, ing := range []*e2ecortex.CortexService{ingester1, ingester2, ingester3} { + values, err := ing.SumMetrics([]string{"cortex_ingester_memory_series"}) + require.NoError(t, err) + + numMemorySeries := e2e.SumValues(values) + totalIngestedSeries += int(numMemorySeries) + if numMemorySeries > 0 { + numIngestersWithSeries++ + } + } + + require.Equal(t, testData.expectedIngestersWithSeries, numIngestersWithSeries) + require.Equal(t, numSeriesToPush, totalIngestedSeries) + + // Ensure no service-specific metrics prefix is used by the wrong service. 
+ assertServiceMetricsPrefixes(t, Distributor, distributor) + assertServiceMetricsPrefixes(t, Ingester, ingester1) + assertServiceMetricsPrefixes(t, Ingester, ingester2) + assertServiceMetricsPrefixes(t, Ingester, ingester3) + assertServiceMetricsPrefixes(t, Querier, querier) + }) + } +} diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 9c6c4f6eeda..43783f8189b 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -176,7 +176,7 @@ func (c *Config) Validate(log log.Logger) error { if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil { return errors.Wrap(err, "invalid limits config") } - if err := c.Distributor.Validate(); err != nil { + if err := c.Distributor.Validate(c.LimitsConfig); err != nil { return errors.Wrap(err, "invalid distributor config") } if err := c.Querier.Validate(); err != nil { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index cf7e7849a6a..8520cdfa8fd 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -3,6 +3,7 @@ package distributor import ( "context" "flag" + "fmt" "net/http" "sort" "strings" @@ -104,11 +105,21 @@ var ( Help: "Unix timestamp of latest received sample per user.", }, []string{"user"}) emptyPreallocSeries = ingester_client.PreallocTimeseries{} + + supportedShardingStrategies = []string{ShardingStrategyDefault, ShardingStrategyShuffle} + + // Validation errors. + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") ) const ( typeSamples = "samples" typeMetadata = "metadata" + + // Supported sharding strategies. + ShardingStrategyDefault = "default" + ShardingStrategyShuffle = "shuffle-sharding" ) // Distributor is a storage.SampleAppender and a client.Querier which @@ -147,7 +158,8 @@ type Config struct { RemoteTimeout time.Duration `yaml:"remote_timeout"` ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"` - ShardByAllLabels bool `yaml:"shard_by_all_labels"` + ShardingStrategy string `yaml:"sharding_strategy"` + ShardByAllLabels bool `yaml:"shard_by_all_labels"` // Distributors ring DistributorRing RingConfig `yaml:"ring"` @@ -170,10 +182,19 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.") + f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. 
Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) } // Validate config and returns error on failure -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(limits validation.Limits) error { + if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) { + return errInvalidShardingStrategy + } + + if cfg.ShardingStrategy == ShardingStrategyShuffle && limits.IngestionTenantShardSize <= 0 { + return errInvalidTenantShardSize + } + return cfg.HATrackerConfig.Validate() } @@ -508,13 +529,11 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata)) } - var subRing ring.ReadRing - subRing = d.ingestersRing + subRing := d.ingestersRing.(ring.ReadRing) - // Obtain a subring if required - if size := d.limits.SubringSize(userID); size > 0 { - h := client.HashAdd32a(client.HashNew32a(), userID) - subRing = d.ingestersRing.Subring(h, size) + // Obtain a subring if required. + if d.cfg.ShardingStrategy == ShardingStrategyShuffle { + subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID)) } keys := append(seriesKeys, metadataKeys...) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 7f2b9394ff2..4a4430f0a63 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -46,6 +46,58 @@ var ( ctx = user.InjectOrgID(context.Background(), "user") ) +func TestConfig_Validate(t *testing.T) { + tests := map[string]struct { + initConfig func(*Config) + initLimits func(*validation.Limits) + expected error + }{ + "default config should pass": { + initConfig: func(_ *Config) {}, + initLimits: func(_ *validation.Limits) {}, + expected: nil, + }, + "should fail on invalid sharding strategy": { + initConfig: func(cfg *Config) { + cfg.ShardingStrategy = "xxx" + }, + initLimits: func(_ *validation.Limits) {}, + expected: errInvalidShardingStrategy, + }, + "should fail if the default shard size is 0 on when sharding strategy = shuffle-sharding": { + initConfig: func(cfg *Config) { + cfg.ShardingStrategy = "shuffle-sharding" + }, + initLimits: func(limits *validation.Limits) { + limits.IngestionTenantShardSize = 0 + }, + expected: errInvalidTenantShardSize, + }, + "should pass if the default shard size > 0 on when sharding strategy = shuffle-sharding": { + initConfig: func(cfg *Config) { + cfg.ShardingStrategy = "shuffle-sharding" + }, + initLimits: func(limits *validation.Limits) { + limits.IngestionTenantShardSize = 3 + }, + expected: nil, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + cfg := Config{} + limits := validation.Limits{} + flagext.DefaultValues(&cfg, &limits) + + testData.initConfig(&cfg) + testData.initLimits(&limits) + + assert.Equal(t, testData.expected, cfg.Validate(limits)) + }) + } +} + func TestDistributor_Push(t *testing.T) { // Metrics to assert on. 
lastSeenTimestamp := "cortex_distributor_latest_seen_sample_timestamp_seconds" diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index 25380046cdc..3d14e71eb1b 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -37,10 +37,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { block3Hash := cortex_tsdb.HashBlockID(block3) block4Hash := cortex_tsdb.HashBlockID(block4) - // Ensure the user ID we use belongs to the instances holding the token for the block 1 - // (it's expected by the assertions below). userID := "user-A" - require.LessOrEqual(t, cortex_tsdb.HashTenantID(userID), block1Hash) tests := map[string]struct { shardingStrategy string @@ -250,7 +247,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { queryBlocks: []ulid.ULID{block1, block2, block4}, expectedClients: map[string][]ulid.ULID{ "127.0.0.1": {block1, block4}, - "127.0.0.2": {block2}, + "127.0.0.3": {block2}, }, }, "shuffle sharding, multiple instances in the ring with RF = 1, SS = 4": { @@ -286,7 +283,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { block2: {"127.0.0.1"}, }, expectedClients: map[string][]ulid.ULID{ - "127.0.0.2": {block1, block2}, + "127.0.0.3": {block1, block2}, }, }, "shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": { @@ -301,7 +298,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { }, queryBlocks: []ulid.ULID{block1, block2}, exclude: map[ulid.ULID][]string{ - block1: {"127.0.0.1", "127.0.0.2"}, + block1: {"127.0.0.1", "127.0.0.3"}, block2: {"127.0.0.1"}, }, expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 85a1ad83d3c..7b404a7e3d9 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -412,6 +412,25 @@ func (d *Desc) getTokens() []TokenDesc { return tokens } +// getTokensByZone returns instances tokens grouped by zone. Tokens within each zone +// are guaranteed to be sorted. +func (d *Desc) getTokensByZone() map[string][]TokenDesc { + zones := map[string][]TokenDesc{} + + for key, ing := range d.Ingesters { + for _, token := range ing.Tokens { + zones[ing.Zone] = append(zones[ing.Zone], TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()}) + } + } + + // Ensure tokens are sorted within each zone. 
+ for zone := range zones { + sort.Sort(ByToken(zones[zone])) + } + + return zones +} + func GetOrCreateRingDesc(d interface{}) *Desc { if d == nil { return NewDesc() diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 27c0609531e..05d7816d0c9 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -186,3 +186,57 @@ func TestDesc_Ready(t *testing.T) { t.Fatal("expected ready, got", err) } } + +func TestDesc_getTokensByZone(t *testing.T) { + tests := map[string]struct { + desc *Desc + expected map[string][]TokenDesc + }{ + "empty ring": { + desc: &Desc{Ingesters: map[string]IngesterDesc{}}, + expected: map[string][]TokenDesc{}, + }, + "single zone": { + desc: &Desc{Ingesters: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Tokens: []uint32{1, 5}, Zone: ""}, + "instance-2": {Addr: "127.0.0.1", Tokens: []uint32{2, 4}, Zone: ""}, + "instance-3": {Addr: "127.0.0.1", Tokens: []uint32{3, 6}, Zone: ""}, + }}, + expected: map[string][]TokenDesc{ + "": { + {Token: 1, Ingester: "instance-1", Zone: ""}, + {Token: 2, Ingester: "instance-2", Zone: ""}, + {Token: 3, Ingester: "instance-3", Zone: ""}, + {Token: 4, Ingester: "instance-2", Zone: ""}, + {Token: 5, Ingester: "instance-1", Zone: ""}, + {Token: 6, Ingester: "instance-3", Zone: ""}, + }, + }, + }, + "multiple zones": { + desc: &Desc{Ingesters: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Tokens: []uint32{1, 5}, Zone: "zone-1"}, + "instance-2": {Addr: "127.0.0.1", Tokens: []uint32{2, 4}, Zone: "zone-1"}, + "instance-3": {Addr: "127.0.0.1", Tokens: []uint32{3, 6}, Zone: "zone-2"}, + }}, + expected: map[string][]TokenDesc{ + "zone-1": { + {Token: 1, Ingester: "instance-1", Zone: "zone-1"}, + {Token: 2, Ingester: "instance-2", Zone: "zone-1"}, + {Token: 4, Ingester: "instance-2", Zone: "zone-1"}, + {Token: 5, Ingester: "instance-1", Zone: "zone-1"}, + }, + "zone-2": { + {Token: 3, Ingester: "instance-3", Zone: "zone-2"}, + {Token: 6, Ingester: "instance-3", Zone: "zone-2"}, + }, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, testData.desc.getTokensByZone()) + }) + } +} diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index c5da5c6fd86..370c0040a52 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -85,8 +85,8 @@ func (r *Ring) ReplicationFactor() int { // IngesterCount is number of ingesters in the ring func (r *Ring) IngesterCount() int { - r.mtx.Lock() + r.mtx.RLock() c := len(r.ringDesc.Ingesters) - r.mtx.Unlock() + r.mtx.RUnlock() return c } diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 95acfd11878..e572b0500b3 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -4,11 +4,13 @@ package ring import ( "context" + "crypto/md5" + "encoding/binary" "errors" "flag" "fmt" "math" - "sort" + "math/rand" "sync" "time" @@ -47,7 +49,10 @@ type ReadRing interface { GetAll(op Operation) (ReplicationSet, error) ReplicationFactor() int IngesterCount() int - Subring(key uint32, n int) ReadRing + + // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) + // and size (number of instances). + ShuffleShard(identifier string, size int) ReadRing // HasInstance returns whether the ring contains an instance matching the provided instanceID. 
HasInstance(instanceID string) bool @@ -107,9 +112,14 @@ type Ring struct { KVClient kv.Client strategy ReplicationStrategy - mtx sync.RWMutex - ringDesc *Desc - ringTokens []TokenDesc + mtx sync.RWMutex + ringDesc *Desc + ringTokens []TokenDesc + ringTokensByZone map[string][]TokenDesc + + // List of zones for which there's at least 1 instance in the ring. This list is guaranteed + // to be sorted alphabetically. + ringZones []string memberOwnershipDesc *prometheus.Desc numMembersDesc *prometheus.Desc @@ -190,11 +200,15 @@ func (r *Ring) loop(ctx context.Context) error { ringDesc := value.(*Desc) ringTokens := ringDesc.getTokens() + ringTokensByZone := ringDesc.getTokensByZone() + ringZones := getZones(ringTokensByZone) r.mtx.Lock() defer r.mtx.Unlock() r.ringDesc = ringDesc r.ringTokens = ringTokens + r.ringTokensByZone = ringTokensByZone + r.ringZones = ringZones return true }) return nil @@ -213,7 +227,7 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet ingesters = buf[:0] distinctHosts = map[string]struct{}{} distinctZones = map[string]struct{}{} - start = r.search(key) + start = searchToken(r.ringTokens, key) iterations = 0 ) for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ { @@ -290,16 +304,6 @@ func (r *Ring) GetAll(op Operation) (ReplicationSet, error) { }, nil } -func (r *Ring) search(key uint32) int { - i := sort.Search(len(r.ringTokens), func(x int) bool { - return r.ringTokens[x].Token > key - }) - if i >= len(r.ringTokens) { - i = 0 - } - return i -} - // Describe implements prometheus.Collector. func (r *Ring) Describe(ch chan<- *prometheus.Desc) { ch <- r.memberOwnershipDesc @@ -411,7 +415,7 @@ func (r *Ring) Subring(key uint32, n int) ReadRing { var ( ingesters = make(map[string]IngesterDesc, n) distinctHosts = map[string]struct{}{} - start = r.search(key) + start = searchToken(r.ringTokens, key) iterations = 0 ) @@ -464,6 +468,102 @@ func (r *Ring) Subring(key uint32, n int) ReadRing { return sub } +// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) +// and size (number of instances). The size is expected to be a multiple of the +// number of zones and the returned subring will contain the same number of +// instances per zone as long as there are enough registered instances in the ring. +// +// The algorithm used to build the subring is a shuffle sharder based on probabilistic +// hashing. We treat each zone as a separate ring and pick N unique replicas from each +// zone, walking the ring starting from random but predictable numbers. The random +// generator is initialised with a seed based on the provided identifier. +// +// This implementation guarantees: +// +// - Stability: given the same ring, two invocations return the same result. +// +// - Consistency: adding/removing 1 instance from the ring generates a resulting +// subring with no more than 1 difference. +// +// - Shuffling: probabilistically, for a large enough cluster each identifier gets a different +// set of instances, with a reduced number of overlapping instances between two identifiers. +func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { + // Nothing to do if the shard size is not smaller than the actual ring. + if size <= 0 || r.IngesterCount() <= size { + return r + } + + // Use the identifier to compute a hash we'll use to seed the random generator.
+ hasher := md5.New() + hasher.Write([]byte(identifier)) // nolint:errcheck + checksum := hasher.Sum(nil) + + // Generate the seed based on the first 64 bits of the checksum. + seed := int64(binary.BigEndian.Uint64(checksum)) + + // Initialise the random generator used to select instances in the ring. + random := rand.New(rand.NewSource(seed)) + + r.mtx.RLock() + defer r.mtx.RUnlock() + + // We expect the shard size to be divisible by the number of zones, in order to + // have nodes balanced across zones. If it's not, we round up. + numInstancesPerZone := int(math.Ceil(float64(size) / float64(len(r.ringZones)))) + + shard := make(map[string]IngesterDesc, size) + + // We need to iterate zones always in the same order to guarantee stability. + for _, zone := range r.ringZones { + tokens := r.ringTokensByZone[zone] + + // To select one more instance while guaranteeing the "consistency" property, + // we pick a random value from the generator and resolve uniqueness collisions + // (if any) by continuing to walk the ring. + for i := 0; i < numInstancesPerZone; i++ { + start := searchToken(tokens, random.Uint32()) + iterations := 0 + found := false + + for p := start; iterations < len(tokens); p++ { + iterations++ + + // Wrap p around in the ring. + p %= len(tokens) + + // Ensure we select a unique instance. + if _, ok := shard[tokens[p].Ingester]; ok { + continue + } + + shard[tokens[p].Ingester] = r.ringDesc.Ingesters[tokens[p].Ingester] + found = true + break + } + + // If no more instances could be found, we can stop looking for + // more instances in this zone, because it means the zone has no + // instances left which haven't already been selected. + if !found { + break + } + } + } + + // Build a read-only ring for the shard. + shardDesc := &Desc{Ingesters: shard} + shardTokensByZone := shardDesc.getTokensByZone() + + return &Ring{ + cfg: r.cfg, + strategy: r.strategy, + ringDesc: shardDesc, + ringTokens: shardDesc.getTokens(), + ringTokensByZone: shardTokensByZone, + ringZones: getZones(shardTokensByZone), + } +} + // GetInstanceState returns the current state of an instance or an error if the // instance does not exist in the ring.
func (r *Ring) GetInstanceState(instanceID string) (IngesterState, error) { diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index d7085151ae7..1ef531fed4a 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -3,14 +3,17 @@ package ring import ( "context" "fmt" + "math" "math/rand" "sort" strconv "strconv" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" ) @@ -121,6 +124,366 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) { require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens) } +func TestRing_ShuffleShard(t *testing.T) { + tests := map[string]struct { + ringInstances map[string]IngesterDesc + shardSize int + expectedSize int + expectedDistribution []int + }{ + "empty ring": { + ringInstances: nil, + shardSize: 2, + expectedSize: 0, + expectedDistribution: []int{}, + }, + "single zone, shard size > num instances": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 3, + expectedSize: 2, + expectedDistribution: []int{2}, + }, + "single zone, shard size < num instances": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 2, + expectedSize: 2, + expectedDistribution: []int{2}, + }, + "multiple zones, shard size < num zones": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 2, + expectedSize: 3, + expectedDistribution: []int{1, 1, 1}, + }, + "multiple zones, shard size divisible by num zones": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 3, + expectedSize: 3, + expectedDistribution: []int{1, 1, 1}, + }, + "multiple zones, shard size NOT divisible by num zones": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 4, + expectedSize: 
6, + expectedDistribution: []int{2, 2, 2}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Init the ring description. + ringDesc := &Desc{Ingesters: testData.ringInstances} + for id, instance := range ringDesc.Ingesters { + instance.Timestamp = time.Now().Unix() + instance.State = ACTIVE + ringDesc.Ingesters[id] = instance + } + + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour}, + ringDesc: ringDesc, + ringTokens: ringDesc.getTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + shardRing := ring.ShuffleShard("tenant-id", testData.shardSize) + assert.Equal(t, testData.expectedSize, shardRing.IngesterCount()) + + // Compute the actual distribution of instances across zones. + var actualDistribution []int + + if shardRing.IngesterCount() > 0 { + all, err := shardRing.GetAll(Read) + require.NoError(t, err) + + countByZone := map[string]int{} + for _, instance := range all.Ingesters { + countByZone[instance.Zone]++ + } + + for _, count := range countByZone { + actualDistribution = append(actualDistribution, count) + } + } + + assert.ElementsMatch(t, testData.expectedDistribution, actualDistribution) + }) + } +} + +// This test asserts on shard stability across multiple invocations and given the same input ring. +func TestRing_ShuffleShard_Stability(t *testing.T) { + var ( + numTenants = 100 + numInstances = 50 + numZones = 3 + numInvocations = 10 + shardSizes = []int{3, 6, 9, 12, 15} + ) + + // Initialise the ring. + ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones)} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour}, + ringDesc: ringDesc, + ringTokens: ringDesc.getTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + for i := 1; i <= numTenants; i++ { + tenantID := fmt.Sprintf("%d", i) + + for _, size := range shardSizes { + r := ring.ShuffleShard(tenantID, size) + expected, err := r.GetAll(Read) + require.NoError(t, err) + + // Assert that multiple invocations generate the same exact shard. + for n := 0; n < numInvocations; n++ { + r := ring.ShuffleShard(tenantID, size) + actual, err := r.GetAll(Read) + require.NoError(t, err) + assert.ElementsMatch(t, expected.Ingesters, actual.Ingesters) + } + } + } +} + +func TestRing_ShuffleShard_Shuffling(t *testing.T) { + var ( + numTenants = 1000 + numInstances = 90 + numZones = 3 + shardSize = 3 + + // This is the expected theoretical distribution of matching instances + // between different shards, given the settings above. It has been computed + // using this spreadsheet: + // https://docs.google.com/spreadsheets/d/1FXbiWTXi6bdERtamH-IfmpgFq1fNL4GP_KX_yJvbRi4/edit + theoreticalMatchings = map[int]float64{ + 0: 90.2239, + 1: 9.55312, + 2: 0.22217, + 3: 0.00085, + } + ) + + // Initialise the ring instances. To have stable tests we generate tokens using a linear + // distribution. Tokens within the same zone are evenly distributed too. + instances := make(map[string]IngesterDesc, numInstances) + for i := 0; i < numInstances; i++ { + id := fmt.Sprintf("instance-%d", i) + instances[id] = IngesterDesc{ + Addr: fmt.Sprintf("127.0.0.%d", i), + Timestamp: time.Now().Unix(), + State: ACTIVE, + Tokens: generateTokensLinear(i, numInstances, 128), + Zone: fmt.Sprintf("zone-%d", i%numZones), + } + } + + // Initialise the ring. 
+ ringDesc := &Desc{Ingesters: instances} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour}, + ringDesc: ringDesc, + ringTokens: ringDesc.getTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + // Compute the shard for each tenant. + shards := map[string][]string{} + + for i := 1; i <= numTenants; i++ { + tenantID := fmt.Sprintf("%d", i) + r := ring.ShuffleShard(tenantID, shardSize) + set, err := r.GetAll(Read) + require.NoError(t, err) + + instances := make([]string, 0, len(set.Ingesters)) + for _, instance := range set.Ingesters { + instances = append(instances, instance.Addr) + } + + shards[tenantID] = instances + } + + // Compute the distribution of matching instances between every combination of shards. + // The shards comparison is not optimized, but it's fine for a test. + distribution := map[int]int{} + + for currID, currShard := range shards { + for otherID, otherShard := range shards { + if currID == otherID { + continue + } + + numMatching := 0 + for _, c := range currShard { + if util.StringsContain(otherShard, c) { + numMatching++ + } + } + + distribution[numMatching]++ + } + } + + maxCombinations := int(math.Pow(float64(numTenants), 2)) - numTenants + for numMatching, probability := range theoreticalMatchings { + // We allow a max deviance of 10% compared to the theoretical probability, + // clamping it between 1% and 0.1% boundaries. + maxDeviance := math.Min(1, math.Max(0.1, probability*0.1)) + + actual := (float64(distribution[numMatching]) / float64(maxCombinations)) * 100 + assert.InDelta(t, probability, actual, maxDeviance, "numMatching: %d", numMatching) + } +} + +func TestRing_ShuffleShard_Consistency(t *testing.T) { + type change string + + type scenario struct { + name string + numInstances int + numZones int + shardSize int + ringChange change + } + + const ( + numTenants = 100 + add = change("add-instance") + remove = change("remove-instance") + ) + + // Generate all test scenarios. + var scenarios []scenario + for _, numInstances := range []int{20, 30, 40, 50} { + for _, shardSize := range []int{3, 6, 9, 12, 15} { + for _, c := range []change{add, remove} { + scenarios = append(scenarios, scenario{ + name: fmt.Sprintf("instances = %d, shard size = %d, ring operation = %s", numInstances, shardSize, c), + numInstances: numInstances, + numZones: 3, + shardSize: shardSize, + ringChange: c, + }) + } + } + } + + for _, s := range scenarios { + t.Run(s.name, func(t *testing.T) { + // Initialise the ring. + ringDesc := &Desc{Ingesters: generateRingInstances(s.numInstances, s.numZones)} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour}, + ringDesc: ringDesc, + ringTokens: ringDesc.getTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + // Compute the initial shard for each tenant. + initial := map[int]ReplicationSet{} + for id := 0; id < numTenants; id++ { + set, err := ring.ShuffleShard(fmt.Sprintf("%d", id), s.shardSize).GetAll(Read) + require.NoError(t, err) + initial[id] = set + } + + // Update the ring. + switch s.ringChange { + case add: + newID, newDesc := generateRingInstance(s.numInstances+1, 0) + ringDesc.Ingesters[newID] = newDesc + case remove: + // Remove the first one. 
+ for id := range ringDesc.Ingesters { + delete(ringDesc.Ingesters, id) + break + } + } + + ring.ringTokens = ringDesc.getTokens() + ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringZones = getZones(ringDesc.getTokensByZone()) + + // Compute the updated shard for each tenant and compare it with the initial one. + // If the "consistency" property is guaranteed, we expect no more than 1 different instance + // in the updated shard. + for id := 0; id < numTenants; id++ { + updated, err := ring.ShuffleShard(fmt.Sprintf("%d", id), s.shardSize).GetAll(Read) + require.NoError(t, err) + + added, removed := compareReplicationSets(initial[id], updated) + assert.LessOrEqual(t, len(added), 1) + assert.LessOrEqual(t, len(removed), 1) + } + }) + } +} + +func BenchmarkRing_ShuffleShard(b *testing.B) { + for _, numInstances := range []int{50, 100, 1000} { + for _, numZones := range []int{1, 3} { + for _, shardSize := range []int{3, 10, 30} { + b.Run(fmt.Sprintf("num instances = %d, num zones = %d, shard size = %d", numInstances, numZones, shardSize), func(b *testing.B) { + // Initialise the ring. + ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones)} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour}, + ringDesc: ringDesc, + ringTokens: ringDesc.getTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + ring.ShuffleShard("tenant-1", shardSize) + } + }) + } + } + } +} + func TestSubring(t *testing.T) { r := NewDesc() @@ -140,9 +503,11 @@ func TestSubring(t *testing.T) { cfg: Config{ HeartbeatTimeout: time.Hour, }, - ringDesc: r, - ringTokens: r.getTokens(), - strategy: &DefaultReplicationStrategy{}, + ringDesc: r, + ringTokens: r.getTokens(), + ringTokensByZone: r.getTokensByZone(), + ringZones: getZones(r.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, } // Generate a sub ring for all possible valid ranges @@ -189,9 +554,11 @@ func TestStableSubring(t *testing.T) { cfg: Config{ HeartbeatTimeout: time.Hour, }, - ringDesc: r, - ringTokens: r.getTokens(), - strategy: &DefaultReplicationStrategy{}, + ringDesc: r, + ringTokens: r.getTokens(), + ringTokensByZone: r.getTokensByZone(), + ringZones: getZones(r.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, } // Generate the same subring multiple times @@ -247,9 +614,11 @@ func TestZoneAwareIngesterAssignmentSucccess(t *testing.T) { HeartbeatTimeout: time.Hour, ReplicationFactor: 3, }, - ringDesc: r, - ringTokens: r.getTokens(), - strategy: &DefaultReplicationStrategy{}, + ringDesc: r, + ringTokens: r.getTokens(), + ringTokensByZone: r.getTokensByZone(), + ringZones: getZones(r.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, } // use the GenerateTokens to get an array of random uint32 values testValues := make([]uint32, testCount) @@ -311,9 +680,11 @@ func TestZoneAwareIngesterAssignmentFailure(t *testing.T) { HeartbeatTimeout: time.Hour, ReplicationFactor: 3, }, - ringDesc: r, - ringTokens: r.getTokens(), - strategy: &DefaultReplicationStrategy{}, + ringDesc: r, + ringTokens: r.getTokens(), + ringTokensByZone: r.getTokensByZone(), + ringZones: getZones(r.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, } // use the GenerateTokens to get an array of random uint32 values testValues := make([]uint32, testCount) @@ -335,3 +706,54 @@ } } + +//
generateTokensLinear returns tokens with a linear distribution. +func generateTokensLinear(instanceID, numInstances, numTokens int) []uint32 { + tokens := make([]uint32, 0, numTokens) + step := math.MaxUint32 / numTokens + offset := (step / numInstances) * instanceID + + for t := offset; t <= math.MaxUint32; t += step { + tokens = append(tokens, uint32(t)) + } + + return tokens +} + +func generateRingInstances(numInstances, numZones int) map[string]IngesterDesc { + instances := make(map[string]IngesterDesc, numInstances) + + for i := 1; i <= numInstances; i++ { + id, desc := generateRingInstance(i, i%numZones) + instances[id] = desc + } + + return instances +} + +func generateRingInstance(id, zone int) (string, IngesterDesc) { + return fmt.Sprintf("instance-%d", id), IngesterDesc{ + Addr: fmt.Sprintf("127.0.0.%d", id), + Timestamp: time.Now().Unix(), + State: ACTIVE, + Tokens: GenerateTokens(128, nil), + Zone: fmt.Sprintf("zone-%d", zone), + } +} + +// compareReplicationSets returns the list of instance addresses which differ between the two sets. +func compareReplicationSets(first, second ReplicationSet) (added, removed []string) { + for _, instance := range first.Ingesters { + if !second.Includes(instance.Addr) { + added = append(added, instance.Addr) + } + } + + for _, instance := range second.Ingesters { + if !first.Includes(instance.Addr) { + removed = append(removed, instance.Addr) + } + } + + return +} diff --git a/pkg/ring/util.go b/pkg/ring/util.go index cf9fea3ed7f..c234d05b9ce 100644 --- a/pkg/ring/util.go +++ b/pkg/ring/util.go @@ -3,6 +3,7 @@ package ring import ( "context" "math/rand" + "sort" "time" "github.com/cortexproject/cortex/pkg/util" @@ -17,7 +18,7 @@ func GenerateTokens(numTokens int, takenTokens []uint32) []uint32 { r := rand.New(rand.NewSource(time.Now().UnixNano())) - used := make(map[uint32]bool) + used := make(map[uint32]bool, len(takenTokens)) for _, v := range takenTokens { used[v] = true } @@ -80,3 +81,27 @@ func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state In return backoff.Err() } + +// getZones returns the list of zones from the provided tokens. The returned list +// is guaranteed to be sorted. +func getZones(tokens map[string][]TokenDesc) []string { + var zones []string + + for zone := range tokens { + zones = append(zones, zone) + } + + sort.Strings(zones) + return zones +} + +// searchToken returns the offset of the tokens entry holding the range for the provided key. +func searchToken(tokens []TokenDesc, key uint32) int { + i := sort.Search(len(tokens), func(x int) bool { + return tokens[x].Token > key + }) + if i >= len(tokens) { + i = 0 + } + return i +} diff --git a/pkg/storage/tsdb/util.go b/pkg/storage/tsdb/util.go index 7687e0dbc9c..c13f5a2b7c4 100644 --- a/pkg/storage/tsdb/util.go +++ b/pkg/storage/tsdb/util.go @@ -15,9 +15,3 @@ func HashBlockID(id ulid.ULID) uint32 { } return h } - -// HashTenantID returns a 32-bit hash of the tenant ID useful for -// ring-based sharding.
-func HashTenantID(id string) uint32 { - return client.HashAdd32a(client.HashNew32a(), id) -} diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index 068c914a8dc..9945735db4d 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -154,7 +154,7 @@ func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLi return ring } - return ring.Subring(cortex_tsdb.HashTenantID(userID), shardSize) + return ring.ShuffleShard(userID, shardSize) } type shardingMetadataFilterAdapter struct { diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 9ab039e8249..f17bfaaa499 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -291,10 +291,7 @@ func TestShuffleShardingStrategy(t *testing.T) { block3Hash := cortex_tsdb.HashBlockID(block3) block4Hash := cortex_tsdb.HashBlockID(block4) - // Ensure the user ID we use belongs to the instances holding the token for the block 1 - // (it's expected by the assertions below). userID := "user-A" - require.LessOrEqual(t, cortex_tsdb.HashTenantID(userID), block1Hash) type usersExpectation struct { instanceID string @@ -499,24 +496,24 @@ func TestShuffleShardingStrategy(t *testing.T) { limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block4Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) - r.Ingesters["instance-2"] = ring.IngesterDesc{ - Addr: "127.0.0.2", + r.Ingesters["instance-3"] = ring.IngesterDesc{ + Addr: "127.0.0.3", Timestamp: time.Now().Add(-time.Hour).Unix(), State: ring.ACTIVE, - Tokens: []uint32{block2Hash + 1}, + Tokens: []uint32{block3Hash + 1}, } }, expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, }, expectedBlocks: []blocksExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because unhealthy */ }}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because unhealthy */ }}, }, }, "LEAVING instance in the ring should continue to keep its shard blocks but they should also be replicated to another instance": { @@ -524,18 +521,18 @@ func TestShuffleShardingStrategy(t *testing.T) { limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.LEAVING) - r.AddIngester("instance-3", 
"127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.LEAVING) }, expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, }, expectedBlocks: []blocksExpectation{ - {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3, block4 /* replicated: */, block2}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2}}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3 /* replicated: */, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block4}}, }, }, "JOINING instance in the ring should get its shard blocks but they should also be replicated to another instance": { @@ -543,18 +540,18 @@ func TestShuffleShardingStrategy(t *testing.T) { limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.JOINING) - r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.JOINING) }, expectedUsers: []usersExpectation{ {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, }, expectedBlocks: []blocksExpectation{ - {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3, block4 /* replicated: */, block2}}, - {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2}}, - {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3 /* replicated: */, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block4}}, }, }, "SS = 0 disables shuffle sharding": { diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 1670d8e995f..ea5c6054541 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -45,7 +45,7 @@ type Limits struct { CreationGracePeriod 
time.Duration `yaml:"creation_grace_period"` EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name"` EnforceMetricName bool `yaml:"enforce_metric_name"` - SubringSize int `yaml:"user_subring_size"` + IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size"` // Ingester enforced limits. // Series @@ -83,7 +83,7 @@ type Limits struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (l *Limits) RegisterFlags(f *flag.FlagSet) { - f.IntVar(&l.SubringSize, "experimental.distributor.user-subring-size", 0, "Per-user subring to shard metrics to ingesters. 0 is disabled.") + f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") @@ -360,9 +360,9 @@ func (o *Overrides) MaxGlobalMetadataPerMetric(userID string) int { return o.getOverridesForUser(userID).MaxGlobalMetadataPerMetric } -// SubringSize returns the size of the subring for a given user. -func (o *Overrides) SubringSize(userID string) int { - return o.getOverridesForUser(userID).SubringSize +// IngestionTenantShardSize returns the ingesters shard size for a given user. +func (o *Overrides) IngestionTenantShardSize(userID string) int { + return o.getOverridesForUser(userID).IngestionTenantShardSize } // EvaluationDelay returns the rules evaluation delay for a given user. @@ -370,7 +370,7 @@ func (o *Overrides) EvaluationDelay(userID string) time.Duration { return o.getOverridesForUser(userID).RulerEvaluationDelay } -// StoreGatewayTenantShardSize returns the size of the store-gateway shard size for a given user. +// StoreGatewayTenantShardSize returns the store-gateway shard size for a given user. func (o *Overrides) StoreGatewayTenantShardSize(userID string) int { return o.getOverridesForUser(userID).StoreGatewayTenantShardSize }