Skip to content

Commit 0cdac14

Browse files
sahilTakiardeepakdamri
authored andcommitted
HDFS-3246: pRead equivalent for direct read path (apache#597)
HDFS-3246: pRead equivalent for direct read path Contributed by Sahil Takiar
1 parent 2e333be commit 0cdac14

File tree

8 files changed

+266
-18
lines changed

8 files changed

+266
-18
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,52 @@ private void decrypt(ByteBuffer buf, int n, int start)
566566
}
567567
buf.position(pos);
568568
}
569-
569+
570+
private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
571+
throws IOException {
572+
ByteBuffer localInBuffer = null;
573+
ByteBuffer localOutBuffer = null;
574+
575+
// Duplicate the buffer so we don't have to worry about resetting the
576+
// original position and limit at the end of the method
577+
buf = buf.duplicate();
578+
579+
int decryptedBytes = 0;
580+
Decryptor localDecryptor = null;
581+
try {
582+
localInBuffer = getBuffer();
583+
localOutBuffer = getBuffer();
584+
localDecryptor = getDecryptor();
585+
byte[] localIV = initIV.clone();
586+
updateDecryptor(localDecryptor, filePosition, localIV);
587+
byte localPadding = getPadding(filePosition);
588+
// Set proper filePosition for inputdata.
589+
localInBuffer.position(localPadding);
590+
591+
while (decryptedBytes < length) {
592+
buf.position(start + decryptedBytes);
593+
buf.limit(start + decryptedBytes +
594+
Math.min(length - decryptedBytes, localInBuffer.remaining()));
595+
localInBuffer.put(buf);
596+
// Do decryption
597+
try {
598+
decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
599+
buf.position(start + decryptedBytes);
600+
buf.limit(start + length);
601+
decryptedBytes += localOutBuffer.remaining();
602+
buf.put(localOutBuffer);
603+
} finally {
604+
localPadding = afterDecryption(localDecryptor, localInBuffer,
605+
filePosition + length, localIV);
606+
}
607+
}
608+
} finally {
609+
returnBuffer(localInBuffer);
610+
returnBuffer(localOutBuffer);
611+
returnDecryptor(localDecryptor);
612+
}
613+
}
614+
570615
@Override
571616
public int available() throws IOException {
572617
checkStream();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.fs;
2020

2121
import java.io.DataInputStream;
22+
import java.io.EOFException;
2223
import java.io.FileDescriptor;
2324
import java.io.FileInputStream;
2425
import java.io.IOException;

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ public interface StreamCapabilities {
5959
*/
6060
String UNBUFFER = "in:unbuffer";
6161

62+
/**
63+
* Stream read(ByteBuffer) capability implemented by
64+
* {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
65+
*/
66+
String READBYTEBUFFER = "in:readbytebuffer";
67+
68+
/**
69+
* Stream read(long, ByteBuffer) capability implemented by
70+
* {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
71+
*/
72+
String PREADBYTEBUFFER = "in:preadbytebuffer";
73+
6274
/**
6375
* Capabilities that a stream can support and be queried for.
6476
*/

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java

Lines changed: 175 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.EnumSet;
2727
import java.util.Random;
2828

29+
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
2930
import org.apache.hadoop.fs.ByteBufferReadable;
3031
import org.apache.hadoop.fs.CanUnbuffer;
3132
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -129,6 +130,32 @@ private void preadCheck(PositionedReadable in) throws Exception {
129130
Assert.assertArrayEquals(result, expectedData);
130131
}
131132

133+
private int byteBufferPreadAll(ByteBufferPositionedReadable in,
134+
ByteBuffer buf) throws IOException {
135+
int n = 0;
136+
int total = 0;
137+
while (n != -1) {
138+
total += n;
139+
if (!buf.hasRemaining()) {
140+
break;
141+
}
142+
n = in.read(total, buf);
143+
}
144+
145+
return total;
146+
}
147+
148+
private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
149+
throws Exception {
150+
ByteBuffer result = ByteBuffer.allocate(dataLen);
151+
int n = byteBufferPreadAll(in, result);
152+
153+
Assert.assertEquals(dataLen, n);
154+
ByteBuffer expectedData = ByteBuffer.allocate(n);
155+
expectedData.put(data, 0, n);
156+
Assert.assertArrayEquals(result.array(), expectedData.array());
157+
}
158+
132159
protected OutputStream getOutputStream(int bufferSize) throws IOException {
133160
return getOutputStream(bufferSize, key, iv);
134161
}
@@ -288,20 +315,36 @@ private int readAll(InputStream in, long pos, byte[] b, int off, int len)
288315

289316
return total;
290317
}
318+
319+
private int readAll(InputStream in, long pos, ByteBuffer buf)
320+
throws IOException {
321+
int n = 0;
322+
int total = 0;
323+
while (n != -1) {
324+
total += n;
325+
if (!buf.hasRemaining()) {
326+
break;
327+
}
328+
n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
329+
}
330+
331+
return total;
332+
}
291333

292334
/** Test positioned read. */
293335
@Test(timeout=120000)
294336
public void testPositionedRead() throws Exception {
295-
OutputStream out = getOutputStream(defaultBufferSize);
296-
writeData(out);
337+
try (OutputStream out = getOutputStream(defaultBufferSize)) {
338+
writeData(out);
339+
}
297340

298-
InputStream in = getInputStream(defaultBufferSize);
299-
// Pos: 1/3 dataLen
300-
positionedReadCheck(in , dataLen / 3);
341+
try (InputStream in = getInputStream(defaultBufferSize)) {
342+
// Pos: 1/3 dataLen
343+
positionedReadCheck(in, dataLen / 3);
301344

302-
// Pos: 1/2 dataLen
303-
positionedReadCheck(in, dataLen / 2);
304-
in.close();
345+
// Pos: 1/2 dataLen
346+
positionedReadCheck(in, dataLen / 2);
347+
}
305348
}
306349

307350
private void positionedReadCheck(InputStream in, int pos) throws Exception {
@@ -315,6 +358,35 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception {
315358
System.arraycopy(data, pos, expectedData, 0, n);
316359
Assert.assertArrayEquals(readData, expectedData);
317360
}
361+
362+
/** Test positioned read with ByteBuffers. */
363+
@Test(timeout=120000)
364+
public void testPositionedReadWithByteBuffer() throws Exception {
365+
try (OutputStream out = getOutputStream(defaultBufferSize)) {
366+
writeData(out);
367+
}
368+
369+
try (InputStream in = getInputStream(defaultBufferSize)) {
370+
// Pos: 1/3 dataLen
371+
positionedReadCheckWithByteBuffer(in, dataLen / 3);
372+
373+
// Pos: 1/2 dataLen
374+
positionedReadCheckWithByteBuffer(in, dataLen / 2);
375+
}
376+
}
377+
378+
private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
379+
throws Exception {
380+
ByteBuffer result = ByteBuffer.allocate(dataLen);
381+
int n = readAll(in, pos, result);
382+
383+
Assert.assertEquals(dataLen, n + pos);
384+
byte[] readData = new byte[n];
385+
System.arraycopy(result.array(), 0, readData, 0, n);
386+
byte[] expectedData = new byte[n];
387+
System.arraycopy(data, pos, expectedData, 0, n);
388+
Assert.assertArrayEquals(readData, expectedData);
389+
}
318390

319391
/** Test read fully. */
320392
@Test(timeout=120000)
@@ -558,12 +630,40 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
558630
System.arraycopy(data, 0, expectedData, 0, n);
559631
Assert.assertArrayEquals(readData, expectedData);
560632
}
633+
634+
private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
635+
int bufPos) throws Exception {
636+
// Test reading from position 0
637+
buf.position(bufPos);
638+
int n = ((ByteBufferPositionedReadable) in).read(0, buf);
639+
Assert.assertEquals(bufPos + n, buf.position());
640+
byte[] readData = new byte[n];
641+
buf.rewind();
642+
buf.position(bufPos);
643+
buf.get(readData);
644+
byte[] expectedData = new byte[n];
645+
System.arraycopy(data, 0, expectedData, 0, n);
646+
Assert.assertArrayEquals(readData, expectedData);
647+
648+
// Test reading from half way through the data
649+
buf.position(bufPos);
650+
n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
651+
Assert.assertEquals(bufPos + n, buf.position());
652+
readData = new byte[n];
653+
buf.rewind();
654+
buf.position(bufPos);
655+
buf.get(readData);
656+
expectedData = new byte[n];
657+
System.arraycopy(data, dataLen / 2, expectedData, 0, n);
658+
Assert.assertArrayEquals(readData, expectedData);
659+
}
561660

