Commit f107e5d

Implement periodic writing of alertmanager state to storage. (#4031)
* Implement periodic writing of alertmanager state to storage.

  When ring-based/sharding replication is enabled, the alertmanager state
  (silences, notification log) is periodically written to object storage so
  that it can be used to recover from an all-replica outage. Only one of the
  replicas is responsible for writing the state (position 0).

  Signed-off-by: Steve Simpson <[email protected]>

* Review comments (NewTimerService, interval config). Unit test cleanup.

  Signed-off-by: Steve Simpson <[email protected]>

* Review comments: Update command line documentation.

  Signed-off-by: Steve Simpson <[email protected]>

* Review comments: Move Validate/RegisterFlags functions.

  Signed-off-by: Steve Simpson <[email protected]>
1 parent 68a917d commit f107e5d
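The change described above boils down to a timer loop in which only the replica at ring position 0 serialises the full state and uploads it to object storage. The following is a minimal, self-contained sketch of that pattern; the names used here (fakeState, upload, persistLoop) are illustrative stand-ins and not the Cortex types, whose real implementation is in pkg/alertmanager/state_persister.go below.

package main

import (
	"context"
	"fmt"
	"time"
)

// fakeState stands in for the replicated state: it knows this replica's ring
// position and can serialise a snapshot of the full state.
type fakeState struct{ pos int }

func (s *fakeState) position() int             { return s.pos }
func (s *fakeState) snapshot() ([]byte, error) { return []byte("state"), nil }

// upload stands in for writing the serialised state to object storage.
func upload(ctx context.Context, user string, data []byte) error {
	fmt.Printf("persisting %d bytes for %s\n", len(data), user)
	return nil
}

// persistLoop periodically persists the state, but only from the replica at
// position zero, mirroring the behaviour described in the commit message.
func persistLoop(ctx context.Context, st *fakeState, user string, interval, timeout time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if st.position() != 0 {
				continue // another replica is responsible for writing the state.
			}
			data, err := st.snapshot()
			if err != nil {
				continue // a real implementation would log and retry on the next tick.
			}
			uploadCtx, cancel := context.WithTimeout(ctx, timeout)
			_ = upload(uploadCtx, user, data)
			cancel()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	persistLoop(ctx, &fakeState{pos: 0}, "user-1", time.Second, 30*time.Second)
}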

6 files changed: +329 −1 lines

docs/configuration/config-file-reference.md

Lines changed: 8 additions & 0 deletions
@@ -2081,6 +2081,14 @@ alertmanager_client:
   # Skip validating server certificate.
   # CLI flag: -alertmanager.alertmanager-client.tls-insecure-skip-verify
   [tls_insecure_skip_verify: <boolean> | default = false]
+
+# The interval between persisting the current alertmanager state (notification
+# log and silences) to object storage. This is only used when sharding is
+# enabled. This state is read when all replicas for a shard can not be
+# contacted. In this scenario, having persisted the state more frequently will
+# result in potentially fewer lost silences, and fewer duplicate notifications.
+# CLI flag: -alertmanager.persist-interval
+[persist_interval: <duration> | default = 15m]
 ```

 ### `alertmanager_storage_config`
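For illustration, the standalone sketch below (not part of this commit; the program and its flag set are hypothetical) registers a duration flag the same way the new PersisterConfig does, using the documented -alertmanager.persist-interval name and 15m default, and applies the same greater-than-zero validation rule.

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)

	// Mirrors the flag added by this commit: prefix "alertmanager" plus
	// ".persist-interval", defaulting to 15 minutes.
	var interval time.Duration
	fs.DurationVar(&interval, "alertmanager.persist-interval", 15*time.Minute,
		"The interval between persisting the current alertmanager state to object storage.")

	// e.g. the operator passes -alertmanager.persist-interval=5m on the command line.
	if err := fs.Parse([]string{"-alertmanager.persist-interval=5m"}); err != nil {
		fmt.Println("parse error:", err)
		return
	}

	// Same rule as PersisterConfig.Validate: the interval must be greater than zero.
	if interval <= 0 {
		fmt.Println("invalid alertmanager persist interval, must be greater than zero")
		return
	}
	fmt.Println("persist interval:", interval) // prints: persist interval: 5m0s
}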

pkg/alertmanager/alertmanager.go

Lines changed: 21 additions & 1 deletion
@@ -73,6 +73,7 @@ type Config struct {
 	ReplicationFactor int
 	Replicator        Replicator
 	Store             alertstore.AlertStore
+	PersisterConfig   PersisterConfig
 }

 // An Alertmanager manages the alerts for one user.
@@ -81,6 +82,7 @@ type Alertmanager struct {
 	api       *api.API
 	logger    log.Logger
 	state     State
+	persister *statePersister
 	nflog     *nflog.Log
 	silences  *silence.Silences
 	marker    types.Marker
@@ -163,7 +165,9 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 		am.state = cfg.Peer
 	} else if cfg.ShardingEnabled {
 		level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
-		am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
+		state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
+		am.state = state
+		am.persister = newStatePersister(cfg.PersisterConfig, cfg.UserID, state, cfg.Store, am.logger)
 	} else {
 		level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
 		am.state = &NilPeer{}
@@ -208,6 +212,12 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 		}
 	}

+	if am.persister != nil {
+		if err := am.persister.StartAsync(context.Background()); err != nil {
+			return nil, errors.Wrap(err, "failed to start state persister service")
+		}
+	}
+
 	am.pipelineBuilder = notify.NewPipelineBuilder(am.registry)

 	am.wg.Add(1)
@@ -351,6 +361,10 @@ func (am *Alertmanager) Stop() {
 		am.dispatcher.Stop()
 	}

+	if am.persister != nil {
+		am.persister.StopAsync()
+	}
+
 	if service, ok := am.state.(services.Service); ok {
 		service.StopAsync()
 	}
@@ -362,6 +376,12 @@ func (am *Alertmanager) Stop() {
 func (am *Alertmanager) StopAndWait() {
 	am.Stop()

+	if am.persister != nil {
+		if err := am.persister.AwaitTerminated(context.Background()); err != nil {
+			level.Warn(am.logger).Log("msg", "error while stopping state persister service", "err", err)
+		}
+	}
+
 	if service, ok := am.state.(services.Service); ok {
 		if err := service.AwaitTerminated(context.Background()); err != nil {
 			level.Warn(am.logger).Log("msg", "error while stopping ring-based replication service", "err", err)

pkg/alertmanager/multitenant.go

Lines changed: 11 additions & 0 deletions
@@ -120,6 +120,9 @@ type MultitenantAlertmanagerConfig struct {

 	// For distributor.
 	AlertmanagerClient ClientConfig `yaml:"alertmanager_client"`
+
+	// For the state persister.
+	Persister PersisterConfig `yaml:",inline"`
 }

 type ClusterConfig struct {
@@ -154,6 +157,8 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {

 	cfg.AlertmanagerClient.RegisterFlagsWithPrefix("alertmanager.alertmanager-client", f)

+	cfg.Persister.RegisterFlagsWithPrefix("alertmanager", f)
+
 	cfg.ShardingRing.RegisterFlags(f)
 	cfg.Store.RegisterFlags(f)
 	cfg.Cluster.RegisterFlags(f)
@@ -174,6 +179,11 @@ func (cfg *MultitenantAlertmanagerConfig) Validate() error {
 	if err := cfg.Store.Validate(); err != nil {
 		return errors.Wrap(err, "invalid storage config")
 	}
+
+	if err := cfg.Persister.Validate(); err != nil {
+		return err
+	}
+
 	return nil
 }

@@ -856,6 +866,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
 		Replicator:        am,
 		ReplicationFactor: am.cfg.ShardingRing.ReplicationFactor,
 		Store:             am.store,
+		PersisterConfig:   am.cfg.Persister,
 	}, reg)
 	if err != nil {
 		return nil, fmt.Errorf("unable to start Alertmanager for user %v: %v", userID, err)

pkg/alertmanager/multitenant_test.go

Lines changed: 23 additions & 0 deletions
@@ -85,6 +85,29 @@ func mockAlertmanagerConfig(t *testing.T) *MultitenantAlertmanagerConfig {
 	return cfg
 }

+func TestMultitenantAlertmanagerConfig_Validate(t *testing.T) {
+	// Default values only.
+	{
+		cfg := &MultitenantAlertmanagerConfig{}
+		flagext.DefaultValues(cfg)
+		assert.NoError(t, cfg.Validate())
+	}
+	// Invalid persist interval (zero).
+	{
+		cfg := &MultitenantAlertmanagerConfig{}
+		flagext.DefaultValues(cfg)
+		cfg.Persister.Interval = 0
+		assert.Equal(t, errInvalidPersistInterval, cfg.Validate())
+	}
+	// Invalid persist interval (negative).
+	{
+		cfg := &MultitenantAlertmanagerConfig{}
+		flagext.DefaultValues(cfg)
+		cfg.Persister.Interval = -1
+		assert.Equal(t, errInvalidPersistInterval, cfg.Validate())
+	}
+}
+
 func TestMultitenantAlertmanager_loadAndSyncConfigs(t *testing.T) {
 	ctx := context.Background()

pkg/alertmanager/state_persister.go

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+package alertmanager
+
+import (
+	"context"
+	"flag"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/alertmanager/cluster/clusterpb"
+
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
+	"github.com/cortexproject/cortex/pkg/util/services"
+)
+
+const (
+	defaultPersistTimeout = 30 * time.Second
+)
+
+var (
+	errInvalidPersistInterval = errors.New("invalid alertmanager persist interval, must be greater than zero")
+)
+
+type PersisterConfig struct {
+	Interval time.Duration `yaml:"persist_interval"`
+}
+
+func (cfg *PersisterConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
+	f.DurationVar(&cfg.Interval, prefix+".persist-interval", 15*time.Minute, "The interval between persisting the current alertmanager state (notification log and silences) to object storage. This is only used when sharding is enabled. This state is read when all replicas for a shard can not be contacted. In this scenario, having persisted the state more frequently will result in potentially fewer lost silences, and fewer duplicate notifications.")
+}
+
+func (cfg *PersisterConfig) Validate() error {
+	if cfg.Interval <= 0 {
+		return errInvalidPersistInterval
+	}
+	return nil
+}
+
+type PersistableState interface {
+	State
+	GetFullState() (*clusterpb.FullState, error)
+}
+
+// statePersister periodically writes the alertmanager state to persistent storage.
+type statePersister struct {
+	services.Service
+
+	state  PersistableState
+	store  alertstore.AlertStore
+	userID string
+	logger log.Logger
+
+	timeout time.Duration
+}
+
+// newStatePersister creates a new state persister.
+func newStatePersister(cfg PersisterConfig, userID string, state PersistableState, store alertstore.AlertStore, l log.Logger) *statePersister {
+
+	s := &statePersister{
+		state:   state,
+		store:   store,
+		userID:  userID,
+		logger:  l,
+		timeout: defaultPersistTimeout,
+	}
+
+	s.Service = services.NewTimerService(cfg.Interval, s.starting, s.iteration, nil)
+
+	return s
+}
+
+func (s *statePersister) starting(ctx context.Context) error {
+	// Waits until the state replicator is settled, so that state is not
+	// persisted before obtaining some initial state.
+	return s.state.WaitReady(ctx)
+}
+
+func (s *statePersister) iteration(ctx context.Context) error {
+	if err := s.persist(ctx); err != nil {
+		level.Error(s.logger).Log("msg", "failed to persist state", "user", s.userID, "err", err)
+	}
+	return nil
+}
+
+func (s *statePersister) persist(ctx context.Context) error {
+	// Only the replica at position zero should write the state.
+	if s.state.Position() != 0 {
+		return nil
+	}
+
+	level.Debug(s.logger).Log("msg", "persisting state", "user", s.userID)
+
+	fs, err := s.state.GetFullState()
+	if err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, s.timeout)
+	defer cancel()
+
+	desc := alertspb.FullStateDesc{State: fs}
+	if err := s.store.SetFullState(ctx, s.userID, desc); err != nil {
+		return err
+	}
+
+	return nil
+}
