From 9331e0232e4c9fef1ca25d2d860d3bb1e74d1a54 Mon Sep 17 00:00:00 2001 From: alanprot Date: Thu, 21 Nov 2024 22:23:46 -0800 Subject: [PATCH 1/4] Using a single seed array for expanded postings cache on ingesters Signed-off-by: alanprot --- pkg/ingester/ingester.go | 20 ++-- pkg/storage/tsdb/expanded_postings_cache.go | 103 +++++++++++++------- 2 files changed, 80 insertions(+), 43 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 00dd1337ce1..b7a3c882f1a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -238,6 +238,8 @@ type Ingester struct { inflightQueryRequests atomic.Int64 maxInflightQueryRequests util_math.MaxTracker + + expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory } // Shipper interface is used to have an easy way to mock it in tests. @@ -697,12 +699,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe } i := &Ingester{ - cfg: cfg, - limits: limits, - usersMetadata: map[string]*userMetricsMetadata{}, - TSDBState: newTSDBState(bucketClient, registerer), - logger: logger, - ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + cfg: cfg, + limits: limits, + usersMetadata: map[string]*userMetricsMetadata{}, + TSDBState: newTSDBState(bucketClient, registerer), + logger: logger, + ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache), } i.metrics = newIngesterMetrics(registerer, false, @@ -2174,9 +2177,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { blockRanges := i.cfg.BlocksStorageConfig.TSDB.BlockRanges.ToMilliseconds() var postingCache cortex_tsdb.ExpandedPostingsCache - if i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled || i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled { - logutil.WarnExperimentalUse("expanded postings cache") - postingCache = cortex_tsdb.NewBlocksPostingsForMatchersCache(i.cfg.BlocksStorageConfig.TSDB.PostingsCache, i.metrics.expandedPostingsCacheMetrics) + if i.expandedPostingsCacheFactory != nil { + postingCache = i.expandedPostingsCacheFactory.NewExpandedPostingsCache(i.metrics.expandedPostingsCacheMetrics) } userDB := &userTSDB{ diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 59af6d879f9..01bd08a8fca 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "github.com/cortexproject/cortex/pkg/util/extract" + logutil "github.com/cortexproject/cortex/pkg/util/log" ) var ( @@ -29,8 +30,8 @@ var ( const ( // size of the seed array. Each seed is a 64bits int (8 bytes) - // totaling 8mb - seedArraySize = 1024 * 1024 + // totaling 16mb + seedArraySize = 2 * 1024 * 1024 numOfSeedsStripes = 512 ) @@ -89,25 +90,43 @@ func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not") } +type ExpandedPostingsCacheFactory struct { + seedByHash *seedByHash + cfg TSDBPostingsCacheConfig +} + +func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory { + if cfg.Head.Enabled || cfg.Blocks.Enabled { + logutil.WarnExperimentalUse("expanded postings cache") + return &ExpandedPostingsCacheFactory{ + cfg: cfg, + seedByHash: newSeedByHash(), + } + } + + return nil +} + +func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { + return newBlocksPostingsForMatchersCache(f.cfg, metrics, f.seedByHash) +} + type ExpandedPostingsCache interface { PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) ExpireSeries(metric labels.Labels) } -type BlocksPostingsForMatchersCache struct { - strippedLock []sync.RWMutex - - headCache *fifoCache[[]storage.SeriesRef] - blocksCache *fifoCache[[]storage.SeriesRef] - - headSeedByMetricName []int +type blocksPostingsForMatchersCache struct { + headCache *fifoCache[[]storage.SeriesRef] + blocksCache *fifoCache[[]storage.SeriesRef] postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) timeNow func() time.Time - metrics *ExpandedPostingsCacheMetrics + metrics *ExpandedPostingsCacheMetrics + seedByHash *seedByHash } -func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { +func newBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache { if cfg.PostingsForMatchers == nil { cfg.PostingsForMatchers = tsdb.PostingsForMatchers } @@ -116,36 +135,29 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp cfg.timeNow = time.Now } - return &BlocksPostingsForMatchersCache{ + return &blocksPostingsForMatchersCache{ headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), - headSeedByMetricName: make([]int, seedArraySize), - strippedLock: make([]sync.RWMutex, numOfSeedsStripes), postingsForMatchersFunc: cfg.PostingsForMatchers, timeNow: cfg.timeNow, metrics: metrics, + seedByHash: seedByHash, } } -func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { +func (c *blocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { metricName, err := extract.MetricNameFromLabels(metric) if err != nil { return } - - h := MemHashString(metricName) - i := h % uint64(len(c.headSeedByMetricName)) - l := h % uint64(len(c.strippedLock)) - c.strippedLock[l].Lock() - defer c.strippedLock[l].Unlock() - c.headSeedByMetricName[i]++ + c.seedByHash.incrementSeed(memHashString(metricName)) } -func (c *BlocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { +func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { return c.fetchPostings(blockID, ix, ms...)(ctx) } -func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) { +func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) { var seed string cache := c.blocksCache @@ -197,7 +209,7 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd return c.result(promise) } -func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { +func (c *blocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { return func(ctx context.Context) (index.Postings, error) { select { case <-ctx.Done(): @@ -211,16 +223,11 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage. } } -func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { - h := MemHashString(metricName) - i := h % uint64(len(c.headSeedByMetricName)) - l := h % uint64(len(c.strippedLock)) - c.strippedLock[l].RLock() - defer c.strippedLock[l].RUnlock() - return strconv.Itoa(c.headSeedByMetricName[i]) +func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { + return c.seedByHash.getSeed(memHashString(metricName)) } -func (c *BlocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { +func (c *blocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { slices.SortFunc(ms, func(i, j *labels.Matcher) int { if i.Type != j.Type { return int(i.Type - j.Type) @@ -272,6 +279,34 @@ func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) { return "", false } +type seedByHash struct { + strippedLock []sync.RWMutex + seedByHash []int +} + +func newSeedByHash() *seedByHash { + return &seedByHash{ + seedByHash: make([]int, seedArraySize), + strippedLock: make([]sync.RWMutex, numOfSeedsStripes), + } +} + +func (s *seedByHash) getSeed(h uint64) string { + i := h % uint64(len(s.seedByHash)) + l := h % uint64(len(s.strippedLock)) + s.strippedLock[l].RLock() + defer s.strippedLock[l].RUnlock() + return strconv.Itoa(s.seedByHash[i]) +} + +func (s *seedByHash) incrementSeed(h uint64) { + i := h % uint64(len(s.seedByHash)) + l := h % uint64(len(s.strippedLock)) + s.strippedLock[l].Lock() + defer s.strippedLock[l].Unlock() + s.seedByHash[i]++ +} + type fifoCache[V any] struct { cfg PostingsCacheConfig cachedValues *sync.Map @@ -425,6 +460,6 @@ func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool return r >= ttl } -func MemHashString(str string) uint64 { +func memHashString(str string) uint64 { return xxhash.Sum64(yoloBuf(str)) } From 7474d0ce29ba3036de5fba9d21a5e4e10a0fc828 Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 22 Nov 2024 10:59:19 -0800 Subject: [PATCH 2/4] using tenant id to calculate the seeds hash Signed-off-by: alanprot --- pkg/ingester/ingester.go | 2 +- pkg/storage/tsdb/expanded_postings_cache.go | 26 +++++++++++++-------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index b7a3c882f1a..6f2aea2e0d2 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2178,7 +2178,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { var postingCache cortex_tsdb.ExpandedPostingsCache if i.expandedPostingsCacheFactory != nil { - postingCache = i.expandedPostingsCacheFactory.NewExpandedPostingsCache(i.metrics.expandedPostingsCacheMetrics) + postingCache = i.expandedPostingsCacheFactory.NewExpandedPostingsCache(userID, i.metrics.expandedPostingsCacheMetrics) } userDB := &userTSDB{ diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 01bd08a8fca..1c03f412cd3 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/cespare/xxhash/v2" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -18,6 +17,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" + "github.com/segmentio/fasthash/fnv1a" "github.com/cortexproject/cortex/pkg/util/extract" logutil "github.com/cortexproject/cortex/pkg/util/log" @@ -107,8 +107,8 @@ func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPosti return nil } -func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { - return newBlocksPostingsForMatchersCache(f.cfg, metrics, f.seedByHash) +func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { + return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash) } type ExpandedPostingsCache interface { @@ -117,6 +117,8 @@ type ExpandedPostingsCache interface { } type blocksPostingsForMatchersCache struct { + userId string + headCache *fifoCache[[]storage.SeriesRef] blocksCache *fifoCache[[]storage.SeriesRef] postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) @@ -126,7 +128,7 @@ type blocksPostingsForMatchersCache struct { seedByHash *seedByHash } -func newBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache { +func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache { if cfg.PostingsForMatchers == nil { cfg.PostingsForMatchers = tsdb.PostingsForMatchers } @@ -142,6 +144,7 @@ func newBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp timeNow: cfg.timeNow, metrics: metrics, seedByHash: seedByHash, + userId: userId, } } @@ -150,7 +153,7 @@ func (c *blocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { if err != nil { return } - c.seedByHash.incrementSeed(memHashString(metricName)) + c.seedByHash.incrementSeed(c.userId, metricName) } func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { @@ -224,7 +227,7 @@ func (c *blocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage. } func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { - return c.seedByHash.getSeed(memHashString(metricName)) + return c.seedByHash.getSeed(c.userId, metricName) } func (c *blocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { @@ -291,7 +294,8 @@ func newSeedByHash() *seedByHash { } } -func (s *seedByHash) getSeed(h uint64) string { +func (s *seedByHash) getSeed(userId string, v string) string { + h := memHashString(userId, v) i := h % uint64(len(s.seedByHash)) l := h % uint64(len(s.strippedLock)) s.strippedLock[l].RLock() @@ -299,7 +303,8 @@ func (s *seedByHash) getSeed(h uint64) string { return strconv.Itoa(s.seedByHash[i]) } -func (s *seedByHash) incrementSeed(h uint64) { +func (s *seedByHash) incrementSeed(userId string, v string) { + h := memHashString(userId, v) i := h % uint64(len(s.seedByHash)) l := h % uint64(len(s.strippedLock)) s.strippedLock[l].Lock() @@ -460,6 +465,7 @@ func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool return r >= ttl } -func memHashString(str string) uint64 { - return xxhash.Sum64(yoloBuf(str)) +func memHashString(userId, v string) uint64 { + h := fnv1a.HashString64(userId) + return fnv1a.AddString64(h, v) } From a89d2cd352663feb3bb049072aeec7979cb12860 Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 22 Nov 2024 11:20:57 -0800 Subject: [PATCH 3/4] Adding cache isolation test Signed-off-by: alanprot --- pkg/ingester/ingester_test.go | 58 +++++++++++++++++++++ pkg/storage/tsdb/expanded_postings_cache.go | 11 ++-- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 2f7a4c7cf64..d4f8f6106e9 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5080,6 +5080,64 @@ func TestIngester_instanceLimitsMetrics(t *testing.T) { `), "cortex_ingester_instance_limits")) } +func TestExpendedPostingsCacheIsolation(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{ + SeedSize: 1, // lets make sure all metric names collide + Head: cortex_tsdb.PostingsCacheConfig{ + Enabled: true, + }, + Blocks: cortex_tsdb.PostingsCacheConfig{ + Enabled: true, + }, + } + + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + numberOfTenants := 100 + wg := sync.WaitGroup{} + wg.Add(numberOfTenants) + + for j := 0; j < numberOfTenants; j++ { + go func() { + defer wg.Done() + userId := fmt.Sprintf("user%v", j) + ctx := user.InjectOrgID(context.Background(), userId) + _, err = i.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId)}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API)) + require.NoError(t, err) + }() + } + + wg.Wait() + + wg.Add(numberOfTenants) + for j := 0; j < numberOfTenants; j++ { + go func() { + defer wg.Done() + userId := fmt.Sprintf("user%v", j) + ctx := user.InjectOrgID(context.Background(), userId) + s := &mockQueryStreamServer{ctx: ctx} + + err := i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}}, + }, s) + require.NoError(t, err) + require.Len(t, s.series, 1) + require.Len(t, s.series[0].Labels, 2) + }() + } + wg.Wait() +} + func TestExpendedPostingsCache(t *testing.T) { cfg := defaultIngesterTestConfig(t) cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 1c03f412cd3..ae9cbea2ee5 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -68,7 +68,9 @@ type TSDBPostingsCacheConfig struct { Head PostingsCacheConfig `yaml:"head" doc:"description=If enabled, ingesters will cache expanded postings for the head block. Only queries with with an equal matcher for metric __name__ are cached."` Blocks PostingsCacheConfig `yaml:"blocks" doc:"description=If enabled, ingesters will cache expanded postings for the compacted blocks. The cache is shared between all blocks."` + // The configurations below are used only for testing purpose PostingsForMatchers func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) `yaml:"-"` + SeedSize int `yaml:"-"` timeNow func() time.Time `yaml:"-"` } @@ -97,10 +99,13 @@ type ExpandedPostingsCacheFactory struct { func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory { if cfg.Head.Enabled || cfg.Blocks.Enabled { + if cfg.SeedSize == 0 { + cfg.SeedSize = seedArraySize + } logutil.WarnExperimentalUse("expanded postings cache") return &ExpandedPostingsCacheFactory{ cfg: cfg, - seedByHash: newSeedByHash(), + seedByHash: newSeedByHash(cfg.SeedSize), } } @@ -287,9 +292,9 @@ type seedByHash struct { seedByHash []int } -func newSeedByHash() *seedByHash { +func newSeedByHash(size int) *seedByHash { return &seedByHash{ - seedByHash: make([]int, seedArraySize), + seedByHash: make([]int, size), strippedLock: make([]sync.RWMutex, numOfSeedsStripes), } } From 2d3abe33e2e8d8978977917ea5b2c6b9aab297b3 Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 22 Nov 2024 11:31:20 -0800 Subject: [PATCH 4/4] add test for memHashString Signed-off-by: alanprot --- pkg/ingester/ingester_test.go | 1 + .../tsdb/expanded_postings_cache_test.go | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index d4f8f6106e9..6f1f145a9ee 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5133,6 +5133,7 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) { require.NoError(t, err) require.Len(t, s.series, 1) require.Len(t, s.series[0].Labels, 2) + require.Equal(t, userId, cortexpb.FromLabelAdaptersToLabels(s.series[0].Labels).Get("userId")) }() } wg.Wait() diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index db821736a32..6a9476072aa 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -176,6 +176,24 @@ func TestFifoCacheExpire(t *testing.T) { } } +func Test_memHashString(test *testing.T) { + numberOfTenants := 200 + numberOfMetrics := 100 + occurrences := map[uint64]int{} + + for k := 0; k < 10; k++ { + for j := 0; j < numberOfMetrics; j++ { + metricName := fmt.Sprintf("metricName%v", j) + for i := 0; i < numberOfTenants; i++ { + userId := fmt.Sprintf("user%v", i) + occurrences[memHashString(userId, metricName)]++ + } + } + + require.Len(test, occurrences, numberOfMetrics*numberOfTenants) + } +} + func RepeatStringIfNeeded(seed string, length int) string { if len(seed) > length { return seed