Skip to content

Purge expired postings cache items due inactivity #6502

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,14 @@ func (i *Ingester) starting(ctx context.Context) error {
servs = append(servs, closeIdleService)
}

if i.expandedPostingsCacheFactory != nil {
interval := i.cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval
if interval == 0 {
interval = cortex_tsdb.ExpandedCachingExpireInterval
}
servs = append(servs, services.NewTimerService(interval, nil, i.expirePostingsCache, nil))
}

var err error
i.TSDBState.subservices, err = services.NewManager(servs...)
if err == nil {
Expand Down Expand Up @@ -2794,6 +2802,18 @@ func (i *Ingester) closeAndDeleteIdleUserTSDBs(ctx context.Context) error {
return nil
}

func (i *Ingester) expirePostingsCache(ctx context.Context) error {
for _, userID := range i.getTSDBUsers() {
if ctx.Err() != nil {
return nil
}
userDB := i.getTSDB(userID)
userDB.postingCache.PurgeExpiredItems()
}

return nil
}

func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckResult {
userDB := i.getTSDB(userID)
if userDB == nil || userDB.shipper == nil {
Expand Down
36 changes: 33 additions & 3 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5525,6 +5525,7 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {

func TestExpendedPostingsCache(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}

runQuery := func(t *testing.T, ctx context.Context, i *Ingester, matchers []*client.LabelMatcher) []client.TimeSeriesChunk {
Expand All @@ -5540,9 +5541,10 @@ func TestExpendedPostingsCache(t *testing.T) {
}

tc := map[string]struct {
cacheConfig cortex_tsdb.TSDBPostingsCacheConfig
expectedBlockPostingCall int
expectedHeadPostingCall int
cacheConfig cortex_tsdb.TSDBPostingsCacheConfig
expectedBlockPostingCall int
expectedHeadPostingCall int
shouldExpireDueInactivity bool
}{
"cacheDisabled": {
expectedBlockPostingCall: 0,
Expand Down Expand Up @@ -5594,6 +5596,23 @@ func TestExpendedPostingsCache(t *testing.T) {
},
},
},
"expire due inactivity": {
expectedBlockPostingCall: 1,
expectedHeadPostingCall: 1,
shouldExpireDueInactivity: true,
cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{
Blocks: cortex_tsdb.PostingsCacheConfig{
Ttl: time.Second,
MaxBytes: 1024 * 1024 * 1024,
Enabled: true,
},
Head: cortex_tsdb.PostingsCacheConfig{
Ttl: time.Second,
MaxBytes: 1024 * 1024 * 1024,
Enabled: true,
},
},
},
}

for name, c := range tc {
Expand Down Expand Up @@ -5790,6 +5809,17 @@ func TestExpendedPostingsCache(t *testing.T) {
require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1)
// Return cached value from block and bypass head
require.Equal(t, int64(0), postingsForMatchersCalls.Load())

if c.shouldExpireDueInactivity {
test.Poll(t, c.cacheConfig.Blocks.Ttl+c.cacheConfig.Head.Ttl+cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval, 0, func() interface{} {
size := 0
for _, userID := range i.getTSDBUsers() {
userDB := i.getTSDB(userID)
size += userDB.postingCache.Size()
}
return size
})
}
})
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
// How often are open TSDBs checked for being idle and closed.
DefaultCloseIdleTSDBInterval = 5 * time.Minute

// How often expired items are cleaned from the PostingsCache
ExpandedCachingExpireInterval = 5 * time.Minute

// How often to check for tenant deletion mark.
DeletionMarkCheckInterval = 1 * time.Hour

Expand Down Expand Up @@ -156,6 +159,9 @@ type TSDBConfig struct {
// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
CloseIdleTSDBInterval time.Duration `yaml:"-"`

// How often expired items are cleaned from the PostingsCache. ExpandedCachingExpireInterval is not suitable for testing, so tests can override.
ExpandedCachingExpireInterval time.Duration `yaml:"-"`

// Positive value enables experimental support for exemplars. 0 or less to disable.
MaxExemplars int `yaml:"max_exemplars"`

Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, m
type ExpandedPostingsCache interface {
PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
ExpireSeries(metric labels.Labels)
PurgeExpiredItems()
Size() int
}

type blocksPostingsForMatchersCache struct {
Expand Down Expand Up @@ -166,6 +168,15 @@ func (c *blocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
c.seedByHash.incrementSeed(c.userId, metricName)
}

func (c *blocksPostingsForMatchersCache) PurgeExpiredItems() {
c.headCache.expire()
c.blocksCache.expire()
}

func (c *blocksPostingsForMatchersCache) Size() int {
return c.headCache.size() + c.blocksCache.size()
}

func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
return c.fetchPostings(blockID, ix, ms...)(ctx)
}
Expand Down Expand Up @@ -365,6 +376,12 @@ func (c *fifoCache[V]) expire() {
}
}

func (c *fifoCache[V]) size() int {
c.cachedMtx.RLock()
defer c.cachedMtx.RUnlock()
return c.cached.Len()
}

func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
r := &cacheEntryPromise[V]{
done: make(chan struct{}),
Expand Down
Loading