Skip to content

Fix sorted queries not producing sorted results for shardable queries #5148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [BUGFIX] Ingester: Ingesters returning empty response for metadata APIs. #5081
* [BUGFIX] Ingester: Fix panic when querying metadata from blocks that are being deleted. #5119
* [BUGFIX] Ring: Fix case when dynamodb kv reaches the limit of 25 actions per batch call. #5136
* [BUGFIX] Query-frontend: Fix sorted queries not producing sorted results for shardable queries. #5148
* [FEATURE] Alertmanager: Add support for time_intervals. #5102

## 1.14.0 2022-12-02
Expand Down
76 changes: 65 additions & 11 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/status"

promqlparser "github.com/prometheus/prometheus/promql/parser"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
Expand Down Expand Up @@ -245,7 +247,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
return &resp, nil
}

func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) {
func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse")
sp.SetTag("response_count", len(responses))
defer sp.Finish()
Expand All @@ -265,11 +267,15 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripper
// For now, we only shard queries that return a vector.
switch promResponses[0].Data.ResultType {
case model.ValVector.String():
v, err := vectorMerge(req, promResponses)
if err != nil {
return nil, err
}
data = PrometheusInstantQueryData{
ResultType: model.ValVector.String(),
Result: PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Vector{
Vector: vectorMerge(promResponses),
Vector: v,
},
},
Stats: statsMerge(promResponses),
Expand Down Expand Up @@ -297,8 +303,12 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripper
return res, nil
}

func vectorMerge(resps []*PrometheusInstantQueryResponse) *Vector {
func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryResponse) (*Vector, error) {
output := map[string]*Sample{}
sortAsc, sortDesc, err := parseQueryForSort(req.GetQuery())
if err != nil {
return nil, err
}
buf := make([]byte, 0, 1024)
for _, resp := range resps {
if resp == nil {
Expand Down Expand Up @@ -327,22 +337,66 @@ func vectorMerge(resps []*PrometheusInstantQueryResponse) *Vector {
if len(output) == 0 {
return &Vector{
Samples: make([]*Sample, 0),
}
}, nil
}

keys := make([]string, 0, len(output))
for key := range output {
keys = append(keys, key)
type pair struct {
metric string
s *Sample
}

samples := make([]*pair, 0, len(output))
for k, v := range output {
samples = append(samples, &pair{
metric: k,
s: v,
})
}
sort.Strings(keys)

sort.Slice(samples, func(i, j int) bool {
// Order is determined by the sortFn in the query.
if sortAsc {
return samples[i].s.Sample.Value < samples[j].s.Sample.Value
} else if sortDesc {
return samples[i].s.Sample.Value > samples[j].s.Sample.Value
} else {
// Fallback on sorting by labels.
return samples[i].metric < samples[j].metric
}
})
result := &Vector{
Samples: make([]*Sample, 0, len(output)),
}
for _, key := range keys {
result.Samples = append(result.Samples, output[key])
for _, p := range samples {
result.Samples = append(result.Samples, p.s)
}
return result
return result, nil
}

// parseQueryForSort parses the PromQL query q and reports whether it contains
// a call to the sort or sort_desc function.
//
// The results are, in order: whether a sort call was seen, whether a
// sort_desc call was seen, and the parse error if q is not valid PromQL (in
// which case both flags are false).
//
// The visitor returns a sentinel error as soon as it sees either function,
// which is intended to abort the walk so that only the first match counts.
// NOTE(review): the match is made on any call node in the expression tree,
// not only the outermost one, so a sort nested inside another expression is
// also reported — confirm this is what callers expect.
func parseQueryForSort(q string) (bool, bool, error) {
	expr, err := promqlparser.ParseExpr(q)
	if err != nil {
		return false, false, err
	}

	var asc, desc bool
	// Sentinel used only to signal "stop walking"; it is never surfaced to
	// the caller.
	stop := errors.New("done")
	promqlparser.Inspect(expr, func(node promqlparser.Node, _ []promqlparser.Node) error {
		call, isCall := node.(*promqlparser.Call)
		if !isCall || call.Func == nil {
			return nil
		}
		switch call.Func.Name {
		case "sort":
			asc = true
			return stop
		case "sort_desc":
			desc = true
			return stop
		}
		return nil
	})
	return asc, desc, nil
}

func matrixMerge(resps []*PrometheusInstantQueryResponse) []tripperware.SampleStream {
Expand Down
37 changes: 36 additions & 1 deletion pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,34 +198,43 @@ func TestResponse(t *testing.T) {
}

func TestMergeResponse(t *testing.T) {
defaultReq := &PrometheusRequest{
Query: "sum(up)",
}
for _, tc := range []struct {
name string
req tripperware.Request
resps []string
expectedResp string
expectedErr error
}{
{
name: "empty response",
req: defaultReq,
resps: []string{`{"status":"success","data":{"resultType":"vector","result":[]}}`},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[]}}`,
},
{
name: "empty response with stats",
req: defaultReq,
resps: []string{`{"status":"success","data":{"resultType":"vector","result":[],"stats":{"samples":{"totalQueryableSamples":0,"totalQueryableSamplesPerStep":[]}}}}`},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[],"stats":{"samples":{"totalQueryableSamples":0,"totalQueryableSamplesPerStep":[]}}}}`,
},
{
name: "single response",
req: defaultReq,
resps: []string{`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`,
},
{
name: "single response with stats",
req: defaultReq,
resps: []string{`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
},
{
name: "duplicated response",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`,
Expand All @@ -234,6 +243,7 @@ func TestMergeResponse(t *testing.T) {
},
{
name: "duplicated response with stats",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
Expand All @@ -242,14 +252,34 @@ func TestMergeResponse(t *testing.T) {
},
{
name: "merge two responses",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]}]}}`,
},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
},
{
name: "merge two responses with sort",
req: &PrometheusRequest{Query: "sort(up)"},
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]},{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
},
{
name: "merge two responses with sort_desc",
req: &PrometheusRequest{Query: "sort_desc(up)"},
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
},
{
name: "merge two responses with stats",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
Expand All @@ -258,6 +288,7 @@ func TestMergeResponse(t *testing.T) {
},
{
name: "responses don't contain vector, should return an error",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"string","result":[1662682521.409,"foo"]}}`,
`{"status":"success","data":{"resultType":"string","result":[1662682521.409,"foo"]}}`,
Expand All @@ -266,13 +297,15 @@ func TestMergeResponse(t *testing.T) {
},
{
name: "single matrix response",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"up"},"values":[[1,"1"],[2,"2"]]}]}}`,
},
expectedResp: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"up"},"values":[[1,"1"],[2,"2"]]}]}}`,
},
{
name: "multiple matrix responses without duplicated series",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"]]}]}}`,
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"foo"},"values":[[3,"3"],[4,"4"]]}]}}`,
Expand All @@ -281,6 +314,7 @@ func TestMergeResponse(t *testing.T) {
},
{
name: "multiple matrix responses with duplicated series, but not same samples",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"]]}]}}`,
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[3,"3"]]}]}}`,
Expand All @@ -289,6 +323,7 @@ func TestMergeResponse(t *testing.T) {
},
{
name: "multiple matrix responses with duplicated series and same samples",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"]]}]}}`,
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`,
Expand All @@ -308,7 +343,7 @@ func TestMergeResponse(t *testing.T) {
require.NoError(t, err)
resps = append(resps, dr)
}
resp, err := InstantQueryCodec.MergeResponse(context.Background(), resps...)
resp, err := InstantQueryCodec.MergeResponse(context.Background(), tc.req, resps...)
assert.Equal(t, err, tc.expectedErr)
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Codec interface {
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
type Merger interface {
// MergeResponse merges responses from multiple requests into a single Response
MergeResponse(context.Context, ...Response) (Response, error)
MergeResponse(context.Context, Request, ...Response) (Response, error)
}

// Response represents a query range response.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewEmptyPrometheusResponse() *PrometheusResponse {
}
}

func (c prometheusCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) {
func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "QueryRangeResponse.MergeResponse")
sp.SetTag("response_count", len(responses))
defer sp.Finish()
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func TestMergeAPIResponses(t *testing.T) {
},
}} {
t.Run(tc.name, func(t *testing.T) {
output, err := PrometheusCodec.MergeResponse(context.Background(), tc.input...)
output, err := PrometheusCodec.MergeResponse(context.Background(), nil, tc.input...)
require.NoError(t, err)
require.Equal(t, tc.expected, output)
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/tripperware/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
return nil, nil, err
}
if len(requests) == 0 {
response, err := s.merger.MergeResponse(context.Background(), responses...)
response, err := s.merger.MergeResponse(context.Background(), r, responses...)
// No downstream requests so no need to write back to the cache.
return response, nil, err
}
Expand Down Expand Up @@ -469,7 +469,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
if err != nil {
return nil, nil, err
}
merged, err := s.merger.MergeResponse(ctx, accumulator.Response, currentRes)
merged, err := s.merger.MergeResponse(ctx, r, accumulator.Response, currentRes)
if err != nil {
return nil, nil, err
}
Expand All @@ -481,7 +481,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
return nil, nil, err
}

response, err := s.merger.MergeResponse(ctx, responses...)
response, err := s.merger.MergeResponse(ctx, r, responses...)
return response, mergedExtents, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripper
resps = append(resps, reqResp.Response)
}

response, err := s.merger.MergeResponse(ctx, resps...)
response, err := s.merger.MergeResponse(ctx, nil, resps...)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestSplitQuery(t *testing.T) {
}

func TestSplitByDay(t *testing.T) {
mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), parsedResponse, parsedResponse)
mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), nil, parsedResponse, parsedResponse)
require.NoError(t, err)

mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/shard_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
resps = append(resps, reqResp.Response)
}

return s.merger.MergeResponse(ctx, resps...)
return s.merger.MergeResponse(ctx, r, resps...)
}

func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis querysharding.QueryAnalysis) []Request {
Expand Down