Skip to content

Commit 7713744

Browse files
committed
create a zero boost so that if there are no workers in a pool - boost to start the workers
Signed-off-by: Andrew Thornton <[email protected]>
1 parent 5733a8f commit 7713744

File tree

1 file changed

+29
-1
lines changed

1 file changed

+29
-1
lines changed

modules/queue/workerpool.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,42 @@ func (p *WorkerPool) Push(data Data) {
7070
atomic.AddInt64(&p.numInQueue, 1)
7171
p.lock.Lock()
7272
if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
73-
p.lock.Unlock()
73+
if p.numberOfWorkers == 0 {
74+
p.zeroBoost()
75+
} else {
76+
p.lock.Unlock()
77+
}
7478
p.pushBoost(data)
7579
} else {
7680
p.lock.Unlock()
7781
p.dataChan <- data
7882
}
7983
}
8084

85+
func (p *WorkerPool) zeroBoost() {
86+
ctx, cancel := context.WithCancel(p.baseCtx)
87+
mq := GetManager().GetManagedQueue(p.qid)
88+
boost := p.boostWorkers
89+
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
90+
boost = p.maxNumberOfWorkers - p.numberOfWorkers
91+
}
92+
if mq != nil {
93+
log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
94+
95+
start := time.Now()
96+
pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
97+
go func() {
98+
<-ctx.Done()
99+
mq.RemoveWorkers(pid)
100+
cancel()
101+
}()
102+
} else {
103+
log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
104+
}
105+
p.lock.Unlock()
106+
p.addWorkers(ctx, boost)
107+
}
108+
81109
func (p *WorkerPool) pushBoost(data Data) {
82110
select {
83111
case p.dataChan <- data:

0 commit comments

Comments
 (0)