Skip to content

Commit 64152a0

Browse files
authored
Merge pull request #3371 from alvaroaleman/cp-fix
[release-0.22] 🐛 priority queue: properly sync the waiter manipulation
2 parents 88269f3 + b3eff6d commit 64152a0

File tree

2 files changed

+13
-6
lines changed

2 files changed

+13
-6
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ type priorityqueue[T comparable] struct {
124124
get chan item[T]
125125

126126
// waiters is the number of routines blocked in Get, we use it to determine
127-
// if we can push items.
128-
waiters atomic.Int64
127+
// if we can push items. Every manipulation has to be protected with the lock.
128+
waiters int64
129129

130130
// Configurable for testing
131131
now func() time.Time
@@ -269,15 +269,15 @@ func (w *priorityqueue[T]) spin() {
269269
}
270270
}
271271

272-
if w.waiters.Load() == 0 {
272+
if w.waiters == 0 {
273273
// Have to keep iterating here to ensure we update metrics
274274
// for further items that became ready and set nextReady.
275275
return true
276276
}
277277

278278
w.metrics.get(item.Key, item.Priority)
279279
w.locked.Insert(item.Key)
280-
w.waiters.Add(-1)
280+
w.waiters--
281281
delete(w.items, item.Key)
282282
toDelete = append(toDelete, item)
283283
w.becameReady.Delete(item.Key)
@@ -316,7 +316,9 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
316316
return zero, 0, true
317317
}
318318

319-
w.waiters.Add(1)
319+
w.lock.Lock()
320+
w.waiters++
321+
w.lock.Unlock()
320322

321323
w.notifyItemOrWaiterAdded()
322324

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,12 @@ var _ = Describe("Controllerworkqueue", func() {
378378
}()
379379

380380
// Verify the go routine above is now waiting for an item.
381-
Eventually(q.(*priorityqueue[string]).waiters.Load).Should(Equal(int64(1)))
381+
Eventually(func() int64 {
382+
q.(*priorityqueue[string]).lock.Lock()
383+
defer q.(*priorityqueue[string]).lock.Unlock()
384+
385+
return q.(*priorityqueue[string]).waiters
386+
}).Should(Equal(int64(1)))
382387
Consistently(getUnblocked).ShouldNot(BeClosed())
383388

384389
// shut down

0 commit comments

Comments
 (0)