Skip to content

Commit 9c457ee

Browse files
Mehakmeet Singhsteveloughran
authored andcommitted
CDPD-8485. HADOOP-16910 : ABFS Streams to update FileSystem.Statistics counters on IO. (apache#1918).
Contributed by Mehakmeet Singh. Change-Id: If7d82e40427a3ccc1418b935cd4574dee262b37b
1 parent 6f4e462 commit 9c457ee

File tree

7 files changed

+275
-21
lines changed

7 files changed

+275
-21
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
188188
Path qualifiedPath = makeQualified(f);
189189

190190
try {
191-
OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
191+
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
192192
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
193193
return new FSDataOutputStream(outputStream, statistics);
194194
} catch(AzureBlobFileSystemException ex) {
@@ -250,7 +250,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
250250
Path qualifiedPath = makeQualified(f);
251251

252252
try {
253-
OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
253+
OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, statistics, false);
254254
return new FSDataOutputStream(outputStream, statistics);
255255
} catch(AzureBlobFileSystemException ex) {
256256
checkException(f, ex);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import com.google.common.annotations.VisibleForTesting;
5353
import com.google.common.base.Preconditions;
5454
import com.google.common.base.Strings;
55+
import org.slf4j.Logger;
56+
import org.slf4j.LoggerFactory;
5557

5658
import org.apache.hadoop.classification.InterfaceAudience;
5759
import org.apache.hadoop.classification.InterfaceStability;
@@ -99,8 +101,6 @@
99101
import org.apache.hadoop.io.IOUtils;
100102
import org.apache.hadoop.security.UserGroupInformation;
101103
import org.apache.http.client.utils.URIBuilder;
102-
import org.slf4j.Logger;
103-
import org.slf4j.LoggerFactory;
104104

105105
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
106106
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
@@ -403,8 +403,10 @@ public void deleteFilesystem() throws AzureBlobFileSystemException {
403403
}
404404
}
405405

406-
public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
407-
final FsPermission umask) throws AzureBlobFileSystemException {
406+
public OutputStream createFile(final Path path,
407+
final FileSystem.Statistics statistics,
408+
final boolean overwrite, final FsPermission permission,
409+
final FsPermission umask) throws AzureBlobFileSystemException {
408410
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
409411
boolean isNamespaceEnabled = getIsNamespaceEnabled();
410412
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -421,12 +423,13 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
421423
perfInfo.registerResult(op.getResult()).registerSuccess(true);
422424

423425
return new AbfsOutputStream(
424-
client,
425-
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
426-
0,
427-
abfsConfiguration.getWriteBufferSize(),
428-
abfsConfiguration.isFlushEnabled(),
429-
abfsConfiguration.isOutputStreamFlushDisabled());
426+
client,
427+
statistics,
428+
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
429+
0,
430+
abfsConfiguration.getWriteBufferSize(),
431+
abfsConfiguration.isFlushEnabled(),
432+
abfsConfiguration.isOutputStreamFlushDisabled());
430433
}
431434
}
432435

@@ -482,7 +485,7 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
482485
}
483486
}
484487

