From d505f87d9ea80cf14bed301318615731df96341b Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 10 Jul 2024 09:51:54 -0700 Subject: [PATCH 1/4] Fix user queue in scheduler that was not thread-safe Signed-off-by: Justin Jung --- pkg/scheduler/queue/user_queues.go | 13 +++++++++- pkg/scheduler/queue/user_queues_test.go | 33 +++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index 25f562ee022..159df7810bf 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -3,6 +3,7 @@ package queue import ( "math/rand" "sort" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -37,7 +38,8 @@ type querier struct { // This struct holds user queues for pending requests. It also keeps track of connected queriers, // and mapping between users and queriers. type queues struct { - userQueues map[string]*userQueue + userQueues map[string]*userQueue + userQueuesMx sync.RWMutex // List of all users with queues, used for iteration when searching for next queue to handle. // Users removed from the middle are replaced with "". To avoid skipping users during iteration, we only shrink @@ -103,6 +105,9 @@ func (q *queues) len() int { } func (q *queues) deleteQueue(userID string) { + q.userQueuesMx.Lock() + defer q.userQueuesMx.Unlock() + uq := q.userQueues[userID] if uq == nil { return @@ -132,6 +137,9 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue maxQueriers = 0 } + q.userQueuesMx.Lock() + defer q.userQueuesMx.Unlock() + uq := q.userQueues[userID] priorityEnabled := q.limits.QueryPriority(userID).Enabled maxOutstanding := q.limits.MaxOutstandingPerTenant(userID) @@ -237,6 +245,9 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (us continue } + q.userQueuesMx.RLock() + defer q.userQueuesMx.RUnlock() + uq := q.userQueues[u] if uq.queriers != nil { diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/user_queues_test.go index ded597baa0c..ec61496a87b 100644 --- a/pkg/scheduler/queue/user_queues_test.go +++ b/pkg/scheduler/queue/user_queues_test.go @@ -5,6 +5,7 @@ import ( "math" "math/rand" "sort" + "sync" "testing" "time" @@ -457,6 +458,38 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) { } } +func TestGetOrAddQueueConcurrency(t *testing.T) { + const numGoRoutines = 100 + limits := MockLimits{ + MaxOutstanding: 3, + } + q := newUserQueues(0, 0, limits, nil) + q.addQuerierConnection("q-1") + q.addQuerierConnection("q-2") + q.addQuerierConnection("q-3") + q.addQuerierConnection("q-4") + q.addQuerierConnection("q-5") + + var wg sync.WaitGroup + wg.Add(numGoRoutines) + + for i := 0; i < numGoRoutines; i++ { + go func(maxOutstanding int) { + defer wg.Done() + limits.MaxOutstanding = maxOutstanding + 50 + q.limits = limits + queue := q.getOrAddQueue("userID", 2) + if rand.Int()%2 == 0 { + queue.enqueueRequest(MockRequest{}) + } else if rand.Int()%9 == 0 { + queue.dequeueRequest(0, false) + } + }(i) + } + + wg.Wait() +} + func generateTenant(r *rand.Rand) string { return fmt.Sprint("tenant-", r.Int()%5) } From 416c08ee0d402137fa2d944df8932be0cac282d6 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 10 Jul 2024 09:54:51 -0700 Subject: [PATCH 2/4] changelog Signed-off-by: Justin Jung --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ab0be698a9..d29a355738d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ * [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952 * [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018 * [BUGFIX] Ingester: Fix issue with the minimize token generator where it was not taking in consideration the current ownerhip of an instance when generating extra tokens. #6062 +* [BUGFIX] Scheduler: Fix user queue in scheduler that was not thread-safe. #6077 * [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 ## 1.17.1 2024-05-20 From b7dd40a5a0686dafd1957cb8bb6775cce56bb966 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 10 Jul 2024 15:45:44 -0700 Subject: [PATCH 3/4] Fix test Signed-off-by: Justin Jung --- pkg/scheduler/queue/user_queues_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/user_queues_test.go index ec61496a87b..fe2ff650d05 100644 --- a/pkg/scheduler/queue/user_queues_test.go +++ b/pkg/scheduler/queue/user_queues_test.go @@ -459,9 +459,9 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) { } func TestGetOrAddQueueConcurrency(t *testing.T) { - const numGoRoutines = 100 + const numGoRoutines = 10 limits := MockLimits{ - MaxOutstanding: 3, + MaxOutstanding: 50, } q := newUserQueues(0, 0, limits, nil) q.addQuerierConnection("q-1") @@ -474,14 +474,12 @@ func TestGetOrAddQueueConcurrency(t *testing.T) { wg.Add(numGoRoutines) for i := 0; i < numGoRoutines; i++ { - go func(maxOutstanding int) { + go func(cnt int) { defer wg.Done() - limits.MaxOutstanding = maxOutstanding + 50 - q.limits = limits queue := q.getOrAddQueue("userID", 2) - if rand.Int()%2 == 0 { + if cnt%2 == 0 { queue.enqueueRequest(MockRequest{}) - } else if rand.Int()%9 == 0 { + } else if cnt%5 == 0 { queue.dequeueRequest(0, false) } }(i) From 5061ea8852663efc3d1d6c913729f129e01f8b5e Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 15 Jul 2024 09:19:09 -0700 Subject: [PATCH 4/4] Test more Signed-off-by: Justin Jung --- pkg/scheduler/queue/user_queues_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/user_queues_test.go index fe2ff650d05..4e720de402e 100644 --- a/pkg/scheduler/queue/user_queues_test.go +++ b/pkg/scheduler/queue/user_queues_test.go @@ -458,8 +458,8 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) { } } -func TestGetOrAddQueueConcurrency(t *testing.T) { - const numGoRoutines = 10 +func TestQueueConcurrency(t *testing.T) { + const numGoRoutines = 30 limits := MockLimits{ MaxOutstanding: 50, } @@ -479,8 +479,11 @@ func TestGetOrAddQueueConcurrency(t *testing.T) { queue := q.getOrAddQueue("userID", 2) if cnt%2 == 0 { queue.enqueueRequest(MockRequest{}) + q.getNextQueueForQuerier(0, "q-1") } else if cnt%5 == 0 { queue.dequeueRequest(0, false) + } else if cnt%7 == 0 { + q.deleteQueue("userID") } }(i) }