Skip to content

Commit 97735de

Browse files
sumangala-patki authored and steveloughran committed
HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters (apache#2975)
1 parent bb13e22 commit 97735de

File tree

4 files changed

+183
-33
lines changed

4 files changed

+183
-33
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -262,23 +262,31 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx
262262
}
263263

264264
private FSDataInputStream open(final Path path,
265-
final Optional<Configuration> options) throws IOException {
265+
final Optional<OpenFileParameters> parameters) throws IOException {
266266
statIncrement(CALL_OPEN);
267267
Path qualifiedPath = makeQualified(path);
268268

269269
try {
270270
TracingContext tracingContext = new TracingContext(clientCorrelationId,
271-
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat,
272-
listener);
273-
InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
274-
options, statistics, tracingContext);
271+
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
272+
InputStream inputStream = abfsStore
273+
.openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
275274
return new FSDataInputStream(inputStream);
276275
} catch (AzureBlobFileSystemException ex) {
277276
checkException(path, ex);
278277
return null;
279278
}
280279
}
281280

281+
/**
282+
* Takes config and other options through
283+
* {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that
284+
* FileStatus entered is up-to-date, as it will be used to create the
285+
* InputStream (with info such as contentLength, eTag)
286+
* @param path The location of file to be opened
287+
* @param parameters OpenFileParameters instance; can hold FileStatus,
288+
* Configuration, bufferSize and mandatoryKeys
289+
*/
282290
@Override
283291
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
284292
final Path path, final OpenFileParameters parameters) throws IOException {
@@ -289,7 +297,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
289297
"for " + path);
290298
return LambdaUtils.eval(
291299
new CompletableFuture<>(), () ->
292-
open(path, Optional.of(parameters.getOptions())));
300+
open(path, Optional.of(parameters)));
293301
}
294302

