diff --git a/CHANGELOG.md b/CHANGELOG.md
index c23d1b95192..a1837f1e865 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 * [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
 * [CHANGE] Frontend worker in querier now starts after all Querier module dependencies are started. This fixes issue where frontend worker started to send queries to querier before it was ready to serve them (mostly visible when using experimental blocks storage). #2246
+* [CHANGE] Lifecycler component now enters Failed state on errors, and doesn't exit the process. (Important if you're vendoring Cortex and use Lifecycler) #2251
 * [FEATURE] Flusher target to flush the WAL.
   * `-flusher.wal-dir` for the WAL directory to recover from.
   * `-flusher.concurrent-flushes` for number of concurrent flushes.
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index b6bb327f784..b506b1976f9 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -87,7 +87,8 @@ type Compactor struct {
     ring *ring.Ring
     // Subservices manager (ring, lifecycler)
-    subservices *services.Manager
+    subservices        *services.Manager
+    subservicesWatcher *services.FailureWatcher
     // Metrics.
     compactionRunsStarted prometheus.Counter
@@ -181,6 +182,9 @@ func (c *Compactor) starting(ctx context.Context) error {
     c.subservices, err = services.NewManager(c.ringLifecycler, c.ring)
     if err == nil {
+        c.subservicesWatcher = services.NewFailureWatcher()
+        c.subservicesWatcher.WatchManager(c.subservices)
+
         err = services.StartManagerAndAwaitHealthy(ctx, c.subservices)
     }
@@ -228,6 +232,8 @@ func (c *Compactor) running(ctx context.Context) error {
             c.compactUsersWithRetries(ctx)
         case <-ctx.Done():
             return nil
+        case err := <-c.subservicesWatcher.Chan():
+            return errors.Wrap(err, "compactor subservice failed")
         }
     }
 }
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index aaaf8f73fcf..45e3cb1dffc 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -9,6 +9,7 @@ import (
     "time"
     opentracing "github.com/opentracing/opentracing-go"
+    "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/common/model"
@@ -112,7 +113,8 @@ type Distributor struct {
     ingestionRateLimiter *limiter.RateLimiter
     // Manager for subservices (HA Tracker, distributor ring and client pool)
-    subservices *services.Manager
+    subservices        *services.Manager
+    subservicesWatcher *services.FailureWatcher
 }
 // Config contains the configuration require to
@@ -208,8 +210,10 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
     if err != nil {
         return nil, err
     }
+    d.subservicesWatcher = services.NewFailureWatcher()
+    d.subservicesWatcher.WatchManager(d.subservices)
-    d.Service = services.NewIdleService(d.starting, d.stopping)
+    d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
     return d, nil
 }
@@ -218,6 +222,15 @@ func (d *Distributor) starting(ctx context.Context) error {
     return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
 }
+func (d *Distributor) running(ctx context.Context) error {
+    select {
+    case <-ctx.Done():
+        return nil
+    case err := <-d.subservicesWatcher.Chan():
+        return errors.Wrap(err, "distributor subservice failed")
+    }
+}
+
 // Called after distributor is asked to stop via StopAsync.
 func (d *Distributor) stopping(_ error) error {
     return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 1cf95c403c5..1aa500abd9b 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -100,10 +100,11 @@ type Ingester struct {
     metrics *ingesterMetrics
-    chunkStore ChunkStore
-    lifecycler *ring.Lifecycler
-    limits     *validation.Overrides
-    limiter    *SeriesLimiter
+    chunkStore         ChunkStore
+    lifecycler         *ring.Lifecycler
+    limits             *validation.Overrides
+    limiter            *SeriesLimiter
+    subservicesWatcher *services.FailureWatcher
     userStatesMtx sync.RWMutex // protects userStates and stopped
     userStates    *userStates
@@ -170,6 +171,8 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
         return nil, err
     }
     i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
+    i.subservicesWatcher = services.NewFailureWatcher()
+    i.subservicesWatcher.WatchService(i.lifecycler)
     i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
     return i, nil
@@ -279,6 +282,9 @@ func (i *Ingester) loop(ctx context.Context) error {
         case <-ctx.Done():
             return nil
+
+        case err := <-i.subservicesWatcher.Chan():
+            return errors.Wrap(err, "ingester subservice failed")
         }
     }
 }
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 29d12a9d8d2..73fc4611d43 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -126,6 +126,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
     if err != nil {
         return nil, err
     }
+    i.subservicesWatcher = services.NewFailureWatcher()
+    i.subservicesWatcher.WatchService(i.lifecycler)
     // Init the limter and instantiate the user states which depend on it
     i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
@@ -209,6 +211,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
             }
         case <-ctx.Done():
             return nil
+        case err := <-i.subservicesWatcher.Chan():
+            return errors.Wrap(err, "ingester subservice failed")
         }
     }
 }
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index f07cd7a319a..62d6ca63522 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -11,6 +11,7 @@ import (
     "time"
     "github.com/go-kit/kit/log/level"
+    perrors "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
@@ -339,15 +340,10 @@ func (i *Lifecycler) HealthyInstancesCount() int {
 }
 func (i *Lifecycler) loop(ctx context.Context) error {
-    defer func() {
-        level.Info(util.Logger).Log("msg", "member.loop() exited gracefully", "ring", i.RingName)
-    }()
-
     // First, see if we exist in the cluster, update our state to match if we do,
     // and add ourselves (without tokens) if we don't.
     if err := i.initRing(context.Background()); err != nil {
-        level.Error(util.Logger).Log("msg", "failed to join the ring", "ring", i.RingName, "err", err)
-        os.Exit(1)
+        return perrors.Wrapf(err, "failed to join the ring %s", i.RingName)
     }
     // We do various period tasks
@@ -370,16 +366,14 @@ func (i *Lifecycler) loop(ctx context.Context) error {
             // let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
             // ingesters, but we also signal that it is not fully functional yet.
             if err := i.autoJoin(context.Background(), JOINING); err != nil {
-                level.Error(util.Logger).Log("msg", "failed to pick tokens in the KV store", "ring", i.RingName, "err", err)
-                os.Exit(1)
+                return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
             }
             level.Info(util.Logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
             observeChan = time.After(i.cfg.ObservePeriod)
         } else {
             if err := i.autoJoin(context.Background(), ACTIVE); err != nil {
-                level.Error(util.Logger).Log("msg", "failed to pick tokens in the KV store", "ring", i.RingName, "err", err)
-                os.Exit(1)
+                return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
             }
         }
     }
@@ -416,6 +410,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
             f()
         case <-ctx.Done():
+            level.Info(util.Logger).Log("msg", "lifecycler loop() exited gracefully", "ring", i.RingName)
             return nil
         }
     }
@@ -425,7 +420,13 @@ func (i *Lifecycler) loop(ctx context.Context) error {
 // - send chunks to another ingester, if it can.
 // - otherwise, flush chunks to the chunk store.
 // - remove config from Consul.
-func (i *Lifecycler) stopping(_ error) error {
+func (i *Lifecycler) stopping(runningError error) error {
+    if runningError != nil {
+        // Previously, lifecycler just called os.Exit (from the loop method) on error.
+        // Now it stops more gracefully, but still without doing any cleanup.
+        return nil
+    }
+
     heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
     defer heartbeatTicker.Stop()
@@ -459,8 +460,7 @@ heartbeatLoop:
     if !i.cfg.SkipUnregister {
         if err := i.unregister(context.Background()); err != nil {
-            level.Error(util.Logger).Log("msg", "Failed to unregister from the KV store", "ring", i.RingName, "err", err)
-            os.Exit(1)
+            return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
         }
         level.Info(util.Logger).Log("msg", "instance removed from the KV store", "ring", i.RingName)
     }
diff --git a/pkg/util/services/failure_watch_test.go b/pkg/util/services/failure_watch_test.go
new file mode 100644
index 00000000000..b21fc6010a5
--- /dev/null
+++ b/pkg/util/services/failure_watch_test.go
@@ -0,0 +1,35 @@
+package services
+
+import (
+    "context"
+    "errors"
+    "testing"
+
+    e2 "github.com/pkg/errors"
+    "github.com/stretchr/testify/require"
+)
+
+func TestNilServiceFailureWatcher(t *testing.T) {
+    var w *FailureWatcher = nil
+
+    // prove it doesn't fail, but returns a nil channel.
+    require.Nil(t, w.Chan())
+}
+
+func TestServiceFailureWatcher(t *testing.T) {
+    w := NewFailureWatcher()
+
+    err := errors.New("this error doesn't end with dot")
+
+    failing := NewBasicService(nil, nil, func(_ error) error {
+        return err
+    })
+
+    w.WatchService(failing)
+
+    require.NoError(t, failing.StartAsync(context.Background()))
+
+    e := <-w.Chan()
+    require.NotNil(t, e)
+    require.Equal(t, err, e2.Cause(e))
+}
diff --git a/pkg/util/services/failure_watcher.go b/pkg/util/services/failure_watcher.go
new file mode 100644
index 00000000000..9b19f7b15cb
--- /dev/null
+++ b/pkg/util/services/failure_watcher.go
@@ -0,0 +1,35 @@
+package services
+
+import (
+    "github.com/pkg/errors"
+)
+
+// FailureWatcher waits for service failures and passes them to the channel.
+type FailureWatcher struct {
+    ch chan error
+}
+
+func NewFailureWatcher() *FailureWatcher {
+    return &FailureWatcher{ch: make(chan error)}
+}
+
+// Chan returns the channel for this watcher. If the watcher is nil, a nil channel is returned.
+// Errors returned on the channel include the failure case and the service description.
+func (w *FailureWatcher) Chan() <-chan error {
+    if w == nil {
+        return nil
+    }
+    return w.ch
+}
+
+func (w *FailureWatcher) WatchService(service Service) {
+    service.AddListener(NewListener(nil, nil, nil, nil, func(from State, failure error) {
+        w.ch <- errors.Wrapf(failure, "service %v failed", service)
+    }))
+}
+
+func (w *FailureWatcher) WatchManager(manager *Manager) {
+    manager.AddListener(NewManagerListener(nil, nil, func(service Service) {
+        w.ch <- errors.Wrapf(service.FailureCase(), "service %v failed", service)
+    }))
+}
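
The wiring added above to the compactor, distributor, and both ingesters follows one pattern: create a FailureWatcher, register the lifecycler (WatchService) or the subservices manager (WatchManager) with it, and select on Chan() in the component's running/loop method, so a failed subservice now fails the parent service instead of exiting the process. Below is a minimal sketch of that pattern for a hypothetical component: the component name, constructor, and import path are illustrative only, while the services calls (NewManager, NewFailureWatcher, NewBasicService, StartManagerAndAwaitHealthy, StopManagerAndAwaitStopped) are the ones used in this diff.

package example

import (
    "context"

    "github.com/pkg/errors"

    // Import path assumed from this repository's layout (pkg/util/services).
    "github.com/cortexproject/cortex/pkg/util/services"
)

// exampleComponent is a hypothetical component wired the same way as the
// compactor and distributor above: it owns a services.Manager and fails
// itself when any of its subservices fails.
type exampleComponent struct {
    services.Service

    subservices        *services.Manager
    subservicesWatcher *services.FailureWatcher
}

func newExampleComponent(subs ...services.Service) (*exampleComponent, error) {
    c := &exampleComponent{}

    var err error
    c.subservices, err = services.NewManager(subs...)
    if err != nil {
        return nil, err
    }

    // Watch the manager so any subservice failure is delivered on the watcher's channel.
    c.subservicesWatcher = services.NewFailureWatcher()
    c.subservicesWatcher.WatchManager(c.subservices)

    c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
    return c, nil
}

func (c *exampleComponent) starting(ctx context.Context) error {
    return services.StartManagerAndAwaitHealthy(ctx, c.subservices)
}

func (c *exampleComponent) running(ctx context.Context) error {
    select {
    case <-ctx.Done():
        // Normal shutdown is not an error.
        return nil
    case err := <-c.subservicesWatcher.Chan():
        // A subservice failed: returning the error moves this service to the
        // Failed state instead of exiting the process.
        return errors.Wrap(err, "example component subservice failed")
    }
}

func (c *exampleComponent) stopping(_ error) error {
    return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
}

Returning the error from running is what marks the BasicService as Failed, which is the behaviour change the CHANGELOG entry describes for the Lifecycler: callers can observe the failure through the normal service state machinery rather than seeing the whole process exit.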