Skip to content

Commit 13afd9b

Browse files
committed
Add Ray Cluster Upgrade test for Ray Job Long Running scenarios
1 parent 9a8603d commit 13afd9b

File tree

2 files changed

+245
-0
lines changed

2 files changed

+245
-0
lines changed

tests/e2e/mnist_sleep.py

+67
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,67 @@
1+
# Copyright 2022 IBM, Red Hat
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import time
16+
import torch
17+
import torch.nn as nn
18+
from torch.utils.data import DataLoader
19+
from torchvision import datasets, transforms
20+
21+
# Define a simple neural network
22+
class NeuralNetwork(nn.Module):
    """Small fully connected classifier.

    Each sample is flattened and passed through three linear layers
    (784 -> 512 -> 512 -> 10) with a ReLU after the first two; the output
    is the raw 10-way logits.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        # Layers are created in the same order as they are applied.
        layers = [
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        ]
        self.linear_relu_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Return the unnormalised class scores for the batch ``x``."""
        return self.linear_relu_stack(self.flatten(x))
38+
39+
# Define the training function
40+
def train():
    """Sleep for 24 hours, then train ``NeuralNetwork`` on FashionMNIST.

    The sleep happens first so the job stays alive for the duration of the
    upgrade test. Afterwards the FashionMNIST training split is downloaded
    to ``./data`` and the model is trained for three epochs with SGD
    (learning rate 0.01) and cross-entropy loss.
    """
    # Sleeping for 24 hours for upgrade test scenario
    print("Sleeping for 24 hours before starting the training for upgrade testing...")
    time.sleep(24 * 60 * 60)

    # Dataset and batched, shuffled loader
    to_tensor = transforms.Compose([transforms.ToTensor()])
    dataset = datasets.FashionMNIST(root='./data', train=True, download=True, transform=to_tensor)
    batches = DataLoader(dataset, batch_size=64, shuffle=True)

    # Model, loss function and optimizer
    net = NeuralNetwork()
    loss_fn = nn.CrossEntropyLoss()
    sgd = torch.optim.SGD(net.parameters(), lr=0.01)

    # Training loop
    num_epochs = 3
    for epoch in range(num_epochs):
        for inputs, labels in batches:
            sgd.zero_grad()
            loss = loss_fn(net(inputs), labels)
            loss.backward()
            sgd.step()
        # Reports the loss of the last batch processed in this epoch.
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")
65+
66+
# Run the training routine only when this file is executed as a script.
if __name__ == "__main__":
    train()
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,178 @@
1+
import requests
2+
from time import sleep
3+
4+
from codeflare_sdk import (
5+
Cluster,
6+
ClusterConfiguration,
7+
TokenAuthentication,
8+
get_cluster,
9+
)
10+
from codeflare_sdk.job import RayJobClient
11+
12+
from tests.e2e.support import *
13+
14+
15+
from codeflare_sdk.utils.kube_api_helpers import _kube_api_error_handling
16+
17+
# Namespace used by both test classes in this module.
namespace = "test-ns-rayupgrade-sleep"

