Skip to content

Commit 5f93620

Browse files
committed
Allow ruler to retrieve proto format query response
Signed-off-by: SungJin1212 <[email protected]>
1 parent 7fb98ab commit 5f93620

25 files changed

+973
-155
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [CHANGE] Change all max async concurrency default values `50` to `3` #6268
1010
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
1111
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
12+
* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query response as a proto format. #6345
1213
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
1314
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
1415
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4252,6 +4252,11 @@ The `ruler_config` configures the Cortex ruler.
42524252
# CLI flag: -ruler.frontend-address
42534253
[frontend_address: <string> | default = ""]
42544254
4255+
# [Experimental] Query response format to get query results from Query Frontend
4256+
# when the rule evaluation. Supported values: json,protobuf
4257+
# CLI flag: -ruler.query-response-format
4258+
[query_response_format: <string> | default = "protobuf"]
4259+
42554260
frontend_client:
42564261
# gRPC client max receive message size (bytes).
42574262
# CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ require (
8080
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
8181
github.com/cespare/xxhash/v2 v2.3.0
8282
github.com/google/go-cmp v0.6.0
83+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
8384
github.com/sercand/kuberesolver/v4 v4.0.0
8485
go.opentelemetry.io/collector/pdata v1.19.0
8586
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
@@ -186,7 +187,6 @@ require (
186187
github.com/mitchellh/mapstructure v1.5.0 // indirect
187188
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
188189
github.com/modern-go/reflect2 v1.0.2 // indirect
189-
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
190190
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
191191
github.com/ncw/swift v1.0.53 // indirect
192192
github.com/oklog/run v1.1.0 // indirect

integration/ruler_test.go

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,42 +1670,61 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
16701670
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
16711671
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
16721672
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1673-
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
1674-
require.NoError(t, s.Start(queryFrontend))
1675-
1676-
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1677-
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1678-
}), "")
1679-
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1680-
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1681-
}), "")
1682-
require.NoError(t, s.StartAndWaitReady(ruler, querier))
1683-
1684-
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1685-
require.NoError(t, err)
1673+
for _, format := range []string{"protobuf", "json"} {
1674+
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
1675+
queryFrontendFlag := mergeFlags(flags, map[string]string{
1676+
"-ruler.query-response-format": format,
1677+
})
1678+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
1679+
require.NoError(t, s.Start(queryFrontend))
16861680

1687-
expression := "metric"
1688-
groupName := "rule_group"
1689-
ruleName := "rule_name"
1690-
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1681+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{
1682+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1683+
}), "")
1684+
require.NoError(t, s.StartAndWaitReady(querier))
16911685

1692-
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1693-
// Wait until ruler has loaded the group.
1694-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1695-
// Wait until rule group has tried to evaluate the rule.
1696-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1686+
rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{
1687+
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1688+
})
1689+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "")
1690+
require.NoError(t, s.StartAndWaitReady(ruler))
16971691

1698-
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1699-
// Check that cortex_ruler_query_frontend_clients went up
1700-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1701-
// Check that cortex_ruler_queries_total went up
1702-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1703-
// Check that cortex_ruler_queries_failed_total is zero
1704-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1705-
// Check that cortex_ruler_write_requests_total went up
1706-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1707-
// Check that cortex_ruler_write_requests_failed_total is zero
1708-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1692+
t.Cleanup(func() {
1693+
_ = s.Stop(ruler)
1694+
_ = s.Stop(queryFrontend)
1695+
_ = s.Stop(querier)
1696+
})
1697+
1698+
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1699+
require.NoError(t, err)
1700+
1701+
expression := "metric" // vector
1702+
//expression := "scalar(count(up == 1)) > bool 1" // scalar
1703+
groupName := "rule_group"
1704+
ruleName := "rule_name"
1705+
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1706+
1707+
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1708+
// Wait until ruler has loaded the group.
1709+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1710+
// Wait until rule group has tried to evaluate the rule.
1711+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1712+
// Make sure not to fail
1713+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1714+
1715+
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1716+
// Check that cortex_ruler_query_frontend_clients went up
1717+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1718+
// Check that cortex_ruler_queries_total went up
1719+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1720+
// Check that cortex_ruler_queries_failed_total is zero
1721+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1722+
// Check that cortex_ruler_write_requests_total went up
1723+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1724+
// Check that cortex_ruler_write_requests_failed_total is zero
1725+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1726+
})
1727+
}
17091728
}
17101729

