Skip to content

Commit 2de0c64

Browse files
committed
Retry generate tokens if there is more than 1 ingester being registered
Signed-off-by: alanprot <[email protected]>
1 parent f56912b commit 2de0c64

File tree

12 files changed

+234
-213
lines changed

12 files changed

+234
-213
lines changed

pkg/alertmanager/lifecycle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func (r *MultitenantAlertmanager) OnRingInstanceRegister(lc *ring.BasicLifecycle
1313
tokens = instanceDesc.GetTokens()
1414
}
1515

16-
newTokens := lc.GenerateTokens(&ringDesc, instanceID, instanceDesc.Zone, RingNumTokens-len(tokens))
16+
newTokens := lc.GenerateTokens(&ringDesc, instanceID, instanceDesc.Zone, RingNumTokens-len(tokens), true)
1717

1818
// Tokens sorting will be enforced by the parent caller.
1919
tokens = append(tokens, newTokens...)

pkg/alertmanager/multitenant_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
10421042
name: "with an instance already in the ring with ACTIVE state and all tokens",
10431043
existing: true,
10441044
initialState: ring.ACTIVE,
1045-
initialTokens: tg.GenerateTokens(ring.NewDesc(), "id1", "", 128),
1045+
initialTokens: tg.GenerateTokens(ring.NewDesc(), "id1", "", 128, true),
10461046
},
10471047
{
10481048
name: "with an instance already in the ring with LEAVING state and all tokens",
@@ -1524,7 +1524,7 @@ func TestMultitenantAlertmanager_RingLifecyclerShouldAutoForgetUnhealthyInstance
15241524
tg := ring.NewRandomTokenGenerator()
15251525
require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) {
15261526
ringDesc := ring.GetOrCreateRingDesc(in)
1527-
instance := ringDesc.AddIngester(unhealthyInstanceID, "127.0.0.1", "", tg.GenerateTokens(ringDesc, unhealthyInstanceID, "", RingNumTokens), ring.ACTIVE, time.Now())
1527+
instance := ringDesc.AddIngester(unhealthyInstanceID, "127.0.0.1", "", tg.GenerateTokens(ringDesc, unhealthyInstanceID, "", RingNumTokens, true), ring.ACTIVE, time.Now())
15281528
instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix()
15291529
ringDesc.Ingesters[unhealthyInstanceID] = instance
15301530

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1944,7 +1944,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
19441944
err := kvStore.CAS(context.Background(), ingester.RingKey,
19451945
func(_ interface{}) (interface{}, bool, error) {
19461946
d := &ring.Desc{}
1947-
d.AddIngester("ingester-1", "127.0.0.1", "", tg.GenerateTokens(d, "ingester-1", "", 128), ring.ACTIVE, time.Now())
1947+
d.AddIngester("ingester-1", "127.0.0.1", "", tg.GenerateTokens(d, "ingester-1", "", 128, true), ring.ACTIVE, time.Now())
19481948
return d, true, nil
19491949
},
19501950
)

pkg/ring/basic_lifecycler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ func (l *BasicLifecycler) verifyTokens(ctx context.Context) bool {
371371
needTokens := l.cfg.NumTokens - len(actualTokens)
372372

373373
level.Info(l.logger).Log("msg", "generating new tokens", "count", needTokens, "ring", l.ringName)
374-
newTokens := l.GenerateTokens(r, l.cfg.ID, l.cfg.Zone, needTokens)
374+
newTokens := l.GenerateTokens(r, l.cfg.ID, l.cfg.Zone, needTokens, true)
375375

376376
actualTokens = append(actualTokens, newTokens...)
377377
sort.Sort(actualTokens)

pkg/ring/lifecycler.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
721721
needTokens := i.cfg.NumTokens - len(ringTokens)
722722

723723
level.Info(i.logger).Log("msg", "generating new tokens", "count", needTokens, "ring", i.RingName)
724-
newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens)
724+
newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens, true)
725725

726726
ringTokens = append(ringTokens, newTokens...)
727727
sort.Sort(ringTokens)
@@ -788,7 +788,11 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
788788
return ringDesc, true, nil
789789
}
790790

791-
newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens)
791+
newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens, false)
792+
if len(newTokens) != needTokens {
793+
level.Warn(i.logger).Log("msg", "retrying generate tokens")
794+
return ringDesc, true, errors.New("could not generate tokens")
795+
}
792796

793797
myTokens = append(myTokens, newTokens...)
794798
sort.Sort(myTokens)

pkg/ring/ring_test.go

Lines changed: 179 additions & 179 deletions
Large diffs are not rendered by default.