562661
/** Test byte buffer read with different buffer size. */
563662
@Test(timeout=120000)
564663
public void testByteBufferRead() throws Exception {
565-
OutputStream out = getOutputStream(defaultBufferSize);
566-
writeData(out);
664+
try (OutputStream out = getOutputStream(defaultBufferSize)) {
665+
writeData(out);
666+
}
567667

568668
// Default buffer size, initial buffer position is 0
569669
InputStream in = getInputStream(defaultBufferSize);
@@ -613,6 +713,53 @@ public void testByteBufferRead() throws Exception {
613713
byteBufferReadCheck(in, buf, 11);
614714
in.close();
615715
}
716+
717+
/** Test byte buffer pread with different buffer size. */
718+
@Test(timeout=120000)
719+
public void testByteBufferPread() throws Exception {
720+
try (OutputStream out = getOutputStream(defaultBufferSize)) {
721+
writeData(out);
722+
}
723+
724+
try (InputStream defaultBuf = getInputStream(defaultBufferSize);
725+
InputStream smallBuf = getInputStream(smallBufferSize)) {
726+
727+
ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
728+
729+
// Default buffer size, initial buffer position is 0
730+
byteBufferPreadCheck(defaultBuf, buf, 0);
731+
732+
// Default buffer size, initial buffer position is not 0
733+
buf.clear();
734+
byteBufferPreadCheck(defaultBuf, buf, 11);
735+
736+
// Small buffer size, initial buffer position is 0
737+
buf.clear();
738+
byteBufferPreadCheck(smallBuf, buf, 0);
739+
740+
// Small buffer size, initial buffer position is not 0
741+
buf.clear();
742+
byteBufferPreadCheck(smallBuf, buf, 11);
743+
744+
// Test with direct ByteBuffer
745+
buf = ByteBuffer.allocateDirect(dataLen + 100);
746+
747+
// Direct buffer, default buffer size, initial buffer position is 0
748+
byteBufferPreadCheck(defaultBuf, buf, 0);
749+
750+
// Direct buffer, default buffer size, initial buffer position is not 0
751+
buf.clear();
752+
byteBufferPreadCheck(defaultBuf, buf, 11);
753+
754+
// Direct buffer, small buffer size, initial buffer position is 0
755+
buf.clear();
756+
byteBufferPreadCheck(smallBuf, buf, 0);
757+
758+
// Direct buffer, small buffer size, initial buffer position is not 0
759+
buf.clear();
760+
byteBufferPreadCheck(smallBuf, buf, 11);
761+
}
762+
}
616763

617764
@Test(timeout=120000)
618765
public void testCombinedOp() throws Exception {
@@ -850,5 +997,23 @@ public void testUnbuffer() throws Exception {
850997
// The close will be called when exiting this try-with-resource block
851998
}
852999
}
1000+
1001+
// Test ByteBuffer pread
1002+
try (InputStream in = getInputStream(smallBufferSize)) {
1003+
if (in instanceof ByteBufferPositionedReadable) {
1004+
ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;
1005+
1006+
// Test unbuffer after pread
1007+
byteBufferPreadCheck(bbpin);
1008+
((CanUnbuffer) in).unbuffer();
1009+
1010+
// Test pread again after unbuffer
1011+
byteBufferPreadCheck(bbpin);
1012+
1013+
// Test close after unbuffer
1014+
((CanUnbuffer) in).unbuffer();
1015+
// The close will be called when exiting this try-with-resource block
1016+
}
1017+
}
8531018
}
8541019
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,8 @@ public boolean hasCapability(String capability) {
429429
case StreamCapabilities.READAHEAD:
430430
case StreamCapabilities.DROPBEHIND:
431431
case StreamCapabilities.UNBUFFER:
432+
case StreamCapabilities.READBYTEBUFFER:
433+
case StreamCapabilities.PREADBYTEBUFFER:
432434
return true;
433435
default:
434436
return false;

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,27 +105,32 @@ public void testByteBufferReadFully() throws Exception {}
105105
@Override
106106
@Test(timeout=10000)
107107
public void testReadFully() throws IOException {}
108-
108+
109109
@Ignore("Wrapped stream doesn't support Seek")
110110
@Override
111111
@Test(timeout=10000)
112112
public void testSeek() throws IOException {}
113-
113+
114114
@Ignore("Wrapped stream doesn't support ByteBufferRead")
115115
@Override
116116
@Test(timeout=10000)
117117
public void testByteBufferRead() throws IOException {}
118-
118+
119+
@Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
120+
@Override
121+
@Test(timeout=10000)
122+
public void testByteBufferPread() throws IOException {}
123+
119124
@Ignore("Wrapped stream doesn't support ByteBufferRead, Seek")
120125
@Override
121126
@Test(timeout=10000)
122127
public void testCombinedOp() throws IOException {}
123-
128+
124129
@Ignore("Wrapped stream doesn't support SeekToNewSource")
125130
@Override
126131
@Test(timeout=10000)
127132
public void testSeekToNewSource() throws IOException {}
128-
133+
129134
@Ignore("Wrapped stream doesn't support HasEnhancedByteBufferAccess")
130135
@Override
131136
@Test(timeout=10000)
@@ -135,4 +140,4 @@ public void testHasEnhancedByteBufferAccess() throws IOException {}
135140
@Override
136141
@Test
137142
public void testUnbuffer() throws Exception {}
138-
}
143+
}

0 commit comments

Comments
 (0)