Skip to content

Commit f964274

Browse files
committed
Ensure that Webhook tasks are not double delivered
When re-retrieving hook tasks from the DB, double-check that they have not been delivered in the meantime. Further, ensure that tasks are marked as delivered when they are being delivered. In addition, improve the error reporting and make sure that the webhook task population script runs in a separate goroutine. Signed-off-by: Andrew Thornton <[email protected]>
1 parent 88a03a6 commit f964274

File tree

3 files changed

+89
-31
lines changed

3 files changed

+89
-31
lines changed

models/webhook/hooktask.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask,
233233
return newTask, db.Insert(ctx, newTask)
234234
}
235235

236-
// FindUndeliveredHookTasks represents find the undelivered hook tasks
237-
func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) {
238-
tasks := make([]*HookTask, 0, 10)
236+
// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID
237+
func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) {
238+
const batchSize = 100
239+
240+
tasks := make([]int64, 0, batchSize)
239241
return tasks, db.GetEngine(ctx).
242+
Select("id").
243+
Table(new(HookTask)).
240244
Where("is_delivered=?", false).
245+
And("id > ?", lowerID).
246+
Asc("id").
247+
Limit(batchSize).
241248
Find(&tasks)
242249
}
243250

251+
func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) {
252+
count, err := db.GetEngine(ctx).ID(task.ID).SetExpr("is_delivered = ?", true).Update(&HookTask{
253+
ID: task.ID,
254+
IsDelivered: true,
255+
})
256+
257+
return count != 0, err
258+
}
259+
244260
// CleanupHookTaskTable deletes rows from hook_task as needed.
245261
func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error {
246262
log.Trace("Doing: CleanupHookTaskTable")

services/webhook/deliver.go

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"code.gitea.io/gitea/modules/graceful"
2424
"code.gitea.io/gitea/modules/hostmatcher"
2525
"code.gitea.io/gitea/modules/log"
26+
"code.gitea.io/gitea/modules/process"
2627
"code.gitea.io/gitea/modules/proxy"
2728
"code.gitea.io/gitea/modules/queue"
2829
"code.gitea.io/gitea/modules/setting"
@@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
4344
return
4445
}
4546
// There was a panic whilst delivering a hook...
46-
log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
47+
log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
4748
}()
4849

4950
t.IsDelivered = true
@@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
5253

5354
switch w.HTTPMethod {
5455
case "":
55-
log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID)
56+
log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL)
5657
fallthrough
5758
case http.MethodPost:
5859
switch w.ContentType {
@@ -78,27 +79,27 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
7879
case http.MethodGet:
7980
u, err := url.Parse(w.URL)
8081
if err != nil {
81-
return err
82+
return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err)
8283
}
8384
vals := u.Query()
8485
vals["payload"] = []string{t.PayloadContent}
8586
u.RawQuery = vals.Encode()
8687
req, err = http.NewRequest("GET", u.String(), nil)
8788
if err != nil {
88-
return err
89+
return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err)
8990
}
9091
case http.MethodPut:
9192
switch w.Type {
9293
case webhook_model.MATRIX:
9394
req, err = getMatrixHookRequest(w, t)
9495
if err != nil {
95-
return err
96+
return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err)
9697
}
9798
default:
98-
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
99+
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
99100
}
100101
default:
101-
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
102+
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
102103
}
103104

104105
var signatureSHA1 string
@@ -170,18 +171,32 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
170171
}
171172
}()
172173

174+
// OK We're now ready to attempt to deliver the task - we must double check that it
175+
// has not been delivered in the meantime
176+
marked, err := webhook_model.MarkTaskDelivered(ctx, t)
177+
if err != nil {
178+
log.Error("MarkTaskDelivered[%d]: %v", t.ID, err)
179+
return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err)
180+
}
181+
if !marked {
182+
// This webhook task has already been delivered
183+
log.Trace("Webhook Task[%d] already delivered", t.ID)
184+
return nil
185+
}
186+
173187
if setting.DisableWebhooks {
174188
return fmt.Errorf("webhook task skipped (webhooks disabled): [%d]", t.ID)
175189
}
176190

