diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b203060b82..036cdd1368a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392 * [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371 * [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-.fifocache.size` CLI flag has been renamed to `-.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319 +* [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437 * [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454 * [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372 * [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400 diff --git a/integration/e2e/util.go b/integration/e2e/util.go index 23116216ef9..83d65e87ff5 100644 --- a/integration/e2e/util.go +++ b/integration/e2e/util.go @@ -68,15 +68,20 @@ func TimeToMilliseconds(t time.Time) int64 { return int64(math.Round(float64(t.UnixNano()) / 1000000)) } -func GenerateSeries(name string, ts time.Time) (series []prompb.TimeSeries, vector model.Vector) { +func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label) (series []prompb.TimeSeries, vector model.Vector) { tsMillis := TimeToMilliseconds(ts) value := rand.Float64() - // Generate the series - series = append(series, prompb.TimeSeries{ - Labels: []prompb.Label{ + lbls := append( + []prompb.Label{ {Name: labels.MetricName, Value: name}, }, + additionalLabels..., + ) + + // Generate the series + series = append(series, prompb.TimeSeries{ + Labels: lbls, Samples: []prompb.Sample{ {Value: value, Timestamp: tsMillis}, }, @@ -85,6 +90,9 @@ func GenerateSeries(name string, ts time.Time) (series []prompb.TimeSeries, vect // Generate the expected vector when querying it metric := model.Metric{} metric[labels.MetricName] = model.LabelValue(name) + for _, lbl := range additionalLabels { + metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } vector = append(vector, &model.Sample{ Metric: metric, diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index b16c0aac591..bd45780516b 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -112,6 +112,18 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) { return value, err } +// LabelValues gets label values +func (c *Client) LabelValues(label string) (model.LabelValues, error) { + value, _, err := c.querierClient.LabelValues(context.Background(), label) + return value, err +} + +// LabelNames gets label names +func (c *Client) LabelNames() ([]string, error) { + value, _, err := c.querierClient.LabelNames(context.Background()) + return value, err +} + type addOrgIDRoundTripper struct { orgID string next http.RoundTripper diff --git a/integration/getting_started_single_process_config_test.go b/integration/getting_started_single_process_config_test.go index 6455b6d8a87..c5cd709178d 100644 --- a/integration/getting_started_single_process_config_test.go +++ b/integration/getting_started_single_process_config_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,7 +39,7 @@ func TestGettingStartedSingleProcessConfigWithChunksStorage(t *testing.T) { // Push some series to Cortex. now := time.Now() - series, expectedVector := generateSeries("series_1", now) + series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"}) res, err := c.Push(series) require.NoError(t, err) @@ -49,6 +50,14 @@ func TestGettingStartedSingleProcessConfigWithChunksStorage(t *testing.T) { require.NoError(t, err) require.Equal(t, model.ValVector, result.Type()) assert.Equal(t, expectedVector, result.(model.Vector)) + + labelValues, err := c.LabelValues("foo") + require.NoError(t, err) + require.Equal(t, model.LabelValues{"bar"}, labelValues) + + labelNames, err := c.LabelNames() + require.NoError(t, err) + require.Equal(t, []string{"__name__", "foo"}, labelNames) } func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) { @@ -82,7 +91,7 @@ func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) { // Push some series to Cortex. now := time.Now() - series, expectedVector := generateSeries("series_1", now) + series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"}) res, err := c.Push(series) require.NoError(t, err) @@ -93,4 +102,12 @@ func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) { require.NoError(t, err) require.Equal(t, model.ValVector, result.Type()) assert.Equal(t, expectedVector, result.(model.Vector)) + + labelValues, err := c.LabelValues("foo") + require.NoError(t, err) + require.Equal(t, model.LabelValues{"bar"}, labelValues) + + labelNames, err := c.LabelNames() + require.NoError(t, err) + require.Equal(t, []string{"__name__", "foo"}, labelNames) } diff --git a/pkg/api/api.go b/pkg/api/api.go index ce8527ac7a3..ddcd035f86f 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -17,6 +17,8 @@ import ( "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" + "github.com/gorilla/mux" + "github.com/cortexproject/cortex/pkg/alertmanager" "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/compactor" @@ -79,15 +81,19 @@ func New(cfg Config, s *server.Server, logger log.Logger) (*API, error) { } func (a *API) RegisterRoute(path string, handler http.Handler, auth bool, methods ...string) { + a.registerRouteWithRouter(a.server.HTTP, path, handler, auth, methods...) +} + +func (a *API) registerRouteWithRouter(router *mux.Router, path string, handler http.Handler, auth bool, methods ...string) { level.Debug(a.logger).Log("msg", "api: registering route", "methods", strings.Join(methods, ","), "path", path, "auth", auth) if auth { handler = a.authMiddleware.Wrap(handler) } if len(methods) == 0 { - a.server.HTTP.Path(path).Handler(handler) + router.Path(path).Handler(handler) return } - a.server.HTTP.Path(path).Methods(methods...).Handler(handler) + router.Path(path).Methods(methods...).Handler(handler) } func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth bool, methods ...string) { @@ -236,7 +242,7 @@ func (a *API) RegisterCompactor(c *compactor.Compactor) { // RegisterQuerier registers the Prometheus routes supported by the // Cortex querier service. Currently this can not be registered simultaneously // with the QueryFrontend. -func (a *API) RegisterQuerier(queryable storage.Queryable, engine *promql.Engine, distributor *distributor.Distributor) { +func (a *API) RegisterQuerier(queryable storage.Queryable, engine *promql.Engine, distributor *distributor.Distributor, registerRoutesExternally bool) http.Handler { api := v1.NewAPI( engine, queryable, @@ -255,36 +261,46 @@ func (a *API) RegisterQuerier(queryable storage.Queryable, engine *promql.Engine &v1.PrometheusVersion{}, ) - promRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.PrometheusHTTPPrefix + "/api/v1") - api.Register(promRouter) - promHandler := fakeRemoteAddr(promRouter) - - a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "GET") - a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/query", promHandler, true, "GET", "POST") - a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/query_range", promHandler, true, "GET", "POST") - a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/labels", promHandler, true, "GET", "POST") - a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/label/{name}/values", promHandler, true, "GET") - a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/series", promHandler, true, "GET", "POST", "DELETE") - a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/metadata", promHandler, true, "GET") - + // these routes are always registered to the default server a.RegisterRoute("/api/v1/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true) a.RegisterRoute("/api/v1/chunks", querier.ChunksHandler(queryable), true) - // Legacy Routes a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true) a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/chunks", querier.ChunksHandler(queryable), true) + // these routes are either registered the default server OR to an internal mux. The internal mux is + // for use in a single binary mode when both the query frontend and the querier would attempt to claim these routes + // TODO: Add support to expose querier paths with a configurable prefix in single binary mode. + router := mux.NewRouter() + if registerRoutesExternally { + router = a.server.HTTP + } + + promRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.PrometheusHTTPPrefix + "/api/v1") + api.Register(promRouter) + promHandler := fakeRemoteAddr(promRouter) + + a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "GET") + a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/query", promHandler, true, "GET", "POST") + a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/query_range", promHandler, true, "GET", "POST") + a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/labels", promHandler, true, "GET", "POST") + a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/label/{name}/values", promHandler, true, "GET") + a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/series", promHandler, true, "GET", "POST", "DELETE") + a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/metadata", promHandler, true, "GET") + legacyPromRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.LegacyHTTPPrefix + "/api/v1") api.Register(legacyPromRouter) legacyPromHandler := fakeRemoteAddr(legacyPromRouter) - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "GET") - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/query", legacyPromHandler, true, "GET", "POST") - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/query_range", legacyPromHandler, true, "GET", "POST") - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/labels", legacyPromHandler, true, "GET", "POST") - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/label/{name}/values", legacyPromHandler, true, "GET") - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/series", legacyPromHandler, true, "GET", "POST", "DELETE") - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/api/v1/metadata", legacyPromHandler, true, "GET") + a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "GET") + a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/query", legacyPromHandler, true, "GET", "POST") + a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/query_range", legacyPromHandler, true, "GET", "POST") + a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/labels", legacyPromHandler, true, "GET", "POST") + a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/label/{name}/values", legacyPromHandler, true, "GET") + a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/series", legacyPromHandler, true, "GET", "POST", "DELETE") + a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/metadata", legacyPromHandler, true, "GET") + + return router } // RegisterQueryFrontend registers the Prometheus routes supported by the diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index ee976fd607e..2a0d8603583 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -196,11 +196,21 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) { queryable, engine := querier.New(cfg.Querier, t.distributor, t.storeQueryable, tombstonesLoader, prometheus.DefaultRegisterer) - t.api.RegisterQuerier(queryable, engine, t.distributor) + // if we are not configured for single binary mode then the querier needs to register its paths externally + registerExternally := cfg.Target != All + handler := t.api.RegisterQuerier(queryable, engine, t.distributor, registerExternally) + + // single binary mode requires a properly configured worker. if the operator did not attempt to configure the + // worker we will attempt an automatic configuration here + if cfg.Worker.Address == "" && cfg.Target == All { + address := fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) + level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address) + cfg.Worker.Address = address + } // Query frontend worker will only be started after all its dependencies are started, not here. // Worker may also be nil, if not configured, which is OK. - worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger) + worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(handler), util.Logger) if err != nil { return } @@ -611,6 +621,6 @@ var modules = map[ModuleName]module{ }, All: { - deps: []ModuleName{Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway}, + deps: []ModuleName{QueryFrontend, Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway}, }, } diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index aa622ac34a4..f3ea5dc68f4 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "net/http" "os" "path/filepath" @@ -431,9 +432,9 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq return &client.LabelValuesResponse{}, nil } - through := time.Now() - from := through.Add(-i.cfg.TSDBConfig.Retention) - q, err := db.Querier(from.Unix()*1000, through.Unix()*1000) + // Since we ingester runs with a very limited TSDB retention, we can (and should) query + // label values without any time range bound. + q, err := db.Querier(0, math.MaxInt64) if err != nil { return nil, err } @@ -460,9 +461,9 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque return &client.LabelNamesResponse{}, nil } - through := time.Now() - from := through.Add(-i.cfg.TSDBConfig.Retention) - q, err := db.Querier(from.Unix()*1000, through.Unix()*1000) + // Since we ingester runs with a very limited TSDB retention, we can (and should) query + // label names without any time range bound. + q, err := db.Querier(0, math.MaxInt64) if err != nil { return nil, err }