Skip to content

Commit fd6f1bd

Browse files
steveloughran authored and Mehakmeet Singh committed
CDPD-11948. MAPREDUCE-7315. LocatedFileStatusFetcher to collect/publish IOStatistics. (apache#2579)
Part of the HADOOP-16830 IOStatistics API feature. If the source FileSystem's listing RemoteIterators implement IOStatisticsSource, these are collected and served through the IOStatisticsSource API. If they are not: getIOStatistics() returns null. Only the listing statistics are collected; FileSystem.globStatus() doesn't provide any, so IO use there is not included in the aggregate results. Contributed by Steve Loughran. Change-Id: Iff1485297c2c7e181b54eaf1d2c4f80faeee7cfa
1 parent 3d77158 commit fd6f1bd

File tree

1 file changed

+58
-2
lines changed
  • hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred

1 file changed

+58
-2
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.util.LinkedList;
2323
import java.util.List;
24+
import java.util.StringJoiner;
2425
import java.util.concurrent.BlockingQueue;
2526
import java.util.concurrent.Callable;
2627
import java.util.concurrent.ExecutorService;
@@ -37,6 +38,9 @@
3738
import org.apache.hadoop.fs.Path;
3839
import org.apache.hadoop.fs.PathFilter;
3940
import org.apache.hadoop.fs.RemoteIterator;
41+
import org.apache.hadoop.fs.statistics.IOStatistics;
42+
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
43+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
4044
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
4145

4246
import com.google.common.collect.Iterables;
@@ -51,6 +55,9 @@
5155

5256
import org.apache.hadoop.util.concurrent.HadoopExecutors;
5357

58+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
59+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
60+
5461
/**
5562
* Utility class to fetch block locations for specified Input paths using a
5663
* configured number of threads.
@@ -59,7 +66,7 @@
5966
* configuration.
6067
*/
6168
@Private
62-
public class LocatedFileStatusFetcher {
69+
public class LocatedFileStatusFetcher implements IOStatisticsSource {
6370

6471
public static final Logger LOG =
6572
LoggerFactory.getLogger(LocatedFileStatusFetcher.class.getName());
@@ -86,6 +93,12 @@ public class LocatedFileStatusFetcher {
8693

8794
private volatile Throwable unknownError;
8895

96+
/**
97+
* Demand created IO Statistics: only if the filesystem
98+
* returns statistics does this fetch collect them.
99+
*/
100+
private IOStatisticsSnapshot iostats;
101+
89102
/**
90103
* Instantiate.
91104
* The newApi switch is only used to configure what exception is raised
@@ -224,7 +237,46 @@ private void decrementRunningAndCheckCompletion() {
224237
lock.unlock();
225238
}
226239
}
227-
240+
241+
/**
242+
* Return any IOStatistics collected during listing.
243+
* @return IO stats accrued.
244+
*/
245+
@Override
246+
public synchronized IOStatistics getIOStatistics() {
247+
return iostats;
248+
}
249+
250+
/**
251+
* Add the statistics of an individual thread's scan.
252+
* @param stats possibly null statistics.
253+
*/
254+
private void addResultStatistics(IOStatistics stats) {
255+
if (stats != null) {
256+
// demand creation of IO statistics.
257+
synchronized (this) {
258+
LOG.debug("Adding IOStatistics: {}", stats);
259+
if (iostats == null) {
260+
// demand create the statistics
261+
iostats = snapshotIOStatistics(stats);
262+
} else {
263+
iostats.aggregate(stats);
264+
}
265+
}
266+
}
267+
}
268+
269+
@Override
270+
public String toString() {
271+
final IOStatistics ioStatistics = getIOStatistics();
272+
StringJoiner stringJoiner = new StringJoiner(", ",
273+
LocatedFileStatusFetcher.class.getSimpleName() + "[", "]");
274+
if (ioStatistics != null) {
275+
stringJoiner.add("IOStatistics=" + ioStatistics);
276+
}
277+
return stringJoiner.toString();
278+
}
279+
228280
/**
229281
* Retrieves block locations for the given {@link FileStatus}, and adds
230282
* additional paths to the process queue if required.
@@ -263,6 +315,8 @@ public Result call() throws Exception {
263315
}
264316
}
265317
}
318+
// aggregate any stats
319+
result.stats = retrieveIOStatistics(iter);
266320
} else {
267321
result.locatedFileStatuses.add(fileStatus);
268322
}
@@ -273,6 +327,7 @@ private static class Result {
273327
private List<FileStatus> locatedFileStatuses = new LinkedList<>();
274328
private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<>();
275329
private FileSystem fs;
330+
private IOStatistics stats;
276331
}
277332
}
278333

@@ -287,6 +342,7 @@ private class ProcessInputDirCallback implements
287342
@Override
288343
public void onSuccess(ProcessInputDirCallable.Result result) {
289344
try {
345+
addResultStatistics(result.stats);
290346
if (!result.locatedFileStatuses.isEmpty()) {
291347
resultQueue.add(result.locatedFileStatuses);
292348
}

0 commit comments

Comments
 (0)