Skip to content

Commit 2475cbb

Browse files
sahilTakiar
authored and deepakdamri committed
HDFS-14564: Add libhdfs APIs for readFully; add readFully to ByteBufferPositionedReadable (apache#963) Contributed by Sahil Takiar.
Reviewed-by: Siyao Meng <[email protected]>
1 parent 981117b commit 2475cbb

File tree

16 files changed

+1475
-375
lines changed

16 files changed

+1475
-375
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.common.base.Preconditions;
3434
import org.apache.hadoop.classification.InterfaceAudience;
3535
import org.apache.hadoop.classification.InterfaceStability;
36+
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
3637
import org.apache.hadoop.fs.ByteBufferReadable;
3738
import org.apache.hadoop.fs.CanSetDropBehind;
3839
import org.apache.hadoop.fs.CanSetReadahead;
@@ -53,18 +54,19 @@
5354
* required in order to ensure that the plain text and cipher text have a 1:1
5455
* mapping. The decryption is buffer based. The key points of the decryption
5556
* are (1) calculating the counter and (2) padding through stream position:
56-
* <p/>
57+
* <p>
5758
* counter = base + pos/(algorithm blocksize);
5859
* padding = pos%(algorithm blocksize);
59-
* <p/>
60+
* <p>
6061
* The underlying stream offset is maintained as state.
6162
*/
6263
@InterfaceAudience.Private
6364
@InterfaceStability.Evolving
6465
public class CryptoInputStream extends FilterInputStream implements
6566
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
6667
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
67-
ReadableByteChannel, CanUnbuffer, StreamCapabilities {
68+
ReadableByteChannel, CanUnbuffer, StreamCapabilities,
69+
ByteBufferPositionedReadable {
6870
private final byte[] oneByteBuf = new byte[1];
6971
private final CryptoCodec codec;
7072
private final Decryptor decryptor;
  /**
   * Positioned read into a byte array: delegates the pread to the wrapped
   * stream, then decrypts the bytes that were actually read, in place.
   * This does not change the current offset of the file.
   *
   * @param position position in the underlying file to read from
   * @param buffer   destination array
   * @param offset   offset in {@code buffer} at which data is written
   * @param length   maximum number of bytes to read
   * @return the number of bytes read, or -1 at end of stream
   * @throws IOException if the underlying pread or decryption fails
   * @throws UnsupportedOperationException if the wrapped stream is not
   *         {@link PositionedReadable}
   */
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    checkStream();
    try {
      final int n = ((PositionedReadable) in).read(position, buffer, offset,
          length);
      if (n > 0) {
        // This operation does not change the current offset of the file.
        // Only the n bytes the delegate actually produced are decrypted.
        decrypt(position, buffer, offset, n);
      }

      return n;
    } catch (ClassCastException e) {
      // The wrapped stream does not implement PositionedReadable; surface
      // that as an unsupported operation rather than a cast failure.
      throw new UnsupportedOperationException("This stream does not support " +
          "positioned read.");
    }
  }
344-
346+
347+
  /**
   * Positioned readFully using {@link ByteBuffer}s. This method is thread-safe.
   *
   * Delegates to the wrapped stream's {@code readFully}, then decrypts —
   * in place — exactly the region of {@code buf} the delegate filled
   * (from the buffer's position on entry up to its position on return).
   * This does not change the current offset of the file.
   *
   * @param position position in the underlying file to read from
   * @param buf      destination buffer; filled from its current position to
   *                 its limit
   * @throws IOException if the underlying read or decryption fails
   * @throws UnsupportedOperationException if the wrapped stream does not
   *         implement {@link ByteBufferPositionedReadable}
   */
  @Override
  public void readFully(long position, final ByteBuffer buf)
      throws IOException {
    checkStream();
    if (!(in instanceof ByteBufferPositionedReadable)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support positioned reads with byte buffers.");
    }
    // Remember where the delegate starts writing so we know which bytes
    // to decrypt afterwards.
    int bufPos = buf.position();
    ((ByteBufferPositionedReadable) in).readFully(position, buf);
    // n = number of bytes the delegate actually wrote into buf.
    final int n = buf.position() - bufPos;
    if (n > 0) {
      // This operation does not change the current offset of the file
      decrypt(position, buf, n, bufPos);
    }
  }
366+
345367
/**
346368
* Decrypt length bytes in buffer starting at offset. Output is also put
347369
* into buffer starting at offset. It is thread-safe.
@@ -375,7 +397,7 @@ private void decrypt(long position, byte[] buffer, int offset, int length)
375397
returnDecryptor(decryptor);
376398
}
377399
}
378-
400+
379401
/** Positioned read fully. It is thread-safe */
380402
@Override
381403
public void readFully(long position, byte[] buffer, int offset, int length)
@@ -407,7 +429,7 @@ public void seek(long pos) throws IOException {
407429
checkStream();
408430
try {
409431
/*
410-
* If data of target pos in the underlying stream has already been read
432+
* If data of target pos in the underlying stream has already been read
411433
* and decrypted in outBuffer, we just need to re-position outBuffer.
412434
*/
413435
if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
@@ -523,7 +545,7 @@ public int read(ByteBuffer buf) throws IOException {
523545
* Output is also buf and same start position.
524546
* buf.position() and buf.limit() should be unchanged after decryption.
525547
*/
526-
private void decrypt(ByteBuffer buf, int n, int start)
548+
private void decrypt(ByteBuffer buf, int n, int start)
527549
throws IOException {
528550
final int pos = buf.position();
529551
final int limit = buf.limit();
@@ -605,7 +627,7 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
605627
}
606628
return buffer;
607629
} catch (ClassCastException e) {
608-
throw new UnsupportedOperationException("This stream does not support " +
630+
throw new UnsupportedOperationException("This stream does not support " +
609631
"enhanced byte buffer access.");
610632
}
611633
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Implementers of this interface provide a positioned read API that writes to a
 * {@link ByteBuffer} rather than a {@code byte[]}.
 *
 * @see PositionedReadable
 * @see ByteBufferReadable
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ByteBufferPositionedReadable {
  /**
   * Reads up to {@code buf.remaining()} bytes into buf from a given position
   * in the file and returns the number of bytes read. Callers should use
   * {@code buf.limit(...)} to control the size of the desired read and
   * {@code buf.position(...)} to control the offset into the buffer the data
   * should be written to.
   * <p>
   * After a successful call, {@code buf.position()} will be advanced by the
   * number of bytes read and {@code buf.limit()} will be unchanged.
   * <p>
   * In the case of an exception, the state of the buffer (the contents of the
   * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
   * undefined, and callers should be prepared to recover from this
   * eventuality.
   * <p>
   * Callers should use {@link StreamCapabilities#hasCapability(String)} with
   * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
   * stream supports this interface, otherwise they might get a
   * {@link UnsupportedOperationException}.
   * <p>
   * Implementations should treat 0-length requests as legitimate, and must not
   * signal an error upon their receipt.
   * <p>
   * This does not change the current offset of a file, and is thread-safe.
   *
   * @param position position within file
   * @param buf the ByteBuffer to receive the results of the read operation.
   * @return the number of bytes read, possibly zero, or -1 if reached
   *         end-of-stream
   * @throws IOException if there is some error performing the read
   */
  int read(long position, ByteBuffer buf) throws IOException;

  /**
   * Reads {@code buf.remaining()} bytes into buf from a given position in
   * the file or until the end of the data was reached before the read
   * operation completed. Callers should use {@code buf.limit(...)} to
   * control the size of the desired read and {@code buf.position(...)} to
   * control the offset into the buffer the data should be written to.
   * <p>
   * This operation provides similar semantics to
   * {@link #read(long, ByteBuffer)}, the difference is that this method is
   * guaranteed to read data until the {@link ByteBuffer} is full, or until
   * the end of the data stream is reached.
   *
   * @param position position within file
   * @param buf the ByteBuffer to receive the results of the read operation.
   * @throws IOException if there is some error performing the read
   * @throws EOFException the end of the data was reached before
   *         the read operation completed
   * @see #read(long, ByteBuffer)
   */
  void readFully(long position, ByteBuffer buf) throws IOException;
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
public class FSDataInputStream extends DataInputStream
3939
implements Seekable, PositionedReadable,
4040
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
41-
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
41+
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
42+
ByteBufferPositionedReadable {
4243
/**
4344
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
4445
* objects
@@ -50,8 +51,8 @@ public class FSDataInputStream extends DataInputStream
5051
public FSDataInputStream(InputStream in) {
5152
super(in);
5253
if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
53-
throw new IllegalArgumentException(
54-
"In is not an instance of Seekable or PositionedReadable");
54+
throw new IllegalArgumentException(in.getClass().getCanonicalName() +
55+
" is not an instance of Seekable or PositionedReadable");
5556
}
5657
}
5758

@@ -147,7 +148,8 @@ public int read(ByteBuffer buf) throws IOException {
147148
return ((ByteBufferReadable)in).read(buf);
148149
}
149150

150-
throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
151+
throw new UnsupportedOperationException("Byte-buffer read unsupported " +
152+
"by " + in.getClass().getCanonicalName());
151153
}
152154

153155
@Override
@@ -167,9 +169,8 @@ public void setReadahead(Long readahead)
167169
try {
168170
((CanSetReadahead)in).setReadahead(readahead);
169171
} catch (ClassCastException e) {
170-
throw new UnsupportedOperationException(
171-
"this stream does not support setting the readahead " +
172-
"caching strategy.");
172+
throw new UnsupportedOperationException(in.getClass().getCanonicalName() +
173+
" does not support setting the readahead caching strategy.");
173174
}
174175
}
175176

@@ -246,4 +247,23 @@ public boolean hasCapability(String capability) {
246247
public String toString() {
247248
return super.toString() + ": " + in;
248249
}
250+
251+
@Override
252+
public int read(long position, ByteBuffer buf) throws IOException {
253+
if (in instanceof ByteBufferPositionedReadable) {
254+
return ((ByteBufferPositionedReadable) in).read(position, buf);
255+
}
256+
throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
257+
"by " + in.getClass().getCanonicalName());
258+
}
259+
260+
@Override
261+
public void readFully(long position, ByteBuffer buf) throws IOException {
262+
if (in instanceof ByteBufferPositionedReadable) {
263+
((ByteBufferPositionedReadable) in).readFully(position, buf);
264+
} else {
265+
throw new UnsupportedOperationException("Byte-buffer pread " +
266+
"unsupported by " + in.getClass().getCanonicalName());
267+
}
268+
}
249269
}

0 commit comments

Comments
 (0)