Allow shard sizes to be percent of instances #5393

Merged 1 commit on Jun 13, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
* [FEATURE] Added 2 flags `-alertmanager.alertmanager-client.grpc-max-send-msg-size` and `-alertmanager.alertmanager-client.grpc-max-recv-msg-size` to configure the Alertmanager gRPC client message size limits. #5338
* [FEATURE] Query Frontend: Add `cortex_rejected_queries_total` metric for throttled queries. #5356
* [FEATURE] Querier: Log query stats when querying store gateway. #5376
* [FEATURE] Querier/StoreGateway: Allow the tenant shard sizes to be a percent of total instances. #5393
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
18 changes: 10 additions & 8 deletions docs/configuration/config-file-reference.md
@@ -2938,13 +2938,14 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s

# Maximum number of queriers that can handle requests for a single tenant. If
# set to 0 or value higher than number of available queriers, *all* queriers
# will handle requests for the tenant. Each frontend (or query-scheduler, if
# used) will select the same set of queriers for the same tenant (given that all
# queriers are connected to all frontends / query-schedulers). This option only
# works with queriers connecting to the query-frontend / query-scheduler, not
# when using downstream URL.
# will handle requests for the tenant. If the value is < 1, it will be treated
# as a percentage, and the tenant gets that percentage of the total queriers.
# Each frontend
# (or query-scheduler, if used) will select the same set of queriers for the
# same tenant (given that all queriers are connected to all frontends /
# query-schedulers). This option only works with queriers connecting to the
# query-frontend / query-scheduler, not when using downstream URL.
# CLI flag: -frontend.max-queriers-per-tenant
[max_queriers_per_tenant: <int> | default = 0]
[max_queriers_per_tenant: <float> | default = 0]

# Maximum number of outstanding requests per tenant per request queue (either
# query frontend or query scheduler); requests beyond this error with HTTP 429.
@@ -2973,9 +2974,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set when the store-gateway sharding is enabled with the
# shuffle-sharding strategy. When this setting is specified in the per-tenant
# overrides, a value of 0 disables shuffle sharding for the tenant.
# overrides, a value of 0 disables shuffle sharding for the tenant. If the
# value is < 1, the shard size will be that percentage of the total
# store-gateways.
# CLI flag: -store-gateway.tenant-shard-size
[store_gateway_tenant_shard_size: <int> | default = 0]
[store_gateway_tenant_shard_size: <float> | default = 0]

# The maximum number of data bytes to download per gRPC request in Store
# Gateway, including Series/LabelNames/LabelValues requests. 0 to disable.
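To make the new fractional semantics concrete, here is a minimal, self-contained sketch mirroring the `DynamicShardSize` helper this PR adds in `pkg/util/shard.go` (the fleet sizes and limit values below are hypothetical):

```go
package main

import (
	"fmt"
	"math"
)

// dynamicShardSize mirrors util.DynamicShardSize from this PR: a value < 1 is
// treated as a fraction of the instance count, a value >= 1 as an absolute
// shard size (both rounded up).
func dynamicShardSize(value float64, numInstances int) int {
	if value < 1 {
		return int(math.Ceil(float64(numInstances) * value))
	}
	return int(math.Ceil(value))
}

func main() {
	// Hypothetical fleet of 20 store-gateways.
	fmt.Println(dynamicShardSize(0.5, 20)) // 10: half of the fleet
	fmt.Println(dynamicShardSize(3, 20))   // 3: absolute shard size, as before
	fmt.Println(dynamicShardSize(0, 20))   // 0: shuffle sharding disabled
}
```

A fractional limit therefore tracks the fleet automatically: scaling from 20 to 40 store-gateways doubles every tenant's shard without touching per-tenant overrides.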
8 changes: 4 additions & 4 deletions pkg/frontend/v1/frontend.go
@@ -43,18 +43,18 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

type Limits interface {
// Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
MaxQueriersPerUser(user string) int
MaxQueriersPerUser(user string) float64

queue.Limits
}

// MockLimits implements the Limits interface. Used in tests only.
type MockLimits struct {
Queriers int
Queriers float64
queue.MockLimits
}

func (l MockLimits) MaxQueriersPerUser(_ string) int {
func (l MockLimits) MaxQueriersPerUser(_ string) float64 {
return l.Queriers
}

@@ -338,7 +338,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued")

// aggregate the max queriers limit in the case of a multi tenant query
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser)
maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, f.limits.MaxQueriersPerUser)

joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
@@ -97,7 +97,7 @@ type BlocksStoreLimits interface {
bucket.TenantConfigProvider

MaxChunksPerQueryFromStore(userID string) int
StoreGatewayTenantShardSize(userID string) int
StoreGatewayTenantShardSize(userID string) float64
}

type blocksStoreQueryableMetrics struct {
4 changes: 2 additions & 2 deletions pkg/querier/blocks_store_queryable_test.go
@@ -1558,14 +1558,14 @@ func (m *storeGatewaySeriesClientMock) Recv() (*storepb.SeriesResponse, error) {

type blocksStoreLimitsMock struct {
maxChunksPerQuery int
storeGatewayTenantShardSize int
storeGatewayTenantShardSize float64
}

func (m *blocksStoreLimitsMock) MaxChunksPerQueryFromStore(_ string) int {
return m.maxChunksPerQuery
}

func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(_ string) int {
func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(_ string) float64 {
return m.storeGatewayTenantShardSize
}

2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_replicated_set_test.go
@@ -43,7 +43,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {

tests := map[string]struct {
shardingStrategy string
tenantShardSize int
tenantShardSize float64
replicationFactor int
setup func(*ring.Desc)
queryBlocks []ulid.ULID
6 changes: 4 additions & 2 deletions pkg/scheduler/queue/queue.go
@@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)

@@ -86,15 +87,16 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que
// between calls.
//
// If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error {
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers float64, successFn func()) error {
q.mtx.Lock()
defer q.mtx.Unlock()

if q.stopped {
return ErrStopped
}

queue := q.queues.getOrAddQueue(userID, maxQueriers)
shardSize := util.DynamicShardSize(maxQueriers, len(q.queues.queriers))
queue := q.queues.getOrAddQueue(userID, shardSize)
if queue == nil {
// This can only happen if userID is "".
return errors.New("no queue found")
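The conversion point matters here: the fractional limit is resolved against the number of currently connected queriers (`len(q.queues.queriers)`) at enqueue time, so the effective shard follows the querier fleet as it scales. A small sketch of that behavior, using the same stand-in for `util.DynamicShardSize` as above and hypothetical numbers:

```go
package main

import (
	"fmt"
	"math"
)

// dynamicShardSize is the same sketch of util.DynamicShardSize as above.
func dynamicShardSize(value float64, n int) int {
	if value < 1 {
		return int(math.Ceil(float64(n) * value))
	}
	return int(math.Ceil(value))
}

func main() {
	const limit = 0.25 // hypothetical -frontend.max-queriers-per-tenant
	for _, connected := range []int{4, 40, 400} {
		fmt.Printf("%d queriers connected -> tenant shard of %d\n",
			connected, dynamicShardSize(limit, connected))
	}
	// Prints shards of 1, 10 and 100: the tenant's querier pool grows with
	// the fleet, with no per-tenant reconfiguration.
}
```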
4 changes: 2 additions & 2 deletions pkg/scheduler/scheduler.go
@@ -143,7 +143,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
// Limits needed for the Query Scheduler - interface used for decoupling.
type Limits interface {
// MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
MaxQueriersPerUser(user string) int
MaxQueriersPerUser(user string) float64

queue.Limits
}
@@ -307,7 +307,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
if err != nil {
return err
}
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser)
maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, s.limits.MaxQueriersPerUser)

s.activeUsers.UpdateUserTimestamp(userID, now)
return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() {
14 changes: 11 additions & 3 deletions pkg/storegateway/gateway_test.go
@@ -241,7 +241,7 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {

tests := map[string]struct {
shardingStrategy string
tenantShardSize int // Used only when the sharding strategy is shuffle-sharding.
tenantShardSize float64 // Used only when the sharding strategy is shuffle-sharding.
replicationFactor int
numGateways int
expectedBlocksLoaded int
@@ -291,6 +291,13 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
numGateways: 20,
expectedBlocksLoaded: 3 * numBlocks, // blocks are replicated 3 times
},
"shuffle sharding strategy, 20 gateways, RF = 3, SS = 0.5": {
shardingStrategy: util.ShardingStrategyShuffle,
tenantShardSize: 0.5,
replicationFactor: 3,
numGateways: 20,
expectedBlocksLoaded: 3 * numBlocks, // blocks are replicated 3 times
},
}

for testName, testData := range tests {
@@ -361,8 +368,9 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered"))

if testData.shardingStrategy == util.ShardingStrategyShuffle {
assert.Equal(t, float64(testData.tenantShardSize*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
assert.Equal(t, float64(testData.tenantShardSize*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))
shards := util.DynamicShardSize(testData.tenantShardSize, testData.numGateways)
assert.Equal(t, float64(shards*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
assert.Equal(t, float64(shards*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))
} else {
assert.Equal(t, float64(testData.numGateways*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
assert.Equal(t, float64(testData.numGateways*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))
5 changes: 3 additions & 2 deletions pkg/storegateway/sharding_strategy.go
@@ -12,6 +12,7 @@ import (

"github.com/cortexproject/cortex/pkg/ring"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
)

const (
@@ -32,7 +33,7 @@
// ShardingLimits is the interface that should be implemented by the limits provider,
// limiting the scope of the limits to the ones required by sharding strategies.
type ShardingLimits interface {
StoreGatewayTenantShardSize(userID string) int
StoreGatewayTenantShardSize(userID string) float64
}

// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
@@ -173,7 +174,7 @@ func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[
// GetShuffleShardingSubring returns the subring to be used for a given user. This function
// should be used both by store-gateway and querier in order to guarantee the same logic is used.
func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing {
shardSize := limits.StoreGatewayTenantShardSize(userID)
shardSize := util.DynamicShardSize(limits.StoreGatewayTenantShardSize(userID), ring.InstancesCount())

// A shard size of 0 means shuffle sharding is disabled for this specific user,
// so we just return the full ring so that blocks will be sharded across all store-gateways.
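Since `GetShuffleShardingSubring` resolves the fraction against `ring.InstancesCount()`, the querier and the store-gateway agree on a tenant's subring as long as they observe the same ring. A minimal sketch of just this resolution step (the `instanceCounter` interface is a stand-in for illustration, not the real `ring.ReadRing` contract):

```go
package main

import (
	"fmt"
	"math"
)

// instanceCounter stands in for the single ring method this step needs.
type instanceCounter interface {
	InstancesCount() int
}

type fakeRing struct{ n int }

func (r fakeRing) InstancesCount() int { return r.n }

// dynamicShardSize is the same sketch of util.DynamicShardSize as above.
func dynamicShardSize(value float64, n int) int {
	if value < 1 {
		return int(math.Ceil(float64(n) * value))
	}
	return int(math.Ceil(value))
}

func main() {
	// Hypothetical per-tenant override: store_gateway_tenant_shard_size: 0.5
	var r instanceCounter = fakeRing{n: 20}
	fmt.Println(dynamicShardSize(0.5, r.InstancesCount())) // 10
	// A resolved shard size of 0 still means "return the full ring", so the
	// disable-shuffle-sharding behavior is unchanged.
}
```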
4 changes: 2 additions & 2 deletions pkg/storegateway/sharding_strategy_test.go
@@ -662,9 +662,9 @@ func TestShuffleShardingStrategy(t *testing.T) {
}

type shardingLimitsMock struct {
storeGatewayTenantShardSize int
storeGatewayTenantShardSize float64
}

func (m *shardingLimitsMock) StoreGatewayTenantShardSize(_ string) int {
func (m *shardingLimitsMock) StoreGatewayTenantShardSize(_ string) float64 {
return m.storeGatewayTenantShardSize
}
9 changes: 9 additions & 0 deletions pkg/util/shard.go
@@ -43,3 +43,12 @@ func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int {
func ShuffleShardExpectedInstances(shardSize, numZones int) int {
return ShuffleShardExpectedInstancesPerZone(shardSize, numZones) * numZones
}

// DynamicShardSize computes the shard size: a value < 1 is treated as a fraction of numInstances; a value >= 1 is rounded up to the nearest integer and returned.
func DynamicShardSize(value float64, numInstances int) int {
var shardSize = int(math.Ceil(value))
if value < 1 {
shardSize = int(math.Ceil(float64(numInstances) * value))
}
return shardSize
}
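Note the `math.Ceil` on both paths: any nonzero fraction resolves to at least one instance (0.01 of 3 instances is still a shard of 1), and a fraction just below 1, such as 0.99999 with 100 instances, resolves to the full fleet. The table-driven test below exercises exactly these edges.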
58 changes: 58 additions & 0 deletions pkg/util/shard_test.go
@@ -81,3 +81,61 @@ func TestShuffleShardExpectedInstances(t *testing.T) {
assert.Equal(t, test.expected, ShuffleShardExpectedInstances(test.shardSize, test.numZones))
}
}

func TestDynamicShardSize(t *testing.T) {
tests := []struct {
value float64
numInstances int
expected int
}{
{
value: 0,
numInstances: 100,
expected: 0,
},
{
value: 0.1,
numInstances: 100,
expected: 10,
},
{
value: 0.01,
numInstances: 100,
expected: 1,
},
{
value: 3,
numInstances: 100,
expected: 3,
},
{
value: 0.4,
numInstances: 100,
expected: 40,
},
{
value: 1,
numInstances: 100,
expected: 1,
},
{
value: 0.99999,
numInstances: 100,
expected: 100,
},
{
value: 0.5,
numInstances: 3,
expected: 2,
},
{
value: 0.8,
numInstances: 3,
expected: 3,
},
}

for _, test := range tests {
assert.Equal(t, test.expected, DynamicShardSize(test.value, test.numInstances))
}
}
20 changes: 10 additions & 10 deletions pkg/util/validation/limits.go
@@ -82,7 +82,7 @@ type Limits struct {
MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"`
MaxCacheFreshness model.Duration `yaml:"max_cache_freshness" json:"max_cache_freshness"`
MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
MaxQueriersPerTenant float64 `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size" doc:"hidden"`

// Query Frontend / Scheduler enforced limits.
@@ -95,8 +95,8 @@
RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`

// Store-gateway.
StoreGatewayTenantShardSize int `yaml:"store_gateway_tenant_shard_size" json:"store_gateway_tenant_shard_size"`
MaxDownloadedBytesPerRequest int `yaml:"max_downloaded_bytes_per_request" json:"max_downloaded_bytes_per_request"`
StoreGatewayTenantShardSize float64 `yaml:"store_gateway_tenant_shard_size" json:"store_gateway_tenant_shard_size"`
MaxDownloadedBytesPerRequest int `yaml:"max_downloaded_bytes_per_request" json:"max_downloaded_bytes_per_request"`

// Compactor.
CompactorBlocksRetentionPeriod model.Duration `yaml:"compactor_blocks_retention_period" json:"compactor_blocks_retention_period"`
Expand Down Expand Up @@ -168,7 +168,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.")
_ = l.MaxCacheFreshness.Set("1m")
f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")
f.IntVar(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.")
f.Float64Var(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. If the value is < 1, it will be treated as a percentage, and the tenant gets that percentage of the total queriers. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.")
f.IntVar(&l.QueryVerticalShardSize, "frontend.query-vertical-shard-size", 0, "[Experimental] Number of shards to use when distributing shardable PromQL queries.")

f.IntVar(&l.MaxOutstandingPerTenant, "frontend.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per request queue (either query frontend or query scheduler); requests beyond this error with HTTP 429.")
@@ -182,7 +182,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")

// Store-gateway.
f.IntVar(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
f.Float64Var(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1, the shard size will be that percentage of the total store-gateways.")
f.IntVar(&l.MaxDownloadedBytesPerRequest, "store-gateway.max-downloaded-bytes-per-request", 0, "The maximum number of data bytes to download per gRPC request in Store Gateway, including Series/LabelNames/LabelValues requests. 0 to disable.")

// Alertmanager.
@@ -455,7 +455,7 @@ func (o *Overrides) MaxCacheFreshness(userID string) time.Duration {
}

// MaxQueriersPerUser returns the maximum number of queriers that can handle requests for this user.
func (o *Overrides) MaxQueriersPerUser(userID string) int {
func (o *Overrides) MaxQueriersPerUser(userID string) float64 {
return o.GetOverridesForUser(userID).MaxQueriersPerTenant
}

@@ -547,7 +547,7 @@ func (o *Overrides) RulerMaxRuleGroupsPerTenant(userID string) int {
}

// StoreGatewayTenantShardSize returns the store-gateway shard size for a given user.
func (o *Overrides) StoreGatewayTenantShardSize(userID string) int {
func (o *Overrides) StoreGatewayTenantShardSize(userID string) float64 {
return o.GetOverridesForUser(userID).StoreGatewayTenantShardSize
}

@@ -687,12 +687,12 @@ func SmallestPositiveIntPerTenant(tenantIDs []string, f func(string) int) int {
return *result
}

// SmallestPositiveNonZeroIntPerTenant is returning the minimal positive and
// SmallestPositiveNonZeroFloat64PerTenant returns the minimal positive and
// non-zero value of the supplied limit function for all given tenants. In many
// limits a value of 0 means unlimited, so the method will return 0 only if all
// inputs have a limit of 0 or an empty tenant list is given.
func SmallestPositiveNonZeroIntPerTenant(tenantIDs []string, f func(string) int) int {
var result *int
func SmallestPositiveNonZeroFloat64PerTenant(tenantIDs []string, f func(string) float64) float64 {
var result *float64
for _, tenantID := range tenantIDs {
v := f(tenantID)
if v > 0 && (result == nil || v < *result) {
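For completeness, here is a runnable sketch of how the renamed aggregator behaves for a multi-tenant query (tenant IDs and limit values below are hypothetical):

```go
package main

import "fmt"

// smallestPositiveNonZeroFloat64PerTenant mirrors the aggregation in
// pkg/util/validation/limits.go: 0 means unlimited, so it is returned only
// when every tenant is unlimited.
func smallestPositiveNonZeroFloat64PerTenant(tenantIDs []string, f func(string) float64) float64 {
	var result *float64
	for _, tenantID := range tenantIDs {
		v := f(tenantID)
		if v > 0 && (result == nil || v < *result) {
			result = &v
		}
	}
	if result == nil {
		return 0
	}
	return *result
}

func main() {
	limits := map[string]float64{"team-a": 0.5, "team-b": 0, "team-c": 3}
	tenants := []string{"team-a", "team-b", "team-c"}
	fmt.Println(smallestPositiveNonZeroFloat64PerTenant(tenants, func(id string) float64 {
		return limits[id]
	}))
	// Prints 0.5: the most restrictive non-zero limit wins, and each
	// component then resolves it against its own instance count.
}
```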