Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,31 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx
}

private FSDataInputStream open(final Path path,
final Optional<Configuration> options) throws IOException {
final Optional<OpenFileParameters> parameters) throws IOException {
statIncrement(CALL_OPEN);
Path qualifiedPath = makeQualified(path);

try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat,
listener);
InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
options, statistics, tracingContext);
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
InputStream inputStream = abfsStore
.openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
return new FSDataInputStream(inputStream);
} catch(AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
}
}

/**
* Takes config and other options through
* {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that
* FileStatus entered is up-to-date, as it will be used to create the
* InputStream (with info such as contentLength, eTag)
* @param path The location of file to be opened
* @param parameters OpenFileParameters instance; can hold FileStatus,
* Configuration, bufferSize and mandatoryKeys
*/
@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path, final OpenFileParameters parameters) throws IOException {
Expand All @@ -235,7 +243,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
open(path, Optional.of(parameters.getOptions())));
open(path, Optional.of(parameters)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
Expand All @@ -129,6 +130,8 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
Expand Down Expand Up @@ -669,44 +672,64 @@ public void createDirectory(final Path path, final FsPermission permission,

public AbfsInputStream openFileForRead(final Path path,
final FileSystem.Statistics statistics, TracingContext tracingContext)
throws AzureBlobFileSystemException {
return openFileForRead(path, Optional.empty(), statistics, tracingContext);
throws IOException {
return openFileForRead(path, Optional.empty(), statistics,
tracingContext);
}

public AbfsInputStream openFileForRead(final Path path,
final Optional<Configuration> options,
public AbfsInputStream openFileForRead(Path path,
final Optional<OpenFileParameters> parameters,
final FileSystem.Statistics statistics, TracingContext tracingContext)
throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
"getPathStatus")) {
LOG.debug("openFileForRead filesystem: {} path: {}",
client.getFileSystem(),
path);
client.getFileSystem(), path);

FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
.orElse(null);
String relativePath = getRelativePath(path);

final AbfsRestOperation op = client
.getPathStatus(relativePath, false, tracingContext);
perfInfo.registerResult(op.getResult());

final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
String resourceType, eTag;
long contentLength;
if (fileStatus instanceof VersionedFileStatus) {
path = path.makeQualified(this.uri, path);
Preconditions.checkArgument(fileStatus.getPath().equals(path),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only check getPath().getName() for equivalence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make changes in HADOOP-17896
I believe getName() will verify the file name only. Would it allow paths with parent directories that don't match?

String.format(
"Filestatus path [%s] does not match with given path [%s]",
fileStatus.getPath(), path));
resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
contentLength = fileStatus.getLen();
eTag = ((VersionedFileStatus) fileStatus).getVersion();
} else {
if (fileStatus != null) {
LOG.warn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, just use length & type, don't worry about etag. Or at least just log @ debug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will modify in new JIRA

"Fallback to getPathStatus REST call as provided filestatus "
+ "is not of type VersionedFileStatus");
}
AbfsHttpOperation op = client.getPathStatus(relativePath, false,
tracingContext).getResult();
resourceType = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
}

if (parseIsDirectory(resourceType)) {
throw new AbfsRestOperationException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a test for this

Copy link
Contributor Author

@sumangala-patki sumangala-patki Sep 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, there is a contract test to verify that a FileNotFound exception is thrown on attempt to open a directory
org.apache.hadoop.fs.contract.AbstractContractOpenTest#testOpenReadDir

AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForRead must be used with files and not directories",
null);
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForRead must be used with files and not directories",
null);
}

perfInfo.registerSuccess(true);

// Add statistics for InputStream
return new AbfsInputStream(client, statistics,
relativePath, contentLength,
populateAbfsInputStreamContext(options),
eTag, tracingContext);
return new AbfsInputStream(client, statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions)),
eTag, tracingContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
Expand Down Expand Up @@ -427,6 +428,15 @@ public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) {
return fs.getAbfsStore();
}

public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) {
return abfsStore.getClient();
}

