Skip to content

Commit 57abfae

Browse files
committed
HADOOP-17450. Add Public IOStatistics API. (#2577)
This is the API and implementation classes of HADOOP-16830, which allows callers to query IO object instances (filesystems, streams, remote iterators, ...) and other classes for statistics on their I/O Usage: operation count and min/max/mean durations. New Packages org.apache.hadoop.fs.statistics. Public API, including: IOStatisticsSource IOStatistics IOStatisticsSnapshot (seralizable to java objects and json) +helper classes for logging and integration BufferedIOStatisticsInputStream implements IOStatisticsSource and StreamCapabilities BufferedIOStatisticsOutputStream implements IOStatisticsSource, Syncable and StreamCapabilities org.apache.hadoop.fs.statistics.impl Implementation classes for internal use. org.apache.hadoop.util.functional functional programming support for RemoteIterators and other operations which raise IOEs; all wrapper classes implement and propagate IOStatisticsSource Contributed by Steve Loughran. Change-Id: If56e8db2981613ff689c39239135e44feb25f78e
1 parent 8945268 commit 57abfae

File tree

72 files changed

+9890
-109
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+9890
-109
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@
4646
import org.apache.hadoop.fs.Seekable;
4747
import org.apache.hadoop.fs.StreamCapabilities;
4848
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
49+
import org.apache.hadoop.fs.statistics.IOStatistics;
50+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
4951
import org.apache.hadoop.io.ByteBufferPool;
5052
import org.apache.hadoop.util.StringUtils;
5153

54+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
55+
5256
/**
5357
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
5458
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -66,7 +70,7 @@ public class CryptoInputStream extends FilterInputStream implements
6670
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
6771
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
6872
ReadableByteChannel, CanUnbuffer, StreamCapabilities,
69-
ByteBufferPositionedReadable {
73+
ByteBufferPositionedReadable, IOStatisticsSource {
7074
private final byte[] oneByteBuf = new byte[1];
7175
private final CryptoCodec codec;
7276
private final Decryptor decryptor;
@@ -867,8 +871,16 @@ public boolean hasCapability(String capability) {
867871
+ " does not expose its stream capabilities.");
868872
}
869873
return ((StreamCapabilities) in).hasCapability(capability);
874+
case StreamCapabilities.IOSTATISTICS:
875+
return (in instanceof StreamCapabilities)
876+
&& ((StreamCapabilities) in).hasCapability(capability);
870877
default:
871878
return false;
872879
}
873880
}
881+
882+
@Override
883+
public IOStatistics getIOStatistics() {
884+
return retrieveIOStatistics(in);
885+
}
874886
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@
2828
import org.apache.hadoop.fs.CanSetDropBehind;
2929
import org.apache.hadoop.fs.StreamCapabilities;
3030
import org.apache.hadoop.fs.Syncable;
31+
import org.apache.hadoop.fs.statistics.IOStatistics;
32+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
3133

3234
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
3335

36+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
37+
3438
/**
3539
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
3640
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -48,7 +52,7 @@
4852
@InterfaceAudience.Private
4953
@InterfaceStability.Evolving
5054
public class CryptoOutputStream extends FilterOutputStream implements
51-
Syncable, CanSetDropBehind, StreamCapabilities {
55+
Syncable, CanSetDropBehind, StreamCapabilities, IOStatisticsSource {
5256
private final byte[] oneByteBuf = new byte[1];
5357
private final CryptoCodec codec;
5458
private final Encryptor encryptor;
@@ -313,4 +317,9 @@ public boolean hasCapability(String capability) {
313317
}
314318
return false;
315319
}
320+
321+
@Override
322+
public IOStatistics getIOStatistics() {
323+
return retrieveIOStatistics(out);
324+
}
316325
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424

2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatistics;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
30+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
2731

2832

2933
/**
@@ -33,7 +37,8 @@
3337
@InterfaceAudience.Private
3438
@InterfaceStability.Unstable
3539
public class BufferedFSInputStream extends BufferedInputStream
36-
implements Seekable, PositionedReadable, HasFileDescriptor {
40+
implements Seekable, PositionedReadable, HasFileDescriptor,
41+
IOStatisticsSource, StreamCapabilities {
3742
/**
3843
* Creates a <code>BufferedFSInputStream</code>
3944
* with the specified buffer size,
@@ -126,4 +131,26 @@ public FileDescriptor getFileDescriptor() throws IOException {
126131
return null;
127132
}
128133
}
134+
135+
/**
136+
* If the inner stream supports {@link StreamCapabilities},
137+
* forward the probe to it.
138+
* Otherwise: return false.
139+
*
140+
* @param capability string to query the stream support for.
141+
* @return true if a capability is known to be supported.
142+
*/
143+
@Override
144+
public boolean hasCapability(final String capability) {
145+
if (in instanceof StreamCapabilities) {
146+
return ((StreamCapabilities) in).hasCapability(capability);
147+
} else {
148+
return false;
149+
}
150+
}
151+
152+
@Override
153+
public IOStatistics getIOStatistics() {
154+
return retrieveIOStatistics(in);
155+
}
129156
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import org.apache.hadoop.fs.impl.OpenFileParameters;
3939
import org.apache.hadoop.fs.permission.AclEntry;
4040
import org.apache.hadoop.fs.permission.FsPermission;
41+
import org.apache.hadoop.fs.statistics.IOStatistics;
42+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
43+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
4144
import org.apache.hadoop.util.DataChecksum;
4245
import org.apache.hadoop.util.LambdaUtils;
4346
import org.apache.hadoop.util.Progressable;
@@ -134,7 +137,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
134137
* For open()'s FSInputStream
135138
* It verifies that data matches checksums.
136139
*******************************************************/
137-
private static class ChecksumFSInputChecker extends FSInputChecker {
140+
private static class ChecksumFSInputChecker extends FSInputChecker implements
141+
IOStatisticsSource {
138142
private ChecksumFileSystem fs;
139143
private FSDataInputStream datas;
140144
private FSDataInputStream sums;
@@ -270,6 +274,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
270274
}
271275
return nread;
272276
}
277+
278+
/**
279+
* Get the IO Statistics of the nested stream, falling back to
280+
* null if the stream does not implement the interface
281+
* {@link IOStatisticsSource}.
282+
* @return an IOStatistics instance or null
283+
*/
284+
@Override
285+
public IOStatistics getIOStatistics() {
286+
return IOStatisticsSupport.retrieveIOStatistics(datas);
287+
}
273288
}
274289

