Skip to content

[ws-manager] Properly stop workspaces that never make it to the RUNNING phase #5396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions components/common-go/testing/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ var force = flag.Bool("force", false, "overwrite .golden files even if they alre

// FixtureTest is a test that is based on fixture and golden files. This is very convenient to test a largely variable surface with many variants.
type FixtureTest struct {
T *testing.T
Path string
Test FixtureTestFunc
Fixture func() interface{}
Gold func() interface{}
T *testing.T
Path string
GoldPath func(path string) string
Test FixtureTestFunc
Fixture func() interface{}
Gold func() interface{}
}

// FixtureTestFunc implements the actual fixture test
Expand Down Expand Up @@ -90,6 +91,9 @@ func (ft *FixtureTest) Run() {
}

goldenFilePath := fmt.Sprintf("%s.golden", strings.TrimSuffix(fn, filepath.Ext(fn)))
if ft.GoldPath != nil {
goldenFilePath = ft.GoldPath(fn)
}
if *update {
if _, err := os.Stat(goldenFilePath); *force || os.IsNotExist(err) {
err = os.WriteFile(goldenFilePath, actual, 0600)
Expand Down
50 changes: 36 additions & 14 deletions components/ws-manager/pkg/manager/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Monitor struct {
finalizerMap map[string]context.CancelFunc
finalizerMapLock sync.Mutex

act actingManager

OnError func(error)
}

Expand All @@ -97,6 +99,10 @@ func (m *Manager) CreateMonitor() (*Monitor, error) {
},
}
res.eventpool = workpool.NewEventWorkerPool(res.handleEvent)
res.act = struct {
*Monitor
*Manager
}{&res, m}

return &res, nil
}
Expand Down Expand Up @@ -202,7 +208,7 @@ func (m *Monitor) onPodEvent(evt watch.Event) error {
}()

m.writeEventTraceLog(status, wso)
err = m.actOnPodEvent(ctx, status, wso)
err = actOnPodEvent(ctx, m.act, status, wso)

