Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ require (
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/go-cmp v0.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/sercand/kuberesolver/v5 v5.1.1
go.opentelemetry.io/collector/pdata v1.21.0
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
Expand Down Expand Up @@ -163,7 +164,6 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
13 changes: 5 additions & 8 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
return err
}

if u.labelsStringInterningEnabled {
metric.InternStrings(u.interner.Intern)
}

return nil
}

Expand All @@ -454,9 +458,6 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
}
u.seriesInMetric.increaseSeriesForMetric(metricName)
u.labelSetCounter.increaseSeriesLabelSet(u, metric)
if u.labelsStringInterningEnabled {
metric.InternStrings(u.interner.Intern)
}

if u.postingCache != nil {
u.postingCache.ExpireSeries(metric)
Expand All @@ -475,9 +476,6 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels)
}
u.seriesInMetric.decreaseSeriesForMetric(metricName)
u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
if u.labelsStringInterningEnabled {
metric.ReleaseStrings(u.interner.Release)
}
if u.postingCache != nil {
u.postingCache.ExpireSeries(metric)
}
Expand Down Expand Up @@ -1233,7 +1231,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
} else {
// Copy the label set because both TSDB and the active series tracker may retain it.
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)

// Retain the reference in case there are multiple samples for the series.
if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
succeededSamplesCount++
Expand Down Expand Up @@ -2201,7 +2198,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {

instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.TSDBState.seriesCount,
interner: util.NewInterner(),
interner: util.NewLruInterner(),
labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled,

blockRetentionPeriod: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(),
Expand Down
64 changes: 64 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,70 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {

}

func TestPushRace(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.LabelsStringInterningEnabled = true
cfg.LifecyclerConfig.JoinAfter = 0
dir := t.TempDir()
blocksDir := filepath.Join(dir, "blocks")

require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))

ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, defaultLimitsTestConfig(), nil, blocksDir, prometheus.NewRegistry(), true)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
// Wait until it's ACTIVE
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
return ing.lifecycler.GetState()
})

ctx := user.InjectOrgID(context.Background(), userID)
sample1 := cortexpb.Sample{
TimestampMs: 0,
Value: 1,
}

concurrentRequest := 100
numberOfSeries := 100
wg := sync.WaitGroup{}
wg.Add(numberOfSeries * concurrentRequest)
for k := 0; k < numberOfSeries; k++ {
for i := 0; i < concurrentRequest; i++ {
go func() {
defer wg.Done()
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}
}

wg.Wait()

db := ing.getTSDB(userID)
ir, err := db.db.Head().Index()
require.NoError(t, err)

p, err := ir.Postings(ctx, "", "")
require.NoError(t, err)
p = ir.SortedPostings(p)
total := 0
var builder labels.ScratchBuilder

for p.Next() {
total++
err = ir.Series(p.At(), &builder, nil)
require.NoError(t, err)
lbls := builder.Labels()
require.Equal(t, "foo", lbls.Get(labels.MetricName))
require.Equal(t, "1", lbls.Get("userId"))
require.NotEmpty(t, lbls.Get("k"))
builder.Reset()
}
require.Equal(t, numberOfSeries, total)
require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
}

func TestIngesterUserLimitExceeded(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.MaxLocalSeriesPerUser = 1
Expand Down
73 changes: 17 additions & 56 deletions pkg/util/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,18 @@ package util
import (
"context"
"sync"
"time"
"unsafe"

"github.com/bboreham/go-loser"
"go.uber.org/atomic"
"github.com/hashicorp/golang-lru/v2/expirable"
)

const (
// Max size is ser to 2M.
maxInternerLruCacheSize = 2e6
// TTL should be similar to the head compaction interval
internerLruCacheTTL = time.Hour * 2
)

// StringsContain returns true if the search value is within the list of input values.
Expand Down Expand Up @@ -145,30 +153,18 @@ func MergeSortedSlices(ctx context.Context, a ...[]string) ([]string, error) {

type Interner interface {
Intern(s string) string
Release(s string)
}

// NewInterner returns a new Interner to be used to intern strings.
// Based on https://github.com/prometheus/prometheus/blob/726ed124e4468d0274ba89b0934a6cc8c975532d/storage/remote/intern.go#L51
func NewInterner() Interner {
// NewLruInterner returns a new Interner to be used to intern strings.
// The interner will use a LRU cache to return the deduplicated strings
func NewLruInterner() Interner {
return &pool{
pool: map[string]*entry{},
lru: expirable.NewLRU[string, string](maxInternerLruCacheSize, nil, internerLruCacheTTL),
}
}

type pool struct {
mtx sync.RWMutex
pool map[string]*entry
}

type entry struct {
refs atomic.Int64

s string
}

func newEntry(s string) *entry {
return &entry{s: s}
lru *expirable.LRU[string, string]
}

// Intern returns the interned string. It returns the canonical representation of string.
Expand All @@ -177,45 +173,10 @@ func (p *pool) Intern(s string) string {
return ""
}

p.mtx.RLock()
interned, ok := p.pool[s]
p.mtx.RUnlock()
interned, ok := p.lru.Get(s)
if ok {
interned.refs.Inc()
return interned.s
return interned
}
p.mtx.Lock()
defer p.mtx.Unlock()
if interned, ok := p.pool[s]; ok {
interned.refs.Inc()
return interned.s
}

p.pool[s] = newEntry(s)
p.pool[s].refs.Store(1)
p.lru.Add(s, s)
return s
}

// Release releases a reference of the string `s`.
// If the reference count become 0, the string `s` is removed from the memory
func (p *pool) Release(s string) {
p.mtx.RLock()
interned, ok := p.pool[s]
p.mtx.RUnlock()

if !ok {
return
}

refs := interned.refs.Dec()
if refs > 0 {
return
}

p.mtx.Lock()
defer p.mtx.Unlock()
if interned.refs.Load() != 0 {
return
}
delete(p.pool, s)
}
Loading
Loading