@@ -94,7 +94,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
94
94
},
95
95
Workers : 0 ,
96
96
},
97
- DataDir : config .DataDir ,
97
+ DataDir : config .DataDir ,
98
+ QueueName : config .Name + "-level" ,
98
99
}
99
100
100
101
queue .channelQueue = channelUniqueQueue .(* ChannelUniqueQueue )
@@ -209,17 +210,29 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
209
210
atTerminate (q .Terminate )
210
211
_ = q .channelQueue .AddWorkers (q .channelQueue .workers , 0 )
211
212
212
- if luq , ok := q .internal .(* LevelUniqueQueue ); ok && luq .ByteFIFOUniqueQueue . byteFIFO . Len ( luq . shutdownCtx ) != 0 {
213
+ if luq , ok := q .internal .(* LevelUniqueQueue ); ok && ! luq .IsEmpty () {
213
214
// Just run the level queue - we shut it down once it's flushed
214
- go q . internal .Run (func (_ func ()) {}, func (_ func ()) {})
215
+ go luq .Run (func (_ func ()) {}, func (_ func ()) {})
215
216
go func () {
216
- _ = q .internal .Flush (0 )
217
- log .Debug ("LevelUniqueQueue: %s flushed so shutting down" , q .internal .(* LevelUniqueQueue ).Name ())
218
- q .internal .(* LevelUniqueQueue ).Shutdown ()
219
- GetManager ().Remove (q .internal .(* LevelUniqueQueue ).qid )
217
+ _ = luq .Flush (0 )
218
+ for ! luq .IsEmpty () {
219
+ _ = luq .Flush (0 )
220
+ select {
221
+ case <- time .After (100 * time .Millisecond ):
222
+ case <- luq .shutdownCtx .Done ():
223
+ if luq .byteFIFO .Len (luq .terminateCtx ) > 0 {
224
+ log .Warn ("LevelUniqueQueue: %s shut down before completely flushed" , luq .Name ())
225
+ }
226
+ return
227
+ }
228
+ }
229
+ log .Debug ("LevelUniqueQueue: %s flushed so shutting down" , luq .Name ())
230
+ luq .Shutdown ()
231
+ GetManager ().Remove (luq .qid )
220
232
}()
221
233
} else {
222
234
log .Debug ("PersistableChannelUniqueQueue: %s Skipping running the empty level queue" , q .delayedStarter .name )
235
+ _ = q .internal .Flush (0 )
223
236
q .internal .(* LevelUniqueQueue ).Shutdown ()
224
237
GetManager ().Remove (q .internal .(* LevelUniqueQueue ).qid )
225
238
}
@@ -285,8 +298,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
285
298
// Redirect all remaining data in the chan to the internal channel
286
299
close (q .channelQueue .dataChan )
287
300
log .Trace ("PersistableChannelUniqueQueue: %s Redirecting remaining data" , q .delayedStarter .name )
301
+ countOK , countLost := 0 , 0
288
302
for data := range q .channelQueue .dataChan {
289
- _ = q .internal .Push (data )
303
+ err := q .internal .(* LevelUniqueQueue ).Push (data )
304
+ if err != nil {
305
+ log .Error ("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v" , q .delayedStarter .name , data , err )
306
+ countLost ++
307
+ } else {
308
+ countOK ++
309
+ }
310
+ }
311
+ if countLost > 0 {
312
+ log .Warn ("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost" , q .delayedStarter .name , countOK , countLost )
313
+ } else if countOK > 0 {
314
+ log .Warn ("PersistableChannelUniqueQueue: %s %d will be restored on restart" , q .delayedStarter .name , countOK )
290
315
}
291
316
log .Trace ("PersistableChannelUniqueQueue: %s Done Redirecting remaining data" , q .delayedStarter .name )
292
317
0 commit comments