Skip to content

Commit d508f4e

Browse files
committed
Notification: move to use a queue
1 parent 34609c9 commit d508f4e

19 files changed

+2354
-134
lines changed

integrations/integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ func initIntegrationTest() {
178178
defer db.Close()
179179
}
180180
routers.GlobalInit(graceful.GetManager().HammerContext())
181+
NotifierListenerInit()
181182
}
182183

183184
func prepareTestEnv(t testing.TB, skip ...int) func() {
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2019 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package integrations
6+
7+
import (
8+
"encoding/json"
9+
"reflect"
10+
"sync"
11+
"testing"
12+
13+
"code.gitea.io/gitea/models"
14+
"code.gitea.io/gitea/modules/log"
15+
"code.gitea.io/gitea/modules/notification"
16+
"code.gitea.io/gitea/modules/notification/base"
17+
"code.gitea.io/gitea/modules/queue"
18+
)
19+
20+
var notifierListener *NotifierListener
21+
22+
var once = sync.Once{}
23+
24+
type NotifierListener struct {
25+
lock sync.RWMutex
26+
callbacks map[string][]*func(string, [][]byte)
27+
notifier base.Notifier
28+
}
29+
30+
func NotifierListenerInit() {
31+
once.Do(func() {
32+
notifierListener = &NotifierListener{
33+
callbacks: map[string][]*func(string, [][]byte){},
34+
}
35+
notifierListener.notifier = base.NewQueueNotifierWithHandle("test-notifier", notifierListener.handle)
36+
notification.RegisterNotifier(notifierListener.notifier)
37+
})
38+
}
39+
40+
// Register will register a callback with the provided notifier function
41+
func (n *NotifierListener) Register(functionName string, callback *func(string, [][]byte)) {
42+
n.lock.Lock()
43+
n.callbacks[functionName] = append(n.callbacks[functionName], callback)
44+
n.lock.Unlock()
45+
}
46+
47+
// Deregister will remove the provided callback from the provided notifier function
48+
func (n *NotifierListener) Deregister(functionName string, callback *func(string, [][]byte)) {
49+
n.lock.Lock()
50+
found := -1
51+
for i, callbackPtr := range n.callbacks[functionName] {
52+
if callbackPtr == callback {
53+
found = i
54+
break
55+
}
56+
}
57+
if found > -1 {
58+
n.callbacks[functionName] = append(n.callbacks[functionName][0:found], n.callbacks[functionName][found+1:]...)
59+
}
60+
n.lock.Unlock()
61+
}
62+
63+
// RegisterChannel will register a provided channel with function name and return a function to deregister it
64+
func (n *NotifierListener) RegisterChannel(name string, channel chan<- interface{}, argNumber int, exemplar interface{}) (deregister func()) {
65+
t := reflect.TypeOf(exemplar)
66+
callback := func(_ string, args [][]byte) {
67+
n := reflect.New(t).Elem()
68+
err := json.Unmarshal(args[argNumber], n.Addr().Interface())
69+
if err != nil {
70+
log.Error("Wrong Argument passed to register channel: %v ", err)
71+
}
72+
channel <- n.Interface()
73+
}
74+
n.Register(name, &callback)
75+
76+
return func() {
77+
n.Deregister(name, &callback)
78+
}
79+
}
80+
81+
func (n *NotifierListener) handle(data ...queue.Data) {
82+
n.lock.RLock()
83+
defer n.lock.RUnlock()
84+
for _, datum := range data {
85+
call := datum.(*base.FunctionCall)
86+
callbacks, ok := n.callbacks[call.Name]
87+
if ok && len(callbacks) > 0 {
88+
for _, callback := range callbacks {
89+
(*callback)(call.Name, call.Args)
90+
}
91+
}
92+
}
93+
}
94+
95+
func TestNotifierListener(t *testing.T) {
96+
defer prepareTestEnv(t)()
97+
98+
createPullNotified := make(chan interface{}, 10)
99+
deregister := notifierListener.RegisterChannel("NotifyNewPullRequest", createPullNotified, 0, &models.PullRequest{})
100+
bs, _ := json.Marshal(&models.PullRequest{})
101+
notifierListener.handle(&base.FunctionCall{
102+
Name: "NotifyNewPullRequest",
103+
Args: [][]byte{
104+
bs,
105+
},
106+
})
107+
<-createPullNotified
108+
109+
notifierListener.notifier.NotifyNewPullRequest(&models.PullRequest{})
110+
<-createPullNotified
111+
112+
notification.NotifyNewPullRequest(&models.PullRequest{})
113+
<-createPullNotified
114+
115+
deregister()
116+
close(createPullNotified)
117+
118+
notification.NotifyNewPullRequest(&models.PullRequest{})
119+
// would panic if not deregistered
120+
}

integrations/pull_merge_test.go

Lines changed: 64 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,12 @@ func testPullCleanUp(t *testing.T, session *TestSession, user, repo, pullnum str
6161

6262
func TestPullMerge(t *testing.T) {
6363
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
64-
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
65-
assert.NoError(t, err)
66-
hookTasksLenBefore := len(hookTasks)
64+
mergePullNotified := make(chan interface{}, 10)
65+
deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", mergePullNotified, 0, &models.PullRequest{})
66+
defer func() {
67+
deferable()
68+
close(mergePullNotified)
69+
}()
6770

6871
session := loginUser(t, "user1")
6972
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
@@ -73,19 +76,44 @@ func TestPullMerge(t *testing.T) {
7376

7477
elem := strings.Split(test.RedirectURL(resp), "/")
7578
assert.EqualValues(t, "pulls", elem[3])
79+
7680
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleMerge)
7781

78-
hookTasks, err = models.HookTasks(1, 1)
79-
assert.NoError(t, err)
80-
assert.Len(t, hookTasks, hookTasksLenBefore+1)
82+
var prInterface interface{}
83+
select {
84+
case prInterface = <-mergePullNotified:
85+
case <-time.After(500 * time.Millisecond):
86+
assert.Fail(t, "Took too long to notify!")
87+
}
88+
89+
pr := prInterface.(*models.PullRequest)
90+
pr.LoadBaseRepo()
91+
pr.LoadHeadRepo()
92+
pr.BaseRepo.MustOwner()
93+
pr.HeadRepo.MustOwner()
94+
95+
assert.EqualValues(t, "user1", pr.HeadRepo.Owner.Name)
96+
assert.EqualValues(t, "repo1", pr.HeadRepo.Name)
97+
assert.EqualValues(t, "user2", pr.BaseRepo.Owner.Name)
98+
assert.EqualValues(t, "repo1", pr.BaseRepo.Name)
99+
100+
time.Sleep(100 * time.Millisecond)
101+
select {
102+
case prInterface = <-mergePullNotified:
103+
assert.Fail(t, "Should only have one pull create notification: %v", prInterface)
104+
default:
105+
}
81106
})
82107
}
83108

