diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 66d485317c9ba..b2e7dad9a4d77 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -106,6 +106,12 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_READ_BUFFER_SIZE) private int readBufferSize; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_AHEAD_RANGE, + MinValue = MIN_BUFFER_SIZE, + MaxValue = MAX_BUFFER_SIZE, + DefaultValue = DEFAULT_READ_AHEAD_RANGE) + private int readAheadRange; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL, DefaultValue = DEFAULT_MIN_BACKOFF_INTERVAL) private int minBackoffInterval; @@ -765,6 +771,10 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio } } + public int getReadAheadRange() { + return this.readAheadRange; + } + int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException { IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class); String value = get(validator.ConfigurationKey()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 23d2b5a3d63fb..82d41a21a58a9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -552,6 +552,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() { .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .withReadAheadRange(abfsConfiguration.getReadAheadRange()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .build(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 681390c019873..42ec17e76e3ca 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -56,6 +56,14 @@ public final class ConfigurationKeys { public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue"; public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size"; public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; + + /** + * Read ahead range parameter which can be set by user. + * Default value is {@code FileSystemConfigurations#DEFAULT_READ_AHEAD_RANGE}. + * This might reduce number of calls to remote as next requested + * data could already be present in buffer. + */ + public static final String AZURE_READ_AHEAD_RANGE = "fs.azure.readahead.range"; public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index f70d102c1d905..619542f62afc8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -57,6 +57,8 @@ public final class FileSystemConfigurations { public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB + // Default value of read ahead range. + public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index ff3bd63cc7bf7..9188278ae629c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -56,6 +56,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final String eTag; // eTag of the path when InputStream are created private final boolean tolerateOobAppends; // whether tolerate Oob Appends private final boolean readAheadEnabled; // whether enable readAhead; + // User configured size of read ahead. + private final int readAheadRange; // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; @@ -73,6 +75,12 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private long bytesFromReadAhead; // bytes read from readAhead; for testing private long bytesFromRemoteRead; // bytes read remotely; for testing + /** + * This is the actual position within the object, used by + * lazy seek to decide whether to seek on the next read or not. + */ + private long nextReadPos; + public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -88,6 +96,7 @@ public AbfsInputStream( this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; + this.readAheadRange = abfsInputStreamContext.getReadAheadRange(); this.readAheadEnabled = true; this.cachedSasToken = new CachedSASToken( abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); @@ -128,6 +137,22 @@ public synchronized int read(final byte[] b, final int off, final int len) throw } incrementReadOps(); do { + if (nextReadPos >= fCursor - limit && nextReadPos <= fCursor) { + // data can be read from buffer. + bCursor = (int) (nextReadPos - (fCursor - limit)); + + // When bCursor == limit, buffer will be filled again. + // So in this case we are not actually reading from buffer. + if (bCursor != limit) { + streamStatistics.seekInBuffer(); + } + } else { + // Clearing the buffer and setting the file pointer + // based on previous seek() call. + fCursor = nextReadPos; + limit = 0; + bCursor = 0; + } lastReadBytes = readOneBlock(b, currentOff, currentLen); if (lastReadBytes > 0) { currentOff += lastReadBytes; @@ -180,9 +205,13 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO // Enable readAhead when reading sequentially if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { + LOG.debug("Sequential read with read ahead size of {}", bufferSize); bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); + // Enabling read ahead for random reads as well to reduce number of remote calls. + int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize); + LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead); + bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true); } if (bytesRead == -1) { @@ -200,6 +229,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO int bytesToRead = Math.min(len, bytesRemaining); System.arraycopy(buffer, bCursor, b, off, bytesToRead); bCursor += bytesToRead; + nextReadPos += bytesToRead; if (statistics != null) { statistics.incrementBytesRead(bytesToRead); } @@ -278,12 +308,12 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti final AbfsRestOperation op; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { - LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); - op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get()); - cachedSasToken.update(op.getSasToken()); if (streamStatistics != null) { streamStatistics.remoteReadOperation(); } + LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); + op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); LOG.debug("issuing HTTP GET request params position = {} b.length = {} " + "offset = {} length = {}", position, b.length, offset, length); perfInfo.registerResult(op.getResult()).registerSuccess(true); @@ -341,21 +371,9 @@ public synchronized void seek(long n) throws IOException { streamStatistics.seek(n, fCursor); } - if (n>=fCursor-limit && n<=fCursor) { // within buffer - bCursor = (int) (n-(fCursor-limit)); - if (streamStatistics != null) { - streamStatistics.seekInBuffer(); - } - return; - } - // next read will read from here - fCursor = n; - LOG.debug("set fCursor to {}", fCursor); - - //invalidate buffer - limit = 0; - bCursor = 0; + nextReadPos = n; + LOG.debug("set nextReadPos to {}", nextReadPos); } @Override @@ -426,7 +444,7 @@ public synchronized long getPos() throws IOException { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - return fCursor - limit + bCursor; + return nextReadPos < 0 ? 0 : nextReadPos; } /** @@ -492,6 +510,11 @@ byte[] getBuffer() { return buffer; } + @VisibleForTesting + public int getReadAheadRange() { + return readAheadRange; + } + @VisibleForTesting protected void setCachedSasToken(final CachedSASToken cachedSasToken) { this.cachedSasToken = cachedSasToken; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index f8d3b2a599bfe..7b50ba04ea454 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.azurebfs.services; +import com.google.common.base.Preconditions; + /** * Class to hold extra input stream configs. */ @@ -29,6 +31,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean tolerateOobAppends; + private int readAheadRange; + private AbfsInputStreamStatistics streamStatistics; public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { @@ -54,6 +58,12 @@ public AbfsInputStreamContext withTolerateOobAppends( return this; } + public AbfsInputStreamContext withReadAheadRange( + final int readAheadRange) { + this.readAheadRange = readAheadRange; + return this; + } + public AbfsInputStreamContext withStreamStatistics( final AbfsInputStreamStatistics streamStatistics) { this.streamStatistics = streamStatistics; @@ -62,6 +72,8 @@ public AbfsInputStreamContext withStreamStatistics( public AbfsInputStreamContext build() { // Validation of parameters to be done here. + Preconditions.checkArgument(readAheadRange > 0, + "Read ahead range should be greater than 0"); return this; } @@ -77,6 +89,10 @@ public boolean isTolerateOobAppends() { return tolerateOobAppends; } + public int getReadAheadRange() { + return readAheadRange; + } + public AbfsInputStreamStatistics getStreamStatistics() { return streamStatistics; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index 52dfdf2a61ca8..df145172fc454 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -121,6 +121,7 @@ public void testSeekStatistics() throws IOException { */ for (int i = 0; i < OPERATIONS; i++) { in.seek(0); + in.read(); in.seek(ONE_MB); } @@ -150,7 +151,7 @@ public void testSeekStatistics() throws IOException { * are in buffer. * * seekInBuffer - Since all seeks were in buffer, the seekInBuffer - * would be equal to 2 * OPERATIONS. + * would be equal to OPERATIONS. * */ assertEquals("Mismatch in seekOps value", 2 * OPERATIONS, @@ -163,7 +164,7 @@ public void testSeekStatistics() throws IOException { -1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in bytesSkippedOnSeek value", 0, stats.getBytesSkippedOnSeek()); - assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS, + assertEquals("Mismatch in seekInBuffer value", OPERATIONS, stats.getSeekInBuffer()); in.close(); @@ -255,6 +256,7 @@ public void testWithNullStreamStatistics() throws IOException { .withReadBufferSize(getConfiguration().getReadBufferSize()) .withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth()) .withStreamStatistics(null) + .withReadAheadRange(getConfiguration().getReadAheadRange()) .build(); AbfsOutputStream out = null; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java index bda845bb45ad5..fe25477beb61e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.azurebfs.utils.Base64; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_RANGE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS; @@ -141,6 +142,7 @@ public void testConfigServiceImplAnnotatedFieldsInitialized() throws Exception { assertEquals(DEFAULT_MAX_RETRY_ATTEMPTS, abfsConfiguration.getMaxIoRetries()); assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize()); assertEquals(AZURE_BLOCK_LOCATION_HOST_DEFAULT, abfsConfiguration.getAzureBlockLocationHost()); + assertEquals(DEFAULT_READ_AHEAD_RANGE, abfsConfiguration.getReadAheadRange()); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index 35a5e1733d0e6..8d066dc47b71e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -18,10 +18,27 @@ package org.apache.hadoop.fs.azurebfs.contract; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl; import org.apache.hadoop.fs.contract.AbstractContractSeekTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture; + /** * Contract test for seek operation. */ @@ -29,6 +46,8 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{ private final boolean isSecure; private final ABFSContractTestBinding binding; + private static final byte[] BLOCK = dataset(100 * 1024, 0, 255); + public ITestAbfsFileSystemContractSeek() throws Exception { binding = new ABFSContractTestBinding(); this.isSecure = binding.isSecureMode(); @@ -47,6 +66,210 @@ protected Configuration createConfiguration() { @Override protected AbstractFSContract createContract(final Configuration conf) { + conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE); + conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE); return new AbfsFileSystemContract(conf, isSecure); } + + /** + * Test verifies if the data is read correctly + * when {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set. + */ + @Test + public void testSeekAndReadWithReadAhead() throws IOException { + describe(" Testing seek and read with read ahead " + + "enabled for random reads"); + + Path testSeekFile = path(getMethodName() + "bigseekfile.txt"); + createDataSet(testSeekFile); + try (FSDataInputStream in = getFileSystem().open(testSeekFile)) { + AbfsInputStream inStream = ((AbfsInputStream) in.getWrappedStream()); + AbfsInputStreamStatisticsImpl streamStatistics = + (AbfsInputStreamStatisticsImpl) inStream.getStreamStatistics(); + assertEquals(String.format("Value of %s is not set correctly", AZURE_READ_AHEAD_RANGE), + MIN_BUFFER_SIZE, inStream.getReadAheadRange()); + + long remoteReadOperationsOldVal = streamStatistics.getRemoteReadOperations(); + assertEquals("Number of remote read ops should be 0 " + + "before any read call is made", 0, remoteReadOperationsOldVal); + + // Test read at first position. Remote read. + assertEquals("First call to getPos() should return 0", + 0, inStream.getPos()); + assertDataAtPos(0, (byte) in.read()); + assertSeekBufferStats(0, streamStatistics.getSeekInBuffer()); + long remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking just before read ahead range. Read from buffer. + int newSeek = inStream.getReadAheadRange() - 1; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(1, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking boundary of read ahead range. Read from buffer manager. + newSeek = inStream.getReadAheadRange(); + inStream.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(1, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking just after read ahead range. Read from buffer. + newSeek = inStream.getReadAheadRange() + 1; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(2, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking just 10 more bytes such that data is read from buffer. + newSeek += 10; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(3, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seek backward such that data is read from remote. + newSeek -= 101; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(3, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking just 10 more bytes such that data is read from buffer. + newSeek += 10; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(4, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Read multiple bytes across read ahead range. Remote read. + long oldSeek = newSeek; + newSeek = 2*inStream.getReadAheadRange() -1; + byte[] bytes = new byte[5]; + in.readFully(newSeek, bytes); + // With readFully getPos should return oldSeek pos. + // Adding one as one byte is already read + // after the last seek is done. + assertGetPosition(oldSeek + 1, in.getPos()); + assertSeekBufferStats(4, streamStatistics.getSeekInBuffer()); + assertDatasetEquals(newSeek, "Read across read ahead ", + bytes, bytes.length); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + } + } + + /** + * Test to validate the getPos() when a seek is done + * post {@code AbfsInputStream#unbuffer} call is made. + * Also using optimised builder api to open file. + */ + @Test + public void testSeekAfterUnbuffer() throws IOException { + describe("Test to make sure that seeking in AbfsInputStream after " + + "unbuffer() call is not doing anyIO."); + Path testFile = path(getMethodName() + ".txt"); + createDataSet(testFile); + final CompletableFuture future = + getFileSystem().openFile(testFile) + .build(); + try (FSDataInputStream inputStream = awaitFuture(future)) { + AbfsInputStream abfsInputStream = (AbfsInputStream) inputStream.getWrappedStream(); + AbfsInputStreamStatisticsImpl streamStatistics = + (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics(); + int readAheadRange = abfsInputStream.getReadAheadRange(); + long seekPos = readAheadRange; + inputStream.seek(seekPos); + assertDataAtPos(readAheadRange, (byte) inputStream.read()); + long currentRemoteReadOps = streamStatistics.getRemoteReadOperations(); + assertIncrementInRemoteReadOps(0, currentRemoteReadOps); + inputStream.unbuffer(); + seekPos -= 10; + inputStream.seek(seekPos); + // Seek backwards shouldn't do any IO + assertNoIncrementInRemoteReadOps(currentRemoteReadOps, streamStatistics.getRemoteReadOperations()); + assertGetPosition(seekPos, inputStream.getPos()); + } + } + + private void createDataSet(Path path) throws IOException { + createFile(getFileSystem(), path, true, BLOCK); + } + + private void assertGetPosition(long expected, long actual) { + final String seekPosErrorMsg = "getPos() should return %s"; + assertEquals(String.format(seekPosErrorMsg, expected), expected, actual); + } + + private void assertDataAtPos(int pos, byte actualData) { + final String dataErrorMsg = "Mismatch in data@%s"; + assertEquals(String.format(dataErrorMsg, pos), BLOCK[pos], actualData); + } + + private void assertSeekBufferStats(long expected, long actual) { + final String statsErrorMsg = "Mismatch in seekInBuffer counts"; + assertEquals(statsErrorMsg, expected, actual); + } + + private void assertNoIncrementInRemoteReadOps(long oldVal, long newVal) { + final String incrementErrorMsg = "Number of remote read ops shouldn't increase"; + assertEquals(incrementErrorMsg, oldVal, newVal); + } + + private void assertIncrementInRemoteReadOps(long oldVal, long newVal) { + final String incrementErrorMsg = "Number of remote read ops should increase"; + Assertions.assertThat(newVal) + .describedAs(incrementErrorMsg) + .isGreaterThan(oldVal); + } + + /** + * Assert that the data read matches the dataset at the given offset. + * This helps verify that the seek process is moving the read pointer + * to the correct location in the file. + * @param readOffset the offset in the file where the read began. + * @param operation operation name for the assertion. + * @param data data read in. + * @param length length of data to check. + */ + private void assertDatasetEquals( + final int readOffset, + final String operation, + final byte[] data, + int length) { + for (int i = 0; i < length; i++) { + int o = readOffset + i; + assertEquals(operation + "with read offset " + readOffset + + ": data[" + i + "] != actualData[" + o + "]", + BLOCK[o], data[i]); + } + } }