From b63f2d47ada6e842d8056699d60b835e815e853a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 21 Apr 2020 21:10:40 +0100 Subject: [PATCH 1/4] Tune openFile() 1. openFile()(path).opt(fs.s3a.input.length, len) to let caller specify file length. All we actually need 2. if a filestatus of a different type is passed in, we create an FS instance from it. 3. use the path of the command, not that of the filestatus. this helps viewfs support the API Change-Id: Ib9f0f98f701176124436aeaaa0360a426c257bc6 --- .../org/apache/hadoop/fs/s3a/Constants.java | 8 +++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 52 ++++++++++++++----- .../apache/hadoop/fs/s3a/S3AInputStream.java | 2 +- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 22a0b45f1c7a5..d618eee09ac32 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -549,6 +549,14 @@ private Constants() { @InterfaceStability.Unstable public static final String INPUT_FADV_RANDOM = "random"; + /** + * What is the length of the file. + * This is used in open() to let the caller specify the length + * of the file. 
+ */ + @InterfaceStability.Unstable + public static final String INPUT_OPTION_LENGTH = "fs.s3a.input.length"; + @InterfaceAudience.Private @InterfaceStability.Unstable public static final String S3_CLIENT_FACTORY_IMPL = diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 286df44939312..f8b90a058518c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1091,18 +1091,33 @@ private FSDataInputStream open( entryPoint(INVOCATION_OPEN); final Path path = qualify(file); - S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, - providedStatus); + S3AFileStatus fileStatus; + Configuration opts = options.orElse(null); + if (!providedStatus.isPresent() + && opts != null + && opts.get(INPUT_OPTION_LENGTH) != null) { + // build a minimal S3A FileStatus From the input length alone. + // this is all we actually need. + long length = opts.getLong(INPUT_OPTION_LENGTH, 0); + LOG.debug("Fixing length of file to read {} as {}", path, length); + fileStatus = new S3AFileStatus( + length, + 0, path, + getDefaultBlockSize(path), + username, null, null); + } else { + fileStatus = extractOrFetchSimpleFileStatus(path, + providedStatus); + } S3AReadOpContext readContext; - if (options.isPresent()) { - Configuration o = options.get(); + if (opts != null) { // normal path. Open the file with the chosen seek policy, if different // from the normal one. // and readahead. S3AInputPolicy policy = S3AInputPolicy.getPolicy( - o.get(INPUT_FADVISE, inputPolicy.toString())); - long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead); + opts.get(INPUT_FADVISE, inputPolicy.toString())); + long readAheadRange2 = opts.getLong(READAHEAD_RANGE, readAhead); // TODO support change detection policy from options? 
readContext = createReadContext( fileStatus, @@ -1121,7 +1136,7 @@ private FSDataInputStream open( return new FSDataInputStream( new S3AInputStream( readContext, - createObjectAttributes(fileStatus), + createObjectAttributes(path, fileStatus), s3)); } @@ -1175,13 +1190,14 @@ private S3ObjectAttributes createObjectAttributes( /** * Create the attributes of an object for subsequent use. + * @param path path -this is used over the file status path. * @param fileStatus file status to build from. * @return attributes to use when building the query. */ private S3ObjectAttributes createObjectAttributes( - final S3AFileStatus fileStatus) { + final Path path, final S3AFileStatus fileStatus) { return createObjectAttributes( - fileStatus.getPath(), + path, fileStatus.getETag(), fileStatus.getVersionId(), fileStatus.getLen()); @@ -1507,7 +1523,9 @@ public S3ObjectAttributes createObjectAttributes(final Path path, @Override public S3ObjectAttributes createObjectAttributes( final S3AFileStatus fileStatus) { - return S3AFileSystem.this.createObjectAttributes(fileStatus); + return S3AFileSystem.this.createObjectAttributes( + fileStatus.getPath(), + fileStatus); } @Override @@ -4620,7 +4638,8 @@ private FSDataInputStream select(final Path source, // readahead range can be dynamically set long ra = options.getLong(READAHEAD_RANGE, readAhead); - S3ObjectAttributes objectAttributes = createObjectAttributes(fileStatus); + S3ObjectAttributes objectAttributes = createObjectAttributes( + path, fileStatus); S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy, changeDetectionPolicy, ra); @@ -4751,8 +4770,15 @@ public CompletableFuture openFileWithOptions( providedStatus); fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus(); } else { - LOG.debug("Ignoring file status {}", providedStatus); - fileStatus = null; + // it is another type. + // build a status struct without etag or version. 
+ LOG.debug("Converting file status {}", providedStatus); + fileStatus = new S3AFileStatus(providedStatus.getLen(), + providedStatus.getModificationTime(), + path, + getDefaultBlockSize(path), + username, + null, null); } } else { fileStatus = null; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 9c8b9ae7a156e..d868c45d9852b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -142,7 +142,7 @@ public S3AInputStream(S3AReadOpContext ctx, this.context = ctx; this.bucket = s3Attributes.getBucket(); this.key = s3Attributes.getKey(); - this.pathStr = ctx.dstFileStatus.getPath().toString(); + this.pathStr = s3Attributes.getPath().toString(); this.contentLength = l; this.client = client; this.uri = "s3a://" + this.bucket + "/" + this.key; From 61faeec378e1d5aa83b90143c47c44b4c329d264 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 2 Jun 2020 17:16:21 +0100 Subject: [PATCH 2/4] HADOOP-16202 Enhance S3A openFile() Change-Id: I9f0c11e2fa5cc0dbb84115b6824caf160e1abb62 --- .../org/apache/hadoop/fs/FileContext.java | 1 + .../java/org/apache/hadoop/fs/FileSystem.java | 1 + .../hadoop/fs/impl/AbstractFSBuilderImpl.java | 30 +++- .../hadoop/fs/impl/OpenFileParameters.java | 12 ++ .../filesystem/fsdatainputstreambuilder.md | 5 + .../org/apache/hadoop/fs/s3a/Constants.java | 8 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 66 ++------- .../hadoop/fs/s3a/impl/InternalConstants.java | 1 + .../hadoop/fs/s3a/impl/OpenFileHelper.java | 140 ++++++++++++++++++ 9 files changed, 202 insertions(+), 62 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileHelper.java diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index e5f4ef3809f18..c0a3c0ffb1038 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -2926,6 +2926,7 @@ public CompletableFuture build() throws IOException { final Path absF = fixRelativePart(getPath()); OpenFileParameters parameters = new OpenFileParameters() .withMandatoryKeys(getMandatoryKeys()) + .withOptionalKeys(getOptionalKeys()) .withOptions(getOptions()) .withBufferSize(getBufferSize()) .withStatus(getStatus()); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index ab5040486dffc..850e4d1b49f0c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4643,6 +4643,7 @@ public CompletableFuture build() throws IOException { Optional optionalPath = getOptionalPath(); OpenFileParameters parameters = new OpenFileParameters() .withMandatoryKeys(getMandatoryKeys()) + .withOptionalKeys(getOptionalKeys()) .withOptions(getOptions()) .withBufferSize(getBufferSize()) .withStatus(super.getStatus()); // explicit to avoid IDE warnings diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java index 5fc92e97be76c..402bfbeaaeeb6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java @@ -42,18 +42,20 @@ /** * Builder for filesystem/filecontext operations of various kinds, * with option support. - * - * + *

+ *
  *   .opt("foofs:option.a", true)
  *   .opt("foofs:option.b", "value")
- *   .opt("barfs:cache", true)
+ *   .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
  *   .must("foofs:cache", true)
  *   .must("barfs:cache-size", 256 * 1024 * 1024)
  *   .build();
- * 
+ * 
+ *

* * Configuration keys declared in an {@code opt()} may be ignored by * a builder which does not recognise them. + *

* * Configuration keys declared in a {@code must()} function set must * be understood by the implementation or a @@ -88,6 +90,9 @@ /** Keep track of the keys for mandatory options. */ private final Set mandatoryKeys = new HashSet<>(); + /** Keep track of the optional keys. */ + private final Set optionalKeys = new HashSet<>(); + /** * Constructor with both optional path and path handle. * Either or both argument may be empty, but it is an error for @@ -163,6 +168,7 @@ public PathHandle getPathHandle() { @Override public B opt(@Nonnull final String key, @Nonnull final String value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.set(key, value); return getThisBuilder(); } @@ -175,6 +181,7 @@ public B opt(@Nonnull final String key, @Nonnull final String value) { @Override public B opt(@Nonnull final String key, boolean value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setBoolean(key, value); return getThisBuilder(); } @@ -187,6 +194,7 @@ public B opt(@Nonnull final String key, boolean value) { @Override public B opt(@Nonnull final String key, int value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setInt(key, value); return getThisBuilder(); } @@ -199,6 +207,7 @@ public B opt(@Nonnull final String key, int value) { @Override public B opt(@Nonnull final String key, float value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setFloat(key, value); return getThisBuilder(); } @@ -211,6 +220,7 @@ public B opt(@Nonnull final String key, float value) { @Override public B opt(@Nonnull final String key, double value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setDouble(key, value); return getThisBuilder(); } @@ -223,6 +233,7 @@ public B opt(@Nonnull final String key, double value) { @Override public B opt(@Nonnull final String key, @Nonnull final String... 
values) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setStrings(key, values); return getThisBuilder(); } @@ -248,6 +259,7 @@ public B must(@Nonnull final String key, @Nonnull final String value) { @Override public B must(@Nonnull final String key, boolean value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setBoolean(key, value); return getThisBuilder(); } @@ -260,6 +272,7 @@ public B must(@Nonnull final String key, boolean value) { @Override public B must(@Nonnull final String key, int value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setInt(key, value); return getThisBuilder(); } @@ -272,6 +285,7 @@ public B must(@Nonnull final String key, int value) { @Override public B must(@Nonnull final String key, float value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setFloat(key, value); return getThisBuilder(); } @@ -284,6 +298,7 @@ public B must(@Nonnull final String key, float value) { @Override public B must(@Nonnull final String key, double value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setDouble(key, value); return getThisBuilder(); } @@ -296,6 +311,7 @@ public B must(@Nonnull final String key, double value) { @Override public B must(@Nonnull final String key, @Nonnull final String... values) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setStrings(key, values); return getThisBuilder(); } @@ -314,6 +330,12 @@ public Configuration getOptions() { public Set getMandatoryKeys() { return Collections.unmodifiableSet(mandatoryKeys); } + /** + * Get all the keys that are set as optional keys. 
+ */ + public Set getOptionalKeys() { + return Collections.unmodifiableSet(optionalKeys); + } /** * Reject a configuration if one or more mandatory keys are diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java index 77b4ff52696a3..7ac1509fc9823 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -38,6 +38,9 @@ public class OpenFileParameters { */ private Set mandatoryKeys; + /** The optional keys. */ + private Set optionalKeys; + /** * Options set during the build sequence. */ @@ -61,6 +64,11 @@ public OpenFileParameters withMandatoryKeys(final Set keys) { return this; } + public OpenFileParameters withOptionalKeys(final Set keys) { + this.optionalKeys = requireNonNull(keys); + return this; + } + public OpenFileParameters withOptions(final Configuration opts) { this.options = requireNonNull(opts); return this; @@ -80,6 +88,10 @@ public Set getMandatoryKeys() { return mandatoryKeys; } + public Set getOptionalKeys() { + return optionalKeys; + } + public Configuration getOptions() { return options; } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md index eadba174fc1a6..a50f1ba427250 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md @@ -68,6 +68,11 @@ This is relevant with those stores which return version/etag information, including the S3A and ABFS connectors -they MAY use this to guarantee that the file they opened is exactly the one 
returned in the listing. +There is no requirement for `status.getPath()` to the resolved path of +the file being opened. This is needed to support viewfs and other mount-point/ +wrapper filesystems. The sole path declaring the file to open is supplied +in `openFile(path)`. + ### Set optional or mandatory parameters FSDataInputStreamBuilder opt(String key, ...) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index d618eee09ac32..f6217e3f4997e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -550,12 +550,12 @@ private Constants() { public static final String INPUT_FADV_RANDOM = "random"; /** - * What is the length of the file. - * This is used in open() to let the caller specify the length - * of the file. + * Declare the length of the file to open: {@value}. + * This is supported in openFile(). 
*/ @InterfaceStability.Unstable - public static final String INPUT_OPTION_LENGTH = "fs.s3a.input.length"; + public static final String OPEN_OPTION_LENGTH + = "fs.s3a.open.option.length"; @InterfaceAudience.Private @InterfaceStability.Unstable diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f8b90a058518c..58a9e2fd18579 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -104,8 +104,8 @@ import org.apache.hadoop.fs.s3a.impl.ContextAccessors; import org.apache.hadoop.fs.s3a.impl.CopyOutcome; import org.apache.hadoop.fs.s3a.impl.DeleteOperation; -import org.apache.hadoop.fs.s3a.impl.InternalConstants; import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport; +import org.apache.hadoop.fs.s3a.impl.OpenFileHelper; import org.apache.hadoop.fs.s3a.impl.OperationCallbacks; import org.apache.hadoop.fs.s3a.impl.RenameOperation; import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder; @@ -114,7 +114,6 @@ import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatisticsImpl; import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState; -import org.apache.hadoop.fs.s3a.select.InternalSelectConstants; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.DelegationTokenIssuer; @@ -164,7 +163,6 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.concurrent.HadoopExecutors; -import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.s3a.Constants.*; import static 
org.apache.hadoop.fs.s3a.Invoker.*; @@ -1091,14 +1089,15 @@ private FSDataInputStream open( entryPoint(INVOCATION_OPEN); final Path path = qualify(file); + S3AFileStatus fileStatus; Configuration opts = options.orElse(null); if (!providedStatus.isPresent() && opts != null - && opts.get(INPUT_OPTION_LENGTH) != null) { + && opts.get(OPEN_OPTION_LENGTH) != null) { // build a minimal S3A FileStatus From the input length alone. // this is all we actually need. - long length = opts.getLong(INPUT_OPTION_LENGTH, 0); + long length = opts.getLong(OPEN_OPTION_LENGTH, 0); LOG.debug("Fixing length of file to read {} as {}", path, length); fileStatus = new S3AFileStatus( length, @@ -4735,56 +4734,15 @@ public CompletableFuture openFileWithOptions( final Path rawPath, final OpenFileParameters parameters) throws IOException { final Path path = qualify(rawPath); - Configuration options = parameters.getOptions(); - Set mandatoryKeys = parameters.getMandatoryKeys(); - String sql = options.get(SelectConstants.SELECT_SQL, null); - boolean isSelect = sql != null; - // choice of keys depends on open type - if (isSelect) { - rejectUnknownMandatoryKeys( - mandatoryKeys, - InternalSelectConstants.SELECT_OPTIONS, - "for " + path + " in S3 Select operation"); - } else { - rejectUnknownMandatoryKeys( - mandatoryKeys, - InternalConstants.STANDARD_OPENFILE_KEYS, - "for " + path + " in non-select file I/O"); - } - FileStatus providedStatus = parameters.getStatus(); - S3AFileStatus fileStatus; - if (providedStatus != null) { - Preconditions.checkArgument(path.equals(providedStatus.getPath()), - "FileStatus parameter is not for the path %s: %s", - path, providedStatus); - if (providedStatus instanceof S3AFileStatus) { - // can use this status to skip our own probes, - // including etag and version. 
- LOG.debug("File was opened with a supplied S3AFileStatus;" - + " skipping getFileStatus call in open() operation: {}", - providedStatus); - fileStatus = (S3AFileStatus) providedStatus; - } else if (providedStatus instanceof S3ALocatedFileStatus) { - LOG.debug("File was opened with a supplied S3ALocatedFileStatus;" - + " skipping getFileStatus call in open() operation: {}", - providedStatus); - fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus(); - } else { - // it is another type. - // build a status struct without etag or version. - LOG.debug("Converting file status {}", providedStatus); - fileStatus = new S3AFileStatus(providedStatus.getLen(), - providedStatus.getModificationTime(), - path, - getDefaultBlockSize(path), - username, - null, null); - } - } else { - fileStatus = null; - } - Optional ost = Optional.ofNullable(fileStatus); + Triple statusT = new OpenFileHelper() + .prepareToOpenFile( + path, parameters, username, + getDefaultBlockSize(path)); + boolean isSelect = statusT.getLeft(); + String sql = statusT.getRight(); + Optional ost = Optional.ofNullable(statusT.getMiddle()); CompletableFuture result = new CompletableFuture<>(); + Configuration options = parameters.getOptions(); if (!isSelect) { // normal path. unboundedThreadPool.submit(() -> diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 4d944a1721303..b26b0fe377cb4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -83,6 +83,7 @@ private InternalConstants() { Collections.unmodifiableSet( new HashSet<>( Arrays.asList(Constants.INPUT_FADVISE, + Constants.OPEN_OPTION_LENGTH, Constants.READAHEAD_RANGE))); /** 404 error code. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileHelper.java new file mode 100644 index 0000000000000..0ece47a1dc9fd --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileHelper.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Triple; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.fs.s3a.select.InternalSelectConstants; +import org.apache.hadoop.fs.s3a.select.SelectConstants; + +import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys; +import static org.apache.hadoop.fs.s3a.Constants.OPEN_OPTION_LENGTH; + +/** + * Helper class for openFile() logic, especially processing file status + * args and length/etag/versionID. + * This has got complex enough it merited removal from S3AFS; there is + * an interface to call back into that where needed. + */ +public class OpenFileHelper { + + private static final Logger LOG = + LoggerFactory.getLogger(OpenFileHelper.class); + + /** + * Prepare to open a file from the openFile parameters. + * @param path path to the file + * @param parameters open file parameters from the builder. + * @param username username for fileStatus + * @param blockSize for fileStatus + * @return triple of (isSql, fileStatus?, sql?) + * @throws IOException failure to resolve the link. 
+ * @throws IllegalArgumentException unknown mandatory key + */ + public Triple prepareToOpenFile( + final Path path, + final OpenFileParameters parameters, + final String username, + final long blockSize) throws IOException { + Configuration options = parameters.getOptions(); + Set mandatoryKeys = parameters.getMandatoryKeys(); + String sql = options.get(SelectConstants.SELECT_SQL, null); + boolean isSelect = sql != null; + // choice of keys depends on open type + if (isSelect) { + rejectUnknownMandatoryKeys( + mandatoryKeys, + InternalSelectConstants.SELECT_OPTIONS, + "for " + path + " in S3 Select operation"); + } else { + rejectUnknownMandatoryKeys( + mandatoryKeys, + InternalConstants.STANDARD_OPENFILE_KEYS, + "for " + path + " in non-select file I/O"); + } + FileStatus providedStatus = parameters.getStatus(); + S3AFileStatus fileStatus; + if (providedStatus != null) { + long len = providedStatus.getLen(); + long modTime = providedStatus.getModificationTime(); + String versionId; + String eTag; + // can use this status to skip our own probes, + LOG.debug("File was opened with a supplied FileStatus;" + + " skipping getFileStatus call in open() operation: {}", + + providedStatus); + if (providedStatus instanceof S3AFileStatus) { + // including etag and version. + S3AFileStatus st = (S3AFileStatus) providedStatus; + versionId = st.getVersionId(); + eTag = st.getETag(); + } else if (providedStatus instanceof S3ALocatedFileStatus) { + LOG.debug("File was opened with a supplied S3ALocatedFileStatus;" + + " skipping getFileStatus call in open() operation: {}", + providedStatus); + S3ALocatedFileStatus st + = (S3ALocatedFileStatus) providedStatus; + versionId = st.getVersionId(); + eTag = st.getETag(); + } else { + // it is another type. + // build a status struct without etag or version. 
+ LOG.debug("Converting file status {}", providedStatus); + versionId = null; + eTag = null; + } + fileStatus = new S3AFileStatus( + len, + modTime, + path, + blockSize, + username, + eTag, + versionId); + } else if (options.get(OPEN_OPTION_LENGTH) != null) { + // build a minimal S3A FileStatus From the input length alone. + // this is all we actually need. + long length = options.getLong(OPEN_OPTION_LENGTH, 0); + LOG.debug("Fixing length of file to read {} as {}", path, length); + fileStatus = new S3AFileStatus( + length, + 0, + path, + blockSize, + username, + null, null); + } else { + fileStatus = null; + } + return Triple.of(isSelect, fileStatus, sql); + } + +} From 0e6c3fa72ce8d364ec0f6ef7f255f182296bc6c8 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 3 Jun 2020 20:17:09 +0100 Subject: [PATCH 3/4] HADOOP-16202. S3A openFile tuning This has the tests for the changes in the specification (path of supplied status is ignored), and for S3A enhancements, where the cost of the operations against the the raw store is used to determine whether or not any probes for a file were performed when opening it. The test also adds tests for where the file length is set in the option fs.s3a.open.option.length. -in a shorter file, read() will return -1 when the declared length is reached, not the actual. in a longer file, read() will return -1 once the real file length is exceeded. readFully() will raise an EOFException. +fix bug in CTU.readStream() which didn't close the stream after reading. Now, troublespot. I had to add new opt/must operations for long values. this is bad as -we already have an int one which we now overload. I'd delete that one except nothing anyone has already compiled will link. -you can't add a long value and have your code compile against an older version of the code. Not sure what the best action is there. 
Change-Id: I77ebaa07515f5e2e18fd610ce1f04ef286126f12 --- .../java/org/apache/hadoop/fs/FSBuilder.java | 14 +++ .../hadoop/fs/impl/AbstractFSBuilderImpl.java | 16 +++ .../fs/contract/AbstractContractOpenTest.java | 22 +++- .../hadoop/fs/contract/ContractTestUtils.java | 13 ++- .../fs/s3a/ITestS3AFileOperationCost.java | 101 ++++++++++++++++++ 5 files changed, 161 insertions(+), 5 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java index b7757a62e28ad..a4c7254cfeb3c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java @@ -61,6 +61,13 @@ public interface FSBuilder> { */ B opt(@Nonnull String key, float value); + /** + * Set optional long parameter for the Builder. + * + * @see #opt(String, String) + */ + B opt(@Nonnull String key, long value); + /** * Set optional double parameter for the Builder. * @@ -104,6 +111,13 @@ public interface FSBuilder> { */ B must(@Nonnull String key, float value); + /** + * Set mandatory long option. + * + * @see #must(String, String) + */ + B must(@Nonnull String key, long value); + /** * Set mandatory double option. 
* diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java index 402bfbeaaeeb6..4a7ca0c6a354f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java @@ -199,6 +199,14 @@ public B opt(@Nonnull final String key, int value) { return getThisBuilder(); } + @Override + public B opt(@Nonnull final String key, final long value) { + mandatoryKeys.remove(key); + optionalKeys.add(key); + options.setLong(key, value); + return getThisBuilder(); + } + /** * Set optional float parameter for the Builder. * @@ -277,6 +285,14 @@ public B must(@Nonnull final String key, int value) { return getThisBuilder(); } + @Override + public B must(@Nonnull final String key, final long value) { + mandatoryKeys.add(key); + optionalKeys.remove(key); + options.setLong(key, value); + return getThisBuilder(); + } + /** * Set mandatory float option. 
* diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index a43053180fbf8..1203ac11a7edd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -280,13 +280,33 @@ public void testOpenFileApplyRead() throws Throwable { int len = 4096; createFile(fs, path, true, dataset(len, 0x40, 0x80)); + FileStatus st = fs.getFileStatus(path); CompletableFuture readAllBytes = fs.openFile(path) - .withFileStatus(fs.getFileStatus(path)) + .withFileStatus(st) .build() .thenApply(ContractTestUtils::readStream); assertEquals("Wrong number of bytes read value", len, (long) readAllBytes.get()); + // now reattempt with a new FileStatus and a different path. 
+ // implementations MUST use path in openFile() call + FileStatus st2 = new FileStatus( + len, false, + st.getReplication(), + st.getBlockSize(), + st.getModificationTime(), + st.getAccessTime(), + st.getPermission(), + st.getOwner(), + st.getGroup(), + new Path("gopher:///")); + assertEquals("Wrong number of bytes read value", + len, + (long) fs.openFile(path) + .withFileStatus(st2) + .build() + .thenApply(ContractTestUtils::readStream) + .get()); } @Test diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 4789630f95f1c..3ee0a17098386 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -1565,17 +1565,22 @@ public static int read(InputStream in) { /** * Read a whole stream; downgrades an IOE to a runtime exception. + * Closes the stream afterwards. * @param in input * @return the number of bytes read. 
* @throws AssertionError on any IOException */ public static long readStream(InputStream in) { - long count = 0; + try { + long count = 0; - while (read(in) >= 0) { - count++; + while (read(in) >= 0) { + count++; + } + return count; + } finally { + IOUtils.cleanupWithLogger(LOG, in); } - return count; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java index e54fd97a6af1e..8d02328e35c88 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -33,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.net.URI; @@ -41,8 +43,12 @@ import java.util.EnumSet; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL; +import static org.apache.hadoop.fs.s3a.Constants.OPEN_OPTION_LENGTH; import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; @@ -688,4 +694,99 @@ public void testCostOfGlobStatusNoSymlinkResolution() throws Throwable { // no additional operations from symlink resolution verifyOperationCount(2, 2); } + + /** 
+ * Test when openFile() performs GET requests when file status + * and length options are passed down. + * Note that the input streams only update the FS statistics + * in close(), so metrics cannot be verified until all operations + * on a stream are complete. + * This is slightly less than ideal. + */ + @Test + public void testOpenFileCost() throws Throwable { + describe("Test cost of openFile with/without status; raw only"); + S3AFileSystem fs = getFileSystem(); + assume("Unguarded FS only", !fs.hasMetadataStore()); + Path testFile = path("testOpenFileCost"); + + writeTextFile(fs, testFile, "openfile", true); + FileStatus st = fs.getFileStatus(testFile); + + // now read that file back in using the openFile call. + // with a new FileStatus and a different path. + // this verifies that any FileStatus class/subclass is used + // as a source of the file length. + long len = st.getLen(); + FileStatus st2 = new FileStatus( + len, false, + st.getReplication(), + st.getBlockSize(), + st.getModificationTime(), + st.getAccessTime(), + st.getPermission(), + st.getOwner(), + st.getGroup(), + new Path("gopher:///")); + resetMetricDiffs(); + // measure GET requests issued. + MetricDiff getRequests = new MetricDiff(fs, STREAM_OPENED); + + final CompletableFuture future = + fs.openFile(testFile) + .withFileStatus(st) + .build(); + FSDataInputStream in = future.get(); + verifyOperationCount(0, 0); + getRequests.assertDiffEquals(0); + + long readLen = readStream(in); + assertEquals("bytes read from file", len, readLen); + verifyOperationCount(0, 0); + getRequests.assertDiffEquals(1); + getRequests.reset(); + resetMetricDiffs(); + + // do a second read with the length declared as short. + // we now expect the bytes read to be shorter. 
+ MetricDiff bytesDiscarded = + new MetricDiff(fs, STREAM_CLOSE_BYTES_READ); + int offset = 2; + long shortLen = len - offset; + CompletableFuture f2 = fs.openFile(testFile) + .must(INPUT_FADVISE, INPUT_FADV_SEQUENTIAL) + .must(OPEN_OPTION_LENGTH, shortLen) + .build(); + verifyOperationCount(0, 0); + getRequests.assertDiffEquals(0); + FSDataInputStream in2 = f2.get(); + long r2 = readStream(in2); + verifyOperationCount(0, 0); + getRequests.assertDiffEquals(1); + assertEquals("bytes read from file", shortLen, r2); + // the read has been ranged + bytesDiscarded.assertDiffEquals(0); + + // final read past EOF + getRequests.reset(); + + long longLen = len + 10; + FSDataInputStream in3 = fs.openFile(testFile) + .must(INPUT_FADVISE, INPUT_FADV_SEQUENTIAL) + .must(OPEN_OPTION_LENGTH, longLen) + .build() + .get(); + byte[] out = new byte[(int) longLen]; + intercept(EOFException.class, + () -> in3.readFully(0, out)); + in3.seek(longLen - 1); + assertEquals("read past real EOF on " + in3, + -1, in3.read()); + in3.close(); + // two GET calls were made, one for readFully, + // the second on the read() past the EOF + // the operation has got as far as S3 + getRequests.assertDiffEquals(2); + + } } From 277d22ad5b5093a78313046c3a855521be248fa3 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 23 Jul 2020 20:11:23 +0100 Subject: [PATCH 4/4] HADOOP-16202 Enhanced openFile * the file length/seek policy are now defined in hadoop common and the specification, including evolution policy for seek. * S3A FS uses the new options, retaining support for the older s3a seek policy * moved as much as possible into S3AOpenFileHelper * which has unit tests to verify its extraction of properties and file status. 
Change-Id: I6981394c48583c33b706f9b5eb6c58c5ca078715 --- .../java/org/apache/hadoop/fs/FileSystem.java | 2 +- .../FutureDataInputStreamBuilderImpl.java | 2 +- .../hadoop/fs/impl/OpenFileParameters.java | 46 +++ .../filesystem/fsdatainputstreambuilder.md | 158 +++++++++- .../fs/contract/AbstractContractOpenTest.java | 6 +- .../org/apache/hadoop/fs/s3a/Constants.java | 14 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 135 ++++---- .../apache/hadoop/fs/s3a/S3AInputPolicy.java | 2 + .../hadoop/fs/s3a/impl/InternalConstants.java | 22 +- .../hadoop/fs/s3a/impl/OpenFileHelper.java | 140 --------- .../hadoop/fs/s3a/impl/S3AOpenFileHelper.java | 287 ++++++++++++++++++ .../s3a/select/InternalSelectConstants.java | 2 +- .../fs/contract/s3a/ITestS3AContractOpen.java | 66 ++++ .../fs/s3a/ITestS3AFileOperationCost.java | 16 +- .../fs/s3a/impl/TestOpenFileHelper.java | 250 +++++++++++++++ 15 files changed, 885 insertions(+), 263 deletions(-) delete mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileHelper.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileHelper.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileHelper.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 850e4d1b49f0c..8345b10ef4340 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4536,7 +4536,7 @@ protected CompletableFuture openFileWithOptions( final OpenFileParameters parameters) throws IOException { AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( parameters.getMandatoryKeys(), - Collections.emptySet(), ""); + OpenFileParameters.STANDARD_OPTIONS, ""); CompletableFuture result = 
new CompletableFuture<>(); try { result.complete(open(pathHandle, parameters.getBufferSize())); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java index 24a8d49747fe6..04c84d5595196 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java @@ -148,7 +148,7 @@ public FutureDataInputStreamBuilder getThisBuilder() { @Override public FutureDataInputStreamBuilder withFileStatus(FileStatus st) { - this.status = requireNonNull(st, "status"); + this.status = st; return this; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java index 7ac1509fc9823..6e86f8f7afa51 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.impl; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -103,4 +105,48 @@ public int getBufferSize() { public FileStatus getStatus() { return status; } + + + /** + * OpenFile option for seek policies: {@value}. + */ + public static final String FS_OPT_OPENFILE_FADVISE = + "fs.opt.openfile.fadvise"; + + /** + * fadvise policy: {@value}. + */ + public static final String FS_OPT_OPENFILE_FADVISE_NORMAL = "normal"; + + /** + * fadvise policy: {@value}. 
+ */ + public static final String FS_OPT_OPENFILE_FADVISE_SEQUENTIAL = "sequential"; + + /** + * fadvise policy: {@value}. + */ + public static final String FS_OPT_OPENFILE_FADVISE_RANDOM = "random"; + + /** + * fadvise policy: {@value}. + */ + public static final String FS_OPT_OPENFILE_FADVISE_ADAPTIVE = "adaptive"; + + /** + * OpenFile option for seek policies: {@value}. + */ + public static final String FS_OPT_OPENFILE_LENGTH = + "fs.opt.openfile.length"; + + /** + * Set of standard options. + */ + public static final Set STANDARD_OPTIONS = + Stream.of( + FS_OPT_OPENFILE_FADVISE, + FS_OPT_OPENFILE_LENGTH) + .collect(Collectors.toSet()); + + } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md index a50f1ba427250..11c6f673f9855 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md @@ -65,13 +65,29 @@ If a filesystem implementation extends the `FileStatus` returned in its implementation MAY use this information when opening the file. This is relevant with those stores which return version/etag information, -including the S3A and ABFS connectors -they MAY use this to guarantee that -the file they opened is exactly the one returned in the listing. +-they MAY use this to guarantee that the file they opened +is exactly the one returned in the listing. + + +The final `status.getPath().getName()` element of the supplied status MUST equal +the name value of the path supplied to the `openFile(path)` call. + +Filesystems MUST NOT validate the rest of the path. +This is needed to support viewfs and other mount-point wrapper filesystems +where schemas and paths are different. 
These often create their own FileStatus results + +Preconditions + +```python +status == null or status.getPath().getName() == path.getName() + +``` + +Filesystems MUST NOT require the class of `status` to equal +that of any specific subclass their implementation returns in filestatus/list +operations. This is to support wrapper filesystems and serialization/deserialization +of the status. -There is no requirement for `status.getPath()` to the resolved path of -the file being opened. This is needed to support viewfs and other mount-point/ -wrapper filesystems. The sole path declaring the file to open is supplied -in `openFile(path)`. ### Set optional or mandatory parameters @@ -82,15 +98,22 @@ Set optional or mandatory parameters to the builder. Using `opt()` or `must()`, client can specify FS-specific parameters without inspecting the concrete type of `FileSystem`. +Example: + ```java out = fs.openFile(path) - .opt("fs.s3a.experimental.input.fadvise", "random") - .must("fs.s3a.readahead.range", 256 * 1024) + .must("fs.opt.openfile.fadvise", "random") + .opt("fs.http.connection.timeout", 30_000L) .withFileStatus(statusFromListing) .build() .get(); ``` +Here the seek policy of `random` has been specified, +with the requirement that the filesystem implementation must understand the option. +And s3a-specific option has been supplied which may be interpreted by any store; +the expectation is that the S3A connector will recognize it. + #### Implementation Notes Checking for supported options must be performed in the `build()` operation. @@ -156,3 +179,122 @@ evaluation, and so will surface when the future is completed: ```java FSDataInputStream in = future.get(); ``` + +## Standard options + +Individual filesystems MAY provide their own set of options for use in the builder. 
+
+There are also some standard options with common names, all of which begin with
+`fs.opt.openfile`
+
+| Name | Type | Description |
+|------|------|-------------|
+| `fs.opt.openfile.fadvise` | string | seek policy |
+| `fs.opt.openfile.length` | long | file length |
+
+These policies are *not* declared as constants in
+public `org.apache.hadoop` classes/interfaces, so as to avoid
+applications being unable to link to a specific hadoop release
+without that standard option.
+
+(Implementors: use `org.apache.hadoop.fs.impl.OpenFileParameters`.)
+
+### Seek Policy Option `fs.opt.openfile.fadvise`
+
+This is a hint as to what the expected read pattern of an input stream will be.
+"sequential" -read a file from start to finish. "Random": client will be
+reading data in different parts of the file using a sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Sequential reads may be optimized with prefetching data and/or reading data in larger blocks.
+In contrast, random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations.
+Queries over columnar formats such as Apache ORC and Apache Parquet
+perform such random IO; other data formats are best for sequential reads.
+Some applications (e.g. distCp) perform sequential IO even over columnar data.
+
+What is key is that optimizing reads for sequential reads may impair random performance
+-and vice versa.
+
+Allowing applications to hint what their read policy is allows the filesystem clients
+to optimize their interaction with the underlying data stores.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the filesystem
+MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific behavior
+-and it may change with Hadoop releases and/or specific storage subsystems.
+1. 
If a policy is not recognized, the FileSystem MUST ignore it and revert to
+whichever policy is active for that specific FileSystem instance.
+
+| Policy | Meaning |
+|--------|---------|
+| `normal` | Default policy for the filesystem |
+| `sequential` | Optimize for sequential IO |
+| `random` | Optimize for random IO |
+| `adaptive` | Adapt seek policy based on read patterns |
+
+
+#### Seek Policy `normal`
+
+The default policy for the filesystem instance.
+Implementation/installation-specific
+
+#### Seek Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until the stream is closed.
+
+
+#### Seek Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+#### Seek Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by the `wasb:` client are both
+adaptive -they assume sequential IO, but once a backwards seek/positioned read call is made
+the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or extend
+the algorithms to detect forward seeks and/or switch from random to sequential IO if
+that is considered more efficient.
+
+Adaptive seek policies have proven effective in the absence of the ability to declare
+the seek policy in the `open()` API, so requiring it to be declared, if configurable,
+in the cluster/application configuration. However, the switch from sequential to
+random seek policies may come too late to deliver optimal performance.
+
+When applications explicitly set the `fs.opt.openfile.fadvise` option, if they
+know their read plan, they SHOULD declare which policy is most appropriate.
+
+_Implementor's Notes_
+
+If a policy is unrecognized/unsupported: SHOULD log it and then MUST ignore the option. 
This is
+to support future evolution of policies and implementations.
+
+
+_Further reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+### File Length hint `fs.opt.openfile.length`
+
+Declare that a file is expected to be *at least as long as the specified length*.
+
+This can be used by clients to skip querying a remote store for the size of/existence of a file when
+opening it, similar to declaring a file status through the `withFileStatus()` option.
+
+The supplied length MAY be shorter than the actual file.
+
+This allows worker tasks which only know of their task's split to declare that as the
+length of the file.
+
+If this option is used by the FileSystem implementation:
+
+* A `length` < 0 MUST be rejected.
+* If a file status is supplied along with a value in `fs.opt.openfile.length`, the file status
+values take precedence.
+* seek/read calls past the length value MAY be rejected, equally: they MAY be accepted. It is simply a hint. 
+ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index 1203ac11a7edd..effae0382d9af 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -263,6 +263,7 @@ public void testChainedFailureAwaitFuture() throws Throwable { describe("await Future handles chained failures"); CompletableFuture f = getFileSystem() .openFile(path("testOpenFileUnknownOption")) + .withFileStatus(null) .build(); intercept(RuntimeException.class, "exceptionally", @@ -288,7 +289,8 @@ public void testOpenFileApplyRead() throws Throwable { assertEquals("Wrong number of bytes read value", len, (long) readAllBytes.get()); - // now reattempt with a new FileStatus and a different path. 
+ // now reattempt with a new FileStatus and a different path + // other than the final name element // implementations MUST use path in openFile() call FileStatus st2 = new FileStatus( len, false, @@ -299,7 +301,7 @@ public void testOpenFileApplyRead() throws Throwable { st.getPermission(), st.getOwner(), st.getGroup(), - new Path("gopher:///")); + new Path("gopher:///localhost:/" + path.getName())); assertEquals("Wrong number of bytes read value", len, (long) fs.openFile(path) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index f6217e3f4997e..609e8efc4a5a2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; + /** * All the constants used with the {@link S3AFileSystem}. * @@ -522,7 +524,6 @@ private Constants() { * reading data. * Value: {@value} */ - @InterfaceStability.Unstable public static final String INPUT_FADVISE = "fs.s3a.experimental.input.fadvise"; @@ -530,14 +531,12 @@ private Constants() { * General input. Some seeks, some reads. * Value: {@value} */ - @InterfaceStability.Unstable public static final String INPUT_FADV_NORMAL = "normal"; /** * Optimized for sequential access. * Value: {@value} */ - @InterfaceStability.Unstable public static final String INPUT_FADV_SEQUENTIAL = "sequential"; /** @@ -546,17 +545,8 @@ private Constants() { * more efficient {@code seek()} operations. * Value: {@value} */ - @InterfaceStability.Unstable public static final String INPUT_FADV_RANDOM = "random"; - /** - * Declare the length of the file to open: {@value}. - * This is supported in openFile(). 
- */ - @InterfaceStability.Unstable - public static final String OPEN_OPTION_LENGTH - = "fs.s3a.open.option.length"; - @InterfaceAudience.Private @InterfaceStability.Unstable public static final String S3_CLIENT_FACTORY_IMPL = diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 58a9e2fd18579..1532c42537bf0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -105,7 +105,7 @@ import org.apache.hadoop.fs.s3a.impl.CopyOutcome; import org.apache.hadoop.fs.s3a.impl.DeleteOperation; import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport; -import org.apache.hadoop.fs.s3a.impl.OpenFileHelper; +import org.apache.hadoop.fs.s3a.impl.S3AOpenFileHelper; import org.apache.hadoop.fs.s3a.impl.OperationCallbacks; import org.apache.hadoop.fs.s3a.impl.RenameOperation; import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder; @@ -291,6 +291,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private final S3AFileSystem.OperationCallbacksImpl operationCallbacks = new OperationCallbacksImpl(); + /** + * Helper for the openFile() method. + */ + private S3AOpenFileHelper openFileHelper; + /** Add any deprecated keys. 
*/ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -453,6 +458,12 @@ public void initialize(URI name, Configuration originalConf) pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE, BULK_DELETE_PAGE_SIZE_DEFAULT, 0); + // now the open file logic + openFileHelper = new S3AOpenFileHelper( + inputPolicy, + changeDetectionPolicy, + readAhead, + username); } catch (AmazonClientException e) { // amazon client exception: stop all services then throw the translation stopAllServices(); @@ -1067,71 +1078,33 @@ protected URI canonicalizeUri(URI rawUri) { @Retries.RetryTranslated public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return open(f, Optional.empty(), Optional.empty()); + entryPoint(INVOCATION_OPEN); + return open(qualify(f), openFileHelper.openSimpleFile()); } /** * Opens an FSDataInputStream at the indicated Path. - * if status contains an S3AFileStatus reference, it is used - * and so a HEAD request to the store is avoided. + * if fileInformation contains an FileStatus reference, + * then that is used to avoid a check for the file length/existence * - * @param file the file to open - * @param options configuration options if opened with the builder API. - * @param providedStatus optional file status. + * @param path the file to open + * @param fileInformation information about the file to open * @throws IOException IO failure. */ @Retries.RetryTranslated private FSDataInputStream open( - final Path file, - final Optional options, - final Optional providedStatus) + final Path path, + final S3AOpenFileHelper.OpenFileInformation fileInformation) throws IOException { - entryPoint(INVOCATION_OPEN); - final Path path = qualify(file); - - S3AFileStatus fileStatus; - Configuration opts = options.orElse(null); - if (!providedStatus.isPresent() - && opts != null - && opts.get(OPEN_OPTION_LENGTH) != null) { - // build a minimal S3A FileStatus From the input length alone. - // this is all we actually need. 
- long length = opts.getLong(OPEN_OPTION_LENGTH, 0); - LOG.debug("Fixing length of file to read {} as {}", path, length); - fileStatus = new S3AFileStatus( - length, - 0, path, - getDefaultBlockSize(path), - username, null, null); - } else { - fileStatus = extractOrFetchSimpleFileStatus(path, - providedStatus); - } - - S3AReadOpContext readContext; - if (opts != null) { - // normal path. Open the file with the chosen seek policy, if different - // from the normal one. - // and readahead. - S3AInputPolicy policy = S3AInputPolicy.getPolicy( - opts.get(INPUT_FADVISE, inputPolicy.toString())); - long readAheadRange2 = opts.getLong(READAHEAD_RANGE, readAhead); - // TODO support change detection policy from options? - readContext = createReadContext( - fileStatus, - policy, - changeDetectionPolicy, - readAheadRange2); - } else { - readContext = createReadContext( - fileStatus, - inputPolicy, - changeDetectionPolicy, - readAhead); - } + final S3AFileStatus fileStatus = + extractOrFetchSimpleFileStatus(path, fileInformation); + S3AReadOpContext readContext = createReadContext( + fileStatus, + fileInformation.getInputPolicy(), + fileInformation.getChangePolicy(), + fileInformation.getReadAheadRange()); LOG.debug("Opening '{}'", readContext); - return new FSDataInputStream( new S3AInputStream( readContext, @@ -4624,25 +4597,27 @@ protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) { * @throws IOException IO failure */ @Retries.RetryTranslated - private FSDataInputStream select(final Path source, - final String expression, + private FSDataInputStream select(final Path path, final Configuration options, - final Optional providedStatus) + final S3AOpenFileHelper.OpenFileInformation fileInformation) throws IOException { entryPoint(OBJECT_SELECT_REQUESTS); - requireSelectSupport(source); - final Path path = makeQualified(source); + requireSelectSupport(path); + String expression = fileInformation.getSql(); final S3AFileStatus fileStatus = 
extractOrFetchSimpleFileStatus(path, - providedStatus); + fileInformation); // readahead range can be dynamically set - long ra = options.getLong(READAHEAD_RANGE, readAhead); S3ObjectAttributes objectAttributes = createObjectAttributes( path, fileStatus); - S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy, - changeDetectionPolicy, ra); + ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy(); + S3AReadOpContext readContext = createReadContext( + fileStatus, + fileInformation.getInputPolicy(), + changePolicy, + fileInformation.getReadAheadRange()); - if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None + if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None && fileStatus.getETag() != null) { // if there is change detection, and the status includes at least an // etag, @@ -4654,7 +4629,7 @@ private FSDataInputStream select(final Path source, // version in the final read; nor can we check the etag match) ChangeTracker changeTracker = new ChangeTracker(uri.toString(), - changeDetectionPolicy, + changePolicy, readContext.instrumentation.newInputStreamStatistics() .getVersionMismatchCounter(), objectAttributes); @@ -4687,21 +4662,20 @@ private void requireSelectSupport(final Path source) throws } /** - * Extract the status from the optional parameter, querying - * S3Guard/s3 if it is absent. + * Get the status for the file, querying + * S3Guard/s3 if there is none in the fileInformation parameter. 
* @param path path of the status - * @param optStatus optional status + * @param fileInformation information on the file to open * @return a file status * @throws FileNotFoundException if there is no normal file at that path * @throws IOException IO failure */ private S3AFileStatus extractOrFetchSimpleFileStatus( - final Path path, final Optional optStatus) + final Path path, + final S3AOpenFileHelper.OpenFileInformation fileInformation) throws IOException { - S3AFileStatus fileStatus; - if (optStatus.isPresent()) { - fileStatus = optStatus.get(); - } else { + S3AFileStatus fileStatus = fileInformation.getStatus(); + if (fileStatus == null) { // this looks at S3guard and gets any type of status back, // if it falls back to S3 it does a HEAD only. // therefore: if there is no S3Guard and there is a dir, this @@ -4734,20 +4708,19 @@ public CompletableFuture openFileWithOptions( final Path rawPath, final OpenFileParameters parameters) throws IOException { final Path path = qualify(rawPath); - Triple statusT = new OpenFileHelper() - .prepareToOpenFile( - path, parameters, username, + S3AOpenFileHelper.OpenFileInformation fileInformation = + openFileHelper.prepareToOpenFile( + path, + parameters, getDefaultBlockSize(path)); - boolean isSelect = statusT.getLeft(); - String sql = statusT.getRight(); - Optional ost = Optional.ofNullable(statusT.getMiddle()); + boolean isSelect = fileInformation.isSql(); CompletableFuture result = new CompletableFuture<>(); Configuration options = parameters.getOptions(); if (!isSelect) { // normal path. unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> open(path, Optional.of(options), ost))); + () -> open(path, fileInformation))); } else { // it is a select statement. 
// fail fast if the operation is not available @@ -4755,7 +4728,7 @@ public CompletableFuture openFileWithOptions( // submit the query unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> select(path, sql, options, ost))); + () -> select(path, options, fileInformation))); } return result; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java index c018410e11767..bc726fea4eb95 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java @@ -25,6 +25,7 @@ import java.util.Locale; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE_ADAPTIVE; import static org.apache.hadoop.fs.s3a.Constants.*; /** @@ -62,6 +63,7 @@ public static S3AInputPolicy getPolicy(String name) { String trimmed = name.trim().toLowerCase(Locale.ENGLISH); switch (trimmed) { case INPUT_FADV_NORMAL: + case FS_OPT_OPENFILE_FADVISE_ADAPTIVE: return Normal; case INPUT_FADV_RANDOM: return Random; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index b26b0fe377cb4..caed3dcc46e40 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -18,16 +18,18 @@ package org.apache.hadoop.fs.s3a.impl; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.s3a.Constants; 
+import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; + /** * Internal constants private only to the S3A codebase. * Please don't refer to these outside of this module & its tests. @@ -79,12 +81,14 @@ private InternalConstants() { * used becomes that of the select operation. */ @InterfaceStability.Unstable - public static final Set STANDARD_OPENFILE_KEYS = - Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList(Constants.INPUT_FADVISE, - Constants.OPEN_OPTION_LENGTH, - Constants.READAHEAD_RANGE))); + public static final Set S3A_OPENFILE_KEYS = + Stream.of( + FS_OPT_OPENFILE_FADVISE, + FS_OPT_OPENFILE_LENGTH, + Constants.INPUT_FADVISE, + Constants.READAHEAD_RANGE) + .collect(Collectors.toSet()); + /** 404 error code. */ public static final int SC_404 = 404; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileHelper.java deleted file mode 100644 index 0ece47a1dc9fd..0000000000000 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileHelper.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.impl; - -import java.io.IOException; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang3.tuple.Triple; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.impl.OpenFileParameters; -import org.apache.hadoop.fs.s3a.S3AFileStatus; -import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; -import org.apache.hadoop.fs.s3a.select.InternalSelectConstants; -import org.apache.hadoop.fs.s3a.select.SelectConstants; - -import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys; -import static org.apache.hadoop.fs.s3a.Constants.OPEN_OPTION_LENGTH; - -/** - * Helper class for openFile() logic, especially processing file status - * args and length/etag/versionID. - * This has got complex enough it merited removal from S3AFS; there is - * an interface to call back into that where needed. - */ -public class OpenFileHelper { - - private static final Logger LOG = - LoggerFactory.getLogger(OpenFileHelper.class); - - /** - * Prepare to open a file from the openFile parameters. - * @param path path to the file - * @param parameters open file parameters from the builder. - * @param username username for fileStatus - * @param blockSize for fileStatus - * @return triple of (isSql, fileStatus?, sql?) - * @throws IOException failure to resolve the link. 
- * @throws IllegalArgumentException unknown mandatory key - */ - public Triple prepareToOpenFile( - final Path path, - final OpenFileParameters parameters, - final String username, - final long blockSize) throws IOException { - Configuration options = parameters.getOptions(); - Set mandatoryKeys = parameters.getMandatoryKeys(); - String sql = options.get(SelectConstants.SELECT_SQL, null); - boolean isSelect = sql != null; - // choice of keys depends on open type - if (isSelect) { - rejectUnknownMandatoryKeys( - mandatoryKeys, - InternalSelectConstants.SELECT_OPTIONS, - "for " + path + " in S3 Select operation"); - } else { - rejectUnknownMandatoryKeys( - mandatoryKeys, - InternalConstants.STANDARD_OPENFILE_KEYS, - "for " + path + " in non-select file I/O"); - } - FileStatus providedStatus = parameters.getStatus(); - S3AFileStatus fileStatus; - if (providedStatus != null) { - long len = providedStatus.getLen(); - long modTime = providedStatus.getModificationTime(); - String versionId; - String eTag; - // can use this status to skip our own probes, - LOG.debug("File was opened with a supplied FileStatus;" - + " skipping getFileStatus call in open() operation: {}", - - providedStatus); - if (providedStatus instanceof S3AFileStatus) { - // including etag and version. - S3AFileStatus st = (S3AFileStatus) providedStatus; - versionId = st.getVersionId(); - eTag = st.getETag(); - } else if (providedStatus instanceof S3ALocatedFileStatus) { - LOG.debug("File was opened with a supplied S3ALocatedFileStatus;" - + " skipping getFileStatus call in open() operation: {}", - providedStatus); - S3ALocatedFileStatus st - = (S3ALocatedFileStatus) providedStatus; - versionId = st.getVersionId(); - eTag = st.getETag(); - } else { - // it is another type. - // build a status struct without etag or version. 
- LOG.debug("Converting file status {}", providedStatus); - versionId = null; - eTag = null; - } - fileStatus = new S3AFileStatus( - len, - modTime, - path, - blockSize, - username, - eTag, - versionId); - } else if (options.get(OPEN_OPTION_LENGTH) != null) { - // build a minimal S3A FileStatus From the input length alone. - // this is all we actually need. - long length = options.getLong(OPEN_OPTION_LENGTH, 0); - LOG.debug("Fixing length of file to read {} as {}", path, length); - fileStatus = new S3AFileStatus( - length, - 0, - path, - blockSize, - username, - null, null); - } else { - fileStatus = null; - } - return Triple.of(isSelect, fileStatus, sql); - } - -} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileHelper.java new file mode 100644 index 0000000000000..bb467613e29ad --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileHelper.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.fs.s3a.select.InternalSelectConstants; +import org.apache.hadoop.fs.s3a.select.SelectConstants; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; + +/** + * Helper class for openFile() logic, especially processing file status + * args and length/etag/versionID. + *

+ * This got complex enough it merited removal from S3AFS -which + * also permits unit testing. + */ + public class S3AOpenFileHelper { + + private static final Logger LOG = + LoggerFactory.getLogger(S3AOpenFileHelper.class); + + private final S3AInputPolicy inputPolicy; + + private final ChangeDetectionPolicy changePolicy; + + private final long readAheadRange; + + private final String username; + + /** + * Simple file information is created in advance as it is always + * the same. + */ + private final OpenFileInformation simpleFileInformation; + + + /** + * Instantiate with the default options from the filesystem. + * @param inputPolicy input policy + * @param changePolicy change detection policy + * @param readAheadRange read ahead range + * @param username username + */ + public S3AOpenFileHelper( + final S3AInputPolicy inputPolicy, + final ChangeDetectionPolicy changePolicy, + final long readAheadRange, + final String username) { + this.inputPolicy = inputPolicy; + this.changePolicy = changePolicy; + this.readAheadRange = readAheadRange; + this.username = username; + + simpleFileInformation = new OpenFileInformation(false, null, null, + inputPolicy, changePolicy, readAheadRange); + } + + + /** + * The information on a file needed to open it. + */ + public static final class OpenFileInformation { + + /** Is this SQL? */ + private final boolean isSql; + + /** File status; may be null. */ + private final S3AFileStatus status; + + /** SQL string if this is a SQL select file*/ + private final String sql; + + /** Active input policy. */ + private final S3AInputPolicy inputPolicy; + + /** Change detection policy. */ + private final ChangeDetectionPolicy changePolicy; + + /** read ahead range. 
*/ + private final long readAheadRange; + + private OpenFileInformation(final boolean isSql, + final S3AFileStatus status, + final String sql, + final S3AInputPolicy inputPolicy, + final ChangeDetectionPolicy changePolicy, + final long readAheadRange) { + this.isSql = isSql; + this.status = status; + this.sql = sql; + this.inputPolicy = inputPolicy; + this.changePolicy = changePolicy; + this.readAheadRange = readAheadRange; + } + + public boolean isSql() { + return isSql; + } + + public S3AFileStatus getStatus() { + return status; + } + + public String getSql() { + return sql; + } + + public S3AInputPolicy getInputPolicy() { + return inputPolicy; + } + + public ChangeDetectionPolicy getChangePolicy() { + return changePolicy; + } + + public long getReadAheadRange() { + return readAheadRange; + } + + @Override + public String toString() { + return "OpenFileInformation{" + + "isSql=" + isSql + + ", status=" + status + + ", sql='" + sql + '\'' + + ", inputPolicy=" + inputPolicy + + ", changePolicy=" + changePolicy + + ", readAheadRange=" + readAheadRange + + '}'; + } + } + + /** + * Prepare to open a file from the openFile parameters. + * @param path path to the file + * @param parameters open file parameters from the builder. + * @param blockSize for fileStatus + * @return open file options + * @throws IOException failure to resolve the link. 
+ * @throws IllegalArgumentException unknown mandatory key + */ + public OpenFileInformation prepareToOpenFile( + final Path path, + final OpenFileParameters parameters, + final long blockSize) throws IOException { + Configuration options = parameters.getOptions(); + Set mandatoryKeys = parameters.getMandatoryKeys(); + String sql = options.get(SelectConstants.SELECT_SQL, null); + boolean isSelect = sql != null; + // choice of keys depends on open type + if (isSelect) { + // S3 Select call adds a large set of supported mandatory keys + rejectUnknownMandatoryKeys( + mandatoryKeys, + InternalSelectConstants.SELECT_OPTIONS, + "for " + path + " in S3 Select operation"); + } else { + rejectUnknownMandatoryKeys( + mandatoryKeys, + InternalConstants.S3A_OPENFILE_KEYS, + "for " + path + " in non-select file I/O"); + } + FileStatus providedStatus = parameters.getStatus(); + S3AFileStatus fileStatus; + if (providedStatus != null) { + // there's a file status + + // make sure the file name matches -the rest of the path + // MUST NOT be checked. 
+ Path providedStatusPath = providedStatus.getPath(); + checkArgument(path.getName().equals(providedStatusPath.getName()), + "Filename mismatch between file being opened %s and" + + " supplied filestatus %s", + path, providedStatusPath); + + // make sure the status references a file + if (providedStatus.isDirectory()) { + throw new FileNotFoundException( + "Supplied status references a directory " + providedStatus); + } + // build up the values + long len = providedStatus.getLen(); + long modTime = providedStatus.getModificationTime(); + String versionId; + String eTag; + // can use this status to skip our own probes, + LOG.debug("File was opened with a supplied FileStatus;" + + " skipping getFileStatus call in open() operation: {}", + + providedStatus); + if (providedStatus instanceof S3AFileStatus) { + S3AFileStatus st = (S3AFileStatus) providedStatus; + versionId = st.getVersionId(); + eTag = st.getETag(); + } else if (providedStatus instanceof S3ALocatedFileStatus) { + // S3ALocatedFileStatus instance may supply etag and version. + S3ALocatedFileStatus st + = (S3ALocatedFileStatus) providedStatus; + versionId = st.getVersionId(); + eTag = st.getETag(); + } else { + // it is another type. + // build a status struct without etag or version. + LOG.debug("Converting file status {}", providedStatus); + versionId = null; + eTag = null; + } + // Construct a new file status with the real path of the file. + fileStatus = new S3AFileStatus( + len, + modTime, + path, + blockSize, + username, + eTag, + versionId); + } else if (options.get(FS_OPT_OPENFILE_LENGTH) != null) { + // build a minimal S3A FileStatus From the input length alone. + // this is all we actually need. 
+ long length = options.getLong(FS_OPT_OPENFILE_LENGTH, 0); + LOG.debug("Fixing length of file to read {} as {}", path, length); + fileStatus = new S3AFileStatus( + length, + 0, + path, + blockSize, + username, + null, null); + } else { + // neither a file status nor a file length + fileStatus = null; + } + // seek policy from default, s3a opt or standard option + String policy1 = options.get(INPUT_FADVISE, inputPolicy.toString()); + String policy2 = options.get(FS_OPT_OPENFILE_FADVISE, policy1); + S3AInputPolicy policy = S3AInputPolicy.getPolicy(policy2); + + // readahead range + long ra = options.getLong(READAHEAD_RANGE, readAheadRange); + return new OpenFileInformation(isSelect, fileStatus, sql, policy, + changePolicy, ra); + } + + /** + * Open a simple file by builing the base + * @return the parameters needed to open a file through open(). + */ + public OpenFileInformation openSimpleFile() { + return simpleFileInformation; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java index 4f387d8ccfa75..fbf5226afb82f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java @@ -71,7 +71,7 @@ private InternalSelectConstants() { CSV_OUTPUT_QUOTE_FIELDS, CSV_OUTPUT_RECORD_DELIMITER )); - options.addAll(InternalConstants.STANDARD_OPENFILE_KEYS); + options.addAll(InternalConstants.S3A_OPENFILE_KEYS); SELECT_OPTIONS = Collections.unmodifiableSet(options); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java index d78273b14710a..e0d9d4d9a06b1 100644 --- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java @@ -18,11 +18,22 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.FileNotFoundException; + +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractOpenTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * S3A contract tests opening files. @@ -54,4 +65,59 @@ protected AbstractFSContract createContract(Configuration conf) { protected boolean areZeroByteFilesEncrypted() { return true; } + + @Test + public void testOpenFileApplyReadBadName() throws Throwable { + describe("use the apply sequence to read a whole file"); + Path path = methodPath(); + FileSystem fs = getFileSystem(); + touch(fs, path); + FileStatus st = fs.getFileStatus(path); + // The final element of the path is different, so + // openFile must fail + FileStatus st2 = new FileStatus( + 0, false, + st.getReplication(), + st.getBlockSize(), + st.getModificationTime(), + st.getAccessTime(), + st.getPermission(), + st.getOwner(), + st.getGroup(), + new Path("gopher:///localhost/something.txt")); + intercept(IllegalArgumentException.class, () -> + fs.openFile(path) + .withFileStatus(st2) + .build()); + } + + /** + * Pass in a directory reference and expect the openFile call + * to fail. 
+ */ + @Test + public void testOpenFileDirectory() throws Throwable { + describe("Change the status to a directory"); + Path path = methodPath(); + FileSystem fs = getFileSystem(); + int len = 4096; + createFile(fs, path, true, + dataset(len, 0x40, 0x80)); + FileStatus st = fs.getFileStatus(path); + FileStatus st2 = new FileStatus( + len, true, + st.getReplication(), + st.getBlockSize(), + st.getModificationTime(), + st.getAccessTime(), + st.getPermission(), + st.getOwner(), + st.getGroup(), + path); + intercept(FileNotFoundException.class, () -> + fs.openFile(path) + .withFileStatus(st2) + .build()); + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java index 8d02328e35c88..f9260d6ab016c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java @@ -46,9 +46,9 @@ import java.util.concurrent.CompletableFuture; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; -import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; -import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL; -import static org.apache.hadoop.fs.s3a.Constants.OPEN_OPTION_LENGTH; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE_SEQUENTIAL; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; @@ -727,7 +727,7 @@ public void testOpenFileCost() throws Throwable { st.getPermission(), st.getOwner(), st.getGroup(), - new Path("gopher:///")); + new 
Path("gopher:///localhost/" + testFile.getName())); resetMetricDiffs(); // measure GET requests issued. MetricDiff getRequests = new MetricDiff(fs, STREAM_OPENED); @@ -754,8 +754,8 @@ public void testOpenFileCost() throws Throwable { int offset = 2; long shortLen = len - offset; CompletableFuture f2 = fs.openFile(testFile) - .must(INPUT_FADVISE, INPUT_FADV_SEQUENTIAL) - .must(OPEN_OPTION_LENGTH, shortLen) + .must(FS_OPT_OPENFILE_FADVISE, FS_OPT_OPENFILE_FADVISE_SEQUENTIAL) + .opt(FS_OPT_OPENFILE_LENGTH, shortLen) .build(); verifyOperationCount(0, 0); getRequests.assertDiffEquals(0); @@ -772,8 +772,8 @@ public void testOpenFileCost() throws Throwable { long longLen = len + 10; FSDataInputStream in3 = fs.openFile(testFile) - .must(INPUT_FADVISE, INPUT_FADV_SEQUENTIAL) - .must(OPEN_OPTION_LENGTH, longLen) + .must(FS_OPT_OPENFILE_FADVISE, FS_OPT_OPENFILE_FADVISE_SEQUENTIAL) + .must(FS_OPT_OPENFILE_LENGTH, longLen) .build() .get(); byte[] out = new byte[(int) longLen]; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileHelper.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileHelper.java new file mode 100644 index 0000000000000..f3ac569843454 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileHelper.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.ObjectAssert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.test.HadoopTestBase; + +import static java.util.Collections.singleton; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE_ADAPTIVE; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE_RANDOM; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for {@link S3AOpenFileHelper}. 
+ */ +public class TestOpenFileHelper extends HadoopTestBase { + + private static final ChangeDetectionPolicy CHANGE_POLICY + = ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.Server, + ChangeDetectionPolicy.Source.None, false); + + private static final long READ_AHEAD_RANGE = 16; + + private static final String USERNAME = "hadoop"; + + public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential; + + public static final String TESTFILE = "s3a://bucket/name"; + + private static final Path TESTPATH = new Path(TESTFILE); + + private S3AOpenFileHelper helper; + + @Before + public void setup() { + helper = new S3AOpenFileHelper(INPUT_POLICY, + CHANGE_POLICY, READ_AHEAD_RANGE, USERNAME); + } + + @Test + public void testSimpleFile() throws Throwable { + ObjectAssert + asst = assertFI(helper.openSimpleFile()); + + asst.extracting(f -> f.getChangePolicy()) + .isEqualTo(CHANGE_POLICY); + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + asst.extracting(f -> f.getReadAheadRange()) + .isEqualTo(READ_AHEAD_RANGE); + } + + private ObjectAssert assertFI( + final S3AOpenFileHelper.OpenFileInformation fi) { + return Assertions.assertThat(fi) + .describedAs("File Information %s", fi); + } + + @Test + public void testUnknownMandatory() throws Throwable { + + String key = "unknown"; + intercept(IllegalArgumentException.class, key, () -> + helper.prepareToOpenFile(TESTPATH, + params(key, "undefined"), + 0)); + } + + @Test + public void testSeekPolicy() throws Throwable { + + // ask for random IO + ObjectAssert asst = + assertFI(helper.prepareToOpenFile(TESTPATH, + params(INPUT_FADVISE, FS_OPT_OPENFILE_FADVISE_RANDOM), + 0)); + + // is picked up + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Random); + // and as neither status nor length was set: no file status + asst.extracting(f -> f.getStatus()) + .isNull(); + } + + @Test + public void testSeekPolicyAdaptive() throws Throwable { + + // when caller asks for 
adaptive, they get "normal" + ObjectAssert asst = + assertFI(helper.prepareToOpenFile(TESTPATH, + params(INPUT_FADVISE, FS_OPT_OPENFILE_FADVISE_ADAPTIVE), + 0)); + + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Normal); + } + + @Test + public void testUnknownSeekPolicy() throws Throwable { + + // fall back to the normal seek policy. + ObjectAssert asst = + assertFI(helper.prepareToOpenFile(TESTPATH, + params(INPUT_FADVISE, "undefined"), + 0)); + + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Normal); + } + + @Test + public void testReadahead() throws Throwable { + + // readahead range option + ObjectAssert asst = + assertFI(helper.prepareToOpenFile(TESTPATH, + params(READAHEAD_RANGE, "4096"), + 0)); + + asst.extracting(f -> f.getReadAheadRange()) + .isEqualTo(4096L); + } + + @Test + public void testStatusWithValidFilename() throws Throwable { + Path p = new Path("file:///tmp/" + TESTPATH.getName()); + ObjectAssert asst = + assertFI(helper.prepareToOpenFile(TESTPATH, + params(FS_OPT_OPENFILE_LENGTH, "32") + .withStatus(status(p, 4096)), + 0)); + asst.extracting(f -> f.getStatus().getVersionId()) + .isEqualTo("version"); + asst.extracting(f -> f.getStatus().getETag()) + .isEqualTo("etag"); + asst.extracting(f -> f.getStatus().getLen()) + .isEqualTo(4096L); + } + + @Test + public void testLocatedStatus() throws Throwable { + Path p = new Path("file:///tmp/" + TESTPATH.getName()); + ObjectAssert asst = + assertFI(helper.prepareToOpenFile(TESTPATH, + params(FS_OPT_OPENFILE_LENGTH, "32") + .withStatus( + new S3ALocatedFileStatus( + status(p, 4096), null)), + 0)); + asst.extracting(f -> f.getStatus().getVersionId()) + .isEqualTo("version"); + asst.extracting(f -> f.getStatus().getETag()) + .isEqualTo("etag"); + asst.extracting(f -> f.getStatus().getLen()) + .isEqualTo(4096L); + } + + /** + * Callers cannot supply a directory status when opening a file. 
+ */ + @Test + public void testDirectoryStatus() throws Throwable { + intercept(FileNotFoundException.class, TESTFILE, () -> + helper.prepareToOpenFile(TESTPATH, + params(INPUT_FADVISE, "normal") + .withStatus(new S3AFileStatus(true, TESTPATH, USERNAME)), + 0)); + } + + /** + * File name must match the path argument to openFile(). + */ + @Test + public void testStatusWithInconsistentFilename() throws Throwable { + intercept(IllegalArgumentException.class, TESTFILE, () -> + helper.prepareToOpenFile(TESTPATH, + params(INPUT_FADVISE, "normal") + .withStatus(new S3AFileStatus(true, + new Path(TESTFILE + "-"), USERNAME)), + 0)); + } + + /** + * If a file length option is set, create a file status + */ + @Test + public void testFileLength() throws Throwable { + ObjectAssert asst = + assertFI(helper.prepareToOpenFile(TESTPATH, + params(FS_OPT_OPENFILE_LENGTH, "8192") + .withStatus(null), + 0)); + asst.extracting(f -> f.getStatus()) + .isNotNull(); + asst.extracting(f -> f.getStatus().getPath()) + .isEqualTo(TESTPATH); + asst.extracting(f -> f.getStatus().getLen()) + .isEqualTo(8192L); + } + + + private S3AFileStatus status(final Path p, final int length) { + return new S3AFileStatus(length, 0, + p, 0, "", "etag", "version"); + } + + private OpenFileParameters params(final String key, final String val) { + return new OpenFileParameters() + .withMandatoryKeys(singleton(key)) + .withOptions(conf(key, val)); + } + + private Configuration conf(String key, Object val) { + Configuration c = new Configuration(false); + c.set(key, val.toString()); + return c; + } +}