Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion processor/ratelimitprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ a in-memory rate limiter, or makes use of a [gubernator](https://github.com/gube
| `type` | Type of rate limiter. Options are `local` or `gubernator`. | No | `local` |
| `overrides` | Allows customizing rate limiting parameters for specific metadata key-value pairs. Use this to apply different rate limits to different tenants, projects, or other entities identified by metadata. Each override is keyed by a metadata value and can specify custom `rate`, `burst`, and `throttle_interval` settings that take precedence over the global configuration for matching requests. | No | |
| `dynamic_limits` | Holds the dynamic rate limiting configuration. This is only applicable when the rate limiter type is `gubernator`. | No | |
| `classes` | Named rate limit class definitions for class-based dynamic rate limiting. Only applicable when the rate limiter type is `gubernator`. | No | |
| `default_class` | Default class name to use when resolver returns unknown/empty class. Must exist in classes when set. Only applicable when the rate limiter type is `gubernator`. | No | |

### Overrides

Expand All @@ -28,7 +30,7 @@ You can override one or more of the following fields:
| `rate` | Bucket refill rate, in tokens per second. | No | |
| `burst` | Maximum number of tokens that can be consumed. | No | |
| `throttle_interval` | Time interval for throttling. It has effect only when `type` is `gubernator`. | No | |
| `static_only` | Disables dynamic rate limiting for the override. | No | `false` |
| `disable_dynamic` | Disables dynamic rate limiting for the override. | No | `false` |

### Dynamic Rate Limiting

Expand Down Expand Up @@ -191,3 +193,106 @@ processors:
window_multiplier: 1.5
window_duration: 1m
```

### Class-Based Dynamic Rate Limiting

When using the Gubernator rate limiter type, you can define named rate limit classes with different base rates and escalation policies. A class resolver extension maps unique keys to class names, enabling different rate limits per customer tier, API version, or other business logic.

Example with class-based configuration:

```yaml
extension:
configmapclassresolverextension:
name: resolutions
namespace: default

processors:
ratelimiter:
metadata_keys:
- x-customer-id
rate: 100 # fallback rate
burst: 200 # fallback burst
type: gubernator
strategy: requests

# define the class resolver ID.
class_resolver: configmapclassresolver
# Define named classes
classes:
trial:
rate: 50 # 50 requests/second for trial users
burst: 100 # burst capacity of 100
disable_dynamic: true # no dynamic escalation

paying:
rate: 500 # 500 requests/second for paying customers
burst: 1000 # burst capacity of 1000
disable_dynamic: false

enterprise:
rate: 2000 # 2000 requests/second for enterprise
burst: 4000 # burst capacity of 4000
disable_dynamic: false # allow gradual increase.

# Default class when resolver returns unknown class
default_class: "trial"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wouldnt it be better to move this inside classes key to act as a catch all?

classes:
   default: ? 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a bit awkard since classes is a map[string]Class. That means that you always require a named class "default" to be declared, is that what you meant?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I meant default to be always present, not a big deal, feels a bit awkward to have a separate flag to choose what is the default class.


# Enable dynamic rate limiting
dynamic_limits:
enabled: true
window_multiplier: 1.3
window_duration: 60s

# Per-key overrides (highest precedence)
overrides:
"customer-123":
rate: 5000 # special override
burst: 10000
disable_dynamic: true
```

Class Resolution Precedence:

1. **Per-key override** (highest) - From `overrides` section
2. **Resolved class** - From class resolver extension
3. **Default class** - From `default_class` setting
4. **Top-level fallback** (lowest) - From top-level `rate`/`burst`

### Class resolver

The `class_resolver` option tells the processor which OpenTelemetry Collector extension should be used to map a unique key (derived from the configured `metadata_keys`) to a class name. The value is the extension ID (the extension's configured name), for example:

```yaml
processors:
ratelimiter:
# This is an example class_resolver. It doesn't exist.
class_resolver: configmapclassresolverextension
```

Behavior and notes:

* The resolver is optional. If no `class_resolver` is configured the processor skips class resolution and falls back to the top-level `rate`/`burst` values.

* The processor initializes the resolver during Start. If the extension is not found the processor logs a warning and proceeds without resolution.

* When the resolver returns an unknown or empty class name, the processor treats it as "unknown" and uses the configured `default_class` (if set) or falls back to the top-level rate/burst.

Caching and performance:

* Implementations of resolver extensions should be mindful of latency; the processor assumes the resolver is reasonably fast. The processor will fall back when resolver errors occur or if the extension is absent.

* Ideally, implementations of the `class_resolver` implement their own caching to guarantee performance if they require on an external source.

Telemetry and metrics:

* The processor emits attributes on relevant metrics to aid debugging and monitoring:

* `rate_source`: one of `static`, `dynamic`, `fallback`, or `degraded` (indicates whether dynamic calculation was used or not)
* `class`: resolved class name when applicable
* `source_kind`: which precedence path was used (`override`, `class`, or `fallback`)
* `result`: one of `gubernator_error`, `success` or `skipped`.

* Counters introduced to observe resolver and dynamic behavior include:

* `ratelimit.resolver.failures` — total number of resolver failures
* `ratelimit.dynamic.escalations` — number of times dynamic rate was peeked (attributes: `class`, `source_kind`, `success`)
141 changes: 131 additions & 10 deletions processor/ratelimitprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ type Config struct {
//
// Defaults to empty
Overrides map[string]RateLimitOverrides `mapstructure:"overrides"`

// Classes holds named rate limit class definitions for class-based dynamic rate limiting.
// Only applicable when the rate limiter type is "gubernator".
//
// Defaults to empty
Classes map[string]Class `mapstructure:"classes"`

// DefaultClass specifies the class name to use when no override exists and
// the class resolver returns unknown/empty class. Must exist in Classes when set.
// Only applicable when the rate limiter type is "gubernator".
//
// Defaults to empty (no default class)
DefaultClass string `mapstructure:"default_class"`

// ClassResolverClass is the component ID of the class resolver extension to use.
// If not set, class resolution is disabled.
// Only applicable when the rate limiter type is "gubernator".
ClassResolver component.ID `mapstructure:"class_resolver"`
}

// DynamicRateLimiting defines settings for dynamic rate limiting.
Expand All @@ -61,6 +79,19 @@ type DynamicRateLimiting struct {
WindowDuration time.Duration `mapstructure:"window_duration"`
}

// Class defines a named rate limit class for class-based dynamic rate limiting.
type Class struct {
// Rate holds bucket refill rate, in tokens per second.
Rate int `mapstructure:"rate"`

// Burst holds the maximum capacity of rate limit buckets.
Burst int `mapstructure:"burst"`

// DisableDynamic disables dynamic rate escalation for this class.
// When true, effective rate will always be the static Rate.
DisableDynamic bool `mapstructure:"disable_dynamic"`
}

// Validate checks the DynamicRateLimiting configuration.
func (d *DynamicRateLimiting) Validate() error {
if !d.Enabled {
Expand All @@ -76,6 +107,21 @@ func (d *DynamicRateLimiting) Validate() error {
return errors.Join(errs...)
}

// Validate checks the Class configuration.
func (c *Class) Validate() error {
var errs []error
if c.Rate <= 0 {
errs = append(errs, errors.New("rate must be greater than zero"))
}
if c.Burst < 0 {
errs = append(errs, errors.New("burst must be non-negative"))
}
if c.Burst > 0 && c.Burst < c.Rate {
errs = append(errs, errors.New("burst must be greater than or equal to rate when specified"))
}
return errors.Join(errs...)
}

// RateLimitSettings holds the core rate limiting configuration.
type RateLimitSettings struct {
// Strategy holds the rate limiting strategy.
Expand All @@ -102,9 +148,13 @@ type RateLimitSettings struct {
disableDynamic bool `mapstructure:"-"`
}

// RateLimitOverrides defines per-unique-key override settings.
// It replaces the top-level RateLimitSettings fields when the unique key matches.
// Nil pointer fields leave the corresponding top-level field unchanged.
// DisableDynamic disables dynamic escalation for that specific key when true.
type RateLimitOverrides struct {
// Rate holds the override rate limit.
StaticOnly bool `mapstructure:"static_only"`
DisableDynamic bool `mapstructure:"disable_dynamic"`

// Rate holds bucket refill rate, in tokens per second.
Rate *int `mapstructure:"rate"`
Expand Down Expand Up @@ -197,16 +247,26 @@ func createDefaultConfig() component.Config {
WindowMultiplier: 1.3,
WindowDuration: 2 * time.Minute,
},
Classes: nil,
DefaultClass: "",
}
}

// resolveRateLimitSettings returns the rate limit settings for the given unique key.
// If no override is found, the default rate limit settings are returned.
func resolveRateLimitSettings(cfg *Config, uniqueKey string) RateLimitSettings {
// We start from the default settings
result := cfg.RateLimitSettings
if override, ok := cfg.Overrides[uniqueKey]; ok {
// If an override is found, we apply it
// resolveRateLimit computes the effective RateLimitSettings for a given unique key.
// It unifies the legacy per-key override resolution and the class-based precedence logic.
// Precedence order:
// 1. Explicit per-key override (SourceKindOverride)
// 2. Resolved class (SourceKindClass)
// 3. DefaultClass (SourceKindClass)
// 4. Top-level fallback config (SourceKindFallback)
//
// When sourceKind is override or fallback, className will be empty.
func resolveRateLimit(cfg *Config,
uniqueKey, className string,
) (result RateLimitSettings, kind SourceKind, name string) {
result = cfg.RateLimitSettings
// 1. Per-key override takes absolute precedence regardless of classes.
if override, hasOverride := cfg.Overrides[uniqueKey]; hasOverride {
if override.Rate != nil {
result.Rate = *override.Rate
}
Expand All @@ -216,13 +276,54 @@ func resolveRateLimitSettings(cfg *Config, uniqueKey string) RateLimitSettings {
if override.ThrottleInterval != nil {
result.ThrottleInterval = *override.ThrottleInterval
}
if override.StaticOnly {
if override.DisableDynamic {
result.disableDynamic = true
}
return result, SourceKindOverride, ""
}
// 2. Resolved class (only if provided and exists)
if className != "" {
if class, exists := cfg.Classes[className]; exists {
result.Rate = class.Rate
if class.Burst > 0 {
result.Burst = class.Burst
}
if class.DisableDynamic {
result.disableDynamic = true
}
return result, SourceKindClass, className
}
}
return result
// 3. DefaultClass (if configured & exists)
if cfg.DefaultClass != "" {
if class, exists := cfg.Classes[cfg.DefaultClass]; exists {
result.Rate = class.Rate
if class.Burst > 0 {
result.Burst = class.Burst
}
if class.DisableDynamic {
result.disableDynamic = true
}
return result, SourceKindClass, cfg.DefaultClass
}
}
// 4. Fallback to top-level settings.
return cfg.RateLimitSettings, SourceKindFallback, ""
}

// SourceKind indicates the source of rate limit settings for telemetry.
type SourceKind string

const (
// SourceKindOverride indicates the settings originated from an explicit per-key override.
SourceKindOverride SourceKind = "override"
// SourceKindClass indicates the settings originated from a class (either resolved or default_class).
SourceKindClass SourceKind = "class"
// SourceKindFallback indicates the settings originated from the top-level fallback configuration.
SourceKindFallback SourceKind = "fallback"
)

// Validate performs semantic validation of RateLimitSettings.
func (r *RateLimitSettings) Validate() error {
var errs []error
if r.Rate <= 0 {
Expand All @@ -243,6 +344,7 @@ func (r *RateLimitSettings) Validate() error {
return errors.Join(errs...)
}

// Validate performs semantic validation of a RateLimitOverrides instance.
func (r *RateLimitOverrides) Validate() error {
var errs []error
if r.Rate != nil {
Expand Down Expand Up @@ -270,6 +372,24 @@ func (config *Config) Validate() error {
if err := config.DynamicRateLimiting.Validate(); err != nil {
errs = append(errs, err)
}
// Validate class-based configuration
if config.DefaultClass != "" {
if len(config.Classes) == 0 {
errs = append(errs, errors.New("default_class specified but no classes defined"))
} else if _, exists := config.Classes[config.DefaultClass]; !exists {
errs = append(errs, fmt.Errorf("default_class %q does not exist in classes", config.DefaultClass))
}
}
for className, class := range config.Classes {
if err := class.Validate(); err != nil {
errs = append(errs, fmt.Errorf("class %q: %w", className, err))
}
}
if config.ClassResolver.String() == "" && len(config.Classes) > 0 {
errs = append(errs, errors.New(
"classes defined but class_resolver not specified",
))
}
}
for key, override := range config.Overrides {
if err := override.Validate(); err != nil {
Expand Down Expand Up @@ -310,6 +430,7 @@ func (s ThrottleBehavior) Validate() error {
)
}

// Validate ensures the RateLimiterType is one of the supported values.
func (t RateLimiterType) Validate() error {
switch t {
case LocalRateLimiter, GubernatorRateLimiter:
Expand Down
Loading