Skip to content

Commit 38fc6c7

Browse files
authored
Restart zero worker if there is still work to do (#18658) (#18672)
* Restart zero worker if there is still work to do (#18658) Backport #18658 It is possible for the zero worker to timeout before all the work is finished. This may mean that work may take a long time to complete because a worker will only be induced on repushing. Also ensure that requested count is reset after pulls and push mirror sync requests and add some more trace logging to the queue push. Fix #18607 Signed-off-by: Andrew Thornton <[email protected]> * Update modules/queue/workerpool.go
1 parent 8671602 commit 38fc6c7

File tree

2 files changed

+47
-10
lines changed

2 files changed

+47
-10
lines changed

modules/queue/workerpool.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,20 @@ func (p *WorkerPool) Push(data Data) {
8787
}
8888
}
8989

90+
// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting
91+
func (p *WorkerPool) HasNoWorkerScaling() bool {
92+
p.lock.Lock()
93+
defer p.lock.Unlock()
94+
return p.hasNoWorkerScaling()
95+
}
96+
97+
func (p *WorkerPool) hasNoWorkerScaling() bool {
98+
return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
99+
}
100+
101+
// zeroBoost will add a temporary boost worker for a no worker queue
102+
// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
103+
// (This is because addWorkers has to be called whilst unlocked)
90104
func (p *WorkerPool) zeroBoost() {
91105
ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
92106
mq := GetManager().GetManagedQueue(p.qid)
@@ -277,6 +291,21 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
277291
p.cond.Broadcast()
278292
cancel()
279293
}
294+
295+
select {
296+
case <-p.baseCtx.Done():
297+
// Don't warn if the baseCtx is shutdown
298+
default:
299+
if p.hasNoWorkerScaling() {
300+
log.Warn(
301+
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.", p.qid)
302+
} else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
303+
// OK there are no workers but... there's still work to be done -> Reboost
304+
p.zeroBoost()
305+
// p.lock will be unlocked by zeroBoost
306+
return
307+
}
308+
}
280309
p.lock.Unlock()
281310
}()
282311
}

services/mirror/mirror.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,13 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
5959

6060
handler := func(idx int, bean interface{}, limit int) error {
6161
var item SyncRequest
62+
var repo *repo_model.Repository
6263
if m, ok := bean.(*repo_model.Mirror); ok {
6364
if m.Repo == nil {
6465
log.Error("Disconnected mirror found: %d", m.ID)
6566
return nil
6667
}
68+
repo = m.Repo
6769
item = SyncRequest{
6870
Type: PullMirrorType,
6971
RepoID: m.RepoID,
@@ -73,6 +75,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
7375
log.Error("Disconnected push-mirror found: %d", m.ID)
7476
return nil
7577
}
78+
repo = m.Repo
7679
item = SyncRequest{
7780
Type: PushMirrorType,
7881
RepoID: m.RepoID,
@@ -89,17 +92,16 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
8992
default:
9093
}
9194

92-
// Check if this request is already in the queue
93-
has, err := mirrorQueue.Has(&item)
94-
if err != nil {
95-
return err
96-
}
97-
if has {
98-
return nil
99-
}
100-
10195
// Push to the Queue
10296
if err := mirrorQueue.Push(&item); err != nil {
97+
if err == queue.ErrAlreadyInQueue {
98+
if item.Type == PushMirrorType {
99+
log.Trace("PushMirrors for %-v already queued for sync", repo)
100+
} else {
101+
log.Trace("PullMirrors for %-v already queued for sync", repo)
102+
}
103+
return nil
104+
}
103105
return err
104106
}
105107

@@ -110,23 +112,29 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
110112
return nil
111113
}
112114

115+
pullMirrorsRequested := 0
113116
if pullLimit != 0 {
117+
requested = 0
114118
if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error {
115119
return handler(idx, bean, pullLimit)
116120
}); err != nil && err != errLimit {
117121
log.Error("MirrorsIterate: %v", err)
118122
return err
119123
}
124+
pullMirrorsRequested, requested = requested, 0
120125
}
126+
pushMirrorsRequested := 0
121127
if pushLimit != 0 {
128+
requested = 0
122129
if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error {
123130
return handler(idx, bean, pushLimit)
124131
}); err != nil && err != errLimit {
125132
log.Error("PushMirrorsIterate: %v", err)
126133
return err
127134
}
135+
pushMirrorsRequested, requested = requested, 0
128136
}
129-
log.Trace("Finished: Update")
137+
log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested)
130138
return nil
131139
}
132140

0 commit comments

Comments
 (0)