From b8543cdf4a6f99bb94e4e065bea30215cd659917 Mon Sep 17 00:00:00 2001 From: Mukund Madhav Thakur Date: Tue, 15 Sep 2020 15:11:01 +0530 Subject: [PATCH 1/4] HADOOP-17250 Lot of short reads can be merged with readahead. Introducing fs.azure.readahead.range parameter which can be set by user. Data will be populated in buffer for random reads as well which leads to lesser remote calls. This patch also changes the seek implementation to perform a lazy seek. Actual seek is done when a read is initiated and data is not present in buffer else date is returned from buffer thus reducing the number of remote calls. --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 10 + .../fs/azurebfs/AzureBlobFileSystemStore.java | 1 + .../azurebfs/constants/ConfigurationKeys.java | 8 + .../constants/FileSystemConfigurations.java | 2 + .../fs/azurebfs/services/AbfsInputStream.java | 60 +++-- .../services/AbfsInputStreamContext.java | 15 ++ .../ITestAbfsInputStreamStatistics.java | 6 +- ...TestAbfsConfigurationFieldsValidation.java | 2 + .../ITestAbfsFileSystemContractSeek.java | 223 ++++++++++++++++++ .../services/ITestAbfsInputStream.java | 4 +- 10 files changed, 308 insertions(+), 23 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 1261cc2c8c9be..50cc57447f92b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -123,6 +123,12 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_READ_BUFFER_SIZE) private int readBufferSize; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_AHEAD_RANGE, + MinValue = MIN_BUFFER_SIZE, + MaxValue = MAX_BUFFER_SIZE, + DefaultValue = DEFAULT_READ_AHEAD_RANGE) + private int readAheadRange; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL, DefaultValue = DEFAULT_MIN_BACKOFF_INTERVAL) private int minBackoffInterval; @@ -900,6 +906,10 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio } } + public int getReadAheadRange() { + return this.readAheadRange; + } + int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException { IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class); String value = get(validator.ConfigurationKey()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 553fdd713f4be..a3cd1532e675b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -721,6 +721,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely()) .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead()) + .withReadAheadRange(abfsConfiguration.getReadAheadRange()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withShouldReadBufferSizeAlways( abfsConfiguration.shouldReadBufferSizeAlways()) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index df9845130bce5..c0b250705c814 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -68,6 +68,14 @@ public final class ConfigurationKeys { public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely"; public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread"; + + /** + * Read ahead range parameter which can be set by user. + * Default value is {@code FileSystemConfigurations#DEFAULT_READ_AHEAD_RANGE}. + * This might reduce number of calls to remote as next requested + * data could already be present in buffer. + */ + public static final String AZURE_READ_AHEAD_RANGE = "fs.azure.readahead.range"; public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 0044c261ae1db..b79424557e4a6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -62,6 +62,8 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false; public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false; public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB; + // Default value of read ahead range. + public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 5437da36096e6..d5185535b23be 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -82,6 +82,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, * @see #read(long, byte[], int, int) */ private final boolean bufferedPreadDisabled; + // User configured size of read ahead. + private final int readAheadRange; private boolean firstRead = true; // SAS tokens can be re-used until they expire @@ -112,6 +114,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final AbfsInputStreamContext context; private IOStatistics ioStatistics; + /** + * This is the actual position within the object, used by + * lazy seek to decide whether to seek on the next read or not. + */ + private long nextReadPos; public AbfsInputStream( final AbfsClient client, @@ -129,6 +136,7 @@ public AbfsInputStream( this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; + this.readAheadRange = abfsInputStreamContext.getReadAheadRange(); this.readAheadEnabled = true; this.alwaysReadBufferSize = abfsInputStreamContext.shouldReadBufferSizeAlways(); @@ -225,6 +233,22 @@ public synchronized int read(final byte[] b, final int off, final int len) throw } incrementReadOps(); do { + if (nextReadPos >= fCursor - limit && nextReadPos <= fCursor) { + // data can be read from buffer. + bCursor = (int) (nextReadPos - (fCursor - limit)); + + // When bCursor == limit, buffer will be filled again. + // So in this case we are not actually reading from buffer. + if(bCursor != limit && streamStatistics != null) { + streamStatistics.seekInBuffer(); + } + } else { + // Clearing the buffer and setting the file pointer + // based on previous seek() call. + fCursor = nextReadPos; + limit = 0; + bCursor = 0; + } if (shouldReadFully()) { lastReadBytes = readFileCompletely(b, currentOff, currentLen); } else if (shouldReadLastBlock()) { @@ -283,9 +307,13 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO } else { // Enable readAhead when reading sequentially if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { + LOG.debug("Sequential read with read ahead size of {}", bufferSize); bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); + // Enabling read ahead for random reads as well to reduce number of remote calls. + int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize); + LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead); + bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true); } } if (firstRead) { @@ -419,6 +447,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){ int bytesToRead = min(len, bytesRemaining); System.arraycopy(buffer, bCursor, b, off, bytesToRead); bCursor += bytesToRead; + nextReadPos += bytesToRead; if (statistics != null) { statistics.incrementBytesRead(bytesToRead); } @@ -502,13 +531,13 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t final AbfsRestOperation op; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { + if (streamStatistics != null) { + streamStatistics.remoteReadOperation(); + } LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get(), tracingContext); cachedSasToken.update(op.getSasToken()); - if (streamStatistics != null) { - streamStatistics.remoteReadOperation(); - } LOG.debug("issuing HTTP GET request params position = {} b.length = {} " + "offset = {} length = {}", position, b.length, offset, length); perfInfo.registerResult(op.getResult()).registerSuccess(true); @@ -566,21 +595,9 @@ public synchronized void seek(long n) throws IOException { streamStatistics.seek(n, fCursor); } - if (n>=fCursor-limit && n<=fCursor) { // within buffer - bCursor = (int) (n-(fCursor-limit)); - if (streamStatistics != null) { - streamStatistics.seekInBuffer(); - } - return; - } - // next read will read from here - fCursor = n; - LOG.debug("set fCursor to {}", fCursor); - - //invalidate buffer - limit = 0; - bCursor = 0; + nextReadPos = n; + LOG.debug("set nextReadPos to {}", nextReadPos); } @Override @@ -651,7 +668,7 @@ public synchronized long getPos() throws IOException { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - return fCursor - limit + bCursor; + return nextReadPos < 0 ? 0 : nextReadPos; } public TracingContext getTracingContext() { @@ -721,6 +738,11 @@ byte[] getBuffer() { return buffer; } + @VisibleForTesting + public int getReadAheadRange() { + return readAheadRange; + } + @VisibleForTesting protected void setCachedSasToken(final CachedSASToken cachedSasToken) { this.cachedSasToken = cachedSasToken; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index fe41f22a772ff..f8028bfb2e84b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; /** * Class to hold extra input stream configs. @@ -37,6 +38,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean alwaysReadBufferSize; private int readAheadBlockSize; + + private int readAheadRange; private AbfsInputStreamStatistics streamStatistics; @@ -69,6 +72,12 @@ public AbfsInputStreamContext withTolerateOobAppends( return this; } + public AbfsInputStreamContext withReadAheadRange( + final int readAheadRange) { + this.readAheadRange = readAheadRange; + return this; + } + public AbfsInputStreamContext withStreamStatistics( final AbfsInputStreamStatistics streamStatistics) { this.streamStatistics = streamStatistics; @@ -115,6 +124,8 @@ public AbfsInputStreamContext build() { readAheadBlockSize = readBufferSize; } // Validation of parameters to be done here. + Preconditions.checkArgument(readAheadRange > 0, + "Read ahead range should be greater than 0"); return this; } @@ -130,6 +141,10 @@ public boolean isTolerateOobAppends() { return tolerateOobAppends; } + public int getReadAheadRange() { + return readAheadRange; + } + public AbfsInputStreamStatistics getStreamStatistics() { return streamStatistics; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index ba0ce1477534d..d96f1a283609f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -129,6 +129,7 @@ public void testSeekStatistics() throws IOException { */ for (int i = 0; i < OPERATIONS; i++) { in.seek(0); + in.read(); in.seek(ONE_MB); } @@ -157,7 +158,7 @@ public void testSeekStatistics() throws IOException { * are in buffer. * * seekInBuffer - Since all seeks were in buffer, the seekInBuffer - * would be equal to 2 * OPERATIONS. + * would be equal to OPERATIONS. * */ assertEquals("Mismatch in seekOps value", 2 * OPERATIONS, @@ -170,7 +171,7 @@ public void testSeekStatistics() throws IOException { OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in bytesSkippedOnSeek value", 0, stats.getBytesSkippedOnSeek()); - assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS, + assertEquals("Mismatch in seekInBuffer value", OPERATIONS, stats.getSeekInBuffer()); in.close(); @@ -263,6 +264,7 @@ public void testWithNullStreamStatistics() throws IOException { .withReadBufferSize(getConfiguration().getReadBufferSize()) .withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth()) .withStreamStatistics(null) + .withReadAheadRange(getConfiguration().getReadAheadRange()) .build(); AbfsOutputStream out = null; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java index bda845bb45ad5..fe25477beb61e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.azurebfs.utils.Base64; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_RANGE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS; @@ -141,6 +142,7 @@ public void testConfigServiceImplAnnotatedFieldsInitialized() throws Exception { assertEquals(DEFAULT_MAX_RETRY_ATTEMPTS, abfsConfiguration.getMaxIoRetries()); assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize()); assertEquals(AZURE_BLOCK_LOCATION_HOST_DEFAULT, abfsConfiguration.getAzureBlockLocationHost()); + assertEquals(DEFAULT_READ_AHEAD_RANGE, abfsConfiguration.getReadAheadRange()); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index 35a5e1733d0e6..2c304374cfecb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -18,10 +18,27 @@ package org.apache.hadoop.fs.azurebfs.contract; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl; import org.apache.hadoop.fs.contract.AbstractContractSeekTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture; + /** * Contract test for seek operation. */ @@ -29,6 +46,8 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{ private final boolean isSecure; private final ABFSContractTestBinding binding; + private static final byte[] BLOCK = dataset(100 * 1024, 0, 255); + public ITestAbfsFileSystemContractSeek() throws Exception { binding = new ABFSContractTestBinding(); this.isSecure = binding.isSecureMode(); @@ -47,6 +66,210 @@ protected Configuration createConfiguration() { @Override protected AbstractFSContract createContract(final Configuration conf) { + conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE); + conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE); return new AbfsFileSystemContract(conf, isSecure); } + + /** + * Test verifies if the data is read correctly + * when {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set. + */ + @Test + public void testSeekAndReadWithReadAhead() throws IOException { + describe(" Testing seek and read with read ahead " + + "enabled for random reads"); + + Path testSeekFile = path(getMethodName() + "bigseekfile.txt"); + createDataSet(testSeekFile); + try (FSDataInputStream in = getFileSystem().open(testSeekFile)) { + AbfsInputStream inStream = ((AbfsInputStream) in.getWrappedStream()); + AbfsInputStreamStatisticsImpl streamStatistics = + (AbfsInputStreamStatisticsImpl) inStream.getStreamStatistics(); + assertEquals(String.format("Value of %s is not set correctly", AZURE_READ_AHEAD_RANGE), + MIN_BUFFER_SIZE, inStream.getReadAheadRange()); + + long remoteReadOperationsOldVal = streamStatistics.getRemoteReadOperations(); + assertEquals("Number of remote read ops should be 0 " + + "before any read call is made", 0, remoteReadOperationsOldVal); + + // Test read at first position. Remote read. + assertEquals("First call to getPos() should return 0", + 0, inStream.getPos()); + assertDataAtPos(0, (byte) in.read()); + assertSeekBufferStats(0, streamStatistics.getSeekInBuffer()); + long remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking just before read ahead range. Read from buffer. + int newSeek = inStream.getReadAheadRange() - 1; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(1, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking boundary of read ahead range. Read from buffer manager. + newSeek = inStream.getReadAheadRange(); + inStream.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(1, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking just after read ahead range. Read from buffer. + newSeek = inStream.getReadAheadRange() + 1; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(2, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking just 10 more bytes such that data is read from buffer. + newSeek += 10; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(3, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seek backward such that data is read from remote. + newSeek -= 100; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(3, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Seeking just 10 more bytes such that data is read from buffer. + newSeek += 10; + in.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(4, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + + // Read multiple bytes across read ahead range. Remote read. + long oldSeek = newSeek; + newSeek = 2*inStream.getReadAheadRange() -1; + byte[] bytes = new byte[5]; + in.readFully(newSeek, bytes); + // With readFully getPos should return oldSeek pos. + // Adding one as one byte is already read + // after the last seek is done. + assertGetPosition(oldSeek + 1, in.getPos()); + assertSeekBufferStats(4, streamStatistics.getSeekInBuffer()); + assertDatasetEquals(newSeek, "Read across read ahead ", + bytes, bytes.length); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, + remoteReadOperationsNewVal); + } + } + + /** + * Test to validate the getPos() when a seek is done + * post {@code AbfsInputStream#unbuffer} call is made. + * Also using optimised builder api to open file. + */ + @Test + public void testSeekAfterUnbuffer() throws IOException { + describe("Test to make sure that seeking in AbfsInputStream after " + + "unbuffer() call is not doing anyIO."); + Path testFile = path(getMethodName() + ".txt"); + createDataSet(testFile); + final CompletableFuture future = + getFileSystem().openFile(testFile) + .build(); + try (FSDataInputStream inputStream = awaitFuture(future)) { + AbfsInputStream abfsInputStream = (AbfsInputStream) inputStream.getWrappedStream(); + AbfsInputStreamStatisticsImpl streamStatistics = + (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics(); + int readAheadRange = abfsInputStream.getReadAheadRange(); + long seekPos = readAheadRange; + inputStream.seek(seekPos); + assertDataAtPos(readAheadRange, (byte) inputStream.read()); + long currentRemoteReadOps = streamStatistics.getRemoteReadOperations(); + assertIncrementInRemoteReadOps(0, currentRemoteReadOps); + inputStream.unbuffer(); + seekPos -= 10; + inputStream.seek(seekPos); + // Seek backwards shouldn't do any IO + assertNoIncrementInRemoteReadOps(currentRemoteReadOps, streamStatistics.getRemoteReadOperations()); + assertGetPosition(seekPos, inputStream.getPos()); + } + } + + private void createDataSet(Path path) throws IOException { + createFile(getFileSystem(), path, true, BLOCK); + } + + private void assertGetPosition(long expected, long actual) { + final String seekPosErrorMsg = "getPos() should return %s"; + assertEquals(String.format(seekPosErrorMsg, expected), expected, actual); + } + + private void assertDataAtPos(int pos, byte actualData) { + final String dataErrorMsg = "Mismatch in data@%s"; + assertEquals(String.format(dataErrorMsg, pos), BLOCK[pos], actualData); + } + + private void assertSeekBufferStats(long expected, long actual) { + final String statsErrorMsg = "Mismatch in seekInBuffer counts"; + assertEquals(statsErrorMsg, expected, actual); + } + + private void assertNoIncrementInRemoteReadOps(long oldVal, long newVal) { + final String incrementErrorMsg = "Number of remote read ops shouldn't increase"; + assertEquals(incrementErrorMsg, oldVal, newVal); + } + + private void assertIncrementInRemoteReadOps(long oldVal, long newVal) { + final String incrementErrorMsg = "Number of remote read ops should increase"; + Assertions.assertThat(newVal) + .describedAs(incrementErrorMsg) + .isGreaterThan(oldVal); + } + + /** + * Assert that the data read matches the dataset at the given offset. + * This helps verify that the seek process is moving the read pointer + * to the correct location in the file. + * @param readOffset the offset in the file where the read began. + * @param operation operation name for the assertion. + * @param data data read in. + * @param length length of data to check. + */ + private void assertDatasetEquals( + final int readOffset, + final String operation, + final byte[] data, + int length) { + for (int i = 0; i < length; i++) { + int o = readOffset + i; + assertEquals(operation + "with read offset " + readOffset + + ": data[" + i + "] != actualData[" + o + "]", + BLOCK[o], data[i]); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 1a835439bd75b..a5d60c4b41a92 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -249,8 +249,8 @@ private void verifyBeforeSeek(AbfsInputStream abfsInputStream){ assertEquals(0, abfsInputStream.getBCursor()); } - private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos){ - assertEquals(seekPos, abfsInputStream.getFCursor()); + private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos) throws IOException { + assertEquals(seekPos, abfsInputStream.getPos()); assertEquals(-1, abfsInputStream.getFCursorAfterLastRead()); assertEquals(0, abfsInputStream.getLimit()); assertEquals(0, abfsInputStream.getBCursor()); From 4c10e70d0f3dea30cb2d4d85f9ef528af9e35827 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Fri, 25 Jun 2021 17:18:00 +0530 Subject: [PATCH 2/4] Review comments and checkstyle --- .../azurebfs/constants/ConfigurationKeys.java | 4 +- .../constants/FileSystemConfigurations.java | 1 - .../fs/azurebfs/services/AbfsInputStream.java | 14 ++++-- .../services/AbfsInputStreamContext.java | 2 +- .../ITestAbfsFileSystemContractSeek.java | 47 ++++++++++++------- 5 files changed, 44 insertions(+), 24 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index c0b250705c814..4a2c5951bd53d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -71,9 +71,9 @@ public final class ConfigurationKeys { /** * Read ahead range parameter which can be set by user. - * Default value is {@code FileSystemConfigurations#DEFAULT_READ_AHEAD_RANGE}. + * Default value is {@link FileSystemConfigurations#DEFAULT_READ_AHEAD_RANGE}. * This might reduce number of calls to remote as next requested - * data could already be present in buffer. + * data could already be present in buffer {@value}. */ public static final String AZURE_READ_AHEAD_RANGE = "fs.azure.readahead.range"; public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index b79424557e4a6..a1de9dfc0aca9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -62,7 +62,6 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false; public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false; public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB; - // Default value of read ahead range. public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index d5185535b23be..a184e68ce463b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -233,13 +233,19 @@ public synchronized int read(final byte[] b, final int off, final int len) throw } incrementReadOps(); do { - if (nextReadPos >= fCursor - limit && nextReadPos <= fCursor) { - // data can be read from buffer. - bCursor = (int) (nextReadPos - (fCursor - limit)); + + // limit is the maximum amount of data present in buffer. + // fCursor is the current file pointer. Thus maximum we can + // go back and read from buffer is fCursor - limit. + // There maybe case that we read less than requested data. + long bytesPresentInBuffer = fCursor - limit; + if (nextReadPos >= bytesPresentInBuffer && nextReadPos <= fCursor) { + // Determining position in buffer from where data is to be read. + bCursor = (int) (nextReadPos - bytesPresentInBuffer); // When bCursor == limit, buffer will be filled again. // So in this case we are not actually reading from buffer. - if(bCursor != limit && streamStatistics != null) { + if (bCursor != limit && streamStatistics != null) { streamStatistics.seekInBuffer(); } } else { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index f8028bfb2e84b..55f01bf15bcf7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -38,7 +38,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean alwaysReadBufferSize; private int readAheadBlockSize; - + private int readAheadRange; private AbfsInputStreamStatistics streamStatistics; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index 2c304374cfecb..e22a169f683f5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -74,11 +74,14 @@ protected AbstractFSContract createContract(final Configuration conf) { /** * Test verifies if the data is read correctly * when {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set. + * Reason for not breaking this test into smaller parts is we + * really want to simulate lot of forward and backward seeks + * similar to real production use case. */ @Test public void testSeekAndReadWithReadAhead() throws IOException { - describe(" Testing seek and read with read ahead " + - "enabled for random reads"); + describe(" Testing seek and read with read ahead " + + "enabled for random reads"); Path testSeekFile = path(getMethodName() + "bigseekfile.txt"); createDataSet(testSeekFile); @@ -90,12 +93,15 @@ public void testSeekAndReadWithReadAhead() throws IOException { MIN_BUFFER_SIZE, inStream.getReadAheadRange()); long remoteReadOperationsOldVal = streamStatistics.getRemoteReadOperations(); - assertEquals("Number of remote read ops should be 0 " + - "before any read call is made", 0, remoteReadOperationsOldVal); + Assertions.assertThat(remoteReadOperationsOldVal) + .describedAs("Number of remote read ops should be 0 " + + "before any read call is made") + .isEqualTo(0); // Test read at first position. Remote read. - assertEquals("First call to getPos() should return 0", - 0, inStream.getPos()); + Assertions.assertThat(inStream.getPos()) + .describedAs("First call to getPos() should return 0") + .isEqualTo(0); assertDataAtPos(0, (byte) in.read()); assertSeekBufferStats(0, streamStatistics.getSeekInBuffer()); long remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); @@ -148,7 +154,7 @@ public void testSeekAndReadWithReadAhead() throws IOException { remoteReadOperationsOldVal = remoteReadOperationsNewVal; // Seek backward such that data is read from remote. - newSeek -= 100; + newSeek -= 101; in.seek(newSeek); assertGetPosition(newSeek, in.getPos()); assertDataAtPos(newSeek, (byte) in.read()); @@ -194,8 +200,8 @@ public void testSeekAndReadWithReadAhead() throws IOException { */ @Test public void testSeekAfterUnbuffer() throws IOException { - describe("Test to make sure that seeking in AbfsInputStream after " + - "unbuffer() call is not doing anyIO."); + describe("Test to make sure that seeking in AbfsInputStream after " + + "unbuffer() call is not doing anyIO."); Path testFile = path(getMethodName() + ".txt"); createDataSet(testFile); final CompletableFuture future = @@ -226,22 +232,30 @@ private void createDataSet(Path path) throws IOException { private void assertGetPosition(long expected, long actual) { final String seekPosErrorMsg = "getPos() should return %s"; - assertEquals(String.format(seekPosErrorMsg, expected), expected, actual); + Assertions.assertThat(actual) + .describedAs(seekPosErrorMsg, expected) + .isEqualTo(actual); } private void assertDataAtPos(int pos, byte actualData) { final String dataErrorMsg = "Mismatch in data@%s"; - assertEquals(String.format(dataErrorMsg, pos), BLOCK[pos], actualData); + Assertions.assertThat(actualData) + .describedAs(dataErrorMsg, pos) + .isEqualTo(BLOCK[pos]); } private void assertSeekBufferStats(long expected, long actual) { final String statsErrorMsg = "Mismatch in seekInBuffer counts"; - assertEquals(statsErrorMsg, expected, actual); + Assertions.assertThat(actual) + .describedAs(statsErrorMsg) + .isEqualTo(expected); } private void assertNoIncrementInRemoteReadOps(long oldVal, long newVal) { final String incrementErrorMsg = "Number of remote read ops shouldn't increase"; - assertEquals(incrementErrorMsg, oldVal, newVal); + Assertions.assertThat(newVal) + .describedAs(incrementErrorMsg) + .isEqualTo(oldVal); } private void assertIncrementInRemoteReadOps(long oldVal, long newVal) { @@ -267,9 +281,10 @@ private void assertDatasetEquals( int length) { for (int i = 0; i < length; i++) { int o = readOffset + i; - assertEquals(operation + "with read offset " + readOffset - + ": data[" + i + "] != actualData[" + o + "]", - BLOCK[o], data[i]); + Assertions.assertThat(data[i]) + .describedAs(operation + "with read offset " + readOffset + + ": data[" + i + "] != actualData[" + o + "]") + .isEqualTo(BLOCK[o]); } } } From 5d44ac5a922cc330ad596a891261d7206c8acd2a Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Mon, 5 Jul 2021 12:13:26 +0530 Subject: [PATCH 3/4] HADOOP-17250 Review comment and checksyle --- .../apache/hadoop/fs/azurebfs/services/AbfsInputStream.java | 6 +++--- .../azurebfs/contract/ITestAbfsFileSystemContractSeek.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index a184e68ce463b..c98819cadc489 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -238,10 +238,10 @@ public synchronized int read(final byte[] b, final int off, final int len) throw // fCursor is the current file pointer. Thus maximum we can // go back and read from buffer is fCursor - limit. // There maybe case that we read less than requested data. - long bytesPresentInBuffer = fCursor - limit; - if (nextReadPos >= bytesPresentInBuffer && nextReadPos <= fCursor) { + long filePosAtStartOfBuffer = fCursor - limit; + if (nextReadPos >= filePosAtStartOfBuffer && nextReadPos <= fCursor) { // Determining position in buffer from where data is to be read. - bCursor = (int) (nextReadPos - bytesPresentInBuffer); + bCursor = (int) (nextReadPos - filePosAtStartOfBuffer); // When bCursor == limit, buffer will be filled again. // So in this case we are not actually reading from buffer. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index e22a169f683f5..479f52a969a10 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -154,7 +154,7 @@ public void testSeekAndReadWithReadAhead() throws IOException { remoteReadOperationsOldVal = remoteReadOperationsNewVal; // Seek backward such that data is read from remote. - newSeek -= 101; + newSeek -= 105; in.seek(newSeek); assertGetPosition(newSeek, in.getPos()); assertDataAtPos(newSeek, (byte) in.read()); From cc79131214eb6eebbd3580bb17398ff9bd1d0049 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Mon, 5 Jul 2021 15:01:40 +0530 Subject: [PATCH 4/4] magic num fix --- .../fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index 479f52a969a10..3222a15cd527d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -154,7 +154,7 @@ public void testSeekAndReadWithReadAhead() throws IOException { remoteReadOperationsOldVal = remoteReadOperationsNewVal; // Seek backward such that data is read from remote. - newSeek -= 105; + newSeek -= 106; in.seek(newSeek); assertGetPosition(newSeek, in.getPos()); assertDataAtPos(newSeek, (byte) in.read());