Skip to content

Commit 7e750b4

Browse files
committed
refactor query frontend to return prometheus error response
Signed-off-by: Ben Ye <[email protected]>
1 parent e961770 commit 7e750b4

File tree

15 files changed

+241
-119
lines changed

15 files changed

+241
-119
lines changed

integration/query_frontend_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,3 +530,91 @@ func TestQueryFrontendNoRetryChunkPool(t *testing.T) {
530530
// We shouldn't be able to see any retries.
531531
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_query_frontend_retries"}, e2e.WaitMissingMetrics))
532532
}
533+
534+
func TestQueryFrontendErrorResponseFormat(t *testing.T) {
535+
const blockRangePeriod = 5 * time.Second
536+
537+
s, err := e2e.NewScenario(networkName)
538+
require.NoError(t, err)
539+
defer s.Close()
540+
541+
// Configure the blocks storage to frequently compact TSDB head
542+
// and ship blocks to the storage.
543+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
544+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
545+
"-blocks-storage.tsdb.ship-interval": "1s",
546+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
547+
"-blocks-storage.bucket-store.max-chunk-pool-bytes": "1",
548+
})
549+
550+
// Start dependencies.
551+
consul := e2edb.NewConsul()
552+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
553+
require.NoError(t, s.StartAndWaitReady(consul, minio))
554+
555+
// Start Cortex components for the write path.
556+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
557+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
558+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
559+
560+
// Wait until the distributor has updated the ring.
561+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
562+
563+
// Push some series to Cortex.
564+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
565+
require.NoError(t, err)
566+
567+
seriesTimestamp := time.Now()
568+
series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
569+
series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"})
570+
series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"})
571+
572+
res, err := c.Push(series1)
573+
require.NoError(t, err)
574+
require.Equal(t, 200, res.StatusCode)
575+
576+
res, err = c.Push(series2)
577+
require.NoError(t, err)
578+
require.Equal(t, 200, res.StatusCode)
579+
580+
// Wait until the TSDB head is compacted and shipped to the storage.
581+
// The shipped block contains the 1st series, while the 2ns series is in the head.
582+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
583+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total"))
584+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))
585+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))
586+
587+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
588+
require.NoError(t, s.Start(queryFrontend))
589+
590+
// Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
591+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
592+
"-blocks-storage.bucket-store.sync-interval": "5s",
593+
}), "")
594+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
595+
"-blocks-storage.bucket-store.sync-interval": "1s",
596+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
597+
}), "")
598+
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
599+
600+
// Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
601+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
602+
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
603+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))
604+
605+
// Sleep 3 * bucket sync interval to make sure consistency checker
606+
// doesn't consider block is uploaded recently.
607+
time.Sleep(3 * time.Second)
608+
609+
// Query back the series.
610+
c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
611+
require.NoError(t, err)
612+
613+
// We expect request to hit chunk pool exhaustion.
614+
resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp)
615+
require.NoError(t, err)
616+
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
617+
require.Contains(t, string(body), pool.ErrPoolExhausted.Error())
618+
// We shouldn't be able to see any retries.
619+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_query_frontend_retries"}, e2e.WaitMissingMetrics))
620+
}

pkg/frontend/transport/handler.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ import (
1616

1717
"github.com/go-kit/log"
1818
"github.com/go-kit/log/level"
19+
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
1920
"github.com/prometheus/client_golang/prometheus"
2021
"github.com/prometheus/client_golang/prometheus/promauto"
2122
"github.com/weaveworks/common/httpgrpc"
22-
"github.com/weaveworks/common/httpgrpc/server"
23+
"google.golang.org/grpc/codes"
2324
"google.golang.org/grpc/status"
2425

2526
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
2627
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2728
"github.com/cortexproject/cortex/pkg/tenant"
2829
"github.com/cortexproject/cortex/pkg/util"
30+
util_api "github.com/cortexproject/cortex/pkg/util/api"
2931
util_log "github.com/cortexproject/cortex/pkg/util/log"
3032
)
3133

@@ -239,8 +241,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
239241
writeServiceTimingHeader(queryResponseTime, hs, stats)
240242
}
241243

244+
logger := util_log.WithContext(r.Context(), f.log)
242245
if err != nil {
243-
writeError(w, err, hs)
246+
writeError(logger, w, err, hs)
244247
return
245248
}
246249

