Skip to content

Commit c01221e

Browse files
zeripathlafriks
andauthored
Queue: Make WorkerPools and Queues flushable (#10001)
* Make WorkerPools and Queues flushable Adds Flush methods to Queues and the WorkerPool Further abstracts the WorkerPool Adds a final step to Flush the queues in the defer from PrintCurrentTest Fixes an issue with Settings inheritance in queues Signed-off-by: Andrew Thornton <[email protected]> * Change to for loop * Add IsEmpty and begin just making the queues composed WorkerPools * subsume workerpool into the queues and create a flushable interface * Add manager command * Move flushall to queue.Manager and add to testlogger * As per @guillep2k * as per @guillep2k * Just make queues all implement flushable and clean up the wrapped queue flushes * cope with no timeout Co-authored-by: Lauris BH <[email protected]>
1 parent 7c84dbc commit c01221e

27 files changed

+1030
-327
lines changed

cmd/manager.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2020 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package cmd
6+
7+
import (
8+
"fmt"
9+
"net/http"
10+
"os"
11+
"time"
12+
13+
"code.gitea.io/gitea/modules/private"
14+
15+
"github.com/urfave/cli"
16+
)
17+
18+
var (
19+
// CmdManager represents the manager command
20+
CmdManager = cli.Command{
21+
Name: "manager",
22+
Usage: "Manage the running gitea process",
23+
Description: "This is a command for managing the running gitea process",
24+
Subcommands: []cli.Command{
25+
subcmdShutdown,
26+
subcmdRestart,
27+
subcmdFlushQueues,
28+
},
29+
}
30+
subcmdShutdown = cli.Command{
31+
Name: "shutdown",
32+
Usage: "Gracefully shutdown the running process",
33+
Action: runShutdown,
34+
}
35+
subcmdRestart = cli.Command{
36+
Name: "restart",
37+
Usage: "Gracefully restart the running process - (not implemented for windows servers)",
38+
Action: runRestart,
39+
}
40+
subcmdFlushQueues = cli.Command{
41+
Name: "flush-queues",
42+
Usage: "Flush queues in the running process",
43+
Action: runFlushQueues,
44+
Flags: []cli.Flag{
45+
cli.DurationFlag{
46+
Name: "timeout",
47+
Value: 60 * time.Second,
48+
Usage: "Timeout for the flushing process",
49+
},
50+
cli.BoolFlag{
51+
Name: "non-blocking",
52+
Usage: "Set to true to not wait for flush to complete before returning",
53+
},
54+
},
55+
}
56+
)
57+
58+
func runShutdown(c *cli.Context) error {
59+
setup("manager", false)
60+
statusCode, msg := private.Shutdown()
61+
switch statusCode {
62+
case http.StatusInternalServerError:
63+
fail("InternalServerError", msg)
64+
}
65+
66+
fmt.Fprintln(os.Stdout, msg)
67+
return nil
68+
}
69+
70+
func runRestart(c *cli.Context) error {
71+
setup("manager", false)
72+
statusCode, msg := private.Restart()
73+
switch statusCode {
74+
case http.StatusInternalServerError:
75+
fail("InternalServerError", msg)
76+
}
77+
78+
fmt.Fprintln(os.Stdout, msg)
79+
return nil
80+
}
81+
82+
func runFlushQueues(c *cli.Context) error {
83+
setup("manager", false)
84+
statusCode, msg := private.FlushQueues(c.Duration("timeout"), c.Bool("non-blocking"))
85+
switch statusCode {
86+
case http.StatusInternalServerError:
87+
fail("InternalServerError", msg)
88+
}
89+
90+
fmt.Fprintln(os.Stdout, msg)
91+
return nil
92+
}

integrations/testlogger.go

+6
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,18 @@
55
package integrations
66

77
import (
8+
"context"
89
"encoding/json"
910
"fmt"
1011
"os"
1112
"runtime"
1213
"strings"
1314
"sync"
1415
"testing"
16+
"time"
1517

1618
"code.gitea.io/gitea/modules/log"
19+
"code.gitea.io/gitea/modules/queue"
1720
)
1821

1922
var prefix string
@@ -98,6 +101,9 @@ func PrintCurrentTest(t testing.TB, skip ...int) func() {
98101
}
99102
writerCloser.setT(&t)
100103
return func() {
104+
if err := queue.GetManager().FlushAll(context.Background(), 20*time.Second); err != nil {
105+
t.Errorf("Flushing queues failed with error %v", err)
106+
}
101107
_ = writerCloser.Close()
102108
}
103109
}

