Skip to content

Ruler: Add support for per-user external labels #6340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels. #6340
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3519,6 +3519,9 @@ query_rejection:
# CLI flag: -ruler.query-offset
[ruler_query_offset: <duration> | default = 0s]

# External labels for alerting rules.
[ruler_external_labels: <map of string (labelName) to string (labelValue)> | default = []]

# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set when the store-gateway sharding is enabled with the
# shuffle-sharding strategy. When this setting is specified in the per-tenant
Expand Down
7 changes: 4 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
// no need to initialize module if load path is empty
return nil, nil
}
t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig
runtimeConfigLoader := runtimeConfigLoader{cfg: t.Cfg}
t.Cfg.RuntimeConfig.Loader = runtimeConfigLoader.load

// make sure to set default limits before we start loading configuration into memory
validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig)
Expand Down Expand Up @@ -612,14 +613,14 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
}

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
} else {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
}

if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/cortex/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func (l *runtimeConfigTenantLimits) AllByUserID() map[string]*validation.Limits
return nil
}

func loadRuntimeConfig(r io.Reader) (interface{}, error) {
// runtimeConfigLoader decodes the runtime configuration file. It carries the
// Cortex Config so that load can validate the per-tenant limits against
// settings from the static configuration.
type runtimeConfigLoader struct {
	// cfg is the static configuration consulted while validating overrides.
	cfg Config
}

func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) {
var overrides = &RuntimeConfigValues{}

decoder := yaml.NewDecoder(r)
Expand All @@ -74,6 +78,12 @@ func loadRuntimeConfig(r io.Reader) (interface{}, error) {
return nil, errMultipleDocuments
}

for _, ul := range overrides.TenantLimits {
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels); err != nil {
return nil, err
}
}

return overrides, nil
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/cortex/runtime_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/util/validation"
)

