1 change: 0 additions & 1 deletion pkg/experiment/block/metadata/metadata_labels.go
@@ -18,7 +18,6 @@ import (
 const (
     LabelNameTenantDataset     = "__tenant_dataset__"
     LabelValueDatasetTSDBIndex = "dataset_tsdb_index"
-    LabelNameUnsymbolized      = "__unsymbolized__"
 )
 
 type LabelBuilder struct {
23 changes: 4 additions & 19 deletions pkg/experiment/ingester/memdb/head.go
@@ -21,11 +21,10 @@ import (
 )
 
 type FlushedHead struct {
-    Index                   []byte
-    Profiles                []byte
-    Symbols                 []byte
-    HasUnsymbolizedProfiles bool
-    Meta                    struct {
+    Index    []byte
+    Profiles []byte
+    Symbols  []byte
+    Meta     struct {
         ProfileTypeNames []string
         MinTimeNanos     int64
         MaxTimeNanos     int64
@@ -154,8 +153,6 @@ func (h *Head) flush(ctx context.Context) (*FlushedHead, error) {
         return res, nil
     }
 
-    res.HasUnsymbolizedProfiles = HasUnsymbolizedProfiles(h.symbols.Symbols())
-
     symbolsBuffer := bytes.NewBuffer(nil)
     if err := symdb.WritePartition(h.symbols, symbolsBuffer); err != nil {
         return nil, err
@@ -176,15 +173,3 @@ func (h *Head) flush(ctx context.Context) (*FlushedHead, error) {
     }
     return res, nil
 }
-
-// TODO: move into the symbolizer package when available
-func HasUnsymbolizedProfiles(symbols *symdb.Symbols) bool {
-    locations := symbols.Locations
-    mappings := symbols.Mappings
-    for _, loc := range locations {
-        if !mappings[loc.MappingId].HasFunctions {
-            return true
-        }
-    }
-    return false
-}
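For context, the helper removed above reduces to a single pass over the symbol tables: a partition counts as unsymbolized as soon as any location belongs to a mapping that carries no function information. A standalone sketch with simplified stand-in types (the real symdb.Symbols tables are richer; mapping IDs indexing the slice directly is an assumption carried over from the removed code):

package main

import "fmt"

// Stand-ins for the symdb types; illustration only.
type Mapping struct{ HasFunctions bool }

type Location struct{ MappingId uint64 }

// hasUnsymbolized mirrors the removed HasUnsymbolizedProfiles: it returns
// true as soon as any location resolves to a mapping without function info.
func hasUnsymbolized(locations []Location, mappings []Mapping) bool {
    for _, loc := range locations {
        if !mappings[loc.MappingId].HasFunctions {
            return true
        }
    }
    return false
}

func main() {
    mappings := []Mapping{{HasFunctions: true}, {HasFunctions: false}}
    symbolized := []Location{{MappingId: 0}}
    mixed := []Location{{MappingId: 0}, {MappingId: 1}}
    fmt.Println(hasUnsymbolized(symbolized, mappings)) // false
    fmt.Println(hasUnsymbolized(mixed, mappings))      // true
}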
83 changes: 0 additions & 83 deletions pkg/experiment/ingester/memdb/head_test.go
@@ -27,7 +27,6 @@ import (
     "github.com/grafana/pyroscope/pkg/og/convert/pprof/bench"
     "github.com/grafana/pyroscope/pkg/phlaredb"
     testutil2 "github.com/grafana/pyroscope/pkg/phlaredb/block/testutil"
-    "github.com/grafana/pyroscope/pkg/phlaredb/symdb"
     "github.com/grafana/pyroscope/pkg/pprof"
     "github.com/grafana/pyroscope/pkg/pprof/testhelper"
 )
@@ -673,88 +672,6 @@ func Test_HeadFlush_DuplicateLabels(t *testing.T) {
         &typesv1.LabelPair{Name: "pod", Value: "not-my-pod"},
     )
 }
-
-// TODO: move into the symbolizer package when available
-func TestUnsymbolized(t *testing.T) {
-    testCases := []struct {
-        name               string
-        profile            *profilev1.Profile
-        expectUnsymbolized bool
-    }{
-        {
-            name: "fully symbolized profile",
-            profile: &profilev1.Profile{
-                StringTable: []string{"", "a"},
-                Function: []*profilev1.Function{
-                    {Id: 4, Name: 1},
-                },
-                Mapping: []*profilev1.Mapping{
-                    {Id: 239, HasFunctions: true},
-                },
-                Location: []*profilev1.Location{
-                    {Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}},
-                },
-                Sample: []*profilev1.Sample{
-                    {LocationId: []uint64{5}, Value: []int64{1}},
-                },
-            },
-            expectUnsymbolized: false,
-        },
-        {
-            name: "mapping without functions",
-            profile: &profilev1.Profile{
-                StringTable: []string{"", "a"},
-                Function: []*profilev1.Function{
-                    {Id: 4, Name: 1},
-                },
-                Mapping: []*profilev1.Mapping{
-                    {Id: 239, HasFunctions: false},
-                },
-                Location: []*profilev1.Location{
-                    {Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}},
-                },
-                Sample: []*profilev1.Sample{
-                    {LocationId: []uint64{5}, Value: []int64{1}},
-                },
-            },
-            expectUnsymbolized: true,
-        },
-        {
-            name: "multiple locations with mixed symbolization",
-            profile: &profilev1.Profile{
-                StringTable: []string{"", "a", "b"},
-                Function: []*profilev1.Function{
-                    {Id: 4, Name: 1},
-                    {Id: 5, Name: 2},
-                },
-                Mapping: []*profilev1.Mapping{
-                    {Id: 239, HasFunctions: true},
-                    {Id: 240, HasFunctions: false},
-                },
-                Location: []*profilev1.Location{
-                    {Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}},
-                    {Id: 6, MappingId: 240, Line: nil},
-                },
-                Sample: []*profilev1.Sample{
-                    {LocationId: []uint64{5, 6}, Value: []int64{1}},
-                },
-            },
-            expectUnsymbolized: true,
-        },
-    }
-
-    for _, tc := range testCases {
-        t.Run(tc.name, func(t *testing.T) {
-            symbols := symdb.NewPartitionWriter(0, &symdb.Config{
-                Version: symdb.FormatV3,
-            })
-            symbols.WriteProfileSymbols(tc.profile)
-            unsymbolized := HasUnsymbolizedProfiles(symbols.Symbols())
-            assert.Equal(t, tc.expectUnsymbolized, unsymbolized)
-        })
-    }
-}
-
 func BenchmarkHeadIngestProfiles(t *testing.B) {
     var (
         profilePaths = []string{
78 changes: 36 additions & 42 deletions pkg/experiment/ingester/segment.go
@@ -108,7 +108,7 @@ func (sh *shard) flushSegment(ctx context.Context) {
     if s.debuginfo.movedHeads > 0 {
         _ = level.Debug(s.logger).Log("msg",
             "writing segment block done",
-            "heads-count", len(s.datasets),
+            "heads-count", len(s.heads),
             "heads-moved-count", s.debuginfo.movedHeads,
             "inflight-duration", s.debuginfo.waitInflight,
             "flush-heads-duration", s.debuginfo.flushHeadsDuration,
@@ -195,7 +195,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
     s := &segment{
         logger:   log.With(sl, "segment-id", id.String()),
         ulid:     id,
-        datasets: make(map[datasetKey]*dataset),
+        heads:    make(map[datasetKey]dataset),
         sw:       sw,
         sh:       sh,
         shard:    sk,
@@ -208,7 +208,7 @@
 func (s *segment) flush(ctx context.Context) (err error) {
     span, ctx := opentracing.StartSpanFromContext(ctx, "segment.flush", opentracing.Tags{
         "block_id": s.ulid.String(),
-        "datasets": len(s.datasets),
+        "datasets": len(s.heads),
         "shard":    s.shard,
     })
     defer span.Finish()
@@ -332,10 +332,6 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
         lb.WithLabelSet(model.LabelNameServiceName, f.head.key.service, model.LabelNameProfileType, profileType)
     }
 
-    if f.flushed.HasUnsymbolizedProfiles {
-        lb.WithLabelSet(model.LabelNameServiceName, f.head.key.service, metadata.LabelNameUnsymbolized, "true")
-    }
-
     // Other optional labels:
     // lb.WithLabelSet("label_name", "label_value", ...)
     ds.Labels = lb.Build()
@@ -344,8 +340,8 @@
 }
 
 func (s *segment) flushHeads(ctx context.Context) flushStream {
-    heads := maps.Values(s.datasets)
-    slices.SortFunc(heads, func(a, b *dataset) int {
+    heads := maps.Values(s.heads)
+    slices.SortFunc(heads, func(a, b dataset) int {
         return a.key.compare(b.key)
     })
 
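flushHeads snapshots the map values and sorts them before flushing because Go map iteration order is randomized; without the sort, datasets would land in the block in a different order on every flush. A minimal sketch of the same snapshot-then-sort step using only the standard library (the diff uses x/exp's maps.Values and slices.SortFunc, and the real datasetKey.compare is not shown here, so the tenant-then-service ordering below is an assumption):

package main

import (
    "fmt"
    "sort"
)

type datasetKey struct{ tenant, service string }

// compare: assumed tenant-then-service ordering.
func (a datasetKey) compare(b datasetKey) int {
    if a.tenant != b.tenant {
        if a.tenant < b.tenant {
            return -1
        }
        return 1
    }
    if a.service != b.service {
        if a.service < b.service {
            return -1
        }
        return 1
    }
    return 0
}

func main() {
    heads := map[datasetKey]string{
        {tenant: "t2", service: "api"}: "c",
        {tenant: "t1", service: "web"}: "b",
        {tenant: "t1", service: "api"}: "a",
    }
    // Snapshot the keys, then sort: map iteration alone is non-deterministic.
    keys := make([]datasetKey, 0, len(heads))
    for k := range heads {
        keys = append(keys, k)
    }
    sort.Slice(keys, func(i, j int) bool { return keys[i].compare(keys[j]) < 0 })
    for _, k := range keys {
        fmt.Println(k.tenant, k.service, heads[k]) // t1 api a; t1 web b; t2 api c
    }
}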
@@ -360,15 +356,15 @@
             defer close(f.done)
             flushed, err := s.flushHead(ctx, f.head)
             if err != nil {
-                level.Error(s.logger).Log("msg", "failed to flush dataset", "err", err)
+                level.Error(s.logger).Log("msg", "failed to flush head", "err", err)
                 return
             }
             if flushed == nil {
-                level.Debug(s.logger).Log("msg", "skipping nil dataset")
+                level.Debug(s.logger).Log("msg", "skipping nil head")
                 return
             }
             if flushed.Meta.NumSamples == 0 {
-                level.Debug(s.logger).Log("msg", "skipping empty dataset")
+                level.Debug(s.logger).Log("msg", "skipping empty head")
                 return
             }
             f.flushed = flushed
Expand Down Expand Up @@ -399,24 +395,24 @@ func (s *flushStream) Next() bool {
return false
}

func (s *segment) flushHead(ctx context.Context, e *dataset) (*memdb.FlushedHead, error) {
func (s *segment) flushHead(ctx context.Context, e dataset) (*memdb.FlushedHead, error) {
th := time.Now()
flushed, err := e.head.Flush(ctx)
if err != nil {
s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds())
s.sw.metrics.flushServiceHeadError.WithLabelValues(s.sshard, e.key.tenant).Inc()
return nil, fmt.Errorf("failed to flush dataset : %w", err)
return nil, fmt.Errorf("failed to flush head : %w", err)
}
s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds())
level.Debug(s.logger).Log(
"msg", "flushed dataset",
"msg", "flushed head",
"tenant", e.key.tenant,
"service", e.key.service,
"profiles", flushed.Meta.NumProfiles,
"profiletypes", fmt.Sprintf("%v", flushed.Meta.ProfileTypeNames),
"mintime", flushed.Meta.MinTimeNanos,
"maxtime", flushed.Meta.MaxTimeNanos,
"dataset-flush-duration", time.Since(th).String(),
"head-flush-duration", time.Since(th).String(),
)
return flushed, nil
}
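flushHead observes the same latency histogram on both the error and the success path, so slow failures are measured too. A compact sketch of that pattern with prometheus/client_golang, folded into a single defer instead of two call sites (metric name and labels are illustrative, not the real definitions):

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// Illustrative stand-in for the segmentsWriter metrics.
var flushDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
    Name: "flush_service_head_duration_seconds",
    Help: "Time to flush one in-memory head.",
}, []string{"shard", "tenant"})

// timedFlush records the duration on every exit path.
func timedFlush(shard, tenant string, flush func() error) error {
    start := time.Now()
    defer func() {
        flushDuration.WithLabelValues(shard, tenant).Observe(time.Since(start).Seconds())
    }()
    return flush()
}

func main() {
    prometheus.MustRegister(flushDuration)
    _ = timedFlush("0", "tenant-a", func() error { return nil })
}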
@@ -439,7 +435,7 @@ type dataset struct {
 }
 
 type headFlush struct {
-    head    *dataset
+    head    dataset
     flushed *memdb.FlushedHead
     // protects head
     done chan struct{}
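headFlush pairs each head with a done channel: flushes run concurrently, but the consumer walks the sorted list and blocks on each done before reading flushed, so results come out in deterministic order no matter which flush finishes first. A self-contained sketch of that ordered fan-out (simplified payloads; scheduling details assumed):

package main

import "fmt"

type flush struct {
    in   int
    out  int
    done chan struct{}
}

func main() {
    flushes := []*flush{
        {in: 1, done: make(chan struct{})},
        {in: 2, done: make(chan struct{})},
        {in: 3, done: make(chan struct{})},
    }
    for _, f := range flushes {
        go func(f *flush) {
            // Deferred close signals the consumer even on early returns,
            // as in flushHeads above.
            defer close(f.done)
            f.out = f.in * f.in
        }(f)
    }
    // Consume in slice order: wait for each item before reading it.
    for _, f := range flushes {
        <-f.done
        fmt.Println(f.out) // 1, 4, 9 — always in this order
    }
}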
@@ -450,12 +446,10 @@ type segment struct {
     shard            shardKey
     sshard           string
     inFlightProfiles sync.WaitGroup
-
-    mu       sync.RWMutex
-    datasets map[datasetKey]*dataset
-
-    logger log.Logger
-    sw     *segmentsWriter
+    heads            map[datasetKey]dataset
+    headsLock        sync.RWMutex
+    logger           log.Logger
+    sw               *segmentsWriter
 
     // TODO(kolesnikovae): Revisit.
     doneChan chan struct{}
Expand Down Expand Up @@ -499,12 +493,11 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
tenant: tenantID,
service: model.Labels(labels).Get(model.LabelNameServiceName),
}
ds := s.datasetForIngest(k)
size := p.SizeVT()
rules := s.sw.limits.IngestionRelabelingRules(tenantID)
usage := s.sw.limits.DistributorUsageGroups(tenantID).GetUsageGroups(tenantID, labels)
appender := &sampleAppender{
dataset: ds,
head: s.headForIngest(k),
profile: p,
id: id,
annotations: annotations,
@@ -518,7 +511,7 @@
 
 type sampleAppender struct {
     id          uuid.UUID
-    dataset     *dataset
+    head        *memdb.Head
     profile     *profilev1.Profile
     exporter    *pprofmodel.SampleExporter
     annotations []*typesv1.ProfileAnnotation
@@ -528,7 +521,7 @@ type sampleAppender struct {
 }
 
 func (v *sampleAppender) VisitProfile(labels []*typesv1.LabelPair) {
-    v.dataset.head.Ingest(v.profile, v.id, labels, v.annotations)
+    v.head.Ingest(v.profile, v.id, labels, v.annotations)
 }
 
 func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) {
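sampleAppender is the visitor driven by the relabeling pass: profiles that keep their original grouping arrive through VisitProfile, re-grouped samples arrive through VisitSampleSeries, and dropped input is reported via Discarded. The interface below is an assumed shape for that contract, with stub types standing in for the typesv1/profilev1 proto messages:

package main

import "fmt"

// Stub types standing in for the proto messages; illustration only.
type LabelPair struct{ Name, Value string }
type Sample struct{ Value []int64 }

// SampleVisitor is a hypothetical name for the contract sampleAppender
// satisfies; the real interface lives with the relabeling code.
type SampleVisitor interface {
    VisitProfile(labels []*LabelPair)
    VisitSampleSeries(labels []*LabelPair, samples []*Sample)
    Discarded(profiles, bytes int)
}

// countingVisitor records what the relabeling pass fed it.
type countingVisitor struct {
    series    int
    discarded int
}

func (c *countingVisitor) VisitProfile(labels []*LabelPair) { c.series++ }

func (c *countingVisitor) VisitSampleSeries(labels []*LabelPair, samples []*Sample) {
    c.series++
}

func (c *countingVisitor) Discarded(profiles, bytes int) { c.discarded += profiles }

func main() {
    v := &countingVisitor{}
    v.VisitProfile([]*LabelPair{{Name: "service_name", Value: "api"}})
    v.VisitSampleSeries([]*LabelPair{{Name: "service_name", Value: "web"}}, []*Sample{{Value: []int64{1}}})
    v.Discarded(2, 256)
    fmt.Println(v.series, v.discarded) // 2 2
}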
@@ -537,36 +530,37 @@ func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples
     }
     var n profilev1.Profile
     v.exporter.ExportSamples(&n, samples)
-    v.dataset.head.Ingest(v.profile, v.id, labels, v.annotations)
+    v.head.Ingest(&n, v.id, labels, v.annotations)
 }
 
 func (v *sampleAppender) Discarded(profiles, bytes int) {
     v.discardedProfiles += profiles
     v.discardedBytes += bytes
 }
 
-func (s *segment) datasetForIngest(k datasetKey) *dataset {
-    s.mu.RLock()
-    ds, ok := s.datasets[k]
-    s.mu.RUnlock()
+func (s *segment) headForIngest(k datasetKey) *memdb.Head {
+    s.headsLock.RLock()
+    h, ok := s.heads[k]
+    s.headsLock.RUnlock()
     if ok {
-        return ds
+        return h.head
     }
 
-    s.mu.Lock()
-    defer s.mu.Unlock()
-    if ds, ok = s.datasets[k]; ok {
-        return ds
+    s.headsLock.Lock()
+    defer s.headsLock.Unlock()
+    h, ok = s.heads[k]
+    if ok {
+        return h.head
     }
 
-    h := memdb.NewHead(s.sw.headMetrics)
-    ds = &dataset{
+    nh := memdb.NewHead(s.sw.headMetrics)
+
+    s.heads[k] = dataset{
         key:  k,
-        head: h,
+        head: nh,
     }
 
-    s.datasets[k] = ds
-    return ds
+    return nh
 }
 
 func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, meta *metastorev1.BlockMeta, s *segment) error {
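headForIngest restores the classic double-checked pattern over an RWMutex: a shared-lock fast path for the common case where the head already exists, then an exclusive lock with a re-check so two goroutines racing on the same new key cannot both allocate a head. A standalone sketch of the pattern (a generic lazy map rather than the real segment type):

package main

import (
    "fmt"
    "sync"
)

// lazyMap creates a value at most once per key under read-mostly access.
type lazyMap struct {
    mu sync.RWMutex
    m  map[string]*int
}

func (l *lazyMap) get(k string, newValue func() *int) *int {
    // Fast path: shared lock, no allocation on a hit.
    l.mu.RLock()
    v, ok := l.m[k]
    l.mu.RUnlock()
    if ok {
        return v
    }
    // Slow path: exclusive lock, and re-check because another goroutine
    // may have inserted the key between RUnlock and Lock.
    l.mu.Lock()
    defer l.mu.Unlock()
    if v, ok = l.m[k]; ok {
        return v
    }
    v = newValue()
    l.m[k] = v
    return v
}

func main() {
    l := &lazyMap{m: make(map[string]*int)}
    calls := 0
    mk := func() *int { calls++; n := 42; return &n }
    fmt.Println(*l.get("a", mk), *l.get("a", mk), calls) // 42 42 1
}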