Skip to content

Commit 22ad7d6

Browse files
committed
HADOOP-17271. S3A statistics to support IOStatistics
Contains HADOOP-16830. Add IOStatistics API Change-Id: Ic5c4b014f90f1ed61c37d3ce9b17290736475047
1 parent 7fae413 commit 22ad7d6

File tree

156 files changed

+12424
-1225
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

156 files changed

+12424
-1225
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@
4646
import org.apache.hadoop.fs.Seekable;
4747
import org.apache.hadoop.fs.StreamCapabilities;
4848
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
49+
import org.apache.hadoop.fs.statistics.IOStatistics;
50+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
4951
import org.apache.hadoop.io.ByteBufferPool;
5052
import org.apache.hadoop.util.StringUtils;
5153

54+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
55+
5256
/**
5357
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
5458
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -66,7 +70,7 @@ public class CryptoInputStream extends FilterInputStream implements
6670
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
6771
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
6872
ReadableByteChannel, CanUnbuffer, StreamCapabilities,
69-
ByteBufferPositionedReadable {
73+
ByteBufferPositionedReadable, IOStatisticsSource {
7074
private final byte[] oneByteBuf = new byte[1];
7175
private final CryptoCodec codec;
7276
private final Decryptor decryptor;
@@ -867,8 +871,16 @@ public boolean hasCapability(String capability) {
867871
+ " does not expose its stream capabilities.");
868872
}
869873
return ((StreamCapabilities) in).hasCapability(capability);
874+
case StreamCapabilities.IOSTATISTICS:
875+
return (in instanceof StreamCapabilities)
876+
&& ((StreamCapabilities) in).hasCapability(capability);
870877
default:
871878
return false;
872879
}
873880
}
881+
882+
@Override
883+
public IOStatistics getIOStatistics() {
884+
return retrieveIOStatistics(in);
885+
}
874886
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@
2828
import org.apache.hadoop.fs.CanSetDropBehind;
2929
import org.apache.hadoop.fs.StreamCapabilities;
3030
import org.apache.hadoop.fs.Syncable;
31+
import org.apache.hadoop.fs.statistics.IOStatistics;
32+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
3133

3234
import com.google.common.base.Preconditions;
3335

36+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
37+
3438
/**
3539
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
3640
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -48,7 +52,7 @@
4852
@InterfaceAudience.Private
4953
@InterfaceStability.Evolving
5054
public class CryptoOutputStream extends FilterOutputStream implements
51-
Syncable, CanSetDropBehind, StreamCapabilities {
55+
Syncable, CanSetDropBehind, StreamCapabilities, IOStatisticsSource {
5256
private final byte[] oneByteBuf = new byte[1];
5357
private final CryptoCodec codec;
5458
private final Encryptor encryptor;
@@ -313,4 +317,9 @@ public boolean hasCapability(String capability) {
313317
}
314318
return false;
315319
}
320+
321+
@Override
322+
public IOStatistics getIOStatistics() {
323+
return retrieveIOStatistics(out);
324+
}
316325
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424

2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatistics;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
30+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
2731

2832

2933
/**
@@ -33,7 +37,8 @@
3337
@InterfaceAudience.Private
3438
@InterfaceStability.Unstable
3539
public class BufferedFSInputStream extends BufferedInputStream
36-
implements Seekable, PositionedReadable, HasFileDescriptor {
40+
implements Seekable, PositionedReadable, HasFileDescriptor,
41+
IOStatisticsSource, StreamCapabilities {
3742
/**
3843
* Creates a <code>BufferedFSInputStream</code>
3944
* with the specified buffer size,
@@ -126,4 +131,26 @@ public FileDescriptor getFileDescriptor() throws IOException {
126131
return null;
127132
}
128133
}
134+
135+
/**
136+
* If the inner stream supports {@link StreamCapabilities},
137+
* forward the probe to it.
138+
* Otherwise: return false.
139+
*
140+
* @param capability string to query the stream support for.
141+
* @return true if a capability is known to be supported.
142+
*/
143+
@Override
144+
public boolean hasCapability(final String capability) {
145+
if (in instanceof StreamCapabilities) {
146+
return ((StreamCapabilities) in).hasCapability(capability);
147+
} else {
148+
return false;
149+
}
150+
}
151+
152+
@Override
153+
public IOStatistics getIOStatistics() {
154+
return retrieveIOStatistics(in);
155+
}
129156
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import org.apache.hadoop.fs.impl.OpenFileParameters;
3939
import org.apache.hadoop.fs.permission.AclEntry;
4040
import org.apache.hadoop.fs.permission.FsPermission;
41+
import org.apache.hadoop.fs.statistics.IOStatistics;
42+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
43+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
4144
import org.apache.hadoop.util.DataChecksum;
4245
import org.apache.hadoop.util.LambdaUtils;
4346
import org.apache.hadoop.util.Progressable;
@@ -134,7 +137,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
134137
* For open()'s FSInputStream
135138
* It verifies that data matches checksums.
136139
*******************************************************/
137-
private static class ChecksumFSInputChecker extends FSInputChecker {
140+
private static class ChecksumFSInputChecker extends FSInputChecker implements
141+
IOStatisticsSource {
138142
private ChecksumFileSystem fs;
139143
private FSDataInputStream datas;
140144
private FSDataInputStream sums;
@@ -270,6 +274,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
270274
}
271275
return nread;
272276
}
277+
278+
/**
279+
* Get the IO Statistics of the nested stream, falling back to
280+
* null if the stream does not implement the interface
281+
* {@link IOStatisticsSource}.
282+
* @return an IOStatistics instance or null
283+
*/
284+
@Override
285+
public IOStatistics getIOStatistics() {
286+
return IOStatisticsSupport.retrieveIOStatistics(datas);
287+
}
273288
}
274289