pkg/ring/token_generator.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ var (
2323
type TokenGenerator interface {
2424
// GenerateTokens make numTokens unique random tokens, none of which clash
2525
// with takenTokens. Generated tokens are sorted.
26-
GenerateTokens(ring *Desc, id, zone string, numTokens int) []uint32
26+
// If force is set to false, GenerateTokens may return any number of tokens between 0 and numTokens.
27+
// If force is set to true, all numTokens tokens must be generated.
28+
GenerateTokens(ring *Desc, id, zone string, numTokens int, force bool) []uint32
2729
}
2830

2931
type RandomTokenGenerator struct{}
@@ -32,7 +34,7 @@ func NewRandomTokenGenerator() TokenGenerator {
3234
return &RandomTokenGenerator{}
3335
}
3436

35-
func (g *RandomTokenGenerator) GenerateTokens(ring *Desc, _, _ string, numTokens int) []uint32 {
37+
func (g *RandomTokenGenerator) GenerateTokens(ring *Desc, _, _ string, numTokens int, _ bool) []uint32 {
3638
if numTokens <= 0 {
3739
return []uint32{}
3840
}
@@ -77,7 +79,7 @@ func NewMinimizeSpreadTokenGenerator() TokenGenerator {
7779
// GenerateTokens tries to place newly generated tokens on the optimal position given the existing ingesters in the ring.
7880
// In order to do so, order all the existing ingester on the ring based on its ownership (by az), and start to create
7981
// new tokens in order to balance out the ownership amongst all ingesters.
80-
func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone string, numTokens int) []uint32 {
82+
func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone string, numTokens int, force bool) []uint32 {
8183
if numTokens <= 0 {
8284
return []uint32{}
8385
}
@@ -100,8 +102,12 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
100102
if len(instance.Tokens) == 0 {
101103
// If there is more than one instance with no tokens, lets only use
102104
// MinimizeSpread token algorithm on the last one
103-
if instance.RegisteredTimestamp > ring.Ingesters[id].RegisteredTimestamp {
104-
return g.innerGenerator.GenerateTokens(ring, id, zone, numTokens)
105+
if instance.RegisteredTimestamp < ring.Ingesters[id].RegisteredTimestamp {
106+
if force {
107+
return g.innerGenerator.GenerateTokens(ring, id, zone, numTokens, true)
108+
} else {
109+
return make([]uint32, 0)
110+
}
105111
}
106112

107113
continue
@@ -113,7 +119,7 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
113119

114120
// If we don't have tokens to split, lets create the tokens randomly
115121
if len(zonalTokens) == 0 {
116-
return g.innerGenerator.GenerateTokens(ring, id, zone, numTokens)
122+
return g.innerGenerator.GenerateTokens(ring, id, zone, numTokens, true)
117123
}
118124

119125
// Populate the map tokensPerInstanceWithDistance with the tokens and total distance of each ingester.
@@ -150,7 +156,7 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
150156
// If we don't have ingesters to take ownership or if the ownership was already completed we should fallback to
151157
// back fill the remaining tokens using the random algorithm
152158
if len(*distancesHeap) == 0 || currentInstance.totalDistance > expectedOwnershipDistance {
153-
r = append(r, g.innerGenerator.GenerateTokens(ring, id, zone, numTokens-len(r))...)
159+
r = append(r, g.innerGenerator.GenerateTokens(ring, id, zone, numTokens-len(r), true)...)
154160
break
155161
}
156162

pkg/ring/token_generator_test.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestGenerateTokens(t *testing.T) {
2525

2626
for name, tc := range testCase {
2727
t.Run(name, func(t *testing.T) {
28-
tokens := tc.tg.GenerateTokens(NewDesc(), "", "", 1000000)
28+
tokens := tc.tg.GenerateTokens(NewDesc(), "", "", 1000000, true)
2929

3030
dups := make(map[uint32]int)
3131

@@ -62,7 +62,7 @@ func TestGenerateTokens_IgnoresOldTokens(t *testing.T) {
6262
for i := 0; i < 500; i++ {
6363
id := strconv.Itoa(i)
6464
zone := strconv.Itoa(i % 3)
65-
tokens := tc.tg.GenerateTokens(d, id, zone, 500)
65+
tokens := tc.tg.GenerateTokens(d, id, zone, 500, true)
6666
d.AddIngester(id, id, zone, tokens, ACTIVE, time.Now())
6767
for _, v := range tokens {
6868
if dups[v] {
@@ -98,25 +98,36 @@ func TestMinimizeSpreadTokenGenerator(t *testing.T) {
9898
require.Equal(t, mTokenGenerator.called, len(zones))
9999

100100
mTokenGenerator.called = 0
101-
// Should fallback to random generator when more than 1 ingester does not have tokens
101+
// Should fallback to random generator when more than 1 ingester does not have tokens and force flag is set
102102
rindDesc.AddIngester("pendingIngester-1", "pendingIngester-1", zones[0], []uint32{}, PENDING, time.Now())
103-
rindDesc.AddIngester("pendingIngester-2", "pendingIngester-2", zones[0], []uint32{}, PENDING, time.Now().Add(10*time.Minute))
104-
minimizeTokenGenerator.GenerateTokens(rindDesc, "pendingIngester-1", zones[0], 512)
103+
rindDesc.AddIngester("pendingIngester-2", "pendingIngester-2", zones[0], []uint32{}, PENDING, time.Now().Add(-10*time.Minute))
104+
tokens := minimizeTokenGenerator.GenerateTokens(rindDesc, "pendingIngester-1", zones[0], 512, true)
105+
require.Len(t, tokens, 512)
105106
require.Equal(t, mTokenGenerator.called, 1)
106-
// Should generate if this is the last ingester in the AZ with more than 1 ingester with no tokens
107-
minimizeTokenGenerator.GenerateTokens(rindDesc, "pendingIngester-2", zones[0], 512)
107+
108+
// Should generate for the ingester with the smaller registered timestamp
109+
tokens = minimizeTokenGenerator.GenerateTokens(rindDesc, "pendingIngester-2", zones[0], 512, false)
110+
require.Len(t, tokens, 512)
108111
require.Equal(t, mTokenGenerator.called, 1)
112+
109113
// Should generate tokens on other AZs
110114
rindDesc.AddIngester("pendingIngester-1-az-2", "pendingIngester-1-az-2", zones[0], []uint32{}, PENDING, time.Now())
111-
minimizeTokenGenerator.GenerateTokens(rindDesc, "pendingIngester-1-az-2", zones[1], 512)
115+
tokens = minimizeTokenGenerator.GenerateTokens(rindDesc, "pendingIngester-1-az-2", zones[1], 512, false)
116+
require.Len(t, tokens, 512)
112117
require.Equal(t, mTokenGenerator.called, 1)
113118

119+
// Should generate tokens only for the ingester with the smaller registered time when multiple
120+
// ingesters do not have tokens
121+
tokens = minimizeTokenGenerator.GenerateTokens(rindDesc, "pendingIngester-1", zones[0], 512, false)
122+
require.Len(t, tokens, 0)
123+
tokens = minimizeTokenGenerator.GenerateTokens(rindDesc, "pendingIngester-2", zones[0], 512, false)
124+
require.Len(t, tokens, 512)
114125
}
115126

116127
func generateTokensForIngesters(t *testing.T, rindDesc *Desc, prefix string, zones []string, minimizeTokenGenerator *MinimizeSpreadTokenGenerator, dups map[uint32]bool) {
117128
for _, zone := range zones {
118129
id := fmt.Sprintf("%v-%v", prefix, zone)
119-
tokens := minimizeTokenGenerator.GenerateTokens(rindDesc, id, zone, 512)
130+
tokens := minimizeTokenGenerator.GenerateTokens(rindDesc, id, zone, 512, true)
120131
for _, token := range tokens {
121132
if dups[token] {
122133
t.Fatal("GenerateTokens returned duplicated tokens")
@@ -136,9 +147,9 @@ type mockedTokenGenerator struct {
136147
RandomTokenGenerator
137148
}
138149

139-
func (m *mockedTokenGenerator) GenerateTokens(d *Desc, id, zone string, numTokens int) []uint32 {
150+
func (m *mockedTokenGenerator) GenerateTokens(d *Desc, id, zone string, numTokens int, force bool) []uint32 {
140151
m.called++
141-
return m.RandomTokenGenerator.GenerateTokens(d, id, zone, numTokens)
152+
return m.RandomTokenGenerator.GenerateTokens(d, id, zone, numTokens, force)
142153
}
143154

144155
func newMockedTokenGenerator(totalZones int) *mockedTokenGenerator {

pkg/ruler/lifecycle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func (r *Ruler) OnRingInstanceRegister(lc *ring.BasicLifecycler, ringDesc ring.D
1313
tokens = instanceDesc.GetTokens()
1414
}
1515

16-
newTokens := lc.GenerateTokens(&ringDesc, instanceID, instanceDesc.Zone, r.cfg.Ring.NumTokens-len(tokens))
16+
newTokens := lc.GenerateTokens(&ringDesc, instanceID, instanceDesc.Zone, r.cfg.Ring.NumTokens-len(tokens), true)
1717

1818
// Tokens sorting will be enforced by the parent caller.
1919
tokens = append(tokens, newTokens...)

pkg/ruler/lifecycle_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
7676
require.NoError(t, ringStore.CAS(ctx, ringKey, func(in interface{}) (interface{}, bool, error) {
7777
ringDesc := ring.GetOrCreateRingDesc(in)
7878

79-
instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", tg.GenerateTokens(ringDesc, unhealthyInstanceID, "", config.Ring.NumTokens), ring.ACTIVE, time.Now())
79+
instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", tg.GenerateTokens(ringDesc, unhealthyInstanceID, "", config.Ring.NumTokens, true), ring.ACTIVE, time.Now())
8080
instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix()
8181
ringDesc.Ingesters[unhealthyInstanceID] = instance
8282

0 commit comments

Comments
 (0)