Skip to content

Commit 86cd94c

Browse files
authored
Fix queue worker incorrectly stopped when there are still more items in the queue (#29532) (#29546)
Backport #29532. Without `case <-t.C`, the workers would stop incorrectly and the test would not pass. In the worst case, there might be only one running worker processing the queue items for a long time, because the other workers have stopped. The root cause is related to the logic of doDispatchBatchToWorker. It isn't a serious problem at the moment, so that logic is kept as-is.
1 parent 8723389 commit 86cd94c

File tree

3 files changed

+42
-9
lines changed

3 files changed

+42
-9
lines changed

modules/queue/workergroup.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
6060
full = true
6161
}
6262

63+
// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
64+
// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
65+
// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
6366
q.workerNumMu.Lock()
6467
noWorker := q.workerNum == 0
6568
if full || noWorker {
@@ -143,7 +146,11 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
143146
log.Debug("Queue %q starts new worker", q.GetName())
144147
defer log.Debug("Queue %q stops idle worker", q.GetName())
145148

149+
atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging
150+
146151
t := time.NewTicker(workerIdleDuration)
152+
defer t.Stop()
153+
147154
keepWorking := true
148155
stopWorking := func() {
149156
q.workerNumMu.Lock()
@@ -158,13 +165,18 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
158165
case batch, ok := <-q.batchChan:
159166
if !ok {
160167
stopWorking()
161-
} else {
162-
q.doWorkerHandle(batch)
163-
t.Reset(workerIdleDuration)
168+
continue
169+
}
170+
q.doWorkerHandle(batch)
171+
// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
172+
t.Reset(workerIdleDuration)
173+
select {
174+
case <-t.C:
175+
default:
164176
}
165177
case <-t.C:
166178
q.workerNumMu.Lock()
167-
keepWorking = q.workerNum <= 1
179+
keepWorking = q.workerNum <= 1 // keep the last worker running
168180
if !keepWorking {
169181
q.workerNum--
170182
}

modules/queue/workerqueue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type WorkerPoolQueue[T any] struct {
4040
workerMaxNum int
4141
workerActiveNum int
4242
workerNumMu sync.Mutex
43+
44+
workerStartedCounter int32
4345
}
4446

4547
type flushType chan struct{}

modules/queue/workerqueue_test.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"code.gitea.io/gitea/modules/setting"
14+
"code.gitea.io/gitea/modules/test"
1415

1516
"github.com/stretchr/testify/assert"
1617
)
@@ -175,11 +176,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
175176
}
176177

177178
func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
178-
oldWorkerIdleDuration := workerIdleDuration
179-
workerIdleDuration = 300 * time.Millisecond
180-
defer func() {
181-
workerIdleDuration = oldWorkerIdleDuration
182-
}()
179+
defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)()
183180

184181
handler := func(items ...int) (unhandled []int) {
185182
time.Sleep(100 * time.Millisecond)
@@ -250,3 +247,25 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
250247
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
251248
assert.EqualValues(t, 20, q.GetQueueItemNumber())
252249
}
250+
251+
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
252+
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
253+
254+
handler := func(items ...int) (unhandled []int) {
255+
time.Sleep(50 * time.Millisecond)
256+
return nil
257+
}
258+
259+
q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
260+
stop := runWorkerPoolQueue(q)
261+
for i := 0; i < 20; i++ {
262+
assert.NoError(t, q.Push(i))
263+
}
264+
265+
time.Sleep(500 * time.Millisecond)
266+
assert.EqualValues(t, 2, q.GetWorkerNumber())
267+
assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
268+
// when the queue never becomes empty, the existing workers should keep working
269+
assert.EqualValues(t, 2, q.workerStartedCounter)
270+
stop()
271+
}

0 commit comments

Comments
 (0)