diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java index b7757a62e28ad..a4c7254cfeb3c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java @@ -61,6 +61,13 @@ public interface FSBuilder> { */ B opt(@Nonnull String key, float value); + /** + * Set optional long parameter for the Builder. + * + * @see #opt(String, String) + */ + B opt(@Nonnull String key, long value); + /** * Set optional double parameter for the Builder. * @@ -104,6 +111,13 @@ public interface FSBuilder> { */ B must(@Nonnull String key, float value); + /** + * Set mandatory long option. + * + * @see #must(String, String) + */ + B must(@Nonnull String key, long value); + /** * Set mandatory double option. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index e5f4ef3809f18..c0a3c0ffb1038 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -2926,6 +2926,7 @@ public CompletableFuture build() throws IOException { final Path absF = fixRelativePart(getPath()); OpenFileParameters parameters = new OpenFileParameters() .withMandatoryKeys(getMandatoryKeys()) + .withOptionalKeys(getOptionalKeys()) .withOptions(getOptions()) .withBufferSize(getBufferSize()) .withStatus(getStatus()); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index ab5040486dffc..8345b10ef4340 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4536,7 +4536,7 @@ protected CompletableFuture openFileWithOptions( final OpenFileParameters parameters) throws IOException { AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( parameters.getMandatoryKeys(), - Collections.emptySet(), ""); + OpenFileParameters.STANDARD_OPTIONS, ""); CompletableFuture result = new CompletableFuture<>(); try { result.complete(open(pathHandle, parameters.getBufferSize())); @@ -4643,6 +4643,7 @@ public CompletableFuture build() throws IOException { Optional optionalPath = getOptionalPath(); OpenFileParameters parameters = new OpenFileParameters() .withMandatoryKeys(getMandatoryKeys()) + .withOptionalKeys(getOptionalKeys()) .withOptions(getOptions()) .withBufferSize(getBufferSize()) .withStatus(super.getStatus()); // explicit to avoid IDE warnings diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java index 5fc92e97be76c..4a7ca0c6a354f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java @@ -42,18 +42,20 @@ /** * Builder for filesystem/filecontext operations of various kinds, * with option support. - * - * + *

+ *
  *   .opt("foofs:option.a", true)
  *   .opt("foofs:option.b", "value")
- *   .opt("barfs:cache", true)
+ *   .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
  *   .must("foofs:cache", true)
  *   .must("barfs:cache-size", 256 * 1024 * 1024)
  *   .build();
- * 
+ * 
+ *

* * Configuration keys declared in an {@code opt()} may be ignored by * a builder which does not recognise them. + *

* * Configuration keys declared in a {@code must()} function set must * be understood by the implementation or a @@ -88,6 +90,9 @@ /** Keep track of the keys for mandatory options. */ private final Set mandatoryKeys = new HashSet<>(); + /** Keep track of the optional keys. */ + private final Set optionalKeys = new HashSet<>(); + /** * Constructor with both optional path and path handle. * Either or both argument may be empty, but it is an error for @@ -163,6 +168,7 @@ public PathHandle getPathHandle() { @Override public B opt(@Nonnull final String key, @Nonnull final String value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.set(key, value); return getThisBuilder(); } @@ -175,6 +181,7 @@ public B opt(@Nonnull final String key, @Nonnull final String value) { @Override public B opt(@Nonnull final String key, boolean value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setBoolean(key, value); return getThisBuilder(); } @@ -187,10 +194,19 @@ public B opt(@Nonnull final String key, boolean value) { @Override public B opt(@Nonnull final String key, int value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setInt(key, value); return getThisBuilder(); } + @Override + public B opt(@Nonnull final String key, final long value) { + mandatoryKeys.remove(key); + optionalKeys.add(key); + options.setLong(key, value); + return getThisBuilder(); + } + /** * Set optional float parameter for the Builder. * @@ -199,6 +215,7 @@ public B opt(@Nonnull final String key, int value) { @Override public B opt(@Nonnull final String key, float value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setFloat(key, value); return getThisBuilder(); } @@ -211,6 +228,7 @@ public B opt(@Nonnull final String key, float value) { @Override public B opt(@Nonnull final String key, double value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setDouble(key, value); return getThisBuilder(); } @@ -223,6 +241,7 @@ public B opt(@Nonnull final String key, double value) { @Override public B opt(@Nonnull final String key, @Nonnull final String... values) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setStrings(key, values); return getThisBuilder(); } @@ -248,6 +267,7 @@ public B must(@Nonnull final String key, @Nonnull final String value) { @Override public B must(@Nonnull final String key, boolean value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setBoolean(key, value); return getThisBuilder(); } @@ -260,10 +280,19 @@ public B must(@Nonnull final String key, boolean value) { @Override public B must(@Nonnull final String key, int value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setInt(key, value); return getThisBuilder(); } + @Override + public B must(@Nonnull final String key, final long value) { + mandatoryKeys.add(key); + optionalKeys.remove(key); + options.setLong(key, value); + return getThisBuilder(); + } + /** * Set mandatory float option. 
* @@ -272,6 +301,7 @@ public B must(@Nonnull final String key, int value) { @Override public B must(@Nonnull final String key, float value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setFloat(key, value); return getThisBuilder(); } @@ -284,6 +314,7 @@ public B must(@Nonnull final String key, float value) { @Override public B must(@Nonnull final String key, double value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setDouble(key, value); return getThisBuilder(); } @@ -296,6 +327,7 @@ public B must(@Nonnull final String key, double value) { @Override public B must(@Nonnull final String key, @Nonnull final String... values) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setStrings(key, values); return getThisBuilder(); } @@ -314,6 +346,12 @@ public Configuration getOptions() { public Set getMandatoryKeys() { return Collections.unmodifiableSet(mandatoryKeys); } + /** + * Get all the keys that are set as optional keys. + */ + public Set getOptionalKeys() { + return Collections.unmodifiableSet(optionalKeys); + } /** * Reject a configuration if one or more mandatory keys are diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java index 24a8d49747fe6..04c84d5595196 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java @@ -148,7 +148,7 @@ public FutureDataInputStreamBuilder getThisBuilder() { @Override public FutureDataInputStreamBuilder withFileStatus(FileStatus st) { - this.status = requireNonNull(st, "status"); + this.status = st; return this; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java index 77b4ff52696a3..6e86f8f7afa51 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.impl; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -38,6 +40,9 @@ public class OpenFileParameters { */ private Set mandatoryKeys; + /** The optional keys. */ + private Set optionalKeys; + /** * Options set during the build sequence. */ @@ -61,6 +66,11 @@ public OpenFileParameters withMandatoryKeys(final Set keys) { return this; } + public OpenFileParameters withOptionalKeys(final Set keys) { + this.optionalKeys = requireNonNull(keys); + return this; + } + public OpenFileParameters withOptions(final Configuration opts) { this.options = requireNonNull(opts); return this; @@ -80,6 +90,10 @@ public Set getMandatoryKeys() { return mandatoryKeys; } + public Set getOptionalKeys() { + return optionalKeys; + } + public Configuration getOptions() { return options; } @@ -91,4 +105,48 @@ public int getBufferSize() { public FileStatus getStatus() { return status; } + + + /** + * OpenFile option for seek policies: {@value}. 
+ */
+ public static final String FS_OPT_OPENFILE_FADVISE =
+ "fs.opt.openfile.fadvise";
+
+ /**
+ * fadvise policy: {@value}.
+ */
+ public static final String FS_OPT_OPENFILE_FADVISE_NORMAL = "normal";
+
+ /**
+ * fadvise policy: {@value}.
+ */
+ public static final String FS_OPT_OPENFILE_FADVISE_SEQUENTIAL = "sequential";
+
+ /**
+ * fadvise policy: {@value}.
+ */
+ public static final String FS_OPT_OPENFILE_FADVISE_RANDOM = "random";
+
+ /**
+ * fadvise policy: {@value}.
+ */
+ public static final String FS_OPT_OPENFILE_FADVISE_ADAPTIVE = "adaptive";
+
+ /**
+ * OpenFile option declaring the length of the file: {@value}.
+ */
+ public static final String FS_OPT_OPENFILE_LENGTH =
+ "fs.opt.openfile.length";
+
+ /**
+ * Set of standard options.
+ */
+ public static final Set<String> STANDARD_OPTIONS =
+ Stream.of(
+ FS_OPT_OPENFILE_FADVISE,
+ FS_OPT_OPENFILE_LENGTH)
+ .collect(Collectors.toSet());
+
+ }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md index eadba174fc1a6..11c6f673f9855 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md @@ -65,8 +65,29 @@ If a filesystem implementation extends the `FileStatus` returned in its implementation MAY use this information when opening the file. This is relevant with those stores which return version/etag information,
-including the S3A and ABFS connectors -they MAY use this to guarantee that
-the file they opened is exactly the one returned in the listing.
+-they MAY use this to guarantee that the file they opened
+is exactly the one returned in the listing.
+
+
+The final `status.getPath().getName()` element of the supplied status MUST equal
+the name value of the path supplied to the `openFile(path)` call.
+
+Filesystems MUST NOT validate the rest of the path.
+This is needed to support viewfs and other mount-point wrapper filesystems
+where schemes and paths are different. These often create their own FileStatus results.
+
+Preconditions
+
+```python
+status == null or status.getPath().getName() == path.getName()
+
+```
+
+Filesystems MUST NOT require the class of `status` to equal
+that of any specific subclass their implementation returns in filestatus/list
+operations. This is to support wrapper filesystems and serialization/deserialization
+of the status.
+
 ### Set optional or mandatory parameters
@@ -77,15 +98,22 @@ Set optional or mandatory parameters to the builder. Using `opt()` or `must()`, the client can specify FS-specific parameters without inspecting the concrete type of `FileSystem`.
+Example:
+
 ```java
 out = fs.openFile(path)
- .opt("fs.s3a.experimental.input.fadvise", "random")
- .must("fs.s3a.readahead.range", 256 * 1024)
+ .must("fs.opt.openfile.fadvise", "random")
+ .opt("fs.http.connection.timeout", 30_000L)
 .withFileStatus(statusFromListing)
 .build()
 .get();
 ```
+
+Here the seek policy of `random` has been specified,
+with the requirement that the filesystem implementation must understand the option.
+An http-specific option has also been supplied which may be interpreted by any store;
+as it was declared via `opt()`, filesystems which do not recognize it can safely ignore it.
+
 #### Implementation Notes
 Checking for supported options must be performed in the `build()` operation.
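+As a sketch of that checking (modeled on the `FileSystem.openFileWithOptions()`
+change earlier in this patch), an implementation can fail fast on any mandatory
+key it does not support; optional keys are, by definition, safe to ignore:
+
+```java
+// Reject any mandatory key which is not in the set of standard options.
+AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+    parameters.getMandatoryKeys(),
+    OpenFileParameters.STANDARD_OPTIONS,  // the fadvise and length options
+    "");
+```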
@@ -151,3 +179,122 @@ evaluation, and so will surface when the future is completed:
 ```java
 FSDataInputStream in = future.get();
 ```
+
+## Standard options
+
+Individual filesystems MAY provide their own set of options for use in the builder.
+
+There are also some standard options with common names, all of which begin with
+`fs.opt.openfile`.
+
+| Name | Type | Description |
+|------|------|-------------|
+| `fs.opt.openfile.fadvise` | string | seek policy |
+| `fs.opt.openfile.length` | long | file length |
+
+These option names are *not* declared as constants in
+public `org.apache.hadoop` classes/interfaces, so as to avoid
+applications failing to link against a Hadoop release
+in which a given standard option is not yet defined.
+
+(Implementors: use `org.apache.hadoop.fs.impl.OpenFileParameters`.)
+
+### Seek Policy Option `fs.opt.openfile.fadvise`
+
+This is a hint as to what the expected read pattern of an input stream will be.
+`sequential`: read the file from start to finish. `random`: the client will be
+reading data from different parts of the file using a sequence of `seek()/read()`
+calls, or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Sequential reads may be optimized by prefetching data and/or reading it in larger blocks.
+In contrast, random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations.
+Queries over columnar formats such as Apache ORC and Apache Parquet
+perform such random IO; other data formats are best read sequentially.
+Some applications (e.g. distCp) perform sequential IO even over columnar data.
+
+The key point is that optimizing for sequential reads may impair random IO
+performance, and vice versa.
+
+Allowing applications to declare their expected read pattern allows the filesystem clients
+to optimize their interaction with the underlying data stores.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the filesystem
+MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem-specific behavior,
+and it may change with Hadoop releases and/or specific storage subsystems.
+1. If a policy is not recognized, the FileSystem MUST ignore it and revert to
+whichever policy is active for that specific FileSystem instance.
+
+| Policy | Meaning |
+|--------|---------|
+| `normal` | Default policy for the filesystem |
+| `sequential` | Optimize for sequential IO |
+| `random` | Optimize for random IO |
+| `adaptive` | Adapt seek policy based on read patterns |
+
+
+#### Seek Policy `normal`
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### Seek Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file, or until the stream is closed.
+
+
+#### Seek Policy `random`
+
+Expect `seek()/read()` sequences, or use of the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+#### Seek Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by the `wasb:` client are both
+adaptive: they assume sequential IO, but once a backwards seek/positioned read call is made
+the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or extend
+the algorithms to detect forward seeks and/or switch from random to sequential IO if
+that is considered more efficient.
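+As an illustration (a sketch only: `fs` and `path` are assumed to reference an
+instantiated filesystem and an existing file), an application which knows it
+will perform random IO over columnar data can declare this when opening the file:
+
+```java
+// Hint that the read pattern will be random IO.
+// As the policy is set via opt() rather than must(),
+// stores which do not recognize the option simply ignore it.
+FSDataInputStream in = fs.openFile(path)
+    .opt("fs.opt.openfile.fadvise", "random")
+    .build()
+    .get();
+```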
+
+Adaptive seek policies have proven effective in the absence of the ability to declare
+the seek policy in the `open()` API, which required the policy, if configurable at all,
+to be declared in the cluster/application configuration. However, the switch from
+sequential to random seek policies may be expensive.
+
+Applications which know their read plan SHOULD explicitly set the
+`fs.opt.openfile.fadvise` option to whichever policy is most appropriate.
+
+_Implementor's Notes_
+
+If a policy is unrecognized/unsupported, the filesystem SHOULD log it and then MUST ignore the option. This is
+to support future evolution of policies and implementations.
+
+
+_Further reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+### File Length hint `fs.opt.openfile.length`
+
+Declare that a file is expected to be *at least as long as the specified length*.
+
+This can be used by clients to skip querying a remote store for the size or existence of a file when
+opening it, similar to declaring a file status through the `withFileStatus()` option.
+
+The supplied length MAY be shorter than the actual file.
+
+This allows worker tasks which know only the extent of their task's split to declare that as the
+length of the file.
+
+If this option is used by the FileSystem implementation:
+
+* A `length` < 0 MUST be rejected.
+* If a file status is supplied along with a value in `fs.opt.openfile.length`, the file status
+values take precedence.
+* seek/read calls past the length value MAY be rejected; equally, they MAY be accepted. The option is simply a hint.
+
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index a43053180fbf8..effae0382d9af 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -263,6 +263,7 @@ public void testChainedFailureAwaitFuture() throws Throwable { describe("await Future handles chained failures"); CompletableFuture<FSDataInputStream> f = getFileSystem() .openFile(path("testOpenFileUnknownOption"))
+ .withFileStatus(null)
 .build(); intercept(RuntimeException.class, "exceptionally", @@ -280,13 +281,34 @@ public void testOpenFileApplyRead() throws Throwable { int len = 4096; createFile(fs, path, true, dataset(len, 0x40, 0x80));
+ FileStatus st = fs.getFileStatus(path);
 CompletableFuture<Long> readAllBytes = fs.openFile(path)
- .withFileStatus(fs.getFileStatus(path))
+ .withFileStatus(st)
 .build() .thenApply(ContractTestUtils::readStream); assertEquals("Wrong number of bytes read value", len, (long) readAllBytes.get());
+ // now reattempt with a new FileStatus whose path differs
+ // in everything but the final name element;
+ // implementations MUST use the path passed to openFile()
+ FileStatus st2 = new FileStatus(
+ len, false,
+ st.getReplication(),
+ st.getBlockSize(),
+ st.getModificationTime(),
+ st.getAccessTime(),
+ st.getPermission(),
+ st.getOwner(),
+ st.getGroup(),
+ new Path("gopher:///localhost:/" + path.getName()));
+ assertEquals("Wrong number of bytes read value",
+ len,
+ (long) fs.openFile(path)
+ .withFileStatus(st2)
+ .build()
+ .thenApply(ContractTestUtils::readStream)
+ .get());
 } @Test diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 4789630f95f1c..3ee0a17098386 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -1565,17 +1565,22 @@ public static int read(InputStream in) { /** * Read a whole stream; downgrades an IOE to a runtime exception. + * Closes the stream afterwards. * @param in input * @return the number of bytes read. * @throws AssertionError on any IOException */ public static long readStream(InputStream in) { - long count = 0; + try { + long count = 0; - while (read(in) >= 0) { - count++; + while (read(in) >= 0) { + count++; + } + return count; + } finally { + IOUtils.cleanupWithLogger(LOG, in); } - return count; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 22a0b45f1c7a5..609e8efc4a5a2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; + /** * All the constants used with the {@link S3AFileSystem}. * @@ -522,7 +524,6 @@ private Constants() { * reading data. * Value: {@value} */ - @InterfaceStability.Unstable public static final String INPUT_FADVISE = "fs.s3a.experimental.input.fadvise"; @@ -530,14 +531,12 @@ private Constants() { * General input. Some seeks, some reads. * Value: {@value} */ - @InterfaceStability.Unstable public static final String INPUT_FADV_NORMAL = "normal"; /** * Optimized for sequential access. * Value: {@value} */ - @InterfaceStability.Unstable public static final String INPUT_FADV_SEQUENTIAL = "sequential"; /** @@ -546,7 +545,6 @@ private Constants() { * more efficient {@code seek()} operations. 
* Value: {@value}
 */
- @InterfaceStability.Unstable
 public static final String INPUT_FADV_RANDOM = "random";

 @InterfaceAudience.Private
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 286df44939312..1532c42537bf0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -104,8 +104,8 @@ import org.apache.hadoop.fs.s3a.impl.ContextAccessors; import org.apache.hadoop.fs.s3a.impl.CopyOutcome; import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
-import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
+import org.apache.hadoop.fs.s3a.impl.S3AOpenFileHelper;
 import org.apache.hadoop.fs.s3a.impl.OperationCallbacks; import org.apache.hadoop.fs.s3a.impl.RenameOperation; import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder; @@ -114,7 +114,6 @@ import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatisticsImpl; import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
-import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
 import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.DelegationTokenIssuer; @@ -164,7 +163,6 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Invoker.*; @@ -293,6 +291,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private final S3AFileSystem.OperationCallbacksImpl operationCallbacks = new OperationCallbacksImpl();
+ /**
+ * Helper for the openFile() method.
+ */
+ private S3AOpenFileHelper openFileHelper;
+
 /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -455,6 +458,12 @@ public void initialize(URI name, Configuration originalConf) pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE, BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
+ // now the open file logic
+ openFileHelper = new S3AOpenFileHelper(
+ inputPolicy,
+ changeDetectionPolicy,
+ readAhead,
+ username);
 } catch (AmazonClientException e) { // amazon client exception: stop all services then throw the translation stopAllServices(); @@ -1069,59 +1078,37 @@ protected URI canonicalizeUri(URI rawUri) { @Retries.RetryTranslated public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- return open(f, Optional.empty(), Optional.empty());
+ entryPoint(INVOCATION_OPEN);
+ return open(qualify(f), openFileHelper.openSimpleFile());
 } /** * Opens an FSDataInputStream at the indicated Path.
- * if status contains an S3AFileStatus reference, it is used
- * and so a HEAD request to the store is avoided.
+ * If fileInformation contains a FileStatus reference,
+ * it is used to avoid a check for the file length/existence.
 *
- * @param file the file to open
- * @param options configuration options if opened with the builder API.
- * @param providedStatus optional file status.
+ * @param path the file to open
+ * @param fileInformation information about the file to open
 * @throws IOException IO failure. */ @Retries.RetryTranslated private FSDataInputStream open(
- final Path file,
- final Optional<Configuration> options,
- final Optional<S3AFileStatus> providedStatus)
+ final Path path,
+ final S3AOpenFileHelper.OpenFileInformation fileInformation)
 throws IOException {
- entryPoint(INVOCATION_OPEN);
- final Path path = qualify(file);
- S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
- providedStatus);
-
- S3AReadOpContext readContext;
- if (options.isPresent()) {
- Configuration o = options.get();
- // normal path. Open the file with the chosen seek policy, if different
- // from the normal one.
- // and readahead.
- S3AInputPolicy policy = S3AInputPolicy.getPolicy(
- o.get(INPUT_FADVISE, inputPolicy.toString()));
- long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead);
- // TODO support change detection policy from options?
- readContext = createReadContext(
- fileStatus,
- policy,
- changeDetectionPolicy,
- readAheadRange2);
- } else {
- readContext = createReadContext(
- fileStatus,
- inputPolicy,
- changeDetectionPolicy,
- readAhead);
- }
+ final S3AFileStatus fileStatus =
+ extractOrFetchSimpleFileStatus(path, fileInformation);
+ S3AReadOpContext readContext = createReadContext(
+ fileStatus,
+ fileInformation.getInputPolicy(),
+ fileInformation.getChangePolicy(),
+ fileInformation.getReadAheadRange());
 LOG.debug("Opening '{}'", readContext);
-
 return new FSDataInputStream( new S3AInputStream( readContext,
- createObjectAttributes(fileStatus),
+ createObjectAttributes(path, fileStatus),
 s3)); } @@ -1175,13 +1162,14 @@ private S3ObjectAttributes createObjectAttributes( /** * Create the attributes of an object for subsequent use.
+ * @param path path; used in preference to the path of the file status.
 * @param fileStatus file status to build from. * @return attributes to use when building the query.
*/ private S3ObjectAttributes createObjectAttributes( - final S3AFileStatus fileStatus) { + final Path path, final S3AFileStatus fileStatus) { return createObjectAttributes( - fileStatus.getPath(), + path, fileStatus.getETag(), fileStatus.getVersionId(), fileStatus.getLen()); @@ -1507,7 +1495,9 @@ public S3ObjectAttributes createObjectAttributes(final Path path, @Override public S3ObjectAttributes createObjectAttributes( final S3AFileStatus fileStatus) { - return S3AFileSystem.this.createObjectAttributes(fileStatus); + return S3AFileSystem.this.createObjectAttributes( + fileStatus.getPath(), + fileStatus); } @Override @@ -4607,24 +4597,27 @@ protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) { * @throws IOException IO failure */ @Retries.RetryTranslated - private FSDataInputStream select(final Path source, - final String expression, + private FSDataInputStream select(final Path path, final Configuration options, - final Optional providedStatus) + final S3AOpenFileHelper.OpenFileInformation fileInformation) throws IOException { entryPoint(OBJECT_SELECT_REQUESTS); - requireSelectSupport(source); - final Path path = makeQualified(source); + requireSelectSupport(path); + String expression = fileInformation.getSql(); final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, - providedStatus); + fileInformation); // readahead range can be dynamically set - long ra = options.getLong(READAHEAD_RANGE, readAhead); - S3ObjectAttributes objectAttributes = createObjectAttributes(fileStatus); - S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy, - changeDetectionPolicy, ra); + S3ObjectAttributes objectAttributes = createObjectAttributes( + path, fileStatus); + ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy(); + S3AReadOpContext readContext = createReadContext( + fileStatus, + fileInformation.getInputPolicy(), + changePolicy, + fileInformation.getReadAheadRange()); - if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None + if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None && fileStatus.getETag() != null) { // if there is change detection, and the status includes at least an // etag, @@ -4636,7 +4629,7 @@ private FSDataInputStream select(final Path source, // version in the final read; nor can we check the etag match) ChangeTracker changeTracker = new ChangeTracker(uri.toString(), - changeDetectionPolicy, + changePolicy, readContext.instrumentation.newInputStreamStatistics() .getVersionMismatchCounter(), objectAttributes); @@ -4669,21 +4662,20 @@ private void requireSelectSupport(final Path source) throws } /** - * Extract the status from the optional parameter, querying - * S3Guard/s3 if it is absent. + * Get the status for the file, querying + * S3Guard/s3 if there is none in the fileInformation parameter. 
* @param path path of the status - * @param optStatus optional status + * @param fileInformation information on the file to open * @return a file status * @throws FileNotFoundException if there is no normal file at that path * @throws IOException IO failure */ private S3AFileStatus extractOrFetchSimpleFileStatus( - final Path path, final Optional optStatus) + final Path path, + final S3AOpenFileHelper.OpenFileInformation fileInformation) throws IOException { - S3AFileStatus fileStatus; - if (optStatus.isPresent()) { - fileStatus = optStatus.get(); - } else { + S3AFileStatus fileStatus = fileInformation.getStatus(); + if (fileStatus == null) { // this looks at S3guard and gets any type of status back, // if it falls back to S3 it does a HEAD only. // therefore: if there is no S3Guard and there is a dir, this @@ -4716,54 +4708,19 @@ public CompletableFuture openFileWithOptions( final Path rawPath, final OpenFileParameters parameters) throws IOException { final Path path = qualify(rawPath); - Configuration options = parameters.getOptions(); - Set mandatoryKeys = parameters.getMandatoryKeys(); - String sql = options.get(SelectConstants.SELECT_SQL, null); - boolean isSelect = sql != null; - // choice of keys depends on open type - if (isSelect) { - rejectUnknownMandatoryKeys( - mandatoryKeys, - InternalSelectConstants.SELECT_OPTIONS, - "for " + path + " in S3 Select operation"); - } else { - rejectUnknownMandatoryKeys( - mandatoryKeys, - InternalConstants.STANDARD_OPENFILE_KEYS, - "for " + path + " in non-select file I/O"); - } - FileStatus providedStatus = parameters.getStatus(); - S3AFileStatus fileStatus; - if (providedStatus != null) { - Preconditions.checkArgument(path.equals(providedStatus.getPath()), - "FileStatus parameter is not for the path %s: %s", - path, providedStatus); - if (providedStatus instanceof S3AFileStatus) { - // can use this status to skip our own probes, - // including etag and version. - LOG.debug("File was opened with a supplied S3AFileStatus;" - + " skipping getFileStatus call in open() operation: {}", - providedStatus); - fileStatus = (S3AFileStatus) providedStatus; - } else if (providedStatus instanceof S3ALocatedFileStatus) { - LOG.debug("File was opened with a supplied S3ALocatedFileStatus;" - + " skipping getFileStatus call in open() operation: {}", - providedStatus); - fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus(); - } else { - LOG.debug("Ignoring file status {}", providedStatus); - fileStatus = null; - } - } else { - fileStatus = null; - } - Optional ost = Optional.ofNullable(fileStatus); + S3AOpenFileHelper.OpenFileInformation fileInformation = + openFileHelper.prepareToOpenFile( + path, + parameters, + getDefaultBlockSize(path)); + boolean isSelect = fileInformation.isSql(); CompletableFuture result = new CompletableFuture<>(); + Configuration options = parameters.getOptions(); if (!isSelect) { // normal path. unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> open(path, Optional.of(options), ost))); + () -> open(path, fileInformation))); } else { // it is a select statement. 
// fail fast if the operation is not available @@ -4771,7 +4728,7 @@ public CompletableFuture openFileWithOptions( // submit the query unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> select(path, sql, options, ost))); + () -> select(path, options, fileInformation))); } return result; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java index c018410e11767..bc726fea4eb95 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java @@ -25,6 +25,7 @@ import java.util.Locale; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE_ADAPTIVE; import static org.apache.hadoop.fs.s3a.Constants.*; /** @@ -62,6 +63,7 @@ public static S3AInputPolicy getPolicy(String name) { String trimmed = name.trim().toLowerCase(Locale.ENGLISH); switch (trimmed) { case INPUT_FADV_NORMAL: + case FS_OPT_OPENFILE_FADVISE_ADAPTIVE: return Normal; case INPUT_FADV_RANDOM: return Random; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 9c8b9ae7a156e..d868c45d9852b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -142,7 +142,7 @@ public S3AInputStream(S3AReadOpContext ctx, this.context = ctx; this.bucket = s3Attributes.getBucket(); this.key = s3Attributes.getKey(); - this.pathStr = ctx.dstFileStatus.getPath().toString(); + this.pathStr = s3Attributes.getPath().toString(); this.contentLength = l; this.client = client; this.uri = "s3a://" + this.bucket + "/" + this.key; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 4d944a1721303..caed3dcc46e40 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -18,16 +18,18 @@ package org.apache.hadoop.fs.s3a.impl; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.s3a.Constants; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; + /** * Internal constants private only to the S3A codebase. * Please don't refer to these outside of this module & its tests. @@ -79,11 +81,14 @@ private InternalConstants() { * used becomes that of the select operation. 
*/ @InterfaceStability.Unstable - public static final Set STANDARD_OPENFILE_KEYS = - Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList(Constants.INPUT_FADVISE, - Constants.READAHEAD_RANGE))); + public static final Set S3A_OPENFILE_KEYS = + Stream.of( + FS_OPT_OPENFILE_FADVISE, + FS_OPT_OPENFILE_LENGTH, + Constants.INPUT_FADVISE, + Constants.READAHEAD_RANGE) + .collect(Collectors.toSet()); + /** 404 error code. */ public static final int SC_404 = 404; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileHelper.java new file mode 100644 index 0000000000000..bb467613e29ad --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileHelper.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.fs.s3a.select.InternalSelectConstants; +import org.apache.hadoop.fs.s3a.select.SelectConstants; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; + +/** + * Helper class for openFile() logic, especially processing file status + * args and length/etag/versionID. + *

+ * This logic became complex enough to merit extraction from
+ * S3AFileSystem; doing so also permits unit testing.
+ */
+ public class S3AOpenFileHelper {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AOpenFileHelper.class);
+
+ private final S3AInputPolicy inputPolicy;
+
+ private final ChangeDetectionPolicy changePolicy;
+
+ private final long readAheadRange;
+
+ private final String username;
+
+ /**
+ * Simple file information is created in advance as it is always
+ * the same.
+ */
+ private final OpenFileInformation simpleFileInformation;
+
+
+ /**
+ * Instantiate with the default options from the filesystem.
+ * @param inputPolicy input policy
+ * @param changePolicy change detection policy
+ * @param readAheadRange read ahead range
+ * @param username username
+ */
+ public S3AOpenFileHelper(
+ final S3AInputPolicy inputPolicy,
+ final ChangeDetectionPolicy changePolicy,
+ final long readAheadRange,
+ final String username) {
+ this.inputPolicy = inputPolicy;
+ this.changePolicy = changePolicy;
+ this.readAheadRange = readAheadRange;
+ this.username = username;
+
+ simpleFileInformation = new OpenFileInformation(false, null, null,
+ inputPolicy, changePolicy, readAheadRange);
+ }
+
+
+ /**
+ * The information on a file needed to open it.
+ */
+ public static final class OpenFileInformation {
+
+ /** Is this SQL? */
+ private final boolean isSql;
+
+ /** File status; may be null. */
+ private final S3AFileStatus status;
+
+ /** SQL string if this is a SQL select file. */
+ private final String sql;
+
+ /** Active input policy. */
+ private final S3AInputPolicy inputPolicy;
+
+ /** Change detection policy. */
+ private final ChangeDetectionPolicy changePolicy;
+
+ /** Read ahead range. */
+ private final long readAheadRange;
+
+ private OpenFileInformation(final boolean isSql,
+ final S3AFileStatus status,
+ final String sql,
+ final S3AInputPolicy inputPolicy,
+ final ChangeDetectionPolicy changePolicy,
+ final long readAheadRange) {
+ this.isSql = isSql;
+ this.status = status;
+ this.sql = sql;
+ this.inputPolicy = inputPolicy;
+ this.changePolicy = changePolicy;
+ this.readAheadRange = readAheadRange;
+ }
+
+ public boolean isSql() {
+ return isSql;
+ }
+
+ public S3AFileStatus getStatus() {
+ return status;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public S3AInputPolicy getInputPolicy() {
+ return inputPolicy;
+ }
+
+ public ChangeDetectionPolicy getChangePolicy() {
+ return changePolicy;
+ }
+
+ public long getReadAheadRange() {
+ return readAheadRange;
+ }
+
+ @Override
+ public String toString() {
+ return "OpenFileInformation{" +
+ "isSql=" + isSql +
+ ", status=" + status +
+ ", sql='" + sql + '\'' +
+ ", inputPolicy=" + inputPolicy +
+ ", changePolicy=" + changePolicy +
+ ", readAheadRange=" + readAheadRange +
+ '}';
+ }
+ }
+
+ /**
+ * Prepare to open a file from the openFile parameters.
+ * @param path path to the file
+ * @param parameters open file parameters from the builder.
+ * @param blockSize blocksize to use for any constructed file status
+ * @return open file options
+ * @throws IOException failure to resolve the link.
+ * @throws IllegalArgumentException unknown mandatory key
+ */
+ public OpenFileInformation prepareToOpenFile(
+ final Path path,
+ final OpenFileParameters parameters,
+ final long blockSize) throws IOException {
+ Configuration options = parameters.getOptions();
+ Set<String> mandatoryKeys = parameters.getMandatoryKeys();
+ String sql = options.get(SelectConstants.SELECT_SQL, null);
+ boolean isSelect = sql != null;
+ // choice of keys depends on open type
+ if (isSelect) {
+ // S3 Select call adds a large set of supported mandatory keys
+ rejectUnknownMandatoryKeys(
+ mandatoryKeys,
+ InternalSelectConstants.SELECT_OPTIONS,
+ "for " + path + " in S3 Select operation");
+ } else {
+ rejectUnknownMandatoryKeys(
+ mandatoryKeys,
+ InternalConstants.S3A_OPENFILE_KEYS,
+ "for " + path + " in non-select file I/O");
+ }
+ FileStatus providedStatus = parameters.getStatus();
+ S3AFileStatus fileStatus;
+ if (providedStatus != null) {
+ // there's a file status.
+
+ // make sure the file name matches; the rest of the path
+ // MUST NOT be checked.
+ Path providedStatusPath = providedStatus.getPath();
+ checkArgument(path.getName().equals(providedStatusPath.getName()),
+ "Filename mismatch between file being opened %s and"
+ + " supplied filestatus %s",
+ path, providedStatusPath);
+
+ // make sure the status references a file
+ if (providedStatus.isDirectory()) {
+ throw new FileNotFoundException(
+ "Supplied status references a directory " + providedStatus);
+ }
+ // build up the values
+ long len = providedStatus.getLen();
+ long modTime = providedStatus.getModificationTime();
+ String versionId;
+ String eTag;
+ // can use this status to skip our own probes.
+ LOG.debug("File was opened with a supplied FileStatus;"
+ + " skipping getFileStatus call in open() operation: {}",
+ providedStatus);
+ if (providedStatus instanceof S3AFileStatus) {
+ S3AFileStatus st = (S3AFileStatus) providedStatus;
+ versionId = st.getVersionId();
+ eTag = st.getETag();
+ } else if (providedStatus instanceof S3ALocatedFileStatus) {
+ // S3ALocatedFileStatus instance may supply etag and version.
+ S3ALocatedFileStatus st
+ = (S3ALocatedFileStatus) providedStatus;
+ versionId = st.getVersionId();
+ eTag = st.getETag();
+ } else {
+ // it is another type.
+ // build a status struct without etag or version.
+ LOG.debug("Converting file status {}", providedStatus);
+ versionId = null;
+ eTag = null;
+ }
+ // Construct a new file status with the real path of the file.
+ fileStatus = new S3AFileStatus(
+ len,
+ modTime,
+ path,
+ blockSize,
+ username,
+ eTag,
+ versionId);
+ } else if (options.get(FS_OPT_OPENFILE_LENGTH) != null) {
+ // build a minimal S3A FileStatus from the input length alone.
+ // this is all we actually need.
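+ // A status built this way has no etag or version information,
+ // so server-side change detection is effectively skipped;
+ // only the declared length is used by the read path.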
+ long length = options.getLong(FS_OPT_OPENFILE_LENGTH, 0);
+ LOG.debug("Fixing length of file to read {} as {}", path, length);
+ fileStatus = new S3AFileStatus(
+ length,
+ 0,
+ path,
+ blockSize,
+ username,
+ null, null);
+ } else {
+ // neither a file status nor a file length
+ fileStatus = null;
+ }
+ // seek policy from default, s3a opt or standard option
+ String policy1 = options.get(INPUT_FADVISE, inputPolicy.toString());
+ String policy2 = options.get(FS_OPT_OPENFILE_FADVISE, policy1);
+ S3AInputPolicy policy = S3AInputPolicy.getPolicy(policy2);
+
+ // readahead range
+ long ra = options.getLong(READAHEAD_RANGE, readAheadRange);
+ return new OpenFileInformation(isSelect, fileStatus, sql, policy,
+ changePolicy, ra);
+ }
+
+ /**
+ * Open a simple file, using the base file information prepared
+ * in the constructor.
+ * @return the parameters needed to open a file through open().
+ */
+ public OpenFileInformation openSimpleFile() {
+ return simpleFileInformation;
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java index 4f387d8ccfa75..fbf5226afb82f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java @@ -71,7 +71,7 @@ private InternalSelectConstants() { CSV_OUTPUT_QUOTE_FIELDS, CSV_OUTPUT_RECORD_DELIMITER ));
- options.addAll(InternalConstants.STANDARD_OPENFILE_KEYS);
+ options.addAll(InternalConstants.S3A_OPENFILE_KEYS);
 SELECT_OPTIONS = Collections.unmodifiableSet(options); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java index d78273b14710a..e0d9d4d9a06b1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java @@ -18,11 +18,22 @@ package org.apache.hadoop.fs.contract.s3a;
+import java.io.FileNotFoundException;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractOpenTest; import org.apache.hadoop.fs.contract.AbstractFSContract;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 /**
 * S3A contract tests opening files.
@@ -54,4 +65,59 @@ protected AbstractFSContract createContract(Configuration conf) { protected boolean areZeroByteFilesEncrypted() { return true; }
+
+ @Test
+ public void testOpenFileApplyReadBadName() throws Throwable {
+ describe("openFile with a status with the wrong name must fail");
+ Path path = methodPath();
+ FileSystem fs = getFileSystem();
+ touch(fs, path);
+ FileStatus st = fs.getFileStatus(path);
+ // The final element of the path is different, so
+ // openFile must fail
+ FileStatus st2 = new FileStatus(
+ 0, false,
+ st.getReplication(),
+ st.getBlockSize(),
+ st.getModificationTime(),
+ st.getAccessTime(),
+ st.getPermission(),
+ st.getOwner(),
+ st.getGroup(),
+ new Path("gopher:///localhost/something.txt"));
+ intercept(IllegalArgumentException.class, () ->
+ fs.openFile(path)
+ .withFileStatus(st2)
+ .build());
+ }
+
+ /**
+ * Pass in a directory reference and expect the openFile call
+ * to fail.
+ */
+ @Test
+ public void testOpenFileDirectory() throws Throwable {
+ describe("Change the status to a directory");
+ Path path = methodPath();
+ FileSystem fs = getFileSystem();
+ int len = 4096;
+ createFile(fs, path, true,
+ dataset(len, 0x40, 0x80));
+ FileStatus st = fs.getFileStatus(path);
+ FileStatus st2 = new FileStatus(
+ len, true,
+ st.getReplication(),
+ st.getBlockSize(),
+ st.getModificationTime(),
+ st.getAccessTime(),
+ st.getPermission(),
+ st.getOwner(),
+ st.getGroup(),
+ path);
+ intercept(FileNotFoundException.class, () ->
+ fs.openFile(path)
+ .withFileStatus(st2)
+ .build());
+ }
+
 } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java index e54fd97a6af1e..f9260d6ab016c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a; import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -33,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory;
+import java.io.EOFException;
 import java.io.File; import java.io.FileNotFoundException; import java.net.URI; @@ -41,8 +43,12 @@ import java.util.EnumSet; import java.util.UUID; import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE;
+import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE_SEQUENTIAL;
+import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; @@ -688,4 +694,99 @@ public void testCostOfGlobStatusNoSymlinkResolution() throws Throwable { // no additional operations from symlink resolution verifyOperationCount(2, 2); }
+
+ /**
+ * Test whether openFile() performs GET requests when file status
+ * and length options are passed down.
+ * Note that the input streams only update the FS statistics
+ * in close(), so metrics cannot be verified until all operations
+ * on a stream are complete.
+ * This is slightly less than ideal. + */ + @Test + public void testOpenFileCost() throws Throwable { + describe("Test cost of openFile with/without status; raw only"); + S3AFileSystem fs = getFileSystem(); + assume("Unguarded FS only", !fs.hasMetadataStore()); + Path testFile = path("testOpenFileCost"); + + writeTextFile(fs, testFile, "openfile", true); + FileStatus st = fs.getFileStatus(testFile); + + // now read that file back in using the openFile call. + // with a new FileStatus and a different path. + // this verifies that any FileStatus class/subclass is used + // as a source of the file length. + long len = st.getLen(); + FileStatus st2 = new FileStatus( + len, false, + st.getReplication(), + st.getBlockSize(), + st.getModificationTime(), + st.getAccessTime(), + st.getPermission(), + st.getOwner(), + st.getGroup(), + new Path("gopher:///localhost/" + testFile.getName())); + resetMetricDiffs(); + // measure GET requests issued. + MetricDiff getRequests = new MetricDiff(fs, STREAM_OPENED); + + final CompletableFuture future = + fs.openFile(testFile) + .withFileStatus(st) + .build(); + FSDataInputStream in = future.get(); + verifyOperationCount(0, 0); + getRequests.assertDiffEquals(0); + + long readLen = readStream(in); + assertEquals("bytes read from file", len, readLen); + verifyOperationCount(0, 0); + getRequests.assertDiffEquals(1); + getRequests.reset(); + resetMetricDiffs(); + + // do a second read with the length declared as short. + // we now expect the bytes read to be shorter. + MetricDiff bytesDiscarded = + new MetricDiff(fs, STREAM_CLOSE_BYTES_READ); + int offset = 2; + long shortLen = len - offset; + CompletableFuture f2 = fs.openFile(testFile) + .must(FS_OPT_OPENFILE_FADVISE, FS_OPT_OPENFILE_FADVISE_SEQUENTIAL) + .opt(FS_OPT_OPENFILE_LENGTH, shortLen) + .build(); + verifyOperationCount(0, 0); + getRequests.assertDiffEquals(0); + FSDataInputStream in2 = f2.get(); + long r2 = readStream(in2); + verifyOperationCount(0, 0); + getRequests.assertDiffEquals(1); + assertEquals("bytes read from file", shortLen, r2); + // the read has been ranged + bytesDiscarded.assertDiffEquals(0); + + // final read past EOF + getRequests.reset(); + + long longLen = len + 10; + FSDataInputStream in3 = fs.openFile(testFile) + .must(FS_OPT_OPENFILE_FADVISE, FS_OPT_OPENFILE_FADVISE_SEQUENTIAL) + .must(FS_OPT_OPENFILE_LENGTH, longLen) + .build() + .get(); + byte[] out = new byte[(int) longLen]; + intercept(EOFException.class, + () -> in3.readFully(0, out)); + in3.seek(longLen - 1); + assertEquals("read past real EOF on " + in3, + -1, in3.read()); + in3.close(); + // two GET calls were made, one for readFully, + // the second on the read() past the EOF + // the operation has got as far as S3 + getRequests.assertDiffEquals(2); + + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileHelper.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileHelper.java new file mode 100644 index 0000000000000..f3ac569843454 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileHelper.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.ObjectAssert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.test.HadoopTestBase; + +import static java.util.Collections.singleton; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE_ADAPTIVE; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_FADVISE_RANDOM; +import static org.apache.hadoop.fs.impl.OpenFileParameters.FS_OPT_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for {@link S3AOpenFileHelper}. + */ +public class TestOpenFileHelper extends HadoopTestBase { + + private static final ChangeDetectionPolicy CHANGE_POLICY + = ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.Server, + ChangeDetectionPolicy.Source.None, false); + + private static final long READ_AHEAD_RANGE = 16; + + private static final String USERNAME = "hadoop"; + + public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential; + + public static final String TESTFILE = "s3a://bucket/name"; + + private static final Path TESTPATH = new Path(TESTFILE); + + private S3AOpenFileHelper helper; + + @Before + public void setup() { + helper = new S3AOpenFileHelper(INPUT_POLICY, + CHANGE_POLICY, READ_AHEAD_RANGE, USERNAME); + } + + @Test + public void testSimpleFile() throws Throwable { + ObjectAssert + asst = assertFI(helper.openSimpleFile()); + + asst.extracting(f -> f.getChangePolicy()) + .isEqualTo(CHANGE_POLICY); + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + asst.extracting(f -> f.getReadAheadRange()) + .isEqualTo(READ_AHEAD_RANGE); + } + + private ObjectAssert assertFI( + final S3AOpenFileHelper.OpenFileInformation fi) { + return Assertions.assertThat(fi) + .describedAs("File Information %s", fi); + } + + @Test + public void testUnknownMandatory() throws Throwable { + + String key = "unknown"; + intercept(IllegalArgumentException.class, key, () -> + helper.prepareToOpenFile(TESTPATH, + params(key, "undefined"), + 0)); + } + + @Test + public void testSeekPolicy() throws Throwable { + + // ask for random IO + ObjectAssert asst = + assertFI(helper.prepareToOpenFile(TESTPATH, + params(INPUT_FADVISE, FS_OPT_OPENFILE_FADVISE_RANDOM), + 0)); + + // is picked up + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Random); + // and as neither status nor length was set: no file status + asst.extracting(f -> f.getStatus()) + .isNull(); + } + + @Test + public void testSeekPolicyAdaptive() throws Throwable { + + // when caller asks for 
adaptive, they get "normal"
+ ObjectAssert asst =
+ assertFI(helper.prepareToOpenFile(TESTPATH,
+ params(INPUT_FADVISE, FS_OPT_OPENFILE_FADVISE_ADAPTIVE),
+ 0));
+
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ @Test
+ public void testUnknownSeekPolicy() throws Throwable {
+
+ // fall back to the normal seek policy.
+ ObjectAssert asst =
+ assertFI(helper.prepareToOpenFile(TESTPATH,
+ params(INPUT_FADVISE, "undefined"),
+ 0));
+
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ @Test
+ public void testReadahead() throws Throwable {
+
+ // readahead range option
+ ObjectAssert asst =
+ assertFI(helper.prepareToOpenFile(TESTPATH,
+ params(READAHEAD_RANGE, "4096"),
+ 0));
+
+ asst.extracting(f -> f.getReadAheadRange())
+ .isEqualTo(4096L);
+ }
+
+ @Test
+ public void testStatusWithValidFilename() throws Throwable {
+ Path p = new Path("file:///tmp/" + TESTPATH.getName());
+ ObjectAssert asst =
+ assertFI(helper.prepareToOpenFile(TESTPATH,
+ params(FS_OPT_OPENFILE_LENGTH, "32")
+ .withStatus(status(p, 4096)),
+ 0));
+ asst.extracting(f -> f.getStatus().getVersionId())
+ .isEqualTo("version");
+ asst.extracting(f -> f.getStatus().getETag())
+ .isEqualTo("etag");
+ asst.extracting(f -> f.getStatus().getLen())
+ .isEqualTo(4096L);
+ }
+
+ @Test
+ public void testLocatedStatus() throws Throwable {
+ Path p = new Path("file:///tmp/" + TESTPATH.getName());
+ ObjectAssert asst =
+ assertFI(helper.prepareToOpenFile(TESTPATH,
+ params(FS_OPT_OPENFILE_LENGTH, "32")
+ .withStatus(
+ new S3ALocatedFileStatus(
+ status(p, 4096), null)),
+ 0));
+ asst.extracting(f -> f.getStatus().getVersionId())
+ .isEqualTo("version");
+ asst.extracting(f -> f.getStatus().getETag())
+ .isEqualTo("etag");
+ asst.extracting(f -> f.getStatus().getLen())
+ .isEqualTo(4096L);
+ }
+
+ /**
+ * Callers cannot supply a directory status when opening a file.
+ */
+ @Test
+ public void testDirectoryStatus() throws Throwable {
+ intercept(FileNotFoundException.class, TESTFILE, () ->
+ helper.prepareToOpenFile(TESTPATH,
+ params(INPUT_FADVISE, "normal")
+ .withStatus(new S3AFileStatus(true, TESTPATH, USERNAME)),
+ 0));
+ }
+
+ /**
+ * File name must match the path argument to openFile().
+ */
+ @Test
+ public void testStatusWithInconsistentFilename() throws Throwable {
+ intercept(IllegalArgumentException.class, TESTFILE, () ->
+ helper.prepareToOpenFile(TESTPATH,
+ params(INPUT_FADVISE, "normal")
+ .withStatus(new S3AFileStatus(true,
+ new Path(TESTFILE + "-"), USERNAME)),
+ 0));
+ }
+
+ /**
+ * If a file length option is set, a file status is created.
+ */
+ @Test
+ public void testFileLength() throws Throwable {
+ ObjectAssert asst =
+ assertFI(helper.prepareToOpenFile(TESTPATH,
+ params(FS_OPT_OPENFILE_LENGTH, "8192")
+ .withStatus(null),
+ 0));
+ asst.extracting(f -> f.getStatus())
+ .isNotNull();
+ asst.extracting(f -> f.getStatus().getPath())
+ .isEqualTo(TESTPATH);
+ asst.extracting(f -> f.getStatus().getLen())
+ .isEqualTo(8192L);
+ }
+
+
+ private S3AFileStatus status(final Path p, final int length) {
+ return new S3AFileStatus(length, 0,
+ p, 0, "", "etag", "version");
+ }
+
+ private OpenFileParameters params(final String key, final String val) {
+ return new OpenFileParameters()
+ .withMandatoryKeys(singleton(key))
+ .withOptions(conf(key, val));
+ }
+
+ private Configuration conf(String key, Object val) {
+ Configuration c = new Configuration(false);
+ c.set(key, val.toString());
+ return c;
+ }
+}