|
4 | 4 | "context"
|
5 | 5 | "fmt"
|
6 | 6 | "io"
|
| 7 | + "sort" |
7 | 8 | "strings"
|
8 | 9 | "sync"
|
9 | 10 | "time"
|
@@ -293,11 +294,6 @@ func (q *blocksStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
|
293 | 294 | return q.selectSorted(sp, matchers...)
|
294 | 295 | }
|
295 | 296 |
|
296 |
| -func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { |
297 |
| - // Cortex doesn't use this. It will ask ingesters for metadata. |
298 |
| - return nil, nil, errors.New("not implemented") |
299 |
| -} |
300 |
| - |
301 | 297 | func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
|
302 | 298 | spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelNames")
|
303 | 299 | defer spanLog.Span.Finish()
|
@@ -333,6 +329,41 @@ func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
|
333 | 329 | return strutil.MergeSlices(resNameSets...), resWarnings, nil
|
334 | 330 | }
|
335 | 331 |
|
| 332 | +func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { |
| 333 | + spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelValues") |
| 334 | + defer spanLog.Span.Finish() |
| 335 | + |
| 336 | + minT, maxT := q.minT, q.maxT |
| 337 | + |
| 338 | + var ( |
| 339 | + resValueSets = [][]string{} |
| 340 | + resWarnings = storage.Warnings(nil) |
| 341 | + |
| 342 | + resultMtx sync.Mutex |
| 343 | + ) |
| 344 | + |
| 345 | + queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { |
| 346 | + valueSets, warnings, queriedBlocks, err := q.fetchLabelValuesFromStore(spanCtx, name, clients, minT, maxT) |
| 347 | + if err != nil { |
| 348 | + return nil, err |
| 349 | + } |
| 350 | + |
| 351 | + resultMtx.Lock() |
| 352 | + resValueSets = append(resValueSets, valueSets...) |
| 353 | + resWarnings = append(resWarnings, warnings...) |
| 354 | + resultMtx.Unlock() |
| 355 | + |
| 356 | + return queriedBlocks, nil |
| 357 | + } |
| 358 | + |
| 359 | + err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc) |
| 360 | + if err != nil { |
| 361 | + return nil, nil, err |
| 362 | + } |
| 363 | + |
| 364 | + return strutil.MergeSlices(resValueSets...), resWarnings, nil |
| 365 | +} |
| 366 | + |
336 | 367 | func (q *blocksStoreQuerier) Close() error {
|
337 | 368 | return nil
|
338 | 369 | }
|
@@ -690,6 +721,85 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
|
690 | 721 | return nameSets, warnings, queriedBlocks, nil
|
691 | 722 | }
|
692 | 723 |
|
| 724 | +func (q *blocksStoreQuerier) fetchLabelValuesFromStore( |
| 725 | + ctx context.Context, |
| 726 | + name string, |
| 727 | + clients map[BlocksStoreClient][]ulid.ULID, |
| 728 | + minT int64, |
| 729 | + maxT int64, |
| 730 | +) ([][]string, storage.Warnings, []ulid.ULID, error) { |
| 731 | + var ( |
| 732 | + reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) |
| 733 | + g, gCtx = errgroup.WithContext(reqCtx) |
| 734 | + mtx = sync.Mutex{} |
| 735 | + nameSets = [][]string{} |
| 736 | + warnings = storage.Warnings(nil) |
| 737 | + queriedBlocks = []ulid.ULID(nil) |
| 738 | + spanLog = spanlogger.FromContext(ctx) |
| 739 | + ) |
| 740 | + |
| 741 | + // Concurrently fetch series from all clients. |
| 742 | + for c, blockIDs := range clients { |
| 743 | + // Change variables scope since it will be used in a goroutine. |
| 744 | + c := c |
| 745 | + blockIDs := blockIDs |
| 746 | + |
| 747 | + g.Go(func() error { |
| 748 | + req, err := createLabelValuesRequest(minT, maxT, name, blockIDs) |
| 749 | + if err != nil { |
| 750 | + return errors.Wrapf(err, "failed to create label names request") |
| 751 | + } |
| 752 | + |
| 753 | + valuesResp, err := c.LabelValues(gCtx, req) |
| 754 | + if err != nil { |
| 755 | + return errors.Wrapf(err, "failed to fetch series from %s", c) |
| 756 | + } |
| 757 | + |
| 758 | + myQueriedBlocks := []ulid.ULID(nil) |
| 759 | + if valuesResp.Hints != nil { |
| 760 | + hints := hintspb.LabelValuesResponseHints{} |
| 761 | + if err := types.UnmarshalAny(valuesResp.Hints, &hints); err != nil { |
| 762 | + return errors.Wrapf(err, "failed to unmarshal label names hints from %s", c) |
| 763 | + } |
| 764 | + |
| 765 | + ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) |
| 766 | + if err != nil { |
| 767 | + return errors.Wrapf(err, "failed to parse queried block IDs from received hints") |
| 768 | + } |
| 769 | + |
| 770 | + myQueriedBlocks = ids |
| 771 | + } |
| 772 | + |
| 773 | + level.Debug(spanLog).Log("msg", "received label names from store-gateway", |
| 774 | + "instance", c, |
| 775 | + "num labels", len(valuesResp.Values), |
| 776 | + "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), |
| 777 | + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) |
| 778 | + |
| 779 | + // Values returned need not be sorted, but we need them to be sorted so we can merge. |
| 780 | + sort.Strings(valuesResp.Values) |
| 781 | + |
| 782 | + // Store the result. |
| 783 | + mtx.Lock() |
| 784 | + nameSets = append(nameSets, valuesResp.Values) |
| 785 | + for _, w := range valuesResp.Warnings { |
| 786 | + warnings = append(warnings, errors.New(w)) |
| 787 | + } |
| 788 | + queriedBlocks = append(queriedBlocks, myQueriedBlocks...) |
| 789 | + mtx.Unlock() |
| 790 | + |
| 791 | + return nil |
| 792 | + }) |
| 793 | + } |
| 794 | + |
| 795 | + // Wait until all client requests complete. |
| 796 | + if err := g.Wait(); err != nil { |
| 797 | + return nil, nil, nil, err |
| 798 | + } |
| 799 | + |
| 800 | + return nameSets, warnings, queriedBlocks, nil |
| 801 | +} |
| 802 | + |
693 | 803 | func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) {
|
694 | 804 | // Selectively query only specific blocks.
|
695 | 805 | hints := &hintspb.SeriesRequestHints{
|
|
0 commit comments