2626import java .util .EnumSet ;
2727import java .util .Random ;
2828
29+ import org .apache .hadoop .fs .ByteBufferPositionedReadable ;
2930import org .apache .hadoop .fs .ByteBufferReadable ;
3031import org .apache .hadoop .fs .CanUnbuffer ;
3132import org .apache .hadoop .fs .FSDataOutputStream ;
@@ -129,6 +130,32 @@ private void preadCheck(PositionedReadable in) throws Exception {
129130 Assert .assertArrayEquals (result , expectedData );
130131 }
131132
133+ private int byteBufferPreadAll (ByteBufferPositionedReadable in ,
134+ ByteBuffer buf ) throws IOException {
135+ int n = 0 ;
136+ int total = 0 ;
137+ while (n != -1 ) {
138+ total += n ;
139+ if (!buf .hasRemaining ()) {
140+ break ;
141+ }
142+ n = in .read (total , buf );
143+ }
144+
145+ return total ;
146+ }
147+
148+ private void byteBufferPreadCheck (ByteBufferPositionedReadable in )
149+ throws Exception {
150+ ByteBuffer result = ByteBuffer .allocate (dataLen );
151+ int n = byteBufferPreadAll (in , result );
152+
153+ Assert .assertEquals (dataLen , n );
154+ ByteBuffer expectedData = ByteBuffer .allocate (n );
155+ expectedData .put (data , 0 , n );
156+ Assert .assertArrayEquals (result .array (), expectedData .array ());
157+ }
158+
132159 protected OutputStream getOutputStream (int bufferSize ) throws IOException {
133160 return getOutputStream (bufferSize , key , iv );
134161 }
@@ -288,20 +315,36 @@ private int readAll(InputStream in, long pos, byte[] b, int off, int len)
288315
289316 return total ;
290317 }
318+
319+ private int readAll (InputStream in , long pos , ByteBuffer buf )
320+ throws IOException {
321+ int n = 0 ;
322+ int total = 0 ;
323+ while (n != -1 ) {
324+ total += n ;
325+ if (!buf .hasRemaining ()) {
326+ break ;
327+ }
328+ n = ((ByteBufferPositionedReadable ) in ).read (pos + total , buf );
329+ }
330+
331+ return total ;
332+ }
291333
292334 /** Test positioned read. */
293335 @ Test (timeout =120000 )
294336 public void testPositionedRead () throws Exception {
295- OutputStream out = getOutputStream (defaultBufferSize );
296- writeData (out );
337+ try (OutputStream out = getOutputStream (defaultBufferSize )) {
338+ writeData (out );
339+ }
297340
298- InputStream in = getInputStream (defaultBufferSize );
299- // Pos: 1/3 dataLen
300- positionedReadCheck (in , dataLen / 3 );
341+ try ( InputStream in = getInputStream (defaultBufferSize )) {
342+ // Pos: 1/3 dataLen
343+ positionedReadCheck (in , dataLen / 3 );
301344
302- // Pos: 1/2 dataLen
303- positionedReadCheck (in , dataLen / 2 );
304- in . close ();
345+ // Pos: 1/2 dataLen
346+ positionedReadCheck (in , dataLen / 2 );
347+ }
305348 }
306349
307350 private void positionedReadCheck (InputStream in , int pos ) throws Exception {
@@ -315,6 +358,35 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception {
315358 System .arraycopy (data , pos , expectedData , 0 , n );
316359 Assert .assertArrayEquals (readData , expectedData );
317360 }
361+
362+ /** Test positioned read with ByteBuffers. */
363+ @ Test (timeout =120000 )
364+ public void testPositionedReadWithByteBuffer () throws Exception {
365+ try (OutputStream out = getOutputStream (defaultBufferSize )) {
366+ writeData (out );
367+ }
368+
369+ try (InputStream in = getInputStream (defaultBufferSize )) {
370+ // Pos: 1/3 dataLen
371+ positionedReadCheckWithByteBuffer (in , dataLen / 3 );
372+
373+ // Pos: 1/2 dataLen
374+ positionedReadCheckWithByteBuffer (in , dataLen / 2 );
375+ }
376+ }
377+
378+ private void positionedReadCheckWithByteBuffer (InputStream in , int pos )
379+ throws Exception {
380+ ByteBuffer result = ByteBuffer .allocate (dataLen );
381+ int n = readAll (in , pos , result );
382+
383+ Assert .assertEquals (dataLen , n + pos );
384+ byte [] readData = new byte [n ];
385+ System .arraycopy (result .array (), 0 , readData , 0 , n );
386+ byte [] expectedData = new byte [n ];
387+ System .arraycopy (data , pos , expectedData , 0 , n );
388+ Assert .assertArrayEquals (readData , expectedData );
389+ }
318390
319391 /** Test read fully */
320392 @ Test (timeout =120000 )
@@ -505,12 +577,40 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
505577 System .arraycopy (data , 0 , expectedData , 0 , n );
506578 Assert .assertArrayEquals (readData , expectedData );
507579 }
580+
581+ private void byteBufferPreadCheck (InputStream in , ByteBuffer buf ,
582+ int bufPos ) throws Exception {
583+ // Test reading from position 0
584+ buf .position (bufPos );
585+ int n = ((ByteBufferPositionedReadable ) in ).read (0 , buf );
586+ Assert .assertEquals (bufPos + n , buf .position ());
587+ byte [] readData = new byte [n ];
588+ buf .rewind ();
589+ buf .position (bufPos );
590+ buf .get (readData );
591+ byte [] expectedData = new byte [n ];
592+ System .arraycopy (data , 0 , expectedData , 0 , n );
593+ Assert .assertArrayEquals (readData , expectedData );
594+
595+ // Test reading from half way through the data
596+ buf .position (bufPos );
597+ n = ((ByteBufferPositionedReadable ) in ).read (dataLen / 2 , buf );
598+ Assert .assertEquals (bufPos + n , buf .position ());
599+ readData = new byte [n ];
600+ buf .rewind ();
601+ buf .position (bufPos );
602+ buf .get (readData );
603+ expectedData = new byte [n ];
604+ System .arraycopy (data , dataLen / 2 , expectedData , 0 , n );
605+ Assert .assertArrayEquals (readData , expectedData );
606+ }
508607
509608 /** Test byte buffer read with different buffer size. */
510609 @ Test (timeout =120000 )
511610 public void testByteBufferRead () throws Exception {
512- OutputStream out = getOutputStream (defaultBufferSize );
513- writeData (out );
611+ try (OutputStream out = getOutputStream (defaultBufferSize )) {
612+ writeData (out );
613+ }
514614
515615 // Default buffer size, initial buffer position is 0
516616 InputStream in = getInputStream (defaultBufferSize );
@@ -560,6 +660,53 @@ public void testByteBufferRead() throws Exception {
560660 byteBufferReadCheck (in , buf , 11 );
561661 in .close ();
562662 }
663+
664+ /** Test byte buffer pread with different buffer size. */
665+ @ Test (timeout =120000 )
666+ public void testByteBufferPread () throws Exception {
667+ try (OutputStream out = getOutputStream (defaultBufferSize )) {
668+ writeData (out );
669+ }
670+
671+ try (InputStream defaultBuf = getInputStream (defaultBufferSize );
672+ InputStream smallBuf = getInputStream (smallBufferSize )) {
673+
674+ ByteBuffer buf = ByteBuffer .allocate (dataLen + 100 );
675+
676+ // Default buffer size, initial buffer position is 0
677+ byteBufferPreadCheck (defaultBuf , buf , 0 );
678+
679+ // Default buffer size, initial buffer position is not 0
680+ buf .clear ();
681+ byteBufferPreadCheck (defaultBuf , buf , 11 );
682+
683+ // Small buffer size, initial buffer position is 0
684+ buf .clear ();
685+ byteBufferPreadCheck (smallBuf , buf , 0 );
686+
687+ // Small buffer size, initial buffer position is not 0
688+ buf .clear ();
689+ byteBufferPreadCheck (smallBuf , buf , 11 );
690+
691+ // Test with direct ByteBuffer
692+ buf = ByteBuffer .allocateDirect (dataLen + 100 );
693+
694+ // Direct buffer, default buffer size, initial buffer position is 0
695+ byteBufferPreadCheck (defaultBuf , buf , 0 );
696+
697+ // Direct buffer, default buffer size, initial buffer position is not 0
698+ buf .clear ();
699+ byteBufferPreadCheck (defaultBuf , buf , 11 );
700+
701+ // Direct buffer, small buffer size, initial buffer position is 0
702+ buf .clear ();
703+ byteBufferPreadCheck (smallBuf , buf , 0 );
704+
705+ // Direct buffer, small buffer size, initial buffer position is not 0
706+ buf .clear ();
707+ byteBufferPreadCheck (smallBuf , buf , 11 );
708+ }
709+ }
563710
564711 @ Test (timeout =120000 )
565712 public void testCombinedOp () throws Exception {
@@ -797,5 +944,23 @@ public void testUnbuffer() throws Exception {
797944 // The close will be called when exiting this try-with-resource block
798945 }
799946 }
947+
948+ // Test ByteBuffer pread
949+ try (InputStream in = getInputStream (smallBufferSize )) {
950+ if (in instanceof ByteBufferPositionedReadable ) {
951+ ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable ) in ;
952+
953+ // Test unbuffer after pread
954+ byteBufferPreadCheck (bbpin );
955+ ((CanUnbuffer ) in ).unbuffer ();
956+
957+ // Test pread again after unbuffer
958+ byteBufferPreadCheck (bbpin );
959+
960+ // Test close after unbuffer
961+ ((CanUnbuffer ) in ).unbuffer ();
962+ // The close will be called when exiting this try-with-resource block
963+ }
964+ }
800965 }
801966}
0 commit comments