Skip to content

Commit 71a95f7

Browse files
authored
feat: simplify query related modules (#3320)
* feat: simplify query related modules Signed-off-by: Jacob Lisi <[email protected]> * refactor per PR comments Signed-off-by: Jacob Lisi <[email protected]> * fix rebase import Signed-off-by: Jacob Lisi <[email protected]> * add route explanation diagram Signed-off-by: Jacob Lisi <[email protected]> * chore: address PR comments Signed-off-by: Jacob Lisi <[email protected]> * fix: context package import issue after rebase Signed-off-by: Jacob Lisi <[email protected]>
1 parent ff69357 commit 71a95f7

File tree

6 files changed

+335
-251
lines changed

6 files changed

+335
-251
lines changed

integration/getting_started_single_process_config_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ func TestGettingStartedSingleProcessConfigWithChunksStorage(t *testing.T) {
5252
labelNames, err := c.LabelNames()
5353
require.NoError(t, err)
5454
require.Equal(t, []string{"__name__", "foo"}, labelNames)
55+
56+
// Check that a range query does not return an error to sanity check the queryrange tripperware.
57+
_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
58+
require.NoError(t, err)
5559
}
5660

5761
func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) {
@@ -103,4 +107,8 @@ func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) {
103107
labelNames, err := c.LabelNames()
104108
require.NoError(t, err)
105109
require.Equal(t, []string{"__name__", "foo"}, labelNames)
110+
111+
// Check that a range query does not return an error to sanity check the queryrange tripperware.
112+
_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
113+
require.NoError(t, err)
106114
}

integration/query_frontend_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryF
259259
// requests to /metrics and /ready.
260260
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
261261
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
262+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))
262263

263264
// Ensure no service-specific metrics prefix is used by the wrong service.
264265
assertServiceMetricsPrefixes(t, Distributor, distributor)

pkg/api/api.go

Lines changed: 10 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,16 @@ package api
22

33
import (
44
"context"
5-
"errors"
65
"flag"
76
"net/http"
8-
"regexp"
97
"strings"
108
"time"
119

12-
"github.com/opentracing-contrib/go-stdlib/nethttp"
13-
"github.com/opentracing/opentracing-go"
14-
"github.com/prometheus/client_golang/prometheus"
15-
dto "github.com/prometheus/client_model/go"
16-
1710
"github.com/felixge/fgprof"
1811
"github.com/go-kit/kit/log"
1912
"github.com/go-kit/kit/log/level"
20-
"github.com/gorilla/mux"
21-
"github.com/prometheus/common/route"
22-
"github.com/prometheus/prometheus/config"
23-
"github.com/prometheus/prometheus/promql"
13+
"github.com/prometheus/client_golang/prometheus"
2414
"github.com/prometheus/prometheus/storage"
25-
v1 "github.com/prometheus/prometheus/web/api/v1"
2615
"github.com/weaveworks/common/middleware"
2716
"github.com/weaveworks/common/server"
2817

@@ -104,23 +93,17 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
10493
// RegisterRoute registers a single route enforcing HTTP methods. A single
10594
// route is expected to be specific about which HTTP methods are supported.
10695
func (a *API) RegisterRoute(path string, handler http.Handler, auth bool, method string, methods ...string) {
107-
a.registerRouteWithRouter(a.server.HTTP, path, handler, auth, method, methods...)
108-
}
109-
110-
// RegisterRoute registers a single route to a router, enforcing HTTP methods. A single
111-
// route is expected to be specific about which HTTP methods are supported.
112-
func (a *API) registerRouteWithRouter(router *mux.Router, path string, handler http.Handler, auth bool, method string, methods ...string) {
11396
methods = append([]string{method}, methods...)
11497

11598
level.Debug(a.logger).Log("msg", "api: registering route", "methods", strings.Join(methods, ","), "path", path, "auth", auth)
11699
if auth {
117100
handler = a.authMiddleware.Wrap(handler)
118101
}
119102
if len(methods) == 0 {
120-
router.Path(path).Handler(handler)
103+
a.server.HTTP.Path(path).Handler(handler)
121104
return
122105
}
123-
router.Path(path).Methods(methods...).Handler(handler)
106+
a.server.HTTP.Path(path).Methods(methods...).Handler(handler)
124107
}
125108

126109
func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth bool, methods ...string) {
@@ -135,20 +118,6 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth
135118
a.server.HTTP.PathPrefix(prefix).Methods(methods...).Handler(handler)
136119
}
137120

