diff --git a/docs/prometheus-frontend.yml b/docs/prometheus-frontend.yml new file mode 100644 index 00000000000..24878d2ef98 --- /dev/null +++ b/docs/prometheus-frontend.yml @@ -0,0 +1,43 @@ +# You can use the Cortex query frontend with any Prometheus-API compatible +# service, including Prometheus and Thanos. Use this config file to get +# the benefits of query parallelisation and caching. + +# Disable the requirement that every request to Cortex has a +# X-Scope-OrgID header. `fake` will be substituted in instead. +auth_enabled: false + +# We only want to run the query-frontend module. +target: query-frontend + +# We don't want the usual /api/prom prefix. +http_prefix: + +server: + http_listen_port: 9091 + +frontend: + split_queries_by_day: true + align_queries_with_step: true + cache_results: true + compress_responses: true + + results_cache: + max_freshness: 1m + cache: + + # We're going to use the in-process "FIFO" cache, but you can enable + # memcached below. + enable_fifocache: true + fifocache: + size: 1024 + validity: 24h + + # If you want to use a memcached cluster, configure a headless service + # in Kubernetes and Cortex will discover the individual instances using + # a SRV DNS query. Cortex will then do client-side hashing to spread + # the load evenly. + # memcached: + # memcached_client: + # host: memcached.default.svc.cluster.local + # service: memcached + # consistent_hash: true diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 19604dd3ba0..8b135dd33b4 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -53,6 +53,7 @@ type Config struct { Target moduleName `yaml:"target,omitempty"` AuthEnabled bool `yaml:"auth_enabled,omitempty"` PrintConfig bool `yaml:"-"` + HTTPPrefix string `yaml:"http_prefix"` Server server.Config `yaml:"server,omitempty"` Distributor distributor.Config `yaml:"distributor,omitempty"` @@ -82,6 +83,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { f.Var(&c.Target, "target", "target module (default All)") f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.") f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.") + f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.") c.Server.RegisterFlags(f) c.Distributor.RegisterFlags(f) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 411f19f70f0..7515e6a0fda 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -132,6 +132,14 @@ func (m *moduleName) Set(s string) error { } } +func (m *moduleName) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + return m.Set(s) +} + func (t *Cortex) initServer(cfg *Config) (err error) { t.server, err = server.New(cfg.Server) return @@ -259,7 +267,7 @@ func (t *Cortex) initQueryFrontend(cfg *Config) (err error) { } frontend.RegisterFrontendServer(t.server.GRPC, t.frontend) - t.server.HTTP.PathPrefix("/api/prom").Handler( + t.server.HTTP.PathPrefix(cfg.HTTPPrefix).Handler( t.httpAuthMiddleware.Wrap( t.frontend.Handler(), ), diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go index 9e13a3ee2d3..3c457fe282f 100644 --- a/pkg/querier/frontend/frontend.go +++ b/pkg/querier/frontend/frontend.go @@ -8,6 +8,8 @@ import ( "io/ioutil" "math/rand" "net/http" + "net/url" + "path" "sync" "time" @@ -57,6 +59,7 @@ type Config struct { CacheResults bool `yaml:"cache_results"` CompressResponses bool `yaml:"compress_responses"` queryrange.ResultsCacheConfig `yaml:"results_cache"` + DownstreamURL string `yaml:"downstream"` } // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -68,6 +71,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.") f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.") cfg.ResultsCacheConfig.RegisterFlags(f) + f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.") } // Frontend queues HTTP requests, dispatches them to backends, and handles retries @@ -99,6 +103,7 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e log: log, queues: map[string]chan *request{}, } + f.cond = sync.NewCond(&f.mtx) // Stack up the pipeline of various query range middlewares. var queryRangeMiddleware []queryrange.Middleware @@ -119,20 +124,44 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("retry", queryRangeDuration), queryrange.NewRetryMiddleware(log, cfg.MaxRetries)) } - // Finally, if the user selected any query range middleware, stitch it in. + // If the user has specified a downstream Prometheus, then we should + // forward requests to that. Otherwise we will wait for queries to + // contact us. var roundTripper http.RoundTripper = f + if cfg.DownstreamURL != "" { + u, err := url.Parse(cfg.DownstreamURL) + if err != nil { + return nil, err + } + + roundTripper = RoundTripFunc(func(r *http.Request) (*http.Response, error) { + r.URL.Scheme = u.Scheme + r.URL.Host = u.Host + r.URL.Path = path.Join(u.Path, r.URL.Path) + return http.DefaultTransport.RoundTrip(r) + }) + } + + // Finally, if the user selected any query range middleware, stitch it in. if len(queryRangeMiddleware) > 0 { roundTripper = queryrange.NewRoundTripper( - f, - queryrange.MergeMiddlewares(queryRangeMiddleware...).Wrap(&queryrange.ToRoundTripperMiddleware{Next: f}), + roundTripper, + queryrange.MergeMiddlewares(queryRangeMiddleware...).Wrap(&queryrange.ToRoundTripperMiddleware{Next: roundTripper}), limits, ) } f.roundTripper = roundTripper - f.cond = sync.NewCond(&f.mtx) return f, nil } +// RoundTripFunc is to http.RoundTripper what http.HandlerFunc is to http.Handler. +type RoundTripFunc func(*http.Request) (*http.Response, error) + +// RoundTrip implements http.RoundTripper. +func (f RoundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return f(r) +} + // Close stops new requests and errors out any pending requests. func (f *Frontend) Close() { f.mtx.Lock()