Skip to content

Commit 2c90338

Browse files
authored
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0 This adds functionality for Unique Queues * Add UniqueQueue interface and functions to create them * Add UniqueQueue implementations * Move TestPullRequests over to use UniqueQueue * Reduce code duplication * Add bytefifos * Ensure invalid types are logged * Fix close race in PersistableChannelQueue Shutdown
1 parent b491424 commit 2c90338

29 files changed

+1934
-500
lines changed

docs/content/doc/advanced/config-cheat-sheet.en-us.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ relation to port exhaustion.
252252
- `BATCH_LENGTH`: **20**: Batch data before passing to the handler
253253
- `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type.
254254
- `QUEUE_NAME`: **_queue**: The suffix for default redis queue name. Individual queues will default to **`name`**`QUEUE_NAME` but can be overriden in the specific `queue.name` section.
255+
- `SET_NAME`: **_unique**: The suffix that will added to the default redis
256+
set name for unique queues. Individual queues will default to
257+
**`name`**`QUEUE_NAME`_`SET_NAME`_ but can be overridden in the specific
258+
`queue.name` section.
255259
- `WRAP_IF_NECESSARY`: **true**: Will wrap queues with a timeoutable queue if the selected queue is not ready to be created - (Only relevant for the level queue.)
256260
- `MAX_ATTEMPTS`: **10**: Maximum number of attempts to create the wrapped queue
257261
- `TIMEOUT`: **GRACEFUL_HAMMER_TIME + 30s**: Timeout the creation of the wrapped queue if it takes longer than this to create.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.13
44

55
require (
66
cloud.google.com/go v0.45.0 // indirect
7-
gitea.com/lunny/levelqueue v0.1.0
7+
gitea.com/lunny/levelqueue v0.2.0
88
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b
99
gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76
1010
gitea.com/macaron/captcha v0.0.0-20190822015246-daa973478bae

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbf
1111
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
1212
gitea.com/lunny/levelqueue v0.1.0 h1:7wMk0VH6mvKN6vZEZCy9nUDgRmdPLgeNrm1NkW8EHNk=
1313
gitea.com/lunny/levelqueue v0.1.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s=
14+
gitea.com/lunny/levelqueue v0.2.0 h1:lR/5EAwQtFcn5YvPEkNMw0p9pAy2/O2nSP5ImECLA2E=
15+
gitea.com/lunny/levelqueue v0.2.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s=
1416
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b h1:vXt85uYV17KURaUlhU7v4GbCShkqRZDSfo0TkC0YCjQ=
1517
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b/go.mod h1:Cxadig6POWpPYYSfg23E7jo35Yf0yvsdC1lifoKWmPo=
1618
gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76 h1:mMsMEg90c5KXQgRWsH8D6GHXfZIW1RAe5S9VYIb12lM=

modules/queue/bytefifo.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2020 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package queue
6+
7+
// ByteFIFO defines a FIFO that takes a byte array
8+
type ByteFIFO interface {
9+
// Len returns the length of the fifo
10+
Len() int64
11+
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
12+
PushFunc(data []byte, fn func() error) error
13+
// Pop pops data from the start of the fifo
14+
Pop() ([]byte, error)
15+
// Close this fifo
16+
Close() error
17+
}
18+
19+
// UniqueByteFIFO defines a FIFO that Uniques its contents
20+
type UniqueByteFIFO interface {
21+
ByteFIFO
22+
// Has returns whether the fifo contains this data
23+
Has(data []byte) (bool, error)
24+
}
25+
26+
var _ (ByteFIFO) = &DummyByteFIFO{}
27+
28+
// DummyByteFIFO represents a dummy fifo
29+
type DummyByteFIFO struct{}
30+
31+
// PushFunc returns nil
32+
func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error {
33+
return nil
34+
}
35+
36+
// Pop returns nil
37+
func (*DummyByteFIFO) Pop() ([]byte, error) {
38+
return []byte{}, nil
39+
}
40+
41+
// Close returns nil
42+
func (*DummyByteFIFO) Close() error {
43+
return nil
44+
}
45+
46+
// Len is always 0
47+
func (*DummyByteFIFO) Len() int64 {
48+
return 0
49+
}
50+
51+
var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{}
52+
53+
// DummyUniqueByteFIFO represents a dummy unique fifo
54+
type DummyUniqueByteFIFO struct {
55+
DummyByteFIFO
56+
}
57+
58+
// Has always returns false
59+
func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) {
60+
return false, nil
61+
}

modules/queue/queue.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,25 +74,35 @@ type DummyQueue struct {
7474
}
7575

7676
// Run does nothing
77-
func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
77+
func (*DummyQueue) Run(_, _ func(context.Context, func())) {}
7878

7979
// Push fakes a push of data to the queue
80-
func (b *DummyQueue) Push(Data) error {
80+
func (*DummyQueue) Push(Data) error {
8181
return nil
8282
}
8383

84+
// PushFunc fakes a push of data to the queue with a function. The function is never run.
85+
func (*DummyQueue) PushFunc(Data, func() error) error {
86+
return nil
87+
}
88+
89+
// Has always returns false as this queue never does anything
90+
func (*DummyQueue) Has(Data) (bool, error) {
91+
return false, nil
92+
}
93+
8494
// Flush always returns nil
85-
func (b *DummyQueue) Flush(time.Duration) error {
95+
func (*DummyQueue) Flush(time.Duration) error {
8696
return nil
8797
}
8898

8999
// FlushWithContext always returns nil
90-
func (b *DummyQueue) FlushWithContext(context.Context) error {
100+
func (*DummyQueue) FlushWithContext(context.Context) error {
91101
return nil
92102
}
93103

94104
// IsEmpty asserts that the queue is empty
95-
func (b *DummyQueue) IsEmpty() bool {
105+
func (*DummyQueue) IsEmpty() bool {
96106
return true
97107
}
98108

modules/queue/queue_bytefifo.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
// Copyright 2020 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package queue
6+
7+
import (
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"sync"
12+
"time"
13+
14+
"code.gitea.io/gitea/modules/log"
15+
)
16+
17+
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
18+
type ByteFIFOQueueConfiguration struct {
19+
WorkerPoolConfiguration
20+
Workers int
21+
Name string
22+
}
23+
24+
var _ (Queue) = &ByteFIFOQueue{}
25+
26+
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
27+
type ByteFIFOQueue struct {
28+
*WorkerPool
29+
byteFIFO ByteFIFO
30+
typ Type
31+
closed chan struct{}
32+
terminated chan struct{}
33+
exemplar interface{}
34+
workers int
35+
name string
36+
lock sync.Mutex
37+
}
38+
39+
// NewByteFIFOQueue creates a new ByteFIFOQueue
40+
func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) {
41+
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
42+
if err != nil {
43+
return nil, err
44+
}
45+
config := configInterface.(ByteFIFOQueueConfiguration)
46+
47+
return &ByteFIFOQueue{
48+
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
49+
byteFIFO: byteFIFO,
50+
typ: typ,
51+
closed: make(chan struct{}),
52+
terminated: make(chan struct{}),
53+
exemplar: exemplar,
54+
workers: config.Workers,
55+
name: config.Name,
56+
}, nil
57+
}
58+
59+
// Name returns the name of this queue
60+
func (q *ByteFIFOQueue) Name() string {
61+
return q.name
62+
}
63+
64+
// Push pushes data to the fifo
65+
func (q *ByteFIFOQueue) Push(data Data) error {
66+
return q.PushFunc(data, nil)
67+
}
68+
69+
// PushFunc pushes data to the fifo
70+
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
71+
if !assignableTo(data, q.exemplar) {
72+
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
73+
}
74+
bs, err := json.Marshal(data)
75+
if err != nil {
76+
return err
77+
}
78+
return q.byteFIFO.PushFunc(bs, fn)
79+
}
80+
81+
// IsEmpty checks if the queue is empty
82+
func (q *ByteFIFOQueue) IsEmpty() bool {
83+
q.lock.Lock()
84+
defer q.lock.Unlock()
85+
if !q.WorkerPool.IsEmpty() {
86+
return false
87+
}
88+
return q.byteFIFO.Len() == 0
89+
}
90+
91+
// Run runs the bytefifo queue
92+
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
93+
atShutdown(context.Background(), q.Shutdown)
94+
atTerminate(context.Background(), q.Terminate)
95+
log.Debug("%s: %s Starting", q.typ, q.name)
96+
97+
go func() {
98+
_ = q.AddWorkers(q.workers, 0)
99+
}()
100+
101+
go q.readToChan()
102+
103+
log.Trace("%s: %s Waiting til closed", q.typ, q.name)
104+
<-q.closed
105+
log.Trace("%s: %s Waiting til done", q.typ, q.name)
106+
q.Wait()
107+
108+
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
109+
ctx, cancel := context.WithCancel(context.Background())
110+
atTerminate(ctx, cancel)
111+
q.CleanUp(ctx)
112+
cancel()
113+
}
114+
115+
func (q *ByteFIFOQueue) readToChan() {
116+
for {
117+
select {
118+
case <-q.closed:
119+
// tell the pool to shutdown.
120+
q.cancel()
121+
return
122+
default:
123+
q.lock.Lock()
124+
bs, err := q.byteFIFO.Pop()
125+
if err != nil {
126+
q.lock.Unlock()
127+
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
128+
time.Sleep(time.Millisecond * 100)
129+
continue
130+
}
131+
132+
if len(bs) == 0 {
133+
q.lock.Unlock()
134+
time.Sleep(time.Millisecond * 100)
135+
continue
136+
}
137+
138+
data, err := unmarshalAs(bs, q.exemplar)
139+
if err != nil {
140+
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
141+
q.lock.Unlock()
142+
time.Sleep(time.Millisecond * 100)
143+
continue
144+
}
145+
146+
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
147+
q.WorkerPool.Push(data)
148+
q.lock.Unlock()
149+
}
150+
}
151+
}
152+
153+
// Shutdown processing from this queue
154+
func (q *ByteFIFOQueue) Shutdown() {
155+
log.Trace("%s: %s Shutting down", q.typ, q.name)
156+
q.lock.Lock()
157+
select {
158+
case <-q.closed:
159+
default:
160+
close(q.closed)
161+
}
162+
q.lock.Unlock()
163+
log.Debug("%s: %s Shutdown", q.typ, q.name)
164+
}
165+
166+
// Terminate this queue and close the queue
167+
func (q *ByteFIFOQueue) Terminate() {
168+
log.Trace("%s: %s Terminating", q.typ, q.name)
169+
q.Shutdown()
170+
q.lock.Lock()
171+
select {
172+
case <-q.terminated:
173+
q.lock.Unlock()
174+
return
175+
default:
176+
}
177+
close(q.terminated)
178+
q.lock.Unlock()
179+
if log.IsDebug() {
180+
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
181+
}
182+
if err := q.byteFIFO.Close(); err != nil {
183+
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
184+
}
185+
log.Debug("%s: %s Terminated", q.typ, q.name)
186+
}
187+
188+
var _ (UniqueQueue) = &ByteFIFOUniqueQueue{}
189+
190+
// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
191+
type ByteFIFOUniqueQueue struct {
192+
ByteFIFOQueue
193+
}
194+
195+
// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
196+
func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) {
197+
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
198+
if err != nil {
199+
return nil, err
200+
}
201+
config := configInterface.(ByteFIFOQueueConfiguration)
202+
203+
return &ByteFIFOUniqueQueue{
204+
ByteFIFOQueue: ByteFIFOQueue{
205+
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
206+
byteFIFO: byteFIFO,
207+
typ: typ,
208+
closed: make(chan struct{}),
209+
terminated: make(chan struct{}),
210+
exemplar: exemplar,
211+
workers: config.Workers,
212+
name: config.Name,
213+
},
214+
}, nil
215+
}
216+
217+
// Has checks if the provided data is in the queue
218+
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
219+
if !assignableTo(data, q.exemplar) {
220+
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
221+
}
222+
bs, err := json.Marshal(data)
223+
if err != nil {
224+
return false, err
225+
}
226+
return q.byteFIFO.(UniqueByteFIFO).Has(bs)
227+
}

0 commit comments

Comments
 (0)