138-
// Latest Prometheus requires r.RemoteAddr to be set to addr:port, otherwise it reject the request.
139-
// Requests to Querier sometimes doesn't have that (if they are fetched from Query-Frontend).
140-
// Prometheus uses this when logging queries to QueryLogger, but Cortex doesn't call engine.SetQueryLogger to set one.
141-
//
142-
// Can be removed when (if) https://github.com/prometheus/prometheus/pull/6840 is merged.
143-
func fakeRemoteAddr(handler http.Handler) http.Handler {
144-
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
145-
if r.RemoteAddr == "" {
146-
r.RemoteAddr = "127.0.0.1:8888"
147-
}
148-
handler.ServeHTTP(w, r)
149-
})
150-
}
151-
152121
// RegisterAlertmanager registers endpoints associated with the alertmanager. It will only
153122
// serve endpoints using the legacy http-prefix if it is not run as a single binary.
154123
func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, target, apiEnabled bool) {
@@ -302,114 +271,22 @@ func (a *API) RegisterCompactor(c *compactor.Compactor) {
302271
a.RegisterRoute("/compactor/ring", http.HandlerFunc(c.RingHandler), false, "GET", "POST")
303272
}
304273

305-
// RegisterQuerier registers the Prometheus routes supported by the
306-
// Cortex querier service. Currently this can not be registered simultaneously
307-
// with the QueryFrontend.
308-
func (a *API) RegisterQuerier(
274+
// RegisterQueryable registers the the default routes associated with the querier
275+
// module.
276+
func (a *API) RegisterQueryable(
309277
queryable storage.SampleAndChunkQueryable,
310-
engine *promql.Engine,
311278
distributor *distributor.Distributor,
312-
registerRoutesExternally bool,
313-
tombstonesLoader *purger.TombstonesLoader,
314-
querierRequestDuration *prometheus.HistogramVec,
315-
receivedMessageSize *prometheus.HistogramVec,
316-
sentMessageSize *prometheus.HistogramVec,
317-
inflightRequests *prometheus.GaugeVec,
318-
) http.Handler {
319-
api := v1.NewAPI(
320-
engine,
321-
errorTranslateQueryable{queryable}, // Translate errors to errors expected by API.
322-
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
323-
func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} },
324-
func() config.Config { return config.Config{} },
325-
map[string]string{}, // TODO: include configuration flags
326-
v1.GlobalURLOptions{},
327-
func(f http.HandlerFunc) http.HandlerFunc { return f },
328-
nil, // Only needed for admin APIs.
329-
"", // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty.
330-
false, // Disable admin APIs.
331-
a.logger,
332-
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
333-
0, 0, 0, // Remote read samples and concurrency limit.
334-
regexp.MustCompile(".*"),
335-
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
336-
&v1.PrometheusVersion{},
337-
// This is used for the stats API which we should not support. Or find other ways to.
338-
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
339-
)
340-
279+
) {
341280
// these routes are always registered to the default server
342281
a.RegisterRoute("/api/v1/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true, "GET")
343282
a.RegisterRoute("/api/v1/chunks", querier.ChunksHandler(queryable), true, "GET")
344283

345284
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true, "GET")
346285
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/chunks", querier.ChunksHandler(queryable), true, "GET")
347-
348-
// these routes are either registered the default server OR to an internal mux. The internal mux is
349-
// for use in a single binary mode when both the query frontend and the querier would attempt to claim these routes
350-
// TODO: Add support to expose querier paths with a configurable prefix in single binary mode.
351-
router := mux.NewRouter()
352-
if registerRoutesExternally {
353-
router = a.server.HTTP
354-
}
355-
356-
// Use a separate metric for the querier in order to differentiate requests from the query-frontend when
357-
// running Cortex as a single binary.
358-
inst := middleware.Instrument{
359-
RouteMatcher: router,
360-
Duration: querierRequestDuration,
361-
RequestBodySize: receivedMessageSize,
362-
ResponseBodySize: sentMessageSize,
363-
InflightRequests: inflightRequests,
364-
}
365-
366-
promRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.PrometheusHTTPPrefix + "/api/v1")
367-
api.Register(promRouter)
368-
cacheGenHeaderMiddleware := getHTTPCacheGenNumberHeaderSetterMiddleware(tombstonesLoader)
369-
promHandler := fakeRemoteAddr(inst.Wrap(cacheGenHeaderMiddleware.Wrap(promRouter)))
370-
371-
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "POST")
372-
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/query", promHandler, true, "GET", "POST")
373-
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/query_range", promHandler, true, "GET", "POST")
374-
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/labels", promHandler, true, "GET", "POST")
375-
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/label/{name}/values", promHandler, true, "GET")
376-
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/series", promHandler, true, "GET", "POST", "DELETE")
377-
//TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
378-
// https://github.com/prometheus/prometheus/pull/7125/files
379-
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/metadata", querier.MetadataHandler(distributor), true, "GET")
380-
381-
legacyPromRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.LegacyHTTPPrefix + "/api/v1")
382-
api.Register(legacyPromRouter)
383-
legacyPromHandler := fakeRemoteAddr(inst.Wrap(cacheGenHeaderMiddleware.Wrap(legacyPromRouter)))
384-
385-
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "POST")
386-
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/query", legacyPromHandler, true, "GET", "POST")
387-
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/query_range", legacyPromHandler, true, "GET", "POST")
388-
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/labels", legacyPromHandler, true, "GET", "POST")
389-
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/label/{name}/values", legacyPromHandler, true, "GET")
390-
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/series", legacyPromHandler, true, "GET", "POST", "DELETE")
391-
//TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
392-
// https://github.com/prometheus/prometheus/pull/7125/files
393-
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/metadata", querier.MetadataHandler(distributor), true, "GET")
394-
395-
// if we have externally registered routes then we need to return the server handler
396-
// so that we continue to use all standard middleware
397-
if registerRoutesExternally {
398-
return a.server.HTTPServer.Handler
399-
}
400-
401-
// Since we have a new router and the request will not go trough the default server
402-
// HTTP middleware stack, we need to add a middleware to extract the trace context
403-
// from the HTTP headers and inject it into the Go context.
404-
return nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string {
405-
return "internalQuerier"
406-
}))
407286
}
408287