275290
private static class FSDataBoundedInputStream extends FSDataInputStream {
@@ -395,7 +410,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {
395410

396411
/** This class provides an output stream for a checksummed file.
397412
* It generates checksums for data. */
398-
private static class ChecksumFSOutputSummer extends FSOutputSummer {
413+
private static class ChecksumFSOutputSummer extends FSOutputSummer
414+
implements IOStatisticsSource, StreamCapabilities {
399415
private FSDataOutputStream datas;
400416
private FSDataOutputStream sums;
401417
private static final float CHKSUM_AS_FRACTION = 0.01f;
@@ -449,6 +465,28 @@ protected void checkClosed() throws IOException {
449465
throw new ClosedChannelException();
450466
}
451467
}
468+
469+
/**
470+
* Get the IO Statistics of the nested stream, falling back to
471+
* null if the stream does not implement the interface
472+
* {@link IOStatisticsSource}.
473+
* @return an IOStatistics instance or null
474+
*/
475+
@Override
476+
public IOStatistics getIOStatistics() {
477+
return IOStatisticsSupport.retrieveIOStatistics(datas);
478+
}
479+
480+
/**
481+
* Probe the inner stream for a capability.
482+
*
483+
* @param capability string to query the stream support for.
484+
* @return true if a capability is known to be supported.
485+
*/
486+
@Override
487+
public boolean hasCapability(final String capability) {
488+
return datas.hasCapability(capability);
489+
}
452490
}
453491

454492
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929

3030
import org.apache.hadoop.classification.InterfaceAudience;
3131
import org.apache.hadoop.classification.InterfaceStability;
32+
import org.apache.hadoop.fs.statistics.IOStatistics;
33+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
34+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
3235
import org.apache.hadoop.io.ByteBufferPool;
3336
import org.apache.hadoop.util.IdentityHashStore;
3437

@@ -40,7 +43,7 @@ public class FSDataInputStream extends DataInputStream
4043
implements Seekable, PositionedReadable,
4144
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
4245
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
43-
ByteBufferPositionedReadable {
46+
ByteBufferPositionedReadable, IOStatisticsSource {
4447
/**
4548
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
4649
* objects
@@ -267,4 +270,15 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
267270
"unsupported by " + in.getClass().getCanonicalName());
268271
}
269272
}
273+
274+
/**
275+
* Get the IO Statistics of the nested stream, falling back to
276+
* null if the stream does not implement the interface
277+
* {@link IOStatisticsSource}.
278+
* @return an IOStatistics instance or null
279+
*/
280+
@Override
281+
public IOStatistics getIOStatistics() {
282+
return IOStatisticsSupport.retrieveIOStatistics(in);
283+
}
270284
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424

2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatistics;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
2730

2831
/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
2932
*/
3033
@InterfaceAudience.Public
3134
@InterfaceStability.Stable
3235
public class FSDataOutputStream extends DataOutputStream
33-
implements Syncable, CanSetDropBehind, StreamCapabilities {
36+
implements Syncable, CanSetDropBehind, StreamCapabilities,
37+
IOStatisticsSource {
3438
private final OutputStream wrappedStream;
3539

3640
private static class PositionCache extends FilterOutputStream {
@@ -155,4 +159,15 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
155159
"not support setting the drop-behind caching setting.");
156160
}
157161
}
162+
163+
/**
164+
* Get the IO Statistics of the nested stream, falling back to
165+
* empty statistics if the stream does not implement the interface
166+
* {@link IOStatisticsSource}.
167+
* @return an IOStatistics instance.
168+
*/
169+
@Override
170+
public IOStatistics getIOStatistics() {
171+
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
172+
}
158173
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import com.google.common.base.Preconditions;
2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
2730
import org.slf4j.Logger;
2831
import org.slf4j.LoggerFactory;
2932

@@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
134137
throws IOException {
135138
readFully(position, buffer, 0, buffer.length);
136139
}
140+
141+
/**
142+
* toString method returns the superclass toString, but if the subclass
143+
* implements {@link IOStatisticsSource} then those statistics are
144+
* extracted and included in the output.
145+
* That is: statistics of subclasses are automatically reported.
146+
* @return a string value.
147+
*/
148+
@Override
149+
public String toString() {
150+
final StringBuilder sb = new StringBuilder(super.toString());
151+
sb.append('{');
152+
if (this instanceof IOStatisticsSource) {
153+
sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
154+
(IOStatisticsSource) this));
155+
}
156+
sb.append('}');
157+
return sb.toString();
158+
}
137159
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,20 @@
2626

2727
import org.apache.hadoop.classification.InterfaceAudience;
2828
import org.apache.hadoop.classification.InterfaceStability;
29+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
2930

3031
/**
3132
* MultipartUploader is an interface for copying files multipart and across
3233
* multiple nodes.
34+
* <p></p>
35+
* The interface extends {@link IOStatisticsSource} so that there is no
36+
* need to cast an instance to see if is a source of statistics.
37+
* However, implementations MAY return null for their actual statistics.
3338
*/
3439
@InterfaceAudience.Public
3540
@InterfaceStability.Unstable
36-
public interface MultipartUploader extends Closeable {
41+
public interface MultipartUploader extends Closeable,
42+
IOStatisticsSource {
3743

3844

3945
/**

0 commit comments

Comments
 (0)