Skip to content

[supervisor] Terminate reparented processes during shutdown #2920

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions components/registry-facade/pkg/handover/handover.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package handover

import (
Expand Down
4 changes: 4 additions & 0 deletions components/registry-facade/pkg/handover/handover_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package handover_test

import (
Expand Down
2 changes: 1 addition & 1 deletion components/supervisor-api/go/info.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/supervisor-api/go/status.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

333 changes: 206 additions & 127 deletions components/supervisor-api/go/terminal.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion components/supervisor-api/go/terminal.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion components/supervisor-api/go/token.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/supervisor-api/go/token.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions components/supervisor-api/terminal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ service TerminalService {
}

message OpenTerminalRequest {
string workdir = 1;
map<string, string> env = 2;
map<string, string> annotations = 3;
}
message OpenTerminalResponse {
string alias = 1;
Expand All @@ -69,6 +71,10 @@ message ListTerminalsResponse {
string alias = 1;
repeated string command = 2;
string title = 3;
int64 pid = 4;
string initial_workdir = 5;
string current_workdir = 6;
map<string, string> annotations = 7;
}

repeated Terminal terminals = 1;
Expand Down
10 changes: 8 additions & 2 deletions components/supervisor/cmd/terminal-list.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"os"
"sort"
"strings"
"text/tabwriter"
"time"
Expand All @@ -34,9 +35,14 @@ var terminalListCmd = &cobra.Command{
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 1, ' ', 0)
defer tw.Flush()

fmt.Fprintf(tw, "ALIAS\tCOMMAND\n")
fmt.Fprintf(tw, "ALIAS\tPID\tCOMMAND\tANNOTATIONS\n")
for _, term := range resp.Terminals {
fmt.Fprintf(tw, "%s\t%s\n", term.Alias, strings.Join(term.Command, " "))
annotations := make([]string, 0, len(term.Annotations))
for k, v := range term.Annotations {
annotations = append(annotations, fmt.Sprintf("%s=%s", k, v))
}
sort.Slice(annotations, func(i, j int) bool { return annotations[i] < annotations[j] })
fmt.Fprintf(tw, "%s\t%d\t%s\t%s\n", term.Alias, term.Pid, strings.Join(term.Command, " "), strings.Join(annotations, ","))
}
},
}
Expand Down
7 changes: 5 additions & 2 deletions components/supervisor/pkg/ports/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ func (s *Subscription) Updates() <-chan []*api.PortsStatus {
}

// Run starts the port manager which keeps running until one of its observers stops.
func (pm *Manager) Run() {
ctx, cancel := context.WithCancel(context.Background())
func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
defer log.Debug("portManager shutdown")

ctx, cancel := context.WithCancel(ctx)
defer func() {
// We copy the subscriptions to a list prior to closing them, to prevent a data race
// between the map iteration and entry removal when closing the subscription.
Expand Down
14 changes: 6 additions & 8 deletions components/supervisor/pkg/ports/ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,11 @@ func TestPortsUpdateState(t *testing.T) {
return ioutil.NopCloser(nil), nil
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
pm.Run()
}()
go pm.Run(ctx, &wg)
sub, err := pm.Subscribe()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -607,12 +606,11 @@ func TestPortsConcurrentSubscribe(t *testing.T) {
return ioutil.NopCloser(nil), nil
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
pm.Run()
}()
go pm.Run(ctx, &wg)
go func() {
defer wg.Done()
defer close(config.Error)
Expand Down
121 changes: 104 additions & 17 deletions components/supervisor/pkg/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package supervisor

import (
"bufio"
"context"
"encoding/json"
"fmt"
Expand All @@ -14,6 +15,7 @@ import (
"os"
"os/exec"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -169,20 +171,30 @@ func Run(options ...RunOption) {
}
apiServices = append(apiServices, additionalServices...)

// The reaper can be turned into a terminating reaper by writing true to this channel.
// When in terminating mode, the reaper will send SIGTERM to each child that gets reparented
// to us and is still running. We use this mechanism to send SIGTERM to a shell child processes
// that get reparented once their parent shell terminates during shutdown.
terminatingReaper := make(chan bool)
// We keep the reaper until the bitter end because:
// - it doesn't need graceful shutdown
// - we want to do as much work as possible (SIGTERM'ing reparented processes during shutdown).
go reaper(terminatingReaper)

var ideWG sync.WaitGroup
ideWG.Add(1)
go startAndWatchIDE(ctx, cfg, &ideWG, ideReady)

var wg sync.WaitGroup
wg.Add(6)
go reaper(ctx, &wg)
go startAndWatchIDE(ctx, cfg, &wg, ideReady)
wg.Add(4)
go startContentInit(ctx, cfg, &wg, cstate)
go startAPIEndpoint(ctx, cfg, &wg, apiServices, apiEndpointOpts...)
go taskManager.Run(ctx, &wg)
go func() {
defer wg.Done()
if cfg.isHeadless() {
return
}
portMgmt.Run()
}()

if !cfg.isHeadless() {
wg.Add(1)
go portMgmt.Run(ctx, &wg)
}

if cfg.PreventMetadataAccess {
go func() {
Expand All @@ -203,15 +215,21 @@ func Run(options ...RunOption) {
}

log.Info("received SIGTERM - tearing down")
terminatingReaper <- true
cancel()
err = termMux.Close()
if err != nil {
log.WithError(err).Error("terminal closure failed")
}

// terminate all child processes once the IDE is gone
ideWG.Wait()
terminateChildProcesses()

if !opts.InNamespace {
callDaemonTeardown()
}

cancel()
wg.Wait()
}

Expand Down Expand Up @@ -305,16 +323,17 @@ func hasMetadataAccess() bool {
return false
}

func reaper(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
func reaper(terminatingReaper <-chan bool) {
defer log.Debug("reaper shutdown")

var terminating bool
sigs := make(chan os.Signal, 128)
signal.Notify(sigs, syscall.SIGCHLD)
for {
select {
case <-ctx.Done():
return
case <-sigs:
case terminating = <-terminatingReaper:
continue
}

pid, err := unix.Wait4(-1, nil, 0, nil)
Expand All @@ -325,12 +344,33 @@ func reaper(ctx context.Context, wg *sync.WaitGroup) {
}
if err != nil {
log.WithField("pid", pid).WithError(err).Debug("cannot call waitpid() for re-parented child")
continue
}

if !terminating {
continue
}
proc, err := os.FindProcess(pid)
if err != nil {
log.WithField("pid", pid).WithError(err).Debug("cannot find re-parented process")
continue
}
err = proc.Signal(syscall.SIGTERM)
if err != nil {
if !strings.Contains(err.Error(), "os: process already finished") {
log.WithField("pid", pid).WithError(err).Debug("cannot send SIGTERM to re-parented process")
}

continue
}
log.WithField("pid", pid).Debug("SIGTERM'ed reparented child process")
}
}

func startAndWatchIDE(ctx context.Context, cfg *Config, wg *sync.WaitGroup, ideReady *ideReadyState) {
defer wg.Done()
defer log.Debug("startAndWatchIDE shutdown")

if cfg.isHeadless() {
ideReady.Set(true)
return
Expand Down Expand Up @@ -383,7 +423,7 @@ supervisorLoop:
}()

err = cmd.Wait()
if err != nil && !strings.Contains(err.Error(), "signal: interrupt") {
if err != nil && !(strings.Contains(err.Error(), "signal: interrupt") || strings.Contains(err.Error(), "wait: no child processes")) {
log.WithError(err).Warn("IDE was stopped")
}

Expand Down Expand Up @@ -411,7 +451,7 @@ supervisorLoop:
case <-ideStopped:
return
case <-time.After(timeBudgetIDEShutdown):
log.Error("IDE did not stop in time - sending SIGKILL")
log.WithField("timeBudgetIDEShutdown", timeBudgetIDEShutdown.String()).Error("IDE did not stop in time - sending SIGKILL")
cmd.Process.Signal(syscall.SIGKILL)
}
}
Expand Down Expand Up @@ -530,6 +570,7 @@ func isBlacklistedEnvvar(name string) bool {

func startAPIEndpoint(ctx context.Context, cfg *Config, wg *sync.WaitGroup, services []RegisterableService, opts ...grpc.ServerOption) {
defer wg.Done()
defer log.Debug("startAPIEndpoint shutdown")

l, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.APIEndpointPort))
if err != nil {
Expand Down Expand Up @@ -642,6 +683,52 @@ func startContentInit(ctx context.Context, cfg *Config, wg *sync.WaitGroup, cst
cst.MarkContentReady(src)
}

func terminateChildProcesses() {
ppid := strconv.Itoa(os.Getpid())
dirs, err := ioutil.ReadDir("/proc")
if err != nil {
log.WithError(err).Warn("cannot terminate child processes")
return
}
for _, d := range dirs {
pid, err := strconv.Atoi(d.Name())
if err != nil {
// not a PID
continue
}
proc, err := os.FindProcess(pid)
if err != nil {
continue
}

var isChild bool
f, err := os.Open(filepath.Join("/proc", d.Name(), "status"))
if err != nil {
continue
}
scan := bufio.NewScanner(f)
for scan.Scan() {
l := strings.TrimSpace(scan.Text())
if !strings.HasPrefix(l, "PPid:") {
continue
}

isChild = strings.HasSuffix(l, ppid)
break
}
if !isChild {
continue
}

err = proc.Signal(unix.SIGTERM)
if err != nil {
log.WithError(err).WithField("pid", pid).Warn("cannot terminate child processe")
continue
}
log.WithField("pid", pid).Debug("SIGTERM'ed child process")
}
}

func callDaemonTeardown() {
log.Info("asking ws-daemon to tear down this workspace")
ctx, cancel := context.WithTimeout(context.Background(), timeBudgetDaemonTeardown)
Expand Down
2 changes: 2 additions & 0 deletions components/supervisor/pkg/supervisor/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ func (tm *tasksManager) init(ctx context.Context) {

func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
defer log.Debug("tasksManager shutdown")

tm.init(ctx)

for _, t := range tm.tasks {
Expand Down
Loading