Skip to content

Commit 5ca94a0

Browse files
authored
Add max-inflight-requests limit to store gateway (#5553)
* Add max-inflight-request limit to bucket stores Signed-off-by: Justin Jung <[email protected]> * Add changelog + fix the config name Signed-off-by: Justin Jung <[email protected]> * nit Signed-off-by: Justin Jung <[email protected]> * Lint Signed-off-by: Justin Jung <[email protected]> * Update docs Signed-off-by: Justin Jung <[email protected]> * Add read lock + increment right before sending series request Signed-off-by: Justin Jung <[email protected]> * Fix typo in config description + move max inflight request check to right before making the series call Signed-off-by: Justin Jung <[email protected]> * Fix test Signed-off-by: Justin Jung <[email protected]> * Renamed metric name to match convention Signed-off-by: Justin Jung <[email protected]> --------- Signed-off-by: Justin Jung <[email protected]>
1 parent b49cf5a commit 5ca94a0

File tree

10 files changed

+168
-0
lines changed

10 files changed

+168
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
3636
* [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432
3737
* [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496
38+
* [FEATURE] Store Gateway: Add `-blocks-storage.bucket-store.max-inflight-requests` for store gateways to reject further requests upon reaching the limit. #5553
3839
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
3940
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
4041
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,11 @@ blocks_storage:
499499
# CLI flag: -blocks-storage.bucket-store.max-concurrent
500500
[max_concurrent: <int> | default = 100]
501501
502+
# Max number of inflight queries to execute against the long-term storage.
503+
# The limit is shared across all tenants. 0 to disable.
504+
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
505+
[max_inflight_requests: <int> | default = 0]
506+
502507
# Maximum number of concurrent tenants synching blocks.
503508
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
504509
[tenant_sync_concurrency: <int> | default = 10]

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,11 @@ blocks_storage:
602602
# CLI flag: -blocks-storage.bucket-store.max-concurrent
603603
[max_concurrent: <int> | default = 100]
604604
605+
# Max number of inflight queries to execute against the long-term storage.
606+
# The limit is shared across all tenants. 0 to disable.
607+
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
608+
[max_inflight_requests: <int> | default = 0]
609+
605610
# Maximum number of concurrent tenants synching blocks.
606611
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
607612
[tenant_sync_concurrency: <int> | default = 10]

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,11 @@ bucket_store:
10421042
# CLI flag: -blocks-storage.bucket-store.max-concurrent
10431043
[max_concurrent: <int> | default = 100]
10441044
1045+
# Max number of inflight queries to execute against the long-term storage. The
1046+
# limit is shared across all tenants. 0 to disable.
1047+
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
1048+
[max_inflight_requests: <int> | default = 0]
1049+
10451050
# Maximum number of concurrent tenants synching blocks.
10461051
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
10471052
[tenant_sync_concurrency: <int> | default = 10]

pkg/querier/blocks_store_queryable.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,6 +1116,8 @@ func isRetryableError(err error) bool {
11161116
switch status.Code(err) {
11171117
case codes.Unavailable:
11181118
return true
1119+
case codes.ResourceExhausted:
1120+
return errors.Is(err, storegateway.ErrTooManyInflightRequests)
11191121
default:
11201122
return false
11211123
}

pkg/querier/blocks_store_queryable_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"google.golang.org/grpc/status"
3535

3636
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
37+
"github.com/cortexproject/cortex/pkg/storegateway"
3738
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
3839
"github.com/cortexproject/cortex/pkg/util"
3940
"github.com/cortexproject/cortex/pkg/util/limiter"
@@ -708,6 +709,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
708709
},
709710
},
710711
},
712+
"multiple store-gateways has the block, but one of them had too many inflight requests": {
713+
finderResult: bucketindex.Blocks{
714+
{ID: block1},
715+
},
716+
storeSetResponses: []interface{}{
717+
map[BlocksStoreClient][]ulid.ULID{
718+
&storeGatewayClientMock{
719+
remoteAddr: "1.1.1.1",
720+
mockedSeriesErr: storegateway.ErrTooManyInflightRequests,
721+
}: {block1},
722+
},
723+
map[BlocksStoreClient][]ulid.ULID{
724+
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
725+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
726+
mockHintsResponse(block1),
727+
}}: {block1},
728+
},
729+
},
730+
limits: &blocksStoreLimitsMock{},
731+
queryLimiter: noOpQueryLimiter,
732+
expectedSeries: []seriesResult{
733+
{
734+
lbls: labels.New(metricNameLabel, series1Label),
735+
values: []valueResult{
736+
{t: minT, v: 2},
737+
},
738+
},
739+
},
740+
},
741+
"store gateway returns resource exhausted error other than max inflight request": {
742+
finderResult: bucketindex.Blocks{
743+
{ID: block1},
744+
},
745+
storeSetResponses: []interface{}{
746+
map[BlocksStoreClient][]ulid.ULID{
747+
&storeGatewayClientMock{
748+
remoteAddr: "1.1.1.1",
749+
mockedSeriesErr: status.Error(codes.ResourceExhausted, "some other resource"),
750+
}: {block1},
751+
},
752+
map[BlocksStoreClient][]ulid.ULID{
753+
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
754+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
755+
mockHintsResponse(block1),
756+
}}: {block1},
757+
},
758+
},
759+
limits: &blocksStoreLimitsMock{},
760+
expectedErr: errors.Wrapf(status.Error(codes.ResourceExhausted, "some other resource"), "failed to fetch series from 1.1.1.1"),
761+
},
711762
}
712763

