Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

import com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
Expand All @@ -40,6 +43,7 @@
*/
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
StreamCapabilities {
private static Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);

private final AbfsClient client;
private final Statistics statistics;
Expand Down Expand Up @@ -234,6 +238,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
final AbfsRestOperation op;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;

import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;

class ReadBuffer {

private AbfsInputStream stream;
Expand All @@ -40,6 +42,8 @@ class ReadBuffer {
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;

private IOException errException = null;

public AbfsInputStream getStream() {
return stream;
}
Expand Down Expand Up @@ -88,12 +92,23 @@ public void setBufferindex(int bufferindex) {
this.bufferindex = bufferindex;
}

public IOException getErrException() {
return errException;
}

public void setErrException(final IOException errException) {
this.errException = errException;
}

public ReadBufferStatus getStatus() {
return status;
}

public void setStatus(ReadBufferStatus status) {
this.status = status;
if (status == READ_FAILED) {
bufferindex = -1;
}
}

public CountDownLatch getLatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;

import com.google.common.annotations.VisibleForTesting;

/**
* The Read Buffer Manager for Rest AbfsClient.
*/
Expand Down Expand Up @@ -141,7 +144,8 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
*/
int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
throws IOException {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("getBlock for file {} position {} thread {}",
Expand Down Expand Up @@ -253,7 +257,12 @@ private synchronized boolean tryEvict() {
}

private boolean evict(final ReadBuffer buf) {
freeList.push(buf.getBufferindex());
// As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
// avoid adding it to freeList.
if (buf.getBufferindex() != -1) {
freeList.push(buf.getBufferindex());
}
Comment on lines +261 to +265
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @snvijaya
I am unable to understand the significance of this change. I couldn't find in code anywhere where bufferIndex is set to -1 in case of read failure apart from the default value in the class. But when the buffers initialised, they are always set to value from 0 to 15.
Trying to understand this for #3285. So please review that as well. Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its set to -1 when read fails. You will find the diff for this in ReadBuffer.java line 110.
There is an issue with this commit though, for which a hotfix was made. Incase its relevant to your change -> https://issues.apache.org/jira/browse/HADOOP-17301
Will check on your PR by EOW.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @snvijaya


completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
Expand Down Expand Up @@ -289,6 +298,27 @@ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInpu
return null;
}

/**
* Returns buffers that failed or passed from completed queue
* @param stream
* @param requestedOffset
* @return
*/
private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
for (ReadBuffer buffer : completedReadList) {
// Buffer is returned if the requestedOffset is at or above buffer's
// offset but less than buffer's length or the actual requestedLength
if ((buffer.getStream() == stream)
&& (requestedOffset >= buffer.getOffset())
&& ((requestedOffset < buffer.getOffset() + buffer.getLength())
|| (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
return buffer;
}
}

return null;
}

private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
if (buffer != null) {
Expand All @@ -299,11 +329,28 @@ private void clearFromReadAheadQueue(final AbfsInputStream stream, final long re
}

private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
final byte[] buffer) {
ReadBuffer buf = getFromList(completedReadList, stream, position);
if (buf == null || position >= buf.getOffset() + buf.getLength()) {
final byte[] buffer) throws IOException {
ReadBuffer buf = getBufferFromCompletedQueue(stream, position);

if (buf == null) {
return 0;
}

if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
// To prevent new read requests to fail due to old read-ahead attempts,
// return exception only from buffers that failed within last THRESHOLD_AGE_MILLISECONDS
if ((currentTimeMillis() - (buf.getTimeStamp()) < THRESHOLD_AGE_MILLISECONDS)) {
throw buf.getErrException();
} else {
return 0;
}
}

if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
|| (position >= buf.getOffset() + buf.getLength())) {
return 0;
}

int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
Expand Down Expand Up @@ -368,14 +415,18 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setTimeStamp(currentTimeMillis());
buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else {
freeList.push(buffer.getBufferindex());
// buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
// buffer will be deleted as per the eviction policy.
}

buffer.setStatus(result);
buffer.setTimeStamp(currentTimeMillis());
completedReadList.add(buffer);
}

//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
Expand All @@ -392,4 +443,19 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i
private long currentTimeMillis() {
return System.nanoTime() / 1000 / 1000;
}

@VisibleForTesting
int getThreshold_age_milliseconds() {
return THRESHOLD_AGE_MILLISECONDS;
}

@VisibleForTesting
int getCompletedReadListSize() {
return completedReadList.size();
}

@VisibleForTesting
void callTryEvict() {
tryEvict();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
Expand Down Expand Up @@ -61,9 +62,18 @@ public void run() {
if (buffer != null) {
try {
// do the actual read, from the file.
int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
int bytesRead = buffer.getStream().readRemote(
buffer.getOffset()
, buffer.getBuffer()
, 0
// If AbfsInputStream was created with bigger buffer size than
// read-ahead buffer size, make sure a valid length is passed
// for remote read
, Math.min(buffer.getRequestedLength(), buffer.getBuffer().length));

bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
buffer.setErrException(new IOException(ex));
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
}
}
Expand Down
Loading