@@ -252,7 +255,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
252255
// log copy response body error so that we will know even though success response code returned
253256
bytesCopied, err := io.Copy(w, resp.Body)
254257
if err != nil && !errors.Is(err, syscall.EPIPE) {
255-
level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
258+
level.Error(logger).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
256259
}
257260
}
258261

@@ -441,7 +444,7 @@ func formatQueryString(queryString url.Values) (fields []interface{}) {
441444
return fields
442445
}
443446

444-
func writeError(w http.ResponseWriter, err error, additionalHeaders http.Header) {
447+
func writeError(logger log.Logger, w http.ResponseWriter, err error, additionalHeaders http.Header) {
445448
switch err {
446449
case context.Canceled:
447450
err = errCanceled
@@ -453,20 +456,35 @@ func writeError(w http.ResponseWriter, err error, additionalHeaders http.Header)
453456
}
454457
}
455458

459+
headers := w.Header()
460+
for k, values := range additionalHeaders {
461+
for _, value := range values {
462+
headers.Set(k, value)
463+
}
464+
}
456465
resp, ok := httpgrpc.HTTPResponseFromError(err)
457466
if ok {
458-
for k, values := range additionalHeaders {
459-
resp.Headers = append(resp.Headers, &httpgrpc.Header{Key: k, Values: values})
467+
code := int(resp.Code)
468+
var errTyp v1.ErrorType
469+
switch resp.Code {
470+
case http.StatusBadRequest, http.StatusRequestEntityTooLarge:
471+
errTyp = v1.ErrBadData
472+
case StatusClientClosedRequest:
473+
errTyp = v1.ErrCanceled
474+
case http.StatusGatewayTimeout:
475+
errTyp = v1.ErrTimeout
476+
case http.StatusUnprocessableEntity:
477+
errTyp = v1.ErrExec
478+
case int32(codes.PermissionDenied):
479+
// Convert gRPC status code to HTTP status code.
480+
code = http.StatusUnprocessableEntity
481+
errTyp = v1.ErrBadData
482+
default:
483+
errTyp = v1.ErrServer
460484
}
461-
_ = server.WriteResponse(w, resp)
485+
util_api.RespondError(logger, w, errTyp, string(resp.Body), code)
462486
} else {
463-
headers := w.Header()
464-
for k, values := range additionalHeaders {
465-
for _, value := range values {
466-
headers.Set(k, value)
467-
}
468-
}
469-
http.Error(w, err.Error(), http.StatusInternalServerError)
487+
util_api.RespondError(logger, w, v1.ErrServer, err.Error(), http.StatusInternalServerError)
470488
}
471489
}
472490

pkg/frontend/transport/handler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/weaveworks/common/user"
2222

2323
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
24+
util_log "github.com/cortexproject/cortex/pkg/util/log"
2425
)
2526

