Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions test/e2e/appwrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -306,6 +307,30 @@ var _ = Describe("AppWrapper E2E Test", func() {
})
})

// End-to-end scenario: an AppWrapper whose pod is running on a node that gets
// the "autopilot.ibm.com/gpuhealth" label set to "ERR" is expected to enter the
// Resetting phase and then return to Running.
// NOTE(review): the assertions cover AppWrapper phases and pod readiness only;
// nothing here checks that the replacement pods land on a different node.
Describe("Autopilot Job Migration", Label("slow"), Label("Kueue", "Standalone"), func() {
It("A running job is migrated away from an unhealthy node", func() {
// One job requesting 200 milliCPU and 1 GPU; tracked in appwrappers alongside
// the other AppWrappers created by this suite.
aw := createAppWrapper(ctx, autopilotjob(200, 1))
appwrappers = append(appwrappers, aw)
awName := types.NamespacedName{Name: aw.Name, Namespace: aw.Namespace}
By("workload is running")
Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
By("node is labeled by autopilot")
// Find the node hosting the workload's pod so the label is applied where the
// job is actually running.
nodeName, err := getNodeForAppwrapper(ctx, awName)
Expect(err).ShouldNot(HaveOccurred())
// Register the label removal before adding the label, so the node is restored
// even when a later assertion in this spec fails.
DeferCleanup(func() {
err := updateNode(ctx, nodeName, func(n *v1.Node) { delete(n.Labels, "autopilot.ibm.com/gpuhealth") })
Expect(err).ShouldNot(HaveOccurred())
})
// The test sets the health label itself to mark the node unhealthy.
err = updateNode(ctx, nodeName, func(n *v1.Node) { n.Labels["autopilot.ibm.com/gpuhealth"] = "ERR" })
Expect(err).ShouldNot(HaveOccurred())
By("workload is reset")
// Each phase wait polls for up to 120 seconds.
Eventually(AppWrapperPhase(ctx, aw), 120*time.Second).Should(Equal(workloadv1beta2.AppWrapperResetting))
By("workload is running again")
Eventually(AppWrapperPhase(ctx, aw), 120*time.Second).Should(Equal(workloadv1beta2.AppWrapperRunning))
Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
})
})

