Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions docs/prometheus-frontend.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# You can use the Cortex query frontend with any Prometheus-API compatible
# service, including Prometheus and Thanos. Use this config file to get
# the benefits of query parallelisation and caching.

# Disable the requirement that every request to Cortex has a
# X-Scope-OrgID header. `fake` will be substituted in instead.
auth_enabled: false

# We only want to run the query-frontend module.
target: query-frontend

# We don't want the usual /api/prom prefix.
http_prefix:

server:
http_listen_port: 9091

frontend:
split_queries_by_day: true
align_queries_with_step: true
cache_results: true
compress_responses: true

results_cache:
max_freshness: 1m
cache:

# We're going to use the in-process "FIFO" cache, but you can enable
# memcached below.
enable_fifocache: true
fifocache:
size: 1024
validity: 24h

# If you want to use a memcached cluster, configure a headless service
# in Kubernetes and Cortex will discover the individual instances using
# a SRV DNS query. Cortex will then do client-side hashing to spread
# the load evenly.
# memcached:
# memcached_client:
# host: memcached.default.svc.cluster.local
# service: memcached
# consistent_hash: true
2 changes: 2 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Config struct {
Target moduleName `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`

Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Expand Down Expand Up @@ -82,6 +83,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
f.Var(&c.Target, "target", "target module (default All)")
f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")
f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.")
f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.")

c.Server.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
Expand Down
10 changes: 9 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func (m *moduleName) Set(s string) error {
}
}

func (m *moduleName) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
if err := unmarshal(&s); err != nil {
return err
}
return m.Set(s)
}

func (t *Cortex) initServer(cfg *Config) (err error) {
t.server, err = server.New(cfg.Server)
return
Expand Down Expand Up @@ -259,7 +267,7 @@ func (t *Cortex) initQueryFrontend(cfg *Config) (err error) {
}

frontend.RegisterFrontendServer(t.server.GRPC, t.frontend)
t.server.HTTP.PathPrefix("/api/prom").Handler(
t.server.HTTP.PathPrefix(cfg.HTTPPrefix).Handler(
t.httpAuthMiddleware.Wrap(
t.frontend.Handler(),
),
Expand Down
37 changes: 33 additions & 4 deletions pkg/querier/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"path"
"sync"
"time"

Expand Down Expand Up @@ -57,6 +59,7 @@ type Config struct {
CacheResults bool `yaml:"cache_results"`
CompressResponses bool `yaml:"compress_responses"`
queryrange.ResultsCacheConfig `yaml:"results_cache"`
DownstreamURL string `yaml:"downstream"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand All @@ -68,6 +71,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
cfg.ResultsCacheConfig.RegisterFlags(f)
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
}

// Frontend queues HTTP requests, dispatches them to backends, and handles retries
Expand Down Expand Up @@ -99,6 +103,7 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
log: log,
queues: map[string]chan *request{},
}
f.cond = sync.NewCond(&f.mtx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a move, used to be at bottom of the function. Prefer it here, but can do in different PR if necessary.


// Stack up the pipeline of various query range middlewares.
var queryRangeMiddleware []queryrange.Middleware
Expand All @@ -119,20 +124,44 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("retry", queryRangeDuration), queryrange.NewRetryMiddleware(log, cfg.MaxRetries))
}

// Finally, if the user selected any query range middleware, stitch it in.
// If the user has specified a downstream Prometheus, then we should
// forward requests to that. Otherwise we will wait for queries to
// contact us.
var roundTripper http.RoundTripper = f
if cfg.DownstreamURL != "" {
u, err := url.Parse(cfg.DownstreamURL)
if err != nil {
return nil, err
}

roundTripper = RoundTripFunc(func(r *http.Request) (*http.Response, error) {
r.URL.Scheme = u.Scheme
r.URL.Host = u.Host
r.URL.Path = path.Join(u.Path, r.URL.Path)
return http.DefaultTransport.RoundTrip(r)
})
}

// Finally, if the user selected any query range middleware, stitch it in.
if len(queryRangeMiddleware) > 0 {
roundTripper = queryrange.NewRoundTripper(
f,
queryrange.MergeMiddlewares(queryRangeMiddleware...).Wrap(&queryrange.ToRoundTripperMiddleware{Next: f}),
roundTripper,
queryrange.MergeMiddlewares(queryRangeMiddleware...).Wrap(&queryrange.ToRoundTripperMiddleware{Next: roundTripper}),
limits,
)
}
f.roundTripper = roundTripper
f.cond = sync.NewCond(&f.mtx)
return f, nil
}

// RoundTripFunc is to http.RoundTripper what http.HandlerFunc is to http.Handler.
type RoundTripFunc func(*http.Request) (*http.Response, error)

// RoundTrip implements http.RoundTripper.
func (f RoundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}

// Close stops new requests and errors out any pending requests.
func (f *Frontend) Close() {
f.mtx.Lock()
Expand Down