84109
func TestPullRebase(t *testing.T) {
85110
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
86-
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
87-
assert.NoError(t, err)
88-
hookTasksLenBefore := len(hookTasks)
111+
mergePullNotified := make(chan interface{}, 10)
112+
deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", mergePullNotified, 0, &models.PullRequest{})
113+
defer func() {
114+
deferable()
115+
close(mergePullNotified)
116+
}()
89117

90118
session := loginUser(t, "user1")
91119
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
@@ -96,21 +124,22 @@ func TestPullRebase(t *testing.T) {
96124
elem := strings.Split(test.RedirectURL(resp), "/")
97125
assert.EqualValues(t, "pulls", elem[3])
98126
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleRebase)
99-
100-
hookTasks, err = models.HookTasks(1, 1)
101-
assert.NoError(t, err)
102-
assert.Len(t, hookTasks, hookTasksLenBefore+1)
127+
select {
128+
case <-mergePullNotified:
129+
case <-time.After(500 * time.Millisecond):
130+
assert.Fail(t, "Took too long to notify!")
131+
}
103132
})
104133
}
105134

106135
func TestPullRebaseMerge(t *testing.T) {
107136
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
108-
defer prepareTestEnv(t)()
109-
110-
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
111-
assert.NoError(t, err)
112-
hookTasksLenBefore := len(hookTasks)
113-
137+
mergePullNotified := make(chan interface{}, 10)
138+
deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", mergePullNotified, 0, &models.PullRequest{})
139+
defer func() {
140+
deferable()
141+
close(mergePullNotified)
142+
}()
114143
session := loginUser(t, "user1")
115144
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
116145
testEditFile(t, session, "user1", "repo1", "master", "README.md", "Hello, World (Edited)\n")
@@ -121,19 +150,22 @@ func TestPullRebaseMerge(t *testing.T) {
121150
assert.EqualValues(t, "pulls", elem[3])
122151
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleRebaseMerge)
123152

124-
hookTasks, err = models.HookTasks(1, 1)
125-
assert.NoError(t, err)
126-
assert.Len(t, hookTasks, hookTasksLenBefore+1)
153+
select {
154+
case <-mergePullNotified:
155+
case <-time.After(500 * time.Millisecond):
156+
assert.Fail(t, "Took too long to notify!")
157+
}
127158
})
128159
}
129160