713764
for testName, testData := range tests {

pkg/storage/tsdb/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ type BucketStoreConfig struct {
241241
SyncDir string `yaml:"sync_dir"`
242242
SyncInterval time.Duration `yaml:"sync_interval"`
243243
MaxConcurrent int `yaml:"max_concurrent"`
244+
MaxInflightRequests int `yaml:"max_inflight_requests"`
244245
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
245246
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
246247
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
@@ -294,6 +295,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
294295
f.IntVar(&cfg.ChunkPoolMinBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-min-bucket-size-bytes", ChunkPoolDefaultMinBucketSize, "Size - in bytes - of the smallest chunks pool bucket.")
295296
f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.")
296297
f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.")
298+
f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 0 to disable.")
297299
f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.")
298300
f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.")
299301
f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.")

pkg/storegateway/bucket_stores.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/weaveworks/common/logging"
3232
"google.golang.org/grpc/codes"
3333
"google.golang.org/grpc/metadata"
34+
"google.golang.org/grpc/status"
3435

3536
"github.com/cortexproject/cortex/pkg/storage/bucket"
3637
"github.com/cortexproject/cortex/pkg/storage/tsdb"
@@ -72,13 +73,19 @@ type BucketStores struct {
7273
storesErrorsMu sync.RWMutex
7374
storesErrors map[string]error
7475

76+
// Keeps number of inflight requests
77+
inflightRequestCnt int
78+
inflightRequestMu sync.RWMutex
79+
7580
// Metrics.
7681
syncTimes prometheus.Histogram
7782
syncLastSuccess prometheus.Gauge
7883
tenantsDiscovered prometheus.Gauge
7984
tenantsSynced prometheus.Gauge
8085
}
8186

87+
// ErrTooManyInflightRequests is the gRPC ResourceExhausted status error that
// BucketStores.Series returns when the configured max inflight requests limit
// has already been reached.
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")
88+
8289
// NewBucketStores makes a new BucketStores.
8390
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
8491
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg)
@@ -313,6 +320,16 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
313320
return nil
314321
}
315322

323+
maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests
324+
if maxInflightRequests > 0 {
325+
if u.getInflightRequestCnt() >= maxInflightRequests {
326+
return ErrTooManyInflightRequests
327+
}
328+
329+
u.incrementInflightRequestCnt()
330+
defer u.decrementInflightRequestCnt()
331+
}
332+
316333
err = store.Series(req, spanSeriesServer{
317334
Store_SeriesServer: srv,
318335
ctx: spanCtx,
@@ -321,6 +338,24 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
321338
return err
322339
}
323340

341+
func (u *BucketStores) getInflightRequestCnt() int {
342+
u.inflightRequestMu.RLock()
343+
defer u.inflightRequestMu.RUnlock()
344+
return u.inflightRequestCnt
345+
}
346+
347+
func (u *BucketStores) incrementInflightRequestCnt() {
348+
u.inflightRequestMu.Lock()
349+
u.inflightRequestCnt++
350+
u.inflightRequestMu.Unlock()
351+
}
352+
353+
func (u *BucketStores) decrementInflightRequestCnt() {
354+
u.inflightRequestMu.Lock()
355+
u.inflightRequestCnt--
356+
u.inflightRequestMu.Unlock()
357+
}
358+
324359
// LabelNames implements the Storegateway proto service.
325360
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
326361
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames")

pkg/storegateway/bucket_stores_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,48 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
514514
}
515515
}
516516

