Skip to content

Commit 20692fa

Browse files
committed
Address comments
Signed-off-by: Justin Jung <[email protected]>
1 parent e40406a commit 20692fa

File tree

11 files changed

+118
-97
lines changed

11 files changed

+118
-97
lines changed

pkg/frontend/transport/roundtripper.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package transport
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"io"
78
"net/http"
89
"net/url"
10+
"regexp"
11+
"strings"
912
"time"
1013

1114
"github.com/weaveworks/common/httpgrpc"
@@ -14,7 +17,7 @@ import (
1417

1518
// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
1619
type GrpcRoundTripper interface {
17-
RoundTripGRPC(context.Context, url.Values, time.Time, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
20+
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest, url.Values, time.Time) (*httpgrpc.HTTPResponse, error)
1821
}
1922

2023
func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
@@ -36,16 +39,27 @@ func (b *buffer) Bytes() []byte {
3639
}
3740

3841
func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
42+
regexp.MustCompile("str")
3943
req, err := server.HTTPRequest(r)
4044
if err != nil {
4145
return nil, err
4246
}
47+
fmt.Println(r.URL.Path)
4348

44-
if err = r.ParseForm(); err != nil {
45-
return nil, err
49+
var (
50+
resp *httpgrpc.HTTPResponse
51+
reqValues url.Values
52+
ts time.Time
53+
)
54+
55+
if strings.HasSuffix(r.URL.Path, "/query") || strings.HasSuffix(r.URL.Path, "/query_range") {
56+
if err = r.ParseForm(); err == nil {
57+
reqValues = r.Form
58+
ts = time.Now()
59+
}
4660
}
4761

48-
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, time.Now(), req)
62+
resp, err = a.roundTripper.RoundTripGRPC(r.Context(), req, reqValues, ts)
4963
if err != nil {
5064
return nil, err
5165
}

pkg/frontend/v1/frontend.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,14 @@ type request struct {
9595
queueSpan opentracing.Span
9696
originalCtx context.Context
9797

98-
request *httpgrpc.HTTPRequest
99-
err chan error
100-
response chan *httpgrpc.HTTPResponse
101-
isHighPriority bool
98+
request *httpgrpc.HTTPRequest
99+
err chan error
100+
response chan *httpgrpc.HTTPResponse
101+
highPriority bool
102102
}
103103

104104
func (r request) IsHighPriority() bool {
105-
return r.isHighPriority
105+
return r.highPriority
106106
}
107107

108108
// New creates a new frontend. Frontend implements service, and must be started and stopped.
@@ -177,7 +177,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) {
177177
}
178178

