diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 9dbc80d405c..e6eab8a2169 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -11,11 +11,13 @@ import ( "testing" "time" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -444,3 +446,66 @@ func Test_sortPlanForQuery(t *testing.T) { }) } } + +func Benchmark_Decode(b *testing.B) { + maxSamplesCount := 1000000 + samples := make([]tripperware.SampleStream, maxSamplesCount) + + for i := 0; i < maxSamplesCount; i++ { + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample%v", i), Value: fmt.Sprintf("Value%v", i)}) + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample2%v", i), Value: fmt.Sprintf("Value2%v", i)}) + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample3%v", i), Value: fmt.Sprintf("Value3%v", i)}) + samples[i].Samples = append(samples[i].Samples, cortexpb.Sample{TimestampMs: int64(i), Value: float64(i)}) + } + + for name, tc := range map[string]struct { + sampleStream []tripperware.SampleStream + }{ + "100 samples": { + sampleStream: samples[:100], + }, + "1000 samples": { + sampleStream: samples[:1000], + }, + "10000 samples": { + sampleStream: samples[:10000], + }, + "100000 samples": { + sampleStream: samples[:100000], + }, + "1000000 samples": { + sampleStream: samples[:1000000], + }, + } { + b.Run(name, func(b *testing.B) { + r := PrometheusInstantQueryResponse{ + Data: PrometheusInstantQueryData{ + ResultType: model.ValMatrix.String(), + Result: PrometheusInstantQueryResult{ + Result: &PrometheusInstantQueryResult_Matrix{ + Matrix: &Matrix{ + SampleStreams: tc.sampleStream, + }, + }, + }, + }, + } + + body, err := json.Marshal(r) + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + response := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewBuffer(body)), + } + _, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + require.NoError(b, err) + } + }) + } + +} diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index c2d444de85a..42de413e52b 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -83,6 +83,31 @@ type Request interface { WithStats(stats string) Request } +func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) { + lbls := labels.Labels{} + samples := []cortexpb.Sample{} + for field := iter.ReadObject(); field != ""; field = iter.ReadObject() { + switch field { + case "metric": + iter.ReadVal(&lbls) + case "values": + for { + if !iter.ReadArray() { + break + } + s := cortexpb.Sample{} + cortexpb.SampleJsoniterDecode(unsafe.Pointer(&s), iter) + samples = append(samples, s) + } + } + } + + *(*SampleStream)(ptr) = SampleStream{ + Samples: samples, + Labels: cortexpb.FromLabelsToLabelAdapters(lbls), + } +} + func encodeSampleStream(ptr unsafe.Pointer, stream *jsoniter.Stream) { ss := (*SampleStream)(ptr) stream.WriteObjectStart() @@ -160,6 +185,7 @@ func init() { jsoniter.RegisterTypeEncoderFunc("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, func(unsafe.Pointer) bool { return false }) jsoniter.RegisterTypeDecoderFunc("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode) jsoniter.RegisterTypeEncoderFunc("tripperware.SampleStream", encodeSampleStream, func(unsafe.Pointer) bool { return false }) + jsoniter.RegisterTypeDecoderFunc("tripperware.SampleStream", decodeSampleStream) } func EncodeTime(t int64) string {