Skip to content

Commit 6c1f7fa

Browse files
committed
test
1 parent 41e1047 commit 6c1f7fa

File tree

3 files changed

+56
-47
lines changed

3 files changed

+56
-47
lines changed

pkg/querier/tripperware/instantquery/instantquery.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ message Sample {
4444
}
4545

4646
message Matrix {
47-
repeated tripperware.SampleStream sampleStreams = 1;
47+
repeated tripperware.SampleStream sampleStreams = 1 [(gogoproto.nullable) = false];
4848
}

pkg/querier/tripperware/merge.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package tripperware
2+
3+
import (
4+
"github.com/cortexproject/cortex/pkg/cortexpb"
5+
"github.com/cortexproject/cortex/pkg/querier/tripperware/utils"
6+
"sort"
7+
)
8+
9+
type SampleStreamResponse interface {
10+
GetSampleStreams() []*SampleStream
11+
}
12+
13+
func MatrixMerge(resps []SampleStreamResponse) []*SampleStream {
14+
output := map[string]*SampleStream{}
15+
for _, resp := range resps {
16+
for _, stream := range resp.GetSampleStreams() {
17+
metric := cortexpb.FromLabelAdaptersToLabels(stream.Labels).String()
18+
existing, ok := output[metric]
19+
if !ok {
20+
existing = &SampleStream{
21+
Labels: stream.Labels,
22+
}
23+
}
24+
// We need to make sure we don't repeat samples. This causes some visualisations to be broken in Grafana.
25+
// The prometheus API is inclusive of start and end timestamps.
26+
if len(existing.Samples) > 0 && len(stream.Samples) > 0 {
27+
existingEndTs := existing.Samples[len(existing.Samples)-1].TimestampMs
28+
if existingEndTs == stream.Samples[0].TimestampMs {
29+
// Typically this the cases where only 1 sample point overlap,
30+
// so optimize with simple code.
31+
stream.Samples = stream.Samples[1:]
32+
} else if existingEndTs > stream.Samples[0].TimestampMs {
33+
// Overlap might be big, use heavier algorithm to remove overlap.
34+
stream.Samples = utils.SliceSamples(stream.Samples, existingEndTs)
35+
} // else there is no overlap, yay!
36+
}
37+
existing.Samples = append(existing.Samples, stream.Samples...)
38+
output[metric] = existing
39+
}
40+
}
41+
42+
keys := make([]string, 0, len(output))
43+
for key := range output {
44+
keys = append(keys, key)
45+
}
46+
sort.Strings(keys)
47+
48+
result := make([]SampleStream, 0, len(output))
49+
for _, key := range keys {
50+
result = append(result, *output[key])
51+
}
52+
53+
return result
54+
}

pkg/querier/tripperware/queryrange/query_range.go

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ import (
2121
"github.com/prometheus/prometheus/model/timestamp"
2222
"github.com/weaveworks/common/httpgrpc"
2323

24-
"github.com/cortexproject/cortex/pkg/cortexpb"
2524
"github.com/cortexproject/cortex/pkg/querier/tripperware"
26-
"github.com/cortexproject/cortex/pkg/querier/tripperware/utils"
2725
"github.com/cortexproject/cortex/pkg/util"
2826
"github.com/cortexproject/cortex/pkg/util/spanlogger"
2927
)
@@ -150,7 +148,7 @@ func (c prometheusCodec) MergeResponse(responses ...tripperware.Response) (tripp
150148
Status: StatusSuccess,
151149
Data: PrometheusData{
152150
ResultType: model.ValMatrix.String(),
153-
Result: matrixMerge(promResponses),
151+
Result: tripperware.MatrixMerge(promResponses),
154152
Stats: statsMerge(c.sharded, promResponses),
155153
},
156154
}
@@ -345,49 +343,6 @@ func statsMerge(shouldSumStats bool, resps []*PrometheusResponse) *tripperware.P
345343
return tripperware.StatsMerge(output)
346344
}
347345

348-
func matrixMerge(resps []*PrometheusResponse) []tripperware.SampleStream {
349-
output := map[string]*tripperware.SampleStream{}
350-
for _, resp := range resps {
351-
for _, stream := range resp.Data.Result {
352-
metric := cortexpb.FromLabelAdaptersToLabels(stream.Labels).String()
353-
existing, ok := output[metric]
354-
if !ok {
355-
existing = &tripperware.SampleStream{
356-
Labels: stream.Labels,
357-
}
358-
}
359-
// We need to make sure we don't repeat samples. This causes some visualisations to be broken in Grafana.
360-
// The prometheus API is inclusive of start and end timestamps.
361-
if len(existing.Samples) > 0 && len(stream.Samples) > 0 {
362-
existingEndTs := existing.Samples[len(existing.Samples)-1].TimestampMs
363-
if existingEndTs == stream.Samples[0].TimestampMs {
364-
// Typically this the cases where only 1 sample point overlap,
365-
// so optimize with simple code.
366-
stream.Samples = stream.Samples[1:]
367-
} else if existingEndTs > stream.Samples[0].TimestampMs {
368-
// Overlap might be big, use heavier algorithm to remove overlap.
369-
stream.Samples = utils.SliceSamples(stream.Samples, existingEndTs)
370-
} // else there is no overlap, yay!
371-
}
372-
existing.Samples = append(existing.Samples, stream.Samples...)
373-
output[metric] = existing
374-
}
375-
}
376-
377-
keys := make([]string, 0, len(output))
378-
for key := range output {
379-
keys = append(keys, key)
380-
}
381-
sort.Strings(keys)
382-
383-
result := make([]tripperware.SampleStream, 0, len(output))
384-
for _, key := range keys {
385-
result = append(result, *output[key])
386-
}
387-
388-
return result
389-
}
390-
391346
func parseDurationMs(s string) (int64, error) {
392347
if d, err := strconv.ParseFloat(s, 64); err == nil {
393348
ts := d * float64(time.Second/time.Millisecond)

0 commit comments

Comments
 (0)