177191
if !w.IsActive {
192+
log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID)
178193
return nil
179194
}
180195

181196
resp, err := webhookHTTPClient.Do(req.WithContext(ctx))
182197
if err != nil {
183198
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
184-
return err
199+
return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err)
185200
}
186201
defer resp.Body.Close()
187202

@@ -195,7 +210,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
195210
p, err := io.ReadAll(resp.Body)
196211
if err != nil {
197212
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
198-
return err
213+
return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err)
199214
}
200215
t.ResponseInfo.Body = string(p)
201216
return nil
@@ -257,17 +272,37 @@ func Init() error {
257272
}
258273
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
259274

260-
tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext())
261-
if err != nil {
262-
log.Error("FindUndeliveredHookTasks failed: %v", err)
263-
return err
264-
}
275+
go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)
276+
277+
return nil
278+
}
279+
280+
// populateWebhookSendingQueue pushes the IDs of undelivered hook tasks onto the
// webhook sending queue, fetching them batch by batch with
// FindUndeliveredHookTaskIDs. It returns when a batch comes back empty, when
// the lookup fails (the error is logged), or when ctx is done. A failure to
// enqueue a single task is logged and does not stop the run.
func populateWebhookSendingQueue(ctx context.Context) {
281+
ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue")
282+
defer finished()
265283

266-
for _, task := range tasks {
267-
if err := enqueueHookTask(task); err != nil {
268-
log.Error("enqueueHookTask failed: %v", err)
284+
lowerID := int64(0)
285+
for {
286+
taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID)
287+
if err != nil {
288+
log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err)
289+
return
290+
}
291+
if len(taskIDs) == 0 {
292+
return
293+
}
294+
// Continue the next lookup after the last ID of this batch.
lowerID = taskIDs[len(taskIDs)-1]
295+
296+
for _, taskID := range taskIDs {
297+
// Non-blocking check for cancellation before each push.
select {
298+
case <-ctx.Done():
299+
log.Warn("Shutdown before Webhook Sending queue finishing being populated")
300+
return
301+
default:
302+
}
303+
if err := enqueueHookTask(taskID); err != nil {
304+
log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err)
305+
}
269306
}
270307
}
271-
272-
return nil
273308
}

services/webhook/webhook.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data {
116116
for _, taskID := range data {
117117
task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64))
118118
if err != nil {
119-
log.Error("GetHookTaskByID failed: %v", err)
120-
} else {
121-
if err := Deliver(ctx, task); err != nil {
122-
log.Error("webhook.Deliver failed: %v", err)
123-
}
119+
log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err)
120+
continue
121+
}
122+
123+
if task.IsDelivered {
124+
// Already delivered in the meantime
125+
log.Trace("Task[%d] has already been delivered", task.ID)
126+
continue
127+
}
128+
129+
if err := Deliver(ctx, task); err != nil {
130+
log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err)
124131
}
125132
}
126133

127134
return nil
128135
}
129136

130-
func enqueueHookTask(task *webhook_model.HookTask) error {
131-
err := hookQueue.PushFunc(task.ID, nil)
137+
func enqueueHookTask(taskID int64) error {
138+
err := hookQueue.Push(taskID)
132139
if err != nil && err != queue.ErrAlreadyInQueue {
133140
return err
134141
}
@@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook
205212
return fmt.Errorf("CreateHookTask: %v", err)
206213
}
207214

208-
return enqueueHookTask(task)
215+
return enqueueHookTask(task.ID)
209216
}
210217

211218
// PrepareWebhooks adds new webhooks to task queue for given payload.
@@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string)
265272
return err
266273
}
267274

268-
return enqueueHookTask(task)
275+
return enqueueHookTask(task.ID)
269276
}

0 commit comments

Comments
 (0)