Skip to content

Commit 3e84c62

Browse files
committed
HDFS-3246: pRead equivalent for direct read path (apache#597)
HDFS-3246: pRead equivalent for direct read path Contributed by Sahil Takiar
1 parent 02553c0 commit 3e84c62

File tree

12 files changed

+531
-127
lines changed

12 files changed

+531
-127
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,7 @@
3333
import com.google.common.base.Preconditions;
3434
import org.apache.hadoop.classification.InterfaceAudience;
3535
import org.apache.hadoop.classification.InterfaceStability;
36-
import org.apache.hadoop.fs.ByteBufferReadable;
37-
import org.apache.hadoop.fs.CanSetDropBehind;
38-
import org.apache.hadoop.fs.CanSetReadahead;
39-
import org.apache.hadoop.fs.CanUnbuffer;
40-
import org.apache.hadoop.fs.FSExceptionMessages;
41-
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
42-
import org.apache.hadoop.fs.HasFileDescriptor;
43-
import org.apache.hadoop.fs.PositionedReadable;
44-
import org.apache.hadoop.fs.ReadOption;
45-
import org.apache.hadoop.fs.Seekable;
46-
import org.apache.hadoop.fs.StreamCapabilities;
47-
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
36+
import org.apache.hadoop.fs.*;
4837
import org.apache.hadoop.io.ByteBufferPool;
4938
import org.apache.hadoop.util.StringUtils;
5039

@@ -64,7 +53,7 @@
6453
public class CryptoInputStream extends FilterInputStream implements
6554
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
6655
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
67-
ReadableByteChannel, CanUnbuffer, StreamCapabilities {
56+
ReadableByteChannel, CanUnbuffer, StreamCapabilities{
6857
private final byte[] oneByteBuf = new byte[1];
6958
private final CryptoCodec codec;
7059
private final Decryptor decryptor;
@@ -565,6 +554,51 @@ private void decrypt(ByteBuffer buf, int n, int start)
565554
}
566555
buf.position(pos);
567556
}
557+
558+
private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
559+
throws IOException {
560+
ByteBuffer localInBuffer = null;
561+
ByteBuffer localOutBuffer = null;
562+
563+
// Duplicate the buffer so we don't have to worry about resetting the
564+
// original position and limit at the end of the method
565+
buf = buf.duplicate();
566+
567+
int decryptedBytes = 0;
568+
Decryptor localDecryptor = null;
569+
try {
570+
localInBuffer = getBuffer();
571+
localOutBuffer = getBuffer();
572+
localDecryptor = getDecryptor();
573+
byte[] localIV = initIV.clone();
574+
updateDecryptor(localDecryptor, filePosition, localIV);
575+
byte localPadding = getPadding(filePosition);
576+
// Set proper filePosition for inputdata.
577+
localInBuffer.position(localPadding);
578+
579+
while (decryptedBytes < length) {
580+
buf.position(start + decryptedBytes);
581+
buf.limit(start + decryptedBytes +
582+
Math.min(length - decryptedBytes, localInBuffer.remaining()));
583+
localInBuffer.put(buf);
584+
// Do decryption
585+
try {
586+
decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
587+
buf.position(start + decryptedBytes);
588+
buf.limit(start + length);
589+
decryptedBytes += localOutBuffer.remaining();
590+
buf.put(localOutBuffer);
591+
} finally {
592+
localPadding = afterDecryption(localDecryptor, localInBuffer,
593+
filePosition + length, localIV);
594+
}
595+
}
596+
} finally {
597+
returnBuffer(localInBuffer);
598+
returnBuffer(localOutBuffer);
599+
returnDecryptor(localDecryptor);
600+
}
601+
}
568602

569603
@Override
570604
public int available() throws IOException {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 52 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -41,29 +41,29 @@ public class FSDataInputStream extends DataInputStream
4141
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
4242
ByteBufferPositionedReadable {
4343
/**
44-
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
44+
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
4545
* objects
4646
*/
4747
private final IdentityHashStore<ByteBuffer, ByteBufferPool>
48-
extendedReadBuffers
49-
= new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
48+
extendedReadBuffers
49+
= new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
5050

5151
public FSDataInputStream(InputStream in) {
5252
super(in);
53-
if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
53+
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
5454
throw new IllegalArgumentException(in.getClass().getCanonicalName() +
55-
" is not an instance of Seekable or PositionedReadable");
55+
" is not an instance of Seekable or PositionedReadable");
5656
}
5757
}
58-
58+
5959
/**
6060
* Seek to the given offset.
6161
*
6262
* @param desired offset to seek to
6363
*/
6464
@Override
6565
public void seek(long desired) throws IOException {
66-
((Seekable)in).seek(desired);
66+
((Seekable) in).seek(desired);
6767
}
6868

6969
/**
@@ -73,65 +73,65 @@ public void seek(long desired) throws IOException {
7373
*/
7474
@Override
7575
public long getPos() throws IOException {
76-
return ((Seekable)in).getPos();
76+
return ((Seekable) in).getPos();
7777
}
78-
78+
7979
/**
8080
* Read bytes from the given position in the stream to the given buffer.
8181
*
82-
* @param position position in the input stream to seek
83-
* @param buffer buffer into which data is read
84-
* @param offset offset into the buffer in which data is written
85-
* @param length maximum number of bytes to read
82+
* @param position position in the input stream to seek
83+
* @param buffer buffer into which data is read
84+
* @param offset offset into the buffer in which data is written
85+
* @param length maximum number of bytes to read
8686
* @return total number of bytes read into the buffer, or <code>-1</code>
87-
* if there is no more data because the end of the stream has been
88-
* reached
87+
* if there is no more data because the end of the stream has been
88+
* reached
8989
*/
9090
@Override
9191
public int read(long position, byte[] buffer, int offset, int length)
92-
throws IOException {
93-
return ((PositionedReadable)in).read(position, buffer, offset, length);
92+
throws IOException {
93+
return ((PositionedReadable) in).read(position, buffer, offset, length);
9494
}
9595

9696
/**
9797
* Read bytes from the given position in the stream to the given buffer.
9898
* Continues to read until <code>length</code> bytes have been read.
9999
*
100-
* @param position position in the input stream to seek
101-
* @param buffer buffer into which data is read
102-
* @param offset offset into the buffer in which data is written
103-
* @param length the number of bytes to read
104-
* @throws IOException IO problems
100+
* @param position position in the input stream to seek
101+
* @param buffer buffer into which data is read
102+
* @param offset offset into the buffer in which data is written
103+
* @param length the number of bytes to read
104+
* @throws IOException IO problems
105105
* @throws EOFException If the end of stream is reached while reading.
106106
* If an exception is thrown an undetermined number
107-
* of bytes in the buffer may have been written.
107+
* of bytes in the buffer may have been written.
108108
*/
109109
@Override
110110
public void readFully(long position, byte[] buffer, int offset, int length)
111-
throws IOException {
112-
((PositionedReadable)in).readFully(position, buffer, offset, length);
111+
throws IOException {
112+
((PositionedReadable) in).readFully(position, buffer, offset, length);
113113
}
114-
114+
115115
/**
116116
* See {@link #readFully(long, byte[], int, int)}.
117117
*/
118118
@Override
119119
public void readFully(long position, byte[] buffer)
120-
throws IOException {
121-
((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
120+
throws IOException {
121+
((PositionedReadable) in).readFully(position, buffer, 0, buffer.length);
122122
}
123-
123+
124124
/**
125125
* Seek to the given position on an alternate copy of the data.
126126
*
127-
* @param targetPos position to seek to
127+
* @param targetPos position to seek to
128128
* @return true if a new source is found, false otherwise
129129
*/
130130
@Override
131131
public boolean seekToNewSource(long targetPos) throws IOException {
132-
return ((Seekable)in).seekToNewSource(targetPos);
132+
return ((Seekable) in).seekToNewSource(targetPos);
133133
}
134-
134+
135135
/**
136136
* Get a reference to the wrapped input stream. Used by unit tests.
137137
*
@@ -145,7 +145,7 @@ public InputStream getWrappedStream() {
145145
@Override
146146
public int read(ByteBuffer buf) throws IOException {
147147
if (in instanceof ByteBufferReadable) {
148-
return ((ByteBufferReadable)in).read(buf);
148+
return ((ByteBufferReadable) in).read(buf);
149149
}
150150

151151
throw new UnsupportedOperationException("Byte-buffer read unsupported " +
@@ -165,37 +165,36 @@ public FileDescriptor getFileDescriptor() throws IOException {
165165

166166
@Override
167167
public void setReadahead(Long readahead)
168-
throws IOException, UnsupportedOperationException {
168+
throws IOException, UnsupportedOperationException {
169169
try {
170-
((CanSetReadahead)in).setReadahead(readahead);
170+
((CanSetReadahead) in).setReadahead(readahead);
171171
} catch (ClassCastException e) {
172172
throw new UnsupportedOperationException(in.getClass().getCanonicalName() +
173-
" does not support setting the readahead caching strategy.");
173+
" does not support setting the readahead caching strategy.");
174174
}
175175
}
176176

177177
@Override
178178
public void setDropBehind(Boolean dropBehind)
179-
throws IOException, UnsupportedOperationException {
179+
throws IOException, UnsupportedOperationException {
180180
try {
181-
((CanSetDropBehind)in).setDropBehind(dropBehind);
181+
((CanSetDropBehind) in).setDropBehind(dropBehind);
182182
} catch (ClassCastException e) {
183183
throw new UnsupportedOperationException("this stream does not " +
184-
"support setting the drop-behind caching setting.");
184+
"support setting the drop-behind caching setting.");
185185
}
186186
}
187187

188188
@Override
189189
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
190-
EnumSet<ReadOption> opts)
190+
EnumSet<ReadOption> opts)
191191
throws IOException, UnsupportedOperationException {
192192
try {
193-
return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
194-
maxLength, opts);
195-
}
196-
catch (ClassCastException e) {
193+
return ((HasEnhancedByteBufferAccess) in).read(bufferPool,
194+
maxLength, opts);
195+
} catch (ClassCastException e) {
197196
ByteBuffer buffer = ByteBufferUtil.
198-
fallbackRead(this, bufferPool, maxLength);
197+
fallbackRead(this, bufferPool, maxLength);
199198
if (buffer != null) {
200199
extendedReadBuffers.put(buffer, bufferPool);
201200
}
@@ -204,23 +203,22 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
204203
}
205204

206205
private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
207-
EnumSet.noneOf(ReadOption.class);
206+
EnumSet.noneOf(ReadOption.class);
208207

209208
final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
210209
throws IOException, UnsupportedOperationException {
211210
return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
212211
}
213-
212+
214213
@Override
215214
public void releaseBuffer(ByteBuffer buffer) {
216215
try {
217-
((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
218-
}
219-
catch (ClassCastException e) {
220-
ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
216+
((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
217+
} catch (ClassCastException e) {
218+
ByteBufferPool bufferPool = extendedReadBuffers.remove(buffer);
221219
if (bufferPool == null) {
222220
throw new IllegalArgumentException("tried to release a buffer " +
223-
"that was not created by this stream.");
221+
"that was not created by this stream.");
224222
}
225223
bufferPool.putBuffer(buffer);
226224
}
@@ -241,6 +239,7 @@ public boolean hasCapability(String capability) {
241239

242240
/**
243241
* String value. Includes the string value of the inner stream
242+
*
244243
* @return the stream
245244
*/
246245
@Override
@@ -254,17 +253,7 @@ public int read(long position, ByteBuffer buf) throws IOException {
254253
return ((ByteBufferPositionedReadable) in).read(position, buf);
255254
}
256255
throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
257-
"by " + in.getClass().getCanonicalName());
258-
}
259-
260-
@Override
261-
public void readFully(long position, ByteBuffer buf) throws IOException {
262-
if (in instanceof ByteBufferPositionedReadable) {
263-
((ByteBufferPositionedReadable) in).readFully(position, buf);
264-
} else {
265-
throw new UnsupportedOperationException("Byte-buffer pread " +
266-
"unsupported by " + in.getClass().getCanonicalName());
267-
}
256+
"by " + in.getClass().getCanonicalName());
268257
}
269258

270259
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ public interface StreamCapabilities {
5959
*/
6060
String UNBUFFER = "in:unbuffer";
6161

62+
/**
63+
* Stream read(ByteBuffer) capability implemented by
64+
* {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
65+
*/
66+
String READBYTEBUFFER = "in:readbytebuffer";
67+
68+
/**
69+
* Stream read(long, ByteBuffer) capability implemented by
70+
* {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
71+
*/
72+
String PREADBYTEBUFFER = "in:preadbytebuffer";
73+
6274
/**
6375
* Capabilities that a stream can support and be queried for.
6476
*/

0 commit comments

Comments
 (0)