1818
1919package org .apache .hadoop .hdds .scm .storage ;
2020
21+ import com .google .common .annotations .VisibleForTesting ;
2122import com .google .common .base .Preconditions ;
2223import org .apache .hadoop .hdds .protocol .DatanodeDetails ;
2324import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos ;
@@ -61,10 +62,18 @@ public class BlockInputStream extends InputStream implements Seekable {
6162 private XceiverClientManager xceiverClientManager ;
6263 private XceiverClientSpi xceiverClient ;
6364 private List <ChunkInfo > chunks ;
65+ // ChunkIndex points to the index current chunk in the buffers or the the
66+ // index of chunk which will be read next into the buffers in
67+ // readChunkFromContainer().
6468 private int chunkIndex ;
69+ // ChunkIndexOfCurrentBuffer points to the index of chunk read into the
70+ // buffers or index of the last chunk in the buffers. It is updated only
71+ // when a new chunk is read from container into the buffers.
72+ private int chunkIndexOfCurrentBuffer ;
6573 private long [] chunkOffset ;
6674 private List <ByteBuffer > buffers ;
6775 private int bufferIndex ;
76+ private long bufferPosition ;
6877 private final boolean verifyChecksum ;
6978
7079 /**
@@ -76,24 +85,34 @@ public class BlockInputStream extends InputStream implements Seekable {
7685 * @param chunks list of chunks to read
7786 * @param traceID container protocol call traceID
7887 * @param verifyChecksum verify checksum
88+ * @param initialPosition the initial position of the stream pointer. This
89+ * position is seeked now if the up-stream was seeked
90+ * before this was created.
7991 */
8092 public BlockInputStream (
8193 BlockID blockID , XceiverClientManager xceiverClientManager ,
8294 XceiverClientSpi xceiverClient , List <ChunkInfo > chunks , String traceID ,
83- boolean verifyChecksum ) {
95+ boolean verifyChecksum , long initialPosition ) throws IOException {
8496 this .blockID = blockID ;
8597 this .traceID = traceID ;
8698 this .xceiverClientManager = xceiverClientManager ;
8799 this .xceiverClient = xceiverClient ;
88100 this .chunks = chunks ;
89- this .chunkIndex = -1 ;
101+ this .chunkIndex = 0 ;
102+ this .chunkIndexOfCurrentBuffer = -1 ;
90103 // chunkOffset[i] stores offset at which chunk i stores data in
91104 // BlockInputStream
92105 this .chunkOffset = new long [this .chunks .size ()];
93106 initializeChunkOffset ();
94107 this .buffers = null ;
95108 this .bufferIndex = 0 ;
109+ this .bufferPosition = -1 ;
96110 this .verifyChecksum = verifyChecksum ;
111+ if (initialPosition > 0 ) {
112+ // The stream was seeked to a position before the stream was
113+ // initialized. So seeking to the position now.
114+ seek (initialPosition );
115+ }
97116 }
98117
99118 private void initializeChunkOffset () {
@@ -176,7 +195,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
176195 *
177196 * @return true if EOF, false if more data is available
178197 */
179- private boolean blockStreamEOF () {
198+ protected boolean blockStreamEOF () {
180199 if (buffersHaveData () || chunksRemaining ()) {
181200 return false ;
182201 } else {
@@ -223,12 +242,19 @@ private synchronized void checkOpen() throws IOException {
223242 */
224243 private synchronized int prepareRead (int len ) throws IOException {
225244 for (;;) {
245+ if (!buffersAllocated ()) {
246+ // The current chunk at chunkIndex has not been read from the
247+ // container. Read the chunk and put the data into buffers.
248+ readChunkFromContainer ();
249+ }
226250 if (buffersHaveData ()) {
227251 // Data is available from buffers
228252 ByteBuffer bb = buffers .get (bufferIndex );
229253 return len > bb .remaining () ? bb .remaining () : len ;
230254 } else if (chunksRemaining ()) {
231255 // There are additional chunks available.
256+ // Read the next chunk in the block.
257+ chunkIndex += 1 ;
232258 readChunkFromContainer ();
233259 } else {
234260 // All available input has been consumed.
@@ -237,26 +263,31 @@ private synchronized int prepareRead(int len) throws IOException {
237263 }
238264 }
239265
240- private boolean buffersHaveData () {
241- boolean hasData = false ;
242-
266+ private boolean buffersAllocated () {
243267 if (buffers == null || buffers .isEmpty ()) {
244268 return false ;
245269 }
270+ return true ;
271+ }
246272
247- while (bufferIndex < (buffers .size ())) {
248- if (buffers .get (bufferIndex ).hasRemaining ()) {
249- // current buffer has data
250- hasData = true ;
251- break ;
252- } else {
253- if (buffersRemaining ()) {
254- // move to next available buffer
255- ++bufferIndex ;
256- Preconditions .checkState (bufferIndex < buffers .size ());
257- } else {
258- // no more buffers remaining
273+ private boolean buffersHaveData () {
274+ boolean hasData = false ;
275+
276+ if (buffersAllocated ()) {
277+ while (bufferIndex < (buffers .size ())) {
278+ if (buffers .get (bufferIndex ).hasRemaining ()) {
279+ // current buffer has data
280+ hasData = true ;
259281 break ;
282+ } else {
283+ if (buffersRemaining ()) {
284+ // move to next available buffer
285+ ++bufferIndex ;
286+ Preconditions .checkState (bufferIndex < buffers .size ());
287+ } else {
288+ // no more buffers remaining
289+ break ;
290+ }
260291 }
261292 }
262293 }
@@ -272,7 +303,14 @@ private boolean chunksRemaining() {
272303 if ((chunks == null ) || chunks .isEmpty ()) {
273304 return false ;
274305 }
275- return (chunkIndex < (chunks .size () - 1 ));
306+ // Check if more chunks are remaining in the stream after chunkIndex
307+ if (chunkIndex < (chunks .size () - 1 )) {
308+ return true ;
309+ }
310+ // ChunkIndex is the last chunk in the stream. Check if this chunk has
311+ // been read from container or not. Return true if chunkIndex has not
312+ // been read yet and false otherwise.
313+ return chunkIndexOfCurrentBuffer != chunkIndex ;
276314 }
277315
278316 /**
@@ -283,34 +321,14 @@ private boolean chunksRemaining() {
283321 * @throws IOException if there is an I/O error while performing the call
284322 */
285323 private synchronized void readChunkFromContainer () throws IOException {
286- // On every chunk read chunkIndex should be increased so as to read the
287- // next chunk
288- chunkIndex += 1 ;
289- XceiverClientReply reply ;
290- ReadChunkResponseProto readChunkResponse = null ;
324+ // Read the chunk at chunkIndex
291325 final ChunkInfo chunkInfo = chunks .get (chunkIndex );
292326 List <DatanodeDetails > excludeDns = null ;
293327 ByteString byteString ;
294- List <DatanodeDetails > dnList = xceiverClient . getPipeline (). getNodes ();
328+ List <DatanodeDetails > dnList = getDatanodeList ();
295329 while (true ) {
296- try {
297- reply = ContainerProtocolCalls
298- .readChunk (xceiverClient , chunkInfo , blockID , traceID , excludeDns );
299- ContainerProtos .ContainerCommandResponseProto response ;
300- response = reply .getResponse ().get ();
301- ContainerProtocolCalls .validateContainerResponse (response );
302- readChunkResponse = response .getReadChunk ();
303- } catch (IOException e ) {
304- if (e instanceof StorageContainerException ) {
305- throw e ;
306- }
307- throw new IOException ("Unexpected OzoneException: " + e .toString (), e );
308- } catch (ExecutionException | InterruptedException e ) {
309- throw new IOException (
310- "Failed to execute ReadChunk command for chunk " + chunkInfo
311- .getChunkName (), e );
312- }
313- byteString = readChunkResponse .getData ();
330+ List <DatanodeDetails > dnListFromReadChunkCall = new ArrayList <>();
331+ byteString = readChunk (chunkInfo , excludeDns , dnListFromReadChunkCall );
314332 try {
315333 if (byteString .size () != chunkInfo .getLen ()) {
316334 // Bytes read from chunk should be equal to chunk size.
@@ -333,7 +351,7 @@ private synchronized void readChunkFromContainer() throws IOException {
333351 if (excludeDns == null ) {
334352 excludeDns = new ArrayList <>();
335353 }
336- excludeDns .addAll (reply . getDatanodes () );
354+ excludeDns .addAll (dnListFromReadChunkCall );
337355 if (excludeDns .size () == dnList .size ()) {
338356 throw ioe ;
339357 }
@@ -342,6 +360,47 @@ private synchronized void readChunkFromContainer() throws IOException {
342360
343361 buffers = byteString .asReadOnlyByteBufferList ();
344362 bufferIndex = 0 ;
363+ chunkIndexOfCurrentBuffer = chunkIndex ;
364+
365+ // The bufferIndex and position might need to be adjusted if seek() was
366+ // called on the stream before. This needs to be done so that the buffer
367+ // position can be advanced to the 'seeked' position.
368+ adjustBufferIndex ();
369+ }
370+
371+ /**
372+ * Send RPC call to get the chunk from the container.
373+ */
374+ @ VisibleForTesting
375+ protected ByteString readChunk (final ChunkInfo chunkInfo ,
376+ List <DatanodeDetails > excludeDns , List <DatanodeDetails > dnListFromReply )
377+ throws IOException {
378+ XceiverClientReply reply ;
379+ ReadChunkResponseProto readChunkResponse = null ;
380+ try {
381+ reply = ContainerProtocolCalls
382+ .readChunk (xceiverClient , chunkInfo , blockID , traceID , excludeDns );
383+ ContainerProtos .ContainerCommandResponseProto response ;
384+ response = reply .getResponse ().get ();
385+ ContainerProtocolCalls .validateContainerResponse (response );
386+ readChunkResponse = response .getReadChunk ();
387+ dnListFromReply .addAll (reply .getDatanodes ());
388+ } catch (IOException e ) {
389+ if (e instanceof StorageContainerException ) {
390+ throw e ;
391+ }
392+ throw new IOException ("Unexpected OzoneException: " + e .toString (), e );
393+ } catch (ExecutionException | InterruptedException e ) {
394+ throw new IOException (
395+ "Failed to execute ReadChunk command for chunk " + chunkInfo
396+ .getChunkName (), e );
397+ }
398+ return readChunkResponse .getData ();
399+ }
400+
401+ @ VisibleForTesting
402+ protected List <DatanodeDetails > getDatanodeList () {
403+ return xceiverClient .getPipeline ().getNodes ();
345404 }
346405
347406 @ Override
@@ -352,9 +411,8 @@ public synchronized void seek(long pos) throws IOException {
352411 throw new EOFException ("EOF encountered pos: " + pos + " container key: "
353412 + blockID .getLocalID ());
354413 }
355- if (chunkIndex == -1 ) {
356- chunkIndex = Arrays .binarySearch (chunkOffset , pos );
357- } else if (pos < chunkOffset [chunkIndex ]) {
414+
415+ if (pos < chunkOffset [chunkIndex ]) {
358416 chunkIndex = Arrays .binarySearch (chunkOffset , 0 , chunkIndex , pos );
359417 } else if (pos >= chunkOffset [chunkIndex ] + chunks .get (chunkIndex )
360418 .getLen ()) {
@@ -368,40 +426,71 @@ public synchronized void seek(long pos) throws IOException {
368426 // accordingly so that chunkIndex = insertionPoint - 1
369427 chunkIndex = -chunkIndex -2 ;
370428 }
371- // adjust chunkIndex so that readChunkFromContainer reads the correct chunk
372- chunkIndex -= 1 ;
373- readChunkFromContainer ();
374- adjustBufferIndex (pos );
429+
430+ // The bufferPosition should be adjusted to account for the chunk offset
431+ // of the chunk the the pos actually points to.
432+ bufferPosition = pos - chunkOffset [chunkIndex ];
433+
434+ // Check if current buffers correspond to the chunk index being seeked
435+ // and if the buffers have any data.
436+ if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated ()) {
437+ // Position the buffer to the seeked position.
438+ adjustBufferIndex ();
439+ } else {
440+ // Release the current buffers. The next readChunkFromContainer will
441+ // read the required chunk and position the buffer to the seeked
442+ // position.
443+ releaseBuffers ();
444+ }
375445 }
376446
377- private void adjustBufferIndex (long pos ) {
378- long tempOffest = chunkOffset [chunkIndex ];
447+ private void adjustBufferIndex () {
448+ if (bufferPosition == -1 ) {
449+ // The stream has not been seeked to a position. No need to adjust the
450+ // buffer Index and position.
451+ return ;
452+ }
453+ // The bufferPosition is w.r.t the buffers for current chunk.
454+ // Adjust the bufferIndex and position to the seeked position.
455+ long tempOffest = 0 ;
379456 for (int i = 0 ; i < buffers .size (); i ++) {
380- if (pos - tempOffest >= buffers .get (i ).capacity ()) {
457+ if (bufferPosition - tempOffest >= buffers .get (i ).capacity ()) {
381458 tempOffest += buffers .get (i ).capacity ();
382459 } else {
383460 bufferIndex = i ;
384461 break ;
385462 }
386463 }
387- buffers .get (bufferIndex ).position ((int ) (pos - tempOffest ));
464+ buffers .get (bufferIndex ).position ((int ) (bufferPosition - tempOffest ));
465+ // Reset the bufferPosition as the seek() operation has been completed.
466+ bufferPosition = -1 ;
388467 }
389468
390469 @ Override
391470 public synchronized long getPos () throws IOException {
392- if (chunkIndex == -1 ) {
393- // no data consumed yet, a new stream OR after seek
394- return 0 ;
395- }
396-
397- if (blockStreamEOF ()) {
471+ // position = chunkOffset of current chunk (at chunkIndex) + position of
472+ // the buffer corresponding to the chunk.
473+ long bufferPos = 0 ;
474+
475+ if (bufferPosition >= 0 ) {
476+ // seek has been called but the buffers were empty. Hence, the buffer
477+ // position will be advanced after the buffers are filled.
478+ // We return the chunkOffset + bufferPosition here as that will be the
479+ // position of the buffer pointer after reading the chunk file.
480+ bufferPos = bufferPosition ;
481+
482+ } else if (blockStreamEOF ()) {
398483 // all data consumed, buffers have been released.
399484 // get position from the chunk offset and chunk length of last chunk
400- return chunkOffset [chunkIndex ] + chunks .get (chunkIndex ).getLen ();
485+ bufferPos = chunks .get (chunkIndex ).getLen ();
486+
487+ } else if (buffersAllocated ()) {
488+ // get position from available buffers of current chunk
489+ bufferPos = buffers .get (bufferIndex ).position ();
490+
401491 }
402492
403- // get position from available buffers of current chunk
404- return chunkOffset [chunkIndex ] + buffers .get (bufferIndex ).position ();
493+ return chunkOffset [chunkIndex ] + bufferPos ;
405494 }
406495
407496 @ Override
@@ -412,4 +501,9 @@ public boolean seekToNewSource(long targetPos) throws IOException {
412501 public BlockID getBlockID () {
413502 return blockID ;
414503 }
504+
505+ @ VisibleForTesting
506+ protected int getChunkIndex () {
507+ return chunkIndex ;
508+ }
415509}
0 commit comments