275290
private static class FSDataBoundedInputStream extends FSDataInputStream {
@@ -395,7 +410,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {
395410

396411
/** This class provides an output stream for a checksummed file.
397412
* It generates checksums for data. */
398-
private static class ChecksumFSOutputSummer extends FSOutputSummer {
413+
private static class ChecksumFSOutputSummer extends FSOutputSummer
414+
implements IOStatisticsSource, StreamCapabilities {
399415
private FSDataOutputStream datas;
400416
private FSDataOutputStream sums;
401417
private static final float CHKSUM_AS_FRACTION = 0.01f;
@@ -449,6 +465,28 @@ protected void checkClosed() throws IOException {
449465
throw new ClosedChannelException();
450466
}
451467
}
468+
469+
/**
470+
* Get the IO Statistics of the nested stream, falling back to
471+
* null if the stream does not implement the interface
472+
* {@link IOStatisticsSource}.
473+
* @return an IOStatistics instance or null
474+
*/
475+
@Override
476+
public IOStatistics getIOStatistics() {
477+
return IOStatisticsSupport.retrieveIOStatistics(datas);
478+
}
479+
480+
/**
481+
* Probe the inner stream for a capability.
482+
*
483+
* @param capability string to query the stream support for.
484+
* @return true if a capability is known to be supported.
485+
*/
486+
@Override
487+
public boolean hasCapability(final String capability) {
488+
return datas.hasCapability(capability);
489+
}
452490
}
453491

454492
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929

3030
import org.apache.hadoop.classification.InterfaceAudience;
3131
import org.apache.hadoop.classification.InterfaceStability;
32+
import org.apache.hadoop.fs.statistics.IOStatistics;
33+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
34+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
3235
import org.apache.hadoop.io.ByteBufferPool;
3336
import org.apache.hadoop.util.IdentityHashStore;
3437

@@ -40,7 +43,7 @@ public class FSDataInputStream extends DataInputStream
4043
implements Seekable, PositionedReadable,
4144
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
4245
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
43-
ByteBufferPositionedReadable {
46+
ByteBufferPositionedReadable, IOStatisticsSource {
4447
/**
4548
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
4649
* objects
@@ -267,4 +270,15 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
267270
"unsupported by " + in.getClass().getCanonicalName());
268271
}
269272
}
273+
274+
/**
275+
* Get the IO Statistics of the nested stream, falling back to
276+
* null if the stream does not implement the interface
277+
* {@link IOStatisticsSource}.
278+
* @return an IOStatistics instance or null
279+
*/
280+
@Override
281+
public IOStatistics getIOStatistics() {
282+
return IOStatisticsSupport.retrieveIOStatistics(in);
283+
}
270284
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424

2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatistics;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
2730

2831
/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
2932
*/
3033
@InterfaceAudience.Public
3134
@InterfaceStability.Stable
3235
public class FSDataOutputStream extends DataOutputStream
33-
implements Syncable, CanSetDropBehind, StreamCapabilities {
36+
implements Syncable, CanSetDropBehind, StreamCapabilities,
37+
IOStatisticsSource {
3438
private final OutputStream wrappedStream;
3539

3640
private static class PositionCache extends FilterOutputStream {
@@ -155,4 +159,15 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
155159
"not support setting the drop-behind caching setting.");
156160
}
157161
}
162+
163+
/**
164+
* Get the IO Statistics of the nested stream, falling back to
165+
* empty statistics if the stream does not implement the interface
166+
* {@link IOStatisticsSource}.
167+
* @return an IOStatistics instance.
168+
*/
169+
@Override
170+
public IOStatistics getIOStatistics() {
171+
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
172+
}
158173
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
2730
import org.slf4j.Logger;
2831
import org.slf4j.LoggerFactory;
2932

@@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
134137
throws IOException {
135138
readFully(position, buffer, 0, buffer.length);
136139
}
140+
141+
/**
142+
* toString method returns the superclass toString, but if the subclass
143+
* implements {@link IOStatisticsSource} then those statistics are
144+
* extracted and included in the output.
145+
* That is: statistics of subclasses are automatically reported.
146+
* @return a string value.
147+
*/
148+
@Override
149+
public String toString() {
150+
final StringBuilder sb = new StringBuilder(super.toString());
151+
sb.append('{');
152+
if (this instanceof IOStatisticsSource) {
153+
sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
154+
(IOStatisticsSource) this));
155+
}
156+
sb.append('}');
157+
return sb.toString();
158+
}
137159
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,20 @@
2626

2727
import org.apache.hadoop.classification.InterfaceAudience;
2828
import org.apache.hadoop.classification.InterfaceStability;
29+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
2930

3031
/**
3132
* MultipartUploader is an interface for copying files multipart and across
3233
* multiple nodes.
34+
* <p></p>
35+
* The interface extends {@link IOStatisticsSource} so that there is no
36+
* need to cast an instance to see if is a source of statistics.
37+
* However, implementations MAY return null for their actual statistics.
3338
*/
3439
@InterfaceAudience.Public
3540
@InterfaceStability.Unstable
36-
public interface MultipartUploader extends Closeable {
41+
public interface MultipartUploader extends Closeable,
42+
IOStatisticsSource {
3743

3844

3945
/**

0 commit comments

Comments
 (0)