2626import java .util .EnumSet ;
2727import java .util .Random ;
2828
29+ import org .apache .hadoop .fs .ByteBufferPositionedReadable ;
2930import org .apache .hadoop .fs .ByteBufferReadable ;
3031import org .apache .hadoop .fs .CanUnbuffer ;
3132import org .apache .hadoop .fs .FSDataOutputStream ;
@@ -129,6 +130,32 @@ private void preadCheck(PositionedReadable in) throws Exception {
129130 Assert .assertArrayEquals (result , expectedData );
130131 }
131132
133+ private int byteBufferPreadAll (ByteBufferPositionedReadable in ,
134+ ByteBuffer buf ) throws IOException {
135+ int n = 0 ;
136+ int total = 0 ;
137+ while (n != -1 ) {
138+ total += n ;
139+ if (!buf .hasRemaining ()) {
140+ break ;
141+ }
142+ n = in .read (total , buf );
143+ }
144+
145+ return total ;
146+ }
147+
148+ private void byteBufferPreadCheck (ByteBufferPositionedReadable in )
149+ throws Exception {
150+ ByteBuffer result = ByteBuffer .allocate (dataLen );
151+ int n = byteBufferPreadAll (in , result );
152+
153+ Assert .assertEquals (dataLen , n );
154+ ByteBuffer expectedData = ByteBuffer .allocate (n );
155+ expectedData .put (data , 0 , n );
156+ Assert .assertArrayEquals (result .array (), expectedData .array ());
157+ }
158+
132159 protected OutputStream getOutputStream (int bufferSize ) throws IOException {
133160 return getOutputStream (bufferSize , key , iv );
134161 }
@@ -288,20 +315,36 @@ private int readAll(InputStream in, long pos, byte[] b, int off, int len)
288315
289316 return total ;
290317 }
318+
319+ private int readAll (InputStream in , long pos , ByteBuffer buf )
320+ throws IOException {
321+ int n = 0 ;
322+ int total = 0 ;
323+ while (n != -1 ) {
324+ total += n ;
325+ if (!buf .hasRemaining ()) {
326+ break ;
327+ }
328+ n = ((ByteBufferPositionedReadable ) in ).read (pos + total , buf );
329+ }
330+
331+ return total ;
332+ }
291333
292334 /** Test positioned read. */
293335 @ Test (timeout =120000 )
294336 public void testPositionedRead () throws Exception {
295- OutputStream out = getOutputStream (defaultBufferSize );
296- writeData (out );
337+ try (OutputStream out = getOutputStream (defaultBufferSize )) {
338+ writeData (out );
339+ }
297340
298- InputStream in = getInputStream (defaultBufferSize );
299- // Pos: 1/3 dataLen
300- positionedReadCheck (in , dataLen / 3 );
341+ try ( InputStream in = getInputStream (defaultBufferSize )) {
342+ // Pos: 1/3 dataLen
343+ positionedReadCheck (in , dataLen / 3 );
301344
302- // Pos: 1/2 dataLen
303- positionedReadCheck (in , dataLen / 2 );
304- in . close ();
345+ // Pos: 1/2 dataLen
346+ positionedReadCheck (in , dataLen / 2 );
347+ }
305348 }
306349
307350 private void positionedReadCheck (InputStream in , int pos ) throws Exception {
@@ -315,6 +358,35 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception {
315358 System .arraycopy (data , pos , expectedData , 0 , n );
316359 Assert .assertArrayEquals (readData , expectedData );
317360 }
361+
362+ /** Test positioned read with ByteBuffers. */
363+ @ Test (timeout =120000 )
364+ public void testPositionedReadWithByteBuffer () throws Exception {
365+ try (OutputStream out = getOutputStream (defaultBufferSize )) {
366+ writeData (out );
367+ }
368+
369+ try (InputStream in = getInputStream (defaultBufferSize )) {
370+ // Pos: 1/3 dataLen
371+ positionedReadCheckWithByteBuffer (in , dataLen / 3 );
372+
373+ // Pos: 1/2 dataLen
374+ positionedReadCheckWithByteBuffer (in , dataLen / 2 );
375+ }
376+ }
377+
378+ private void positionedReadCheckWithByteBuffer (InputStream in , int pos )
379+ throws Exception {
380+ ByteBuffer result = ByteBuffer .allocate (dataLen );
381+ int n = readAll (in , pos , result );
382+
383+ Assert .assertEquals (dataLen , n + pos );
384+ byte [] readData = new byte [n ];
385+ System .arraycopy (result .array (), 0 , readData , 0 , n );
386+ byte [] expectedData = new byte [n ];
387+ System .arraycopy (data , pos , expectedData , 0 , n );
388+ Assert .assertArrayEquals (readData , expectedData );
389+ }
318390
319391 /** Test read fully. */
320392 @ Test (timeout =120000 )
@@ -558,12 +630,40 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
558630 System .arraycopy (data , 0 , expectedData , 0 , n );
559631 Assert .assertArrayEquals (readData , expectedData );
560632 }
633+
634+ private void byteBufferPreadCheck (InputStream in , ByteBuffer buf ,
635+ int bufPos ) throws Exception {
636+ // Test reading from position 0
637+ buf .position (bufPos );
638+ int n = ((ByteBufferPositionedReadable ) in ).read (0 , buf );
639+ Assert .assertEquals (bufPos + n , buf .position ());
640+ byte [] readData = new byte [n ];
641+ buf .rewind ();
642+ buf .position (bufPos );
643+ buf .get (readData );
644+ byte [] expectedData = new byte [n ];
645+ System .arraycopy (data , 0 , expectedData , 0 , n );
646+ Assert .assertArrayEquals (readData , expectedData );
647+
648+ // Test reading from half way through the data
649+ buf .position (bufPos );
650+ n = ((ByteBufferPositionedReadable ) in ).read (dataLen / 2 , buf );
651+ Assert .assertEquals (bufPos + n , buf .position ());
652+ readData = new byte [n ];
653+ buf .rewind ();
654+ buf .position (bufPos );
655+ buf .get (readData );
656+ expectedData = new byte [n ];
657+ System .arraycopy (data , dataLen / 2 , expectedData , 0 , n );
658+ Assert .assertArrayEquals (readData , expectedData );
659+ }
561660
562661 /** Test byte buffer read with different buffer size. */
563662 @ Test (timeout =120000 )
564663 public void testByteBufferRead () throws Exception {
565- OutputStream out = getOutputStream (defaultBufferSize );
566- writeData (out );
664+ try (OutputStream out = getOutputStream (defaultBufferSize )) {
665+ writeData (out );
666+ }
567667
568668 // Default buffer size, initial buffer position is 0
569669 InputStream in = getInputStream (defaultBufferSize );
@@ -613,6 +713,53 @@ public void testByteBufferRead() throws Exception {
613713 byteBufferReadCheck (in , buf , 11 );
614714 in .close ();
615715 }
716+
717+ /** Test byte buffer pread with different buffer size. */
718+ @ Test (timeout =120000 )
719+ public void testByteBufferPread () throws Exception {
720+ try (OutputStream out = getOutputStream (defaultBufferSize )) {
721+ writeData (out );
722+ }
723+
724+ try (InputStream defaultBuf = getInputStream (defaultBufferSize );
725+ InputStream smallBuf = getInputStream (smallBufferSize )) {
726+
727+ ByteBuffer buf = ByteBuffer .allocate (dataLen + 100 );
728+
729+ // Default buffer size, initial buffer position is 0
730+ byteBufferPreadCheck (defaultBuf , buf , 0 );
731+
732+ // Default buffer size, initial buffer position is not 0
733+ buf .clear ();
734+ byteBufferPreadCheck (defaultBuf , buf , 11 );
735+
736+ // Small buffer size, initial buffer position is 0
737+ buf .clear ();
738+ byteBufferPreadCheck (smallBuf , buf , 0 );
739+
740+ // Small buffer size, initial buffer position is not 0
741+ buf .clear ();
742+ byteBufferPreadCheck (smallBuf , buf , 11 );
743+
744+ // Test with direct ByteBuffer
745+ buf = ByteBuffer .allocateDirect (dataLen + 100 );
746+
747+ // Direct buffer, default buffer size, initial buffer position is 0
748+ byteBufferPreadCheck (defaultBuf , buf , 0 );
749+
750+ // Direct buffer, default buffer size, initial buffer position is not 0
751+ buf .clear ();
752+ byteBufferPreadCheck (defaultBuf , buf , 11 );
753+
754+ // Direct buffer, small buffer size, initial buffer position is 0
755+ buf .clear ();
756+ byteBufferPreadCheck (smallBuf , buf , 0 );
757+
758+ // Direct buffer, small buffer size, initial buffer position is not 0
759+ buf .clear ();
760+ byteBufferPreadCheck (smallBuf , buf , 11 );
761+ }
762+ }
616763
617764 @ Test (timeout =120000 )
618765 public void testCombinedOp () throws Exception {
@@ -850,5 +997,23 @@ public void testUnbuffer() throws Exception {
850997 // The close will be called when exiting this try-with-resource block
851998 }
852999 }
1000+
1001+ // Test ByteBuffer pread
1002+ try (InputStream in = getInputStream (smallBufferSize )) {
1003+ if (in instanceof ByteBufferPositionedReadable ) {
1004+ ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable ) in ;
1005+
1006+ // Test unbuffer after pread
1007+ byteBufferPreadCheck (bbpin );
1008+ ((CanUnbuffer ) in ).unbuffer ();
1009+
1010+ // Test pread again after unbuffer
1011+ byteBufferPreadCheck (bbpin );
1012+
1013+ // Test close after unbuffer
1014+ ((CanUnbuffer ) in ).unbuffer ();
1015+ // The close will be called when exiting this try-with-resource block
1016+ }
1017+ }
8531018 }
8541019}
0 commit comments