517+
func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) {
518+
cfg := prepareStorageConfig(t)
519+
cfg.BucketStore.MaxInflightRequests = 10
520+
reg := prometheus.NewPedanticRegistry()
521+
storageDir := t.TempDir()
522+
generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15)
523+
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
524+
require.NoError(t, err)
525+
526+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
527+
require.NoError(t, err)
528+
require.NoError(t, stores.InitialSync(context.Background()))
529+
530+
stores.inflightRequestMu.Lock()
531+
stores.inflightRequestCnt = 10
532+
stores.inflightRequestMu.Unlock()
533+
series, warnings, err := querySeries(stores, "user_id", "series_1", 0, 100)
534+
assert.ErrorIs(t, err, ErrTooManyInflightRequests)
535+
assert.Empty(t, series)
536+
assert.Empty(t, warnings)
537+
}
538+
539+
func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) {
540+
cfg := prepareStorageConfig(t)
541+
reg := prometheus.NewPedanticRegistry()
542+
storageDir := t.TempDir()
543+
generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15)
544+
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
545+
require.NoError(t, err)
546+
547+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
548+
require.NoError(t, err)
549+
require.NoError(t, stores.InitialSync(context.Background()))
550+
551+
stores.inflightRequestMu.Lock()
552+
stores.inflightRequestCnt = 10 // max_inflight_request is set to 0 by default = disabled
553+
stores.inflightRequestMu.Unlock()
554+
series, _, err := querySeries(stores, "user_id", "series_1", 0, 100)
555+
require.NoError(t, err)
556+
assert.Equal(t, 1, len(series))
557+
}
558+
517559
func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig {
518560
cfg := cortex_tsdb.BlocksStorageConfig{}
519561
flagext.DefaultValues(&cfg)

pkg/storegateway/gateway.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ const (
3939
// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
4040
// in the ring will be automatically removed.
4141
ringAutoForgetUnhealthyPeriods = 10
42+
43+
instanceLimitsMetric = "cortex_storegateway_instance_limits"
44+
instanceLimitsMetricHelp = "Instance limits used by this store gateway."
45+
limitLabel = "limit"
4246
)
4347

4448
var (
@@ -142,6 +146,22 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
142146
g.bucketSync.WithLabelValues(syncReasonPeriodic)
143147
g.bucketSync.WithLabelValues(syncReasonRingChange)
144148

149+
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
150+
Name: instanceLimitsMetric,
151+
Help: instanceLimitsMetricHelp,
152+
ConstLabels: map[string]string{limitLabel: "max_inflight_requests"},
153+
}).Set(float64(storageCfg.BucketStore.MaxInflightRequests))
154+
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
155+
Name: instanceLimitsMetric,
156+
Help: instanceLimitsMetricHelp,
157+
ConstLabels: map[string]string{limitLabel: "max_concurrent"},
158+
}).Set(float64(storageCfg.BucketStore.MaxConcurrent))
159+
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
160+
Name: instanceLimitsMetric,
161+
Help: instanceLimitsMetricHelp,
162+
ConstLabels: map[string]string{limitLabel: "max_chunk_pool_bytes"},
163+
}).Set(float64(storageCfg.BucketStore.MaxChunkPoolBytes))
164+
145165
// Init sharding strategy.
146166
var shardingStrategy ShardingStrategy
147167

0 commit comments

Comments
 (0)