7
7
"fmt"
8
8
"io"
9
9
"math"
10
- "math/rand"
11
10
"net"
12
11
"net/http"
13
12
"net/http/httptest"
@@ -183,11 +182,7 @@ func TestIngesterDeletionRace(t *testing.T) {
183
182
limits := defaultLimitsTestConfig ()
184
183
tenantLimits := newMockTenantLimits (map [string ]* validation.Limits {userID : & limits })
185
184
cfg := defaultIngesterTestConfig (t )
186
- cfg .BlocksStorageConfig .TSDB .CloseIdleTSDBInterval = 5 * time .Millisecond
187
- cfg .BlocksStorageConfig .TSDB .CloseIdleTSDBTimeout = 10 * time .Second
188
- cfg .BlocksStorageConfig .TSDB .ExpandedCachingExpireInterval = 5 * time .Millisecond
189
185
cfg .BlocksStorageConfig .TSDB .PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig {
190
- SeedSize : 3 , // lets make sure all metric names collide
191
186
Head : cortex_tsdb.PostingsCacheConfig {
192
187
Enabled : true ,
193
188
Ttl : time .Hour ,
@@ -215,7 +210,7 @@ func TestIngesterDeletionRace(t *testing.T) {
215
210
return ing .lifecycler .GetState ()
216
211
})
217
212
218
- numberOfTenants := 150
213
+ numberOfTenants := 50
219
214
wg := sync.WaitGroup {}
220
215
wg .Add (numberOfTenants )
221
216
@@ -234,11 +229,27 @@ func TestIngesterDeletionRace(t *testing.T) {
234
229
Matchers : []* client.LabelMatcher {{Type : client .REGEX_MATCH , Name : labels .MetricName , Value : ".*" }},
235
230
}, s )
236
231
require .NoError (t , err )
237
- time . Sleep ( time . Duration ( rand . Int63n ( 5 )) * time .Millisecond )
232
+ ing . getTSDB ( u ). postingCache = & wrappedExpandedPostingsCache { ExpandedPostingsCache : ing . getTSDB ( u ). postingCache , purgeDelay : 10 * time .Millisecond }
238
233
ing .getTSDB (u ).deletionMarkFound .Store (true ) // lets force close the tenant
239
234
}()
240
235
}
236
+
241
237
wg .Wait ()
238
+
239
+ ctx , c := context .WithCancel (context .Background ())
240
+ defer c ()
241
+
242
+ go func () {
243
+ ing .expirePostingsCache (ctx ) //nolint:errcheck
244
+ }()
245
+
246
+ go func () {
247
+ ing .closeAndDeleteIdleUserTSDBs (ctx ) //nolint:errcheck
248
+ }()
249
+
250
+ test .Poll (t , 5 * time .Second , 0 , func () interface {} {
251
+ return len (ing .getTSDBUsers ())
252
+ })
242
253
}
243
254
244
255
func TestIngesterPerLabelsetLimitExceeded (t * testing.T ) {
@@ -3592,6 +3603,17 @@ func (m *mockMetricsForLabelMatchersStreamServer) Context() context.Context {
3592
3603
return m .ctx
3593
3604
}
3594
3605
3606
+ type wrappedExpandedPostingsCache struct {
3607
+ cortex_tsdb.ExpandedPostingsCache
3608
+
3609
+ purgeDelay time.Duration
3610
+ }
3611
+
3612
+ func (w * wrappedExpandedPostingsCache ) PurgeExpiredItems () {
3613
+ time .Sleep (w .purgeDelay )
3614
+ w .ExpandedPostingsCache .PurgeExpiredItems ()
3615
+ }
3616
+
3595
3617
type mockQueryStreamServer struct {
3596
3618
grpc.ServerStream
3597
3619
ctx context.Context
0 commit comments