Skip to content

Commit ce76fe1

Browse files
committed
HDFS-14564: Add libhdfs APIs for readFully; add readFully to ByteBufferPositionedReadable (apache#963) Contributed by Sahil Takiar.
Reviewed-by: Siyao Meng <[email protected]>
1 parent 56c6d87 commit ce76fe1

File tree

2 files changed

+335
-0
lines changed

2 files changed

+335
-0
lines changed
Lines changed: 66 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.fs;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
import org.apache.hadoop.classification.InterfaceStability;
25+
26+
/**
27+
* Implementers of this interface provide a positioned read API that writes to a
28+
* {@link ByteBuffer} rather than a {@code byte[]}.
29+
*
30+
* @see PositionedReadable
31+
* @see ByteBufferReadable
32+
*/
33+
@InterfaceAudience.Public
34+
@InterfaceStability.Evolving
35+
public interface ByteBufferPositionedReadable {
36+
/**
37+
* Reads up to {@code buf.remaining()} bytes into buf from a given position
38+
* in the file and returns the number of bytes read. Callers should use
39+
* {@code buf.limit(...)} to control the size of the desired read and
40+
* {@code buf.position(...)} to control the offset into the buffer the data
41+
* should be written to.
42+
* <p>
43+
* After a successful call, {@code buf.position()} will be advanced by the
44+
* number of bytes read and {@code buf.limit()} will be unchanged.
45+
* <p>
46+
* In the case of an exception, the state of the buffer (the contents of the
47+
* buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
48+
* undefined, and callers should be prepared to recover from this
49+
* eventuality.
50+
* <p>
51+
* Callers should use {@link StreamCapabilities#hasCapability(String)} with
52+
* {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
53+
* stream supports this interface, otherwise they might get a
54+
* {@link UnsupportedOperationException}.
55+
* <p>
56+
* Implementations should treat 0-length requests as legitimate, and must not
57+
* signal an error upon their receipt.
58+
*
59+
* @param position position within file
60+
* @param buf the ByteBuffer to receive the results of the read operation.
61+
* @return the number of bytes read, possibly zero, or -1 if reached
62+
* end-of-stream
63+
* @throws IOException if there is some error performing the read
64+
*/
65+
int read(long position, ByteBuffer buf) throws IOException;
66+
}
Lines changed: 269 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,269 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
import java.util.Arrays;
23+
import java.util.Random;
24+
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.fs.FSDataInputStream;
27+
import org.apache.hadoop.fs.FSDataOutputStream;
28+
import org.apache.hadoop.fs.FileSystem;
29+
import org.apache.hadoop.fs.Path;
30+
31+
import org.junit.AfterClass;
32+
import org.junit.BeforeClass;
33+
import org.junit.Test;
34+
35+
import static org.junit.Assert.assertArrayEquals;
36+
import static org.junit.Assert.assertEquals;
37+
import static org.junit.Assert.assertFalse;
38+
import static org.junit.Assert.assertTrue;
39+
40+
/**
41+
* This class tests the DFS positional read functionality on a single node
42+
* mini-cluster. These tests are inspired from {@link TestPread}. The tests
43+
* are much less comprehensive than other pread tests because pread already
44+
* internally uses {@link ByteBuffer}s.
45+
*/
46+
public class TestByteBufferPread {
47+
48+
private static MiniDFSCluster cluster;
49+
private static FileSystem fs;
50+
private static byte[] fileContents;
51+
private static Path testFile;
52+
private static Random rand;
53+
54+
private static final long SEED = 0xDEADBEEFL;
55+
private static final int BLOCK_SIZE = 4096;
56+
private static final int FILE_SIZE = 12 * BLOCK_SIZE;
57+
58+
@BeforeClass
59+
public static void setup() throws IOException {
60+
// Setup the cluster with a small block size so we can create small files
61+
// that span multiple blocks
62+
Configuration conf = new Configuration();
63+
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
64+
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
65+
fs = cluster.getFileSystem();
66+
67+
// Create a test file that spans 12 blocks, and contains a bunch of random
68+
// bytes
69+
fileContents = new byte[FILE_SIZE];
70+
rand = new Random(SEED);
71+
rand.nextBytes(fileContents);
72+
testFile = new Path("/byte-buffer-pread-test.dat");
73+
try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
74+
out.write(fileContents);
75+
}
76+
}
77+
78+
/**
79+
* Test preads with {@link java.nio.HeapByteBuffer}s.
80+
*/
81+
@Test
82+
public void testPreadWithHeapByteBuffer() throws IOException {
83+
testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
84+
testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
85+
testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
86+
testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
87+
testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
88+
}
89+
90+
/**
91+
* Test preads with {@link java.nio.DirectByteBuffer}s.
92+
*/
93+
@Test
94+
public void testPreadWithDirectByteBuffer() throws IOException {
95+
testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
96+
testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
97+
testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
98+
testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
99+
testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
100+
}
101+
102+
/**
103+
* Reads the entire testFile using the pread API and validates that its
104+
* contents are properly loaded into the supplied {@link ByteBuffer}.
105+
*/
106+
private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
107+
int bytesRead;
108+
int totalBytesRead = 0;
109+
try (FSDataInputStream in = fs.open(testFile)) {
110+
while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
111+
totalBytesRead += bytesRead;
112+
// Check that each call to read changes the position of the ByteBuffer
113+
// correctly
114+
assertEquals(totalBytesRead, buffer.position());
115+
}
116+
117+
// Make sure the buffer is full
118+
assertFalse(buffer.hasRemaining());
119+
// Make sure the contents of the read buffer equal the contents of the
120+
// file
121+
buffer.position(0);
122+
byte[] bufferContents = new byte[FILE_SIZE];
123+
buffer.get(bufferContents);
124+
assertArrayEquals(bufferContents, fileContents);
125+
buffer.position(buffer.limit());
126+
}
127+
}
128+
129+
/**
130+
* Attempts to read the testFile into a {@link ByteBuffer} that is already
131+
* full, and validates that doing so does not change the contents of the
132+
* supplied {@link ByteBuffer}.
133+
*/
134+
private void testPreadWithFullByteBuffer(ByteBuffer buffer)
135+
throws IOException {
136+
// Load some dummy data into the buffer
137+
byte[] existingBufferBytes = new byte[FILE_SIZE];
138+
rand.nextBytes(existingBufferBytes);
139+
buffer.put(existingBufferBytes);
140+
// Make sure the buffer is full
141+
assertFalse(buffer.hasRemaining());
142+
143+
try (FSDataInputStream in = fs.open(testFile)) {
144+
// Attempt to read into the buffer, 0 bytes should be read since the
145+
// buffer is full
146+
assertEquals(0, in.read(buffer));
147+
148+
// Double check the buffer is still full and its contents have not
149+
// changed
150+
assertFalse(buffer.hasRemaining());
151+
buffer.position(0);
152+
byte[] bufferContents = new byte[FILE_SIZE];
153+
buffer.get(bufferContents);
154+
assertArrayEquals(bufferContents, existingBufferBytes);
155+
}
156+
}
157+
158+
/**
159+
* Reads half of the testFile into the {@link ByteBuffer} by setting a
160+
* {@link ByteBuffer#limit} on the buffer. Validates that only half of the
161+
* testFile is loaded into the buffer.
162+
*/
163+
private void testPreadWithLimitedByteBuffer(
164+
ByteBuffer buffer) throws IOException {
165+
int bytesRead;
166+
int totalBytesRead = 0;
167+
// Set the buffer limit to half the size of the file
168+
buffer.limit(FILE_SIZE / 2);
169+
170+
try (FSDataInputStream in = fs.open(testFile)) {
171+
while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
172+
totalBytesRead += bytesRead;
173+
// Check that each call to read changes the position of the ByteBuffer
174+
// correctly
175+
assertEquals(totalBytesRead, buffer.position());
176+
}
177+
178+
// Since we set the buffer limit to half the size of the file, we should
179+
// have only read half of the file into the buffer
180+
assertEquals(totalBytesRead, FILE_SIZE / 2);
181+
// Check that the buffer is full and the contents equal the first half of
182+
// the file
183+
assertFalse(buffer.hasRemaining());
184+
buffer.position(0);
185+
byte[] bufferContents = new byte[FILE_SIZE / 2];
186+
buffer.get(bufferContents);
187+
assertArrayEquals(bufferContents,
188+
Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
189+
}
190+
}
191+
192+
/**
193+
* Reads half of the testFile into the {@link ByteBuffer} by setting the
194+
* {@link ByteBuffer#position} the half the size of the file. Validates that
195+
* only half of the testFile is loaded into the buffer.
196+
*/
197+
private void testPreadWithPositionedByteBuffer(
198+
ByteBuffer buffer) throws IOException {
199+
int bytesRead;
200+
int totalBytesRead = 0;
201+
// Set the buffer position to half the size of the file
202+
buffer.position(FILE_SIZE / 2);
203+
204+
try (FSDataInputStream in = fs.open(testFile)) {
205+
while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
206+
totalBytesRead += bytesRead;
207+
// Check that each call to read changes the position of the ByteBuffer
208+
// correctly
209+
assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
210+
}
211+
212+
// Since we set the buffer position to half the size of the file, we
213+
// should have only read half of the file into the buffer
214+
assertEquals(totalBytesRead, FILE_SIZE / 2);
215+
// Check that the buffer is full and the contents equal the first half of
216+
// the file
217+
assertFalse(buffer.hasRemaining());
218+
buffer.position(FILE_SIZE / 2);
219+
byte[] bufferContents = new byte[FILE_SIZE / 2];
220+
buffer.get(bufferContents);
221+
assertArrayEquals(bufferContents,
222+
Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
223+
}
224+
}
225+
226+
/**
227+
* Reads half of the testFile into the {@link ByteBuffer} by specifying a
228+
* position for the pread API that is half of the file size. Validates that
229+
* only half of the testFile is loaded into the buffer.
230+
*/
231+
private void testPositionedPreadWithByteBuffer(
232+
ByteBuffer buffer) throws IOException {
233+
int bytesRead;
234+
int totalBytesRead = 0;
235+
236+
try (FSDataInputStream in = fs.open(testFile)) {
237+
// Start reading from halfway through the file
238+
while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
239+
buffer)) > 0) {
240+
totalBytesRead += bytesRead;
241+
// Check that each call to read changes the position of the ByteBuffer
242+
// correctly
243+
assertEquals(totalBytesRead, buffer.position());
244+
}
245+
246+
// Since we starting reading halfway through the file, the buffer should
247+
// only be half full
248+
assertEquals(totalBytesRead, FILE_SIZE / 2);
249+
assertEquals(buffer.position(), FILE_SIZE / 2);
250+
assertTrue(buffer.hasRemaining());
251+
// Check that the buffer contents equal the second half of the file
252+
buffer.position(0);
253+
byte[] bufferContents = new byte[FILE_SIZE / 2];
254+
buffer.get(bufferContents);
255+
assertArrayEquals(bufferContents,
256+
Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE));
257+
}
258+
}
259+
260+
@AfterClass
261+
public static void shutdown() throws IOException {
262+
try {
263+
fs.delete(testFile, false);
264+
fs.close();
265+
} finally {
266+
cluster.shutdown(true);
267+
}
268+
}
269+
}

0 commit comments

Comments
 (0)