Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ private int readInternal(final long position, final byte[] b, final int offset,
if (receivedBytes > 0) {
incrementReadOps();
LOG.debug("Received data from read ahead, not doing remote read");
if (streamStatistics != null) {
streamStatistics.readAheadBytesRead(receivedBytes);
}
return receivedBytes;
}

Expand Down Expand Up @@ -292,6 +295,9 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
throw new IOException(ex);
}
long bytesRead = op.getResult().getBytesReceived();
if (streamStatistics != null) {
streamStatistics.remoteBytesRead(bytesRead);
}
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ public interface AbfsInputStreamStatistics {
*/
void remoteReadOperation();

/**
* Records the number of bytes that a read obtained from the readAhead
* buffer.
*
* @param bytes the number of bytes to add to the counter.
*/
void readAheadBytesRead(long bytes);

/**
* Records the number of bytes read remotely, i.e. by a read for which
* nothing was served from the readAhead buffer.
*
* @param bytes the number of bytes to add to the counter.
*/
void remoteBytesRead(long bytes);

/**
* Makes the string of all the AbfsInputStream statistics.
* @return the string with all the statistics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class AbfsInputStreamStatisticsImpl
private long readOperations;
private long bytesReadFromBuffer;
private long remoteReadOperations;
private long readAheadBytesRead;
private long remoteBytesRead;

/**
* Seek backwards, incrementing the seek and backward seek counters.
Expand Down Expand Up @@ -128,6 +130,30 @@ public void readOperationStarted(long pos, long len) {
readOperations++;
}

/**
 * Adds to the running total of bytes served from the readAhead buffer.
 * Zero and negative values are ignored, so the total never decreases.
 *
 * @param bytes the number of bytes to add to the total.
 */
@Override
public void readAheadBytesRead(long bytes) {
  if (bytes <= 0) {
    // Nothing was read; leave the counter untouched.
    return;
  }
  readAheadBytesRead += bytes;
}

/**
 * Adds to the running total of bytes read remotely, i.e. when nothing was
 * read from the readAhead buffer. Zero and negative values are ignored, so
 * the total never decreases.
 *
 * @param bytes the number of bytes to add to the total.
 */
@Override
public void remoteBytesRead(long bytes) {
  if (bytes <= 0) {
    // Nothing was read; leave the counter untouched.
    return;
  }
  remoteBytesRead += bytes;
}

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -178,6 +204,14 @@ public long getRemoteReadOperations() {
return remoteReadOperations;
}

/**
 * Getter for the total number of bytes read from the readAhead buffer.
 *
 * @return the current value of the readAheadBytesRead counter.
 */
public long getReadAheadBytesRead() {
  return this.readAheadBytesRead;
}

/**
 * Getter for the total number of bytes read remotely.
 *
 * @return the current value of the remoteBytesRead counter.
 */
public long getRemoteBytesRead() {
  return this.remoteBytesRead;
}

/**
* String operator describes all the current statistics.
* <b>Important: there are no guarantees as to the stability
Expand All @@ -199,6 +233,8 @@ public String toString() {
sb.append(", ReadOperations=").append(readOperations);
sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
sb.append(", remoteReadOperations=").append(remoteReadOperations);
sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
sb.append(", remoteBytesRead=").append(remoteBytesRead);
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +40,10 @@ public class ITestAbfsInputStreamStatistics
LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
private static final int ONE_MB = 1024 * 1024;
private static final int ONE_KB = 1024;
// Read buffer size (4 KB) that testReadAheadCounters sets on the store
// configuration.
private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
// Eight blocks of CUSTOM_BLOCK_BUFFER_SIZE (32 KB); the lower bound that
// testReadAheadCounters expects for the remoteBytesRead counter.
private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
// NOTE(review): this value is passed to Thread.sleep(), which takes
// milliseconds, so the pause is 10 ms despite the "SECONDS" in the name.
private static final int THREAD_SLEEP_10_SECONDS = 10;
// JUnit timeout of testReadAheadCounters, in milliseconds (30 seconds).
private static final int TIMEOUT_30_SECONDS = 30000;
private byte[] defBuffer = new byte[ONE_MB];

public ITestAbfsInputStreamStatistics() throws Exception {
Expand Down Expand Up @@ -75,6 +80,8 @@ public void testInitValues() throws IOException {
checkInitValue(stats.getReadOperations(), "readOps");
checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead");
checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead");

} finally {
IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
Expand Down Expand Up @@ -285,6 +292,94 @@ public void testWithNullStreamStatistics() throws IOException {
}
}

/**
 * Checks the readAheadBytesRead and remoteBytesRead counters of
 * AbfsInputStream after a short series of sequential reads. The test is
 * bounded by a 30 second timeout.
 */
@Test(timeout = TIMEOUT_30_SECONDS)
public void testReadAheadCounters() throws IOException, InterruptedException {
  describe("Test to check correct values for readAhead counters in "
      + "AbfsInputStream");

  AzureBlobFileSystem fileSystem = getFileSystem();
  AzureBlobFileSystemStore store = fileSystem.getAbfsStore();
  Path testFile = path(getMethodName());

  // Use a 4KB read buffer so that every readAhead block is 4KB.
  store.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE);

  AbfsOutputStream writer = null;
  AbfsInputStream reader = null;

  try {
    // Create the test file by writing the whole 1MB default buffer to it.
    writer = createAbfsOutputStreamWithFlushEnabled(fileSystem, testFile);
    writer.write(defBuffer);
    writer.close();

    reader = store.openFileForRead(testFile, fileSystem.getFsStatistics());

    // Five consecutive 1KB reads: 0-1KB, 1KB-2KB, ... up to 5KB. Each read
    // seeks to its offset first and stores the data at the same offset of
    // the buffer.
    final int endOffset = 5 * ONE_KB;
    for (int offset = 0; offset < endOffset; offset += ONE_KB) {
      reader.seek(offset);
      reader.read(defBuffer, offset, ONE_KB);
    }

    AbfsInputStreamStatisticsImpl counters =
        (AbfsInputStreamStatisticsImpl) reader.getStreamStatistics();

    // readAhead runs on background threads, which may not have finished
    // yet. Poll until both counters have reached their expected minimum;
    // the test timeout bounds the wait. Note that Thread.sleep() takes
    // milliseconds, so each pause is 10 ms.
    while (!(counters.getRemoteBytesRead() >= CUSTOM_READ_AHEAD_BUFFER_SIZE
        && counters.getReadAheadBytesRead() >= CUSTOM_BLOCK_BUFFER_SIZE)) {
      Thread.sleep(THREAD_SLEEP_10_SECONDS);
    }

    // Expected counter values.
    //
    // readAheadBytesRead: the reads cover 0 to 5KB with a 4KB buffer size,
    // so ideally two 4KB blocks (8KB) come from the readAhead buffer.
    // However, a read may fall back to a remote read instead of waiting
    // for the background threads to fill the buffer, so only "at least
    // 4KB" can be asserted.
    //
    // remoteBytesRead: with a 4KB buffer size and 8 readAhead blocks, the
    // first read is expected to fetch 8 * 4KB = 32KB remotely. Bytes that
    // could not be taken from the buffer may be read remotely again, so
    // the value can exceed 32KB.
    Assertions.assertThat(counters.getReadAheadBytesRead())
        .describedAs("Mismatch in readAheadBytesRead counter value")
        .isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);

    Assertions.assertThat(counters.getRemoteBytesRead())
        .describedAs("Mismatch in remoteBytesRead counter value")
        .isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);

  } finally {
    IOUtils.cleanupWithLogger(LOG, writer, reader);
  }
}

/**
* Method to assert the initial values of the statistics.
*
Expand Down