Skip to content

Commit 0f4491f

Browse files
sahilTakiardeepakdamri
authored andcommitted
HDFS-14564: Add libhdfs APIs for readFully; add readFully to ByteBufferPositionedReadable (apache#963) Contributed by Sahil Takiar.
Reviewed-by: Siyao Meng <[email protected]>
1 parent 7c85ec6 commit 0f4491f

File tree

16 files changed

+427
-67
lines changed

16 files changed

+427
-67
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,8 @@ public int read(long position, byte[] buffer, int offset, int length)
330330
throws IOException {
331331
checkStream();
332332
if (!(in instanceof PositionedReadable)) {
333-
throw new UnsupportedOperationException("This stream does not support " +
334-
"positioned read.");
333+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
334+
+ " does not support positioned read.");
335335
}
336336
final int n = ((PositionedReadable) in).read(position, buffer, offset,
337337
length);
@@ -351,8 +351,8 @@ public int read(long position, final ByteBuffer buf)
351351
throws IOException {
352352
checkStream();
353353
if (!(in instanceof ByteBufferPositionedReadable)) {
354-
throw new UnsupportedOperationException("This stream does not support " +
355-
"positioned reads with byte buffers.");
354+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
355+
+ " does not support positioned reads with byte buffers.");
356356
}
357357
int bufPos = buf.position();
358358
final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
@@ -363,7 +363,27 @@ public int read(long position, final ByteBuffer buf)
363363

364364
return n;
365365
}
366-
366+
367+
/**
368+
* Positioned readFully using {@link ByteBuffer}s. This method is thread-safe.
369+
*/
370+
@Override
371+
public void readFully(long position, final ByteBuffer buf)
372+
throws IOException {
373+
checkStream();
374+
if (!(in instanceof ByteBufferPositionedReadable)) {
375+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
376+
+ " does not support positioned reads with byte buffers.");
377+
}
378+
int bufPos = buf.position();
379+
((ByteBufferPositionedReadable) in).readFully(position, buf);
380+
final int n = buf.position() - bufPos;
381+
if (n > 0) {
382+
// This operation does not change the current offset of the file
383+
decrypt(position, buf, n, bufPos);
384+
}
385+
}
386+
367387
/**
368388
* Decrypt length bytes in buffer starting at offset. Output is also put
369389
* into buffer starting at offset. It is thread-safe.
@@ -480,8 +500,8 @@ public void readFully(long position, byte[] buffer, int offset, int length)
480500
throws IOException {
481501
checkStream();
482502
if (!(in instanceof PositionedReadable)) {
483-
throw new UnsupportedOperationException("This stream does not support " +
484-
"positioned readFully.");
503+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
504+
+ " does not support positioned readFully.");
485505
}
486506
((PositionedReadable) in).readFully(position, buffer, offset, length);
487507
if (length > 0) {
@@ -513,8 +533,8 @@ public void seek(long pos) throws IOException {
513533
}
514534
} else {
515535
if (!(in instanceof Seekable)) {
516-
throw new UnsupportedOperationException("This stream does not " +
517-
"support seek.");
536+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
537+
+ " does not support seek.");
518538
}
519539
((Seekable) in).seek(pos);
520540
resetStreamOffset(pos);
@@ -672,8 +692,8 @@ public boolean seekToNewSource(long targetPos) throws IOException {
672692
"Cannot seek to negative offset.");
673693
checkStream();
674694
if (!(in instanceof Seekable)) {
675-
throw new UnsupportedOperationException("This stream does not support " +
676-
"seekToNewSource.");
695+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
696+
+ " does not support seekToNewSource.");
677697
}
678698
boolean result = ((Seekable) in).seekToNewSource(targetPos);
679699
resetStreamOffset(targetPos);
@@ -687,16 +707,16 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
687707
checkStream();
688708
if (outBuffer.remaining() > 0) {
689709
if (!(in instanceof Seekable)) {
690-
throw new UnsupportedOperationException("This stream does not " +
691-
"support seek.");
710+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
711+
+ " does not support seek.");
692712
}
693713
// Have some decrypted data unread, need to reset.
694714
((Seekable) in).seek(getPos());
695715
resetStreamOffset(getPos());
696716
}
697717
if (!(in instanceof HasEnhancedByteBufferAccess)) {
698-
throw new UnsupportedOperationException("This stream does not support " +
699-
"enhanced byte buffer access.");
718+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
719+
+ " does not support enhanced byte buffer access.");
700720
}
701721
final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
702722
read(bufferPool, maxLength, opts);
@@ -714,8 +734,8 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
714734
@Override
715735
public void releaseBuffer(ByteBuffer buffer) {
716736
if (!(in instanceof HasEnhancedByteBufferAccess)) {
717-
throw new UnsupportedOperationException("This stream does not support " +
718-
"release buffer.");
737+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
738+
+ " does not support release buffer.");
719739
}
720740
((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
721741
}
@@ -724,8 +744,8 @@ public void releaseBuffer(ByteBuffer buffer) {
724744
public void setReadahead(Long readahead) throws IOException,
725745
UnsupportedOperationException {
726746
if (!(in instanceof CanSetReadahead)) {
727-
throw new UnsupportedOperationException("This stream does not support " +
728-
"setting the readahead caching strategy.");
747+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
748+
+ " does not support setting the readahead caching strategy.");
729749
}
730750
((CanSetReadahead) in).setReadahead(readahead);
731751
}
@@ -734,8 +754,9 @@ public void setReadahead(Long readahead) throws IOException,
734754
public void setDropBehind(Boolean dropCache) throws IOException,
735755
UnsupportedOperationException {
736756
if (!(in instanceof CanSetReadahead)) {
737-
throw new UnsupportedOperationException("This stream does not " +
738-
"support setting the drop-behind caching setting.");
757+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
758+
+ " stream does not support setting the drop-behind caching"
759+
+ " setting.");
739760
}
740761
((CanSetDropBehind) in).setDropBehind(dropCache);
741762
}
@@ -842,8 +863,8 @@ public boolean hasCapability(String capability) {
842863
case StreamCapabilities.READBYTEBUFFER:
843864
case StreamCapabilities.PREADBYTEBUFFER:
844865
if (!(in instanceof StreamCapabilities)) {
845-
throw new UnsupportedOperationException("This stream does not expose " +
846-
"its stream capabilities.");
866+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
867+
+ " does not expose its stream capabilities.");
847868
}
848869
return ((StreamCapabilities) in).hasCapability(capability);
849870
default:

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.fs;
1919

20+
import java.io.EOFException;
2021
import java.io.IOException;
2122
import java.nio.ByteBuffer;
2223

@@ -55,6 +56,8 @@ public interface ByteBufferPositionedReadable {
5556
* <p>
5657
* Implementations should treat 0-length requests as legitimate, and must not
5758
* signal an error upon their receipt.
59+
* <p>
60+
* This does not change the current offset of a file, and is thread-safe.
5861
*
5962
* @param position position within file
6063
* @param buf the ByteBuffer to receive the results of the read operation.
@@ -63,4 +66,25 @@ public interface ByteBufferPositionedReadable {
6366
* @throws IOException if there is some error performing the read
6467
*/
6568
int read(long position, ByteBuffer buf) throws IOException;
69+
70+
/**
71+
* Reads {@code buf.remaining()} bytes into buf from a given position in
72+
* the file or until the end of the data was reached before the read
73+
* operation completed. Callers should use {@code buf.limit(...)} to
74+
* control the size of the desired read and {@code buf.position(...)} to
75+
* control the offset into the buffer the data should be written to.
76+
* <p>
77+
* This operation provides similar semantics to
78+
* {@link #read(long, ByteBuffer)}, the difference is that this method is
79+
* guaranteed to read data until the {@link ByteBuffer} is full, or until
80+
* the end of the data stream is reached.
81+
*
82+
* @param position position within file
83+
* @param buf the ByteBuffer to receive the results of the read operation.
84+
* @throws IOException if there is some error performing the read
85+
* @throws EOFException the end of the data was reached before
86+
* the read operation completed
87+
* @see #read(long, ByteBuffer)
88+
*/
89+
void readFully(long position, ByteBuffer buf) throws IOException;
6690
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ public class FSDataInputStream extends DataInputStream
5151
public FSDataInputStream(InputStream in) {
5252
super(in);
5353
if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
54-
throw new IllegalArgumentException(
55-
"In is not an instance of Seekable or PositionedReadable");
54+
throw new IllegalArgumentException(in.getClass().getCanonicalName() +
55+
" is not an instance of Seekable or PositionedReadable");
5656
}
5757
}
5858