485-
public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
488+
public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
486489
AzureBlobFileSystemException {
487490
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
488491
LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
@@ -509,12 +512,13 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
509512
perfInfo.registerSuccess(true);
510513

511514
return new AbfsOutputStream(
512-
client,
513-
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
514-
offset,
515-
abfsConfiguration.getWriteBufferSize(),
516-
abfsConfiguration.isFlushEnabled(),
517-
abfsConfiguration.isOutputStreamFlushDisabled());
515+
client,
516+
statistics,
517+
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
518+
offset,
519+
abfsConfiguration.getWriteBufferSize(),
520+
abfsConfiguration.isFlushEnabled(),
521+
abfsConfiguration.isOutputStreamFlushDisabled());
518522
}
519523
}
520524

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
131131
int lastReadBytes;
132132
int totalReadBytes = 0;
133133
streamStatistics.readOperationStarted(off, len);
134+
incrementReadOps();
134135
do {
135136
if (nextReadPos >= fCursor - limit && nextReadPos <= fCursor) {
136137
// data can be read from buffer.
@@ -261,6 +262,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
261262
if (receivedBytes > 0) {
262263
LOG.debug("Received data from read ahead, not doing remote read");
263264
streamStatistics.bytesReadFromBuffer(receivedBytes);
265+
incrementReadOps();
264266
return receivedBytes;
265267
}
266268

@@ -301,6 +303,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
301303
position, b.length, offset, length);
302304
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
303305
perfInfo.registerResult(op.getResult()).registerSuccess(true);
306+
incrementReadOps();
304307
} catch (AzureBlobFileSystemException ex) {
305308
if (ex instanceof AbfsRestOperationException) {
306309
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
@@ -318,6 +321,15 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
318321
return (int) bytesRead;
319322
}
320323

324+
/**
325+
* Increment Read Operations.
326+
*/
327+
private void incrementReadOps() {
328+
if (statistics != null) {
329+
statistics.incrementReadOps(1);
330+
}
331+
}
332+
321333
/**
322334
* Seek to given position in stream.
323335
* @param n position to seek to

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
4040
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
4141
import org.apache.hadoop.io.ElasticByteBufferPool;
42+
import org.apache.hadoop.fs.FileSystem.Statistics;
4243
import org.apache.hadoop.fs.FSExceptionMessages;
4344
import org.apache.hadoop.fs.StreamCapabilities;
4445
import org.apache.hadoop.fs.Syncable;
@@ -78,14 +79,18 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
7879
private final ElasticByteBufferPool byteBufferPool
7980
= new ElasticByteBufferPool();
8081

82+
private final Statistics statistics;
83+
8184
public AbfsOutputStream(
8285
final AbfsClient client,
86+
final Statistics statistics,
8387
final String path,
8488
final long position,
8589
final int bufferSize,
8690
final boolean supportFlush,
8791
final boolean disableOutputStreamFlush) {
8892
this.client = client;
93+
this.statistics = statistics;
8994
this.path = path;
9095
this.position = position;
9196
this.closed = false;
@@ -181,6 +186,16 @@ public synchronized void write(final byte[] data, final int off, final int lengt
181186

182187
writableBytes = bufferSize - bufferIndex;
183188
}
189+
incrementWriteOps();
190+
}
191+
192+
/**
193+
* Increment Write Operations.
194+
*/
195+
private void incrementWriteOps() {
196+
if (statistics != null) {
197+
statistics.incrementWriteOps(1);
198+
}
184199
}
185200

186201
/**

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717
*/
1818
package org.apache.hadoop.fs.azurebfs;
1919

20+
import java.io.IOException;
21+
2022
import org.junit.Assert;
2123
import org.junit.Before;
2224
import org.junit.BeforeClass;
2325
import org.junit.Rule;
2426
import org.junit.rules.TestName;
2527
import org.junit.rules.Timeout;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import org.apache.hadoop.fs.FSDataInputStream;
32+
import org.apache.hadoop.fs.Path;
2633

2734
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT;
2835

@@ -31,6 +38,9 @@
3138
* This class does not attempt to bind to Azure.
3239
*/
3340
public class AbstractAbfsTestWithTimeout extends Assert {
41+
private static final Logger LOG =
42+
LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class);
43+
3444
/**
3545
* The name of the current method.
3646
*/
@@ -67,4 +77,53 @@ public void nameThread() {
6777
protected int getTestTimeoutMillis() {
6878
return TEST_TIMEOUT;
6979
}
80+
81+
/**
82+
* Describe a test in the logs.
83+
*
84+
* @param text text to print
85+
* @param args arguments to format in the printing
86+
*/
87+
protected void describe(String text, Object... args) {
88+
LOG.info("\n\n{}: {}\n",
89+
methodName.getMethodName(),
90+
String.format(text, args));
91+
}
92+
93+
/**
94+
* Validate Contents written on a file in Abfs.
95+
*
96+
* @param fs AzureBlobFileSystem
97+
* @param path Path of the file
98+
* @param originalByteArray original byte array
99+
* @return if content is validated true else, false
100+
* @throws IOException
101+
*/
102+
protected boolean validateContent(AzureBlobFileSystem fs, Path path,
103+
byte[] originalByteArray)
104+
throws IOException {
105+
int pos = 0;
106+
int lenOfOriginalByteArray = originalByteArray.length;
107+
108+
try (FSDataInputStream in = fs.open(path)) {
109+
byte valueOfContentAtPos = (byte) in.read();
110+
111+
while (valueOfContentAtPos != -1 && pos < lenOfOriginalByteArray) {
112+
if (originalByteArray[pos] != valueOfContentAtPos) {
113+
assertEquals("Mismatch in content validation at position {}", pos,
114+
originalByteArray[pos], valueOfContentAtPos);
115+
return false;
116+
}
117+
valueOfContentAtPos = (byte) in.read();
118+
pos++;
119+
}
120+
if (valueOfContentAtPos != -1) {
121+
assertEquals("Expected end of file", -1, valueOfContentAtPos);
122+
return false;
123+
}
124+
return true;
125+
}
126+
127+
}
128+
70129
}

0 commit comments

Comments
 (0)