Expand All @@ -28,7 +29,8 @@ overrides:
'1235': *id001
'1236': *id001
`)
runtimeCfg, err := loadRuntimeConfig(yamlFile)
loader := runtimeConfigLoader{cfg: Config{Distributor: distributor.Config{ShardByAllLabels: true}}}
runtimeCfg, err := loader.load(yamlFile)
require.NoError(t, err)

limits := validation.Limits{
Expand All @@ -51,7 +53,7 @@ func TestLoadRuntimeConfig_ShouldLoadEmptyFile(t *testing.T) {
yamlFile := strings.NewReader(`
# This is an empty YAML.
`)
actual, err := loadRuntimeConfig(yamlFile)
actual, err := runtimeConfigLoader{}.load(yamlFile)
require.NoError(t, err)
assert.Equal(t, &RuntimeConfigValues{}, actual)
}
Expand All @@ -60,7 +62,7 @@ func TestLoadRuntimeConfig_MissingPointerFieldsAreNil(t *testing.T) {
yamlFile := strings.NewReader(`
# This is an empty YAML.
`)
actual, err := loadRuntimeConfig(yamlFile)
actual, err := runtimeConfigLoader{}.load(yamlFile)
require.NoError(t, err)

actualCfg, ok := actual.(*RuntimeConfigValues)
Expand Down Expand Up @@ -102,7 +104,7 @@ overrides:
}

for _, tc := range cases {
actual, err := loadRuntimeConfig(strings.NewReader(tc))
actual, err := runtimeConfigLoader{}.load(strings.NewReader(tc))
assert.Equal(t, errMultipleDocuments, err)
assert.Nil(t, actual)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func TestRuler_LimitsPerGroup(t *testing.T) {
r := newTestRuler(t, cfg, store, nil)
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

r.limits = ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}
r.limits = &ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}

a := NewAPI(r, r.store, log.NewNopLogger())

Expand Down Expand Up @@ -508,7 +508,7 @@ func TestRuler_RulerGroupLimits(t *testing.T) {
r := newTestRuler(t, cfg, store, nil)
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

r.limits = ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}
r.limits = &ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}

a := NewAPI(r, r.store, log.NewNopLogger())

Expand Down
1 change: 1 addition & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type RulesLimits interface {
RulerMaxRulesPerRuleGroup(userID string) int
RulerQueryOffset(userID string) time.Duration
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
RulerExternalLabels(userID string) labels.Labels
}

// EngineQueryFunc returns a new engine query function validating max queryLength.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestPusherErrors(t *testing.T) {
writes := prometheus.NewCounter(prometheus.CounterOpts{})
failures := prometheus.NewCounter(prometheus.CounterOpts{})

pa := NewPusherAppendable(pusher, "user-1", ruleLimits{}, writes, failures)
pa := NewPusherAppendable(pusher, "user-1", &ruleLimits{}, writes, failures)

lbls, err := parser.ParseMetric("foo_bar")
require.NoError(t, err)
Expand Down
68 changes: 68 additions & 0 deletions pkg/ruler/external_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ruler

import (
"sync"

"github.com/prometheus/prometheus/model/labels"
)

// userExternalLabels checks and merges per-user external labels with global external labels.
//
// It keeps the most recently computed label set for every user so that callers
// can tell whether a user's effective external labels have changed.
type userExternalLabels struct {
	// global holds the external labels every user starts from.
	global labels.Labels
	// limits supplies the per-user external labels.
	limits RulesLimits
	// builder is reused across update calls; it is only touched while mtx is held.
	builder *labels.Builder

	// mtx guards users (and the shared builder).
	mtx sync.Mutex
	// users maps a user ID to the merged label set last stored by update.
	users map[string]labels.Labels
}

// newUserExternalLabels returns an empty per-user external label cache.
//
// global is the label set every user starts from and limits is the source of
// the per-user labels that are merged on top of it by update.
func newUserExternalLabels(global labels.Labels, limits RulesLimits) *userExternalLabels {
	return &userExternalLabels{
		global: global,
		limits: limits,
		// EmptyLabels is used instead of a nil literal so this compiles with
		// every labels.Labels implementation, not only the slice-backed one.
		builder: labels.NewBuilder(labels.EmptyLabels()),

		// mtx is left at its zero value, which is an unlocked mutex.
		users: make(map[string]labels.Labels),
	}
}

// get returns the label set currently stored for userID and reports whether
// an entry for that user exists.
func (e *userExternalLabels) get(userID string) (labels.Labels, bool) {
	e.mtx.Lock()
	cached, found := e.users[userID]
	e.mtx.Unlock()

	return cached, found
}

// update recomputes the effective external labels for userID by applying the
// user's labels from limits on top of the global labels; a user label replaces
// a global label that has the same name.
//
// It returns the merged label set together with a flag that is true when the
// set differs from the one previously stored for the user (in which case the
// stored value is replaced) and false when nothing changed.
func (e *userExternalLabels) update(userID string) (labels.Labels, bool) {
	// Look up the per-user labels before locking so the limits call is not
	// made while holding e.mtx.
	lset := e.limits.RulerExternalLabels(userID)

	e.mtx.Lock()
	defer e.mtx.Unlock()

	e.builder.Reset(e.global)
	// Range, unlike a plain range loop, works with every labels.Labels
	// implementation rather than only the slice-backed one.
	lset.Range(func(l labels.Label) {
		e.builder.Set(l.Name, l.Value)
	})
	lset = e.builder.Labels()

	if labels.Equal(e.users[userID], lset) {
		return lset, false
	}
	e.users[userID] = lset
	return lset, true
}

// remove drops the stored label set for the given user, if there is one.
func (e *userExternalLabels) remove(user string) {
	e.mtx.Lock()
	delete(e.users, user)
	e.mtx.Unlock()
}

// cleanup discards the stored label sets of all users while keeping the
// same underlying map.
func (e *userExternalLabels) cleanup() {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	for userID := range e.users {
		delete(e.users, userID)
	}
}
69 changes: 69 additions & 0 deletions pkg/ruler/external_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package ruler

import (
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

// TestUserExternalLabels walks a single user through a sequence of external
// label configurations. All scenarios share one userExternalLabels instance,
// so they run in order and each one starts from the state left behind by the
// previous scenario.
func TestUserExternalLabels(t *testing.T) {
	const userID = "test-user"

	limits := &ruleLimits{}
	e := newUserExternalLabels(labels.FromStrings("from", "cortex"), limits)

	type scenario struct {
		name                   string
		removeBeforeTest       bool
		exists                 bool
		userExternalLabels     labels.Labels
		expectedExternalLabels labels.Labels
	}
	scenarios := []scenario{
		{
			// No per-user labels configured and no entry stored yet.
			name:                   "global labels only",
			expectedExternalLabels: labels.FromStrings("from", "cortex"),
		},
		{
			// The entry is removed first, so the user starts out unknown again.
			name:                   "local labels without overriding",
			removeBeforeTest:       true,
			userExternalLabels:     labels.FromStrings("tag", "local"),
			expectedExternalLabels: labels.FromStrings("from", "cortex", "tag", "local"),
		},
		{
			// The entry stored by the previous scenario is still present.
			name:                   "local labels that override globals",
			exists:                 true,
			userExternalLabels:     labels.FromStrings("from", "cloud", "tag", "local"),
			expectedExternalLabels: labels.FromStrings("from", "cloud", "tag", "local"),
		},
	}

	for _, sc := range scenarios {
		sc := sc
		t.Run(sc.name, func(t *testing.T) {
			if sc.removeBeforeTest {
				e.remove(userID)
			}
			_, found := e.get(userID)
			require.Equal(t, sc.exists, found)

			limits.externalLabels = sc.userExternalLabels

			// The first update after a configuration change reports a change.
			first, changed := e.update(userID)
			require.True(t, changed)
			require.Equal(t, sc.expectedExternalLabels, first)

			// A second update with the same configuration is a no-op.
			second, changed := e.update(userID)
			require.False(t, changed) // Not updated.
			require.Equal(t, sc.expectedExternalLabels, second)
		})
	}

	// cleanup must drop the entry left behind by the last scenario.
	_, found := e.get(userID)
	require.True(t, found)
	e.cleanup()
	_, found = e.get(userID)
	require.False(t, found)
}
Loading
Loading