diff --git a/go.mod b/go.mod index 330b9e3bf0e..44b58c5490c 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/sony/gobreaker v0.5.0 github.com/spf13/afero v1.9.5 github.com/stretchr/testify v1.8.4 - github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd + github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea github.com/thanos-io/thanos v0.31.1-0.20230627154113-7cfaf3fe2d43 github.com/uber/jaeger-client-go v2.30.0+incompatible diff --git a/go.sum b/go.sum index af0a964f5ab..53485b38d25 100644 --- a/go.sum +++ b/go.sum @@ -1160,8 +1160,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= -github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd h1:asQ0HomkaUXZuR3J7daBEusMS++3hkYsYM6u8gpmPWM= -github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM= +github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a h1:tXcVeuval1nzdHn1JXqaBmyjuEUcpDI9huPrUF04nR4= +github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM= github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI= github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18= github.com/thanos-io/thanos v0.31.1-0.20230627154113-7cfaf3fe2d43 h1:UHyTPGdDHAoNHuSce5cJ2vEi6g1v8D5ZFBWZ61uTHSM= diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 
3259944987c..920155b2f37 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -325,6 +325,10 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger) if errors.Is(err, bucketindex.ErrIndexCorrupted) { level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it") + } else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { + // Give up cleaning if we get access denied + level.Warn(userLogger).Log("msg", err.Error()) + return nil } else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) { return err } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 52f16d741a1..707d5f36ab6 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -3,7 +3,6 @@ package compactor import ( "context" "crypto/rand" - "errors" "fmt" "path" "strings" @@ -17,14 +16,12 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -57,6 +54,37 @@ func TestBlocksCleaner(t *testing.T) { } } +func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { + const userID = "user-1" + + bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) + bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) + + // Make reads of the user's bucket index fail with a key access denied error. 
+ ctx := context.Background() + deletionDelay := 12 * time.Hour + bucketClient = &cortex_testutil.MockBucketFailure{ + Bucket: bucketClient, + GetFailures: map[string]error{ + path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + + cfg := BlocksCleanerConfig{ + DeletionDelay: deletionDelay, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + } + + logger := log.NewNopLogger() + scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + cfgProvider := newMockConfigProvider() + + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + err := cleaner.cleanUser(ctx, userID, true) + require.NoError(t, err) +} + func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) { bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) @@ -254,7 +282,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { createDeletionMark(t, bucketClient, userID, block4, now.Add(-deletionDelay).Add(-time.Hour)) // To emulate a failure deleting a block, we wrap the bucket client in a mocked one. 
- bucketClient = &mockBucketFailure{ + bucketClient = &cortex_testutil.MockBucketFailure{ Bucket: bucketClient, DeleteFailures: []string{path.Join(userID, block3.String(), metadata.MetaFilename)}, } @@ -658,19 +686,6 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { } } -type mockBucketFailure struct { - objstore.Bucket - - DeleteFailures []string -} - -func (m *mockBucketFailure) Delete(ctx context.Context, name string) error { - if util.StringsContain(m.DeleteFailures, name) { - return errors.New("mocked delete failure") - } - return m.Bucket.Delete(ctx, name) -} - type mockConfigProvider struct { userRetentionPeriods map[string]time.Duration } diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index 31686fcb7ba..ccf9e7b7728 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -10,6 +10,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/services" @@ -62,6 +64,11 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, // so the bucket index hasn't been created yet. 
return nil, nil, nil } + + if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { + return nil, nil, validation.AccessDeniedError(err.Error()) + } + if err != nil { return nil, nil, err } diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index 8230d638f9c..68523816b83 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" @@ -241,3 +243,21 @@ func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIn return finder } + +func TestBucketIndexBlocksFinder_GetBlocks_KeyPermissionDenied(t *testing.T) { + const userID = "user-1" + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + bkt = &cortex_testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + + finder := prepareBucketIndexBlocksFinder(t, bkt) + + _, _, err := finder.GetBlocks(context.Background(), userID, 0, 100) + expected := validation.AccessDeniedError("error") + require.IsType(t, expected, err) +} diff --git a/pkg/querier/error_translate_queryable.go b/pkg/querier/error_translate_queryable.go index c0a14a37231..b08e18554e0 100644 --- a/pkg/querier/error_translate_queryable.go +++ b/pkg/querier/error_translate_queryable.go @@ -39,6 +39,9 @@ func TranslateToPromqlAPIError(err error) error { case validation.LimitError: // This will be returned with status code 422 by Prometheus API. return err + case validation.AccessDeniedError: + // This will be returned with status code 422 by Prometheus API. 
+ return err default: if errors.Is(err, context.Canceled) { return err // 422 diff --git a/pkg/querier/error_translate_queryable_test.go b/pkg/querier/error_translate_queryable_test.go index d5d93b72a59..89529ca9840 100644 --- a/pkg/querier/error_translate_queryable_test.go +++ b/pkg/querier/error_translate_queryable_test.go @@ -43,6 +43,12 @@ func TestApiStatusCodes(t *testing.T) { expectedCode: 422, }, + { + err: validation.AccessDeniedError("access denied"), + expectedString: "access denied", + expectedCode: 422, + }, + { err: promql.ErrTooManySamples("query execution"), expectedString: "too many samples", diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index aee3296dc7d..852b4a7cc66 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -40,6 +40,8 @@ var ( SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem} ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") + + ErrCustomerManagedKeyAccessDenied = errors.New("access denied: customer key") ) // Config holds configuration for accessing long-term storage. 
diff --git a/pkg/storage/bucket/client_mock.go b/pkg/storage/bucket/client_mock.go index 1ad90b4dd35..76e24af13f5 100644 --- a/pkg/storage/bucket/client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -12,7 +12,10 @@ import ( "github.com/thanos-io/objstore" ) -var errObjectDoesNotExist = errors.New("object does not exist") +var ( + errObjectDoesNotExist = errors.New("object does not exist") + errKeyPermissionDenied = errors.New("object key permission denied") +) // ClientMock mocks objstore.Bucket type ClientMock struct { @@ -175,6 +178,11 @@ func (m *ClientMock) IsObjNotFoundErr(err error) bool { return err == errObjectDoesNotExist } +// IsCustomerManagedKeyError mocks objstore.Bucket.IsCustomerManagedKeyError() +func (m *ClientMock) IsCustomerManagedKeyError(err error) bool { + return err == errKeyPermissionDenied +} + // ObjectSize mocks objstore.Bucket.Attributes() func (m *ClientMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { args := m.Called(ctx, name) diff --git a/pkg/storage/bucket/prefixed_bucket_client.go b/pkg/storage/bucket/prefixed_bucket_client.go index 5f0d8955b72..6486751e12c 100644 --- a/pkg/storage/bucket/prefixed_bucket_client.go +++ b/pkg/storage/bucket/prefixed_bucket_client.go @@ -73,18 +73,23 @@ func (b *PrefixedBucketClient) IsObjNotFoundErr(err error) bool { return b.bucket.IsObjNotFoundErr(err) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *PrefixedBucketClient) IsCustomerManagedKeyError(err error) bool { + return b.bucket.IsCustomerManagedKeyError(err) +} + // Attributes returns attributes of the specified object. 
func (b *PrefixedBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { return b.bucket.Attributes(ctx, b.fullName(name)) } -// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment +// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment // thanos_objstore_bucket_operation_failures_total metric. func (b *PrefixedBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader { return b.WithExpectedErrs(fn) } -// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment +// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment // thanos_objstore_bucket_operation_failures_total metric. func (b *PrefixedBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket { if ib, ok := b.bucket.(objstore.InstrumentedBucket); ok { diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index ed76d835e5c..020037c5c9c 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -115,7 +115,7 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error, operation if lastErr == nil { return nil } - if b.bucket.IsObjNotFoundErr(lastErr) { + if b.bucket.IsObjNotFoundErr(lastErr) || b.bucket.IsCustomerManagedKeyError(lastErr) { return lastErr } retries.Wait() @@ -194,6 +194,10 @@ func (b *BucketWithRetries) IsObjNotFoundErr(err error) bool { return b.bucket.IsObjNotFoundErr(err) } +func (b *BucketWithRetries) IsCustomerManagedKeyError(err error) bool { + return b.bucket.IsCustomerManagedKeyError(err) +} + func (b *BucketWithRetries) Close() error { return b.bucket.Close() } diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go index 
bc991b4f8df..a6d88f76b79 100644 --- a/pkg/storage/bucket/s3/bucket_client_test.go +++ b/pkg/storage/bucket/s3/bucket_client_test.go @@ -3,6 +3,7 @@ package s3 import ( "bytes" "context" + "errors" "fmt" "io" "testing" @@ -13,6 +14,49 @@ import ( "github.com/thanos-io/objstore" ) +var ( + errNotFound = errors.New("not found") + errKeyDenied = errors.New("key denied") +) + +func TestBucketWithRetries_ShouldRetry(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + err error + retryCount int + }{ + "should not retry on not found": { + err: errNotFound, + retryCount: 1, + }, + "should not retry on key access denied": { + err: errKeyDenied, + retryCount: 1, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + m := mockBucket{ + FailCount: 3, + errToReturn: tc.err, + } + + b := BucketWithRetries{ + logger: log.NewNopLogger(), + bucket: &m, + operationRetries: 5, + retryMinBackoff: 10 * time.Millisecond, + retryMaxBackoff: time.Second, + } + + _, _ = b.Get(context.Background(), "something") + require.Equal(t, tc.retryCount, m.calledCount) + }) + } +} + func TestBucketWithRetries_UploadSeekable(t *testing.T) { t.Parallel() @@ -102,6 +146,9 @@ func (f *fakeReader) Read(p []byte) (n int, err error) { type mockBucket struct { FailCount int uploadedContent []byte + errToReturn error + + calledCount int } // Upload mocks objstore.Bucket.Upload() @@ -135,7 +182,8 @@ func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, // Get mocks objstore.Bucket.Get() func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - return nil, nil + m.calledCount++ + return nil, m.errToReturn } // GetRange mocks objstore.Bucket.GetRange() @@ -150,7 +198,12 @@ func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) { // IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr() func (m *mockBucket) IsObjNotFoundErr(err error) bool { - return false + return err == errNotFound +} + +// 
IsCustomerManagedKeyError mocks objstore.Bucket.IsCustomerManagedKeyError() +func (m *mockBucket) IsCustomerManagedKeyError(err error) bool { + return err == errKeyDenied } // ObjectSize mocks objstore.Bucket.Attributes() diff --git a/pkg/storage/bucket/sse_bucket_client.go b/pkg/storage/bucket/sse_bucket_client.go index e4901fcf018..36c1678c160 100644 --- a/pkg/storage/bucket/sse_bucket_client.go +++ b/pkg/storage/bucket/sse_bucket_client.go @@ -119,6 +119,10 @@ func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool { return b.bucket.IsObjNotFoundErr(err) } +func (b *SSEBucketClient) IsCustomerManagedKeyError(err error) bool { + return b.bucket.IsCustomerManagedKeyError(err) +} + // Attributes implements objstore.Bucket. func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { return b.bucket.Attributes(ctx, name) diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go index 0d2ab7cce33..2961b864057 100644 --- a/pkg/storage/tsdb/bucketindex/loader.go +++ b/pkg/storage/tsdb/bucketindex/loader.go @@ -115,6 +115,8 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { if errors.Is(err, ErrIndexNotFound) { level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID) + } else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { + level.Warn(l.logger).Log("msg", "key access denied when reading bucket index", "user", userID) } else { // We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just // started to remote write and its blocks haven't uploaded to storage yet). 
@@ -196,7 +198,7 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) { l.loadAttempts.Inc() startTime := time.Now() idx, err := ReadIndex(readCtx, l.bkt, userID, l.cfgProvider, l.logger) - if err != nil && !errors.Is(err, ErrIndexNotFound) { + if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { l.loadFailures.Inc() level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err) return diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index b9fe23752a8..e73ca0de1e9 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -10,11 +10,14 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -572,6 +575,121 @@ func TestLoader_ShouldOffloadIndexIfIdleTimeoutIsReachedDuringBackgroundUpdates( )) } +func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAccessDenied(t *testing.T) { + user := "user-1" + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Create the loader. + cfg := LoaderConfig{ + CheckInterval: time.Hour, // Intentionally high to not hit it. + UpdateOnStaleInterval: time.Hour, // Intentionally high to not hit it. + UpdateOnErrorInterval: 0, + IdleTimeout: time.Hour, // Intentionally high to not hit it. 
+ } + + mockedBkt := &cortex_testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(user, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + + loader := NewLoader(cfg, mockedBkt, nil, log.NewNopLogger(), reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, loader)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) + }) + + _, err := loader.GetIndex(ctx, user) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) + + // Check cached + require.NoError(t, loader.checkCachedIndexes(ctx)) + + loader.bkt = bkt + + // Upload the bucket index. + idx := &Index{ + Version: IndexVersion1, + Blocks: Blocks{ + {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20}, + }, + BlockDeletionMarks: nil, + UpdatedAt: time.Now().Unix(), + } + require.NoError(t, WriteIndex(ctx, bkt, "user-1", nil, idx)) + + // Wait until the index has been updated in background. + test.Poll(t, 3*time.Second, nil, func() interface{} { + _, err := loader.GetIndex(ctx, "user-1") + // Check cached + require.NoError(t, loader.checkCachedIndexes(ctx)) + return err + }) + + actualIdx, err := loader.GetIndex(ctx, "user-1") + require.NoError(t, err) + assert.Equal(t, idx, actualIdx) + + // Ensure metrics have been updated accordingly. + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures. + # TYPE cortex_bucket_index_load_failures_total counter + cortex_bucket_index_load_failures_total 0 + # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory. 
+ # TYPE cortex_bucket_index_loaded gauge + cortex_bucket_index_loaded 1 + `), + "cortex_bucket_index_loaded", "cortex_bucket_index_load_failures_total", + )) +} + +func TestLoader_GetIndex_ShouldCacheKeyDeniedErrors(t *testing.T) { + user := "user-1" + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + bkt = &cortex_testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(user, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + + // Create the loader. + loader := NewLoader(prepareLoaderConfig(), bkt, nil, log.NewNopLogger(), reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, loader)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) + }) + + // Request the index multiple times. + for i := 0; i < 10; i++ { + _, err := loader.GetIndex(ctx, "user-1") + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) + } + + // Ensure metrics have been updated accordingly. + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures. + # TYPE cortex_bucket_index_load_failures_total counter + cortex_bucket_index_load_failures_total 0 + # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory. + # TYPE cortex_bucket_index_loaded gauge + cortex_bucket_index_loaded 0 + # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts. 
+ # TYPE cortex_bucket_index_loads_total counter + cortex_bucket_index_loads_total 1 + `), + "cortex_bucket_index_loads_total", + "cortex_bucket_index_load_failures_total", + "cortex_bucket_index_loaded", + )) +} + func prepareLoaderConfig() LoaderConfig { return LoaderConfig{ CheckInterval: time.Minute, diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index e175f10f90b..7c38b8ff3c6 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -100,6 +100,11 @@ func (b *globalMarkersBucket) IsObjNotFoundErr(err error) bool { return b.parent.IsObjNotFoundErr(err) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *globalMarkersBucket) IsCustomerManagedKeyError(err error) bool { + return b.parent.IsCustomerManagedKeyError(err) +} + // Attributes implements objstore.Bucket. func (b *globalMarkersBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { return b.parent.Attributes(ctx, name) diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go index 6ebd615d2ef..5e66a1357bc 100644 --- a/pkg/storage/tsdb/bucketindex/storage.go +++ b/pkg/storage/tsdb/bucketindex/storage.go @@ -10,7 +10,10 @@ import ( "github.com/pkg/errors" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" "github.com/cortexproject/cortex/pkg/util/runutil" ) @@ -24,11 +27,16 @@ func ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvi userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider) // Get the bucket index. 
- reader, err := userBkt.WithExpectedErrs(userBkt.IsObjNotFoundErr).Get(ctx, IndexCompressedFilename) + reader, err := userBkt.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(userBkt.IsCustomerManagedKeyError, userBkt.IsObjNotFoundErr)).Get(ctx, IndexCompressedFilename) if err != nil { if userBkt.IsObjNotFoundErr(err) { return nil, ErrIndexNotFound } + + if userBkt.IsCustomerManagedKeyError(err) { + return nil, cortex_errors.WithCause(bucket.ErrCustomerManagedKeyAccessDenied, err) + } + return nil, errors.Wrap(err, "read bucket index") } defer runutil.CloseWithLogOnErr(logger, reader, "close bucket index reader") diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index c27e37aac4a..a3f28d8dbfd 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -2,6 +2,7 @@ package bucketindex import ( "context" + "errors" "path" "strings" "testing" @@ -10,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) @@ -36,6 +39,19 @@ func TestReadIndex_ShouldReturnErrorIfIndexIsCorrupted(t *testing.T) { require.Nil(t, idx) } +func TestReadIndex_ShouldReturnErrorIfKeyAccessDeniedErr(t *testing.T) { + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + bkt = &cortex_testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join("user-1", "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, + }, + } + idx, err := ReadIndex(context.Background(), bkt, "user-1", nil, log.NewNopLogger()) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) + require.Nil(t, idx) +} + func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) { const userID = "user-1" diff --git 
a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index f46e7cddfde..64c84f06efc 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -15,6 +15,8 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/bucket" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/runutil" @@ -25,6 +27,8 @@ var ( ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found") ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted") + + errBlockMetaKeyAccessDeniedErr = errors.New("block meta file key access denied error") ) // Updater is responsible to generate an update in-memory bucket index. @@ -108,6 +112,11 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block) (blocks []*Blo level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index", "block", id.String()) continue } + if errors.Is(err, errBlockMetaKeyAccessDeniedErr) { + partials[id] = err + level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index due key permission", "block", id.String()) + continue + } if errors.Is(err, ErrBlockMetaCorrupted) { partials[id] = err level.Error(w.logger).Log("msg", "skipped block with corrupted meta.json when updating bucket index", "block", id.String(), "err", err) @@ -123,10 +132,13 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo metaFile := path.Join(id.String(), block.MetaFilename) // Get the block's meta.json file. 
- r, err := w.bkt.ReaderWithExpectedErrs(w.bkt.IsObjNotFoundErr).Get(ctx, metaFile) + r, err := w.bkt.ReaderWithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(w.bkt.IsObjNotFoundErr, w.bkt.IsCustomerManagedKeyError)).Get(ctx, metaFile) if w.bkt.IsObjNotFoundErr(err) { return nil, ErrBlockMetaNotFound } + if w.bkt.IsCustomerManagedKeyError(err) { + return nil, errBlockMetaKeyAccessDeniedErr + } if err != nil { return nil, errors.Wrapf(err, "get block meta file: %v", metaFile) } diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index c42fbeaeebe..e9d40f37374 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -138,6 +138,50 @@ func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetric(t *testing. `), "thanos_objstore_bucket_operation_failures_total")) } +func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetricCustomerKey(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + registry := prometheus.NewRegistry() + + // Mock some blocks in the storage. 
+ bkt = BucketWithGlobalMarkers(bkt) + bkt = objstore.BucketWithMetrics("test-bucket", bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry)) + block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) + block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) + + bkt = &testutil.MockBucketFailure{ + Bucket: bkt, + GetFailures: map[string]error{ + path.Join(userID, block2.ULID.String(), "meta.json"): testutil.ErrKeyAccessDeniedError, + }, + } + + w := NewUpdater(bkt, userID, nil, logger) + idx, partials, _, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assert.Len(t, partials, 1) + assert.True(t, errors.Is(partials[block2.ULID], errBlockMetaKeyAccessDeniedErr)) + assertBucketIndexEqual(t, idx, bkt, userID, + []tsdb.BlockMeta{block1}, + []*metadata.DeletionMark{}) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP thanos_objstore_bucket_operation_failures_total Total number of operations against a bucket that failed, but were not expected to fail in certain way from caller perspective. Those errors have to be investigated. 
+ # TYPE thanos_objstore_bucket_operation_failures_total counter + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="attributes"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="delete"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="exists"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="get"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="get_range"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="iter"} 0 + thanos_objstore_bucket_operation_failures_total{bucket="test-bucket",operation="upload"} 0 + `), "thanos_objstore_bucket_operation_failures_total")) +} + func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) { const userID = "user-1" diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go index 04dc2208f08..46fc1b001bf 100644 --- a/pkg/storage/tsdb/testutil/objstore.go +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -1,15 +1,23 @@ package testutil import ( + "context" + "io" "os" + "strings" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" ) +var ErrKeyAccessDeniedError = errors.New("test key access denied") + func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { storageDir, err := os.MkdirTemp(os.TempDir(), "bucket") require.NoError(t, err) @@ -23,3 +31,50 @@ func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { return objstore.BucketWithMetrics("test", bkt, nil), storageDir } + +type MockBucketFailure struct { + objstore.Bucket + + DeleteFailures []string + GetFailures map[string]error +} + +func (m *MockBucketFailure) Delete(ctx context.Context, name string) error { + if 
util.StringsContain(m.DeleteFailures, name) { + return errors.New("mocked delete failure") + } + return m.Bucket.Delete(ctx, name) +} + +func (m *MockBucketFailure) Get(ctx context.Context, name string) (io.ReadCloser, error) { + for prefix, err := range m.GetFailures { + if strings.HasPrefix(name, prefix) { + return nil, err + } + } + if e, ok := m.GetFailures[name]; ok { + return nil, e + } + + return m.Bucket.Get(ctx, name) +} + +func (m *MockBucketFailure) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { + if ibkt, ok := m.Bucket.(objstore.InstrumentedBucket); ok { + return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures} + } + + return m +} + +func (m *MockBucketFailure) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + if ibkt, ok := m.Bucket.(objstore.InstrumentedBucket); ok { + return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures} + } + + return m +} + +func (m *MockBucketFailure) IsCustomerManagedKeyError(err error) bool { + return errors.Is(err, ErrKeyAccessDeniedError) +} diff --git a/pkg/storage/tsdb/util.go b/pkg/storage/tsdb/util.go index c13f5a2b7c4..014b510d30c 100644 --- a/pkg/storage/tsdb/util.go +++ b/pkg/storage/tsdb/util.go @@ -2,6 +2,7 @@ package tsdb import ( "github.com/oklog/ulid" + "github.com/thanos-io/objstore" "github.com/cortexproject/cortex/pkg/ingester/client" ) @@ -15,3 +16,15 @@ func HashBlockID(id ulid.ULID) uint32 { } return h } + +// IsOneOfTheExpectedErrors returns a predicate that reports true when at least one of the given predicates reports true for the error. 
+func IsOneOfTheExpectedErrors(f ...objstore.IsOpFailureExpectedFunc) objstore.IsOpFailureExpectedFunc { + return func(err error) bool { + for _, isExpected := range f { + if isExpected(err) { + return true + } + } + return false + } +} diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index 60cb1cbb374..7dc1ea9048d 100644 --- 
a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -19,6 +19,7 @@ import ( const ( corruptedBucketIndex = "corrupted-bucket-index" + keyAccessDenied = "key-access-denied" noBucketIndex = "no-bucket-index" ) @@ -67,7 +68,7 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. start := time.Now() defer func() { f.metrics.SyncDuration.Observe(time.Since(start).Seconds()) - if err != nil { + if err != nil && !errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { f.metrics.SyncFailures.Inc() } }() @@ -93,6 +94,17 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. return nil, nil, nil } + + if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { + // stop the job and return the error + // this error should be used to return Access Denied to the caller + level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err) + f.metrics.Synced.WithLabelValues(keyAccessDenied).Set(1) + f.metrics.Submit() + + return nil, nil, err + } + if err != nil { f.metrics.Synced.WithLabelValues(block.FailedMeta).Set(1) f.metrics.Submit() diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index b7279fb018c..129bd0c39f9 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -3,6 +3,7 @@ package storegateway import ( "bytes" "context" + "errors" "path" "strings" "testing" @@ -99,6 +100,53 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) { )) } +func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) { + const userID = "user-1" + + bkt := &bucket.ClientMock{} + reg := prometheus.NewPedanticRegistry() + ctx := context.Background() + + bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucket.ErrCustomerManagedKeyAccessDenied) + + fetcher := 
NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil) + metas, _, err := fetcher.Fetch(ctx) + require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) + assert.Empty(t, metas) + + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_modified Number of blocks whose metadata changed + # TYPE blocks_meta_modified gauge + blocks_meta_modified{modified="replica-label-removed"} 0 + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-bucket-index"} 0 + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="key-access-denied"} 1 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-bucket-index"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="too-fresh"} 0 + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 1 + `), + "blocks_meta_modified", + "blocks_meta_sync_failures_total", + "blocks_meta_synced", + "blocks_meta_syncs_total", + )) +} + func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) { t.Parallel() const userID = "user-1" diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 879fcc4b29a..287fe4d0d26 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -29,11 +29,13 @@ import ( 
"github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/logging" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/backoff" + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" @@ -66,6 +68,10 @@ type BucketStores struct { storesMu sync.RWMutex stores map[string]*store.BucketStore + // Keeps the last sync error for the bucket store for each tenant. + storesErrorsMu sync.RWMutex + storesErrors map[string]error + // Metrics. syncTimes prometheus.Histogram syncLastSuccess prometheus.Gauge @@ -95,6 +101,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra bucket: cachingBucket, shardingStrategy: shardingStrategy, stores: map[string]*store.BucketStore{}, + storesErrors: map[string]error{}, logLevel: logLevel, bucketStoreMetrics: NewBucketStoreMetrics(), metaFetcherMetrics: NewMetadataFetcherMetrics(), @@ -226,9 +233,19 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte for job := range jobs { if err := f(ctx, job.store); err != nil { - errsMx.Lock() - errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID)) - errsMx.Unlock() + if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { + u.storesErrorsMu.Lock() + u.storesErrors[job.userID] = err + u.storesErrorsMu.Unlock() + } else { + errsMx.Lock() + errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID)) + errsMx.Unlock() + } + } else { + u.storesErrorsMu.Lock() + delete(u.storesErrors, job.userID) + u.storesErrorsMu.Unlock() } } }() @@ -286,10 +303,22 @@ func (u *BucketStores) Series(req 
*storepb.SeriesRequest, srv storepb.Store_Seri return nil } - return store.Series(req, spanSeriesServer{ + err := u.getStoreError(userID) + + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + err = store.Series(req, spanSeriesServer{ Store_SeriesServer: srv, ctx: spanCtx, }) + + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + return err } // LabelNames implements the Storegateway proto service. @@ -307,7 +336,19 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe return &storepb.LabelNamesResponse{}, nil } - return store.LabelNames(ctx, req) + err := u.getStoreError(userID) + + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { + return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + resp, err := store.LabelNames(ctx, req) + + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { + return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + return resp, err } // LabelValues implements the Storegateway proto service. 
@@ -325,7 +366,19 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues return &storepb.LabelValuesResponse{}, nil } - return store.LabelValues(ctx, req) + err := u.getStoreError(userID) + + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { + return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + resp, err := store.LabelValues(ctx, req) + + if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { + return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + } + + return resp, err } // scanUsers in the bucket and return the list of found users. If an error occurs while @@ -350,6 +403,12 @@ func (u *BucketStores) getStore(userID string) *store.BucketStore { return u.stores[userID] } +func (u *BucketStores) getStoreError(userID string) error { + u.storesErrorsMu.RLock() + defer u.storesErrorsMu.RUnlock() + return u.storesErrors[userID] +} + var ( errBucketStoreNotEmpty = errors.New("bucket store not empty") errBucketStoreNotFound = errors.New("bucket store not found") diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 1458a586fa7..d7585941163 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -29,10 +29,14 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/logging" "go.uber.org/atomic" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -40,6 +44,72 @@ import ( 
"github.com/cortexproject/cortex/pkg/util/flagext" ) +func TestBucketStores_CustomerKeyError(t *testing.T) { + userToMetric := map[string]string{ + "user-1": "series_1", + "user-2": "series_2", + } + + ctx := context.Background() + cfg := prepareStorageConfig(t) + cfg.BucketStore.BucketIndex.Enabled = true + + storageDir := t.TempDir() + + for userID, metricName := range userToMetric { + generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15) + } + + b, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + + mBucket := &cortex_testutil.MockBucketFailure{ + Bucket: b, + GetFailures: map[string]error{ + "user-1": cortex_testutil.ErrKeyAccessDeniedError, + }, + } + require.NoError(t, err) + + reg := prometheus.NewPedanticRegistry() + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + + // Should set the error on user-1 + require.NoError(t, stores.InitialSync(ctx)) + require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) + require.NoError(t, stores.SyncBlocks(context.Background())) + require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) + + _, _, err = querySeries(stores, "user-1", "anything", 0, 100) + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) + _, err = queryLabelsNames(stores, "user-1", "anything") + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) + _, err = queryLabelsValues(stores, "user-1", "anything") + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) + _, _, err = 
querySeries(stores, "user-2", "anything", 0, 100) + require.NoError(t, err) + _, err = queryLabelsNames(stores, "user-1", "anything") + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) + _, err = queryLabelsValues(stores, "user-1", "anything") + require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) + + // Cleaning the error + mBucket.GetFailures = map[string]error{} + require.NoError(t, stores.SyncBlocks(context.Background())) + require.ErrorIs(t, stores.storesErrors["user-1"], nil) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) + _, _, err = querySeries(stores, "user-1", "anything", 0, 100) + require.NoError(t, err) + _, _, err = querySeries(stores, "user-2", "anything", 0, 100) + require.NoError(t, err) + _, err = queryLabelsNames(stores, "user-1", "anything") + require.NoError(t, err) + _, err = queryLabelsValues(stores, "user-1", "anything") + require.NoError(t, err) +} + func TestBucketStores_InitialSync(t *testing.T) { t.Parallel() userToMetric := map[string]string{ @@ -429,6 +499,34 @@ func querySeries(stores *BucketStores, userID, metricName string, minT, maxT int return srv.SeriesSet, srv.Warnings, err } +func queryLabelsNames(stores *BucketStores, userID, metricName string) (*storepb.LabelNamesResponse, error) { + req := &storepb.LabelNamesRequest{ + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + Value: metricName, + }}, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + + ctx := setUserIDToGRPCContext(context.Background(), userID) + return stores.LabelNames(ctx, req) +} + +func queryLabelsValues(stores *BucketStores, userID, metricName string) (*storepb.LabelValuesResponse, error) { + req := &storepb.LabelValuesRequest{ + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + 
Value: metricName, + }}, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + + ctx := setUserIDToGRPCContext(context.Background(), userID) + return stores.LabelValues(ctx, req) +} + func mockLoggingLevel() logging.Level { level := logging.Level{} err := level.Set("info") diff --git a/pkg/util/errors/errors.go b/pkg/util/errors/errors.go new file mode 100644 index 00000000000..d5c19f363bf --- /dev/null +++ b/pkg/util/errors/errors.go @@ -0,0 +1,47 @@ +package errors + +import "errors" + +type errWithCause struct { + error + cause error +} + +func (e errWithCause) Error() string { + return e.error.Error() +} + +// Cause returns the underlying cause, to support errors.Cause(). +func (e errWithCause) Cause() error { + return e.cause +} + +// Is reports whether the wrapped error or its cause matches target, to support errors.Is(). +func (e errWithCause) Is(target error) bool { + return errors.Is(e.error, target) || errors.Is(e.cause, target) +} + +// Unwrap returns the cause, to support errors.Unwrap(). +func (e errWithCause) Unwrap() error { + return e.cause +} + +// WithCause wraps err with an error cause. +func WithCause(err, cause error) error { + return errWithCause{ + error: err, + cause: cause, + } +} + +// ErrorIs is similar to `errors.Is` but matches each error in the chain with the predicate f instead of a target. +func ErrorIs(err error, f func(err error) bool) bool { + for { + if f(err) { + return true + } + if err = errors.Unwrap(err); err == nil { + return false + } + } +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 00b386923b4..1c690d69920 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -24,6 +24,13 @@ const ( GlobalIngestionRateStrategy = "global" ) +// AccessDeniedError are errors reporting that access was denied. +type AccessDeniedError string + +func (e AccessDeniedError) Error() string { + return string(e) +} + // LimitError are errors that do not comply with the limits specified.
type LimitError string diff --git a/vendor/github.com/thanos-io/objstore/CHANGELOG.md b/vendor/github.com/thanos-io/objstore/CHANGELOG.md index 05aafa7bd32..270f6d4f029 100644 --- a/vendor/github.com/thanos-io/objstore/CHANGELOG.md +++ b/vendor/github.com/thanos-io/objstore/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#43](https://github.com/thanos-io/objstore/pull/43) filesystem: abort filesystem bucket operations if the context has been cancelled - [#44](https://github.com/thanos-io/objstore/pull/44) Add new metric to count total number of fetched bytes from bucket - [#50](https://github.com/thanos-io/objstore/pull/50) Add Huawei Cloud OBS Object Storage Support +- [#59](https://github.com/thanos-io/objstore/pull/59) Adding method `IsCustomerManagedKeyError` on the bucket interface. ### Changed - [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`. diff --git a/vendor/github.com/thanos-io/objstore/inmem.go b/vendor/github.com/thanos-io/objstore/inmem.go index ac36e7f469f..aee4aec6cfa 100644 --- a/vendor/github.com/thanos-io/objstore/inmem.go +++ b/vendor/github.com/thanos-io/objstore/inmem.go @@ -207,6 +207,11 @@ func (b *InMemBucket) IsObjNotFoundErr(err error) bool { return errors.Is(err, errNotFound) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *InMemBucket) IsCustomerManagedKeyError(_ error) bool { + return false +} + func (b *InMemBucket) Close() error { return nil } // Name returns the bucket name. 
diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go index cc6034d5dfd..fb9b0bac8d0 100644 --- a/vendor/github.com/thanos-io/objstore/objstore.go +++ b/vendor/github.com/thanos-io/objstore/objstore.go @@ -85,6 +85,9 @@ type BucketReader interface { // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. IsObjNotFoundErr(err error) bool + // IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. + IsCustomerManagedKeyError(err error) bool + // Attributes returns information about the specified object. Attributes(ctx context.Context, name string) (ObjectAttributes, error) } @@ -603,6 +606,10 @@ func (b *metricBucket) IsObjNotFoundErr(err error) bool { return b.bkt.IsObjNotFoundErr(err) } +func (b *metricBucket) IsCustomerManagedKeyError(err error) bool { + return b.bkt.IsCustomerManagedKeyError(err) +} + func (b *metricBucket) Close() error { return b.bkt.Close() } diff --git a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go index 130f14d439d..41448011729 100644 --- a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go +++ b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go @@ -74,6 +74,11 @@ func (p *PrefixedBucket) IsObjNotFoundErr(err error) bool { return p.bkt.IsObjNotFoundErr(err) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (p *PrefixedBucket) IsCustomerManagedKeyError(err error) bool { + return p.bkt.IsCustomerManagedKeyError(err) +} + // Attributes returns information about the specified object. 
func (p PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { return p.bkt.Attributes(ctx, conditionalPrefix(p.prefix, name)) diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go index 23e66169db6..a5f41ed1769 100644 --- a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go +++ b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go @@ -235,6 +235,11 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { return bloberror.HasCode(err, bloberror.BlobNotFound) || bloberror.HasCode(err, bloberror.InvalidURI) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Bucket) IsCustomerManagedKeyError(_ error) bool { + return false +} + func (b *Bucket) getBlobReader(ctx context.Context, name string, httpRange blob.HTTPRange) (io.ReadCloser, error) { level.Debug(b.logger).Log("msg", "getting blob", "blob", name, "offset", httpRange.Offset, "length", httpRange.Count) if name == "" { diff --git a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go index 3206b91e717..8ccd33b10f6 100644 --- a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go +++ b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go @@ -258,6 +258,11 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { return os.IsNotExist(errors.Cause(err)) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Bucket) IsCustomerManagedKeyError(_ error) bool { + return false +} + func (b *Bucket) Close() error { return nil } // Name returns the bucket name. 
diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go index 947e641a30f..8b107c83d82 100644 --- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go +++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go @@ -188,6 +188,11 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { return errors.Is(err, storage.ErrObjectNotExist) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Bucket) IsCustomerManagedKeyError(_ error) bool { + return false +} + func (b *Bucket) Close() error { return b.closer.Close() } diff --git a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go index 729ee7eb6de..6bc2b64438e 100644 --- a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go +++ b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go @@ -98,6 +98,9 @@ const ( // Storage class header. amzStorageClass = "X-Amz-Storage-Class" + + // amzKmsKeyAccessDeniedErrorMessage is the error message returned by s3 when the permissions to the KMS key is revoked. + amzKmsKeyAccessDeniedErrorMessage = "The ciphertext refers to a customer master key that does not exist, does not exist in this region, or you are not allowed to access." ) var DefaultConfig = Config{ @@ -144,7 +147,7 @@ type Config struct { } // SSEConfig deals with the configuration of SSE for Minio. 
The following options are valid: -// kmsencryptioncontext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context +// KMSEncryptionContext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context type SSEConfig struct { Type string `yaml:"type"` KMSKeyID string `yaml:"kms_key_id"` @@ -538,6 +541,12 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey" } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Bucket) IsCustomerManagedKeyError(err error) bool { + errResponse := minio.ToErrorResponse(errors.Cause(err)) + return errResponse.Code == "AccessDenied" && errResponse.Message == amzKmsKeyAccessDeniedErrorMessage +} + func (b *Bucket) Close() error { return nil } // getServerSideEncryption returns the SSE to use. diff --git a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go index f30c655dc74..c24d03fd2d5 100644 --- a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go +++ b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go @@ -290,6 +290,11 @@ func (c *Container) IsObjNotFoundErr(err error) bool { return errors.Is(err, swift.ObjectNotFound) } +// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. +func (b *Container) IsCustomerManagedKeyError(_ error) bool { + return false +} + // Upload writes the contents of the reader as an object into the container. 
func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err error) { size, err := objstore.TryToGetSize(r) diff --git a/vendor/github.com/thanos-io/objstore/testing.go b/vendor/github.com/thanos-io/objstore/testing.go index d750142af1d..4e41b278825 100644 --- a/vendor/github.com/thanos-io/objstore/testing.go +++ b/vendor/github.com/thanos-io/objstore/testing.go @@ -308,3 +308,7 @@ func (d *delayingBucket) IsObjNotFoundErr(err error) bool { // No delay for a local operation. return d.bkt.IsObjNotFoundErr(err) } + +func (d *delayingBucket) IsCustomerManagedKeyError(err error) bool { + return d.bkt.IsCustomerManagedKeyError(err) +} diff --git a/vendor/github.com/thanos-io/objstore/tracing.go b/vendor/github.com/thanos-io/objstore/tracing.go index 56f18ebead8..9f09df668e7 100644 --- a/vendor/github.com/thanos-io/objstore/tracing.go +++ b/vendor/github.com/thanos-io/objstore/tracing.go @@ -101,6 +101,10 @@ func (t TracingBucket) IsObjNotFoundErr(err error) bool { return t.bkt.IsObjNotFoundErr(err) } +func (t TracingBucket) IsCustomerManagedKeyError(err error) bool { + return t.bkt.IsCustomerManagedKeyError(err) +} + func (t TracingBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket { if ib, ok := t.bkt.(InstrumentedBucket); ok { return TracingBucket{bkt: ib.WithExpectedErrs(expectedFunc)} diff --git a/vendor/modules.txt b/vendor/modules.txt index b120958f4e2..f32adcbe7a0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -832,7 +832,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd +# github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a ## explicit; go 1.18 github.com/thanos-io/objstore github.com/thanos-io/objstore/exthttp