diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7a378d55341..70d6a356901 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -569,9 +569,9 @@ func (i *Ingester) Watch(in *grpc_health_v1.HealthCheckRequest, stream grpc_heal // the addition removal of another ingester. Returns 204 when the ingester is // ready, 500 otherwise. func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) { - if i.lifecycler.IsReady(r.Context()) { + if err := i.lifecycler.CheckReady(r.Context()); err == nil { w.WriteHeader(http.StatusNoContent) } else { - w.WriteHeader(http.StatusServiceUnavailable) + http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable) } } diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 70e11ec066e..a076f8f4f7d 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -153,30 +153,38 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life return l, nil } -// IsReady is used to rate limit the number of ingesters that can be coming or +// CheckReady is used to rate limit the number of ingesters that can be coming or // going at any one time, by only returning true if all ingesters are active. -func (i *Lifecycler) IsReady(ctx context.Context) bool { +// The state latches: once we have gone ready we don't go un-ready +func (i *Lifecycler) CheckReady(ctx context.Context) error { i.readyLock.Lock() defer i.readyLock.Unlock() if i.ready { - return true + return nil } // Ingester always take at least minReadyDuration to become ready to work // around race conditions with ingesters exiting and updating the ring if time.Now().Sub(i.startTime) < i.cfg.MinReadyDuration { - return false + return fmt.Errorf("waiting for %v after startup", i.cfg.MinReadyDuration) } ringDesc, err := i.KVStore.Get(ctx, ConsulKey) if err != nil { level.Error(util.Logger).Log("msg", "error talking to consul", "err", err) - return false + return fmt.Errorf("error talking to consul: %s", err) } - i.ready = i.ready || ringDesc.(*Desc).Ready(i.cfg.RingConfig.HeartbeatTimeout) - return i.ready + if len(i.getTokens()) == 0 { + return fmt.Errorf("this ingester owns no tokens") + } + if err := ringDesc.(*Desc).Ready(i.cfg.RingConfig.HeartbeatTimeout); err != nil { + return err + } + + i.ready = true + return nil } // GetState returns the state of this ingester. diff --git a/pkg/ring/model.go b/pkg/ring/model.go index e1296801897..0aff8d842af 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -1,6 +1,7 @@ package ring import ( + "fmt" "sort" "time" @@ -115,19 +116,22 @@ func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc { return result } -// Ready is true when all ingesters are active and healthy. -func (d *Desc) Ready(heartbeatTimeout time.Duration) bool { +// Ready returns no error when all ingesters are active and healthy. +func (d *Desc) Ready(heartbeatTimeout time.Duration) error { numTokens := len(d.Tokens) - for _, ingester := range d.Ingesters { + for id, ingester := range d.Ingesters { if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) > heartbeatTimeout { - return false + return fmt.Errorf("ingester %s past heartbeat timeout", id) } else if ingester.State != ACTIVE { - return false + return fmt.Errorf("ingester %s in state %v", id, ingester.State) } numTokens += len(ingester.Tokens) } - return numTokens > 0 + if numTokens == 0 { + return fmt.Errorf("Not ready: no tokens in ring") + } + return nil } // TokensFor partitions the tokens into those for the given ID, and those for others.