Describe("Load Testing", Label("slow"), Label("Kueue", "Standalone"), func() {
It("Create 50 AppWrappers", func() {
const (
Expand Down
41 changes: 41 additions & 0 deletions test/e2e/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,44 @@ func jobSet(replicasWorker int, milliCPUWorker int64) workloadv1beta2.AppWrapper
Template: runtime.RawExtension{Raw: jsonBytes},
}
}

// autopilotJobYAML is a fmt template for a batch/v1 Job with a single busybox
// container that runs "sleep 600" and never restarts. It contains four %v
// placeholders, in order: the metadata generateName prefix, the CPU request,
// the nvidia.com/gpu request, and the nvidia.com/gpu limit.
const autopilotJobYAML = `
apiVersion: batch/v1
kind: Job
metadata:
generateName: %v
spec:
template:
spec:
restartPolicy: Never
terminationGracePeriodSeconds: 0
containers:
- name: busybox
image: quay.io/project-codeflare/busybox:1.36
command: ["sh", "-c", "sleep 600"]
resources:
requests:
cpu: %v
nvidia.com/gpu: %v
limits:
nvidia.com/gpu: %v
`

// autopilotjob renders autopilotJobYAML into an AppWrapper component.
//
// milliCPU is the CPU request in milli-units and gpus is used for both the
// nvidia.com/gpu request and limit. The generated Job uses the "apjob-"
// generateName prefix. The component declares a single pod set at
// "template.spec.template" and carries "5s" values for both the retry pause
// period and failure grace period annotations.
//
// A YAML-to-JSON conversion failure fails the running spec via Expect.
func autopilotjob(milliCPU int64, gpus int64) workloadv1beta2.AppWrapperComponent {
	cpuRequest := resource.NewMilliQuantity(milliCPU, resource.DecimalSI)
	gpuRequest := resource.NewQuantity(gpus, resource.DecimalSI)
	gpuLimit := resource.NewQuantity(gpus, resource.DecimalSI)
	manifest := fmt.Sprintf(autopilotJobYAML, "apjob-", cpuRequest, gpuRequest, gpuLimit)

	rawJSON, err := yaml.YAMLToJSON([]byte(manifest))
	Expect(err).NotTo(HaveOccurred())

	component := workloadv1beta2.AppWrapperComponent{
		DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Path: "template.spec.template"}},
		Template:        runtime.RawExtension{Raw: rawJSON},
	}
	component.Annotations = map[string]string{
		workloadv1beta2.RetryPausePeriodDurationAnnotation:   "5s",
		workloadv1beta2.FailureGracePeriodDurationAnnotation: "5s",
	}
	return component
}
38 changes: 35 additions & 3 deletions test/e2e/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package e2e

import (
"context"
"fmt"
"time"

// . "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -124,10 +125,11 @@ func ensureTestQueuesExist(ctx context.Context) {
Spec: kueue.ClusterQueueSpec{
NamespaceSelector: &metav1.LabelSelector{},
ResourceGroups: []kueue.ResourceGroup{{
CoveredResources: []v1.ResourceName{v1.ResourceCPU},
CoveredResources: []v1.ResourceName{v1.ResourceCPU, "nvidia.com/gpu"},
Flavors: []kueue.FlavorQuotas{{
Name: testFlavorName,
Resources: []kueue.ResourceQuota{{Name: v1.ResourceCPU, NominalQuota: *resource.NewMilliQuantity(2000, resource.DecimalSI)}},
Name: testFlavorName,
Resources: []kueue.ResourceQuota{{Name: v1.ResourceCPU, NominalQuota: *resource.NewMilliQuantity(2000, resource.DecimalSI)},
{Name: "nvidia.com/gpu", NominalQuota: *resource.NewQuantity(2, resource.DecimalSI)}},
}},
},
},
Expand Down Expand Up @@ -207,6 +209,36 @@ func getAppWrapper(ctx context.Context, awName types.NamespacedName) *workloadv1
return aw
}

// getNodeForAppwrapper returns the name of a node hosting a pod that belongs
// to the AppWrapper identified by awName.
//
// It lists the pods in awName.Namespace and selects the first pod whose
// appwrapper.AppWrapperLabel value equals awName.Name and whose Spec.NodeName
// is set. Pods that match the label but have no node assigned yet are skipped,
// so the function never reports success with an empty node name.
//
// It returns an error when the list call fails, when matching pods exist but
// none has a node assigned, or when no pod matches at all.
func getNodeForAppwrapper(ctx context.Context, awName types.NamespacedName) (string, error) {
	podList := &v1.PodList{}
	if err := getClient(ctx).List(ctx, podList, &client.ListOptions{Namespace: awName.Namespace}); err != nil {
		return "", err
	}

	unscheduled := 0
	// Index into the slice rather than ranging by value to avoid copying each Pod.
	for i := range podList.Items {
		pod := &podList.Items[i]
		awn, found := pod.Labels[appwrapper.AppWrapperLabel]
		if !found || awn != awName.Name {
			continue
		}
		if pod.Spec.NodeName == "" {
			// An empty NodeName would be passed on as if it were a valid node;
			// keep looking for a pod that has actually been placed.
			unscheduled++
			continue
		}
		return pod.Spec.NodeName, nil
	}

	if unscheduled > 0 {
		return "", fmt.Errorf("%d pod(s) found for %v but none is scheduled to a node", unscheduled, awName)
	}
	return "", fmt.Errorf("No pods found for %v", awName)
}

// updateNode applies update to the Node named nodeName and writes the result
// back, retrying for as long as the write is rejected with a conflict error.
//
// Every attempt re-reads the Node before calling update, so update may run
// more than once and always receives a freshly fetched object. The Node's
// Labels map is guaranteed to be non-nil when update is called.
//
// Any failure to read the Node, and any write failure other than a conflict,
// is returned to the caller wrapped with the node name.
func updateNode(ctx context.Context, nodeName string, update func(*v1.Node)) error {
	for {
		node := &v1.Node{}
		if err := getClient(ctx).Get(ctx, types.NamespacedName{Name: nodeName}, node); err != nil {
			// Surface read failures through the error return, matching how
			// write failures are reported, instead of asserting here.
			return fmt.Errorf("getting node %q: %w", nodeName, err)
		}
		if node.Labels == nil {
			// Writing to a nil map panics; callbacks that set a label rely on
			// the map being usable.
			node.Labels = map[string]string{}
		}
		update(node)

		err := getClient(ctx).Update(ctx, node)
		if err == nil {
			return nil
		}
		if !apierrors.IsConflict(err) {
			return fmt.Errorf("updating node %q: %w", nodeName, err)
		}
		// Conflict: another writer changed the Node between Get and Update;
		// loop to re-read and re-apply.
	}
}

func podsInPhase(awNamespace string, awName string, phase []v1.PodPhase, minimumPodCount int32) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
podList := &v1.PodList{}
Expand Down