diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d896bc1a69..f0e99cc8cd 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -101,6 +101,7 @@ type Distributor struct { aggregator *aggregator.MultiTenantAggregator[*pprof.ProfileMerge] asyncRequests sync.WaitGroup ingestionLimitsSampler *ingest_limits.Sampler + usageGroupEvaluator *validation.UsageGroupEvaluator subservices *services.Manager subservicesWatcher *services.FailureWatcher @@ -191,6 +192,7 @@ func New( } d.ingestionLimitsSampler = ingest_limits.NewSampler(distributorsRing) + d.usageGroupEvaluator = validation.NewUsageGroupEvaluator(logger) subservices = append(subservices, distributorsLifecycler, distributorsRing, d.aggregator, d.ingestionLimitsSampler) @@ -325,7 +327,7 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push for _, series := range req.Series { profName := phlaremodel.Labels(series.Labels).Get(ProfileName) - groups := usageGroups.GetUsageGroups(tenantID, series.Labels) + groups := d.usageGroupEvaluator.GetMatch(tenantID, usageGroups, series.Labels) if err := d.checkUsageGroupsIngestLimit(tenantID, groups.Names(), req); err != nil { return nil, err } @@ -490,7 +492,7 @@ func (d *Distributor) aggregate(ctx context.Context, req *distributormodel.PushR func (d *Distributor) sendRequestsToIngester(ctx context.Context, req *distributormodel.PushRequest) (resp *connect.Response[pushv1.PushResponse], err error) { // Next we split profiles by labels and apply relabeling rules. usageGroups := d.limits.DistributorUsageGroups(req.TenantID) - profileSeries, bytesRelabelDropped, profilesRelabelDropped := extractSampleSeries(req, req.TenantID, usageGroups, d.limits.IngestionRelabelingRules(req.TenantID)) + profileSeries, bytesRelabelDropped, profilesRelabelDropped := d.extractSampleSeries(req, usageGroups) validation.DiscardedBytes.WithLabelValues(string(validation.DroppedByRelabelRules), req.TenantID).Add(bytesRelabelDropped) validation.DiscardedProfiles.WithLabelValues(string(validation.DroppedByRelabelRules), req.TenantID).Add(profilesRelabelDropped) @@ -515,7 +517,7 @@ func (d *Distributor) sendRequestsToIngester(ctx context.Context, req *distribut series.Labels = phlaremodel.Labels(series.Labels).InsertSorted(phlaremodel.LabelNameOrder, phlaremodel.LabelOrderEnforced) } - groups := usageGroups.GetUsageGroups(req.TenantID, series.Labels) + groups := d.usageGroupEvaluator.GetMatch(req.TenantID, usageGroups, series.Labels) if err = validation.ValidateLabels(d.limits, req.TenantID, series.Labels); err != nil { validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), req.TenantID).Add(float64(req.TotalProfiles)) @@ -883,16 +885,15 @@ func injectMappingVersions(series []*distributormodel.ProfileSeries) error { return nil } -func extractSampleSeries( +func (d *Distributor) extractSampleSeries( req *distributormodel.PushRequest, - tenantID string, usage *validation.UsageGroupConfig, - rules []*relabel.Config, ) ( result []*distributormodel.ProfileSeries, bytesRelabelDropped float64, profilesRelabelDropped float64, ) { + rules := d.limits.IngestionRelabelingRules(req.TenantID) for _, series := range req.Series { for _, p := range series.Samples { v := &sampleSeriesVisitor{profile: p.Profile} @@ -908,7 +909,7 @@ func extractSampleSeries( result = append(result, v.series...) bytesRelabelDropped += float64(v.discardedBytes) profilesRelabelDropped += float64(v.discardedProfiles) - usage.GetUsageGroups(tenantID, series.Labels). + d.usageGroupEvaluator.GetMatch(req.TenantID, usage, series.Labels). CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(v.discardedBytes)) } } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index a0f02c2603..91de5d9322 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -599,7 +599,7 @@ func Test_IngestLimits(t *testing.T) { "group-2": "{service_name=\"svc2\"}", }) require.NoError(t, err) - l.DistributorUsageGroups = &usageGroupCfg + l.DistributorUsageGroups = usageGroupCfg tenantLimits["user-1"] = l }), verifyExpectations: func(err error, req *distributormodel.PushRequest, res *connect.Response[pushv1.PushResponse]) { @@ -655,7 +655,7 @@ func Test_IngestLimits(t *testing.T) { "group-1": "{service_name=\"svc\"}", }) require.NoError(t, err) - l.DistributorUsageGroups = &usageGroupCfg + l.DistributorUsageGroups = usageGroupCfg tenantLimits["user-1"] = l }), verifyExpectations: func(err error, req *distributormodel.PushRequest, res *connect.Response[pushv1.PushResponse]) { @@ -698,11 +698,13 @@ func Test_SampleLabels(t *testing.T) { expectBytesDropped float64 expectProfilesDropped float64 } + const dummyTenantID = "tenant1" testCases := []testCase{ { description: "no series labels, no sample labels", pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Samples: []*distributormodel.ProfileSample{ @@ -734,6 +736,7 @@ func Test_SampleLabels(t *testing.T) { { description: "has series labels, no sample labels", pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -771,6 +774,7 @@ func Test_SampleLabels(t *testing.T) { { description: "no series labels, all samples have identical label set", pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Samples: []*distributormodel.ProfileSample{ @@ -811,6 +815,7 @@ func Test_SampleLabels(t *testing.T) { { description: "has series labels, all samples have identical label set", pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -855,6 +860,7 @@ func Test_SampleLabels(t *testing.T) { { description: "has series labels, and the only sample label name overlaps with series label, creating overlapping groups", pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -905,6 +911,7 @@ func Test_SampleLabels(t *testing.T) { { description: "has series labels, samples have distinct label sets", pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -975,6 +982,7 @@ func Test_SampleLabels(t *testing.T) { description: "has series labels that should be renamed to no longer include godeltaprof", relabelRules: defaultRelabelConfigs, pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -1021,6 +1029,7 @@ func Test_SampleLabels(t *testing.T) { {Action: relabel.Drop, SourceLabels: []model.LabelName{"__name__", "span_name"}, Separator: "/", Regex: relabel.MustNewRegexp("unwanted/randomness")}, }, pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -1073,6 +1082,7 @@ func Test_SampleLabels(t *testing.T) { {Action: relabel.Drop, Regex: relabel.MustNewRegexp(".*")}, }, pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -1108,6 +1118,7 @@ func Test_SampleLabels(t *testing.T) { {Action: relabel.Replace, Regex: relabel.MustNewRegexp(".*"), Replacement: "", TargetLabel: "span_name"}, }, pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -1155,6 +1166,7 @@ func Test_SampleLabels(t *testing.T) { { description: "ensure only samples of same stacktraces get grouped", pushReq: &distributormodel.PushRequest{ + TenantID: dummyTenantID, Series: []*distributormodel.ProfileSeries{ { Labels: []*typesv1.LabelPair{ @@ -1266,10 +1278,23 @@ func Test_SampleLabels(t *testing.T) { // reporting. Neither are validated by the tests, nor do they influence // test behavior in any way. ug := &validation.UsageGroupConfig{} - const dummyTenantID = "tenant1" t.Run(tc.description, func(t *testing.T) { - series, actualBytesDropped, actualProfilesDropped := extractSampleSeries(tc.pushReq, dummyTenantID, ug, tc.relabelRules) + overrides := validation.MockOverrides(func(defaults *validation.Limits, tenantLimits map[string]*validation.Limits) { + l := validation.MockDefaultLimits() + l.IngestionRelabelingRules = tc.relabelRules + tenantLimits[dummyTenantID] = l + }) + d, err := New(Config{ + DistributorRing: ringConfig, + }, testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "foo"}, + }, 3), &poolFactory{func(addr string) (client.PoolClient, error) { + return newFakeIngester(t, false), nil + }}, overrides, nil, log.NewLogfmtLogger(os.Stdout), nil) + require.NoError(t, err) + + series, actualBytesDropped, actualProfilesDropped := d.extractSampleSeries(tc.pushReq, ug) assert.Equal(t, tc.expectBytesDropped, actualBytesDropped) assert.Equal(t, tc.expectProfilesDropped, actualProfilesDropped) require.Len(t, series, len(tc.series)) diff --git a/pkg/experiment/ingester/segment.go b/pkg/experiment/ingester/segment.go index ed008c76e6..a3288416f1 100644 --- a/pkg/experiment/ingester/segment.go +++ b/pkg/experiment/ingester/segment.go @@ -210,6 +210,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg sshard: sshard, doneChan: make(chan struct{}), } + s.usageGroupEvaluator = validation.NewUsageGroupEvaluator(s.logger) return s } @@ -462,8 +463,9 @@ type segment struct { headsLock sync.RWMutex heads map[datasetKey]dataset - logger log.Logger - sw *segmentsWriter + logger log.Logger + sw *segmentsWriter + usageGroupEvaluator *validation.UsageGroupEvaluator // TODO(kolesnikovae): Revisit. doneChan chan struct{} @@ -483,7 +485,7 @@ type segment struct { } type segmentIngest interface { - ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation) + ingest(ctx context.Context, tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation) } type segmentWaitFlushed interface { @@ -502,14 +504,14 @@ func (s *segment) waitFlushed(ctx context.Context) error { } } -func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation) { +func (s *segment) ingest(ctx context.Context, tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation) { k := datasetKey{ tenant: tenantID, service: model.Labels(labels).Get(model.LabelNameServiceName), } size := p.SizeVT() rules := s.sw.limits.IngestionRelabelingRules(tenantID) - usage := s.sw.limits.DistributorUsageGroups(tenantID).GetUsageGroups(tenantID, labels) + usage := s.usageGroupEvaluator.GetMatch(tenantID, s.sw.limits.DistributorUsageGroups(tenantID), labels) appender := &sampleAppender{ head: s.headForIngest(k), profile: p, diff --git a/pkg/experiment/ingester/segment_test.go b/pkg/experiment/ingester/segment_test.go index 145a945235..4192ac07a5 100644 --- a/pkg/experiment/ingester/segment_test.go +++ b/pkg/experiment/ingester/segment_test.go @@ -131,7 +131,7 @@ func TestIngestWait(t *testing.T) { t1 := time.Now() awaiter := sw.ingest(0, func(head segmentIngest) { p := cpuProfile(42, 480, "svc1", "foo", "bar") - head.ingest("t1", p.Profile, p.UUID, p.Labels, p.Annotations) + head.ingest(context.Background(), "t1", p.Profile, p.UUID, p.Labels, p.Annotations) }) err := awaiter.waitFlushed(context.Background()) require.NoError(t, err) @@ -200,7 +200,7 @@ func TestBusyIngestLoop(t *testing.T) { ts := workerno*1000000000 + len(profiles) awaiter := sw.ingest(1, func(head segmentIngest) { p := cpuProfile(42, ts, "svc1", "foo", "bar") - head.ingest("t1", p.CloneVT(), p.UUID, p.Labels, p.Annotations) + head.ingest(context.Background(), "t1", p.CloneVT(), p.UUID, p.Labels, p.Annotations) profiles = append(profiles, p) }) awaiters = append(awaiters, awaiter) @@ -250,7 +250,7 @@ func TestDLQFail(t *testing.T) { ing := func(head segmentIngest) { ts += 420 p := cpuProfile(42, ts, "svc1", "foo", "bar") - head.ingest("t1", p.Profile, p.UUID, p.Labels, p.Annotations) + head.ingest(context.Background(), "t1", p.Profile, p.UUID, p.Labels, p.Annotations) } awaiter1 := res.ingest(0, ing) @@ -298,7 +298,7 @@ func TestDatasetMinMaxTime(t *testing.T) { } _ = res.ingest(1, func(head segmentIngest) { for _, p := range data { - head.ingest(p.tenant, p.profile.Profile, p.profile.UUID, p.profile.Labels, p.profile.Annotations) + head.ingest(context.Background(), p.tenant, p.profile.Profile, p.profile.UUID, p.profile.Labels, p.profile.Annotations) } }) defer res.stop() @@ -727,7 +727,7 @@ func (sw *sw) ingestChunk(t *testing.T, chunk inputChunk, expectAwaitError bool) defer wg.Done() awaiter := sw.ingest(shardKey(it.shard), func(head segmentIngest) { p := it.profile.CloneVT() // important to not rewrite original profile - head.ingest(it.tenant, p, it.profile.UUID, it.profile.Labels, it.profile.Annotations) + head.ingest(context.Background(), it.tenant, p, it.profile.UUID, it.profile.Labels, it.profile.Annotations) }) err := awaiter.waitFlushed(context.Background()) if expectAwaitError { diff --git a/pkg/experiment/ingester/service.go b/pkg/experiment/ingester/service.go index 0e5db05b44..7191ad16a3 100644 --- a/pkg/experiment/ingester/service.go +++ b/pkg/experiment/ingester/service.go @@ -210,7 +210,7 @@ func (i *SegmentWriterService) Push(ctx context.Context, req *segmentwriterv1.Pu } wait := i.segmentWriter.ingest(shardKey(req.Shard), func(segment segmentIngest) { - segment.ingest(req.TenantId, p.Profile, id, req.Labels, req.Annotations) + segment.ingest(ctx, req.TenantId, p.Profile, id, req.Labels, req.Annotations) }) flushStarted := time.Now() diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 5245ddce2e..bc5f473358 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -68,8 +68,9 @@ type Ingester struct { instances map[string]*instance instancesMtx sync.RWMutex - limits Limits - reg prometheus.Registerer + limits Limits + reg prometheus.Registerer + usageGroupEvaluator *validation.UsageGroupEvaluator } type ingesterFlusherCompat struct { @@ -107,6 +108,8 @@ func New(phlarectx context.Context, cfg Config, dbConfig phlaredb.Config, storag return nil, err } + i.usageGroupEvaluator = validation.NewUsageGroupEvaluator(i.logger) + i.lifecycler, err = ring.NewLifecycler( cfg.LifecyclerConfig, &ingesterFlusherCompat{i}, @@ -335,7 +338,7 @@ func (i *Ingester) Push(ctx context.Context, req *connect.Request[pushv1.PushReq usageGroups := i.limits.DistributorUsageGroups(instance.tenantID) for _, series := range req.Msg.Series { - groups := usageGroups.GetUsageGroups(instance.tenantID, series.Labels) + groups := i.usageGroupEvaluator.GetMatch(instance.tenantID, usageGroups, series.Labels) for _, sample := range series.Samples { err := pprof.FromBytes(sample.RawProfile, func(p *profilev1.Profile, size int) error { diff --git a/pkg/validation/usage_groups.go b/pkg/validation/usage_groups.go index 5af2bf6e2b..cd1d4ff9e2 100644 --- a/pkg/validation/usage_groups.go +++ b/pkg/validation/usage_groups.go @@ -10,6 +10,8 @@ import ( "strings" "unicode/utf8" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" @@ -51,18 +53,66 @@ var ( ) ) +// templatePart represents a part of a parsed usage group name template +type templatePart struct { + isLiteral bool + value string // literal text or label name for placeholder +} + +// usageGroupEntry represents a single usage group configuration +type usageGroupEntry struct { + matchers []*labels.Matcher + // For static names, template is nil and name is used + name string + // For dynamic names, template contains the parsed template parts + template []templatePart +} + type UsageGroupConfig struct { config map[string][]*labels.Matcher + + parsedEntries []usageGroupEntry } -func (c *UsageGroupConfig) GetUsageGroups(tenantID string, lbls phlaremodel.Labels) UsageGroupMatch { +const dynamicLabelNamePrefix = "${labels." + +type UsageGroupEvaluator struct { + logger log.Logger +} + +func NewUsageGroupEvaluator(logger log.Logger) *UsageGroupEvaluator { + return &UsageGroupEvaluator{ + logger: logger, + } +} + +func (e *UsageGroupEvaluator) GetMatch(tenantID string, c *UsageGroupConfig, lbls phlaremodel.Labels) UsageGroupMatch { match := UsageGroupMatch{ tenantID: tenantID, + names: make([]string, 0, len(c.parsedEntries)), } - for name, matchers := range c.config { - if matchesAll(matchers, lbls) { - match.names = append(match.names, name) + for _, entry := range c.parsedEntries { + if c.matchesAll(entry.matchers, lbls) { + if entry.template != nil { + dynamicName, err := c.expandTemplate(entry.template, lbls) + if err != nil { + level.Warn(e.logger).Log( + "msg", "failed to expand usage group template, skipping usage group", + "err", err, + "usage_group", entry.name) + continue + } + if dynamicName == "" { + level.Warn(e.logger).Log( + "msg", "usage group template expanded to empty string, skipping usage group", + "usage_group", entry.name) + continue + } + match.names = append(match.names, dynamicName) + } else { + match.names = append(match.names, entry.name) + } } } @@ -78,10 +128,12 @@ func (c *UsageGroupConfig) UnmarshalYAML(value *yaml.Node) error { return fmt.Errorf("malformed usage group config: %w", err) } - *c, err = NewUsageGroupConfig(m) + entries, rawData, err := parseUsageGroupEntries(m) if err != nil { return err } + c.parsedEntries = entries + c.config = rawData return nil } @@ -92,10 +144,12 @@ func (c *UsageGroupConfig) UnmarshalJSON(bytes []byte) error { return fmt.Errorf("malformed usage group config: %w", err) } - *c, err = NewUsageGroupConfig(m) + entries, rawData, err := parseUsageGroupEntries(m) if err != nil { return err } + c.parsedEntries = entries + c.config = rawData return nil } @@ -130,38 +184,103 @@ func (m UsageGroupMatch) Names() []string { return m.names } -func NewUsageGroupConfig(m map[string]string) (UsageGroupConfig, error) { - if len(m) > maxUsageGroups { - return UsageGroupConfig{}, fmt.Errorf("maximum number of usage groups is %d, got %d", maxUsageGroups, len(m)) +func NewUsageGroupConfig(m map[string]string) (*UsageGroupConfig, error) { + entries, rawData, err := parseUsageGroupEntries(m) + if err != nil { + return nil, err + } + config := &UsageGroupConfig{ + parsedEntries: entries, + config: rawData, } + return config, nil +} - config := UsageGroupConfig{ - config: make(map[string][]*labels.Matcher), +func parseUsageGroupEntries(m map[string]string) ([]usageGroupEntry, map[string][]*labels.Matcher, error) { + if len(m) > maxUsageGroups { + return nil, nil, fmt.Errorf("maximum number of usage groups is %d, got %d", maxUsageGroups, len(m)) } + rawData := make(map[string][]*labels.Matcher) + entries := make([]usageGroupEntry, 0, len(m)) + for name, matchersText := range m { if !utf8.ValidString(name) { - return UsageGroupConfig{}, fmt.Errorf("usage group name %q is not valid UTF-8", name) + return nil, nil, fmt.Errorf("usage group name %q is not valid UTF-8", name) } name = strings.TrimSpace(name) if name == "" { - return UsageGroupConfig{}, fmt.Errorf("usage group name cannot be empty") + return nil, nil, fmt.Errorf("usage group name cannot be empty") } if name == noMatchName { - return UsageGroupConfig{}, fmt.Errorf("usage group name %q is reserved", noMatchName) + return nil, nil, fmt.Errorf("usage group name %q is reserved", noMatchName) } matchers, err := parser.ParseMetricSelector(matchersText) if err != nil { - return UsageGroupConfig{}, fmt.Errorf("failed to parse matchers for usage group %q: %w", name, err) + return nil, nil, fmt.Errorf("failed to parse matchers for usage group %q: %w", name, err) + } + + entry := usageGroupEntry{ + matchers: matchers, + name: name, + } + + if strings.Contains(name, dynamicLabelNamePrefix) { + template, err := parseTemplate(name) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse template for usage group %q: %w", name, err) + } + entry.template = template } - config.config[name] = matchers + entries = append(entries, entry) + rawData[name] = matchers } - return config, nil + return entries, rawData, nil +} + +// parseTemplate parses a usage group name template into parts +func parseTemplate(name string) ([]templatePart, error) { + var parts []templatePart + remaining := name + + for len(remaining) > 0 { + before, after, found := strings.Cut(remaining, dynamicLabelNamePrefix) + + // add literal part before placeholder (if any) + if len(before) > 0 { + parts = append(parts, templatePart{ + isLiteral: true, + value: before, + }) + } + + if !found { + break + } + + labelName, afterBrace, foundBrace := strings.Cut(after, "}") + if !foundBrace { + return nil, fmt.Errorf("unclosed placeholder") + } + + if labelName == "" { + return nil, fmt.Errorf("empty label name in placeholder") + } + + parts = append(parts, templatePart{ + isLiteral: false, + value: labelName, + }) + + remaining = afterBrace + } + + return parts, nil } func (o *Overrides) DistributorUsageGroups(tenantID string) *UsageGroupConfig { @@ -174,25 +293,41 @@ func (o *Overrides) DistributorUsageGroups(tenantID string) *UsageGroupConfig { return config } -func matchesAll(matchers []*labels.Matcher, lbls phlaremodel.Labels) bool { - if len(lbls) == 0 { +func (c *UsageGroupConfig) matchesAll(matchers []*labels.Matcher, lbls phlaremodel.Labels) bool { + if len(lbls) == 0 && len(matchers) > 0 { return false } for _, m := range matchers { - matched := false - for _, lbl := range lbls { - if lbl.Name == m.Name { - if !m.Matches(lbl.Value) { - return false - } - matched = true - break + if lbl, ok := lbls.GetLabel(m.Name); ok { + if !m.Matches(lbl.Value) { + return false } + continue } - if !matched { - return false - } + return false } return true } + +func (c *UsageGroupConfig) expandTemplate(template []templatePart, lbls phlaremodel.Labels) (string, error) { + var result strings.Builder + result.Grow(len(template) * 8) + + for _, part := range template { + if part.isLiteral { + result.WriteString(part.value) + } else { + value, found := lbls.GetLabel(part.value) + if !found { + return "", fmt.Errorf("label %q not found", part.value) + } + if value.Value == "" { + return "", fmt.Errorf("label %q is empty", part.value) + } + result.WriteString(value.Value) + } + } + + return result.String(), nil +} diff --git a/pkg/validation/usage_groups_bench_test.go b/pkg/validation/usage_groups_bench_test.go new file mode 100644 index 0000000000..eedab1a9c8 --- /dev/null +++ b/pkg/validation/usage_groups_bench_test.go @@ -0,0 +1,74 @@ +package validation + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/pyroscope/pkg/model" + "github.com/grafana/pyroscope/pkg/util" +) + +func BenchmarkUsageGroups_Regular(b *testing.B) { + config, err := NewUsageGroupConfig(map[string]string{ + "app/frontend": `{service_name=~"(.*)"}`, + "app/backend": `{team=~"(.*)"}`, + "app/database": `{environment=~"(.*)"}`, + "team/platform": `{service_name=~"(.*)", team=~"(.*)"}`, + "team/product": `{service_name=~"(.*)", team=~"(.*)", environment=~"(.*)"}`, + }) + require.NoError(b, err) + + l := model.Labels{ + {Name: "service_name", Value: "frontend"}, + {Name: "team", Value: "platform"}, + {Name: "environment", Value: "production"}, + } + evaluator := NewUsageGroupEvaluator(util.Logger) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = evaluator.GetMatch("tenant1", config, l) + } +} + +func BenchmarkUsageGroups_Dynamic(b *testing.B) { + config, err := NewUsageGroupConfig(map[string]string{ + "app/${labels.service_name}": `{service_name=~".*"}`, + "team/${labels.team}": `{team=~".*"}`, + "env/${labels.environment}": `{environment=~".*"}`, + "${labels.service_name}/${labels.team}": `{service_name=~".*", team=~".*"}`, + "complex/${labels.service_name}-${labels.team}-${labels.environment}": `{service_name=~".*", team=~".*", environment=~".*"}`, + }) + require.NoError(b, err) + + l := model.Labels{ + {Name: "service_name", Value: "frontend"}, + {Name: "team", Value: "platform"}, + {Name: "environment", Value: "production"}, + } + evaluator := NewUsageGroupEvaluator(util.Logger) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = evaluator.GetMatch("tenant1", config, l) + } +} + +func BenchmarkUsageGroups_ComplexRegex(b *testing.B) { + config, err := NewUsageGroupConfig(map[string]string{ + "complex/${labels.service_name}": `{service_name=~"[a-zA-Z]+-[0-9]+"}`, + "very-complex/${labels.service_name}": `{service_name=~"[a-zA-Z]+-[0-9]+\\.[a-z]{2,4}"}`, + }) + require.NoError(b, err) + + l := model.Labels{ + {Name: "service_name", Value: "frontend-123.prod"}, + } + evaluator := NewUsageGroupEvaluator(util.Logger) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = evaluator.GetMatch("tenant1", config, l) + } +} diff --git a/pkg/validation/usage_groups_test.go b/pkg/validation/usage_groups_test.go index b19ec2c249..b765fec281 100644 --- a/pkg/validation/usage_groups_test.go +++ b/pkg/validation/usage_groups_test.go @@ -13,23 +13,22 @@ import ( "gopkg.in/yaml.v3" phlaremodel "github.com/grafana/pyroscope/pkg/model" + "github.com/grafana/pyroscope/pkg/util" ) func TestUsageGroupConfig_GetUsageGroups(t *testing.T) { tests := []struct { Name string TenantID string - Config UsageGroupConfig + Config map[string]string Labels phlaremodel.Labels Want UsageGroupMatch }{ { Name: "single_usage_group_match", TenantID: "tenant1", - Config: UsageGroupConfig{ - config: map[string][]*labels.Matcher{ - "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), - }, + Config: map[string]string{ + "app/foo": `{service_name="foo"}`, }, Labels: phlaremodel.Labels{ {Name: "service_name", Value: "foo"}, @@ -42,11 +41,9 @@ func TestUsageGroupConfig_GetUsageGroups(t *testing.T) { { Name: "multiple_usage_group_matches", TenantID: "tenant1", - Config: UsageGroupConfig{ - config: map[string][]*labels.Matcher{ - "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), - "app/foo2": testMustParseMatcher(t, `{service_name="foo", namespace=~"bar.*"}`), - }, + Config: map[string]string{ + "app/foo": `{service_name="foo"}`, + "app/foo2": `{service_name="foo", namespace=~"bar.*"}`, }, Labels: phlaremodel.Labels{ {Name: "service_name", Value: "foo"}, @@ -63,25 +60,22 @@ func TestUsageGroupConfig_GetUsageGroups(t *testing.T) { { Name: "no_usage_group_matches", TenantID: "tenant1", - Config: UsageGroupConfig{ - config: map[string][]*labels.Matcher{ - "app/foo": testMustParseMatcher(t, `{service_name="notfound"}`), - }, + Config: map[string]string{ + "app/foo": `{service_name="notfound"}`, }, Labels: phlaremodel.Labels{ {Name: "service_name", Value: "foo"}, }, Want: UsageGroupMatch{ tenantID: "tenant1", + names: []string{}, }, }, { Name: "wildcard_matcher", TenantID: "tenant1", - Config: UsageGroupConfig{ - config: map[string][]*labels.Matcher{ - "app/foo": testMustParseMatcher(t, `{}`), - }, + Config: map[string]string{ + "app/foo": `{}`, }, Labels: phlaremodel.Labels{ {Name: "service_name", Value: "foo"}, @@ -94,36 +88,82 @@ func TestUsageGroupConfig_GetUsageGroups(t *testing.T) { { Name: "no_labels", TenantID: "tenant1", - Config: UsageGroupConfig{ - config: map[string][]*labels.Matcher{ - "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), - }, + Config: map[string]string{ + "app/foo": `{service_name="foo"}`, }, Labels: phlaremodel.Labels{}, Want: UsageGroupMatch{ tenantID: "tenant1", + names: []string{}, }, }, { Name: "disjoint_labels_do_not_match", TenantID: "tenant1", - Config: UsageGroupConfig{ - config: map[string][]*labels.Matcher{ - "app/foo": testMustParseMatcher(t, `{namespace="foo", container="bar"}`), + Config: map[string]string{ + "app/foo": `{namespace="foo", container="bar"}`, + }, + Labels: phlaremodel.Labels{ + {Name: "service_name", Value: "foo"}, + }, + Want: UsageGroupMatch{ + tenantID: "tenant1", + names: []string{}, + }, + }, + { + Name: "dynamic_usage_group_names", + TenantID: "tenant1", + Config: map[string]string{ + "app/${labels.service_name}": `{service_name=~"(.*)"}`, + }, + Labels: phlaremodel.Labels{ + {Name: "service_name", Value: "foo"}, + }, + Want: UsageGroupMatch{ + tenantID: "tenant1", + names: []string{ + "app/foo", }, }, + }, + { + Name: "dynamic_usage_group_names_missing_label", + TenantID: "tenant1", + Config: map[string]string{ + "app/${labels.service_name}/${labels.env}": `{service_name=~"(.*)"}`, + }, Labels: phlaremodel.Labels{ {Name: "service_name", Value: "foo"}, }, Want: UsageGroupMatch{ tenantID: "tenant1", + names: []string{}, + }, + }, + { + Name: "dynamic_usage_group_names_empty_label", + TenantID: "tenant1", + Config: map[string]string{ + "app/${labels.service_name}": `{service_name=~"(.*)"}`, + }, + Labels: phlaremodel.Labels{ + {Name: "service_name", Value: ""}, + }, + Want: UsageGroupMatch{ + tenantID: "tenant1", + names: []string{}, }, }, } for _, tt := range tests { t.Run(tt.Name, func(t *testing.T) { - got := tt.Config.GetUsageGroups(tt.TenantID, tt.Labels) + config, err := NewUsageGroupConfig(tt.Config) + require.NoError(t, err) + + evaluator := NewUsageGroupEvaluator(util.Logger) + got := evaluator.GetMatch(tt.TenantID, config, tt.Labels) slices.Sort(got.names) slices.Sort(tt.Want.names) @@ -290,7 +330,7 @@ func TestNewUsageGroupConfig(t *testing.T) { tests := []struct { Name string ConfigMap map[string]string - Want UsageGroupConfig + Want *UsageGroupConfig WantErr string }{ { @@ -298,7 +338,7 @@ func TestNewUsageGroupConfig(t *testing.T) { ConfigMap: map[string]string{ "app/foo": `{service_name="foo"}`, }, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{ "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), }, @@ -310,7 +350,7 @@ func TestNewUsageGroupConfig(t *testing.T) { "app/foo": `{service_name="foo"}`, "app/foo2": `{service_name="foo", namespace=~"bar.*"}`, }, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{ "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), "app/foo2": testMustParseMatcher(t, `{service_name="foo", namespace=~"bar.*"}`), @@ -320,7 +360,7 @@ func TestNewUsageGroupConfig(t *testing.T) { { Name: "no_usage_groups", ConfigMap: map[string]string{}, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{}, }, }, @@ -329,7 +369,7 @@ func TestNewUsageGroupConfig(t *testing.T) { ConfigMap: map[string]string{ "app/foo": `{}`, }, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{ "app/foo": testMustParseMatcher(t, `{}`), }, @@ -372,7 +412,7 @@ func TestNewUsageGroupConfig(t *testing.T) { ConfigMap: map[string]string{ " app/foo ": `{service_name="foo"}`, }, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{ "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), }, @@ -415,7 +455,7 @@ func TestUsageGroupConfig_UnmarshalYAML(t *testing.T) { tests := []struct { Name string YAML string - Want UsageGroupConfig + Want *UsageGroupConfig WantErr string }{ { @@ -423,7 +463,7 @@ func TestUsageGroupConfig_UnmarshalYAML(t *testing.T) { YAML: ` usage_groups: app/foo: '{service_name="foo"}'`, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{ "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), }, @@ -435,7 +475,7 @@ usage_groups: usage_groups: app/foo: '{service_name="foo"}' app/foo2: '{service_name="foo", namespace=~"bar.*"}'`, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{ "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), "app/foo2": testMustParseMatcher(t, `{service_name="foo", namespace=~"bar.*"}`), @@ -446,7 +486,7 @@ usage_groups: Name: "empty_usage_groups", YAML: ` usage_groups: {}`, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{}, }, }, @@ -467,7 +507,7 @@ usage_groups: YAML: ` some_other_config: foo: bar`, - Want: UsageGroupConfig{}, + Want: &UsageGroupConfig{}, }, } @@ -493,7 +533,7 @@ func TestUsageGroupConfig_UnmarshalJSON(t *testing.T) { tests := []struct { Name string JSON string - Want UsageGroupConfig + Want *UsageGroupConfig WantErr string }{ { @@ -503,7 +543,7 @@ func TestUsageGroupConfig_UnmarshalJSON(t *testing.T) { "app/foo": "{service_name=\"foo\"}" } }`, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{ "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), }, @@ -517,7 +557,7 @@ func TestUsageGroupConfig_UnmarshalJSON(t *testing.T) { "app/foo2": "{service_name=\"foo\", namespace=~\"bar.*\"}" } }`, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{ "app/foo": testMustParseMatcher(t, `{service_name="foo"}`), "app/foo2": testMustParseMatcher(t, `{service_name="foo", namespace=~"bar.*"}`), @@ -527,7 +567,7 @@ func TestUsageGroupConfig_UnmarshalJSON(t *testing.T) { { Name: "empty_usage_groups", JSON: `{"usage_groups": {}}`, - Want: UsageGroupConfig{ + Want: &UsageGroupConfig{ config: map[string][]*labels.Matcher{}, }, }, @@ -544,7 +584,7 @@ func TestUsageGroupConfig_UnmarshalJSON(t *testing.T) { { Name: "missing_usage_groups_key_in_config", JSON: `{"some_other_key": {"foo": "bar"}}`, - Want: UsageGroupConfig{}, + Want: &UsageGroupConfig{}, }, }