409-
// registerQueryAPI registers the Prometheus routes supported by the
410-
// Cortex querier service. Currently this can not be registered simultaneously
411-
// with the Querier.
412-
func (a *API) registerQueryAPI(handler http.Handler) {
288+
// RegisterQueryAPI registers the Prometheus API routes with the provided handler.
289+
func (a *API) RegisterQueryAPI(handler http.Handler) {
413290
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/read", handler, true, "POST")
414291
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/query", handler, true, "GET", "POST")
415292
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/query_range", handler, true, "GET", "POST")
@@ -433,7 +310,7 @@ func (a *API) registerQueryAPI(handler http.Handler) {
433310
// with the Querier.
434311
func (a *API) RegisterQueryFrontend(f *frontend.Frontend) {
435312
frontend.RegisterFrontendServer(a.server.GRPC, f)
436-
a.registerQueryAPI(f.Handler())
313+
a.RegisterQueryAPI(f.Handler())
437314
}
438315

439316
// RegisterServiceMapHandler registers the Cortex structs service handler

pkg/api/handlers.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,34 @@
11
package api
22

33
import (
4+
"context"
45
"html/template"
56
"net/http"
67
"path"
8+
"regexp"
79
"sync"
810

11+
"github.com/go-kit/kit/log"
912
"github.com/go-kit/kit/log/level"
13+
"github.com/gorilla/mux"
14+
"github.com/opentracing-contrib/go-stdlib/nethttp"
15+
"github.com/opentracing/opentracing-go"
16+
"github.com/pkg/errors"
17+
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/client_golang/prometheus/promauto"
19+
dto "github.com/prometheus/client_model/go"
20+
"github.com/prometheus/common/route"
21+
"github.com/prometheus/prometheus/config"
22+
"github.com/prometheus/prometheus/promql"
23+
"github.com/prometheus/prometheus/storage"
24+
v1 "github.com/prometheus/prometheus/web/api/v1"
25+
"github.com/weaveworks/common/instrument"
26+
"github.com/weaveworks/common/middleware"
1027
"gopkg.in/yaml.v2"
1128

29+
"github.com/cortexproject/cortex/pkg/chunk/purger"
30+
"github.com/cortexproject/cortex/pkg/distributor"
31+
"github.com/cortexproject/cortex/pkg/querier"
1232
"github.com/cortexproject/cortex/pkg/util"
1333
)
1434

@@ -109,3 +129,106 @@ func configHandler(cfg interface{}) http.HandlerFunc {
109129
}
110130
}
111131
}
132+
133+
// NewQuerierHandler returns a HTTP handler that can be used by the querier service to
134+
// either register with the frontend worker query processor or with the external HTTP
135+
// server to fulfill the Prometheus query API.
136+
func NewQuerierHandler(
137+
cfg Config,
138+
queryable storage.SampleAndChunkQueryable,
139+
engine *promql.Engine,
140+
distributor *distributor.Distributor,
141+
tombstonesLoader *purger.TombstonesLoader,
142+
reg prometheus.Registerer,
143+
logger log.Logger,
144+
) http.Handler {
145+
// Prometheus histograms for requests to the querier.
146+
querierRequestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
147+
Namespace: "cortex",
148+
Name: "querier_request_duration_seconds",
149+
Help: "Time (in seconds) spent serving HTTP requests to the querier.",
150+
Buckets: instrument.DefBuckets,
151+
}, []string{"method", "route", "status_code", "ws"})
152+
153+
receivedMessageSize := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
154+
Namespace: "cortex",
155+
Name: "querier_request_message_bytes",
156+
Help: "Size (in bytes) of messages received in the request to the querier.",
157+
Buckets: middleware.BodySizeBuckets,
158+
}, []string{"method", "route"})
159+
160+
sentMessageSize := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
161+
Namespace: "cortex",
162+
Name: "querier_response_message_bytes",
163+
Help: "Size (in bytes) of messages sent in response by the querier.",
164+
Buckets: middleware.BodySizeBuckets,
165+
}, []string{"method", "route"})
166+
167+
inflightRequests := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
168+
Namespace: "cortex",
169+
Name: "querier_inflight_requests",
170+
Help: "Current number of inflight requests to the querier.",
171+
}, []string{"method", "route"})
172+
173+
api := v1.NewAPI(
174+
engine,
175+
errorTranslateQueryable{queryable}, // Translate errors to errors expected by API.
176+
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
177+
func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} },
178+
func() config.Config { return config.Config{} },
179+
map[string]string{}, // TODO: include configuration flags
180+
v1.GlobalURLOptions{},
181+
func(f http.HandlerFunc) http.HandlerFunc { return f },
182+
nil, // Only needed for admin APIs.
183+
"", // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty.
184+
false, // Disable admin APIs.
185+
logger,
186+
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
187+
0, 0, 0, // Remote read samples and concurrency limit.
188+
regexp.MustCompile(".*"),
189+
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
190+
&v1.PrometheusVersion{},
191+
// This is used for the stats API which we should not support. Or find other ways to.
192+
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
193+
)
194+
195+
router := mux.NewRouter()
196+
197+
// Use a separate metric for the querier in order to differentiate requests from the query-frontend when
198+
// running Cortex as a single binary.
199+
inst := middleware.Instrument{
200+
RouteMatcher: router,
201+
Duration: querierRequestDuration,
202+
RequestBodySize: receivedMessageSize,
203+
ResponseBodySize: sentMessageSize,
204+
InflightRequests: inflightRequests,
205+
}
206+
cacheGenHeaderMiddleware := getHTTPCacheGenNumberHeaderSetterMiddleware(tombstonesLoader)
207+
middlewares := middleware.Merge(inst, cacheGenHeaderMiddleware)
208+
router.Use(middlewares.Wrap)
209+
210+
promRouter := route.New().WithPrefix(cfg.ServerPrefix + cfg.PrometheusHTTPPrefix + "/api/v1")
211+
api.Register(promRouter)
212+
213+
legacyPromRouter := route.New().WithPrefix(cfg.ServerPrefix + cfg.LegacyHTTPPrefix + "/api/v1")
214+
api.Register(legacyPromRouter)
215+
216+
//TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
217+
// https://github.com/prometheus/prometheus/pull/7125/files
218+
router.Path(cfg.PrometheusHTTPPrefix + "/api/v1/metadata").Handler(querier.MetadataHandler(distributor))
219+
router.Path(cfg.PrometheusHTTPPrefix + "/api/v1/read").Handler(querier.RemoteReadHandler(queryable))
220+
// A prefix is fine because external routes will be registered explicitly
221+
router.PathPrefix(cfg.PrometheusHTTPPrefix + "/api/v1/").Handler(promRouter)
222+
223+
//TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
224+
// https://github.com/prometheus/prometheus/pull/7125/files
225+
router.Path(cfg.LegacyHTTPPrefix + "/api/v1/metadata").Handler(querier.MetadataHandler(distributor))
226+
router.Path(cfg.LegacyHTTPPrefix + "/api/v1/read").Handler(querier.RemoteReadHandler(queryable))
227+
// A prefix is fine because external routes will be registered explicitly
228+
router.PathPrefix(cfg.LegacyHTTPPrefix + "/api/v1/").Handler(legacyPromRouter)
229+
230+
// Add a middleware to extract the trace context and add a header.
231+
return nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string {
232+
return "internalQuerier"
233+
}))
234+
}

0 commit comments

Comments
 (0)