Skip to content

Commit 6bb670c

Browse files
committed
Integration test
Signed-off-by: Friedrich Gonzalez <[email protected]>
1 parent b8fd660 commit 6bb670c

File tree

2 files changed

+147
-0
lines changed

2 files changed

+147
-0
lines changed

integration/e2ecortex/client.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ import (
2727
yaml "gopkg.in/yaml.v3"
2828

2929
"github.com/cortexproject/cortex/pkg/ruler"
30+
"go.opentelemetry.io/collector/pdata/pcommon"
31+
"go.opentelemetry.io/collector/pdata/pmetric"
32+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
3033
)
3134

3235
var ErrNotFound = errors.New("not found")
@@ -121,6 +124,74 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
121124
return res, nil
122125
}
123126

127+
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
128+
var metricName string
129+
attributes := make(map[string]any)
130+
for _, label := range ts.Labels {
131+
if label.Name == model.MetricNameLabel {
132+
metricName = label.Value
133+
} else {
134+
attributes[label.Name] = label.Value
135+
}
136+
}
137+
return metricName, attributes
138+
}
139+
140+
func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) {
141+
newMetric.SetEmptyGauge()
142+
for _, sample := range samples {
143+
datapoint := newMetric.Gauge().DataPoints().AppendEmpty()
144+
datapoint.SetDoubleValue(sample.Value)
145+
datapoint.SetTimestamp(pcommon.Timestamp(sample.Timestamp * time.Millisecond.Nanoseconds()))
146+
datapoint.Attributes().FromRaw(attributes)
147+
}
148+
}
149+
150+
// Convert Timeseries to Metrics
151+
func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics {
152+
metrics := pmetric.NewMetrics()
153+
for _, ts := range timeseries {
154+
metricName, attributes := getNameAndAttributes(ts)
155+
newMetric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
156+
newMetric.SetName(metricName)
157+
//TODO Set description for new metric
158+
//TODO Set unit for new metric
159+
createDatapointsGauge(newMetric, attributes, ts.Samples)
160+
//TODO(friedrichg): Add support for histograms
161+
}
162+
return metrics
163+
}
164+
165+
// Push series to OTLP endpoint
166+
func (c *Client) OTLP(timeseries []prompb.TimeSeries) (*http.Response, error) {
167+
168+
data, err := pmetricotlp.NewExportRequestFromMetrics(convertTimeseriesToMetrics(timeseries)).MarshalProto()
169+
if err != nil {
170+
return nil, err
171+
}
172+
173+
// Create HTTP request
174+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/otlp/v1/metrics", c.distributorAddress), bytes.NewReader(data))
175+
if err != nil {
176+
return nil, err
177+
}
178+
179+
req.Header.Set("Content-Type", "application/x-protobuf")
180+
req.Header.Set("X-Scope-OrgID", c.orgID)
181+
182+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
183+
defer cancel()
184+
185+
// Execute HTTP request
186+
res, err := c.httpClient.Do(req.WithContext(ctx))
187+
if err != nil {
188+
return nil, err
189+
}
190+
191+
defer res.Body.Close()
192+
return res, nil
193+
}
194+
124195
// Query runs an instant query.
125196
func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
126197
value, _, err := c.querierClient.Query(context.Background(), query, ts)

integration/otlp_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"fmt"
8+
"testing"
9+
"time"
10+
11+
"github.com/prometheus/common/model"
12+
"github.com/prometheus/prometheus/prompb"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
16+
"github.com/cortexproject/cortex/integration/e2e"
17+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
18+
"github.com/cortexproject/cortex/integration/e2ecortex"
19+
)
20+
21+
func TestOTLP(t *testing.T) {
22+
s, err := e2e.NewScenario(networkName)
23+
require.NoError(t, err)
24+
defer s.Close()
25+
26+
// Start dependencies.
27+
minio := e2edb.NewMinio(9000, bucketName)
28+
require.NoError(t, s.StartAndWaitReady(minio))
29+
30+
// Start Cortex components.
31+
require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks.yaml", cortexConfigFile))
32+
33+
// Start Cortex in single binary mode, reading the config from file and overwriting
34+
// the backend config to make it work with Minio.
35+
flags := map[string]string{
36+
"-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
37+
"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
38+
"-blocks-storage.s3.bucket-name": bucketName,
39+
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
40+
"-blocks-storage.s3.insecure": "true",
41+
}
42+
43+
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, flags, "", 9009, 9095)
44+
require.NoError(t, s.StartAndWaitReady(cortex))
45+
46+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
47+
require.NoError(t, err)
48+
49+
// Push some series to Cortex.
50+
now := time.Now()
51+
series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})
52+
53+
res, err := c.OTLP(series)
54+
require.NoError(t, err)
55+
require.Equal(t, 200, res.StatusCode)
56+
57+
// Query the series.
58+
result, err := c.Query("series_1", now)
59+
require.NoError(t, err)
60+
require.Equal(t, model.ValVector, result.Type())
61+
assert.Equal(t, expectedVector, result.(model.Vector))
62+
63+
labelValues, err := c.LabelValues("foo", time.Time{}, time.Time{}, nil)
64+
require.NoError(t, err)
65+
require.Equal(t, model.LabelValues{"bar"}, labelValues)
66+
67+
labelNames, err := c.LabelNames(time.Time{}, time.Time{})
68+
require.NoError(t, err)
69+
require.Equal(t, []string{"__name__", "foo"}, labelNames)
70+
71+
// Check that a range query does not return an error to sanity check the queryrange tripperware.
72+
_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
73+
require.NoError(t, err)
74+
75+
//TODO(friedrichg): test histograms
76+
}

0 commit comments

Comments
 (0)