Commit 32f895f

Fix queue test (#30646) (#30650)
Backport #30553 and #30646
1 parent d95408b · commit 32f895f

File tree: 4 files changed (+43 −19 lines)

modules/queue/backoff.go

Lines changed: 9 additions & 1 deletion

@@ -8,7 +8,7 @@ import (
     "time"
 )
 
-const (
+var (
     backoffBegin = 50 * time.Millisecond
     backoffUpper = 2 * time.Second
 )
@@ -18,6 +18,14 @@ type (
     backoffFuncErr func() (retry bool, err error)
 )
 
+func mockBackoffDuration(d time.Duration) func() {
+    oldBegin, oldUpper := backoffBegin, backoffUpper
+    backoffBegin, backoffUpper = d, d
+    return func() {
+        backoffBegin, backoffUpper = oldBegin, oldUpper
+    }
+}
+
 func backoffRetErr[T any](ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncRetErr[T]) (ret T, err error) {
     d := begin
     for {
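
The switch from "const" to "var" is what makes the new mockBackoffDuration helper possible: package-level variables can be swapped for a test and restored afterwards, while constants cannot. A minimal sketch of the intended usage, with a hypothetical test name (the helper applies the mock and returns a restore closure, so a single deferred call undoes it when the test ends):

    // Hypothetical test in the same package, illustrating the
    // save-and-restore pattern that mockBackoffDuration implements.
    func TestBackoffMockSketch(t *testing.T) {
        // Set backoffBegin and backoffUpper to 5ms for this test;
        // the deferred closure restores the original values on exit.
        defer mockBackoffDuration(5 * time.Millisecond)()

        if backoffBegin != 5*time.Millisecond {
            t.Fatal("backoff bounds were not mocked")
        }
    }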

modules/queue/workergroup.go

Lines changed: 11 additions & 7 deletions

@@ -63,6 +63,8 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
     // TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
     // The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
     // So ideally, it should check whether there are enough workers by some approach, and start new workers if necessary.
+    // This data-race is not serious: as long as a new worker is started soon to keep enough workers running,
+    // no big refactor is needed at the moment.
     q.workerNumMu.Lock()
     noWorker := q.workerNum == 0
     if full || noWorker {
@@ -136,6 +138,14 @@ func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
     return true
 }
 
+func resetIdleTicker(t *time.Ticker, dur time.Duration) {
+    t.Reset(dur)
+    select {
+    case <-t.C:
+    default:
+    }
+}
+
 // doStartNewWorker starts a new worker for the queue; the worker reads from the worker's channel and handles the items.
 func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
     wp.wg.Add(1)
@@ -146,8 +156,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
     log.Debug("Queue %q starts new worker", q.GetName())
     defer log.Debug("Queue %q stops idle worker", q.GetName())
 
-    atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging
-
     t := time.NewTicker(workerIdleDuration)
     defer t.Stop()
 
@@ -169,11 +177,7 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
         }
         q.doWorkerHandle(batch)
         // reset the idle ticker, and drain the tick after reset in case a tick has already been triggered
-        t.Reset(workerIdleDuration)
-        select {
-        case <-t.C:
-        default:
-        }
+        resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
     case <-t.C:
         q.workerNumMu.Lock()
         keepWorking = q.workerNum <= 1 // keep the last worker running
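
The extracted resetIdleTicker captures a classic time.Ticker pitfall: if a tick fired while the worker was busy handling a batch, that tick stays buffered in t.C even after Reset, so the next receive would signal "idle" immediately although no full interval has elapsed since the reset. A self-contained sketch of the problem and the fix (the names and durations here are illustrative, not from the patch):

    package main

    import (
        "fmt"
        "time"
    )

    // resetAndDrain mirrors resetIdleTicker above: restart the ticker's
    // countdown, then discard any tick already buffered in the channel,
    // so the next receive really means a full idle interval has passed.
    func resetAndDrain(t *time.Ticker, d time.Duration) {
        t.Reset(d)
        select {
        case <-t.C: // drop the stale tick, if any
        default: // nothing buffered
        }
    }

    func main() {
        t := time.NewTicker(20 * time.Millisecond)
        defer t.Stop()

        // Simulate a slow batch: by the time the handler returns, a tick is buffered.
        time.Sleep(50 * time.Millisecond)

        resetAndDrain(t, 20*time.Millisecond)

        start := time.Now()
        <-t.C // without the drain, this would return almost immediately
        fmt.Println("idle tick arrived after", time.Since(start).Round(time.Millisecond))
    }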

modules/queue/workerqueue.go

Lines changed: 0 additions & 2 deletions

@@ -40,8 +40,6 @@ type WorkerPoolQueue[T any] struct {
     workerMaxNum    int
     workerActiveNum int
     workerNumMu     sync.Mutex
-
-    workerStartedCounter int32
 }
 
 type flushType chan struct{}

modules/queue/workerqueue_test.go

Lines changed: 23 additions & 9 deletions

@@ -5,8 +5,10 @@ package queue
 
 import (
     "context"
+    "slices"
     "strconv"
     "sync"
+    "sync/atomic"
     "testing"
     "time"
 
@@ -250,22 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
 
 func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
     defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
+    defer mockBackoffDuration(5 * time.Millisecond)()
 
+    var q *WorkerPoolQueue[int]
+    var handledCount atomic.Int32
+    var hasOnlyOneWorkerRunning atomic.Bool
     handler := func(items ...int) (unhandled []int) {
-        time.Sleep(50 * time.Millisecond)
+        handledCount.Add(int32(len(items)))
+        // give each work item a different duration, and check the active worker number periodically
+        var activeNums []int
+        for i := 0; i < 5-items[0]%2; i++ {
+            time.Sleep(workerIdleDuration * 2)
+            activeNums = append(activeNums, q.GetWorkerActiveNumber())
+        }
+        // When the queue never becomes empty, the existing workers should keep working.
+        // This is not 100% guaranteed at the moment because the data-race in workergroup.go is not resolved; see the TODO there.
+        // If the "active worker numbers" look like [2 2 ... 1 1], an existing worker exited and no new worker was started.
+        if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) {
+            hasOnlyOneWorkerRunning.Store(true)
+        }
         return nil
     }
-
-    q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
+    q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
     stop := runWorkerPoolQueue(q)
-    for i := 0; i < 20; i++ {
+    for i := 0; i < 100; i++ {
         assert.NoError(t, q.Push(i))
     }
-
     time.Sleep(500 * time.Millisecond)
-    assert.EqualValues(t, 2, q.GetWorkerNumber())
-    assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
-    // when the queue never becomes empty, the existing workers should keep working
-    assert.EqualValues(t, 2, q.workerStartedCounter)
+    assert.Greater(t, int(handledCount.Load()), 4) // make sure enough items are handled during the test
+    assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting")
     stop()
 }
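
Both mocks at the top of this test follow the same shape: swap a package-level variable, return a closure that restores it. test.MockVariableValue (from Gitea's modules/test package) is used the same way as mockBackoffDuration; a generic sketch of how such a helper can be written (the real helper's exact signature is an assumption, it is not shown in this diff):

    // MockVariableValue swaps *p to v and returns a closure that restores
    // the previous value, so `defer MockVariableValue(&x, y)()` mocks x
    // for the lifetime of the enclosing test.
    func MockVariableValue[T any](p *T, v T) (reset func()) {
        old := *p
        *p = v
        return func() { *p = old }
    }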
