Skip to content

Commit 06fbe27

Browse files
committed
Propagate the stats via gRPC
Only partially done, still need to merge & record the results in the query frontend. Signed-off-by: Tom Wilkie <[email protected]>
1 parent 07b09de commit 06fbe27

File tree

10 files changed

+262
-63
lines changed

10 files changed

+262
-63
lines changed

pkg/frontend/v1/frontendv1pb/frontend.pb.go

Lines changed: 97 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/frontend/v1/frontendv1pb/frontend.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ option go_package = "frontendv1pb";
88

99
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
1010
import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto";
11+
import "github.com/cortexproject/cortex/pkg/querier/stats/stats.proto";
1112

1213
option (gogoproto.marshaler_all) = true;
1314
option (gogoproto.unmarshaler_all) = true;
@@ -31,4 +32,5 @@ message FrontendToClient {
3132
message ClientToFrontend {
3233
httpgrpc.HTTPResponse httpResponse = 1;
3334
string clientID = 2;
35+
stats.Stats stats = 3;
3436
}

pkg/frontend/v2/frontend.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"go.uber.org/atomic"
2121

2222
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
23+
"github.com/cortexproject/cortex/pkg/querier/stats"
2324
"github.com/cortexproject/cortex/pkg/util/flagext"
2425
"github.com/cortexproject/cortex/pkg/util/grpcclient"
2526
"github.com/cortexproject/cortex/pkg/util/grpcutil"
@@ -79,7 +80,7 @@ type frontendRequest struct {
7980
cancel context.CancelFunc
8081

8182
enqueue chan enqueueResult
82-
response chan *httpgrpc.HTTPResponse
83+
response chan *frontendv2pb.QueryResultRequest
8384
}
8485

8586
type enqueueStatus int
@@ -178,7 +179,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
178179
// Buffer of 1 to ensure response or error can be written to the channel
179180
// even if this goroutine goes away due to client context cancellation.
180181
enqueue: make(chan enqueueResult, 1),
181-
response: make(chan *httpgrpc.HTTPResponse, 1),
182+
response: make(chan *frontendv2pb.QueryResultRequest, 1),
182183
}
183184

184185
f.requests.put(freq)
@@ -228,7 +229,9 @@ enqueueAgain:
228229
return nil, ctx.Err()
229230

230231
case resp := <-freq.response:
231-
return resp, nil
232+
stats := stats.FromContext(ctx)
233+
stats.Merge(resp.Stats)
234+
return resp.HttpResponse, nil
232235
}
233236
}
234237

@@ -244,7 +247,7 @@ func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryRes
244247
// To avoid mixing results from different queries, we randomize queryID counter on start.
245248
if req != nil && req.userID == userID {
246249
select {
247-
case req.response <- qrReq.HttpResponse:
250+
case req.response <- qrReq:
248251
// Should always be possible, unless QueryResult is called multiple times with the same queryID.
249252
default:
250253
level.Warn(f.log).Log("msg", "failed to write query result to the response channel", "queryID", qrReq.QueryID, "user", userID)

pkg/frontend/v2/frontend_scheduler_worker.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/weaveworks/common/httpgrpc"
1313
"google.golang.org/grpc"
1414

15+
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
1516
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
1617
"github.com/cortexproject/cortex/pkg/util"
1718
"github.com/cortexproject/cortex/pkg/util/services"
@@ -285,16 +286,20 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
285286

286287
case schedulerpb.ERROR:
287288
req.enqueue <- enqueueResult{status: waitForResponse}
288-
req.response <- &httpgrpc.HTTPResponse{
289-
Code: http.StatusInternalServerError,
290-
Body: []byte(err.Error()),
289+
req.response <- &frontendv2pb.QueryResultRequest{
290+
HttpResponse: &httpgrpc.HTTPResponse{
291+
Code: http.StatusInternalServerError,
292+
Body: []byte(err.Error()),
293+
},
291294
}
292295

293296
case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT:
294297
req.enqueue <- enqueueResult{status: waitForResponse}
295-
req.response <- &httpgrpc.HTTPResponse{
296-
Code: http.StatusTooManyRequests,
297-
Body: []byte("too many outstanding requests"),
298+
req.response <- &frontendv2pb.QueryResultRequest{
299+
HttpResponse: &httpgrpc.HTTPResponse{
300+
Code: http.StatusTooManyRequests,
301+
Body: []byte("too many outstanding requests"),
302+
},
298303
}
299304
}
300305

pkg/frontend/v2/frontend_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"google.golang.org/grpc"
1818

1919
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
20+
"github.com/cortexproject/cortex/pkg/querier/stats"
2021
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
2122
"github.com/cortexproject/cortex/pkg/util/flagext"
2223
"github.com/cortexproject/cortex/pkg/util/services"
@@ -87,6 +88,7 @@ func sendResponseWithDelay(f *Frontend, delay time.Duration, userID string, quer
8788
_, _ = f.QueryResult(ctx, &frontendv2pb.QueryResultRequest{
8889
QueryID: queryID,
8990
HttpResponse: resp,
91+
Stats: &stats.Stats{},
9092
})
9193
}
9294

0 commit comments

Comments
 (0)