Skip to content

Commit f0f0bbe

Browse files
authored
Fix pushes when ingesters don't have a zone. (#2357)
* Fix pushes when ingesters don't have a zone. Signed-off-by: Goutham Veeramachaneni <[email protected]> * Added test for nil zone handling Signed-off-by: Goutham Veeramachaneni <[email protected]> * Address feedback Signed-off-by: Goutham Veeramachaneni <[email protected]>
1 parent 500d41c commit f0f0bbe

File tree

2 files changed

+96
-5
lines changed

2 files changed

+96
-5
lines changed

integration/backward_compatibility_test.go

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
3737
}
3838
}
3939

40+
func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) {
41+
for _, previousImage := range previousVersionImages {
42+
t.Run(fmt.Sprintf("Backward compatibility upgrading from %s", previousImage), func(t *testing.T) {
43+
runNewDistributorsCanPushToOldIngestersWithReplication(t, previousImage)
44+
})
45+
}
46+
}
47+
4048
func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage string) {
4149
s, err := e2e.NewScenario(networkName)
4250
require.NoError(t, err)
@@ -92,20 +100,101 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
92100
// stopped, which means the transfer to ingester-2 is completed.
93101
require.NoError(t, s.Stop(ingester1))
94102

103+
checkQueries(t, consul, distributor,
104+
expectedVector,
105+
previousImage,
106+
flagsForOldImage, ChunksStorageFlags,
107+
now,
108+
s,
109+
1,
110+
)
111+
}
112+
113+
// Check for issues like https://github.com/cortexproject/cortex/issues/2356
114+
func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previousImage string) {
115+
s, err := e2e.NewScenario(networkName)
116+
require.NoError(t, err)
117+
defer s.Close()
118+
119+
// Start dependencies.
120+
dynamo := e2edb.NewDynamoDB()
121+
consul := e2edb.NewConsul()
122+
require.NoError(t, s.StartAndWaitReady(dynamo, consul))
123+
124+
flagsForOldImage := mergeFlags(ChunksStorageFlags, map[string]string{
125+
"-schema-config-file": "",
126+
"-config-yaml": ChunksStorageFlags["-schema-config-file"],
127+
"-distributor.replication-factor": "3",
128+
})
129+
130+
flagsForNewImage := mergeFlags(ChunksStorageFlags, map[string]string{
131+
"-distributor.replication-factor": "3",
132+
})
133+
134+
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
135+
136+
// Start Cortex table-manager (running on current version since the backward compatibility
137+
// test is about testing a rolling update of other services).
138+
tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
139+
require.NoError(t, s.StartAndWaitReady(tableManager))
140+
141+
// Wait until the first table-manager sync has completed, so that we're
142+
// sure the tables have been created.
143+
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))
144+
145+
// Start other Cortex components (ingester running on previous version).
146+
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
147+
ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
148+
ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
149+
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flagsForNewImage, "")
150+
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))
151+
152+
// Wait until the distributor has updated the ring.
153+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(1536), "cortex_ring_tokens_total"))
154+
155+
// Push some series to Cortex.
156+
now := time.Now()
157+
series, expectedVector := generateSeries("series_1", now)
158+
159+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
160+
require.NoError(t, err)
161+
162+
res, err := c.Push(series)
163+
require.NoError(t, err)
164+
require.Equal(t, 200, res.StatusCode)
165+
166+
checkQueries(t, consul, distributor,
167+
expectedVector,
168+
previousImage,
169+
flagsForOldImage, flagsForNewImage,
170+
now,
171+
s,
172+
3,
173+
)
174+
}
175+
176+
func checkQueries(t *testing.T, consul *e2e.HTTPService, distributor *e2ecortex.CortexService,
177+
expectedVector model.Vector,
178+
previousImage string,
179+
flagsForOldImage, flagsForNewImage map[string]string,
180+
now time.Time,
181+
s *e2e.Scenario,
182+
numIngesters int,
183+
) {
95184
// Query the new ingester both with the old and the new querier.
96185
for _, image := range []string{previousImage, ""} {
97186
var querier *e2ecortex.CortexService
98187

99188
if image == previousImage {
100189
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flagsForOldImage, image)
101190
} else {
102-
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, image)
191+
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flagsForNewImage, image)
103192
}
104193

105194
require.NoError(t, s.StartAndWaitReady(querier))
106195

107196
// Wait until the querier has updated the ring.
108-
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
197+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(numIngesters*512)), "cortex_ring_tokens_total"))
109198

110199
// Query the series
111200
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")

pkg/ring/ring.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,13 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
199199
if _, ok := distinctHosts[token.Ingester]; ok {
200200
continue
201201
}
202-
if _, ok := distinctZones[token.Zone]; ok {
203-
continue
202+
if token.Zone != "" { // Ignore if the ingesters don't have a zone set.
203+
if _, ok := distinctZones[token.Zone]; ok {
204+
continue
205+
}
206+
distinctZones[token.Zone] = struct{}{}
204207
}
205208
distinctHosts[token.Ingester] = struct{}{}
206-
distinctZones[token.Zone] = struct{}{}
207209
ingester := r.ringDesc.Ingesters[token.Ingester]
208210

209211
// We do not want to Write to Ingesters that are not ACTIVE, but we do want

0 commit comments

Comments
 (0)