Skip to content

Commit 0b3d41b

Browse files
committed
SUBMARINE-54. Add test coverage for YarnServiceJobSubmitter and make it ready for extension for PyTorch. Contributed by Szilard Nemeth.
1 parent afe6613 commit 0b3d41b

File tree

55 files changed

+5629
-1844
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+5629
-1844
lines changed

hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,10 +293,18 @@ public String getPsDockerImage() {
293293
return psDockerImage;
294294
}
295295

296+
public void setPsDockerImage(String psDockerImage) {
297+
this.psDockerImage = psDockerImage;
298+
}
299+
296300
public String getWorkerDockerImage() {
297301
return workerDockerImage;
298302
}
299303

304+
public void setWorkerDockerImage(String workerDockerImage) {
305+
this.workerDockerImage = workerDockerImage;
306+
}
307+
300308
public boolean isDistributed() {
301309
return distributed;
302310
}
@@ -313,6 +321,10 @@ public String getTensorboardDockerImage() {
313321
return tensorboardDockerImage;
314322
}
315323

324+
public void setTensorboardDockerImage(String tensorboardDockerImage) {
325+
this.tensorboardDockerImage = tensorboardDockerImage;
326+
}
327+
316328
public List<Quicklink> getQuicklinks() {
317329
return quicklinks;
318330
}
@@ -366,6 +378,10 @@ public RunJobParameters setConfPairs(List<String> confPairs) {
366378
return this;
367379
}
368380