2627
type roundTripperFunc func(*http.Request) (*http.Response, error)
@@ -46,7 +47,7 @@ func TestWriteError(t *testing.T) {
4647
} {
4748
t.Run(test.err.Error(), func(t *testing.T) {
4849
w := httptest.NewRecorder()
49-
writeError(w, test.err, test.additionalHeaders)
50+
writeError(util_log.Logger, w, test.err, test.additionalHeaders)
5051
require.Equal(t, test.status, w.Result().StatusCode)
5152
expectedAdditionalHeaders := test.additionalHeaders
5253
if expectedAdditionalHeaders != nil {

pkg/querier/tripperware/instantquery/limits.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe
4848
if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 {
4949
expr, err := parser.ParseExpr(r.GetQuery())
5050
if err != nil {
51-
// Let Querier propagates the parsing error.
52-
return l.next.Do(ctx, r)
51+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
5352
}
5453

5554
// Enforce query length across all selectors in the query.

pkg/querier/tripperware/instantquery/limits_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package instantquery
22

33
import (
44
"context"
5+
"net/http"
56
"testing"
67
"time"
78

9+
"github.com/prometheus/prometheus/promql/parser"
810
"github.com/stretchr/testify/assert"
911
"github.com/stretchr/testify/mock"
1012
"github.com/stretchr/testify/require"
13+
"github.com/weaveworks/common/httpgrpc"
1114
"github.com/weaveworks/common/user"
1215

1316
"github.com/cortexproject/cortex/pkg/querier/tripperware"
@@ -20,6 +23,9 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
2023
thirtyDays = 30 * 24 * time.Hour
2124
)
2225

26+
wrongQuery := `up[`
27+
_, parserErr := parser.ParseExpr(wrongQuery)
28+
2329
tests := map[string]struct {
2430
maxQueryLength time.Duration
2531
query string
@@ -31,6 +37,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
3137
"even though failed to parse expression, should return no error since request will pass to next middleware": {
3238
query: `up[`,
3339
maxQueryLength: thirtyDays,
40+
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, parserErr.Error()).Error(),
3441
},
3542
"should succeed on a query not exceeding time range": {
3643
query: `up`,

pkg/querier/tripperware/queryrange/limits.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe
8484

8585
expr, err := parser.ParseExpr(r.GetQuery())
8686
if err != nil {
87-
// Let Querier propagates the parsing error.
88-
return l.next.Do(ctx, r)
87+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
8988
}
9089

9190
// Enforce query length across all selectors in the query.

pkg/querier/tripperware/queryrange/limits_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package queryrange
22

33
import (
44
"context"
5+
"github.com/prometheus/prometheus/promql/parser"
6+
"github.com/weaveworks/common/httpgrpc"
7+
"net/http"
58
"testing"
69
"time"
710

@@ -115,6 +118,9 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
115118

116119
now := time.Now()
117120

121+
wrongQuery := `up[`
122+
_, parserErr := parser.ParseExpr(wrongQuery)
123+
118124
tests := map[string]struct {
119125
maxQueryLength time.Duration
120126
query string
@@ -132,6 +138,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
132138
reqStartTime: now.Add(-time.Hour),
133139
reqEndTime: now,
134140
maxQueryLength: thirtyDays,
141+
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, parserErr.Error()).Error(),
135142
},
136143
"should succeed on a query on short time range, ending now": {
137144
maxQueryLength: thirtyDays,

pkg/querier/tripperware/queryrange/split_by_interval.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,7 @@ func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripper
4747
// to line up the boundaries with step.
4848
reqs, err := splitQuery(r, s.interval(r))
4949
if err != nil {
50-
// If the query itself is bad, we don't return error but send the query
51-
// to querier to return the expected error message. This is not very efficient
52-
// but should be okay for now.
53-
// TODO(yeya24): query frontend can reuse the Prometheus API handler and return
54-
// expected error message locally without passing it to the querier through network.
55-
return s.next.Do(ctx, r)
50+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
5651
}
5752
s.splitByCounter.Add(float64(len(reqs)))
5853

pkg/querier/tripperware/roundtrip.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func NewQueryTripperware(
143143
tenantIDs, err := tenant.TenantIDs(r.Context())
144144
// This should never happen anyways because we have auth middleware before this.
145145
if err != nil {
146-
return nil, err
146+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
147147
}
148148
now := time.Now()
149149
userStr := tenant.JoinTenantIDs(tenantIDs)
@@ -162,8 +162,7 @@ func NewQueryTripperware(
162162

163163
expr, err := parser.ParseExpr(query)
164164
if err != nil {
165-
// If query is invalid, no need to go through tripperwares for further splitting.
166-
return next.RoundTrip(r)
165+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
167166
}
168167

169168
reqStats := stats.FromContext(r.Context())
@@ -189,7 +188,10 @@ func NewQueryTripperware(
189188
return next.RoundTrip(r)
190189
}
191190
analysis, err := queryAnalyzer.Analyze(query)
192-
if err != nil || !analysis.IsShardable() {
191+
if err != nil {
192+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
193+
}
194+
if !analysis.IsShardable() {
193195
return next.RoundTrip(r)
194196
}
195197
return instantQuery.RoundTrip(r)

pkg/querier/tripperware/shard_by.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
5555
analysis, err := s.analyzer.Analyze(r.GetQuery())
5656
if err != nil {
5757
level.Warn(logger).Log("msg", "error analyzing query", "q", r.GetQuery(), "err", err)
58+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
5859
}
5960

6061
stats.AddExtraFields(
@@ -63,7 +64,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
6364
"shard_by.sharding_labels", analysis.ShardingLabels(),
6465
)
6566

66-
if err != nil || !analysis.IsShardable() {
67+
if !analysis.IsShardable() {
6768
return s.next.Do(ctx, r)
6869
}
6970

0 commit comments

Comments
 (0)