Using a single seed array for expanded postings cache on ingesters #6365

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 4 commits · Nov 22, 2024
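
Before the diff itself, a minimal, self-contained sketch of the idea behind this change may help. As the diff below shows, the previous code allocated a 1M-entry (~8 MB) seed array inside every tenant's postings cache; with this PR a single 2M-entry (~16 MB) array is allocated once per ingester and shared by all tenants, indexed by a hash of (tenant, metric name). The seed value is embedded in cache keys, so bumping a seed on series churn makes stale entries unreachable. Everything below is an illustrative stand-in (names like seedStore and bump are invented, not the Cortex API):

package main

import (
	"fmt"
	"strconv"
	"sync"

	"github.com/segmentio/fasthash/fnv1a"
)

// seedStore is one array of counters shared by every tenant. A seed is looked
// up by hashing (tenant, metric name); a hash collision is harmless because
// bumping a seed only causes an extra cache miss, never a wrong result.
type seedStore struct {
	locks []sync.RWMutex // striped locks to reduce contention
	seeds []int
}

func newSeedStore(size, stripes int) *seedStore {
	return &seedStore{locks: make([]sync.RWMutex, stripes), seeds: make([]int, size)}
}

func (s *seedStore) hash(tenant, metric string) uint64 {
	// Chain the tenant into the hash so identical metric names from different
	// tenants usually map to different slots.
	return fnv1a.AddString64(fnv1a.HashString64(tenant), metric)
}

func (s *seedStore) get(tenant, metric string) string {
	h := s.hash(tenant, metric)
	i, l := h%uint64(len(s.seeds)), h%uint64(len(s.locks))
	s.locks[l].RLock()
	defer s.locks[l].RUnlock()
	return strconv.Itoa(s.seeds[i])
}

func (s *seedStore) bump(tenant, metric string) {
	h := s.hash(tenant, metric)
	i, l := h%uint64(len(s.seeds)), h%uint64(len(s.locks))
	s.locks[l].Lock()
	defer s.locks[l].Unlock()
	s.seeds[i]++
}

func main() {
	seeds := newSeedStore(1<<21, 512) // 2M slots, matching the new seedArraySize

	// The seed is part of the cache key, so a cached entry becomes unreachable
	// (and is eventually evicted) as soon as its seed is bumped.
	key := func(tenant, metric string) string {
		return tenant + "|" + seeds.get(tenant, metric) + "|" + metric
	}

	before := key("user-1", "http_requests_total")
	seeds.bump("user-1", "http_requests_total") // e.g. a series was added or removed
	after := key("user-1", "http_requests_total")
	fmt.Println(before, "->", after) // keys differ, so the stale entry is skipped
}
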
20 changes: 11 additions & 9 deletions pkg/ingester/ingester.go
@@ -238,6 +238,8 @@ type Ingester struct {

inflightQueryRequests atomic.Int64
maxInflightQueryRequests util_math.MaxTracker

expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory
}

// Shipper interface is used to have an easy way to mock it in tests.
@@ -697,12 +699,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
}

i := &Ingester{
cfg: cfg,
limits: limits,
usersMetadata: map[string]*userMetricsMetadata{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
cfg: cfg,
limits: limits,
usersMetadata: map[string]*userMetricsMetadata{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache),
}
i.metrics = newIngesterMetrics(registerer,
false,
@@ -2174,9 +2177,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
blockRanges := i.cfg.BlocksStorageConfig.TSDB.BlockRanges.ToMilliseconds()

var postingCache cortex_tsdb.ExpandedPostingsCache
if i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled || i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled {
logutil.WarnExperimentalUse("expanded postings cache")
postingCache = cortex_tsdb.NewBlocksPostingsForMatchersCache(i.cfg.BlocksStorageConfig.TSDB.PostingsCache, i.metrics.expandedPostingsCacheMetrics)
if i.expandedPostingsCacheFactory != nil {
postingCache = i.expandedPostingsCacheFactory.NewExpandedPostingsCache(userID, i.metrics.expandedPostingsCacheMetrics)
}

userDB := &userTSDB{
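
For readability, here is a hedged, self-contained sketch of the wiring above (every type and function below is an invented stand-in, not the Cortex API): the factory is built once per ingester from the config, is nil when both caches are disabled, and hands out one cache per tenant while all of them share the factory's seed array.

package main

import "fmt"

// Hypothetical, stripped-down stand-ins for the types touched in this file,
// only to show the wiring of the change.
type config struct{ headEnabled, blocksEnabled bool }

// cacheFactory stands in for ExpandedPostingsCacheFactory.
type cacheFactory struct{ seeds []int }

func newCacheFactory(cfg config) *cacheFactory {
	if !cfg.headEnabled && !cfg.blocksEnabled {
		return nil // caching disabled: the ingester simply skips cache creation
	}
	return &cacheFactory{seeds: make([]int, 1024)}
}

// tenantCache stands in for the per-tenant ExpandedPostingsCache.
type tenantCache struct {
	userID string
	seeds  []int // shared slice: every tenant cache points at the same array
}

func (f *cacheFactory) newTenantCache(userID string) *tenantCache {
	return &tenantCache{userID: userID, seeds: f.seeds}
}

func main() {
	// Built once in New(), next to the other Ingester fields.
	factory := newCacheFactory(config{headEnabled: true})

	// Mirrors createTSDB: build a per-tenant cache only when the factory exists.
	for _, user := range []string{"user-1", "user-2"} {
		if factory != nil {
			c := factory.newTenantCache(user)
			fmt.Printf("%s shares a seed array of len %d\n", c.userID, len(c.seeds))
		}
	}
}
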
59 changes: 59 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -5080,6 +5080,65 @@ func TestIngester_instanceLimitsMetrics(t *testing.T) {
`), "cortex_ingester_instance_limits"))
}

func TestExpendedPostingsCacheIsolation(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
cfg.LifecyclerConfig.JoinAfter = 0
cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{
SeedSize: 1, // lets make sure all metric names collide
Head: cortex_tsdb.PostingsCacheConfig{
Enabled: true,
},
Blocks: cortex_tsdb.PostingsCacheConfig{
Enabled: true,
},
}

r := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

numberOfTenants := 100
wg := sync.WaitGroup{}
wg.Add(numberOfTenants)

for j := 0; j < numberOfTenants; j++ {
go func() {
defer wg.Done()
userId := fmt.Sprintf("user%v", j)
ctx := user.InjectOrgID(context.Background(), userId)
_, err = i.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId)}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}

wg.Wait()

wg.Add(numberOfTenants)
for j := 0; j < numberOfTenants; j++ {
go func() {
defer wg.Done()
userId := fmt.Sprintf("user%v", j)
ctx := user.InjectOrgID(context.Background(), userId)
s := &mockQueryStreamServer{ctx: ctx}

err := i.QueryStream(&client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: math.MaxInt64,
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
}, s)
require.NoError(t, err)
require.Len(t, s.series, 1)
require.Len(t, s.series[0].Labels, 2)
require.Equal(t, userId, cortexpb.FromLabelAdaptersToLabels(s.series[0].Labels).Get("userId"))
}()
}
wg.Wait()
}

func TestExpendedPostingsCache(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
116 changes: 81 additions & 35 deletions pkg/storage/tsdb/expanded_postings_cache.go
@@ -10,16 +10,17 @@ import (
"sync"
"time"

"github.com/cespare/xxhash/v2"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/segmentio/fasthash/fnv1a"

"github.com/cortexproject/cortex/pkg/util/extract"
logutil "github.com/cortexproject/cortex/pkg/util/log"
)

var (
@@ -29,8 +30,8 @@ var (

const (
// size of the seed array. Each seed is a 64bits int (8 bytes)
// totaling 8mb
seedArraySize = 1024 * 1024
// totaling 16mb
seedArraySize = 2 * 1024 * 1024

numOfSeedsStripes = 512
)
@@ -67,7 +68,9 @@ type TSDBPostingsCacheConfig struct {
Head PostingsCacheConfig `yaml:"head" doc:"description=If enabled, ingesters will cache expanded postings for the head block. Only queries with with an equal matcher for metric __name__ are cached."`
Blocks PostingsCacheConfig `yaml:"blocks" doc:"description=If enabled, ingesters will cache expanded postings for the compacted blocks. The cache is shared between all blocks."`

// The configurations below are used only for testing purpose
PostingsForMatchers func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) `yaml:"-"`
SeedSize int `yaml:"-"`
timeNow func() time.Time `yaml:"-"`
}

@@ -89,25 +92,48 @@ func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f
f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not")
}

type ExpandedPostingsCacheFactory struct {
seedByHash *seedByHash
cfg TSDBPostingsCacheConfig
}

func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory {
if cfg.Head.Enabled || cfg.Blocks.Enabled {
if cfg.SeedSize == 0 {
cfg.SeedSize = seedArraySize
}
logutil.WarnExperimentalUse("expanded postings cache")
return &ExpandedPostingsCacheFactory{
cfg: cfg,
seedByHash: newSeedByHash(cfg.SeedSize),
}
}

return nil
}

func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache {
return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash)
}

type ExpandedPostingsCache interface {
PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
ExpireSeries(metric labels.Labels)
}

type BlocksPostingsForMatchersCache struct {
strippedLock []sync.RWMutex

headCache *fifoCache[[]storage.SeriesRef]
blocksCache *fifoCache[[]storage.SeriesRef]
type blocksPostingsForMatchersCache struct {
userId string

headSeedByMetricName []int
headCache *fifoCache[[]storage.SeriesRef]
blocksCache *fifoCache[[]storage.SeriesRef]
postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
timeNow func() time.Time

metrics *ExpandedPostingsCacheMetrics
metrics *ExpandedPostingsCacheMetrics
seedByHash *seedByHash
}

func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache {
func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache {
if cfg.PostingsForMatchers == nil {
cfg.PostingsForMatchers = tsdb.PostingsForMatchers
}
@@ -116,36 +142,30 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp
cfg.timeNow = time.Now
}

return &BlocksPostingsForMatchersCache{
return &blocksPostingsForMatchersCache{
headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
headSeedByMetricName: make([]int, seedArraySize),
strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
postingsForMatchersFunc: cfg.PostingsForMatchers,
timeNow: cfg.timeNow,
metrics: metrics,
seedByHash: seedByHash,
userId: userId,
}
}

func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
func (c *blocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
metricName, err := extract.MetricNameFromLabels(metric)
if err != nil {
return
}

h := MemHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
c.strippedLock[l].Lock()
defer c.strippedLock[l].Unlock()
c.headSeedByMetricName[i]++
c.seedByHash.incrementSeed(c.userId, metricName)
}

func (c *BlocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
return c.fetchPostings(blockID, ix, ms...)(ctx)
}

func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) {
func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) {
var seed string
cache := c.blocksCache

@@ -197,7 +217,7 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
return c.result(promise)
}

func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
func (c *blocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
return func(ctx context.Context) (index.Postings, error) {
select {
case <-ctx.Done():
@@ -211,16 +231,11 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
}
}

func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
h := MemHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
c.strippedLock[l].RLock()
defer c.strippedLock[l].RUnlock()
return strconv.Itoa(c.headSeedByMetricName[i])
func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
return c.seedByHash.getSeed(c.userId, metricName)
}

func (c *BlocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string {
func (c *blocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string {
slices.SortFunc(ms, func(i, j *labels.Matcher) int {
if i.Type != j.Type {
return int(i.Type - j.Type)
@@ -272,6 +287,36 @@ func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) {
return "", false
}

type seedByHash struct {
strippedLock []sync.RWMutex
seedByHash []int
}

func newSeedByHash(size int) *seedByHash {
return &seedByHash{
seedByHash: make([]int, size),
strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
}
}

func (s *seedByHash) getSeed(userId string, v string) string {
h := memHashString(userId, v)
i := h % uint64(len(s.seedByHash))
l := h % uint64(len(s.strippedLock))
s.strippedLock[l].RLock()
defer s.strippedLock[l].RUnlock()
return strconv.Itoa(s.seedByHash[i])
}

func (s *seedByHash) incrementSeed(userId string, v string) {
h := memHashString(userId, v)
i := h % uint64(len(s.seedByHash))
l := h % uint64(len(s.strippedLock))
s.strippedLock[l].Lock()
defer s.strippedLock[l].Unlock()
s.seedByHash[i]++
}

type fifoCache[V any] struct {
cfg PostingsCacheConfig
cachedValues *sync.Map
@@ -425,6 +470,7 @@ func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool
return r >= ttl
}

func MemHashString(str string) uint64 {
return xxhash.Sum64(yoloBuf(str))
func memHashString(userId, v string) uint64 {
h := fnv1a.HashString64(userId)
return fnv1a.AddString64(h, v)
}
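
A small runnable illustration of the chained FNV-1a hash introduced above (memHashString here copies the two-step hash from the diff; the tenant and metric names in main are made up):

package main

import (
	"fmt"

	"github.com/segmentio/fasthash/fnv1a"
)

// memHashString mirrors the helper above: hash the tenant first, then mix in
// the value, so (tenant, metric) pairs spread across the shared seed array
// even when many tenants use identical metric names.
func memHashString(userID, v string) uint64 {
	h := fnv1a.HashString64(userID)
	return fnv1a.AddString64(h, v)
}

func main() {
	const seedArraySize = 2 * 1024 * 1024

	for _, user := range []string{"user-1", "user-2", "user-3"} {
		h := memHashString(user, "http_requests_total")
		// Different tenants almost always land on different slots for the same
		// metric name; a rare collision only costs a cache miss, never a wrong
		// result, because the per-tenant caches themselves are not shared.
		fmt.Printf("%s -> slot %d\n", user, h%seedArraySize)
	}
}
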
18 changes: 18 additions & 0 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
@@ -176,6 +176,24 @@ func TestFifoCacheExpire(t *testing.T) {
}
}

func Test_memHashString(test *testing.T) {
numberOfTenants := 200
numberOfMetrics := 100
occurrences := map[uint64]int{}

for k := 0; k < 10; k++ {
for j := 0; j < numberOfMetrics; j++ {
metricName := fmt.Sprintf("metricName%v", j)
for i := 0; i < numberOfTenants; i++ {
userId := fmt.Sprintf("user%v", i)
occurrences[memHashString(userId, metricName)]++
}
}

require.Len(test, occurrences, numberOfMetrics*numberOfTenants)
}
}

func RepeatStringIfNeeded(seed string, length int) string {
if len(seed) > length {
return seed