130161
func TestPullSquash(t *testing.T) {
131162
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
132-
defer prepareTestEnv(t)()
133-
134-
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
135-
assert.NoError(t, err)
136-
hookTasksLenBefore := len(hookTasks)
163+
mergePullNotified := make(chan interface{}, 10)
164+
deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", mergePullNotified, 0, &models.PullRequest{})
165+
defer func() {
166+
deferable()
167+
close(mergePullNotified)
168+
}()
137169

138170
session := loginUser(t, "user1")
139171
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
@@ -146,15 +178,16 @@ func TestPullSquash(t *testing.T) {
146178
assert.EqualValues(t, "pulls", elem[3])
147179
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleSquash)
148180

149-
hookTasks, err = models.HookTasks(1, 1)
150-
assert.NoError(t, err)
151-
assert.Len(t, hookTasks, hookTasksLenBefore+1)
181+
select {
182+
case <-mergePullNotified:
183+
case <-time.After(500 * time.Millisecond):
184+
assert.Fail(t, "Took too long to notify!")
185+
}
152186
})
153187
}
154188

155189
func TestPullCleanUpAfterMerge(t *testing.T) {
156190
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
157-
defer prepareTestEnv(t)()
158191
session := loginUser(t, "user1")
159192
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
160193
testEditFileToNewBranch(t, session, "user1", "repo1", "master", "feature/test", "README.md", "Hello, World (Edited)\n")
@@ -190,7 +223,6 @@ func TestPullCleanUpAfterMerge(t *testing.T) {
190223

191224
func TestCantMergeWorkInProgress(t *testing.T) {
192225
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
193-
defer prepareTestEnv(t)()
194226
session := loginUser(t, "user1")
195227
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
196228
testEditFile(t, session, "user1", "repo1", "master", "README.md", "Hello, World (Edited)\n")
@@ -212,7 +244,6 @@ func TestCantMergeWorkInProgress(t *testing.T) {
212244

213245
func TestCantMergeConflict(t *testing.T) {
214246
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
215-
defer prepareTestEnv(t)()
216247
session := loginUser(t, "user1")
217248
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
218249
testEditFileToNewBranch(t, session, "user1", "repo1", "master", "conflict", "README.md", "Hello, World (Edited Once)\n")
@@ -258,7 +289,6 @@ func TestCantMergeConflict(t *testing.T) {
258289

259290
func TestCantMergeUnrelated(t *testing.T) {
260291
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
261-
defer prepareTestEnv(t)()
262292
session := loginUser(t, "user1")
263293
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
264294
testEditFileToNewBranch(t, session, "user1", "repo1", "master", "base", "README.md", "Hello, World (Edited Twice)\n")

integrations/sqlite.ini

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,10 @@ INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTI3OTU5ODN9.O
8181
[oauth2]
8282
JWT_SECRET = KZb_QLUd4fYVyxetjxC4eZkrBgWM2SndOOWDNtgUUko
8383

84+
[queue]
85+
TYPE=channel
86+
87+
[queue.test-notifier]
88+
BATCH_LENGTH=1
89+
LENGTH=20
90+

models/issue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type Issue struct {
4747
IsClosed bool `xorm:"INDEX"`
4848
IsRead bool `xorm:"-"`
4949
IsPull bool `xorm:"INDEX"` // Indicates whether is a pull request or not.
50-
PullRequest *PullRequest `xorm:"-"`
50+
PullRequest *PullRequest `xorm:"-" json:"-"`
5151
NumComments int
5252
Ref string
5353

models/pull.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ func (pr *PullRequest) apiFormat(e Engine) *api.PullRequest {
210210
log.Error("loadRepo[%d]: %v", pr.ID, err)
211211
return nil
212212
}
213+
if pr.Issue.PullRequest == nil {
214+
pr.Issue.PullRequest = pr
215+
}
213216
apiIssue := pr.Issue.apiFormat(e)
214217
if pr.BaseRepo == nil {
215218
pr.BaseRepo, err = getRepositoryByID(e, pr.BaseRepoID)

0 commit comments

Comments
 (0)