
Commit 5434dbc

sahilTakiar authored and deepakdamri committed
HDFS-14564: Add libhdfs APIs for readFully; add readFully to ByteBufferPositionedReadable (apache#963) Contributed by Sahil Takiar.
Reviewed-by: Siyao Meng <[email protected]>
1 parent 6e4777e commit 5434dbc
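
For orientation, here is a minimal usage sketch of the new call. This is a sketch under assumptions, not part of the commit: the "default" filesystem URI and the /tmp/readFullyDemo path are illustrative, and error handling is abbreviated. Unlike hdfsPread, which returns a byte count and may legally return fewer bytes than requested, hdfsPreadFully returns 0 only once the whole buffer has been filled, and -1 with errno set otherwise.

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include "hdfs.h"

int main(void) {
    char buf[32];
    hdfsFS fs = hdfsConnect("default", 0);      // illustrative: default FS
    if (!fs) {
        fprintf(stderr, "hdfsConnect failed\n");
        return 1;
    }
    hdfsFile f = hdfsOpenFile(fs, "/tmp/readFullyDemo", O_RDONLY, 0, 0, 0);
    if (!f) {
        fprintf(stderr, "hdfsOpenFile failed\n");
        hdfsDisconnect(fs);
        return 1;
    }
    // Fill buf from offset 0; the stream's own position is left untouched.
    if (hdfsPreadFully(fs, f, 0, buf, (tSize)sizeof(buf)) != 0) {
        fprintf(stderr, "hdfsPreadFully failed, errno=%d\n", errno);
    }
    hdfsCloseFile(fs, f);
    hdfsDisconnect(fs);
    return 0;
}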

8 files changed: +257 -19 lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 27 additions & 7 deletions
@@ -328,20 +328,40 @@ public int read(long position, byte[] buffer, int offset, int length)
       throws IOException {
     checkStream();
     try {
-      final int n = ((PositionedReadable) in).read(position, buffer, offset,
+      final int n = ((PositionedReadable) in).read(position, buffer, offset,
           length);
       if (n > 0) {
         // This operation does not change the current offset of the file
         decrypt(position, buffer, offset, n);
       }
-
+
       return n;
     } catch (ClassCastException e) {
       throw new UnsupportedOperationException("This stream does not support " +
           "positioned read.");
     }
   }
-
+
+  /**
+   * Positioned readFully using {@link ByteBuffer}s. This method is thread-safe.
+   */
+  @Override
+  public void readFully(long position, final ByteBuffer buf)
+      throws IOException {
+    checkStream();
+    if (!(in instanceof ByteBufferPositionedReadable)) {
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support positioned reads with byte buffers.");
+    }
+    int bufPos = buf.position();
+    ((ByteBufferPositionedReadable) in).readFully(position, buf);
+    final int n = buf.position() - bufPos;
+    if (n > 0) {
+      // This operation does not change the current offset of the file
+      decrypt(position, buf, n, bufPos);
+    }
+  }
+
   /**
    * Decrypt length bytes in buffer starting at offset. Output is also put
    * into buffer starting at offset. It is thread-safe.
@@ -375,7 +395,7 @@ private void decrypt(long position, byte[] buffer, int offset, int length)
       returnDecryptor(decryptor);
     }
   }
-
+
   /** Positioned read fully. It is thread-safe */
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length)
@@ -407,7 +427,7 @@ public void seek(long pos) throws IOException {
     checkStream();
     try {
       /*
-       * If data of target pos in the underlying stream has already been read
+       * If data of target pos in the underlying stream has already been read
        * and decrypted in outBuffer, we just need to re-position outBuffer.
        */
       if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
@@ -523,7 +543,7 @@ public int read(ByteBuffer buf) throws IOException {
    * Output is also buf and same start position.
    * buf.position() and buf.limit() should be unchanged after decryption.
    */
-  private void decrypt(ByteBuffer buf, int n, int start)
+  private void decrypt(ByteBuffer buf, int n, int start)
       throws IOException {
     final int pos = buf.position();
     final int limit = buf.limit();
@@ -605,7 +625,7 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
       }
       return buffer;
     } catch (ClassCastException e) {
-      throw new UnsupportedOperationException("This stream does not support " +
+      throw new UnsupportedOperationException("This stream does not support " +
           "enhanced byte buffer access.");
     }
   }

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 16 additions & 7 deletions
@@ -51,8 +51,8 @@ public class FSDataInputStream extends DataInputStream
   public FSDataInputStream(InputStream in) {
     super(in);
     if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
-      throw new IllegalArgumentException(
-          "In is not an instance of Seekable or PositionedReadable");
+      throw new IllegalArgumentException(in.getClass().getCanonicalName() +
+          " is not an instance of Seekable or PositionedReadable");
     }
   }

@@ -149,7 +149,7 @@ public int read(ByteBuffer buf) throws IOException {
     }

     throw new UnsupportedOperationException("Byte-buffer read unsupported " +
-        "by input stream");
+        "by " + in.getClass().getCanonicalName());
   }

   @Override
@@ -169,9 +169,8 @@ public void setReadahead(Long readahead)
     try {
       ((CanSetReadahead)in).setReadahead(readahead);
     } catch (ClassCastException e) {
-      throw new UnsupportedOperationException(
-          "this stream does not support setting the readahead " +
-          "caching strategy.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName() +
+          " does not support setting the readahead caching strategy.");
     }
   }

@@ -255,7 +254,17 @@ public int read(long position, ByteBuffer buf) throws IOException {
       return ((ByteBufferPositionedReadable) in).read(position, buf);
     }
     throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
-        "by input stream");
+        "by " + in.getClass().getCanonicalName());
+  }
+
+  @Override
+  public void readFully(long position, ByteBuffer buf) throws IOException {
+    if (in instanceof ByteBufferPositionedReadable) {
+      ((ByteBufferPositionedReadable) in).readFully(position, buf);
+    } else {
+      throw new UnsupportedOperationException("Byte-buffer pread " +
+          "unsupported by " + in.getClass().getCanonicalName());
+    }
   }

   @Override

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java

Lines changed: 5 additions & 0 deletions
@@ -95,6 +95,11 @@ public void testByteBufferRead() throws Exception {}
   @Override
   @Test(timeout=10000)
   public void testPositionedReadWithByteBuffer() throws IOException {}
+
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testByteBufferReadFully() throws Exception {}

   @Ignore("ChecksumFSOutputSummer doesn't support Syncable")
   @Override

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java

Lines changed: 5 additions & 0 deletions
@@ -96,6 +96,11 @@ public void testPositionedRead() throws IOException {}
   @Test(timeout=10000)
   public void testPositionedReadWithByteBuffer() throws IOException {}

+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testByteBufferReadFully() throws Exception {}
+
   @Ignore("Wrapped stream doesn't support ReadFully")
   @Override
   @Test(timeout=10000)

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

Lines changed: 22 additions & 0 deletions
@@ -52,6 +52,7 @@
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -1653,6 +1654,27 @@ public void reset() throws IOException {
     throw new IOException("Mark/reset not supported");
   }

+  @Override
+  public int read(long position, final ByteBuffer buf) throws IOException {
+    if (!buf.hasRemaining()) {
+      return 0;
+    }
+    return pread(position, buf);
+  }
+
+  @Override
+  public void readFully(long position, final ByteBuffer buf)
+      throws IOException {
+    int nread = 0;
+    while (buf.hasRemaining()) {
+      int nbytes = read(position + nread, buf);
+      if (nbytes < 0) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+      nread += nbytes;
+    }
+  }
+
   /** Utility class to encapsulate data node info and its address. */
   static final class DNAddrPair {
     final DatanodeInfo info;
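
The new readFully above is the classic read-until-full loop: keep issuing positioned reads until the buffer is drained, and treat EOF before that point as an error (EOFException) rather than a short read. Here is the same pattern sketched in C against the existing libhdfs hdfsPread call; pread_fully is a hypothetical helper for illustration, not an API added by this commit.

#include <errno.h>
#include "hdfs.h"

// Hypothetical helper mirroring DFSInputStream#readFully: loop until len
// bytes have arrived; an error or EOF before then fails the whole call.
static int pread_fully(hdfsFS fs, hdfsFile f, tOffset pos,
                       void *buf, tSize len) {
    tSize nread = 0;
    while (nread < len) {
        tSize nbytes = hdfsPread(fs, f, pos + nread,
                                 (char *)buf + nread, len - nread);
        if (nbytes < 0) {
            return -1;          // I/O error; errno already set by libhdfs
        }
        if (nbytes == 0) {
            errno = EINVAL;     // premature EOF (errno choice is ours)
            return -1;
        }
        nread += nbytes;
    }
    return 0;
}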

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c

Lines changed: 41 additions & 2 deletions
@@ -360,6 +360,25 @@ int main(int argc, char **argv) {
         shutdown_and_exit(cl, -1);
     }

+    // hdfsPreadFully (direct) test
+    if (hdfsPreadFully(fs, preadFile, 0, (void*)buffer,
+            (tSize)(strlen(fileContents) + 1))) {
+        fprintf(stderr, "Failed to preadFully (direct).");
+        shutdown_and_exit(cl, -1);
+    }
+    if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+        fprintf(stderr, "Failed to preadFully (direct). Expected %s but "
+                "got %s\n", fileContents, buffer);
+        shutdown_and_exit(cl, -1);
+    }
+    fprintf(stderr, "PreadFully (direct) following %d bytes:\n%s\n",
+            num_pread_bytes, buffer);
+    memset(buffer, 0, strlen(fileContents + 1));
+    if (hdfsTell(fs, preadFile) != 0) {
+        fprintf(stderr, "PreadFully changed position of file\n");
+        shutdown_and_exit(cl, -1);
+    }
+
     // Disable the direct pread path so that we really go through the slow
     // read path
     hdfsFileDisableDirectPread(preadFile);
@@ -388,19 +407,39 @@ int main(int argc, char **argv) {
         shutdown_and_exit(cl, -1);
     }

+    // Test pread midway through the file rather than at the beginning
     num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
     if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
-        fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+        fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n",
                 fileContentsChunk, buffer, num_read_bytes);
         shutdown_and_exit(cl, -1);
     }
-    fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer);
+    fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer);
     memset(buffer, 0, strlen(fileContents + 1));
     if (hdfsTell(fs, preadFile) != 0) {
         fprintf(stderr, "Pread changed position of file\n");
         shutdown_and_exit(cl, -1);
     }

+    // hdfsPreadFully test
+    if (hdfsPreadFully(fs, preadFile, 0, (void*)buffer,
+            (tSize)(strlen(fileContents) + 1))) {
+        fprintf(stderr, "Failed to preadFully.");
+        shutdown_and_exit(cl, -1);
+    }
+    if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+        fprintf(stderr, "Failed to preadFully. Expected %s but got %s\n",
+                fileContents, buffer);
+        shutdown_and_exit(cl, -1);
+    }
+    fprintf(stderr, "PreadFully following %d bytes:\n%s\n",
+            num_pread_bytes, buffer);
+    memset(buffer, 0, strlen(fileContents + 1));
+    if (hdfsTell(fs, preadFile) != 0) {
+        fprintf(stderr, "PreadFully changed position of file\n");
+        shutdown_and_exit(cl, -1);
+    }
+
     hdfsCloseFile(fs, preadFile);

     // Test correct behaviour for unsupported filesystems

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c

Lines changed: 117 additions & 0 deletions
@@ -57,6 +57,9 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
 tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
                   tSize length);

+int preadFullyDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
+                  tSize length);
+
 static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);

 /**
@@ -1645,6 +1648,7 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
             "hdfsPread: NewByteArray");
         return -1;
     }
+
     jthr = invokeMethod(env, &jVal, INSTANCE, f->file,
             JC_FS_DATA_INPUT_STREAM, "read", "(J[BII)I", position,
             jbRarray, 0, length);
@@ -1727,6 +1731,119 @@ tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
     return jVal.i;
 }

+/**
+ * Like hdfsPread, if the underlying stream supports the
+ * ByteBufferPositionedReadable interface then this method will transparently
+ * use readFully(long, ByteBuffer).
+ */
+int hdfsPreadFully(hdfsFS fs, hdfsFile f, tOffset position,
+                void* buffer, tSize length) {
+    JNIEnv* env;
+    jbyteArray jbRarray;
+    jthrowable jthr;
+
+    if (length == 0) {
+        return 0;
+    } else if (length < 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
+        errno = EBADF;
+        return -1;
+    }
+
+    if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) {
+        return preadFullyDirect(fs, f, position, buffer, length);
+    }
+
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    //Error checking... make sure that this file is 'readable'
+    if (f->type != HDFS_STREAM_INPUT) {
+        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+        errno = EINVAL;
+        return -1;
+    }
+
+    // JAVA EQUIVALENT:
+    //  byte [] bR = new byte[length];
+    //  fis.read(pos, bR, 0, length);
+    jbRarray = (*env)->NewByteArray(env, length);
+    if (!jbRarray) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "hdfsPread: NewByteArray");
+        return -1;
+    }
+
+    jthr = invokeMethod(env, NULL, INSTANCE, f->file,
+            JC_FS_DATA_INPUT_STREAM, "readFully", "(J[BII)V",
+            position, jbRarray, 0, length);
+    if (jthr) {
+        destroyLocalReference(env, jbRarray);
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsPread: FSDataInputStream#read");
+        return -1;
+    }
+
+    (*env)->GetByteArrayRegion(env, jbRarray, 0, length, buffer);
+    destroyLocalReference(env, jbRarray);
+    if ((*env)->ExceptionCheck(env)) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "hdfsPread: GetByteArrayRegion");
+        return -1;
+    }
+    return 0;
+}
+
+int preadFullyDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
+                 tSize length)
+{
+    // JAVA EQUIVALENT:
+    //  ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
+    //  fis.read(position, buf);
+
+    jthrowable jthr;
+    jobject bb;
+
+    //Get the JNIEnv* corresponding to current thread
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    //Error checking... make sure that this file is 'readable'
+    if (f->type != HDFS_STREAM_INPUT) {
+        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+        errno = EINVAL;
+        return -1;
+    }
+
+    //Read the requisite bytes
+    bb = (*env)->NewDirectByteBuffer(env, buffer, length);
+    if (bb == NULL) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "readDirect: NewDirectByteBuffer");
+        return -1;
+    }
+
+    jthr = invokeMethod(env, NULL, INSTANCE, f->file,
+            JC_FS_DATA_INPUT_STREAM, "readFully",
+            "(JLjava/nio/ByteBuffer;)V", position, bb);
+    destroyLocalReference(env, bb);
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "preadDirect: FSDataInputStream#read");
+        return -1;
+    }
+    return 0;
+}
+
 tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
 {
     // JAVA EQUIVALENT
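
Two things are worth noting about the implementation above. First, the direct path (preadFullyDirect) wraps the caller's buffer in a direct ByteBuffer via NewDirectByteBuffer, so readFully(long, ByteBuffer) fills C memory with no intermediate byte[] copy; the fallback path pays for a Java array plus GetByteArrayRegion. Second, the all-or-nothing contract shows up in C as a plain 0/-1 return: a request past end-of-file surfaces the Java-side EOFException as -1 with errno set. A small sketch to illustrate; read_exactly is a hypothetical wrapper, not part of this commit.

#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include "hdfs.h"

// Hypothetical wrapper: hdfsPreadFully never reports a short read. Asking
// for more bytes than the file holds fails the whole call rather than
// returning a truncated byte count.
static int read_exactly(hdfsFS fs, hdfsFile f, tOffset off,
                        void *buf, tSize n) {
    if (hdfsPreadFully(fs, f, off, buf, n) != 0) {
        fprintf(stderr, "preadFully(off=%" PRId64 ", %d bytes) failed, "
                "errno=%d\n", (int64_t)off, (int)n, errno);
        return -1;
    }
    return 0;
}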
