Skip to content

Commit ac123e8

Browse files
committed
lint
Signed-off-by: Alan Protasio <[email protected]>
1 parent 30dbe57 commit ac123e8

File tree

9 files changed

+58
-32
lines changed

9 files changed

+58
-32
lines changed

pkg/querier/blocks_finder_bucket_index.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"context"
55
"time"
66

7-
"github.com/cortexproject/cortex/pkg/util/validation"
87
"github.com/go-kit/log"
98
"github.com/oklog/ulid"
109
"github.com/pkg/errors"
1110
"github.com/prometheus/client_golang/prometheus"
1211
"github.com/thanos-io/objstore"
1312

13+
"github.com/cortexproject/cortex/pkg/util/validation"
14+
1415
"github.com/cortexproject/cortex/pkg/storage/bucket"
1516
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
1617
"github.com/cortexproject/cortex/pkg/util/services"

pkg/querier/blocks_finder_bucket_index_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import (
77
"testing"
88
"time"
99

10-
"github.com/cortexproject/cortex/pkg/util/validation"
1110
"github.com/go-kit/log"
1211
"github.com/oklog/ulid"
1312
"github.com/stretchr/testify/assert"
1413
"github.com/stretchr/testify/require"
1514
"github.com/thanos-io/objstore"
1615

16+
"github.com/cortexproject/cortex/pkg/util/validation"
17+
1718
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
1819
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
1920
"github.com/cortexproject/cortex/pkg/util/services"

pkg/storage/tsdb/bucketindex/loader_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing
604604
require.True(t, errors.Is(err, ErrCustomerManagedKeyError))
605605

606606
// Check cached
607-
loader.checkCachedIndexes(ctx)
607+
require.NoError(t, loader.checkCachedIndexes(ctx))
608608

609609
loader.bkt = bkt
610610

@@ -623,7 +623,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing
623623
test.Poll(t, 3*time.Second, nil, func() interface{} {
624624
_, err := loader.GetIndex(ctx, "user-1")
625625
// Check cached
626-
loader.checkCachedIndexes(ctx)
626+
require.NoError(t, loader.checkCachedIndexes(ctx))
627627
return err
628628
})
629629

pkg/storage/tsdb/bucketindex/storage.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ import (
66
"context"
77
"encoding/json"
88

9-
"github.com/cortexproject/cortex/pkg/storage/tsdb"
109
"github.com/go-kit/log"
1110
"github.com/pkg/errors"
1211
"github.com/thanos-io/objstore"
1312

13+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
14+
1415
"github.com/cortexproject/cortex/pkg/storage/bucket"
1516
"github.com/cortexproject/cortex/pkg/util/runutil"
1617
)

pkg/storage/tsdb/bucketindex/updater.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"path"
88
"time"
99

10-
"github.com/cortexproject/cortex/pkg/storage/tsdb"
1110
"github.com/go-kit/log"
1211
"github.com/go-kit/log/level"
1312
"github.com/oklog/ulid"
@@ -16,17 +15,20 @@ import (
1615
"github.com/thanos-io/thanos/pkg/block"
1716
"github.com/thanos-io/thanos/pkg/block/metadata"
1817

18+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
19+
1920
"github.com/cortexproject/cortex/pkg/storage/bucket"
2021
util_log "github.com/cortexproject/cortex/pkg/util/log"
2122
"github.com/cortexproject/cortex/pkg/util/runutil"
2223
)
2324

2425
var (
25-
ErrBlockMetaNotFound = block.ErrorSyncMetaNotFound
26-
ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted
27-
ErrBlockMetaKeyAccessDeniedErr = errors.New("block meta file key access denied error")
28-
ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found")
29-
ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted")
26+
ErrBlockMetaNotFound = block.ErrorSyncMetaNotFound
27+
ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted
28+
ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found")
29+
ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted")
30+
31+
errBlockMetaKeyAccessDeniedErr = errors.New("block meta file key access denied error")
3032
)
3133

3234
// Updater is responsible to generate an update in-memory bucket index.
@@ -110,7 +112,7 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block) (blocks []*Blo
110112
level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index", "block", id.String())
111113
continue
112114
}
113-
if errors.Is(err, ErrBlockMetaKeyAccessDeniedErr) {
115+
if errors.Is(err, errBlockMetaKeyAccessDeniedErr) {
114116
partials[id] = err
115117
level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index due key permission", "block", id.String())
116118
continue
@@ -135,7 +137,7 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo
135137
return nil, ErrBlockMetaNotFound
136138
}
137139
if w.bkt.IsCustomerManagedKeyError(err) {
138-
return nil, ErrBlockMetaKeyAccessDeniedErr
140+
return nil, errBlockMetaKeyAccessDeniedErr
139141
}
140142
if err != nil {
141143
return nil, errors.Wrapf(err, "get block meta file: %v", metaFile)

pkg/storage/tsdb/testutil/objstore.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ import (
66
"os"
77
"testing"
88

9-
"github.com/cortexproject/cortex/pkg/util"
109
"github.com/pkg/errors"
1110
"github.com/stretchr/testify/require"
1211
"github.com/thanos-io/objstore"
1312

13+
"github.com/cortexproject/cortex/pkg/util"
14+
1415
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
1516
)
1617

pkg/storegateway/bucket_index_metadata_fetcher.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,14 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.
9595
return nil, nil, nil
9696
}
9797

98-
if errors.Is(err, bucketindex.ErrBlockMetaKeyAccessDeniedErr) {
99-
// Do not fail if the permission to the bucket key got revoked. We'll act as if the tenant has no bucket index, but the query
100-
// will fail anyway in the querier (the querier fails in the querier if it cannot load the bucket index).
101-
// This will cause the store-gateway to unload all blocks
98+
if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) {
99+
// stop the job and return the error
100+
// this error should be used to return Access Denied to the caller
102101
level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err)
103102
f.metrics.Synced.WithLabelValues(keyAccessDenied).Set(1)
104103
f.metrics.Submit()
105104

106-
return nil, nil, nil
105+
return nil, nil, bucketindex.ErrCustomerManagedKeyError
107106
}
108107

