From 46dba0a42cbd6ff1966ca99c5a7c33472a2badbb Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 10 Jan 2025 11:47:22 -0800 Subject: [PATCH 1/2] Purge expired postings cache items due inactivity Signed-off-by: alanprot --- pkg/ingester/ingester.go | 20 ++++++++++++ pkg/ingester/ingester_test.go | 36 +++++++++++++++++++-- pkg/storage/tsdb/config.go | 6 ++++ pkg/storage/tsdb/expanded_postings_cache.go | 17 ++++++++++ 4 files changed, 76 insertions(+), 3 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d46398d4b94..2502ef8c762 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -878,6 +878,14 @@ func (i *Ingester) starting(ctx context.Context) error { servs = append(servs, closeIdleService) } + if i.expandedPostingsCacheFactory != nil { + interval := i.cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval + if interval == 0 { + interval = cortex_tsdb.ExpandedCachingExpireInterval + } + servs = append(servs, services.NewTimerService(interval, nil, i.expirePostingsCache, nil)) + } + var err error i.TSDBState.subservices, err = services.NewManager(servs...) if err == nil { @@ -2794,6 +2802,18 @@ func (i *Ingester) closeAndDeleteIdleUserTSDBs(ctx context.Context) error { return nil } +func (i *Ingester) expirePostingsCache(ctx context.Context) error { + for _, userID := range i.getTSDBUsers() { + if ctx.Err() != nil { + return nil + } + userDB := i.getTSDB(userID) + userDB.postingCache.PurgeExpiredItems() + } + + return nil +} + func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckResult { userDB := i.getTSDB(userID) if userDB == nil || userDB.shipper == nil { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 15fec1d1fd5..1de49fb8811 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5525,6 +5525,7 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) { func TestExpendedPostingsCache(t *testing.T) { cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} runQuery := func(t *testing.T, ctx context.Context, i *Ingester, matchers []*client.LabelMatcher) []client.TimeSeriesChunk { @@ -5540,9 +5541,10 @@ func TestExpendedPostingsCache(t *testing.T) { } tc := map[string]struct { - cacheConfig cortex_tsdb.TSDBPostingsCacheConfig - expectedBlockPostingCall int - expectedHeadPostingCall int + cacheConfig cortex_tsdb.TSDBPostingsCacheConfig + expectedBlockPostingCall int + expectedHeadPostingCall int + shouldExpireDueInactivity bool }{ "cacheDisabled": { expectedBlockPostingCall: 0, @@ -5594,6 +5596,23 @@ func TestExpendedPostingsCache(t *testing.T) { }, }, }, + "expire due inactivity": { + expectedBlockPostingCall: 1, + expectedHeadPostingCall: 1, + shouldExpireDueInactivity: true, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Blocks: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Second, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + Head: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Second, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + }, + }, } for name, c := range tc { @@ -5790,6 +5809,17 @@ func TestExpendedPostingsCache(t *testing.T) { require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1) // Return cached value from block and bypass head require.Equal(t, int64(0), postingsForMatchersCalls.Load()) + + if c.shouldExpireDueInactivity { + test.Poll(t, c.cacheConfig.Blocks.Ttl+c.cacheConfig.Head.Ttl+cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval, 0, func() interface{} { + size := 0 + for _, userID := range i.getTSDBUsers() { + userDB := i.getTSDB(userID) + size += userDB.postingCache.Size() + } + return size + }) + } }) } } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 42d3494abee..9ca28ae95fd 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -30,6 +30,9 @@ const ( // How often are open TSDBs checked for being idle and closed. DefaultCloseIdleTSDBInterval = 5 * time.Minute + // How often we should proactively expire expired cache entries + ExpandedCachingExpireInterval = 5 * time.Minute + // How often to check for tenant deletion mark. DeletionMarkCheckInterval = 1 * time.Hour @@ -156,6 +159,9 @@ type TSDBConfig struct { // How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override. CloseIdleTSDBInterval time.Duration `yaml:"-"` + // How often are open TSDBs checked for being idle and closed. ExpandedCachingExpireInterval is not suitable for testing, so tests can override. + ExpandedCachingExpireInterval time.Duration `yaml:"-"` + // Positive value enables experimental support for exemplars. 0 or less to disable. MaxExemplars int `yaml:"max_exemplars"` diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 3ea8da709ec..a24087e824f 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -124,6 +124,8 @@ func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, m type ExpandedPostingsCache interface { PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) ExpireSeries(metric labels.Labels) + PurgeExpiredItems() + Size() int } type blocksPostingsForMatchersCache struct { @@ -166,6 +168,15 @@ func (c *blocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { c.seedByHash.incrementSeed(c.userId, metricName) } +func (c *blocksPostingsForMatchersCache) PurgeExpiredItems() { + c.headCache.expire() + c.blocksCache.expire() +} + +func (c *blocksPostingsForMatchersCache) Size() int { + return c.headCache.size() + c.blocksCache.size() +} + func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { return c.fetchPostings(blockID, ix, ms...)(ctx) } @@ -365,6 +376,12 @@ func (c *fifoCache[V]) expire() { } } +func (c *fifoCache[V]) size() int { + c.cachedMtx.RLock() + defer c.cachedMtx.RUnlock() + return c.cached.Len() +} + func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { r := &cacheEntryPromise[V]{ done: make(chan struct{}), From 112055e7994123ca0831a25b452d93456ec6a76f Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 10 Jan 2025 14:12:48 -0800 Subject: [PATCH 2/2] Fix comments Signed-off-by: alanprot --- pkg/storage/tsdb/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 9ca28ae95fd..afb51d1a416 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -30,7 +30,7 @@ const ( // How often are open TSDBs checked for being idle and closed. DefaultCloseIdleTSDBInterval = 5 * time.Minute - // How often we should proactively expire expired cache entries + // How often expired items are cleaned from the PostingsCache ExpandedCachingExpireInterval = 5 * time.Minute // How often to check for tenant deletion mark. @@ -159,7 +159,7 @@ type TSDBConfig struct { // How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override. CloseIdleTSDBInterval time.Duration `yaml:"-"` - // How often are open TSDBs checked for being idle and closed. ExpandedCachingExpireInterval is not suitable for testing, so tests can override. + // How often expired items are cleaned from the PostingsCache. ExpandedCachingExpireInterval is not suitable for testing, so tests can override. ExpandedCachingExpireInterval time.Duration `yaml:"-"` // Positive value enables experimental support for exemplars. 0 or less to disable.