|
68 | 68 | " min_memory=8,\n",
|
69 | 69 | " max_memory=8,\n",
|
70 | 70 | " num_gpus=1,\n",
|
| 71 | + " head_gpus=1,\n", |
71 | 72 | " image=\"quay.io/project-codeflare/ray:latest-py39-cu118\",\n",
|
72 | 73 | " write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources \n",
|
73 | 74 | " # local_queue=\"local-queue-name\" # Specify the local queue manually\n",
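
For context (not part of this diff): the new `head_gpus` row above slots into the notebook's CodeFlare SDK `ClusterConfiguration` cell. A minimal sketch of that cell with the change applied is below; the cluster name, namespace, worker count, and CPU values are illustrative assumptions, and only the fields visible in the hunk above come from the diff itself.

```python
# Sketch only -- fields not shown in the hunk (name, namespace, num_workers, CPUs)
# are assumed placeholders, not values from this PR.
from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration

cluster = Cluster(ClusterConfiguration(
    name="raytest",          # assumed placeholder
    namespace="default",     # assumed placeholder
    num_workers=2,           # assumed placeholder
    min_cpus=2,              # assumed placeholder
    max_cpus=2,              # assumed placeholder
    min_memory=8,
    max_memory=8,
    num_gpus=1,              # GPUs per worker node
    head_gpus=1,             # the parameter this diff adds: GPUs for the Ray head node
    image="quay.io/project-codeflare/ray:latest-py39-cu118",
    write_to_file=False,
))
cluster.up()
cluster.wait_ready()
```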
|
|
147 | 148 | "metadata": {},
|
148 | 149 | "outputs": [],
|
149 | 150 | "source": [
|
150 | | - "#before proceeding make sure the cluster exists and the uri is not empty\n", |
| 151 | + "# before proceeding make sure the cluster exists and the uri is not empty\n", |
151 | 152 | "assert ray_cluster_uri, \"Ray cluster needs to be started and set before proceeding\"\n",
|
152 | 153 | "\n",
|
153 | 154 | "import ray\n",
|
154 | | - "from ray.air.config import ScalingConfig\n", |
155 | 155 | "\n",
|
156 | 156 | "# reset the ray context in case there's already one. \n",
|
157 | 157 | "ray.shutdown()\n",
|
158 | 158 | "# establish connection to ray cluster\n",
|
159 | 159 | "\n",
|
160 | | - "#install additional libraries that will be required for model training\n", |
161 | | - "runtime_env = {\"pip\": [\"transformers\", \"datasets\", \"evaluate\", \"pyarrow<7.0.0\", \"accelerate\"]}\n", |
162 | | - "\n", |
| 160 | + "# install additional libraries that will be required for model training\n", |
| 161 | + "runtime_env = {\"pip\": [\"pytorch_lightning==1.5.10\", \"ray_lightning\", \"torchmetrics==0.9.1\", \"torchvision==0.12.0\"]}\n", |
163 | 162 | "# NOTE: This will work for in-cluster notebook servers (RHODS/ODH), but not for local machines\n",
|
164 | 163 | "# To see how to connect from your laptop, go to demo-notebooks/additional-demos/local_interactive.ipynb\n",
|
165 | 164 | "ray.init(address=ray_cluster_uri, runtime_env=runtime_env)\n",
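
Once the `ray.init(...)` call above has connected, a quick sanity check (not part of this diff) is to ask Ray what resources it can see; with `head_gpus=1` plus a GPU worker, the reported GPU count should reflect both.

```python
# Sketch only, not part of this diff: assumes ray.init(...) above has already connected.
print(ray.cluster_resources())    # total CPU/GPU/memory Ray can schedule on
print(ray.available_resources())  # resources currently free
```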
|
|
172 | 171 | "id": "9711030b",
|
173 | 172 | "metadata": {},
|
174 | 173 | "source": [
|
175 | | - "Now that we are connected (and have passed in some package requirements), let's try writing some training code for a DistilBERT transformer model via HuggingFace (using IMDB dataset):" |
| 174 | + "Now that we are connected (and have passed in some package requirements), let's try writing some training code:" |
176 | 175 | ]
|
177 | 176 | },
|
178 | 177 | {
|
|
184 | 183 | "source": [
|
185 | 184 | "@ray.remote\n",
|
186 | 185 | "def train_fn():\n",
|
187 | | - "    from datasets import load_dataset\n", |
188 | | - "    import transformers\n", |
189 | | - "    from transformers import AutoTokenizer, TrainingArguments\n", |
190 | | - "    from transformers import AutoModelForSequenceClassification\n", |
191 | | - "    import numpy as np\n", |
192 | | - "    from datasets import load_metric\n", |
| 186 | + " import torch\n", |
| 187 | + " import torch.nn as nn\n", |
193 | 188 | " import ray\n",
|
194 | | - "    from ray import tune\n", |
195 | | - "    from ray.train.huggingface import HuggingFaceTrainer\n", |
| 189 | + " from torch.utils.data import DataLoader\n", |
| 190 | + " from torchvision import datasets\n", |
| 191 | + " from torchvision.transforms import ToTensor\n", |
| 192 | + " from ray.train.torch import TorchTrainer\n", |
| 193 | + " from ray.train import ScalingConfig\n", |
196 | 194 | "\n",
|
197 | | - "    dataset = load_dataset(\"imdb\")\n", |
198 | | - "    tokenizer = AutoTokenizer.from_pretrained(\"distilbert-base-uncased\")\n", |
| 195 | + " def get_dataset():\n", |
| 196 | + " return datasets.FashionMNIST(\n", |
| 197 | + " root=\"/tmp/data\",\n", |
| 198 | + " train=True,\n", |
| 199 | + " download=True,\n", |
| 200 | + " transform=ToTensor(),\n", |
| 201 | + " )\n", |
199 | 202 | "\n",
|
200 | | - "    def tokenize_function(examples):\n", |
201 | | - "        return tokenizer(examples[\"text\"], padding=\"max_length\", truncation=True)\n", |
| 203 | + " class NeuralNetwork(nn.Module):\n", |
| 204 | + " def __init__(self):\n", |
| 205 | + " super().__init__()\n", |
| 206 | + " self.flatten = nn.Flatten()\n", |
| 207 | + " self.linear_relu_stack = nn.Sequential(\n", |
| 208 | + " nn.Linear(28 * 28, 512),\n", |
| 209 | + " nn.ReLU(),\n", |
| 210 | + " nn.Linear(512, 512),\n", |
| 211 | + " nn.ReLU(),\n", |
| 212 | + " nn.Linear(512, 10),\n", |
| 213 | + " )\n", |
202 | 214 | "\n",
|
203 | | - "    tokenized_datasets = dataset.map(tokenize_function, batched=True)\n", |
| 215 | + " def forward(self, inputs):\n", |
| 216 | + " inputs = self.flatten(inputs)\n", |
| 217 | + " logits = self.linear_relu_stack(inputs)\n", |
| 218 | + " return logits\n", |
204 | 219 | "\n",
|
205 | | - "    #using a fraction of dataset but you can run with the full dataset\n", |
206 | | - "    small_train_dataset = tokenized_datasets[\"train\"].shuffle(seed=42).select(range(100))\n", |
207 | | - "    small_eval_dataset = tokenized_datasets[\"test\"].shuffle(seed=42).select(range(100))\n", |
| 220 | + " def train_func_distributed():\n", |
| 221 | + " num_epochs = 3\n", |
| 222 | + " batch_size = 64\n", |
208 | 223 | "\n",
|
209 | | - "    print(f\"len of train {small_train_dataset} and test {small_eval_dataset}\")\n", |
| 224 | + " dataset = get_dataset()\n", |
| 225 | + " dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)\n", |
| 226 | + " dataloader = ray.train.torch.prepare_data_loader(dataloader)\n", |
210 | 227 | "\n",
|
211 | | - "    ray_train_ds = ray.data.from_huggingface(small_train_dataset)\n", |
212 | | - "    ray_evaluation_ds = ray.data.from_huggingface(small_eval_dataset)\n", |
| 228 | + " model = NeuralNetwork()\n", |
| 229 | + " model = ray.train.torch.prepare_model(model)\n", |
213 | 230 | "\n",
|
214 | | - "    def compute_metrics(eval_pred):\n", |
215 | | - "        metric = load_metric(\"accuracy\")\n", |
216 | | - "        logits, labels = eval_pred\n", |
217 | | - "        predictions = np.argmax(logits, axis=-1)\n", |
218 | | - "        return metric.compute(predictions=predictions, references=labels)\n", |
| 231 | + " criterion = nn.CrossEntropyLoss()\n", |
| 232 | + " optimizer = torch.optim.SGD(model.parameters(), lr=0.01)\n", |
219 | 233 | "\n",
|
220 | | - "    def trainer_init_per_worker(train_dataset, eval_dataset, **config):\n", |
221 | | - "        model = AutoModelForSequenceClassification.from_pretrained(\"distilbert-base-uncased\", num_labels=2)\n", |
| 234 | + " for epoch in range(num_epochs):\n", |
| 235 | + " if ray.train.get_context().get_world_size() > 1:\n", |
| 236 | + " dataloader.sampler.set_epoch(epoch)\n", |
222 | 237 | "\n",
|
223 | | - "        training_args = TrainingArguments(\"/tmp/hf_imdb/test\", eval_steps=1, disable_tqdm=True, \n", |
224 | | - "                                        num_train_epochs=1, skip_memory_metrics=True,\n", |
225 | | - "                                        learning_rate=2e-5,\n", |
226 | | - "                                        per_device_train_batch_size=16,\n", |
227 | | - "                                        per_device_eval_batch_size=16, \n", |
228 | | - "                                        weight_decay=0.01,)\n", |
229 | | - "        return transformers.Trainer(\n", |
230 | | - "            model=model,\n", |
231 | | - "            args=training_args,\n", |
232 | | - "            train_dataset=train_dataset,\n", |
233 | | - "            eval_dataset=eval_dataset,\n", |
234 | | - "            compute_metrics=compute_metrics\n", |
235 | | - "        )\n", |
| 238 | + " for inputs, labels in dataloader:\n", |
| 239 | + " optimizer.zero_grad()\n", |
| 240 | + " pred = model(inputs)\n", |
| 241 | + " loss = criterion(pred, labels)\n", |
| 242 | + " loss.backward()\n", |
| 243 | + " optimizer.step()\n", |
| 244 | + " print(f\"epoch: {epoch}, loss: {loss.item()}\")\n", |
236 | 245 | "\n",
|
237 | | - "    scaling_config = ScalingConfig(num_workers=2, use_gpu=True) #num workers is the number of gpus\n", |
| 246 | + " # For GPU Training, set `use_gpu` to True.\n", |
| 247 | + " use_gpu = True\n", |
238 | 248 | "\n",
|
239 | | - "    # we are using the ray native HuggingFaceTrainer, but you can swap out to use non ray Huggingface Trainer. Both have the same method signature. \n", |
240 | | - "    # the ray native HFTrainer has built in support for scaling to multiple GPUs\n", |
241 | | - "    trainer = HuggingFaceTrainer(\n", |
242 | | - "        trainer_init_per_worker=trainer_init_per_worker,\n", |
243 | | - "        scaling_config=scaling_config,\n", |
244 | | - "        datasets={\"train\": ray_train_ds, \"evaluation\": ray_evaluation_ds},\n", |
| 249 | + " trainer = TorchTrainer(\n", |
| 250 | + " train_func_distributed,\n", |
| 251 | + " scaling_config=ScalingConfig(\n", |
| 252 | + " num_workers=3, use_gpu=use_gpu\n", |
| 253 | + " ), # num_workers = number of worker nodes with the ray head node included\n", |
245 | 254 | " )\n",
|
246 | | - "    result = trainer.fit()" |
| 255 | + " trainer.fit()" |
247 | 256 | ]
|
248 | 257 | },
|
249 | 258 | {
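
For completeness (not shown in this hunk): in these demo notebooks the remote function defined above is typically launched in a follow-up cell and the Ray cluster is torn down once training finishes. A hedged sketch of those cells, assuming the `cluster` object from the earlier configuration cell is still in scope:

```python
# Sketch only -- typical follow-up cells, not part of this diff.
ray.get(train_fn.remote())  # run train_fn on the Ray cluster and block until TorchTrainer finishes

ray.shutdown()              # disconnect this notebook's Ray client session
cluster.down()              # assumes `cluster` is the codeflare-sdk Cluster created earlier
```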
|
|