diff --git a/CHANGELOG.md b/CHANGELOG.md index 2639a810d56..88bdd1e8522 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [ENHANCEMENT] Update Go version to 1.19.3. #4988 * [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976 +* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and changed their default value to 0. If both the old flag and the new flag are specified, the old flag's queue size is used. #5005 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972 * [FEATURE] Query Frontend: Log query params in query frontend even if error happens. #5005 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e717e6ec7a7..0f7c425cc39 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -172,11 +172,12 @@ runtime_config: [memberlist: ] query_scheduler: - # Maximum number of outstanding requests per tenant per query-scheduler. - # In-flight requests above this limit will fail with HTTP response status code - # 429. + # Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and + # will be removed in v1.17.0: Maximum number of outstanding requests per + # tenant per query-scheduler. In-flight requests above this limit will fail + # with HTTP response status code 429. 
# CLI flag: -query-scheduler.max-outstanding-requests-per-tenant - [max_outstanding_requests_per_tenant: | default = 100] + [max_outstanding_requests_per_tenant: | default = 0] # If a querier disconnects without sending notification about graceful # shutdown, the query-scheduler will keep the querier in the tenant's shard @@ -916,10 +917,11 @@ The `query_frontend_config` configures the Cortex query-frontend. # CLI flag: -frontend.query-stats-enabled [query_stats_enabled: | default = false] -# Maximum number of outstanding requests per tenant per frontend; requests -# beyond this error with HTTP 429. +# Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and will +# be removed in v1.17.0: Maximum number of outstanding requests per tenant per +# frontend; requests beyond this error with HTTP 429. # CLI flag: -querier.max-outstanding-requests-per-tenant -[max_outstanding_per_tenant: | default = 100] +[max_outstanding_per_tenant: | default = 0] # If a querier disconnects without sending notification about graceful shutdown, # the query-frontend will keep the querier in the tenant's shard until the @@ -2724,6 +2726,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -frontend.max-queriers-per-tenant [max_queriers_per_tenant: | default = 0] +# Maximum number of outstanding requests per tenant per request queue (either +# query frontend or query scheduler); requests beyond this error with HTTP 429. +# CLI flag: -frontend.max-outstanding-requests-per-tenant +[max_outstanding_requests_per_tenant: | default = 100] + # Duration to delay the evaluation of rules to ensure the underlying metrics # have been pushed to Cortex. 
# CLI flag: -ruler.evaluation-delay-duration diff --git a/pkg/frontend/frontend_test.go b/pkg/frontend/frontend_test.go index b17606042a2..2ca1d628905 100644 --- a/pkg/frontend/frontend_test.go +++ b/pkg/frontend/frontend_test.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/frontend/transport" + frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/util/concurrency" @@ -253,7 +254,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand httpListen, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) - rt, v1, v2, err := InitFrontend(config, limits{}, 0, logger, nil) + rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil) require.NoError(t, err) require.NotNil(t, rt) // v1 will be nil if DownstreamURL is defined. @@ -306,11 +307,3 @@ func defaultFrontendConfig() CombinedFrontendConfig { flagext.DefaultValues(&config.FrontendV2) return config } - -type limits struct { - queriers int -} - -func (l limits) MaxQueriersPerUser(_ string) int { - return l.queriers -} diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 93eaf4b73d1..541e17dc4b4 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -37,13 +37,25 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet. 
func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.") + f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 0, "Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and will be removed in v1.17.0: Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.") f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") } type Limits interface { // Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled. MaxQueriersPerUser(user string) int + + queue.Limits +} + +// MockLimits implements the Limits interface. Used in tests only. 
+type MockLimits struct { + Queriers int + queue.MockLimits +} + +func (l MockLimits) MaxQueriersPerUser(_ string) int { + return l.Queriers } // Frontend queues HTTP requests, dispatches them to backends, and handles retries @@ -100,7 +112,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist }), } - f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests) + f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, f.limits) f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics) var err error diff --git a/pkg/frontend/v1/frontend_test.go b/pkg/frontend/v1/frontend_test.go index 7f89d3a1b90..3c512aef800 100644 --- a/pkg/frontend/v1/frontend_test.go +++ b/pkg/frontend/v1/frontend_test.go @@ -116,6 +116,7 @@ func TestFrontendPropagateTrace(t *testing.T) { } func TestFrontendCheckReady(t *testing.T) { + limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}} for _, tt := range []struct { name string connectedClients int @@ -131,6 +132,7 @@ func TestFrontendCheckReady(t *testing.T) { requestQueue: queue.NewRequestQueue(5, 0, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + limits, ), } for i := 0; i < tt.connectedClients; i++ { @@ -243,7 +245,8 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a httpListen, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) - v1, err := New(config, limits{}, logger, reg) + limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}} + v1, err := New(config, limits, logger, reg) require.NoError(t, err) require.NotNil(t, v1) require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1)) @@ -292,11 +295,3 @@ func defaultFrontendConfig() Config { 
flagext.DefaultValues(&config) return config } - -type limits struct { - queriers int -} - -func (l limits) MaxQueriersPerUser(_ string) int { - return l.queriers -} diff --git a/pkg/frontend/v1/queue_test.go b/pkg/frontend/v1/queue_test.go index 7b72cf02b3a..e58f73176d5 100644 --- a/pkg/frontend/v1/queue_test.go +++ b/pkg/frontend/v1/queue_test.go @@ -16,6 +16,7 @@ import ( "google.golang.org/grpc/metadata" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" + "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -23,7 +24,8 @@ import ( func setupFrontend(t *testing.T, config Config) (*Frontend, error) { logger := log.NewNopLogger() - frontend, err := New(config, limits{queriers: 3}, logger, nil) + limits := MockLimits{Queriers: 3, MockLimits: queue.MockLimits{MaxOutstanding: 100}} + frontend, err := New(config, limits, logger, nil) require.NoError(t, err) t.Cleanup(func() { diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 37de027368a..bdaad09ee6d 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -61,9 +61,9 @@ type RequestQueue struct { discardedRequests *prometheus.CounterVec // Per user. 
} -func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue { +func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits) *RequestQueue { q := &RequestQueue{ - queues: newUserQueues(maxOutstandingPerTenant, forgetDelay), + queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits), connectedQuerierWorkers: atomic.NewInt32(0), queueLength: queueLength, discardedRequests: discardedRequests, diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index b571322224a..ef02a492f3b 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -26,6 +26,7 @@ func BenchmarkGetNextRequest(b *testing.B) { queue := NewRequestQueue(maxOutstandingPerTenant, 0, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + MockLimits{MaxOutstanding: 100}, ) queues = append(queues, queue) @@ -83,6 +84,7 @@ func BenchmarkQueueRequest(b *testing.B) { q := NewRequestQueue(maxOutstandingPerTenant, 0, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + MockLimits{MaxOutstanding: 100}, ) for ix := 0; ix < queriers; ix++ { @@ -115,7 +117,9 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe queue := NewRequestQueue(1, forgetDelay, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"})) + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + MockLimits{MaxOutstanding: 100}, + ) // Start the queue service. 
ctx := context.Background() diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index cdad43402bd..0f9cd1a0811 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -8,6 +8,13 @@ import ( "github.com/cortexproject/cortex/pkg/util" ) +// Limits needed for the Query Scheduler - interface used for decoupling. +type Limits interface { + // MaxOutstandingPerTenant returns the maximum number of outstanding + // requests allowed per tenant per request queue. + MaxOutstandingPerTenant(user string) int +} + // querier holds information about a querier registered in the queue. type querier struct { // Number of active connections. @@ -41,6 +48,8 @@ type queues struct { // Sorted list of querier names, used when creating per-user shard. sortedQueriers []string + + limits Limits } type userQueue struct { @@ -59,7 +68,7 @@ type userQueue struct { index int } -func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues { +func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits) *queues { return &queues{ userQueues: map[string]*userQueue{}, users: nil, @@ -67,6 +76,7 @@ func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues { forgetDelay: forgetDelay, queriers: map[string]*querier{}, sortedQueriers: nil, + limits: limits, } } @@ -106,8 +116,14 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request { uq := q.userQueues[userID] if uq == nil { + queueSize := q.limits.MaxOutstandingPerTenant(userID) + // 0 is the default value of the flag. If the old flag is set + // then we use its value for compatibility reasons. 
+ if q.maxUserQueueSize != 0 { + queueSize = q.maxUserQueueSize + } uq = &userQueue{ - ch: make(chan Request, q.maxUserQueueSize), + ch: make(chan Request, queueSize), seed: util.ShuffleShardSeed(userID, ""), index: -1, } @@ -303,3 +319,12 @@ func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueri return result } + +// MockLimits implements the Limits interface. Used in tests only. +type MockLimits struct { + MaxOutstanding int +} + +func (l MockLimits) MaxOutstandingPerTenant(_ string) int { + return l.MaxOutstanding +} diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/user_queues_test.go index 699dd79dfd9..cc986cac416 100644 --- a/pkg/scheduler/queue/user_queues_test.go +++ b/pkg/scheduler/queue/user_queues_test.go @@ -13,7 +13,7 @@ import ( ) func TestQueues(t *testing.T) { - uq := newUserQueues(0, 0) + uq := newUserQueues(0, 0, MockLimits{}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -68,7 +68,7 @@ func TestQueues(t *testing.T) { } func TestQueuesWithQueriers(t *testing.T) { - uq := newUserQueues(0, 0) + uq := newUserQueues(0, 0, MockLimits{}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -145,7 +145,7 @@ func TestQueuesConsistency(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - uq := newUserQueues(0, testData.forgetDelay) + uq := newUserQueues(0, testData.forgetDelay, MockLimits{}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -194,7 +194,7 @@ func TestQueues_ForgetDelay(t *testing.T) { ) now := time.Now() - uq := newUserQueues(0, forgetDelay) + uq := newUserQueues(0, forgetDelay, MockLimits{}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -286,7 +286,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget ) now := time.Now() - uq := newUserQueues(0, forgetDelay) + uq := newUserQueues(0, forgetDelay, MockLimits{}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) diff 
--git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 772e60c87a1..115de295c31 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -86,7 +86,7 @@ type Config struct { } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.") + f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 0, "Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and will be removed in v1.17.0: Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. 
This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) } @@ -111,7 +111,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe Name: "cortex_query_scheduler_discarded_requests_total", Help: "Total number of query requests discarded.", }, []string{"user"}) - s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests) + s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests, s.limits) s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_query_scheduler_queue_duration_seconds", @@ -143,6 +143,8 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe type Limits interface { // MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled. 
MaxQueriersPerUser(user string) int + + queue.Limits } type schedulerRequest struct { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 14449dc8c45..b2ef66cc651 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -20,7 +20,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" + "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/httpgrpcutil" @@ -35,7 +37,7 @@ func setupScheduler(t *testing.T, reg prometheus.Registerer) (*Scheduler, schedu flagext.DefaultValues(&cfg) cfg.MaxOutstandingPerTenant = testMaxOutstandingPerTenant - s, err := NewScheduler(cfg, &limits{queriers: 2}, log.NewNopLogger(), reg) + s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: 100}}, log.NewNopLogger(), reg) require.NoError(t, err) server := grpc.NewServer() @@ -494,14 +496,6 @@ func verifyNoPendingRequestsLeft(t *testing.T, scheduler *Scheduler) { }) } -type limits struct { - queriers int -} - -func (l limits) MaxQueriersPerUser(_ string) int { - return l.queriers -} - type frontendMock struct { mu sync.Mutex resp map[uint64]*httpgrpc.HTTPResponse diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 24869b15506..3f4e67dc36d 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -81,6 +81,9 @@ type Limits struct { MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size" doc:"hidden"` + // Query Frontend / Scheduler enforced limits. 
+ MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"` + // Ruler defaults and limits. RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` RulerTenantShardSize int `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"` @@ -161,6 +164,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.") f.IntVar(&l.QueryVerticalShardSize, "frontend.query-vertical-shard-size", 0, "[Experimental] Number of shards to use when distributing shardable PromQL queries.") + f.IntVar(&l.MaxOutstandingPerTenant, "frontend.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per request queue (either query frontend or query scheduler); requests beyond this error with HTTP 429.") + f.Var(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", "Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.") f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 
0 to disable.") @@ -446,6 +451,12 @@ func (o *Overrides) MaxQueryParallelism(userID string) int { return o.GetOverridesForUser(userID).MaxQueryParallelism } +// MaxOutstandingPerTenant returns the maximum number of outstanding +// requests allowed per tenant per request queue. +func (o *Overrides) MaxOutstandingPerTenant(userID string) int { + return o.GetOverridesForUser(userID).MaxOutstandingPerTenant +} + // EnforceMetricName whether to enforce the presence of a metric name. func (o *Overrides) EnforceMetricName(userID string) bool { return o.GetOverridesForUser(userID).EnforceMetricName