Skip to content

Commit e2c7ac7

Browse files
authored
ABFS Streams to update FileSystem.Statistics counters on IO.
Contributed by Mehakmeet Singh
1 parent d312991 commit e2c7ac7

File tree

7 files changed

+261
-7
lines changed

7 files changed

+261
-7
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
188188
Path qualifiedPath = makeQualified(f);
189189

190190
try {
191-
OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
191+
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
192192
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
193193
return new FSDataOutputStream(outputStream, statistics);
194194
} catch(AzureBlobFileSystemException ex) {
@@ -250,7 +250,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
250250
Path qualifiedPath = makeQualified(f);
251251

252252
try {
253-
OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
253+
OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, statistics, false);
254254
return new FSDataOutputStream(outputStream, statistics);
255255
} catch(AzureBlobFileSystemException ex) {
256256
checkException(f, ex);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,8 +412,10 @@ public void deleteFilesystem() throws AzureBlobFileSystemException {
412412
}
413413
}
414414

415-
public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
416-
final FsPermission umask) throws AzureBlobFileSystemException {
415+
public OutputStream createFile(final Path path,
416+
final FileSystem.Statistics statistics,
417+
final boolean overwrite, final FsPermission permission,
418+
final FsPermission umask) throws AzureBlobFileSystemException {
417419
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
418420
boolean isNamespaceEnabled = getIsNamespaceEnabled();
419421
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -436,6 +438,7 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
436438

437439
return new AbfsOutputStream(
438440
client,
441+
statistics,
439442
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
440443
0,
441444
abfsConfiguration.getWriteBufferSize(),
@@ -496,7 +499,7 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
496499
}
497500
}
498501

499-
public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
502+
public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
500503
AzureBlobFileSystemException {
501504
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
502505
LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
@@ -529,6 +532,7 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
529532

530533
return new AbfsOutputStream(
531534
client,
535+
statistics,
532536
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
533537
offset,
534538
abfsConfiguration.getWriteBufferSize(),

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
101101
int currentLen = len;
102102
int lastReadBytes;
103103
int totalReadBytes = 0;
104+
incrementReadOps();
104105
do {
105106
lastReadBytes = readOneBlock(b, currentOff, currentLen);
106107
if (lastReadBytes > 0) {
@@ -201,6 +202,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
201202
// try reading from buffers first
202203
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
203204
if (receivedBytes > 0) {
205+
incrementReadOps();
204206
return receivedBytes;
205207
}
206208

@@ -236,6 +238,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
236238
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
237239
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
238240
perfInfo.registerResult(op.getResult()).registerSuccess(true);
241+
incrementReadOps();
239242
} catch (AzureBlobFileSystemException ex) {
240243
if (ex instanceof AbfsRestOperationException) {
241244
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
@@ -252,6 +255,15 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
252255
return (int) bytesRead;
253256
}
254257

258+
/**
259+
* Increment Read Operations.
260+
*/
261+
private void incrementReadOps() {
262+
if (statistics != null) {
263+
statistics.incrementReadOps(1);
264+
}
265+
}
266+
255267
/**
256268
* Seek to given position in stream.
257269
* @param n position to seek to

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
4040
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
4141
import org.apache.hadoop.io.ElasticByteBufferPool;
42+
import org.apache.hadoop.fs.FileSystem.Statistics;
4243
import org.apache.hadoop.fs.FSExceptionMessages;
4344
import org.apache.hadoop.fs.StreamCapabilities;
4445
import org.apache.hadoop.fs.Syncable;
@@ -80,8 +81,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
8081
private final ElasticByteBufferPool byteBufferPool
8182
= new ElasticByteBufferPool();
8283

84+
private final Statistics statistics;
85+
8386
public AbfsOutputStream(
8487
final AbfsClient client,
88+
final Statistics statistics,
8589
final String path,
8690
final long position,
8791
final int bufferSize,
@@ -90,6 +94,7 @@ public AbfsOutputStream(
9094
final boolean supportAppendWithFlush,
9195
final boolean appendBlob) {
9296
this.client = client;
97+
this.statistics = statistics;
9398
this.path = path;
9499
this.position = position;
95100
this.closed = false;
@@ -187,6 +192,16 @@ public synchronized void write(final byte[] data, final int off, final int lengt
187192

188193
writableBytes = bufferSize - bufferIndex;
189194
}
195+
incrementWriteOps();
196+
}
197+
198+
/**
199+
* Increment Write Operations.
200+
*/
201+
private void incrementWriteOps() {
202+
if (statistics != null) {
203+
statistics.incrementWriteOps(1);
204+
}
190205
}
191206

192207
/**

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717
*/
1818
package org.apache.hadoop.fs.azurebfs;
1919

20+
import java.io.IOException;
21+
2022
import org.junit.Assert;
2123
import org.junit.Before;
2224
import org.junit.BeforeClass;
2325
import org.junit.Rule;
2426
import org.junit.rules.TestName;
2527
import org.junit.rules.Timeout;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import org.apache.hadoop.fs.FSDataInputStream;
32+
import org.apache.hadoop.fs.Path;
2633

2734
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT;
2835

@@ -31,6 +38,9 @@
3138
* This class does not attempt to bind to Azure.
3239
*/
3340
public class AbstractAbfsTestWithTimeout extends Assert {
41+
private static final Logger LOG =
42+
LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class);
43+
3444
/**
3545
* The name of the current method.
3646
*/
@@ -67,4 +77,53 @@ public void nameThread() {
6777
protected int getTestTimeoutMillis() {
6878
return TEST_TIMEOUT;
6979
}
80+
81+
/**
82+
* Describe a test in the logs.
83+
*
84+
* @param text text to print
85+
* @param args arguments to format in the printing
86+
*/
87+
protected void describe(String text, Object... args) {
88+
LOG.info("\n\n{}: {}\n",
89+
methodName.getMethodName(),
90+
String.format(text, args));
91+
}
92+
93+
/**
94+
* Validate Contents written on a file in Abfs.
95+
*
96+
* @param fs AzureBlobFileSystem
97+
* @param path Path of the file
98+
* @param originalByteArray original byte array
99+
* @return if content is validated true else, false
100+
* @throws IOException
101+
*/
102+
protected boolean validateContent(AzureBlobFileSystem fs, Path path,
103+
byte[] originalByteArray)
104+
throws IOException {
105+
int pos = 0;
106+
int lenOfOriginalByteArray = originalByteArray.length;
107+
108+
try (FSDataInputStream in = fs.open(path)) {
109+
byte valueOfContentAtPos = (byte) in.read();
110+
111+
while (valueOfContentAtPos != -1 && pos < lenOfOriginalByteArray) {
112+
if (originalByteArray[pos] != valueOfContentAtPos) {
113+
assertEquals("Mismatch in content validation at position {}", pos,
114+
originalByteArray[pos], valueOfContentAtPos);
115+
return false;
116+
}
117+
valueOfContentAtPos = (byte) in.read();
118+
pos++;
119+
}
120+
if (valueOfContentAtPos != -1) {
121+
assertEquals("Expected end of file", -1, valueOfContentAtPos);
122+
return false;
123+
}
124+
return true;
125+
}
126+
127+
}
128+
70129
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs;
20+
21+
import org.junit.Test;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import org.apache.hadoop.fs.FSDataOutputStream;
26+
import org.apache.hadoop.fs.FSDataInputStream;
27+
import org.apache.hadoop.fs.FileSystem;
28+
import org.apache.hadoop.fs.Path;
29+
import org.apache.hadoop.io.IOUtils;
30+
31+
/**
32+
* Test Abfs Stream.
33+
*/
34+
35+
public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
36+
public ITestAbfsStreamStatistics() throws Exception {
37+
}
38+
39+
private static final Logger LOG =
40+
LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
41+
42+
private static int LARGE_NUMBER_OF_OPS = 1000000;
43+
44+
/***
45+
* Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and
46+
* {@code incrementWriteOps()} in class {@code AbfsOutputStream}.
47+
*
48+
* @throws Exception
49+
*/
50+
@Test
51+
public void testAbfsStreamOps() throws Exception {
52+
describe("Test to see correct population of read and write operations in "
53+
+ "Abfs");
54+
55+
final AzureBlobFileSystem fs = getFileSystem();
56+
Path smallOperationsFile = new Path("testOneReadWriteOps");
57+
Path largeOperationsFile = new Path("testLargeReadWriteOps");
58+
FileSystem.Statistics statistics = fs.getFsStatistics();
59+
String testReadWriteOps = "test this";
60+
statistics.reset();
61+
62+
//Test for zero write operation
63+
assertReadWriteOps("write", 0, statistics.getWriteOps());
64+
65+
//Test for zero read operation
66+
assertReadWriteOps("read", 0, statistics.getReadOps());
67+
68+
FSDataOutputStream outForOneOperation = null;
69+
FSDataInputStream inForOneOperation = null;
70+
try {
71+
outForOneOperation = fs.create(smallOperationsFile);
72+
statistics.reset();
73+
outForOneOperation.write(testReadWriteOps.getBytes());
74+
75+
//Test for a single write operation
76+
assertReadWriteOps("write", 1, statistics.getWriteOps());
77+
78+
//Flushing output stream to see content to read
79+
outForOneOperation.hflush();
80+
inForOneOperation = fs.open(smallOperationsFile);
81+
statistics.reset();
82+
int result = inForOneOperation.read(testReadWriteOps.getBytes(), 0,
83+
testReadWriteOps.getBytes().length);
84+
85+
LOG.info("Result of Read operation : {}", result);
86+
/*
87+
Testing if 2 read_ops value is coming after reading full content from a
88+
file (3 if anything to read from Buffer too).
89+
Reason: read() call gives read_ops=1,
90+
reading from AbfsClient(http GET) gives read_ops=2.
91+
*/
92+
assertReadWriteOps("read", 2, statistics.getReadOps());
93+
94+
} finally {
95+
IOUtils.cleanupWithLogger(LOG, inForOneOperation,
96+
outForOneOperation);
97+
}
98+
99+
//Validating if content is being written in the smallOperationsFile
100+
assertTrue("Mismatch in content validation",
101+
validateContent(fs, smallOperationsFile,
102+
testReadWriteOps.getBytes()));
103+
104+
FSDataOutputStream outForLargeOperations = null;
105+
FSDataInputStream inForLargeOperations = null;
106+
StringBuilder largeOperationsValidationString = new StringBuilder();
107+
try {
108+
outForLargeOperations = fs.create(largeOperationsFile);
109+
statistics.reset();
110+
int largeValue = LARGE_NUMBER_OF_OPS;
111+
for (int i = 0; i < largeValue; i++) {
112+
outForLargeOperations.write(testReadWriteOps.getBytes());
113+
114+
//Creating the String for content Validation
115+
largeOperationsValidationString.append(testReadWriteOps);
116+
}
117+
LOG.info("Number of bytes of Large data written: {}",
118+
largeOperationsValidationString.toString().getBytes().length);
119+
120+
//Test for 1000000 write operations
121+
assertReadWriteOps("write", largeValue, statistics.getWriteOps());
122+
123+
inForLargeOperations = fs.open(largeOperationsFile);
124+
for (int i = 0; i < largeValue; i++) {
125+
inForLargeOperations
126+
.read(testReadWriteOps.getBytes(), 0,
127+
testReadWriteOps.getBytes().length);
128+
}
129+
130+
//Test for 1000000 read operations
131+
assertReadWriteOps("read", largeValue, statistics.getReadOps());
132+
133+
} finally {
134+
IOUtils.cleanupWithLogger(LOG, inForLargeOperations,
135+
outForLargeOperations);
136+
}
137+
//Validating if content is being written in largeOperationsFile
138+
assertTrue("Mismatch in content validation",
139+
validateContent(fs, largeOperationsFile,
140+
largeOperationsValidationString.toString().getBytes()));
141+
142+
}
143+
144+
/**
145+
* Generic method to assert both Read an write operations.
146+
*
147+
* @param operation what operation is being asserted
148+
* @param expectedValue value which is expected
149+
* @param actualValue value which is actual
150+
*/
151+
152+
private void assertReadWriteOps(String operation, long expectedValue,
153+
long actualValue) {
154+
assertEquals("Mismatch in " + operation + " operations", expectedValue,
155+
actualValue);
156+
}
157+
}

0 commit comments

Comments
 (0)