Skip to content

Commit 92c2207

Browse files
committed
Move TestPullRequests over to use UniqueQueue
1 parent 22c4563 commit 92c2207

File tree

4 files changed

+143
-59
lines changed

4 files changed

+143
-59
lines changed

modules/setting/queue.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,17 @@ func NewQueueService() {
149149
if _, ok := sectionMap["LENGTH"]; !ok {
150150
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
151151
}
152+
153+
// Handle the old test pull requests configuration
154+
// Please note this will be a unique queue
155+
section = Cfg.Section("queue.test_pull_requests")
156+
sectionMap = map[string]bool{}
157+
for _, key := range section.Keys() {
158+
sectionMap[key.Name()] = true
159+
}
160+
if _, ok := sectionMap["LENGTH"]; !ok {
161+
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
162+
}
152163
}
153164

154165
// ParseQueueConnStr parses a queue connection string

routers/init.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ func GlobalInit(ctx context.Context) {
113113
code_indexer.Init()
114114
mirror_service.InitSyncMirrors()
115115
webhook.InitDeliverHooks()
116-
pull_service.Init()
116+
if err := pull_service.Init(); err != nil {
117+
log.Fatal("Failed to initialize test pull requests queue: %v", err)
118+
}
117119
if err := task.Init(); err != nil {
118120
log.Fatal("Failed to initialize task scheduler: %v", err)
119121
}

services/pull/check.go

Lines changed: 73 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,40 @@ import (
1010
"fmt"
1111
"io/ioutil"
1212
"os"
13+
"strconv"
1314
"strings"
1415

1516
"code.gitea.io/gitea/models"
1617
"code.gitea.io/gitea/modules/git"
1718
"code.gitea.io/gitea/modules/graceful"
1819
"code.gitea.io/gitea/modules/log"
1920
"code.gitea.io/gitea/modules/notification"
20-
"code.gitea.io/gitea/modules/setting"
21-
"code.gitea.io/gitea/modules/sync"
21+
"code.gitea.io/gitea/modules/queue"
2222
"code.gitea.io/gitea/modules/timeutil"
2323

2424
"github.com/unknwon/com"
2525
)
2626

27-
// pullRequestQueue represents a queue to handle update pull request tests
28-
var pullRequestQueue = sync.NewUniqueQueue(setting.Repository.PullRequestQueueLength)
27+
// prQueue represents a queue to handle update pull request tests
28+
var prQueue queue.UniqueQueue
2929

3030
// AddToTaskQueue adds itself to pull request test task queue.
3131
func AddToTaskQueue(pr *models.PullRequest) {
32-
go pullRequestQueue.AddFunc(pr.ID, func() {
33-
pr.Status = models.PullRequestStatusChecking
34-
if err := pr.UpdateCols("status"); err != nil {
35-
log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err)
32+
go func() {
33+
err := prQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error {
34+
pr.Status = models.PullRequestStatusChecking
35+
err := pr.UpdateCols("status")
36+
if err != nil {
37+
log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err)
38+
} else {
39+
log.Trace("Adding PR ID: %d to the test pull requests queue", pr.ID)
40+
}
41+
return err
42+
})
43+
if err != nil && err != queue.ErrAlreadyInQueue {
44+
log.Error("Error adding prID %d to the test pull requests queue: %v", pr.ID, err)
3645
}
37-
})
46+
}()
3847
}
3948

4049
// checkAndUpdateStatus checks if pull request is possible to leaving checking status,
@@ -46,7 +55,12 @@ func checkAndUpdateStatus(pr *models.PullRequest) {
4655
}
4756

