Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## master / unreleased

* [FEATURE] Add option to use jump hashing to load balance requests to memcached #1554
* [FEATURE] Add status page for HA tracker to distributors #1546

## 0.1.0 / 2019-08-07

Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (t *Cortex) initDistributor(cfg *Config) (err error) {

t.server.HTTP.HandleFunc("/all_user_stats", t.distributor.AllUserStatsHandler)
t.server.HTTP.Handle("/api/prom/push", t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.distributor.PushHandler)))
t.server.HTTP.Handle("/ha-tracker", t.distributor.Replicas)
return
}

Expand Down
27 changes: 10 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type Distributor struct {
billingClient *billing.Client

// For handling HA replicas.
replicas *haTracker
Replicas *haTracker

// Per-user rate limiters.
ingestLimitersMtx sync.RWMutex
Expand All @@ -123,7 +123,6 @@ type Config struct {
BillingConfig billing.Config `yaml:"billing,omitempty"`
PoolConfig ingester_client.PoolConfig `yaml:"pool,omitempty"`

EnableHATracker bool `yaml:"enable_ha_tracker,omitempty"`
HATrackerConfig HATrackerConfig `yaml:"ha_tracker,omitempty"`

RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
Expand All @@ -143,7 +142,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.HATrackerConfig.RegisterFlags(f)

f.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
f.BoolVar(&cfg.EnableHATracker, "distributor.ha-tracker.enable", false, "Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
Expand All @@ -170,6 +168,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
replicationFactor.Set(float64(ring.ReplicationFactor()))
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout

replicas, err := newClusterTracker(cfg.HATrackerConfig)
if err != nil {
return nil, err
}

d := &Distributor{
cfg: cfg,
ring: ring,
Expand All @@ -178,16 +181,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
limits: limits,
ingestLimiters: map[string]*rate.Limiter{},
quit: make(chan struct{}),
Replicas: replicas,
}

if cfg.EnableHATracker {
replicas, err := newClusterTracker(cfg.HATrackerConfig)
if err != nil {
return nil, err
}
d.replicas = replicas
}

go d.loop()

return d, nil
Expand Down Expand Up @@ -218,9 +213,7 @@ func (d *Distributor) loop() {
func (d *Distributor) Stop() {
close(d.quit)
d.ingesterPool.Stop()
if d.cfg.EnableHATracker {
d.replicas.stop()
}
d.Replicas.stop()
}

func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter) (uint32, error) {
Expand Down Expand Up @@ -279,7 +272,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica

// At this point we know we have both HA labels, we should lookup
// the cluster/instance here to see if we want to accept this sample.
err := d.replicas.checkReplica(ctx, userID, cluster, replica)
err := d.Replicas.checkReplica(ctx, userID, cluster, replica)
// checkReplica should only have returned an error if there was a real error talking to Consul, or if the replica labels don't match.
if err != nil { // Don't accept the sample.
return false, err
Expand All @@ -304,7 +297,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
// Count the total samples in, prior to validation or deuplication, for comparison with other metrics.
incomingSamples.WithLabelValues(userID).Add(float64(numSamples))

if d.cfg.EnableHATracker && d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
if err != nil {
Expand Down
32 changes: 23 additions & 9 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func TestDistributorPushHAInstances(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

for i, tc := range []struct {
enableTracker bool
acceptedReplica string
testReplica string
cluster string
Expand All @@ -121,6 +122,7 @@ func TestDistributorPushHAInstances(t *testing.T) {
expectedCode int32
}{
{
enableTracker: true,
acceptedReplica: "instance0",
testReplica: "instance0",
cluster: "cluster0",
Expand All @@ -129,32 +131,44 @@ func TestDistributorPushHAInstances(t *testing.T) {
},
// The 202 indicates that we didn't accept this sample.
{
enableTracker: true,
acceptedReplica: "instance2",
testReplica: "instance0",
cluster: "cluster0",
samples: 5,
expectedCode: 202,
},
// If the HA tracker is disabled we should still accept samples that have both labels.
{
enableTracker: false,
acceptedReplica: "instance0",
testReplica: "instance0",
cluster: "cluster0",
samples: 5,
expectedResponse: success,
},
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
d := prepare(t, 1, 1, 0, shardByAllLabels)
d.cfg.EnableHATracker = true
d.limits.Defaults.AcceptHASamples = true
codec := codec.Proto{Factory: ProtoReplicaDescFactory}
mock := kv.PrefixClient(consul.NewInMemoryClient(codec), "prefix")

r, err := newClusterTracker(HATrackerConfig{
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 100 * time.Millisecond,
FailoverTimeout: time.Second,
})
assert.NoError(t, err)
d.replicas = r
if tc.enableTracker {
r, err := newClusterTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 100 * time.Millisecond,
FailoverTimeout: time.Second,
})
assert.NoError(t, err)
d.Replicas = r
}

userID, err := user.ExtractOrgID(ctx)
assert.NoError(t, err)
err = d.replicas.checkReplica(ctx, userID, tc.cluster, tc.acceptedReplica)
err = d.Replicas.checkReplica(ctx, userID, tc.cluster, tc.acceptedReplica)
assert.NoError(t, err)

request := makeWriteRequestHA(tc.samples, tc.testReplica, tc.cluster)
Expand Down
37 changes: 25 additions & 12 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,26 @@ type haTracker struct {
// HATrackerConfig contains the configuration require to
// create a HA Tracker.
type HATrackerConfig struct {
EnableHATracker bool `yaml:"enable_ha_tracker,omitempty"`
// We should only update the timestamp if the difference
// between the stored timestamp and the time we received a sample at
// is more than this duration.
UpdateTimeout time.Duration
UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
// We should only failover to accepting samples from a replica
// other than the replica written in the KVStore if the difference
// between the stored timestamp and the time we received a sample is
// more than this duration
FailoverTimeout time.Duration
FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`

KVStore kv.Config
}

// RegisterFlags adds the flags required to config this to the given FlagSet
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.EnableHATracker,
"distributor.ha-tracker.enable",
false,
"Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
f.DurationVar(&cfg.UpdateTimeout,
"distributor.ha-tracker.update-timeout",
15*time.Second,
Expand All @@ -112,11 +117,6 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
codec := codec.Proto{Factory: ProtoReplicaDescFactory}

client, err := kv.NewClient(cfg.KVStore, codec)
if err != nil {
return nil, err
}

if cfg.FailoverTimeout <= cfg.UpdateTimeout {
return nil, fmt.Errorf("HA Tracker failover timeout must be greater than update timeout, %d is <= %d", cfg.FailoverTimeout, cfg.UpdateTimeout)
}
Expand All @@ -126,10 +126,17 @@ func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
cfg: cfg,
done: make(chan struct{}),
elected: map[string]ReplicaDesc{},
client: client,
cancel: cancel,
}
go t.loop(ctx)

if cfg.EnableHATracker {
client, err := kv.NewClient(cfg.KVStore, codec)
if err != nil {
return nil, err
}
t.client = client
go t.loop(ctx)
}
return &t, nil
}

Expand All @@ -155,8 +162,10 @@ func (c *haTracker) loop(ctx context.Context) {

// Stop ends calls the trackers cancel function, which will end the loop for WatchPrefix.
func (c *haTracker) stop() {
c.cancel()
<-c.done
if c.cfg.EnableHATracker {
c.cancel()
<-c.done
}
}

// CheckReplica checks the cluster and replica against the backing KVStore and local cache in the
Expand All @@ -167,6 +176,10 @@ func (c *haTracker) stop() {
// accepting samples from another replica for the cluster, so that there isn't a bunch of error's returned
// to customers clients.
func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica string) error {
// If HA tracking isn't enabled then accept the sample
if !c.cfg.EnableHATracker {
return nil
}
key := fmt.Sprintf("%s/%s", userID, cluster)
now := mtime.Now()
c.electedLock.RLock()
Expand Down
99 changes: 99 additions & 0 deletions pkg/distributor/ha_tracker_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package distributor

import (
"html/template"
"net/http"
"sort"
"strings"
"time"

"github.com/prometheus/prometheus/pkg/timestamp"
)

const trackerTpl = `
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Cortex HA Tracker Status</title>
</head>
<body>
<h1>Cortex HA Tracker Status</h1>
<p>Current time: {{ .Now }}</p>
<table width="100%" border="1">
<thead>
<tr>
<th>User ID</th>
<th>Cluster</th>
<th>Replica</th>
<th>Elected Time</th>
<th>Time Until Update</th>
<th>Time Until Failover</th>
</tr>
</thead>
<tbody>
{{ range .Elected }}
<tr>
<td>{{ .UserID }}</td>
<td>{{ .Cluster }}</td>
<td>{{ .Replica }}</td>
<td>{{ .ElectedAt }}</td>
<td>{{ .UpdateTime }}</td>
<td>{{ .FailoverTime }}</td>
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>`

var trackerTmpl *template.Template

func init() {
trackerTmpl = template.Must(template.New("ha-tracker").Parse(trackerTpl))
}

func (h *haTracker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.electedLock.RLock()
type replica struct {
UserID, Cluster, Replica string
ElectedAt time.Time
UpdateTime, FailoverTime time.Duration
}

electedReplicas := []replica{}
for key, desc := range h.elected {
chunks := strings.SplitN(key, "/", 2)

electedReplicas = append(electedReplicas, replica{
UserID: chunks[0],
Cluster: chunks[1],
Replica: desc.Replica,
ElectedAt: timestamp.Time(desc.ReceivedAt),
UpdateTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.UpdateTimeout)),
FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.FailoverTimeout)),
})
}
h.electedLock.RUnlock()

sort.Slice(electedReplicas, func(i, j int) bool {
first := electedReplicas[i]
second := electedReplicas[j]

if first.UserID != second.UserID {
return first.UserID < second.UserID
}
return first.Cluster < second.Cluster
})

if err := trackerTmpl.Execute(w, struct {
Elected []replica
Now time.Time
}{
Elected: electedReplicas,
Now: time.Now(),
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
Loading