295303
@Override

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
120120
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
121121
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
122+
import org.apache.hadoop.fs.impl.OpenFileParameters;
122123
import org.apache.hadoop.fs.permission.AclEntry;
123124
import org.apache.hadoop.fs.permission.AclStatus;
124125
import org.apache.hadoop.fs.permission.FsAction;
@@ -137,6 +138,8 @@
137138
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
138139
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
139140
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
141+
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
142+
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
140143
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
141144
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
142145
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
@@ -733,44 +736,64 @@ public void createDirectory(final Path path, final FsPermission permission,
733736

734737
public AbfsInputStream openFileForRead(final Path path,
735738
final FileSystem.Statistics statistics, TracingContext tracingContext)
736-
throws AzureBlobFileSystemException {
737-
return openFileForRead(path, Optional.empty(), statistics, tracingContext);
739+
throws IOException {
740+
return openFileForRead(path, Optional.empty(), statistics,
741+
tracingContext);
738742
}
739743

740-
public AbfsInputStream openFileForRead(final Path path,
741-
final Optional<Configuration> options,
744+
public AbfsInputStream openFileForRead(Path path,
745+
final Optional<OpenFileParameters> parameters,
742746
final FileSystem.Statistics statistics, TracingContext tracingContext)
743-
throws AzureBlobFileSystemException {
744-
try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
747+
throws IOException {
748+
try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
749+
"getPathStatus")) {
745750
LOG.debug("openFileForRead filesystem: {} path: {}",
746-
client.getFileSystem(),
747-
path);
751+
client.getFileSystem(), path);
748752

753+
FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
754+
.orElse(null);
749755
String relativePath = getRelativePath(path);
750-
751-
final AbfsRestOperation op = client
752-
.getPathStatus(relativePath, false, tracingContext);
753-
perfInfo.registerResult(op.getResult());
754-
755-
final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
756-
final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
757-
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
756+
String resourceType, eTag;
757+
long contentLength;
758+
if (fileStatus instanceof VersionedFileStatus) {
759+
path = path.makeQualified(this.uri, path);
760+
Preconditions.checkArgument(fileStatus.getPath().equals(path),
761+
String.format(
762+
"Filestatus path [%s] does not match with given path [%s]",
763+
fileStatus.getPath(), path));
764+
resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
765+
contentLength = fileStatus.getLen();
766+
eTag = ((VersionedFileStatus) fileStatus).getVersion();
767+
} else {
768+
if (fileStatus != null) {
769+
LOG.warn(
770+
"Fallback to getPathStatus REST call as provided filestatus "
771+
+ "is not of type VersionedFileStatus");
772+
}
773+
AbfsHttpOperation op = client.getPathStatus(relativePath, false,
774+
tracingContext).getResult();
775+
resourceType = op.getResponseHeader(
776+
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
777+
contentLength = Long.parseLong(
778+
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
779+
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
780+
}
758781

759782
if (parseIsDirectory(resourceType)) {
760783
throw new AbfsRestOperationException(
761-
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
762-
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
763-
"openFileForRead must be used with files and not directories",
764-
null);
784+
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
785+
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
786+
"openFileForRead must be used with files and not directories",
787+
null);
765788
}
766789

767790
perfInfo.registerSuccess(true);
768791

769792
// Add statistics for InputStream
770-
return new AbfsInputStream(client, statistics,
771-
relativePath, contentLength,
772-
populateAbfsInputStreamContext(options),
773-
eTag, tracingContext);
793+
return new AbfsInputStream(client, statistics, relativePath,
794+
contentLength, populateAbfsInputStreamContext(
795+
parameters.map(OpenFileParameters::getOptions)),
796+
eTag, tracingContext);
774797
}
775798
}
776799

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
3939
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
4040
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
41+
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
4142
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
4243
import org.apache.hadoop.fs.azurebfs.services.AuthType;
4344
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
@@ -430,6 +431,15 @@ public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) {
430431
return fs.getAbfsStore();
431432
}
432433

434+
public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) {
435+
return abfsStore.getClient();
436+
}
437+
438+
public void setAbfsClient(AzureBlobFileSystemStore abfsStore,
439+
AbfsClient client) {
440+
abfsStore.setClient(client);
441+
}
442+
433443
public Path makeQualified(Path path) throws java.io.IOException {
434444
return getFileSystem().makeQualified(path);
435445
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,40 @@
1919
package org.apache.hadoop.fs.azurebfs.services;
2020

2121
import java.io.IOException;
22-
23-
import org.junit.Assert;
24-
import org.junit.Test;
2522
import java.util.Arrays;
23+
import java.util.Optional;
24+
import java.util.Random;
25+
import java.util.concurrent.ExecutionException;
2626

2727
import org.assertj.core.api.Assertions;
28+
import org.junit.Assert;
29+
import org.junit.Test;
30+
import org.mockito.Mockito;
2831

2932
import org.apache.hadoop.conf.Configuration;
3033
import org.apache.hadoop.fs.FileSystem;
3134
import org.apache.hadoop.fs.FSDataInputStream;
3235
import org.apache.hadoop.fs.FSDataOutputStream;
3336
import org.apache.hadoop.fs.FileStatus;
37+
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
3438
import org.apache.hadoop.fs.Path;
3539
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
3640
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
41+
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
3742
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
3843
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
3944
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
4045
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
4146
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
47+
import org.apache.hadoop.fs.impl.OpenFileParameters;
4248

4349
import static org.mockito.ArgumentMatchers.any;
50+
import static org.mockito.ArgumentMatchers.anyBoolean;
51+
import static org.mockito.ArgumentMatchers.anyString;
4452
import static org.mockito.Mockito.doReturn;
4553
import static org.mockito.Mockito.doThrow;
4654
import static org.mockito.Mockito.mock;
55+
import static org.mockito.Mockito.spy;
4756
import static org.mockito.Mockito.times;
4857
import static org.mockito.Mockito.verify;
4958
import static org.mockito.Mockito.when;
@@ -192,6 +201,106 @@ public TestAbfsInputStream() throws Exception {
192201
ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
193202
}
194203

204+
private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException {
205+
AzureBlobFileSystem fs = getFileSystem();
206+
fs.create(testFile);
207+
FSDataOutputStream out = fs.append(testFile);
208+
out.write(buffer);
209+
out.close();
210+
}
211+
212+
private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus,
213+
byte[] buf, AbfsRestOperationType source)
214+
throws IOException, ExecutionException, InterruptedException {
215+
byte[] readBuf = new byte[buf.length];
216+
AzureBlobFileSystem fs = getFileSystem();
217+
FutureDataInputStreamBuilder builder = fs.openFile(path);
218+
builder.withFileStatus(fileStatus);
219+
FSDataInputStream in = builder.build().get();
220+
assertEquals(String.format(
221+
"Open with fileStatus [from %s result]: Incorrect number of bytes read",
222+
source), buf.length, in.read(readBuf));
223+
assertArrayEquals(String
224+
.format("Open with fileStatus [from %s result]: Incorrect read data",
225+
source), readBuf, buf);
226+
}
227+
228+
private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus,
229+
AzureBlobFileSystemStore abfsStore, AbfsClient mockClient,
230+
AbfsRestOperationType source, TracingContext tracingContext)
231+
throws IOException {
232+
233+
// verify GetPathStatus not invoked when FileStatus is provided
234+
abfsStore.openFileForRead(testFile, Optional
235+
.ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext);
236+
verify(mockClient, times(0).description((String.format(
237+
"FileStatus [from %s result] provided, GetFileStatus should not be invoked",
238+
source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
239+
240+
// verify GetPathStatus invoked when FileStatus not provided
241+
abfsStore.openFileForRead(testFile,
242+
Optional.empty(), null,
243+
tracingContext);
244+
verify(mockClient, times(1).description(
245+
"GetPathStatus should be invoked when FileStatus not provided"))
246+
.getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
247+
248+
Mockito.reset(mockClient); //clears invocation count for next test case
249+
}
250+
251+
@Test
252+
public void testOpenFileWithOptions() throws Exception {
253+
AzureBlobFileSystem fs = getFileSystem();
254+
String testFolder = "/testFolder";
255+
Path smallTestFile = new Path(testFolder + "/testFile0");
256+
Path largeTestFile = new Path(testFolder + "/testFile1");
257+
fs.mkdirs(new Path(testFolder));
258+
int readBufferSize = getConfiguration().getReadBufferSize();
259+
byte[] smallBuffer = new byte[5];
260+
byte[] largeBuffer = new byte[readBufferSize + 5];
261+
new Random().nextBytes(smallBuffer);
262+
new Random().nextBytes(largeBuffer);
263+
writeBufferToNewFile(smallTestFile, smallBuffer);
264+
writeBufferToNewFile(largeTestFile, largeBuffer);
265+
266+
FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile),
267+
fs.getFileStatus(largeTestFile)};
268+
FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder));
269+
270+
// open with fileStatus from GetPathStatus
271+
verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0],
272+
smallBuffer, AbfsRestOperationType.GetPathStatus);
273+
verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1],
274+
largeBuffer, AbfsRestOperationType.GetPathStatus);
275+
276+
// open with fileStatus from ListStatus
277+
verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer,
278+
AbfsRestOperationType.ListPaths);
279+
verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer,
280+
AbfsRestOperationType.ListPaths);
281+
282+
// verify number of GetPathStatus invocations
283+
AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
284+
AbfsClient mockClient = spy(getAbfsClient(abfsStore));
285+
setAbfsClient(abfsStore, mockClient);
286+
TracingContext tracingContext = getTestTracingContext(fs, false);
287+
checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0],
288+
abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
289+
checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1],
290+
abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
291+
checkGetPathStatusCalls(smallTestFile, listStatusResults[0],
292+
abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
293+
checkGetPathStatusCalls(largeTestFile, listStatusResults[1],
294+
abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
295+
296+
// Verify with incorrect filestatus
297+
getFileStatusResults[0].setPath(new Path("wrongPath"));
298+
intercept(ExecutionException.class,
299+
() -> verifyOpenWithProvidedStatus(smallTestFile,
300+
getFileStatusResults[0], smallBuffer,
301+
AbfsRestOperationType.GetPathStatus));
302+
}
303+
195304
/**
196305
* This test expects AbfsInputStream to throw the exception that readAhead
197306
* thread received on read. The readAhead thread must be initiated from the

0 commit comments

Comments
 (0)