From 80c05d0c298be6a2f201eed2ec6e49312e68b666 Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Thu, 3 Aug 2023 14:47:18 +0800 Subject: [PATCH 1/2] Re-enqueue 429 requests if there are multiple query-schedulers Signed-off-by: Xiaochao Dong (@damnever) --- pkg/frontend/v2/frontend.go | 4 ++++ pkg/frontend/v2/frontend_scheduler_worker.go | 16 ++++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 31ef2e95336..5df02f542b8 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -86,6 +86,8 @@ type frontendRequest struct { enqueue chan enqueueResult response chan *frontendv2pb.QueryResultRequest + + retryOnTooManyOutstandingRequests bool } type enqueueStatus int @@ -192,6 +194,8 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) // even if this goroutine goes away due to client context cancellation. enqueue: make(chan enqueueResult, 1), response: make(chan *frontendv2pb.QueryResultRequest, 1), + + retryOnTooManyOutstandingRequests: f.schedulerWorkers.getWorkersCount() > 0, } f.requests.put(freq) diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 1b8b45ce46c..bbe1e2ed746 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -296,12 +296,16 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro } case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT: - req.enqueue <- enqueueResult{status: waitForResponse} - req.response <- &frontendv2pb.QueryResultRequest{ - HttpResponse: &httpgrpc.HTTPResponse{ - Code: http.StatusTooManyRequests, - Body: []byte("too many outstanding requests"), - }, + if req.retryOnTooManyOutstandingRequests { + req.enqueue <- enqueueResult{status: failed} + } else { + req.enqueue <- enqueueResult{status: waitForResponse} + req.response <- 
&frontendv2pb.QueryResultRequest{ + HttpResponse: &httpgrpc.HTTPResponse{ + Code: http.StatusTooManyRequests, + Body: []byte("too many outstanding requests"), + }, + } } } From 979ff3ef151c67b75586ae5afe88a882be3aa883 Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Thu, 3 Aug 2023 18:45:37 +0800 Subject: [PATCH 2/2] Add feature flag Signed-off-by: Xiaochao Dong (@damnever) --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 5 +++++ pkg/frontend/v2/frontend.go | 12 +++++++----- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85512f3cf8e..90f371b0b68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ * [FEATURE] Store Gateway: Add `-store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown` to skip unregistering instance from the ring in shutdown. #5421 * [FEATURE] Ruler: Support for filtering rules in the API. #5417 * [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432 +* [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496 * [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319 * [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292 * [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 502031dce1b..723fe6629a9 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3528,6 +3528,11 @@ grpc_client_config: # CLI flag: -frontend.grpc-client-config.tls-insecure-skip-verify [tls_insecure_skip_verify: <boolean> | default = false] +# When multiple query-schedulers are available, re-enqueue queries that were +# rejected due to too many outstanding requests. 
+# CLI flag: -frontend.retry-on-too-many-outstanding-requests +[retry_on_too_many_outstanding_requests: <boolean> | default = false] + # Name of network interface to read address from. This address is sent to # query-scheduler and querier, which uses it to send the query response back to # query-frontend. diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 5df02f542b8..3781eb9f334 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -30,10 +30,11 @@ import ( // Config for a Frontend. type Config struct { - SchedulerAddress string `yaml:"scheduler_address"` - DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"` - WorkerConcurrency int `yaml:"scheduler_worker_concurrency"` - GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + SchedulerAddress string `yaml:"scheduler_address"` + DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"` + WorkerConcurrency int `yaml:"scheduler_worker_concurrency"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + RetryOnTooManyOutstandingRequests bool `yaml:"retry_on_too_many_outstanding_requests"` // Used to find local IP address, that is sent to scheduler and querier-worker. 
InfNames []string `yaml:"instance_interface_names"` @@ -47,6 +48,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.SchedulerAddress, "frontend.scheduler-address", "", "DNS hostname used for finding query-schedulers.") f.DurationVar(&cfg.DNSLookupPeriod, "frontend.scheduler-dns-lookup-period", 10*time.Second, "How often to resolve the scheduler-address, in order to look for new query-scheduler instances.") f.IntVar(&cfg.WorkerConcurrency, "frontend.scheduler-worker-concurrency", 5, "Number of concurrent workers forwarding queries to single query-scheduler.") + f.BoolVar(&cfg.RetryOnTooManyOutstandingRequests, "frontend.retry-on-too-many-outstanding-requests", false, "When multiple query-schedulers are available, re-enqueue queries that were rejected due to too many outstanding requests.") cfg.InfNames = []string{"eth0", "en0"} f.Var((*flagext.StringSlice)(&cfg.InfNames), "frontend.instance-interface-names", "Name of network interface to read address from. This address is sent to query-scheduler and querier, which uses it to send the query response back to query-frontend.") @@ -195,7 +197,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) enqueue: make(chan enqueueResult, 1), response: make(chan *frontendv2pb.QueryResultRequest, 1), - retryOnTooManyOutstandingRequests: f.schedulerWorkers.getWorkersCount() > 0, + retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, } f.requests.put(freq)