Skip to content

Commit 713985b

Browse files
authored
Prevent deadlocks in persistable channel pause test (#18410)
* Prevent deadlocks in persistable channel pause test Because of reuse of the old paused/resumed channels in this test there was a potential for deadlock. This PR ensures that the channels are always reobtained. It further adds some control code to detect hangs in future - and it ensures that the pausing warning is not shown on shutdown. Signed-off-by: Andrew Thornton <[email protected]> * do not warn but do pause Signed-off-by: Andrew Thornton <[email protected]>
1 parent b53fd5f commit 713985b

File tree

2 files changed

+53
-6
lines changed

2 files changed

+53
-6
lines changed

modules/queue/queue_disk_channel_test.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,11 +287,16 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
287287
assert.Nil(t, result2)
288288

289289
pausable.Resume()
290+
paused, resumed = pausable.IsPausedIsResumed()
290291

291292
select {
293+
case <-paused:
294+
assert.Fail(t, "Queue should be resumed")
295+
return
292296
case <-resumed:
293297
default:
294298
assert.Fail(t, "Queue should be resumed")
299+
return
295300
}
296301

297302
select {
@@ -345,16 +350,22 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
345350

346351
pausable.Resume()
347352

353+
paused, resumed = pausable.IsPausedIsResumed()
348354
select {
355+
case <-paused:
356+
assert.Fail(t, "Queue should not be paused")
357+
return
349358
case <-resumed:
350359
default:
351360
assert.Fail(t, "Queue should be resumed")
361+
return
352362
}
353363

354364
select {
355365
case result1 = <-handleChan:
356366
case <-time.After(500 * time.Millisecond):
357367
assert.Fail(t, "handler chan should contain test1")
368+
return
358369
}
359370
assert.Equal(t, test1.TestString, result1.TestString)
360371
assert.Equal(t, test1.TestInt, result1.TestInt)
@@ -369,7 +380,12 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
369380
}
370381

371382
// Wait til it is closed
372-
<-queue.(*PersistableChannelQueue).closed
383+
select {
384+
case <-queue.(*PersistableChannelQueue).closed:
385+
case <-time.After(5 * time.Second):
386+
assert.Fail(t, "queue should close")
387+
return
388+
}
373389

374390
err = queue.Push(&test1)
375391
assert.NoError(t, err)
@@ -378,6 +394,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
378394
select {
379395
case <-handleChan:
380396
assert.Fail(t, "Handler processing should have stopped")
397+
return
381398
default:
382399
}
383400

@@ -393,6 +410,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
393410
select {
394411
case <-handleChan:
395412
assert.Fail(t, "Handler processing should have stopped")
413+
return
396414
default:
397415
}
398416

@@ -431,6 +449,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
431449
select {
432450
case <-handleChan:
433451
assert.Fail(t, "Handler processing should have stopped")
452+
return
434453
case <-paused:
435454
}
436455

@@ -449,13 +468,36 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
449468
select {
450469
case <-handleChan:
451470
assert.Fail(t, "Handler processing should have stopped")
471+
return
452472
default:
453473
}
454474

455475
pausable.Resume()
476+
paused, resumed = pausable.IsPausedIsResumed()
477+
select {
478+
case <-paused:
479+
assert.Fail(t, "Queue should not be paused")
480+
return
481+
case <-resumed:
482+
default:
483+
assert.Fail(t, "Queue should be resumed")
484+
return
485+
}
456486

457-
result3 := <-handleChan
458-
result4 := <-handleChan
487+
var result3, result4 *testData
488+
489+
select {
490+
case result3 = <-handleChan:
491+
case <-time.After(1 * time.Second):
492+
assert.Fail(t, "Handler processing should have resumed")
493+
return
494+
}
495+
select {
496+
case result4 = <-handleChan:
497+
case <-time.After(1 * time.Second):
498+
assert.Fail(t, "Handler processing should have resumed")
499+
return
500+
}
459501
if result4.TestString == test1.TestString {
460502
result3, result4 = result4, result3
461503
}

modules/queue/workerpool.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,14 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
301301
cancel()
302302
}
303303
if p.hasNoWorkerScaling() {
304-
log.Warn(
305-
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
306-
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
304+
select {
305+
case <-p.baseCtx.Done():
306+
// Don't warn if the baseCtx is shutdown
307+
default:
308+
log.Warn(
309+
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
310+
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
311+
}
307312
p.pause()
308313
}
309314
p.lock.Unlock()

0 commit comments

Comments
 (0)