Commit 74a30fc

Add IsEmpty and begin just making the queues composed WorkerPools

committed · 1 parent 5da04b3 · commit 74a30fc

7 files changed: +94 -24 lines

modules/queue/queue.go (6 additions & 0 deletions)

@@ -59,6 +59,7 @@ type Queue interface {
 	Run(atShutdown, atTerminate func(context.Context, func()))
 	Push(Data) error
 	Flush(time.Duration) error
+	IsEmpty() bool
 }

 // DummyQueueType is the type for the dummy queue
@@ -86,6 +87,11 @@ func (b *DummyQueue) Flush(time.Duration) error {
 	return nil
 }

+// IsEmpty asserts that the queue is empty
+func (b *DummyQueue) IsEmpty() bool {
+	return true
+}
+
 var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}

 // RegisteredTypes provides the list of requested types of queues
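With IsEmpty added to the Queue interface, every implementation in this package must now provide it. A hedged sketch of a compile-time guard that could sit alongside these types (not part of this commit; each assertion fails the build if the named type stops satisfying the widened interface):

// Hypothetical compile-time assertions, not in this commit: each line
// fails to compile if the named type no longer implements Queue.
var (
	_ Queue = &DummyQueue{}
	_ Queue = &ChannelQueue{}
	_ Queue = &LevelQueue{}
	_ Queue = &RedisQueue{}
	_ Queue = &WrappedQueue{}
)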

modules/queue/queue_channel.go (8 additions & 14 deletions)

@@ -7,7 +7,6 @@ package queue
 import (
 	"context"
 	"fmt"
-	"time"

 	"code.gitea.io/gitea/modules/log"
 )
@@ -27,7 +26,7 @@ type ChannelQueueConfiguration struct {
 // A channel queue is not persistable and does not shutdown or terminate cleanly
 // It is basically a very thin wrapper around a WorkerPool
 type ChannelQueue struct {
-	pool     *WorkerPool
+	*WorkerPool
 	exemplar interface{}
 	workers  int
 	name     string
@@ -44,12 +43,12 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
 		config.BatchLength = 1
 	}
 	queue := &ChannelQueue{
-		pool:     NewWorkerPool(handle, config.WorkerPoolConfiguration),
-		exemplar: exemplar,
-		workers:  config.Workers,
-		name:     config.Name,
+		WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+		exemplar:   exemplar,
+		workers:    config.Workers,
+		name:       config.Name,
 	}
-	queue.pool.qid = GetManager().Add(config.Name, ChannelQueueType, config, exemplar, queue.pool)
+	queue.qid = GetManager().Add(config.Name, ChannelQueueType, config, exemplar, queue)
 	return queue, nil
 }
@@ -63,7 +62,7 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())
 	})
 	log.Debug("ChannelQueue: %s Starting", c.name)
 	go func() {
-		_ = c.pool.AddWorkers(c.workers, 0)
+		_ = c.AddWorkers(c.workers, 0)
 	}()
 }
@@ -72,7 +71,7 @@ func (c *ChannelQueue) Push(data Data) error {
 	if !assignableTo(data, c.exemplar) {
 		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
 	}
-	c.pool.Push(data)
+	c.WorkerPool.Push(data)
 	return nil
 }
@@ -81,11 +80,6 @@ func (c *ChannelQueue) Name() string {
 	return c.name
 }

-// Flush flushes the queue and blocks till the queue is empty
-func (c *ChannelQueue) Flush(timeout time.Duration) error {
-	return c.pool.Flush(timeout)
-}
-
 func init() {
 	queuesMap[ChannelQueueType] = NewChannelQueue
 }
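The mechanical change here is swapping the named pool *WorkerPool field for an embedded *WorkerPool. Embedding promotes the pool's methods (Push, Flush, IsEmpty, AddWorkers, Wait) onto ChannelQueue, which is why the hand-written Flush wrapper and the c.pool indirections can be deleted. A minimal self-contained sketch of the pattern, using simplified stand-in types rather than the real Gitea ones:

package main

import "fmt"

// Pool stands in for WorkerPool.
type Pool struct{ n int64 }

func (p *Pool) Push(v interface{}) { p.n++ }
func (p *Pool) IsEmpty() bool      { return p.n == 0 }

// Queue embeds *Pool, so Pool's methods are promoted onto Queue.
type Queue struct {
	*Pool
	name string
}

func main() {
	q := &Queue{Pool: &Pool{}, name: "demo"}
	fmt.Println(q.IsEmpty()) // true, promoted from *Pool
	q.Push("work")           // promoted too; same as q.Pool.Push("work")
	fmt.Println(q.IsEmpty()) // false
}

Note that ChannelQueue still defines its own Push for the exemplar check; inside it, the explicit c.WorkerPool.Push(data) call avoids recursing into itself.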

modules/queue/queue_disk.go (8 additions & 0 deletions)

@@ -145,6 +145,14 @@ func (l *LevelQueue) Flush(timeout time.Duration) error {
 	return l.pool.Flush(timeout)
 }

+// IsEmpty checks whether the queue is empty
+func (l *LevelQueue) IsEmpty() bool {
+	if !l.pool.IsEmpty() {
+		return false
+	}
+	return l.queue.Len() == 0
+}
+
 // Shutdown this queue and stop processing
 func (l *LevelQueue) Shutdown() {
 	l.lock.Lock()
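LevelQueue is only empty when both tiers are drained: the in-memory WorkerPool and the backing level queue on disk. The two reads are not a single atomic snapshot, so a true result is advisory rather than a guarantee. A sketch of the pattern, with hypothetical accessors standing in for pool.IsEmpty and queue.Len:

// twoTierEmpty reports whether a disk-backed queue is drained. Both
// checks must pass; an item can migrate between tiers between the two
// reads, so callers should treat the answer as a hint.
func twoTierEmpty(memEmpty func() bool, diskLen func() int64) bool {
	if !memEmpty() {
		return false // workers still hold undispatched data
	}
	return diskLen() == 0 // nothing persisted either
}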

modules/queue/queue_disk_channel.go (19 additions & 6 deletions)

@@ -134,7 +134,7 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte

 	p.lock.Lock()
 	if p.internal == nil {
-		err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
+		err := p.setInternal(atShutdown, p.ChannelQueue.handle, p.exemplar)
 		p.lock.Unlock()
 		if err != nil {
 			log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
@@ -150,21 +150,21 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
 	go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})

 	go func() {
-		_ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
+		_ = p.ChannelQueue.AddWorkers(p.workers, 0)
 	}()

 	log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
 	<-p.closed
 	log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
-	p.ChannelQueue.pool.cancel()
+	p.ChannelQueue.cancel()
 	p.internal.(*LevelQueue).pool.cancel()
 	log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
-	p.ChannelQueue.pool.Wait()
+	p.ChannelQueue.Wait()
 	p.internal.(*LevelQueue).pool.Wait()
 	// Redirect all remaining data in the chan to the internal channel
 	go func() {
 		log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
-		for data := range p.ChannelQueue.pool.dataChan {
+		for data := range p.ChannelQueue.dataChan {
 			_ = p.internal.Push(data)
 		}
 		log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
@@ -174,7 +174,20 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte

 // Flush flushes the queue and blocks till the queue is empty
 func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
-	return p.ChannelQueue.pool.Flush(timeout)
+	return p.ChannelQueue.Flush(timeout)
+}
+
+// IsEmpty checks if a queue is empty
+func (p *PersistableChannelQueue) IsEmpty() bool {
+	if !p.ChannelQueue.IsEmpty() {
+		return false
+	}
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	if p.internal == nil {
+		return false
+	}
+	return p.internal.IsEmpty()
 }

 // Shutdown processing this queue
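PersistableChannelQueue answers conservatively: if the level-backed internal queue has not been created yet, persisted items may still be sitting on disk unexamined, so IsEmpty reports false rather than guessing. A sketch of that pattern with hypothetical stand-in types:

import "sync"

// backend is a hypothetical stand-in for the internal Queue.
type backend interface{ IsEmpty() bool }

type persistable struct {
	lock     sync.Mutex
	internal backend // nil until Run wires up the level queue
}

func (p *persistable) IsEmpty() bool {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.internal == nil {
		// Disk state is unknown, so emptiness cannot be claimed.
		return false
	}
	return p.internal.IsEmpty()
}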

modules/queue/queue_redis.go (14 additions & 0 deletions)

@@ -24,6 +24,7 @@ const RedisQueueType Type = "redis"
 type redisClient interface {
 	RPush(key string, args ...interface{}) *redis.IntCmd
 	LPop(key string) *redis.StringCmd
+	LLen(key string) *redis.IntCmd
 	Ping() *redis.StatusCmd
 	Close() error
 }
@@ -168,6 +169,19 @@ func (r *RedisQueue) Flush(timeout time.Duration) error {
 	return r.pool.Flush(timeout)
 }

+// IsEmpty checks if the queue is empty
+func (r *RedisQueue) IsEmpty() bool {
+	if !r.pool.IsEmpty() {
+		return false
+	}
+	length, err := r.client.LLen(r.queueName).Result()
+	if err != nil {
+		log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err)
+		return false
+	}
+	return length == 0
+}
+
 // Shutdown processing from this queue
 func (r *RedisQueue) Shutdown() {
 	log.Trace("RedisQueue: %s Shutting down", r.name)
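The one-method addition to the redisClient interface is what makes this possible: LLEN reports the length of the Redis list backing the queue. A sketch of the same check against a bare go-redis client (assuming the v6-style API without context arguments, which is what the interface above suggests):

import "github.com/go-redis/redis"

// redisListEmpty reports whether the list at key is empty; any error
// is treated as "possibly non-empty" so callers stay conservative.
func redisListEmpty(client *redis.Client, key string) bool {
	length, err := client.LLen(key).Result()
	if err != nil {
		return false
	}
	return length == 0
}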

modules/queue/queue_wrapped.go (22 additions & 4 deletions)

@@ -8,6 +8,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"

 	"code.gitea.io/gitea/modules/log"
@@ -87,10 +88,11 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
 // WrappedQueue wraps a delayed starting queue
 type WrappedQueue struct {
 	delayedStarter
-	lock     sync.Mutex
-	handle   HandlerFunc
-	exemplar interface{}
-	channel  chan Data
+	lock       sync.Mutex
+	handle     HandlerFunc
+	exemplar   interface{}
+	channel    chan Data
+	numInQueue int64
 }

 // NewWrappedQueue will attempt to create a queue of the provided type,
@@ -140,6 +142,7 @@ func (q *WrappedQueue) Push(data Data) error {
 	if !assignableTo(data, q.exemplar) {
 		return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 	}
+	atomic.AddInt64(&q.numInQueue, 1)
 	q.channel <- data
 	return nil
 }
@@ -181,6 +184,20 @@ func (q *WrappedQueue) Flush(timeout time.Duration) error {
 	}
 }

+// IsEmpty checks whether the queue is empty
+func (q *WrappedQueue) IsEmpty() bool {
+	val := atomic.LoadInt64(&q.numInQueue)
+	if val != 0 {
+		return false
+	}
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if q.internal == nil {
+		return false
+	}
+	return q.internal.IsEmpty()
+}
+
 // Run starts to run the queue and attempts to create the internal queue
 func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
 	log.Debug("WrappedQueue: %s Starting", q.name)
@@ -195,6 +212,7 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())
 	go func() {
 		for data := range q.channel {
 			_ = q.internal.Push(data)
+			atomic.AddInt64(&q.numInQueue, -1)
 		}
 	}()
 } else {
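WrappedQueue buffers pushes in q.channel until the internal queue exists, so it needs its own count of items in flight: incremented before the send, decremented once the forwarding goroutine hands the item to the internal queue. A stripped-down sketch of that accounting, with hypothetical types rather than the Gitea ones:

import "sync/atomic"

type buffered struct {
	numInQueue int64
	channel    chan string
}

func (b *buffered) push(v string) {
	atomic.AddInt64(&b.numInQueue, 1) // count before the send
	b.channel <- v
}

// forward drains the buffer into inner, settling the count per item.
func (b *buffered) forward(inner func(string)) {
	for v := range b.channel {
		inner(v)
		atomic.AddInt64(&b.numInQueue, -1)
	}
}

func (b *buffered) isEmpty() bool {
	return atomic.LoadInt64(&b.numInQueue) == 0
}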

modules/queue/workerpool.go (17 additions & 0 deletions)

@@ -7,6 +7,7 @@ package queue
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"

 	"code.gitea.io/gitea/modules/log"
@@ -30,6 +31,7 @@ type WorkerPool struct {
 	blockTimeout time.Duration
 	boostTimeout time.Duration
 	boostWorkers int
+	numInQueue   int64
 }

 // WorkerPoolConfiguration is the basic configuration for a WorkerPool
@@ -65,6 +67,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
 // Push pushes the data to the internal channel
 func (p *WorkerPool) Push(data Data) {
 	p.lock.Lock()
+	atomic.AddInt64(&p.numInQueue, 1)
 	if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
 		p.lock.Unlock()
 		p.pushBoost(data)
@@ -273,6 +276,7 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
 	close(p.dataChan)
 	for data := range p.dataChan {
 		p.handle(data)
+		atomic.AddInt64(&p.numInQueue, -1)
 		select {
 		case <-ctx.Done():
 			log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
@@ -290,6 +294,11 @@ func (p *WorkerPool) Flush(timeout time.Duration) error {
 	return p.FlushWithContext(ctx)
 }

+// IsEmpty returns true if the worker queue is empty
+func (p *WorkerPool) IsEmpty() bool {
+	return atomic.LoadInt64(&p.numInQueue) == 0
+}
+
 // FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
 // NB: The worker will not be registered with the manager.
 func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
@@ -298,6 +307,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
 	select {
 	case data := <-p.dataChan:
 		p.handle(data)
+		atomic.AddInt64(&p.numInQueue, -1)
 	case <-p.baseCtx.Done():
 		return p.baseCtx.Err()
 	case <-ctx.Done():
@@ -317,6 +327,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
 			return
@@ -326,6 +337,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
 			return
@@ -334,6 +346,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) >= p.batchLength {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 				data = make([]Data, 0, p.batchLength)
 			}
 		default:
@@ -349,6 +362,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
 			return
@@ -364,6 +378,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
 			return
@@ -372,13 +387,15 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) >= p.batchLength {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 				data = make([]Data, 0, p.batchLength)
 			}
 		case <-timer.C:
 			delay = time.Millisecond * 100
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 				data = make([]Data, 0, p.batchLength)
 			}