diff --git a/CHANGELOG.md b/CHANGELOG.md
index 51a3400106c..c1804e5a0cb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## master / unreleased
 
 * [FEATURE] Add option to use jump hashing to load balance requests to memcached #1554
+* [FEATURE] Add status page for HA tracker to distributors #1546
 
 ## 0.1.0 / 2019-08-07
 
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index c899556cb13..411f19f70f0 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -170,6 +170,7 @@ func (t *Cortex) initDistributor(cfg *Config) (err error) {
 
 	t.server.HTTP.HandleFunc("/all_user_stats", t.distributor.AllUserStatsHandler)
 	t.server.HTTP.Handle("/api/prom/push", t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.distributor.PushHandler)))
+	t.server.HTTP.Handle("/ha-tracker", t.distributor.Replicas)
 	return
 }
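Mounting `t.distributor.Replicas` directly on the route means the HA tracker itself must implement `net/http`'s `Handler` interface — which is why the `replicas` field becomes the exported `Replicas` below, and what the new `ha_tracker_http.go` at the end of this diff provides. For reference, the stdlib contract the tracker has to satisfy (spelled here with qualified names):

```go
// From the net/http package: anything with a ServeHTTP method can be
// registered via mux.Handle, as *haTracker is above.
type Handler interface {
	ServeHTTP(w http.ResponseWriter, r *http.Request)
}
```

Once a distributor is running, the page should be reachable on its HTTP port, e.g. `curl http://<distributor-host>/ha-tracker`.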
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index fb1110bb521..3716698c1ee 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -108,7 +108,7 @@ type Distributor struct {
 	billingClient *billing.Client
 
 	// For handling HA replicas.
-	replicas *haTracker
+	Replicas *haTracker
 
 	// Per-user rate limiters.
 	ingestLimitersMtx sync.RWMutex
@@ -123,7 +123,6 @@ type Config struct {
 	BillingConfig billing.Config             `yaml:"billing,omitempty"`
 	PoolConfig    ingester_client.PoolConfig `yaml:"pool,omitempty"`
 
-	EnableHATracker bool            `yaml:"enable_ha_tracker,omitempty"`
 	HATrackerConfig HATrackerConfig `yaml:"ha_tracker,omitempty"`
 
 	RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
@@ -143,7 +142,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.HATrackerConfig.RegisterFlags(f)
 
 	f.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
-	f.BoolVar(&cfg.EnableHATracker, "distributor.ha-tracker.enable", false, "Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
 	f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
 	f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
 	f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
@@ -170,6 +168,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 	replicationFactor.Set(float64(ring.ReplicationFactor()))
 	cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout
 
+	replicas, err := newClusterTracker(cfg.HATrackerConfig)
+	if err != nil {
+		return nil, err
+	}
+
 	d := &Distributor{
 		cfg:  cfg,
 		ring: ring,
@@ -178,16 +181,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 		limits:         limits,
 		ingestLimiters: map[string]*rate.Limiter{},
 		quit:           make(chan struct{}),
+		Replicas:       replicas,
 	}
-
-	if cfg.EnableHATracker {
-		replicas, err := newClusterTracker(cfg.HATrackerConfig)
-		if err != nil {
-			return nil, err
-		}
-		d.replicas = replicas
-	}
-
 	go d.loop()
 
 	return d, nil
@@ -218,9 +213,7 @@ func (d *Distributor) loop() {
 func (d *Distributor) Stop() {
 	close(d.quit)
 	d.ingesterPool.Stop()
-	if d.cfg.EnableHATracker {
-		d.replicas.stop()
-	}
+	d.Replicas.stop()
 }
 
 func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter) (uint32, error) {
@@ -279,7 +272,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
 	// At this point we know we have both HA labels, we should look up
 	// the cluster/instance here to see if we want to accept this sample.
-	err := d.replicas.checkReplica(ctx, userID, cluster, replica)
+	err := d.Replicas.checkReplica(ctx, userID, cluster, replica)
 	// checkReplica should only have returned an error if there was a real error talking to Consul, or if the replica labels don't match.
 	if err != nil { // Don't accept the sample.
 		return false, err
 	}
@@ -304,7 +297,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
 	// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
 	incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
 
-	if d.cfg.EnableHATracker && d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
+	if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
 		cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
 		removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
 		if err != nil {
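With the global enable flag gone, `Push` now gates HA handling purely on the per-tenant `AcceptHASamples` limit. `findHALabels` itself isn't shown in this diff; below is a minimal self-contained sketch of a helper matching that call site — `labelPair` stands in for `client.LabelAdapter`, and the `__replica__`/`cluster` label names are assumptions for illustration:

```go
package main

import "fmt"

// labelPair stands in for client.LabelAdapter from the diff: a name/value pair.
type labelPair struct{ Name, Value string }

// findHALabels mirrors the call site in Push above:
//   cluster, replica := findHALabels(replicaLabel, clusterLabel, labels)
// A plausible sketch, not the implementation from the PR.
func findHALabels(replicaLabel, clusterLabel string, labels []labelPair) (string, string) {
	var cluster, replica string
	for _, l := range labels {
		switch l.Name {
		case replicaLabel:
			replica = l.Value
		case clusterLabel:
			cluster = l.Value
		}
	}
	return cluster, replica
}

func main() {
	labels := []labelPair{{"__replica__", "replica-1"}, {"cluster", "prod"}}
	cluster, replica := findHALabels("__replica__", "cluster", labels)
	fmt.Println(cluster, replica) // prod replica-1
}
```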
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 5b968622d64..70c83a45ee7 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -113,6 +113,7 @@ func TestDistributorPushHAInstances(t *testing.T) {
 	ctx = user.InjectOrgID(context.Background(), "user")
 
 	for i, tc := range []struct {
+		enableTracker   bool
 		acceptedReplica string
 		testReplica     string
 		cluster         string
@@ -121,6 +122,7 @@ func TestDistributorPushHAInstances(t *testing.T) {
 		expectedCode     int32
 	}{
 		{
+			enableTracker:   true,
 			acceptedReplica: "instance0",
 			testReplica:     "instance0",
 			cluster:         "cluster0",
 			samples:         5,
@@ -129,32 +131,44 @@ func TestDistributorPushHAInstances(t *testing.T) {
 		},
 		// The 202 indicates that we didn't accept this sample.
 		{
+			enableTracker:   true,
 			acceptedReplica: "instance2",
 			testReplica:     "instance0",
 			cluster:         "cluster0",
 			samples:         5,
 			expectedCode:    202,
 		},
+		// If the HA tracker is disabled we should still accept samples that have both labels.
+		{
+			enableTracker:    false,
+			acceptedReplica:  "instance0",
+			testReplica:      "instance0",
+			cluster:          "cluster0",
+			samples:          5,
+			expectedResponse: success,
+		},
 	} {
 		for _, shardByAllLabels := range []bool{true, false} {
 			t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
 				d := prepare(t, 1, 1, 0, shardByAllLabels)
-				d.cfg.EnableHATracker = true
 				d.limits.Defaults.AcceptHASamples = true
 
 				codec := codec.Proto{Factory: ProtoReplicaDescFactory}
 				mock := kv.PrefixClient(consul.NewInMemoryClient(codec), "prefix")
-				r, err := newClusterTracker(HATrackerConfig{
-					KVStore:         kv.Config{Mock: mock},
-					UpdateTimeout:   100 * time.Millisecond,
-					FailoverTimeout: time.Second,
-				})
-				assert.NoError(t, err)
-				d.replicas = r
+				if tc.enableTracker {
+					r, err := newClusterTracker(HATrackerConfig{
+						EnableHATracker: true,
+						KVStore:         kv.Config{Mock: mock},
+						UpdateTimeout:   100 * time.Millisecond,
+						FailoverTimeout: time.Second,
+					})
+					assert.NoError(t, err)
+					d.Replicas = r
+				}
 
 				userID, err := user.ExtractOrgID(ctx)
 				assert.NoError(t, err)
-				err = d.replicas.checkReplica(ctx, userID, tc.cluster, tc.acceptedReplica)
+				err = d.Replicas.checkReplica(ctx, userID, tc.cluster, tc.acceptedReplica)
 				assert.NoError(t, err)
 
 				request := makeWriteRequestHA(tc.samples, tc.testReplica, tc.cluster)
diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go
index d7229575e10..0c6a2c8dd3c 100644
--- a/pkg/distributor/ha_tracker.go
+++ b/pkg/distributor/ha_tracker.go
@@ -80,21 +80,26 @@ type haTracker struct {
 // HATrackerConfig contains the configuration required to
 // create an HA Tracker.
 type HATrackerConfig struct {
+	EnableHATracker bool `yaml:"enable_ha_tracker,omitempty"`
 	// We should only update the timestamp if the difference
 	// between the stored timestamp and the time we received a sample at
 	// is more than this duration.
-	UpdateTimeout time.Duration
+	UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
 	// We should only failover to accepting samples from a replica
 	// other than the replica written in the KVStore if the difference
 	// between the stored timestamp and the time we received a sample is
 	// more than this duration.
-	FailoverTimeout time.Duration
+	FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`
 
 	KVStore kv.Config
 }
 
-// RegisterFlags adds the flags required to config this to the given FlagSet
+// RegisterFlags adds the flags required to config this to the given FlagSet.
 func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
+	f.BoolVar(&cfg.EnableHATracker,
+		"distributor.ha-tracker.enable",
+		false,
+		"Enable the distributor's HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
 	f.DurationVar(&cfg.UpdateTimeout,
 		"distributor.ha-tracker.update-timeout",
 		15*time.Second,
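Folding the enable flag into `HATrackerConfig` also gives every option a YAML home under the distributor's `ha_tracker` block, per the struct tags above. A sketch of the resulting configuration — the duration values here are illustrative, not the defaults, and `newClusterTracker` rejects any config where the failover timeout is not strictly greater than the update timeout:

```yaml
distributor:
  ha_tracker:
    enable_ha_tracker: true
    # Must satisfy: failover timeout > update timeout.
    ha_tracker_update_timeout: 15s
    ha_tracker_failover_timeout: 30s
```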
@@ -112,11 +117,6 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
 func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
 	codec := codec.Proto{Factory: ProtoReplicaDescFactory}
 
-	client, err := kv.NewClient(cfg.KVStore, codec)
-	if err != nil {
-		return nil, err
-	}
-
 	if cfg.FailoverTimeout <= cfg.UpdateTimeout {
 		return nil, fmt.Errorf("HA Tracker failover timeout must be greater than update timeout, %d is <= %d", cfg.FailoverTimeout, cfg.UpdateTimeout)
 	}
@@ -126,10 +126,17 @@ func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
 		cfg:     cfg,
 		done:    make(chan struct{}),
 		elected: map[string]ReplicaDesc{},
-		client:  client,
 		cancel:  cancel,
 	}
-	go t.loop(ctx)
+
+	if cfg.EnableHATracker {
+		client, err := kv.NewClient(cfg.KVStore, codec)
+		if err != nil {
+			return nil, err
+		}
+		t.client = client
+		go t.loop(ctx)
+	}
 	return &t, nil
 }
@@ -155,8 +162,10 @@ func (c *haTracker) loop(ctx context.Context) {
 // stop calls the tracker's cancel function, which will end the loop for WatchPrefix.
 func (c *haTracker) stop() {
-	c.cancel()
-	<-c.done
+	if c.cfg.EnableHATracker {
+		c.cancel()
+		<-c.done
+	}
 }
 
 // CheckReplica checks the cluster and replica against the backing KVStore and local cache in the
@@ -167,6 +176,10 @@ func (c *haTracker) stop() {
 // accepting samples from another replica for the cluster, so that there isn't a bunch of errors returned
 // to customers' clients.
 func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica string) error {
+	// If HA tracking isn't enabled then accept the sample.
+	if !c.cfg.EnableHATracker {
+		return nil
+	}
 	key := fmt.Sprintf("%s/%s", userID, cluster)
 	now := mtime.Now()
 	c.electedLock.RLock()
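The net effect of these guards is that a tracker built with `EnableHATracker: false` is inert: construction skips the KV client and watch loop, `checkReplica` accepts everything, and `stop` is a no-op. An in-package, test-style sketch using the same helpers as `distributor_test.go`; note the timeout validation in `newClusterTracker` runs regardless of the flag, so the sketch still sets both timeouts:

```go
// Disabled tracker: no KV client is created and no loop is started.
tracker, err := newClusterTracker(HATrackerConfig{
	EnableHATracker: false,
	UpdateTimeout:   100 * time.Millisecond,
	FailoverTimeout: time.Second,
})
assert.NoError(t, err)

// checkReplica short-circuits to nil before ever touching the (absent) KV client.
assert.NoError(t, tracker.checkReplica(ctx, "user", "cluster0", "replica0"))

// stop() is gated on the enable flag, so it's safe even though no loop is running.
tracker.stop()
```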
diff --git a/pkg/distributor/ha_tracker_http.go b/pkg/distributor/ha_tracker_http.go
new file mode 100644
index 00000000000..68146b4224e
--- /dev/null
+++ b/pkg/distributor/ha_tracker_http.go
@@ -0,0 +1,99 @@
+package distributor
+
+import (
+	"html/template"
+	"net/http"
+	"sort"
+	"strings"
+	"time"
+
+	"github.com/prometheus/prometheus/pkg/timestamp"
+)
+
+const trackerTpl = `
+<!DOCTYPE html>
+<html>
+	<head>
+		<meta charset="UTF-8">
+		<title>HA Tracker Status</title>
+	</head>
+	<body>
+		<h1>HA Tracker Status</h1>
+		<p>Current time: {{ .Now }}</p>
+		<table border="1">
+			<thead>
+				<tr>
+					<th>User ID</th>
+					<th>Cluster</th>
+					<th>Replica</th>
+					<th>Elected Time</th>
+					<th>Time Until Update</th>
+					<th>Time Until Failover</th>
+				</tr>
+			</thead>
+			<tbody>
+				{{ range .Elected }}
+				<tr>
+					<td>{{ .UserID }}</td>
+					<td>{{ .Cluster }}</td>
+					<td>{{ .Replica }}</td>
+					<td>{{ .ElectedAt }}</td>
+					<td>{{ .UpdateTime }}</td>
+					<td>{{ .FailoverTime }}</td>
+				</tr>
+				{{ end }}
+			</tbody>
+		</table>
+	</body>
+</html>`
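The new file is 99 lines, but the diff above is truncated after the template; the remainder must supply the `ServeHTTP` method that makes `*haTracker` an `http.Handler` for the `/ha-tracker` route. A sketch of what it plausibly contains, inferred from the imports (`sort`, `strings`, `timestamp`) and the template fields — the `replica` struct, the `.Elected`/`.Now` field names, and the `ReceivedAt` field of `ReplicaDesc` are assumptions, not lines from the PR:

```go
// Sketch: render the elected-replicas table. Keys in the elected map are
// written as "userID/cluster" (see checkReplica), so split them back apart.
func (h *haTracker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	type replica struct {
		UserID, Cluster, Replica string
		ElectedAt                time.Time
		UpdateTime, FailoverTime time.Time
	}

	h.electedLock.RLock()
	elected := make([]replica, 0, len(h.elected))
	for key, desc := range h.elected {
		parts := strings.SplitN(key, "/", 2)
		electedAt := timestamp.Time(desc.ReceivedAt)
		elected = append(elected, replica{
			UserID:       parts[0],
			Cluster:      parts[1],
			Replica:      desc.Replica,
			ElectedAt:    electedAt,
			UpdateTime:   electedAt.Add(h.cfg.UpdateTimeout),
			FailoverTime: electedAt.Add(h.cfg.FailoverTimeout),
		})
	}
	h.electedLock.RUnlock()

	// Stable ordering so the page doesn't shuffle between refreshes.
	sort.Slice(elected, func(i, j int) bool {
		return elected[i].UserID < elected[j].UserID
	})

	tmpl := template.Must(template.New("ha-tracker").Parse(trackerTpl))
	if err := tmpl.Execute(w, struct {
		Elected []replica
		Now     time.Time
	}{Elected: elected, Now: time.Now()}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
```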