public void setAbfsClient(AzureBlobFileSystemStore abfsStore,
AbfsClient client) {
abfsStore.setClient(client);
}

public Path makeQualified(Path path) throws java.io.IOException {
return getFileSystem().makeQualified(path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,40 @@
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.impl.OpenFileParameters;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -192,6 +201,106 @@ public TestAbfsInputStream() throws Exception {
ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
}

private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException {
AzureBlobFileSystem fs = getFileSystem();
fs.create(testFile);
FSDataOutputStream out = fs.append(testFile);
out.write(buffer);
out.close();
}

private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus,
byte[] buf, AbfsRestOperationType source)
throws IOException, ExecutionException, InterruptedException {
byte[] readBuf = new byte[buf.length];
AzureBlobFileSystem fs = getFileSystem();
FutureDataInputStreamBuilder builder = fs.openFile(path);
builder.withFileStatus(fileStatus);
FSDataInputStream in = builder.build().get();
assertEquals(String.format(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use AssertJ

"Open with fileStatus [from %s result]: Incorrect number of bytes read",
source), buf.length, in.read(readBuf));
assertArrayEquals(String
.format("Open with fileStatus [from %s result]: Incorrect read data",
source), readBuf, buf);
}

private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus,
AzureBlobFileSystemStore abfsStore, AbfsClient mockClient,
AbfsRestOperationType source, TracingContext tracingContext)
throws IOException {

// verify GetPathStatus not invoked when FileStatus is provided
abfsStore.openFileForRead(testFile, Optional
.ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext);
verify(mockClient, times(0).description((String.format(
"FileStatus [from %s result] provided, GetFileStatus should not be invoked",
source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));

// verify GetPathStatus invoked when FileStatus not provided
abfsStore.openFileForRead(testFile,
Optional.empty(), null,
tracingContext);
verify(mockClient, times(1).description(
"GetPathStatus should be invoked when FileStatus not provided"))
.getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));

Mockito.reset(mockClient); //clears invocation count for next test case
}

@Test
public void testOpenFileWithOptions() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
String testFolder = "/testFolder";
Path smallTestFile = new Path(testFolder + "/testFile0");
Path largeTestFile = new Path(testFolder + "/testFile1");
fs.mkdirs(new Path(testFolder));
int readBufferSize = getConfiguration().getReadBufferSize();
byte[] smallBuffer = new byte[5];
byte[] largeBuffer = new byte[readBufferSize + 5];
new Random().nextBytes(smallBuffer);
new Random().nextBytes(largeBuffer);
writeBufferToNewFile(smallTestFile, smallBuffer);
writeBufferToNewFile(largeTestFile, largeBuffer);

FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile),
fs.getFileStatus(largeTestFile)};
FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder));

// open with fileStatus from GetPathStatus
verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0],
smallBuffer, AbfsRestOperationType.GetPathStatus);
verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1],
largeBuffer, AbfsRestOperationType.GetPathStatus);

// open with fileStatus from ListStatus
verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer,
AbfsRestOperationType.ListPaths);
verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer,
AbfsRestOperationType.ListPaths);

// verify number of GetPathStatus invocations
AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
AbfsClient mockClient = spy(getAbfsClient(abfsStore));
setAbfsClient(abfsStore, mockClient);
TracingContext tracingContext = getTestTracingContext(fs, false);
checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0],
abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1],
abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
checkGetPathStatusCalls(smallTestFile, listStatusResults[0],
abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
checkGetPathStatusCalls(largeTestFile, listStatusResults[1],
abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);

// Verify with incorrect filestatus
getFileStatusResults[0].setPath(new Path("wrongPath"));
intercept(ExecutionException.class,
() -> verifyOpenWithProvidedStatus(smallTestFile,
getFileStatusResults[0], smallBuffer,
AbfsRestOperationType.GetPathStatus));
}

/**
* This test expects AbfsInputStream to throw the exception that readAhead
* thread received on read. The readAhead thread must be initiated from the
Expand Down