Skip to content

Add flag for limiting OTLP request size #6333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
* [ENHANCEMENT] S3 Bucket Client: Add list objects version configs to configure the list API object version. #6280
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for the OpenStack Swift object storage backend. #6255
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2562,6 +2562,10 @@ ha_tracker:
# CLI flag: -distributor.max-recv-msg-size
[max_recv_msg_size: <int> | default = 104857600]

# Maximum OTLP request size in bytes that the Distributor can accept.
# CLI flag: -distributor.otlp-max-recv-msg-size
[otlp_max_recv_msg_size: <int> | default = 104857600]

# Timeout for downstream ingesters.
# CLI flag: -distributor.remote-timeout
[remote_timeout: <duration> | default = 2s]
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
Expand Down
8 changes: 5 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ type Config struct {

HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
RemoteTimeout time.Duration `yaml:"remote_timeout"`
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
OTLPMaxRecvMsgSize int `yaml:"otlp_max_recv_msg_size"`
RemoteTimeout time.Duration `yaml:"remote_timeout"`
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
Expand Down Expand Up @@ -186,6 +187,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
f.IntVar(&cfg.OTLPMaxRecvMsgSize, "distributor.otlp-max-recv-msg-size", 100<<20, "Maximum OTLP request size in bytes that the Distributor can accept.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"flag"
Expand Down Expand Up @@ -143,6 +144,7 @@ type CompressionType int
const (
// NoCompression indicates the payload is a raw, uncompressed proto.
NoCompression CompressionType = iota
// RawSnappy indicates the payload is snappy-compressed (no framing).
RawSnappy
// Gzip indicates the payload is gzip-compressed.
Gzip
)

// ParseProtoReader parses a compressed proto from an io.Reader.
Expand Down Expand Up @@ -215,6 +217,13 @@ func decompressFromReader(reader io.Reader, expectedSize, maxSize int, compressi
return nil, err
}
body, err = decompressFromBuffer(&buf, maxSize, RawSnappy, sp)
case Gzip:
reader, err = gzip.NewReader(reader)
if err != nil {
return nil, err
}
_, err = buf.ReadFrom(reader)
body = buf.Bytes()
}
return body, err
}
Expand Down
86 changes: 83 additions & 3 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package push

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"github.com/prometheus/prometheus/util/annotations"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor"
Expand All @@ -24,8 +28,13 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

// Content types accepted by the OTLP write endpoint.
const (
pbContentType = "application/x-protobuf"
jsonContentType = "application/json"
)

// OTLPHandler is a http.Handler which accepts OTLP metrics.
func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := util_log.WithContext(ctx, util_log.Logger)
Expand All @@ -42,7 +51,7 @@ func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, so
return
}

req, err := remote.DecodeOTLPWriteRequest(r)
req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -90,6 +99,64 @@ func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, so
})
}

// decodeOTLPWriteRequest reads and decodes an OTLP metrics export request from
// an HTTP request body, enforcing maxSize (in bytes) on the decoded payload.
// It supports protobuf and JSON bodies, optionally gzip-compressed, and returns
// an error for any other Content-Type or Content-Encoding.
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) {
	// Fast-path rejection when the client declares an oversized body.
	// ContentLength may be -1 (unknown), in which case this check is skipped
	// and the size is enforced while reading below.
	expectedSize := int(r.ContentLength)
	if expectedSize > maxSize {
		return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize)
	}

	contentType := r.Header.Get("Content-Type")
	contentEncoding := r.Header.Get("Content-Encoding")

	var compressionType util.CompressionType
	switch contentEncoding {
	case "gzip":
		compressionType = util.Gzip
	case "":
		compressionType = util.NoCompression
	default:
		return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported compression: %s, Supported compression types are \"gzip\" or '' (no compression)", contentEncoding)
	}

	var decoderFunc func(reader io.Reader) (pmetricotlp.ExportRequest, error)
	switch contentType {
	case pbContentType:
		decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
			req := pmetricotlp.NewExportRequest()
			// ParseProtoReader enforces maxSize on the decompressed payload
			// and unmarshals into req via the otlpProtoMessage adapter.
			otlpReqProto := otlpProtoMessage{req: &req}
			return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
		}
	case jsonContentType:
		decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
			req := pmetricotlp.NewExportRequest()

			// Cap the compressed stream; one extra byte lets us detect overflow.
			reader = io.LimitReader(reader, int64(maxSize)+1)
			if compressionType == util.Gzip {
				gzReader, err := gzip.NewReader(reader)
				if err != nil {
					return req, err
				}
				// Also cap the decompressed stream so a small gzip body cannot
				// expand into an unbounded buffer (decompression bomb).
				reader = io.LimitReader(gzReader, int64(maxSize)+1)
			}

			var buf bytes.Buffer
			if expectedSize > 0 {
				buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
			}
			if _, err := buf.ReadFrom(reader); err != nil {
				return req, err
			}
			// If we read past maxSize the limiter truncated the payload:
			// report the size violation explicitly rather than letting the
			// JSON decoder fail on truncated input.
			if buf.Len() > maxSize {
				return req, fmt.Errorf("received message larger than max (%d vs %d)", buf.Len(), maxSize)
			}

			return req, req.UnmarshalJSON(buf.Bytes())
		}
	default:
		return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported content type: %s, supported: [%s, %s]", contentType, jsonContentType, pbContentType)
	}

	return decoderFunc(r.Body)
}

func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, error) {
promConverter := prometheusremotewrite.NewPrometheusConverter()
settings := prometheusremotewrite.Settings{
Expand Down Expand Up @@ -223,3 +290,16 @@ func joinAttributeMaps(from, to pcommon.Map) {
return true
})
}

// otlpProtoMessage implements proto.Message and proto.Unmarshaler, adapting a
// pmetricotlp.ExportRequest so it can be passed to util.ParseProtoReader.
// ProtoMessage/Reset/String are no-op stubs required only to satisfy the
// interface; decoding happens entirely in Unmarshal.
type otlpProtoMessage struct {
req *pmetricotlp.ExportRequest
}

func (otlpProtoMessage) ProtoMessage() {}

func (otlpProtoMessage) Reset() {}

func (otlpProtoMessage) String() string { return "" }

// Unmarshal decodes the protobuf payload into the wrapped export request.
func (o otlpProtoMessage) Unmarshal(data []byte) error { return o.req.UnmarshalProto(data) }
Loading
Loading