Skip to content

Commit 1be49fd

Browse files
authored
Improve retrying index issues (#27554)
Fix #27540
1 parent cddf245 commit 1be49fd

File tree

5 files changed

+52
-40
lines changed

5 files changed

+52
-40
lines changed

modules/indexer/issues/indexer.go

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,13 @@ func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMeta
204204
func populateIssueIndexer(ctx context.Context) {
205205
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true)
206206
defer finished()
207-
if err := PopulateIssueIndexer(ctx, true); err != nil {
207+
ctx = contextWithKeepRetry(ctx) // keep retrying since it's a background task
208+
if err := PopulateIssueIndexer(ctx); err != nil {
208209
log.Error("Issue indexer population failed: %v", err)
209210
}
210211
}
211212

212-
func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error {
213+
func PopulateIssueIndexer(ctx context.Context) error {
213214
for page := 1; ; page++ {
214215
select {
215216
case <-ctx.Done():
@@ -232,20 +233,8 @@ func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error {
232233
}
233234

234235
for _, repo := range repos {
235-
for {
236-
select {
237-
case <-ctx.Done():
238-
return fmt.Errorf("shutdown before completion: %w", ctx.Err())
239-
default:
240-
}
241-
if err := updateRepoIndexer(ctx, repo.ID); err != nil {
242-
if keepRetrying && ctx.Err() == nil {
243-
log.Warn("Retry to populate issue indexer for repo %d: %v", repo.ID, err)
244-
continue
245-
}
246-
return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err)
247-
}
248-
break
236+
if err := updateRepoIndexer(ctx, repo.ID); err != nil {
237+
return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err)
249238
}
250239
}
251240
}
@@ -259,8 +248,8 @@ func UpdateRepoIndexer(ctx context.Context, repoID int64) {
259248
}
260249

261250
// UpdateIssueIndexer add/update an issue to the issue indexer
262-
func UpdateIssueIndexer(issueID int64) {
263-
if err := updateIssueIndexer(issueID); err != nil {
251+
func UpdateIssueIndexer(ctx context.Context, issueID int64) {
252+
if err := updateIssueIndexer(ctx, issueID); err != nil {
264253
log.Error("Unable to push issue %d to issue indexer: %v", issueID, err)
265254
}
266255
}

modules/indexer/issues/util.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ func updateRepoIndexer(ctx context.Context, repoID int64) error {
127127
return fmt.Errorf("issue_model.GetIssueIDsByRepoID: %w", err)
128128
}
129129
for _, id := range ids {
130-
if err := updateIssueIndexer(id); err != nil {
130+
if err := updateIssueIndexer(ctx, id); err != nil {
131131
return err
132132
}
133133
}
134134
return nil
135135
}
136136

137-
func updateIssueIndexer(issueID int64) error {
138-
return pushIssueIndexerQueue(&IndexerMetadata{ID: issueID})
137+
func updateIssueIndexer(ctx context.Context, issueID int64) error {
138+
return pushIssueIndexerQueue(ctx, &IndexerMetadata{ID: issueID})
139139
}
140140

141141
func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error {
@@ -148,26 +148,48 @@ func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error {
148148
if len(ids) == 0 {
149149
return nil
150150
}
151-
return pushIssueIndexerQueue(&IndexerMetadata{
151+
return pushIssueIndexerQueue(ctx, &IndexerMetadata{
152152
IDs: ids,
153153
IsDelete: true,
154154
})
155155
}
156156

157-
func pushIssueIndexerQueue(data *IndexerMetadata) error {
157+
type keepRetryKey struct{}
158+
159+
// contextWithKeepRetry returns a context with a key indicating that the indexer should keep retrying.
160+
// Please note that it's for background tasks only, and it should not be used for user requests, or it may cause blocking.
161+
func contextWithKeepRetry(ctx context.Context) context.Context {
162+
return context.WithValue(ctx, keepRetryKey{}, true)
163+
}
164+
165+
func pushIssueIndexerQueue(ctx context.Context, data *IndexerMetadata) error {
158166
if issueIndexerQueue == nil {
159167
// Some unit tests will trigger indexing, but the queue is not initialized.
160168
// It's OK to ignore it, but log a warning message in case it's not a unit test.
161169
log.Warn("Trying to push %+v to issue indexer queue, but the queue is not initialized, it's OK if it's a unit test", data)
162170
return nil
163171
}
164172

165-
err := issueIndexerQueue.Push(data)
166-
if errors.Is(err, queue.ErrAlreadyInQueue) {
167-
return nil
168-
}
169-
if errors.Is(err, context.DeadlineExceeded) {
170-
log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.")
173+
for {
174+
select {
175+
case <-ctx.Done():
176+
return ctx.Err()
177+
default:
178+
}
179+
err := issueIndexerQueue.Push(data)
180+
if errors.Is(err, queue.ErrAlreadyInQueue) {
181+
return nil
182+
}
183+
if errors.Is(err, context.DeadlineExceeded) { // the queue is full
184+
log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.")
185+
if ctx.Value(keepRetryKey{}) == nil {
186+
return err
187+
}
188+
// It will be better to increase the queue size instead of retrying, but users may ignore the previous warning message.
189+
// However, even it retries, it may still cause index loss when there's a deadline in the context.
190+
log.Debug("Retry to push %+v to issue indexer queue", data)
191+
continue
192+
}
193+
return err
171194
}
172-
return err
173195
}

services/cron/tasks_extended.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func registerRebuildIssueIndexer() {
219219
RunAtStart: false,
220220
Schedule: "@annually",
221221
}, func(ctx context.Context, _ *user_model.User, config Config) error {
222-
return issue_indexer.PopulateIssueIndexer(ctx, false)
222+
return issue_indexer.PopulateIssueIndexer(ctx)
223223
})
224224
}
225225

services/indexer/notify.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,35 +36,35 @@ func (r *indexerNotifier) AdoptRepository(ctx context.Context, doer, u *user_mod
3636
func (r *indexerNotifier) CreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository,
3737
issue *issues_model.Issue, comment *issues_model.Comment, mentions []*user_model.User,
3838
) {
39-
issue_indexer.UpdateIssueIndexer(issue.ID)
39+
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
4040
}
4141

4242
func (r *indexerNotifier) NewIssue(ctx context.Context, issue *issues_model.Issue, mentions []*user_model.User) {
43-
issue_indexer.UpdateIssueIndexer(issue.ID)
43+
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
4444
}
4545

4646
func (r *indexerNotifier) NewPullRequest(ctx context.Context, pr *issues_model.PullRequest, mentions []*user_model.User) {
4747
if err := pr.LoadIssue(ctx); err != nil {
4848
log.Error("LoadIssue: %v", err)
4949
return
5050
}
51-
issue_indexer.UpdateIssueIndexer(pr.Issue.ID)
51+
issue_indexer.UpdateIssueIndexer(ctx, pr.Issue.ID)
5252
}
5353

5454
func (r *indexerNotifier) UpdateComment(ctx context.Context, doer *user_model.User, c *issues_model.Comment, oldContent string) {
5555
if err := c.LoadIssue(ctx); err != nil {
5656
log.Error("LoadIssue: %v", err)
5757
return
5858
}
59-
issue_indexer.UpdateIssueIndexer(c.Issue.ID)
59+
issue_indexer.UpdateIssueIndexer(ctx, c.Issue.ID)
6060
}
6161

6262
func (r *indexerNotifier) DeleteComment(ctx context.Context, doer *user_model.User, comment *issues_model.Comment) {
6363
if err := comment.LoadIssue(ctx); err != nil {
6464
log.Error("LoadIssue: %v", err)
6565
return
6666
}
67-
issue_indexer.UpdateIssueIndexer(comment.Issue.ID)
67+
issue_indexer.UpdateIssueIndexer(ctx, comment.Issue.ID)
6868
}
6969

7070
func (r *indexerNotifier) DeleteRepository(ctx context.Context, doer *user_model.User, repo *repo_model.Repository) {
@@ -120,13 +120,13 @@ func (r *indexerNotifier) ChangeDefaultBranch(ctx context.Context, repo *repo_mo
120120
}
121121

122122
func (r *indexerNotifier) IssueChangeContent(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldContent string) {
123-
issue_indexer.UpdateIssueIndexer(issue.ID)
123+
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
124124
}
125125

126126
func (r *indexerNotifier) IssueChangeTitle(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldTitle string) {
127-
issue_indexer.UpdateIssueIndexer(issue.ID)
127+
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
128128
}
129129

130130
func (r *indexerNotifier) IssueChangeRef(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldRef string) {
131-
issue_indexer.UpdateIssueIndexer(issue.ID)
131+
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
132132
}

tests/integration/issue_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package integration
55

66
import (
7+
"context"
78
"fmt"
89
"net/http"
910
"net/url"
@@ -99,7 +100,7 @@ func TestViewIssuesKeyword(t *testing.T) {
99100
RepoID: repo.ID,
100101
Index: 1,
101102
})
102-
issues.UpdateIssueIndexer(issue.ID)
103+
issues.UpdateIssueIndexer(context.Background(), issue.ID)
103104
time.Sleep(time.Second * 1)
104105
const keyword = "first"
105106
req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.Link(), keyword)

0 commit comments

Comments
 (0)