@@ -149,7 +149,7 @@ public int read(ByteBuffer buf) throws IOException {
149149
}
150150

151151
throw new UnsupportedOperationException("Byte-buffer read unsupported " +
152-
"by input stream");
152+
"by " + in.getClass().getCanonicalName());
153153
}
154154

155155
@Override
@@ -169,9 +169,8 @@ public void setReadahead(Long readahead)
169169
try {
170170
((CanSetReadahead)in).setReadahead(readahead);
171171
} catch (ClassCastException e) {
172-
throw new UnsupportedOperationException(
173-
"this stream does not support setting the readahead " +
174-
"caching strategy.");
172+
throw new UnsupportedOperationException(in.getClass().getCanonicalName() +
173+
" does not support setting the readahead caching strategy.");
175174
}
176175
}
177176

@@ -255,6 +254,16 @@ public int read(long position, ByteBuffer buf) throws IOException {
255254
return ((ByteBufferPositionedReadable) in).read(position, buf);
256255
}
257256
throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
258-
"by input stream");
257+
"by " + in.getClass().getCanonicalName());
258+
}
259+
260+
@Override
261+
public void readFully(long position, ByteBuffer buf) throws IOException {
262+
if (in instanceof ByteBufferPositionedReadable) {
263+
((ByteBufferPositionedReadable) in).readFully(position, buf);
264+
} else {
265+
throw new UnsupportedOperationException("Byte-buffer pread " +
266+
"unsupported by " + in.getClass().getCanonicalName());
267+
}
259268
}
260269
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -388,42 +388,41 @@ private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
388388
Assert.assertArrayEquals(readData, expectedData);
389389
}
390390

