Skip to content

Commit 17bdef6

Browse files
committed
Add reserved querier logic
Signed-off-by: Justin Jung <[email protected]>
1 parent 9970bce commit 17bdef6

File tree

14 files changed

+226
-157
lines changed

14 files changed

+226
-157
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* [FEATURE] Store Gateway: Add `-blocks-storage.bucket-store.max-inflight-requests` for store gateways to reject further requests upon reaching the limit. #5553
4343
* [FEATURE] Store Gateway: Add `cortex_bucket_store_block_load_duration_seconds` histogram to track time to load blocks. #5580
4444
* [FEATURE] AlertManager: Add `cortex_alertmanager_dispatcher_aggregation_groups` and `cortex_alertmanager_dispatcher_alert_processing_duration_seconds` metrics for dispatcher. #5592
45-
* [FEATURE] Query Frontend/Scheduler: Add high-priority query support. #5605
45+
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
4646
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
4747
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
4848
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323

pkg/frontend/v1/frontend.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"net/http"
88
"net/url"
9+
"reflect"
910
"time"
1011

1112
"github.com/go-kit/log"
@@ -79,6 +80,9 @@ type Frontend struct {
7980
requestQueue *queue.RequestQueue
8081
activeUsers *util.ActiveUsersCleanupService
8182

83+
queryPriority validation.QueryPriority
84+
compiledQueryPriority validation.QueryPriority
85+
8286
// Subservices manager.
8387
subservices *services.Manager
8488
subservicesWatcher *services.FailureWatcher
@@ -207,8 +211,15 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
207211
}
208212

209213
queryPriority := f.limits.QueryPriority(userID)
214+
210215
if reqParams != nil && queryPriority.Enabled {
211-
request.priority = util_query.GetPriority(reqParams, ts, queryPriority)
216+
queryPriorityChanged := !reflect.DeepEqual(f.queryPriority, queryPriority)
217+
if queryPriorityChanged {
218+
f.queryPriority = queryPriority
219+
f.compiledQueryPriority = queryPriority
220+
}
221+
222+
request.priority = util_query.GetPriority(reqParams, ts, &f.compiledQueryPriority, queryPriorityChanged)
212223
}
213224

214225
if err := f.queueRequest(ctx, &request); err != nil {

pkg/frontend/v2/frontend.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"math/rand"
88
"net/http"
99
"net/url"
10+
"reflect"
1011
"sync"
1112
"time"
1213

@@ -30,6 +31,7 @@ import (
3031
util_log "github.com/cortexproject/cortex/pkg/util/log"
3132
util_query "github.com/cortexproject/cortex/pkg/util/query"
3233
"github.com/cortexproject/cortex/pkg/util/services"
34+
"github.com/cortexproject/cortex/pkg/util/validation"
3335
)
3436

3537
// Config for a Frontend.
@@ -74,6 +76,9 @@ type Frontend struct {
7476

7577
lastQueryID atomic.Uint64
7678

79+
queryPriority validation.QueryPriority
80+
compiledQueryPriority validation.QueryPriority
81+
7782
// frontend workers will read from this channel, and send request to scheduler.
7883
requestsCh chan *frontendRequest
7984

@@ -211,8 +216,15 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
211216
}
212217

213218
queryPriority := f.limits.QueryPriority(userID)
219+
214220
if reqParams != nil && queryPriority.Enabled {
215-
freq.priority = util_query.GetPriority(reqParams, ts, queryPriority)
221+
queryPriorityChanged := !reflect.DeepEqual(f.queryPriority, queryPriority)
222+
if queryPriorityChanged {
223+
f.queryPriority = queryPriority
224+
f.compiledQueryPriority = queryPriority
225+
}
226+
227+
freq.priority = util_query.GetPriority(reqParams, ts, &f.compiledQueryPriority, queryPriorityChanged)
216228
}
217229

218230
f.requests.put(freq)

pkg/scheduler/queue/queue.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers fl
9898
}
9999

100100
shardSize := util.DynamicShardSize(maxQueriers, len(q.queues.queriers))
101-
queue := q.queues.getOrAddQueue(userID, shardSize)
101+
priorityList, priorityEnabled := q.getPriorityList(userID)
102+
queue := q.queues.getOrAddQueue(userID, shardSize, priorityList, priorityEnabled)
102103
maxOutstandingRequests := q.queues.limits.MaxOutstandingPerTenant(userID)
103104

104105
if queue == nil {
@@ -123,6 +124,24 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers fl
123124
return nil
124125
}
125126

127+
func (q *RequestQueue) getPriorityList(userID string) ([]int64, bool) {
128+
var priorityList []int64
129+
130+
queryPriority := q.queues.limits.QueryPriority(userID)
131+
132+
if queryPriority.Enabled {
133+
for _, priority := range queryPriority.Priorities {
134+
reservedQuerierShardSize := util.DynamicShardSize(priority.ReservedQueriers, len(q.queues.queriers))
135+
136+
for i := 0; i < reservedQuerierShardSize; i++ {
137+
priorityList = append(priorityList, priority.Priority)
138+
}
139+
}
140+
}
141+
142+
return priorityList, queryPriority.Enabled
143+
}
144+
126145
// GetNextRequestForQuerier find next user queue and takes the next request off of it. Will block if there are no requests.
127146
// By passing user index from previous call of this method, querier guarantees that it iterates over all users fairly.
128147
// If querier finds that request from the user is already expired, it can get a request for the same user by using UserIndex.ReuseLastUser.
@@ -156,7 +175,8 @@ FindQueue:
156175

