diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 85bd37a7702c0..5ebab8463a804 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -239,6 +239,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
   private long sasTokenRenewPeriodForStreamsInSeconds;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_PREAD,
+      DefaultValue = DEFAULT_ENABLE_PREAD)
+  private boolean enablePread;
+
   public AbfsConfiguration(final Configuration rawConfig, String accountName)
       throws IllegalAccessException, InvalidConfigurationValueException, IOException {
     this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
@@ -640,6 +644,10 @@ public boolean shouldTrackLatency() {
     return this.trackLatency;
   }
 
+  public boolean isPreadEnabled() {
+    return this.enablePread;
+  }
+
   public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
     AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
     if (authType == AuthType.OAuth) {
@@ -862,6 +870,11 @@ void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
     this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
   }
 
+  @VisibleForTesting
+  void setEnablePread(boolean enablePread) {
+    this.enablePread = enablePread;
+  }
+
   private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
     String value = getPasswordString(key);
     if (StringUtils.isBlank(value)) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 59c2e263b25b9..dd3d0ce8efa39 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -546,6 +546,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() {
         .withReadBufferSize(abfsConfiguration.getReadBufferSize())
         .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
         .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+        .withPreadEnabled(abfsConfiguration.isPreadEnabled())
        .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
         .build();
   }
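The store change above is the only plumbing needed: the flag travels from AbfsConfiguration into each stream through the existing context builder. As a minimal sketch of how the new option composes, here is how a test inside the org.apache.hadoop.fs.azurebfs.services package might build a context by hand (the renew period and buffer values are arbitrary illustrations, not taken from this patch):

// Hypothetical test-side construction; only withPreadEnabled is new here.
AbfsInputStreamContext context =
    new AbfsInputStreamContext(0L /* sasTokenRenewPeriodForStreamsInSeconds, arbitrary */)
        .withReadBufferSize(4 * 1024 * 1024)
        .withReadAheadQueueDepth(2)
        .withTolerateOobAppends(false)
        .withPreadEnabled(true)
        .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
        .build();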
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 5f1ad31e4876c..5eec348007c2f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -140,6 +140,7 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = "fs.azure.account.oauth2.refresh.token.endpoint";
   /** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
   public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
+  public static final String FS_AZURE_ENABLE_PREAD = "fs.azure.enable.pread";
 
   public static String accountProperty(String property, String account) {
     return property + "." + account;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index f70d102c1d905..6164be3564a5f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -94,5 +94,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true;
   public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 * 1000; // 5 mins
 
+  public static final boolean DEFAULT_ENABLE_PREAD = false;
+
   private FileSystemConfigurations() {}
 }
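The new key defaults to false (DEFAULT_ENABLE_PREAD), so existing deployments keep today's buffered behaviour unless they opt in. A minimal client-side sketch of enabling it, assuming a stock Hadoop Configuration (the account and container names are illustrative):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
// FS_AZURE_ENABLE_PREAD, off by default.
conf.setBoolean("fs.azure.enable.pread", true);
FileSystem fs = FileSystem.get(
    URI.create("abfs://container@account.dfs.core.windows.net/"), conf);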
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 926c23d7c53b6..ae3c004c49f61 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -56,6 +56,14 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final String eTag;                  // eTag of the path when InputStream are created
   private final boolean tolerateOobAppends; // whether tolerate Oob Appends
   private final boolean readAheadEnabled; // whether enable readAhead;
+  /*
+   * By default the pread API will do a seek + read as in FSInputStream, filling the internal
+   * buffer with up to bufferSize bytes rather than only the requested length. When this flag is
+   * enabled, the pread API will read exactly the requested number of bytes from the given offset
+   * and the internal buffer will not come into use at all.
+   * @see #read(long, byte[], int, int)
+   */
+  private boolean enablePread;
 
   // SAS tokens can be re-used until they expire
   private CachedSASToken cachedSasToken;
@@ -85,6 +93,7 @@ public AbfsInputStream(
     this.bufferSize = abfsInputStreamContext.getReadBufferSize();
     this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
+    this.enablePread = abfsInputStreamContext.isPreadEnabled();
     this.eTag = eTag;
     this.readAheadEnabled = true;
     this.cachedSasToken = new CachedSASToken(
@@ -96,6 +105,28 @@ public String getPath() {
     return path;
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    if (!enablePread) {
+      return super.read(position, buffer, offset, length);
+    }
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+    if (streamStatistics != null) {
+      streamStatistics.readOperationStarted(offset, length);
+    }
+    int bytesRead = readRemote(position, buffer, offset, length);
+    if (statistics != null) {
+      statistics.incrementBytesRead(bytesRead);
+    }
+    if (streamStatistics != null) {
+      streamStatistics.bytesRead(bytesRead);
+    }
+    return bytesRead;
+  }
+
   @Override
   public int read() throws IOException {
     byte[] b = new byte[1];
@@ -503,6 +534,11 @@ public AbfsInputStreamStatistics getStreamStatistics() {
     return streamStatistics;
   }
 
+  @VisibleForTesting
+  void setEnablePread(boolean enablePread) {
+    this.enablePread = enablePread;
+  }
+
   /**
    * Get the statistics of the stream.
    * @return a string value.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index f8d3b2a599bfe..a46b314162315 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean tolerateOobAppends;
 
+  private boolean preadEnabled;
+
   private AbfsInputStreamStatistics streamStatistics;
 
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
@@ -54,6 +56,11 @@ public AbfsInputStreamContext withTolerateOobAppends(
     return this;
   }
 
+  public AbfsInputStreamContext withPreadEnabled(final boolean preadEnabled) {
+    this.preadEnabled = preadEnabled;
+    return this;
+  }
+
   public AbfsInputStreamContext withStreamStatistics(
       final AbfsInputStreamStatistics streamStatistics) {
     this.streamStatistics = streamStatistics;
@@ -77,6 +84,10 @@ public boolean isTolerateOobAppends() {
     return tolerateOobAppends;
   }
 
+  public boolean isPreadEnabled() {
+    return preadEnabled;
+  }
+
   public AbfsInputStreamStatistics getStreamStatistics() {
     return streamStatistics;
   }
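No new client API is involved; the flag only changes what the existing PositionedReadable call does underneath. A minimal sketch, assuming pread was enabled as above and using an illustrative path (fs is an ABFS FileSystem instance):

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

try (FSDataInputStream in = fs.open(new Path("/data/file.bin"))) {
  byte[] buf = new byte[10];
  // With fs.azure.enable.pread=true this issues a single remote read of
  // exactly 10 bytes at offset 100, bypassing AbfsInputStream's internal
  // buffer and leaving the stream's own position untouched. With the flag
  // off it falls back to FSInputStream's seek + buffered read.
  int bytesRead = in.read(100L, buf, 0, buf.length);
}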
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 8385099a78d36..d6448898b4365 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -380,6 +380,80 @@ public void testReadAheadCounters() throws IOException, InterruptedException {
     }
   }
 
+  @Test
+  public void testPread() throws IOException {
+    describe("Testing preads in AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path readStatPath = path(getMethodName());
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+    boolean oldPreadEnabled = abfss.getAbfsConfiguration().isPreadEnabled();
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);
+      /*
+       * Writing a 1MB buffer to the file.
+       */
+      out.write(defBuffer);
+      out.hflush();
+
+      in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
+      /*
+       * Doing a 10-byte pread OPERATIONS times.
+       */
+      int bytesPerRead = 10;
+      int pos = 0;
+      for (int i = 0; i < OPERATIONS; i++) {
+        in.read(pos, defBuffer, pos, bytesPerRead);
+        pos += bytesPerRead;
+      }
+      AbfsInputStreamStatisticsImpl stats = (AbfsInputStreamStatisticsImpl) in
+          .getStreamStatistics();
+      LOG.info("STATISTICS: {}", stats.toString());
+      /*
+       * bytesRead - 10 bytes are read each time, so the total bytes read equal
+       * OPERATIONS * bytesPerRead.
+       *
+       * readOps - A read operation is performed OPERATIONS times, so the total number of read
+       * operations equals OPERATIONS.
+       *
+       * remoteReadOps - The default pread does seek + buffered read, so the first call buffers
+       * the data and serves the remaining preads. Hence, total remote read ops is 1.
+       */
+      assertEquals("Mismatch in bytesRead value", OPERATIONS * bytesPerRead, stats.getBytesRead());
+      assertEquals("Mismatch in readOps value", OPERATIONS, stats.getReadOperations());
+      assertEquals("Mismatch in remoteReadOps value", 1, stats.getRemoteReadOperations());
+      in.close();
+      // Verify that stats are still readable after the stream is closed.
+      LOG.info("STATISTICS after closing: {}", stats.toString());
+
+      // Now test with pread enabled.
+      abfss.getAbfsConfiguration().setEnablePread(true);
+      in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
+      pos = 0;
+      for (int i = 0; i < OPERATIONS; i++) {
+        in.read(pos, defBuffer, pos, bytesPerRead);
+        pos += bytesPerRead;
+      }
+      stats = (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+      LOG.info("STATISTICS: {}", stats.toString());
+      /*
+       * remoteReadOps - Pread reads exactly the bytes asked for and does no buffering, so each
+       * of the OPERATIONS preads results in one remote read.
+       */
+      assertEquals("Mismatch in bytesRead value", OPERATIONS * bytesPerRead, stats.getBytesRead());
+      assertEquals("Mismatch in readOps value", OPERATIONS, stats.getReadOperations());
+      assertEquals("Mismatch in remoteReadOps value", OPERATIONS, stats.getRemoteReadOperations());
+      assertEquals("Mismatch in bytesReadFromBuffer value", 0, stats.getBytesReadFromBuffer());
+      in.close();
+    } finally {
+      abfss.getAbfsConfiguration().setEnablePread(oldPreadEnabled);
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
   /**
    * Method to assert the initial values of the statistics.
    *
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPread.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPread.java
new file mode 100644
index 0000000000000..ad08e0b74513f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPread.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Test;
+
+public class ITestAbfsPread extends AbstractAbfsIntegrationTest {
+
+  public ITestAbfsPread() throws Exception {
+  }
+
+  @Test
+  public void testPread() throws IOException {
+    describe("Testing preads in AbfsInputStream");
+    Path dest = path("ITestAbfsPread");
+
+    int dataSize = 100;
+    byte[] data = ContractTestUtils.dataset(dataSize, 'a', 26);
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length, dataSize, true);
+    int bytesToRead = 10;
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      assertTrue(
+          "unexpected stream type " + inputStream.getWrappedStream().getClass().getSimpleName(),
+          inputStream.getWrappedStream() instanceof AbfsInputStream);
+      byte[] readBuffer = new byte[bytesToRead];
+      int pos = 0;
+      assertEquals("AbfsInputStream#read did not read the correct number of bytes",
+          bytesToRead, inputStream.read(pos, readBuffer, 0, bytesToRead));
+      assertTrue("AbfsInputStream#read did not read the correct bytes",
+          Arrays.equals(Arrays.copyOfRange(data, pos, pos + bytesToRead), readBuffer));
+      // Read only 10 bytes from offset 0. But by default it will do the seek and read where the
+      // entire 100 bytes get read into the AbfsInputStream buffer.
+      assertArrayEquals("AbfsInputStream#read did not read more data into its buffer", data,
+          Arrays.copyOfRange(((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
+              dataSize));
+    }
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      AbfsInputStream abfsIs = (AbfsInputStream) inputStream.getWrappedStream();
+      abfsIs.setEnablePread(true);
+      byte[] readBuffer = new byte[bytesToRead];
+      int pos = 10;
+      assertEquals("AbfsInputStream#read did not read the correct number of bytes",
+          bytesToRead, inputStream.read(pos, readBuffer, 0, bytesToRead));
+      assertTrue("AbfsInputStream#read did not read the correct bytes",
+          Arrays.equals(Arrays.copyOfRange(data, pos, pos + bytesToRead), readBuffer));
+      // Read only 10 bytes from offset 10. This time, as pread is enabled, it will only read
+      // the exact bytes as requested and no data will get read into the AbfsInputStream#buffer.
+      // In fact the buffer won't even get initialized.
+      assertNull("AbfsInputStream pread caused the internal buffer creation", abfsIs.getBuffer());
+      // Now make a seek and read so that the internal buffer gets created.
+      inputStream.seek(0);
+      inputStream.read(readBuffer);
+      // This read will have fetched all 100 bytes into the internal buffer.
+ assertArrayEquals("AbfsInputStream#read did not read more data into its buffer", data, + Arrays.copyOfRange(((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0, + dataSize)); + // Now again do pos read and make sure not any extra data being fetched. + resetBuffer(abfsIs.getBuffer()); + pos = 0; + assertEquals("AbfsInputStream#read did not read the correct number of bytes", bytesToRead, + inputStream.read(pos, readBuffer, 0, bytesToRead)); + assertTrue("AbfsInputStream#read did not read the correct bytes", + Arrays.areEqual(Arrays.copyOfRange(data, pos, pos + bytesToRead), readBuffer)); + assertFalse("AbfsInputStream#read read more data into its buffer than expected", + Arrays.areEqual(data, Arrays.copyOfRange(abfsIs.getBuffer(), 0, dataSize))); + } + } + + private void resetBuffer(byte[] buf) { + for (int i = 0; i < buf.length; i++) { + buf[i] = (byte) 0; + } + } +}