Skip to content

Commit a85e75b

Browse files
authored
Prevent deadlock in TestPersistableChannelQueue (#17717)
* Prevent deadlock in TestPersistableChannelQueue There is a potential deadlock in TestPersistableChannelQueue due to attempting to shutdown the test queue before it is ready. Signed-off-by: Andrew Thornton <[email protected]> * prevent npe Signed-off-by: Andrew Thornton <[email protected]>
1 parent 72b0882 commit a85e75b

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

modules/queue/queue_disk_channel_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ func TestPersistableChannelQueue(t *testing.T) {
1818
handleChan := make(chan *testData)
1919
handle := func(data ...Data) {
2020
for _, datum := range data {
21+
if datum == nil {
22+
continue
23+
}
2124
testDatum := datum.(*testData)
2225
handleChan <- testDatum
2326
}
@@ -42,13 +45,26 @@ func TestPersistableChannelQueue(t *testing.T) {
4245
}, &testData{})
4346
assert.NoError(t, err)
4447

48+
readyForShutdown := make(chan struct{})
49+
readyForTerminate := make(chan struct{})
50+
4551
go queue.Run(func(shutdown func()) {
4652
lock.Lock()
4753
defer lock.Unlock()
54+
select {
55+
case <-readyForShutdown:
56+
default:
57+
close(readyForShutdown)
58+
}
4859
queueShutdown = append(queueShutdown, shutdown)
4960
}, func(terminate func()) {
5061
lock.Lock()
5162
defer lock.Unlock()
63+
select {
64+
case <-readyForTerminate:
65+
default:
66+
close(readyForTerminate)
67+
}
5268
queueTerminate = append(queueTerminate, terminate)
5369
})
5470

@@ -74,6 +90,7 @@ func TestPersistableChannelQueue(t *testing.T) {
7490
err = queue.Push(test1)
7591
assert.Error(t, err)
7692

93+
<-readyForShutdown
7794
// Now shutdown the queue
7895
lock.Lock()
7996
callbacks := make([]func(), len(queueShutdown))
@@ -97,6 +114,7 @@ func TestPersistableChannelQueue(t *testing.T) {
97114
}
98115

99116
// terminate the queue
117+
<-readyForTerminate
100118
lock.Lock()
101119
callbacks = make([]func(), len(queueTerminate))
102120
copy(callbacks, queueTerminate)
@@ -123,13 +141,26 @@ func TestPersistableChannelQueue(t *testing.T) {
123141
}, &testData{})
124142
assert.NoError(t, err)
125143

144+
readyForShutdown = make(chan struct{})
145+
readyForTerminate = make(chan struct{})
146+
126147
go queue.Run(func(shutdown func()) {
127148
lock.Lock()
128149
defer lock.Unlock()
150+
select {
151+
case <-readyForShutdown:
152+
default:
153+
close(readyForShutdown)
154+
}
129155
queueShutdown = append(queueShutdown, shutdown)
130156
}, func(terminate func()) {
131157
lock.Lock()
132158
defer lock.Unlock()
159+
select {
160+
case <-readyForTerminate:
161+
default:
162+
close(readyForTerminate)
163+
}
133164
queueTerminate = append(queueTerminate, terminate)
134165
})
135166

@@ -141,13 +172,15 @@ func TestPersistableChannelQueue(t *testing.T) {
141172
assert.Equal(t, test2.TestString, result4.TestString)
142173
assert.Equal(t, test2.TestInt, result4.TestInt)
143174

175+
<-readyForShutdown
144176
lock.Lock()
145177
callbacks = make([]func(), len(queueShutdown))
146178
copy(callbacks, queueShutdown)
147179
lock.Unlock()
148180
for _, callback := range callbacks {
149181
callback()
150182
}
183+
<-readyForTerminate
151184
lock.Lock()
152185
callbacks = make([]func(), len(queueTerminate))
153186
copy(callbacks, queueTerminate)

0 commit comments

Comments
 (0)