17111730
func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
474474
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
475475
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
476476
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
477-
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
477+
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec, t.Cfg.Ruler.QueryResponseFormat)
478478

479479
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
480480
t.Cfg.QueryRange,

pkg/querier/codec/protobuf_codec.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusRe
5252
Samples: *getVectorSamples(data),
5353
},
5454
}
55+
case model.ValScalar.String():
56+
scalar := getScalar(data)
57+
queryResult.Result = &tripperware.PrometheusQueryResult_Scalar{
58+
Scalar: &scalar,
59+
}
5560
default:
5661
json := jsoniter.ConfigCompatibleWithStandardLibrary
5762
rawBytes, err := json.Marshal(data)
@@ -141,6 +146,15 @@ func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
141146
return &vectorSamples
142147
}
143148

149+
func getScalar(data *v1.QueryData) tripperware.Scalar {
150+
s := data.Result.(promql.Scalar)
151+
152+
return tripperware.Scalar{
153+
Value: s.V,
154+
TimestampMs: s.T,
155+
}
156+
}
157+
144158
func getStats(builtin *stats.BuiltinStats) *tripperware.PrometheusResponseSamplesStats {
145159
queryableSamplesStatsPerStepLen := len(builtin.Samples.TotalQueryableSamplesPerStep)
146160
queryableSamplesStatsPerStep := make([]*tripperware.PrometheusResponseQueryableSamplesStatsPerStep, queryableSamplesStatsPerStepLen)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package codec
2+
3+
import (
4+
"testing"
5+
6+
jsoniter "github.com/json-iterator/go"
7+
"github.com/prometheus/prometheus/model/labels"
8+
"github.com/prometheus/prometheus/promql"
9+
"github.com/prometheus/prometheus/promql/parser"
10+
v1 "github.com/prometheus/prometheus/web/api/v1"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
var (
15+
json = jsoniter.Config{
16+
EscapeHTML: false, // No HTML in our responses.
17+
SortMapKeys: true,
18+
ValidateJsonRawMessage: false,
19+
}.Froze()
20+
)
21+
22+
func TestPrometheusResponse_JsonMarshal(t *testing.T) {
23+
var tests = []struct {
24+
name string
25+
resp *v1.Response
26+
expectedJson string
27+
}{
28+
{
29+
name: "scalar",
30+
resp: &v1.Response{
31+
Status: "success",
32+
Data: &v1.QueryData{
33+
ResultType: "scalar",
34+
Result: promql.Scalar{T: 1000, V: 2},
35+
},
36+
},
37+
expectedJson: `{"status":"success","data":{"resultType":"scalar","result":[1,"2"]}}`,
38+
},
39+
{
40+
name: "vector",
41+
resp: &v1.Response{
42+
Status: "success",
43+
Data: &v1.QueryData{
44+
ResultType: "vector",
45+
Result: promql.Vector{
46+
{
47+
Metric: labels.FromStrings("name", "value"),
48+
T: 1234,
49+
F: 5.67,
50+
},
51+
},
52+
},
53+
},
54+
expectedJson: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"name":"value"},"value":[1.234,"5.67"]}]}}`,
55+
},
56+
{
57+
name: "matrix",
58+
resp: &v1.Response{
59+
Status: "success",
60+
Data: &v1.QueryData{
61+
ResultType: parser.ValueTypeMatrix,
62+
Result: promql.Matrix{
63+
{
64+
Metric: labels.FromStrings("name1", "value1"),
65+
Floats: []promql.FPoint{
66+
{T: 12, F: 3.4},
67+
{T: 56, F: 7.8},
68+
},
69+
},
70+
{
71+
Metric: labels.FromStrings("name2", "value2"),
72+
Floats: []promql.FPoint{
73+
{T: 12, F: 3.4},
74+
{T: 56, F: 7.8},
75+
},
76+
},
77+
},
78+
},
79+
},
80+
expectedJson: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"name1":"value1"},"values":[[0.012,"3.4"],[0.056,"7.8"]]},{"metric":{"name2":"value2"},"values":[[0.012,"3.4"],[0.056,"7.8"]]}]}}`,
81+
},
82+
}
83+
for _, test := range tests {
84+
t.Run(test.name, func(t *testing.T) {
85+
prometheusResponse, err := createPrometheusQueryResponse(test.resp)
86+
require.NoError(t, err)
87+
88+
b, err := json.Marshal(prometheusResponse)
89+
require.NoError(t, err)
90+
require.Equal(t, test.expectedJson, string(b))
91+
})
92+
}
93+
}

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111
"time"
1212

