Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type Distributor struct {
aggregator *aggregator.MultiTenantAggregator[*pprof.ProfileMerge]
asyncRequests sync.WaitGroup
ingestionLimitsSampler *ingest_limits.Sampler
usageGroupEvaluator *validation.UsageGroupEvaluator

subservices *services.Manager
subservicesWatcher *services.FailureWatcher
Expand Down Expand Up @@ -191,6 +192,7 @@ func New(
}

d.ingestionLimitsSampler = ingest_limits.NewSampler(distributorsRing)
d.usageGroupEvaluator = validation.NewUsageGroupEvaluator(logger)

subservices = append(subservices, distributorsLifecycler, distributorsRing, d.aggregator, d.ingestionLimitsSampler)

Expand Down Expand Up @@ -325,7 +327,7 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push
for _, series := range req.Series {
profName := phlaremodel.Labels(series.Labels).Get(ProfileName)

groups := usageGroups.GetUsageGroups(tenantID, series.Labels)
groups := d.usageGroupEvaluator.GetMatch(tenantID, usageGroups, series.Labels)
if err := d.checkUsageGroupsIngestLimit(tenantID, groups.Names(), req); err != nil {
return nil, err
}
Expand Down Expand Up @@ -490,7 +492,7 @@ func (d *Distributor) aggregate(ctx context.Context, req *distributormodel.PushR
func (d *Distributor) sendRequestsToIngester(ctx context.Context, req *distributormodel.PushRequest) (resp *connect.Response[pushv1.PushResponse], err error) {
// Next we split profiles by labels and apply relabeling rules.
usageGroups := d.limits.DistributorUsageGroups(req.TenantID)
profileSeries, bytesRelabelDropped, profilesRelabelDropped := extractSampleSeries(req, req.TenantID, usageGroups, d.limits.IngestionRelabelingRules(req.TenantID))
profileSeries, bytesRelabelDropped, profilesRelabelDropped := d.extractSampleSeries(req, usageGroups)
validation.DiscardedBytes.WithLabelValues(string(validation.DroppedByRelabelRules), req.TenantID).Add(bytesRelabelDropped)
validation.DiscardedProfiles.WithLabelValues(string(validation.DroppedByRelabelRules), req.TenantID).Add(profilesRelabelDropped)

Expand All @@ -515,7 +517,7 @@ func (d *Distributor) sendRequestsToIngester(ctx context.Context, req *distribut
series.Labels = phlaremodel.Labels(series.Labels).InsertSorted(phlaremodel.LabelNameOrder, phlaremodel.LabelOrderEnforced)
}

groups := usageGroups.GetUsageGroups(req.TenantID, series.Labels)
groups := d.usageGroupEvaluator.GetMatch(req.TenantID, usageGroups, series.Labels)

if err = validation.ValidateLabels(d.limits, req.TenantID, series.Labels); err != nil {
validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), req.TenantID).Add(float64(req.TotalProfiles))
Expand Down Expand Up @@ -883,16 +885,15 @@ func injectMappingVersions(series []*distributormodel.ProfileSeries) error {
return nil
}

func extractSampleSeries(
func (d *Distributor) extractSampleSeries(
req *distributormodel.PushRequest,
tenantID string,
usage *validation.UsageGroupConfig,
rules []*relabel.Config,
) (
result []*distributormodel.ProfileSeries,
bytesRelabelDropped float64,
profilesRelabelDropped float64,
) {
rules := d.limits.IngestionRelabelingRules(req.TenantID)
for _, series := range req.Series {
for _, p := range series.Samples {
v := &sampleSeriesVisitor{profile: p.Profile}
Expand All @@ -908,7 +909,7 @@ func extractSampleSeries(
result = append(result, v.series...)
bytesRelabelDropped += float64(v.discardedBytes)
profilesRelabelDropped += float64(v.discardedProfiles)
usage.GetUsageGroups(tenantID, series.Labels).
d.usageGroupEvaluator.GetMatch(req.TenantID, usage, series.Labels).
CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(v.discardedBytes))
}
}
Expand Down
33 changes: 29 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func Test_IngestLimits(t *testing.T) {
"group-2": "{service_name=\"svc2\"}",
})
require.NoError(t, err)
l.DistributorUsageGroups = &usageGroupCfg
l.DistributorUsageGroups = usageGroupCfg
tenantLimits["user-1"] = l
}),
verifyExpectations: func(err error, req *distributormodel.PushRequest, res *connect.Response[pushv1.PushResponse]) {
Expand Down Expand Up @@ -655,7 +655,7 @@ func Test_IngestLimits(t *testing.T) {
"group-1": "{service_name=\"svc\"}",
})
require.NoError(t, err)
l.DistributorUsageGroups = &usageGroupCfg
l.DistributorUsageGroups = usageGroupCfg
tenantLimits["user-1"] = l
}),
verifyExpectations: func(err error, req *distributormodel.PushRequest, res *connect.Response[pushv1.PushResponse]) {
Expand Down Expand Up @@ -698,11 +698,13 @@ func Test_SampleLabels(t *testing.T) {
expectBytesDropped float64
expectProfilesDropped float64
}
const dummyTenantID = "tenant1"

testCases := []testCase{
{
description: "no series labels, no sample labels",
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Samples: []*distributormodel.ProfileSample{
Expand Down Expand Up @@ -734,6 +736,7 @@ func Test_SampleLabels(t *testing.T) {
{
description: "has series labels, no sample labels",
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -771,6 +774,7 @@ func Test_SampleLabels(t *testing.T) {
{
description: "no series labels, all samples have identical label set",
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Samples: []*distributormodel.ProfileSample{
Expand Down Expand Up @@ -811,6 +815,7 @@ func Test_SampleLabels(t *testing.T) {
{
description: "has series labels, all samples have identical label set",
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -855,6 +860,7 @@ func Test_SampleLabels(t *testing.T) {
{
description: "has series labels, and the only sample label name overlaps with series label, creating overlapping groups",
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -905,6 +911,7 @@ func Test_SampleLabels(t *testing.T) {
{
description: "has series labels, samples have distinct label sets",
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -975,6 +982,7 @@ func Test_SampleLabels(t *testing.T) {
description: "has series labels that should be renamed to no longer include godeltaprof",
relabelRules: defaultRelabelConfigs,
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -1021,6 +1029,7 @@ func Test_SampleLabels(t *testing.T) {
{Action: relabel.Drop, SourceLabels: []model.LabelName{"__name__", "span_name"}, Separator: "/", Regex: relabel.MustNewRegexp("unwanted/randomness")},
},
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -1073,6 +1082,7 @@ func Test_SampleLabels(t *testing.T) {
{Action: relabel.Drop, Regex: relabel.MustNewRegexp(".*")},
},
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -1108,6 +1118,7 @@ func Test_SampleLabels(t *testing.T) {
{Action: relabel.Replace, Regex: relabel.MustNewRegexp(".*"), Replacement: "", TargetLabel: "span_name"},
},
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -1155,6 +1166,7 @@ func Test_SampleLabels(t *testing.T) {
{
description: "ensure only samples of same stacktraces get grouped",
pushReq: &distributormodel.PushRequest{
TenantID: dummyTenantID,
Series: []*distributormodel.ProfileSeries{
{
Labels: []*typesv1.LabelPair{
Expand Down Expand Up @@ -1266,10 +1278,23 @@ func Test_SampleLabels(t *testing.T) {
// reporting. Neither are validated by the tests, nor do they influence
// test behavior in any way.
ug := &validation.UsageGroupConfig{}
const dummyTenantID = "tenant1"

t.Run(tc.description, func(t *testing.T) {
series, actualBytesDropped, actualProfilesDropped := extractSampleSeries(tc.pushReq, dummyTenantID, ug, tc.relabelRules)
overrides := validation.MockOverrides(func(defaults *validation.Limits, tenantLimits map[string]*validation.Limits) {
l := validation.MockDefaultLimits()
l.IngestionRelabelingRules = tc.relabelRules
tenantLimits[dummyTenantID] = l
})
d, err := New(Config{
DistributorRing: ringConfig,
}, testhelper.NewMockRing([]ring.InstanceDesc{
{Addr: "foo"},
}, 3), &poolFactory{func(addr string) (client.PoolClient, error) {
return newFakeIngester(t, false), nil
}}, overrides, nil, log.NewLogfmtLogger(os.Stdout), nil)
require.NoError(t, err)

series, actualBytesDropped, actualProfilesDropped := d.extractSampleSeries(tc.pushReq, ug)
assert.Equal(t, tc.expectBytesDropped, actualBytesDropped)
assert.Equal(t, tc.expectProfilesDropped, actualProfilesDropped)
require.Len(t, series, len(tc.series))
Expand Down
12 changes: 7 additions & 5 deletions pkg/experiment/ingester/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
sshard: sshard,
doneChan: make(chan struct{}),
}
s.usageGroupEvaluator = validation.NewUsageGroupEvaluator(s.logger)
return s
}

Expand Down Expand Up @@ -462,8 +463,9 @@ type segment struct {
headsLock sync.RWMutex
heads map[datasetKey]dataset

logger log.Logger
sw *segmentsWriter
logger log.Logger
sw *segmentsWriter
usageGroupEvaluator *validation.UsageGroupEvaluator

// TODO(kolesnikovae): Revisit.
doneChan chan struct{}
Expand All @@ -483,7 +485,7 @@ type segment struct {
}

type segmentIngest interface {
ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation)
ingest(ctx context.Context, tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation)
}

type segmentWaitFlushed interface {
Expand All @@ -502,14 +504,14 @@ func (s *segment) waitFlushed(ctx context.Context) error {
}
}

func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation) {
func (s *segment) ingest(ctx context.Context, tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation) {
k := datasetKey{
tenant: tenantID,
service: model.Labels(labels).Get(model.LabelNameServiceName),
}
size := p.SizeVT()
rules := s.sw.limits.IngestionRelabelingRules(tenantID)
usage := s.sw.limits.DistributorUsageGroups(tenantID).GetUsageGroups(tenantID, labels)
usage := s.usageGroupEvaluator.GetMatch(tenantID, s.sw.limits.DistributorUsageGroups(tenantID), labels)
appender := &sampleAppender{
head: s.headForIngest(k),
profile: p,
Expand Down
10 changes: 5 additions & 5 deletions pkg/experiment/ingester/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestIngestWait(t *testing.T) {
t1 := time.Now()
awaiter := sw.ingest(0, func(head segmentIngest) {
p := cpuProfile(42, 480, "svc1", "foo", "bar")
head.ingest("t1", p.Profile, p.UUID, p.Labels, p.Annotations)
head.ingest(context.Background(), "t1", p.Profile, p.UUID, p.Labels, p.Annotations)
})
err := awaiter.waitFlushed(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestBusyIngestLoop(t *testing.T) {
ts := workerno*1000000000 + len(profiles)
awaiter := sw.ingest(1, func(head segmentIngest) {
p := cpuProfile(42, ts, "svc1", "foo", "bar")
head.ingest("t1", p.CloneVT(), p.UUID, p.Labels, p.Annotations)
head.ingest(context.Background(), "t1", p.CloneVT(), p.UUID, p.Labels, p.Annotations)
profiles = append(profiles, p)
})
awaiters = append(awaiters, awaiter)
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestDLQFail(t *testing.T) {
ing := func(head segmentIngest) {
ts += 420
p := cpuProfile(42, ts, "svc1", "foo", "bar")
head.ingest("t1", p.Profile, p.UUID, p.Labels, p.Annotations)
head.ingest(context.Background(), "t1", p.Profile, p.UUID, p.Labels, p.Annotations)
}

awaiter1 := res.ingest(0, ing)
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestDatasetMinMaxTime(t *testing.T) {
}
_ = res.ingest(1, func(head segmentIngest) {
for _, p := range data {
head.ingest(p.tenant, p.profile.Profile, p.profile.UUID, p.profile.Labels, p.profile.Annotations)
head.ingest(context.Background(), p.tenant, p.profile.Profile, p.profile.UUID, p.profile.Labels, p.profile.Annotations)
}
})
defer res.stop()
Expand Down Expand Up @@ -727,7 +727,7 @@ func (sw *sw) ingestChunk(t *testing.T, chunk inputChunk, expectAwaitError bool)
defer wg.Done()
awaiter := sw.ingest(shardKey(it.shard), func(head segmentIngest) {
p := it.profile.CloneVT() // important to not rewrite original profile
head.ingest(it.tenant, p, it.profile.UUID, it.profile.Labels, it.profile.Annotations)
head.ingest(context.Background(), it.tenant, p, it.profile.UUID, it.profile.Labels, it.profile.Annotations)
})
err := awaiter.waitFlushed(context.Background())
if expectAwaitError {
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/ingester/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (i *SegmentWriterService) Push(ctx context.Context, req *segmentwriterv1.Pu
}

wait := i.segmentWriter.ingest(shardKey(req.Shard), func(segment segmentIngest) {
segment.ingest(req.TenantId, p.Profile, id, req.Labels, req.Annotations)
segment.ingest(ctx, req.TenantId, p.Profile, id, req.Labels, req.Annotations)
})

flushStarted := time.Now()
Expand Down
9 changes: 6 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ type Ingester struct {
instances map[string]*instance
instancesMtx sync.RWMutex

limits Limits
reg prometheus.Registerer
limits Limits
reg prometheus.Registerer
usageGroupEvaluator *validation.UsageGroupEvaluator
}

type ingesterFlusherCompat struct {
Expand Down Expand Up @@ -107,6 +108,8 @@ func New(phlarectx context.Context, cfg Config, dbConfig phlaredb.Config, storag
return nil, err
}

i.usageGroupEvaluator = validation.NewUsageGroupEvaluator(i.logger)

i.lifecycler, err = ring.NewLifecycler(
cfg.LifecyclerConfig,
&ingesterFlusherCompat{i},
Expand Down Expand Up @@ -335,7 +338,7 @@ func (i *Ingester) Push(ctx context.Context, req *connect.Request[pushv1.PushReq
usageGroups := i.limits.DistributorUsageGroups(instance.tenantID)

for _, series := range req.Msg.Series {
groups := usageGroups.GetUsageGroups(instance.tenantID, series.Labels)
groups := i.usageGroupEvaluator.GetMatch(instance.tenantID, usageGroups, series.Labels)

for _, sample := range series.Samples {
err := pprof.FromBytes(sample.RawProfile, func(p *profilev1.Profile, size int) error {
Expand Down
Loading
Loading