Skip to content

Commit 1fc864f

Browse files
committed
HADOOP-13786: MR job test now also runs as an IT test
1 parent 7d5edac commit 1fc864f

14 files changed

+953
-149
lines changed

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,4 +794,6 @@ public boolean accept(Path path) {
794794
}
795795
}
796796

797+
public final static PathFilter TEMP_FILE_FILTER = new FilterTempFiles();
798+
797799
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import com.google.common.base.Joiner;
22+
import com.google.common.base.Preconditions;
23+
import com.google.common.collect.Maps;
24+
import org.apache.hadoop.fs.FileSystem;
25+
import org.apache.hadoop.fs.StorageStatistics;
26+
27+
import java.util.HashMap;
28+
import java.util.Iterator;
29+
import java.util.Map;
30+
31+
/**
32+
* Class to track storage statistics of a filesystem, generate diffs.
33+
*/
34+
public class StorageStatisticsTracker {
35+
36+
private final FileSystem fs;
37+
private Map<String, Long> stats;
38+
39+
public StorageStatisticsTracker(FileSystem fs) {
40+
this.fs = fs;
41+
snapshot();
42+
}
43+
44+
public void mark() {
45+
stats = snapshot();
46+
}
47+
48+
public Map<String, Long> compare(Map<String, Long> current) {
49+
Map<String, Long> diff = new HashMap<>(stats.size());
50+
for (Map.Entry<String, Long> entry : stats.entrySet()) {
51+
String key = entry.getKey();
52+
Long latest = current.get(key);
53+
if (latest != null && !latest.equals(entry.getValue())) {
54+
diff.put(key, entry.getValue() - latest);
55+
}
56+
}
57+
return diff;
58+
}
59+
60+
public Map<String, Long> compareToCurrent() {
61+
return compare(snapshot());
62+
}
63+
64+
public String toString(Map<String, Long> map) {
65+
return Joiner.on("\n").withKeyValueSeparator("=").join(map);
66+
}
67+
68+
public Map<String, Long> snapshot() {
69+
StatsIterator values = latestValues();
70+
Map<String, Long> snapshot = new HashMap<>(
71+
stats == null ? 0 : stats.size());
72+
for (StorageStatistics.LongStatistic value : values) {
73+
snapshot.put(value.getName(), value.getValue());
74+
}
75+
return snapshot;
76+
}
77+
78+
public StatsIterator latestValues() {
79+
return new StatsIterator(fs.getStorageStatistics());
80+
}
81+
82+
/**
83+
* Provide an iterator to the stats.
84+
*/
85+
public static class StatsIterator implements
86+
Iterable<StorageStatistics.LongStatistic> {
87+
private final StorageStatistics statistics;
88+
89+
public StatsIterator(StorageStatistics statistics) {
90+
this.statistics = statistics;
91+
}
92+
93+
@Override
94+
public Iterator<StorageStatistics.LongStatistic> iterator() {
95+
return statistics.getLongStatistics();
96+
}
97+
}
98+
99+
100+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.commit;
20+
21+
import com.google.common.collect.Sets;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.fs.FileStatus;
24+
import org.apache.hadoop.fs.FileSystem;
25+
import org.apache.hadoop.fs.Path;
26+
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
27+
import org.apache.hadoop.fs.s3a.StorageStatisticsTracker;
28+
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants;
29+
import org.apache.hadoop.hdfs.MiniDFSCluster;
30+
import org.apache.hadoop.io.LongWritable;
31+
import org.apache.hadoop.io.Text;
32+
import org.apache.hadoop.mapred.JobConf;
33+
import org.apache.hadoop.mapreduce.Job;
34+
import org.apache.hadoop.mapreduce.Mapper;
35+
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
36+
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
37+
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
38+
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
39+
import org.apache.hadoop.service.ServiceOperations;
40+
import org.junit.AfterClass;
41+
import org.junit.BeforeClass;
42+
import org.junit.Rule;
43+
import org.junit.Test;
44+
import org.junit.rules.TemporaryFolder;
45+
46+
import java.io.File;
47+
import java.io.FileOutputStream;
48+
import java.io.IOException;
49+
import java.nio.charset.StandardCharsets;
50+
import java.util.Set;
51+
import java.util.UUID;
52+
53+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
54+
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.COMMITTER_UNIQUE_FILENAMES;
55+
56+
/** Test suite.*/
57+
public class AbstractITCommitMRJob extends AbstractS3ATestBase {
58+
59+
private static MiniDFSTestCluster hdfs;
60+
private static MiniMRYarnCluster yarn = null;
61+
private static JobConf conf = null;
62+
private boolean uniqueFilenames = false;
63+
64+
protected static FileSystem getDFS() {
65+
return hdfs.getClusterFS();
66+
}
67+
68+
@BeforeClass
69+
public static void setupClusters() throws IOException {
70+
JobConf c = new JobConf();
71+
hdfs = new MiniDFSTestCluster();
72+
hdfs.init(c);
73+
hdfs.start();
74+
conf = c;
75+
yarn = new MiniMRYarnCluster(
76+
"TestStagingMRJobr", 2);
77+
yarn.init(c);
78+
yarn.start();
79+
}
80+
81+
@AfterClass
82+
public static void teardownClusters() throws IOException {
83+
conf = null;
84+
ServiceOperations.stopQuietly(yarn);
85+
ServiceOperations.stopQuietly(hdfs);
86+
hdfs = null;
87+
yarn = null;
88+
}
89+
90+
public static JobConf getConf() {
91+
return conf;
92+
}
93+
94+
public static MiniDFSCluster getHdfs() {
95+
return hdfs.getCluster();
96+
}
97+
98+
public static FileSystem getLocalFS() {
99+
return hdfs.getLocalFS();
100+
}
101+
102+
/** Test Mapper. */
103+
public static class MapClass extends Mapper<LongWritable, Text, LongWritable, Text> {
104+
@Override
105+
protected void map(LongWritable key, Text value, Context context)
106+
throws IOException, InterruptedException {
107+
context.write(key, value);
108+
}
109+
}
110+
111+
@Rule
112+
public final TemporaryFolder temp = new TemporaryFolder();
113+
114+
@Test
115+
public void testMRJob() throws Exception {
116+
// FileSystem mockS3 = mock(FileSystem.class);
117+
FileSystem s3 = getFileSystem();
118+
// final dest is in S3A
119+
Path outputPath = path("testMRJob");
120+
StorageStatisticsTracker tracker = new StorageStatisticsTracker(s3);
121+
122+
String commitUUID = UUID.randomUUID().toString();
123+
String suffix = uniqueFilenames ? ("-" + commitUUID) : "";
124+
int numFiles = 3;
125+
Set<String> expectedFiles = Sets.newHashSet();
126+
for (int i = 0; i < numFiles; i += 1) {
127+
File file = temp.newFile(String.valueOf(i) + ".text");
128+
try (FileOutputStream out = new FileOutputStream(file)) {
129+
out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
130+
}
131+
String filename = "part-m-0000" + i +
132+
suffix;
133+
expectedFiles.add(new Path(
134+
outputPath, filename).toString());
135+
}
136+
137+
Job mrJob = Job.getInstance(yarn.getConfig(), "test-committer-job");
138+
Configuration conf = mrJob.getConfiguration();
139+
conf.setBoolean(COMMITTER_UNIQUE_FILENAMES, uniqueFilenames);
140+
141+
142+
mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
143+
FileOutputFormat.setOutputPath(mrJob, outputPath);
144+
145+
File mockResultsFile = temp.newFile("committer.bin");
146+
mockResultsFile.delete();
147+
String committerPath = "file:" + mockResultsFile;
148+
conf.set("mock-results-file", committerPath);
149+
conf.set(StagingCommitterConstants.UPLOAD_UUID, commitUUID);
150+
151+
mrJob.setInputFormatClass(TextInputFormat.class);
152+
FileInputFormat.addInputPath(mrJob,
153+
new Path(temp.getRoot().toURI()));
154+
155+
mrJob.setMapperClass(MapClass.class);
156+
mrJob.setNumReduceTasks(0);
157+
158+
describe("Submitting Job");
159+
mrJob.submit();
160+
boolean succeeded = mrJob.waitForCompletion(true);
161+
assertTrue("MR job failed", succeeded);
162+
163+
assertPathExists("Output directory", outputPath);
164+
Set<String> actualFiles = Sets.newHashSet();
165+
FileStatus[] results = s3.listStatus(outputPath, TEMP_FILE_FILTER);
166+
LOG.info("Found {} files", results.length);
167+
for (FileStatus result : results) {
168+
LOG.debug("result: {}", result);
169+
actualFiles.add(result.getPath().toString());
170+
}
171+
172+
assertEquals("Should commit the correct file paths",
173+
expectedFiles, actualFiles);
174+
175+
}
176+
177+
}

0 commit comments

Comments (0)