Skip to content

Commit 408ae76

Browse files
author
Christian Weichel
committed
[supervisor] Make reaper terminating during shutdown
Fixes #2654
1 parent 812c98e commit 408ae76

File tree

5 files changed

+120
-30
lines changed

5 files changed

+120
-30
lines changed

components/supervisor/pkg/ports/ports.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,11 @@ func (s *Subscription) Updates() <-chan []*api.PortsStatus {
103103
}
104104

105105
// Run starts the port manager which keeps running until one of its observers stops. It calls wg.Done() when it returns.
106-
func (pm *Manager) Run() {
107-
ctx, cancel := context.WithCancel(context.Background())
106+
func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
107+
defer wg.Done()
108+
defer log.Debug("portManager shutdown")
109+
110+
ctx, cancel := context.WithCancel(ctx)
108111
defer func() {
109112
// We copy the subscriptions to a list prior to closing them, to prevent a data race
110113
// between the map iteration and entry removal when closing the subscription.

components/supervisor/pkg/ports/ports_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,11 @@ func TestPortsUpdateState(t *testing.T) {
471471
return ioutil.NopCloser(nil), nil
472472
}
473473

474+
ctx, cancel := context.WithCancel(context.Background())
475+
defer cancel()
474476
var wg sync.WaitGroup
475477
wg.Add(3)
476-
go func() {
477-
defer wg.Done()
478-
pm.Run()
479-
}()
478+
go pm.Run(ctx, &wg)
480479
sub, err := pm.Subscribe()
481480
if err != nil {
482481
t.Fatal(err)
@@ -607,12 +606,11 @@ func TestPortsConcurrentSubscribe(t *testing.T) {
607606
return ioutil.NopCloser(nil), nil
608607
}
609608

609+
ctx, cancel := context.WithCancel(context.Background())
610+
defer cancel()
610611
var wg sync.WaitGroup
611612
wg.Add(2)
612-
go func() {
613-
defer wg.Done()
614-
pm.Run()
615-
}()
613+
go pm.Run(ctx, &wg)
616614
go func() {
617615
defer wg.Done()
618616
defer close(config.Error)

components/supervisor/pkg/supervisor/supervisor.go

Lines changed: 104 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package supervisor
66

77
import (
8+
"bufio"
89
"context"
910
"encoding/json"
1011
"fmt"
@@ -14,6 +15,7 @@ import (
1415
"os"
1516
"os/exec"
1617
"os/signal"
18+
"path/filepath"
1719
"runtime"
1820
"strconv"
1921
"strings"
@@ -169,20 +171,30 @@ func Run(options ...RunOption) {
169171
}
170172
apiServices = append(apiServices, additionalServices...)
171173

174+
// The reaper can be turned into a terminating reaper by writing true to this channel.
175+
// When in terminating mode, the reaper will send SIGTERM to each child that gets reparented
176+
// to us and is still running. We use this mechanism to send SIGTERM to a shell's child processes
177+
// that get reparented once their parent shell terminates during shutdown.
178+
terminatingReaper := make(chan bool)
179+
// We keep the reaper until the bitter end because:
180+
// - it doesn't need graceful shutdown
181+
// - we want to do as much work as possible (SIGTERM'ing reparented processes during shutdown).
182+
go reaper(terminatingReaper)
183+
184+
var ideWG sync.WaitGroup
185+
ideWG.Add(1)
186+
go startAndWatchIDE(ctx, cfg, &ideWG, ideReady)
187+
172188
var wg sync.WaitGroup
173-
wg.Add(6)
174-
go reaper(ctx, &wg)
175-
go startAndWatchIDE(ctx, cfg, &wg, ideReady)
189+
wg.Add(4)
176190
go startContentInit(ctx, cfg, &wg, cstate)
177191
go startAPIEndpoint(ctx, cfg, &wg, apiServices, apiEndpointOpts...)
178192
go taskManager.Run(ctx, &wg)
179-
go func() {
180-
defer wg.Done()
181-
if cfg.isHeadless() {
182-
return
183-
}
184-
portMgmt.Run()
185-
}()
193+
194+
if !cfg.isHeadless() {
195+
wg.Add(1)
196+
go portMgmt.Run(ctx, &wg)
197+
}
186198

187199
if cfg.PreventMetadataAccess {
188200
go func() {
@@ -203,15 +215,21 @@ func Run(options ...RunOption) {
203215
}
204216

205217
log.Info("received SIGTERM - tearing down")
218+
terminatingReaper <- true
219+
cancel()
206220
err = termMux.Close()
207221
if err != nil {
208222
log.WithError(err).Error("terminal closure failed")
209223
}
224+
225+
// terminate all child processes once the IDE is gone
226+
ideWG.Wait()
227+
terminateChildProcesses()
228+
210229
if !opts.InNamespace {
211230
callDaemonTeardown()
212231
}
213232

214-
cancel()
215233
wg.Wait()
216234
}
217235

@@ -305,16 +323,17 @@ func hasMetadataAccess() bool {
305323
return false
306324
}
307325

308-
func reaper(ctx context.Context, wg *sync.WaitGroup) {
309-
defer wg.Done()
326+
func reaper(terminatingReaper <-chan bool) {
327+
defer log.Debug("reaper shutdown")
310328

329+
var terminating bool
311330
sigs := make(chan os.Signal, 128)
312331
signal.Notify(sigs, syscall.SIGCHLD)
313332
for {
314333
select {
315-
case <-ctx.Done():
316-
return
317334
case <-sigs:
335+
case terminating = <-terminatingReaper:
336+
continue
318337
}
319338

320339
pid, err := unix.Wait4(-1, nil, 0, nil)
@@ -325,12 +344,33 @@ func reaper(ctx context.Context, wg *sync.WaitGroup) {
325344
}
326345
if err != nil {
327346
log.WithField("pid", pid).WithError(err).Debug("cannot call waitpid() for re-parented child")
347+
continue
348+
}
349+
350+
if !terminating {
351+
continue
352+
}
353+
proc, err := os.FindProcess(pid)
354+
if err != nil {
355+
log.WithField("pid", pid).WithError(err).Debug("cannot find re-parented process")
356+
continue
328357
}
358+
err = proc.Signal(syscall.SIGTERM)
359+
if err != nil {
360+
if !strings.Contains(err.Error(), "os: process already finished") {
361+
log.WithField("pid", pid).WithError(err).Debug("cannot send SIGTERM to re-parented process")
362+
}
363+
364+
continue
365+
}
366+
log.WithField("pid", pid).Debug("SIGTERM'ed reparented child process")
329367
}
330368
}
331369

332370
func startAndWatchIDE(ctx context.Context, cfg *Config, wg *sync.WaitGroup, ideReady *ideReadyState) {
333371
defer wg.Done()
372+
defer log.Debug("startAndWatchIDE shutdown")
373+
334374
if cfg.isHeadless() {
335375
ideReady.Set(true)
336376
return
@@ -383,7 +423,7 @@ supervisorLoop:
383423
}()
384424

385425
err = cmd.Wait()
386-
if err != nil && !strings.Contains(err.Error(), "signal: interrupt") {
426+
if err != nil && !(strings.Contains(err.Error(), "signal: interrupt") || strings.Contains(err.Error(), "wait: no child processes")) {
387427
log.WithError(err).Warn("IDE was stopped")
388428
}
389429

@@ -411,7 +451,7 @@ supervisorLoop:
411451
case <-ideStopped:
412452
return
413453
case <-time.After(timeBudgetIDEShutdown):
414-
log.Error("IDE did not stop in time - sending SIGKILL")
454+
log.WithField("timeBudgetIDEShutdown", timeBudgetIDEShutdown.String()).Error("IDE did not stop in time - sending SIGKILL")
415455
cmd.Process.Signal(syscall.SIGKILL)
416456
}
417457
}
@@ -530,6 +570,7 @@ func isBlacklistedEnvvar(name string) bool {
530570

531571
func startAPIEndpoint(ctx context.Context, cfg *Config, wg *sync.WaitGroup, services []RegisterableService, opts ...grpc.ServerOption) {
532572
defer wg.Done()
573+
defer log.Debug("startAPIEndpoint shutdown")
533574

534575
l, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.APIEndpointPort))
535576
if err != nil {
@@ -642,6 +683,52 @@ func startContentInit(ctx context.Context, cfg *Config, wg *sync.WaitGroup, cst
642683
cst.MarkContentReady(src)
643684
}
644685

686+
func terminateChildProcesses() {
687+
ppid := strconv.Itoa(os.Getpid())
688+
dirs, err := ioutil.ReadDir("/proc")
689+
if err != nil {
690+
log.WithError(err).Warn("cannot terminate child processes")
691+
return
692+
}
693+
for _, d := range dirs {
694+
pid, err := strconv.Atoi(d.Name())
695+
if err != nil {
696+
// not a PID
697+
continue
698+
}
699+
proc, err := os.FindProcess(pid)
700+
if err != nil {
701+
continue
702+
}
703+
704+
var isChild bool
705+
f, err := os.Open(filepath.Join("/proc", d.Name(), "status"))
706+
if err != nil {
707+
continue
708+
}
709+
scan := bufio.NewScanner(f)
710+
for scan.Scan() {
711+
l := strings.TrimSpace(scan.Text())
712+
if !strings.HasPrefix(l, "PPid:") {
713+
continue
714+
}
715+
716+
isChild = strings.HasSuffix(l, ppid)
717+
break
718+
}
719+
if !isChild {
720+
continue
721+
}
722+
723+
err = proc.Signal(unix.SIGTERM)
724+
if err != nil {
725+
log.WithError(err).WithField("pid", pid).Warn("cannot terminate child processe")
726+
continue
727+
}
728+
log.WithField("pid", pid).Debug("SIGTERM'ed child process")
729+
}
730+
}
731+
645732
func callDaemonTeardown() {
646733
log.Info("asking ws-daemon to tear down this workspace")
647734
ctx, cancel := context.WithTimeout(context.Background(), timeBudgetDaemonTeardown)

components/supervisor/pkg/supervisor/tasks.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ func (tm *tasksManager) init(ctx context.Context) {
200200

201201
func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup) {
202202
defer wg.Done()
203+
defer log.Debug("tasksManager shutdown")
204+
203205
tm.init(ctx)
204206

205207
for _, t := range tm.tasks {

components/supervisor/pkg/terminal/terminal.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ func (m *Mux) Close() error {
8585
var err error
8686
for k := range m.terms {
8787
cerr := m.doClose(k, closeTerminaldefaultGracePeriod)
88-
if cerr == nil {
89-
log.WithError(err).WithField("alias", k).Warn("cannot properly close terminal")
88+
if cerr != nil {
89+
log.WithError(cerr).WithField("alias", k).Warn("cannot properly close terminal")
9090
if err != nil {
9191
err = cerr
9292
}
@@ -142,7 +142,7 @@ func gracefullyShutdownProcess(p *os.Process, gracePeriod time.Duration) error {
142142
return p.Kill()
143143
}
144144

145-
err := p.Signal(unix.SIGINT)
145+
err := p.Signal(unix.SIGTERM)
146146
if err != nil {
147147
return err
148148
}

0 commit comments

Comments
 (0)