From 0c64f1b632e5454d117a7b12cca47d76d2ffe2ad Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 11 Jul 2025 09:12:01 -0700 Subject: [PATCH 1/5] support vertical sharding for parquet queryable Signed-off-by: yeya24 --- go.mod | 2 + go.sum | 4 +- integration/parquet_querier_test.go | 5 +- pkg/querier/parquet_queryable.go | 88 +++++++- pkg/querier/parquet_queryable_test.go | 197 ++++++++++++++++++ .../queryable/parquet_queryable.go | 21 +- .../parquet-common/search/constraint.go | 6 +- .../parquet-common/search/materialize.go | 44 ++-- .../parquet-common/search/rowrange.go | 62 +++--- vendor/modules.txt | 3 +- 10 files changed, 365 insertions(+), 67 deletions(-) diff --git a/go.mod b/go.mod index 1b88330c581..5a2c6675433 100644 --- a/go.mod +++ b/go.mod @@ -325,3 +325,5 @@ replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20 // v3.3.1 with https://github.com/prometheus/prometheus/pull/16252. (same as thanos) replace github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 + +replace github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee diff --git a/go.sum b/go.sum index 9aa18e3ac6f..a0b5c316778 100644 --- a/go.sum +++ b/go.sum @@ -814,8 +814,6 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 h1:XoOXq+q+CcY8MZqAVoPtdG3R6o84aeZpZFDM+C9DJXg= -github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= @@ -955,6 +953,8 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8 github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee h1:bjvKyYMZvukXL7/vqcL0jNxo5JNtSAZlQHAIWeFyzuc= +github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index ca31a019c9a..69095d386fc 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -63,8 +63,9 @@ func TestParquetFuzz(t *testing.T) { 
"-store-gateway.sharding-enabled": "false", "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways // alert manager - "-alertmanager.web.external-url": "http://localhost/alertmanager", - "-frontend.query-vertical-shard-size": "1", + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // Enable vertical sharding. + "-frontend.query-vertical-shard-size": "3", "-frontend.max-cache-freshness": "1m", // enable experimental promQL funcs "-querier.enable-promql-experimental-functions": "true", diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 8d7fe7152ed..98f8a127c28 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -6,13 +6,13 @@ import ( "time" "github.com/go-kit/log" - "github.com/go-kit/log/level" lru "github.com/hashicorp/golang-lru/v2" "github.com/opentracing/opentracing-go" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" "github.com/prometheus-community/parquet-common/queryable" "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/search" parquet_storage "github.com/prometheus-community/parquet-common/storage" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -20,17 +20,18 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" - util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/multierror" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -153,6 +154,7 @@ func NewParquetQueryable( userID, _ := tenant.TenantID(ctx) return int64(limits.ParquetMaxFetchedDataBytes(userID)) }), + queryable.WithMaterializedLabelsCallback(materializedLabelsCallback), queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error { queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx) lbls := make([][]cortexpb.LabelAdapter, 0, len(cs)) @@ -432,17 +434,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.Select") defer span.Finish() - userID, err := tenant.TenantID(ctx) + newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers) if err != nil { return storage.ErrSeriesSet(err) } - - if q.limits.QueryVerticalShardSize(userID) > 1 { - uLogger := util_log.WithUserID(userID, q.logger) - level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage") - - return q.blocksStoreQuerier.Select(ctx, sortSeries, h, matchers...) 
- } + defer shardMatcher.Close() hints := storage.SelectHints{ Start: q.minT, @@ -483,7 +479,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool go func() { span, _ := opentracing.StartSpanFromContext(ctx, "parquetQuerier.Select") defer span.Finish() - p <- q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, &hints, matchers...) + parquetCtx := InjectBlocksIntoContext(ctx, parquet...) + if shardMatcher != nil { + parquetCtx = injectShardMatcherIntoContext(parquetCtx, shardMatcher) + } + p <- q.parquetQuerier.Select(parquetCtx, sortSeries, &hints, newMatchers...) }() } @@ -570,6 +570,56 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining } } +func materializedLabelsCallback(ctx context.Context, _ *storage.SelectHints, seriesLabels [][]labels.Label, rr []search.RowRange) ([][]labels.Label, []search.RowRange) { + shardMatcher, exists := extractShardMatcherFromContext(ctx) + if !exists || !shardMatcher.IsSharded() { + return seriesLabels, rr + } + + var filteredLabels [][]labels.Label + var filteredRowRanges []search.RowRange + + // Track which individual rows match the shard matcher + rowMatches := make([]bool, len(seriesLabels)) + for i, lbls := range seriesLabels { + rowMatches[i] = shardMatcher.MatchesLabels(labels.New(lbls...)) + if rowMatches[i] { + filteredLabels = append(filteredLabels, lbls) + } + } + + // Convert matching rows back into row ranges + currentRange := search.RowRange{} + inRange := false + + for i, matches := range rowMatches { + if matches { + if !inRange { + // Start a new range + currentRange.From = int64(i) + currentRange.Count = 1 + inRange = true + } else { + // Extend the current range + currentRange.Count++ + } + } else { + if inRange { + // End the current range + filteredRowRanges = append(filteredRowRanges, currentRange) + inRange = false + } + } + } + + // Don't forget to add the last range if we're still in one + if inRange { + filteredRowRanges = append(filteredRowRanges, currentRange) + } + + return filteredLabels, filteredRowRanges +} + type cacheInterface[T any] interface { Get(path string) T Set(path string, reader T) @@ -655,3 +705,19 @@ func (n noopCache[T]) Get(_ string) (r T) { func (n noopCache[T]) Set(_ string, _ T) { } + +var ( + shardMatcherCtxKey contextKey = 1 +) + +func injectShardMatcherIntoContext(ctx context.Context, sm *storepb.ShardMatcher) context.Context { + return context.WithValue(ctx, shardMatcherCtxKey, sm) +} + +func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, bool) { + if sm := ctx.Value(shardMatcherCtxKey); sm != nil { + return sm.(*storepb.ShardMatcher), true + } + + return nil, false +} diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 13cdde6cd57..5563ff31256 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -5,12 +5,14 @@ import ( "fmt" "math/rand" "path/filepath" + "sync" "testing" "time" "github.com/go-kit/log" "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" + "github.com/prometheus-community/parquet-common/search" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -597,6 +599,201 @@ func TestParquetQueryable_Limits(t *testing.T) { } } +func TestMaterializedLabelsCallback(t *testing.T) { + t.Parallel() + + // Helper function to create test labels 
+ createLabels := func(name, value string) []labels.Label { + return []labels.Label{{Name: name, Value: value}} + } + + buf := sync.Pool{New: func() interface{} { + b := make([]byte, 0, 1024) + return &b + }} + // Helper function to create a proper shard matcher with buffer pool + createShardMatcher := func(labels []string, by bool, totalShards, shardIndex int64) *storepb.ShardMatcher { + si := &storepb.ShardInfo{ + ShardIndex: shardIndex, + TotalShards: totalShards, + By: by, + Labels: labels, + } + return si.Matcher(&buf) + } + + tests := map[string]struct { + ctx context.Context + seriesLabels [][]labels.Label + rowRanges []search.RowRange + expectedLabels [][]labels.Label + expectedRanges []search.RowRange + }{ + "no shard matcher in context - returns original data": { + ctx: context.Background(), + seriesLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + }, + rowRanges: []search.RowRange{ + {From: 0, Count: 2}, + }, + expectedLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + }, + expectedRanges: []search.RowRange{ + {From: 0, Count: 2}, + }, + }, + "shard matcher exists but not sharded - returns original data": { + ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher(nil, false, 0, 0)), + seriesLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + }, + rowRanges: []search.RowRange{ + {From: 0, Count: 2}, + }, + expectedLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + }, + expectedRanges: []search.RowRange{ + {From: 0, Count: 2}, + }, + }, + "shard matcher with shard by labels - filters correctly": { + ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, true, 2, 0)), + seriesLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + createLabels("__name__", "metric3"), + createLabels("__name__", "metric4"), + }, + rowRanges: []search.RowRange{ + {From: 0, Count: 4}, + }, + expectedLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + createLabels("__name__", "metric4"), + }, + expectedRanges: []search.RowRange{ + {From: 0, Count: 2}, + {From: 3, Count: 1}, + }, + }, + "shard matcher with shard without labels - filters correctly": { + ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, false, 2, 1)), + seriesLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + createLabels("__name__", "metric3"), + createLabels("__name__", "metric4"), + }, + rowRanges: []search.RowRange{ + {From: 0, Count: 4}, + }, + expectedLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + createLabels("__name__", "metric3"), + createLabels("__name__", "metric4"), + }, + expectedRanges: []search.RowRange{ + {From: 0, Count: 4}, + }, + }, + "shard matcher with multiple labels - filters correctly": { + ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__", "instance"}, true, 3, 1)), + seriesLabels: [][]labels.Label{ + {{Name: "__name__", Value: "metric1"}, {Name: "instance", Value: "host1"}}, + {{Name: "__name__", Value: "metric2"}, {Name: "instance", Value: "host2"}}, + {{Name: "__name__", Value: "metric3"}, {Name: "instance", Value: "host3"}}, + 
{{Name: "__name__", Value: "metric4"}, {Name: "instance", Value: "host4"}}, + {{Name: "__name__", Value: "metric5"}, {Name: "instance", Value: "host5"}}, + {{Name: "__name__", Value: "metric6"}, {Name: "instance", Value: "host6"}}, + }, + rowRanges: []search.RowRange{ + {From: 0, Count: 6}, + }, + expectedLabels: [][]labels.Label{ + {{Name: "__name__", Value: "metric2"}, {Name: "instance", Value: "host2"}}, + {{Name: "__name__", Value: "metric3"}, {Name: "instance", Value: "host3"}}, + }, + expectedRanges: []search.RowRange{ + {From: 1, Count: 2}, + }, + }, + "no matching rows - returns empty results": { + ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, true, 2, 1)), + seriesLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + }, + rowRanges: []search.RowRange{ + {From: 0, Count: 2}, + }, + expectedLabels: [][]labels.Label(nil), + expectedRanges: []search.RowRange(nil), + }, + "all rows match - creates single range": { + ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, true, 1, 0)), + seriesLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + createLabels("__name__", "metric3"), + }, + rowRanges: []search.RowRange{ + {From: 0, Count: 3}, + }, + expectedLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + createLabels("__name__", "metric3"), + }, + expectedRanges: []search.RowRange{ + {From: 0, Count: 3}, + }, + }, + "matching row at the end - creates range ending at last row": { + ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, true, 2, 1)), + seriesLabels: [][]labels.Label{ + createLabels("__name__", "metric1"), + createLabels("__name__", "metric2"), + createLabels("__name__", "metric3"), + }, + rowRanges: []search.RowRange{ + {From: 0, Count: 3}, + }, + expectedLabels: [][]labels.Label{ + createLabels("__name__", "metric3"), + }, + expectedRanges: []search.RowRange{ + {From: 2, Count: 1}, + }, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + // Clean up shard matcher after test + if shardMatcher, exists := extractShardMatcherFromContext(testData.ctx); exists { + defer shardMatcher.Close() + } + + resultLabels, resultRanges := materializedLabelsCallback(testData.ctx, nil, testData.seriesLabels, testData.rowRanges) + + require.Equal(t, testData.expectedLabels, resultLabels, "labels mismatch") + require.Equal(t, testData.expectedRanges, resultRanges, "row ranges mismatch") + }) + } +} + // convertBlockToParquet converts a TSDB block to parquet and uploads it to the bucket func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient objstore.Bucket, blockID ulid.ULID, blockDir string) error { tsdbBlock, err := tsdb.OpenBlock(nil, blockDir, chunkenc.NewPool(), tsdb.DefaultPostingsDecoderFactory) diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index c4b2996f5ab..bf7f96f75a1 100644 --- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -39,6 +39,7 @@ type queryableOpts struct { chunkBytesLimitFunc 
search.QuotaLimitFunc dataBytesLimitFunc search.QuotaLimitFunc materializedSeriesCallback search.MaterializedSeriesFunc + materializedLabelsCallback search.MaterializedLabelsFunc } var DefaultQueryableOpts = queryableOpts{ @@ -47,6 +48,7 @@ var DefaultQueryableOpts = queryableOpts{ chunkBytesLimitFunc: search.NoopQuotaLimitFunc, dataBytesLimitFunc: search.NoopQuotaLimitFunc, materializedSeriesCallback: search.NoopMaterializedSeriesFunc, + materializedLabelsCallback: search.NoopMaterializedLabelsFunc, } type QueryableOpts func(*queryableOpts) @@ -87,6 +89,13 @@ func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableO } } +// WithMaterializedLabelsCallback sets a callback function to process the series labels for filtering before materializing chunks. +func WithMaterializedLabelsCallback(fn search.MaterializedLabelsFunc) QueryableOpts { + return func(opts *queryableOpts) { + opts.materializedLabelsCallback = fn + } +} + type parquetQueryable struct { shardsFinder ShardsFinderFunction d *schema.PrometheusParquetChunksDecoder @@ -207,7 +216,7 @@ func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storag for i, shard := range shards { errGroup.Go(func() error { - ss, err := shard.Query(ctx, sorted, minT, maxT, skipChunks, matchers) + ss, err := shard.Query(ctx, sorted, sp, minT, maxT, skipChunks, matchers) seriesSet[i] = ss return err }) @@ -252,7 +261,7 @@ func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schem if err != nil { return nil, err } - m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback) + m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback, opts.materializedLabelsCallback) if err != nil { return nil, err } @@ -264,7 +273,7 @@ func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schem }, nil } -func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { +func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage.SelectHints, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { errGroup, ctx := errgroup.WithContext(ctx) errGroup.SetLimit(b.concurrency) @@ -290,10 +299,14 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64 return nil } - series, err := b.m.Materialize(ctx, rgi, mint, maxt, skipChunks, rr) + series, err := b.m.Materialize(ctx, sp, rgi, mint, maxt, skipChunks, rr) if err != nil { return err } + if len(series) == 0 { + return nil + } + rMtx.Lock() results = append(results, series...) 
rMtx.Unlock() diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index 03598bd28b1..5736073b414 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -94,7 +94,7 @@ func Filter(ctx context.Context, s storage.ParquetShard, rgIdx int, cs ...Constr } } var err error - rr := []RowRange{{from: int64(0), count: rg.NumRows()}} + rr := []RowRange{{From: int64(0), Count: rg.NumRows()}} for i := range cs { isPrimary := len(sc) > 0 && cs[i].path() == sc[0].Path()[0] rr, err = cs[i].filter(ctx, rgIdx, isPrimary, rr) @@ -185,7 +185,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, if len(rr) == 0 { return nil, nil } - from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + from, to := rr[0].From, rr[len(rr)-1].From+rr[len(rr)-1].Count rg := ec.f.RowGroups()[rgIdx] @@ -398,7 +398,7 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, if len(rr) == 0 { return nil, nil } - from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + from, to := rr[0].From, rr[len(rr)-1].From+rr[len(rr)-1].Count rg := rc.f.RowGroups()[rgIdx] diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 2f485503e35..b12d7546799 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -50,6 +50,7 @@ type Materializer struct { dataBytesQuota *Quota materializedSeriesCallback MaterializedSeriesFunc + materializedLabelsCallback MaterializedLabelsFunc } // MaterializedSeriesFunc is a callback function that can be used to add limiter or statistic logics for @@ -61,6 +62,15 @@ func NoopMaterializedSeriesFunc(_ context.Context, _ []prom_storage.ChunkSeries) return nil } +// MaterializedLabelsFunc is a callback function that processes materialized series labels before materializing chunks. +// This can be used to filter series. +type MaterializedLabelsFunc func(ctx context.Context, hints *prom_storage.SelectHints, series [][]labels.Label, rr []RowRange) ([][]labels.Label, []RowRange) + +// NoopMaterializedLabelsFunc is a noop callback function that does nothing. +func NoopMaterializedLabelsFunc(_ context.Context, _ *prom_storage.SelectHints, lbls [][]labels.Label, rr []RowRange) ([][]labels.Label, []RowRange) { + return lbls, rr +} + func NewMaterializer(s *schema.TSDBSchema, d *schema.PrometheusParquetChunksDecoder, block storage.ParquetShard, @@ -69,6 +79,7 @@ func NewMaterializer(s *schema.TSDBSchema, chunkBytesQuota *Quota, dataBytesQuota *Quota, materializeSeriesCallback MaterializedSeriesFunc, + materializeLabelsCallback MaterializedLabelsFunc, ) (*Materializer, error) { colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes) if !ok { @@ -97,12 +108,13 @@ func NewMaterializer(s *schema.TSDBSchema, chunkBytesQuota: chunkBytesQuota, dataBytesQuota: dataBytesQuota, materializedSeriesCallback: materializeSeriesCallback, + materializedLabelsCallback: materializeLabelsCallback, }, nil } // Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr). 
// It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series. -func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) { +func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.SelectHints, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) { if err := m.checkRowCountQuota(rr); err != nil { return nil, err } @@ -111,6 +123,12 @@ func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int6 return nil, errors.Wrapf(err, "error materializing labels") } + sLbls, rr = m.materializedLabelsCallback(ctx, hints, sLbls, rr) + // Series and rows might be filtered out so check again if we need to materialize chunks. + if len(sLbls) == 0 || len(rr) == 0 { + return nil, nil + } + results := make([]prom_storage.ChunkSeries, len(sLbls)) for i, s := range sLbls { results[i] = &concreteChunksSeries{ @@ -298,7 +316,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R func totalRows(rr []RowRange) int64 { res := int64(0) for _, r := range rr { - res += r.count + res += r.Count } return res } @@ -348,12 +366,12 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq for i := 0; i < cidx.NumPages(); i++ { pageRowRange := RowRange{ - from: oidx.FirstRowIndex(i), + From: oidx.FirstRowIndex(i), } - pageRowRange.count = group.NumRows() + pageRowRange.Count = group.NumRows() if i < oidx.NumPages()-1 { - pageRowRange.count = oidx.FirstRowIndex(i+1) - pageRowRange.from + pageRowRange.Count = oidx.FirstRowIndex(i+1) - pageRowRange.From } for _, r := range rr { @@ -372,7 +390,7 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq rMutex := &sync.Mutex{} for _, v := range pageRanges { for _, rs := range v.rows { - r[rs] = make([]parquet.Value, 0, rs.count) + r[rs] = make([]parquet.Value, 0, rs.Count) } } @@ -397,7 +415,7 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq return errors.Wrap(err, "failed to get pages") } defer func() { _ = pgs.Close() }() - err = pgs.SeekToRow(p.rows[0].from) + err = pgs.SeekToRow(p.rows[0].From) if err != nil { return errors.Wrap(err, "could not seek to row") } @@ -405,9 +423,9 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq vi := new(valuesIterator) remainingRr := p.rows currentRr := remainingRr[0] - next := currentRr.from - remaining := currentRr.count - currentRow := currentRr.from + next := currentRr.From + remaining := currentRr.Count + currentRow := currentRr.From remainingRr = remainingRr[1:] for len(remainingRr) > 0 || remaining > 0 { @@ -426,8 +444,8 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq next = next + 1 } else if len(remainingRr) > 0 { currentRr = remainingRr[0] - next = currentRr.from - remaining = currentRr.count + next = currentRr.From + remaining = currentRr.Count remainingRr = remainingRr[1:] } } @@ -449,7 +467,7 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq ranges := slices.Collect(maps.Keys(r)) slices.SortFunc(ranges, func(a, b RowRange) int { - return int(a.from - b.from) + return int(a.From - b.From) }) res := make([]parquet.Value, 0, totalRows(rr)) diff --git a/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go b/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go 
index 412bcdfeca0..9112a711b75 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go @@ -18,29 +18,29 @@ import ( ) type RowRange struct { - from int64 - count int64 + From int64 + Count int64 } func NewRowRange(from, count int64) *RowRange { return &RowRange{ - from: from, - count: count, + From: from, + Count: count, } } // Overlaps returns true if the receiver and the given RowRange share any overlapping rows. -// Both ranges are treated as half-open intervals: [from, from+count). +// Both ranges are treated as half-open intervals: [From, From+Count). func (rr RowRange) Overlaps(o RowRange) bool { - endA := rr.from + rr.count - endB := o.from + o.count - return rr.from < endB && o.from < endA + endA := rr.From + rr.Count + endB := o.From + o.Count + return rr.From < endB && o.From < endA } // Intersection returns the intersection of rr and o. Both are assumed to be overlapping func (rr RowRange) Intersection(o RowRange) RowRange { - os, oe := max(rr.from, o.from), min(rr.from+rr.count, o.from+o.count) - return RowRange{from: os, count: oe - os} + os, oe := max(rr.From, o.From), min(rr.From+rr.Count, o.From+o.Count) + return RowRange{From: os, Count: oe - os} } // intersect intersects the row ranges from left hand sight with the row ranges from rhs @@ -49,13 +49,13 @@ func (rr RowRange) Intersection(o RowRange) RowRange { func intersectRowRanges(lhs, rhs []RowRange) []RowRange { res := make([]RowRange, 0) for l, r := 0, 0; l < len(lhs) && r < len(rhs); { - al, bl := lhs[l].from, lhs[l].from+lhs[l].count - ar, br := rhs[r].from, rhs[r].from+rhs[r].count + al, bl := lhs[l].From, lhs[l].From+lhs[l].Count + ar, br := rhs[r].From, rhs[r].From+rhs[r].Count // check if rows intersect if al <= br && ar <= bl { os, oe := max(al, ar), min(bl, br) - res = append(res, RowRange{from: os, count: oe - os}) + res = append(res, RowRange{From: os, Count: oe - os}) } // advance the cursor of the range that ends first @@ -70,9 +70,9 @@ func intersectRowRanges(lhs, rhs []RowRange) []RowRange { // complementRowRanges returns the ranges that are in rhs but not in lhs. // For example, if you have: -// lhs: [{from: 1, count: 3}] // represents rows 1,2,3 -// rhs: [{from: 0, count: 5}] // represents rows 0,1,2,3,4 -// The complement would be [{from: 0, count: 1}, {from: 4, count: 1}] // represents rows 0,4 +// lhs: [{From: 1, Count: 3}] // represents rows 1,2,3 +// rhs: [{From: 0, Count: 5}] // represents rows 0,1,2,3,4 +// The complement would be [{From: 0, Count: 1}, {From: 4, Count: 1}] // represents rows 0,4 // because these are the rows in rhs that are not in lhs. 
// // The function assumes that lhs and rhs are simplified (no overlapping ranges) @@ -83,8 +83,8 @@ func complementRowRanges(lhs, rhs []RowRange) []RowRange { l, r := 0, 0 for l < len(lhs) && r < len(rhs) { - al, bl := lhs[l].from, lhs[l].from+lhs[l].count - ar, br := rhs[r].from, rhs[r].from+rhs[r].count + al, bl := lhs[l].From, lhs[l].From+lhs[l].Count + ar, br := rhs[r].From, rhs[r].From+rhs[r].Count // check if rows intersect switch { @@ -93,7 +93,7 @@ func complementRowRanges(lhs, rhs []RowRange) []RowRange { if bl <= br { l++ } else { - res = append(res, RowRange{from: ar, count: br - ar}) + res = append(res, RowRange{From: ar, Count: br - ar}) r++ } case al < ar && bl > br: @@ -102,20 +102,20 @@ func complementRowRanges(lhs, rhs []RowRange) []RowRange { case al < ar && bl <= br: // l covers r from left but has room on top oe := min(bl, br) - rhs[r].from += oe - ar - rhs[r].count -= oe - ar + rhs[r].From += oe - ar + rhs[r].Count -= oe - ar l++ case al >= ar && bl > br: // l covers r from right but has room on bottom os := max(al, ar) - res = append(res, RowRange{from: ar, count: os - ar}) + res = append(res, RowRange{From: ar, Count: os - ar}) r++ case al >= ar && bl <= br: // l is included r os, oe := max(al, ar), min(bl, br) - res = append(res, RowRange{from: rhs[r].from, count: os - rhs[r].from}) - rhs[r].from = oe - rhs[r].count = br - oe + res = append(res, RowRange{From: rhs[r].From, Count: os - rhs[r].From}) + rhs[r].From = oe + rhs[r].Count = br - oe l++ } } @@ -133,15 +133,15 @@ func simplify(rr []RowRange) []RowRange { } sort.Slice(rr, func(i, j int) bool { - return rr[i].from < rr[j].from + return rr[i].From < rr[j].From }) tmp := make([]RowRange, 0) l := rr[0] for i := 1; i < len(rr); i++ { r := rr[i] - al, bl := l.from, l.from+l.count - ar, br := r.from, r.from+r.count + al, bl := l.From, l.From+l.Count + ar, br := r.From, r.From+r.Count if bl < ar { tmp = append(tmp, l) l = r @@ -155,15 +155,15 @@ func simplify(rr []RowRange) []RowRange { } l = RowRange{ - from: from, - count: count, + From: from, + Count: count, } } tmp = append(tmp, l) res := make([]RowRange, 0, len(tmp)) for i := range tmp { - if tmp[i].count != 0 { + if tmp[i].Count != 0 { res = append(res, tmp[i]) } } diff --git a/vendor/modules.txt b/vendor/modules.txt index ca8316be582..59563d54d89 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 +# github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 => github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable @@ -1988,3 +1988,4 @@ sigs.k8s.io/yaml/goyaml.v3 # gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 # github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 # github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 +# github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee From 49fc05fc80b05c2bd3313c14dc198cc00f65333d 
Mon Sep 17 00:00:00 2001 From: yeya24 Date: Sun, 13 Jul 2025 23:10:47 -0700 Subject: [PATCH 2/5] remove fallback vertical sharding test Signed-off-by: yeya24 --- pkg/querier/parquet_queryable_test.go | 43 --------------------------- 1 file changed, 43 deletions(-) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 5563ff31256..687a8c55c10 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -77,49 +77,6 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } ctx := user.InjectOrgID(context.Background(), "user-1") - t.Run("should fallback when vertical sharding is enabled", func(t *testing.T) { - finder := &blocksFinderMock{} - stores := createStore() - - q := &blocksStoreQuerier{ - minT: minT, - maxT: maxT, - finder: finder, - stores: stores, - consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), - logger: log.NewNopLogger(), - metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), - limits: &blocksStoreLimitsMock{}, - - storeGatewayConsistencyCheckMaxAttempts: 3, - } - - mParquetQuerier := &mockParquetQuerier{} - pq := &parquetQuerierWithFallback{ - minT: minT, - maxT: maxT, - finder: finder, - blocksStoreQuerier: q, - parquetQuerier: mParquetQuerier, - metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 4), - logger: log.NewNopLogger(), - defaultBlockStoreType: parquetBlockStore, - } - - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) - - t.Run("select", func(t *testing.T) { - ss := pq.Select(ctx, true, nil, matchers...) - require.NoError(t, ss.Err()) - require.Len(t, stores.queriedBlocks, 2) - require.Len(t, mParquetQuerier.queriedBlocks, 0) - }) - }) - t.Run("should fallback all blocks", func(t *testing.T) { finder := &blocksFinderMock{} stores := createStore() From 1d48784e59028186bca2441ae55d54a15aeabd0b Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 15 Jul 2025 09:24:39 -0700 Subject: [PATCH 3/5] update version Signed-off-by: yeya24 --- go.mod | 2 +- go.sum | 4 +- pkg/querier/parquet_queryable.go | 60 +--- pkg/querier/parquet_queryable_test.go | 283 ++++++------------ .../parquet-common/convert/convert.go | 31 +- .../queryable/parquet_queryable.go | 128 ++++++-- .../parquet-common/search/materialize.go | 206 ++++++++++--- vendor/modules.txt | 4 +- 8 files changed, 406 insertions(+), 312 deletions(-) diff --git a/go.mod b/go.mod index 5a2c6675433..0426644b633 100644 --- a/go.mod +++ b/go.mod @@ -326,4 +326,4 @@ replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20 // v3.3.1 with https://github.com/prometheus/prometheus/pull/16252. 
(same as thanos) replace github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 -replace github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee +replace github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68 diff --git a/go.sum b/go.sum index a0b5c316778..08bae4a2261 100644 --- a/go.sum +++ b/go.sum @@ -953,8 +953,8 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8 github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= -github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee h1:bjvKyYMZvukXL7/vqcL0jNxo5JNtSAZlQHAIWeFyzuc= -github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= +github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68 h1:QLTHdTqOXVgZEv+4T/IgvNbBrojt1UsulxuRtv069Mw= +github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 98f8a127c28..520438c5414 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -154,7 +154,7 @@ func NewParquetQueryable( userID, _ := tenant.TenantID(ctx) return int64(limits.ParquetMaxFetchedDataBytes(userID)) }), - queryable.WithMaterializedLabelsCallback(materializedLabelsCallback), + queryable.WithMaterializedLabelsFilterCallback(materializedLabelsFilterCallback), queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error { queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx) lbls := make([][]cortexpb.LabelAdapter, 0, len(cs)) @@ -570,54 +570,24 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining } } -func materializedLabelsCallback(ctx context.Context, _ *storage.SelectHints, seriesLabels [][]labels.Label, rr []search.RowRange) ([][]labels.Label, []search.RowRange) { - shardMatcher, exists := extractShardMatcherFromContext(ctx) - if !exists || !shardMatcher.IsSharded() { - return seriesLabels, rr - } - - var filteredLabels [][]labels.Label - var filteredRowRanges []search.RowRange +type shardMatcherLabelsFilter struct { + shardMatcher *storepb.ShardMatcher +} - // Track which individual rows match the shard matcher - rowMatches := make([]bool, len(seriesLabels)) - for i, lbls := range seriesLabels { - rowMatches[i] = shardMatcher.MatchesLabels(labels.New(lbls...)) - if rowMatches[i] { - filteredLabels = append(filteredLabels, lbls) - } - } +func (f *shardMatcherLabelsFilter) Filter(lbls labels.Labels) bool { + return f.shardMatcher.MatchesLabels(lbls) +} - // Convert matching rows back into row ranges - currentRange := search.RowRange{} - inRange := false - - for i, matches := 
range rowMatches { - if matches { - if !inRange { - // Start a new range - currentRange.From = int64(i) - currentRange.Count = 1 - inRange = true - } else { - // Extend the current range - currentRange.Count++ - } - } else { - if inRange { - // End the current range - filteredRowRanges = append(filteredRowRanges, currentRange) - inRange = false - } - } - } +func (f *shardMatcherLabelsFilter) Close() { + f.shardMatcher.Close() +} - // Don't forget to add the last range if we're still in one - if inRange { - filteredRowRanges = append(filteredRowRanges, currentRange) +func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHints) (search.MaterializedLabelsFilter, bool) { + shardMatcher, exists := extractShardMatcherFromContext(ctx) + if !exists || !shardMatcher.IsSharded() { + return nil, false } - - return filteredLabels, filteredRowRanges + return &shardMatcherLabelsFilter{shardMatcher: shardMatcher}, true } type cacheInterface[T any] interface { diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 687a8c55c10..73f7c50af21 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -12,7 +12,6 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" - "github.com/prometheus-community/parquet-common/search" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -556,201 +555,6 @@ func TestParquetQueryable_Limits(t *testing.T) { } } -func TestMaterializedLabelsCallback(t *testing.T) { - t.Parallel() - - // Helper function to create test labels - createLabels := func(name, value string) []labels.Label { - return []labels.Label{{Name: name, Value: value}} - } - - buf := sync.Pool{New: func() interface{} { - b := make([]byte, 0, 1024) - return &b - }} - // Helper function to create a proper shard matcher with buffer pool - createShardMatcher := func(labels []string, by bool, totalShards, shardIndex int64) *storepb.ShardMatcher { - si := &storepb.ShardInfo{ - ShardIndex: shardIndex, - TotalShards: totalShards, - By: by, - Labels: labels, - } - return si.Matcher(&buf) - } - - tests := map[string]struct { - ctx context.Context - seriesLabels [][]labels.Label - rowRanges []search.RowRange - expectedLabels [][]labels.Label - expectedRanges []search.RowRange - }{ - "no shard matcher in context - returns original data": { - ctx: context.Background(), - seriesLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - }, - rowRanges: []search.RowRange{ - {From: 0, Count: 2}, - }, - expectedLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - }, - expectedRanges: []search.RowRange{ - {From: 0, Count: 2}, - }, - }, - "shard matcher exists but not sharded - returns original data": { - ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher(nil, false, 0, 0)), - seriesLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - }, - rowRanges: []search.RowRange{ - {From: 0, Count: 2}, - }, - expectedLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - }, - expectedRanges: []search.RowRange{ - {From: 0, Count: 2}, - }, - }, - "shard matcher with shard by labels - filters correctly": { - ctx: 
injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, true, 2, 0)), - seriesLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - createLabels("__name__", "metric3"), - createLabels("__name__", "metric4"), - }, - rowRanges: []search.RowRange{ - {From: 0, Count: 4}, - }, - expectedLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - createLabels("__name__", "metric4"), - }, - expectedRanges: []search.RowRange{ - {From: 0, Count: 2}, - {From: 3, Count: 1}, - }, - }, - "shard matcher with shard without labels - filters correctly": { - ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, false, 2, 1)), - seriesLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - createLabels("__name__", "metric3"), - createLabels("__name__", "metric4"), - }, - rowRanges: []search.RowRange{ - {From: 0, Count: 4}, - }, - expectedLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - createLabels("__name__", "metric3"), - createLabels("__name__", "metric4"), - }, - expectedRanges: []search.RowRange{ - {From: 0, Count: 4}, - }, - }, - "shard matcher with multiple labels - filters correctly": { - ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__", "instance"}, true, 3, 1)), - seriesLabels: [][]labels.Label{ - {{Name: "__name__", Value: "metric1"}, {Name: "instance", Value: "host1"}}, - {{Name: "__name__", Value: "metric2"}, {Name: "instance", Value: "host2"}}, - {{Name: "__name__", Value: "metric3"}, {Name: "instance", Value: "host3"}}, - {{Name: "__name__", Value: "metric4"}, {Name: "instance", Value: "host4"}}, - {{Name: "__name__", Value: "metric5"}, {Name: "instance", Value: "host5"}}, - {{Name: "__name__", Value: "metric6"}, {Name: "instance", Value: "host6"}}, - }, - rowRanges: []search.RowRange{ - {From: 0, Count: 6}, - }, - expectedLabels: [][]labels.Label{ - {{Name: "__name__", Value: "metric2"}, {Name: "instance", Value: "host2"}}, - {{Name: "__name__", Value: "metric3"}, {Name: "instance", Value: "host3"}}, - }, - expectedRanges: []search.RowRange{ - {From: 1, Count: 2}, - }, - }, - "no matching rows - returns empty results": { - ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, true, 2, 1)), - seriesLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - }, - rowRanges: []search.RowRange{ - {From: 0, Count: 2}, - }, - expectedLabels: [][]labels.Label(nil), - expectedRanges: []search.RowRange(nil), - }, - "all rows match - creates single range": { - ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, true, 1, 0)), - seriesLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - createLabels("__name__", "metric3"), - }, - rowRanges: []search.RowRange{ - {From: 0, Count: 3}, - }, - expectedLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - createLabels("__name__", "metric3"), - }, - expectedRanges: []search.RowRange{ - {From: 0, Count: 3}, - }, - }, - "matching row at the end - creates range ending at last row": { - ctx: injectShardMatcherIntoContext(context.Background(), createShardMatcher([]string{"__name__"}, true, 2, 1)), - 
seriesLabels: [][]labels.Label{ - createLabels("__name__", "metric1"), - createLabels("__name__", "metric2"), - createLabels("__name__", "metric3"), - }, - rowRanges: []search.RowRange{ - {From: 0, Count: 3}, - }, - expectedLabels: [][]labels.Label{ - createLabels("__name__", "metric3"), - }, - expectedRanges: []search.RowRange{ - {From: 2, Count: 1}, - }, - }, - } - - for testName, testData := range tests { - testData := testData - t.Run(testName, func(t *testing.T) { - t.Parallel() - - // Clean up shard matcher after test - if shardMatcher, exists := extractShardMatcherFromContext(testData.ctx); exists { - defer shardMatcher.Close() - } - - resultLabels, resultRanges := materializedLabelsCallback(testData.ctx, nil, testData.seriesLabels, testData.rowRanges) - - require.Equal(t, testData.expectedLabels, resultLabels, "labels mismatch") - require.Equal(t, testData.expectedRanges, resultRanges, "row ranges mismatch") - }) - } -} - // convertBlockToParquet converts a TSDB block to parquet and uploads it to the bucket func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient objstore.Bucket, blockID ulid.ULID, blockDir string) error { tsdbBlock, err := tsdb.OpenBlock(nil, blockDir, chunkenc.NewPool(), tsdb.DefaultPostingsDecoderFactory) @@ -825,3 +629,90 @@ func (m *mockParquetQuerier) Reset() { func (mockParquetQuerier) Close() error { return nil } + +func TestMaterializedLabelsFilterCallback(t *testing.T) { + tests := []struct { + name string + setupContext func() context.Context + expectedFilterReturned bool + expectedCallbackReturned bool + }{ + { + name: "no shard matcher in context", + setupContext: func() context.Context { + return context.Background() + }, + expectedFilterReturned: false, + expectedCallbackReturned: false, + }, + { + name: "shard matcher exists but is not sharded", + setupContext: func() context.Context { + // Create a ShardInfo with TotalShards = 0 (not sharded) + shardInfo := &storepb.ShardInfo{ + ShardIndex: 0, + TotalShards: 0, // Not sharded + By: true, + Labels: []string{"__name__"}, + } + + buffers := &sync.Pool{New: func() interface{} { + b := make([]byte, 0, 100) + return &b + }} + shardMatcher := shardInfo.Matcher(buffers) + + return injectShardMatcherIntoContext(context.Background(), shardMatcher) + }, + expectedFilterReturned: false, + expectedCallbackReturned: false, + }, + { + name: "shard matcher exists and is sharded", + setupContext: func() context.Context { + // Create a ShardInfo with TotalShards > 0 (sharded) + shardInfo := &storepb.ShardInfo{ + ShardIndex: 0, + TotalShards: 2, // Sharded + By: true, + Labels: []string{"__name__"}, + } + + buffers := &sync.Pool{New: func() interface{} { + b := make([]byte, 0, 100) + return &b + }} + shardMatcher := shardInfo.Matcher(buffers) + + return injectShardMatcherIntoContext(context.Background(), shardMatcher) + }, + expectedFilterReturned: true, + expectedCallbackReturned: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := tt.setupContext() + + filter, exists := materializedLabelsFilterCallback(ctx, nil) + + require.Equal(t, tt.expectedCallbackReturned, exists) + + if tt.expectedFilterReturned { + require.NotNil(t, filter) + + // Test that the filter can be used + testLabels := labels.FromStrings("__name__", "test_metric", "label1", "value1") + // We can't easily test the actual filtering logic without knowing the internal + // shard matching implementation, but we can at least verify the filter interface works + _ = filter.Filter(testLabels) + 
+ // Cleanup + filter.Close() + } else { + require.Nil(t, filter) + } + }) + } +} diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index 703ba862905..ed3c06ac778 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -247,7 +247,7 @@ func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks [ return nil, fmt.Errorf("unable to get label names from block: %w", err) } - postings := sortedPostings(ctx, indexr, compareFunc, ops.sortedLabels...) + postings := sortedPostings(ctx, indexr, ops.sortedLabels...) seriesSet := tsdb.NewBlockChunkSeriesSet(blk.Meta().ULID, indexr, chunkr, tombsr, postings, mint, maxt, false) seriesSets = append(seriesSets, seriesSet) @@ -285,7 +285,7 @@ func (rr *TsdbRowReader) Schema() *schema.TSDBSchema { return rr.tsdbSchema } -func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, compare func(a, b labels.Labels) int, sortedLabels ...string) index.Postings { +func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, sortedLabels ...string) index.Postings { p := tsdb.AllSortedPostings(ctx, indexr) if len(sortedLabels) == 0 { @@ -294,25 +294,42 @@ func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, compare func(a type s struct { ref storage.SeriesRef + idx int labels labels.Labels } series := make([]s, 0, 128) - lb := labels.NewScratchBuilder(10) + scratchBuilder := labels.NewScratchBuilder(10) + lb := labels.NewBuilder(labels.EmptyLabels()) + i := 0 for p.Next() { - lb.Reset() - err := indexr.Series(p.At(), &lb, nil) + scratchBuilder.Reset() + err := indexr.Series(p.At(), &scratchBuilder, nil) if err != nil { return index.ErrPostings(fmt.Errorf("expand series: %w", err)) } + lb.Reset(scratchBuilder.Labels()) - series = append(series, s{labels: lb.Labels(), ref: p.At()}) + series = append(series, s{labels: lb.Keep(sortedLabels...).Labels(), ref: p.At(), idx: i}) + i++ } if err := p.Err(); err != nil { return index.ErrPostings(fmt.Errorf("expand postings: %w", err)) } - slices.SortFunc(series, func(a, b s) int { return compare(a.labels, b.labels) }) + slices.SortFunc(series, func(a, b s) int { + for _, lb := range sortedLabels { + if c := strings.Compare(a.labels.Get(lb), b.labels.Get(lb)); c != 0 { + return c + } + } + if a.idx < b.idx { + return -1 + } else if a.idx > b.idx { + return 1 + } + return 0 + }) // Convert back to list. 
ep := make([]storage.SeriesRef, 0, len(series)) diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index bf7f96f75a1..a674c96efc4 100644 --- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -17,11 +17,15 @@ import ( "context" "runtime" "sort" + "strings" "sync" "github.com/prometheus/prometheus/model/labels" prom_storage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "golang.org/x/sync/errgroup" "github.com/prometheus-community/parquet-common/convert" @@ -31,24 +35,26 @@ import ( "github.com/prometheus-community/parquet-common/util" ) +var tracer = otel.Tracer("parquet-common") + type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error) type queryableOpts struct { - concurrency int - rowCountLimitFunc search.QuotaLimitFunc - chunkBytesLimitFunc search.QuotaLimitFunc - dataBytesLimitFunc search.QuotaLimitFunc - materializedSeriesCallback search.MaterializedSeriesFunc - materializedLabelsCallback search.MaterializedLabelsFunc + concurrency int + rowCountLimitFunc search.QuotaLimitFunc + chunkBytesLimitFunc search.QuotaLimitFunc + dataBytesLimitFunc search.QuotaLimitFunc + materializedSeriesCallback search.MaterializedSeriesFunc + materializedLabelsFilterCallback search.MaterializedLabelsFilterCallback } var DefaultQueryableOpts = queryableOpts{ - concurrency: runtime.GOMAXPROCS(0), - rowCountLimitFunc: search.NoopQuotaLimitFunc, - chunkBytesLimitFunc: search.NoopQuotaLimitFunc, - dataBytesLimitFunc: search.NoopQuotaLimitFunc, - materializedSeriesCallback: search.NoopMaterializedSeriesFunc, - materializedLabelsCallback: search.NoopMaterializedLabelsFunc, + concurrency: runtime.GOMAXPROCS(0), + rowCountLimitFunc: search.NoopQuotaLimitFunc, + chunkBytesLimitFunc: search.NoopQuotaLimitFunc, + dataBytesLimitFunc: search.NoopQuotaLimitFunc, + materializedSeriesCallback: search.NoopMaterializedSeriesFunc, + materializedLabelsFilterCallback: search.NoopMaterializedLabelsFilterCallback, } type QueryableOpts func(*queryableOpts) @@ -89,10 +95,11 @@ func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableO } } -// WithMaterializedLabelsCallback sets a callback function to process the series labels for filtering before materializing chunks. -func WithMaterializedLabelsCallback(fn search.MaterializedLabelsFunc) QueryableOpts { +// WithMaterializedLabelsFilterCallback sets a callback function to create a filter that can be used +// to filter series based on their labels before materializing chunks. 
+func WithMaterializedLabelsFilterCallback(cb search.MaterializedLabelsFilterCallback) QueryableOpts { return func(opts *queryableOpts) { - opts.materializedLabelsCallback = fn + opts.materializedLabelsFilterCallback = cb } } @@ -133,7 +140,26 @@ type parquetQuerier struct { opts *queryableOpts } -func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { +func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) (result []string, annotations annotations.Annotations, err error) { + ctx, span := tracer.Start(ctx, "parquetQuerier.LabelValues") + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.String("label_name", name), + attribute.Int64("mint", p.mint), + attribute.Int64("maxt", p.maxt), + attribute.String("matchers", matchersToString(matchers)), + ) + if hints != nil { + span.SetAttributes(attribute.Int("limit", hints.Limit)) + } + shards, err := p.queryableShards(ctx, p.mint, p.maxt) if err != nil { return nil, nil, err @@ -161,10 +187,27 @@ func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *pro return nil, nil, err } - return util.MergeUnsortedSlices(int(limit), resNameValues...), nil, nil + result = util.MergeUnsortedSlices(int(limit), resNameValues...) + span.SetAttributes(attribute.Int("result_count", len(result))) + return result, nil, nil } -func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { +func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) (result []string, annotations annotations.Annotations, err error) { + ctx, span := tracer.Start(ctx, "parquetQuerier.LabelNames") + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int64("mint", p.mint), + attribute.Int64("maxt", p.maxt), + attribute.String("matchers", matchersToString(matchers)), + ) + shards, err := p.queryableShards(ctx, p.mint, p.maxt) if err != nil { return nil, nil, err @@ -174,6 +217,7 @@ func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.Labe if hints != nil { limit = int64(hints.Limit) + span.SetAttributes(attribute.Int("limit", hints.Limit)) } resNameSets := make([][]string, len(shards)) @@ -192,7 +236,9 @@ func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.Labe return nil, nil, err } - return util.MergeUnsortedSlices(int(limit), resNameSets...), nil, nil + result = util.MergeUnsortedSlices(int(limit), resNameSets...) 
+ span.SetAttributes(attribute.Int("result_count", len(result))) + return result, nil, nil } func (p parquetQuerier) Close() error { @@ -200,6 +246,30 @@ func (p parquetQuerier) Close() error { } func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storage.SelectHints, matchers ...*labels.Matcher) prom_storage.SeriesSet { + ctx, span := tracer.Start(ctx, "parquetQuerier.Select") + var err error + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Bool("sorted", sorted), + attribute.Int64("mint", p.mint), + attribute.Int64("maxt", p.maxt), + attribute.String("matchers", matchersToString(matchers)), + ) + if sp != nil { + span.SetAttributes( + attribute.Int64("select_start", sp.Start), + attribute.Int64("select_end", sp.End), + attribute.String("select_func", sp.Func), + ) + } + shards, err := p.queryableShards(ctx, p.mint, p.maxt) if err != nil { return prom_storage.ErrSeriesSet(err) @@ -222,10 +292,15 @@ func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storag }) } - if err := errGroup.Wait(); err != nil { + if err = errGroup.Wait(); err != nil { return prom_storage.ErrSeriesSet(err) } + span.SetAttributes( + attribute.Int("shards_count", len(shards)), + attribute.Bool("skip_chunks", skipChunks), + ) + ss := convert.NewMergeChunkSeriesSet(seriesSet, labels.Compare, prom_storage.NewConcatenatingChunkSeriesMerger()) return convert.NewSeriesSetFromChunkSeriesSet(ss, skipChunks) @@ -261,7 +336,7 @@ func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schem if err != nil { return nil, err } - m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback, opts.materializedLabelsCallback) + m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback, opts.materializedLabelsFilterCallback) if err != nil { return nil, err } @@ -433,3 +508,14 @@ type byLabels []prom_storage.ChunkSeries func (b byLabels) Len() int { return len(b) } func (b byLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i].Labels(), b[j].Labels()) < 0 } + +func matchersToString(matchers []*labels.Matcher) string { + if len(matchers) == 0 { + return "[]" + } + var matcherStrings []string + for _, m := range matchers { + matcherStrings = append(matcherStrings, m.String()) + } + return "[" + strings.Join(matcherStrings, ",") + "]" +} diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index b12d7546799..6884a2f69cc 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -27,6 +27,9 @@ import ( "github.com/prometheus/prometheus/model/labels" prom_storage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "golang.org/x/sync/errgroup" "github.com/prometheus-community/parquet-common/schema" @@ -34,6 +37,8 @@ import ( "github.com/prometheus-community/parquet-common/util" ) +var tracer = otel.Tracer("parquet-common") + type 
Materializer struct { b storage.ParquetShard s *schema.TSDBSchema @@ -49,8 +54,8 @@ type Materializer struct { chunkBytesQuota *Quota dataBytesQuota *Quota - materializedSeriesCallback MaterializedSeriesFunc - materializedLabelsCallback MaterializedLabelsFunc + materializedSeriesCallback MaterializedSeriesFunc + materializedLabelsFilterCallback MaterializedLabelsFilterCallback } // MaterializedSeriesFunc is a callback function that can be used to add limiter or statistic logics for @@ -62,13 +67,23 @@ func NoopMaterializedSeriesFunc(_ context.Context, _ []prom_storage.ChunkSeries) return nil } -// MaterializedLabelsFunc is a callback function that processes materialized series labels before materializing chunks. -// This can be used to filter series. -type MaterializedLabelsFunc func(ctx context.Context, hints *prom_storage.SelectHints, series [][]labels.Label, rr []RowRange) ([][]labels.Label, []RowRange) +// MaterializedLabelsFilterCallback returns a filter and a boolean indicating if the filter is enabled or not. +// The filter is used to filter series based on their labels. +// The boolean if set to false then it means that the filter is a noop and we can take shortcut to include all series. +// Otherwise, the filter is used to filter series based on their labels. +type MaterializedLabelsFilterCallback func(ctx context.Context, hints *prom_storage.SelectHints) (MaterializedLabelsFilter, bool) + +// MaterializedLabelsFilter is a filter that can be used to filter series based on their labels. +type MaterializedLabelsFilter interface { + // Filter returns true if the labels should be included in the result. + Filter(ls labels.Labels) bool + // Close is used to close the filter and do some cleanup. + Close() +} -// NoopMaterializedLabelsFunc is a noop callback function that does nothing. -func NoopMaterializedLabelsFunc(_ context.Context, _ *prom_storage.SelectHints, lbls [][]labels.Label, rr []RowRange) ([][]labels.Label, []RowRange) { - return lbls, rr +// NoopMaterializedLabelsFilterCallback is a noop MaterializedLabelsFilterCallback function that filters nothing. 
+func NoopMaterializedLabelsFilterCallback(ctx context.Context, hints *prom_storage.SelectHints) (MaterializedLabelsFilter, bool) { + return nil, false } func NewMaterializer(s *schema.TSDBSchema, @@ -79,7 +94,7 @@ func NewMaterializer(s *schema.TSDBSchema, chunkBytesQuota *Quota, dataBytesQuota *Quota, materializeSeriesCallback MaterializedSeriesFunc, - materializeLabelsCallback MaterializedLabelsFunc, + materializeLabelsFilterCallback MaterializedLabelsFilterCallback, ) (*Materializer, error) { colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes) if !ok { @@ -97,24 +112,41 @@ func NewMaterializer(s *schema.TSDBSchema, } return &Materializer{ - s: s, - d: d, - b: block, - colIdx: colIdx.ColumnIndex, - concurrency: concurrency, - partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize), - dataColToIndex: dataColToIndex, - rowCountQuota: rowCountQuota, - chunkBytesQuota: chunkBytesQuota, - dataBytesQuota: dataBytesQuota, - materializedSeriesCallback: materializeSeriesCallback, - materializedLabelsCallback: materializeLabelsCallback, + s: s, + d: d, + b: block, + colIdx: colIdx.ColumnIndex, + concurrency: concurrency, + partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize), + dataColToIndex: dataColToIndex, + rowCountQuota: rowCountQuota, + chunkBytesQuota: chunkBytesQuota, + dataBytesQuota: dataBytesQuota, + materializedSeriesCallback: materializeSeriesCallback, + materializedLabelsFilterCallback: materializeLabelsFilterCallback, }, nil } // Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr). // It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series. -func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.SelectHints, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) { +func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.SelectHints, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) (results []prom_storage.ChunkSeries, err error) { + ctx, span := tracer.Start(ctx, "Materializer.Materialize") + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int("row_group_index", rgi), + attribute.Int64("mint", mint), + attribute.Int64("maxt", maxt), + attribute.Bool("skip_chunks", skipChunks), + attribute.Int("row_ranges_count", len(rr)), + ) + if err := m.checkRowCountQuota(rr); err != nil { return nil, err } @@ -123,19 +155,7 @@ func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.Sele return nil, errors.Wrapf(err, "error materializing labels") } - sLbls, rr = m.materializedLabelsCallback(ctx, hints, sLbls, rr) - // Series and rows might be filtered out so check again if we need to materialize chunks. 
- if len(sLbls) == 0 || len(rr) == 0 { - return nil, nil - } - - results := make([]prom_storage.ChunkSeries, len(sLbls)) - for i, s := range sLbls { - results[i] = &concreteChunksSeries{ - lbls: labels.New(s...), - } - } - + results, rr = m.filterSeries(ctx, hints, sLbls, rr) if !skipChunks { chks, err := m.materializeChunks(ctx, rgi, mint, maxt, rr) if err != nil { @@ -155,9 +175,80 @@ func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.Sele if err := m.materializedSeriesCallback(ctx, results); err != nil { return nil, err } + + span.SetAttributes(attribute.Int("materialized_series_count", len(results))) return results, err } +func (m *Materializer) filterSeries(ctx context.Context, hints *prom_storage.SelectHints, sLbls [][]labels.Label, rr []RowRange) ([]prom_storage.ChunkSeries, []RowRange) { + results := make([]prom_storage.ChunkSeries, 0, len(sLbls)) + labelsFilter, ok := m.materializedLabelsFilterCallback(ctx, hints) + if !ok { + for _, s := range sLbls { + results = append(results, &concreteChunksSeries{ + lbls: labels.New(s...), + }) + } + return results, rr + } + + defer labelsFilter.Close() + + filteredRR := make([]RowRange, 0, len(rr)) + var currentRange RowRange + inRange := false + seriesIdx := 0 + + for _, rowRange := range rr { + for i := int64(0); i < rowRange.Count; i++ { + if seriesIdx < len(sLbls) { + actualRowID := rowRange.From + i + lbls := labels.New(sLbls[seriesIdx]...) + + if labelsFilter.Filter(lbls) { + results = append(results, &concreteChunksSeries{ + lbls: lbls, + }) + + // Handle row range collection + if !inRange { + // Start new range + currentRange = RowRange{ + From: actualRowID, + Count: 1, + } + inRange = true + } else if actualRowID == currentRange.From+currentRange.Count { + // Extend current range + currentRange.Count++ + } else { + // Save current range and start new range (non-contiguous) + filteredRR = append(filteredRR, currentRange) + currentRange = RowRange{ + From: actualRowID, + Count: 1, + } + } + } else { + // Save current range and reset when we hit a non-matching series + if inRange { + filteredRR = append(filteredRR, currentRange) + inRange = false + } + } + seriesIdx++ + } + } + } + + // Save the final range if we have one + if inRange { + filteredRR = append(filteredRR, currentRange) + } + + return results, filteredRR +} + func (m *Materializer) MaterializeAllLabelNames() []string { r := make([]string, 0, len(m.b.LabelsFile().Schema().Columns())) for _, c := range m.b.LabelsFile().Schema().Columns() { @@ -255,6 +346,21 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin } func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) { + ctx, span := tracer.Start(ctx, "Materializer.materializeAllLabels") + var err error + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int("row_group_index", rgi), + attribute.Int("row_ranges_count", len(rr)), + ) + labelsRg := m.b.LabelsFile().RowGroups()[rgi] cc := labelsRg.ColumnChunks()[m.colIdx] colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) @@ -290,7 +396,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R }) } - if err := errGroup.Wait(); err != nil { + if err = errGroup.Wait(); err != nil { return nil, err } @@ -310,6 +416,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R } } + 
span.SetAttributes(attribute.Int("materialized_labels_count", len(results))) return results, nil } @@ -321,11 +428,33 @@ func totalRows(rr []RowRange) int64 { return res } -func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt int64, rr []RowRange) ([][]chunks.Meta, error) { +func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt int64, rr []RowRange) (r [][]chunks.Meta, err error) { + ctx, span := tracer.Start(ctx, "Materializer.materializeChunks") + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int("row_group_index", rgi), + attribute.Int64("mint", mint), + attribute.Int64("maxt", maxt), + attribute.Int("row_ranges_count", len(rr)), + ) + minDataCol := m.s.DataColumIdx(mint) maxDataCol := m.s.DataColumIdx(maxt) rg := m.b.ChunksFile().RowGroups()[rgi] - r := make([][]chunks.Meta, totalRows(rr)) + r = make([][]chunks.Meta, totalRows(rr)) + + span.SetAttributes( + attribute.Int("min_data_col", minDataCol), + attribute.Int("max_data_col", maxDataCol), + attribute.Int("total_rows", int(totalRows(rr))), + ) for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ { values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr, true) @@ -342,6 +471,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max } } + span.SetAttributes(attribute.Int("materialized_chunks_count", len(r))) return r, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 59563d54d89..defedef34b2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 => github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee +# github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 => github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable @@ -1988,4 +1988,4 @@ sigs.k8s.io/yaml/goyaml.v3 # gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 # github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 # github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 -# github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee +# github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68 From 19ee8d4ec722e28c929f93be0116bceb235a969e Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 15 Jul 2025 10:44:16 -0700 Subject: [PATCH 4/5] switch to upstream repo Signed-off-by: yeya24 --- go.mod | 4 +- go.sum | 4 +- .../parquet-common/search/materialize.go | 62 +++++++++---------- vendor/modules.txt | 3 +- 4 files changed, 34 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 0426644b633..6806480c611 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,7 @@ require ( 
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 + github.com/prometheus-community/parquet-common v0.0.0-20250715173542-c18889e0405b github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 @@ -325,5 +325,3 @@ replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20 // v3.3.1 with https://github.com/prometheus/prometheus/pull/16252. (same as thanos) replace github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 - -replace github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68 diff --git a/go.sum b/go.sum index 08bae4a2261..1c0a0e0c149 100644 --- a/go.sum +++ b/go.sum @@ -814,6 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= +github.com/prometheus-community/parquet-common v0.0.0-20250715173542-c18889e0405b h1:jmE6MioPea3LHReWqFS2989CXPSipGQGTI2R80gq1nE= +github.com/prometheus-community/parquet-common v0.0.0-20250715173542-c18889e0405b/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= @@ -953,8 +955,6 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8 github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= -github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68 h1:QLTHdTqOXVgZEv+4T/IgvNbBrojt1UsulxuRtv069Mw= -github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 6884a2f69cc..7de75834692 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -201,43 +201,41 @@ func (m *Materializer) filterSeries(ctx context.Context, hints *prom_storage.Sel for _, rowRange := range rr { for i := int64(0); i < rowRange.Count; i++ { - if 
seriesIdx < len(sLbls) { - actualRowID := rowRange.From + i - lbls := labels.New(sLbls[seriesIdx]...) - - if labelsFilter.Filter(lbls) { - results = append(results, &concreteChunksSeries{ - lbls: lbls, - }) - - // Handle row range collection - if !inRange { - // Start new range - currentRange = RowRange{ - From: actualRowID, - Count: 1, - } - inRange = true - } else if actualRowID == currentRange.From+currentRange.Count { - // Extend current range - currentRange.Count++ - } else { - // Save current range and start new range (non-contiguous) - filteredRR = append(filteredRR, currentRange) - currentRange = RowRange{ - From: actualRowID, - Count: 1, - } + actualRowID := rowRange.From + i + lbls := labels.New(sLbls[seriesIdx]...) + + if labelsFilter.Filter(lbls) { + results = append(results, &concreteChunksSeries{ + lbls: lbls, + }) + + // Handle row range collection + if !inRange { + // Start new range + currentRange = RowRange{ + From: actualRowID, + Count: 1, } + inRange = true + } else if actualRowID == currentRange.From+currentRange.Count { + // Extend current range + currentRange.Count++ } else { - // Save current range and reset when we hit a non-matching series - if inRange { - filteredRR = append(filteredRR, currentRange) - inRange = false + // Save current range and start new range (non-contiguous) + filteredRR = append(filteredRR, currentRange) + currentRange = RowRange{ + From: actualRowID, + Count: 1, } } - seriesIdx++ + } else { + // Save current range and reset when we hit a non-matching series + if inRange { + filteredRR = append(filteredRR, currentRange) + inRange = false + } } + seriesIdx++ } } diff --git a/vendor/modules.txt b/vendor/modules.txt index defedef34b2..03afb7978ec 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 => github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68 +# github.com/prometheus-community/parquet-common v0.0.0-20250715173542-c18889e0405b ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable @@ -1988,4 +1988,3 @@ sigs.k8s.io/yaml/goyaml.v3 # gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 # github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 # github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 -# github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250715161708-8c081ce88b68 From 3a665549164099126787ec484de3ec3a739505cc Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 17 Jul 2025 12:22:06 -0700 Subject: [PATCH 5/5] bump parquet version Signed-off-by: yeya24 --- go.mod | 2 +- go.sum | 4 ++-- .../prometheus-community/parquet-common/search/constraint.go | 2 +- vendor/modules.txt | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 6806480c611..2c38d26cbc1 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - 
github.com/prometheus-community/parquet-common v0.0.0-20250715173542-c18889e0405b + github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 diff --git a/go.sum b/go.sum index 1c0a0e0c149..d42729e5298 100644 --- a/go.sum +++ b/go.sum @@ -814,8 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250715173542-c18889e0405b h1:jmE6MioPea3LHReWqFS2989CXPSipGQGTI2R80gq1nE= -github.com/prometheus-community/parquet-common v0.0.0-20250715173542-c18889e0405b/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= +github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c h1:yDtT3c2klcWJj6A0osq72qM8rd1ohtl/J3rHD3FHuNw= +github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index 5736073b414..f77bcf4aa87 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -262,7 +262,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, // Did not find any pages if len(readPgs) == 0 { - return nil, nil + return intersectRowRanges(simplify(res), rr), nil } dictOff, dictSz := ec.f.DictionaryPageBounds(rgIdx, col.ColumnIndex) diff --git a/vendor/modules.txt b/vendor/modules.txt index 03afb7978ec..418e65ed0a3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250715173542-c18889e0405b +# github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable
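
Editor's note (not part of the patch): to make the new filtering hook concrete, below is a minimal Go sketch of how a caller could plug into queryable.WithMaterializedLabelsFilterCallback using the search.MaterializedLabelsFilter interface vendored above. The hash-mod sharding rule and the names shardfilter, hashModShardFilter, and NewShardFilterCallback are illustrative assumptions only; the real Cortex integration builds its filter from the sharding matchers extracted via querysharding.ExtractShardingMatchers earlier in this patch, not from a plain label-hash modulo.

    package shardfilter

    import (
    	"context"

    	"github.com/prometheus-community/parquet-common/search"
    	"github.com/prometheus/prometheus/model/labels"
    	prom_storage "github.com/prometheus/prometheus/storage"
    )

    // hashModShardFilter keeps only series whose label-set hash falls into the
    // requested vertical shard. It is an illustrative stand-in for the
    // ShardMatcher-based filter Cortex derives from the __query_shard__ matcher.
    type hashModShardFilter struct {
    	shardIndex uint64
    	shardCount uint64
    }

    // Filter reports whether the series belongs to this shard.
    func (f *hashModShardFilter) Filter(ls labels.Labels) bool {
    	return ls.Hash()%f.shardCount == f.shardIndex
    }

    // Close has nothing to release here; a real implementation would return
    // any pooled buffers held by the underlying shard matcher.
    func (f *hashModShardFilter) Close() {}

    // NewShardFilterCallback builds a MaterializedLabelsFilterCallback.
    // Returning false when no sharding is requested lets the materializer skip
    // the per-series label check, mirroring NoopMaterializedLabelsFilterCallback.
    func NewShardFilterCallback(shardIndex, shardCount uint64) search.MaterializedLabelsFilterCallback {
    	return func(_ context.Context, _ *prom_storage.SelectHints) (search.MaterializedLabelsFilter, bool) {
    		if shardCount <= 1 {
    			return nil, false
    		}
    		return &hashModShardFilter{shardIndex: shardIndex, shardCount: shardCount}, true
    	}
    }

A caller would pass the callback when constructing the queryable, e.g. queryable.WithMaterializedLabelsFilterCallback(shardfilter.NewShardFilterCallback(0, 3)). The two-value return is the key design point of the new API: the boolean lets the common case (no vertical sharding) bypass per-series filtering and row-range re-collection entirely, while the filter's Close hook gives implementations such as Cortex a place to release shard-matcher resources once materialization finishes.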