Skip to content

StoreGateway: Implement metadata API limit in queryable #6195

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173
* [ENHANCEMENT] Ingester: Add new API `/ingester/all_user_stats` which shows loaded blocks, active timeseries and ingestion rate for a specific ingester. #6178
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to to track the number of histogram samples which resolution was reduced. #6182
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195

## 1.18.0 2024-09-03

Expand Down
43 changes: 30 additions & 13 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,11 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe
spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.LabelNames")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT
minT, maxT, limit := q.minT, q.maxT, int64(0)

if hints != nil {
limit = int64(hints.Limit)
}

var (
resMtx sync.Mutex
Expand All @@ -355,7 +359,7 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
nameSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelNamesFromStore(spanCtx, userID, clients, minT, maxT, convertedMatchers)
nameSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelNamesFromStore(spanCtx, userID, clients, minT, maxT, limit, convertedMatchers)
if err != nil {
return nil, err, retryableError
}
Expand All @@ -372,6 +376,7 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe
return nil, nil, err
}

// TODO(johrry): pass limit when merging.
return strutil.MergeSlices(resNameSets...), resWarnings, nil
}

Expand All @@ -384,7 +389,11 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints
spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.LabelValues")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT
minT, maxT, limit := q.minT, q.maxT, int64(0)

if hints != nil {
limit = int64(hints.Limit)
}

var (
resValueSets = [][]string{}
Expand All @@ -394,7 +403,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
valueSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelValuesFromStore(spanCtx, userID, name, clients, minT, maxT, matchers...)
valueSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelValuesFromStore(spanCtx, userID, name, clients, minT, maxT, limit, matchers...)
if err != nil {
return nil, err, retryableError
}
Expand All @@ -411,6 +420,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints
return nil, nil, err
}

// TODO(johrry): pass limit when merging.
return strutil.MergeSlices(resValueSets...), resWarnings, nil
}

Expand All @@ -427,9 +437,9 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec
spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.selectSorted")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT
minT, maxT, limit := q.minT, q.maxT, int64(0)
if sp != nil {
minT, maxT = sp.Start, sp.End
minT, maxT, limit = sp.Start, sp.End, int64(sp.Limit)
}

var (
Expand All @@ -443,7 +453,7 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
seriesSets, queriedBlocks, warnings, numChunks, err, retryableError := q.fetchSeriesFromStores(spanCtx, sp, userID, clients, minT, maxT, matchers, maxChunksLimit, leftChunksLimit)
seriesSets, queriedBlocks, warnings, numChunks, err, retryableError := q.fetchSeriesFromStores(spanCtx, sp, userID, clients, minT, maxT, limit, matchers, maxChunksLimit, leftChunksLimit)
if err != nil {
return nil, err, retryableError
}
Expand Down Expand Up @@ -471,6 +481,7 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec
storage.EmptySeriesSet()
}

// TODO(johrry): pass limit when merging.
return series.NewSeriesSetWithWarnings(
storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge),
resWarnings)
Expand Down Expand Up @@ -593,6 +604,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
limit int64,
matchers []*labels.Matcher,
maxChunksLimit int,
leftChunksLimit int,
Expand Down Expand Up @@ -635,7 +647,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
seriesQueryStats := &hintspb.QueryStats{}
skipChunks := sp != nil && sp.Func == "series"

req, err := createSeriesRequest(minT, maxT, convertedMatchers, shardingInfo, skipChunks, blockIDs, defaultAggrs)
req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, shardingInfo, skipChunks, blockIDs, defaultAggrs)
if err != nil {
return errors.Wrapf(err, "failed to create series request")
}
Expand Down Expand Up @@ -825,6 +837,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
limit int64,
matchers []storepb.LabelMatcher,
) ([][]string, annotations.Annotations, []ulid.ULID, error, error) {
var (
Expand All @@ -846,7 +859,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
blockIDs := blockIDs

g.Go(func() error {
req, err := createLabelNamesRequest(minT, maxT, blockIDs, matchers)
req, err := createLabelNamesRequest(minT, maxT, limit, blockIDs, matchers)
if err != nil {
return errors.Wrapf(err, "failed to create label names request")
}
Expand Down Expand Up @@ -927,6 +940,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
limit int64,
matchers ...*labels.Matcher,
) ([][]string, annotations.Annotations, []ulid.ULID, error, error) {
var (
Expand All @@ -948,7 +962,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
blockIDs := blockIDs

g.Go(func() error {
req, err := createLabelValuesRequest(minT, maxT, name, blockIDs, matchers...)
req, err := createLabelValuesRequest(minT, maxT, limit, name, blockIDs, matchers...)
if err != nil {
return errors.Wrapf(err, "failed to create label values request")
}
Expand Down Expand Up @@ -1025,7 +1039,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
return valueSets, warnings, queriedBlocks, nil, merr.Err()
}

func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) {
func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) {
// Selectively query only specific blocks.
hints := &hintspb.SeriesRequestHints{
BlockMatchers: []storepb.LabelMatcher{
Expand All @@ -1046,6 +1060,7 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shar
return &storepb.SeriesRequest{
MinTime: minT,
MaxTime: maxT,
Limit: limit,
Matchers: matchers,
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
Hints: anyHints,
Expand All @@ -1057,10 +1072,11 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shar
}, nil
}

func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID, matchers []storepb.LabelMatcher) (*storepb.LabelNamesRequest, error) {
func createLabelNamesRequest(minT, maxT, limit int64, blockIDs []ulid.ULID, matchers []storepb.LabelMatcher) (*storepb.LabelNamesRequest, error) {
req := &storepb.LabelNamesRequest{
Start: minT,
End: maxT,
Limit: limit,
Matchers: matchers,
}

Expand All @@ -1085,10 +1101,11 @@ func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID, matchers []
return req, nil
}

func createLabelValuesRequest(minT, maxT int64, label string, blockIDs []ulid.ULID, matchers ...*labels.Matcher) (*storepb.LabelValuesRequest, error) {
func createLabelValuesRequest(minT, maxT, limit int64, label string, blockIDs []ulid.ULID, matchers ...*labels.Matcher) (*storepb.LabelValuesRequest, error) {
req := &storepb.LabelValuesRequest{
Start: minT,
End: maxT,
Limit: limit,
Label: label,
Matchers: convertMatchersToLabelMatcher(matchers),
}
Expand Down
Loading
Loading