Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private static final Logger LOG =
LoggerFactory.getLogger(FadvisedFileRegion.class);

private final Object closeLock = new Object();
private final boolean manageOsCache;
private final int readaheadLength;
private final ReadaheadPool readaheadPool;
Expand All @@ -51,12 +52,12 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private final int shuffleBufferSize;
private final boolean shuffleTransferToAllowed;
private final FileChannel fileChannel;
private ReadaheadRequest readaheadRequest;

private volatile ReadaheadRequest readaheadRequest;

public FadvisedFileRegion(RandomAccessFile file, long position, long count,
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
String identifier, int shuffleBufferSize,
String identifier, int shuffleBufferSize,
boolean shuffleTransferToAllowed) throws IOException {
super(file.getChannel(), position, count);
this.manageOsCache = manageOsCache;
Expand All @@ -73,97 +74,110 @@ public FadvisedFileRegion(RandomAccessFile file, long position, long count,

@Override
public long transferTo(WritableByteChannel target, long position)
throws IOException {
if (readaheadPool != null && readaheadLength > 0) {
readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
position() + position, readaheadLength,
position() + count(), readaheadRequest);
throws IOException {
synchronized (closeLock) {
if (fd.valid()) {
if (readaheadPool != null && readaheadLength > 0) {
readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
position() + position, readaheadLength,
position() + count(), readaheadRequest);
}

if(this.shuffleTransferToAllowed) {
return super.transferTo(target, position);
} else {
return customShuffleTransfer(target, position);
}
} else {
return 0L;
}
}

if(this.shuffleTransferToAllowed) {
return super.transferTo(target, position);
} else {
return customShuffleTransfer(target, position);
}

}

/**
* This method transfers data using local buffer. It transfers data from
* a disk to a local buffer in memory, and then it transfers data from the
* This method transfers data using local buffer. It transfers data from
* a disk to a local buffer in memory, and then it transfers data from the
* buffer to the target. This is used only if transferTo is disallowed in
* the configuration file. super.TransferTo does not perform well on Windows
* due to a small IO request generated. customShuffleTransfer can control
* the size of the IO requests by changing the size of the intermediate
* the configuration file. super.TransferTo does not perform well on Windows
* due to a small IO request generated. customShuffleTransfer can control
* the size of the IO requests by changing the size of the intermediate
* buffer.
*/
@VisibleForTesting
long customShuffleTransfer(WritableByteChannel target, long position)
throws IOException {
throws IOException {
long actualCount = this.count - position;
if (actualCount < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position +
" (expected: 0 - " + (this.count - 1) + ')');
"position out of range: " + position +
" (expected: 0 - " + (this.count - 1) + ')');
}
if (actualCount == 0) {
return 0L;
}

long trans = actualCount;
int readSize;
ByteBuffer byteBuffer = ByteBuffer.allocate(
Math.min(
this.shuffleBufferSize,
trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
Math.min(
this.shuffleBufferSize,
trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));

while(trans > 0L &&
(readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
(readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
//adjust counters and buffer limit
if(readSize < trans) {
trans -= readSize;
position += readSize;
byteBuffer.flip();
} else {
//We can read more than we need if the actualCount is not multiple
//We can read more than we need if the actualCount is not multiple
//of the byteBuffer size and file is big enough. In that case we cannot
//use flip method but we need to set buffer limit manually to trans.
byteBuffer.limit((int)trans);
byteBuffer.position(0);
position += trans;
position += trans;
trans = 0;
}

//write data to the target
while(byteBuffer.hasRemaining()) {
target.write(byteBuffer);
}

byteBuffer.clear();
}

return actualCount - trans;
}


@Override
protected void deallocate() {
if (readaheadRequest != null) {
readaheadRequest.cancel();
synchronized (closeLock) {
if (readaheadRequest != null) {
readaheadRequest.cancel();
readaheadRequest = null;
}
super.deallocate();
}
super.deallocate();
}

/**
* Call when the transfer completes successfully so we can advise the OS that
* we don't need the region to be cached anymore.
*/
public void transferSuccessful() {
if (manageOsCache && count() > 0) {
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd, position(), count(), POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
synchronized (closeLock) {
if (fd.valid() && manageOsCache && count() > 0) {
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd, position(), count(), POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier +
" fd " + fd, t);
}
}
}
}
Expand Down