1313
jsoniter "github.com/json-iterator/go"
14+
"github.com/munnerz/goautoneg"
1415
"github.com/opentracing/opentracing-go"
1516
otlog "github.com/opentracing/opentracing-go/log"
1617
"github.com/prometheus/common/model"
18+
v1 "github.com/prometheus/prometheus/web/api/v1"
1719
"github.com/weaveworks/common/httpgrpc"
1820
"google.golang.org/grpc/status"
1921

@@ -28,16 +30,20 @@ var (
2830
SortMapKeys: true,
2931
ValidateJsonRawMessage: false,
3032
}.Froze()
33+
34+
protobufMIMEType = v1.MIMEType{Type: "application", SubType: "x-protobuf"}
35+
jsonMIMEType = v1.MIMEType{Type: "application", SubType: "json"}
3136
)
3237

3338
type instantQueryCodec struct {
3439
tripperware.Codec
35-
compression tripperware.Compression
36-
defaultCodecType tripperware.CodecType
37-
now func() time.Time
40+
compression tripperware.Compression
41+
defaultCodecType tripperware.CodecType
42+
now func() time.Time
43+
rulerQueryResponseFormat string
3844
}
3945

40-
func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) instantQueryCodec {
46+
func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string, rulerQueryResponseFormat string) instantQueryCodec {
4147
compression := tripperware.NonCompression // default
4248
if compressionStr == string(tripperware.GzipCompression) {
4349
compression = tripperware.GzipCompression
@@ -49,9 +55,10 @@ func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) ins
4955
}
5056

5157
return instantQueryCodec{
52-
compression: compression,
53-
defaultCodecType: defaultCodecType,
54-
now: time.Now,
58+
compression: compression,
59+
defaultCodecType: defaultCodecType,
60+
now: time.Now,
61+
rulerQueryResponseFormat: rulerQueryResponseFormat,
5562
}
5663
}
5764

@@ -114,6 +121,12 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _
114121
if resp.Data.Result.GetVector().Samples == nil {
115122
resp.Data.Result.GetVector().Samples = []tripperware.Sample{}
116123
}
124+
case model.ValScalar.String():
125+
if resp.Data.Result.GetScalar() == nil {
126+
resp.Data.Result.Result = &tripperware.PrometheusQueryResult_Scalar{
127+
Scalar: &tripperware.Scalar{},
128+
}
129+
}
117130
}
118131

119132
if resp.Headers == nil {
@@ -167,7 +180,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
167180
return req.WithContext(ctx), nil
168181
}
169182

170-
func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) {
183+
func (c instantQueryCodec) EncodeResponse(ctx context.Context, req *http.Request, res tripperware.Response) (*http.Response, error) {
171184
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
172185
defer sp.Finish()
173186

@@ -176,7 +189,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
176189
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format")
177190
}
178191

179-
b, err := json.Marshal(a)
192+
contentType, b, err := marshalResponse(a, req.Header.Get("Accept"))
180193
if err != nil {
181194
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
182195
}
@@ -185,7 +198,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
185198

186199
resp := http.Response{
187200
Header: http.Header{
188-
"Content-Type": []string{tripperware.ApplicationJson},
201+
"Content-Type": []string{contentType},
189202
},
190203
Body: io.NopCloser(bytes.NewBuffer(b)),
191204
StatusCode: http.StatusOK,
@@ -213,3 +226,22 @@ func decorateWithParamName(err error, field string) error {
213226
}
214227
return fmt.Errorf(errTmpl, field, err)
215228
}
229+
230+
func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) {
231+
if acceptHeader == "" {
232+
b, err := json.Marshal(resp)
233+
return tripperware.ApplicationJson, b, err
234+
}
235+
236+
for _, clause := range goautoneg.ParseAccept(acceptHeader) {
237+
if jsonMIMEType.Satisfies(clause) {
238+
b, err := json.Marshal(resp)
239+
return tripperware.ApplicationJson, b, err
240+
} else if protobufMIMEType.Satisfies(clause) {
241+
b, err := resp.Marshal()
242+
return tripperware.ApplicationProtobuf, b, err
243+
}
244+
}
245+
246+
return "", nil, fmt.Errorf("failed to marshal response with accept header: %s", acceptHeader)
247+
}

0 commit comments

Comments
 (0)