From a690a0caddfa1f81195dcde32fdbf965558ae3e2 Mon Sep 17 00:00:00 2001 From: sumangala Date: Tue, 4 May 2021 17:33:11 +0530 Subject: [PATCH 01/20] use filestatus for open + prelim tests --- .../fs/azurebfs/AzureBlobFileSystem.java | 28 +++++-- .../fs/azurebfs/AzureBlobFileSystemStore.java | 32 +++++-- .../hadoop/fs/azurebfs/TestAbfsOpenFile.java | 84 +++++++++++++++++++ 3 files changed, 129 insertions(+), 15 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOpenFile.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 2a8d58678a203..498964875236b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -34,6 +34,7 @@ import java.util.EnumSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -185,17 +186,17 @@ public URI getUri() { public FSDataInputStream open(final Path path, final int bufferSize) throws IOException { LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize); // bufferSize is unused. - return open(path, Optional.empty()); + return open(path, new OpenFileParameters()); } private FSDataInputStream open(final Path path, - final Optional options) throws IOException { + final OpenFileParameters parameters) throws IOException { statIncrement(CALL_OPEN); Path qualifiedPath = makeQualified(path); try { InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, - options, statistics); + parameters, statistics); return new FSDataInputStream(inputStream); } catch(AzureBlobFileSystemException ex) { checkException(path, ex); @@ -203,17 +204,30 @@ private FSDataInputStream open(final Path path, } } + /** + * Takes config and other options through OpenFileParameters. 
Ensure that + * FileStatus entered is up-to-date, as it will be used to create the + * InputStream (with info such as contentLength, eTag) + * @param path The location of file to be opened + * @param parameters OpenFileParameters instance; can hold FileStatus, + * Configuration, bufferSize and mandatoryKeys + * {@link org.apache.hadoop.fs.impl.OpenFileParameters} + */ @Override - protected CompletableFuture openFileWithOptions( - final Path path, final OpenFileParameters parameters) throws IOException { + public CompletableFuture openFileWithOptions( + final Path path, final OpenFileParameters parameters) { LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path); + Set mandatoryKeys = parameters.getMandatoryKeys(); + if (mandatoryKeys == null) { + mandatoryKeys = Collections.emptySet(); + } AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( - parameters.getMandatoryKeys(), + mandatoryKeys, Collections.emptySet(), "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> - open(path, Optional.of(parameters.getOptions()))); + open(path, parameters)); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index fa7e12bc80e28..eff4380e8c033 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -52,6 +52,7 @@ import java.util.WeakHashMap; import java.util.concurrent.ExecutionException; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; @@ -128,6 +129,9 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION; @@ -647,11 +651,11 @@ public void createDirectory(final Path path, final FsPermission permission, fina public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException { - return openFileForRead(path, Optional.empty(), statistics); + return openFileForRead(path, new OpenFileParameters(), statistics); } public AbfsInputStream openFileForRead(final Path path, - final Optional options, + final OpenFileParameters parameters, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) { @@ -659,14 +663,26 @@ public AbfsInputStream openFileForRead(final Path path, client.getFileSystem(), path); + String resourceType; + long contentLength; + String 
eTag; String relativePath = getRelativePath(path); + Configuration options = null; - final AbfsRestOperation op = client.getPathStatus(relativePath, false); - perfInfo.registerResult(op.getResult()); + try { + options = parameters.getOptions(); + FileStatus fileStatus = parameters.getStatus(); + resourceType = fileStatus.isFile() ? FILE : DIRECTORY; + contentLength = fileStatus.getLen(); + eTag = ((VersionedFileStatus) fileStatus).getVersion(); + } catch (Exception ex) { + final AbfsRestOperation op = client.getPathStatus(relativePath, false); + perfInfo.registerResult(op.getResult()); - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + } if (parseIsDirectory(resourceType)) { throw new AbfsRestOperationException( @@ -681,7 +697,7 @@ public AbfsInputStream openFileForRead(final Path path, // Add statistics for InputStream return new AbfsInputStream(client, statistics, relativePath, contentLength, - populateAbfsInputStreamContext(options), + populateAbfsInputStreamContext(Optional.ofNullable(options)), eTag); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOpenFile.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOpenFile.java new file mode 100644 index 0000000000000..fcd33d40a00f3 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOpenFile.java @@ -0,0 +1,84 @@ +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.impl.OpenFileParameters; + +public class TestAbfsOpenFile extends AbstractAbfsIntegrationTest { + + public TestAbfsOpenFile() throws Exception { + } + + @Test + public void testOpen() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testFile"); + fs.create(path); + fs.open(path); + } + + @Test + public void testOpenWithEmptyOpenFileParams() + throws IOException, ExecutionException, InterruptedException { + AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testFile"); + fs.create(path); + fs.openFileWithOptions(path, + new OpenFileParameters()).get(); + } + + @Test + public void testOpenWithValidFileStatus() + throws IOException, ExecutionException, InterruptedException { + AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testFile"); + fs.create(path); + FileStatus fileStatus = fs.getFileStatus(path); + FSDataOutputStream out = fs.append(path); + out.write(1); + out.close(); + + // should not invoke GetPathStatus + FSDataInputStream in = fs.openFileWithOptions(path, + new OpenFileParameters().withStatus(fileStatus)).get(); + + assertEquals("One 
byte should be written to file", 1, + fs.getFileStatus(path).getLen()); + assertEquals( + "InputStream was created with old fileStatus, so contentLength " + + "reflected should be 0 post write", 0, + ((AbfsInputStream) in.getWrappedStream()).length()); + } + + @Test + public void testOpenWithOptions() + throws IOException, ExecutionException, InterruptedException { + AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testFile"); + fs.create(path); + Configuration testOptions = new Configuration(); + fs.openFileWithOptions(path, + new OpenFileParameters().withOptions(testOptions)).get(); + } + + @Test + public void testNonVersionedFileStatus() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testFile"); + fs.create(path); + FileStatus fileStatus = new FileStatus(); // no version specified + //Request with FileStatus without eTag should succeed using GetPathStatus + fs.openFileWithOptions(path, + new OpenFileParameters().withStatus(fileStatus)); + } + +} From 6f123194952f2473a7f7335305c7af633c2d0d5e Mon Sep 17 00:00:00 2001 From: sumangala Date: Tue, 4 May 2021 17:47:04 +0530 Subject: [PATCH 02/20] fix import --- .../apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index eff4380e8c033..57da28ce29dcf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -52,7 +52,6 @@ import java.util.WeakHashMap; import java.util.concurrent.ExecutionException; -import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; @@ -115,6 +114,7 @@ import org.apache.hadoop.fs.azurebfs.utils.CRC64; import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -130,7 +130,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; From b0834b08bb5f9e557dbc765d547726b8c3eac883 Mon Sep 17 00:00:00 2001 From: sumangala Date: Wed, 5 May 2021 23:34:27 +0530 Subject: [PATCH 03/20] move tests, spy gfs count --- .../fs/azurebfs/AzureBlobFileSystem.java | 1 - .../fs/azurebfs/AzureBlobFileSystemStore.java | 48 +++++------ .../hadoop/fs/azurebfs/TestAbfsOpenFile.java | 84 ------------------- .../services/TestAbfsInputStream.java | 67 
++++++++++++++- 4 files changed, 88 insertions(+), 112 deletions(-) delete mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOpenFile.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 498964875236b..136f780855b7b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -33,7 +33,6 @@ import java.util.Collections; import java.util.EnumSet; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 57da28ce29dcf..1ae72ed5dfabb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -649,18 +649,17 @@ public void createDirectory(final Path path, final FsPermission permission, fina public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) - throws AzureBlobFileSystemException { + throws IOException { return openFileForRead(path, new OpenFileParameters(), statistics); } public AbfsInputStream openFileForRead(final Path path, final OpenFileParameters parameters, - final FileSystem.Statistics statistics) - throws AzureBlobFileSystemException { - try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) { + final FileSystem.Statistics statistics) throws IOException { + try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", + "getPathStatus")) { LOG.debug("openFileForRead filesystem: {} path: {}", - client.getFileSystem(), - path); + client.getFileSystem(), path); String resourceType; long contentLength; @@ -668,36 +667,33 @@ public AbfsInputStream openFileForRead(final Path path, String relativePath = getRelativePath(path); Configuration options = null; - try { + FileStatus fileStatus = null; + if (parameters != null) { options = parameters.getOptions(); - FileStatus fileStatus = parameters.getStatus(); - resourceType = fileStatus.isFile() ? FILE : DIRECTORY; - contentLength = fileStatus.getLen(); - eTag = ((VersionedFileStatus) fileStatus).getVersion(); - } catch (Exception ex) { - final AbfsRestOperation op = client.getPathStatus(relativePath, false); - perfInfo.registerResult(op.getResult()); - - resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + fileStatus = parameters.getStatus(); } + if (fileStatus == null) { + fileStatus = getFileStatus(new Path(relativePath)); + } + + resourceType = fileStatus.isFile() ? 
FILE : DIRECTORY; + contentLength = fileStatus.getLen(); + eTag = ((VersionedFileStatus) fileStatus).getVersion(); if (parseIsDirectory(resourceType)) { throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "openFileForRead must be used with files and not directories", - null); + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForRead must be used with files and not directories", + null); } perfInfo.registerSuccess(true); // Add statistics for InputStream - return new AbfsInputStream(client, statistics, - relativePath, contentLength, - populateAbfsInputStreamContext(Optional.ofNullable(options)), - eTag); + return new AbfsInputStream(client, statistics, relativePath, + contentLength, + populateAbfsInputStreamContext(Optional.ofNullable(options)), eTag); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOpenFile.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOpenFile.java deleted file mode 100644 index fcd33d40a00f3..0000000000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOpenFile.java +++ /dev/null @@ -1,84 +0,0 @@ -package org.apache.hadoop.fs.azurebfs; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; - -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; -import org.apache.hadoop.fs.impl.OpenFileParameters; - -public class TestAbfsOpenFile extends AbstractAbfsIntegrationTest { - - public TestAbfsOpenFile() throws Exception { - } - - @Test - public void testOpen() throws IOException { - AzureBlobFileSystem fs = getFileSystem(); - Path path = new Path("/testFile"); - fs.create(path); - fs.open(path); - } - - @Test - public void testOpenWithEmptyOpenFileParams() - throws IOException, ExecutionException, InterruptedException { - AzureBlobFileSystem fs = getFileSystem(); - Path path = new Path("/testFile"); - fs.create(path); - fs.openFileWithOptions(path, - new OpenFileParameters()).get(); - } - - @Test - public void testOpenWithValidFileStatus() - throws IOException, ExecutionException, InterruptedException { - AzureBlobFileSystem fs = getFileSystem(); - Path path = new Path("/testFile"); - fs.create(path); - FileStatus fileStatus = fs.getFileStatus(path); - FSDataOutputStream out = fs.append(path); - out.write(1); - out.close(); - - // should not invoke GetPathStatus - FSDataInputStream in = fs.openFileWithOptions(path, - new OpenFileParameters().withStatus(fileStatus)).get(); - - assertEquals("One byte should be written to file", 1, - fs.getFileStatus(path).getLen()); - assertEquals( - "InputStream was created with old fileStatus, so contentLength " - + "reflected should be 0 post write", 0, - ((AbfsInputStream) in.getWrappedStream()).length()); - } - - @Test - public void testOpenWithOptions() - throws IOException, ExecutionException, InterruptedException { - AzureBlobFileSystem fs = getFileSystem(); - Path path = new Path("/testFile"); - fs.create(path); - Configuration testOptions = new Configuration(); - fs.openFileWithOptions(path, - new OpenFileParameters().withOptions(testOptions)).get(); - } - - @Test - public 
void testNonVersionedFileStatus() throws IOException { - AzureBlobFileSystem fs = getFileSystem(); - Path path = new Path("/testFile"); - fs.create(path); - FileStatus fileStatus = new FileStatus(); // no version specified - //Request with FileStatus without eTag should succeed using GetPathStatus - fs.openFileWithOptions(path, - new OpenFileParameters().withStatus(fileStatus)); - } - -} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index cbf3d6a2a68ee..077124359f3ae 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -23,25 +23,32 @@ import org.junit.Assert; import org.junit.Test; import java.util.Arrays; +import java.util.Random; import org.assertj.core.api.Assertions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; +import org.apache.hadoop.fs.impl.OpenFileParameters; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.description; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -184,6 +191,64 @@ public TestAbfsInputStream() throws Exception { ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); } + @Test + public void testOpenFileWithOptions() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + String testFolder = "/testFolder"; + String testFile = testFolder + "/testFile"; + fs.mkdirs(new Path(testFolder)); + byte[] buffer = new byte[5]; + new Random().nextBytes(buffer); + for (int i = 0; i<3; i++) { + Path file = new Path(testFile + i); + fs.create(file); + FSDataOutputStream out = fs.append(file); + out.write(buffer); + out.close(); + } + + // open with fileStatus from GetPathStatus + Path filePath = new Path(testFile + "0"); + FileStatus fileStatus = fs.getFileStatus(filePath); + FSDataInputStream in = fs.openFileWithOptions(filePath, + new OpenFileParameters().withStatus(fileStatus)).get(); + byte[] readBuf = new byte[5]; + in.read(readBuf); + assertArrayEquals( + "Open with FileStatus from GetPathStatus: Incorrect read data", buffer, + readBuf); + + // open with fileStatus from ListStatus + FileStatus[] fileStatuses = fs.listStatus(new Path(testFolder)); + for (int i = 0; i < fileStatuses.length; i++) { + in = 
fs.openFileWithOptions(new Path(testFile + i), + new OpenFileParameters().withStatus(fileStatuses[i])).get(); + in.read(readBuf); + assertArrayEquals( + "Open with fileStatus from ListStatus: Incorrect read data", buffer, + readBuf); + } + + // verify GetPathStatus not invoked when FileStatus is provided + AzureBlobFileSystemStore store = new AzureBlobFileSystemStore(fs.getUri(), + fs.isSecureScheme(), getRawConfiguration(), + new AbfsCountersImpl(fs.getUri())); + AzureBlobFileSystemStore mockStore = spy(store); + + mockStore.openFileForRead(new Path(testFile + "2"), + new OpenFileParameters().withStatus(fileStatuses[2]), null); + verify(mockStore, times(0).description( + "GetPathStatus should not be invoked when FileStatus is provided")) + .getFileStatus(any(Path.class)); + + // verify GetPathStatus invoked when FileStatus not provided + mockStore.openFileForRead(new Path(testFile + "2"), + new OpenFileParameters(), null); + verify(mockStore, times(1).description( + "GetPathStatus should be invoked when FileStatus not provided")) + .getFileStatus(any(Path.class)); + } + /** * This test expects AbfsInputStream to throw the exception that readAhead * thread received on read. The readAhead thread must be initiated from the From e1b79e8239c1569074193fd2fef95c056d4de0a4 Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 6 May 2021 01:13:48 +0530 Subject: [PATCH 04/20] checkstyle --- .../hadoop/fs/azurebfs/services/TestAbfsInputStream.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 077124359f3ae..405be8f16ba2b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.impl.OpenFileParameters; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.description; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -247,6 +246,11 @@ public void testOpenFileWithOptions() throws Exception { verify(mockStore, times(1).description( "GetPathStatus should be invoked when FileStatus not provided")) .getFileStatus(any(Path.class)); + + // test open method that calls openFileWithOptions internally + in = fs.open(new Path(testFile + "0")); + in.read(readBuf); + assertArrayEquals("Open method: Incorrect read data", buffer, readBuf); } /** From 018bfe20ee076b212b8a9a162d66dea4c1e241c2 Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 6 May 2021 11:15:35 +0530 Subject: [PATCH 05/20] minor changes --- .../fs/azurebfs/AzureBlobFileSystem.java | 4 ++-- .../fs/azurebfs/AzureBlobFileSystemStore.java | 22 ++++++++---------- .../services/TestAbfsInputStream.java | 23 +++++++++++-------- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 136f780855b7b..f25ee9c9e0def 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -204,13 +204,13 @@ private FSDataInputStream open(final Path path, } /** - * Takes config and other options through OpenFileParameters. Ensure that + * Takes config and other options through + * {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that * FileStatus entered is up-to-date, as it will be used to create the * InputStream (with info such as contentLength, eTag) * @param path The location of file to be opened * @param parameters OpenFileParameters instance; can hold FileStatus, * Configuration, bufferSize and mandatoryKeys - * {@link org.apache.hadoop.fs.impl.OpenFileParameters} */ @Override public CompletableFuture openFileWithOptions( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 1ae72ed5dfabb..4bcf89d80c727 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -656,29 +656,26 @@ public AbfsInputStream openFileForRead(final Path path, public AbfsInputStream openFileForRead(final Path path, final OpenFileParameters parameters, final FileSystem.Statistics statistics) throws IOException { - try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", - "getPathStatus")) { + try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) { LOG.debug("openFileForRead filesystem: {} path: {}", client.getFileSystem(), path); - String resourceType; - long contentLength; - String eTag; - String relativePath = getRelativePath(path); - Configuration options = null; + Optional options = Optional.empty(); FileStatus fileStatus = null; if (parameters != null) { - options = parameters.getOptions(); + options = Optional.ofNullable(parameters.getOptions()); fileStatus = parameters.getStatus(); } + + String relativePath = getRelativePath(path); if (fileStatus == null) { fileStatus = getFileStatus(new Path(relativePath)); } - resourceType = fileStatus.isFile() ? FILE : DIRECTORY; - contentLength = fileStatus.getLen(); - eTag = ((VersionedFileStatus) fileStatus).getVersion(); + String resourceType = fileStatus.isFile() ? 
FILE : DIRECTORY; + long contentLength = fileStatus.getLen(); + String eTag = ((VersionedFileStatus) fileStatus).getVersion(); if (parseIsDirectory(resourceType)) { throw new AbfsRestOperationException( @@ -692,8 +689,7 @@ public AbfsInputStream openFileForRead(final Path path, // Add statistics for InputStream return new AbfsInputStream(client, statistics, relativePath, - contentLength, - populateAbfsInputStreamContext(Optional.ofNullable(options)), eTag); + contentLength, populateAbfsInputStreamContext(options), eTag); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 405be8f16ba2b..1efcd8ab73fa0 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -198,7 +198,7 @@ public void testOpenFileWithOptions() throws Exception { fs.mkdirs(new Path(testFolder)); byte[] buffer = new byte[5]; new Random().nextBytes(buffer); - for (int i = 0; i<3; i++) { + for (int i = 0; i < 3; i++) { Path file = new Path(testFile + i); fs.create(file); FSDataOutputStream out = fs.append(file); @@ -212,17 +212,19 @@ public void testOpenFileWithOptions() throws Exception { FSDataInputStream in = fs.openFileWithOptions(filePath, new OpenFileParameters().withStatus(fileStatus)).get(); byte[] readBuf = new byte[5]; - in.read(readBuf); + assertEquals("Incorrect number of bytes read", buffer.length, + in.read(readBuf)); assertArrayEquals( "Open with FileStatus from GetPathStatus: Incorrect read data", buffer, readBuf); // open with fileStatus from ListStatus FileStatus[] fileStatuses = fs.listStatus(new Path(testFolder)); - for (int i = 0; i < fileStatuses.length; i++) { + for (int i = 0; i < 3; i++) { in = fs.openFileWithOptions(new Path(testFile + i), new OpenFileParameters().withStatus(fileStatuses[i])).get(); - in.read(readBuf); + assertEquals("Incorrect number of bytes read", buffer.length, + in.read(readBuf)); assertArrayEquals( "Open with fileStatus from ListStatus: Incorrect read data", buffer, readBuf); @@ -237,7 +239,13 @@ public void testOpenFileWithOptions() throws Exception { mockStore.openFileForRead(new Path(testFile + "2"), new OpenFileParameters().withStatus(fileStatuses[2]), null); verify(mockStore, times(0).description( - "GetPathStatus should not be invoked when FileStatus is provided")) + "FileStatus [from ListPaths result] provided, GetFileStatus should not be invoked")) + .getFileStatus(any(Path.class)); + + mockStore.openFileForRead(new Path(testFile + "0"), + new OpenFileParameters().withStatus(fileStatus), null); + verify(mockStore, times(0).description( + "FileStatus [from GetPathStatus result] provided, GetFileStatus should not be invoked again")) .getFileStatus(any(Path.class)); // verify GetPathStatus invoked when FileStatus not provided @@ -246,11 +254,6 @@ public void testOpenFileWithOptions() throws Exception { verify(mockStore, times(1).description( "GetPathStatus should be invoked when FileStatus not provided")) .getFileStatus(any(Path.class)); - - // test open method that calls openFileWithOptions internally - in = fs.open(new Path(testFile + "0")); - in.read(readBuf); - assertArrayEquals("Open method: Incorrect read data", buffer, readBuf); } /** From 2752efb59068874d84e8f796d68ce4fc1745a7a2 Mon Sep 17 00:00:00 2001 From: sumangala 
Date: Thu, 6 May 2021 18:16:45 +0530 Subject: [PATCH 06/20] one file > readBufferSize --- .../services/TestAbfsInputStream.java | 101 +++++++++++------- 1 file changed, 60 insertions(+), 41 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 1efcd8ab73fa0..040335d64781e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -24,6 +24,7 @@ import org.junit.Test; import java.util.Arrays; import java.util.Random; +import java.util.concurrent.ExecutionException; import org.assertj.core.api.Assertions; @@ -190,67 +191,85 @@ public TestAbfsInputStream() throws Exception { ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); } + private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + fs.create(testFile); + FSDataOutputStream out = fs.append(testFile); + out.write(buffer); + out.close(); + } + + private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, + byte[] buf, AbfsRestOperationType source) throws IOException, + ExecutionException, + InterruptedException { + byte[] readBuf = new byte[buf.length]; + FSDataInputStream in = getFileSystem().openFileWithOptions(path, + new OpenFileParameters().withStatus(fileStatus)).get(); + assertEquals(String.format( + "Open with fileStatus [from %s result]: Incorrect number of bytes read", + source), buf.length, in.read(readBuf)); + assertArrayEquals(String + .format("Open with fileStatus [from %s result]: Incorrect read data", + source), readBuf, buf); + } + @Test public void testOpenFileWithOptions() throws Exception { AzureBlobFileSystem fs = getFileSystem(); String testFolder = "/testFolder"; - String testFile = testFolder + "/testFile"; + Path smallTestFile = new Path(testFolder + "/testFile0"); + Path largeTestFile = new Path(testFolder + "/testFile1"); fs.mkdirs(new Path(testFolder)); - byte[] buffer = new byte[5]; - new Random().nextBytes(buffer); - for (int i = 0; i < 3; i++) { - Path file = new Path(testFile + i); - fs.create(file); - FSDataOutputStream out = fs.append(file); - out.write(buffer); - out.close(); - } + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] largeBuffer = new byte[readBufferSize + 5]; + byte[] smallBuffer = new byte[5]; + new Random().nextBytes(largeBuffer); + new Random().nextBytes(smallBuffer); + writeBufferToNewFile(smallTestFile, smallBuffer); + writeBufferToNewFile(largeTestFile, largeBuffer); // open with fileStatus from GetPathStatus - Path filePath = new Path(testFile + "0"); - FileStatus fileStatus = fs.getFileStatus(filePath); - FSDataInputStream in = fs.openFileWithOptions(filePath, - new OpenFileParameters().withStatus(fileStatus)).get(); - byte[] readBuf = new byte[5]; - assertEquals("Incorrect number of bytes read", buffer.length, - in.read(readBuf)); - assertArrayEquals( - "Open with FileStatus from GetPathStatus: Incorrect read data", buffer, - readBuf); + verifyOpenWithProvidedStatus(smallTestFile, fs.getFileStatus(smallTestFile), + smallBuffer, AbfsRestOperationType.GetPathStatus); + verifyOpenWithProvidedStatus(largeTestFile, fs.getFileStatus(largeTestFile), + largeBuffer, 
AbfsRestOperationType.GetPathStatus); // open with fileStatus from ListStatus FileStatus[] fileStatuses = fs.listStatus(new Path(testFolder)); - for (int i = 0; i < 3; i++) { - in = fs.openFileWithOptions(new Path(testFile + i), - new OpenFileParameters().withStatus(fileStatuses[i])).get(); - assertEquals("Incorrect number of bytes read", buffer.length, - in.read(readBuf)); - assertArrayEquals( - "Open with fileStatus from ListStatus: Incorrect read data", buffer, - readBuf); - } + verifyOpenWithProvidedStatus(smallTestFile, fileStatuses[0], smallBuffer, + AbfsRestOperationType.ListPaths); + verifyOpenWithProvidedStatus(largeTestFile, fileStatuses[1], largeBuffer, + AbfsRestOperationType.ListPaths); - // verify GetPathStatus not invoked when FileStatus is provided + // verify number of GetPathStatus invocations AzureBlobFileSystemStore store = new AzureBlobFileSystemStore(fs.getUri(), fs.isSecureScheme(), getRawConfiguration(), new AbfsCountersImpl(fs.getUri())); - AzureBlobFileSystemStore mockStore = spy(store); + checkGetPathStatusCalls(smallTestFile, fs.getFileStatus(smallTestFile), + spy(store), AbfsRestOperationType.GetPathStatus); + checkGetPathStatusCalls(largeTestFile, fs.getFileStatus(largeTestFile), + spy(store), AbfsRestOperationType.GetPathStatus); + checkGetPathStatusCalls(smallTestFile, fileStatuses[0], spy(store), + AbfsRestOperationType.ListPaths); + checkGetPathStatusCalls(largeTestFile, fileStatuses[1], spy(store), + AbfsRestOperationType.ListPaths); - mockStore.openFileForRead(new Path(testFile + "2"), - new OpenFileParameters().withStatus(fileStatuses[2]), null); - verify(mockStore, times(0).description( - "FileStatus [from ListPaths result] provided, GetFileStatus should not be invoked")) - .getFileStatus(any(Path.class)); + } + + void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, + AzureBlobFileSystemStore mockStore, AbfsRestOperationType source) + throws IOException { - mockStore.openFileForRead(new Path(testFile + "0"), + // verify GetPathStatus not invoked when FileStatus is provided + mockStore.openFileForRead(testFile, new OpenFileParameters().withStatus(fileStatus), null); - verify(mockStore, times(0).description( - "FileStatus [from GetPathStatus result] provided, GetFileStatus should not be invoked again")) + verify(mockStore, times(0).description((String.format( + "FileStatus [from %s result] provided, GetFileStatus should not be invoked", source)))) .getFileStatus(any(Path.class)); // verify GetPathStatus invoked when FileStatus not provided - mockStore.openFileForRead(new Path(testFile + "2"), - new OpenFileParameters(), null); + mockStore.openFileForRead(testFile, new OpenFileParameters(), null); verify(mockStore, times(1).description( "GetPathStatus should be invoked when FileStatus not provided")) .getFileStatus(any(Path.class)); From 02bfa1e7e8af44e29a42ccc3b71f86e0c1687976 Mon Sep 17 00:00:00 2001 From: sumangala Date: Fri, 7 May 2021 11:06:44 +0530 Subject: [PATCH 07/20] speed++ --- .../services/TestAbfsInputStream.java | 58 ++++++++++--------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 040335d64781e..bfdc98211c09d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException; import org.assertj.core.api.Assertions; +import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -200,9 +201,8 @@ private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOExcepti } private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, - byte[] buf, AbfsRestOperationType source) throws IOException, - ExecutionException, - InterruptedException { + byte[] buf, AbfsRestOperationType source) + throws IOException, ExecutionException, InterruptedException { byte[] readBuf = new byte[buf.length]; FSDataInputStream in = getFileSystem().openFileWithOptions(path, new OpenFileParameters().withStatus(fileStatus)).get(); @@ -214,6 +214,26 @@ private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, source), readBuf, buf); } + void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, + AzureBlobFileSystemStore mockStore, AbfsRestOperationType source) + throws IOException { + + // verify GetPathStatus not invoked when FileStatus is provided + mockStore.openFileForRead(testFile, + new OpenFileParameters().withStatus(fileStatus), null); + verify(mockStore, times(0).description((String.format( + "FileStatus [from %s result] provided, GetFileStatus should not be invoked", source)))) + .getFileStatus(any(Path.class)); + + // verify GetPathStatus invoked when FileStatus not provided + mockStore.openFileForRead(testFile, new OpenFileParameters(), null); + verify(mockStore, times(1).description( + "GetPathStatus should be invoked when FileStatus not provided")) + .getFileStatus(any(Path.class)); + + Mockito.reset(mockStore); //clears invocation count for next test case + } + @Test public void testOpenFileWithOptions() throws Exception { AzureBlobFileSystem fs = getFileSystem(); @@ -246,33 +266,15 @@ public void testOpenFileWithOptions() throws Exception { AzureBlobFileSystemStore store = new AzureBlobFileSystemStore(fs.getUri(), fs.isSecureScheme(), getRawConfiguration(), new AbfsCountersImpl(fs.getUri())); + AzureBlobFileSystemStore mockStore = spy(store); checkGetPathStatusCalls(smallTestFile, fs.getFileStatus(smallTestFile), - spy(store), AbfsRestOperationType.GetPathStatus); + mockStore, AbfsRestOperationType.GetPathStatus); checkGetPathStatusCalls(largeTestFile, fs.getFileStatus(largeTestFile), - spy(store), AbfsRestOperationType.GetPathStatus); - checkGetPathStatusCalls(smallTestFile, fileStatuses[0], spy(store), - AbfsRestOperationType.ListPaths); - checkGetPathStatusCalls(largeTestFile, fileStatuses[1], spy(store), - AbfsRestOperationType.ListPaths); - - } - - void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, - AzureBlobFileSystemStore mockStore, AbfsRestOperationType source) - throws IOException { - - // verify GetPathStatus not invoked when FileStatus is provided - mockStore.openFileForRead(testFile, - new OpenFileParameters().withStatus(fileStatus), null); - verify(mockStore, times(0).description((String.format( - "FileStatus [from %s result] provided, GetFileStatus should not be invoked", source)))) - .getFileStatus(any(Path.class)); - - // verify GetPathStatus invoked when FileStatus not provided - mockStore.openFileForRead(testFile, new OpenFileParameters(), null); - verify(mockStore, times(1).description( - "GetPathStatus should be invoked when FileStatus not provided")) - 
.getFileStatus(any(Path.class)); + mockStore, AbfsRestOperationType.GetPathStatus); + checkGetPathStatusCalls(smallTestFile, fileStatuses[0], + mockStore, AbfsRestOperationType.ListPaths); + checkGetPathStatusCalls(largeTestFile, fileStatuses[1], + mockStore, AbfsRestOperationType.ListPaths); } /** From 75c596195a18b01c633e34cc890c19c41d5801d4 Mon Sep 17 00:00:00 2001 From: sumangala Date: Fri, 7 May 2021 14:06:19 +0530 Subject: [PATCH 08/20] minor change --- .../services/TestAbfsInputStream.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index bfdc98211c09d..1ca89f82068e1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -22,6 +22,7 @@ import org.junit.Assert; import org.junit.Test; + import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutionException; @@ -214,7 +215,7 @@ private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, source), readBuf, buf); } - void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, + private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, AzureBlobFileSystemStore mockStore, AbfsRestOperationType source) throws IOException { @@ -242,24 +243,27 @@ public void testOpenFileWithOptions() throws Exception { Path largeTestFile = new Path(testFolder + "/testFile1"); fs.mkdirs(new Path(testFolder)); int readBufferSize = getConfiguration().getReadBufferSize(); - byte[] largeBuffer = new byte[readBufferSize + 5]; byte[] smallBuffer = new byte[5]; - new Random().nextBytes(largeBuffer); + byte[] largeBuffer = new byte[readBufferSize + 5]; new Random().nextBytes(smallBuffer); + new Random().nextBytes(largeBuffer); writeBufferToNewFile(smallTestFile, smallBuffer); writeBufferToNewFile(largeTestFile, largeBuffer); + FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile), + fs.getFileStatus(largeTestFile)}; + FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder)); + // open with fileStatus from GetPathStatus - verifyOpenWithProvidedStatus(smallTestFile, fs.getFileStatus(smallTestFile), + verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0], smallBuffer, AbfsRestOperationType.GetPathStatus); - verifyOpenWithProvidedStatus(largeTestFile, fs.getFileStatus(largeTestFile), + verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1], largeBuffer, AbfsRestOperationType.GetPathStatus); // open with fileStatus from ListStatus - FileStatus[] fileStatuses = fs.listStatus(new Path(testFolder)); - verifyOpenWithProvidedStatus(smallTestFile, fileStatuses[0], smallBuffer, + verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer, AbfsRestOperationType.ListPaths); - verifyOpenWithProvidedStatus(largeTestFile, fileStatuses[1], largeBuffer, + verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer, AbfsRestOperationType.ListPaths); // verify number of GetPathStatus invocations @@ -267,13 +271,13 @@ public void testOpenFileWithOptions() throws Exception { fs.isSecureScheme(), getRawConfiguration(), new AbfsCountersImpl(fs.getUri())); AzureBlobFileSystemStore mockStore = spy(store); - 
checkGetPathStatusCalls(smallTestFile, fs.getFileStatus(smallTestFile), + checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0], mockStore, AbfsRestOperationType.GetPathStatus); - checkGetPathStatusCalls(largeTestFile, fs.getFileStatus(largeTestFile), + checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1], mockStore, AbfsRestOperationType.GetPathStatus); - checkGetPathStatusCalls(smallTestFile, fileStatuses[0], + checkGetPathStatusCalls(smallTestFile, listStatusResults[0], mockStore, AbfsRestOperationType.ListPaths); - checkGetPathStatusCalls(largeTestFile, fileStatuses[1], + checkGetPathStatusCalls(largeTestFile, listStatusResults[1], mockStore, AbfsRestOperationType.ListPaths); } From 31ac9a2f2dd0034eb54a14f44a996e8a3b6f05fe Mon Sep 17 00:00:00 2001 From: sumangala Date: Fri, 7 May 2021 14:10:48 +0530 Subject: [PATCH 09/20] import --- .../hadoop/fs/azurebfs/services/TestAbfsInputStream.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 1ca89f82068e1..08e81aef720d2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,15 +19,13 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; - -import org.junit.Assert; -import org.junit.Test; - import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutionException; import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Test; import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; From de9e6ea0fae6193a4eccb66408915c7ba900fce1 Mon Sep 17 00:00:00 2001 From: sumangala Date: Tue, 13 Jul 2021 11:28:44 +0530 Subject: [PATCH 10/20] revw --- .../org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java | 2 +- .../hadoop/fs/azurebfs/services/TestAbfsInputStream.java | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index c297af1aff776..4fdeb7547d36d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -236,7 +236,7 @@ private FSDataInputStream open(final Path path, * Configuration, bufferSize and mandatoryKeys */ @Override - public CompletableFuture openFileWithOptions( + protected CompletableFuture openFileWithOptions( final Path path, final OpenFileParameters parameters) { LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path); Set mandatoryKeys = parameters.getMandatoryKeys(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 48f93e2901e58..3626dab3e2b4e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; @@ -210,8 +211,10 @@ private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, byte[] buf, AbfsRestOperationType source) throws IOException, ExecutionException, InterruptedException { byte[] readBuf = new byte[buf.length]; - FSDataInputStream in = getFileSystem().openFileWithOptions(path, - new OpenFileParameters().withStatus(fileStatus)).get(); + AzureBlobFileSystem fs = getFileSystem(); + FutureDataInputStreamBuilder builder = fs.openFile(path); + builder.withFileStatus(fileStatus); + FSDataInputStream in = builder.build().get(); assertEquals(String.format( "Open with fileStatus [from %s result]: Incorrect number of bytes read", source), buf.length, in.read(readBuf)); From dd777510db00c61cbd48646f3e0e556f7fd5c2c2 Mon Sep 17 00:00:00 2001 From: sumangala Date: Wed, 14 Jul 2021 04:40:31 +0530 Subject: [PATCH 11/20] call client fn + test spy --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 20 ++++++--- .../azurebfs/AbstractAbfsIntegrationTest.java | 10 +++++ .../services/TestAbfsInputStream.java | 43 ++++++++++--------- 3 files changed, 47 insertions(+), 26 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index cf27e663b79a8..937131a914c1a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -695,14 +695,22 @@ public AbfsInputStream openFileForRead(final Path path, } String relativePath = getRelativePath(path); - if (fileStatus == null) { - fileStatus = getFileStatus(new Path(relativePath), tracingContext); + String resourceType, eTag; + long contentLength; + if (fileStatus != null) { + resourceType = fileStatus.isFile() ? FILE : DIRECTORY; + contentLength = fileStatus.getLen(); + eTag = ((VersionedFileStatus) fileStatus).getVersion(); + } else { + AbfsHttpOperation op = client + .getPathStatus(relativePath, false, tracingContext).getResult(); + resourceType = op + .getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + contentLength = Long.parseLong( + op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); } - String resourceType = fileStatus.isFile() ? 
FILE : DIRECTORY; - long contentLength = fileStatus.getLen(); - String eTag = ((VersionedFileStatus) fileStatus).getVersion(); - if (parseIsDirectory(resourceType)) { throw new AbfsRestOperationException( AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 2497f8f1b63c5..8770edd550f40 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; @@ -427,6 +428,15 @@ public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) { return fs.getAbfsStore(); } + public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) { + return abfsStore.getClient(); + } + + public void setAbfsClient(AzureBlobFileSystemStore abfsStore, + AbfsClient client) { + abfsStore.setClient(client); + } + public Path makeQualified(Path path) throws java.io.IOException { return getFileSystem().makeQualified(path); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 3626dab3e2b4e..dcda13305aec9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -47,6 +47,8 @@ import org.apache.hadoop.fs.impl.OpenFileParameters; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -224,23 +226,25 @@ private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, } private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, - AzureBlobFileSystemStore mockStore, AbfsRestOperationType source, - TracingContext tracingContext) throws IOException { + AzureBlobFileSystemStore abfsStore, AbfsClient mockClient, + AbfsRestOperationType source, TracingContext tracingContext) + throws IOException { // verify GetPathStatus not invoked when FileStatus is provided - mockStore.openFileForRead(testFile, + abfsStore.openFileForRead(testFile, new OpenFileParameters().withStatus(fileStatus), null, tracingContext); - verify(mockStore, times(0).description((String.format( - "FileStatus [from %s result] provided, GetFileStatus should not be invoked", source)))) - .getFileStatus(any(Path.class), any(TracingContext.class)); - - // verify GetPathStatus invoked when FileStatus not provided - mockStore.openFileForRead(testFile, new OpenFileParameters(), null, 
tracingContext); - verify(mockStore, times(1).description( + verify(mockClient, times(0).description((String.format( + "FileStatus [from %s result] provided, GetFileStatus should not be invoked", + source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + // verify GetPathStatus invoked when FileStatus not provided + abfsStore.openFileForRead(testFile, new OpenFileParameters(), null, + tracingContext); + verify(mockClient, times(1).description( "GetPathStatus should be invoked when FileStatus not provided")) - .getFileStatus(any(Path.class), any(TracingContext.class)); + .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); - Mockito.reset(mockStore); //clears invocation count for next test case + Mockito.reset(mockClient); //clears invocation count for next test case } @Test @@ -275,19 +279,18 @@ public void testOpenFileWithOptions() throws Exception { AbfsRestOperationType.ListPaths); // verify number of GetPathStatus invocations - AzureBlobFileSystemStore store = new AzureBlobFileSystemStore(fs.getUri(), - fs.isSecureScheme(), getRawConfiguration(), - new AbfsCountersImpl(fs.getUri())); - AzureBlobFileSystemStore mockStore = spy(store); + AzureBlobFileSystemStore abfsStore = getAbfsStore(fs); + AbfsClient mockClient = spy(getAbfsClient(abfsStore)); + setAbfsClient(abfsStore, mockClient); TracingContext tracingContext = getTestTracingContext(fs, false); checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0], - mockStore, AbfsRestOperationType.GetPathStatus, tracingContext); + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1], - mockStore, AbfsRestOperationType.GetPathStatus, tracingContext); + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); checkGetPathStatusCalls(smallTestFile, listStatusResults[0], - mockStore, AbfsRestOperationType.ListPaths, tracingContext); + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); checkGetPathStatusCalls(largeTestFile, listStatusResults[1], - mockStore, AbfsRestOperationType.ListPaths, tracingContext); + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); } /** From 2e23baff44f6422075a3897f9bb2d23a91e4e338 Mon Sep 17 00:00:00 2001 From: sumangala Date: Wed, 14 Jul 2021 04:42:24 +0530 Subject: [PATCH 12/20] checkstyle --- .../java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 4fdeb7547d36d..4deb346ec3c9d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -33,7 +33,6 @@ import java.util.Collections; import java.util.EnumSet; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; From ed8e0340921aecee1637232e03f3a65fd5a363a8 Mon Sep 17 00:00:00 2001 From: sumangala Date: Wed, 14 Jul 2021 15:11:33 +0530 Subject: [PATCH 13/20] yetus + chkstyle --- .../apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index dcda13305aec9..d4d385b267778 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; From a88399504692806b56050e2559e4442c56cb8a67 Mon Sep 17 00:00:00 2001 From: sumangala Date: Fri, 23 Jul 2021 12:50:56 +0530 Subject: [PATCH 14/20] optional --- .../fs/azurebfs/AzureBlobFileSystem.java | 18 ++++++-------- .../fs/azurebfs/AzureBlobFileSystemStore.java | 24 +++++++++---------- .../services/TestAbfsInputStream.java | 8 ++++--- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 4deb346ec3c9d..60924e6b3898d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -204,11 +205,11 @@ public void registerListener(Listener listener1) { public FSDataInputStream open(final Path path, final int bufferSize) throws IOException { LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize); // bufferSize is unused. 
- return open(path, new OpenFileParameters()); + return open(path, Optional.empty()); } private FSDataInputStream open(final Path path, - final OpenFileParameters parameters) throws IOException { + final Optional parameters) throws IOException { statIncrement(CALL_OPEN); Path qualifiedPath = makeQualified(path); @@ -216,8 +217,7 @@ private FSDataInputStream open(final Path path, TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener); InputStream inputStream = abfsStore - .openFileForRead(qualifiedPath, parameters, statistics, - tracingContext); + .openFileForRead(qualifiedPath, parameters, statistics, tracingContext); return new FSDataInputStream(inputStream); } catch(AzureBlobFileSystemException ex) { checkException(path, ex); @@ -236,19 +236,15 @@ private FSDataInputStream open(final Path path, */ @Override protected CompletableFuture openFileWithOptions( - final Path path, final OpenFileParameters parameters) { + final Path path, final OpenFileParameters parameters) throws IOException { LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path); - Set mandatoryKeys = parameters.getMandatoryKeys(); - if (mandatoryKeys == null) { - mandatoryKeys = Collections.emptySet(); - } AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( - mandatoryKeys, + parameters.getMandatoryKeys(), Collections.emptySet(), "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> - open(path, parameters)); + open(path, Optional.of(parameters))); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 6b8056e96a3b0..1b02e948861b7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -673,12 +673,12 @@ public void createDirectory(final Path path, final FsPermission permission, public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics, TracingContext tracingContext) throws IOException { - return openFileForRead(path, new OpenFileParameters(), statistics, + return openFileForRead(path, Optional.empty(), statistics, tracingContext); } public AbfsInputStream openFileForRead(final Path path, - final OpenFileParameters parameters, + final Optional parameters, final FileSystem.Statistics statistics, TracingContext tracingContext) throws IOException { try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", @@ -686,14 +686,10 @@ public AbfsInputStream openFileForRead(final Path path, LOG.debug("openFileForRead filesystem: {} path: {}", client.getFileSystem(), path); - Optional options = Optional.empty(); - FileStatus fileStatus = null; - if (parameters != null) { - options = Optional.ofNullable(parameters.getOptions()); - fileStatus = parameters.getStatus(); + if (parameters.isPresent()) { + fileStatus = parameters.get().getStatus(); } - String relativePath = getRelativePath(path); String resourceType, eTag; long contentLength; @@ -723,16 +719,18 @@ public AbfsInputStream openFileForRead(final Path path, // Add statistics for InputStream return new AbfsInputStream(client, statistics, relativePath, - contentLength, populateAbfsInputStreamContext(options), eTag, + contentLength, populateAbfsInputStreamContext(parameters), eTag, 
tracingContext); } } private AbfsInputStreamContext populateAbfsInputStreamContext( - Optional options) { - boolean bufferedPreadDisabled = options - .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false)) - .orElse(false); + Optional parameters) { + boolean bufferedPreadDisabled = false; + if (parameters.isPresent() && parameters.get().getOptions() != null) { + bufferedPreadDisabled = parameters.get().getOptions() + .getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false); + } return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index d4d385b267778..31bf9e1ff6747 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Optional; import java.util.Random; import java.util.concurrent.ExecutionException; @@ -230,14 +231,15 @@ private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, throws IOException { // verify GetPathStatus not invoked when FileStatus is provided - abfsStore.openFileForRead(testFile, - new OpenFileParameters().withStatus(fileStatus), null, tracingContext); + abfsStore.openFileForRead(testFile, Optional + .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext); verify(mockClient, times(0).description((String.format( "FileStatus [from %s result] provided, GetFileStatus should not be invoked", source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); // verify GetPathStatus invoked when FileStatus not provided - abfsStore.openFileForRead(testFile, new OpenFileParameters(), null, + abfsStore.openFileForRead(testFile, + Optional.empty(), null, tracingContext); verify(mockClient, times(1).description( "GetPathStatus should be invoked when FileStatus not provided")) From 469671de17a3078d524d58c7a7e557dc083ffed3 Mon Sep 17 00:00:00 2001 From: sumangala Date: Mon, 26 Jul 2021 10:33:54 +0530 Subject: [PATCH 15/20] opt conf --- .../fs/azurebfs/AzureBlobFileSystem.java | 1 - .../fs/azurebfs/AzureBlobFileSystemStore.java | 21 ++++++++----------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 60924e6b3898d..91274289f5415 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -34,7 +34,6 @@ import java.util.EnumSet; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 
1b02e948861b7..518ab2201380d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -686,10 +686,8 @@ public AbfsInputStream openFileForRead(final Path path, LOG.debug("openFileForRead filesystem: {} path: {}", client.getFileSystem(), path); - FileStatus fileStatus = null; - if (parameters.isPresent()) { - fileStatus = parameters.get().getStatus(); - } + FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus) + .orElse(null); String relativePath = getRelativePath(path); String resourceType, eTag; long contentLength; @@ -719,18 +717,17 @@ public AbfsInputStream openFileForRead(final Path path, // Add statistics for InputStream return new AbfsInputStream(client, statistics, relativePath, - contentLength, populateAbfsInputStreamContext(parameters), eTag, - tracingContext); + contentLength, populateAbfsInputStreamContext( + parameters.map(OpenFileParameters::getOptions)), + eTag, tracingContext); } } private AbfsInputStreamContext populateAbfsInputStreamContext( - Optional parameters) { - boolean bufferedPreadDisabled = false; - if (parameters.isPresent() && parameters.get().getOptions() != null) { - bufferedPreadDisabled = parameters.get().getOptions() - .getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false); - } + Optional options) { + boolean bufferedPreadDisabled = options + .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false)) + .orElse(false); return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) From 1c50b7b977af758722565adad0c6e2620b70ef91 Mon Sep 17 00:00:00 2001 From: sumangala Date: Tue, 27 Jul 2021 11:44:10 +0530 Subject: [PATCH 16/20] support no etag + test --- .../hadoop/fs/azurebfs/AzureBlobFileSystemStore.java | 7 ++++++- .../apache/hadoop/fs/azurebfs/services/AbfsClient.java | 4 +++- .../hadoop/fs/azurebfs/services/TestAbfsInputStream.java | 8 +++++++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 518ab2201380d..14bdee0a7f151 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -131,6 +131,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; @@ -694,7 +695,11 @@ public AbfsInputStream openFileForRead(final Path path, if (fileStatus != null) { resourceType = fileStatus.isFile() ? 
FILE : DIRECTORY; contentLength = fileStatus.getLen(); - eTag = ((VersionedFileStatus) fileStatus).getVersion(); + if (fileStatus instanceof VersionedFileStatus) { + eTag = ((VersionedFileStatus) fileStatus).getVersion(); + } else { + eTag = EMPTY_STRING; + } } else { AbfsHttpOperation op = client .getPathStatus(relativePath, false, tracingContext).getResult(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 27206959ba533..b4821e472b5c4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -753,7 +753,9 @@ public AbfsRestOperation read(final String path, final long position, final byte addCustomerProvidedKeyHeaders(requestHeaders); requestHeaders.add(new AbfsHttpHeader(RANGE, String.format("bytes=%d-%d", position, position + bufferLength - 1))); - requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + if (!eTag.equals(EMPTY_STRING)) { + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 31bf9e1ff6747..65b613c8f1361 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -237,7 +237,7 @@ private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, "FileStatus [from %s result] provided, GetFileStatus should not be invoked", source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); - // verify GetPathStatus invoked when FileStatus not provided + // verify GetPathStatus invoked when FileStatus not provided abfsStore.openFileForRead(testFile, Optional.empty(), null, tracingContext); @@ -292,6 +292,12 @@ public void testOpenFileWithOptions() throws Exception { abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); checkGetPathStatusCalls(largeTestFile, listStatusResults[1], abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + + // Verify with general filestatus (not instance of VersionedFileStatus) + FileStatus fileStatus = new FileStatus(smallBuffer.length, false, 0, 0, + getFileStatusResults[0].getModificationTime(), smallTestFile); //no etag + verifyOpenWithProvidedStatus(smallTestFile, fileStatus, smallBuffer, + AbfsRestOperationType.GetPathStatus); } /** From e58dcdcd7c404e5ba57eb38213b99f6acf293a48 Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 29 Jul 2021 09:41:48 +0530 Subject: [PATCH 17/20] fallback to getfilestatus --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 15 +++++++++------ .../hadoop/fs/azurebfs/services/AbfsClient.java | 4 +--- .../fs/azurebfs/services/TestAbfsInputStream.java | 10 ++++++---- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 14bdee0a7f151..51652fbea0006 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -692,14 +692,17 @@ public AbfsInputStream openFileForRead(final Path path, String relativePath = getRelativePath(path); String resourceType, eTag; long contentLength; - if (fileStatus != null) { + if (fileStatus instanceof VersionedFileStatus) { + if (fileStatus.getPath() != path) { + LOG.error(String.format( + "Filestatus path [%s] does not match with given path [%s]", + fileStatus.getPath(), path)); + throw new IOException( + "Provided fileStatus does not correspond to path" + path); + } resourceType = fileStatus.isFile() ? FILE : DIRECTORY; contentLength = fileStatus.getLen(); - if (fileStatus instanceof VersionedFileStatus) { - eTag = ((VersionedFileStatus) fileStatus).getVersion(); - } else { - eTag = EMPTY_STRING; - } + eTag = ((VersionedFileStatus) fileStatus).getVersion(); } else { AbfsHttpOperation op = client .getPathStatus(relativePath, false, tracingContext).getResult(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index b4821e472b5c4..27206959ba533 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -753,9 +753,7 @@ public AbfsRestOperation read(final String path, final long position, final byte addCustomerProvidedKeyHeaders(requestHeaders); requestHeaders.add(new AbfsHttpHeader(RANGE, String.format("bytes=%d-%d", position, position + bufferLength - 1))); - if (!eTag.equals(EMPTY_STRING)) { - requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); - } + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 65b613c8f1361..e82ebcb4acdcf 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -293,11 +293,13 @@ public void testOpenFileWithOptions() throws Exception { checkGetPathStatusCalls(largeTestFile, listStatusResults[1], abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); - // Verify with general filestatus (not instance of VersionedFileStatus) + // Verify with incorrect filestatus FileStatus fileStatus = new FileStatus(smallBuffer.length, false, 0, 0, - getFileStatusResults[0].getModificationTime(), smallTestFile); //no etag - verifyOpenWithProvidedStatus(smallTestFile, fileStatus, smallBuffer, - AbfsRestOperationType.GetPathStatus); + getFileStatusResults[0].getModificationTime(), + new Path(smallTestFile + "wrongpath")); //no etag + intercept(IOException.class, + () -> verifyOpenWithProvidedStatus(smallTestFile, fileStatus, + 
smallBuffer, AbfsRestOperationType.GetPathStatus)); } /** From 79e8f8e5bd1f34fc375cea1bbc2eff75b23025ee Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 29 Jul 2021 09:49:18 +0530 Subject: [PATCH 18/20] log warn --- .../hadoop/fs/azurebfs/AzureBlobFileSystemStore.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 51652fbea0006..a68c2cbfc4a12 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -704,10 +704,13 @@ public AbfsInputStream openFileForRead(final Path path, contentLength = fileStatus.getLen(); eTag = ((VersionedFileStatus) fileStatus).getVersion(); } else { - AbfsHttpOperation op = client - .getPathStatus(relativePath, false, tracingContext).getResult(); - resourceType = op - .getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + LOG.warn( + "Falling back to getPathStatus REST call as provided filestatus " + + "is not of type VersionedFileStatus"); + AbfsHttpOperation op = client.getPathStatus(relativePath, false, + tracingContext).getResult(); + resourceType = op.getResponseHeader( + HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); contentLength = Long.parseLong( op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); From 7e70d424544e196bd0299d1050094e8c311959cd Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 29 Jul 2021 10:56:24 +0530 Subject: [PATCH 19/20] fix test --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 14 ++++++-------- .../fs/azurebfs/services/TestAbfsInputStream.java | 11 +++++------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index a68c2cbfc4a12..0cfdd6b89d595 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -678,7 +678,7 @@ public AbfsInputStream openFileForRead(final Path path, tracingContext); } - public AbfsInputStream openFileForRead(final Path path, + public AbfsInputStream openFileForRead(Path path, final Optional parameters, final FileSystem.Statistics statistics, TracingContext tracingContext) throws IOException { @@ -693,13 +693,11 @@ public AbfsInputStream openFileForRead(final Path path, String resourceType, eTag; long contentLength; if (fileStatus instanceof VersionedFileStatus) { - if (fileStatus.getPath() != path) { - LOG.error(String.format( - "Filestatus path [%s] does not match with given path [%s]", - fileStatus.getPath(), path)); - throw new IOException( - "Provided fileStatus does not correspond to path" + path); - } + path = path.makeQualified(this.uri, path); + Preconditions.checkArgument(fileStatus.getPath().equals(path), + String.format( + "Filestatus path [%s] does not match with given path [%s]", + fileStatus.getPath(), path)); resourceType = fileStatus.isFile() ? 
FILE : DIRECTORY; contentLength = fileStatus.getLen(); eTag = ((VersionedFileStatus) fileStatus).getVersion(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index e82ebcb4acdcf..b1fdfe1efcaa9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -294,12 +294,11 @@ public void testOpenFileWithOptions() throws Exception { abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); // Verify with incorrect filestatus - FileStatus fileStatus = new FileStatus(smallBuffer.length, false, 0, 0, - getFileStatusResults[0].getModificationTime(), - new Path(smallTestFile + "wrongpath")); //no etag - intercept(IOException.class, - () -> verifyOpenWithProvidedStatus(smallTestFile, fileStatus, - smallBuffer, AbfsRestOperationType.GetPathStatus)); + getFileStatusResults[0].setPath(new Path("wrongPath")); + intercept(ExecutionException.class, + () -> verifyOpenWithProvidedStatus(smallTestFile, + getFileStatusResults[0], smallBuffer, + AbfsRestOperationType.GetPathStatus)); } /** From 7ccc6b019d27fb40794bc34317f51ee055dcf92a Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 29 Jul 2021 12:02:00 +0530 Subject: [PATCH 20/20] warn freq --- .../hadoop/fs/azurebfs/AzureBlobFileSystemStore.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 0cfdd6b89d595..5202718b68e9a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -131,7 +131,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; @@ -702,9 +701,11 @@ public AbfsInputStream openFileForRead(Path path, contentLength = fileStatus.getLen(); eTag = ((VersionedFileStatus) fileStatus).getVersion(); } else { - LOG.warn( - "Falling back to getPathStatus REST call as provided filestatus " - + "is not of type VersionedFileStatus"); + if (fileStatus != null) { + LOG.warn( + "Fallback to getPathStatus REST call as provided filestatus " + + "is not of type VersionedFileStatus"); + } AbfsHttpOperation op = client.getPathStatus(relativePath, false, tracingContext).getResult(); resourceType = op.getResponseHeader(
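
The net effect of this series is that a caller-supplied FileStatus travels through OpenFileParameters into AzureBlobFileSystemStore.openFileForRead, so ABFS can build the AbfsInputStream (contentLength, eTag) without an extra GetPathStatus round trip, and falls back to the REST call when no VersionedFileStatus is provided or its path does not match. A minimal usage sketch follows, not code from the patches, assuming Hadoop's 3.3+ openFile() builder API; the account, container, directory and buffer size are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OpenWithStatusExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path("abfs://container@account.dfs.core.windows.net/data");
    FileSystem fs = dir.getFileSystem(conf);

    // listStatus() on ABFS returns VersionedFileStatus entries that already
    // carry the length and eTag, so reusing one lets openFileForRead skip
    // the GetPathStatus call (this is what the tests above verify).
    for (FileStatus status : fs.listStatus(dir)) {
      if (!status.isFile()) {
        continue;
      }
      try (FSDataInputStream in = fs.openFile(status.getPath())
          .withFileStatus(status)   // handed to the store via OpenFileParameters
          .build()
          .get()) {
        byte[] buffer = new byte[8 * 1024];
        int bytesRead = in.read(buffer, 0, buffer.length);
        System.out.println(status.getPath() + " first read: " + bytesRead);
      }
    }
  }
}

Opening with status.getPath() matters: the store qualifies the requested path and checks it against the path inside the provided FileStatus, and any mismatch (or a plain, non-versioned FileStatus) triggers the logged fallback to getPathStatus.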