Skip to content

Fix the incorrect ordering for topk and bottomk in shardable queries #5170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* [BUGFIX] Ingester: Ingesters returning empty response for metadata APIs. #5081
* [BUGFIX] Ingester: Fix panic when querying metadata from blocks that are being deleted. #5119
* [BUGFIX] Ring: Fix case when dynamodb kv reaches the limit of 25 actions per batch call. #5136
* [BUGFIX] Query-frontend: Fix sorted queries do not produce sorted results for shardable queries. #5148
* [BUGFIX] Query-frontend: Fix shardable instant queries do not produce sorted results for `sort`, `sort_desc`, `topk`, `bottomk` functions. #5148, #5170
* [BUGFIX] Querier: Fix `/api/v1/series` returning 5XX instead of 4XX when limits are hit. #5169
* [FEATURE] Alertmanager: Add support for time_intervals. #5102

Expand Down
91 changes: 66 additions & 25 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ

func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryResponse) (*Vector, error) {
output := map[string]*Sample{}
sortAsc, sortDesc, err := parseQueryForSort(req.GetQuery())
metrics := []string{} // Used to preserve the order for topk and bottomk.
sortPlan, err := sortPlanForQuery(req.GetQuery())
if err != nil {
return nil, err
}
Expand All @@ -327,17 +328,27 @@ func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryRespons
metric := string(cortexpb.FromLabelAdaptersToLabels(sample.Labels).Bytes(buf))
if existingSample, ok := output[metric]; !ok {
output[metric] = s
metrics = append(metrics, metric) // Preserve the order of metric.
} else if existingSample.GetSample().TimestampMs < s.GetSample().TimestampMs {
// Choose the latest sample if we see overlap.
output[metric] = s
}
}
}

result := &Vector{
Samples: make([]*Sample, 0, len(output)),
}

if len(output) == 0 {
return &Vector{
Samples: make([]*Sample, 0),
}, nil
return result, nil
}

if sortPlan == mergeOnly {
for _, k := range metrics {
result.Samples = append(result.Samples, output[k])
}
return result, nil
}

type pair struct {
Expand All @@ -354,49 +365,79 @@ func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryRespons
}

sort.Slice(samples, func(i, j int) bool {
// Order is determined by the sortFn in the query.
if sortAsc {
// Order is determined by vector
switch sortPlan {
case sortByValuesAsc:
return samples[i].s.Sample.Value < samples[j].s.Sample.Value
} else if sortDesc {
case sortByValuesDesc:
return samples[i].s.Sample.Value > samples[j].s.Sample.Value
} else {
// Fallback on sorting by labels.
return samples[i].metric < samples[j].metric
}
return samples[i].metric < samples[j].metric
})
result := &Vector{
Samples: make([]*Sample, 0, len(output)),
}

for _, p := range samples {
result.Samples = append(result.Samples, p.s)
}
return result, nil
}

func parseQueryForSort(q string) (bool, bool, error) {
type sortPlan int

const (
mergeOnly sortPlan = 0
sortByValuesAsc sortPlan = 1
sortByValuesDesc sortPlan = 2
sortByLabels sortPlan = 3
)

func sortPlanForQuery(q string) (sortPlan, error) {
expr, err := promqlparser.ParseExpr(q)
if err != nil {
return false, false, err
return 0, err
}
var sortAsc bool = false
var sortDesc bool = false
done := errors.New("done")
promqlparser.Inspect(expr, func(n promqlparser.Node, _ []promqlparser.Node) error {
if n, ok := n.(*promqlparser.Call); ok {
// Check if the root expression is topk or bottomk
if aggr, ok := expr.(*promqlparser.AggregateExpr); ok {
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK {
return mergeOnly, nil
}
}
checkForSort := func(expr promqlparser.Expr) (sortAsc, sortDesc bool) {
if n, ok := expr.(*promqlparser.Call); ok {
if n.Func != nil {
if n.Func.Name == "sort" {
sortAsc = true
return done
}
if n.Func.Name == "sort_desc" {
sortDesc = true
return done
}
}
}
return nil
})
return sortAsc, sortDesc, nil
return sortAsc, sortDesc
}
// Check the root expression for sort
if sortAsc, sortDesc := checkForSort(expr); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}

// If the root expression is a binary expression, check the LHS and RHS for sort
if bin, ok := expr.(*promqlparser.BinaryExpr); ok {
if sortAsc, sortDesc := checkForSort(bin.LHS); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}
if sortAsc, sortDesc := checkForSort(bin.RHS); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}
}
return sortByLabels, nil
}

func matrixMerge(resps []*PrometheusInstantQueryResponse) []tripperware.SampleStream {
Expand Down
79 changes: 77 additions & 2 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestMergeResponse(t *testing.T) {
},
{
name: "merge two responses with sort",
req: &PrometheusRequest{Query: "sort(up)"},
req: &PrometheusRequest{Query: "sort(sum by (job) (up))"},
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
Expand All @@ -270,13 +270,22 @@ func TestMergeResponse(t *testing.T) {
},
{
name: "merge two responses with sort_desc",
req: &PrometheusRequest{Query: "sort_desc(up)"},
req: &PrometheusRequest{Query: "sort_desc(sum by (job) (up))"},
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
},
{
name: "merge two responses with topk",
req: &PrometheusRequest{Query: "topk(10, up) by(job)"},
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
},
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]},{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
},
{
name: "merge two responses with stats",
req: defaultReq,
Expand Down Expand Up @@ -356,3 +365,69 @@ func TestMergeResponse(t *testing.T) {
})
}
}

func Test_sortPlanForQuery(t *testing.T) {
tc := []struct {
query string
expectedPlan sortPlan
err bool
}{
{
query: "invalid(10, up)",
expectedPlan: mergeOnly,
err: true,
},
{
query: "topk(10, up)",
expectedPlan: mergeOnly,
err: false,
},
{
query: "bottomk(10, up)",
expectedPlan: mergeOnly,
err: false,
},
{
query: "1 + topk(10, up)",
expectedPlan: sortByLabels,
err: false,
},
{
query: "1 + sort_desc(sum by (job) (up) )",
expectedPlan: sortByValuesDesc,
err: false,
},
{
query: "sort(topk by (job) (10, up))",
expectedPlan: sortByValuesAsc,
err: false,
},
{
query: "topk(5, up) by (job) + sort_desc(up)",
expectedPlan: sortByValuesDesc,
err: false,
},
{
query: "sort(up) + topk(5, up) by (job)",
expectedPlan: sortByValuesAsc,
err: false,
},
{
query: "sum(up) by (job)",
expectedPlan: sortByLabels,
err: false,
},
}

for _, tc := range tc {
t.Run(tc.query, func(t *testing.T) {
p, err := sortPlanForQuery(tc.query)
if tc.err {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tc.expectedPlan, p)
}
})
}
}