179179
// RoundTripGRPC round trips a proto (instead of a HTTP request).
180-
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, timestamp time.Time, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
180+
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, reqParams url.Values, ts time.Time) (*httpgrpc.HTTPResponse, error) {
181181
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
182182
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
183183
if tracer != nil && span != nil {
@@ -196,9 +196,8 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values,
196196

197197
return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
198198
request := request{
199-
request: req,
200-
originalCtx: ctx,
201-
isHighPriority: util_query.IsHighPriority(requestParams, timestamp, f.limits.HighPriorityQueries(userID)),
199+
request: req,
200+
originalCtx: ctx,
202201

203202
// Buffer of 1 to ensure response can be written by the server side
204203
// of the Process stream, even if this goroutine goes away due to
@@ -207,6 +206,10 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values,
207206
response: make(chan *httpgrpc.HTTPResponse, 1),
208207
}
209208

209+
if reqParams != nil {
210+
request.highPriority = util_query.IsHighPriority(reqParams, ts, f.limits.HighPriorityQueries(userID))
211+
}
212+
210213
if err := f.queueRequest(ctx, &request); err != nil {
211214
return nil, err
212215
}

pkg/frontend/v2/frontend.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
"sync"
1111
"time"
1212

13-
"github.com/cortexproject/cortex/pkg/scheduler"
14-
1513
"github.com/go-kit/log"
1614
"github.com/go-kit/log/level"
1715
"github.com/opentracing/opentracing-go"
@@ -24,6 +22,7 @@ import (
2422
"github.com/cortexproject/cortex/pkg/frontend/transport"
2523
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
2624
"github.com/cortexproject/cortex/pkg/querier/stats"
25+
"github.com/cortexproject/cortex/pkg/scheduler"
2726
"github.com/cortexproject/cortex/pkg/tenant"
2827
"github.com/cortexproject/cortex/pkg/util/flagext"
2928
"github.com/cortexproject/cortex/pkg/util/grpcclient"
@@ -86,11 +85,11 @@ type Frontend struct {
8685
}
8786

8887
type frontendRequest struct {
89-
queryID uint64
90-
request *httpgrpc.HTTPRequest
91-
userID string
92-
statsEnabled bool
93-
isHighPriority bool
88+
queryID uint64
89+
request *httpgrpc.HTTPRequest
90+
userID string
91+
statsEnabled bool
92+
highPriority bool
9493

9594
cancel context.CancelFunc
9695

@@ -171,7 +170,7 @@ func (f *Frontend) stopping(_ error) error {
171170
}
172171

173172
// RoundTripGRPC round trips a proto (instead of a HTTP request).
174-
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, timestamp time.Time, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
173+
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, reqParams url.Values, ts time.Time) (*httpgrpc.HTTPResponse, error) {
175174
if s := f.State(); s != services.Running {
176175
return nil, fmt.Errorf("frontend not running: %v", s)
177176
}
@@ -196,11 +195,10 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values,
196195

197196
return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
198197
freq := &frontendRequest{
199-
queryID: f.lastQueryID.Inc(),
200-
request: req,
201-
userID: userID,
202-
statsEnabled: stats.IsEnabled(ctx),
203-
isHighPriority: util_query.IsHighPriority(requestParams, timestamp, f.limits.HighPriorityQueries(userID)),
198+
queryID: f.lastQueryID.Inc(),
199+
request: req,
200+
userID: userID,
201+
statsEnabled: stats.IsEnabled(ctx),
204202

205203
cancel: cancel,
206204

@@ -212,6 +210,10 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values,
212210
retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1,
213211
}
214212

213+
if reqParams != nil {
214+
freq.highPriority = util_query.IsHighPriority(reqParams, ts, f.limits.HighPriorityQueries(userID))
215+
}
216+
215217
f.requests.put(freq)
216218
defer f.requests.delete(freq.queryID)
217219

pkg/frontend/v2/frontend_scheduler_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
263263
HttpRequest: req.request,
264264
FrontendAddress: w.frontendAddr,
265265
StatsEnabled: req.statsEnabled,
266-
IsHighPriority: req.isHighPriority,
266+
HighPriority: req.highPriority,
267267
})
268268

269269
if err != nil {

pkg/scheduler/queue/queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func TestRequestQueue_QueriersShouldGetHighPriorityQueryFirst(t *testing.T) {
174174
isHighPriority: false,
175175
}
176176
normalRequest2 := MockRequest{
177-
id: "normal query 1",
177+
id: "normal query 2",
178178
isHighPriority: false,
179179
}
180180
highPriorityRequest := MockRequest{

pkg/scheduler/queue/user_queues.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Limits interface {
2020
// ReservedHighPriorityQueriers returns the minimum number of queriers dedicated for high priority
2121
// queue per tenant. All queriers still handle priority queue first, but this provides extra protection on
2222
// high priority queries from slow normal queries exhausting all queriers.
23-
// If ReservedHighPriorityQueriers is capped by MaxQueriersPerUser.
23+
// ReservedHighPriorityQueriers is capped by MaxQueriersPerUser.
2424
// If less than 1, it will be applied as a percentage of MaxQueriersPerUser.
2525
ReservedHighPriorityQueriers(user string) float64
2626

@@ -118,7 +118,7 @@ func (q *queues) deleteQueue(userID string) {
118118
// MaxQueriers is used to compute which queriers should handle requests for this user.
119119
// If maxQueriers is <= 0, all queriers can handle this user's requests.
120120
// If maxQueriers has changed since the last call, queriers for this are recomputed.
121-
func (q *queues) getOrAddQueue(userID string, maxQueriers int, isHighPriority bool) chan Request {
121+
func (q *queues) getOrAddQueue(userID string, maxQueriers int, highPriority bool) chan Request {
122122
// Empty user is not allowed, as that would break our users list ("" is used for free spot).
123123
if userID == "" {
124124
return nil
@@ -166,7 +166,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int, isHighPriority bo
166166
uq.queriers, uq.reservedQueriers = shuffleQueriersForUser(uq.seed, maxQueriers, q.sortedQueriers, q.limits.ReservedHighPriorityQueriers(userID), nil)
167167
}
168168

169-
if isHighPriority {
169+
if highPriority {
170170
return uq.highPriorityQueue
171171
}
172172

pkg/scheduler/scheduler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ type schedulerRequest struct {
154154
queryID uint64
155155
request *httpgrpc.HTTPRequest
156156
statsEnabled bool
157-
isHighPriority bool
157+
highPriority bool
158158

159159
enqueueTime time.Time
160160

@@ -167,7 +167,7 @@ type schedulerRequest struct {
167167
}
168168

169169
func (s schedulerRequest) IsHighPriority() bool {
170-
return s.isHighPriority
170+
return s.highPriority
171171
}
172172

173173
// FrontendLoop handles connection from frontend.
@@ -298,7 +298,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
298298
queryID: msg.QueryID,
299299
request: msg.HttpRequest,
300300
statsEnabled: msg.StatsEnabled,
301-
isHighPriority: msg.IsHighPriority,
301+
highPriority: msg.HighPriority,
302302
}
303303

304304
now := time.Now()

0 commit comments

Comments
 (0)