Skip to content

Commit 633baf5

Browse files
committed
Change otlp attribute conversion to consist with prometheus
Signed-off-by: SungJin1212 <[email protected]>
1 parent c6347f0 commit 633baf5

File tree

6 files changed

+230
-24
lines changed

6 files changed

+230
-24
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1010
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1111
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
12+
* [ENHANCEMENT] OTLP: Change otlp handler to consist with the Prometheus otlp handler. #6272
1213
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
1314
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
1415
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2608,6 +2608,15 @@ instance_limits:
26082608
# unlimited.
26092609
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
26102610
[max_inflight_push_requests: <int> | default = 0]
2611+
2612+
otlp:
2613+
# If enabled, all resource attributes are converted to labels.
2614+
# CLI flag: -distributor.otlp-config.convert-all-attributes
2615+
[convert_all_attributes: <boolean> | default = false]
2616+
2617+
# A list of resource attributes that should be converted to labels.
2618+
# CLI flag: -distributor.otlp-config.promote-resource-attributes
2619+
[promote_resource_attributes: <list of string> | default = []]
26112620
```
26122621

26132622
### `etcd_config`

pkg/api/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
277277
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
278278

279279
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
280-
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
280+
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
281281

282282
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
283283
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")

pkg/distributor/distributor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cortexproject/cortex/pkg/tenant"
3737
"github.com/cortexproject/cortex/pkg/util"
3838
"github.com/cortexproject/cortex/pkg/util/extract"
39+
"github.com/cortexproject/cortex/pkg/util/flagext"
3940
"github.com/cortexproject/cortex/pkg/util/limiter"
4041
util_log "github.com/cortexproject/cortex/pkg/util/log"
4142
util_math "github.com/cortexproject/cortex/pkg/util/math"
@@ -164,13 +165,21 @@ type Config struct {
164165

165166
// Limits for distributor
166167
InstanceLimits InstanceLimits `yaml:"instance_limits"`
168+
169+
// OTLPConfig
170+
OTLPConfig OTLPConfig `yaml:"otlp"`
167171
}
168172

169173
type InstanceLimits struct {
170174
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
171175
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
172176
}
173177

178+
type OTLPConfig struct {
179+
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
180+
PromoteResourceAttributes []string `yaml:"promote_resource_attributes"`
181+
}
182+
174183
// RegisterFlags adds the flags required to config this to the given FlagSet
175184
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
176185
cfg.PoolConfig.RegisterFlags(f)
@@ -188,6 +197,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
188197

189198
f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
190199
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
200+
201+
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp-config.convert-all-attributes", false, "If enabled, all resource attributes are converted to labels.")
202+
f.Var((*flagext.StringSlice)(&cfg.OTLPConfig.PromoteResourceAttributes), "distributor.otlp-config.promote-resource-attributes", "A list of resource attributes that should be converted to labels.")
191203
}
192204

193205
// Validate config and returns error on failure

pkg/util/push/otlp.go

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,37 @@
11
package push
22

33
import (
4+
"context"
45
"net/http"
56

7+
"github.com/go-kit/log"
68
"github.com/go-kit/log/level"
79
"github.com/prometheus/prometheus/model/labels"
810
"github.com/prometheus/prometheus/prompb"
911
"github.com/prometheus/prometheus/storage/remote"
1012
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
13+
"github.com/prometheus/prometheus/util/annotations"
1114
"github.com/weaveworks/common/httpgrpc"
1215
"github.com/weaveworks/common/middleware"
1316
"go.opentelemetry.io/collector/pdata/pcommon"
1417
"go.opentelemetry.io/collector/pdata/pmetric"
1518

1619
"github.com/cortexproject/cortex/pkg/cortexpb"
20+
"github.com/cortexproject/cortex/pkg/distributor"
1721
"github.com/cortexproject/cortex/pkg/util"
18-
"github.com/cortexproject/cortex/pkg/util/log"
22+
util_log "github.com/cortexproject/cortex/pkg/util/log"
1923
)
2024

2125
// OTLPHandler is a http.Handler which accepts OTLP metrics.
22-
func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
26+
func OTLPHandler(cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
2327
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2428
ctx := r.Context()
25-
logger := log.WithContext(ctx, log.Logger)
29+
logger := util_log.WithContext(ctx, util_log.Logger)
2630
if sourceIPs != nil {
2731
source := sourceIPs.Get(r)
2832
if source != "" {
2933
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
30-
logger = log.WithSourceIPs(source, logger)
34+
logger = util_log.WithSourceIPs(source, logger)
3135
}
3236
}
3337
req, err := remote.DecodeOTLPWriteRequest(r)
@@ -37,31 +41,22 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
3741
return
3842
}
3943

40-
promConverter := prometheusremotewrite.NewPrometheusConverter()
41-
setting := prometheusremotewrite.Settings{
42-
AddMetricSuffixes: true,
43-
DisableTargetInfo: true,
44-
}
45-
annots, err := promConverter.FromMetrics(ctx, convertToMetricsAttributes(req.Metrics()), setting)
46-
ws, _ := annots.AsStrings("", 0, 0)
47-
if len(ws) > 0 {
48-
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
44+
prwReq := cortexpb.WriteRequest{
45+
Source: cortexpb.API,
46+
Metadata: nil,
47+
SkipLabelNameValidation: false,
4948
}
5049

50+
// otlp to prompb TimeSeries
51+
promTsList, err := convertToPromTS(r.Context(), req.Metrics(), cfg, logger)
5152
if err != nil {
52-
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
5353
http.Error(w, err.Error(), http.StatusBadRequest)
5454
return
5555
}
5656

57-
prwReq := cortexpb.WriteRequest{
58-
Source: cortexpb.API,
59-
Metadata: nil,
60-
SkipLabelNameValidation: false,
61-
}
62-
57+
// convert prompb to cortexpb TimeSeries
6358
tsList := []cortexpb.PreallocTimeseries(nil)
64-
for _, v := range promConverter.TimeSeries() {
59+
for _, v := range promTsList {
6560
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
6661
Labels: makeLabels(v.Labels),
6762
Samples: makeSamples(v.Samples),
@@ -87,6 +82,32 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
8782
})
8883
}
8984

85+
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, logger log.Logger) ([]prompb.TimeSeries, error) {
86+
promConverter := prometheusremotewrite.NewPrometheusConverter()
87+
settings := prometheusremotewrite.Settings{
88+
AddMetricSuffixes: true,
89+
PromoteResourceAttributes: cfg.PromoteResourceAttributes,
90+
}
91+
var annots annotations.Annotations
92+
var err error
93+
if cfg.ConvertAllAttributes {
94+
annots, err = promConverter.FromMetrics(ctx, convertToMetricsAttributes(pmetrics), settings)
95+
} else {
96+
annots, err = promConverter.FromMetrics(ctx, pmetrics, settings)
97+
}
98+
99+
ws, _ := annots.AsStrings("", 0, 0)
100+
if len(ws) > 0 {
101+
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
102+
}
103+
104+
if err != nil {
105+
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
106+
return nil, err
107+
}
108+
return promConverter.TimeSeries(), nil
109+
}
110+
90111
func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {
91112
out := make(labels.Labels, 0, len(in))
92113
for _, l := range in {

pkg/util/push/otlp_test.go

Lines changed: 165 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,179 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/go-kit/log"
12+
"github.com/prometheus/prometheus/prompb"
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
1315
"go.opentelemetry.io/collector/pdata/pcommon"
1416
"go.opentelemetry.io/collector/pdata/pmetric"
1517
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
1618

1719
"github.com/cortexproject/cortex/pkg/cortexpb"
20+
"github.com/cortexproject/cortex/pkg/distributor"
1821
)
1922

23+
func TestOTLPConvertToPromTS(t *testing.T) {
24+
logger := log.NewNopLogger()
25+
ctx := context.Background()
26+
d := pmetric.NewMetrics()
27+
resourceMetric := d.ResourceMetrics().AppendEmpty()
28+
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service") // converted to job, service_name
29+
resourceMetric.Resource().Attributes().PutStr("attr1", "value")
30+
resourceMetric.Resource().Attributes().PutStr("attr2", "value")
31+
resourceMetric.Resource().Attributes().PutStr("attr3", "value")
32+
33+
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
34+
35+
//Generate One Counter
36+
timestamp := time.Now()
37+
counterMetric := scopeMetric.Metrics().AppendEmpty()
38+
counterMetric.SetName("test-counter")
39+
counterMetric.SetDescription("test-counter-description")
40+
counterMetric.SetEmptySum()
41+
counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
42+
counterMetric.Sum().SetIsMonotonic(true)
43+
44+
counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
45+
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
46+
counterDataPoint.SetDoubleValue(10.0)
47+
48+
tests := []struct {
49+
description string
50+
cfg distributor.OTLPConfig
51+
expectedLabels []prompb.Label
52+
}{
53+
{
54+
description: "only attributes that exist in promote resource attributes should be converted",
55+
cfg: distributor.OTLPConfig{
56+
ConvertAllAttributes: false,
57+
PromoteResourceAttributes: []string{"attr1"},
58+
},
59+
expectedLabels: []prompb.Label{
60+
{
61+
Name: "__name__",
62+
Value: "test_counter_total",
63+
},
64+
{
65+
Name: "attr1",
66+
Value: "value",
67+
},
68+
{
69+
Name: "job",
70+
Value: "test-service",
71+
},
72+
},
73+
},
74+
{
75+
description: "not exist attribute is ignored",
76+
cfg: distributor.OTLPConfig{
77+
ConvertAllAttributes: false,
78+
PromoteResourceAttributes: []string{"dummy"},
79+
},
80+
expectedLabels: []prompb.Label{
81+
{
82+
Name: "__name__",
83+
Value: "test_counter_total",
84+
},
85+
{
86+
Name: "job",
87+
Value: "test-service",
88+
},
89+
},
90+
},
91+
{
92+
description: "should convert all attribute",
93+
cfg: distributor.OTLPConfig{
94+
ConvertAllAttributes: true,
95+
PromoteResourceAttributes: nil,
96+
},
97+
expectedLabels: []prompb.Label{
98+
{
99+
Name: "__name__",
100+
Value: "test_counter_total",
101+
},
102+
{
103+
Name: "attr1",
104+
Value: "value",
105+
},
106+
{
107+
Name: "attr2",
108+
Value: "value",
109+
},
110+
{
111+
Name: "attr3",
112+
Value: "value",
113+
},
114+
{
115+
Name: "job",
116+
Value: "test-service",
117+
},
118+
{
119+
Name: "service_name",
120+
Value: "test-service",
121+
},
122+
},
123+
},
124+
{
125+
description: "should convert all attribute regardless of promote resource attributes",
126+
cfg: distributor.OTLPConfig{
127+
ConvertAllAttributes: true,
128+
PromoteResourceAttributes: []string{"attr1", "attr2"},
129+
},
130+
expectedLabels: []prompb.Label{
131+
{
132+
Name: "__name__",
133+
Value: "test_counter_total",
134+
},
135+
{
136+
Name: "attr1",
137+
Value: "value",
138+
},
139+
{
140+
Name: "attr2",
141+
Value: "value",
142+
},
143+
{
144+
Name: "attr3",
145+
Value: "value",
146+
},
147+
{
148+
Name: "job",
149+
Value: "test-service",
150+
},
151+
{
152+
Name: "service_name",
153+
Value: "test-service",
154+
},
155+
},
156+
},
157+
}
158+
159+
for _, test := range tests {
160+
t.Run(test.description, func(t *testing.T) {
161+
tsList, err := convertToPromTS(ctx, d, test.cfg, logger)
162+
require.NoError(t, err)
163+
require.Equal(t, 2, len(tsList)) // target_info + test_counter_total
164+
165+
var counterTs prompb.TimeSeries
166+
for _, ts := range tsList {
167+
for _, label := range ts.Labels {
168+
if label.Name == "__name__" && label.Value == "test_counter_total" {
169+
// get counter ts
170+
counterTs = ts
171+
}
172+
}
173+
}
174+
require.ElementsMatch(t, test.expectedLabels, counterTs.Labels)
175+
})
176+
}
177+
}
178+
20179
func TestOTLPWriteHandler(t *testing.T) {
180+
cfg := distributor.OTLPConfig{
181+
PromoteResourceAttributes: []string{},
182+
}
183+
21184
exportRequest := generateOTLPWriteRequest(t)
22185

23186
buf, err := exportRequest.MarshalProto()
@@ -28,7 +191,7 @@ func TestOTLPWriteHandler(t *testing.T) {
28191
req.Header.Set("Content-Type", "application/x-protobuf")
29192

30193
push := verifyOTLPWriteRequestHandler(t, cortexpb.API)
31-
handler := OTLPHandler(nil, push)
194+
handler := OTLPHandler(cfg, nil, push)
32195

33196
recorder := httptest.NewRecorder()
34197
handler.ServeHTTP(recorder, req)
@@ -120,7 +283,7 @@ func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest {
120283
func verifyOTLPWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequest_SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
121284
t.Helper()
122285
return func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
123-
assert.Len(t, request.Timeseries, 12) // 1 (counter) + 1 (gauge) + 7 (hist_bucket) + 2 (hist_sum, hist_count) + 1 (exponential histogram)
286+
assert.Len(t, request.Timeseries, 13) // 1 (target_info) + 1 (counter) + 1 (gauge) + 7 (hist_bucket) + 2 (hist_sum, hist_count) + 1 (exponential histogram)
124287
// TODO: test more things
125288
assert.Equal(t, expectSource, request.Source)
126289
assert.False(t, request.SkipLabelNameValidation)

0 commit comments

Comments
 (0)