diff --git a/pkg/experiment/block/metadata/metadata_labels.go b/pkg/experiment/block/metadata/metadata_labels.go index b8789f2a7d..eb1ca23b38 100644 --- a/pkg/experiment/block/metadata/metadata_labels.go +++ b/pkg/experiment/block/metadata/metadata_labels.go @@ -18,7 +18,6 @@ import ( const ( LabelNameTenantDataset = "__tenant_dataset__" LabelValueDatasetTSDBIndex = "dataset_tsdb_index" - LabelNameUnsymbolized = "__unsymbolized__" ) type LabelBuilder struct { diff --git a/pkg/experiment/ingester/memdb/head.go b/pkg/experiment/ingester/memdb/head.go index 44d8fbecd2..363be1c368 100644 --- a/pkg/experiment/ingester/memdb/head.go +++ b/pkg/experiment/ingester/memdb/head.go @@ -21,11 +21,10 @@ import ( ) type FlushedHead struct { - Index []byte - Profiles []byte - Symbols []byte - HasUnsymbolizedProfiles bool - Meta struct { + Index []byte + Profiles []byte + Symbols []byte + Meta struct { ProfileTypeNames []string MinTimeNanos int64 MaxTimeNanos int64 @@ -154,8 +153,6 @@ func (h *Head) flush(ctx context.Context) (*FlushedHead, error) { return res, nil } - res.HasUnsymbolizedProfiles = HasUnsymbolizedProfiles(h.symbols.Symbols()) - symbolsBuffer := bytes.NewBuffer(nil) if err := symdb.WritePartition(h.symbols, symbolsBuffer); err != nil { return nil, err @@ -176,15 +173,3 @@ func (h *Head) flush(ctx context.Context) (*FlushedHead, error) { } return res, nil } - -// TODO: move into the symbolizer package when available -func HasUnsymbolizedProfiles(symbols *symdb.Symbols) bool { - locations := symbols.Locations - mappings := symbols.Mappings - for _, loc := range locations { - if !mappings[loc.MappingId].HasFunctions { - return true - } - } - return false -} diff --git a/pkg/experiment/ingester/memdb/head_test.go b/pkg/experiment/ingester/memdb/head_test.go index a05a835fb8..3df825fb39 100644 --- a/pkg/experiment/ingester/memdb/head_test.go +++ b/pkg/experiment/ingester/memdb/head_test.go @@ -27,7 +27,6 @@ import ( "github.com/grafana/pyroscope/pkg/og/convert/pprof/bench" "github.com/grafana/pyroscope/pkg/phlaredb" testutil2 "github.com/grafana/pyroscope/pkg/phlaredb/block/testutil" - "github.com/grafana/pyroscope/pkg/phlaredb/symdb" "github.com/grafana/pyroscope/pkg/pprof" "github.com/grafana/pyroscope/pkg/pprof/testhelper" ) @@ -673,88 +672,6 @@ func Test_HeadFlush_DuplicateLabels(t *testing.T) { &typesv1.LabelPair{Name: "pod", Value: "not-my-pod"}, ) } - -// TODO: move into the symbolizer package when available -func TestUnsymbolized(t *testing.T) { - testCases := []struct { - name string - profile *profilev1.Profile - expectUnsymbolized bool - }{ - { - name: "fully symbolized profile", - profile: &profilev1.Profile{ - StringTable: []string{"", "a"}, - Function: []*profilev1.Function{ - {Id: 4, Name: 1}, - }, - Mapping: []*profilev1.Mapping{ - {Id: 239, HasFunctions: true}, - }, - Location: []*profilev1.Location{ - {Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}}, - }, - Sample: []*profilev1.Sample{ - {LocationId: []uint64{5}, Value: []int64{1}}, - }, - }, - expectUnsymbolized: false, - }, - { - name: "mapping without functions", - profile: &profilev1.Profile{ - StringTable: []string{"", "a"}, - Function: []*profilev1.Function{ - {Id: 4, Name: 1}, - }, - Mapping: []*profilev1.Mapping{ - {Id: 239, HasFunctions: false}, - }, - Location: []*profilev1.Location{ - {Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}}, - }, - Sample: []*profilev1.Sample{ - {LocationId: []uint64{5}, Value: []int64{1}}, - }, - }, - expectUnsymbolized: true, - }, - { - name: "multiple locations with mixed symbolization", - profile: &profilev1.Profile{ - StringTable: []string{"", "a", "b"}, - Function: []*profilev1.Function{ - {Id: 4, Name: 1}, - {Id: 5, Name: 2}, - }, - Mapping: []*profilev1.Mapping{ - {Id: 239, HasFunctions: true}, - {Id: 240, HasFunctions: false}, - }, - Location: []*profilev1.Location{ - {Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}}, - {Id: 6, MappingId: 240, Line: nil}, - }, - Sample: []*profilev1.Sample{ - {LocationId: []uint64{5, 6}, Value: []int64{1}}, - }, - }, - expectUnsymbolized: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - symbols := symdb.NewPartitionWriter(0, &symdb.Config{ - Version: symdb.FormatV3, - }) - symbols.WriteProfileSymbols(tc.profile) - unsymbolized := HasUnsymbolizedProfiles(symbols.Symbols()) - assert.Equal(t, tc.expectUnsymbolized, unsymbolized) - }) - } -} - func BenchmarkHeadIngestProfiles(t *testing.B) { var ( profilePaths = []string{ diff --git a/pkg/experiment/ingester/segment.go b/pkg/experiment/ingester/segment.go index 217cfc63bc..355fcdcfd5 100644 --- a/pkg/experiment/ingester/segment.go +++ b/pkg/experiment/ingester/segment.go @@ -108,7 +108,7 @@ func (sh *shard) flushSegment(ctx context.Context) { if s.debuginfo.movedHeads > 0 { _ = level.Debug(s.logger).Log("msg", "writing segment block done", - "heads-count", len(s.datasets), + "heads-count", len(s.heads), "heads-moved-count", s.debuginfo.movedHeads, "inflight-duration", s.debuginfo.waitInflight, "flush-heads-duration", s.debuginfo.flushHeadsDuration, @@ -195,7 +195,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg s := &segment{ logger: log.With(sl, "segment-id", id.String()), ulid: id, - datasets: make(map[datasetKey]*dataset), + heads: make(map[datasetKey]dataset), sw: sw, sh: sh, shard: sk, @@ -208,7 +208,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg func (s *segment) flush(ctx context.Context) (err error) { span, ctx := opentracing.StartSpanFromContext(ctx, "segment.flush", opentracing.Tags{ "block_id": s.ulid.String(), - "datasets": len(s.datasets), + "datasets": len(s.heads), "shard": s.shard, }) defer span.Finish() @@ -332,10 +332,6 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) * lb.WithLabelSet(model.LabelNameServiceName, f.head.key.service, model.LabelNameProfileType, profileType) } - if f.flushed.HasUnsymbolizedProfiles { - lb.WithLabelSet(model.LabelNameServiceName, f.head.key.service, metadata.LabelNameUnsymbolized, "true") - } - // Other optional labels: // lb.WithLabelSet("label_name", "label_value", ...) ds.Labels = lb.Build() @@ -344,8 +340,8 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) * } func (s *segment) flushHeads(ctx context.Context) flushStream { - heads := maps.Values(s.datasets) - slices.SortFunc(heads, func(a, b *dataset) int { + heads := maps.Values(s.heads) + slices.SortFunc(heads, func(a, b dataset) int { return a.key.compare(b.key) }) @@ -360,15 +356,15 @@ func (s *segment) flushHeads(ctx context.Context) flushStream { defer close(f.done) flushed, err := s.flushHead(ctx, f.head) if err != nil { - level.Error(s.logger).Log("msg", "failed to flush dataset", "err", err) + level.Error(s.logger).Log("msg", "failed to flush head", "err", err) return } if flushed == nil { - level.Debug(s.logger).Log("msg", "skipping nil dataset") + level.Debug(s.logger).Log("msg", "skipping nil head") return } if flushed.Meta.NumSamples == 0 { - level.Debug(s.logger).Log("msg", "skipping empty dataset") + level.Debug(s.logger).Log("msg", "skipping empty head") return } f.flushed = flushed @@ -399,24 +395,24 @@ func (s *flushStream) Next() bool { return false } -func (s *segment) flushHead(ctx context.Context, e *dataset) (*memdb.FlushedHead, error) { +func (s *segment) flushHead(ctx context.Context, e dataset) (*memdb.FlushedHead, error) { th := time.Now() flushed, err := e.head.Flush(ctx) if err != nil { s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds()) s.sw.metrics.flushServiceHeadError.WithLabelValues(s.sshard, e.key.tenant).Inc() - return nil, fmt.Errorf("failed to flush dataset : %w", err) + return nil, fmt.Errorf("failed to flush head : %w", err) } s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds()) level.Debug(s.logger).Log( - "msg", "flushed dataset", + "msg", "flushed head", "tenant", e.key.tenant, "service", e.key.service, "profiles", flushed.Meta.NumProfiles, "profiletypes", fmt.Sprintf("%v", flushed.Meta.ProfileTypeNames), "mintime", flushed.Meta.MinTimeNanos, "maxtime", flushed.Meta.MaxTimeNanos, - "dataset-flush-duration", time.Since(th).String(), + "head-flush-duration", time.Since(th).String(), ) return flushed, nil } @@ -439,7 +435,7 @@ type dataset struct { } type headFlush struct { - head *dataset + head dataset flushed *memdb.FlushedHead // protects head done chan struct{} @@ -450,12 +446,10 @@ type segment struct { shard shardKey sshard string inFlightProfiles sync.WaitGroup - - mu sync.RWMutex - datasets map[datasetKey]*dataset - - logger log.Logger - sw *segmentsWriter + heads map[datasetKey]dataset + headsLock sync.RWMutex + logger log.Logger + sw *segmentsWriter // TODO(kolesnikovae): Revisit. doneChan chan struct{} @@ -499,12 +493,11 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la tenant: tenantID, service: model.Labels(labels).Get(model.LabelNameServiceName), } - ds := s.datasetForIngest(k) size := p.SizeVT() rules := s.sw.limits.IngestionRelabelingRules(tenantID) usage := s.sw.limits.DistributorUsageGroups(tenantID).GetUsageGroups(tenantID, labels) appender := &sampleAppender{ - dataset: ds, + head: s.headForIngest(k), profile: p, id: id, annotations: annotations, @@ -518,7 +511,7 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la type sampleAppender struct { id uuid.UUID - dataset *dataset + head *memdb.Head profile *profilev1.Profile exporter *pprofmodel.SampleExporter annotations []*typesv1.ProfileAnnotation @@ -528,7 +521,7 @@ type sampleAppender struct { } func (v *sampleAppender) VisitProfile(labels []*typesv1.LabelPair) { - v.dataset.head.Ingest(v.profile, v.id, labels, v.annotations) + v.head.Ingest(v.profile, v.id, labels, v.annotations) } func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) { @@ -537,7 +530,7 @@ func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples } var n profilev1.Profile v.exporter.ExportSamples(&n, samples) - v.dataset.head.Ingest(v.profile, v.id, labels, v.annotations) + v.head.Ingest(&n, v.id, labels, v.annotations) } func (v *sampleAppender) Discarded(profiles, bytes int) { @@ -545,28 +538,29 @@ func (v *sampleAppender) Discarded(profiles, bytes int) { v.discardedBytes += bytes } -func (s *segment) datasetForIngest(k datasetKey) *dataset { - s.mu.RLock() - ds, ok := s.datasets[k] - s.mu.RUnlock() +func (s *segment) headForIngest(k datasetKey) *memdb.Head { + s.headsLock.RLock() + h, ok := s.heads[k] + s.headsLock.RUnlock() if ok { - return ds + return h.head } - s.mu.Lock() - defer s.mu.Unlock() - if ds, ok = s.datasets[k]; ok { - return ds + s.headsLock.Lock() + defer s.headsLock.Unlock() + h, ok = s.heads[k] + if ok { + return h.head } - h := memdb.NewHead(s.sw.headMetrics) - ds = &dataset{ + nh := memdb.NewHead(s.sw.headMetrics) + + s.heads[k] = dataset{ key: k, - head: h, + head: nh, } - s.datasets[k] = ds - return ds + return nh } func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, meta *metastorev1.BlockMeta, s *segment) error {