381+
public void setDistributed(boolean distributed) {
382+
this.distributed = distributed;
383+
}
384+
369385
@VisibleForTesting
370386
public static class UnderscoreConverterPropertyUtils extends PropertyUtils {
371387
@Override

hadoop-submarine/hadoop-submarine-core/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,25 @@ public void testNoInputPathOptionButOnlyRunTensorboard() throws Exception {
177177
Assert.assertTrue(success);
178178
}
179179

180+
@Test
181+
public void testJobWithoutName() throws Exception {
182+
RunJobCli runJobCli = new RunJobCli(getMockClientContext());
183+
String expectedErrorMessage =
184+
"--" + CliConstants.NAME + " is absent";
185+
String actualMessage = "";
186+
try {
187+
runJobCli.run(
188+
new String[]{"--docker_image", "tf-docker:1.1.0",
189+
"--num_workers", "0", "--tensorboard", "--verbose",
190+
"--tensorboard_resources", "memory=2G,vcores=2",
191+
"--tensorboard_docker_image", "tb_docker_image:001"});
192+
} catch (ParseException e) {
193+
actualMessage = e.getMessage();
194+
e.printStackTrace();
195+
}
196+
assertEquals(expectedErrorMessage, actualMessage);
197+
}
198+
180199
@Test
181200
public void testLaunchCommandPatternReplace() throws Exception {
182201
RunJobCli runJobCli = new RunJobCli(getMockClientContext());

hadoop-submarine/hadoop-submarine-core/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.io.File;
2828
import java.io.IOException;
29+
import java.util.Objects;
2930

3031
public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
3132
private File jobsParentDir = null;
@@ -35,6 +36,7 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
3536
@Override
3637
public Path getJobStagingArea(String jobName, boolean create)
3738
throws IOException {
39+
Objects.requireNonNull(jobName, "Job name must not be null!");
3840
if (jobsParentDir == null && create) {
3941
jobsParentDir = new File(
4042
"target/_staging_area_" + System.currentTimeMillis());

hadoop-submarine/hadoop-submarine-yarnservice-runtime/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@
115115
<artifactId>hadoop-yarn-services-core</artifactId>
116116
<version>3.3.0-SNAPSHOT</version>
117117
</dependency>
118+
<dependency>
119+
<groupId>org.apache.hadoop</groupId>
120+
<artifactId>hadoop-yarn-common</artifactId>
121+
<type>test-jar</type>
122+
<scope>test</scope>
123+
</dependency>
118124
</dependencies>
119125

120126
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
18+
19+
import org.apache.hadoop.conf.Configuration;
20+
import org.apache.hadoop.fs.Path;
21+
import org.apache.hadoop.yarn.service.api.records.Component;
22+
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
23+
import org.apache.hadoop.yarn.submarine.common.api.TaskType;
24+
import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
25+
import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.command.AbstractLaunchCommand;
26+
import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.command.LaunchCommandFactory;
27+
28+
import java.io.IOException;
29+
import java.util.Objects;
30+
31+
import static org.apache.hadoop.yarn.submarine.runtimes.yarnservice.tensorflow.TensorFlowCommons.getScriptFileName;
32+
33+
/**
34+
* Abstract base class for Component classes.
35+
* The implementations of this class are act like factories for
36+
* {@link Component} instances.
37+
* All dependencies are passed to the constructor so that child classes
38+
* are obliged to provide matching constructors.
39+
*/
40+
public abstract class AbstractComponent {
41+
private final FileSystemOperations fsOperations;
42+
protected final RunJobParameters parameters;
43+
protected final TaskType taskType;
44+
private final RemoteDirectoryManager remoteDirectoryManager;
45+
protected final Configuration yarnConfig;
46+
private final LaunchCommandFactory launchCommandFactory;
47+
48+
/**
49+
* This is only required for testing.
50+
*/
51+
private String localScriptFile;
52+
53+
public AbstractComponent(FileSystemOperations fsOperations,
54+
RemoteDirectoryManager remoteDirectoryManager,
55+
RunJobParameters parameters, TaskType taskType,
56+
Configuration yarnConfig,
57+
LaunchCommandFactory launchCommandFactory) {
58+
this.fsOperations = fsOperations;
59+
this.remoteDirectoryManager = remoteDirectoryManager;
60+
this.parameters = parameters;
61+
this.taskType = taskType;
62+
this.launchCommandFactory = launchCommandFactory;
63+
this.yarnConfig = yarnConfig;
64+
}
65+
66+
protected abstract Component createComponent() throws IOException;
67+
68+
/**
69+
* Generates a command launch script on local disk,
70+
* returns path to the script.
71+
*/
72+
protected void generateLaunchCommand(Component component)
73+
throws IOException {
74+
AbstractLaunchCommand launchCommand =
75+
launchCommandFactory.createLaunchCommand(taskType, component);
76+
this.localScriptFile = launchCommand.generateLaunchScript();
77+
78+
String remoteLaunchCommand = uploadLaunchCommand(component);
79+
component.setLaunchCommand(remoteLaunchCommand);
80+
}
81+
82+
private String uploadLaunchCommand(Component component)
83+
throws IOException {
84+
Objects.requireNonNull(localScriptFile, "localScriptFile should be " +
85+
"set before calling this method!");
86+
Path stagingDir =
87+
remoteDirectoryManager.getJobStagingArea(parameters.getName(), true);
88+
89+
String destScriptFileName = getScriptFileName(taskType);
90+
fsOperations.uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir,
91+
localScriptFile, destScriptFileName, component);
92+
93+
return "./" + destScriptFileName;
94+
}
95+
96+
String getLocalScriptFile() {
97+
return localScriptFile;
98+
}
99+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
18+
19+
import com.google.common.annotations.VisibleForTesting;
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.FileStatus;
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.FileUtil;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.hadoop.fs.permission.FsPermission;
26+
import org.apache.hadoop.yarn.service.api.records.Component;
27+
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
28+
import org.apache.hadoop.yarn.submarine.common.ClientContext;
29+
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
30+
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
31+
import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
32+
import org.apache.hadoop.yarn.submarine.utils.ZipUtilities;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.io.File;
37+
import java.io.FileNotFoundException;
38+
import java.io.IOException;
39+
import java.util.HashSet;
40+
import java.util.Set;
41+
42+
/**
43+
* Contains methods to perform file system operations. Almost all of the methods
44+
* are regular non-static methods as the operations are performed with the help
45+
* of a {@link RemoteDirectoryManager} instance passed in as a constructor
46+
* dependency. Please note that some operations require to read config settings
47+
* as well, so that we have Submarine and YARN config objects as dependencies as
48+
* well.
49+
*/
50+
public class FileSystemOperations {
51+
private static final Logger LOG =
52+
LoggerFactory.getLogger(FileSystemOperations.class);
53+
private final Configuration submarineConfig;
54+
private final Configuration yarnConfig;
55+
56+
private Set<Path> uploadedFiles = new HashSet<>();
57+
private RemoteDirectoryManager remoteDirectoryManager;
58+
59+
public FileSystemOperations(ClientContext clientContext) {
60+
this.remoteDirectoryManager = clientContext.getRemoteDirectoryManager();
61+
this.submarineConfig = clientContext.getSubmarineConfig();
62+
this.yarnConfig = clientContext.getYarnConfig();
63+
}
64+
65+
/**
66+
* May download a remote uri(file/dir) and zip.
67+
* Skip download if local dir
68+
* Remote uri can be a local dir(won't download)
69+
* or remote HDFS dir, s3 dir/file .etc
70+
* */
71+
public String downloadAndZip(String remoteDir, String zipFileName,
72+
boolean doZip)
73+
throws IOException {
74+
//Append original modification time and size to zip file name
75+
String suffix;
76+
String srcDir = remoteDir;
77+
String zipDirPath =
78+
System.getProperty("java.io.tmpdir") + "/" + zipFileName;
79+
boolean needDeleteTempDir = false;
80+
if (remoteDirectoryManager.isRemote(remoteDir)) {
81+
//Append original modification time and size to zip file name
82+
FileStatus status =
83+
remoteDirectoryManager.getRemoteFileStatus(new Path(remoteDir));
84+
suffix = "_" + status.getModificationTime()
85+
+ "-" + remoteDirectoryManager.getRemoteFileSize(remoteDir);
86+
// Download them to temp dir
87+
boolean downloaded =
88+
remoteDirectoryManager.copyRemoteToLocal(remoteDir, zipDirPath);
89+
if (!downloaded) {
90+
throw new IOException("Failed to download files from "
91+
+ remoteDir);
92+
}
93+
LOG.info("Downloaded remote: {} to local: {}", remoteDir, zipDirPath);
94+
srcDir = zipDirPath;
95+
needDeleteTempDir = true;
96+
} else {
97+
File localDir = new File(remoteDir);
98+
suffix = "_" + localDir.lastModified()
99+
+ "-" + localDir.length();
100+
}
101+
if (!doZip) {
102+
return srcDir;
103+
}
104+
// zip a local dir
105+
String zipFileUri =
106+
ZipUtilities.zipDir(srcDir, zipDirPath + suffix + ".zip");
107+
// delete downloaded temp dir
108+
if (needDeleteTempDir) {
109+
deleteFiles(srcDir);
110+
}
111+
return zipFileUri;
112+
}
113+
114+
public void deleteFiles(String localUri) {
115+
boolean success = FileUtil.fullyDelete(new File(localUri));
116+
if (!success) {
117+
LOG.warn("Failed to delete {}", localUri);
118+
}
119+
LOG.info("Deleted {}", localUri);
120+
}
121+
122+
@VisibleForTesting
123+
public void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir,
124+
String fileToUpload, String destFilename, Component comp)
125+
throws IOException {
126+
Path uploadedFilePath = uploadToRemoteFile(stagingDir, fileToUpload);
127+
locateRemoteFileToContainerWorkDir(destFilename, comp, uploadedFilePath);
128+
}
129+
130+
private void locateRemoteFileToContainerWorkDir(String destFilename,
131+
Component comp, Path uploadedFilePath)
132+
throws IOException {
133+
FileSystem fs = FileSystem.get(yarnConfig);
134+
135+
FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
136+
LOG.info("Uploaded file path = " + fileStatus.getPath());
137+
138+
// Set it to component's files list
139+
comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
140+
fileStatus.getPath().toUri().toString()).destFile(destFilename)
141+
.type(ConfigFile.TypeEnum.STATIC));
142+
}
143+
144+
public Path uploadToRemoteFile(Path stagingDir, String fileToUpload) throws
145+
IOException {
146+
FileSystem fs = remoteDirectoryManager.getDefaultFileSystem();
147+
148+
// Upload to remote FS under staging area
149+
File localFile = new File(fileToUpload);
150+
if (!localFile.exists()) {
151+
throw new FileNotFoundException(
152+
"Trying to upload file=" + localFile.getAbsolutePath()
153+
+ " to remote, but couldn't find local file.");
154+
}
155+
String filename = new File(fileToUpload).getName();
156+
157+
Path uploadedFilePath = new Path(stagingDir, filename);
158+
if (!uploadedFiles.contains(uploadedFilePath)) {
159+
if (SubmarineLogs.isVerbose()) {
160+
LOG.info("Copying local file=" + fileToUpload + " to remote="
161+
+ uploadedFilePath);
162+
}
163+
fs.copyFromLocalFile(new Path(fileToUpload), uploadedFilePath);
164+
uploadedFiles.add(uploadedFilePath);
165+
}
166+
return uploadedFilePath;
167+
}
168+
169+
public void validFileSize(String uri) throws IOException {
170+
long actualSizeByte;
171+
String locationType = "Local";
172+
if (remoteDirectoryManager.isRemote(uri)) {
173+
actualSizeByte = remoteDirectoryManager.getRemoteFileSize(uri);
174+
locationType = "Remote";
175+
} else {
176+
actualSizeByte = FileUtil.getDU(new File(uri));
177+
}
178+
long maxFileSizeMB = submarineConfig
179+
.getLong(SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB,
180+
SubmarineConfiguration.DEFAULT_MAX_ALLOWED_REMOTE_URI_SIZE_MB);
181+
LOG.info("{} fie/dir: {}, size(Byte):{},"
182+
+ " Allowed max file/dir size: {}",
183+
locationType, uri, actualSizeByte, maxFileSizeMB * 1024 * 1024);
184+
185+
if (actualSizeByte > maxFileSizeMB * 1024 * 1024) {
186+
throw new IOException(uri + " size(Byte): "
187+
+ actualSizeByte + " exceeds configured max size:"
188+
+ maxFileSizeMB * 1024 * 1024);
189+
}
190+
}
191+
192+
public void setPermission(Path destPath, FsPermission permission) throws
193+
IOException {
194+
FileSystem fs = FileSystem.get(yarnConfig);
195+
fs.setPermission(destPath, new FsPermission(permission));
196+
}
197+
198+
public static boolean needHdfs(String content) {
199+
return content != null && content.contains("hdfs://");
200+
}
201+
}

0 commit comments

Comments
 (0)