Skip to content

Commit 28ba710

Browse files
committed
fix
1 parent 58ce1de commit 28ba710

File tree

3 files changed

+42
-9
lines changed

3 files changed

+42
-9
lines changed

modules/queue/workergroup.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
6060
full = true
6161
}
6262

63+
// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
64+
// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
65+
// So ideally, it should check whether there are enough workers periodically, and start new workers if necessary.
6366
q.workerNumMu.Lock()
6467
noWorker := q.workerNum == 0
6568
if full || noWorker {
@@ -143,7 +146,11 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
143146
log.Debug("Queue %q starts new worker", q.GetName())
144147
defer log.Debug("Queue %q stops idle worker", q.GetName())
145148

149+
atomic.AddInt32(&q.workerStartedCounter, 1)
150+
146151
t := time.NewTicker(workerIdleDuration)
152+
defer t.Stop()
153+
147154
keepWorking := true
148155
stopWorking := func() {
149156
q.workerNumMu.Lock()
@@ -158,13 +165,18 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
158165
case batch, ok := <-q.batchChan:
159166
if !ok {
160167
stopWorking()
161-
} else {
162-
q.doWorkerHandle(batch)
163-
t.Reset(workerIdleDuration)
168+
continue
169+
}
170+
q.doWorkerHandle(batch)
171+
// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
172+
t.Reset(workerIdleDuration)
173+
select {
174+
case <-t.C:
175+
default:
164176
}
165177
case <-t.C:
166178
q.workerNumMu.Lock()
167-
keepWorking = q.workerNum <= 1
179+
keepWorking = q.workerNum <= 1 // keep the last worker running
168180
if !keepWorking {
169181
q.workerNum--
170182
}

modules/queue/workerqueue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type WorkerPoolQueue[T any] struct {
4040
workerMaxNum int
4141
workerActiveNum int
4242
workerNumMu sync.Mutex
43+
44+
workerStartedCounter int32
4345
}
4446

4547
type flushType chan struct{}

modules/queue/workerqueue_test.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"code.gitea.io/gitea/modules/setting"
14+
"code.gitea.io/gitea/modules/test"
1415

1516
"github.com/stretchr/testify/assert"
1617
)
@@ -175,11 +176,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
175176
}
176177

177178
func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
178-
oldWorkerIdleDuration := workerIdleDuration
179-
workerIdleDuration = 300 * time.Millisecond
180-
defer func() {
181-
workerIdleDuration = oldWorkerIdleDuration
182-
}()
179+
defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)()
183180

184181
handler := func(items ...int) (unhandled []int) {
185182
time.Sleep(100 * time.Millisecond)
@@ -250,3 +247,25 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
250247
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
251248
assert.EqualValues(t, 20, q.GetQueueItemNumber())
252249
}
250+
251+
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
252+
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
253+
254+
handler := func(items ...int) (unhandled []int) {
255+
time.Sleep(50 * time.Millisecond)
256+
return nil
257+
}
258+
259+
q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
260+
stop := runWorkerPoolQueue(q)
261+
for i := 0; i < 20; i++ {
262+
assert.NoError(t, q.Push(i))
263+
}
264+
265+
time.Sleep(500 * time.Millisecond)
266+
assert.EqualValues(t, 2, q.GetWorkerNumber())
267+
assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
268+
// when the queue never becomes empty, the existing workers should keep working
269+
assert.EqualValues(t, 2, q.workerStartedCounter)
270+
stop()
271+
}

0 commit comments

Comments
 (0)