@@ -5,8 +5,10 @@ package queue
5
5
6
6
import (
7
7
"context"
8
+ "slices"
8
9
"strconv"
9
10
"sync"
11
+ "sync/atomic"
10
12
"testing"
11
13
"time"
12
14
@@ -250,23 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
250
252
251
253
func TestWorkerPoolQueueWorkerIdleReset (t * testing.T ) {
252
254
defer test .MockVariableValue (& workerIdleDuration , 10 * time .Millisecond )()
253
- defer mockBackoffDuration (10 * time .Millisecond )()
255
+ defer mockBackoffDuration (5 * time .Millisecond )()
254
256
257
+ var q * WorkerPoolQueue [int ]
258
+ var handledCount atomic.Int32
259
+ var hasOnlyOneWorkerRunning atomic.Bool
255
260
handler := func (items ... int ) (unhandled []int ) {
256
- time .Sleep (50 * time .Millisecond )
261
+ handledCount .Add (int32 (len (items )))
262
+ // make each work have different duration, and check the active worker number periodically
263
+ var activeNums []int
264
+ for i := 0 ; i < 5 - items [0 ]% 2 ; i ++ {
265
+ time .Sleep (workerIdleDuration * 2 )
266
+ activeNums = append (activeNums , q .GetWorkerActiveNumber ())
267
+ }
268
+ // When the queue never becomes empty, the existing workers should keep working
269
+ // It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */
270
+ // If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started.
271
+ if slices .Equal ([]int {1 , 1 }, activeNums [len (activeNums )- 2 :]) {
272
+ hasOnlyOneWorkerRunning .Store (true )
273
+ }
257
274
return nil
258
275
}
259
-
260
- q , _ := newWorkerPoolQueueForTest ("test-workpoolqueue" , setting.QueueSettings {Type : "channel" , BatchLength : 1 , MaxWorkers : 2 , Length : 100 }, handler , false )
276
+ q , _ = newWorkerPoolQueueForTest ("test-workpoolqueue" , setting.QueueSettings {Type : "channel" , BatchLength : 1 , MaxWorkers : 2 , Length : 100 }, handler , false )
261
277
stop := runWorkerPoolQueue (q )
262
- for i := 0 ; i < 20 ; i ++ {
278
+ for i := 0 ; i < 100 ; i ++ {
263
279
assert .NoError (t , q .Push (i ))
264
280
}
265
-
266
281
time .Sleep (500 * time .Millisecond )
267
- assert .EqualValues (t , 2 , q .GetWorkerNumber ())
268
- assert .EqualValues (t , 2 , q .GetWorkerActiveNumber ())
269
- // when the queue never becomes empty, the existing workers should keep working
270
- assert .EqualValues (t , 2 , q .workerStartedCounter )
282
+ assert .Greater (t , int (handledCount .Load ()), 4 ) // make sure there are enough items handled during the test
283
+ assert .False (t , hasOnlyOneWorkerRunning .Load (), "a slow handler should not block other workers from starting" )
271
284
stop ()
272
285
}
0 commit comments