main.go

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ arguments - which can alternatively be run by running the subcommand web.`
6969
cmd.CmdKeys,
7070
cmd.CmdConvert,
7171
cmd.CmdDoctor,
72+
cmd.CmdManager,
7273
}
7374
// Now adjust these commands to add our global configuration options
7475

modules/graceful/manager_unix.go

+31-15
Original file line numberDiff line numberDiff line change
@@ -110,36 +110,27 @@ func (g *Manager) handleSignals(ctx context.Context) {
110110
case sig := <-signalChannel:
111111
switch sig {
112112
case syscall.SIGHUP:
113-
if setting.GracefulRestartable {
114-
log.Info("PID: %d. Received SIGHUP. Forking...", pid)
115-
err := g.doFork()
116-
if err != nil && err.Error() != "another process already forked. Ignoring this one" {
117-
log.Error("Error whilst forking from PID: %d : %v", pid, err)
118-
}
119-
} else {
120-
log.Info("PID: %d. Received SIGHUP. Not set restartable. Shutting down...", pid)
121-
122-
g.doShutdown()
123-
}
113+
log.Info("PID: %d. Received SIGHUP. Attempting GracefulShutdown...", pid)
114+
g.DoGracefulShutdown()
124115
case syscall.SIGUSR1:
125116
log.Info("PID %d. Received SIGUSR1.", pid)
126117
case syscall.SIGUSR2:
127118
log.Warn("PID %d. Received SIGUSR2. Hammering...", pid)
128-
g.doHammerTime(0 * time.Second)
119+
g.DoImmediateHammer()
129120
case syscall.SIGINT:
130121
log.Warn("PID %d. Received SIGINT. Shutting down...", pid)
131-
g.doShutdown()
122+
g.DoGracefulShutdown()
132123
case syscall.SIGTERM:
133124
log.Warn("PID %d. Received SIGTERM. Shutting down...", pid)
134-
g.doShutdown()
125+
g.DoGracefulShutdown()
135126
case syscall.SIGTSTP:
136127
log.Info("PID %d. Received SIGTSTP.", pid)
137128
default:
138129
log.Info("PID %d. Received %v.", pid, sig)
139130
}
140131
case <-ctx.Done():
141132
log.Warn("PID: %d. Background context for manager closed - %v - Shutting down...", pid, ctx.Err())
142-
g.doShutdown()
133+
g.DoGracefulShutdown()
143134
}
144135
}
145136
}
@@ -160,6 +151,31 @@ func (g *Manager) doFork() error {
160151
return err
161152
}
162153

154+
// DoGracefulRestart causes a graceful restart
155+
func (g *Manager) DoGracefulRestart() {
156+
if setting.GracefulRestartable {
157+
log.Info("PID: %d. Forking...", os.Getpid())
158+
err := g.doFork()
159+
if err != nil && err.Error() != "another process already forked. Ignoring this one" {
160+
log.Error("Error whilst forking from PID: %d : %v", os.Getpid(), err)
161+
}
162+
} else {
163+
log.Info("PID: %d. Not set restartable. Shutting down...", os.Getpid())
164+
165+
g.doShutdown()
166+
}
167+
}
168+
169+
// DoImmediateHammer causes an immediate hammer
170+
func (g *Manager) DoImmediateHammer() {
171+
g.doHammerTime(0 * time.Second)
172+
}
173+
174+
// DoGracefulShutdown causes a graceful shutdown
175+
func (g *Manager) DoGracefulShutdown() {
176+
g.doShutdown()
177+
}
178+
163179
// RegisterServer registers the running of a listening server, in the case of unix this means that the parent process can now die.
164180
// Any call to RegisterServer must be matched by a call to ServerDone
165181
func (g *Manager) RegisterServer() {

modules/graceful/manager_windows.go

+28-5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Manager struct {
4343
runningServerWaitGroup sync.WaitGroup
4444
createServerWaitGroup sync.WaitGroup
4545
terminateWaitGroup sync.WaitGroup
46+
shutdownRequested chan struct{}
4647
}
4748

4849
func newGracefulManager(ctx context.Context) *Manager {
@@ -62,6 +63,7 @@ func (g *Manager) start() {
6263
g.shutdown = make(chan struct{})
6364
g.hammer = make(chan struct{})
6465
g.done = make(chan struct{})
66+
g.shutdownRequested = make(chan struct{})
6567

6668
// Set the running state
6769
g.setState(stateRunning)
@@ -107,20 +109,23 @@ loop:
107109
for {
108110
select {
109111
case <-g.ctx.Done():
110-
g.doShutdown()
112+
g.DoGracefulShutdown()
113+
waitTime += setting.GracefulHammerTime
114+
break loop
115+
case <-g.shutdownRequested:
111116
waitTime += setting.GracefulHammerTime
112117
break loop
113118
case change := <-changes:
114119
switch change.Cmd {
115120
case svc.Interrogate:
116121
status <- change.CurrentStatus
117122
case svc.Stop, svc.Shutdown:
118-
g.doShutdown()
123+
g.DoGracefulShutdown()
119124
waitTime += setting.GracefulHammerTime
120125
break loop
121126
case hammerCode:
122-
g.doShutdown()
123-
g.doHammerTime(0 * time.Second)
127+
g.DoGracefulShutdown()
128+
g.DoImmediateHammer()
124129
break loop
125130
default:
126131
log.Debug("Unexpected control request: %v", change.Cmd)
@@ -140,7 +145,7 @@ hammerLoop:
140145
case svc.Interrogate:
141146
status <- change.CurrentStatus
142147
case svc.Stop, svc.Shutdown, hammerCmd:
143-
g.doHammerTime(0 * time.Second)
148+
g.DoImmediateHammer()
144149
break hammerLoop
145150
default:
146151
log.Debug("Unexpected control request: %v", change.Cmd)
@@ -152,6 +157,24 @@ hammerLoop:
152157
return false, 0
153158
}
154159

160+
// DoImmediateHammer causes an immediate hammer
161+
func (g *Manager) DoImmediateHammer() {
162+
g.doHammerTime(0 * time.Second)
163+
}
164+
165+
// DoGracefulShutdown causes a graceful shutdown
166+
func (g *Manager) DoGracefulShutdown() {
167+
g.lock.Lock()
168+
select {
169+
case <-g.shutdownRequested:
170+
g.lock.Unlock()
171+
default:
172+
close(g.shutdownRequested)
173+
g.lock.Unlock()
174+
g.doShutdown()
175+
}
176+
}
177+
155178
// RegisterServer registers the running of a listening server.
156179
// Any call to RegisterServer must be matched by a call to ServerDone
157180
func (g *Manager) RegisterServer() {

modules/private/manager.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2020 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package private
6+
7+
import (
8+
"encoding/json"
9+
"fmt"
10+
"net/http"
11+
"time"
12+
13+
"code.gitea.io/gitea/modules/setting"
14+
)
15+
16+
// Shutdown calls the internal shutdown function
17+
func Shutdown() (int, string) {
18+
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/shutdown")
19+
20+
req := newInternalRequest(reqURL, "POST")
21+
resp, err := req.Response()
22+
if err != nil {
23+
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
24+
}
25+
defer resp.Body.Close()
26+
27+
if resp.StatusCode != http.StatusOK {
28+
return resp.StatusCode, decodeJSONError(resp).Err
29+
}
30+
31+
return http.StatusOK, "Shutting down"
32+
}
33+
34+
// Restart calls the internal restart function
35+
func Restart() (int, string) {
36+
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/restart")
37+
38+
req := newInternalRequest(reqURL, "POST")
39+
resp, err := req.Response()
40+
if err != nil {
41+
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
42+
}
43+
defer resp.Body.Close()
44+
45+
if resp.StatusCode != http.StatusOK {
46+
return resp.StatusCode, decodeJSONError(resp).Err
47+
}
48+
49+
return http.StatusOK, "Restarting"
50+
}
51+
52+
// FlushOptions represents the options for the flush call
53+
type FlushOptions struct {
54+
Timeout time.Duration
55+
NonBlocking bool
56+
}
57+
58+
// FlushQueues calls the internal flush-queues function
59+
func FlushQueues(timeout time.Duration, nonBlocking bool) (int, string) {
60+
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/flush-queues")
61+
62+
req := newInternalRequest(reqURL, "POST")
63+
if timeout > 0 {
64+
req.SetTimeout(timeout+10*time.Second, timeout+10*time.Second)
65+
}
66+
req = req.Header("Content-Type", "application/json")
67+
jsonBytes, _ := json.Marshal(FlushOptions{
68+
Timeout: timeout,
69+
NonBlocking: nonBlocking,
70+
})
71+
req.Body(jsonBytes)
72+
resp, err := req.Response()
73+
if err != nil {
74+
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
75+
}
76+
defer resp.Body.Close()
77+
78+
if resp.StatusCode != http.StatusOK {
79+
return resp.StatusCode, decodeJSONError(resp).Err
80+
}
81+
82+
return http.StatusOK, "Flushed"
83+
}

0 commit comments

Comments
 (0)