4857
// Make sure there is no waiting test to process before leaving the checking status.
49-
if !pullRequestQueue.Exist(pr.ID) {
58+
has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
59+
if err != nil {
60+
log.Error("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v", pr.ID, err)
61+
}
62+
63+
if !has {
5064
if err := pr.UpdateCols("status, conflicted_files"); err != nil {
5165
log.Error("Update[%d]: %v", pr.ID, err)
5266
}
@@ -155,61 +169,65 @@ func manuallyMerged(pr *models.PullRequest) bool {
155169
return false
156170
}
157171

158-
// TestPullRequests checks and tests untested patches of pull requests.
159-
// TODO: test more pull requests at same time.
160-
func TestPullRequests(ctx context.Context) {
161-
162-
go func() {
163-
prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
164-
if err != nil {
165-
log.Error("Find Checking PRs: %v", err)
172+
// InitializePullRequests checks and tests untested patches of pull requests.
173+
func InitializePullRequests(ctx context.Context) {
174+
prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
175+
if err != nil {
176+
log.Error("Find Checking PRs: %v", err)
177+
return
178+
}
179+
for _, prID := range prs {
180+
select {
181+
case <-ctx.Done():
166182
return
167-
}
168-
for _, prID := range prs {
169-
select {
170-
case <-ctx.Done():
171-
return
172-
default:
173-
pullRequestQueue.Add(prID)
183+
default:
184+
if err := prQueue.PushFunc(strconv.FormatInt(prID, 10), func() error {
185+
log.Trace("Adding PR ID: %d to the test pull requests queue", prID)
186+
return nil
187+
}); err != nil {
188+
log.Error("Error adding prID: %s to the test pull requests queue %v", prID, err)
174189
}
175190
}
176-
}()
191+
}
192+
}
177193

178-
// Start listening on new test requests.
179-
for {
180-
select {
181-
case prID := <-pullRequestQueue.Queue():
182-
log.Trace("TestPullRequests[%v]: processing test task", prID)
183-
pullRequestQueue.Remove(prID)
194+
// handle passed PR IDs and test the PRs
195+
func handle(data ...queue.Data) {
196+
for _, datum := range data {
197+
prID := datum.(string)
198+
id := com.StrTo(prID).MustInt64()
184199

185-
id := com.StrTo(prID).MustInt64()
200+
log.Trace("Testing PR ID %d from the test pull requests queue", id)
186201

187-
pr, err := models.GetPullRequestByID(id)
188-
if err != nil {
189-
log.Error("GetPullRequestByID[%s]: %v", prID, err)
190-
continue
191-
} else if pr.Status != models.PullRequestStatusChecking {
192-
continue
193-
} else if manuallyMerged(pr) {
194-
continue
195-
} else if err = TestPatch(pr); err != nil {
196-
log.Error("testPatch[%d]: %v", pr.ID, err)
197-
pr.Status = models.PullRequestStatusError
198-
if err := pr.UpdateCols("status"); err != nil {
199-
log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err)
200-
}
201-
continue
202+
pr, err := models.GetPullRequestByID(id)
203+
if err != nil {
204+
log.Error("GetPullRequestByID[%s]: %v", prID, err)
205+
continue
206+
} else if pr.Status != models.PullRequestStatusChecking {
207+
continue
208+
} else if manuallyMerged(pr) {
209+
continue
210+
} else if err = TestPatch(pr); err != nil {
211+
log.Error("testPatch[%d]: %v", pr.ID, err)
212+
pr.Status = models.PullRequestStatusError
213+
if err := pr.UpdateCols("status"); err != nil {
214+
log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err)
202215
}
203-
checkAndUpdateStatus(pr)
204-
case <-ctx.Done():
205-
pullRequestQueue.Close()
206-
log.Info("PID: %d Pull Request testing shutdown", os.Getpid())
207-
return
216+
continue
208217
}
218+
checkAndUpdateStatus(pr)
209219
}
210220
}
211221

212222
// Init runs the task queue to test all the checking status pull requests
213-
func Init() {
214-
go graceful.GetManager().RunWithShutdownContext(TestPullRequests)
223+
func Init() error {
224+
prQueue = queue.CreateUniqueQueue("test_pull_requests", handle, "").(queue.UniqueQueue)
225+
226+
if prQueue == nil {
227+
return fmt.Errorf("Unable to create test_pull_requests Queue")
228+
}
229+
230+
go graceful.GetManager().RunWithShutdownFns(prQueue.Run)
231+
go graceful.GetManager().RunWithShutdownContext(InitializePullRequests)
232+
return nil
215233
}

services/pull/check_test.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,82 @@
66
package pull
77

88
import (
9+
"context"
910
"strconv"
1011
"testing"
1112
"time"
1213

1314
"code.gitea.io/gitea/models"
15+
"code.gitea.io/gitea/modules/queue"
1416

1517
"github.com/stretchr/testify/assert"
18+
"github.com/unknwon/com"
1619
)
1720

1821
func TestPullRequest_AddToTaskQueue(t *testing.T) {
1922
assert.NoError(t, models.PrepareTestDatabase())
2023

24+
idChan := make(chan int64, 10)
25+
26+
q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) {
27+
for _, datum := range data {
28+
prID := datum.(string)
29+
id := com.StrTo(prID).MustInt64()
30+
idChan <- id
31+
}
32+
}, queue.ChannelUniqueQueueConfiguration{
33+
WorkerPoolConfiguration: queue.WorkerPoolConfiguration{
34+
QueueLength: 10,
35+
BatchLength: 1,
36+
},
37+
Workers: 1,
38+
Name: "temporary-queue",
39+
}, "")
40+
assert.NoError(t, err)
41+
42+
queueShutdown := []func(){}
43+
queueTerminate := []func(){}
44+
45+
prQueue = q.(queue.UniqueQueue)
46+
2147
pr := models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
2248
AddToTaskQueue(pr)
2349

50+
assert.Eventually(t, func() bool {
51+
pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
52+
return pr.Status == models.PullRequestStatusChecking
53+
}, 1*time.Second, 100*time.Millisecond)
54+
55+
has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
56+
assert.True(t, has)
57+
assert.NoError(t, err)
58+
59+
prQueue.Run(func(_ context.Context, shutdown func()) {
60+
queueShutdown = append(queueShutdown, shutdown)
61+
}, func(_ context.Context, terminate func()) {
62+
queueTerminate = append(queueTerminate, terminate)
63+
})
64+
2465
select {
25-
case id := <-pullRequestQueue.Queue():
26-
assert.EqualValues(t, strconv.FormatInt(pr.ID, 10), id)
66+
case id := <-idChan:
67+
assert.EqualValues(t, pr.ID, id)
2768
case <-time.After(time.Second):
2869
assert.Fail(t, "Timeout: nothing was added to pullRequestQueue")
2970
}
3071

31-
assert.True(t, pullRequestQueue.Exist(pr.ID))
72+
has, err = prQueue.Has(strconv.FormatInt(pr.ID, 10))
73+
assert.False(t, has)
74+
assert.NoError(t, err)
75+
3276
pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
3377
assert.Equal(t, models.PullRequestStatusChecking, pr.Status)
78+
79+
for _, callback := range queueShutdown {
80+
callback()
81+
}
82+
for _, callback := range queueTerminate {
83+
callback()
84+
}
85+
86+
prQueue = nil
3487
}

0 commit comments

Comments
 (0)