diff --git a/CHANGELOG.md b/CHANGELOG.md
index d172a9d877f..23a77891f82 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@ instructions below to upgrade your Postgres.
* [CHANGE] Overrides mechanism has been renamed to "runtime config", and is now separate from limits. Runtime config is simply a file that is reloaded by Cortex every couple of seconds. Limits and now also multi KV use this mechanism.
New arguments were introduced: `-runtime-config.file` (defaults to empty) and `-runtime-config.reload-period` (defaults to 10 seconds), which replace previously used `-limits.per-user-override-config` and `-limits.per-user-override-period` options. Old options are still used if `-runtime-config.file` is not specified. This change is also reflected in YAML configuration, where old `limits.per_tenant_override_config` and `limits.per_tenant_override_period` fields are replaced with `runtime_config.file` and `runtime_config.period` respectively. #1749
* [CHANGE] Cortex now rejects data with duplicate labels. Previously, such data was accepted, with duplicate labels removed with only one value left. #1964
* [CHANGE] Changed the default value for `-distributor.ha-tracker.prefix` from `collectors/` to `ha-tracker/` in order to not clash with other keys (ie. ring) stored in the same key-value store. #1940
+* [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034
* [FEATURE] The distributor can now drop labels from samples (similar to the removal of the replica label for HA ingestion) per user via the `distributor.drop-label` flag. #1726
* [FEATURE] Added flag `debug.mutex-profile-fraction` to enable mutex profiling #1969
* [FEATURE] Added `global` ingestion rate limiter strategy. Deprecated `-distributor.limiter-reload-period` flag. #1766
diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md
index c6d236d448e..5ac54b6e8ca 100644
--- a/docs/configuration/arguments.md
+++ b/docs/configuration/arguments.md
@@ -273,9 +273,9 @@ It also talks to a KVStore and has it's own copies of the same flags used by the
Deprecated. New ingesters always write "normalised" tokens to the ring. Normalised tokens consume less memory to encode and decode; as the ring is unmarshalled regularly, this significantly reduces memory usage of anything that watches the ring.
- Cortex 0.4.0 is the last version that can *write* denormalised tokens. Cortex 0.5.0 and later will always *write* normalised tokens, although it can still *read* denormalised tokens written by older ingesters.
+ Cortex 0.4.0 is the last version that can *write* denormalised tokens. Cortex 0.5.0 and above always write normalised tokens.
- It's perfectly OK to have a mix of ingesters running denormalised (<= 0.4.0) and normalised tokens (either by using `-ingester.normalise-tokens` in Cortex <= 0.4.0, or Cortex 0.5.0+) during upgrades.
+ Cortex 0.6.0 is the last version that can *read* denormalised tokens. Starting with Cortex 0.7.0 only normalised tokens are supported, and ingesters writing denormalised tokens to the ring (running Cortex 0.4.0 or earlier with `-ingester.normalise-tokens=false`) are ignored by distributors. Such ingesters should either switch to using normalised tokens, or be upgraded to Cortex 0.5.0 or later.
- `-ingester.chunk-encoding`
diff --git a/pkg/ring/http.go b/pkg/ring/http.go
index 97b01625a1b..9ca4ce21821 100644
--- a/pkg/ring/http.go
+++ b/pkg/ring/http.go
@@ -118,7 +118,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
sort.Strings(ingesterIDs)
ingesters := []interface{}{}
- tokens, owned := countTokens(r.ringDesc)
+ tokens, owned := countTokens(r.ringDesc, r.ringTokens)
for _, id := range ingesterIDs {
ing := r.ringDesc.Ingesters[id]
timestamp := time.Unix(ing.Timestamp, 0)
diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go
index 3f27560488d..cf41e1c842b 100644
--- a/pkg/ring/lifecycler_test.go
+++ b/pkg/ring/lifecycler_test.go
@@ -43,92 +43,12 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
return lifecyclerConfig
}
-func checkDenormalisedLeaving(d interface{}, id string) bool {
- desc, ok := d.(*Desc)
- return ok &&
- len(desc.Ingesters) == 1 &&
- desc.Ingesters[id].State == LEAVING &&
- len(desc.Ingesters[id].Tokens) == 0 &&
- len(desc.Tokens) == 1
-}
-
func checkNormalised(d interface{}, id string) bool {
desc, ok := d.(*Desc)
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters[id].State == ACTIVE &&
- len(desc.Ingesters[id].Tokens) == 1 &&
- len(desc.Tokens) == 0
-}
-
-func TestRingNormaliseMigration(t *testing.T) {
- var ringConfig Config
- flagext.DefaultValues(&ringConfig)
- ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())
-
- r, err := New(ringConfig, "ingester", IngesterRingKey)
- require.NoError(t, err)
- defer r.Stop()
-
- // Add an 'ingester' with denormalised tokens.
- lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
-
- // Since code to insert ingester with denormalised tokens into ring was removed,
- // instead of running lifecycler, we do it manually here.
- token := uint32(0)
- err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
- require.Nil(t, in)
- r := NewDesc()
- tks := GenerateTokens(lifecyclerConfig1.NumTokens, nil)
- r.Ingesters[lifecyclerConfig1.ID] = IngesterDesc{
- Addr: lifecyclerConfig1.Addr,
- Timestamp: time.Now().Unix(),
- State: LEAVING, // expected by second ingester`
- }
- for _, t := range tks {
- r.Tokens = append(r.Tokens, TokenDesc{
- Token: t,
- Ingester: lifecyclerConfig1.ID,
- })
- }
- token = tks[0]
- return r, true, nil
- })
- require.NoError(t, err)
-
- // Check this ingester joined, is active, and has one token.
- test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
- require.NoError(t, err)
- return checkDenormalisedLeaving(d, "ing1")
- })
-
- // Add a second ingester with normalised tokens.
- var lifecyclerConfig2 = testLifecyclerConfig(ringConfig, "ing2")
- lifecyclerConfig2.JoinAfter = 100 * time.Second
-
- l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey, true)
- require.NoError(t, err)
- l2.Start()
-
- // Since there is nothing that would make l2 to claim tokens from l1 (normally done on transfer)
- // we do it manually.
- require.NoError(t, l2.ClaimTokensFor(context.Background(), "ing1"))
- require.NoError(t, l2.ChangeState(context.Background(), ACTIVE))
-
- // Check the new ingester joined, has the same token, and is active.
- test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
- require.NoError(t, err)
-
- if desc, ok := d.(*Desc); ok {
- // lifecycler for ingester 1 isn't running, so we need to delete it manually
- // (to make checkNormalised happy)
- delete(desc.Ingesters, lifecyclerConfig1.ID)
- }
- return checkNormalised(d, "ing2") &&
- d.(*Desc).Ingesters["ing2"].Tokens[0] == token
- })
+ len(desc.Ingesters[id].Tokens) == 1
}
func TestLifecycler_HealthyInstancesCount(t *testing.T) {
@@ -381,8 +301,7 @@ func TestTokensOnDisk(t *testing.T) {
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing1"].State == ACTIVE &&
- len(desc.Ingesters["ing1"].Tokens) == 512 &&
- len(desc.Tokens) == 0
+ len(desc.Ingesters["ing1"].Tokens) == 512
})
l1.Shutdown()
@@ -406,8 +325,7 @@ func TestTokensOnDisk(t *testing.T) {
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == ACTIVE &&
- len(desc.Ingesters["ing2"].Tokens) == 512 &&
- len(desc.Tokens) == 0
+ len(desc.Ingesters["ing2"].Tokens) == 512
})
// Check for same tokens.
@@ -441,15 +359,8 @@ func TestJoinInLeavingState(t *testing.T) {
State: LEAVING,
Tokens: []uint32{1, 4},
},
- },
- Tokens: []TokenDesc{
- {
- Ingester: "ing2",
- Token: 2,
- },
- {
- Ingester: "ing2",
- Token: 3,
+ "ing2": {
+ Tokens: []uint32{2, 3},
},
},
}
@@ -468,9 +379,9 @@ func TestJoinInLeavingState(t *testing.T) {
require.NoError(t, err)
desc, ok := d.(*Desc)
return ok &&
- len(desc.Ingesters) == 1 &&
+ len(desc.Ingesters) == 2 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == cfg.NumTokens &&
- len(desc.Tokens) == 2
+ len(desc.Ingesters["ing2"].Tokens) == 2
})
}
diff --git a/pkg/ring/merge_test.go b/pkg/ring/merge_test.go
index 480e16331c6..d050deb7993 100644
--- a/pkg/ring/merge_test.go
+++ b/pkg/ring/merge_test.go
@@ -12,22 +12,11 @@ func TestNormalizationAndConflictResolution(t *testing.T) {
first := &Desc{
Ingesters: map[string]IngesterDesc{
- "Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: nil},
- "Ing 2": {Addr: "addr2", Timestamp: 123456, State: LEAVING, Tokens: []uint32{100, 5, 5, 100, 100, 200}},
- "Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT, Tokens: []uint32{100, 200, 300}},
- "Ing 4": {Addr: "addr4", Timestamp: now, State: LEAVING, Tokens: []uint32{30, 40, 50}},
- },
-
- Tokens: []TokenDesc{
- {Token: 50, Ingester: "Ing 1"},
- {Token: 40, Ingester: "Ing 1"},
- {Token: 40, Ingester: "Ing 1"}, // dup
- {Token: 30, Ingester: "Ing 1"},
- {Token: 20, Ingester: "Ing 2"},
- {Token: 10, Ingester: "Ing 2"},
- {Token: 100, Ingester: "Ing 3"}, // LEFT, will be ignored
- {Token: 200, Ingester: "Ing 3"},
- {Token: 100, Ingester: "Unknown"},
+ "Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{50, 40, 40, 30}},
+ "Ing 2": {Addr: "addr2", Timestamp: 123456, State: LEAVING, Tokens: []uint32{100, 5, 5, 100, 100, 200, 20, 10}},
+ "Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT, Tokens: []uint32{100, 200, 300}},
+ "Ing 4": {Addr: "addr4", Timestamp: now, State: LEAVING, Tokens: []uint32{30, 40, 50}},
+ "Unknown": {Tokens: []uint32{100}},
},
}
@@ -35,12 +24,9 @@ func TestNormalizationAndConflictResolution(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Unknown": {
Timestamp: now + 10,
+ Tokens: []uint32{1000, 2000},
},
},
- Tokens: []TokenDesc{
- {Token: 1000, Ingester: "Unknown"},
- {Token: 2000, Ingester: "Unknown"},
- },
}
change, err := first.Merge(second, false)
@@ -60,19 +46,6 @@ func TestNormalizationAndConflictResolution(t *testing.T) {
"Ing 4": {Addr: "addr4", Timestamp: now, State: LEAVING},
"Unknown": {Timestamp: now + 10, Tokens: []uint32{1000, 2000}},
},
- // // Since the ring wasn't normalized before the merge, it will be denormalized after the merge
- // Tokens: []TokenDesc{
- // {5, "Ing 2"},
- // {10, "Ing 2"},
- // {20, "Ing 2"},
- // {30, "Ing 1"},
- // {40, "Ing 1"},
- // {50, "Ing 1"},
- // {100, "Ing 2"},
- // {200, "Ing 2"},
- // {1000, "Unknown"},
- // {2000, "Unknown"},
- // },
}, first)
assert.Equal(t, &Desc{
@@ -80,7 +53,6 @@ func TestNormalizationAndConflictResolution(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Unknown": {Timestamp: now + 10, Tokens: []uint32{1000, 2000}},
},
- Tokens: nil,
}, changeRing)
}
@@ -107,7 +79,6 @@ func TestMerge(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now, State: JOINING, Tokens: []uint32{5, 10, 20, 100, 200}},
},
- Tokens: nil,
}
}
@@ -117,7 +88,6 @@ func TestMerge(t *testing.T) {
"Ing 3": {Addr: "addr3", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{150, 250, 350}},
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
},
- Tokens: nil,
}
}
@@ -127,7 +97,6 @@ func TestMerge(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now + 10, State: LEAVING, Tokens: []uint32{30, 40, 50}},
"Ing 3": {Addr: "addr3", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{150, 250, 350}},
},
- Tokens: nil,
}
}
@@ -138,7 +107,6 @@ func TestMerge(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{150, 250, 350}},
},
- Tokens: nil,
}
}
@@ -149,7 +117,6 @@ func TestMerge(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{150, 250, 350}},
},
- Tokens: nil,
}
}
@@ -158,7 +125,6 @@ func TestMerge(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now + 10, State: LEFT, Tokens: []uint32{30, 40, 50}},
},
- Tokens: nil,
}
}
@@ -169,7 +135,6 @@ func TestMerge(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{150, 250, 350}},
},
- Tokens: nil,
}
}
@@ -193,7 +158,6 @@ func TestMerge(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
},
- Tokens: nil,
}, ch)
}
@@ -215,7 +179,6 @@ func TestMerge(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now + 10, State: LEFT, Tokens: nil},
},
- Tokens: nil,
}, ch)
}
}
@@ -229,7 +192,6 @@ func TestTokensTakeover(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now, State: JOINING, Tokens: []uint32{5, 10, 20}}, // partially migrated from Ing 3
},
- Tokens: nil,
}
}
@@ -239,7 +201,6 @@ func TestTokensTakeover(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20}},
"Ing 3": {Addr: "addr3", Timestamp: now + 5, State: LEAVING, Tokens: []uint32{5, 10, 20, 100, 200}},
},
- Tokens: nil,
}
}
@@ -250,7 +211,6 @@ func TestTokensTakeover(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20}},
"Ing 3": {Addr: "addr3", Timestamp: now + 5, State: LEAVING, Tokens: []uint32{100, 200}},
},
- Tokens: nil,
}
}
@@ -293,7 +253,6 @@ func TestMergeLeft(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now, State: JOINING, Tokens: []uint32{5, 10, 20, 100, 200}},
},
- Tokens: nil,
}
}
@@ -302,7 +261,6 @@ func TestMergeLeft(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Ing 2": {Addr: "addr2", Timestamp: now, State: LEFT, Tokens: []uint32{5, 10, 20, 100, 200}},
},
- Tokens: nil,
}
}
@@ -312,7 +270,6 @@ func TestMergeLeft(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now, State: LEFT},
},
- Tokens: nil,
}
}
@@ -322,7 +279,6 @@ func TestMergeLeft(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now + 10, State: LEAVING, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now, State: JOINING, Tokens: []uint32{5, 10, 20, 100, 200}}, // from firstRing
},
- Tokens: nil,
}
}
@@ -332,7 +288,6 @@ func TestMergeLeft(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now + 10, State: LEAVING, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now, State: LEFT},
},
- Tokens: nil,
}
}
@@ -343,7 +298,6 @@ func TestMergeLeft(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Ing 2": {Addr: "addr2", Timestamp: now, State: LEFT},
},
- Tokens: nil,
}, ch)
}
@@ -361,7 +315,6 @@ func TestMergeLeft(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
},
- Tokens: nil,
}, ch)
}
@@ -386,7 +339,6 @@ func TestMergeRemoveMissing(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now, State: JOINING, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEAVING, Tokens: []uint32{5, 10, 20, 100, 200}},
},
- Tokens: nil,
}
}
@@ -396,7 +348,6 @@ func TestMergeRemoveMissing(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
},
- Tokens: nil,
}
}
@@ -407,7 +358,6 @@ func TestMergeRemoveMissing(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT},
},
- Tokens: nil,
}
}
@@ -419,7 +369,6 @@ func TestMergeRemoveMissing(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT},
},
- Tokens: nil,
}, ch) // entire second ring is new
}
@@ -437,14 +386,12 @@ func TestMergeRemoveMissing(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEAVING},
},
- Tokens: nil,
}, our)
assert.Equal(t, &Desc{
Ingesters: map[string]IngesterDesc{
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEAVING},
},
- Tokens: nil,
}, ch)
}
}
@@ -459,7 +406,6 @@ func TestMergeMissingIntoLeft(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT},
},
- Tokens: nil,
}
}
@@ -469,7 +415,6 @@ func TestMergeMissingIntoLeft(t *testing.T) {
"Ing 1": {Addr: "addr1", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
},
- Tokens: nil,
}
}
@@ -481,7 +426,6 @@ func TestMergeMissingIntoLeft(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT},
},
- Tokens: nil,
}, our)
assert.Equal(t, &Desc{
@@ -490,7 +434,6 @@ func TestMergeMissingIntoLeft(t *testing.T) {
"Ing 2": {Addr: "addr2", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
// Ing 3 is not changed, it was already LEFT
},
- Tokens: nil,
}, ch)
}
}
diff --git a/pkg/ring/model.go b/pkg/ring/model.go
index 7b36a682585..77a025513e3 100644
--- a/pkg/ring/model.go
+++ b/pkg/ring/model.go
@@ -49,69 +49,31 @@ func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState
Tokens: tokens,
}
- // Since this ingester is only using normalised tokens, let's delete any denormalised
- // tokens for this ingester. There may be such tokens eg. if previous instance
- // of the same ingester was running with denormalized tokens.
- for ix := 0; ix < len(d.Tokens); {
- if d.Tokens[ix].Ingester == id {
- d.Tokens = append(d.Tokens[:ix], d.Tokens[ix+1:]...)
- } else {
- ix++
- }
- }
-
d.Ingesters[id] = ingester
}
// RemoveIngester removes the given ingester and all its tokens.
func (d *Desc) RemoveIngester(id string) {
delete(d.Ingesters, id)
- output := []TokenDesc{}
- for i := 0; i < len(d.Tokens); i++ {
- if d.Tokens[i].Ingester != id {
- output = append(output, d.Tokens[i])
- }
- }
- d.Tokens = output
}
// ClaimTokens transfers all the tokens from one ingester to another,
// returning the claimed token.
-// This method assumes that Ring is in the correct state, 'from' ingester has no tokens anywhere,
-// and 'to' ingester uses either normalised or non-normalised tokens, but not both. Tokens list must
-// be sorted properly. If all of this is true, everything will be fine.
+// This method assumes that Ring is in the correct state, 'to' ingester has no tokens anywhere.
+// Tokens list must be sorted properly. If all of this is true, everything will be fine.
func (d *Desc) ClaimTokens(from, to string) Tokens {
var result Tokens
- // If the ingester we are claiming from is normalising, get its tokens then erase them from the ring.
if fromDesc, found := d.Ingesters[from]; found {
result = fromDesc.Tokens
fromDesc.Tokens = nil
d.Ingesters[from] = fromDesc
}
- // If we are storing the tokens in a normalise form, we need to deal with
- // the migration from denormalised by removing the tokens from the tokens
- // list.
- // When all ingesters are in normalised mode, d.Tokens is empty here
- for i := 0; i < len(d.Tokens); {
- if d.Tokens[i].Ingester == from {
- result = append(result, d.Tokens[i].Token)
- d.Tokens = append(d.Tokens[:i], d.Tokens[i+1:]...)
- continue
- }
- i++
- }
-
ing := d.Ingesters[to]
ing.Tokens = result
d.Ingesters[to] = ing
- // not necessary, but makes testing simpler
- if len(d.Tokens) == 0 {
- d.Tokens = nil
- }
-
return result
}
@@ -128,7 +90,7 @@ func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc {
// Ready returns no error when all ingesters are active and healthy.
func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
- numTokens := len(d.Tokens)
+ numTokens := 0
for id, ingester := range d.Ingesters {
if now.Sub(time.Unix(ingester.Timestamp, 0)) > heartbeatTimeout {
return fmt.Errorf("ingester %s past heartbeat timeout", id)
@@ -147,7 +109,7 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
// TokensFor partitions the tokens into those for the given ID, and those for others.
func (d *Desc) TokensFor(id string) (tokens, other Tokens) {
takenTokens, myTokens := Tokens{}, Tokens{}
- for _, token := range migrateRing(d) {
+ for _, token := range d.getTokens() {
takenTokens = append(takenTokens, token.Token)
if token.Ingester == id {
myTokens = append(myTokens, token.Token)
@@ -254,9 +216,7 @@ func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.
out.Ingesters[u] = ing
}
- // Keep ring normalized.
d.Ingesters = thisIngesterMap
- d.Tokens = nil
return out, nil
}
@@ -272,7 +232,6 @@ func (d *Desc) MergeContent() []string {
}
// buildNormalizedIngestersMap will do the following:
-// - moves all tokens from r.Tokens into individual ingesters
// - sorts tokens and removes duplicates (only within single ingester)
// - it doesn't modify input ring
func buildNormalizedIngestersMap(inputRing *Desc) map[string]IngesterDesc {
@@ -286,20 +245,9 @@ func buildNormalizedIngestersMap(inputRing *Desc) map[string]IngesterDesc {
out[n] = ing
}
- for _, t := range inputRing.Tokens {
- // if ingester doesn't exist, we will add empty one (with tokens only)
- ing := out[t.Ingester]
-
- // don't add tokens to the LEFT ingesters. We skip such tokens.
- if ing.State != LEFT {
- ing.Tokens = append(ing.Tokens, t.Token)
- out[t.Ingester] = ing
- }
- }
-
// Sort tokens, and remove duplicates
for name, ing := range out {
- if ing.Tokens == nil {
+ if len(ing.Tokens) == 0 {
continue
}
@@ -307,17 +255,16 @@ func buildNormalizedIngestersMap(inputRing *Desc) map[string]IngesterDesc {
sort.Sort(Tokens(ing.Tokens))
}
- seen := make(map[uint32]bool)
-
- n := 0
- for _, v := range ing.Tokens {
- if !seen[v] {
- seen[v] = true
- ing.Tokens[n] = v
- n++
+ // tokens are sorted now, we can easily remove duplicates.
+ prev := ing.Tokens[0]
+ for ix := 1; ix < len(ing.Tokens); {
+ if ing.Tokens[ix] == prev {
+ ing.Tokens = append(ing.Tokens[:ix], ing.Tokens[ix+1:]...)
+ } else {
+ prev = ing.Tokens[ix]
+ ix++
}
}
- ing.Tokens = ing.Tokens[:n]
// write updated value back to map
out[name] = ing
@@ -426,3 +373,25 @@ func (d *Desc) RemoveTombstones(limit time.Time) {
}
}
}
+
+type TokenDesc struct {
+ Token uint32
+ Ingester string
+}
+
+// Returns sorted list of tokens with ingester names.
+func (d *Desc) getTokens() []TokenDesc {
+ numTokens := 0
+ for _, ing := range d.Ingesters {
+ numTokens += len(ing.Tokens)
+ }
+ tokens := make([]TokenDesc, 0, numTokens)
+ for key, ing := range d.Ingesters {
+ for _, token := range ing.Tokens {
+ tokens = append(tokens, TokenDesc{Token: token, Ingester: key})
+ }
+ }
+
+ sort.Sort(ByToken(tokens))
+ return tokens
+}
diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go
index 7ad1ef8a3ea..80588fc3318 100644
--- a/pkg/ring/model_test.go
+++ b/pkg/ring/model_test.go
@@ -72,25 +72,12 @@ func normalizedSource() *Desc {
return r
}
-func unnormalizedSource() *Desc {
- r := NewDesc()
- r.Ingesters["first"] = IngesterDesc{}
- r.Ingesters["second"] = IngesterDesc{}
- r.Tokens = []TokenDesc{
- {Token: 100, Ingester: "first"},
- {Token: 200, Ingester: "first"},
- {Token: 300, Ingester: "first"},
- }
- return r
-}
-
func normalizedOutput() *Desc {
return &Desc{
Ingesters: map[string]IngesterDesc{
"first": {},
"second": {Tokens: []uint32{100, 200, 300}},
},
- Tokens: nil,
}
}
@@ -102,15 +89,6 @@ func TestClaimTokensFromNormalizedToNormalized(t *testing.T) {
assert.Equal(t, normalizedOutput(), r)
}
-func TestClaimTokensFromUnnormalizedToNormalized(t *testing.T) {
- r := unnormalizedSource()
-
- result := r.ClaimTokens("first", "second")
-
- assert.Equal(t, Tokens{100, 200, 300}, result)
- assert.Equal(t, normalizedOutput(), r)
-}
-
func TestReady(t *testing.T) {
now := time.Now()
@@ -145,8 +123,9 @@ func TestReady(t *testing.T) {
t.Fatal("expected !ready (no tokens), but got no error")
}
- r.Tokens = []TokenDesc{
- {Token: 12345, Ingester: "some ingester"},
+ r.Ingesters["some ingester"] = IngesterDesc{
+ Tokens: []uint32{12345},
+ Timestamp: now.Unix(),
}
if err := r.Ready(now, 10*time.Second); err != nil {
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index c6a37ecc6e8..578a82cbb96 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -94,8 +94,9 @@ type Ring struct {
done chan struct{}
quit context.CancelFunc
- mtx sync.RWMutex
- ringDesc *Desc
+ mtx sync.RWMutex
+ ringDesc *Desc
+ ringTokens []TokenDesc
memberOwnershipDesc *prometheus.Desc
numMembersDesc *prometheus.Desc
@@ -169,41 +170,23 @@ func (r *Ring) loop(ctx context.Context) {
}
ringDesc := value.(*Desc)
- ringDesc.Tokens = migrateRing(ringDesc)
+ ringTokens := ringDesc.getTokens()
+
r.mtx.Lock()
defer r.mtx.Unlock()
r.ringDesc = ringDesc
+ r.ringTokens = ringTokens
return true
})
r.KVClient.Stop()
}
-// migrateRing will denormalise the ring's tokens if stored in normal form.
-func migrateRing(desc *Desc) []TokenDesc {
- numTokens := len(desc.Tokens)
- for _, ing := range desc.Ingesters {
- numTokens += len(ing.Tokens)
- }
- tokens := make([]TokenDesc, len(desc.Tokens), numTokens)
- copy(tokens, desc.Tokens)
- for key, ing := range desc.Ingesters {
- for _, token := range ing.Tokens {
- tokens = append(tokens, TokenDesc{
- Token: token,
- Ingester: key,
- })
- }
- }
- sort.Sort(ByToken(tokens))
- return tokens
-}
-
// Get returns n (or more) ingesters which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
- if r.ringDesc == nil || len(r.ringDesc.Tokens) == 0 {
+ if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}
@@ -214,13 +197,13 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
start = r.search(key)
iterations = 0
)
- for i := start; len(distinctHosts) < n && iterations < len(r.ringDesc.Tokens); i++ {
+ for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ {
iterations++
// Wrap i around in the ring.
- i %= len(r.ringDesc.Tokens)
+ i %= len(r.ringTokens)
// We want n *distinct* ingesters.
- token := r.ringDesc.Tokens[i]
+ token := r.ringTokens[i]
if _, ok := distinctHosts[token.Ingester]; ok {
continue
}
@@ -258,7 +241,7 @@ func (r *Ring) GetAll() (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
- if r.ringDesc == nil || len(r.ringDesc.Tokens) == 0 {
+ if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}
@@ -284,10 +267,10 @@ func (r *Ring) GetAll() (ReplicationSet, error) {
}
func (r *Ring) search(key uint32) int {
- i := sort.Search(len(r.ringDesc.Tokens), func(x int) bool {
- return r.ringDesc.Tokens[x].Token > key
+ i := sort.Search(len(r.ringTokens), func(x int) bool {
+ return r.ringTokens[x].Token > key
})
- if i >= len(r.ringDesc.Tokens) {
+ if i >= len(r.ringTokens) {
i = 0
}
return i
@@ -301,9 +284,7 @@ func (r *Ring) Describe(ch chan<- *prometheus.Desc) {
ch <- r.numTokensDesc
}
-func countTokens(ringDesc *Desc) (map[string]uint32, map[string]uint32) {
- tokens := ringDesc.Tokens
-
+func countTokens(ringDesc *Desc, tokens []TokenDesc) (map[string]uint32, map[string]uint32) {
owned := map[string]uint32{}
numTokens := map[string]uint32{}
for i, token := range tokens {
@@ -332,7 +313,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
r.mtx.RLock()
defer r.mtx.RUnlock()
- numTokens, ownedRange := countTokens(r.ringDesc)
+ numTokens, ownedRange := countTokens(r.ringDesc, r.ringTokens)
for id, totalOwned := range ownedRange {
ch <- prometheus.MustNewConstMetric(
r.memberOwnershipDesc,
@@ -392,7 +373,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
r.totalTokensDesc,
prometheus.GaugeValue,
- float64(len(r.ringDesc.Tokens)),
+ float64(len(r.ringTokens)),
r.name,
)
}
diff --git a/pkg/ring/ring.pb.go b/pkg/ring/ring.pb.go
index 0554fc2b99d..72e40d40e02 100644
--- a/pkg/ring/ring.pb.go
+++ b/pkg/ring/ring.pb.go
@@ -60,7 +60,6 @@ func (IngesterState) EnumDescriptor() ([]byte, []int) {
type Desc struct {
Ingesters map[string]IngesterDesc `protobuf:"bytes,1,rep,name=ingesters,proto3" json:"ingesters" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
- Tokens []TokenDesc `protobuf:"bytes,2,rep,name=tokens,proto3" json:"tokens"`
}
func (m *Desc) Reset() { *m = Desc{} }
@@ -102,13 +101,6 @@ func (m *Desc) GetIngesters() map[string]IngesterDesc {
return nil
}
-func (m *Desc) GetTokens() []TokenDesc {
- if m != nil {
- return m.Tokens
- }
- return nil
-}
-
type IngesterDesc struct {
Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
@@ -176,96 +168,42 @@ func (m *IngesterDesc) GetTokens() []uint32 {
return nil
}
-type TokenDesc struct {
- Token uint32 `protobuf:"varint,1,opt,name=token,proto3" json:"token,omitempty"`
- Ingester string `protobuf:"bytes,2,opt,name=ingester,proto3" json:"ingester,omitempty"`
-}
-
-func (m *TokenDesc) Reset() { *m = TokenDesc{} }
-func (*TokenDesc) ProtoMessage() {}
-func (*TokenDesc) Descriptor() ([]byte, []int) {
- return fileDescriptor_26381ed67e202a6e, []int{2}
-}
-func (m *TokenDesc) XXX_Unmarshal(b []byte) error {
- return m.Unmarshal(b)
-}
-func (m *TokenDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- if deterministic {
- return xxx_messageInfo_TokenDesc.Marshal(b, m, deterministic)
- } else {
- b = b[:cap(b)]
- n, err := m.MarshalTo(b)
- if err != nil {
- return nil, err
- }
- return b[:n], nil
- }
-}
-func (m *TokenDesc) XXX_Merge(src proto.Message) {
- xxx_messageInfo_TokenDesc.Merge(m, src)
-}
-func (m *TokenDesc) XXX_Size() int {
- return m.Size()
-}
-func (m *TokenDesc) XXX_DiscardUnknown() {
- xxx_messageInfo_TokenDesc.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_TokenDesc proto.InternalMessageInfo
-
-func (m *TokenDesc) GetToken() uint32 {
- if m != nil {
- return m.Token
- }
- return 0
-}
-
-func (m *TokenDesc) GetIngester() string {
- if m != nil {
- return m.Ingester
- }
- return ""
-}
-
func init() {
proto.RegisterEnum("ring.IngesterState", IngesterState_name, IngesterState_value)
proto.RegisterType((*Desc)(nil), "ring.Desc")
proto.RegisterMapType((map[string]IngesterDesc)(nil), "ring.Desc.IngestersEntry")
proto.RegisterType((*IngesterDesc)(nil), "ring.IngesterDesc")
- proto.RegisterType((*TokenDesc)(nil), "ring.TokenDesc")
}
func init() { proto.RegisterFile("ring.proto", fileDescriptor_26381ed67e202a6e) }
var fileDescriptor_26381ed67e202a6e = []byte{
- // 426 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x92, 0xcd, 0x6e, 0xd3, 0x40,
- 0x10, 0xc7, 0x77, 0xe2, 0xb5, 0x89, 0x27, 0xa4, 0x58, 0x03, 0x42, 0x26, 0x42, 0x8b, 0x95, 0x93,
- 0x41, 0x6a, 0x2a, 0x05, 0x0e, 0x08, 0xa9, 0x87, 0x96, 0x1a, 0x94, 0x28, 0x0a, 0x95, 0x89, 0x7a,
- 0x4f, 0xda, 0xc5, 0x44, 0x25, 0x71, 0x65, 0x6f, 0x90, 0x7a, 0xe3, 0x0d, 0xe0, 0x31, 0x78, 0x12,
- 0xd4, 0x63, 0x8e, 0x3d, 0x21, 0xe2, 0x5c, 0x38, 0xf6, 0x11, 0xd0, 0xae, 0xf3, 0x41, 0x6e, 0xf3,
- 0xdb, 0xff, 0xc7, 0xee, 0x58, 0x46, 0xcc, 0xc6, 0xd3, 0xa4, 0x75, 0x95, 0xa5, 0x2a, 0x25, 0xae,
- 0xe7, 0xc6, 0x7e, 0x32, 0x56, 0x9f, 0x67, 0xa3, 0xd6, 0x79, 0x3a, 0x39, 0x48, 0xd2, 0x24, 0x3d,
- 0x30, 0xe2, 0x68, 0xf6, 0xc9, 0x90, 0x01, 0x33, 0x95, 0xa1, 0xe6, 0x2f, 0x40, 0x7e, 0x22, 0xf3,
- 0x73, 0x3a, 0x44, 0x77, 0x3c, 0x4d, 0x64, 0xae, 0x64, 0x96, 0xfb, 0x10, 0x58, 0x61, 0xad, 0xfd,
- 0xa4, 0x65, 0xda, 0xb5, 0xdc, 0xea, 0xac, 0xb5, 0x68, 0xaa, 0xb2, 0xeb, 0x63, 0x7e, 0xf3, 0xfb,
- 0x19, 0x8b, 0xb7, 0x09, 0xda, 0x47, 0x47, 0xa5, 0x97, 0x72, 0x9a, 0xfb, 0x15, 0x93, 0x7d, 0x50,
- 0x66, 0x07, 0xfa, 0x4c, 0x17, 0xac, 0x12, 0x2b, 0x53, 0xe3, 0x14, 0xf7, 0x76, 0x1b, 0xc9, 0x43,
- 0xeb, 0x52, 0x5e, 0xfb, 0x10, 0x40, 0xe8, 0xc6, 0x7a, 0xa4, 0x10, 0xed, 0xaf, 0xc3, 0x2f, 0x33,
- 0xe9, 0x57, 0x02, 0x08, 0x6b, 0x6d, 0x2a, 0x1b, 0xd7, 0x31, 0x5d, 0x1a, 0x97, 0x86, 0x37, 0x95,
- 0xd7, 0xd0, 0xfc, 0x0e, 0x78, 0xff, 0x7f, 0x8d, 0x08, 0xf9, 0xf0, 0xe2, 0x22, 0x5b, 0x35, 0x9a,
- 0x99, 0x9e, 0xa2, 0xab, 0xc6, 0x13, 0x99, 0xab, 0xe1, 0xe4, 0xca, 0xd4, 0x5a, 0xf1, 0xf6, 0x80,
- 0x9e, 0xa3, 0x9d, 0xab, 0xa1, 0x92, 0xbe, 0x15, 0x40, 0xb8, 0xd7, 0x7e, 0xb8, 0x7b, 0xe1, 0x47,
- 0x2d, 0xc5, 0xa5, 0x83, 0x1e, 0x6f, 0xd6, 0x75, 0x02, 0x2b, 0xac, 0xaf, 0xf7, 0xea, 0xf2, 0x2a,
- 0xf7, 0xec, 0x2e, 0xaf, 0xda, 0x9e, 0xd3, 0x3c, 0x44, 0x77, 0xb3, 0x3e, 0x3d, 0x42, 0xdb, 0x58,
- 0xcc, 0x73, 0xea, 0x71, 0x09, 0xd4, 0xc0, 0xea, 0xfa, 0x13, 0x9a, 0xe7, 0xb8, 0xf1, 0x86, 0x5f,
- 0xf4, 0xb0, 0xbe, 0x73, 0x35, 0x21, 0x3a, 0x47, 0x6f, 0x07, 0x9d, 0xb3, 0xc8, 0x63, 0x54, 0xc3,
- 0x7b, 0xbd, 0xe8, 0xe8, 0xac, 0xd3, 0x7f, 0xef, 0x81, 0x86, 0xd3, 0xa8, 0x7f, 0xa2, 0xa1, 0xa2,
- 0xa1, 0xfb, 0xa1, 0xd3, 0xd7, 0x60, 0x51, 0x15, 0x79, 0x2f, 0x7a, 0x37, 0xf0, 0xf8, 0xf1, 0xab,
- 0xf9, 0x42, 0xb0, 0xdb, 0x85, 0x60, 0x77, 0x0b, 0x01, 0xdf, 0x0a, 0x01, 0x3f, 0x0b, 0x01, 0x37,
- 0x85, 0x80, 0x79, 0x21, 0xe0, 0x4f, 0x21, 0xe0, 0x6f, 0x21, 0xd8, 0x5d, 0x21, 0xe0, 0xc7, 0x52,
- 0xb0, 0xf9, 0x52, 0xb0, 0xdb, 0xa5, 0x60, 0x23, 0xc7, 0xfc, 0x24, 0x2f, 0xff, 0x05, 0x00, 0x00,
- 0xff, 0xff, 0x3b, 0x76, 0x95, 0xe8, 0x67, 0x02, 0x00, 0x00,
+ // 387 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0x3f, 0x6f, 0xd3, 0x40,
+ 0x18, 0xc6, 0xef, 0xb5, 0xcf, 0xc6, 0x79, 0x43, 0x2b, 0xeb, 0x90, 0x90, 0xa9, 0xd0, 0x61, 0x75,
+ 0x32, 0x48, 0xb8, 0x52, 0x60, 0x40, 0x48, 0x0c, 0x2d, 0x35, 0xc8, 0x56, 0x14, 0x2a, 0x53, 0x75,
+ 0x77, 0xda, 0xc3, 0x58, 0x25, 0x76, 0x65, 0x5f, 0x90, 0xba, 0xf1, 0x0d, 0xe0, 0x0b, 0xb0, 0xf3,
+ 0x51, 0x3a, 0x66, 0xcc, 0x84, 0x88, 0xb3, 0x30, 0xe6, 0x23, 0xa0, 0x3b, 0x27, 0x0a, 0xd9, 0x9e,
+ 0xdf, 0x3d, 0x7f, 0xde, 0xe1, 0x10, 0xeb, 0xa2, 0xcc, 0xc3, 0x9b, 0xba, 0x92, 0x15, 0xa3, 0x4a,
+ 0x1f, 0x3c, 0xcf, 0x0b, 0xf9, 0x79, 0x3a, 0x0e, 0x2f, 0xab, 0xc9, 0x51, 0x5e, 0xe5, 0xd5, 0x91,
+ 0x36, 0xc7, 0xd3, 0x4f, 0x9a, 0x34, 0x68, 0xd5, 0x95, 0x0e, 0x7f, 0x02, 0xd2, 0x53, 0xd1, 0x5c,
+ 0xb2, 0x37, 0xd8, 0x2b, 0xca, 0x5c, 0x34, 0x52, 0xd4, 0x8d, 0x07, 0xbe, 0x19, 0xf4, 0x07, 0x8f,
+ 0x42, 0xbd, 0xae, 0xec, 0x30, 0xde, 0x78, 0x51, 0x29, 0xeb, 0xdb, 0x13, 0x7a, 0xf7, 0xfb, 0x09,
+ 0x49, 0xb7, 0x8d, 0x83, 0x33, 0xdc, 0xdf, 0x8d, 0x30, 0x17, 0xcd, 0x6b, 0x71, 0xeb, 0x81, 0x0f,
+ 0x41, 0x2f, 0x55, 0x92, 0x05, 0x68, 0x7d, 0xcd, 0xbe, 0x4c, 0x85, 0x67, 0xf8, 0x10, 0xf4, 0x07,
+ 0xac, 0x9b, 0xdf, 0xd4, 0xd4, 0x99, 0xb4, 0x0b, 0xbc, 0x36, 0x5e, 0x41, 0x42, 0x1d, 0xc3, 0x35,
+ 0x0f, 0xbf, 0x03, 0xde, 0xff, 0x3f, 0xc1, 0x18, 0xd2, 0xec, 0xea, 0xaa, 0x5e, 0xef, 0x6a, 0xcd,
+ 0x1e, 0x63, 0x4f, 0x16, 0x13, 0xd1, 0xc8, 0x6c, 0x72, 0xa3, 0xc7, 0xcd, 0x74, 0xfb, 0xc0, 0x9e,
+ 0xa2, 0xd5, 0xc8, 0x4c, 0x0a, 0xcf, 0xf4, 0x21, 0xd8, 0x1f, 0x3c, 0xd8, 0x3d, 0xfb, 0x51, 0x59,
+ 0x69, 0x97, 0x60, 0x0f, 0xd1, 0x96, 0xd5, 0xb5, 0x28, 0x1b, 0xcf, 0xf6, 0xcd, 0x60, 0x2f, 0x5d,
+ 0x53, 0x42, 0x1d, 0xea, 0x5a, 0x09, 0x75, 0x2c, 0xd7, 0x7e, 0x36, 0xc4, 0xbd, 0x9d, 0x2e, 0x43,
+ 0xb4, 0x8f, 0xdf, 0x9e, 0xc7, 0x17, 0x91, 0x4b, 0x58, 0x1f, 0xef, 0x0d, 0xa3, 0xe3, 0x8b, 0x78,
+ 0xf4, 0xde, 0x05, 0x05, 0x67, 0xd1, 0xe8, 0x54, 0x81, 0xa1, 0x20, 0xf9, 0x10, 0x8f, 0x14, 0x98,
+ 0xcc, 0x41, 0x3a, 0x8c, 0xde, 0x9d, 0xbb, 0xf4, 0xe4, 0xe5, 0x6c, 0xc1, 0xc9, 0x7c, 0xc1, 0xc9,
+ 0x6a, 0xc1, 0xe1, 0x5b, 0xcb, 0xe1, 0x57, 0xcb, 0xe1, 0xae, 0xe5, 0x30, 0x6b, 0x39, 0xfc, 0x69,
+ 0x39, 0xfc, 0x6d, 0x39, 0x59, 0xb5, 0x1c, 0x7e, 0x2c, 0x39, 0x99, 0x2d, 0x39, 0x99, 0x2f, 0x39,
+ 0x19, 0xdb, 0xfa, 0xf3, 0x5e, 0xfc, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x33, 0x18, 0xb8, 0xad, 0xff,
+ 0x01, 0x00, 0x00,
}
func (x IngesterState) String() string {
@@ -304,14 +242,6 @@ func (this *Desc) Equal(that interface{}) bool {
return false
}
}
- if len(this.Tokens) != len(that1.Tokens) {
- return false
- }
- for i := range this.Tokens {
- if !this.Tokens[i].Equal(&that1.Tokens[i]) {
- return false
- }
- }
return true
}
func (this *IngesterDesc) Equal(that interface{}) bool {
@@ -352,38 +282,11 @@ func (this *IngesterDesc) Equal(that interface{}) bool {
}
return true
}
-func (this *TokenDesc) Equal(that interface{}) bool {
- if that == nil {
- return this == nil
- }
-
- that1, ok := that.(*TokenDesc)
- if !ok {
- that2, ok := that.(TokenDesc)
- if ok {
- that1 = &that2
- } else {
- return false
- }
- }
- if that1 == nil {
- return this == nil
- } else if this == nil {
- return false
- }
- if this.Token != that1.Token {
- return false
- }
- if this.Ingester != that1.Ingester {
- return false
- }
- return true
-}
func (this *Desc) GoString() string {
if this == nil {
return "nil"
}
- s := make([]string, 0, 6)
+ s := make([]string, 0, 5)
s = append(s, "&ring.Desc{")
keysForIngesters := make([]string, 0, len(this.Ingesters))
for k, _ := range this.Ingesters {
@@ -398,13 +301,6 @@ func (this *Desc) GoString() string {
if this.Ingesters != nil {
s = append(s, "Ingesters: "+mapStringForIngesters+",\n")
}
- if this.Tokens != nil {
- vs := make([]*TokenDesc, len(this.Tokens))
- for i := range vs {
- vs[i] = &this.Tokens[i]
- }
- s = append(s, "Tokens: "+fmt.Sprintf("%#v", vs)+",\n")
- }
s = append(s, "}")
return strings.Join(s, "")
}
@@ -421,17 +317,6 @@ func (this *IngesterDesc) GoString() string {
s = append(s, "}")
return strings.Join(s, "")
}
-func (this *TokenDesc) GoString() string {
- if this == nil {
- return "nil"
- }
- s := make([]string, 0, 6)
- s = append(s, "&ring.TokenDesc{")
- s = append(s, "Token: "+fmt.Sprintf("%#v", this.Token)+",\n")
- s = append(s, "Ingester: "+fmt.Sprintf("%#v", this.Ingester)+",\n")
- s = append(s, "}")
- return strings.Join(s, "")
-}
func valueToGoStringRing(v interface{}, typ string) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
@@ -481,18 +366,6 @@ func (m *Desc) MarshalTo(dAtA []byte) (int, error) {
i += n1
}
}
- if len(m.Tokens) > 0 {
- for _, msg := range m.Tokens {
- dAtA[i] = 0x12
- i++
- i = encodeVarintRing(dAtA, i, uint64(msg.Size()))
- n, err := msg.MarshalTo(dAtA[i:])
- if err != nil {
- return 0, err
- }
- i += n
- }
- }
return i, nil
}
@@ -547,35 +420,6 @@ func (m *IngesterDesc) MarshalTo(dAtA []byte) (int, error) {
return i, nil
}
-func (m *TokenDesc) Marshal() (dAtA []byte, err error) {
- size := m.Size()
- dAtA = make([]byte, size)
- n, err := m.MarshalTo(dAtA)
- if err != nil {
- return nil, err
- }
- return dAtA[:n], nil
-}
-
-func (m *TokenDesc) MarshalTo(dAtA []byte) (int, error) {
- var i int
- _ = i
- var l int
- _ = l
- if m.Token != 0 {
- dAtA[i] = 0x8
- i++
- i = encodeVarintRing(dAtA, i, uint64(m.Token))
- }
- if len(m.Ingester) > 0 {
- dAtA[i] = 0x12
- i++
- i = encodeVarintRing(dAtA, i, uint64(len(m.Ingester)))
- i += copy(dAtA[i:], m.Ingester)
- }
- return i, nil
-}
-
func encodeVarintRing(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
@@ -600,12 +444,6 @@ func (m *Desc) Size() (n int) {
n += mapEntrySize + 1 + sovRing(uint64(mapEntrySize))
}
}
- if len(m.Tokens) > 0 {
- for _, e := range m.Tokens {
- l = e.Size()
- n += 1 + l + sovRing(uint64(l))
- }
- }
return n
}
@@ -635,22 +473,6 @@ func (m *IngesterDesc) Size() (n int) {
return n
}
-func (m *TokenDesc) Size() (n int) {
- if m == nil {
- return 0
- }
- var l int
- _ = l
- if m.Token != 0 {
- n += 1 + sovRing(uint64(m.Token))
- }
- l = len(m.Ingester)
- if l > 0 {
- n += 1 + l + sovRing(uint64(l))
- }
- return n
-}
-
func sovRing(x uint64) (n int) {
for {
n++
@@ -680,7 +502,6 @@ func (this *Desc) String() string {
mapStringForIngesters += "}"
s := strings.Join([]string{`&Desc{`,
`Ingesters:` + mapStringForIngesters + `,`,
- `Tokens:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Tokens), "TokenDesc", "TokenDesc", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s
@@ -698,17 +519,6 @@ func (this *IngesterDesc) String() string {
}, "")
return s
}
-func (this *TokenDesc) String() string {
- if this == nil {
- return "nil"
- }
- s := strings.Join([]string{`&TokenDesc{`,
- `Token:` + fmt.Sprintf("%v", this.Token) + `,`,
- `Ingester:` + fmt.Sprintf("%v", this.Ingester) + `,`,
- `}`,
- }, "")
- return s
-}
func valueToStringRing(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
@@ -875,40 +685,6 @@ func (m *Desc) Unmarshal(dAtA []byte) error {
}
m.Ingesters[mapkey] = *mapvalue
iNdEx = postIndex
- case 2:
- if wireType != 2 {
- return fmt.Errorf("proto: wrong wireType = %d for field Tokens", wireType)
- }
- var msglen int
- for shift := uint(0); ; shift += 7 {
- if shift >= 64 {
- return ErrIntOverflowRing
- }
- if iNdEx >= l {
- return io.ErrUnexpectedEOF
- }
- b := dAtA[iNdEx]
- iNdEx++
- msglen |= int(b&0x7F) << shift
- if b < 0x80 {
- break
- }
- }
- if msglen < 0 {
- return ErrInvalidLengthRing
- }
- postIndex := iNdEx + msglen
- if postIndex < 0 {
- return ErrInvalidLengthRing
- }
- if postIndex > l {
- return io.ErrUnexpectedEOF
- }
- m.Tokens = append(m.Tokens, TokenDesc{})
- if err := m.Tokens[len(m.Tokens)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
- return err
- }
- iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRing(dAtA[iNdEx:])
@@ -1132,110 +908,6 @@ func (m *IngesterDesc) Unmarshal(dAtA []byte) error {
}
return nil
}
-func (m *TokenDesc) Unmarshal(dAtA []byte) error {
- l := len(dAtA)
- iNdEx := 0
- for iNdEx < l {
- preIndex := iNdEx
- var wire uint64
- for shift := uint(0); ; shift += 7 {
- if shift >= 64 {
- return ErrIntOverflowRing
- }
- if iNdEx >= l {
- return io.ErrUnexpectedEOF
- }
- b := dAtA[iNdEx]
- iNdEx++
- wire |= uint64(b&0x7F) << shift
- if b < 0x80 {
- break
- }
- }
- fieldNum := int32(wire >> 3)
- wireType := int(wire & 0x7)
- if wireType == 4 {
- return fmt.Errorf("proto: TokenDesc: wiretype end group for non-group")
- }
- if fieldNum <= 0 {
- return fmt.Errorf("proto: TokenDesc: illegal tag %d (wire type %d)", fieldNum, wire)
- }
- switch fieldNum {
- case 1:
- if wireType != 0 {
- return fmt.Errorf("proto: wrong wireType = %d for field Token", wireType)
- }
- m.Token = 0
- for shift := uint(0); ; shift += 7 {
- if shift >= 64 {
- return ErrIntOverflowRing
- }
- if iNdEx >= l {
- return io.ErrUnexpectedEOF
- }
- b := dAtA[iNdEx]
- iNdEx++
- m.Token |= uint32(b&0x7F) << shift
- if b < 0x80 {
- break
- }
- }
- case 2:
- if wireType != 2 {
- return fmt.Errorf("proto: wrong wireType = %d for field Ingester", wireType)
- }
- var stringLen uint64
- for shift := uint(0); ; shift += 7 {
- if shift >= 64 {
- return ErrIntOverflowRing
- }
- if iNdEx >= l {
- return io.ErrUnexpectedEOF
- }
- b := dAtA[iNdEx]
- iNdEx++
- stringLen |= uint64(b&0x7F) << shift
- if b < 0x80 {
- break
- }
- }
- intStringLen := int(stringLen)
- if intStringLen < 0 {
- return ErrInvalidLengthRing
- }
- postIndex := iNdEx + intStringLen
- if postIndex < 0 {
- return ErrInvalidLengthRing
- }
- if postIndex > l {
- return io.ErrUnexpectedEOF
- }
- m.Ingester = string(dAtA[iNdEx:postIndex])
- iNdEx = postIndex
- default:
- iNdEx = preIndex
- skippy, err := skipRing(dAtA[iNdEx:])
- if err != nil {
- return err
- }
- if skippy < 0 {
- return ErrInvalidLengthRing
- }
- if (iNdEx + skippy) < 0 {
- return ErrInvalidLengthRing
- }
- if (iNdEx + skippy) > l {
- return io.ErrUnexpectedEOF
- }
- iNdEx += skippy
- }
- }
-
- if iNdEx > l {
- return io.ErrUnexpectedEOF
- }
- return nil
-}
func skipRing(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
diff --git a/pkg/ring/ring.proto b/pkg/ring/ring.proto
index 1c968309307..9670b01c090 100644
--- a/pkg/ring/ring.proto
+++ b/pkg/ring/ring.proto
@@ -9,7 +9,7 @@ option (gogoproto.unmarshaler_all) = true;
message Desc {
map ingesters = 1 [(gogoproto.nullable) = false];
- repeated TokenDesc tokens = 2 [(gogoproto.nullable) = false];
+ reserved 2;
}
message IngesterDesc {
@@ -21,11 +21,6 @@ message IngesterDesc {
repeated uint32 tokens = 6;
}
-message TokenDesc {
- uint32 token = 1;
- string ingester = 2;
-}
-
enum IngesterState {
ACTIVE = 0;
LEAVING = 1;
diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go
index cf80ab94a8e..f850c7a3271 100644
--- a/pkg/ring/ring_test.go
+++ b/pkg/ring/ring_test.go
@@ -92,32 +92,14 @@ func TestDoBatchZeroIngesters(t *testing.T) {
func TestAddIngester(t *testing.T) {
r := NewDesc()
- const (
- ing1Name = "ing1"
- ing2Name = "ing2"
- )
+ const ingName = "ing1"
ing1Tokens := GenerateTokens(128, nil)
- ing2Tokens := GenerateTokens(128, ing1Tokens)
- // store tokens to r.Tokens
- for _, t := range ing1Tokens {
- r.Tokens = append(r.Tokens, TokenDesc{Token: t, Ingester: ing1Name})
- }
-
- for _, t := range ing2Tokens {
- r.Tokens = append(r.Tokens, TokenDesc{Token: t, Ingester: ing2Name})
- }
+ r.AddIngester(ingName, "addr", ing1Tokens, ACTIVE)
- r.AddIngester(ing1Name, "addr", ing1Tokens, ACTIVE)
-
- require.Equal(t, "addr", r.Ingesters[ing1Name].Addr)
- require.Equal(t, ing1Tokens, r.Ingesters[ing1Name].Tokens)
-
- require.Equal(t, len(ing2Tokens), len(r.Tokens))
- for _, tok := range r.Tokens {
- require.NotEqual(t, "test", tok.Ingester)
- }
+ require.Equal(t, "addr", r.Ingesters[ingName].Addr)
+ require.Equal(t, ing1Tokens, r.Ingesters[ingName].Tokens)
}
func TestAddIngesterReplacesExistingTokens(t *testing.T) {
@@ -125,16 +107,14 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) {
const ing1Name = "ing1"
- oldTokens := []uint32{11111, 22222, 33333}
// old tokens will be replaced
- for _, t := range oldTokens {
- r.Tokens = append(r.Tokens, TokenDesc{Token: t, Ingester: ing1Name})
+ r.Ingesters[ing1Name] = IngesterDesc{
+ Tokens: []uint32{11111, 22222, 33333},
}
- newTokens := GenerateTokens(128, oldTokens)
+ newTokens := GenerateTokens(128, nil)
r.AddIngester(ing1Name, "addr", newTokens, ACTIVE)
require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens)
- require.Equal(t, 0, len(r.Tokens)) // all previous tokens were removed
}
diff --git a/pkg/ring/testutils/testutils.go b/pkg/ring/testutils/testutils.go
index c043187ee82..d3acf462d3d 100644
--- a/pkg/ring/testutils/testutils.go
+++ b/pkg/ring/testutils/testutils.go
@@ -17,12 +17,6 @@ func NumTokens(c kv.Client, name, ringKey string) int {
level.Error(util.Logger).Log("msg", "error reading consul", "err", err)
return 0
}
- count := 0
rd := ringDesc.(*ring.Desc)
- for _, token := range rd.Tokens {
- if token.Ingester == name {
- count++
- }
- }
- return count + len(rd.Ingesters[name].Tokens)
+ return len(rd.Ingesters[name].Tokens)
}