157176
// Pick next request from the queue.
158177
for {
159-
request := queue.dequeueRequest(q.queues.getMinPriority(userID, querierID))
178+
minPriority, checkMinPriority := q.queues.getMinPriority(userID, querierID)
179+
request := queue.dequeueRequest(minPriority, checkMinPriority)
160180
if request == nil {
161181
// the queue does not contain request with the min priority, break to wait for more requests
162182
break

pkg/scheduler/queue/queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package queue
33
import (
44
"context"
55
"fmt"
6-
"github.com/cortexproject/cortex/pkg/util/validation"
76
"strconv"
87
"sync"
98
"testing"
@@ -14,6 +13,7 @@ import (
1413
"github.com/stretchr/testify/require"
1514

1615
"github.com/cortexproject/cortex/pkg/util/services"
16+
"github.com/cortexproject/cortex/pkg/util/validation"
1717
)
1818

1919
func BenchmarkGetNextRequest(b *testing.B) {

pkg/scheduler/queue/user_queues.go

Lines changed: 97 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package queue
22

33
import (
44
"math/rand"
5+
"reflect"
56
"sort"
67
"time"
78

@@ -63,8 +64,11 @@ type userQueue struct {
6364

6465
// If not nil, only these queriers can handle user requests. If nil, all queriers can.
6566
// We set this to nil if number of available queriers <= maxQueriers.
66-
queriers map[string]struct{}
67-
maxQueriers int
67+
queriers map[string]struct{}
68+
reservedQueriers map[string]int64
69+
priorityList []int64
70+
priorityEnabled bool
71+
maxQueriers int
6872

6973
// Seed for shuffle sharding of queriers. This seed is based on userID only and is therefore consistent
7074
// between different frontends.
@@ -110,7 +114,7 @@ func (q *queues) deleteQueue(userID string) {
110114
// MaxQueriers is used to compute which queriers should handle requests for this user.
111115
// If maxQueriers is <= 0, all queriers can handle this user's requests.
112116
// If maxQueriers has changed since the last call, queriers for this are recomputed.
113-
func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue {
117+
func (q *queues) getOrAddQueue(userID string, maxQueriers int, priorityList []int64, priorityEnabled bool) userRequestQueue {
114118
// Empty user is not allowed, as that would break our users list ("" is used for free spot).
115119
if userID == "" {
116120
return nil
@@ -121,48 +125,12 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue
121125
}
122126

123127
uq := q.userQueues[userID]
124-
queryPriority := q.limits.QueryPriority(userID)
125128

126129
if uq == nil {
127-
queueSize := q.limits.MaxOutstandingPerTenant(userID)
128-
// 0 is the default value of the flag. If the old flag is set
129-
// then we use its value for compatibility reason.
130-
if q.maxUserQueueSize != 0 {
131-
queueSize = q.maxUserQueueSize
132-
}
133-
uq = &userQueue{
134-
seed: util.ShuffleShardSeed(userID, ""),
135-
index: -1,
136-
}
137-
138-
if queryPriority.Enabled {
139-
uq.queue = NewPriorityRequestQueue(util.NewPriorityQueue(nil))
140-
} else {
141-
uq.queue = NewFIFORequestQueue(make(chan Request, queueSize))
142-
}
143-
144-
q.userQueues[userID] = uq
145-
146-
// Add user to the list of users... find first free spot, and put it there.
147-
for ix, u := range q.users {
148-
if u == "" {
149-
uq.index = ix
150-
q.users[ix] = userID
151-
break
152-
}
153-
}
154-
155-
// ... or add to the end.
156-
if uq.index < 0 {
157-
uq.index = len(q.users)
158-
q.users = append(q.users, userID)
159-
}
130+
uq = q.createUserQueue(userID)
160131
}
161132

162-
if uq.maxQueriers != maxQueriers {
163-
uq.maxQueriers = maxQueriers
164-
uq.queriers = shuffleQueriersForUser(uq.seed, maxQueriers, q.sortedQueriers, nil)
165-
}
133+
q.updateUserQueuesAttributes(uq, userID, maxQueriers, priorityList, priorityEnabled)
166134

167135
return uq.queue
168136
}
@@ -196,21 +164,98 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (us
196164
}
197165
}
198166

199-
// TODO: justinjung04, reserved queriers
200-
//if priority, isReserved := uq.reservedQueriers[querierID]; isReserved {
201-
// return uq.queues[priority], u, uid
202-
//}
203-
204167
return uq.queue, u, uid
205168
}
206169
return nil, "", uid
207170
}
208171

209-
func (q *queues) getMinPriority(userID string, querierID string) int64 {
210-
// TODO: justinjung04 reserved querier
211-
// check list of queriers and QueryPriority config
212-
// from QueryPriority config, establish map of
213-
return 0
172+
func (q *queues) createUserQueue(userID string) *userQueue {
173+
uq := &userQueue{
174+
seed: util.ShuffleShardSeed(userID, ""),
175+
index: -1,
176+
}
177+
178+
uq.queue = q.createUserRequestQueue(userID)
179+
q.userQueues[userID] = uq
180+
181+
// Add user to the list of users... find first free spot, and put it there.
182+
for ix, u := range q.users {
183+
if u == "" {
184+
uq.index = ix
185+
q.users[ix] = userID
186+
break
187+
}
188+
}
189+
190+
// ... or add to the end.
191+
if uq.index < 0 {
192+
uq.index = len(q.users)
193+
q.users = append(q.users, userID)
194+
}
195+
196+
return uq
197+
}
198+
199+
func (q *queues) createUserRequestQueue(userID string) userRequestQueue {
200+
if q.limits.QueryPriority(userID).Enabled {
201+
return NewPriorityRequestQueue(util.NewPriorityQueue(nil))
202+
}
203+
204+
return NewFIFORequestQueue(make(chan Request, q.getQueueSize(userID)))
205+
}
206+
207+
func (q *queues) getQueueSize(userID string) int {
208+
queueSize := q.limits.MaxOutstandingPerTenant(userID)
209+
210+
// 0 is the default value of the flag. If the old flag is set
211+
// then we use its value for compatibility reason.
212+
if q.maxUserQueueSize != 0 {
213+
queueSize = q.maxUserQueueSize
214+
}
215+
216+
return queueSize
217+
}
218+
219+
func (q *queues) updateUserQueuesAttributes(uq *userQueue, userID string, maxQueriers int, priorityList []int64, priorityEnabled bool) {
220+
if uq.maxQueriers != maxQueriers {
221+
uq.maxQueriers = maxQueriers
222+
uq.queriers = shuffleQueriersForUser(uq.seed, maxQueriers, q.sortedQueriers, nil)
223+
}
224+
225+
// if query priority is newly enabled/disabled, transfer the requests to the new queue
226+
if uq.priorityEnabled != priorityEnabled {
227+
tmpQueue := q.createUserRequestQueue(userID)
228+
uq.queue.closeQueue()
229+
230+
for uq.queue.length() > 0 {
231+
tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false))
232+
}
233+
uq.queue = tmpQueue
234+
}
235+
236+
if priorityEnabled && !reflect.DeepEqual(uq.priorityList, priorityList) {
237+
reservedQueriers := make(map[string]int64)
238+
239+
i := 0
240+
for querierID := range uq.queriers {
241+
reservedQueriers[querierID] = priorityList[i]
242+
i++
243+
if i == len(priorityList) {
244+
break
245+
}
246+
}
247+
248+
uq.reservedQueriers = reservedQueriers
249+
uq.priorityList = priorityList
250+
uq.priorityEnabled = priorityEnabled
251+
}
252+
}
253+
254+
func (q *queues) getMinPriority(userID string, querierID string) (int64, bool) {
255+
if priority, ok := q.userQueues[userID].reservedQueriers[querierID]; ok {
256+
return priority, true
257+
}
258+
return 0, false
214259
}
215260

216261
func (q *queues) addQuerierConnection(querierID string) {
@@ -349,7 +394,7 @@ type MockLimits struct {
349394
queryPriority validation.QueryPriority
350395
}
351396

352-
func (l MockLimits) MaxQueriersPerUser(user string) float64 {
397+
func (l MockLimits) MaxQueriersPerUser(_ string) float64 {
353398
return l.maxQueriersPerUser
354399
}
355400

pkg/scheduler/queue/user_queues_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestQueuesConsistency(t *testing.T) {
156156
conns := map[string]int{}
157157

158158
for i := 0; i < 10000; i++ {
159-
queue := uq.getOrAddQueue(generateTenant(r), 3)
159+
queue := uq.getOrAddQueue(generateTenant(r), 3, []int64{}, true)
160160
switch r.Int() % 6 {
161161
case 0:
162162
assert.NotNil(t, queue)
@@ -352,6 +352,10 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
352352
}
353353
}
354354

355+
func TestQueuesUpdatedConfig(t *testing.T) {
356+
// TODO: justinjung04
357+
}
358+
355359
func generateTenant(r *rand.Rand) string {
356360
return fmt.Sprint("tenant-", r.Int()%5)
357361
}
@@ -361,10 +365,10 @@ func generateQuerier(r *rand.Rand) string {
361365
}
362366

363367
func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) userRequestQueue {
364-
q := uq.getOrAddQueue(tenant, maxQueriers)
368+
q := uq.getOrAddQueue(tenant, maxQueriers, []int64{}, true)
365369
assert.NotNil(t, q)
366370
assert.NoError(t, isConsistent(uq))
367-
assert.Equal(t, q, uq.getOrAddQueue(tenant, maxQueriers))
371+
assert.Equal(t, q, uq.getOrAddQueue(tenant, maxQueriers, []int64{}, true))
368372
return q
369373
}
370374

0 commit comments

Comments
 (0)