Skip to content

Commit 12d7d26

Browse files
sahilTakiardeepakdamri
authored andcommitted
HDFS-14564: Add libhdfs APIs for readFully; add readFully to ByteBufferPositionedReadable (apache#963) Contributed by Sahil Takiar.
Reviewed-by: Siyao Meng <[email protected]>
1 parent 981117b commit 12d7d26

File tree

16 files changed

+1471
-372
lines changed

16 files changed

+1471
-372
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.common.base.Preconditions;
3434
import org.apache.hadoop.classification.InterfaceAudience;
3535
import org.apache.hadoop.classification.InterfaceStability;
36+
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
3637
import org.apache.hadoop.fs.ByteBufferReadable;
3738
import org.apache.hadoop.fs.CanSetDropBehind;
3839
import org.apache.hadoop.fs.CanSetReadahead;
@@ -328,20 +329,40 @@ public int read(long position, byte[] buffer, int offset, int length)
328329
throws IOException {
329330
checkStream();
330331
try {
331-
final int n = ((PositionedReadable) in).read(position, buffer, offset,
332+
final int n = ((PositionedReadable) in).read(position, buffer, offset,
332333
length);
333334
if (n > 0) {
334335
// This operation does not change the current offset of the file
335336
decrypt(position, buffer, offset, n);
336337
}
337-
338+
338339
return n;
339340
} catch (ClassCastException e) {
340341
throw new UnsupportedOperationException("This stream does not support " +
341342
"positioned read.");
342343
}
343344
}
344-
345+
346+
/**
347+
* Positioned readFully using {@link ByteBuffer}s. This method is thread-safe.
348+
*/
349+
@Override
350+
public void readFully(long position, final ByteBuffer buf)
351+
throws IOException {
352+
checkStream();
353+
if (!(in instanceof ByteBufferPositionedReadable)) {
354+
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
355+
+ " does not support positioned reads with byte buffers.");
356+
}
357+
int bufPos = buf.position();
358+
((ByteBufferPositionedReadable) in).readFully(position, buf);
359+
final int n = buf.position() - bufPos;
360+
if (n > 0) {
361+
// This operation does not change the current offset of the file
362+
decrypt(position, buf, n, bufPos);
363+
}
364+
}
365+
345366
/**
346367
* Decrypt length bytes in buffer starting at offset. Output is also put
347368
* into buffer starting at offset. It is thread-safe.
@@ -375,7 +396,7 @@ private void decrypt(long position, byte[] buffer, int offset, int length)
375396
returnDecryptor(decryptor);
376397
}
377398
}
378-
399+
379400
/** Positioned read fully. It is thread-safe */
380401
@Override
381402
public void readFully(long position, byte[] buffer, int offset, int length)
@@ -407,7 +428,7 @@ public void seek(long pos) throws IOException {
407428
checkStream();
408429
try {
409430
/*
410-
* If data of target pos in the underlying stream has already been read
431+
* If data of target pos in the underlying stream has already been read
411432
* and decrypted in outBuffer, we just need to re-position outBuffer.
412433
*/
413434
if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
@@ -523,7 +544,7 @@ public int read(ByteBuffer buf) throws IOException {
523544
* Output is also buf and same start position.
524545
* buf.position() and buf.limit() should be unchanged after decryption.
525546
*/
526-
private void decrypt(ByteBuffer buf, int n, int start)
547+
private void decrypt(ByteBuffer buf, int n, int start)
527548
throws IOException {
528549
final int pos = buf.position();
529550
final int limit = buf.limit();
@@ -605,7 +626,7 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
605626
}
606627
return buffer;
607628
} catch (ClassCastException e) {
608-
throw new UnsupportedOperationException("This stream does not support " +
629+
throw new UnsupportedOperationException("This stream does not support " +
609630
"enhanced byte buffer access.");
610631
}
611632
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.fs;
19+
20+
import java.io.EOFException;
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
24+
import org.apache.hadoop.classification.InterfaceAudience;
25+
import org.apache.hadoop.classification.InterfaceStability;
26+
27+
/**
28+
* Implementers of this interface provide a positioned read API that writes to a
29+
* {@link ByteBuffer} rather than a {@code byte[]}.
30+
*
31+
* @see PositionedReadable
32+
* @see ByteBufferReadable
33+
*/
34+
@InterfaceAudience.Public
35+
@InterfaceStability.Evolving
36+
public interface ByteBufferPositionedReadable {
37+
/**
38+
* Reads up to {@code buf.remaining()} bytes into buf from a given position
39+
* in the file and returns the number of bytes read. Callers should use
40+
* {@code buf.limit(...)} to control the size of the desired read and
41+
* {@code buf.position(...)} to control the offset into the buffer the data
42+
* should be written to.
43+
* <p>
44+
* After a successful call, {@code buf.position()} will be advanced by the
45+
* number of bytes read and {@code buf.limit()} will be unchanged.
46+
* <p>
47+
* In the case of an exception, the state of the buffer (the contents of the
48+
* buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
49+
* undefined, and callers should be prepared to recover from this
50+
* eventuality.
51+
* <p>
52+
* Callers should use {@link StreamCapabilities#hasCapability(String)} with
53+
* {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
54+
* stream supports this interface, otherwise they might get a
55+
* {@link UnsupportedOperationException}.
56+
* <p>
57+
* Implementations should treat 0-length requests as legitimate, and must not
58+
* signal an error upon their receipt.
59+
* <p>
60+
* This does not change the current offset of a file, and is thread-safe.
61+
*
62+
* @param position position within file
63+
* @param buf the ByteBuffer to receive the results of the read operation.
64+
* @return the number of bytes read, possibly zero, or -1 if reached
65+
* end-of-stream
66+
* @throws IOException if there is some error performing the read
67+
*/
68+
int read(long position, ByteBuffer buf) throws IOException;
69+
70+
/**
71+
* Reads {@code buf.remaining()} bytes into buf from a given position in
72+
* the file or until the end of the data was reached before the read
73+
* operation completed. Callers should use {@code buf.limit(...)} to
74+
* control the size of the desired read and {@code buf.position(...)} to
75+
* control the offset into the buffer the data should be written to.
76+
* <p>
77+
* This operation provides similar semantics to
78+
* {@link #read(long, ByteBuffer)}, the difference is that this method is
79+
* guaranteed to read data until the {@link ByteBuffer} is full, or until
80+
* the end of the data stream is reached.
81+
*
82+
* @param position position within file
83+
* @param buf the ByteBuffer to receive the results of the read operation.
84+
* @throws IOException if there is some error performing the read
85+
* @throws EOFException the end of the data was reached before
86+
* the read operation completed
87+
* @see #read(long, ByteBuffer)
88+
*/
89+
void readFully(long position, ByteBuffer buf) throws IOException;
90+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
public class FSDataInputStream extends DataInputStream
3939
implements Seekable, PositionedReadable,
4040
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
41-
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
41+
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
42+
ByteBufferPositionedReadable {
4243
/**
4344
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
4445
* objects
@@ -50,8 +51,8 @@ public class FSDataInputStream extends DataInputStream
5051
public FSDataInputStream(InputStream in) {
5152
super(in);
5253
if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
53-
throw new IllegalArgumentException(
54-
"In is not an instance of Seekable or PositionedReadable");
54+
throw new IllegalArgumentException(in.getClass().getCanonicalName() +
55+
" is not an instance of Seekable or PositionedReadable");
5556
}
5657
}
5758

@@ -147,7 +148,8 @@ public int read(ByteBuffer buf) throws IOException {
147148
return ((ByteBufferReadable)in).read(buf);
148149
}
149150

150-
throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
151+
throw new UnsupportedOperationException("Byte-buffer read unsupported " +
152+
"by " + in.getClass().getCanonicalName());
151153
}
152154

153155
@Override
@@ -167,9 +169,8 @@ public void setReadahead(Long readahead)
167169
try {
168170
((CanSetReadahead)in).setReadahead(readahead);
169171
} catch (ClassCastException e) {
170-
throw new UnsupportedOperationException(
171-
"this stream does not support setting the readahead " +
172-
"caching strategy.");
172+
throw new UnsupportedOperationException(in.getClass().getCanonicalName() +
173+
" does not support setting the readahead caching strategy.");
173174
}
174175
}
175176

@@ -246,4 +247,23 @@ public boolean hasCapability(String capability) {
246247
public String toString() {
247248
return super.toString() + ": " + in;
248249
}
250+
251+
@Override
252+
public int read(long position, ByteBuffer buf) throws IOException {
253+
if (in instanceof ByteBufferPositionedReadable) {
254+
return ((ByteBufferPositionedReadable) in).read(position, buf);
255+
}
256+
throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
257+
"by " + in.getClass().getCanonicalName());
258+
}
259+
260+
@Override
261+
public void readFully(long position, ByteBuffer buf) throws IOException {
262+
if (in instanceof ByteBufferPositionedReadable) {
263+
((ByteBufferPositionedReadable) in).readFully(position, buf);
264+
} else {
265+
throw new UnsupportedOperationException("Byte-buffer pread " +
266+
"unsupported by " + in.getClass().getCanonicalName());
267+
}
268+
}
249269
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -316,42 +316,41 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception {
316316
Assert.assertArrayEquals(readData, expectedData);
317317
}
318318

319-
/** Test read fully */
319+
/** Test read fully. */
320320
@Test(timeout=120000)
321321
public void testReadFully() throws Exception {
322322
OutputStream out = getOutputStream(defaultBufferSize);
323323
writeData(out);
324324

325-
InputStream in = getInputStream(defaultBufferSize);
326-
final int len1 = dataLen / 4;
327-
// Read len1 bytes
328-
byte[] readData = new byte[len1];
329-
readAll(in, readData, 0, len1);
330-
byte[] expectedData = new byte[len1];
331-
System.arraycopy(data, 0, expectedData, 0, len1);
332-
Assert.assertArrayEquals(readData, expectedData);
333-
334-
// Pos: 1/3 dataLen
335-
readFullyCheck(in, dataLen / 3);
336-
337-
// Read len1 bytes
338-
readData = new byte[len1];
339-
readAll(in, readData, 0, len1);
340-
expectedData = new byte[len1];
341-
System.arraycopy(data, len1, expectedData, 0, len1);
342-
Assert.assertArrayEquals(readData, expectedData);
343-
344-
// Pos: 1/2 dataLen
345-
readFullyCheck(in, dataLen / 2);
346-
347-
// Read len1 bytes
348-
readData = new byte[len1];
349-
readAll(in, readData, 0, len1);
350-
expectedData = new byte[len1];
351-
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
352-
Assert.assertArrayEquals(readData, expectedData);
353-
354-
in.close();
325+
try (InputStream in = getInputStream(defaultBufferSize)) {
326+
final int len1 = dataLen / 4;
327+
// Read len1 bytes
328+
byte[] readData = new byte[len1];
329+
readAll(in, readData, 0, len1);
330+
byte[] expectedData = new byte[len1];
331+
System.arraycopy(data, 0, expectedData, 0, len1);
332+
Assert.assertArrayEquals(readData, expectedData);
333+
334+
// Pos: 1/3 dataLen
335+
readFullyCheck(in, dataLen / 3);
336+
337+
// Read len1 bytes
338+
readData = new byte[len1];
339+
readAll(in, readData, 0, len1);
340+
expectedData = new byte[len1];
341+
System.arraycopy(data, len1, expectedData, 0, len1);
342+
Assert.assertArrayEquals(readData, expectedData);
343+
344+
// Pos: 1/2 dataLen
345+
readFullyCheck(in, dataLen / 2);
346+
347+
// Read len1 bytes
348+
readData = new byte[len1];
349+
readAll(in, readData, 0, len1);
350+
expectedData = new byte[len1];
351+
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
352+
Assert.assertArrayEquals(readData, expectedData);
353+
}
355354
}
356355

357356
private void readFullyCheck(InputStream in, int pos) throws Exception {
@@ -369,6 +368,60 @@ private void readFullyCheck(InputStream in, int pos) throws Exception {
369368
} catch (EOFException e) {
370369
}
371370
}
371+
372+
/** Test byte byffer read fully. */
373+
@Test(timeout=120000)
374+
public void testByteBufferReadFully() throws Exception {
375+
OutputStream out = getOutputStream(defaultBufferSize);
376+
writeData(out);
377+
378+
try (InputStream in = getInputStream(defaultBufferSize)) {
379+
final int len1 = dataLen / 4;
380+
// Read len1 bytes
381+
byte[] readData = new byte[len1];
382+
readAll(in, readData, 0, len1);
383+
byte[] expectedData = new byte[len1];
384+
System.arraycopy(data, 0, expectedData, 0, len1);
385+
Assert.assertArrayEquals(readData, expectedData);
386+
387+
// Pos: 1/3 dataLen
388+
byteBufferReadFullyCheck(in, dataLen / 3);
389+
390+
// Read len1 bytes
391+
readData = new byte[len1];
392+
readAll(in, readData, 0, len1);
393+
expectedData = new byte[len1];
394+
System.arraycopy(data, len1, expectedData, 0, len1);
395+
Assert.assertArrayEquals(readData, expectedData);
396+
397+
// Pos: 1/2 dataLen
398+
byteBufferReadFullyCheck(in, dataLen / 2);
399+
400+
// Read len1 bytes
401+
readData = new byte[len1];
402+
readAll(in, readData, 0, len1);
403+
expectedData = new byte[len1];
404+
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
405+
Assert.assertArrayEquals(readData, expectedData);
406+
}
407+
}
408+
409+
private void byteBufferReadFullyCheck(InputStream in, int pos)
410+
throws Exception {
411+
ByteBuffer result = ByteBuffer.allocate(dataLen - pos);
412+
((ByteBufferPositionedReadable) in).readFully(pos, result);
413+
414+
byte[] expectedData = new byte[dataLen - pos];
415+
System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
416+
Assert.assertArrayEquals(result.array(), expectedData);
417+
418+
result = ByteBuffer.allocate(dataLen); // Exceeds maximum length
419+
try {
420+
((ByteBufferPositionedReadable) in).readFully(pos, result);
421+
Assert.fail("Read fully exceeds maximum length should fail.");
422+
} catch (EOFException e) {
423+
}
424+
}
372425

373426
/** Test seek to different position. */
374427
@Test(timeout=120000)

0 commit comments

Comments
 (0)