Skip to content

Commit bda7fa3

Browse files
committed
as per lunny
Signed-off-by: Andrew Thornton <[email protected]>
1 parent f227928 commit bda7fa3

File tree

1 file changed

+55
-43
lines changed

1 file changed

+55
-43
lines changed

modules/queue/queue_bytefifo.go

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -114,59 +114,71 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()
114114
}
115115

116116
func (q *ByteFIFOQueue) readToChan() {
117+
// handle quick cancels
118+
select {
119+
case <-q.closed:
120+
// tell the pool to shutdown.
121+
q.cancel()
122+
return
123+
default:
124+
}
125+
117126
backOffTime := time.Millisecond * 100
118127
maxBackOffTime := time.Second * 3
119-
backOff := func() time.Duration {
120-
backOffTime += backOffTime / 2
121-
if backOffTime > maxBackOffTime {
122-
backOffTime = maxBackOffTime
123-
}
124-
return backOffTime
125-
}
126-
doBackOff := func() {
127-
select {
128-
case <-q.closed:
129-
case <-time.After(backOff()):
128+
for {
129+
success, resetBackoff := q.doPop()
130+
if resetBackoff {
131+
backOffTime = 100 * time.Millisecond
130132
}
131-
}
132133

133-
for {
134-
select {
135-
case <-q.closed:
136-
// tell the pool to shutdown.
137-
q.cancel()
138-
return
139-
default:
140-
q.lock.Lock()
141-
bs, err := q.byteFIFO.Pop()
142-
if err != nil {
143-
q.lock.Unlock()
144-
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
145-
doBackOff()
146-
continue
134+
if success {
135+
select {
136+
case <-q.closed:
137+
// tell the pool to shutdown.
138+
q.cancel()
139+
return
140+
default:
147141
}
148-
149-
if len(bs) == 0 {
150-
q.lock.Unlock()
151-
doBackOff()
152-
continue
142+
} else {
143+
select {
144+
case <-q.closed:
145+
// tell the pool to shutdown.
146+
q.cancel()
147+
return
148+
case <-time.After(backOffTime):
149+
}
150+
backOffTime += backOffTime / 2
151+
if backOffTime > maxBackOffTime {
152+
backOffTime = maxBackOffTime
153153
}
154+
}
155+
}
156+
}
154157

155-
backOffTime = time.Millisecond * 100
158+
func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
159+
q.lock.Lock()
160+
defer q.lock.Unlock()
161+
bs, err := q.byteFIFO.Pop()
162+
if err != nil {
163+
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
164+
return
165+
}
166+
if len(bs) == 0 {
167+
return
168+
}
156169

157-
data, err := unmarshalAs(bs, q.exemplar)
158-
if err != nil {
159-
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
160-
q.lock.Unlock()
161-
doBackOff()
162-
continue
163-
}
170+
resetBackoff = true
164171

165-
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
166-
q.WorkerPool.Push(data)
167-
q.lock.Unlock()
168-
}
172+
data, err := unmarshalAs(bs, q.exemplar)
173+
if err != nil {
174+
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
175+
return
169176
}
177+
178+
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
179+
q.WorkerPool.Push(data)
180+
success = true
181+
return
170182
}
171183

172184
// Shutdown processing from this queue

0 commit comments

Comments
 (0)