From a71fcd134ede902639aef8194ee5e98b527e637b Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 29 Jun 2023 15:09:32 -0700 Subject: [PATCH 1/8] Handling CMK Errors Signed-off-by: Alan Protasio --- go.mod | 2 +- go.sum | 4 +- pkg/compactor/blocks_cleaner.go | 4 + pkg/compactor/blocks_cleaner_test.go | 49 +++++--- pkg/querier/blocks_finder_bucket_index.go | 6 + .../blocks_finder_bucket_index_test.go | 19 +++ pkg/querier/error_translate_queryable.go | 3 + pkg/querier/error_translate_queryable_test.go | 6 + pkg/storage/bucket/client_mock.go | 10 +- pkg/storage/bucket/prefixed_bucket_client.go | 9 +- pkg/storage/bucket/s3/bucket_client.go | 6 +- pkg/storage/bucket/s3/bucket_client_test.go | 57 ++++++++- pkg/storage/bucket/sse_bucket_client.go | 4 + pkg/storage/tsdb/bucketindex/loader.go | 4 +- pkg/storage/tsdb/bucketindex/loader_test.go | 116 ++++++++++++++++++ .../tsdb/bucketindex/markers_bucket_client.go | 5 + pkg/storage/tsdb/bucketindex/storage.go | 13 +- pkg/storage/tsdb/bucketindex/storage_test.go | 13 ++ pkg/storage/tsdb/bucketindex/updater.go | 20 ++- pkg/storage/tsdb/bucketindex/updater_test.go | 44 +++++++ pkg/storage/tsdb/testutil/objstore.go | 48 ++++++++ pkg/storage/tsdb/util.go | 7 ++ .../bucket_index_metadata_fetcher.go | 13 ++ .../bucket_index_metadata_fetcher_test.go | 47 +++++++ pkg/util/validation/limits.go | 7 ++ .../thanos-io/objstore/CHANGELOG.md | 1 + vendor/github.com/thanos-io/objstore/inmem.go | 5 + .../github.com/thanos-io/objstore/objstore.go | 7 ++ .../thanos-io/objstore/prefixed_bucket.go | 5 + .../objstore/providers/azure/azure.go | 5 + .../providers/filesystem/filesystem.go | 5 + .../thanos-io/objstore/providers/gcs/gcs.go | 5 + .../thanos-io/objstore/providers/s3/s3.go | 11 +- .../objstore/providers/swift/swift.go | 5 + .../github.com/thanos-io/objstore/testing.go | 4 + .../github.com/thanos-io/objstore/tracing.go | 4 + vendor/modules.txt | 2 +- 37 files changed, 538 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index 330b9e3bf0e..44b58c5490c 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/sony/gobreaker v0.5.0 github.com/spf13/afero v1.9.5 github.com/stretchr/testify v1.8.4 - github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd + github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea github.com/thanos-io/thanos v0.31.1-0.20230627154113-7cfaf3fe2d43 github.com/uber/jaeger-client-go v2.30.0+incompatible diff --git a/go.sum b/go.sum index af0a964f5ab..53485b38d25 100644 --- a/go.sum +++ b/go.sum @@ -1160,8 +1160,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= -github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd h1:asQ0HomkaUXZuR3J7daBEusMS++3hkYsYM6u8gpmPWM= -github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM= +github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a h1:tXcVeuval1nzdHn1JXqaBmyjuEUcpDI9huPrUF04nR4= +github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM= github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI= github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18= github.com/thanos-io/thanos v0.31.1-0.20230627154113-7cfaf3fe2d43 h1:UHyTPGdDHAoNHuSce5cJ2vEi6g1v8D5ZFBWZ61uTHSM= diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 3259944987c..c38a241d36b 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -325,6 +325,10 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger) if errors.Is(err, bucketindex.ErrIndexCorrupted) { level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it") + } else if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + // Give up cleaning if we get access denied + level.Warn(userLogger).Log("msg", err.Error()) + return nil } else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) { return err } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 52f16d741a1..707d5f36ab6 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -3,7 +3,6 @@ package compactor import ( "context" "crypto/rand" - "errors" "fmt" "path" "strings" @@ -17,14 +16,12 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -57,6 +54,37 @@ func TestBlocksCleaner(t *testing.T) { } } +func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { + const userID = "user-1" + + bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) + bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) + + // Create blocks. + ctx := context.Background() + deletionDelay := 12 * time.Hour + bucketClient = &cortex_testutil.MockBucketFailure{ + Bucket: bucketClient, + GetFailures: map[string]error{ + path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + + cfg := BlocksCleanerConfig{ + DeletionDelay: deletionDelay, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + } + + logger := log.NewNopLogger() + scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + cfgProvider := newMockConfigProvider() + + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + err := cleaner.cleanUser(ctx, userID, true) + require.NoError(t, err) +} + func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) { bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) @@ -254,7 +282,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { createDeletionMark(t, bucketClient, userID, block4, now.Add(-deletionDelay).Add(-time.Hour)) // To emulate a failure deleting a block, we wrap the bucket client in a mocked one. - bucketClient = &mockBucketFailure{ + bucketClient = &cortex_testutil.MockBucketFailure{ Bucket: bucketClient, DeleteFailures: []string{path.Join(userID, block3.String(), metadata.MetaFilename)}, } @@ -658,19 +686,6 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { } } -type mockBucketFailure struct { - objstore.Bucket - - DeleteFailures []string -} - -func (m *mockBucketFailure) Delete(ctx context.Context, name string) error { - if util.StringsContain(m.DeleteFailures, name) { - return errors.New("mocked delete failure") - } - return m.Bucket.Delete(ctx, name) -} - type mockConfigProvider struct { userRetentionPeriods map[string]time.Duration } diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index 31686fcb7ba..a1fcdbc4e70 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/cortexproject/cortex/pkg/util/validation" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -62,6 +63,11 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, // so the bucket index hasn't been created yet. return nil, nil, nil } + + if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + return nil, nil, validation.AccessDeniedError(err.Error()) + } + if err != nil { return nil, nil, err } diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index 8230d638f9c..f3cace019f1 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/util/validation" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/stretchr/testify/assert" @@ -241,3 +242,21 @@ func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIn return finder } + +func TestBucketIndexBlocksFinder_GetBlocks_KeyPermissionDenied(t *testing.T) { + const userID = "user-1" + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + bkt = &cortex_testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + + finder := prepareBucketIndexBlocksFinder(t, bkt) + + _, _, err := finder.GetBlocks(context.Background(), userID, 0, 100) + expected := validation.AccessDeniedError("error") + require.IsType(t, expected, err) +} diff --git a/pkg/querier/error_translate_queryable.go b/pkg/querier/error_translate_queryable.go index c0a14a37231..b08e18554e0 100644 --- a/pkg/querier/error_translate_queryable.go +++ b/pkg/querier/error_translate_queryable.go @@ -39,6 +39,9 @@ func TranslateToPromqlAPIError(err error) error { case validation.LimitError: // This will be returned with status code 422 by Prometheus API. return err + case validation.AccessDeniedError: + // This will be returned with status code 422 by Prometheus API. + return err default: if errors.Is(err, context.Canceled) { return err // 422 diff --git a/pkg/querier/error_translate_queryable_test.go b/pkg/querier/error_translate_queryable_test.go index d5d93b72a59..89529ca9840 100644 --- a/pkg/querier/error_translate_queryable_test.go +++ b/pkg/querier/error_translate_queryable_test.go @@ -43,6 +43,12 @@ func TestApiStatusCodes(t *testing.T) { expectedCode: 422, }, + { + err: validation.AccessDeniedError("access denied"), + expectedString: "access denied", + expectedCode: 422, + }, + { err: promql.ErrTooManySamples("query execution"), expectedString: "too many samples", diff --git a/pkg/storage/bucket/client_mock.go b/pkg/storage/bucket/client_mock.go index 1ad90b4dd35..76e24af13f5 100644 --- a/pkg/storage/bucket/client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -12,7 +12,10 @@ import ( "github.com/thanos-io/objstore" ) -var errObjectDoesNotExist = errors.New("object does not exist") +var ( + errObjectDoesNotExist = errors.New("object does not exist") + errKeyPermissionDenied = errors.New("object key permission denied") +) // ClientMock mocks objstore.Bucket type ClientMock struct { @@ -175,6 +178,11 @@ func (m *ClientMock) IsObjNotFoundErr(err error) bool { return err == errObjectDoesNotExist } +// IsCustomerManagedKeyError mocks objstore.Bucket.IsCustomerManagedKeyError() +func (m *ClientMock) IsCustomerManagedKeyError(err error) bool { + return err == errKeyPermissionDenied +} + // ObjectSize mocks objstore.Bucket.Attributes() func (m *ClientMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { args := m.Called(ctx, name) diff --git a/pkg/storage/bucket/prefixed_bucket_client.go b/pkg/storage/bucket/prefixed_bucket_client.go index 5f0d8955b72..6486751e12c 100644 --- a/pkg/storage/bucket/prefixed_bucket_client.go +++ b/pkg/storage/bucket/prefixed_bucket_client.go @@ -73,18 +73,23 @@ func (b *PrefixedBucketClient) IsObjNotFoundErr(err error) bool { return b.bucket.IsObjNotFoundErr(err) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *PrefixedBucketClient) IsCustomerManagedKeyError(err error) bool { + return b.bucket.IsCustomerManagedKeyError(err) +} + // Attributes returns attributes of the specified object. func (b *PrefixedBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { return b.bucket.Attributes(ctx, b.fullName(name)) } -// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment +// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment // thanos_objstore_bucket_operation_failures_total metric. func (b *PrefixedBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader { return b.WithExpectedErrs(fn) } -// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment +// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment // thanos_objstore_bucket_operation_failures_total metric. func (b *PrefixedBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket { if ib, ok := b.bucket.(objstore.InstrumentedBucket); ok { diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index ed76d835e5c..020037c5c9c 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -115,7 +115,7 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error, operation if lastErr == nil { return nil } - if b.bucket.IsObjNotFoundErr(lastErr) { + if b.bucket.IsObjNotFoundErr(lastErr) || b.bucket.IsCustomerManagedKeyError(lastErr) { return lastErr } retries.Wait() @@ -194,6 +194,10 @@ func (b *BucketWithRetries) IsObjNotFoundErr(err error) bool { return b.bucket.IsObjNotFoundErr(err) } +func (b *BucketWithRetries) IsCustomerManagedKeyError(err error) bool { + return b.bucket.IsCustomerManagedKeyError(err) +} + func (b *BucketWithRetries) Close() error { return b.bucket.Close() } diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go index bc991b4f8df..a6d88f76b79 100644 --- a/pkg/storage/bucket/s3/bucket_client_test.go +++ b/pkg/storage/bucket/s3/bucket_client_test.go @@ -3,6 +3,7 @@ package s3 import ( "bytes" "context" + "errors" "fmt" "io" "testing" @@ -13,6 +14,49 @@ import ( "github.com/thanos-io/objstore" ) +var ( + errNotFound = errors.New("not found") + errKeyDenied = errors.New("key denied") +) + +func TestBucketWithRetries_ShouldRetry(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + err error + retryCount int + }{ + "should not retry on not found": { + err: errNotFound, + retryCount: 1, + }, + "should not retry on key access denied": { + err: errKeyDenied, + retryCount: 1, + }, + } + + for name, tc := range cases { + t.Run(name, func(*testing.T) { + m := mockBucket{ + FailCount: 3, + errToReturn: tc.err, + } + + b := BucketWithRetries{ + logger: log.NewNopLogger(), + bucket: &m, + operationRetries: 5, + retryMinBackoff: 10 * time.Millisecond, + retryMaxBackoff: time.Second, + } + + _, _ = b.Get(context.Background(), "something") + require.Equal(t, 1, m.calledCount) + }) + } +} + func TestBucketWithRetries_UploadSeekable(t *testing.T) { t.Parallel() @@ -102,6 +146,9 @@ func (f *fakeReader) Read(p []byte) (n int, err error) { type mockBucket struct { FailCount int uploadedContent []byte + errToReturn error + + calledCount int } // Upload mocks objstore.Bucket.Upload() @@ -135,7 +182,8 @@ func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, // Get mocks objstore.Bucket.Get() func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - return nil, nil + m.calledCount++ + return nil, m.errToReturn } // GetRange mocks objstore.Bucket.GetRange() @@ -150,7 +198,12 @@ func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) { // IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr() func (m *mockBucket) IsObjNotFoundErr(err error) bool { - return false + return err == errNotFound +} + +// IsCustomerManagedKeyError mocks objstore.Bucket.IsCustomerManagedKeyError() +func (m *mockBucket) IsCustomerManagedKeyError(err error) bool { + return err == errKeyDenied } // ObjectSize mocks objstore.Bucket.Attributes() diff --git a/pkg/storage/bucket/sse_bucket_client.go b/pkg/storage/bucket/sse_bucket_client.go index e4901fcf018..36c1678c160 100644 --- a/pkg/storage/bucket/sse_bucket_client.go +++ b/pkg/storage/bucket/sse_bucket_client.go @@ -119,6 +119,10 @@ func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool { return b.bucket.IsObjNotFoundErr(err) } +func (b *SSEBucketClient) IsCustomerManagedKeyError(err error) bool { + return b.bucket.IsCustomerManagedKeyError(err) +} + // Attributes implements objstore.Bucket. func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { return b.bucket.Attributes(ctx, name) diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go index 0d2ab7cce33..01f59cbef2d 100644 --- a/pkg/storage/tsdb/bucketindex/loader.go +++ b/pkg/storage/tsdb/bucketindex/loader.go @@ -115,6 +115,8 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { if errors.Is(err, ErrIndexNotFound) { level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID) + } else if errors.Is(err, ErrCustomerManagedKeyError) { + level.Warn(l.logger).Log("msg", "key access denied when reading bucket index", "user", userID) } else { // We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just // started to remote write and its blocks haven't uploaded to storage yet). @@ -196,7 +198,7 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) { l.loadAttempts.Inc() startTime := time.Now() idx, err := ReadIndex(readCtx, l.bkt, userID, l.cfgProvider, l.logger) - if err != nil && !errors.Is(err, ErrIndexNotFound) { + if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, ErrCustomerManagedKeyError) { l.loadFailures.Inc() level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err) return diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index b9fe23752a8..44bb9fd1a24 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" @@ -572,6 +573,121 @@ func TestLoader_ShouldOffloadIndexIfIdleTimeoutIsReachedDuringBackgroundUpdates( )) } +func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing.T) { + user := "user-1" + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Create the loader. + cfg := LoaderConfig{ + CheckInterval: time.Hour, // Intentionally high to not hit it. + UpdateOnStaleInterval: time.Hour, // Intentionally high to not hit it. + UpdateOnErrorInterval: 0, + IdleTimeout: time.Hour, // Intentionally high to not hit it. + } + + mockedBkt := &cortex_testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(user, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + + loader := NewLoader(cfg, mockedBkt, nil, log.NewNopLogger(), reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, loader)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) + }) + + _, err := loader.GetIndex(ctx, user) + require.True(t, errors.Is(err, ErrCustomerManagedKeyError)) + + // Check cached + loader.checkCachedIndexes(ctx) + + loader.bkt = bkt + + // Upload the bucket index. + idx := &Index{ + Version: IndexVersion1, + Blocks: Blocks{ + {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20}, + }, + BlockDeletionMarks: nil, + UpdatedAt: time.Now().Unix(), + } + require.NoError(t, WriteIndex(ctx, bkt, "user-1", nil, idx)) + + // Wait until the index has been updated in background. + test.Poll(t, 3*time.Second, nil, func() interface{} { + _, err := loader.GetIndex(ctx, "user-1") + // Check cached + loader.checkCachedIndexes(ctx) + return err + }) + + actualIdx, err := loader.GetIndex(ctx, "user-1") + require.NoError(t, err) + assert.Equal(t, idx, actualIdx) + + // Ensure metrics have been updated accordingly. + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures. + # TYPE cortex_bucket_index_load_failures_total counter + cortex_bucket_index_load_failures_total 0 + # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory. + # TYPE cortex_bucket_index_loaded gauge + cortex_bucket_index_loaded 1 + `), + "cortex_bucket_index_loaded", "cortex_bucket_index_load_failures_total", + )) +} + +func TestLoader_GetIndex_ShouldCacheKeyDeniedErrors(t *testing.T) { + user := "user-1" + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + bkt = &cortex_testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(user, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + + // Create the loader. + loader := NewLoader(prepareLoaderConfig(), bkt, nil, log.NewNopLogger(), reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, loader)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) + }) + + // Request the index multiple times. + for i := 0; i < 10; i++ { + _, err := loader.GetIndex(ctx, "user-1") + require.True(t, errors.Is(err, ErrCustomerManagedKeyError)) + } + + // Ensure metrics have been updated accordingly. + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures. + # TYPE cortex_bucket_index_load_failures_total counter + cortex_bucket_index_load_failures_total 0 + # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory. + # TYPE cortex_bucket_index_loaded gauge + cortex_bucket_index_loaded 0 + # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts. + # TYPE cortex_bucket_index_loads_total counter + cortex_bucket_index_loads_total 1 + `), + "cortex_bucket_index_loads_total", + "cortex_bucket_index_load_failures_total", + "cortex_bucket_index_loaded", + )) +} + func prepareLoaderConfig() LoaderConfig { return LoaderConfig{ CheckInterval: time.Minute, diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index e175f10f90b..7c38b8ff3c6 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -100,6 +100,11 @@ func (b *globalMarkersBucket) IsObjNotFoundErr(err error) bool { return b.parent.IsObjNotFoundErr(err) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *globalMarkersBucket) IsCustomerManagedKeyError(err error) bool { + return b.parent.IsCustomerManagedKeyError(err) +} + // Attributes implements objstore.Bucket. func (b *globalMarkersBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { return b.parent.Attributes(ctx, name) diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go index 6ebd615d2ef..ac4a59a12b1 100644 --- a/pkg/storage/tsdb/bucketindex/storage.go +++ b/pkg/storage/tsdb/bucketindex/storage.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" + "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/log" "github.com/pkg/errors" "github.com/thanos-io/objstore" @@ -15,8 +16,9 @@ import ( ) var ( - ErrIndexNotFound = errors.New("bucket index not found") - ErrIndexCorrupted = errors.New("bucket index corrupted") + ErrIndexNotFound = errors.New("bucket index not found") + ErrIndexCorrupted = errors.New("bucket index corrupted") + ErrCustomerManagedKeyError = errors.New("access denied: customer key") ) // ReadIndex reads, parses and returns a bucket index from the bucket. @@ -24,11 +26,16 @@ func ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvi userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider) // Get the bucket index. - reader, err := userBkt.WithExpectedErrs(userBkt.IsObjNotFoundErr).Get(ctx, IndexCompressedFilename) + reader, err := userBkt.WithExpectedErrs(tsdb.IsObjNotFoundOrCustomerManagedKeyErr(userBkt)).Get(ctx, IndexCompressedFilename) if err != nil { if userBkt.IsObjNotFoundErr(err) { return nil, ErrIndexNotFound } + + if userBkt.IsCustomerManagedKeyError(err) { + return nil, ErrCustomerManagedKeyError + } + return nil, errors.Wrap(err, "read bucket index") } defer runutil.CloseWithLogOnErr(logger, reader, "close bucket index reader") diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index c27e37aac4a..b259fa24b21 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -36,6 +36,19 @@ func TestReadIndex_ShouldReturnErrorIfIndexIsCorrupted(t *testing.T) { require.Nil(t, idx) } +func TestReadIndex_ShouldReturnErrorIfKeyAccessDeniedErr(t *testing.T) { + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + bkt = &cortex_testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join("user-1", "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + idx, err := ReadIndex(context.Background(), bkt, "user-1", nil, log.NewNopLogger()) + require.Equal(t, ErrCustomerManagedKeyError, err) + require.Nil(t, idx) +} + func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) { const userID = "user-1" diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index f46e7cddfde..b4edde5486f 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -7,6 +7,7 @@ import ( "path" "time" + "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" @@ -21,10 +22,11 @@ import ( ) var ( - ErrBlockMetaNotFound = block.ErrorSyncMetaNotFound - ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted - ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found") - ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted") + ErrBlockMetaNotFound = block.ErrorSyncMetaNotFound + ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted + ErrBlockMetaKeyAccessDeniedErr = errors.New("block meta file key access denied error") + ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found") + ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted") ) // Updater is responsible to generate an update in-memory bucket index. @@ -108,6 +110,11 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block) (blocks []*Blo level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index", "block", id.String()) continue } + if errors.Is(err, ErrBlockMetaKeyAccessDeniedErr) { + partials[id] = err + level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index due key permission", "block", id.String()) + continue + } if errors.Is(err, ErrBlockMetaCorrupted) { partials[id] = err level.Error(w.logger).Log("msg", "skipped block with corrupted meta.json when updating bucket index", "block", id.String(), "err", err) @@ -123,10 +130,13 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo metaFile := path.Join(id.String(), block.MetaFilename) // Get the block's meta.json file. - r, err := w.bkt.ReaderWithExpectedErrs(w.bkt.IsObjNotFoundErr).Get(ctx, metaFile) + r, err := w.bkt.ReaderWithExpectedErrs(tsdb.IsObjNotFoundOrCustomerManagedKeyErr(w.bkt)).Get(ctx, metaFile) if w.bkt.IsObjNotFoundErr(err) { return nil, ErrBlockMetaNotFound } + if w.bkt.IsCustomerManagedKeyError(err) { + return nil, ErrBlockMetaKeyAccessDeniedErr + } if err != nil { return nil, errors.Wrapf(err, "get block meta file: %v", metaFile) } diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index c42fbeaeebe..c0c76fde210 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -138,6 +138,50 @@ func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetric(t *testing. `), "thanos_objstore_bucket_operation_failures_total")) } +func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetricCustomerKey(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + registry := prometheus.NewRegistry() + + // Mock some blocks in the storage. + bkt = BucketWithGlobalMarkers(bkt) + bkt = objstore.BucketWithMetrics("test-bucket", bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry)) + block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) + block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) + + bkt = &testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(userID, block2.ULID.String(), "meta.json"): testutil.ErrKeyAccessDeniedError, + }, + } + + w := NewUpdater(bkt, userID, nil, logger) + idx, partials, _, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assert.Len(t, partials, 1) + assert.True(t, errors.Is(partials[block2.ULID], ErrBlockMetaKeyAccessDeniedErr)) + assertBucketIndexEqual(t, idx, bkt, userID, + []tsdb.BlockMeta{block1}, + []*metadata.DeletionMark{}) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP thanos_objstore_bucket_operation_failures_total Total number of operations against a bucket that failed, but were not expected to fail in certain way from caller perspective. Those errors have to be investigated. + # TYPE thanos_objstore_bucket_operation_failures_total counter + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="attributes"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="delete"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="exists"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="get"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="get_range"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="iter"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="upload"} 0 + `), "thanos_objstore_bucket_operation_failures_total")) +} + func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) { const userID = "user-1" diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go index 04dc2208f08..3abfdb0ed46 100644 --- a/pkg/storage/tsdb/testutil/objstore.go +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -1,15 +1,21 @@ package testutil import ( + "context" + "io" "os" "testing" + "github.com/cortexproject/cortex/pkg/util" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" ) +var ErrKeyAccessDeniedError = errors.New("test key access denied") + func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { storageDir, err := os.MkdirTemp(os.TempDir(), "bucket") require.NoError(t, err) @@ -23,3 +29,45 @@ func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { return objstore.BucketWithMetrics("test", bkt, nil), storageDir } + +type MockBucketFailure struct { + objstore.Bucket + + DeleteFailures []string + GetFailures map[string]error +} + +func (m *MockBucketFailure) Delete(ctx context.Context, name string) error { + if util.StringsContain(m.DeleteFailures, name) { + return errors.New("mocked delete failure") + } + return m.Bucket.Delete(ctx, name) +} + +func (m *MockBucketFailure) Get(ctx context.Context, name string) (io.ReadCloser, error) { + if e, ok := m.GetFailures[name]; ok { + return nil, e + } + + return m.Bucket.Get(ctx, name) +} + +func (m *MockBucketFailure) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { + if ibkt, ok := m.Bucket.(objstore.InstrumentedBucket); ok { + return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures} + } + + return m.WithExpectedErrs(expectedFunc) +} + +func (m *MockBucketFailure) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + if ibkt, ok := m.Bucket.(objstore.InstrumentedBucket); ok { + return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures} + } + + return m +} + +func (m *MockBucketFailure) IsCustomerManagedKeyError(err error) bool { + return errors.Is(err, ErrKeyAccessDeniedError) +} diff --git a/pkg/storage/tsdb/util.go b/pkg/storage/tsdb/util.go index c13f5a2b7c4..28dc7948a4d 100644 --- a/pkg/storage/tsdb/util.go +++ b/pkg/storage/tsdb/util.go @@ -2,6 +2,7 @@ package tsdb import ( "github.com/oklog/ulid" + "github.com/thanos-io/objstore" "github.com/cortexproject/cortex/pkg/ingester/client" ) @@ -15,3 +16,9 @@ func HashBlockID(id ulid.ULID) uint32 { } return h } + +func IsObjNotFoundOrCustomerManagedKeyErr(bkt objstore.Bucket) objstore.IsOpFailureExpectedFunc { + return func(err error) bool { + return bkt.IsObjNotFoundErr(err) || bkt.IsCustomerManagedKeyError(err) + } +} diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index 60cb1cbb374..dfc3b9a4744 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -19,6 +19,7 @@ import ( const ( corruptedBucketIndex = "corrupted-bucket-index" + keyAccessDenied = "key-access-denied" noBucketIndex = "no-bucket-index" ) @@ -93,6 +94,18 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. return nil, nil, nil } + + if errors.Is(err, bucketindex.ErrBlockMetaKeyAccessDeniedErr) { + // Do not fail if the permission to the bucket key got revoked. We'll act as if the tenant has no bucket index, but the query + // will fail anyway in the querier (the querier fails in the querier if it cannot load the bucket index). + // This will cause the store-gateway to unload all blocks + level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err) + f.metrics.Synced.WithLabelValues(keyAccessDenied).Set(1) + f.metrics.Submit() + + return nil, nil, nil + } + if err != nil { f.metrics.Synced.WithLabelValues(block.FailedMeta).Set(1) f.metrics.Submit() diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index b7279fb018c..015a7946d50 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -99,6 +99,53 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) { )) } +func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) { + const userID = "user-1" + + bkt := &bucket.ClientMock{} + reg := prometheus.NewPedanticRegistry() + ctx := context.Background() + + bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucketindex.ErrBlockMetaKeyAccessDeniedErr) + + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil) + metas, _, err := fetcher.Fetch(ctx) + require.NoError(t, err) + assert.Empty(t, metas) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_modified Number of blocks whose metadata changed + # TYPE blocks_meta_modified gauge + blocks_meta_modified{modified="replica-label-removed"} 0 + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 0 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="key-access-denied"} 1 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 1 + `), + "blocks_meta_modified", + "blocks_meta_sync_failures_total", + "blocks_meta_synced", + "blocks_meta_syncs_total", + )) +} + func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) { t.Parallel() const userID = "user-1" diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 00b386923b4..1c690d69920 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -24,6 +24,13 @@ const ( GlobalIngestionRateStrategy = "global" ) +// AccessDeniedError are errors that do not comply with the limits specified. +type AccessDeniedError string + +func (e AccessDeniedError) Error() string { + return string(e) +} + // LimitError are errors that do not comply with the limits specified. type LimitError string diff --git a/vendor/github.com/thanos-io/objstore/CHANGELOG.md b/vendor/github.com/thanos-io/objstore/CHANGELOG.md index 05aafa7bd32..270f6d4f029 100644 --- a/vendor/github.com/thanos-io/objstore/CHANGELOG.md +++ b/vendor/github.com/thanos-io/objstore/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#43](https://github.com/thanos-io/objstore/pull/43) filesystem: abort filesystem bucket operations if the context has been cancelled - [#44](https://github.com/thanos-io/objstore/pull/44) Add new metric to count total number of fetched bytes from bucket - [#50](https://github.com/thanos-io/objstore/pull/50) Add Huawei Cloud OBS Object Storage Support +- [#59](https://github.com/thanos-io/objstore/pull/59) Adding method `IsCustomerManagedKeyError` on the bucket interface. ### Changed - [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`. diff --git a/vendor/github.com/thanos-io/objstore/inmem.go b/vendor/github.com/thanos-io/objstore/inmem.go index ac36e7f469f..aee4aec6cfa 100644 --- a/vendor/github.com/thanos-io/objstore/inmem.go +++ b/vendor/github.com/thanos-io/objstore/inmem.go @@ -207,6 +207,11 @@ func (b *InMemBucket) IsObjNotFoundErr(err error) bool { return errors.Is(err, errNotFound) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *InMemBucket) IsCustomerManagedKeyError(_ error) bool { + return false +} + func (b *InMemBucket) Close() error { return nil } // Name returns the bucket name. diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go index cc6034d5dfd..fb9b0bac8d0 100644 --- a/vendor/github.com/thanos-io/objstore/objstore.go +++ b/vendor/github.com/thanos-io/objstore/objstore.go @@ -85,6 +85,9 @@ type BucketReader interface { // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. IsObjNotFoundErr(err error) bool + // IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. + IsCustomerManagedKeyError(err error) bool + // Attributes returns information about the specified object. Attributes(ctx context.Context, name string) (ObjectAttributes, error) } @@ -603,6 +606,10 @@ func (b *metricBucket) IsObjNotFoundErr(err error) bool { return b.bkt.IsObjNotFoundErr(err) } +func (b *metricBucket) IsCustomerManagedKeyError(err error) bool { + return b.bkt.IsCustomerManagedKeyError(err) +} + func (b *metricBucket) Close() error { return b.bkt.Close() } diff --git a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go index 130f14d439d..41448011729 100644 --- a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go +++ b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go @@ -74,6 +74,11 @@ func (p *PrefixedBucket) IsObjNotFoundErr(err error) bool { return p.bkt.IsObjNotFoundErr(err) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (p *PrefixedBucket) IsCustomerManagedKeyError(err error) bool { + return p.bkt.IsCustomerManagedKeyError(err) +} + // Attributes returns information about the specified object. func (p PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { return p.bkt.Attributes(ctx, conditionalPrefix(p.prefix, name)) diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go index 23e66169db6..a5f41ed1769 100644 --- a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go +++ b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go @@ -235,6 +235,11 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { return bloberror.HasCode(err, bloberror.BlobNotFound) || bloberror.HasCode(err, bloberror.InvalidURI) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Bucket) IsCustomerManagedKeyError(_ error) bool { + return false +} + func (b *Bucket) getBlobReader(ctx context.Context, name string, httpRange blob.HTTPRange) (io.ReadCloser, error) { level.Debug(b.logger).Log("msg", "getting blob", "blob", name, "offset", httpRange.Offset, "length", httpRange.Count) if name == "" { diff --git a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go index 3206b91e717..8ccd33b10f6 100644 --- a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go +++ b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go @@ -258,6 +258,11 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { return os.IsNotExist(errors.Cause(err)) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Bucket) IsCustomerManagedKeyError(_ error) bool { + return false +} + func (b *Bucket) Close() error { return nil } // Name returns the bucket name. diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go index 947e641a30f..8b107c83d82 100644 --- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go +++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go @@ -188,6 +188,11 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { return errors.Is(err, storage.ErrObjectNotExist) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Bucket) IsCustomerManagedKeyError(_ error) bool { + return false +} + func (b *Bucket) Close() error { return b.closer.Close() } diff --git a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go index 729ee7eb6de..6bc2b64438e 100644 --- a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go +++ b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go @@ -98,6 +98,9 @@ const ( // Storage class header. amzStorageClass = "X-Amz-Storage-Class" + + // amzKmsKeyAccessDeniedErrorMessage is the error message returned by s3 when the permissions to the KMS key is revoked. + amzKmsKeyAccessDeniedErrorMessage = "The ciphertext refers to a customer master key that does not exist, does not exist in this region, or you are not allowed to access." ) var DefaultConfig = Config{ @@ -144,7 +147,7 @@ type Config struct { } // SSEConfig deals with the configuration of SSE for Minio. The following options are valid: -// kmsencryptioncontext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context +// KMSEncryptionContext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context type SSEConfig struct { Type string `yaml:"type"` KMSKeyID string `yaml:"kms_key_id"` @@ -538,6 +541,12 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey" } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Bucket) IsCustomerManagedKeyError(err error) bool { + errResponse := minio.ToErrorResponse(errors.Cause(err)) + return errResponse.Code == "AccessDenied" && errResponse.Message == amzKmsKeyAccessDeniedErrorMessage +} + func (b *Bucket) Close() error { return nil } // getServerSideEncryption returns the SSE to use. diff --git a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go index f30c655dc74..c24d03fd2d5 100644 --- a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go +++ b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go @@ -290,6 +290,11 @@ func (c *Container) IsObjNotFoundErr(err error) bool { return errors.Is(err, swift.ObjectNotFound) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Container) IsCustomerManagedKeyError(_ error) bool { + return false +} + // Upload writes the contents of the reader as an object into the container. func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err error) { size, err := objstore.TryToGetSize(r) diff --git a/vendor/github.com/thanos-io/objstore/testing.go b/vendor/github.com/thanos-io/objstore/testing.go index d750142af1d..4e41b278825 100644 --- a/vendor/github.com/thanos-io/objstore/testing.go +++ b/vendor/github.com/thanos-io/objstore/testing.go @@ -308,3 +308,7 @@ func (d *delayingBucket) IsObjNotFoundErr(err error) bool { // No delay for a local operation. return d.bkt.IsObjNotFoundErr(err) } + +func (d *delayingBucket) IsCustomerManagedKeyError(err error) bool { + return d.bkt.IsCustomerManagedKeyError(err) +} diff --git a/vendor/github.com/thanos-io/objstore/tracing.go b/vendor/github.com/thanos-io/objstore/tracing.go index 56f18ebead8..9f09df668e7 100644 --- a/vendor/github.com/thanos-io/objstore/tracing.go +++ b/vendor/github.com/thanos-io/objstore/tracing.go @@ -101,6 +101,10 @@ func (t TracingBucket) IsObjNotFoundErr(err error) bool { return t.bkt.IsObjNotFoundErr(err) } +func (t TracingBucket) IsCustomerManagedKeyError(err error) bool { + return t.bkt.IsCustomerManagedKeyError(err) +} + func (t TracingBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket { if ib, ok := t.bkt.(InstrumentedBucket); ok { return TracingBucket{bkt: ib.WithExpectedErrs(expectedFunc)} diff --git a/vendor/modules.txt b/vendor/modules.txt index b120958f4e2..f32adcbe7a0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -832,7 +832,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd +# github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a ## explicit; go 1.18 github.com/thanos-io/objstore github.com/thanos-io/objstore/exthttp From 0738d983ae970319f311dd23476847415dcf9724 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 29 Jun 2023 15:11:17 -0700 Subject: [PATCH 2/8] lint Signed-off-by: Alan Protasio --- pkg/querier/blocks_finder_bucket_index.go | 3 +- .../blocks_finder_bucket_index_test.go | 3 +- pkg/storage/tsdb/bucketindex/loader_test.go | 4 +- pkg/storage/tsdb/bucketindex/storage.go | 3 +- pkg/storage/tsdb/bucketindex/updater.go | 18 ++++--- pkg/storage/tsdb/bucketindex/updater_test.go | 2 +- pkg/storage/tsdb/testutil/objstore.go | 3 +- .../bucket_index_metadata_fetcher.go | 9 ++-- .../bucket_index_metadata_fetcher_test.go | 4 +- pkg/storegateway/bucket_stores.go | 52 ++++++++++++++----- 10 files changed, 67 insertions(+), 34 deletions(-) diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index a1fcdbc4e70..fc093ebbe00 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -4,13 +4,14 @@ import ( "context" "time" - "github.com/cortexproject/cortex/pkg/util/validation" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/services" diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index f3cace019f1..68523816b83 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -7,13 +7,14 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/util/validation" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index 44bb9fd1a24..29764dae3e7 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -604,7 +604,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing require.True(t, errors.Is(err, ErrCustomerManagedKeyError)) // Check cached - loader.checkCachedIndexes(ctx) + require.NoError(t, loader.checkCachedIndexes(ctx)) loader.bkt = bkt @@ -623,7 +623,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing test.Poll(t, 3*time.Second, nil, func() interface{} { _, err := loader.GetIndex(ctx, "user-1") // Check cached - loader.checkCachedIndexes(ctx) + require.NoError(t, loader.checkCachedIndexes(ctx)) return err }) diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go index ac4a59a12b1..f5ac6d452de 100644 --- a/pkg/storage/tsdb/bucketindex/storage.go +++ b/pkg/storage/tsdb/bucketindex/storage.go @@ -6,11 +6,12 @@ import ( "context" "encoding/json" - "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/log" "github.com/pkg/errors" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/util/runutil" ) diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index b4edde5486f..06f8b824f1e 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -7,7 +7,6 @@ import ( "path" "time" - "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" @@ -16,17 +15,20 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/bucket" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/runutil" ) var ( - ErrBlockMetaNotFound = block.ErrorSyncMetaNotFound - ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted - ErrBlockMetaKeyAccessDeniedErr = errors.New("block meta file key access denied error") - ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found") - ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted") + ErrBlockMetaNotFound = block.ErrorSyncMetaNotFound + ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted + ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found") + ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted") + + errBlockMetaKeyAccessDeniedErr = errors.New("block meta file key access denied error") ) // Updater is responsible to generate an update in-memory bucket index. @@ -110,7 +112,7 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block) (blocks []*Blo level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index", "block", id.String()) continue } - if errors.Is(err, ErrBlockMetaKeyAccessDeniedErr) { + if errors.Is(err, errBlockMetaKeyAccessDeniedErr) { partials[id] = err level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index due key permission", "block", id.String()) continue @@ -135,7 +137,7 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo return nil, ErrBlockMetaNotFound } if w.bkt.IsCustomerManagedKeyError(err) { - return nil, ErrBlockMetaKeyAccessDeniedErr + return nil, errBlockMetaKeyAccessDeniedErr } if err != nil { return nil, errors.Wrapf(err, "get block meta file: %v", metaFile) diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index c0c76fde210..e9d40f37374 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -164,7 +164,7 @@ func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetricCustomerKey( idx, partials, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assert.Len(t, partials, 1) - assert.True(t, errors.Is(partials[block2.ULID], ErrBlockMetaKeyAccessDeniedErr)) + assert.True(t, errors.Is(partials[block2.ULID], errBlockMetaKeyAccessDeniedErr)) assertBucketIndexEqual(t, idx, bkt, userID, []tsdb.BlockMeta{block1}, []*metadata.DeletionMark{}) diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go index 3abfdb0ed46..7b29900a414 100644 --- a/pkg/storage/tsdb/testutil/objstore.go +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -6,11 +6,12 @@ import ( "os" "testing" - "github.com/cortexproject/cortex/pkg/util" "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" ) diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index dfc3b9a4744..6c8783e3b93 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -95,15 +95,14 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. return nil, nil, nil } - if errors.Is(err, bucketindex.ErrBlockMetaKeyAccessDeniedErr) { - // Do not fail if the permission to the bucket key got revoked. We'll act as if the tenant has no bucket index, but the query - // will fail anyway in the querier (the querier fails in the querier if it cannot load the bucket index). - // This will cause the store-gateway to unload all blocks + if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + // stop the job and return the error + // this error should be used to return Access Denied to the caller level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err) f.metrics.Synced.WithLabelValues(keyAccessDenied).Set(1) f.metrics.Submit() - return nil, nil, nil + return nil, nil, bucketindex.ErrCustomerManagedKeyError } if err != nil { diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index 015a7946d50..efca963d50d 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -106,11 +106,11 @@ func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) { reg := prometheus.NewPedanticRegistry() ctx := context.Background() - bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucketindex.ErrBlockMetaKeyAccessDeniedErr) + bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucketindex.ErrCustomerManagedKeyError) fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil) metas, _, err := fetcher.Fetch(ctx) - require.NoError(t, err) + require.ErrorIs(t, bucketindex.ErrCustomerManagedKeyError, err) assert.Empty(t, metas) assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 879fcc4b29a..f12036b42f4 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -29,8 +29,11 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/logging" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/backoff" @@ -64,7 +67,7 @@ type BucketStores struct { // Keeps a bucket store for each tenant. storesMu sync.RWMutex - stores map[string]*store.BucketStore + stores map[string]*BucketStoreWithLastError // Metrics. syncTimes prometheus.Histogram @@ -73,6 +76,11 @@ type BucketStores struct { tenantsSynced prometheus.Gauge } +type BucketStoreWithLastError struct { + *store.BucketStore + err error +} + // NewBucketStores makes a new BucketStores. func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) @@ -94,7 +102,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra limits: limits, bucket: cachingBucket, shardingStrategy: shardingStrategy, - stores: map[string]*store.BucketStore{}, + stores: map[string]*BucketStoreWithLastError{}, logLevel: logLevel, bucketStoreMetrics: NewBucketStoreMetrics(), metaFetcherMetrics: NewMetadataFetcherMetrics(), @@ -192,7 +200,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte type job struct { userID string - store *store.BucketStore + store *BucketStoreWithLastError } wg := &sync.WaitGroup{} @@ -225,10 +233,16 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte defer wg.Done() for job := range jobs { - if err := f(ctx, job.store); err != nil { - errsMx.Lock() - errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID)) - errsMx.Unlock() + if err := f(ctx, job.store.BucketStore); err != nil { + if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + job.store.err = err + } else { + errsMx.Lock() + errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID)) + errsMx.Unlock() + } + } else { + job.store.err = nil } } }() @@ -286,10 +300,20 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return nil } - return store.Series(req, spanSeriesServer{ + if store.err != nil && errors.Is(store.err, bucketindex.ErrCustomerManagedKeyError) { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", store.err) + } + + err := store.Series(req, spanSeriesServer{ Store_SeriesServer: srv, ctx: spanCtx, }) + + if err != nil && errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + return err } // LabelNames implements the Storegateway proto service. @@ -344,7 +368,7 @@ func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { return users, err } -func (u *BucketStores) getStore(userID string) *store.BucketStore { +func (u *BucketStores) getStore(userID string) *BucketStoreWithLastError { u.storesMu.RLock() defer u.storesMu.RUnlock() return u.stores[userID] @@ -387,7 +411,7 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error { return bs.Close() } -func isEmptyBucketStore(bs *store.BucketStore) bool { +func isEmptyBucketStore(bs *BucketStoreWithLastError) bool { min, max := bs.TimeRange() return min == math.MaxInt64 && max == math.MinInt64 } @@ -396,7 +420,7 @@ func (u *BucketStores) syncDirForUser(userID string) string { return filepath.Join(u.cfg.BucketStore.SyncDir, userID) } -func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) { +func (u *BucketStores) getOrCreateStore(userID string) (*BucketStoreWithLastError, error) { // Check if the store already exists. bs := u.getStore(userID) if bs != nil { @@ -500,7 +524,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging()) } - bs, err := store.NewBucketStore( + s, err := store.NewBucketStore( userBkt, fetcher, u.syncDirForUser(userID), @@ -520,6 +544,10 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro return nil, err } + bs = &BucketStoreWithLastError{ + BucketStore: s, + } + u.stores[userID] = bs u.metaFetcherMetrics.AddUserRegistry(userID, fetcherReg) u.bucketStoreMetrics.AddUserRegistry(userID, bucketStoreReg) From 7ee5d1a70249281bb7a2fb3bc269d1a8eb0805fe Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 29 Jun 2023 18:42:18 -0700 Subject: [PATCH 3/8] add test on BucketStores Signed-off-by: Alan Protasio --- pkg/compactor/blocks_cleaner.go | 2 +- pkg/querier/blocks_finder_bucket_index.go | 2 +- pkg/storage/bucket/client.go | 2 + pkg/storage/tsdb/bucketindex/loader.go | 4 +- pkg/storage/tsdb/bucketindex/loader_test.go | 5 +- pkg/storage/tsdb/bucketindex/storage.go | 7 ++- pkg/storage/tsdb/bucketindex/storage_test.go | 3 +- pkg/storage/tsdb/testutil/objstore.go | 8 +++- .../bucket_index_metadata_fetcher.go | 6 +-- .../bucket_index_metadata_fetcher_test.go | 4 +- pkg/storegateway/bucket_stores.go | 8 ++-- pkg/storegateway/bucket_stores_test.go | 47 +++++++++++++++++++ 12 files changed, 76 insertions(+), 22 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index c38a241d36b..ce2bbbb844b 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -325,7 +325,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger) if errors.Is(err, bucketindex.ErrIndexCorrupted) { level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it") - } else if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + } else if errors.Is(err, bucket.ErrCustomerManagedKeyError) { // Give up cleaning if we get access denied level.Warn(userLogger).Log("msg", err.Error()) return nil diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index fc093ebbe00..ffa1433b5fc 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -65,7 +65,7 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, return nil, nil, nil } - if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + if errors.Is(err, bucket.ErrCustomerManagedKeyError) { return nil, nil, validation.AccessDeniedError(err.Error()) } diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index aee3296dc7d..5f96ac60064 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -40,6 +40,8 @@ var ( SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem} ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") + + ErrCustomerManagedKeyError = errors.New("access denied: customer key") ) // Config holds configuration for accessing long-term storage. diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go index 01f59cbef2d..b984312b1b6 100644 --- a/pkg/storage/tsdb/bucketindex/loader.go +++ b/pkg/storage/tsdb/bucketindex/loader.go @@ -115,7 +115,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { if errors.Is(err, ErrIndexNotFound) { level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID) - } else if errors.Is(err, ErrCustomerManagedKeyError) { + } else if errors.Is(err, bucket.ErrCustomerManagedKeyError) { level.Warn(l.logger).Log("msg", "key access denied when reading bucket index", "user", userID) } else { // We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just @@ -198,7 +198,7 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) { l.loadAttempts.Inc() startTime := time.Now() idx, err := ReadIndex(readCtx, l.bkt, userID, l.cfgProvider, l.logger) - if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, ErrCustomerManagedKeyError) { + if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, bucket.ErrCustomerManagedKeyError) { l.loadFailures.Inc() level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err) return diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index 29764dae3e7..3c5c9815b3e 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -601,7 +602,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing }) _, err := loader.GetIndex(ctx, user) - require.True(t, errors.Is(err, ErrCustomerManagedKeyError)) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyError)) // Check cached require.NoError(t, loader.checkCachedIndexes(ctx)) @@ -667,7 +668,7 @@ func TestLoader_GetIndex_ShouldCacheKeyDeniedErrors(t *testing.T) { // Request the index multiple times. for i := 0; i < 10; i++ { _, err := loader.GetIndex(ctx, "user-1") - require.True(t, errors.Is(err, ErrCustomerManagedKeyError)) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyError)) } // Ensure metrics have been updated accordingly. diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go index f5ac6d452de..771a361024d 100644 --- a/pkg/storage/tsdb/bucketindex/storage.go +++ b/pkg/storage/tsdb/bucketindex/storage.go @@ -17,9 +17,8 @@ import ( ) var ( - ErrIndexNotFound = errors.New("bucket index not found") - ErrIndexCorrupted = errors.New("bucket index corrupted") - ErrCustomerManagedKeyError = errors.New("access denied: customer key") + ErrIndexNotFound = errors.New("bucket index not found") + ErrIndexCorrupted = errors.New("bucket index corrupted") ) // ReadIndex reads, parses and returns a bucket index from the bucket. @@ -34,7 +33,7 @@ func ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvi } if userBkt.IsCustomerManagedKeyError(err) { - return nil, ErrCustomerManagedKeyError + return nil, bucket.ErrCustomerManagedKeyError } return nil, errors.Wrap(err, "read bucket index") diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index b259fa24b21..11369d98df9 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/go-kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -45,7 +46,7 @@ func TestReadIndex_ShouldReturnErrorIfKeyAccessDeniedErr(t *testing.T) { }, } idx, err := ReadIndex(context.Background(), bkt, "user-1", nil, log.NewNopLogger()) - require.Equal(t, ErrCustomerManagedKeyError, err) + require.Equal(t, bucket.ErrCustomerManagedKeyError, err) require.Nil(t, idx) } diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go index 7b29900a414..46fc1b001bf 100644 --- a/pkg/storage/tsdb/testutil/objstore.go +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -4,6 +4,7 @@ import ( "context" "io" "os" + "strings" "testing" "github.com/pkg/errors" @@ -46,6 +47,11 @@ func (m *MockBucketFailure) Delete(ctx context.Context, name string) error { } func (m *MockBucketFailure) Get(ctx context.Context, name string) (io.ReadCloser, error) { + for prefix, err := range m.GetFailures { + if strings.HasPrefix(name, prefix) { + return nil, err + } + } if e, ok := m.GetFailures[name]; ok { return nil, e } @@ -58,7 +64,7 @@ func (m *MockBucketFailure) WithExpectedErrs(expectedFunc objstore.IsOpFailureEx return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures} } - return m.WithExpectedErrs(expectedFunc) + return m } func (m *MockBucketFailure) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader { diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index 6c8783e3b93..ac55eef35d3 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -68,7 +68,7 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. start := time.Now() defer func() { f.metrics.SyncDuration.Observe(time.Since(start).Seconds()) - if err != nil { + if err != nil && !errors.Is(err, bucket.ErrCustomerManagedKeyError) { f.metrics.SyncFailures.Inc() } }() @@ -95,14 +95,14 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. return nil, nil, nil } - if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + if errors.Is(err, bucket.ErrCustomerManagedKeyError) { // stop the job and return the error // this error should be used to return Access Denied to the caller level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err) f.metrics.Synced.WithLabelValues(keyAccessDenied).Set(1) f.metrics.Submit() - return nil, nil, bucketindex.ErrCustomerManagedKeyError + return nil, nil, bucket.ErrCustomerManagedKeyError } if err != nil { diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index efca963d50d..094bf9ddce0 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -106,11 +106,11 @@ func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) { reg := prometheus.NewPedanticRegistry() ctx := context.Background() - bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucketindex.ErrCustomerManagedKeyError) + bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucket.ErrCustomerManagedKeyError) fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil) metas, _, err := fetcher.Fetch(ctx) - require.ErrorIs(t, bucketindex.ErrCustomerManagedKeyError, err) + require.ErrorIs(t, bucket.ErrCustomerManagedKeyError, err) assert.Empty(t, metas) assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index f12036b42f4..e23f9f4dc55 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -32,8 +32,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" - "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/backoff" @@ -234,7 +232,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte for job := range jobs { if err := f(ctx, job.store.BucketStore); err != nil { - if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + if errors.Is(err, bucket.ErrCustomerManagedKeyError) { job.store.err = err } else { errsMx.Lock() @@ -300,7 +298,7 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return nil } - if store.err != nil && errors.Is(store.err, bucketindex.ErrCustomerManagedKeyError) { + if store.err != nil && errors.Is(store.err, bucket.ErrCustomerManagedKeyError) { return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", store.err) } @@ -309,7 +307,7 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri ctx: spanCtx, }) - if err != nil && errors.Is(err, bucketindex.ErrCustomerManagedKeyError) { + if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) } diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 1458a586fa7..1ef24f6ee8b 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -33,6 +33,8 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc/metadata" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -40,6 +42,51 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" ) +func TestBucketStores_CustomerKeyError(t *testing.T) { + userToMetric := map[string]string{ + "user-1": "series_1", + "user-2": "series_2", + } + + ctx := context.Background() + cfg := prepareStorageConfig(t) + cfg.BucketStore.BucketIndex.Enabled = true + + storageDir := t.TempDir() + + for userID, metricName := range userToMetric { + generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15) + } + + b, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + + mBucket := &cortex_testutil.MockBucketFailure{ + Bucket: b, + GetFailures: map[string]error{ + "user-1": cortex_testutil.ErrKeyAccessDeniedError, + }, + } + require.NoError(t, err) + + reg := prometheus.NewPedanticRegistry() + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + + // Should set the error on user-1 + require.NoError(t, stores.InitialSync(ctx)) + require.ErrorIs(t, stores.stores["user-1"].err, bucket.ErrCustomerManagedKeyError) + require.ErrorIs(t, stores.stores["user-2"].err, nil) + require.NoError(t, stores.SyncBlocks(context.Background())) + require.ErrorIs(t, stores.stores["user-1"].err, bucket.ErrCustomerManagedKeyError) + require.ErrorIs(t, stores.stores["user-2"].err, nil) + + // Cleaning the error + mBucket.GetFailures = map[string]error{} + require.NoError(t, stores.SyncBlocks(context.Background())) + require.ErrorIs(t, stores.stores["user-1"].err, nil) + require.ErrorIs(t, stores.stores["user-2"].err, nil) +} + func TestBucketStores_InitialSync(t *testing.T) { t.Parallel() userToMetric := map[string]string{ From e8c9f1d2be9b3ea237bb16f1b685fc4a0ccfe247 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 3 Jul 2023 09:53:20 -0700 Subject: [PATCH 4/8] fixing race Signed-off-by: Alan Protasio --- pkg/storegateway/bucket_stores.go | 47 ++++++++++++++++---------- pkg/storegateway/bucket_stores_test.go | 23 +++++++++---- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index e23f9f4dc55..2ba400aca5a 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -65,7 +65,11 @@ type BucketStores struct { // Keeps a bucket store for each tenant. storesMu sync.RWMutex - stores map[string]*BucketStoreWithLastError + stores map[string]*store.BucketStore + + // Keeps the last sync error for the bucket store for each tenant. + storesErrorsMu sync.RWMutex + storesErrors map[string]error // Metrics. syncTimes prometheus.Histogram @@ -100,7 +104,8 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra limits: limits, bucket: cachingBucket, shardingStrategy: shardingStrategy, - stores: map[string]*BucketStoreWithLastError{}, + stores: map[string]*store.BucketStore{}, + storesErrors: map[string]error{}, logLevel: logLevel, bucketStoreMetrics: NewBucketStoreMetrics(), metaFetcherMetrics: NewMetadataFetcherMetrics(), @@ -198,7 +203,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte type job struct { userID string - store *BucketStoreWithLastError + store *store.BucketStore } wg := &sync.WaitGroup{} @@ -231,16 +236,20 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte defer wg.Done() for job := range jobs { - if err := f(ctx, job.store.BucketStore); err != nil { + if err := f(ctx, job.store); err != nil { if errors.Is(err, bucket.ErrCustomerManagedKeyError) { - job.store.err = err + u.storesErrorsMu.Lock() + u.storesErrors[job.userID] = err + u.storesErrorsMu.Unlock() } else { errsMx.Lock() errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID)) errsMx.Unlock() } } else { - job.store.err = nil + u.storesErrorsMu.Lock() + delete(u.storesErrors, job.userID) + u.storesErrorsMu.Unlock() } } }() @@ -298,11 +307,13 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return nil } - if store.err != nil && errors.Is(store.err, bucket.ErrCustomerManagedKeyError) { - return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", store.err) + err := u.getStoreError(userID) + + if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) } - err := store.Series(req, spanSeriesServer{ + err = store.Series(req, spanSeriesServer{ Store_SeriesServer: srv, ctx: spanCtx, }) @@ -366,12 +377,18 @@ func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { return users, err } -func (u *BucketStores) getStore(userID string) *BucketStoreWithLastError { +func (u *BucketStores) getStore(userID string) *store.BucketStore { u.storesMu.RLock() defer u.storesMu.RUnlock() return u.stores[userID] } +func (u *BucketStores) getStoreError(userID string) error { + u.storesErrorsMu.RLock() + defer u.storesErrorsMu.RUnlock() + return u.storesErrors[userID] +} + var ( errBucketStoreNotEmpty = errors.New("bucket store not empty") errBucketStoreNotFound = errors.New("bucket store not found") @@ -409,7 +426,7 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error { return bs.Close() } -func isEmptyBucketStore(bs *BucketStoreWithLastError) bool { +func isEmptyBucketStore(bs *store.BucketStore) bool { min, max := bs.TimeRange() return min == math.MaxInt64 && max == math.MinInt64 } @@ -418,7 +435,7 @@ func (u *BucketStores) syncDirForUser(userID string) string { return filepath.Join(u.cfg.BucketStore.SyncDir, userID) } -func (u *BucketStores) getOrCreateStore(userID string) (*BucketStoreWithLastError, error) { +func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) { // Check if the store already exists. bs := u.getStore(userID) if bs != nil { @@ -522,7 +539,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStoreWithLastErro bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging()) } - s, err := store.NewBucketStore( + bs, err := store.NewBucketStore( userBkt, fetcher, u.syncDirForUser(userID), @@ -542,10 +559,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStoreWithLastErro return nil, err } - bs = &BucketStoreWithLastError{ - BucketStore: s, - } - u.stores[userID] = bs u.metaFetcherMetrics.AddUserRegistry(userID, fetcherReg) u.bucketStoreMetrics.AddUserRegistry(userID, bucketStoreReg) diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 1ef24f6ee8b..86956b9321b 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -29,8 +29,10 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/logging" "go.uber.org/atomic" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" @@ -74,17 +76,26 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { // Should set the error on user-1 require.NoError(t, stores.InitialSync(ctx)) - require.ErrorIs(t, stores.stores["user-1"].err, bucket.ErrCustomerManagedKeyError) - require.ErrorIs(t, stores.stores["user-2"].err, nil) + require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyError) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) require.NoError(t, stores.SyncBlocks(context.Background())) - require.ErrorIs(t, stores.stores["user-1"].err, bucket.ErrCustomerManagedKeyError) - require.ErrorIs(t, stores.stores["user-2"].err, nil) + require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyError) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) + + _, _, err = querySeries(stores, "user-1", "anything", 0, 100) + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + _, _, err = querySeries(stores, "user-2", "anything", 0, 100) + require.NoError(t, err) // Cleaning the error mBucket.GetFailures = map[string]error{} require.NoError(t, stores.SyncBlocks(context.Background())) - require.ErrorIs(t, stores.stores["user-1"].err, nil) - require.ErrorIs(t, stores.stores["user-2"].err, nil) + require.ErrorIs(t, stores.storesErrors["user-1"], nil) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) + _, _, err = querySeries(stores, "user-1", "anything", 0, 100) + require.NoError(t, err) + _, _, err = querySeries(stores, "user-2", "anything", 0, 100) + require.NoError(t, err) } func TestBucketStores_InitialSync(t *testing.T) { From 465da07309ffa20127943784587a95020c8c160a Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 3 Jul 2023 09:54:13 -0700 Subject: [PATCH 5/8] lint Signed-off-by: Alan Protasio --- pkg/storage/tsdb/bucketindex/loader_test.go | 3 ++- pkg/storage/tsdb/bucketindex/storage_test.go | 3 ++- pkg/storegateway/bucket_stores.go | 7 +------ 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index 3c5c9815b3e..99c3e0b5f89 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -17,6 +16,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index 11369d98df9..a6e0606efc8 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -6,11 +6,12 @@ import ( "strings" "testing" - "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/go-kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 2ba400aca5a..c8532242648 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -78,11 +78,6 @@ type BucketStores struct { tenantsSynced prometheus.Gauge } -type BucketStoreWithLastError struct { - *store.BucketStore - err error -} - // NewBucketStores makes a new BucketStores. func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) @@ -105,7 +100,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra bucket: cachingBucket, shardingStrategy: shardingStrategy, stores: map[string]*store.BucketStore{}, - storesErrors: map[string]error{}, + storesErrors: map[string]error{}, logLevel: logLevel, bucketStoreMetrics: NewBucketStoreMetrics(), metaFetcherMetrics: NewMetadataFetcherMetrics(), From 7fbec0a962c6584e948a5e1c5e68d2c4e9b436a1 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 3 Jul 2023 10:12:52 -0700 Subject: [PATCH 6/8] Implementing error handling on labels apis Signed-off-by: Alan Protasio --- pkg/storegateway/bucket_stores.go | 28 ++++++++++++++++-- pkg/storegateway/bucket_stores_test.go | 40 ++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index c8532242648..fe02a552f00 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -335,7 +335,19 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe return &storepb.LabelNamesResponse{}, nil } - return store.LabelNames(ctx, req) + err := u.getStoreError(userID) + + if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + resp, err := store.LabelNames(ctx, req) + + if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + return resp, err } // LabelValues implements the Storegateway proto service. @@ -353,7 +365,19 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues return &storepb.LabelValuesResponse{}, nil } - return store.LabelValues(ctx, req) + err := u.getStoreError(userID) + + if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + resp, err := store.LabelValues(ctx, req) + + if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + return resp, err } // scanUsers in the bucket and return the list of found users. If an error occurs while diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 86956b9321b..95de2877afa 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -84,8 +84,16 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { _, _, err = querySeries(stores, "user-1", "anything", 0, 100) require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + _, err = queryLabelsNames(stores, "user-1", "anything") + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + _, err = queryLabelsValues(stores, "user-1", "anything") + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) _, _, err = querySeries(stores, "user-2", "anything", 0, 100) require.NoError(t, err) + _, err = queryLabelsNames(stores, "user-1", "anything") + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + _, err = queryLabelsValues(stores, "user-1", "anything") + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) // Cleaning the error mBucket.GetFailures = map[string]error{} @@ -96,6 +104,10 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { require.NoError(t, err) _, _, err = querySeries(stores, "user-2", "anything", 0, 100) require.NoError(t, err) + _, err = queryLabelsNames(stores, "user-1", "anything") + require.NoError(t, err) + _, err = queryLabelsValues(stores, "user-1", "anything") + require.NoError(t, err) } func TestBucketStores_InitialSync(t *testing.T) { @@ -487,6 +499,34 @@ func querySeries(stores *BucketStores, userID, metricName string, minT, maxT int return srv.SeriesSet, srv.Warnings, err } +func queryLabelsNames(stores *BucketStores, userID, metricName string) (*storepb.LabelNamesResponse, error) { + req := &storepb.LabelNamesRequest{ + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + Value: metricName, + }}, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + + ctx := setUserIDToGRPCContext(context.Background(), userID) + return stores.LabelNames(ctx, req) +} + +func queryLabelsValues(stores *BucketStores, userID, metricName string) (*storepb.LabelValuesResponse, error) { + req := &storepb.LabelValuesRequest{ + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + Value: metricName, + }}, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + + ctx := setUserIDToGRPCContext(context.Background(), userID) + return stores.LabelValues(ctx, req) +} + func mockLoggingLevel() logging.Level { level := logging.Level{} err := level.Set("info") From 1828dcd6f0e45e618e2c7bfcfc9bb12bf9df7186 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 3 Jul 2023 12:07:44 -0700 Subject: [PATCH 7/8] handling errros from thanos SG Signed-off-by: Alan Protasio --- pkg/compactor/blocks_cleaner.go | 2 +- pkg/querier/blocks_finder_bucket_index.go | 2 +- pkg/storage/bucket/client.go | 2 +- pkg/storage/tsdb/bucketindex/loader.go | 4 +- pkg/storage/tsdb/bucketindex/loader_test.go | 4 +- pkg/storage/tsdb/bucketindex/storage.go | 3 +- pkg/storage/tsdb/bucketindex/storage_test.go | 3 +- .../bucket_index_metadata_fetcher.go | 6 +-- .../bucket_index_metadata_fetcher_test.go | 5 +- pkg/storegateway/bucket_stores.go | 15 +++--- pkg/storegateway/bucket_stores_test.go | 14 +++--- pkg/util/errors/errors.go | 47 +++++++++++++++++++ 12 files changed, 79 insertions(+), 28 deletions(-) create mode 100644 pkg/util/errors/errors.go diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index ce2bbbb844b..920155b2f37 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -325,7 +325,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger) if errors.Is(err, bucketindex.ErrIndexCorrupted) { level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it") - } else if errors.Is(err, bucket.ErrCustomerManagedKeyError) { + } else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { // Give up cleaning if we get access denied level.Warn(userLogger).Log("msg", err.Error()) return nil diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index ffa1433b5fc..ccf9e7b7728 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -65,7 +65,7 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, return nil, nil, nil } - if errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { return nil, nil, validation.AccessDeniedError(err.Error()) } diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 5f96ac60064..852b4a7cc66 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -41,7 +41,7 @@ var ( ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") - ErrCustomerManagedKeyError = errors.New("access denied: customer key") + ErrCustomerManagedKeyAccessDenied = errors.New("access denied: customer key") ) // Config holds configuration for accessing long-term storage. diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go index b984312b1b6..2961b864057 100644 --- a/pkg/storage/tsdb/bucketindex/loader.go +++ b/pkg/storage/tsdb/bucketindex/loader.go @@ -115,7 +115,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { if errors.Is(err, ErrIndexNotFound) { level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID) - } else if errors.Is(err, bucket.ErrCustomerManagedKeyError) { + } else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { level.Warn(l.logger).Log("msg", "key access denied when reading bucket index", "user", userID) } else { // We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just @@ -198,7 +198,7 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) { l.loadAttempts.Inc() startTime := time.Now() idx, err := ReadIndex(readCtx, l.bkt, userID, l.cfgProvider, l.logger) - if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { l.loadFailures.Inc() level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err) return diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index 99c3e0b5f89..e73ca0de1e9 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -603,7 +603,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing }) _, err := loader.GetIndex(ctx, user) - require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyError)) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) // Check cached require.NoError(t, loader.checkCachedIndexes(ctx)) @@ -669,7 +669,7 @@ func TestLoader_GetIndex_ShouldCacheKeyDeniedErrors(t *testing.T) { // Request the index multiple times. for i := 0; i < 10; i++ { _, err := loader.GetIndex(ctx, "user-1") - require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyError)) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) } // Ensure metrics have been updated accordingly. diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go index 771a361024d..ecfb713f4ae 100644 --- a/pkg/storage/tsdb/bucketindex/storage.go +++ b/pkg/storage/tsdb/bucketindex/storage.go @@ -13,6 +13,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" "github.com/cortexproject/cortex/pkg/util/runutil" ) @@ -33,7 +34,7 @@ func ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvi } if userBkt.IsCustomerManagedKeyError(err) { - return nil, bucket.ErrCustomerManagedKeyError + return nil, cortex_errors.WithCause(bucket.ErrCustomerManagedKeyAccessDenied, err) } return nil, errors.Wrap(err, "read bucket index") diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index a6e0606efc8..a3f28d8dbfd 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -2,6 +2,7 @@ package bucketindex import ( "context" + "errors" "path" "strings" "testing" @@ -47,7 +48,7 @@ func TestReadIndex_ShouldReturnErrorIfKeyAccessDeniedErr(t *testing.T) { }, } idx, err := ReadIndex(context.Background(), bkt, "user-1", nil, log.NewNopLogger()) - require.Equal(t, bucket.ErrCustomerManagedKeyError, err) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) require.Nil(t, idx) } diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index ac55eef35d3..7dc1ea9048d 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -68,7 +68,7 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. start := time.Now() defer func() { f.metrics.SyncDuration.Observe(time.Since(start).Seconds()) - if err != nil && !errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if err != nil && !errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { f.metrics.SyncFailures.Inc() } }() @@ -95,14 +95,14 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. return nil, nil, nil } - if errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { // stop the job and return the error // this error should be used to return Access Denied to the caller level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err) f.metrics.Synced.WithLabelValues(keyAccessDenied).Set(1) f.metrics.Submit() - return nil, nil, bucket.ErrCustomerManagedKeyError + return nil, nil, err } if err != nil { diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index 094bf9ddce0..129bd0c39f9 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -3,6 +3,7 @@ package storegateway import ( "bytes" "context" + "errors" "path" "strings" "testing" @@ -106,11 +107,11 @@ func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) { reg := prometheus.NewPedanticRegistry() ctx := context.Background() - bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucket.ErrCustomerManagedKeyError) + bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucket.ErrCustomerManagedKeyAccessDenied) fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil) metas, _, err := fetcher.Fetch(ctx) - require.ErrorIs(t, bucket.ErrCustomerManagedKeyError, err) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) assert.Empty(t, metas) assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index fe02a552f00..287fe4d0d26 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -35,6 +35,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/backoff" + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" @@ -232,7 +233,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte for job := range jobs { if err := f(ctx, job.store); err != nil { - if errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { u.storesErrorsMu.Lock() u.storesErrors[job.userID] = err u.storesErrorsMu.Unlock() @@ -304,7 +305,7 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri err := u.getStoreError(userID) - if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) } @@ -313,7 +314,7 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri ctx: spanCtx, }) - if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) } @@ -337,13 +338,13 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe err := u.getStoreError(userID) - if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) } resp, err := store.LabelNames(ctx, req) - if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) } @@ -367,13 +368,13 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues err := u.getStoreError(userID) - if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) } resp, err := store.LabelValues(ctx, req) - if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) { + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) } diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 95de2877afa..d7585941163 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -76,24 +76,24 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { // Should set the error on user-1 require.NoError(t, stores.InitialSync(ctx)) - require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyError) + require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) require.ErrorIs(t, stores.storesErrors["user-2"], nil) require.NoError(t, stores.SyncBlocks(context.Background())) - require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyError) + require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) require.ErrorIs(t, stores.storesErrors["user-2"], nil) _, _, err = querySeries(stores, "user-1", "anything", 0, 100) - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) _, err = queryLabelsNames(stores, "user-1", "anything") - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) _, err = queryLabelsValues(stores, "user-1", "anything") - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) _, _, err = querySeries(stores, "user-2", "anything", 0, 100) require.NoError(t, err) _, err = queryLabelsNames(stores, "user-1", "anything") - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) _, err = queryLabelsValues(stores, "user-1", "anything") - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError)) + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) // Cleaning the error mBucket.GetFailures = map[string]error{} diff --git a/pkg/util/errors/errors.go b/pkg/util/errors/errors.go new file mode 100644 index 00000000000..d5c19f363bf --- /dev/null +++ b/pkg/util/errors/errors.go @@ -0,0 +1,47 @@ +package errors + +import "errors" + +type errWithCause struct { + error + cause error +} + +func (e errWithCause) Error() string { + return e.error.Error() +} + +// Cause To support errors.Cause(). +func (e errWithCause) Cause() error { + return e.cause +} + +// Is To support errors.Is(). +func (e errWithCause) Is(err error) bool { + return errors.Is(err, e.error) || errors.Is(err, e.cause) +} + +// Unwrap To support errors.Unwrap(). +func (e errWithCause) Unwrap() error { + return e.cause +} + +// WithCause wrappers err with a error cause +func WithCause(err, cause error) error { + return errWithCause{ + error: err, + cause: cause, + } +} + +// ErrorIs is similar to `errors.Is` but receives a function to compare +func ErrorIs(err error, f func(err error) bool) bool { + for { + if f(err) { + return true + } + if err = errors.Unwrap(err); err == nil { + return false + } + } +} From 1be9cc99cb8054bfb0b95804890af7143ebad781 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 3 Jul 2023 12:34:26 -0700 Subject: [PATCH 8/8] creating IsOneOfTheExpectedErrors func Signed-off-by: Alan Protasio --- pkg/storage/tsdb/bucketindex/storage.go | 2 +- pkg/storage/tsdb/bucketindex/updater.go | 2 +- pkg/storage/tsdb/util.go | 9 +++++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go index ecfb713f4ae..5e66a1357bc 100644 --- a/pkg/storage/tsdb/bucketindex/storage.go +++ b/pkg/storage/tsdb/bucketindex/storage.go @@ -27,7 +27,7 @@ func ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvi userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider) // Get the bucket index. - reader, err := userBkt.WithExpectedErrs(tsdb.IsObjNotFoundOrCustomerManagedKeyErr(userBkt)).Get(ctx, IndexCompressedFilename) + reader, err := userBkt.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(userBkt.IsCustomerManagedKeyError, userBkt.IsObjNotFoundErr)).Get(ctx, IndexCompressedFilename) if err != nil { if userBkt.IsObjNotFoundErr(err) { return nil, ErrIndexNotFound diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 06f8b824f1e..64c84f06efc 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -132,7 +132,7 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo metaFile := path.Join(id.String(), block.MetaFilename) // Get the block's meta.json file. - r, err := w.bkt.ReaderWithExpectedErrs(tsdb.IsObjNotFoundOrCustomerManagedKeyErr(w.bkt)).Get(ctx, metaFile) + r, err := w.bkt.ReaderWithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(w.bkt.IsObjNotFoundErr, w.bkt.IsCustomerManagedKeyError)).Get(ctx, metaFile) if w.bkt.IsObjNotFoundErr(err) { return nil, ErrBlockMetaNotFound } diff --git a/pkg/storage/tsdb/util.go b/pkg/storage/tsdb/util.go index 28dc7948a4d..014b510d30c 100644 --- a/pkg/storage/tsdb/util.go +++ b/pkg/storage/tsdb/util.go @@ -17,8 +17,13 @@ func HashBlockID(id ulid.ULID) uint32 { return h } -func IsObjNotFoundOrCustomerManagedKeyErr(bkt objstore.Bucket) objstore.IsOpFailureExpectedFunc { +func IsOneOfTheExpectedErrors(f ...objstore.IsOpFailureExpectedFunc) objstore.IsOpFailureExpectedFunc { return func(err error) bool { - return bkt.IsObjNotFoundErr(err) || bkt.IsCustomerManagedKeyError(err) + for _, f := range f { + if f(err) { + return true + } + } + return false } }