391-
/** Test read fully */
391+
/** Test read fully. */
392392
@Test(timeout=120000)
393393
public void testReadFully() throws Exception {
394394
OutputStream out = getOutputStream(defaultBufferSize);
395395
writeData(out);
396396

397-
InputStream in = getInputStream(defaultBufferSize);
398-
final int len1 = dataLen / 4;
399-
// Read len1 bytes
400-
byte[] readData = new byte[len1];
401-
readAll(in, readData, 0, len1);
402-
byte[] expectedData = new byte[len1];
403-
System.arraycopy(data, 0, expectedData, 0, len1);
404-
Assert.assertArrayEquals(readData, expectedData);
405-
406-
// Pos: 1/3 dataLen
407-
readFullyCheck(in, dataLen / 3);
408-
409-
// Read len1 bytes
410-
readData = new byte[len1];
411-
readAll(in, readData, 0, len1);
412-
expectedData = new byte[len1];
413-
System.arraycopy(data, len1, expectedData, 0, len1);
414-
Assert.assertArrayEquals(readData, expectedData);
415-
416-
// Pos: 1/2 dataLen
417-
readFullyCheck(in, dataLen / 2);
418-
419-
// Read len1 bytes
420-
readData = new byte[len1];
421-
readAll(in, readData, 0, len1);
422-
expectedData = new byte[len1];
423-
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
424-
Assert.assertArrayEquals(readData, expectedData);
425-
426-
in.close();
397+
try (InputStream in = getInputStream(defaultBufferSize)) {
398+
final int len1 = dataLen / 4;
399+
// Read len1 bytes
400+
byte[] readData = new byte[len1];
401+
readAll(in, readData, 0, len1);
402+
byte[] expectedData = new byte[len1];
403+
System.arraycopy(data, 0, expectedData, 0, len1);
404+
Assert.assertArrayEquals(readData, expectedData);
405+
406+
// Pos: 1/3 dataLen
407+
readFullyCheck(in, dataLen / 3);
408+
409+
// Read len1 bytes
410+
readData = new byte[len1];
411+
readAll(in, readData, 0, len1);
412+
expectedData = new byte[len1];
413+
System.arraycopy(data, len1, expectedData, 0, len1);
414+
Assert.assertArrayEquals(readData, expectedData);
415+
416+
// Pos: 1/2 dataLen
417+
readFullyCheck(in, dataLen / 2);
418+
419+
// Read len1 bytes
420+
readData = new byte[len1];
421+
readAll(in, readData, 0, len1);
422+
expectedData = new byte[len1];
423+
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
424+
Assert.assertArrayEquals(readData, expectedData);
425+
}
427426
}
428427