109108
if err != nil {

pkg/storegateway/bucket_index_metadata_fetcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@ func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) {
106106
reg := prometheus.NewPedanticRegistry()
107107
ctx := context.Background()
108108

109-
bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucketindex.ErrBlockMetaKeyAccessDeniedErr)
109+
bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucketindex.ErrCustomerManagedKeyError)
110110

111111
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil)
112112
metas, _, err := fetcher.Fetch(ctx)
113-
require.NoError(t, err)
113+
require.ErrorIs(t, bucketindex.ErrCustomerManagedKeyError, err)
114114
assert.Empty(t, metas)
115115

116116
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`

pkg/storegateway/bucket_stores.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
1415
"github.com/go-kit/log"
1516
"github.com/go-kit/log/level"
1617
"github.com/oklog/ulid"
@@ -29,6 +30,7 @@ import (
2930
"github.com/thanos-io/thanos/pkg/store/storepb"
3031
"github.com/weaveworks/common/httpgrpc"
3132
"github.com/weaveworks/common/logging"
33+
"google.golang.org/grpc/codes"
3234
"google.golang.org/grpc/metadata"
3335

3436
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -64,7 +66,7 @@ type BucketStores struct {
6466

6567
// Keeps a bucket store for each tenant.
6668
storesMu sync.RWMutex
67-
stores map[string]*store.BucketStore
69+
stores map[string]*BucketStoreWithLastError
6870

6971
// Metrics.
7072
syncTimes prometheus.Histogram
@@ -73,6 +75,11 @@ type BucketStores struct {
7375
tenantsSynced prometheus.Gauge
7476
}
7577

78+
type BucketStoreWithLastError struct {
79+
*store.BucketStore
80+
err error
81+
}
82+
7683
// NewBucketStores makes a new BucketStores.
7784
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
7885
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg)
@@ -94,7 +101,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
94101
limits: limits,
95102
bucket: cachingBucket,
96103
shardingStrategy: shardingStrategy,
97-
stores: map[string]*store.BucketStore{},
104+
stores: map[string]*BucketStoreWithLastError{},
98105
logLevel: logLevel,
99106
bucketStoreMetrics: NewBucketStoreMetrics(),
100107
metaFetcherMetrics: NewMetadataFetcherMetrics(),
@@ -192,7 +199,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
192199

193200
type job struct {
194201
userID string
195-
store *store.BucketStore
202+
store *BucketStoreWithLastError
196203
}
197204

198205
wg := &sync.WaitGroup{}
@@ -225,10 +232,16 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
225232
defer wg.Done()
226233

227234
for job := range jobs {
228-
if err := f(ctx, job.store); err != nil {
229-
errsMx.Lock()
230-
errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID))
231-
errsMx.Unlock()
235+
if err := f(ctx, job.store.BucketStore); err != nil {
236+
if errors.Is(err, bucketindex.ErrCustomerManagedKeyError) {
237+
job.store.err = err
238+
} else {
239+
errsMx.Lock()
240+
errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID))
241+
errsMx.Unlock()
242+
}
243+
} else {
244+
job.store.err = nil
232245
}
233246
}
234247
}()
@@ -286,6 +299,10 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
286299
return nil
287300
}
288301

302+
if store.err != nil && errors.Is(store.err, bucketindex.ErrCustomerManagedKeyError) {
303+
return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", store.err)
304+
}
305+
289306
return store.Series(req, spanSeriesServer{
290307
Store_SeriesServer: srv,
291308
ctx: spanCtx,
@@ -344,7 +361,7 @@ func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
344361
return users, err
345362
}
346363

347-
func (u *BucketStores) getStore(userID string) *store.BucketStore {
364+
func (u *BucketStores) getStore(userID string) *BucketStoreWithLastError {
348365
u.storesMu.RLock()
349366
defer u.storesMu.RUnlock()
350367
return u.stores[userID]
@@ -387,7 +404,7 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error {
387404
return bs.Close()
388405
}
389406

390-
func isEmptyBucketStore(bs *store.BucketStore) bool {
407+
func isEmptyBucketStore(bs *BucketStoreWithLastError) bool {
391408
min, max := bs.TimeRange()
392409
return min == math.MaxInt64 && max == math.MinInt64
393410
}
@@ -396,7 +413,7 @@ func (u *BucketStores) syncDirForUser(userID string) string {
396413
return filepath.Join(u.cfg.BucketStore.SyncDir, userID)
397414
}
398415

399-
func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
416+
func (u *BucketStores) getOrCreateStore(userID string) (*BucketStoreWithLastError, error) {
400417
// Check if the store already exists.
401418
bs := u.getStore(userID)
402419
if bs != nil {
@@ -500,7 +517,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
500517
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
501518
}
502519

503-
bs, err := store.NewBucketStore(
520+
s, err := store.NewBucketStore(
504521
userBkt,
505522
fetcher,
506523
u.syncDirForUser(userID),
@@ -520,6 +537,10 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
520537
return nil, err
521538
}
522539

540+
bs = &BucketStoreWithLastError{
541+
BucketStore: s,
542+
}
543+
523544
u.stores[userID] = bs
524545
u.metaFetcherMetrics.AddUserRegistry(userID, fetcherReg)
525546
u.bucketStoreMetrics.AddUserRegistry(userID, bucketStoreReg)

0 commit comments

Comments
 (0)