From aa4b43ce7a85cadfd2e392ad7fd4ec7897db13a6 Mon Sep 17 00:00:00 2001
From: Anoop Sam John
Date: Fri, 25 Jun 2021 18:23:29 +0530
Subject: [PATCH 1/7] HADOOP-17770 WASB : Support disabling buffered reads in
 positional reads

---
 .../fs/azure/AzureNativeFileSystemStore.java  | 31 +++++++++--
 .../hadoop/fs/azure/BlockBlobInputStream.java | 45 ++++++++++++++--
 .../fs/azure/NativeAzureFileSystem.java       | 54 ++++++++++++++++++-
 .../fs/azure/NativeFileSystemStore.java       |  4 ++
 4 files changed, 123 insertions(+), 11 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 69faebb437710..39127712f8408 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -41,6 +41,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
@@ -241,6 +242,16 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    */
   public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable";
 
+  /**
+   * Optional config to enable a lock free pread which will bypass buffer in
+   * BlockBlobInputStream.
+   * This is not a config which can be set at cluster level. It can be used as
+   * an option on FutureDataInputStreamBuilder.
+   * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
+   */
+  public static final String FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE =
+      "fs.azure.block.blob.buffered.pread.disable";
+
   /**
    * The set of directories where we should apply atomic folder rename
    * synchronized with createNonRecursive.
@@ -1591,8 +1602,8 @@ private OutputStream openOutputStream(final CloudBlobWrapper blob)
    * Opens a new input stream for the given blob (page or block blob)
    * to read its data.
   */
-  private InputStream openInputStream(CloudBlobWrapper blob)
-      throws StorageException, IOException {
+  private InputStream openInputStream(CloudBlobWrapper blob,
+      Optional<Configuration> options) throws StorageException, IOException {
     if (blob instanceof CloudBlockBlobWrapper) {
       LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
       switch(inputStreamVersion) {
       case 1:
         return blob.openInputStream(getDownloadOptions(),
             getInstrumentedContext(isConcurrentOOBAppendAllowed()));
       case 2:
+        boolean bufferedPreadDisabled = options.map(c -> c
+            .getBoolean(FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, false))
+            .orElse(false);
         return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
             getDownloadOptions(),
-            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()),
+            bufferedPreadDisabled);
       default:
         throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
       }
@@ -2290,6 +2305,12 @@ public InputStream retrieve(String key) throws AzureException, IOException {
   @Override
   public InputStream retrieve(String key, long startByteOffset)
       throws AzureException, IOException {
+    return retrieve(key, startByteOffset, Optional.empty());
+  }
+
+  @Override
+  public InputStream retrieve(String key, long startByteOffset,
+      Optional<Configuration> options) throws AzureException, IOException {
     try {
       // Check if a session exists, if not create a session with the
       // Azure storage server.
@@ -2301,7 +2322,7 @@ public InputStream retrieve(String key, long startByteOffset)
       }
       checkContainer(ContainerAccessType.PureRead);
 
-      InputStream inputStream = openInputStream(getBlobReference(key));
+      InputStream inputStream = openInputStream(getBlobReference(key), options);
       if (startByteOffset > 0) {
         // Skip bytes and ignore return value. This is okay
         // because if you try to skip too far you will be positioned
@@ -2852,7 +2873,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
     OutputStream opStream = null;
     try {
       if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
-        ipStream = openInputStream(srcBlob);
+        ipStream = openInputStream(srcBlob, Optional.empty());
         opStream = openOutputStream(dstBlob);
         byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
         int len;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
index c37b2bec6ecd7..02ee8d6eac62c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
@@ -28,7 +28,7 @@
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
 
 /**
@@ -36,10 +36,11 @@
  * random access and seek. Random access performance is improved by several
  * orders of magnitude.
  */
-final class BlockBlobInputStream extends InputStream implements Seekable {
+final class BlockBlobInputStream extends FSInputStream {
   private final CloudBlockBlobWrapper blob;
   private final BlobRequestOptions options;
   private final OperationContext opContext;
+  private final boolean bufferedPreadDisabled;
   private InputStream blobInputStream = null;
   private int minimumReadSizeInBytes = 0;
   private long streamPositionAfterLastRead = -1;
@@ -62,12 +63,13 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
    * @param opContext the blob operation context.
    * @throws IOException IO failure
    */
-  BlockBlobInputStream(CloudBlockBlobWrapper blob,
-      BlobRequestOptions options,
-      OperationContext opContext) throws IOException {
+  BlockBlobInputStream(CloudBlockBlobWrapper blob, BlobRequestOptions options,
+      OperationContext opContext, boolean bufferedPreadDisabled)
+      throws IOException {
     this.blob = blob;
     this.options = options;
     this.opContext = opContext;
+    this.bufferedPreadDisabled = bufferedPreadDisabled;
 
     this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
 
@@ -263,6 +265,39 @@ private int doNetworkRead(byte[] buffer, int offset, int len)
     }
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    synchronized (this) {
+      checkState();
+    }
+    if (!bufferedPreadDisabled) {
+      // This will do a seek + read in which the streamBuffer will get used.
+      return super.read(position, buffer, offset, length);
+    }
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+    if (position >= streamLength) {
+      throw new EOFException("position is beyond stream capacity");
+    }
+    MemoryOutputStream os = new MemoryOutputStream(buffer, offset, length);
+    long bytesToRead = Math.min(minimumReadSizeInBytes,
+        Math.min(os.capacity(), streamLength - position));
+    try {
+      blob.downloadRange(position, bytesToRead, os, options, opContext);
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+    int bytesRead = os.size();
+    if (bytesRead == 0) {
+      // This may happen if the blob was modified after the length was obtained.
+      throw new EOFException("End of stream reached unexpectedly.");
+    }
+    return bytesRead;
+  }
+
   /**
    * Reads up to len bytes of data from the input stream into an
    * array of bytes.
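
For context on the `!bufferedPreadDisabled` branch above: `super.read(position, buffer, offset, length)` lands in Hadoop Common's `FSInputStream`. A paraphrased sketch of that superclass method follows (reproduced here for reference only, not part of this patch; details simplified). It shows why buffered preads on a shared stream serialize and churn the stream buffer:

```java
// Paraphrased sketch of org.apache.hadoop.fs.FSInputStream's positional read.
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  validatePositionedReadArgs(position, buffer, offset, length);
  if (length == 0) {
    return 0;
  }
  synchronized (this) {
    long oldPos = getPos();           // remember the shared cursor
    int nread = -1;
    try {
      seek(position);                 // moves the cursor, discards the buffer
      nread = read(buffer, offset, length);
    } finally {
      seek(oldPos);                   // restore the cursor for other callers
    }
    return nread;
  }
}
```

Because the whole seek/read/seek-back sequence holds the stream lock and repositions the stream, concurrent preads contend on one monitor and invalidate the buffer. The `bufferedPreadDisabled` path above avoids both by issuing a bounded `downloadRange` directly into the caller's buffer.
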
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 48ef495d7b7ef..f49e89d90c53e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -33,11 +33,14 @@
 import java.util.EnumSet;
 import java.util.TimeZone;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Stack;
 import java.util.HashMap;
 
@@ -61,6 +64,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
@@ -70,6 +74,8 @@
 import org.apache.hadoop.fs.azure.security.Constants;
 import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
 import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.impl.StoreImplementationUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -79,6 +85,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
@@ -915,6 +922,33 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundExce
       }
     }
 
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      if (in instanceof PositionedReadable) {
+        try {
+          int result = ((PositionedReadable) this.in).read(position, buffer,
+              offset, length);
+          if (null != statistics && result > 0) {
+            statistics.incrementBytesRead(result);
+          }
+          return result;
+        } catch (IOException e) {
+          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+          if (innerException instanceof StorageException) {
+            LOG.error("Encountered Storage Exception for read on Blob : {}"
+                + " Exception details: {} Error Code : {}",
+                key, e, ((StorageException) innerException).getErrorCode());
+            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+              throw new FileNotFoundException(String.format("%s is not found", key));
+            }
+          }
+          throw e;
+        }
+      }
+      return super.read(position, buffer, offset, length);
+    }
+
     @Override
     public synchronized void close() throws IOException {
       if (!closed) {
@@ -3043,6 +3077,12 @@ public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws I
   @Override
   public FSDataInputStream open(Path f, int bufferSize)
       throws FileNotFoundException, IOException {
+    return open(f, bufferSize, Optional.empty());
+  }
+
+  private FSDataInputStream open(Path f, int bufferSize,
+      Optional<Configuration> options)
+      throws FileNotFoundException, IOException {
     LOG.debug("Opening file: {}", f.toString());
@@ -3077,7 +3117,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundExcepti
 
     InputStream inputStream;
     try {
-      inputStream = store.retrieve(key);
+      inputStream = store.retrieve(key, 0, options);
     } catch(Exception ex) {
       Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
@@ -3094,6 +3134,18 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundExcepti
         new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
   }
 
+  @Override
+  protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
+      OpenFileParameters parameters) throws IOException {
+    AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+        parameters.getMandatoryKeys(),
+        Collections.emptySet(),
+        "for " + path);
+    return LambdaUtils.eval(
+        new CompletableFuture<>(), () ->
+            open(path, parameters.getBufferSize(), Optional.of(parameters.getOptions())));
+  }
+
   @Override
   public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0944b1b0987c1..91aad992a1f19 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -23,6 +23,7 @@
 import java.io.InputStream;
 import java.net.URI;
 import java.util.Date;
+import java.util.Optional;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +51,9 @@ void storeEmptyFolder(String key, PermissionStatus permissionStatus)
 
   InputStream retrieve(String key, long byteRangeStart) throws IOException;
 
+  InputStream retrieve(String key, long byteRangeStart,
+      Optional<Configuration> options) throws IOException;
+
   DataOutputStream storefile(String keyEncoded,
       PermissionStatus permissionStatus,
       String key) throws AzureException;

From 9be6c87ba91e5b6814954e06d9b43a8f6bcac87f Mon Sep 17 00:00:00 2001
From: Anoop Sam John
Date: Fri, 9 Jul 2021 21:25:35 +0530
Subject: [PATCH 2/7] HADOOP-17770 WASB : Support disabling buffered reads in
 positional reads

---
 .../fs/azure/ITestBlockBlobInputStream.java   | 55 +++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
index 07a13df11f3ce..f9933092dced1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
 import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
@@ -306,6 +307,60 @@ private void verifyConsistentReads(FSDataInputStream inputStreamV1,
     assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
   }
 
+  @Test
+  public void test_202_PosReadTest() throws Exception {
+    assumeHugeFileExists();
+    FutureDataInputStreamBuilder builder = accountUsingInputStreamV2
+        .getFileSystem().openFile(TEST_FILE_PATH);
+    builder.opt(AzureNativeFileSystemStore.FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, true);
+    try (
+        FSDataInputStream inputStreamV1
+            = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+        FSDataInputStream inputStreamV2_1
+            = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+        FSDataInputStream inputStreamV2_2 = builder.build().get();
+    ) {
+      final int bufferSize = 4 * KILOBYTE;
+      byte[] bufferV1 = new byte[bufferSize];
+      byte[] bufferV2_1 = new byte[bufferSize];
+      byte[] bufferV2_2 = new byte[bufferSize];
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2_1, inputStreamV2_2, 0,
+          bufferV1, bufferV2_1, bufferV2_2);
+
+      int pos = 2 * KILOBYTE;
+      verifyConsistentReads(inputStreamV1, inputStreamV2_1, inputStreamV2_2, pos,
+          bufferV1, bufferV2_1, bufferV2_2);
+
+      pos = 10 * KILOBYTE;
+      verifyConsistentReads(inputStreamV1, inputStreamV2_1, inputStreamV2_2, pos,
+          bufferV1, bufferV2_1, bufferV2_2);
+
+      pos = 4100 * KILOBYTE;
+      verifyConsistentReads(inputStreamV1, inputStreamV2_1, inputStreamV2_2, pos,
+          bufferV1, bufferV2_1, bufferV2_2);
+    }
+  }
+
+  private void verifyConsistentReads(FSDataInputStream inputStreamV1,
+      FSDataInputStream inputStreamV2_1, FSDataInputStream inputStreamV2_2,
+      int pos, byte[] bufferV1, byte[] bufferV2_1, byte[] bufferV2_2)
+      throws IOException {
+    int size = bufferV1.length;
+    int numBytesReadV1 = inputStreamV1.read(pos, bufferV1, 0, size);
+    assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
+
+    int numBytesReadV2 = inputStreamV2_1.read(pos, bufferV2_1, 0, size);
+    assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
+
+    int numBytesReadV2_2 = inputStreamV2_2.read(pos, bufferV2_2, 0, size);
+    assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
+        numBytesReadV2_2);
+
+    assertArrayEquals("Mismatch in read data", bufferV1, bufferV2_1);
+    assertArrayEquals("Mismatch in read data", bufferV2_1, bufferV2_2);
+  }
+
   /**
    * Validates the implementation of InputStream.markSupported.
    * @throws IOException

From 1477678d0623429ec7c86a6098f6781fccd04ab4 Mon Sep 17 00:00:00 2001
From: Anoop Sam John
Date: Sat, 10 Jul 2021 21:43:03 +0530
Subject: [PATCH 3/7] HADOOP-17770 WASB : Support disabling buffered reads in
 positional reads

---
 hadoop-tools/hadoop-azure/src/site/markdown/index.md | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 11d0a18b5585d..2af6b498a2743 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -545,6 +545,17 @@ The maximum number of entries that that cache can hold can be customized using t
 
 ```
 
+### Performance optimization configurations
+
+`fs.azure.block.blob.buffered.pread.disable`: By default the positional read API does a
+seek and a read on the input stream, and that read fills the buffer cache in
+BlockBlobInputStream. If this configuration is set to true, the buffer is bypassed and a
+lock-free call reads directly from the blob. This optimization is especially helpful for
+HBase-style short random reads over a shared InputStream instance.
+Note: This is not a config which can be set at cluster level. It can only be used as
+an option on FutureDataInputStreamBuilder.
+See FileSystem#openFile(Path path).
+
 ## Further Reading
 
 * [Testing the Azure WASB client](testing_azure.html).
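
For illustration, end-to-end client usage of the option documented above looks like the following minimal sketch. The account, container, path, offset, and buffer size are hypothetical; the option key and the `openFile` builder API are the ones used in this series:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;

public class WasbPreadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical WASB path; substitute a real container/account.
    Path path = new Path(
        "wasb://mycontainer@myaccount.blob.core.windows.net/data/file.bin");
    FileSystem fs = path.getFileSystem(conf);

    // Request the lock-free, unbuffered positional read path.
    FutureDataInputStreamBuilder builder = fs.openFile(path);
    builder.opt("fs.azure.block.blob.buffered.pread.disable", true);

    try (FSDataInputStream in = builder.build().get()) {
      byte[] block = new byte[4096];
      // pread translates to a ranged blob download; the stream's buffer
      // and cursor are untouched, so the stream lock is not contended.
      int n = in.read(8192L, block, 0, block.length);
      System.out.println("read " + n + " bytes");
    }
  }
}
```
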
From f32c34e0d81b5045a70f4b171ce6b9bad24c90e8 Mon Sep 17 00:00:00 2001
From: Anoop Sam John
Date: Sun, 11 Jul 2021 08:47:52 +0530
Subject: [PATCH 4/7] HADOOP-17770 WASB : Support disabling buffered reads in
 positional reads

---
 .../fs/azure/ITestBlockBlobInputStream.java   | 36 +++++++++----------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
index f9933092dced1..62e461940aa16 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -316,49 +316,49 @@ public void test_202_PosReadTest() throws Exception {
     try (
         FSDataInputStream inputStreamV1
             = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
-        FSDataInputStream inputStreamV2_1
+        FSDataInputStream inputStreamV2
             = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
-        FSDataInputStream inputStreamV2_2 = builder.build().get();
+        FSDataInputStream inputStreamV2NoBuffer = builder.build().get();
     ) {
       final int bufferSize = 4 * KILOBYTE;
       byte[] bufferV1 = new byte[bufferSize];
-      byte[] bufferV2_1 = new byte[bufferSize];
-      byte[] bufferV2_2 = new byte[bufferSize];
+      byte[] bufferV2 = new byte[bufferSize];
+      byte[] bufferV2NoBuffer = new byte[bufferSize];
 
-      verifyConsistentReads(inputStreamV1, inputStreamV2_1, inputStreamV2_2, 0,
-          bufferV1, bufferV2_1, bufferV2_2);
+      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, 0,
+          bufferV1, bufferV2, bufferV2NoBuffer);
 
       int pos = 2 * KILOBYTE;
-      verifyConsistentReads(inputStreamV1, inputStreamV2_1, inputStreamV2_2, pos,
-          bufferV1, bufferV2_1, bufferV2_2);
+      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+          bufferV1, bufferV2, bufferV2NoBuffer);
 
       pos = 10 * KILOBYTE;
-      verifyConsistentReads(inputStreamV1, inputStreamV2_1, inputStreamV2_2, pos,
-          bufferV1, bufferV2_1, bufferV2_2);
+      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+          bufferV1, bufferV2, bufferV2NoBuffer);
 
       pos = 4100 * KILOBYTE;
-      verifyConsistentReads(inputStreamV1, inputStreamV2_1, inputStreamV2_2, pos,
-          bufferV1, bufferV2_1, bufferV2_2);
+      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+          bufferV1, bufferV2, bufferV2NoBuffer);
     }
   }
 
   private void verifyConsistentReads(FSDataInputStream inputStreamV1,
-      FSDataInputStream inputStreamV2_1, FSDataInputStream inputStreamV2_2,
-      int pos, byte[] bufferV1, byte[] bufferV2_1, byte[] bufferV2_2)
+      FSDataInputStream inputStreamV2, FSDataInputStream inputStreamV2NoBuffer,
+      int pos, byte[] bufferV1, byte[] bufferV2, byte[] bufferV2NoBuffer)
       throws IOException {
     int size = bufferV1.length;
     int numBytesReadV1 = inputStreamV1.read(pos, bufferV1, 0, size);
     assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
 
-    int numBytesReadV2 = inputStreamV2_1.read(pos, bufferV2_1, 0, size);
+    int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
     assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
 
-    int numBytesReadV2_2 = inputStreamV2_2.read(pos, bufferV2_2, 0, size);
+    int numBytesReadV2_2 = inputStreamV2NoBuffer.read(pos, bufferV2NoBuffer, 0, size);
     assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
         numBytesReadV2_2);
 
-    assertArrayEquals("Mismatch in read data", bufferV1, bufferV2_1);
-    assertArrayEquals("Mismatch in read data", bufferV2_1, bufferV2_2);
+    assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
+    assertArrayEquals("Mismatch in read data", bufferV2, bufferV2NoBuffer);
   }
 
   /**

From fa0f0d12c364f2ac680cf718c3d76048e4f83a49 Mon Sep 17 00:00:00 2001
From: Anoop Sam John
Date: Sun, 11 Jul 2021 20:09:20 +0530
Subject: [PATCH 5/7] HADOOP-17770 WASB : Support disabling buffered reads in
 positional reads

---
 .../org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
index 62e461940aa16..bacc7c0815d24 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -352,7 +352,7 @@ private void verifyConsistentReads(FSDataInputStream inputStreamV1,
 
     int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
     assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
-    
+
     int numBytesReadV2_2 = inputStreamV2NoBuffer.read(pos, bufferV2NoBuffer, 0, size);
     assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
         numBytesReadV2_2);

From 3a6b55de9a96d1b5c6fbd804f1ee59cc50363206 Mon Sep 17 00:00:00 2001
From: Anoop Sam John
Date: Mon, 12 Jul 2021 08:42:25 +0530
Subject: [PATCH 6/7] HADOOP-17770 WASB : Support disabling buffered reads in
 positional reads

---
 .../apache/hadoop/fs/azure/NativeAzureFileSystem.java   | 10 ++++++++++
 .../hadoop/fs/azure/ITestBlockBlobInputStream.java      |  5 +++--
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index f49e89d90c53e..f1c486e655e1a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -925,6 +925,16 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundExce
     @Override
     public int read(long position, byte[] buffer, int offset, int length)
         throws IOException {
+      // SpotBugs reports bug type IS2_INCONSISTENT_SYNC here.
+      // This report is not valid here.
+      // 'this.in' is an instance of BlockBlobInputStream and read(long, byte[], int, int)
+      // calls its superclass method when 'fs.azure.block.blob.buffered.pread.disable'
+      // is configured false. The superclass FSInputStream implementation has
+      // proper synchronization.
+      // When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a lock free
+      // implementation of blob read. Here we don't use any of the InputStream's 
+      // shared resources (buffer) and also don't change any cursor position etc.
+      // So it's safe to go with an unsynchronized way of read.
       if (in instanceof PositionedReadable) {
         try {
           int result = ((PositionedReadable) this.in).read(position, buffer,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
index bacc7c0815d24..cea11c0380e31 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -353,9 +353,10 @@ private void verifyConsistentReads(FSDataInputStream inputStreamV1,
     int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
     assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
 
-    int numBytesReadV2_2 = inputStreamV2NoBuffer.read(pos, bufferV2NoBuffer, 0, size);
+    int numBytesReadV2NoBuffer = inputStreamV2NoBuffer.read(pos,
+        bufferV2NoBuffer, 0, size);
     assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
-        numBytesReadV2_2);
+        numBytesReadV2NoBuffer);
 
     assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
     assertArrayEquals("Mismatch in read data", bufferV2, bufferV2NoBuffer);

From 5707bfd481ca5f9bae3970f67364ff69f62f5e57 Mon Sep 17 00:00:00 2001
From: Anoop Sam John
Date: Mon, 12 Jul 2021 10:34:20 +0530
Subject: [PATCH 7/7] HADOOP-17770 WASB : Support disabling buffered reads in
 positional reads

---
 .../java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index f1c486e655e1a..e9f0e784fc121 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -932,7 +932,7 @@ public int read(long position, byte[] buffer, int offset, int length)
       // is configured false. The superclass FSInputStream implementation has
       // proper synchronization.
       // When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a lock free
-      // implementation of blob read. Here we don't use any of the InputStream's 
+      // implementation of blob read. Here we don't use any of the InputStream's
       // shared resources (buffer) and also don't change any cursor position etc.
       // So it's safe to go with an unsynchronized way of read.
       if (in instanceof PositionedReadable) {
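
Taken together, the series targets the access pattern the commit messages describe: many threads issuing small positional reads against one shared stream, as HBase does over HFiles. A minimal sketch of that pattern follows (thread count, read size, and class name are illustrative assumptions, not part of the patches):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FSDataInputStream;

public final class SharedStreamPreads {
  // Issues 'tasks' concurrent 4 KB positional reads against one shared
  // stream. With fs.azure.block.blob.buffered.pread.disable=true, each
  // pread becomes an independent ranged download instead of serializing
  // on the stream lock and thrashing the stream buffer.
  static void readConcurrently(FSDataInputStream shared, int tasks)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < tasks; i++) {
      final long position = i * 4096L;
      pool.submit(() -> {
        byte[] buf = new byte[4096];
        // Leaves the shared stream's cursor and buffer untouched.
        return shared.read(position, buf, 0, buf.length);
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```
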