429428
private void readFullyCheck(InputStream in, int pos) throws Exception {
@@ -441,6 +440,60 @@ private void readFullyCheck(InputStream in, int pos) throws Exception {
441440
} catch (EOFException e) {
442441
}
443442
}
443+
444+
/** Test byte byffer read fully. */
445+
@Test(timeout=120000)
446+
public void testByteBufferReadFully() throws Exception {
447+
OutputStream out = getOutputStream(defaultBufferSize);
448+
writeData(out);
449+
450+
try (InputStream in = getInputStream(defaultBufferSize)) {
451+
final int len1 = dataLen / 4;
452+
// Read len1 bytes
453+
byte[] readData = new byte[len1];
454+
readAll(in, readData, 0, len1);
455+
byte[] expectedData = new byte[len1];
456+
System.arraycopy(data, 0, expectedData, 0, len1);
457+
Assert.assertArrayEquals(readData, expectedData);
458+
459+
// Pos: 1/3 dataLen
460+
byteBufferReadFullyCheck(in, dataLen / 3);
461+
462+
// Read len1 bytes
463+
readData = new byte[len1];
464+
readAll(in, readData, 0, len1);
465+
expectedData = new byte[len1];
466+
System.arraycopy(data, len1, expectedData, 0, len1);
467+
Assert.assertArrayEquals(readData, expectedData);
468+
469+
// Pos: 1/2 dataLen
470+
byteBufferReadFullyCheck(in, dataLen / 2);
471+
472+
// Read len1 bytes
473+
readData = new byte[len1];
474+
readAll(in, readData, 0, len1);
475+
expectedData = new byte[len1];
476+
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
477+
Assert.assertArrayEquals(readData, expectedData);
478+
}
479+
}
480+
481+
private void byteBufferReadFullyCheck(InputStream in, int pos)
482+
throws Exception {
483+
ByteBuffer result = ByteBuffer.allocate(dataLen - pos);
484+
((ByteBufferPositionedReadable) in).readFully(pos, result);
485+
486+
byte[] expectedData = new byte[dataLen - pos];
487+
System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
488+
Assert.assertArrayEquals(result.array(), expectedData);
489+
490+
result = ByteBuffer.allocate(dataLen); // Exceeds maximum length
491+
try {
492+
((ByteBufferPositionedReadable) in).readFully(pos, result);
493+
Assert.fail("Read fully exceeds maximum length should fail.");
494+
} catch (EOFException e) {
495+
}
496+
}
444497

445498
/** Test seek to different position. */
446499
@Test(timeout=120000)

0 commit comments

Comments
 (0)