Commit 7c85ec6

sahilTakiar authored and deepakdamri committed
HDFS-3246: pRead equivalent for direct read path (apache#597). Contributed by Sahil Takiar.
1 parent d1abb65 · commit 7c85ec6

File tree

14 files changed: +1424 −523 lines


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 198 additions & 94 deletions
Large diffs are not rendered by default.
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java

Lines changed: 66 additions & 0 deletions

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementers of this interface provide a positioned read API that writes to a
+ * {@link ByteBuffer} rather than a {@code byte[]}.
+ *
+ * @see PositionedReadable
+ * @see ByteBufferReadable
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ByteBufferPositionedReadable {
+  /**
+   * Reads up to {@code buf.remaining()} bytes into buf from a given position
+   * in the file and returns the number of bytes read. Callers should use
+   * {@code buf.limit(...)} to control the size of the desired read and
+   * {@code buf.position(...)} to control the offset into the buffer the data
+   * should be written to.
+   * <p>
+   * After a successful call, {@code buf.position()} will be advanced by the
+   * number of bytes read and {@code buf.limit()} will be unchanged.
+   * <p>
+   * In the case of an exception, the state of the buffer (the contents of the
+   * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+   * undefined, and callers should be prepared to recover from this
+   * eventuality.
+   * <p>
+   * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+   * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
+   * stream supports this interface, otherwise they might get a
+   * {@link UnsupportedOperationException}.
+   * <p>
+   * Implementations should treat 0-length requests as legitimate, and must not
+   * signal an error upon their receipt.
+   *
+   * @param position position within file
+   * @param buf the ByteBuffer to receive the results of the read operation.
+   * @return the number of bytes read, possibly zero, or -1 if reached
+   *         end-of-stream
+   * @throws IOException if there is some error performing the read
+   */
+  int read(long position, ByteBuffer buf) throws IOException;
+}
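The interface above defines only a single-shot read, so a caller that wants to fill a whole buffer has to loop. Below is a minimal, hedged sketch of such a helper; it is not part of this commit, and the class and method names are made up for illustration.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferPositionedReadable;

/** Hypothetical helper illustrating the contract documented above. */
public final class PreadUtil {

  private PreadUtil() {
  }

  /**
   * Reads from {@code position} until {@code buf} is full or end-of-stream is
   * reached. Each call advances {@code buf.position()} by the number of bytes
   * read, so the loop only needs to track how far into the file it has read.
   *
   * @return the total number of bytes read, which may be less than
   *         {@code buf.remaining()} at entry if end-of-stream was hit first
   */
  public static int preadFully(ByteBufferPositionedReadable in, long position,
      ByteBuffer buf) throws IOException {
    int total = 0;
    while (buf.hasRemaining()) {
      int n = in.read(position + total, buf);
      if (n < 0) {
        break; // end-of-stream
      }
      total += n;
    }
    return total;
  }
}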

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java

Lines changed: 14 additions & 11 deletions

@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -32,18 +33,20 @@ public interface ByteBufferReadable {
   /**
    * Reads up to buf.remaining() bytes into buf. Callers should use
    * buf.limit(..) to control the size of the desired read.
-   * <p/>
-   * After a successful call, buf.position() will be advanced by the number
-   * of bytes read and buf.limit() should be unchanged.
-   * <p/>
-   * In the case of an exception, the values of buf.position() and buf.limit()
-   * are undefined, and callers should be prepared to recover from this
+   * <p>
+   * After a successful call, {@code buf.position()} will be advanced by the
+   * number of bytes read and {@code buf.limit()} will be unchanged.
+   * <p>
+   * In the case of an exception, the state of the buffer (the contents of the
+   * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+   * undefined, and callers should be prepared to recover from this
    * eventuality.
-   * <p/>
-   * Many implementations will throw {@link UnsupportedOperationException}, so
-   * callers that are not confident in support for this method from the
-   * underlying filesystem should be prepared to handle that exception.
-   * <p/>
+   * <p>
+   * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+   * {@link StreamCapabilities#READBYTEBUFFER} to check if the underlying
+   * stream supports this interface, otherwise they might get a
+   * {@link UnsupportedOperationException}.
+   * <p>
   * Implementations should treat 0-length requests as legitimate, and must not
   * signal an error upon their receipt.
   *
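The reworded contract above (buf.limit() bounds the read, buf.position() advances by the bytes read, and the limit stays unchanged) can be illustrated with a small, hedged sketch. As the updated Javadoc notes, callers should first probe StreamCapabilities#READBYTEBUFFER rather than relying on catching UnsupportedOperationException. The helper below is hypothetical and not part of this commit.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferReadable;

/** Hypothetical snippet illustrating the read(ByteBuffer) buffer-state contract. */
final class ByteBufferReadExample {

  /** Reads at most {@code maxBytes} bytes in a single call. */
  static int readAtMost(ByteBufferReadable in, ByteBuffer buf, int maxBytes)
      throws IOException {
    int start = buf.position();
    // buf.limit() bounds how much this one call is allowed to read.
    buf.limit(Math.min(buf.capacity(), start + maxBytes));
    int n = in.read(buf);
    // On success buf.position() == start + n and buf.limit() is unchanged;
    // n is -1 at end-of-stream.
    return n;
  }
}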

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 13 additions & 2 deletions

@@ -38,7 +38,8 @@
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable,
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+      ByteBufferPositionedReadable {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool
    * objects
@@ -147,7 +148,8 @@ public int read(ByteBuffer buf) throws IOException {
       return ((ByteBufferReadable)in).read(buf);
     }
 
-    throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+    throw new UnsupportedOperationException("Byte-buffer read unsupported " +
+        "by input stream");
   }
 
   @Override
@@ -246,4 +248,13 @@ public boolean hasCapability(String capability) {
   public String toString() {
     return super.toString() + ": " + in;
   }
+
+  @Override
+  public int read(long position, ByteBuffer buf) throws IOException {
+    if (in instanceof ByteBufferPositionedReadable) {
+      return ((ByteBufferPositionedReadable) in).read(position, buf);
+    }
+    throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
+        "by input stream");
+  }
 }
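Since FSDataInputStream only delegates to the wrapped stream, callers still have to probe for support before issuing a positioned ByteBuffer read. A hedged end-to-end sketch follows; the file path, offset, and buffer size are hypothetical and this program is not part of the commit.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class ByteBufferPreadDemo {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/tmp/demo.bin");          // hypothetical file
    ByteBuffer buf = ByteBuffer.allocateDirect(4096);
    try (FSDataInputStream in = fs.open(path)) {
      if (in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
        // Positioned ByteBuffer read: the stream's own position is untouched.
        int n = in.read(1024L, buf);
        System.out.println("pread returned " + n + " bytes");
      } else {
        // Fall back to the existing byte[]-based positioned read.
        byte[] b = new byte[4096];
        int n = in.read(1024L, b, 0, b.length);
        System.out.println("fallback pread returned " + n + " bytes");
      }
    }
  }
}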

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 6 additions & 0 deletions

@@ -65,6 +65,12 @@ public interface StreamCapabilities {
   */
  String READBYTEBUFFER = "in:readbytebuffer";
 
+ /**
+  * Stream read(long, ByteBuffer) capability implemented by
+  * {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
+  */
+ String PREADBYTEBUFFER = "in:preadbytebuffer";
+
  /**
   * Capabilities that a stream can support and be queried for.
   */
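On the implementer side, a stream that supports both ByteBuffer read paths would answer capability probes for the two constants above. The sketch below uses a hypothetical class and only shows the hasCapability plumbing; real Hadoop streams such as CryptoInputStream have their own implementations.

import java.io.InputStream;
import java.util.Locale;

import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.StreamCapabilities;

/** Hypothetical stream advertising both ByteBuffer read capabilities. */
abstract class ExampleByteBufferStream extends InputStream
    implements ByteBufferReadable, ByteBufferPositionedReadable,
        StreamCapabilities {

  @Override
  public boolean hasCapability(String capability) {
    // The capability constants are lower-case strings; compare leniently.
    switch (capability.toLowerCase(Locale.ROOT)) {
    case StreamCapabilities.READBYTEBUFFER:
    case StreamCapabilities.PREADBYTEBUFFER:
      return true;
    default:
      return false;
    }
  }
}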

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java

Lines changed: 175 additions & 10 deletions

@@ -26,6 +26,7 @@
 import java.util.EnumSet;
 import java.util.Random;
 
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -129,6 +130,32 @@ private void preadCheck(PositionedReadable in) throws Exception {
     Assert.assertArrayEquals(result, expectedData);
   }
 
+  private int byteBufferPreadAll(ByteBufferPositionedReadable in,
+      ByteBuffer buf) throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (!buf.hasRemaining()) {
+        break;
+      }
+      n = in.read(total, buf);
+    }
+
+    return total;
+  }
+
+  private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
+      throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen);
+    int n = byteBufferPreadAll(in, result);
+
+    Assert.assertEquals(dataLen, n);
+    ByteBuffer expectedData = ByteBuffer.allocate(n);
+    expectedData.put(data, 0, n);
+    Assert.assertArrayEquals(result.array(), expectedData.array());
+  }
+
   protected OutputStream getOutputStream(int bufferSize) throws IOException {
     return getOutputStream(bufferSize, key, iv);
   }
@@ -288,20 +315,36 @@ private int readAll(InputStream in, long pos, byte[] b, int off, int len)
 
     return total;
   }
+
+  private int readAll(InputStream in, long pos, ByteBuffer buf)
+      throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (!buf.hasRemaining()) {
+        break;
+      }
+      n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
+    }
+
+    return total;
+  }
 
   /** Test positioned read. */
   @Test(timeout=120000)
   public void testPositionedRead() throws Exception {
-    OutputStream out = getOutputStream(defaultBufferSize);
-    writeData(out);
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
 
-    InputStream in = getInputStream(defaultBufferSize);
-    // Pos: 1/3 dataLen
-    positionedReadCheck(in , dataLen / 3);
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      // Pos: 1/3 dataLen
+      positionedReadCheck(in, dataLen / 3);
 
-    // Pos: 1/2 dataLen
-    positionedReadCheck(in, dataLen / 2);
-    in.close();
+      // Pos: 1/2 dataLen
+      positionedReadCheck(in, dataLen / 2);
+    }
   }
 
   private void positionedReadCheck(InputStream in, int pos) throws Exception {
@@ -315,6 +358,35 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception {
     System.arraycopy(data, pos, expectedData, 0, n);
     Assert.assertArrayEquals(readData, expectedData);
   }
+
+  /** Test positioned read with ByteBuffers. */
+  @Test(timeout=120000)
+  public void testPositionedReadWithByteBuffer() throws Exception {
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
+
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      // Pos: 1/3 dataLen
+      positionedReadCheckWithByteBuffer(in, dataLen / 3);
+
+      // Pos: 1/2 dataLen
+      positionedReadCheckWithByteBuffer(in, dataLen / 2);
+    }
+  }
+
+  private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
+      throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen);
+    int n = readAll(in, pos, result);
+
+    Assert.assertEquals(dataLen, n + pos);
+    byte[] readData = new byte[n];
+    System.arraycopy(result.array(), 0, readData, 0, n);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, pos, expectedData, 0, n);
+    Assert.assertArrayEquals(readData, expectedData);
+  }
 
   /** Test read fully */
   @Test(timeout=120000)
@@ -505,12 +577,40 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
     System.arraycopy(data, 0, expectedData, 0, n);
     Assert.assertArrayEquals(readData, expectedData);
   }
+
+  private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
+      int bufPos) throws Exception {
+    // Test reading from position 0
+    buf.position(bufPos);
+    int n = ((ByteBufferPositionedReadable) in).read(0, buf);
+    Assert.assertEquals(bufPos + n, buf.position());
+    byte[] readData = new byte[n];
+    buf.rewind();
+    buf.position(bufPos);
+    buf.get(readData);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, 0, expectedData, 0, n);
+    Assert.assertArrayEquals(readData, expectedData);
+
+    // Test reading from half way through the data
+    buf.position(bufPos);
+    n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
+    Assert.assertEquals(bufPos + n, buf.position());
+    readData = new byte[n];
+    buf.rewind();
+    buf.position(bufPos);
+    buf.get(readData);
+    expectedData = new byte[n];
+    System.arraycopy(data, dataLen / 2, expectedData, 0, n);
+    Assert.assertArrayEquals(readData, expectedData);
+  }
 
   /** Test byte buffer read with different buffer size. */
   @Test(timeout=120000)
   public void testByteBufferRead() throws Exception {
-    OutputStream out = getOutputStream(defaultBufferSize);
-    writeData(out);
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
 
     // Default buffer size, initial buffer position is 0
     InputStream in = getInputStream(defaultBufferSize);
@@ -560,6 +660,53 @@ public void testByteBufferRead() throws Exception {
     byteBufferReadCheck(in, buf, 11);
     in.close();
   }
+
+  /** Test byte buffer pread with different buffer size. */
+  @Test(timeout=120000)
+  public void testByteBufferPread() throws Exception {
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
+
+    try (InputStream defaultBuf = getInputStream(defaultBufferSize);
+         InputStream smallBuf = getInputStream(smallBufferSize)) {
+
+      ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
+
+      // Default buffer size, initial buffer position is 0
+      byteBufferPreadCheck(defaultBuf, buf, 0);
+
+      // Default buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(defaultBuf, buf, 11);
+
+      // Small buffer size, initial buffer position is 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 0);
+
+      // Small buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 11);
+
+      // Test with direct ByteBuffer
+      buf = ByteBuffer.allocateDirect(dataLen + 100);
+
+      // Direct buffer, default buffer size, initial buffer position is 0
+      byteBufferPreadCheck(defaultBuf, buf, 0);
+
+      // Direct buffer, default buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(defaultBuf, buf, 11);
+
+      // Direct buffer, small buffer size, initial buffer position is 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 0);
+
+      // Direct buffer, small buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 11);
+    }
+  }
 
   @Test(timeout=120000)
   public void testCombinedOp() throws Exception {
@@ -797,5 +944,23 @@ public void testUnbuffer() throws Exception {
         // The close will be called when exiting this try-with-resource block
       }
     }
+
+    // Test ByteBuffer pread
+    try (InputStream in = getInputStream(smallBufferSize)) {
+      if (in instanceof ByteBufferPositionedReadable) {
+        ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;
+
+        // Test unbuffer after pread
+        byteBufferPreadCheck(bbpin);
+        ((CanUnbuffer) in).unbuffer();
+
+        // Test pread again after unbuffer
+        byteBufferPreadCheck(bbpin);
+
+        // Test close after unbuffer
+        ((CanUnbuffer) in).unbuffer();
+        // The close will be called when exiting this try-with-resource block
+      }
+    }
   }
 }
