diff --git a/integration/querier_test.go b/integration/querier_test.go index f5b9d744fdb..acbc9dc97d8 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -297,11 +297,11 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { } require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache - if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendInMemory) { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) // as before require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before } - if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendMemcached) { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits } diff --git a/pkg/storage/tsdb/multilevel_cache.go b/pkg/storage/tsdb/multilevel_cache.go index 73342e2daa4..5283eedd664 100644 --- a/pkg/storage/tsdb/multilevel_cache.go +++ b/pkg/storage/tsdb/multilevel_cache.go @@ -30,18 +30,35 @@ func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []b func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { misses = keys hits = map[labels.Label][]byte{} - for _, c := range m.caches { - h, m := c.FetchMultiPostings(ctx, blockID, misses) - misses = m + backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{} + for i, c := range m.caches { + backfillMap[c] = []map[labels.Label][]byte{} + h, mi := c.FetchMultiPostings(ctx, blockID, misses) + misses = mi for label, bytes := range h { hits[label] = bytes } + + if i > 0 { + backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h) + } + if len(misses) == 0 { break } } + defer func() { + for cache, hit := range backfillMap { + for _, values := range hit { + for l, b := range values { + cache.StorePostings(blockID, l, b) + } + } + } + }() + return hits, misses } @@ -59,8 +76,11 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*l } func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { - for _, c := range m.caches { + for i, c := range m.caches { if d, h := c.FetchExpandedPostings(ctx, blockID, matchers); h { + if i > 0 { + m.caches[i-1].StoreExpandedPostings(blockID, matchers, d) + } return d, h } } @@ -84,18 +104,36 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { misses = ids hits = map[storage.SeriesRef][]byte{} - for _, c := range m.caches { - h, m := c.FetchMultiSeries(ctx, blockID, misses) - misses = m + backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{} + + for i, c := range m.caches { + backfillMap[c] = []map[storage.SeriesRef][]byte{} + h, miss := c.FetchMultiSeries(ctx, blockID, misses) + misses = miss for label, bytes := range h { hits[label] = bytes } + + if i > 0 && len(h) > 0 { + backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h) + } + if len(misses) == 0 { break } } + defer func() { + for cache, hit := range backfillMap { + for _, values := range hit { + for m, b := range values { + cache.StoreSeries(blockID, m, b) + } + } + } + }() + return hits, misses } diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go index e5fcdb13533..749da9da5b5 100644 --- a/pkg/storage/tsdb/multilevel_cache_test.go +++ b/pkg/storage/tsdb/multilevel_cache_test.go @@ -148,9 +148,10 @@ func Test_MultiLevelCache(t *testing.T) { cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}) }, }, - "[FetchMultiPostings] should fallback only the missing keys on l1": { + "[FetchMultiPostings] should fallback and backfill only the missing keys on l1": { m1ExpectedCalls: map[string][][]interface{}{ "FetchMultiPostings": {{bID, []labels.Label{l1, l2}}}, + "StorePostings": {{bID, l2, v}}, }, m2ExpectedCalls: map[string][][]interface{}{ "FetchMultiPostings": {{bID, []labels.Label{l2}}}, @@ -158,6 +159,9 @@ func Test_MultiLevelCache(t *testing.T) { m1MockedCalls: map[string][]interface{}{ "FetchMultiPostings": {map[labels.Label][]byte{l1: make([]byte, 1)}, []labels.Label{l2}}, }, + m2MockedCalls: map[string][]interface{}{ + "FetchMultiPostings": {map[labels.Label][]byte{l2: v}, []labels.Label{}}, + }, call: func(cache storecache.IndexCache) { cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}) }, @@ -185,15 +189,19 @@ func Test_MultiLevelCache(t *testing.T) { cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) }, }, - "[FetchMultiSeries] should fallback only the missing keys on l1": { + "[FetchMultiSeries] should fallback and backfill only the missing keys on l1": { m1ExpectedCalls: map[string][][]interface{}{ "FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2}}}, + "StoreSeries": {{bID, storage.SeriesRef(2), v}}, }, m2ExpectedCalls: map[string][][]interface{}{ "FetchMultiSeries": {{bID, []storage.SeriesRef{2}}}, }, m1MockedCalls: map[string][]interface{}{ - "FetchMultiSeries": {map[storage.SeriesRef][]byte{1: make([]byte, 1)}, []storage.SeriesRef{2}}, + "FetchMultiSeries": {map[storage.SeriesRef][]byte{1: v}, []storage.SeriesRef{2}}, + }, + m2MockedCalls: map[string][]interface{}{ + "FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v}, []storage.SeriesRef{2}}, }, call: func(cache storecache.IndexCache) { cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) @@ -211,13 +219,17 @@ func Test_MultiLevelCache(t *testing.T) { cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) }, }, - "[FetchExpandedPostings] Should fallback when miss": { + "[FetchExpandedPostings] Should fallback and backfill when miss": { m1ExpectedCalls: map[string][][]interface{}{ + "StoreExpandedPostings": {{bID, []*labels.Matcher{matcher}, v}}, "FetchExpandedPostings": {{bID, []*labels.Matcher{matcher}}}, }, m2ExpectedCalls: map[string][][]interface{}{ "FetchExpandedPostings": {{bID, []*labels.Matcher{matcher}}}, }, + m2MockedCalls: map[string][]interface{}{ + "FetchExpandedPostings": {v, true}, + }, call: func(cache storecache.IndexCache) { cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher}) },