diff --git a/CHANGELOG.md b/CHANGELOG.md index 6389ddccede..68024a4e7d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - limit for outgoing gRPC messages has changed from 2147483647 to 16777216 bytes - limit for incoming gRPC messages has changed from 4194304 to 104857600 bytes * [FEATURE] Distributor/Ingester: Provide ability to not overflow writes in the presence of a leaving or unhealthy ingester. This allows for more efficient ingester rolling restarts. #3305 +* [FEATURE] Query-frontend: introduced query statistics logged by the query-frontend when enabled via `-frontend.query-stats-enabled=true`. When enabled, the metric `cortex_query_seconds_total` is tracked, counting the sum of the wall time spent across all queriers while running queries (on a per-tenant basis). The metrics `cortex_request_duration_seconds` and `cortex_query_seconds_total` are different: the former tracks the request duration (e.g. the HTTP request from the client), while the latter tracks the sum of the wall time spent on all queriers involved in executing the query. #3539 * [ENHANCEMENT] API: Add GZIP HTTP compression to the API responses. Compression can be enabled via `-api.response-compression-enabled`. #3536 * [ENHANCEMENT] Added zone-awareness support on queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone will fail. #3414 * [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412 diff --git a/Makefile b/Makefile index e93d4dc3a15..1ad5238507f 100644 --- a/Makefile +++ b/Makefile @@ -81,6 +81,7 @@ pkg/ring/ring.pb.go: pkg/ring/ring.proto pkg/frontend/v1/frontendv1pb/frontend.pb.go: pkg/frontend/v1/frontendv1pb/frontend.proto pkg/frontend/v2/frontendv2pb/frontend.pb.go: pkg/frontend/v2/frontendv2pb/frontend.proto pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto +pkg/querier/stats/stats.pb.go: pkg/querier/stats/stats.proto pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto diff --git a/development/tsdb-blocks-storage-s3/config/cortex.yaml b/development/tsdb-blocks-storage-s3/config/cortex.yaml index 7fbe1128938..bd106761e33 100644 --- a/development/tsdb-blocks-storage-s3/config/cortex.yaml +++ b/development/tsdb-blocks-storage-s3/config/cortex.yaml @@ -118,6 +118,9 @@ store_gateway: consul: host: consul:8500 +frontend: + query_stats_enabled: true + frontend_worker: frontend_address: "query-frontend:9007" match_max_concurrent: true diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 91d1d7be7d8..61e18184a1f 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -874,6 +874,12 @@ The `query_frontend_config` configures the Cortex query-frontend. # CLI flag: -frontend.max-body-size [max_body_size: | default = 10485760] +# True to enable query statistics tracking. When enabled, a message with some +# statistics is logged for every query. This configuration option must be set +# both on query-frontend and querier. +# CLI flag: -frontend.query-stats-enabled +[query_stats_enabled: | default = false] + # Maximum number of outstanding requests per tenant per frontend; requests # beyond this error with HTTP 429.
# CLI flag: -querier.max-outstanding-requests-per-tenant diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 1eedcd33a37..746b7032789 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -62,3 +62,4 @@ Currently experimental features are: - Distributor: do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`) - Ingester: close idle TSDB and remove them from local disk (`-blocks-storage.tsdb.close-idle-tsdb-timeout`) - Tenant Deletion in Purger, for blocks storage. +- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 877a273d848..78d9c2a606e 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "sync" "testing" "time" @@ -28,6 +29,7 @@ import ( type queryFrontendTestConfig struct { testMissingMetricName bool querySchedulerEnabled bool + queryStatsEnabled bool setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) } @@ -45,6 +47,21 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) { }) } +func TestQueryFrontendWithBlocksStorageViaFlagsAndQueryStatsEnabled(t *testing.T) { + runQueryFrontendTest(t, queryFrontendTestConfig{ + testMissingMetricName: false, + queryStatsEnabled: true, + setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + flags = BlocksStorageFlags() + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + return "", flags + }, + }) +} + func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQueryScheduler(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, @@ -60,6 +77,22 @@ func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQueryScheduler(t *testing. }) } +func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQuerySchedulerAndQueryStatsEnabled(t *testing.T) { + runQueryFrontendTest(t, queryFrontendTestConfig{ + testMissingMetricName: false, + querySchedulerEnabled: true, + queryStatsEnabled: true, + setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + flags = BlocksStorageFlags() + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + return "", flags + }, + }) +} + func TestQueryFrontendWithBlocksStorageViaConfigFile(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, @@ -183,6 +216,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { "-querier.split-queries-by-interval": "24h", "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled), }) // Start the query-scheduler if enabled. 
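The distinction the changelog entry above draws between `cortex_request_duration_seconds` and `cortex_query_seconds_total` is easiest to see with concrete numbers. Below is a minimal sketch using made-up figures (three queriers, 2s of wall time each, running in parallel); none of these values come from this PR:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical query executed by 3 queriers in parallel, each spending 2s of wall time.
	const queriers = 3
	perQuerierWallTime := 2 * time.Second

	// cortex_request_duration_seconds observes roughly the elapsed time of the HTTP request,
	// which is bounded by the slowest querier when the work runs in parallel.
	requestDuration := perQuerierWallTime

	// cortex_query_seconds_total accumulates the wall time summed across all queriers.
	queryWallTime := time.Duration(queriers) * perQuerierWallTime

	fmt.Println("request duration ~", requestDuration) // ~2s
	fmt.Println("query wall time  =", queryWallTime)   // 6s added to the per-tenant counter
}
```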
@@ -306,6 +340,16 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount)) require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount)) + // Ensure query stats metrics are tracked only when enabled. + if cfg.queryStatsEnabled { + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Greater(0), + []string{"cortex_query_seconds_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")))) + } else { + require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total")) + } + // Ensure no service-specific metrics prefix is used by the wrong service. assertServiceMetricsPrefixes(t, Distributor, distributor) assertServiceMetricsPrefixes(t, Ingester, ingester) diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 9016a1f9134..9afffcc8730 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -29,6 +29,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" ) @@ -77,7 +78,7 @@ func (pc *IndexPageContent) GetContent() map[string]map[string]string { return result } -var indexPageTemplate = ` +var indexPageTemplate = ` @@ -242,7 +243,10 @@ func NewQuerierHandler( router.Path(legacyPrefix + "/api/v1/metadata").Methods("GET").Handler(legacyPromRouter) // Add a middleware to extract the trace context and add a header. - return nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string { + handler := nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string { return "internalQuerier" })) + + // Track execution time. + return stats.NewWallTimeMiddleware().Wrap(handler) } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 6f146b224f8..5ec08f5c72e 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -306,6 +306,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { } t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent + t.Cfg.Worker.QueryStatsEnabled = t.Cfg.Frontend.Handler.QueryStatsEnabled return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util.Logger, prometheus.DefaultRegisterer) } @@ -520,7 +521,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { // Wrap roundtripper into Tripperware. 
roundTripper = t.QueryFrontendTripperware(roundTripper) - handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util.Logger) + handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util.Logger, prometheus.DefaultRegisterer) if t.Cfg.Frontend.CompressResponses { handler = gziphandler.GzipHandler(handler) } diff --git a/pkg/frontend/frontend_test.go b/pkg/frontend/frontend_test.go index dc2ad654db7..4939a31ea1e 100644 --- a/pkg/frontend/frontend_test.go +++ b/pkg/frontend/frontend_test.go @@ -248,7 +248,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand r.PathPrefix("/").Handler(middleware.Merge( middleware.AuthenticateUser, middleware.Tracer{}, - ).Wrap(transport.NewHandler(config.Handler, rt, logger))) + ).Wrap(transport.NewHandler(config.Handler, rt, logger, nil))) httpServer := http.Server{ Handler: r, diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 40f68ca7810..f3a8d3fc311 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -8,14 +8,19 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "strings" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" ) @@ -34,11 +39,13 @@ var ( type HandlerConfig struct { LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"` MaxBodySize int64 `yaml:"max_body_size"` + QueryStatsEnabled bool `yaml:"query_stats_enabled"` } func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.") f.Int64Var(&cfg.MaxBodySize, "frontend.max-body-size", 10*1024*1024, "Max body size for downstream prometheus.") + f.BoolVar(&cfg.QueryStatsEnabled, "frontend.query-stats-enabled", false, "True to enable query statistics tracking. When enabled, a message with some statistics is logged for every query. This configuration option must be set both on query-frontend and querier.") } // Handler accepts queries and forwards them to RoundTripper. It can log slow queries, @@ -47,18 +54,43 @@ type Handler struct { cfg HandlerConfig log log.Logger roundTripper http.RoundTripper + + // Metrics. + querySeconds *prometheus.CounterVec } // New creates a new frontend handler. 
-func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger) http.Handler { - return &Handler{ +func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler { + h := &Handler{ cfg: cfg, log: log, roundTripper: roundTripper, } + + if cfg.QueryStatsEnabled { + h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_seconds_total", + Help: "Total amount of wall clock time spent processing queries.", + }, []string{"user"}) + } + + return h } func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var ( + stats *querier_stats.Stats + queryString url.Values + ) + + // Initialise the stats in the context and make sure it's propagated + // down the request chain. + if f.cfg.QueryStatsEnabled { + var ctx context.Context + stats, ctx = querier_stats.ContextWithEmptyStats(r.Context()) + r = r.WithContext(ctx) + } + defer func() { _ = r.Body.Close() }() @@ -86,38 +118,73 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // we don't check for copy error as there is no much we can do at this point _, _ = io.Copy(w, resp.Body) - f.reportSlowQuery(queryResponseTime, r, buf) -} + // Check whether we should parse the query string. + shouldReportSlowQuery := f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan + if shouldReportSlowQuery || f.cfg.QueryStatsEnabled { + queryString = f.parseRequestQueryString(r, buf) + } -// reportSlowQuery reports slow queries if LogQueriesLongerThan is set to <0, where 0 disables logging -func (f *Handler) reportSlowQuery(queryResponseTime time.Duration, r *http.Request, bodyBuf bytes.Buffer) { - if f.cfg.LogQueriesLongerThan == 0 || queryResponseTime <= f.cfg.LogQueriesLongerThan { - return + if shouldReportSlowQuery { + f.reportSlowQuery(r, queryString, queryResponseTime) } + if f.cfg.QueryStatsEnabled { + f.reportQueryStats(r, queryString, queryResponseTime, stats) + } +} - logMessage := []interface{}{ +// reportSlowQuery reports slow queries. +func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration) { + logMessage := append([]interface{}{ "msg", "slow query detected", "method", r.Method, "host", r.Host, "path", r.URL.Path, "time_taken", queryResponseTime.String(), + }, formatQueryString(queryString)...) + + level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...) +} + +func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) { + userID, err := tenant.TenantID(r.Context()) + if err != nil { + return } - // use previously buffered body + // Track stats. + f.querySeconds.WithLabelValues(userID).Add(stats.LoadWallTime().Seconds()) + + // Log stats. + logMessage := append([]interface{}{ + "msg", "query stats", + "method", r.Method, + "path", r.URL.Path, + "response_time", queryResponseTime, + "query_wall_time", stats.LoadWallTime(), + }, formatQueryString(queryString)...) + + level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...) +} + +func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values { + // Use previously buffered body.
r.Body = ioutil.NopCloser(&bodyBuf) // Ensure the form has been parsed so all the parameters are present err := r.ParseForm() if err != nil { - level.Warn(util.WithContext(r.Context(), f.log)).Log("msg", "unable to parse form for request", "err", err) + level.Warn(util.WithContext(r.Context(), f.log)).Log("msg", "unable to parse request form", "err", err) + return nil } - // Attempt to iterate through the Form to log any filled in values - for k, v := range r.Form { - logMessage = append(logMessage, fmt.Sprintf("param_%s", k), strings.Join(v, ",")) - } + return r.Form +} - level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...) +func formatQueryString(queryString url.Values) (fields []interface{}) { + for k, v := range queryString { + fields = append(fields, fmt.Sprintf("param_%s", k), strings.Join(v, ",")) + } + return fields } func writeError(w http.ResponseWriter, err error) { diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 0077111169c..385ec26e9fa 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -16,6 +16,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/grpcutil" @@ -185,7 +186,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { // Handle the stream sending & receiving on a goroutine so we can // monitoring the contexts in a select and cancel things appropriately. - resps := make(chan *httpgrpc.HTTPResponse, 1) + resps := make(chan *frontendv1pb.ClientToFrontend, 1) errs := make(chan error, 1) go func() { err = server.Send(&frontendv1pb.FrontendToClient{ @@ -203,7 +204,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { return } - resps <- resp.HttpResponse + resps <- resp }() select { @@ -219,9 +220,14 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { req.err <- err return err - // Happy path: propagate the response. + // Happy path: merge the stats and propagate the response. case resp := <-resps: - req.response <- resp + if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { + stats := stats.FromContext(req.originalCtx) + stats.Merge(resp.Stats) // Safe if stats is nil. 
+ } + + req.response <- resp.HttpResponse } } } diff --git a/pkg/frontend/v1/frontend_test.go b/pkg/frontend/v1/frontend_test.go index 3890f8d17e0..7d7bfffc252 100644 --- a/pkg/frontend/v1/frontend_test.go +++ b/pkg/frontend/v1/frontend_test.go @@ -217,7 +217,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a r.PathPrefix("/").Handler(middleware.Merge( middleware.AuthenticateUser, middleware.Tracer{}, - ).Wrap(transport.NewHandler(handlerCfg, rt, logger))) + ).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil))) httpServer := http.Server{ Handler: r, diff --git a/pkg/frontend/v1/frontendv1pb/frontend.pb.go b/pkg/frontend/v1/frontendv1pb/frontend.pb.go index 0729cc1df15..2fae54a845a 100644 --- a/pkg/frontend/v1/frontendv1pb/frontend.pb.go +++ b/pkg/frontend/v1/frontendv1pb/frontend.pb.go @@ -9,6 +9,7 @@ package frontendv1pb import ( context "context" fmt "fmt" + stats "github.com/cortexproject/cortex/pkg/querier/stats" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" httpgrpc "github.com/weaveworks/common/httpgrpc" @@ -109,6 +110,7 @@ func (m *FrontendToClient) GetType() Type { type ClientToFrontend struct { HttpResponse *httpgrpc.HTTPResponse `protobuf:"bytes,1,opt,name=httpResponse,proto3" json:"httpResponse,omitempty"` ClientID string `protobuf:"bytes,2,opt,name=clientID,proto3" json:"clientID,omitempty"` + Stats *stats.Stats `protobuf:"bytes,3,opt,name=stats,proto3" json:"stats,omitempty"` } func (m *ClientToFrontend) Reset() { *m = ClientToFrontend{} } @@ -157,6 +159,13 @@ func (m *ClientToFrontend) GetClientID() string { return "" } +func (m *ClientToFrontend) GetStats() *stats.Stats { + if m != nil { + return m.Stats + } + return nil +} + func init() { proto.RegisterEnum("frontend.Type", Type_name, Type_value) proto.RegisterType((*FrontendToClient)(nil), "frontend.FrontendToClient") @@ -166,30 +175,34 @@ func init() { func init() { proto.RegisterFile("frontend.proto", fileDescriptor_eca3873955a29cfe) } var fileDescriptor_eca3873955a29cfe = []byte{ - // 364 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x91, 0xc1, 0x4e, 0xea, 0x40, - 0x14, 0x86, 0x67, 0x6e, 0x08, 0x97, 0x3b, 0x10, 0xd2, 0x4c, 0x72, 0x6f, 0x48, 0x17, 0x13, 0xd2, - 0xdc, 0x05, 0x31, 0xb1, 0x55, 0x34, 0x31, 0x71, 0x89, 0x20, 0xb2, 0xc3, 0x52, 0x37, 0x6e, 0x88, - 0xad, 0x43, 0x41, 0xa5, 0xa7, 0xb6, 0x03, 0x84, 0x9d, 0x8f, 0xe0, 0x63, 0xf8, 0x28, 0x2e, 0x59, - 0xb2, 0x94, 0x61, 0xe3, 0x92, 0x47, 0x30, 0x4c, 0xa1, 0x22, 0xbb, 0xf3, 0xe7, 0xff, 0xcf, 0xff, - 0x9d, 0x76, 0x48, 0xb1, 0x17, 0x41, 0x20, 0x78, 0x70, 0x6f, 0x86, 0x11, 0x08, 0xa0, 0xb9, 0xad, - 0xd6, 0x0f, 0xfd, 0x81, 0xe8, 0x8f, 0x5c, 0xd3, 0x83, 0xa1, 0xe5, 0x83, 0x0f, 0x96, 0x0a, 0xb8, - 0xa3, 0x9e, 0x52, 0x4a, 0xa8, 0x29, 0x59, 0xd4, 0x4f, 0x77, 0xe2, 0x13, 0x7e, 0x37, 0xe6, 0x13, - 0x88, 0x1e, 0x63, 0xcb, 0x83, 0xe1, 0x10, 0x02, 0xab, 0x2f, 0x44, 0xe8, 0x47, 0xa1, 0x97, 0x0e, - 0xc9, 0x96, 0x01, 0x44, 0xbb, 0xdc, 0x00, 0x1d, 0xb8, 0x78, 0x1a, 0xf0, 0x40, 0xd0, 0x33, 0x92, - 0x5f, 0xa7, 0x6c, 0xfe, 0x3c, 0xe2, 0xb1, 0x28, 0xe1, 0x32, 0xae, 0xe4, 0xab, 0x7f, 0xcd, 0x74, - 0xf3, 0xca, 0x71, 0xda, 0x1b, 0xd3, 0xde, 0x4d, 0x52, 0x83, 0x64, 0xc4, 0x34, 0xe4, 0xa5, 0x5f, - 0x65, 0x5c, 0x29, 0x56, 0x8b, 0x66, 0xfa, 0x69, 0xce, 0x34, 0xe4, 0xb6, 0xf2, 0x8c, 0x07, 0xa2, - 0x25, 0x18, 0x07, 0xb6, 0x60, 0x7a, 0x4e, 0x0a, 0x49, 0x4d, 0x1c, 0x42, 0x10, 0xf3, 0x0d, 0xf1, - 0xdf, 0x3e, 0x31, 0x71, 0xed, 0x1f, 0x59, 0xaa, 0x93, 
0x9c, 0xa7, 0xfa, 0x5a, 0x75, 0xc5, 0xfd, - 0x63, 0xa7, 0xfa, 0xe0, 0x3f, 0xc9, 0xac, 0xc9, 0x54, 0x23, 0x85, 0x75, 0x43, 0xd7, 0x6e, 0x5c, - 0xdf, 0x34, 0x3a, 0x8e, 0x86, 0x28, 0x21, 0xd9, 0x66, 0xc3, 0xe9, 0xb6, 0xea, 0x1a, 0xae, 0x76, - 0x48, 0x2e, 0xbd, 0xa4, 0x49, 0x7e, 0xb7, 0x23, 0xf0, 0x78, 0x1c, 0x53, 0xfd, 0xfb, 0xfc, 0xfd, - 0x83, 0xf5, 0x1d, 0x6f, 0xff, 0xef, 0x19, 0xa8, 0x82, 0x8f, 0x70, 0xad, 0x36, 0x5b, 0x30, 0x34, - 0x5f, 0x30, 0xb4, 0x5a, 0x30, 0xfc, 0x22, 0x19, 0x7e, 0x93, 0x0c, 0xbf, 0x4b, 0x86, 0x67, 0x92, - 0xe1, 0x0f, 0xc9, 0xf0, 0xa7, 0x64, 0x68, 0x25, 0x19, 0x7e, 0x5d, 0x32, 0x34, 0x5b, 0x32, 0x34, - 0x5f, 0x32, 0x74, 0x5b, 0xd8, 0xd6, 0x8e, 0x8f, 0x43, 0xd7, 0xcd, 0xaa, 0x27, 0x3a, 0xf9, 0x0a, - 0x00, 0x00, 0xff, 0xff, 0x85, 0xf3, 0xb6, 0x2a, 0x23, 0x02, 0x00, 0x00, + // 419 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x91, 0x41, 0x6f, 0xd3, 0x30, + 0x14, 0xc7, 0x6d, 0x18, 0xa3, 0x78, 0x51, 0x15, 0x59, 0x02, 0x55, 0x39, 0x58, 0x53, 0xc4, 0xa1, + 0x42, 0x22, 0x81, 0x82, 0x84, 0x84, 0xc4, 0x65, 0xac, 0x8c, 0xdd, 0x46, 0x1a, 0x2e, 0x5c, 0xa6, + 0x25, 0x78, 0x59, 0x19, 0xcd, 0xf3, 0x6c, 0xa7, 0xa5, 0x37, 0x3e, 0x01, 0xe2, 0x63, 0xf0, 0x51, + 0x38, 0xf6, 0xd8, 0x23, 0x4d, 0x2f, 0x1c, 0xfb, 0x11, 0x50, 0xec, 0x34, 0x64, 0xbd, 0x58, 0xfe, + 0xeb, 0xff, 0xde, 0xfb, 0xbd, 0xbf, 0x4d, 0xba, 0x97, 0x12, 0x72, 0xcd, 0xf3, 0xcf, 0x81, 0x90, + 0xa0, 0x81, 0x76, 0xb6, 0xda, 0x7b, 0x9a, 0x8d, 0xf5, 0x55, 0x91, 0x04, 0x29, 0x4c, 0xc2, 0x0c, + 0x32, 0x08, 0x4d, 0x41, 0x52, 0x5c, 0x1a, 0x65, 0x84, 0xb9, 0xd9, 0x46, 0xef, 0x65, 0xab, 0x7c, + 0xc6, 0x2f, 0xa6, 0x7c, 0x06, 0xf2, 0x5a, 0x85, 0x29, 0x4c, 0x26, 0x90, 0x87, 0x57, 0x5a, 0x8b, + 0x4c, 0x8a, 0xb4, 0xb9, 0xd4, 0x5d, 0x6f, 0x5a, 0x5d, 0x29, 0x48, 0xcd, 0xbf, 0x09, 0x09, 0x5f, + 0x78, 0xaa, 0x6b, 0x15, 0x8a, 0xeb, 0x2c, 0xbc, 0x29, 0xb8, 0x1c, 0x73, 0x19, 0x2a, 0x7d, 0xa1, + 0x95, 0x3d, 0x6d, 0xbb, 0x0f, 0xc4, 0x7d, 0x57, 0xef, 0x1b, 0xc3, 0xdb, 0xaf, 0x63, 0x9e, 0x6b, + 0xfa, 0x8a, 0x1c, 0x54, 0x90, 0x88, 0xdf, 0x14, 0x5c, 0xe9, 0x1e, 0x3e, 0xc4, 0xfd, 0x83, 0xc1, + 0xc3, 0xa0, 0x01, 0xbf, 0x8f, 0xe3, 0xb3, 0xda, 0x8c, 0xda, 0x95, 0xd4, 0x27, 0x7b, 0x7a, 0x2e, + 0x78, 0xef, 0xce, 0x21, 0xee, 0x77, 0x07, 0xdd, 0xa0, 0x79, 0x99, 0x78, 0x2e, 0x78, 0x64, 0x3c, + 0xff, 0x07, 0x26, 0xae, 0xe5, 0xc4, 0xb0, 0x25, 0xd3, 0xd7, 0xc4, 0xb1, 0x73, 0x94, 0x80, 0x5c, + 0xf1, 0x1a, 0xf9, 0x68, 0x17, 0x69, 0xdd, 0xe8, 0x56, 0x2d, 0xf5, 0x48, 0x27, 0x35, 0xf3, 0x4e, + 0x8f, 0x0d, 0xf8, 0x41, 0xd4, 0x68, 0xea, 0x93, 0x7b, 0x26, 0x6c, 0xef, 0xae, 0x19, 0xe8, 0x04, + 0x36, 0xfa, 0xa8, 0x3a, 0x23, 0x6b, 0x3d, 0x79, 0x4c, 0xf6, 0xaa, 0xf5, 0xa8, 0x4b, 0x9c, 0x8a, + 0x72, 0x1e, 0x0d, 0x3f, 0x7c, 0x1c, 0x8e, 0x62, 0x17, 0x51, 0x42, 0xf6, 0x4f, 0x86, 0xf1, 0xf9, + 0xe9, 0xb1, 0x8b, 0x07, 0x23, 0xd2, 0x69, 0xb6, 0x3d, 0x21, 0xf7, 0xcf, 0x24, 0xa4, 0x5c, 0x29, + 0xea, 0xfd, 0xcf, 0xb8, 0x1b, 0xca, 0x6b, 0x79, 0xbb, 0x4f, 0xec, 0xa3, 0x3e, 0x7e, 0x86, 0x8f, + 0x8e, 0x16, 0x2b, 0x86, 0x96, 0x2b, 0x86, 0x36, 0x2b, 0x86, 0xbf, 0x97, 0x0c, 0xff, 0x2a, 0x19, + 0xfe, 0x5d, 0x32, 0xbc, 0x28, 0x19, 0xfe, 0x53, 0x32, 0xfc, 0xb7, 0x64, 0x68, 0x53, 0x32, 0xfc, + 0x73, 0xcd, 0xd0, 0x62, 0xcd, 0xd0, 0x72, 0xcd, 0xd0, 0x27, 0x67, 0x3b, 0x76, 0xfa, 0x5c, 0x24, + 0xc9, 0xbe, 0xf9, 0xc7, 0x17, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0xe0, 0xa2, 0x48, 0x34, 0x87, + 0x02, 0x00, 0x00, } func (x Type) String() string { @@ -251,6 +264,9 @@ func (this *ClientToFrontend) Equal(that interface{}) bool { if this.ClientID != 
that1.ClientID { return false } + if !this.Stats.Equal(that1.Stats) { + return false + } return true } func (this *FrontendToClient) GoString() string { @@ -270,12 +286,15 @@ func (this *ClientToFrontend) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&frontendv1pb.ClientToFrontend{") if this.HttpResponse != nil { s = append(s, "HttpResponse: "+fmt.Sprintf("%#v", this.HttpResponse)+",\n") } s = append(s, "ClientID: "+fmt.Sprintf("%#v", this.ClientID)+",\n") + if this.Stats != nil { + s = append(s, "Stats: "+fmt.Sprintf("%#v", this.Stats)+",\n") + } s = append(s, "}") return strings.Join(s, "") } @@ -464,6 +483,18 @@ func (m *ClientToFrontend) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Stats != nil { + { + size, err := m.Stats.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintFrontend(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } if len(m.ClientID) > 0 { i -= len(m.ClientID) copy(dAtA[i:], m.ClientID) @@ -527,6 +558,10 @@ func (m *ClientToFrontend) Size() (n int) { if l > 0 { n += 1 + l + sovFrontend(uint64(l)) } + if m.Stats != nil { + l = m.Stats.Size() + n += 1 + l + sovFrontend(uint64(l)) + } return n } @@ -554,6 +589,7 @@ func (this *ClientToFrontend) String() string { s := strings.Join([]string{`&ClientToFrontend{`, `HttpResponse:` + strings.Replace(fmt.Sprintf("%v", this.HttpResponse), "HTTPResponse", "httpgrpc.HTTPResponse", 1) + `,`, `ClientID:` + fmt.Sprintf("%v", this.ClientID) + `,`, + `Stats:` + strings.Replace(fmt.Sprintf("%v", this.Stats), "Stats", "stats.Stats", 1) + `,`, `}`, }, "") return s @@ -771,6 +807,42 @@ func (m *ClientToFrontend) Unmarshal(dAtA []byte) error { } m.ClientID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Stats", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFrontend + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthFrontend + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthFrontend + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Stats == nil { + m.Stats = &stats.Stats{} + } + if err := m.Stats.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipFrontend(dAtA[iNdEx:]) diff --git a/pkg/frontend/v1/frontendv1pb/frontend.proto b/pkg/frontend/v1/frontendv1pb/frontend.proto index 1d63ce2b950..c801993578a 100644 --- a/pkg/frontend/v1/frontendv1pb/frontend.proto +++ b/pkg/frontend/v1/frontendv1pb/frontend.proto @@ -8,6 +8,7 @@ option go_package = "frontendv1pb"; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto"; +import "github.com/cortexproject/cortex/pkg/querier/stats/stats.proto"; option (gogoproto.marshaler_all) = true; option (gogoproto.unmarshaler_all) = true; @@ -31,4 +32,5 @@ message FrontendToClient { message ClientToFrontend { httpgrpc.HTTPResponse httpResponse = 1; string clientID = 2; + stats.Stats stats = 3; } diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 4165b34ca32..da5942883c6 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go 
@@ -19,6 +19,7 @@ import ( "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/grpcclient" @@ -79,7 +80,7 @@ type frontendRequest struct { cancel context.CancelFunc enqueue chan enqueueResult - response chan *httpgrpc.HTTPResponse + response chan *frontendv2pb.QueryResultRequest } type enqueueStatus int @@ -178,7 +179,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) // Buffer of 1 to ensure response or error can be written to the channel // even if this goroutine goes away due to client context cancellation. enqueue: make(chan enqueueResult, 1), - response: make(chan *httpgrpc.HTTPResponse, 1), + response: make(chan *frontendv2pb.QueryResultRequest, 1), } f.requests.put(freq) @@ -228,7 +229,12 @@ enqueueAgain: return nil, ctx.Err() case resp := <-freq.response: - return resp, nil + if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { + stats := stats.FromContext(ctx) + stats.Merge(resp.Stats) // Safe if stats is nil. + } + + return resp.HttpResponse, nil } } @@ -244,7 +250,7 @@ func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryRes // To avoid mixing results from different queries, we randomize queryID counter on start. if req != nil && req.userID == userID { select { - case req.response <- qrReq.HttpResponse: + case req.response <- qrReq: // Should always be possible, unless QueryResult is called multiple times with the same queryID. default: level.Warn(f.log).Log("msg", "failed to write query result to the response channel", "queryID", qrReq.QueryID, "user", userID) diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 2e5f7875a0f..577a0d27abf 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -12,6 +12,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" @@ -285,16 +286,20 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro case schedulerpb.ERROR: req.enqueue <- enqueueResult{status: waitForResponse} - req.response <- &httpgrpc.HTTPResponse{ - Code: http.StatusInternalServerError, - Body: []byte(err.Error()), + req.response <- &frontendv2pb.QueryResultRequest{ + HttpResponse: &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(err.Error()), + }, } case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT: req.enqueue <- enqueueResult{status: waitForResponse} - req.response <- &httpgrpc.HTTPResponse{ - Code: http.StatusTooManyRequests, - Body: []byte("too many outstanding requests"), + req.response <- &frontendv2pb.QueryResultRequest{ + HttpResponse: &httpgrpc.HTTPResponse{ + Code: http.StatusTooManyRequests, + Body: []byte("too many outstanding requests"), + }, } } diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index 9a28e644576..bfbdfd64dd7 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc" 
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -87,6 +88,7 @@ func sendResponseWithDelay(f *Frontend, delay time.Duration, userID string, quer _, _ = f.QueryResult(ctx, &frontendv2pb.QueryResultRequest{ QueryID: queryID, HttpResponse: resp, + Stats: &stats.Stats{}, }) } diff --git a/pkg/frontend/v2/frontendv2pb/frontend.pb.go b/pkg/frontend/v2/frontendv2pb/frontend.pb.go index 940134933f0..daeb50a8c3b 100644 --- a/pkg/frontend/v2/frontendv2pb/frontend.pb.go +++ b/pkg/frontend/v2/frontendv2pb/frontend.pb.go @@ -6,6 +6,7 @@ package frontendv2pb import ( context "context" fmt "fmt" + stats "github.com/cortexproject/cortex/pkg/querier/stats" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" httpgrpc "github.com/weaveworks/common/httpgrpc" @@ -33,6 +34,7 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type QueryResultRequest struct { QueryID uint64 `protobuf:"varint,1,opt,name=queryID,proto3" json:"queryID,omitempty"` HttpResponse *httpgrpc.HTTPResponse `protobuf:"bytes,2,opt,name=httpResponse,proto3" json:"httpResponse,omitempty"` + Stats *stats.Stats `protobuf:"bytes,3,opt,name=stats,proto3" json:"stats,omitempty"` } func (m *QueryResultRequest) Reset() { *m = QueryResultRequest{} } @@ -81,6 +83,13 @@ func (m *QueryResultRequest) GetHttpResponse() *httpgrpc.HTTPResponse { return nil } +func (m *QueryResultRequest) GetStats() *stats.Stats { + if m != nil { + return m.Stats + } + return nil +} + type QueryResultResponse struct { } @@ -124,26 +133,29 @@ func init() { func init() { proto.RegisterFile("frontend.proto", fileDescriptor_eca3873955a29cfe) } var fileDescriptor_eca3873955a29cfe = []byte{ - // 298 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0x2b, 0xca, 0xcf, - 0x2b, 0x49, 0xcd, 0x4b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x81, 0xf1, 0xcb, 0x8c, - 0x0a, 0x92, 0xa4, 0x74, 0xd3, 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, - 0xf3, 0xd3, 0xf3, 0xf5, 0xc1, 0x8a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0x68, - 0x96, 0x32, 0x41, 0x52, 0x5e, 0x9e, 0x9a, 0x58, 0x96, 0x5a, 0x9e, 0x5f, 0x94, 0x5d, 0xac, 0x9f, - 0x9c, 0x9f, 0x9b, 0x9b, 0x9f, 0xa7, 0x9f, 0x51, 0x52, 0x52, 0x90, 0x5e, 0x54, 0x90, 0x0c, 0x67, - 0x40, 0x74, 0x29, 0x65, 0x71, 0x09, 0x05, 0x96, 0xa6, 0x16, 0x55, 0x06, 0xa5, 0x16, 0x97, 0xe6, - 0x94, 0x04, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0x49, 0x70, 0xb1, 0x17, 0x82, 0x44, 0x3d, - 0x5d, 0x24, 0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0x60, 0x5c, 0x21, 0x2b, 0x2e, 0x1e, 0x90, 0x09, - 0x41, 0xa9, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0xa9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xdc, 0x46, 0x62, - 0x7a, 0x70, 0x63, 0x3d, 0x42, 0x42, 0x02, 0x60, 0xb2, 0x41, 0x28, 0x6a, 0x95, 0x44, 0xb9, 0x84, - 0x51, 0xec, 0x82, 0x08, 0x1b, 0x65, 0x71, 0x09, 0xb9, 0x41, 0xfd, 0xed, 0x96, 0x5f, 0x04, 0x52, - 0x91, 0x99, 0x5a, 0x24, 0x14, 0xc2, 0xc5, 0x8d, 0xa4, 0x58, 0x48, 0x41, 0x0f, 0x39, 0x6c, 0xf4, - 0x30, 0xdd, 0x2c, 0xa5, 0x88, 0x47, 0x05, 0xd4, 0x01, 0x0c, 0x4e, 0x4e, 0x17, 0x1e, 0xca, 0x31, - 0xdc, 0x78, 0x28, 0xc7, 0xf0, 0xe1, 0xa1, 0x1c, 0x63, 0xc3, 0x23, 0x39, 0xc6, 0x15, 0x8f, 0xe4, - 0x18, 0x4f, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 
0x39, 0xc6, 0x17, 0x8f, - 0xe4, 0x18, 0x3e, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, - 0x8f, 0xe5, 0x18, 0xa2, 0x50, 0xe2, 0x25, 0x89, 0x0d, 0x1c, 0x72, 0xc6, 0x80, 0x00, 0x00, 0x00, - 0xff, 0xff, 0xeb, 0x00, 0x1d, 0x52, 0xbe, 0x01, 0x00, 0x00, + // 351 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xcd, 0x4e, 0x3a, 0x31, + 0x14, 0xc5, 0xdb, 0xff, 0xdf, 0x8f, 0xa4, 0x10, 0x17, 0x35, 0x9a, 0x09, 0x8b, 0x06, 0x67, 0xc5, + 0xc6, 0x69, 0x82, 0xae, 0x4c, 0xdc, 0x10, 0x43, 0x74, 0x27, 0x23, 0x2b, 0x77, 0xcc, 0x58, 0x86, + 0x0f, 0x99, 0x96, 0xb6, 0x03, 0xb2, 0xf3, 0x09, 0x8c, 0x8f, 0xe1, 0xa3, 0xb8, 0x64, 0xc9, 0x52, + 0xca, 0xc6, 0x25, 0x8f, 0x60, 0x68, 0x81, 0x0c, 0x31, 0x71, 0xd3, 0xdc, 0x93, 0x7b, 0x7e, 0xb9, + 0xe7, 0xde, 0xa2, 0xa3, 0xb6, 0xe4, 0xa9, 0x66, 0xe9, 0x53, 0x20, 0x24, 0xd7, 0x1c, 0x17, 0x37, + 0x7a, 0x54, 0x15, 0x51, 0xe9, 0x3c, 0xe9, 0xea, 0x4e, 0x16, 0x05, 0x31, 0x1f, 0xd0, 0x84, 0x27, + 0x9c, 0x5a, 0x53, 0x94, 0xb5, 0xad, 0xb2, 0xc2, 0x56, 0x0e, 0x2e, 0x5d, 0xe6, 0xec, 0x63, 0xd6, + 0x1a, 0xb1, 0x31, 0x97, 0x7d, 0x45, 0x63, 0x3e, 0x18, 0xf0, 0x94, 0x76, 0xb4, 0x16, 0x89, 0x14, + 0xf1, 0xb6, 0x58, 0x53, 0xd7, 0x39, 0x2a, 0xe6, 0x52, 0xb3, 0x17, 0x21, 0x79, 0x8f, 0xc5, 0x7a, + 0xad, 0xa8, 0xe8, 0x27, 0x74, 0x98, 0x31, 0xd9, 0x65, 0x92, 0x2a, 0xdd, 0xd2, 0xca, 0xbd, 0x0e, + 0xf7, 0xdf, 0x20, 0xc2, 0x8d, 0x8c, 0xc9, 0x49, 0xc8, 0x54, 0xf6, 0xac, 0x43, 0x36, 0xcc, 0x98, + 0xd2, 0xd8, 0x43, 0x87, 0x2b, 0x66, 0x72, 0x77, 0xe3, 0xc1, 0x32, 0xac, 0xec, 0x85, 0x1b, 0x89, + 0xaf, 0x50, 0x71, 0x95, 0x20, 0x64, 0x4a, 0xf0, 0x54, 0x31, 0xef, 0x5f, 0x19, 0x56, 0x0a, 0xd5, + 0xd3, 0x60, 0x1b, 0xeb, 0xb6, 0xd9, 0xbc, 0xdf, 0x74, 0xc3, 0x1d, 0x2f, 0xf6, 0xd1, 0xbe, 0x9d, + 0xed, 0xfd, 0xb7, 0x50, 0x31, 0x70, 0x49, 0x1e, 0x56, 0x6f, 0xe8, 0x5a, 0xfe, 0x09, 0x3a, 0xde, + 0xc9, 0xe3, 0xd0, 0x6a, 0x0f, 0xe1, 0xfa, 0xfa, 0xb6, 0x75, 0x2e, 0x1b, 0x6e, 0x1f, 0xdc, 0x44, + 0x85, 0x9c, 0x19, 0x97, 0x83, 0xfc, 0xfd, 0x83, 0xdf, 0x7b, 0x95, 0xce, 0xfe, 0x70, 0xb8, 0x49, + 0x3e, 0xa8, 0xd5, 0xa6, 0x73, 0x02, 0x66, 0x73, 0x02, 0x96, 0x73, 0x02, 0x5f, 0x0d, 0x81, 0x1f, + 0x86, 0xc0, 0x4f, 0x43, 0xe0, 0xd4, 0x10, 0xf8, 0x65, 0x08, 0xfc, 0x36, 0x04, 0x2c, 0x0d, 0x81, + 0xef, 0x0b, 0x02, 0xa6, 0x0b, 0x02, 0x66, 0x0b, 0x02, 0x1e, 0x77, 0xfe, 0x3e, 0x3a, 0xb0, 0xe7, + 0xbd, 0xf8, 0x09, 0x00, 0x00, 0xff, 0xff, 0x02, 0xb0, 0x28, 0xb5, 0x22, 0x02, 0x00, 0x00, } func (this *QueryResultRequest) Equal(that interface{}) bool { @@ -171,6 +183,9 @@ func (this *QueryResultRequest) Equal(that interface{}) bool { if !this.HttpResponse.Equal(that1.HttpResponse) { return false } + if !this.Stats.Equal(that1.Stats) { + return false + } return true } func (this *QueryResultResponse) Equal(that interface{}) bool { @@ -198,12 +213,15 @@ func (this *QueryResultRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&frontendv2pb.QueryResultRequest{") s = append(s, "QueryID: "+fmt.Sprintf("%#v", this.QueryID)+",\n") if this.HttpResponse != nil { s = append(s, "HttpResponse: "+fmt.Sprintf("%#v", this.HttpResponse)+",\n") } + if this.Stats != nil { + s = append(s, "Stats: "+fmt.Sprintf("%#v", this.Stats)+",\n") + } s = append(s, "}") return strings.Join(s, "") } @@ -325,6 +343,18 @@ func (m *QueryResultRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Stats != nil { + { + size, err := 
m.Stats.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintFrontend(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } if m.HttpResponse != nil { { size, err := m.HttpResponse.MarshalToSizedBuffer(dAtA[:i]) @@ -392,6 +422,10 @@ func (m *QueryResultRequest) Size() (n int) { l = m.HttpResponse.Size() n += 1 + l + sovFrontend(uint64(l)) } + if m.Stats != nil { + l = m.Stats.Size() + n += 1 + l + sovFrontend(uint64(l)) + } return n } @@ -417,6 +451,7 @@ func (this *QueryResultRequest) String() string { s := strings.Join([]string{`&QueryResultRequest{`, `QueryID:` + fmt.Sprintf("%v", this.QueryID) + `,`, `HttpResponse:` + strings.Replace(fmt.Sprintf("%v", this.HttpResponse), "HTTPResponse", "httpgrpc.HTTPResponse", 1) + `,`, + `Stats:` + strings.Replace(fmt.Sprintf("%v", this.Stats), "Stats", "stats.Stats", 1) + `,`, `}`, }, "") return s @@ -522,6 +557,42 @@ func (m *QueryResultRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Stats", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFrontend + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthFrontend + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthFrontend + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Stats == nil { + m.Stats = &stats.Stats{} + } + if err := m.Stats.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipFrontend(dAtA[iNdEx:]) diff --git a/pkg/frontend/v2/frontendv2pb/frontend.proto b/pkg/frontend/v2/frontendv2pb/frontend.proto index 367f352afd3..b93106d7873 100644 --- a/pkg/frontend/v2/frontendv2pb/frontend.proto +++ b/pkg/frontend/v2/frontendv2pb/frontend.proto @@ -6,6 +6,7 @@ option go_package = "frontendv2pb"; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto"; +import "github.com/cortexproject/cortex/pkg/querier/stats/stats.proto"; option (gogoproto.marshaler_all) = true; option (gogoproto.unmarshaler_all) = true; @@ -18,6 +19,7 @@ service FrontendForQuerier { message QueryResultRequest { uint64 queryID = 1; httpgrpc.HTTPResponse httpResponse = 2; + stats.Stats stats = 3; // There is no userID field here, because Querier puts userID into the context when // calling QueryResult, and that is where Frontend expects to find it. 
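Both protobuf messages touched above (`ClientToFrontend` for the v1 frontend and `QueryResultRequest` for the v2 frontend) now carry an optional `stats.Stats` payload that the frontend folds back into the request-scoped stats. A condensed sketch of that merge path follows; it assumes only the `stats` package API introduced in this PR (`FromContext`, `Merge`, `ShouldTrackHTTPGRPCResponse`), and the helper name is made up for illustration:

```go
package frontend

import (
	"context"

	"github.com/weaveworks/common/httpgrpc"

	"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
	"github.com/cortexproject/cortex/pkg/querier/stats"
)

// mergeQuerierStats is a hypothetical helper mirroring what the v1/v2 frontends do:
// fold the querier-reported stats into the request context, then hand back the HTTP response.
func mergeQuerierStats(ctx context.Context, resp *frontendv2pb.QueryResultRequest) *httpgrpc.HTTPResponse {
	// Stats are only merged for non-5xx responses.
	if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
		// FromContext returns nil when stats tracking is disabled; Merge is a
		// no-op on a nil receiver or a nil argument, so no extra guard is needed.
		stats.FromContext(ctx).Merge(resp.Stats)
	}
	return resp.HttpResponse
}
```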
diff --git a/pkg/querier/chunk_store_queryable_test.go b/pkg/querier/chunk_store_queryable_test.go index 76cc8074243..20a4cbf072d 100644 --- a/pkg/querier/chunk_store_queryable_test.go +++ b/pkg/querier/chunk_store_queryable_test.go @@ -25,7 +25,7 @@ func TestChunkQueryable(t *testing.T) { t.Run(fmt.Sprintf("%s/%s/%s", testcase.name, encoding.name, query.query), func(t *testing.T) { store, from := makeMockChunkStore(t, 24, encoding.e) queryable := newChunkStoreQueryable(store, testcase.f) - testQuery(t, queryable, from, query) + testRangeQuery(t, queryable, from, query) }) } } diff --git a/pkg/querier/querier_benchmark_test.go b/pkg/querier/querier_benchmark_test.go index 03767394c31..25784e30a52 100644 --- a/pkg/querier/querier_benchmark_test.go +++ b/pkg/querier/querier_benchmark_test.go @@ -20,7 +20,7 @@ func BenchmarkChunkQueryable(b *testing.B) { var r *promql.Result for n := 0; n < b.N; n++ { queryable := newChunkStoreQueryable(store, testcase.f) - r = testQuery(b, queryable, from, query) + r = testRangeQuery(b, queryable, from, query) } result = r }) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 466fde36ed7..5b7c574659a 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -161,7 +161,7 @@ func TestQuerier(t *testing.T) { queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)} queryable, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil) - testQuery(t, queryable, through, query) + testRangeQuery(t, queryable, through, query) }) } } @@ -702,7 +702,7 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc return result } -func testQuery(t testing.TB, queryable storage.Queryable, end model.Time, q query) *promql.Result { +func testRangeQuery(t testing.TB, queryable storage.Queryable, end model.Time, q query) *promql.Result { dir, err := ioutil.TempDir("", "test_query") assert.NoError(t, err) defer os.RemoveAll(dir) diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go new file mode 100644 index 00000000000..0de38e08833 --- /dev/null +++ b/pkg/querier/stats/stats.go @@ -0,0 +1,62 @@ +package stats + +import ( + "context" + "sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it. + "time" + + "github.com/weaveworks/common/httpgrpc" +) + +type contextKey int + +var ctxKey = contextKey(0) + +// ContextWithEmptyStats returns a context with empty stats. +func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) { + stats := &Stats{} + ctx = context.WithValue(ctx, ctxKey, stats) + return stats, ctx +} + +// FromContext gets the Stats out of the Context. Returns nil if stats have not +// been initialised in the context. +func FromContext(ctx context.Context) *Stats { + o := ctx.Value(ctxKey) + if o == nil { + return nil + } + return o.(*Stats) +} + +// AddWallTime adds some time to the counter. +func (s *Stats) AddWallTime(t time.Duration) { + if s == nil { + return + } + + atomic.AddInt64((*int64)(&s.WallTime), int64(t)) +} + +// LoadWallTime returns current wall time. +func (s *Stats) LoadWallTime() time.Duration { + if s == nil { + return 0 + } + + return time.Duration(atomic.LoadInt64((*int64)(&s.WallTime))) +} + +// Merge the provide Stats into this one. 
+func (s *Stats) Merge(other *Stats) { + if s == nil || other == nil { + return + } + + s.AddWallTime(other.LoadWallTime()) +} + +func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { + // Do no track statistics for requests failed because of a server error. + return r.Code < 500 +} diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go new file mode 100644 index 00000000000..b9ec9a49bad --- /dev/null +++ b/pkg/querier/stats/stats.pb.go @@ -0,0 +1,414 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: stats.proto + +package stats + +import ( + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" + _ "github.com/golang/protobuf/ptypes/duration" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" + time "time" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf +var _ = time.Kitchen + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type Stats struct { + // The sum of all wall time spent in the querier to execute the query. + WallTime time.Duration `protobuf:"bytes,1,opt,name=wall_time,json=wallTime,proto3,stdduration" json:"wall_time"` +} + +func (m *Stats) Reset() { *m = Stats{} } +func (*Stats) ProtoMessage() {} +func (*Stats) Descriptor() ([]byte, []int) { + return fileDescriptor_b4756a0aec8b9d44, []int{0} +} +func (m *Stats) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Stats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Stats.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Stats) XXX_Merge(src proto.Message) { + xxx_messageInfo_Stats.Merge(m, src) +} +func (m *Stats) XXX_Size() int { + return m.Size() +} +func (m *Stats) XXX_DiscardUnknown() { + xxx_messageInfo_Stats.DiscardUnknown(m) +} + +var xxx_messageInfo_Stats proto.InternalMessageInfo + +func (m *Stats) GetWallTime() time.Duration { + if m != nil { + return m.WallTime + } + return 0 +} + +func init() { + proto.RegisterType((*Stats)(nil), "stats.Stats") +} + +func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } + +var fileDescriptor_b4756a0aec8b9d44 = []byte{ + // 213 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0x2e, 0x49, 0x2c, + 0x29, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x73, 0xa4, 0x74, 0xd3, 0x33, 0x4b, + 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5, 0xc1, 0xb2, 0x49, + 0xa5, 0x69, 0x60, 0x1e, 0x98, 0x03, 0x66, 0x41, 0x74, 0x49, 0xc9, 0xa5, 0xe7, 0xe7, 0xa7, 0xe7, + 0xa4, 0x22, 0x54, 0xa5, 0x94, 0x16, 0x25, 0x96, 0x64, 0xe6, 0xe7, 0x41, 0xe4, 0x95, 0x3c, 0xb9, + 0x58, 0x83, 0x41, 0xe6, 0x0a, 0x39, 0x70, 0x71, 0x96, 0x27, 0xe6, 0xe4, 0xc4, 0x97, 0x64, 0xe6, + 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x1b, 0x49, 0xea, 0x41, 0x34, 0xeb, 0xc1, 0x34, 0xeb, + 0xb9, 0x40, 0x35, 0x3b, 0x71, 0x9c, 0xb8, 0x27, 
0xcf, 0x30, 0xe3, 0xbe, 0x3c, 0x63, 0x10, 0x07, + 0x48, 0x57, 0x48, 0x66, 0x6e, 0xaa, 0x93, 0xf5, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, + 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, + 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, + 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x28, + 0x88, 0xb7, 0x92, 0xd8, 0xc0, 0x76, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x2d, 0xc4, 0x26, + 0x5d, 0xf3, 0x00, 0x00, 0x00, +} + +func (this *Stats) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Stats) + if !ok { + that2, ok := that.(Stats) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.WallTime != that1.WallTime { + return false + } + return true +} +func (this *Stats) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&stats.Stats{") + s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringStats(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *Stats) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Stats) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + n1, err1 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.WallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime):]) + if err1 != nil { + return 0, err1 + } + i -= n1 + i = encodeVarintStats(dAtA, i, uint64(n1)) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} + +func encodeVarintStats(dAtA []byte, offset int, v uint64) int { + offset -= sovStats(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Stats) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime) + n += 1 + l + sovStats(uint64(l)) + return n +} + +func sovStats(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozStats(x uint64) (n int) { + return sovStats(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *Stats) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Stats{`, + `WallTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.WallTime), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`, + `}`, + }, "") + return s +} +func valueToStringStats(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *Stats) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + 
return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Stats: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Stats: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WallTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.WallTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipStats(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthStats + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthStats + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipStats(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStats + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStats + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStats + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthStats + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthStats + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStats + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipStats(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthStats + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthStats = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowStats = fmt.Errorf("proto: integer overflow") +) diff --git 
a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto new file mode 100644 index 00000000000..3ec55448af7 --- /dev/null +++ b/pkg/querier/stats/stats.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package stats; + +option go_package = "stats"; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "google/protobuf/duration.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +message Stats { + // The sum of all wall time spent in the querier to execute the query. + google.protobuf.Duration wall_time = 1 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; +} diff --git a/pkg/querier/stats/time_middleware.go b/pkg/querier/stats/time_middleware.go new file mode 100644 index 00000000000..d91cae6b45e --- /dev/null +++ b/pkg/querier/stats/time_middleware.go @@ -0,0 +1,29 @@ +package stats + +import ( + "net/http" + "time" +) + +// WallTimeMiddleware tracks the wall time. +type WallTimeMiddleware struct{} + +// NewWallTimeMiddleware makes a new WallTimeMiddleware. +func NewWallTimeMiddleware() WallTimeMiddleware { + return WallTimeMiddleware{} +} + +// Wrap implements middleware.Interface. +func (m WallTimeMiddleware) Wrap(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + stats := FromContext(r.Context()) + if stats == nil { + next.ServeHTTP(w, r) + return + } + + startTime := time.Now() + next.ServeHTTP(w, r) + stats.AddWallTime(time.Since(startTime)) + }) +} diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 318f66cb218..89bd6967168 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -12,6 +12,8 @@ import ( "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" + "github.com/cortexproject/cortex/pkg/querier/stats" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" ) @@ -24,18 +26,20 @@ var ( func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) processor { return &frontendProcessor{ - log: log, - handler: handler, - maxMessageSize: cfg.GRPCClientConfig.GRPC.MaxSendMsgSize, - querierID: cfg.QuerierID, + log: log, + handler: handler, + maxMessageSize: cfg.GRPCClientConfig.GRPC.MaxSendMsgSize, + querierID: cfg.QuerierID, + queryStatsEnabled: cfg.QueryStatsEnabled, } } // Handles incoming queries from frontend. type frontendProcessor struct { - handler RequestHandler - maxMessageSize int - querierID string + handler RequestHandler + maxMessageSize int + querierID string + queryStatsEnabled bool log log.Logger } @@ -82,8 +86,11 @@ func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) erro // and cancel the query. We don't actually handle queries in parallel // here, as we're running in lock step with the server - each Recv is // paired with a Send. 
- go fp.runRequest(ctx, request.HttpRequest, func(response *httpgrpc.HTTPResponse) error { - return c.Send(&frontendv1pb.ClientToFrontend{HttpResponse: response}) + go fp.runRequest(ctx, request.HttpRequest, func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error { + return c.Send(&frontendv1pb.ClientToFrontend{ + HttpResponse: response, + Stats: stats, + }) }) case frontendv1pb.GET_ID: @@ -98,7 +105,12 @@ func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) erro } } -func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.HTTPRequest, sendHTTPResponse func(response *httpgrpc.HTTPResponse) error) { +func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.HTTPRequest, sendHTTPResponse func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error) { + var stats *querier_stats.Stats + if fp.queryStatsEnabled { + stats, ctx = querier_stats.ContextWithEmptyStats(ctx) + } + response, err := fp.handler.Handle(ctx, request) if err != nil { var ok bool @@ -121,7 +133,7 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H level.Error(fp.log).Log("msg", "error processing query", "err", errMsg) } - if err := sendHTTPResponse(response); err != nil { + if err := sendHTTPResponse(response, stats); err != nil { level.Error(fp.log).Log("msg", "error processing requests", "err", err) } } diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 55b0c6d84e4..c92da21f704 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring/client" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/util" @@ -31,11 +32,12 @@ import ( func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) { p := &schedulerProcessor{ - log: log, - handler: handler, - maxMessageSize: cfg.GRPCClientConfig.GRPC.MaxSendMsgSize, - querierID: cfg.QuerierID, - grpcConfig: cfg.GRPCClientConfig, + log: log, + handler: handler, + maxMessageSize: cfg.GRPCClientConfig.GRPC.MaxSendMsgSize, + querierID: cfg.QuerierID, + grpcConfig: cfg.GRPCClientConfig, + queryStatsEnabled: cfg.QueryStatsEnabled, frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_querier_query_frontend_request_duration_seconds", @@ -61,11 +63,12 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r // Handles incoming queries from query-scheduler. 
type schedulerProcessor struct { - log log.Logger - handler RequestHandler - grpcConfig grpcclient.ConfigWithTLS - maxMessageSize int - querierID string + log log.Logger + handler RequestHandler + grpcConfig grpcclient.ConfigWithTLS + maxMessageSize int + querierID string + queryStatsEnabled bool frontendPool *client.Pool frontendClientRequestDuration *prometheus.HistogramVec @@ -140,6 +143,11 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer } func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, request *httpgrpc.HTTPRequest) { + var stats *querier_stats.Stats + if sp.queryStatsEnabled { + stats, ctx = querier_stats.ContextWithEmptyStats(ctx) + } + response, err := sp.handler.Handle(ctx, request) if err != nil { var ok bool @@ -169,6 +177,7 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, _, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{ QueryID: queryID, HttpResponse: response, + Stats: stats, }) } if err != nil { diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 289aaca11e1..ac47c11a55f 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -31,6 +31,9 @@ type Config struct { QuerierID string `yaml:"id"` GRPCClientConfig grpcclient.ConfigWithTLS `yaml:"grpc_client_config"` + + // The following config is injected internally. + QueryStatsEnabled bool `yaml:"-"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
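Taken together, the pieces above form a small pipeline: the frontend handler seeds empty `Stats` in the request context, the querier's `WallTimeMiddleware` adds the time spent serving the inner handler, and the worker processors ship the result back over gRPC for the frontend to log and count. A self-contained sketch of the querier-side half, using only the `stats` package added by this PR; the inner handler body and request path are made up:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/cortexproject/cortex/pkg/querier/stats"
)

func main() {
	// Stand-in for the internal querier handler; it just burns a little wall time.
	inner := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(50 * time.Millisecond)
		w.WriteHeader(http.StatusOK)
	})

	// Same wrapping as NewQuerierHandler: measure wall time around the querier handler.
	handler := stats.NewWallTimeMiddleware().Wrap(inner)

	// Seed empty stats in the request context, as the worker processors do when
	// -frontend.query-stats-enabled is set; without this the middleware is a no-op.
	req := httptest.NewRequest(http.MethodGet, "/api/v1/query", nil)
	s, ctx := stats.ContextWithEmptyStats(req.Context())

	handler.ServeHTTP(httptest.NewRecorder(), req.WithContext(ctx))

	// ~50ms; the frontend merges this value and adds it to cortex_query_seconds_total.
	fmt.Println("wall time:", s.LoadWallTime())
}
```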