Commit 58c081a

Lifecycler errors don't make process exit immediately, but let other modules shutdown properly (#2251)
* Added ServiceFailureWatcher
* Lifecycler now returns errors instead of exiting the process.
* Components using the lifecycler now fail when lifecycler fails.
* Don't run any cleanup if loop has failed.
* Include main component name in the error.
* Updated CHANGELOG.md
* Added trivial test, mostly to show that nil watcher returns nil channel.
* Changed error text to make lint happy
* Moved failure watcher to services package.
* Renamed serviceWatcher to subservicesWatcher
* Renamed files

Signed-off-by: Peter Štibraný <[email protected]>
1 parent c0b0238 commit 58c081a
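
What the commit does in practice: instead of the lifecycler calling os.Exit when joining the ring or picking tokens fails, the failure is returned as an error, and every component that runs a lifecycler (or any other subservice) now watches it through a FailureWatcher and returns the failure from its own running function. The services framework then moves that component to Failed, and the remaining modules can still shut down cleanly. The sketch below assembles that pattern from the diffs in this commit; the component type, its constructor and the services import path are illustrative assumptions, while the FailureWatcher, Manager and BasicService calls match the code added below.

package example

import (
	"context"

	"github.com/pkg/errors"

	// Assumed import path for the Cortex services package.
	"github.com/cortexproject/cortex/pkg/util/services"
)

// component stands in for a module such as the compactor, distributor or ingester.
type component struct {
	services.Service

	subservices        *services.Manager
	subservicesWatcher *services.FailureWatcher
}

func newComponent(subservices ...services.Service) (*component, error) {
	c := &component{}

	var err error
	c.subservices, err = services.NewManager(subservices...)
	if err != nil {
		return nil, err
	}

	// Any subservice that enters the Failed state is reported on the watcher's channel.
	c.subservicesWatcher = services.NewFailureWatcher()
	c.subservicesWatcher.WatchManager(c.subservices)

	c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
	return c, nil
}

func (c *component) starting(ctx context.Context) error {
	return services.StartManagerAndAwaitHealthy(ctx, c.subservices)
}

func (c *component) running(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return nil
	case err := <-c.subservicesWatcher.Chan():
		// Returning the error fails this component through the services framework; nothing calls os.Exit.
		return errors.Wrap(err, "subservice failed")
	}
}

func (c *component) stopping(_ error) error {
	return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
}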

File tree

8 files changed, +120 -20 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@
 
 * [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
 * [CHANGE] Frontend worker in querier now starts after all Querier module dependencies are started. This fixes issue where frontend worker started to send queries to querier before it was ready to serve them (mostly visible when using experimental blocks storage). #2246
+* [CHANGE] Lifecycler component now enters Failed state on errors, and doesn't exit the process. (Important if you're vendoring Cortex and use Lifecycler) #2251
 * [FEATURE] Flusher target to flush the WAL.
   * `-flusher.wal-dir` for the WAL directory to recover from.
   * `-flusher.concurrent-flushes` for number of concurrent flushes.

pkg/compactor/compactor.go

Lines changed: 7 additions & 1 deletion
@@ -87,7 +87,8 @@ type Compactor struct {
 	ring *ring.Ring
 
 	// Subservices manager (ring, lifecycler)
-	subservices *services.Manager
+	subservices        *services.Manager
+	subservicesWatcher *services.FailureWatcher
 
 	// Metrics.
 	compactionRunsStarted prometheus.Counter
@@ -181,6 +182,9 @@ func (c *Compactor) starting(ctx context.Context) error {
 
 	c.subservices, err = services.NewManager(c.ringLifecycler, c.ring)
 	if err == nil {
+		c.subservicesWatcher = services.NewFailureWatcher()
+		c.subservicesWatcher.WatchManager(c.subservices)
+
 		err = services.StartManagerAndAwaitHealthy(ctx, c.subservices)
 	}
 
@@ -228,6 +232,8 @@ func (c *Compactor) running(ctx context.Context) error {
 			c.compactUsersWithRetries(ctx)
 		case <-ctx.Done():
 			return nil
+		case err := <-c.subservicesWatcher.Chan():
+			return errors.Wrap(err, "compactor subservice failed")
 		}
 	}
 }

pkg/distributor/distributor.go

Lines changed: 15 additions & 2 deletions
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
@@ -112,7 +113,8 @@ type Distributor struct {
 	ingestionRateLimiter *limiter.RateLimiter
 
 	// Manager for subservices (HA Tracker, distributor ring and client pool)
-	subservices *services.Manager
+	subservices        *services.Manager
+	subservicesWatcher *services.FailureWatcher
 }
 
 // Config contains the configuration require to
@@ -208,8 +210,10 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 	if err != nil {
 		return nil, err
 	}
+	d.subservicesWatcher = services.NewFailureWatcher()
+	d.subservicesWatcher.WatchManager(d.subservices)
 
-	d.Service = services.NewIdleService(d.starting, d.stopping)
+	d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
 	return d, nil
 }
 
@@ -218,6 +222,15 @@ func (d *Distributor) starting(ctx context.Context) error {
 	return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
 }
 
+func (d *Distributor) running(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return nil
+	case err := <-d.subservicesWatcher.Chan():
+		return errors.Wrap(err, "distributor subservice failed")
+	}
+}
+
 // Called after distributor is asked to stop via StopAsync.
 func (d *Distributor) stopping(_ error) error {
 	return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)

pkg/ingester/ingester.go

Lines changed: 10 additions & 4 deletions
@@ -100,10 +100,11 @@ type Ingester struct {
 
 	metrics *ingesterMetrics
 
-	chunkStore ChunkStore
-	lifecycler *ring.Lifecycler
-	limits     *validation.Overrides
-	limiter    *SeriesLimiter
+	chunkStore         ChunkStore
+	lifecycler         *ring.Lifecycler
+	limits             *validation.Overrides
+	limiter            *SeriesLimiter
+	subservicesWatcher *services.FailureWatcher
 
 	userStatesMtx sync.RWMutex // protects userStates and stopped
 	userStates    *userStates
@@ -170,6 +171,8 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 		return nil, err
 	}
 	i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
+	i.subservicesWatcher = services.NewFailureWatcher()
+	i.subservicesWatcher.WatchService(i.lifecycler)
 
 	i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
 	return i, nil
@@ -279,6 +282,9 @@ func (i *Ingester) loop(ctx context.Context) error {
 
 		case <-ctx.Done():
 			return nil
+
+		case err := <-i.subservicesWatcher.Chan():
+			return errors.Wrap(err, "ingester subservice failed")
 		}
 	}
 }

pkg/ingester/ingester_v2.go

Lines changed: 4 additions & 0 deletions
@@ -126,6 +126,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 	if err != nil {
 		return nil, err
 	}
+	i.subservicesWatcher = services.NewFailureWatcher()
+	i.subservicesWatcher.WatchService(i.lifecycler)
 
 	// Init the limter and instantiate the user states which depend on it
 	i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
@@ -209,6 +211,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
 			}
 		case <-ctx.Done():
 			return nil
+		case err := <-i.subservicesWatcher.Chan():
+			return errors.Wrap(err, "ingester subservice failed")
 		}
 	}
 }

pkg/ring/lifecycler.go

Lines changed: 13 additions & 13 deletions
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/go-kit/kit/log/level"
+	perrors "github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 
@@ -339,15 +340,10 @@ func (i *Lifecycler) HealthyInstancesCount() int {
 }
 
 func (i *Lifecycler) loop(ctx context.Context) error {
-	defer func() {
-		level.Info(util.Logger).Log("msg", "member.loop() exited gracefully", "ring", i.RingName)
-	}()
-
 	// First, see if we exist in the cluster, update our state to match if we do,
 	// and add ourselves (without tokens) if we don't.
 	if err := i.initRing(context.Background()); err != nil {
-		level.Error(util.Logger).Log("msg", "failed to join the ring", "ring", i.RingName, "err", err)
-		os.Exit(1)
+		return perrors.Wrapf(err, "failed to join the ring %s", i.RingName)
 	}
 
 	// We do various period tasks
@@ -370,16 +366,14 @@ func (i *Lifecycler) loop(ctx context.Context) error {
 			// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
 			// ingesters, but we also signal that it is not fully functional yet.
 			if err := i.autoJoin(context.Background(), JOINING); err != nil {
-				level.Error(util.Logger).Log("msg", "failed to pick tokens in the KV store", "ring", i.RingName, "err", err)
-				os.Exit(1)
+				return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
 			}
 
 			level.Info(util.Logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
 			observeChan = time.After(i.cfg.ObservePeriod)
 		} else {
 			if err := i.autoJoin(context.Background(), ACTIVE); err != nil {
-				level.Error(util.Logger).Log("msg", "failed to pick tokens in the KV store", "ring", i.RingName, "err", err)
-				os.Exit(1)
+				return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
 			}
 		}
 	}
@@ -416,6 +410,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
 			f()
 
 		case <-ctx.Done():
+			level.Info(util.Logger).Log("msg", "lifecycler loop() exited gracefully", "ring", i.RingName)
 			return nil
 		}
 	}
@@ -425,7 +420,13 @@ func (i *Lifecycler) loop(ctx context.Context) error {
 // - send chunks to another ingester, if it can.
 // - otherwise, flush chunks to the chunk store.
 // - remove config from Consul.
-func (i *Lifecycler) stopping(_ error) error {
+func (i *Lifecycler) stopping(runningError error) error {
+	if runningError != nil {
+		// previously lifecycler just called os.Exit (from loop method)...
+		// now it stops more gracefully, but also without doing any cleanup
+		return nil
+	}
+
 	heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
 	defer heartbeatTicker.Stop()
 
@@ -459,8 +460,7 @@ heartbeatLoop:
 
 	if !i.cfg.SkipUnregister {
 		if err := i.unregister(context.Background()); err != nil {
-			level.Error(util.Logger).Log("msg", "Failed to unregister from the KV store", "ring", i.RingName, "err", err)
-			os.Exit(1)
+			return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
 		}
 		level.Info(util.Logger).Log("msg", "instance removed from the KV store", "ring", i.RingName)
 	}
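
Note how stopping now receives the error returned by loop(). In the services framework, a BasicService hands the running function's error to the stopping function, which is what lets the lifecycler skip unregistering and flushing after a failed loop instead of attempting cleanup on a broken state. A small sketch of that behaviour, under the same assumed services import path as above (the failing service and its error text are illustrative):

package example

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// newFailingService builds a service whose running function fails. Its stopping
// function receives that error and skips cleanup, mirroring the new
// Lifecycler.stopping(runningError) behaviour shown above.
func newFailingService() services.Service {
	running := func(ctx context.Context) error {
		return fmt.Errorf("loop failed")
	}

	stopping := func(runningError error) error {
		if runningError != nil {
			// Like the lifecycler: don't unregister or flush when running already failed.
			return nil
		}
		// Normal cleanup would go here.
		return nil
	}

	return services.NewBasicService(nil, running, stopping)
}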
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+package services
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	e2 "github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNilServiceFailureWatcher(t *testing.T) {
+	var w *FailureWatcher = nil
+
+	// prove it doesn't fail, but returns nil channel.
+	require.Nil(t, w.Chan())
+}
+
+func TestServiceFailureWatcher(t *testing.T) {
+	w := NewFailureWatcher()
+
+	err := errors.New("this error doesn't end with dot")
+
+	failing := NewBasicService(nil, nil, func(_ error) error {
+		return err
+	})
+
+	w.WatchService(failing)
+
+	require.NoError(t, failing.StartAsync(context.Background()))
+
+	e := <-w.Chan()
+	require.NotNil(t, e)
+	require.Equal(t, err, e2.Cause(e))
+}
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+package services
+
+import (
+	"github.com/pkg/errors"
+)
+
+// FailureWatcher waits for service failures, and passed them to the channel.
+type FailureWatcher struct {
+	ch chan error
+}
+
+func NewFailureWatcher() *FailureWatcher {
+	return &FailureWatcher{ch: make(chan error)}
+}
+
+// Returns channel for this watcher. If watcher is nil, returns nil channel.
+// Errors returned on the channel include failure case and service description.
+func (w *FailureWatcher) Chan() <-chan error {
+	if w == nil {
+		return nil
+	}
+	return w.ch
+}
+
+func (w *FailureWatcher) WatchService(service Service) {
+	service.AddListener(NewListener(nil, nil, nil, nil, func(from State, failure error) {
+		w.ch <- errors.Wrapf(failure, "service %v failed", service)
+	}))
+}
+
+func (w *FailureWatcher) WatchManager(manager *Manager) {
+	manager.AddListener(NewManagerListener(nil, nil, func(service Service) {
+		w.ch <- errors.Wrapf(service.FailureCase(), "service %v failed", service)
+	}))
+}
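
A note on the nil-receiver Chan(): a nil FailureWatcher returns a nil channel, and a nil channel is never ready in a select, so a case reading from w.Chan() simply never fires for callers that never created a watcher. That is what the trivial test above demonstrates, and it lets callers keep one select shape whether or not a watcher is wired up. A small illustrative helper, under the same assumed import path as above:

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// waitUntilDoneOrFailed blocks until the context is cancelled or the watcher
// reports a subservice failure. With a nil watcher, Chan() returns a nil
// channel, which is never ready in a select, so only ctx.Done() can fire.
func waitUntilDoneOrFailed(ctx context.Context, w *services.FailureWatcher) error {
	select {
	case <-ctx.Done():
		return nil
	case err := <-w.Chan():
		return err
	}
}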
