Skip to content

Commit cc8e02c

Browse files
Query frontend single binary (#2437)
* Added query frontend to the single binary Signed-off-by: Joe Elliott <[email protected]> * First pass single binary Signed-off-by: Joe Elliott <[email protected]> * Pass parameter correctly Signed-off-by: Joe Elliott <[email protected]> * lint Signed-off-by: Joe Elliott <[email protected]> * Added label tests to single binary Signed-off-by: Joe Elliott <[email protected]> * Added a warning for bad single process config Signed-off-by: Joe Elliott <[email protected]> * Added auto config of worker Signed-off-by: Joe Elliott <[email protected]> * Always add auth middleware. Signed-off-by: Joe Elliott <[email protected]> * Fixed label tests and used address that worked in e2e tests Signed-off-by: Joe Elliott <[email protected]> * updated changelog Signed-off-by: Joe Elliott <[email protected]> * Run unbounded queries to fetch label names/values in the blocks storage Signed-off-by: Marco Pracucci <[email protected]> * Moved automatic worker config attempt above worker Signed-off-by: Joe Elliott <[email protected]> * Moved changelog entry and expanded Signed-off-by: Joe Elliott <[email protected]> * Change => Enhancement Signed-off-by: Joe Elliott <[email protected]> * Added todo Signed-off-by: Joe Elliott <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent c7bc4fb commit cc8e02c

File tree

7 files changed

+103
-38
lines changed

7 files changed

+103
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
2222
* [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371
2323
* [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-<prefix>.fifocache.size` CLI flag has been renamed to `-<prefix>.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-<prefix>.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319
24+
* [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437
2425
* [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454
2526
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
2627
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400

integration/e2e/util.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,20 @@ func TimeToMilliseconds(t time.Time) int64 {
6868
return int64(math.Round(float64(t.UnixNano()) / 1000000))
6969
}
7070

71-
func GenerateSeries(name string, ts time.Time) (series []prompb.TimeSeries, vector model.Vector) {
71+
func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label) (series []prompb.TimeSeries, vector model.Vector) {
7272
tsMillis := TimeToMilliseconds(ts)
7373
value := rand.Float64()
7474

75-
// Generate the series
76-
series = append(series, prompb.TimeSeries{
77-
Labels: []prompb.Label{
75+
lbls := append(
76+
[]prompb.Label{
7877
{Name: labels.MetricName, Value: name},
7978
},
79+
additionalLabels...,
80+
)
81+
82+
// Generate the series
83+
series = append(series, prompb.TimeSeries{
84+
Labels: lbls,
8085
Samples: []prompb.Sample{
8186
{Value: value, Timestamp: tsMillis},
8287
},
@@ -85,6 +90,9 @@ func GenerateSeries(name string, ts time.Time) (series []prompb.TimeSeries, vect
8590
// Generate the expected vector when querying it
8691
metric := model.Metric{}
8792
metric[labels.MetricName] = model.LabelValue(name)
93+
for _, lbl := range additionalLabels {
94+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
95+
}
8896

8997
vector = append(vector, &model.Sample{
9098
Metric: metric,

integration/e2ecortex/client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,18 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
112112
return value, err
113113
}
114114

115+
// LabelValues gets label values
116+
func (c *Client) LabelValues(label string) (model.LabelValues, error) {
117+
value, _, err := c.querierClient.LabelValues(context.Background(), label)
118+
return value, err
119+
}
120+
121+
// LabelNames gets label names
122+
func (c *Client) LabelNames() ([]string, error) {
123+
value, _, err := c.querierClient.LabelNames(context.Background())
124+
return value, err
125+
}
126+
115127
type addOrgIDRoundTripper struct {
116128
orgID string
117129
next http.RoundTripper

integration/getting_started_single_process_config_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/prometheus/common/model"
12+
"github.com/prometheus/prometheus/prompb"
1213
"github.com/stretchr/testify/assert"
1314
"github.com/stretchr/testify/require"
1415

@@ -38,7 +39,7 @@ func TestGettingStartedSingleProcessConfigWithChunksStorage(t *testing.T) {
3839

3940
// Push some series to Cortex.
4041
now := time.Now()
41-
series, expectedVector := generateSeries("series_1", now)
42+
series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})
4243

4344
res, err := c.Push(series)
4445
require.NoError(t, err)
@@ -49,6 +50,14 @@ func TestGettingStartedSingleProcessConfigWithChunksStorage(t *testing.T) {
4950
require.NoError(t, err)
5051
require.Equal(t, model.ValVector, result.Type())
5152
assert.Equal(t, expectedVector, result.(model.Vector))
53+
54+
labelValues, err := c.LabelValues("foo")
55+
require.NoError(t, err)
56+
require.Equal(t, model.LabelValues{"bar"}, labelValues)
57+
58+
labelNames, err := c.LabelNames()
59+
require.NoError(t, err)
60+
require.Equal(t, []string{"__name__", "foo"}, labelNames)
5261
}
5362

5463
func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) {
@@ -82,7 +91,7 @@ func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) {
8291

8392
// Push some series to Cortex.
8493
now := time.Now()
85-
series, expectedVector := generateSeries("series_1", now)
94+
series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})
8695

8796
res, err := c.Push(series)
8897
require.NoError(t, err)
@@ -93,4 +102,12 @@ func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) {
93102
require.NoError(t, err)
94103
require.Equal(t, model.ValVector, result.Type())
95104
assert.Equal(t, expectedVector, result.(model.Vector))
105+
106+
labelValues, err := c.LabelValues("foo")
107+
require.NoError(t, err)
108+
require.Equal(t, model.LabelValues{"bar"}, labelValues)
109+
110+
labelNames, err := c.LabelNames()
111+
require.NoError(t, err)
112+
require.Equal(t, []string{"__name__", "foo"}, labelNames)
96113
}

pkg/api/api.go

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/weaveworks/common/middleware"
1818
"github.com/weaveworks/common/server"
1919

20+
"github.com/gorilla/mux"
21+
2022
"github.com/cortexproject/cortex/pkg/alertmanager"
2123
"github.com/cortexproject/cortex/pkg/chunk/purger"
2224
"github.com/cortexproject/cortex/pkg/compactor"
@@ -79,15 +81,19 @@ func New(cfg Config, s *server.Server, logger log.Logger) (*API, error) {
7981
}
8082

8183
func (a *API) RegisterRoute(path string, handler http.Handler, auth bool, methods ...string) {
84+
a.registerRouteWithRouter(a.server.HTTP, path, handler, auth, methods...)
85+
}
86+
87+
func (a *API) registerRouteWithRouter(router *mux.Router, path string, handler http.Handler, auth bool, methods ...string) {
8288
level.Debug(a.logger).Log("msg", "api: registering route", "methods", strings.Join(methods, ","), "path", path, "auth", auth)
8389
if auth {
8490
handler = a.authMiddleware.Wrap(handler)
8591
}
8692
if len(methods) == 0 {
87-
a.server.HTTP.Path(path).Handler(handler)
93+
router.Path(path).Handler(handler)
8894
return
8995
}
90-
a.server.HTTP.Path(path).Methods(methods...).Handler(handler)
96+
router.Path(path).Methods(methods...).Handler(handler)
9197
}
9298

9399
func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth bool, methods ...string) {
@@ -236,7 +242,7 @@ func (a *API) RegisterCompactor(c *compactor.Compactor) {
236242
// RegisterQuerier registers the Prometheus routes supported by the
237243
// Cortex querier service. Currently this can not be registered simultaneously
238244
// with the QueryFrontend.
239-
func (a *API) RegisterQuerier(queryable storage.Queryable, engine *promql.Engine, distributor *distributor.Distributor) {
245+
func (a *API) RegisterQuerier(queryable storage.Queryable, engine *promql.Engine, distributor *distributor.Distributor, registerRoutesExternally bool) http.Handler {
240246
api := v1.NewAPI(
241247
engine,
242248
queryable,
@@ -255,36 +261,46 @@ func (a *API) RegisterQuerier(queryable storage.Queryable, engine *promql.Engine
255261
&v1.PrometheusVersion{},
256262
)
257263

258-
promRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.PrometheusHTTPPrefix + "/api/v1")
259-
api.Register(promRouter)
260-
promHandler := fakeRemoteAddr(promRouter)
261-
262-
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "GET")
263-
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/query", promHandler, true, "GET", "POST")
264-
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/query_range", promHandler, true, "GET", "POST")
265-
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/labels", promHandler, true, "GET", "POST")
266-
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/label/{name}/values", promHandler, true, "GET")
267-
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/series", promHandler, true, "GET", "POST", "DELETE")
268-
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/metadata", promHandler, true, "GET")
269-
264+
// these routes are always registered to the default server
270265
a.RegisterRoute("/api/v1/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true)
271266
a.RegisterRoute("/api/v1/chunks", querier.ChunksHandler(queryable), true)
272267

273-
// Legacy Routes
274268
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true)
275269
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/chunks", querier.ChunksHandler(queryable), true)
276270

271+
// these routes are either registered the default server OR to an internal mux. The internal mux is
272+
// for use in a single binary mode when both the query frontend and the querier would attempt to claim these routes
273+
// TODO: Add support to expose querier paths with a configurable prefix in single binary mode.
274+
router := mux.NewRouter()
275+
if registerRoutesExternally {
276+
router = a.server.HTTP
277+
}
278+
279+
promRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.PrometheusHTTPPrefix + "/api/v1")
280+
api.Register(promRouter)
281+
promHandler := fakeRemoteAddr(promRouter)
282+
283+
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "GET")
284+
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/query", promHandler, true, "GET", "POST")
285+
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/query_range", promHandler, true, "GET", "POST")
286+
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/labels", promHandler, true, "GET", "POST")
287+
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/label/{name}/values", promHandler, true, "GET")
288+
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/series", promHandler, true, "GET", "POST", "DELETE")
289+
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/metadata", promHandler, true, "GET")
290+
277291
legacyPromRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.LegacyHTTPPrefix + "/api/v1")
278292
api.Register(legacyPromRouter)
279293
legacyPromHandler := fakeRemoteAddr(legacyPromRouter)
280294

281-
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "GET")
282-
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/query", legacyPromHandler, true, "GET", "POST")
283-
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/query_range", legacyPromHandler, true, "GET", "POST")
284-
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/labels", legacyPromHandler, true, "GET", "POST")
285-
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/label/{name}/values", legacyPromHandler, true, "GET")
286-
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/series", legacyPromHandler, true, "GET", "POST", "DELETE")
287-
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/metadata", legacyPromHandler, true, "GET")
295+
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "GET")
296+
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/query", legacyPromHandler, true, "GET", "POST")
297+
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/query_range", legacyPromHandler, true, "GET", "POST")
298+
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/labels", legacyPromHandler, true, "GET", "POST")
299+
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/label/{name}/values", legacyPromHandler, true, "GET")
300+
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/series", legacyPromHandler, true, "GET", "POST", "DELETE")
301+
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/metadata", legacyPromHandler, true, "GET")
302+
303+
return router
288304
}
289305

290306
// RegisterQueryFrontend registers the Prometheus routes supported by the

pkg/cortex/modules.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,21 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) {
196196

197197
queryable, engine := querier.New(cfg.Querier, t.distributor, t.storeQueryable, tombstonesLoader, prometheus.DefaultRegisterer)
198198

199-
t.api.RegisterQuerier(queryable, engine, t.distributor)
199+
// if we are not configured for single binary mode then the querier needs to register its paths externally
200+
registerExternally := cfg.Target != All
201+
handler := t.api.RegisterQuerier(queryable, engine, t.distributor, registerExternally)
202+
203+
// single binary mode requires a properly configured worker. if the operator did not attempt to configure the
204+
// worker we will attempt an automatic configuration here
205+
if cfg.Worker.Address == "" && cfg.Target == All {
206+
address := fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort)
207+
level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address)
208+
cfg.Worker.Address = address
209+
}
200210

201211
// Query frontend worker will only be started after all its dependencies are started, not here.
202212
// Worker may also be nil, if not configured, which is OK.
203-
worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
213+
worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(handler), util.Logger)
204214
if err != nil {
205215
return
206216
}
@@ -611,6 +621,6 @@ var modules = map[ModuleName]module{
611621
},
612622

613623
All: {
614-
deps: []ModuleName{Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway},
624+
deps: []ModuleName{QueryFrontend, Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway},
615625
},
616626
}

pkg/ingester/ingester_v2.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"math"
78
"net/http"
89
"os"
910
"path/filepath"
@@ -431,9 +432,9 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq
431432
return &client.LabelValuesResponse{}, nil
432433
}
433434

434-
through := time.Now()
435-
from := through.Add(-i.cfg.TSDBConfig.Retention)
436-
q, err := db.Querier(from.Unix()*1000, through.Unix()*1000)
435+
// Since we ingester runs with a very limited TSDB retention, we can (and should) query
436+
// label values without any time range bound.
437+
q, err := db.Querier(0, math.MaxInt64)
437438
if err != nil {
438439
return nil, err
439440
}
@@ -460,9 +461,9 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque
460461
return &client.LabelNamesResponse{}, nil
461462
}
462463

463-
through := time.Now()
464-
from := through.Add(-i.cfg.TSDBConfig.Retention)
465-
q, err := db.Querier(from.Unix()*1000, through.Unix()*1000)
464+
// Since we ingester runs with a very limited TSDB retention, we can (and should) query
465+
// label names without any time range bound.
466+
q, err := db.Querier(0, math.MaxInt64)
466467
if err != nil {
467468
return nil, err
468469
}

0 commit comments

Comments
 (0)