Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
Expand Down Expand Up @@ -61,10 +62,18 @@ public class BlockInputStream extends InputStream implements Seekable {
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private List<ChunkInfo> chunks;
// ChunkIndex points to the index current chunk in the buffers or the the
// index of chunk which will be read next into the buffers in
// readChunkFromContainer().
private int chunkIndex;
// ChunkIndexOfCurrentBuffer points to the index of chunk read into the
// buffers or index of the last chunk in the buffers. It is updated only
// when a new chunk is read from container into the buffers.
private int chunkIndexOfCurrentBuffer;
private long[] chunkOffset;
private List<ByteBuffer> buffers;
private int bufferIndex;
private long bufferPosition;
private final boolean verifyChecksum;

/**
Expand All @@ -76,24 +85,34 @@ public class BlockInputStream extends InputStream implements Seekable {
* @param chunks list of chunks to read
* @param traceID container protocol call traceID
* @param verifyChecksum verify checksum
* @param initialPosition the initial position of the stream pointer. This
* position is seeked now if the up-stream was seeked
* before this was created.
*/
public BlockInputStream(
BlockID blockID, XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
boolean verifyChecksum) {
boolean verifyChecksum, long initialPosition) throws IOException {
this.blockID = blockID;
this.traceID = traceID;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.chunks = chunks;
this.chunkIndex = -1;
this.chunkIndex = 0;
this.chunkIndexOfCurrentBuffer = -1;
// chunkOffset[i] stores offset at which chunk i stores data in
// BlockInputStream
this.chunkOffset = new long[this.chunks.size()];
initializeChunkOffset();
this.buffers = null;
this.bufferIndex = 0;
this.bufferPosition = -1;
this.verifyChecksum = verifyChecksum;
if (initialPosition > 0) {
// The stream was seeked to a position before the stream was
// initialized. So seeking to the position now.
seek(initialPosition);
}
}

private void initializeChunkOffset() {
Expand Down Expand Up @@ -176,7 +195,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
*
* @return true if EOF, false if more data is available
*/
private boolean blockStreamEOF() {
protected boolean blockStreamEOF() {
if (buffersHaveData() || chunksRemaining()) {
return false;
} else {
Expand Down Expand Up @@ -223,12 +242,19 @@ private synchronized void checkOpen() throws IOException {
*/
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
if (!buffersAllocated()) {
// The current chunk at chunkIndex has not been read from the
// container. Read the chunk and put the data into buffers.
readChunkFromContainer();
}
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
return len > bb.remaining() ? bb.remaining() : len;
} else if (chunksRemaining()) {
// There are additional chunks available.
// Read the next chunk in the block.
chunkIndex += 1;
readChunkFromContainer();
} else {
// All available input has been consumed.
Expand All @@ -237,26 +263,31 @@ private synchronized int prepareRead(int len) throws IOException {
}
}

private boolean buffersHaveData() {
boolean hasData = false;

private boolean buffersAllocated() {
if (buffers == null || buffers.isEmpty()) {
return false;
}
return true;
}

while (bufferIndex < (buffers.size())) {
if (buffers.get(bufferIndex).hasRemaining()) {
// current buffer has data
hasData = true;
break;
} else {
if (buffersRemaining()) {
// move to next available buffer
++bufferIndex;
Preconditions.checkState(bufferIndex < buffers.size());
} else {
// no more buffers remaining
private boolean buffersHaveData() {
boolean hasData = false;

if (buffersAllocated()) {
while (bufferIndex < (buffers.size())) {
if (buffers.get(bufferIndex).hasRemaining()) {
// current buffer has data
hasData = true;
break;
} else {
if (buffersRemaining()) {
// move to next available buffer
++bufferIndex;
Preconditions.checkState(bufferIndex < buffers.size());
} else {
// no more buffers remaining
break;
}
}
}
}
Expand All @@ -272,7 +303,14 @@ private boolean chunksRemaining() {
if ((chunks == null) || chunks.isEmpty()) {
return false;
}
return (chunkIndex < (chunks.size() - 1));
// Check if more chunks are remaining in the stream after chunkIndex
if (chunkIndex < (chunks.size() - 1)) {
return true;
}
// ChunkIndex is the last chunk in the stream. Check if this chunk has
// been read from container or not. Return true if chunkIndex has not
// been read yet and false otherwise.
return chunkIndexOfCurrentBuffer != chunkIndex;
}

/**
Expand All @@ -283,34 +321,14 @@ private boolean chunksRemaining() {
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void readChunkFromContainer() throws IOException {
// On every chunk read chunkIndex should be increased so as to read the
// next chunk
chunkIndex += 1;
XceiverClientReply reply;
ReadChunkResponseProto readChunkResponse = null;
// Read the chunk at chunkIndex
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
List<DatanodeDetails> excludeDns = null;
ByteString byteString;
List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
List<DatanodeDetails> dnList = getDatanodeList();
while (true) {
try {
reply = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
ContainerProtos.ContainerCommandResponseProto response;
response = reply.getResponse().get();
ContainerProtocolCalls.validateContainerResponse(response);
readChunkResponse = response.getReadChunk();
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
} catch (ExecutionException | InterruptedException e) {
throw new IOException(
"Failed to execute ReadChunk command for chunk " + chunkInfo
.getChunkName(), e);
}
byteString = readChunkResponse.getData();
List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
try {
if (byteString.size() != chunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
Expand All @@ -333,7 +351,7 @@ private synchronized void readChunkFromContainer() throws IOException {
if (excludeDns == null) {
excludeDns = new ArrayList<>();
}
excludeDns.addAll(reply.getDatanodes());
excludeDns.addAll(dnListFromReadChunkCall);
if (excludeDns.size() == dnList.size()) {
throw ioe;
}
Expand All @@ -342,6 +360,47 @@ private synchronized void readChunkFromContainer() throws IOException {

buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
chunkIndexOfCurrentBuffer = chunkIndex;

// The bufferIndex and position might need to be adjusted if seek() was
// called on the stream before. This needs to be done so that the buffer
// position can be advanced to the 'seeked' position.
adjustBufferIndex();
}

/**
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
protected ByteString readChunk(final ChunkInfo chunkInfo,
List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
throws IOException {
XceiverClientReply reply;
ReadChunkResponseProto readChunkResponse = null;
try {
reply = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
ContainerProtos.ContainerCommandResponseProto response;
response = reply.getResponse().get();
ContainerProtocolCalls.validateContainerResponse(response);
readChunkResponse = response.getReadChunk();
dnListFromReply.addAll(reply.getDatanodes());
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
} catch (ExecutionException | InterruptedException e) {
throw new IOException(
"Failed to execute ReadChunk command for chunk " + chunkInfo
.getChunkName(), e);
}
return readChunkResponse.getData();
}

@VisibleForTesting
protected List<DatanodeDetails> getDatanodeList() {
return xceiverClient.getPipeline().getNodes();
}

@Override
Expand All @@ -352,9 +411,8 @@ public synchronized void seek(long pos) throws IOException {
throw new EOFException("EOF encountered pos: " + pos + " container key: "
+ blockID.getLocalID());
}
if (chunkIndex == -1) {
chunkIndex = Arrays.binarySearch(chunkOffset, pos);
} else if (pos < chunkOffset[chunkIndex]) {

if (pos < chunkOffset[chunkIndex]) {
chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
} else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
.getLen()) {
Expand All @@ -368,40 +426,71 @@ public synchronized void seek(long pos) throws IOException {
// accordingly so that chunkIndex = insertionPoint - 1
chunkIndex = -chunkIndex -2;
}
// adjust chunkIndex so that readChunkFromContainer reads the correct chunk
chunkIndex -= 1;
readChunkFromContainer();
adjustBufferIndex(pos);

// The bufferPosition should be adjusted to account for the chunk offset
// of the chunk the the pos actually points to.
bufferPosition = pos - chunkOffset[chunkIndex];

// Check if current buffers correspond to the chunk index being seeked
// and if the buffers have any data.
if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
// Position the buffer to the seeked position.
adjustBufferIndex();
} else {
// Release the current buffers. The next readChunkFromContainer will
// read the required chunk and position the buffer to the seeked
// position.
releaseBuffers();
}
}

private void adjustBufferIndex(long pos) {
long tempOffest = chunkOffset[chunkIndex];
private void adjustBufferIndex() {
if (bufferPosition == -1) {
// The stream has not been seeked to a position. No need to adjust the
// buffer Index and position.
return;
}
// The bufferPosition is w.r.t the buffers for current chunk.
// Adjust the bufferIndex and position to the seeked position.
long tempOffest = 0;
for (int i = 0; i < buffers.size(); i++) {
if (pos - tempOffest >= buffers.get(i).capacity()) {
if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
tempOffest += buffers.get(i).capacity();
} else {
bufferIndex = i;
break;
}
}
buffers.get(bufferIndex).position((int) (pos - tempOffest));
buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
// Reset the bufferPosition as the seek() operation has been completed.
bufferPosition = -1;
}

@Override
public synchronized long getPos() throws IOException {
if (chunkIndex == -1) {
// no data consumed yet, a new stream OR after seek
return 0;
}

if (blockStreamEOF()) {
// position = chunkOffset of current chunk (at chunkIndex) + position of
// the buffer corresponding to the chunk.
long bufferPos = 0;

if (bufferPosition >= 0) {
// seek has been called but the buffers were empty. Hence, the buffer
// position will be advanced after the buffers are filled.
// We return the chunkOffset + bufferPosition here as that will be the
// position of the buffer pointer after reading the chunk file.
bufferPos = bufferPosition;

} else if (blockStreamEOF()) {
// all data consumed, buffers have been released.
// get position from the chunk offset and chunk length of last chunk
return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen();
bufferPos = chunks.get(chunkIndex).getLen();

} else if (buffersAllocated()) {
// get position from available buffers of current chunk
bufferPos = buffers.get(bufferIndex).position();

}

// get position from available buffers of current chunk
return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
return chunkOffset[chunkIndex] + bufferPos;
}

@Override
Expand All @@ -412,4 +501,9 @@ public boolean seekToNewSource(long targetPos) throws IOException {
public BlockID getBlockID() {
return blockID;
}

@VisibleForTesting
protected int getChunkIndex() {
return chunkIndex;
}
}
Loading