// To make the tracing work though we have to re-sync with OnChange. But we don't want OnChange to block our event
// handling, thus we wait for it to finish in a Go routine.
Expand All @@ -216,7 +222,7 @@ func (m *Monitor) onPodEvent(evt watch.Event) error {

// actOnPodEvent performs actions when a kubernetes event comes in. For example we shut down failed workspaces or start
// polling the ready state of initializing ones.
func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus, wso *workspaceObjects) (err error) {
func actOnPodEvent(ctx context.Context, m actingManager, status *api.WorkspaceStatus, wso *workspaceObjects) (err error) {
pod := wso.Pod

span, ctx := tracing.FromContext(ctx, "actOnPodEvent")
Expand All @@ -231,9 +237,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus
if status.Phase == api.WorkspacePhase_STOPPING || status.Phase == api.WorkspacePhase_STOPPED {
// Beware: do not else-if this condition with the other phases as we don't want the stop
// logic in any other phase, too.
m.initializerMapLock.Lock()
delete(m.initializerMap, pod.Name)
m.initializerMapLock.Unlock()
m.clearInitializerFromMap(pod.Name)

// Special case: workspaces timing out during backup. Normally a timed out workspace would just be stopped
// regularly. When a workspace times out during backup though, stopping it won't do any good.
Expand All @@ -246,7 +250,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus
return err
}

err = m.manager.markWorkspace(ctx, workspaceID, addMark(disposalStatusAnnotation, string(b)))
err = m.markWorkspace(ctx, workspaceID, addMark(disposalStatusAnnotation, string(b)))
if err != nil {
return err
}
Expand All @@ -264,7 +268,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus
if status.Conditions.Failed != "" && !hasFailureAnnotation {
// If this marking operation fails that's ok - we'll still continue to shut down the workspace.
// The failure message won't persist while stopping the workspace though.
err := m.manager.markWorkspace(ctx, workspaceID, addMark(workspaceFailedBeforeStoppingAnnotation, "true"))
err := m.markWorkspace(ctx, workspaceID, addMark(workspaceFailedBeforeStoppingAnnotation, "true"))
if err != nil {
log.WithFields(wsk8s.GetOWIFromObject(&pod.ObjectMeta)).WithError(err).Debug("cannot mark workspace as workspaceFailedBeforeStoppingAnnotation")
}
Expand All @@ -277,7 +281,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus
// The alternative is to stop the pod only when the workspaceFailedBeforeStoppingAnnotation is present.
// However, that's much more brittle than stopping the workspace twice (something that Kubernetes can handle).
// It is important that we do not fail here if the pod is already gone, i.e. when we lost the race.
err := m.manager.stopWorkspace(ctx, workspaceID, stopWorkspaceNormallyGracePeriod)
err := m.stopWorkspace(ctx, workspaceID, stopWorkspaceNormallyGracePeriod)
if err != nil && !isKubernetesObjNotFoundError(err) {
return xerrors.Errorf("cannot stop workspace: %w", err)
}
Expand All @@ -292,7 +296,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus

if err != nil {
// workspace initialization failed, which means the workspace as a whole failed
err = m.manager.markWorkspace(ctx, workspaceID, addMark(workspaceExplicitFailAnnotation, err.Error()))
err = m.markWorkspace(ctx, workspaceID, addMark(workspaceExplicitFailAnnotation, err.Error()))
if err != nil {
log.WithError(err).Warn("was unable to mark workspace as failed")
}
Expand All @@ -309,7 +313,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus

if err != nil {
// workspace initialization failed, which means the workspace as a whole failed
err = m.manager.markWorkspace(ctx, workspaceID, addMark(workspaceExplicitFailAnnotation, err.Error()))
err = m.markWorkspace(ctx, workspaceID, addMark(workspaceExplicitFailAnnotation, err.Error()))
if err != nil {
log.WithError(err).Warn("was unable to mark workspace as failed")
}
Expand All @@ -320,7 +324,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus
if status.Phase == api.WorkspacePhase_RUNNING {
// We need to register the finalizer before the pod is deleted (see https://book.kubebuilder.io/reference/using-finalizers.html).
// TODO (cw): Figure out if we can replace the "neverReady" flag.
err = m.manager.modifyFinalizer(ctx, workspaceID, gitpodFinalizerName, true)
err = m.modifyFinalizer(ctx, workspaceID, gitpodFinalizerName, true)
if err != nil {
return xerrors.Errorf("cannot add gitpod finalizer: %w", err)
}
Expand All @@ -330,7 +334,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus
//
// Also, in case the pod gets evicted we would not know the hostIP that pod ran on anymore.
// In preparation for those cases, we'll add it as an annotation.
err := m.manager.markWorkspace(ctx, workspaceID, deleteMark(wsk8s.TraceIDAnnotation), addMark(nodeNameAnnotation, wso.NodeName()))
err := m.markWorkspace(ctx, workspaceID, deleteMark(wsk8s.TraceIDAnnotation), addMark(nodeNameAnnotation, wso.NodeName()))
if err != nil {
log.WithError(err).Warn("was unable to remove traceID and/or add host IP annotation from/to workspace")
}
Expand All @@ -339,7 +343,7 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus
if status.Phase == api.WorkspacePhase_STOPPING {
if !isPodBeingDeleted(pod) {
// this might be the case if a headless workspace has just completed but has not been deleted by anyone, yet
err := m.manager.stopWorkspace(ctx, workspaceID, stopWorkspaceNormallyGracePeriod)
err := m.stopWorkspace(ctx, workspaceID, stopWorkspaceNormallyGracePeriod)
if err != nil && !isKubernetesObjNotFoundError(err) {
return xerrors.Errorf("cannot stop workspace: %w", err)
}
Expand Down Expand Up @@ -367,12 +371,30 @@ func (m *Monitor) actOnPodEvent(ctx context.Context, status *api.WorkspaceStatus

if status.Phase == api.WorkspacePhase_STOPPED {
// we've disposed already - try to remove the finalizer and call it a day
return m.manager.modifyFinalizer(ctx, workspaceID, gitpodFinalizerName, false)
return m.modifyFinalizer(ctx, workspaceID, gitpodFinalizerName, false)
}

return nil
}

// actingManager contains all functions needed by actOnPodEvent.
// It is the seam that lets actOnPodEvent be exercised in unit tests with a
// recording fake (see monitor_test.go) instead of a fully wired Monitor/Manager.
type actingManager interface {
	// Manager-backed actions on the cluster / workspace state.
	waitForWorkspaceReady(ctx context.Context, pod *corev1.Pod) (err error)
	stopWorkspace(ctx context.Context, workspaceID string, gracePeriod time.Duration) (err error)
	markWorkspace(ctx context.Context, workspaceID string, annotations ...*annotation) error

	// Monitor-backed bookkeeping and content lifecycle actions.
	clearInitializerFromMap(podName string)
	initializeWorkspaceContent(ctx context.Context, pod *corev1.Pod) (err error)
	finalizeWorkspaceContent(ctx context.Context, wso *workspaceObjects)
	modifyFinalizer(ctx context.Context, workspaceID string, finalizer string, add bool) error
}

// clearInitializerFromMap removes the initializer entry for the given pod,
// guarding the map with the monitor's initializer lock.
func (m *Monitor) clearInitializerFromMap(podName string) {
	m.initializerMapLock.Lock()
	defer m.initializerMapLock.Unlock()
	delete(m.initializerMap, podName)
}

// doHouskeeping is called regularly by the monitor and removes timed out or dangling workspaces/services
func (m *Monitor) doHousekeeping(ctx context.Context) {
span, ctx := tracing.FromContext(ctx, "doHousekeeping")
Expand Down
137 changes: 137 additions & 0 deletions components/ws-manager/pkg/manager/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package manager

import (
"context"
"fmt"
"path/filepath"
"strings"
"testing"
"time"

ctesting "github.com/gitpod-io/gitpod/common-go/testing"
corev1 "k8s.io/api/core/v1"
)

// TestActOnPodEvent replays workspace-object fixtures through actOnPodEvent,
// recording every call made on the acting manager, and compares the recorded
// call sequence (plus any returned error) against golden files.
func TestActOnPodEvent(t *testing.T) {
	type actOnPodEventResult struct {
		Actions []actRecord `json:"actions"`
		Error   string      `json:"error,omitempty"`
	}

	ft := ctesting.FixtureTest{
		T:    t,
		Path: "testdata/status_*.json",
		// The fixtures are shared with the status tests; use a distinct golden
		// file prefix so the two tests' golden files don't collide.
		GoldPath: func(fn string) string {
			base := strings.TrimSuffix(fn, filepath.Ext(fn))
			return strings.ReplaceAll(fmt.Sprintf("%s.golden", base), "/status_", "/actOnPodEvent_")
		},
		Fixture: func() interface{} { return &workspaceObjects{} },
		Gold:    func() interface{} { return &actOnPodEventResult{} },
		Test: func(t *testing.T, input interface{}) interface{} {
			wso := input.(*workspaceObjects)

			mgr := Manager{}
			status, serr := mgr.getWorkspaceStatus(*wso)
			if serr != nil {
				t.Skipf("skipping due to status computation error: %v", serr)
			}

			var rec actRecorder
			err := actOnPodEvent(context.Background(), &rec, status, wso)

			res := actOnPodEventResult{Actions: rec.Records}
			if err != nil {
				res.Error = err.Error()
			}
			return &res
		},
	}
	ft.Run()
}

// actRecord captures a single call made on the actRecorder: the method name
// and its parameters (the context argument is not recorded).
type actRecord struct {
	Func string
	Params map[string]interface{}
}

// actRecorder is a no-op implementation of actingManager that appends an
// actRecord for every call instead of acting on a real cluster.
type actRecorder struct {
	Records []actRecord
}

// waitForWorkspaceReady records the invocation; it never blocks and never fails.
func (r *actRecorder) waitForWorkspaceReady(ctx context.Context, pod *corev1.Pod) (err error) {
	rec := actRecord{
		Func:   "waitForWorkspaceReady",
		Params: map[string]interface{}{"pod": pod},
	}
	r.Records = append(r.Records, rec)
	return nil
}

// stopWorkspace records the requested stop and grace period; no workspace is
// actually stopped.
func (r *actRecorder) stopWorkspace(ctx context.Context, workspaceID string, gracePeriod time.Duration) (err error) {
	rec := actRecord{
		Func: "stopWorkspace",
		Params: map[string]interface{}{
			"workspaceID": workspaceID,
			"gracePeriod": gracePeriod,
		},
	}
	r.Records = append(r.Records, rec)
	return nil
}

// markWorkspace records the annotation request and always reports success.
func (r *actRecorder) markWorkspace(ctx context.Context, workspaceID string, annotations ...*annotation) error {
	rec := actRecord{
		Func: "markWorkspace",
		Params: map[string]interface{}{
			"workspaceID": workspaceID,
			"annotations": annotations,
		},
	}
	r.Records = append(r.Records, rec)
	return nil
}

// clearInitializerFromMap records which pod's initializer entry would be removed.
func (r *actRecorder) clearInitializerFromMap(podName string) {
	rec := actRecord{
		Func:   "clearInitializerFromMap",
		Params: map[string]interface{}{"podName": podName},
	}
	r.Records = append(r.Records, rec)
}

// initializeWorkspaceContent records the invocation; no content is initialized.
func (r *actRecorder) initializeWorkspaceContent(ctx context.Context, pod *corev1.Pod) (err error) {
	rec := actRecord{
		Func:   "initializeWorkspaceContent",
		Params: map[string]interface{}{"pod": pod},
	}
	r.Records = append(r.Records, rec)
	return nil
}

// finalizeWorkspaceContent records the invocation; no content is finalized.
func (r *actRecorder) finalizeWorkspaceContent(ctx context.Context, wso *workspaceObjects) {
	rec := actRecord{
		Func:   "finalizeWorkspaceContent",
		Params: map[string]interface{}{"wso": wso},
	}
	r.Records = append(r.Records, rec)
}

// modifyFinalizer records the finalizer change request and reports success.
func (r *actRecorder) modifyFinalizer(ctx context.Context, workspaceID string, finalizer string, add bool) error {
	rec := actRecord{
		Func: "modifyFinalizer",
		Params: map[string]interface{}{
			"workspaceID": workspaceID,
			"finalizer":   finalizer,
			"add":         add,
		},
	}
	r.Records = append(r.Records, rec)
	return nil
}

// Compile-time check that actRecorder satisfies actingManager.
var _ actingManager = &actRecorder{}
Loading