# Names of the Kueue resources (cluster queue, resource flavor, local queue).
cluster_queue = "cluster-queue-mnist"
flavor = "default-flavor-mnist"
local_queue = "local-queue-mnist"
22+
23+
24+
# Creates a Ray cluster , submit RayJob mnist script long running
25+
class TestSetupSleepRayJob:
26+
def setup_method(self):
27+
initialize_kubernetes_client(self)
28+
create_namespace_with_name(self, namespace)
29+
try:
30+
create_cluster_queue(self, cluster_queue, flavor)
31+
create_resource_flavor(self, flavor)
32+
create_local_queue(self, cluster_queue, local_queue)
33+
except Exception as e:
34+
delete_namespace(self)
35+
delete_kueue_resources(self)
36+
return _kube_api_error_handling(e)
37+
38+
def test_mnist_ray_cluster_sdk_auth(self):
39+
self.run_mnist_raycluster_sdk_oauth()
40+
41+
def run_mnist_raycluster_sdk_oauth(self):
42+
ray_image = get_ray_image()
43+
44+
auth = TokenAuthentication(
45+
token=run_oc_command(["whoami", "--show-token=true"]),
46+
server=run_oc_command(["whoami", "--show-server=true"]),
47+
skip_tls=True,
48+
)
49+
auth.login()
50+
51+
cluster = Cluster(
52+
ClusterConfiguration(
53+
name="mnist",
54+
namespace=self.namespace,
55+
num_workers=1,
56+
head_cpus=1,
57+
head_memory=4,
58+
worker_cpu_requests=1,
59+
worker_cpu_limits=1,
60+
worker_memory_requests=4,
61+
worker_memory_limits=4,
62+
image=ray_image,
63+
write_to_file=True,
64+
verify_tls=False,
65+
)
66+
)
67+
68+
try:
69+
cluster.up()
70+
cluster.status()
71+
# wait for raycluster to be Ready
72+
cluster.wait_ready()
73+
cluster.status()
74+
# Check cluster details
75+
cluster.details()
76+
# Assert the cluster status is READY
77+
_, ready = cluster.status()
78+
assert ready
79+
submission_id = self.assert_jobsubmit()
80+
print(f"Job submitted successfully, job submission id: ", submission_id)
81+
82+
except Exception as e:
83+
print(f"An unexpected error occurred. Error: ", e)
84+
delete_namespace(self)
85+
delete_kueue_resources(self)
86+
assert False, "Cluster is not ready!"
87+
88+
def assert_jobsubmit(self):
89+
auth_token = run_oc_command(["whoami", "--show-token=true"])
90+
cluster = get_cluster("mnist", namespace)
91+
cluster.details()
92+
ray_dashboard = cluster.cluster_dashboard_uri()
93+
header = {"Authorization": f"Bearer {auth_token}"}
94+
client = RayJobClient(address=ray_dashboard, headers=header, verify=False)
95+
96+
# Submit the job
97+
submission_id = client.submit_job(
98+
entrypoint="python mnist_sleep.py",
99+
runtime_env={
100+
"working_dir": "./tests/e2e/",
101+
"pip": {
102+
"packages": ["torchvision==0.12.0"],
103+
"pip_check": False
104+
},
105+
"env_vars": get_setup_env_variables(),
106+
},
107+
)
108+
print(f"Submitted job with ID: {submission_id}")
109+
done = False
110+
time = 0
111+
timeout = 180
112+
while not done:
113+
status = client.get_job_status(submission_id)
114+
if status.is_terminal():
115+
break
116+
if status == "RUNNING":
117+
print(f"Job is Running: '{status}'")
118+
assert True
119+
break
120+
if not done:
121+
print(status)
122+
if timeout and time >= timeout:
123+
raise TimeoutError(f"job has timed out after waiting {timeout}s")
124+
sleep(5)
125+
time += 5
126+
127+
logs = client.get_job_logs(submission_id)
128+
print(logs)
129+
return submission_id
130+
131+
132+
class TestVerifySleepRayJobRunning:
    """Second half of the upgrade scenario: check the job is still running.

    Looks up the ``mnist`` cluster created by ``TestSetupSleepRayJob``,
    verifies that exactly one job exists and that it is ``RUNNING``, then
    brings the cluster down. The namespace and Kueue resources are deleted
    in ``teardown_method``.
    """

    def setup_method(self):
        """Log in and look up the existing ``mnist`` cluster.

        Raises:
            RuntimeError: if the cluster lookup returns a falsy value.
        """
        initialize_kubernetes_client(self)
        token = run_oc_command(["whoami", "--show-token=true"])
        server = run_oc_command(["whoami", "--show-server=true"])
        TokenAuthentication(token=token, server=server, skip_tls=True).login()

        self.namespace = namespace
        self.cluster = get_cluster("mnist", self.namespace)
        # Stored on the instance before cleanup runs; presumably read by
        # delete_kueue_resources - confirm in tests.e2e.support.
        self.cluster_queue = cluster_queue
        self.resource_flavor = flavor
        if not self.cluster:
            raise RuntimeError("TestSetupSleepRayJob needs to be run before this test")

    def teardown_method(self):
        """Delete the namespace and the Kueue resources."""
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_mnist_job_running(self):
        """Assert that one job exists and is running, then stop the cluster."""
        ray_client = self.get_ray_job_client(self.cluster)
        self.assertJobExists(ray_client, 1)
        self.assertJobRunning(ray_client)
        self.cluster.down()

    def get_ray_job_client(self, cluster):
        """Return a ``RayJobClient`` for ``cluster``'s dashboard.

        The client sends the current ``oc`` token as a bearer token and is
        created with ``verify=False``.
        """
        auth_token = run_oc_command(["whoami", "--show-token=true"])
        dashboard_uri = cluster.cluster_dashboard_uri()
        return RayJobClient(
            address=dashboard_uri,
            headers={"Authorization": f"Bearer {auth_token}"},
            verify=False,
        )

    # Assertions
    def assertJobExists(self, client, expectedJobsSize):
        """Assert that ``client`` lists exactly ``expectedJobsSize`` jobs."""
        assert len(client.list_jobs()) == expectedJobsSize

    def assertJobRunning(self, client):
        """Assert that the first job listed by ``client`` is ``RUNNING``."""
        first_job = client.list_jobs()[0]
        status = client.get_job_status(first_job.submission_id)
        running = status == "RUNNING"
        if running:
            print(f"Job is Running: '{status}'")
        else:
            print(f"Job is not in Running state: '{status}'")
        assert running

0 commit comments

Comments
 (0)