From b1a435ea61953694267205cab4ead85716cbbcf8 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Sun, 28 Sep 2025 16:49:59 +0800 Subject: [PATCH 1/8] feat(ratelimitprocessor): add class-based limiting Introduces class-based dynamic rate limiting to the `ratelimitprocessor`, allowing different rate limits based on the received client metadata. The new architecture allows users to define named rate limit classes in the processor configuration, each with its own static rate, burst, and a flag controlling whether it supports dynamic rate limiting. A new class resolver extension can then be used to map unique keys to class names, enabling dynamic assignment of rate limits based on custom resolution providers. The processor now applies a clear precedence order: per-key overrides take highest priority, followed by the resolved class, the default class, and finally the global fallback configuration. Additional telemetry provides visibility into rate limit resolution, dynamic escalations, degraded operation, and resolver failures. See the updated README for detailed usage instructions. 
Signed-off-by: Marc Lopez Rubio --- processor/ratelimitprocessor/README.md | 112 ++++ processor/ratelimitprocessor/config.go | 132 ++++- processor/ratelimitprocessor/config_test.go | 61 ++- processor/ratelimitprocessor/documentation.md | 52 ++ processor/ratelimitprocessor/factory.go | 40 +- processor/ratelimitprocessor/factory_test.go | 4 +- processor/ratelimitprocessor/gubernator.go | 99 +++- .../ratelimitprocessor/gubernator_test.go | 516 +++++++++++++++++- .../internal/metadata/generated_telemetry.go | 42 +- .../metadatatest/generated_telemetrytest.go | 64 +++ .../generated_telemetrytest_test.go | 16 + processor/ratelimitprocessor/local.go | 3 +- processor/ratelimitprocessor/metadata.yaml | 40 ++ processor/ratelimitprocessor/processor.go | 40 +- .../ratelimitprocessor/processor_test.go | 4 +- .../ratelimitprocessor/testdata/config.yaml | 37 ++ 16 files changed, 1153 insertions(+), 109 deletions(-) diff --git a/processor/ratelimitprocessor/README.md b/processor/ratelimitprocessor/README.md index 9e5187c90..cbb3c8e38 100644 --- a/processor/ratelimitprocessor/README.md +++ b/processor/ratelimitprocessor/README.md @@ -18,6 +18,8 @@ a in-memory rate limiter, or makes use of a [gubernator](https://github.com/gube | `type` | Type of rate limiter. Options are `local` or `gubernator`. | No | `local` | | `overrides` | Allows customizing rate limiting parameters for specific metadata key-value pairs. Use this to apply different rate limits to different tenants, projects, or other entities identified by metadata. Each override is keyed by a metadata value and can specify custom `rate`, `burst`, and `throttle_interval` settings that take precedence over the global configuration for matching requests. | No | | | `dynamic_limits` | Holds the dynamic rate limiting configuration. This is only applicable when the rate limiter type is `gubernator`. | No | | +| `classes` | Named rate limit class definitions for class-based dynamic rate limiting. 
Only applicable when the rate limiter type is `gubernator`. | No | | +| `default_class` | Default class name to use when the resolver returns an unknown or empty class. Must exist in `classes` when set. Only applicable when the rate limiter type is `gubernator`. | No | | ### Overrides @@ -191,3 +193,113 @@ processors: window_multiplier: 1.5 window_duration: 1m ``` + +### Class-Based Dynamic Rate Limiting + +When using the Gubernator rate limiter type, you can define named rate limit classes with different base rates and escalation policies. A class resolver extension maps unique keys to class names, enabling different rate limits per customer tier, API version, or other business logic. + +Example with class-based configuration: + +```yaml +extensions: + configmapclassresolverextension: + name: resolutions + namespace: default + +processors: + ratelimiter: + metadata_keys: + - x-customer-id + rate: 100 # fallback rate + burst: 200 # fallback burst + type: gubernator + strategy: requests + + # Define the class resolver ID + class_resolver: configmapclassresolverextension + # Define named classes + classes: + trial: + rate: 50 # 50 requests/second for trial users + burst: 100 # burst capacity of 100 + static_only: true # no dynamic escalation + + paying: + rate: 500 # 500 requests/second for paying customers + burst: 1000 # burst capacity of 1000 + static_only: false + + enterprise: + rate: 2000 # 2000 requests/second for enterprise + burst: 4000 # burst capacity of 4000 + static_only: false # allow gradual increase + + # Default class when resolver returns unknown class + default_class: "trial" + + # Enable dynamic rate limiting + dynamic_limits: + enabled: true + window_multiplier: 1.3 + window_duration: 60s + + # Per-key overrides (highest precedence) + overrides: + "customer-123": + rate: 5000 # special override + burst: 10000 + static_only: true +``` + +Class Resolution Precedence: + +1. **Per-key override** (highest) - From `overrides` section +2. 
**Resolved class** - From class resolver extension +3. **Default class** - From `default_class` setting +4. **Top-level fallback** (lowest) - From top-level `rate`/`burst` + +### Class resolver + +The `class_resolver` option tells the processor which OpenTelemetry Collector extension should be used to map a unique key (derived from the configured `metadata_keys`) to a class name. The value is the extension ID (the extension's configured name), for example: + +```yaml +processors: + ratelimiter: + class_resolver: configmapclassresolverextension +``` + +Behavior and notes: + +* The resolver is optional. If no `class_resolver` is configured, the processor skips class resolution and falls back to the top-level `rate`/`burst` values. + +* The processor initializes the resolver during Start; if the configured extension is not found, the processor fails to start. + +* When the resolver returns an unknown or empty class name, the processor treats it as "unknown" and uses the configured `default_class` (if set) or falls back to the top-level rate/burst. + +Caching and performance: + +* Implementations of resolver extensions should be mindful of latency; the processor assumes the resolver is reasonably fast. The processor falls back to static settings when resolver errors occur at runtime. + +* Ideally, implementations of the `class_resolver` implement their own caching to guarantee performance when they rely on an external source. 
+ +Telemetry and metrics: + +* The processor emits attributes on relevant metrics to aid debugging and monitoring: + + * `rate_source`: one of `static`, `dynamic`, `fallback`, or `degraded` (indicates whether dynamic calculation was used or not) + + * `class`: resolved class name when applicable + + * `source_kind`: which precedence path was used (`override`, `class`, or `fallback`) + +* Counters introduced to observe resolver and dynamic behavior include: + + * `ratelimit.resolver_failures` — total number of resolver failures + + * `ratelimit.gubernator_degraded` — total number of operations in degraded mode due to Gubernator unavailability + + * `ratelimit.dynamic_escalations` — number of times dynamic rate > static rate (attributes: `class`, `source_kind`) + + * `ratelimit.dynamic_escalations_skipped` — number of times dynamic escalation was skipped because dynamic <= static (attributes: `class`, `source_kind`) + +These telemetry signals help operators understand when resolution or dynamic logic is active and diagnose fallback behavior. diff --git a/processor/ratelimitprocessor/config.go b/processor/ratelimitprocessor/config.go index 3ba10128e..40fd15dbf 100644 --- a/processor/ratelimitprocessor/config.go +++ b/processor/ratelimitprocessor/config.go @@ -47,6 +47,24 @@ type Config struct { // // Defaults to empty Overrides map[string]RateLimitOverrides `mapstructure:"overrides"` + + // Classes holds named rate limit class definitions for class-based dynamic rate limiting. + // Only applicable when the rate limiter type is "gubernator". + // + // Defaults to empty + Classes map[string]Class `mapstructure:"classes"` + + // DefaultClass specifies the class name to use when no override exists and + // the class resolver returns unknown/empty class. Must exist in Classes when set. + // Only applicable when the rate limiter type is "gubernator". 
+ // + // Defaults to empty (no default class) + DefaultClass string `mapstructure:"default_class"` + + // ClassResolver is the component ID of the class resolver extension to use. + // If not set, class resolution is disabled. + // Only applicable when the rate limiter type is "gubernator". + ClassResolver component.ID `mapstructure:"class_resolver"` } // DynamicRateLimiting defines settings for dynamic rate limiting. @@ -61,6 +79,19 @@ type DynamicRateLimiting struct { WindowDuration time.Duration `mapstructure:"window_duration"` } +// Class defines a named rate limit class for class-based dynamic rate limiting. +type Class struct { + // Rate holds the bucket refill rate, in tokens per second. + Rate int `mapstructure:"rate"` + + // Burst holds the maximum capacity of rate limit buckets. + Burst int `mapstructure:"burst"` + + // StaticOnly disables dynamic rate escalation for this class. + // When true, the effective rate will always be the static Rate. + StaticOnly bool `mapstructure:"static_only"` +} + // Validate checks the DynamicRateLimiting configuration. func (d *DynamicRateLimiting) Validate() error { if !d.Enabled { @@ -76,6 +107,21 @@ func (d *DynamicRateLimiting) Validate() error { return errors.Join(errs...) } +// Validate checks the Class configuration. +func (c *Class) Validate() error { + var errs []error + if c.Rate <= 0 { + errs = append(errs, errors.New("rate must be greater than zero")) + } + if c.Burst < 0 { + errs = append(errs, errors.New("burst must be non-negative")) + } + if c.Burst > 0 && c.Burst < c.Rate { + errs = append(errs, errors.New("burst must be greater than or equal to rate when specified")) + } + return errors.Join(errs...) +} + // RateLimitSettings holds the core rate limiting configuration. type RateLimitSettings struct { // Strategy holds the rate limiting strategy. @@ -102,6 +148,10 @@ type RateLimitSettings struct { disableDynamic bool `mapstructure:"-"` } +// RateLimitOverrides defines per-unique-key override settings. 
+// It replaces the top-level RateLimitSettings fields when the unique key matches. +// Nil pointer fields leave the corresponding top-level field unchanged. +// StaticOnly disables dynamic escalation for that specific key when true. type RateLimitOverrides struct { // Rate holds the override rate limit. StaticOnly bool `mapstructure:"static_only"` @@ -197,16 +247,26 @@ func createDefaultConfig() component.Config { WindowMultiplier: 1.3, WindowDuration: 2 * time.Minute, }, + Classes: nil, + DefaultClass: "", } } -// resolveRateLimitSettings returns the rate limit settings for the given unique key. -// If no override is found, the default rate limit settings are returned. -func resolveRateLimitSettings(cfg *Config, uniqueKey string) RateLimitSettings { - // We start from the default settings - result := cfg.RateLimitSettings - if override, ok := cfg.Overrides[uniqueKey]; ok { - // If an override is found, we apply it +// resolveRateLimit computes the effective RateLimitSettings for a given unique key. +// It unifies the legacy per-key override resolution and the class-based precedence logic. +// Precedence order: +// 1. Explicit per-key override (SourceKindOverride) +// 2. Resolved class (SourceKindClass) +// 3. DefaultClass (SourceKindClass) +// 4. Top-level fallback config (SourceKindFallback) +// +// When sourceKind is override or fallback, className will be empty. +func resolveRateLimit(cfg *Config, + uniqueKey, className string, +) (result RateLimitSettings, kind SourceKind, name string) { + result = cfg.RateLimitSettings + // 1. Per-key override takes absolute precedence regardless of classes. + if override, hasOverride := cfg.Overrides[uniqueKey]; hasOverride { if override.Rate != nil { result.Rate = *override.Rate } @@ -219,10 +279,51 @@ func resolveRateLimitSettings(cfg *Config, uniqueKey string) RateLimitSettings { if override.StaticOnly { result.disableDynamic = true } + return result, SourceKindOverride, "" + } + // 2. 
Resolved class (only if provided and exists) + if className != "" { + if class, exists := cfg.Classes[className]; exists { + result.Rate = class.Rate + if class.Burst > 0 { + result.Burst = class.Burst + } + if class.StaticOnly { + result.disableDynamic = true + } + return result, SourceKindClass, className + } } - return result + // 3. DefaultClass (if configured & exists) + if cfg.DefaultClass != "" { + if class, exists := cfg.Classes[cfg.DefaultClass]; exists { + result.Rate = class.Rate + if class.Burst > 0 { + result.Burst = class.Burst + } + if class.StaticOnly { + result.disableDynamic = true + } + return result, SourceKindClass, cfg.DefaultClass + } + } + // 4. Fallback to top-level settings. + return cfg.RateLimitSettings, SourceKindFallback, "" } +// SourceKind indicates the source of rate limit settings for telemetry. +type SourceKind string + +const ( + // SourceKindOverride indicates the settings originated from an explicit per-key override. + SourceKindOverride SourceKind = "override" + // SourceKindClass indicates the settings originated from a class (either resolved or default_class). + SourceKindClass SourceKind = "class" + // SourceKindFallback indicates the settings originated from the top-level fallback configuration. + SourceKindFallback SourceKind = "fallback" +) + +// Validate performs semantic validation of RateLimitSettings. func (r *RateLimitSettings) Validate() error { var errs []error if r.Rate <= 0 { @@ -243,6 +344,7 @@ func (r *RateLimitSettings) Validate() error { return errors.Join(errs...) } +// Validate performs semantic validation of a RateLimitOverrides instance. 
func (r *RateLimitOverrides) Validate() error { var errs []error if r.Rate != nil { @@ -270,6 +372,19 @@ func (config *Config) Validate() error { if err := config.DynamicRateLimiting.Validate(); err != nil { errs = append(errs, err) } + // Validate class-based configuration + if config.DefaultClass != "" { + if config.Classes == nil { + errs = append(errs, errors.New("default_class specified but no classes defined")) + } else if _, exists := config.Classes[config.DefaultClass]; !exists { + errs = append(errs, fmt.Errorf("default_class %q does not exist in classes", config.DefaultClass)) + } + } + for className, class := range config.Classes { + if err := class.Validate(); err != nil { + errs = append(errs, fmt.Errorf("class %q: %w", className, err)) + } + } } for key, override := range config.Overrides { if err := override.Validate(); err != nil { @@ -310,6 +425,7 @@ func (s ThrottleBehavior) Validate() error { ) } +// Validate ensures the RateLimiterType is one of the supported values. 
func (t RateLimiterType) Validate() error { switch t { case LocalRateLimiter, GubernatorRateLimiter: diff --git a/processor/ratelimitprocessor/config_test.go b/processor/ratelimitprocessor/config_test.go index e7d78350f..765641199 100644 --- a/processor/ratelimitprocessor/config_test.go +++ b/processor/ratelimitprocessor/config_test.go @@ -237,6 +237,18 @@ func TestLoadConfig(t *testing.T) { name: "invalid_type", expectedErr: `invalid rate limiter type "invalid", expected one of ["local" "gubernator"]`, }, + { + name: "invalid_default_class", + expectedErr: `default_class "nonexistent" does not exist in classes`, + }, + { + name: "invalid_class_rate_zero", + expectedErr: `class "trial": rate must be greater than zero`, + }, + { + name: "invalid_class_rate_negative", + expectedErr: `class "trial": rate must be greater than zero`, + }, } factory := NewFactory() @@ -262,7 +274,7 @@ func TestLoadConfig(t *testing.T) { } } -func TestResolveRateLimitSettings(t *testing.T) { +func TestResolveEffectiveRateLimit(t *testing.T) { cfg := &Config{ RateLimitSettings: RateLimitSettings{ Rate: 100, @@ -278,22 +290,45 @@ func TestResolveRateLimitSettings(t *testing.T) { ThrottleInterval: ptr(10 * time.Second), }, }, + Classes: map[string]Class{ + "trial": {Rate: 50, Burst: 50}, + "premium": {Rate: 1000, Burst: 2000, StaticOnly: true}, + }, + DefaultClass: "trial", } - t.Run("no override", func(t *testing.T) { - result := resolveRateLimitSettings(cfg, "default") - require.Equal(t, cfg.RateLimitSettings, result) + t.Run("override", func(t *testing.T) { + res, kind, class := resolveRateLimit(cfg, "project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292", "trial") + require.Equal(t, SourceKindOverride, kind) + require.Empty(t, class) + require.Equal(t, 300, res.Rate) + require.Equal(t, 400, res.Burst) + require.Equal(t, 10*time.Second, res.ThrottleInterval) }) - t.Run("override", func(t *testing.T) { - result := resolveRateLimitSettings(cfg, "project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292") - 
require.Equal(t, RateLimitSettings{ - Rate: 300, - Burst: 400, - Strategy: StrategyRateLimitRequests, - ThrottleBehavior: ThrottleBehaviorError, - ThrottleInterval: 10 * time.Second, - }, result) + t.Run("resolved class", func(t *testing.T) { + res, kind, class := resolveRateLimit(cfg, "some-other-key", "premium") + require.Equal(t, SourceKindClass, kind) + require.Equal(t, "premium", class) + require.Equal(t, 1000, res.Rate) + require.Equal(t, 2000, res.Burst) + require.True(t, res.disableDynamic, "premium is static-only") + }) + + t.Run("default class fallback", func(t *testing.T) { + res, kind, class := resolveRateLimit(cfg, "another-key", "nonexistent") + require.Equal(t, SourceKindClass, kind) + require.Equal(t, "trial", class) + require.Equal(t, 50, res.Rate) + require.Equal(t, 50, res.Burst) + }) + + t.Run("top-level fallback", func(t *testing.T) { + cfgNoClasses := &Config{RateLimitSettings: cfg.RateLimitSettings} + res, kind, class := resolveRateLimit(cfgNoClasses, "key", "") + require.Equal(t, SourceKindFallback, kind) + require.Empty(t, class) + require.Equal(t, cfg.RateLimitSettings, res) }) } diff --git a/processor/ratelimitprocessor/documentation.md b/processor/ratelimitprocessor/documentation.md index 03e6149cc..eedd4675d 100644 --- a/processor/ratelimitprocessor/documentation.md +++ b/processor/ratelimitprocessor/documentation.md @@ -14,6 +14,44 @@ Number of in-flight requests at any given time | ---- | ----------- | ---------- | | {requests} | Gauge | Int | +### otelcol_ratelimit.dynamic_escalations + +Total number of dynamic rate escalations (dynamic > static) + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {count} | Sum | Int | true | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| class | rate limit class used | Any Str | +| source_kind | precedence source used to resolve rate | Any Str | + +### otelcol_ratelimit.dynamic_escalations_skipped + +Total 
number of times dynamic escalation was skipped (dynamic <= static) + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {count} | Sum | Int | true | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| class | rate limit class used | Any Str | +| source_kind | precedence source used to resolve rate | Any Str | + +### otelcol_ratelimit.gubernator_degraded + +Total number of operations in degraded mode due to Gubernator unavailability + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {count} | Sum | Int | true | + ### otelcol_ratelimit.request_duration Time(in seconds) taken to process a rate limit request @@ -44,3 +82,17 @@ Number of rate-limiting requests | ---- | ----------- | ------ | | decision | rate limit decision | Any Str | | reason | rate limit reason | Any Str | + +### otelcol_ratelimit.resolver_failures + +Total number of class resolver failures + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {count} | Sum | Int | true | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| unique_key | unique key for the request used in class resolution | Any Str | diff --git a/processor/ratelimitprocessor/factory.go b/processor/ratelimitprocessor/factory.go index e75e9e69b..a0ce7852e 100644 --- a/processor/ratelimitprocessor/factory.go +++ b/processor/ratelimitprocessor/factory.go @@ -55,10 +55,11 @@ func NewFactory() xprocessor.Factory { func getRateLimiter( config *Config, set processor.Settings, + telemetryBuilder *metadata.TelemetryBuilder, ) (*sharedcomponent.Component[rateLimiterComponent], error) { return rateLimiters.LoadOrStore(config, func() (rateLimiterComponent, error) { if config.Type == GubernatorRateLimiter { - return newGubernatorRateLimiter(config, set) + return newGubernatorRateLimiter(config, set.Logger, telemetryBuilder) } 
return newLocalRateLimiter(config, set) }) @@ -71,15 +72,19 @@ func createLogsProcessor( nextConsumer consumer.Logs, ) (processor.Logs, error) { config := cfg.(*Config) - rateLimiter, err := getRateLimiter(config, set) + tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + rateLimiter, err := getRateLimiter(config, set, tb) if err != nil { return nil, err } - var inflight int64 return NewLogsRateLimiterProcessor( rateLimiter, - set.TelemetrySettings, + set.TelemetrySettings.Logger, + tb, config.Strategy, func(ctx context.Context, ld plog.Logs) error { return nextConsumer.ConsumeLogs(ctx, ld) @@ -96,14 +101,19 @@ func createMetricsProcessor( nextConsumer consumer.Metrics, ) (processor.Metrics, error) { config := cfg.(*Config) - rateLimiter, err := getRateLimiter(config, set) + tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + rateLimiter, err := getRateLimiter(config, set, tb) if err != nil { return nil, err } var inflight int64 return NewMetricsRateLimiterProcessor( rateLimiter, - set.TelemetrySettings, + set.TelemetrySettings.Logger, + tb, config.Strategy, func(ctx context.Context, md pmetric.Metrics) error { return nextConsumer.ConsumeMetrics(ctx, md) @@ -120,14 +130,19 @@ func createTracesProcessor( nextConsumer consumer.Traces, ) (processor.Traces, error) { config := cfg.(*Config) - rateLimiter, err := getRateLimiter(config, set) + tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + rateLimiter, err := getRateLimiter(config, set, tb) if err != nil { return nil, err } var inflight int64 return NewTracesRateLimiterProcessor( rateLimiter, - set.TelemetrySettings, + set.TelemetrySettings.Logger, + tb, config.Strategy, func(ctx context.Context, td ptrace.Traces) error { return nextConsumer.ConsumeTraces(ctx, td) @@ -144,14 +159,19 @@ func createProfilesProcessor( nextConsumer xconsumer.Profiles, ) 
(xprocessor.Profiles, error) { config := cfg.(*Config) - rateLimiter, err := getRateLimiter(config, set) + tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + rateLimiter, err := getRateLimiter(config, set, tb) if err != nil { return nil, err } var inflight int64 return NewProfilesRateLimiterProcessor( rateLimiter, - set.TelemetrySettings, + set.TelemetrySettings.Logger, + tb, config.Strategy, func(ctx context.Context, td pprofile.Profiles) error { return nextConsumer.ConsumeProfiles(ctx, td) diff --git a/processor/ratelimitprocessor/factory_test.go b/processor/ratelimitprocessor/factory_test.go index f1a73b8b5..6029bbe9c 100644 --- a/processor/ratelimitprocessor/factory_test.go +++ b/processor/ratelimitprocessor/factory_test.go @@ -18,7 +18,6 @@ package ratelimitprocessor import ( - "context" "testing" "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata" @@ -44,8 +43,7 @@ func TestCreateDefaultConfig(t *testing.T) { } func TestCreateProcessor(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := t.Context() factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) diff --git a/processor/ratelimitprocessor/gubernator.go b/processor/ratelimitprocessor/gubernator.go index 7e081a221..aafe3e3a1 100644 --- a/processor/ratelimitprocessor/gubernator.go +++ b/processor/ratelimitprocessor/gubernator.go @@ -27,6 +27,8 @@ import ( "github.com/uptrace/opentelemetry-go-extra/otellogrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "github.com/sirupsen/logrus" "google.golang.org/grpc" @@ -34,21 +36,39 @@ import ( "github.com/gubernator-io/gubernator/v2" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/processor" "go.uber.org/zap" + + 
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata" ) var _ RateLimiter = (*gubernatorRateLimiter)(nil) +// ClassResolver resolves the class for a given key. Since the resolution +// takes place in the hot path, it MUST be fast and concurrently safe. +// Implementations may implement caching to speed up resolution. +type ClassResolver interface { + // ResolveClass resolves the class for a given key. + ResolveClass(ctx context.Context, key string) (string, error) +} + +type noopResolver struct{} + +func (noopResolver) ResolveClass(context.Context, string) (string, error) { + return "", nil +} + type gubernatorRateLimiter struct { cfg *Config - set processor.Settings + logger *zap.Logger behavior gubernator.Behavior daemonCfg gubernator.DaemonConfig daemon *gubernator.Daemon client gubernator.V1Client clientConn *grpc.ClientConn + // Class resolver for class-based rate limiting + classResolver ClassResolver + telemetryBuilder *metadata.TelemetryBuilder } func newGubernatorDaemonConfig(logger *zap.Logger) (gubernator.DaemonConfig, error) { @@ -75,21 +95,34 @@ func newGubernatorDaemonConfig(logger *zap.Logger) (gubernator.DaemonConfig, err return conf, nil } -func newGubernatorRateLimiter(cfg *Config, set processor.Settings) (*gubernatorRateLimiter, error) { - daemonCfg, err := newGubernatorDaemonConfig(set.Logger) +func newGubernatorRateLimiter(cfg *Config, logger *zap.Logger, telemetryBuilder *metadata.TelemetryBuilder) (*gubernatorRateLimiter, error) { + daemonCfg, err := newGubernatorDaemonConfig(logger) if err != nil { return nil, fmt.Errorf("failed to create gubernator daemon config: %w", err) } return &gubernatorRateLimiter{ - cfg: cfg, - set: set, - behavior: gubernator.Behavior_BATCHING, - daemonCfg: daemonCfg, + cfg: cfg, + logger: logger, + behavior: gubernator.Behavior_BATCHING, + daemonCfg: daemonCfg, + telemetryBuilder: telemetryBuilder, + classResolver: noopResolver{}, }, nil } -func (r 
*gubernatorRateLimiter) Start(ctx context.Context, _ component.Host) (err error) { +func (r *gubernatorRateLimiter) Start(ctx context.Context, host component.Host) (err error) { + if res := r.cfg.ClassResolver; res.String() != "" { + cr, ok := host.GetExtensions()[res] + if !ok { + return fmt.Errorf("class resolver %s not found", res) + } + if err := cr.Start(ctx, host); err != nil { + return fmt.Errorf("failed to start class resolver %s: %w", res, err) + } + r.classResolver = cr.(ClassResolver) + } + r.daemon, err = gubernator.SpawnDaemon(ctx, r.daemonCfg) if err != nil { return fmt.Errorf("failed to spawn gubernator daemon: %w", err) @@ -106,7 +139,7 @@ func (r *gubernatorRateLimiter) Start(ctx context.Context, _ component.Host) (er return nil } -func (r *gubernatorRateLimiter) Shutdown(context.Context) error { +func (r *gubernatorRateLimiter) Shutdown(ctx context.Context) error { if r.daemon != nil { r.daemon.Close() r.daemon = nil @@ -116,19 +149,50 @@ func (r *gubernatorRateLimiter) Shutdown(context.Context) error { r.clientConn = nil } r.client = nil + if c, ok := r.classResolver.(component.Component); ok { + if err := c.Shutdown(ctx); err != nil { + return fmt.Errorf("failed to shutdown class resolver: %w", err) + } + } return nil } func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error { uniqueKey := getUniqueKey(ctx, r.cfg.MetadataKeys) - cfg := resolveRateLimitSettings(r.cfg, uniqueKey) - now := time.Now() - + // First resolve the class if classes are set. 
+ class, err := r.classResolver.ResolveClass(ctx, uniqueKey) + if err != nil { + r.telemetryBuilder.RatelimitResolverFailures.Add(ctx, 1, + metric.WithAttributeSet( + attribute.NewSet(attribute.String("unique_key", uniqueKey)), + ), + ) + r.logger.Warn("class resolver failed, falling back", + zap.Error(err), + zap.String("unique_key", uniqueKey), + zap.String("default_class", r.cfg.DefaultClass), + ) + } + // Resolve rate limit precedence: + // override -> class -> default_class -> fallback. + cfg, sourceKind, className := resolveRateLimit(r.cfg, uniqueKey, class) rate, burst := cfg.Rate, cfg.Burst + now := time.Now() + // If dynamic rate limiting is enabled and not disabled for this request, + // calculate the dynamic rate and burst. if r.cfg.DynamicRateLimiting.Enabled && !cfg.disableDynamic { + attrs := metric.WithAttributeSet(attribute.NewSet( + attribute.String("source_kind", string(sourceKind)), + attribute.String("class", className), + )) rate, burst = r.calculateRateAndBurst(ctx, cfg, uniqueKey, hits, now) - if rate < 0 { - return fmt.Errorf("error calculating dynamic rate limit for unique key %s", uniqueKey) + if rate < 0 { // Degraded mode - Gubernator unreachable. Fall back to the static rate. + r.telemetryBuilder.RatelimitGubernatorDegraded.Add(ctx, 1, attrs) + rate, burst = cfg.Rate, cfg.Burst + } else if rate > cfg.Rate { // Dynamic escalation occurred + r.telemetryBuilder.RatelimitDynamicEscalations.Add(ctx, 1, attrs) + } else { // Dynamic escalation was skipped (dynamic <= static) + r.telemetryBuilder.RatelimitDynamicEscalationsSkipped.Add(ctx, 1, attrs) } } // Execute the actual rate limit check / recording. @@ -138,9 +202,10 @@ func (r *gubernatorRateLimiter) calculateRateAndBurst(ctx context.Context, cfg RateLimitSettings, uniqueKey string, hits int, now time.Time, ) (int, int) { + // limit is computed in requests-per-second units. 
limit, err := r.getDynamicLimit(ctx, cfg, uniqueKey, hits, now) if err != nil { - r.set.Logger.Error("failed to get dynamic limit from gubernator", + r.logger.Error("failed to get dynamic limit from gubernator", zap.Error(err), zap.String("unique_key", uniqueKey), ) @@ -196,7 +261,7 @@ func (r *gubernatorRateLimiter) executeRateLimit(ctx context.Context, } resp, err := makeRateLimitRequest(now.UnixMilli()) if err != nil { - r.set.Logger.Error("error executing gubernator rate limit request", + r.logger.Error("error executing gubernator rate limit request", zap.Error(err), zap.String("name", cfg.Strategy.String()), zap.String("unique_key", uniqueKey), diff --git a/processor/ratelimitprocessor/gubernator_test.go b/processor/ratelimitprocessor/gubernator_test.go index f45f4138e..25660bfcf 100644 --- a/processor/ratelimitprocessor/gubernator_test.go +++ b/processor/ratelimitprocessor/gubernator_test.go @@ -19,12 +19,15 @@ package ratelimitprocessor import ( "context" + "errors" + "fmt" "slices" "sync" "testing" "time" "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata" + "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadatatest" "github.com/gubernator-io/gubernator/v2" "github.com/gubernator-io/gubernator/v2/cluster" "github.com/stretchr/testify/assert" @@ -32,12 +35,28 @@ import ( "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.uber.org/zap/zaptest" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) +func newTestGubernatorRateLimiterMetrics(t *testing.T, cfg *Config) ( + *gubernatorRateLimiter, *componenttest.Telemetry, +) { + rl := 
newTestGubernatorRateLimiter(t, cfg, nil) + tt := componenttest.NewTelemetry() + tb, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings()) + require.NoError(t, err) + rl.telemetryBuilder = tb + t.Cleanup(func() { + tt.Shutdown(t.Context()) + }) + return rl, tt +} + // newTestGubernatorRateLimiter starts a local cluster with a gubernator // daemon and returns a new gubernatorRateLimiter instance that relies // on this daemon for rate limiting. @@ -49,6 +68,7 @@ func newTestGubernatorRateLimiter(t *testing.T, cfg *Config, c chan<- gubernator // Wait a bit after the test to shut down the daemon. time.Sleep(50 * time.Millisecond) err := rl.Shutdown(context.Background()) + rl.telemetryBuilder.Shutdown() require.NoError(t, err) cluster.Stop() }) @@ -56,28 +76,33 @@ func newTestGubernatorRateLimiter(t *testing.T, cfg *Config, c chan<- gubernator } func newGubernatorRateLimiterFrom(t *testing.T, cfg *Config, daemon *gubernator.Daemon) *gubernatorRateLimiter { - conn, err := grpc.NewClient(daemon.PeerInfo.GRPCAddress, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - require.NoError(t, err) - - cl := gubernator.NewV1Client(conn) - require.NotNil(t, cl) telSettings := componenttest.NewNopTelemetrySettings() telSettings.Logger = zaptest.NewLogger(t) + tb, err := metadata.NewTelemetryBuilder(telSettings) + require.NoError(t, err) + // Copied from daemon.Client() + conn, err := grpc.NewClient( + fmt.Sprintf("static:///%s", daemon.PeerInfo.GRPCAddress), + grpc.WithResolvers(gubernator.NewStaticBuilder()), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + client := gubernator.NewV1Client(conn) + t.Cleanup(func() { // Close the gRPC connection and daemon on test cleanup. 
+ conn.Close() + daemon.Close() + }) return &gubernatorRateLimiter{ - cfg: cfg, - set: processor.Settings{ - ID: component.NewIDWithName(metadata.Type, "abc123"), - TelemetrySettings: telSettings, - BuildInfo: component.NewDefaultBuildInfo(), - }, - behavior: gubernator.Behavior_BATCHING, - - daemon: daemon, - client: cl, - clientConn: conn, + cfg: cfg, + logger: telSettings.Logger, + telemetryBuilder: tb, + behavior: gubernator.Behavior_BATCHING, + + daemonCfg: daemon.Config(), + daemon: daemon, + clientConn: conn, + client: client, + classResolver: noopResolver{}, } } @@ -557,6 +582,414 @@ func TestGubernatorRateLimiter_OverrideDisablesDynamicLimit(t *testing.T) { }) } +func TestGubernatorRateLimiter_ClassResolver(t *testing.T) { + eventChannel := make(chan gubernator.HitEvent, 20) + rateLimiter := newTestGubernatorRateLimiter(t, &Config{ + Type: GubernatorRateLimiter, + RateLimitSettings: RateLimitSettings{ + Strategy: StrategyRateLimitRequests, + ThrottleBehavior: ThrottleBehaviorError, + ThrottleInterval: time.Second, + Rate: 500, + }, + DynamicRateLimiting: DynamicRateLimiting{ + Enabled: true, + WindowMultiplier: 2.0, + WindowDuration: 150 * time.Millisecond, + }, + MetadataKeys: []string{"x-tenant-id"}, + Classes: map[string]Class{ + "alpha": { + Rate: 2000, + Burst: 0, + }, + "bravo": { + Rate: 1000, + Burst: 0, + }, + "charlie": { + Rate: 500, + Burst: 0, + }, + }, + DefaultClass: "charlie", + }, eventChannel) + + // Set the classResolver manually + rateLimiter.classResolver = &fakeResolver{ + mapping: map[string]string{ + "x-tenant-id:12345": "alpha", + "x-tenant-id:67890": "bravo", + }, + } + t.Run("Known class 'alpha'", func(t *testing.T) { + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-tenant-id": {"12345"}, + }), + }) + + actual := int64(500) + require.NoError(t, rateLimiter.RateLimit(ctx, int(actual))) + verify := func(evc <-chan gubernator.HitEvent) { + t.Helper() + event := 
lastReqLimitEvent(drainEvents(evc), t) + assertRequestRateLimitEvent(t, "x-tenant-id:12345", event, + actual, 2000, 2000-actual, gubernator.Status_UNDER_LIMIT, + ) + } + verify(eventChannel) + }) + t.Run("Known class 'bravo'", func(t *testing.T) { + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-tenant-id": {"67890"}, + }), + }) + + actual := int64(300) + require.NoError(t, rateLimiter.RateLimit(ctx, int(actual))) + verify := func(evc <-chan gubernator.HitEvent) { + t.Helper() + event := lastReqLimitEvent(drainEvents(evc), t) + assertRequestRateLimitEvent(t, "x-tenant-id:67890", event, + actual, 1000, 1000-actual, gubernator.Status_UNDER_LIMIT, + ) + } + verify(eventChannel) + }) + t.Run("Unknown class -> default_class 'charlie'", func(t *testing.T) { + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-tenant-id": {"unknown"}, + }), + }) + + actual := int64(400) + require.NoError(t, rateLimiter.RateLimit(ctx, int(actual))) + verify := func(evc <-chan gubernator.HitEvent) { + t.Helper() + event := lastReqLimitEvent(drainEvents(evc), t) + assertRequestRateLimitEvent(t, "x-tenant-id:unknown", event, + actual, 500, 500-actual, gubernator.Status_UNDER_LIMIT, + ) + } + verify(eventChannel) + }) +} + +func TestGubernatorRateLimiter_LoadClassResolverExtension(t *testing.T) { + // This test will validate that a class resolver extension can be loaded + // and used by the rate limiter. + // Since we cannot depend on an external extension in unit tests, we will + // just validate that the extension is loaded without error. 
+ const extName = "fake_resolver" + cfg := &Config{ + Type: GubernatorRateLimiter, + RateLimitSettings: RateLimitSettings{ + Strategy: StrategyRateLimitRequests, + ThrottleBehavior: ThrottleBehaviorError, + ThrottleInterval: time.Second, + Rate: 1000, + }, + DynamicRateLimiting: DynamicRateLimiting{ + Enabled: true, + WindowMultiplier: 2.0, + WindowDuration: 150 * time.Millisecond, + }, + ClassResolver: component.MustNewID(extName), + } + + rateLimiter := newTestGubernatorRateLimiter(t, cfg, nil) + rateLimiter.Start(t.Context(), &fakeHost{ + component.MustNewID(extName): &fakeResolver{}, + }) + rateLimiter.Shutdown(t.Context()) + + // Set the resolver and validate it's used + r, ok := rateLimiter.classResolver.(*fakeResolver) + require.True(t, ok, "expected fakeResolver type") + require.True(t, r.calledStart, "expected Start to be called") + require.True(t, r.calledShutdown, "expected Shutdown to be called") +} + +func TestGubernatorRateLimiter_TelemetryCounters(t *testing.T) { + const ( + WindowPeriod = 150 * time.Millisecond + StaticRate = 1000 + ) + baseCfg := &Config{ + Type: GubernatorRateLimiter, + RateLimitSettings: RateLimitSettings{ + Strategy: StrategyRateLimitRequests, + ThrottleBehavior: ThrottleBehaviorError, + ThrottleInterval: time.Second, + Rate: StaticRate, + Burst: 0, + }, + DynamicRateLimiting: DynamicRateLimiting{ + Enabled: true, + WindowMultiplier: 1.5, + WindowDuration: WindowPeriod, + }, + MetadataKeys: []string{"x-tenant-id"}, + Classes: map[string]Class{ + "alpha": {Rate: StaticRate, Burst: 0}, + }, + DefaultClass: "alpha", + } + + newRL := func(t *testing.T) (*gubernatorRateLimiter, *componenttest.Telemetry) { + rl, tt := newTestGubernatorRateLimiterMetrics(t, baseCfg) + rl.classResolver = &fakeResolver{mapping: map[string]string{ + "x-tenant-id:12345": "alpha", + }} + return rl, tt + } + // Helper to seed previous window rate directly in Gubernator dynamic buckets + seedPrevWindow := func(t *testing.T, rl *gubernatorRateLimiter, 
uniqueKey string, reqsPerSec int) { + t.Helper() + // Align with the start of a window and move near its end to record hits in the current window A + waitUntilNextPeriod(WindowPeriod) + time.Sleep(WindowPeriod - 10*time.Millisecond) + now := time.Now() + drc := newDynamicRateContext(uniqueKey, now, rl.cfg.DynamicRateLimiting) + // Convert reqs/sec into hits during WindowPeriod + hits := int64(float64(reqsPerSec) * WindowPeriod.Seconds()) + _, err := rl.client.GetRateLimits(context.Background(), &gubernator.GetRateLimitsReq{ + Requests: []*gubernator.RateLimitReq{ + rl.newDynamicRequest(drc.currentKey, hits, drc), + }, + }) + require.NoError(t, err) + } + + // Common context containing the tenant id to produce unique key "x-tenant-id:12345" + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-tenant-id": {"12345"}, + }), + }) + const uniqueKey = "x-tenant-id:12345" + + t.Run("dynamic_escalation_increments", func(t *testing.T) { + rl, tt := newRL(t) + // Seed previous window with ~900 req/sec so dynamic becomes 1350 (> static 1000) + seedPrevWindow(t, rl, uniqueKey, 900) + // Move to next window and perform one rate-limited call to trigger dynamic calculation + waitUntilNextPeriod(WindowPeriod) + assert.NoError(t, rl.RateLimit(ctx, 1)) + + // Expect one increment with attributes {class="alpha", source_kind="class"} + metadatatest.AssertEqualRatelimitDynamicEscalations(t, tt, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("class", "alpha"), + attribute.String("source_kind", "class"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + }) + + t.Run("dynamic_escalation_skipped_increments", func(t *testing.T) { + rl, tt := newRL(t) + // Seed previous window with ~200 req/sec so dynamic becomes max(1000, 200*1.5)=1000 (skipped) + seedPrevWindow(t, rl, uniqueKey, 200) + waitUntilNextPeriod(WindowPeriod) + assert.NoError(t, rl.RateLimit(ctx, 1)) + + 
metadatatest.AssertEqualRatelimitDynamicEscalationsSkipped(t, tt, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("class", "alpha"), + attribute.String("source_kind", "class"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + }) + + t.Run("gubernator_degraded_increments", func(t *testing.T) { + rl, tt := newRL(t) + rl.daemon.Close() // Close the daemon to force errors in dynamic calls + assert.Error(t, rl.RateLimit(ctx, 1)) + + metadatatest.AssertEqualRatelimitGubernatorDegraded(t, tt, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("class", "alpha"), + attribute.String("source_kind", "class"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + }) +} + +func TestGubernatorRateLimiter_ResolverFailures(t *testing.T) { + const ( + WindowPeriod = 150 * time.Millisecond + StaticRate = 1000 + ) + t.Run("failure_uses_default_class_and_counts", func(t *testing.T) { + cfg := &Config{ + Type: GubernatorRateLimiter, + RateLimitSettings: RateLimitSettings{ + Strategy: StrategyRateLimitRequests, + ThrottleBehavior: ThrottleBehaviorError, + ThrottleInterval: time.Second, + Rate: StaticRate, + Burst: 0, + }, + DynamicRateLimiting: DynamicRateLimiting{ + Enabled: true, + WindowMultiplier: 1.5, + WindowDuration: WindowPeriod, + }, + MetadataKeys: []string{"x-tenant-id"}, + Classes: map[string]Class{ + "alpha": {Rate: StaticRate, Burst: 0}, + }, + DefaultClass: "alpha", + } + + rl, tt := newTestGubernatorRateLimiterMetrics(t, cfg) + // Simulate resolver error for this key + rl.classResolver = &fakeResolver{errorKeys: map[string]error{ + "x-tenant-id:fail1": errors.New("boom"), + }} + + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-tenant-id": {"fail1"}, + }), + }) + + // One call triggers resolver failure and should fall back to default class + assert.NoError(t, rl.RateLimit(ctx, 1)) + + // 1) resolver_failures 
counter increments + metadatatest.AssertEqualRatelimitResolverFailures(t, tt, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String( + "unique_key", "x-tenant-id:fail1", + )), + }, + }, metricdatatest.IgnoreTimestamp()) + + // 2) dynamic escalation skipped for baseline (no seeding) with class/default + metadatatest.AssertEqualRatelimitDynamicEscalationsSkipped(t, tt, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("class", "alpha"), + attribute.String("source_kind", "class"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + }) + + t.Run("failure_falls_back_to_global_when_no_classes", func(t *testing.T) { + cfg := &Config{ + Type: GubernatorRateLimiter, + RateLimitSettings: RateLimitSettings{ + Strategy: StrategyRateLimitRequests, + ThrottleBehavior: ThrottleBehaviorError, + ThrottleInterval: time.Second, + Rate: StaticRate, + Burst: 0, + }, + DynamicRateLimiting: DynamicRateLimiting{ + Enabled: true, + WindowMultiplier: 1.5, + WindowDuration: WindowPeriod, + }, + MetadataKeys: []string{"x-tenant-id"}, + // No classes and no default class + } + + rl, tt := newTestGubernatorRateLimiterMetrics(t, cfg) + rl.classResolver = &fakeResolver{errorKeys: map[string]error{ + "x-tenant-id:fail2": errors.New("boom"), + }} + + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-tenant-id": {"fail2"}, + }), + }) + + assert.NoError(t, rl.RateLimit(ctx, 1)) + + // resolver_failures increments + metadatatest.AssertEqualRatelimitResolverFailures(t, tt, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String( + "unique_key", "x-tenant-id:fail2", + )), + }, + }, metricdatatest.IgnoreTimestamp()) + + // dynamic skipped with source_kind=fallback and empty class + metadatatest.AssertEqualRatelimitDynamicEscalationsSkipped(t, tt, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: 
attribute.NewSet( + attribute.String("class", ""), + attribute.String("source_kind", "fallback"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + }) + + t.Run("multiple_failures_accumulate", func(t *testing.T) { + cfg := &Config{ + Type: GubernatorRateLimiter, + RateLimitSettings: RateLimitSettings{ + Strategy: StrategyRateLimitRequests, + ThrottleBehavior: ThrottleBehaviorError, + ThrottleInterval: time.Second, + Rate: StaticRate, + Burst: 0, + }, + DynamicRateLimiting: DynamicRateLimiting{ + Enabled: true, + WindowMultiplier: 1.5, + WindowDuration: WindowPeriod, + }, + MetadataKeys: []string{"x-tenant-id"}, + Classes: map[string]Class{ + "alpha": {Rate: StaticRate, Burst: 0}, + }, + DefaultClass: "alpha", + } + rl, tt := newTestGubernatorRateLimiterMetrics(t, cfg) + rl.classResolver = &fakeResolver{errorKeys: map[string]error{ + "x-tenant-id:fail3": errors.New("boom"), + }} + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-tenant-id": {"fail3"}, + }), + }) + + for range 3 { + assert.NoError(t, rl.RateLimit(ctx, 1)) + } + metadatatest.AssertEqualRatelimitResolverFailures(t, tt, []metricdata.DataPoint[int64]{ + { + Value: 3, + Attributes: attribute.NewSet(attribute.String( + "unique_key", "x-tenant-id:fail3", + )), + }, + }, metricdatatest.IgnoreTimestamp()) + }) +} + // drainEvents drains events from the channel until it's been empty for a short while. 
func drainEvents(c <-chan gubernator.HitEvent) []gubernator.HitEvent { events := make([]gubernator.HitEvent, 0) @@ -628,7 +1061,7 @@ func TestGubernatorRateLimiter_MultipleRequests_Delay(t *testing.T) { var wg sync.WaitGroup wg.Add(requests) - for i := 0; i < requests; i++ { + for i := range requests { go func(i int) { defer wg.Done() err := rl.RateLimit(context.Background(), 1) @@ -655,3 +1088,46 @@ func TestGubernatorRateLimiter_MultipleRequests_Delay(t *testing.T) { } } } + +type fakeResolver struct { + mapping map[string]string + errorKeys map[string]error + callCount int + calledKeys []string + + calledStart bool + calledShutdown bool +} + +func (f *fakeResolver) ResolveClass(_ context.Context, key string) (string, error) { + f.callCount++ + f.calledKeys = append(f.calledKeys, key) + if err, hasError := f.errorKeys[key]; hasError { + return "", err + } + if class, exists := f.mapping[key]; exists { + return class, nil + } + return "", nil // Unknown class +} + +func (f *fakeResolver) Reset() { + f.callCount = 0 + f.calledKeys = nil +} + +func (f *fakeResolver) Start(context.Context, component.Host) error { + f.calledStart = true + return nil +} + +func (f *fakeResolver) Shutdown(context.Context) error { + f.calledShutdown = true + return nil +} + +type fakeHost map[component.ID]component.Component + +func (f fakeHost) GetExtensions() map[component.ID]component.Component { + return f +} diff --git a/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go b/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go index 926e2b081..40c8cf52a 100644 --- a/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go +++ b/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go @@ -40,13 +40,17 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. 
type TelemetryBuilder struct { - meter metric.Meter - mu sync.Mutex - registrations []metric.Registration - RatelimitConcurrentRequests metric.Int64Gauge - RatelimitRequestDuration metric.Float64Histogram - RatelimitRequestSize metric.Int64Histogram - RatelimitRequests metric.Int64Counter + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + RatelimitConcurrentRequests metric.Int64Gauge + RatelimitDynamicEscalations metric.Int64Counter + RatelimitDynamicEscalationsSkipped metric.Int64Counter + RatelimitGubernatorDegraded metric.Int64Counter + RatelimitRequestDuration metric.Float64Histogram + RatelimitRequestSize metric.Int64Histogram + RatelimitRequests metric.Int64Counter + RatelimitResolverFailures metric.Int64Counter } // TelemetryBuilderOption applies changes to default builder. @@ -84,6 +88,24 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{requests}"), ) errs = errors.Join(errs, err) + builder.RatelimitDynamicEscalations, err = builder.meter.Int64Counter( + "otelcol_ratelimit.dynamic_escalations", + metric.WithDescription("Total number of dynamic rate escalations (dynamic > static)"), + metric.WithUnit("{count}"), + ) + errs = errors.Join(errs, err) + builder.RatelimitDynamicEscalationsSkipped, err = builder.meter.Int64Counter( + "otelcol_ratelimit.dynamic_escalations_skipped", + metric.WithDescription("Total number of times dynamic escalation was skipped (dynamic <= static)"), + metric.WithUnit("{count}"), + ) + errs = errors.Join(errs, err) + builder.RatelimitGubernatorDegraded, err = builder.meter.Int64Counter( + "otelcol_ratelimit.gubernator_degraded", + metric.WithDescription("Total number of operations in degraded mode due to Gubernator unavailability"), + metric.WithUnit("{count}"), + ) + errs = errors.Join(errs, err) builder.RatelimitRequestDuration, err = builder.meter.Float64Histogram( "otelcol_ratelimit.request_duration", metric.WithDescription("Time(in seconds) taken to 
process a rate limit request"), @@ -104,5 +126,11 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{requests}"), ) errs = errors.Join(errs, err) + builder.RatelimitResolverFailures, err = builder.meter.Int64Counter( + "otelcol_ratelimit.resolver_failures", + metric.WithDescription("Total number of class resolver failures"), + metric.WithUnit("{count}"), + ) + errs = errors.Join(errs, err) return &builder, errs } diff --git a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go index 6c81917bb..b76898c54 100644 --- a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go +++ b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go @@ -52,6 +52,54 @@ func AssertEqualRatelimitConcurrentRequests(t *testing.T, tt *componenttest.Tele metricdatatest.AssertEqual(t, want, got, opts...) } +func AssertEqualRatelimitDynamicEscalations(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_ratelimit.dynamic_escalations", + Description: "Total number of dynamic rate escalations (dynamic > static)", + Unit: "{count}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_ratelimit.dynamic_escalations") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) 
+} + +func AssertEqualRatelimitDynamicEscalationsSkipped(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_ratelimit.dynamic_escalations_skipped", + Description: "Total number of times dynamic escalation was skipped (dynamic <= static)", + Unit: "{count}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_ratelimit.dynamic_escalations_skipped") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualRatelimitGubernatorDegraded(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_ratelimit.gubernator_degraded", + Description: "Total number of operations in degraded mode due to Gubernator unavailability", + Unit: "{count}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_ratelimit.gubernator_degraded") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualRatelimitRequestDuration(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[float64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_ratelimit.request_duration", @@ -97,3 +145,19 @@ func AssertEqualRatelimitRequests(t *testing.T, tt *componenttest.Telemetry, dps require.NoError(t, err) metricdatatest.AssertEqual(t, want, got, opts...) 
} + +func AssertEqualRatelimitResolverFailures(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_ratelimit.resolver_failures", + Description: "Total number of class resolver failures", + Unit: "{count}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_ratelimit.resolver_failures") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} diff --git a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go index 6e9592f44..73a68385a 100644 --- a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go +++ b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go @@ -37,12 +37,25 @@ func TestSetupTelemetry(t *testing.T) { require.NoError(t, err) defer tb.Shutdown() tb.RatelimitConcurrentRequests.Record(context.Background(), 1) + tb.RatelimitDynamicEscalations.Add(context.Background(), 1) + tb.RatelimitDynamicEscalationsSkipped.Add(context.Background(), 1) + tb.RatelimitGubernatorDegraded.Add(context.Background(), 1) tb.RatelimitRequestDuration.Record(context.Background(), 1) tb.RatelimitRequestSize.Record(context.Background(), 1) tb.RatelimitRequests.Add(context.Background(), 1) + tb.RatelimitResolverFailures.Add(context.Background(), 1) AssertEqualRatelimitConcurrentRequests(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualRatelimitDynamicEscalations(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualRatelimitDynamicEscalationsSkipped(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + 
AssertEqualRatelimitGubernatorDegraded(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualRatelimitRequestDuration(t, testTel, []metricdata.HistogramDataPoint[float64]{{}}, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp()) @@ -52,6 +65,9 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualRatelimitRequests(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualRatelimitResolverFailures(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) require.NoError(t, testTel.Shutdown(context.Background())) } diff --git a/processor/ratelimitprocessor/local.go b/processor/ratelimitprocessor/local.go index 13717516c..2e2fe8428 100644 --- a/processor/ratelimitprocessor/local.go +++ b/processor/ratelimitprocessor/local.go @@ -54,7 +54,8 @@ func (r *localRateLimiter) RateLimit(ctx context.Context, hits int) error { // Each (shared) processor gets its own rate limiter, // so it's enough to use client metadata-based unique key. key := getUniqueKey(ctx, r.cfg.MetadataKeys) - cfg := resolveRateLimitSettings(r.cfg, key) + // local rate limiter ignores classes (no resolver), so pass empty class. 
+ cfg, _, _ := resolveRateLimit(r.cfg, key, "") v, _ := r.limiters.LoadOrStore(key, rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst)) limiter := v.(*rate.Limiter) diff --git a/processor/ratelimitprocessor/metadata.yaml b/processor/ratelimitprocessor/metadata.yaml index 71c95c9f0..cbb61697e 100644 --- a/processor/ratelimitprocessor/metadata.yaml +++ b/processor/ratelimitprocessor/metadata.yaml @@ -13,6 +13,37 @@ tests: telemetry: metrics: + ratelimit.resolver_failures: + enabled: true + description: Total number of class resolver failures + unit: "{count}" + sum: + value_type: int + monotonic: true + attributes: ["unique_key"] # high cardinality + ratelimit.gubernator_degraded: + enabled: true + description: Total number of operations in degraded mode due to Gubernator unavailability + unit: "{count}" + sum: + value_type: int + monotonic: true + ratelimit.dynamic_escalations: + enabled: true + description: Total number of dynamic rate escalations (dynamic > static) + unit: "{count}" + sum: + value_type: int + monotonic: true + attributes: ["class", "source_kind"] + ratelimit.dynamic_escalations_skipped: + enabled: true + description: Total number of times dynamic escalation was skipped (dynamic <= static) + unit: "{count}" + sum: + value_type: int + monotonic: true + attributes: ["class", "source_kind"] ratelimit.requests: enabled: true description: Number of rate-limiting requests @@ -82,3 +113,12 @@ attributes: reason: description: rate limit reason type: string + class: + description: rate limit class used + type: string + source_kind: + description: precedence source used to resolve rate + type: string + unique_key: + description: unique key for the request used in class resolution + type: string diff --git a/processor/ratelimitprocessor/processor.go b/processor/ratelimitprocessor/processor.go index 6711d9299..2ded70945 100644 --- a/processor/ratelimitprocessor/processor.go +++ b/processor/ratelimitprocessor/processor.go @@ -19,7 +19,6 @@ package 
ratelimitprocessor // import "github.com/elastic/opentelemetry-collector import ( "context" - "fmt" "sync/atomic" "time" @@ -76,23 +75,19 @@ type ProfilesRateLimiterProcessor struct { func NewLogsRateLimiterProcessor( rateLimiter *sharedcomponent.Component[rateLimiterComponent], - telemetrySettings component.TelemetrySettings, + logger *zap.Logger, + telemetryBuilder *metadata.TelemetryBuilder, strategy Strategy, next func(ctx context.Context, logs plog.Logs) error, inflight *int64, metadataKeys []string, ) (*LogsRateLimiterProcessor, error) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings) - if err != nil { - return nil, fmt.Errorf("failed to create telemetry builder: %w", err) - } - return &LogsRateLimiterProcessor{ rateLimiterProcessor: rateLimiterProcessor{ Component: rateLimiter, rl: rateLimiter.Unwrap(), telemetryBuilder: telemetryBuilder, - logger: telemetrySettings.Logger, + logger: logger, inflight: inflight, metadataKeys: metadataKeys, strategy: strategy, @@ -104,23 +99,19 @@ func NewLogsRateLimiterProcessor( func NewMetricsRateLimiterProcessor( rateLimiter *sharedcomponent.Component[rateLimiterComponent], - telemetrySettings component.TelemetrySettings, + logger *zap.Logger, + telemetryBuilder *metadata.TelemetryBuilder, strategy Strategy, next func(ctx context.Context, metrics pmetric.Metrics) error, inflight *int64, // used to calculate concurrent requests metadataKeys []string, ) (*MetricsRateLimiterProcessor, error) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings) - if err != nil { - return nil, fmt.Errorf("failed to create telemetry builder: %w", err) - } - return &MetricsRateLimiterProcessor{ rateLimiterProcessor: rateLimiterProcessor{ Component: rateLimiter, rl: rateLimiter.Unwrap(), telemetryBuilder: telemetryBuilder, - logger: telemetrySettings.Logger, + logger: logger, inflight: inflight, metadataKeys: metadataKeys, strategy: strategy, @@ -132,23 +123,19 @@ func 
NewMetricsRateLimiterProcessor( func NewTracesRateLimiterProcessor( rateLimiter *sharedcomponent.Component[rateLimiterComponent], - telemetrySettings component.TelemetrySettings, + logger *zap.Logger, + telemetryBuilder *metadata.TelemetryBuilder, strategy Strategy, next func(ctx context.Context, traces ptrace.Traces) error, inflight *int64, metadataKeys []string, ) (*TracesRateLimiterProcessor, error) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings) - if err != nil { - return nil, fmt.Errorf("failed to create telemetry builder: %w", err) - } - return &TracesRateLimiterProcessor{ rateLimiterProcessor: rateLimiterProcessor{ Component: rateLimiter, rl: rateLimiter.Unwrap(), telemetryBuilder: telemetryBuilder, - logger: telemetrySettings.Logger, + logger: logger, inflight: inflight, metadataKeys: metadataKeys, strategy: strategy, @@ -160,22 +147,19 @@ func NewTracesRateLimiterProcessor( func NewProfilesRateLimiterProcessor( rateLimiter *sharedcomponent.Component[rateLimiterComponent], - telemetrySettings component.TelemetrySettings, + logger *zap.Logger, + telemetryBuilder *metadata.TelemetryBuilder, strategy Strategy, next func(ctx context.Context, profiles pprofile.Profiles) error, inflight *int64, metadataKeys []string, ) (*ProfilesRateLimiterProcessor, error) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings) - if err != nil { - return nil, fmt.Errorf("failed to create telemetry builder: %w", err) - } - return &ProfilesRateLimiterProcessor{ rateLimiterProcessor: rateLimiterProcessor{ Component: rateLimiter, rl: rateLimiter.Unwrap(), telemetryBuilder: telemetryBuilder, + logger: logger, inflight: inflight, metadataKeys: metadataKeys, strategy: strategy, diff --git a/processor/ratelimitprocessor/processor_test.go b/processor/ratelimitprocessor/processor_test.go index 820a78b24..9d6bbc82c 100644 --- a/processor/ratelimitprocessor/processor_test.go +++ b/processor/ratelimitprocessor/processor_test.go @@ -388,7 
+388,7 @@ func TestConcurrentRequestsTelemetry(t *testing.T) { } metrics := pmetric.NewMetrics() - for i := 0; i < numWorkers; i++ { + for range numWorkers { wg.Add(1) go func() { defer wg.Done() @@ -469,7 +469,7 @@ func testRatelimitLogMetadata(t *testing.T, logEntries []observer.LoggedEntry) { logEntry := logEntries[0] assert.Equal(t, zapcore.ErrorLevel, logEntry.Level) - fields := make(map[string]interface{}) + fields := make(map[string]any) for _, field := range logEntry.Context { switch field.Type { case zapcore.StringType: diff --git a/processor/ratelimitprocessor/testdata/config.yaml b/processor/ratelimitprocessor/testdata/config.yaml index 33663f02a..30bb927c8 100644 --- a/processor/ratelimitprocessor/testdata/config.yaml +++ b/processor/ratelimitprocessor/testdata/config.yaml @@ -98,3 +98,40 @@ ratelimit/dynamic_rate_limit: enabled: true window_multiplier: 1.5 window_duration: 1m + +ratelimit/invalid_default_class: + rate: 100 + burst: 200 + type: gubernator + classes: + trial: + rate: 200 + burst: 400 + default_class: nonexistent + +ratelimit/invalid_class_rate_zero: + rate: 100 + burst: 200 + type: gubernator + classes: + trial: + rate: 0 + burst: 400 + +ratelimit/invalid_class_rate_negative: + rate: 100 + burst: 200 + type: gubernator + classes: + trial: + rate: -100 + burst: 400 + +ratelimit/invalid_class_burst_negative: + rate: 100 + burst: 200 + type: gubernator + classes: + trial: + rate: 200 + burst: -1 From 014a7ed478dc64764457b03297ad431a587073dd Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Mon, 29 Sep 2025 09:16:50 +0800 Subject: [PATCH 2/8] make the linter very happy Signed-off-by: Marc Lopez Rubio --- processor/ratelimitprocessor/gubernator_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/processor/ratelimitprocessor/gubernator_test.go b/processor/ratelimitprocessor/gubernator_test.go index 25660bfcf..79b6b0d2d 100644 --- a/processor/ratelimitprocessor/gubernator_test.go +++ 
b/processor/ratelimitprocessor/gubernator_test.go @@ -52,7 +52,7 @@ func newTestGubernatorRateLimiterMetrics(t *testing.T, cfg *Config) ( require.NoError(t, err) rl.telemetryBuilder = tb t.Cleanup(func() { - tt.Shutdown(t.Context()) + require.NoError(t, tt.Shutdown(t.Context())) }) return rl, tt } @@ -87,6 +87,7 @@ func newGubernatorRateLimiterFrom(t *testing.T, cfg *Config, daemon *gubernator. grpc.WithResolvers(gubernator.NewStaticBuilder()), grpc.WithTransportCredentials(insecure.NewCredentials()), ) + require.NoError(t, err) client := gubernator.NewV1Client(conn) t.Cleanup(func() { // Close the gRPC connection and daemon on test cleanup. conn.Close() @@ -701,10 +702,11 @@ func TestGubernatorRateLimiter_LoadClassResolverExtension(t *testing.T) { } rateLimiter := newTestGubernatorRateLimiter(t, cfg, nil) - rateLimiter.Start(t.Context(), &fakeHost{ + err := rateLimiter.Start(t.Context(), &fakeHost{ component.MustNewID(extName): &fakeResolver{}, }) - rateLimiter.Shutdown(t.Context()) + require.NoError(t, err) + require.NoError(t, rateLimiter.Shutdown(t.Context())) // Set the resolver and validate it's used r, ok := rateLimiter.classResolver.(*fakeResolver) From 5c5246305c08515c1378645f18a3d796cc760715 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Mon, 29 Sep 2025 09:21:18 +0800 Subject: [PATCH 3/8] fix unittest failures Signed-off-by: Marc Lopez Rubio --- processor/ratelimitprocessor/gubernator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/ratelimitprocessor/gubernator_test.go b/processor/ratelimitprocessor/gubernator_test.go index 79b6b0d2d..e42814c8d 100644 --- a/processor/ratelimitprocessor/gubernator_test.go +++ b/processor/ratelimitprocessor/gubernator_test.go @@ -52,7 +52,7 @@ func newTestGubernatorRateLimiterMetrics(t *testing.T, cfg *Config) ( require.NoError(t, err) rl.telemetryBuilder = tb t.Cleanup(func() { - require.NoError(t, tt.Shutdown(t.Context())) + _ = tt.Shutdown(t.Context()) }) return rl, tt } From 
b1bdcc097e234fa7227a2038cce9e293b4cdea51 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 2 Oct 2025 17:02:49 +0800 Subject: [PATCH 4/8] Address comments Signed-off-by: Marc Lopez Rubio --- processor/ratelimitprocessor/README.md | 13 ++------ processor/ratelimitprocessor/documentation.md | 26 ++------------- processor/ratelimitprocessor/gubernator.go | 24 +++++++++++--- .../ratelimitprocessor/gubernator_test.go | 13 +++++--- .../internal/metadata/generated_telemetry.go | 32 ++++++------------- .../metadatatest/generated_telemetrytest.go | 32 ------------------- .../generated_telemetrytest_test.go | 8 ----- processor/ratelimitprocessor/metadata.yaml | 22 +++---------- 8 files changed, 47 insertions(+), 123 deletions(-) diff --git a/processor/ratelimitprocessor/README.md b/processor/ratelimitprocessor/README.md index cbb3c8e38..c0d80cdce 100644 --- a/processor/ratelimitprocessor/README.md +++ b/processor/ratelimitprocessor/README.md @@ -265,6 +265,7 @@ The `class_resolver` option tells the processor which OpenTelemetry Collector ex ```yaml processors: ratelimiter: + # This is an example class_resolver. It doesn't exist. class_resolver: configmapclassresolverextension ``` @@ -287,19 +288,11 @@ Telemetry and metrics: * The processor emits attributes on relevant metrics to aid debugging and monitoring: * `rate_source`: one of `static`, `dynamic`, `fallback`, or `degraded` (indicates whether dynamic calculation was used or not) - * `class`: resolved class name when applicable - * `source_kind`: which precedence path was used (`override`, `class`, or `fallback`) + * `result`: one of `gubernator_error`, `success` or `skipped`. 
* Counters introduced to observe resolver and dynamic behavior include: * `ratelimit.resolver_failures` — total number of resolver failures - - * `ratelimit.gubernator_degraded` — total number of operations in degraded mode due to Gubernator unavailability - - * `ratelimit.dynamic_escalations` — number of times dynamic rate > static rate (attributes: `class`, `source_kind`) - - * `ratelimit.dynamic_escalations_skipped` — number of times dynamic escalation was skipped because dynamic <= static (attributes: `class`, `source_kind`) - -These telemetry signals help operators understand when resolution or dynamic logic is active and diagnose fallback behavior. + * `ratelimit.dynamic_escalations` — number of times dynamic rate was peeked (attributes: `class`, `source_kind`, `result`) diff --git a/processor/ratelimitprocessor/documentation.md b/processor/ratelimitprocessor/documentation.md index eedd4675d..8632bf300 100644 --- a/processor/ratelimitprocessor/documentation.md +++ b/processor/ratelimitprocessor/documentation.md @@ -27,30 +27,8 @@ Total number of dynamic rate escalations (dynamic > static) | Name | Description | Values | | ---- | ----------- | ------ | | class | rate limit class used | Any Str | -| source_kind | precedence source used to resolve rate | Any Str | - -### otelcol_ratelimit.dynamic_escalations_skipped - -Total number of times dynamic escalation was skipped (dynamic <= static) - -| Unit | Metric Type | Value Type | Monotonic | -| ---- | ----------- | ---------- | --------- | -| {count} | Sum | Int | true | - -#### Attributes - -| Name | Description | Values | -| ---- | ----------- | ------ | -| class | rate limit class used | Any Str | -| source_kind | precedence source used to resolve rate | Any Str | - -### otelcol_ratelimit.gubernator_degraded - -Total number of operations in degraded mode due to Gubernator unavailability - -| Unit | Metric Type | Value Type | Monotonic | -| ---- | ----------- | ---------- | --------- | -| {count} | Sum | Int | 
true | +| source_kind | precedence source used to resolve the rate limit settings | Any Str | +| result | result of the rate limit request | Any Str | ### otelcol_ratelimit.request_duration diff --git a/processor/ratelimitprocessor/gubernator.go b/processor/ratelimitprocessor/gubernator.go index aafe3e3a1..6db761d64 100644 --- a/processor/ratelimitprocessor/gubernator.go +++ b/processor/ratelimitprocessor/gubernator.go @@ -181,19 +181,33 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error { // If dynamic rate limiting is enabled and not disabled for this request, // calculate the dynamic rate and burst. if r.cfg.DynamicRateLimiting.Enabled && !cfg.disableDynamic { - attrs := metric.WithAttributeSet(attribute.NewSet( + attrs := make([]attribute.KeyValue, 0, 3) + attrs = append(attrs, attribute.String("source_kind", string(sourceKind)), attribute.String("class", className), - )) + ) rate, burst = r.calculateRateAndBurst(ctx, cfg, uniqueKey, hits, now) if rate < 0 { // Degraded mode - Gubernator unreachable. Fallback to static rate. 
- r.telemetryBuilder.RatelimitGubernatorDegraded.Add(ctx, 1, attrs) + r.telemetryBuilder.RatelimitDynamicEscalations.Add(ctx, 1, + metric.WithAttributeSet(attribute.NewSet(append(attrs, + attribute.String("result", "gubernator_error"), + )...)), + ) rate, burst = cfg.Rate, cfg.Burst } else if rate > cfg.Rate { // Dynamic escalation occurred - r.telemetryBuilder.RatelimitDynamicEscalations.Add(ctx, 1, attrs) + r.telemetryBuilder.RatelimitDynamicEscalations.Add(ctx, 1, + metric.WithAttributeSet(attribute.NewSet(append(attrs, + attribute.String("result", "success"), + )...)), + ) } else { // Dynamic escalation was skipped (dynamic <= static) - r.telemetryBuilder.RatelimitDynamicEscalationsSkipped.Add(ctx, 1, attrs) + r.telemetryBuilder.RatelimitDynamicEscalations.Add(ctx, 1, + metric.WithAttributeSet(attribute.NewSet(append(attrs, + attribute.String("result", "skipped"), + )...)), + ) + } // Execute rate actual limit check / recording. return r.executeRateLimit(ctx, cfg, uniqueKey, hits, rate, burst, now) diff --git a/processor/ratelimitprocessor/gubernator_test.go b/processor/ratelimitprocessor/gubernator_test.go index e42814c8d..01cdfc088 100644 --- a/processor/ratelimitprocessor/gubernator_test.go +++ b/processor/ratelimitprocessor/gubernator_test.go @@ -789,6 +789,7 @@ func TestGubernatorRateLimiter_TelemetryCounters(t *testing.T) { Attributes: attribute.NewSet( attribute.String("class", "alpha"), attribute.String("source_kind", "class"), + attribute.String("result", "success"), ), }, }, metricdatatest.IgnoreTimestamp()) @@ -801,12 +802,13 @@ func TestGubernatorRateLimiter_TelemetryCounters(t *testing.T) { waitUntilNextPeriod(WindowPeriod) assert.NoError(t, rl.RateLimit(ctx, 1)) - metadatatest.AssertEqualRatelimitDynamicEscalationsSkipped(t, tt, []metricdata.DataPoint[int64]{ + metadatatest.AssertEqualRatelimitDynamicEscalations(t, tt, []metricdata.DataPoint[int64]{ { Value: 1, Attributes: attribute.NewSet( attribute.String("class", "alpha"), 
attribute.String("source_kind", "class"), + attribute.String("result", "skipped"), ), }, }, metricdatatest.IgnoreTimestamp()) @@ -817,12 +819,13 @@ func TestGubernatorRateLimiter_TelemetryCounters(t *testing.T) { rl.daemon.Close() // Close the daemon to force errors in dynamic calls assert.Error(t, rl.RateLimit(ctx, 1)) - metadatatest.AssertEqualRatelimitGubernatorDegraded(t, tt, []metricdata.DataPoint[int64]{ + metadatatest.AssertEqualRatelimitDynamicEscalations(t, tt, []metricdata.DataPoint[int64]{ { Value: 1, Attributes: attribute.NewSet( attribute.String("class", "alpha"), attribute.String("source_kind", "class"), + attribute.String("result", "gubernator_error"), ), }, }, metricdatatest.IgnoreTimestamp()) @@ -882,12 +885,13 @@ func TestGubernatorRateLimiter_ResolverFailures(t *testing.T) { }, metricdatatest.IgnoreTimestamp()) // 2) dynamic escalation skipped for baseline (no seeding) with class/default - metadatatest.AssertEqualRatelimitDynamicEscalationsSkipped(t, tt, []metricdata.DataPoint[int64]{ + metadatatest.AssertEqualRatelimitDynamicEscalations(t, tt, []metricdata.DataPoint[int64]{ { Value: 1, Attributes: attribute.NewSet( attribute.String("class", "alpha"), attribute.String("source_kind", "class"), + attribute.String("result", "skipped"), ), }, }, metricdatatest.IgnoreTimestamp()) @@ -936,12 +940,13 @@ func TestGubernatorRateLimiter_ResolverFailures(t *testing.T) { }, metricdatatest.IgnoreTimestamp()) // dynamic skipped with source_kind=fallback and empty class - metadatatest.AssertEqualRatelimitDynamicEscalationsSkipped(t, tt, []metricdata.DataPoint[int64]{ + metadatatest.AssertEqualRatelimitDynamicEscalations(t, tt, []metricdata.DataPoint[int64]{ { Value: 1, Attributes: attribute.NewSet( attribute.String("class", ""), attribute.String("source_kind", "fallback"), + attribute.String("result", "skipped"), ), }, }, metricdatatest.IgnoreTimestamp()) diff --git a/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go 
b/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go index 40c8cf52a..c434e4f34 100644 --- a/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go +++ b/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go @@ -40,17 +40,15 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. type TelemetryBuilder struct { - meter metric.Meter - mu sync.Mutex - registrations []metric.Registration - RatelimitConcurrentRequests metric.Int64Gauge - RatelimitDynamicEscalations metric.Int64Counter - RatelimitDynamicEscalationsSkipped metric.Int64Counter - RatelimitGubernatorDegraded metric.Int64Counter - RatelimitRequestDuration metric.Float64Histogram - RatelimitRequestSize metric.Int64Histogram - RatelimitRequests metric.Int64Counter - RatelimitResolverFailures metric.Int64Counter + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + RatelimitConcurrentRequests metric.Int64Gauge + RatelimitDynamicEscalations metric.Int64Counter + RatelimitRequestDuration metric.Float64Histogram + RatelimitRequestSize metric.Int64Histogram + RatelimitRequests metric.Int64Counter + RatelimitResolverFailures metric.Int64Counter } // TelemetryBuilderOption applies changes to default builder. 
@@ -94,18 +92,6 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{count}"), ) errs = errors.Join(errs, err) - builder.RatelimitDynamicEscalationsSkipped, err = builder.meter.Int64Counter( - "otelcol_ratelimit.dynamic_escalations_skipped", - metric.WithDescription("Total number of times dynamic escalation was skipped (dynamic <= static)"), - metric.WithUnit("{count}"), - ) - errs = errors.Join(errs, err) - builder.RatelimitGubernatorDegraded, err = builder.meter.Int64Counter( - "otelcol_ratelimit.gubernator_degraded", - metric.WithDescription("Total number of operations in degraded mode due to Gubernator unavailability"), - metric.WithUnit("{count}"), - ) - errs = errors.Join(errs, err) builder.RatelimitRequestDuration, err = builder.meter.Float64Histogram( "otelcol_ratelimit.request_duration", metric.WithDescription("Time(in seconds) taken to process a rate limit request"), diff --git a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go index b76898c54..bd42c50f8 100644 --- a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go +++ b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go @@ -68,38 +68,6 @@ func AssertEqualRatelimitDynamicEscalations(t *testing.T, tt *componenttest.Tele metricdatatest.AssertEqual(t, want, got, opts...) 
} -func AssertEqualRatelimitDynamicEscalationsSkipped(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { - want := metricdata.Metrics{ - Name: "otelcol_ratelimit.dynamic_escalations_skipped", - Description: "Total number of times dynamic escalation was skipped (dynamic <= static)", - Unit: "{count}", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: dps, - }, - } - got, err := tt.GetMetric("otelcol_ratelimit.dynamic_escalations_skipped") - require.NoError(t, err) - metricdatatest.AssertEqual(t, want, got, opts...) -} - -func AssertEqualRatelimitGubernatorDegraded(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { - want := metricdata.Metrics{ - Name: "otelcol_ratelimit.gubernator_degraded", - Description: "Total number of operations in degraded mode due to Gubernator unavailability", - Unit: "{count}", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: dps, - }, - } - got, err := tt.GetMetric("otelcol_ratelimit.gubernator_degraded") - require.NoError(t, err) - metricdatatest.AssertEqual(t, want, got, opts...) 
-} - func AssertEqualRatelimitRequestDuration(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[float64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_ratelimit.request_duration", diff --git a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go index 73a68385a..13e11cee3 100644 --- a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go +++ b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go @@ -38,8 +38,6 @@ func TestSetupTelemetry(t *testing.T) { defer tb.Shutdown() tb.RatelimitConcurrentRequests.Record(context.Background(), 1) tb.RatelimitDynamicEscalations.Add(context.Background(), 1) - tb.RatelimitDynamicEscalationsSkipped.Add(context.Background(), 1) - tb.RatelimitGubernatorDegraded.Add(context.Background(), 1) tb.RatelimitRequestDuration.Record(context.Background(), 1) tb.RatelimitRequestSize.Record(context.Background(), 1) tb.RatelimitRequests.Add(context.Background(), 1) @@ -50,12 +48,6 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualRatelimitDynamicEscalations(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) - AssertEqualRatelimitDynamicEscalationsSkipped(t, testTel, - []metricdata.DataPoint[int64]{{Value: 1}}, - metricdatatest.IgnoreTimestamp()) - AssertEqualRatelimitGubernatorDegraded(t, testTel, - []metricdata.DataPoint[int64]{{Value: 1}}, - metricdatatest.IgnoreTimestamp()) AssertEqualRatelimitRequestDuration(t, testTel, []metricdata.HistogramDataPoint[float64]{{}}, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp()) diff --git a/processor/ratelimitprocessor/metadata.yaml b/processor/ratelimitprocessor/metadata.yaml index cbb61697e..410df520f 100644 --- a/processor/ratelimitprocessor/metadata.yaml +++ b/processor/ratelimitprocessor/metadata.yaml @@ -21,13 +21,6 
@@ telemetry: value_type: int monotonic: true attributes: ["unique_key"] # high cardinality - ratelimit.gubernator_degraded: - enabled: true - description: Total number of operations in degraded mode due to Gubernator unavailability - unit: "{count}" - sum: - value_type: int - monotonic: true ratelimit.dynamic_escalations: enabled: true description: Total number of dynamic rate escalations (dynamic > static) @@ -35,15 +28,7 @@ telemetry: sum: value_type: int monotonic: true - attributes: ["class", "source_kind"] - ratelimit.dynamic_escalations_skipped: - enabled: true - description: Total number of times dynamic escalation was skipped (dynamic <= static) - unit: "{count}" - sum: - value_type: int - monotonic: true - attributes: ["class", "source_kind"] + attributes: ["class", "source_kind", "result"] ratelimit.requests: enabled: true description: Number of rate-limiting requests @@ -117,8 +102,11 @@ attributes: description: rate limit class used type: string source_kind: - description: precedence source used to resolve rate + description: precedence source used to resolve the rate limit settings type: string unique_key: description: unique key for the request used in class resolution type: string + result: + description: result of the rate limit request + type: string From afdc0046bc4fb951e5140cee4e077b39b57de433 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Fri, 3 Oct 2025 10:32:21 +0800 Subject: [PATCH 5/8] Add resolver validation when classes are specified Signed-off-by: Marc Lopez Rubio --- processor/ratelimitprocessor/config.go | 7 ++++++- processor/ratelimitprocessor/config_test.go | 4 ++++ processor/ratelimitprocessor/testdata/config.yaml | 9 +++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/processor/ratelimitprocessor/config.go b/processor/ratelimitprocessor/config.go index 40fd15dbf..c8eea41a0 100644 --- a/processor/ratelimitprocessor/config.go +++ b/processor/ratelimitprocessor/config.go @@ -374,7 +374,7 @@ func (config 
*Config) Validate() error { } // Validate class-based configuration if config.DefaultClass != "" { - if config.Classes == nil { + if len(config.Classes) == 0 { errs = append(errs, errors.New("default_class specified but no classes defined")) } else if _, exists := config.Classes[config.DefaultClass]; !exists { errs = append(errs, fmt.Errorf("default_class %q does not exist in classes", config.DefaultClass)) @@ -385,6 +385,11 @@ func (config *Config) Validate() error { errs = append(errs, fmt.Errorf("class %q: %w", className, err)) } } + if config.ClassResolver.String() == "" && len(config.Classes) > 0 { + errs = append(errs, errors.New( + "classes defined but class_resolver not specified", + )) + } } for key, override := range config.Overrides { if err := override.Validate(); err != nil { diff --git a/processor/ratelimitprocessor/config_test.go b/processor/ratelimitprocessor/config_test.go index 765641199..e2beb17c9 100644 --- a/processor/ratelimitprocessor/config_test.go +++ b/processor/ratelimitprocessor/config_test.go @@ -249,6 +249,10 @@ func TestLoadConfig(t *testing.T) { name: "invalid_class_rate_negative", expectedErr: `class "trial": rate must be greater than zero`, }, + { + name: "classes_set_but_no_class_resolver", + expectedErr: `classes defined but class_resolver not specified`, + }, } factory := NewFactory() diff --git a/processor/ratelimitprocessor/testdata/config.yaml b/processor/ratelimitprocessor/testdata/config.yaml index 30bb927c8..a7bc948f6 100644 --- a/processor/ratelimitprocessor/testdata/config.yaml +++ b/processor/ratelimitprocessor/testdata/config.yaml @@ -135,3 +135,12 @@ ratelimit/invalid_class_burst_negative: trial: rate: 200 burst: -1 + +ratelimit/classes_set_but_no_class_resolver: + rate: 100 + burst: 200 + type: gubernator + classes: + trial: + rate: 200 + burst: 400 From 13c4c24e9e2dd8e309475acb0d6631751186fb1a Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Fri, 3 Oct 2025 16:51:10 +0800 Subject: [PATCH 6/8] rename static_only 
to disable_dynamic Signed-off-by: Marc Lopez Rubio --- processor/ratelimitprocessor/README.md | 10 +++++----- processor/ratelimitprocessor/config.go | 14 +++++++------- processor/ratelimitprocessor/config_test.go | 6 +++--- processor/ratelimitprocessor/gubernator_test.go | 8 ++++---- processor/ratelimitprocessor/testdata/config.yaml | 4 ++-- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/processor/ratelimitprocessor/README.md b/processor/ratelimitprocessor/README.md index c0d80cdce..b26b04e70 100644 --- a/processor/ratelimitprocessor/README.md +++ b/processor/ratelimitprocessor/README.md @@ -30,7 +30,7 @@ You can override one or more of the following fields: | `rate` | Bucket refill rate, in tokens per second. | No | | | `burst` | Maximum number of tokens that can be consumed. | No | | | `throttle_interval` | Time interval for throttling. It has effect only when `type` is `gubernator`. | No | | -| `static_only` | Disables dynamic rate limiting for the override. | No | `false` | +| `disable_dynamic` | Disables dynamic rate limiting for the override. | No | `false` | ### Dynamic Rate Limiting @@ -222,17 +222,17 @@ processors: trial: rate: 50 # 50 requests/second for trial users burst: 100 # burst capacity of 100 - static_only: true # no dynamic escalation + disable_dynamic: true # no dynamic escalation paying: rate: 500 # 500 requests/second for paying customers burst: 1000 # burst capacity of 1000 - static_only: false + disable_dynamic: false enterprise: rate: 2000 # 2000 requests/second for enterprise burst: 4000 # burst capacity of 4000 - static_only: false # allow gradual increase. + disable_dynamic: false # allow gradual increase. 
# Default class when resolver returns unknown class default_class: "trial" @@ -248,7 +248,7 @@ processors: "customer-123": rate: 5000 # special override burst: 10000 - static_only: true + disable_dynamic: true ``` Class Resolution Precedence: diff --git a/processor/ratelimitprocessor/config.go b/processor/ratelimitprocessor/config.go index c8eea41a0..5f5fd1d81 100644 --- a/processor/ratelimitprocessor/config.go +++ b/processor/ratelimitprocessor/config.go @@ -87,9 +87,9 @@ type Class struct { // Burst holds the maximum capacity of rate limit buckets. Burst int `mapstructure:"burst"` - // StaticOnly disables dynamic rate escalation for this class. + // DisableDynamic disables dynamic rate escalation for this class. // When true, effective rate will always be the static Rate. - StaticOnly bool `mapstructure:"static_only"` + DisableDynamic bool `mapstructure:"disable_dynamic"` } // Validate checks the DynamicRateLimiting configuration. @@ -151,10 +151,10 @@ type RateLimitSettings struct { // RateLimitOverrides defines per-unique-key override settings. // It replaces the top-level RateLimitSettings fields when the unique key matches. // Nil pointer fields leave the corresponding top-level field unchanged. -// StaticOnly disables dynamic escalation for that specific key when true. +// DisableDynamic disables dynamic escalation for that specific key when true. type RateLimitOverrides struct { // Rate holds the override rate limit. - StaticOnly bool `mapstructure:"static_only"` + DisableDynamic bool `mapstructure:"disable_dynamic"` // Rate holds bucket refill rate, in tokens per second. 
Rate *int `mapstructure:"rate"` @@ -276,7 +276,7 @@ func resolveRateLimit(cfg *Config, if override.ThrottleInterval != nil { result.ThrottleInterval = *override.ThrottleInterval } - if override.StaticOnly { + if override.DisableDynamic { result.disableDynamic = true } return result, SourceKindOverride, "" @@ -288,7 +288,7 @@ func resolveRateLimit(cfg *Config, if class.Burst > 0 { result.Burst = class.Burst } - if class.StaticOnly { + if class.DisableDynamic { result.disableDynamic = true } return result, SourceKindClass, className @@ -301,7 +301,7 @@ func resolveRateLimit(cfg *Config, if class.Burst > 0 { result.Burst = class.Burst } - if class.StaticOnly { + if class.DisableDynamic { result.disableDynamic = true } return result, SourceKindClass, cfg.DefaultClass diff --git a/processor/ratelimitprocessor/config_test.go b/processor/ratelimitprocessor/config_test.go index e2beb17c9..c3dcc7bf8 100644 --- a/processor/ratelimitprocessor/config_test.go +++ b/processor/ratelimitprocessor/config_test.go @@ -181,7 +181,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - name: "overrides_static_only", + name: "overrides_disable_dynamic", expected: &Config{ Type: LocalRateLimiter, RateLimitSettings: RateLimitSettings{ @@ -194,7 +194,7 @@ func TestLoadConfig(t *testing.T) { DynamicRateLimiting: defaultDynamicRateLimiting, Overrides: map[string]RateLimitOverrides{ "project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292": { - StaticOnly: true, + DisableDynamic: true, }, }, }, @@ -296,7 +296,7 @@ func TestResolveEffectiveRateLimit(t *testing.T) { }, Classes: map[string]Class{ "trial": {Rate: 50, Burst: 50}, - "premium": {Rate: 1000, Burst: 2000, StaticOnly: true}, + "premium": {Rate: 1000, Burst: 2000, DisableDynamic: true}, }, DefaultClass: "trial", } diff --git a/processor/ratelimitprocessor/gubernator_test.go b/processor/ratelimitprocessor/gubernator_test.go index 01cdfc088..6747e542f 100644 --- a/processor/ratelimitprocessor/gubernator_test.go +++ 
b/processor/ratelimitprocessor/gubernator_test.go @@ -455,7 +455,7 @@ func TestGubernatorRateLimiter_OverrideDisablesDynamicLimit(t *testing.T) { } } - t.Run("override_with_static_only_disables_dynamic", func(t *testing.T) { + t.Run("override_with_disable_dynamic_disables_dynamic", func(t *testing.T) { eventChannel := make(chan gubernator.HitEvent, EventBufferSize) // OVERRIDES rate := 500 // Static override rate for the test @@ -477,7 +477,7 @@ func TestGubernatorRateLimiter_OverrideDisablesDynamicLimit(t *testing.T) { MetadataKeys: []string{"x-tenant-id"}, Overrides: map[string]RateLimitOverrides{ "x-tenant-id:static-tenant": { - StaticOnly: true, + DisableDynamic: true, Rate: ptr(rate), // Lower than global rate to make test clearer ThrottleInterval: ptr(throttleInterval), }, @@ -521,7 +521,7 @@ func TestGubernatorRateLimiter_OverrideDisablesDynamicLimit(t *testing.T) { ) }) - t.Run("override_without_static_only_uses_override_rate_as_baseline", func(t *testing.T) { + t.Run("override_without_disable_dynamic_uses_override_rate_as_baseline", func(t *testing.T) { eventChannel := make(chan gubernator.HitEvent, EventBufferSize) rate := 100 // Override rate for the test @@ -543,7 +543,7 @@ func TestGubernatorRateLimiter_OverrideDisablesDynamicLimit(t *testing.T) { MetadataKeys: []string{"x-tenant-id"}, Overrides: map[string]RateLimitOverrides{ "x-tenant-id:dynamic-tenant": { - // StaticOnly is false (default), so dynamic scaling should work + // DisableDynamic is false (default), so dynamic scaling should work Rate: ptr(rate), // Override rate but still allow dynamic scaling ThrottleInterval: ptr(throttleInterval), }, diff --git a/processor/ratelimitprocessor/testdata/config.yaml b/processor/ratelimitprocessor/testdata/config.yaml index a7bc948f6..f63b7dbef 100644 --- a/processor/ratelimitprocessor/testdata/config.yaml +++ b/processor/ratelimitprocessor/testdata/config.yaml @@ -79,14 +79,14 @@ ratelimit/overrides_throttle_interval: rate: 400 throttle_interval: 10s 
-ratelimit/overrides_static_only: +ratelimit/overrides_disable_dynamic: rate: 100 burst: 200 strategy: bytes throttle_behavior: error overrides: project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292: - static_only: true + disable_dynamic: true ratelimit/dynamic_rate_limit: rate: 100 From 9061d641f09c9d30a3df035b0857ddcaa9a07c84 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Fri, 3 Oct 2025 16:55:49 +0800 Subject: [PATCH 7/8] Update metric names to use the `.` notation Signed-off-by: Marc Lopez Rubio --- processor/ratelimitprocessor/README.md | 4 ++-- processor/ratelimitprocessor/documentation.md | 4 ++-- processor/ratelimitprocessor/gubernator_test.go | 4 ++-- .../internal/metadata/generated_telemetry.go | 4 ++-- .../internal/metadatatest/generated_telemetrytest.go | 8 ++++---- processor/ratelimitprocessor/metadata.yaml | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/processor/ratelimitprocessor/README.md b/processor/ratelimitprocessor/README.md index b26b04e70..e92bc39c1 100644 --- a/processor/ratelimitprocessor/README.md +++ b/processor/ratelimitprocessor/README.md @@ -294,5 +294,5 @@ Telemetry and metrics: * Counters introduced to observe resolver and dynamic behavior include: - * `ratelimit.resolver_failures` — total number of resolver failures - * `ratelimit.dynamic_escalations` — number of times dynamic rate was peeked (attributes: `class`, `source_kind`, `result`) + * `ratelimit.resolver.failures` — total number of resolver failures + * `ratelimit.dynamic.escalations` — number of times dynamic rate was peeked (attributes: `class`, `source_kind`, `result`) diff --git a/processor/ratelimitprocessor/documentation.md b/processor/ratelimitprocessor/documentation.md index 8632bf300..bc942f115 100644 --- a/processor/ratelimitprocessor/documentation.md +++ b/processor/ratelimitprocessor/documentation.md @@ -14,7 +14,7 @@ Number of in-flight requests at any given time | ---- | ----------- | ---------- | | {requests} | Gauge | Int | -### 
otelcol_ratelimit.dynamic_escalations +### otelcol_ratelimit.dynamic.escalations Total number of dynamic rate escalations (dynamic > static) @@ -61,7 +61,7 @@ Number of rate-limiting requests | decision | rate limit decision | Any Str | | reason | rate limit reason | Any Str | -### otelcol_ratelimit.resolver_failures +### otelcol_ratelimit.resolver.failures Total number of class resolver failures diff --git a/processor/ratelimitprocessor/gubernator_test.go b/processor/ratelimitprocessor/gubernator_test.go index 6747e542f..34792e8d1 100644 --- a/processor/ratelimitprocessor/gubernator_test.go +++ b/processor/ratelimitprocessor/gubernator_test.go @@ -874,7 +874,7 @@ func TestGubernatorRateLimiter_ResolverFailures(t *testing.T) { // One call triggers resolver failure and should fall back to default class assert.NoError(t, rl.RateLimit(ctx, 1)) - // 1) resolver_failures counter increments + // 1) resolver.failures counter increments metadatatest.AssertEqualRatelimitResolverFailures(t, tt, []metricdata.DataPoint[int64]{ { Value: 1, @@ -929,7 +929,7 @@ func TestGubernatorRateLimiter_ResolverFailures(t *testing.T) { assert.NoError(t, rl.RateLimit(ctx, 1)) - // resolver_failures increments + // resolver.failures increments metadatatest.AssertEqualRatelimitResolverFailures(t, tt, []metricdata.DataPoint[int64]{ { Value: 1, diff --git a/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go b/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go index c434e4f34..9bedec753 100644 --- a/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go +++ b/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go @@ -87,7 +87,7 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme ) errs = errors.Join(errs, err) builder.RatelimitDynamicEscalations, err = builder.meter.Int64Counter( - "otelcol_ratelimit.dynamic_escalations", + "otelcol_ratelimit.dynamic.escalations", metric.WithDescription("Total 
number of dynamic rate escalations (dynamic > static)"), metric.WithUnit("{count}"), ) @@ -113,7 +113,7 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme ) errs = errors.Join(errs, err) builder.RatelimitResolverFailures, err = builder.meter.Int64Counter( - "otelcol_ratelimit.resolver_failures", + "otelcol_ratelimit.resolver.failures", metric.WithDescription("Total number of class resolver failures"), metric.WithUnit("{count}"), ) diff --git a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go index bd42c50f8..a4b7ac399 100644 --- a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go +++ b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go @@ -54,7 +54,7 @@ func AssertEqualRatelimitConcurrentRequests(t *testing.T, tt *componenttest.Tele func AssertEqualRatelimitDynamicEscalations(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ - Name: "otelcol_ratelimit.dynamic_escalations", + Name: "otelcol_ratelimit.dynamic.escalations", Description: "Total number of dynamic rate escalations (dynamic > static)", Unit: "{count}", Data: metricdata.Sum[int64]{ @@ -63,7 +63,7 @@ func AssertEqualRatelimitDynamicEscalations(t *testing.T, tt *componenttest.Tele DataPoints: dps, }, } - got, err := tt.GetMetric("otelcol_ratelimit.dynamic_escalations") + got, err := tt.GetMetric("otelcol_ratelimit.dynamic.escalations") require.NoError(t, err) metricdatatest.AssertEqual(t, want, got, opts...) 
 }
@@ -116,7 +116,7 @@ func AssertEqualRatelimitRequests(t *testing.T, tt *componenttest.Telemetry, dps

 func AssertEqualRatelimitResolverFailures(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) {
 	want := metricdata.Metrics{
-		Name:        "otelcol_ratelimit.resolver_failures",
+		Name:        "otelcol_ratelimit.resolver.failures",
 		Description: "Total number of class resolver failures",
 		Unit:        "{count}",
 		Data: metricdata.Sum[int64]{
@@ -125,7 +125,7 @@ func AssertEqualRatelimitResolverFailures(t *testing.T, tt *componenttest.Teleme
 			DataPoints: dps,
 		},
 	}
-	got, err := tt.GetMetric("otelcol_ratelimit.resolver_failures")
+	got, err := tt.GetMetric("otelcol_ratelimit.resolver.failures")
 	require.NoError(t, err)
 	metricdatatest.AssertEqual(t, want, got, opts...)
 }
diff --git a/processor/ratelimitprocessor/metadata.yaml b/processor/ratelimitprocessor/metadata.yaml
index 410df520f..99591d83e 100644
--- a/processor/ratelimitprocessor/metadata.yaml
+++ b/processor/ratelimitprocessor/metadata.yaml
@@ -13,7 +13,7 @@ tests:

 telemetry:
   metrics:
-    ratelimit.resolver_failures:
+    ratelimit.resolver.failures:
       enabled: true
       description: Total number of class resolver failures
       unit: "{count}"
@@ -21,7 +21,7 @@ telemetry:
       value_type: int
       monotonic: true
       attributes: ["unique_key"] # high cardinality
-    ratelimit.dynamic_escalations:
+    ratelimit.dynamic.escalations:
       enabled: true
       description: Total number of dynamic rate escalations (dynamic > static)
       unit: "{count}"

From 85a3f94c61c65c2454a9cf17792524b7ccd91a98 Mon Sep 17 00:00:00 2001
From: Marc Lopez Rubio
Date: Fri, 3 Oct 2025 17:14:14 +0800
Subject: [PATCH 8/8] Fix class_resolver reference

Signed-off-by: Marc Lopez Rubio
---
 processor/ratelimitprocessor/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/processor/ratelimitprocessor/README.md b/processor/ratelimitprocessor/README.md
index e92bc39c1..735e2e84f 100644
--- a/processor/ratelimitprocessor/README.md
+++ b/processor/ratelimitprocessor/README.md
@@ -216,7 +216,7 @@ processors:
     strategy: requests

     # define the class resolver ID.
-    class_resolver: configmapclassresolverextension
+    class_resolver: configmapclassresolver

     # Define named classes
     classes:
       trial:
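For context, the configuration that PATCH 8/8 corrects would look roughly as follows once the series is applied. This is a minimal sketch assembled from the fragments shown in the patches above; the `trial` class body and the bare `configmapclassresolver` extension entry are illustrative assumptions, not taken verbatim from the series:

```yaml
extensions:
  # Resolver extension that maps unique keys to class names.
  configmapclassresolver:

processors:
  ratelimit:
    type: gubernator
    rate: 100
    burst: 200
    strategy: requests
    # Reference the class resolver extension by its component ID,
    # not by its Go module name (the typo fixed in PATCH 8/8).
    class_resolver: configmapclassresolver
    # Named classes for class-based dynamic rate limiting.
    classes:
      trial:
        rate: 50      # illustrative values
        burst: 100
    overrides:
      # Per-key override: takes highest precedence; dynamic limiting
      # disabled for this project (renamed from static_only).
      project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292:
        disable_dynamic: true
```

With this wiring, precedence follows the order described in the series: per-key override, then resolved class, then default class, then the global fallback.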