
Make TSDB max exemplars config per tenant #5080

Merged (9 commits, Feb 2, 2023)

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [CHANGE] Storage: Make the max exemplars config per-tenant instead of a global configuration. #5016
* [CHANGE] Alertmanager: Local file disclosure vulnerability in OpsGenie configuration has been fixed. #5045
* [ENHANCEMENT] Update Go version to 1.19.3. #4988
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
4 changes: 3 additions & 1 deletion docs/blocks-storage/querier.md
@@ -879,7 +879,9 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]

# Enables support for exemplars in TSDB and sets the maximum number that
# Deprecated, use maxExemplars in limits instead. If the MaxExemplars value
# in limits is set to zero, Cortex will fall back to this value. This setting
# enables support for exemplars in TSDB and sets the maximum number that
# will be stored. 0 or less means disabled.
# CLI flag: -blocks-storage.tsdb.max-exemplars
[max_exemplars: <int> | default = 0]
4 changes: 3 additions & 1 deletion docs/blocks-storage/store-gateway.md
@@ -942,7 +942,9 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]

# Enables support for exemplars in TSDB and sets the maximum number that
# Deprecated, use maxExemplars in limits instead. If the MaxExemplars value
# in limits is set to zero, Cortex will fall back to this value. This setting
# enables support for exemplars in TSDB and sets the maximum number that
# will be stored. 0 or less means disabled.
# CLI flag: -blocks-storage.tsdb.max-exemplars
[max_exemplars: <int> | default = 0]
14 changes: 13 additions & 1 deletion docs/configuration/config-file-reference.md
@@ -725,6 +725,10 @@ lifecycler:
# CLI flag: -ingester.rate-update-period
[rate_update_period: <duration> | default = 15s]

# Period with which to update the per-user tsdb config.
# CLI flag: -ingester.user-tsdb-configs-update-period
[user_tsdb_configs_update_period: <duration> | default = 15s]

# Enable tracking of active series and export them as metrics.
# CLI flag: -ingester.active-series-metrics-enabled
[active_series_metrics_enabled: <boolean> | default = true]
@@ -2622,6 +2626,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# e.g. remote_write.write_relabel_configs.
[metric_relabel_configs: <relabel_config...> | default = ]

# Enables support for exemplars in TSDB and sets the maximum number that will be
# stored. Less than zero means disabled. If the value is set to zero, Cortex
# will fall back to the blocks-storage.tsdb.max-exemplars value.
# CLI flag: -block-storage.tsdb.max-exemplars
[max_exemplars: <int> | default = 0]

# The maximum number of series for which a query can fetch samples from each
# ingester. This limit is enforced only in the ingesters (when querying samples
# not flushed to the storage yet) and it's a per-instance limit. This limit is
@@ -3721,7 +3731,9 @@ tsdb:
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]

# Enables support for exemplars in TSDB and sets the maximum number that will
# Deprecated, use maxExemplars in limits instead. If the MaxExemplars value in
# limits is set to zero, Cortex will fall back to this value. This setting
# enables support for exemplars in TSDB and sets the maximum number that will
# be stored. 0 or less means disabled.
# CLI flag: -blocks-storage.tsdb.max-exemplars
[max_exemplars: <int> | default = 0]
15 changes: 15 additions & 0 deletions docs/configuration/runtime-config.yaml
@@ -0,0 +1,15 @@
overrides:
tenant1:
ingestion_rate: 10000
max_exemplars: 1
tenant2:
ingestion_rate: 10000
max_exemplars: 0

multi_kv_config:
mirror_enabled: false
primary: memberlist

ingester_limits:
max_ingestion_rate: 42000
max_inflight_push_requests: 10000
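
With these overrides, a tenant whose resolved max_exemplars is zero (tenant2 above) falls back to the deprecated global TSDB flag. A minimal sketch of the global block that would supply that fallback value (the 100 is purely illustrative and not part of this change):

blocks_storage:
  tsdb:
    # Deprecated global default; only consulted when a tenant's max_exemplars resolves to 0.
    max_exemplars: 100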
57 changes: 52 additions & 5 deletions pkg/ingester/ingester.go
@@ -13,6 +13,8 @@ import (
"sync"
"time"

"github.com/prometheus/prometheus/config"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
@@ -92,7 +94,8 @@ type Config struct {
// Config for metadata purging.
MetadataRetainPeriod time.Duration `yaml:"metadata_retain_period"`

RateUpdatePeriod time.Duration `yaml:"rate_update_period"`
RateUpdatePeriod time.Duration `yaml:"rate_update_period"`
UserTSDBConfigsUpdatePeriod time.Duration `yaml:"user_tsdb_configs_update_period"`

ActiveSeriesMetricsEnabled bool `yaml:"active_series_metrics_enabled"`
ActiveSeriesMetricsUpdatePeriod time.Duration `yaml:"active_series_metrics_update_period"`
@@ -126,6 +129,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MetadataRetainPeriod, "ingester.metadata-retain-period", 10*time.Minute, "Period at which metadata we have not seen will remain in memory before being deleted.")

f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
f.DurationVar(&cfg.UserTSDBConfigsUpdatePeriod, "ingester.user-tsdb-configs-update-period", 15*time.Second, "Period with which to update the per-user tsdb config.")
f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", true, "Enable tracking of active series and export them as metrics.")
f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
@@ -782,6 +786,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
rateUpdateTicker := time.NewTicker(i.cfg.RateUpdatePeriod)
defer rateUpdateTicker.Stop()

userTSDBConfigTicker := time.NewTicker(i.cfg.UserTSDBConfigsUpdatePeriod)
defer userTSDBConfigTicker.Stop()

ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval)
defer ingestionRateTicker.Stop()

@@ -812,7 +819,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {

case <-activeSeriesTickerChan:
i.updateActiveSeries()

case <-userTSDBConfigTicker.C:
i.updateUserTSDBConfigs()
case <-ctx.Done():
return nil
case err := <-i.subservicesWatcher.Chan():
@@ -821,6 +829,43 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
}
}

func (i *Ingester) updateUserTSDBConfigs() {
for _, userID := range i.getTSDBUsers() {
userDB := i.getTSDB(userID)
if userDB == nil {
continue
}

cfg := &config.Config{
StorageConfig: config.StorageConfig{
ExemplarsConfig: &config.ExemplarsConfig{
MaxExemplars: i.getMaxExemplars(userID),
},
},
}

// This method currently updates the MaxExemplars and OutOfOrderTimeWindow. Invoking this method
// with a 0 value of OutOfOrderTimeWindow simply updates MaxExemplars.
err := userDB.db.ApplyConfig(cfg)
Contributor (author): This method currently updates the OutOfOrderTimeWindow and MaxExemplars values. I could not find a usage of OutOfOrderTimeWindow in the ingester. Please point me to where it's configured per TSDB if it needs to be updated.

Contributor: That one is fine. The OOO config is still WIP, so we can ignore it in this PR.

if err != nil {
level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.")
}
}
}

// getMaxExemplars returns the maxExemplars value set in limits config.
// If the limits value is set to zero, it falls back to the old configuration
// in the blocks storage config.
func (i *Ingester) getMaxExemplars(userID string) int64 {
maxExemplarsFromLimits := i.limits.MaxExemplars(userID)

if maxExemplarsFromLimits == 0 {
return int64(i.cfg.BlocksStorageConfig.TSDB.MaxExemplars)
}

return int64(maxExemplarsFromLimits)
}

func (i *Ingester) updateActiveSeries() {
purgeTime := time.Now().Add(-i.cfg.ActiveSeriesMetricsIdleTimeout)

@@ -1043,7 +1088,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
})
}

if i.cfg.BlocksStorageConfig.TSDB.MaxExemplars > 0 {
maxExemplarsForUser := i.getMaxExemplars(userID)
if maxExemplarsForUser > 0 {
// app.AppendExemplar currently doesn't create the series, it must
// already exist. If it does not then drop.
if ref == 0 && len(ts.Exemplars) > 0 {
@@ -1832,7 +1878,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
}

enableExemplars := false
if i.cfg.BlocksStorageConfig.TSDB.MaxExemplars > 0 {
maxExemplarsForUser := i.getMaxExemplars(userID)
if maxExemplarsForUser > 0 {
enableExemplars = true
}
// Create a new user database
@@ -1849,7 +1896,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
BlocksToDelete: userDB.blocksToDelete,
EnableExemplarStorage: enableExemplars,
IsolationDisabled: true,
MaxExemplars: int64(i.cfg.BlocksStorageConfig.TSDB.MaxExemplars),
MaxExemplars: maxExemplarsForUser,
HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown,
}, nil)
29 changes: 27 additions & 2 deletions pkg/ingester/ingester_test.go
@@ -774,9 +774,10 @@ func TestIngester_Push(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0
cfg.ActiveSeriesMetricsEnabled = !testData.disableActiveSeries
cfg.BlocksStorageConfig.TSDB.MaxExemplars = testData.maxExemplars

i, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
limits := defaultLimitsTestConfig()
limits.MaxExemplars = testData.maxExemplars
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -4015,6 +4016,30 @@ func TestIngester_inflightPushRequests(t *testing.T) {
require.NoError(t, g.Wait())
}

func TestIngester_MaxExemplarsFallBack(t *testing.T) {
// Create ingester.
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.MaxExemplars = 2

dir := t.TempDir()
blocksDir := filepath.Join(dir, "blocks")
limits := defaultLimitsTestConfig()
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, blocksDir, nil)
require.NoError(t, err)

maxExemplars := i.getMaxExemplars("someTenant")
require.Equal(t, maxExemplars, int64(2))

// Set the max exemplars value in limits and re-initialize the ingester.
limits.MaxExemplars = 5
i, err = prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, blocksDir, nil)
require.NoError(t, err)

// Validate that the new value is picked up now.
maxExemplars = i.getMaxExemplars("someTenant")
require.Equal(t, maxExemplars, int64(5))
}

func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest {
var lbls = make([]labels.Labels, 0, count)
var samples = make([]cortexpb.Sample, 0, count)
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/config.go
@@ -173,8 +173,8 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.WALSegmentSizeBytes, "blocks-storage.tsdb.wal-segment-size-bytes", wal.DefaultSegmentSize, "TSDB WAL segments files max size (bytes).")
f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
Contributor: @alanprot Is it okay to remove an existing flag, since it breaks backward compatibility?

Member: We should keep it around for 2 releases as per governance, right?

Contributor: I see. Maybe we can do something like #5068.

Contributor (author): I kept the original flag in the storage config and marked it as deprecated. Further, Cortex will fall back to the storage config value if the exemplars value for the user is set to zero. Discussed these changes with @alanprot offline.

f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Deprecated, use maxExemplars in limits instead. If the MaxExemplars value in limits is set to zero, Cortex will fall back to this value. This setting enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
}

8 changes: 8 additions & 0 deletions pkg/util/validation/limits.go
@@ -55,6 +55,7 @@ type Limits struct {
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"`
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."`
MaxExemplars int `yaml:"max_exemplars" json:"max_exemplars"`

// Ingester enforced limits.
// Series
@@ -63,6 +64,7 @@ type Limits struct {
MaxLocalSeriesPerMetric int `yaml:"max_series_per_metric" json:"max_series_per_metric"`
MaxGlobalSeriesPerUser int `yaml:"max_global_series_per_user" json:"max_global_series_per_user"`
MaxGlobalSeriesPerMetric int `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"`

// Metadata
MaxLocalMetricsWithMetadataPerUser int `yaml:"max_metadata_per_user" json:"max_metadata_per_user"`
MaxLocalMetadataPerMetric int `yaml:"max_metadata_per_metric" json:"max_metadata_per_metric"`
@@ -147,6 +149,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalSeriesPerUser, "ingester.max-global-series-per-user", 0, "The maximum number of active series per user, across the cluster before replication. 0 to disable. Supported only if -distributor.shard-by-all-labels is true.")
f.IntVar(&l.MaxGlobalSeriesPerMetric, "ingester.max-global-series-per-metric", 0, "The maximum number of active series per metric name, across the cluster before replication. 0 to disable.")
f.IntVar(&l.MaxExemplars, "block-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. Less than zero means disabled. If the value is set to zero, Cortex will fall back to the blocks-storage.tsdb.max-exemplars value.")

f.IntVar(&l.MaxLocalMetricsWithMetadataPerUser, "ingester.max-metadata-per-user", 8000, "The maximum number of active metrics with metadata per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxLocalMetadataPerMetric, "ingester.max-metadata-per-metric", 10, "The maximum number of metadata per metric, per ingester. 0 to disable.")
@@ -564,6 +567,11 @@ func (o *Overrides) AlertmanagerReceiversBlockPrivateAddresses(user string) bool
return o.GetOverridesForUser(user).AlertmanagerReceiversBlockPrivateAddresses
}

// MaxExemplars gets the maximum number of exemplars that will be stored per user. Less than zero
// means disabled; zero makes the ingester fall back to the blocks-storage.tsdb.max-exemplars value.
func (o *Overrides) MaxExemplars(userID string) int {
return o.GetOverridesForUser(userID).MaxExemplars
}

// Notification limits are special. Limits are returned in following order:
// 1. per-tenant limits for given integration
// 2. default limits for given integration
32 changes: 32 additions & 0 deletions pkg/util/validation/limits_test.go
@@ -532,3 +532,35 @@ testuser:
})
}
}

func TestMaxExemplarsOverridesPerTenant(t *testing.T) {
SetDefaultLimitsForYAMLUnmarshalling(Limits{
MaxLabelNameLength: 100,
})

baseYAML := `
max_exemplars: 5`
overridesYAML := `
tenant1:
max_exemplars: 1
tenant2:
max_exemplars: 3
`

l := Limits{}
err := yaml.UnmarshalStrict([]byte(baseYAML), &l)
require.NoError(t, err)

overrides := map[string]*Limits{}
err = yaml.Unmarshal([]byte(overridesYAML), &overrides)
require.NoError(t, err, "parsing overrides")

tl := newMockTenantLimits(overrides)

ov, err := NewOverrides(l, tl)
require.NoError(t, err)

require.Equal(t, 1, ov.MaxExemplars("tenant1"))
require.Equal(t, 3, ov.MaxExemplars("tenant2"))
require.Equal(t, 5, ov.MaxExemplars("tenant3"))
}
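
For reference, the same layering expressed as operator configuration, as a minimal sketch (the file names and the 5/1/3 values are illustrative, and the default is assumed to live under the top-level limits block): the main config supplies the default, and the runtime overrides file supplies per-tenant values that take precedence.

# cortex.yaml: default applied to every tenant
limits:
  max_exemplars: 5

# runtime overrides file: per-tenant values take precedence over the default
overrides:
  tenant1:
    max_exemplars: 1
  tenant2:
    max_exemplars: 3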