Skip to content

Commit df03ba1

Browse files
committed
Add Ray Cluster Upgrade test for Ray Job Long Running scenarios
1 parent 9a8603d commit df03ba1

File tree

2 files changed

+247
-0
lines changed

2 files changed

+247
-0
lines changed

tests/e2e/mnist_sleep.py

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Copyright 2022 IBM, Red Hat
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import time
16+
import torch
17+
import torch.nn as nn
18+
from torch.utils.data import DataLoader
19+
from torchvision import datasets, transforms
20+
21+
22+
# Define a simple neural network
23+
class NeuralNetwork(nn.Module):
24+
def __init__(self):
25+
super(NeuralNetwork, self).__init__()
26+
self.flatten = nn.Flatten()
27+
self.linear_relu_stack = nn.Sequential(
28+
nn.Linear(28 * 28, 512),
29+
nn.ReLU(),
30+
nn.Linear(512, 512),
31+
nn.ReLU(),
32+
nn.Linear(512, 10),
33+
)
34+
35+
def forward(self, x):
36+
x = self.flatten(x)
37+
logits = self.linear_relu_stack(x)
38+
return logits
39+
40+
41+
# Define the training function
42+
def train():
43+
# Sleeping for 24 hours for upgrade test scenario
44+
print("Sleeping for 24 hours before starting the training for upgrade testing...")
45+
time.sleep(24 * 60 * 60)
46+
47+
# Load dataset
48+
transform = transforms.Compose([transforms.ToTensor()])
49+
train_dataset = datasets.FashionMNIST(
50+
root="./data", train=True, download=True, transform=transform
51+
)
52+
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
53+
54+
# Initialize the neural network, loss function, and optimizer
55+
model = NeuralNetwork()
56+
criterion = nn.CrossEntropyLoss()
57+
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
58+
59+
# Train the model
60+
num_epochs = 3
61+
for epoch in range(num_epochs):
62+
for inputs, labels in train_loader:
63+
optimizer.zero_grad()
64+
outputs = model(inputs)
65+
loss = criterion(outputs, labels)
66+
loss.backward()
67+
optimizer.step()
68+
print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")
69+
70+
71+
if __name__ == "__main__":
72+
train()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import requests
2+
from time import sleep
3+
4+
from codeflare_sdk import (
5+
Cluster,
6+
ClusterConfiguration,
7+
TokenAuthentication,
8+
get_cluster,
9+
)
10+
from codeflare_sdk.job import RayJobClient
11+
12+
from tests.e2e.support import *
13+
14+
15+
from codeflare_sdk.utils.kube_api_helpers import _kube_api_error_handling
16+
17+
namespace = "test-ns-rayupgrade-sleep"
18+
# Global variables for kueue resources
19+
cluster_queue = "cluster-queue-mnist"
20+
flavor = "default-flavor-mnist"
21+
local_queue = "local-queue-mnist"
22+
23+
24+
# Creates a Ray cluster , submit RayJob mnist script long running
25+
class TestSetupSleepRayJob:
26+
def setup_method(self):
27+
initialize_kubernetes_client(self)
28+
create_namespace_with_name(self, namespace)
29+
try:
30+
create_cluster_queue(self, cluster_queue, flavor)
31+
create_resource_flavor(self, flavor)
32+
create_local_queue(self, cluster_queue, local_queue)
33+
except Exception as e:
34+
delete_namespace(self)
35+
delete_kueue_resources(self)
36+
return _kube_api_error_handling(e)
37+
38+
def test_mnist_ray_cluster_sdk_auth(self):
39+
self.run_mnist_raycluster_sdk_oauth()
40+
41+
def run_mnist_raycluster_sdk_oauth(self):
42+
ray_image = get_ray_image()
43+
44+
auth = TokenAuthentication(
45+
token=run_oc_command(["whoami", "--show-token=true"]),
46+
server=run_oc_command(["whoami", "--show-server=true"]),
47+
skip_tls=True,
48+
)
49+
auth.login()
50+
51+
cluster = Cluster(
52+
ClusterConfiguration(
53+
name="mnist",
54+
namespace=self.namespace,
55+
num_workers=1,
56+
head_cpus=1,
57+
head_memory=4,
58+
worker_cpu_requests=1,
59+
worker_cpu_limits=1,
60+
worker_memory_requests=4,
61+
worker_memory_limits=4,
62+
image=ray_image,
63+
write_to_file=True,
64+
verify_tls=False,
65+
)
66+
)
67+
68+
try:
69+
cluster.up()
70+
cluster.status()
71+
# wait for raycluster to be Ready
72+
cluster.wait_ready()
73+
cluster.status()
74+
# Check cluster details
75+
cluster.details()
76+
# Assert the cluster status is READY
77+
_, ready = cluster.status()
78+
assert ready
79+
submission_id = self.assert_jobsubmit()
80+
print(f"Job submitted successfully, job submission id: ", submission_id)
81+
82+
except Exception as e:
83+
print(f"An unexpected error occurred. Error: ", e)
84+
delete_namespace(self)
85+
delete_kueue_resources(self)
86+
assert False, "Cluster is not ready!"
87+
88+
def assert_jobsubmit(self):
89+
auth_token = run_oc_command(["whoami", "--show-token=true"])
90+
cluster = get_cluster("mnist", namespace)
91+
cluster.details()
92+
ray_dashboard = cluster.cluster_dashboard_uri()
93+
header = {"Authorization": f"Bearer {auth_token}"}
94+
client = RayJobClient(address=ray_dashboard, headers=header, verify=False)
95+
96+
# Submit the job
97+
submission_id = client.submit_job(
98+
entrypoint="python mnist_sleep.py",
99+
runtime_env={
100+
"working_dir": "./tests/e2e/",
101+
"pip": {"packages": ["torchvision==0.12.0"], "pip_check": False},
102+
"env_vars": get_setup_env_variables(),
103+
},
104+
)
105+
print(f"Submitted job with ID: {submission_id}")
106+
done = False
107+
time = 0
108+
timeout = 180
109+
while not done:
110+
status = client.get_job_status(submission_id)
111+
if status.is_terminal():
112+
break
113+
if status == "RUNNING":
114+
print(f"Job is Running: '{status}'")
115+
assert True
116+
break
117+
if not done:
118+
print(status)
119+
if timeout and time >= timeout:
120+
raise TimeoutError(f"job has timed out after waiting {timeout}s")
121+
sleep(5)
122+
time += 5
123+
124+
logs = client.get_job_logs(submission_id)
125+
print(logs)
126+
return submission_id
127+
128+
129+
class TestVerifySleepRayJobRunning:
130+
def setup_method(self):
131+
initialize_kubernetes_client(self)
132+
auth = TokenAuthentication(
133+
token=run_oc_command(["whoami", "--show-token=true"]),
134+
server=run_oc_command(["whoami", "--show-server=true"]),
135+
skip_tls=True,
136+
)
137+
auth.login()
138+
self.namespace = namespace
139+
self.cluster = get_cluster("mnist", self.namespace)
140+
self.cluster_queue = cluster_queue
141+
self.resource_flavor = flavor
142+
if not self.cluster:
143+
raise RuntimeError("TestSetupSleepRayJob needs to be run before this test")
144+
145+
def teardown_method(self):
146+
delete_namespace(self)
147+
delete_kueue_resources(self)
148+
149+
def test_mnist_job_running(self):
150+
client = self.get_ray_job_client(self.cluster)
151+
self.assertJobExists(client, 1)
152+
self.assertJobRunning(client)
153+
self.cluster.down()
154+
155+
def get_ray_job_client(self, cluster):
156+
auth_token = run_oc_command(["whoami", "--show-token=true"])
157+
ray_dashboard = cluster.cluster_dashboard_uri()
158+
header = {"Authorization": f"Bearer {auth_token}"}
159+
return RayJobClient(address=ray_dashboard, headers=header, verify=False)
160+
161+
# Assertions
162+
def assertJobExists(self, client, expectedJobsSize):
163+
job_list = client.list_jobs()
164+
assert len(job_list) == expectedJobsSize
165+
166+
def assertJobRunning(self, client):
167+
job_list = client.list_jobs()
168+
submission_id = job_list[0].submission_id
169+
status = client.get_job_status(submission_id)
170+
if status == "RUNNING":
171+
print(f"Job is Running: '{status}'")
172+
assert True
173+
else:
174+
print(f"Job is not in Running state: '{status}'")
175+
assert False

0 commit comments

Comments
 (0)