Skip to content

Commit 0edfd7d

Browse files
authored
Fix user queue in scheduler that was not thread-safe (#6077)
1 parent a3fedc8 commit 0edfd7d

File tree

3 files changed

+48
-1
lines changed

3 files changed

+48
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
* [ENHANCEMENT] Ruler: Add support for filtering by `match` field on Rules API. #6083
2828
* [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095
2929
* [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097
30+
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
3031
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
3132
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
3233
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
3334
* [BUGFIX] Ingester: Fix issue with the minimize token generator where it was not taking into consideration the current ownership of an instance when generating extra tokens. #6062
35+
* [BUGFIX] Scheduler: Fix user queue in scheduler that was not thread-safe. #6077
3436

3537
## 1.17.1 2024-05-20
3638

pkg/scheduler/queue/user_queues.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package queue
33
import (
44
"math/rand"
55
"sort"
6+
"sync"
67
"time"
78

89
"github.com/prometheus/client_golang/prometheus"
@@ -37,7 +38,8 @@ type querier struct {
3738
// This struct holds user queues for pending requests. It also keeps track of connected queriers,
3839
// and mapping between users and queriers.
3940
type queues struct {
40-
userQueues map[string]*userQueue
41+
userQueues map[string]*userQueue
42+
userQueuesMx sync.RWMutex
4143

4244
// List of all users with queues, used for iteration when searching for next queue to handle.
4345
// Users removed from the middle are replaced with "". To avoid skipping users during iteration, we only shrink
@@ -103,6 +105,9 @@ func (q *queues) len() int {
103105
}
104106

105107
func (q *queues) deleteQueue(userID string) {
108+
q.userQueuesMx.Lock()
109+
defer q.userQueuesMx.Unlock()
110+
106111
uq := q.userQueues[userID]
107112
if uq == nil {
108113
return
@@ -132,6 +137,9 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue
132137
maxQueriers = 0
133138
}
134139

140+
q.userQueuesMx.Lock()
141+
defer q.userQueuesMx.Unlock()
142+
135143
uq := q.userQueues[userID]
136144
priorityEnabled := q.limits.QueryPriority(userID).Enabled
137145
maxOutstanding := q.limits.MaxOutstandingPerTenant(userID)
@@ -237,6 +245,9 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (us
237245
continue
238246
}
239247

248+
q.userQueuesMx.RLock()
249+
defer q.userQueuesMx.RUnlock()
250+
240251
uq := q.userQueues[u]
241252

242253
if uq.queriers != nil {

pkg/scheduler/queue/user_queues_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"math"
66
"math/rand"
77
"sort"
8+
"sync"
89
"testing"
910
"time"
1011

@@ -457,6 +458,39 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) {
457458
}
458459
}
459460

461+
// TestQueueConcurrency hammers a single queues instance from numGoRoutines (30)
// goroutines that all operate on the same user ("userID"). Every goroutine first
// calls getOrAddQueue, then, depending on its index:
//   - even index: enqueues a MockRequest and calls getNextQueueForQuerier for "q-1";
//   - odd index divisible by 5: calls dequeueRequest on the queue it obtained;
//   - odd index divisible by 7 (and not by 5): deletes the user's queue;
//   - anything else: does nothing further.
//
// The test contains no assertions; it only waits for all goroutines to finish, so
// it passes as long as none of the concurrent calls panics.
// NOTE(review): presumably meant to be run with the race detector (-race) — confirm.
func TestQueueConcurrency(t *testing.T) {
462+
const numGoRoutines = 30
463+
limits := MockLimits{
464+
MaxOutstanding: 50,
465+
}
466+
q := newUserQueues(0, 0, limits, nil)
467+
// Register five querier connections before starting the concurrent workload.
q.addQuerierConnection("q-1")
468+
q.addQuerierConnection("q-2")
469+
q.addQuerierConnection("q-3")
470+
q.addQuerierConnection("q-4")
471+
q.addQuerierConnection("q-5")
472+
473+
var wg sync.WaitGroup
474+
wg.Add(numGoRoutines)
475+
476+
for i := 0; i < numGoRoutines; i++ {
477+
// The loop index is passed as an argument so each goroutine sees its own value.
go func(cnt int) {
478+
defer wg.Done()
479+
queue := q.getOrAddQueue("userID", 2)
480+
// The else-if chain means the %5 and %7 branches are only reached for odd cnt.
if cnt%2 == 0 {
481+
queue.enqueueRequest(MockRequest{})
482+
q.getNextQueueForQuerier(0, "q-1")
483+
} else if cnt%5 == 0 {
484+
queue.dequeueRequest(0, false)
485+
} else if cnt%7 == 0 {
486+
q.deleteQueue("userID")
487+
}
488+
}(i)
489+
}
490+
491+
// Block until every goroutine has called wg.Done.
wg.Wait()
492+
}
493+
460494
// generateTenant returns a tenant ID made of the prefix "tenant-" followed by
// r.Int()%5, drawing one value from the supplied random source r.
func generateTenant(r *rand.Rand) string {
461495
return fmt.Sprint("tenant-", r.Int()%5)
462496
}

0 commit comments

Comments
 (0)