From 33547e06eac8b6e725b219245681677038ec17de Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 12 Dec 2019 18:44:48 +0000 Subject: [PATCH 1/4] HADOOP-16759. Filesystem openFile() builder to take a FileStatus param * Enhanced builder + FS spec * s3a FS to use this to skip HEAD on open * and to use version/etag when opening the file works with S3AFileStatus FS and S3ALocatedFileStatus Change-Id: If80f73137643fd50a969a92ad5794d0d09e3aee6 --- .../apache/hadoop/fs/AbstractFileSystem.java | 15 ++-- .../apache/hadoop/fs/ChecksumFileSystem.java | 12 ++- .../hadoop/fs/DelegateToFileSystem.java | 17 ++-- .../org/apache/hadoop/fs/FileContext.java | 11 ++- .../java/org/apache/hadoop/fs/FileSystem.java | 41 ++++----- .../apache/hadoop/fs/FilterFileSystem.java | 15 ++-- .../java/org/apache/hadoop/fs/FilterFs.java | 9 +- .../fs/FutureDataInputStreamBuilder.java | 11 +++ .../FutureDataInputStreamBuilderImpl.java | 21 +++++ .../hadoop/fs/impl/OpenFileParameters.java | 87 +++++++++++++++++++ .../site/markdown/filesystem/filesystem.md | 23 +++-- .../filesystem/fsdatainputstreambuilder.md | 41 +++++++++ .../fs/contract/AbstractContractOpenTest.java | 1 + .../apache/hadoop/fs/s3a/S3AFileSystem.java | 63 +++++++++++--- .../fs/s3a/ITestS3ARemoteFileChanged.java | 69 ++++++++++++--- .../hadoop/fs/s3a/select/ITestS3Select.java | 1 + 16 files changed, 341 insertions(+), 96 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java index 0453ca14537c3..b8b3ef8a418fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java @@ -45,6 +45,7 @@ import 
org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -1355,22 +1356,20 @@ public boolean equals(Object other) { * setting up the expectation that the {@code get()} call * is needed to evaluate the result. * @param path path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. - * @param bufferSize buffer size + * @param parameters open file parameters from the builder. * @return a future which will evaluate to the opened file. * @throws IOException failure to resolve the link. * @throws IllegalArgumentException unknown mandatory key */ public CompletableFuture openFileWithOptions(Path path, - Set mandatoryKeys, - Configuration options, - int bufferSize) throws IOException { - AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), Collections.emptySet(), "for " + path); return LambdaUtils.eval( - new CompletableFuture<>(), () -> open(path, bufferSize)); + new CompletableFuture<>(), () -> + open(path, parameters.getBufferSize())); } public boolean hasPathCapability(final Path path, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 5e5d29a28bfce..28de73a9c7530 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -27,8 +27,6 @@ import 
java.util.Collections; import java.util.EnumSet; import java.util.List; -import java.util.Locale; -import java.util.Set; import java.util.concurrent.CompletableFuture; import com.google.common.base.Preconditions; @@ -37,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DataChecksum; @@ -845,14 +844,13 @@ public FutureDataInputStreamBuilder openFile(final Path path) @Override protected CompletableFuture openFileWithOptions( final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), Collections.emptySet(), "for " + path); return LambdaUtils.eval( - new CompletableFuture<>(), () -> open(path, bufferSize)); + new CompletableFuture<>(), () -> open(path, parameters.getBufferSize())); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java index a8f294f379158..3a139781e0372 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java @@ -24,13 +24,13 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Options.ChecksumOpt; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -266,20 +266,17 @@ public List> getDelegationTokens(String renewer) throws IOException { /** * Open a file by delegating to - * {@link FileSystem#openFileWithOptions(Path, Set, Configuration, int)}. + * {@link FileSystem#openFileWithOptions(Path, org.apache.hadoop.fs.impl.OpenFileParameters)}. * @param path path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. - * @param bufferSize buffer size - * @return a future which will evaluate to the opened file. + * @param parameters open file parameters from the builder. + * + * @return a future which will evaluate to the opened file. * @throws IOException failure to resolve the link. 
* @throws IllegalArgumentException unknown mandatory key */ public CompletableFuture openFileWithOptions(Path path, - Set mandatoryKeys, - Configuration options, - int bufferSize) throws IOException { - return fsImpl.openFileWithOptions(path, mandatoryKeys, options, bufferSize); + final OpenFileParameters parameters) throws IOException { + return fsImpl.openFileWithOptions(path, parameters); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index b2c1369a9c1fe..3532ff0e8888a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -47,7 +47,7 @@ import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.FsLinkResolution; -import org.apache.hadoop.fs.impl.PathCapabilitiesSupport; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -2924,6 +2924,11 @@ protected FSDataInputStreamBuilder( @Override public CompletableFuture build() throws IOException { final Path absF = fixRelativePart(getPath()); + OpenFileParameters parameters = new OpenFileParameters(); + parameters.setMandatoryKeys(getMandatoryKeys()); + parameters.setOptions(getOptions()); + parameters.setBufferSize(getBufferSize()); + parameters.setStatus(getStatus()); return new FSLinkResolver>() { @Override public CompletableFuture next( @@ -2931,9 +2936,7 @@ public CompletableFuture next( final Path p) throws IOException { return fs.openFileWithOptions(p, - getMandatoryKeys(), - getOptions(), - getBufferSize()); + parameters); } }.resolve(FileContext.this, absF); } diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index d2fddf8d8e235..c0c75e74988f6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -4458,43 +4459,39 @@ public FutureDataInputStreamBuilder openFile(PathHandle pathHandle) * the action of opening the file should begin. * * The base implementation performs a blocking - * call to {@link #open(Path, int)}in this call; + * call to {@link #open(Path, int)} in this call; * the actual outcome is in the returned {@code CompletableFuture}. * This avoids having to create some thread pool, while still * setting up the expectation that the {@code get()} call * is needed to evaluate the result. * @param path path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. - * @param bufferSize buffer size + * @param parameters open file parameters from the builder. * @return a future which will evaluate to the opened file. * @throws IOException failure to resolve the link. 
* @throws IllegalArgumentException unknown mandatory key */ protected CompletableFuture openFileWithOptions( final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), Collections.emptySet(), "for " + path); return LambdaUtils.eval( - new CompletableFuture<>(), () -> open(path, bufferSize)); + new CompletableFuture<>(), () -> + open(path, parameters.getBufferSize())); } /** * Execute the actual open file operation. * The base implementation performs a blocking - * call to {@link #open(Path, int)}in this call; + * call to {@link #open(Path, int)} in this call; * the actual outcome is in the returned {@code CompletableFuture}. * This avoids having to create some thread pool, while still * setting up the expectation that the {@code get()} call * is needed to evaluate the result. * @param pathHandle path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. - * @param bufferSize buffer size + * @param parameters open file parameters from the builder. * @return a future which will evaluate to the opened file. * @throws IOException failure to resolve the link. 
* @throws IllegalArgumentException unknown mandatory key @@ -4503,14 +4500,13 @@ protected CompletableFuture openFileWithOptions( */ protected CompletableFuture openFileWithOptions( final PathHandle pathHandle, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), Collections.emptySet(), ""); CompletableFuture result = new CompletableFuture<>(); try { - result.complete(open(pathHandle, bufferSize)); + result.complete(open(pathHandle, parameters.getBufferSize())); } catch (UnsupportedOperationException tx) { // fail fast here throw tx; @@ -4612,12 +4608,17 @@ protected FSDataInputStreamBuilder( @Override public CompletableFuture build() throws IOException { Optional optionalPath = getOptionalPath(); + OpenFileParameters parameters = new OpenFileParameters(); + parameters.setMandatoryKeys(getMandatoryKeys()); + parameters.setOptions(getOptions()); + parameters.setBufferSize(getBufferSize()); + parameters.setStatus(getStatus()); if(optionalPath.isPresent()) { return getFS().openFileWithOptions(optionalPath.get(), - getMandatoryKeys(), getOptions(), getBufferSize()); + parameters); } else { return getFS().openFileWithOptions(getPathHandle(), - getMandatoryKeys(), getOptions(), getBufferSize()); + parameters); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 3bc3cb2e9b07a..cf12ea3898a7f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -25,12 +25,12 @@ import java.util.EnumSet; import java.util.List; 
import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -714,20 +714,15 @@ public FutureDataInputStreamBuilder openFile(final PathHandle pathHandle) @Override protected CompletableFuture openFileWithOptions( final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - return fs.openFileWithOptions(path, mandatoryKeys, options, bufferSize); + final OpenFileParameters parameters) throws IOException { + return fs.openFileWithOptions(path, parameters); } @Override protected CompletableFuture openFileWithOptions( final PathHandle pathHandle, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - return fs.openFileWithOptions(pathHandle, mandatoryKeys, options, - bufferSize); + final OpenFileParameters parameters) throws IOException { + return fs.openFileWithOptions(pathHandle, parameters); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java index 731a52a7b4137..e197506edc88b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java @@ -26,13 +26,12 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -440,10 +439,8 @@ public Collection getAllStoragePolicies() @Override public CompletableFuture openFileWithOptions( final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - return myFs.openFileWithOptions(path, mandatoryKeys, options, bufferSize); + final OpenFileParameters parameters) throws IOException { + return myFs.openFileWithOptions(path, parameters); } public boolean hasPathCapability(final Path path, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java index 774d30927df2c..27a522e593001 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java @@ -47,4 +47,15 @@ public interface FutureDataInputStreamBuilder CompletableFuture build() throws IllegalArgumentException, UnsupportedOperationException, IOException; + + /** + * A FileStatus may be provided to the open request. + * It is up to the implementation whether to use this or not. + * @param status status. + * @return the builder. 
+ */ + default FutureDataInputStreamBuilder withFileStatus(FileStatus status) { + return this; + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java index 2aa4a5d95fcc7..1ef1f4e763213 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; @@ -60,6 +61,12 @@ public abstract class FutureDataInputStreamBuilderImpl private int bufferSize; + /** + * File status passed in through a {@link #withFileStatus(FileStatus)} + * call; null otherwise. + */ + private FileStatus status; + /** * Construct from a {@link FileContext}. * @@ -138,4 +145,18 @@ public FutureDataInputStreamBuilder builder() { public FutureDataInputStreamBuilder getThisBuilder() { return this; } + + @Override + public FutureDataInputStreamBuilder withFileStatus(FileStatus status) { + this.status = checkNotNull(status); + return this; + } + + /** + * Get any status set in {@link #withFileStatus(FileStatus)} + * @return a status value or null. 
+ */ + protected FileStatus getStatus() { + return status; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java new file mode 100644 index 0000000000000..570086d4862f6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; + +/** + * All the parameters from the openFile builder for the {@code openFileWithOptions} commands. + * + * If/when new attributes added to the builder, this class will be extended. + */ +public class OpenFileParameters { + + /** + * Set of options declared as mandatory. + */ + private Set mandatoryKeys; + + /** + * Options set during the build sequence. + */ + private Configuration options; + + /** + * Buffer size. + */ + private int bufferSize; + + /** + * Optional file status. 
+ */ + private FileStatus status; + + public OpenFileParameters() { + } + + public Set getMandatoryKeys() { + return mandatoryKeys; + } + + public void setMandatoryKeys(final Set mandatoryKeys) { + this.mandatoryKeys = mandatoryKeys; + } + + public Configuration getOptions() { + return options; + } + + public void setOptions(final Configuration options) { + this.options = options; + } + + public int getBufferSize() { + return bufferSize; + } + + public void setBufferSize(final int bufferSize) { + this.bufferSize = bufferSize; + } + + public FileStatus getStatus() { + return status; + } + + public void setStatus(final FileStatus status) { + this.status = status; + } +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index abbd2293fb2e9..07a48f9049f71 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -718,24 +718,29 @@ exists in the metadata, but no copies of any its blocks can be located; Creates a [`FSDataInputStreamBuilder`](fsdatainputstreambuilder.html) to construct a operation to open the file at `path` for reading. - When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance, the builder parameters are verified and -`openFileWithOptions(Path, Set, Configuration, int)` invoked. +`openFileWithOptions(Path, OpenFileParameters)` invoked. This (protected) operation returns a `CompletableFuture` which, when its `get()` method is called, either returns an input stream of the contents of opened file, or raises an exception. -The base implementation of the `openFileWithOptions(PathHandle, Set, Configuration, int)` +The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)` ultimately invokes `open(Path, int)`. 
Thus the chain `openFile(path).build().get()` has the same preconditions and postconditions as `open(Path p, int bufferSize)` +However, there is one difference which implementations are free to +take advantage of: + +The returned stream MAY implement a lazy open where file non-existence or +access permission failures may not surface until the first `read()` of the +actual data. -The `openFile()` operation may check the state of the filesystem during this -call, but as the state of the filesystem may change betwen this call and +The `openFile()` operation may check the state of the filesystem during its +invocation, but as the state of the filesystem may change between this call and the actual `build()` and `get()` operations, this file-specific preconditions (file exists, file is readable, etc) MUST NOT be checked here. @@ -766,6 +771,10 @@ It SHOULD be possible to always open a file without specifying any options, so as to present a consistent model to users. However, an implementation MAY opt to require one or more mandatory options to be set. +The returned stream may perform "lazy" evaluation of file access. This is +relevant for object stores where the probes for existence are expensive, and, +even with an asynchronous open, may be considered needless. + ### `FSDataInputStreamBuilder openFile(PathHandle)` Creates a `FSDataInputStreamBuilder` to build an operation to open a file. @@ -774,13 +783,13 @@ to construct a operation to open the file identified by the given `PathHandle` f When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance, the builder parameters are verified and -`openFileWithOptions(PathHandle, Set, Configuration, int)` invoked. +`openFileWithOptions(PathHandle, OpenFileParameters)` invoked. This (protected) operation returns a `CompletableFuture` which, when its `get()` method is called, either returns an input stream of the contents of opened file, or raises an exception. 
-The base implementation of the `openFileWithOptions(Path,PathHandle, Set, Configuration, int)` method +The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)` method returns a future which invokes `open(Path, int)`. Thus the chain `openFile(pathhandle).build().get()` has the same preconditions diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md index a7c393d9a41c1..eadba174fc1a6 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md @@ -43,6 +43,31 @@ path validation. Set the size of the buffer to be used. +### `FSDataInputStreamBuilder withFileStatus(FileStatus status)` + +A `FileStatus` instance which refers to the file being opened. + +This MAY be used by implementations to short-circuit checks for the file, +so potentially saving on remote calls especially to object stores. + +Requirements: + +* `status != null` +* `status.getPath()` == the resolved path of the file being opened. + +The path validation MUST take place if the store uses the `FileStatus` when +it opens files, and MAY be performed otherwise. The validation +SHOULD be postponed until the `build()` operation. + +This operation should be considered a hint to the filesystem. + +If a filesystem implementation extends the `FileStatus` returned in its +implementation, it MAY use this information when opening the file. + +This is relevant with those stores which return version/etag information, +including the S3A and ABFS connectors - they MAY use this to guarantee that +the file they opened is exactly the one returned in the listing. + ### Set optional or mandatory parameters FSDataInputStreamBuilder opt(String key, ...) @@ -56,6 +81,7 @@ of `FileSystem`. 
out = fs.openFile(path) .opt("fs.s3a.experimental.input.fadvise", "random") .must("fs.s3a.readahead.range", 256 * 1024) + .withFileStatus(statusFromListing) .build() .get(); ``` @@ -76,6 +102,21 @@ builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows: > The last option specified defines the value and its optional/mandatory state. +If the `FileStatus` option passed in `withFileStatus()` is used, implementations +MUST accept all subclasses of `FileStatus`, including `LocatedFileStatus`, +rather than just any FS-specific subclass implemented by the implementation +(e.g. `S3AFileStatus`). They MAY simply ignore those which are not the +custom subclasses. + +This is critical to ensure safe use of the feature: directory listing/ +status serialization/deserialization can result in the `withFileStatus()` +argument not being the custom subclass returned by the Filesystem instance's +own `getFileStatus()`, `listFiles()`, `listLocatedStatus()` calls, etc. + +In such a situation the implementations must: + +1. Validate the path (always). +1. Use the status/convert to the custom type, *or* simply discard it. 
## Builder interface diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index b6e94a664165e..d8e2230c82d06 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -281,6 +281,7 @@ public void testOpenFileApplyRead() throws Throwable { createFile(fs, path, true, dataset(len, 0x40, 0x80)); CompletableFuture readAllBytes = fs.openFile(path) + .withFileStatus(fs.getFileStatus(path)) .build() .thenApply(ContractTestUtils::readStream); assertEquals("Wrong number of bytes read value", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 615be2cfbdb95..d91fd465e1e76 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -95,6 +95,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Globber; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.s3a.auth.SignerManager; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; @@ -978,23 +979,33 @@ protected URI canonicalizeUri(URI rawUri) { @Retries.RetryTranslated public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return open(f, Optional.empty()); + return open(f, Optional.empty(), null); } /** * Opens an FSDataInputStream at the indicated Path. 
- * @param path the file to open + * if status contains an S3AFileStatus reference, it is used + * and so a HEAD request to the store is avoided. + * + * @param file the file to open * @param options configuration options if opened with the builder API. + * @param status optional file status. * @throws IOException IO failure. */ @Retries.RetryTranslated private FSDataInputStream open( - final Path path, - final Optional options) + final Path file, + final Optional options, + final S3AFileStatus status) throws IOException { entryPoint(INVOCATION_OPEN); - final S3AFileStatus fileStatus = (S3AFileStatus) getFileStatus(path); + final Path path = qualify(file); + // if no suitable status was supplied: look for the file. + // this will raise an FNFE if there's no normal file there + final S3AFileStatus fileStatus = status != null + ? status + : (S3AFileStatus) getFileStatus(path); if (fileStatus.isDirectory()) { throw new FileNotFoundException("Can't open " + path + " because it is a directory"); @@ -4367,9 +4378,8 @@ private void requireSelectSupport(final Path source) throws /** * Initiate the open or select operation. * This is invoked from both the FileSystem and FileContext APIs - * @param path path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. + * @param rawPath path to the file + * @param parameters open file parameters from the builder. * @return a future which will evaluate to the opened/selected file. * @throws IOException failure to resolve the link. 
* @throws PathIOException operation is a select request but S3 select is @@ -4379,10 +4389,12 @@ private void requireSelectSupport(final Path source) throws @Override @Retries.RetryTranslated public CompletableFuture openFileWithOptions( - final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { + final Path rawPath, + final OpenFileParameters parameters) throws IOException { + final Path path = qualify(rawPath); + + Configuration options = parameters.getOptions(); + Set mandatoryKeys = parameters.getMandatoryKeys(); String sql = options.get(SelectConstants.SELECT_SQL, null); boolean isSelect = sql != null; // choice of keys depends on open type @@ -4397,15 +4409,38 @@ public CompletableFuture openFileWithOptions( InternalConstants.STANDARD_OPENFILE_KEYS, "for " + path + " in non-select file I/O"); } + FileStatus status = parameters.getStatus(); + S3AFileStatus fileStatus = null; + if (status != null) { + Preconditions.checkArgument(path.equals(status.getPath()), + "FileStatus parameter is not for the path %s: %s", + path, status); + if (status instanceof S3AFileStatus) { + // can use this status to skip our own probes, + // including etag and version. + LOG.debug("File was opened with a supplied S3AFileStatus;" + + " skipping getFileStatus call in open() operation: {}", status); + fileStatus = (S3AFileStatus) status; + } else if (status instanceof S3ALocatedFileStatus) { + LOG.debug("File was opened with a supplied S3ALocatedFileStatus;" + + " skipping getFileStatus call in open() operation: {}", status); + fileStatus = ((S3ALocatedFileStatus) status).toS3AFileStatus(); + } else { + LOG.debug("Ignoring file status {}", status); + } + } + // copy reference to stop the compiler complaining about the variable + // not being effectively final. + S3AFileStatus st = fileStatus; CompletableFuture result = new CompletableFuture<>(); if (!isSelect) { // normal path. 
unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> open(path, Optional.of(options)))); + () -> open(path, Optional.of(options), st))); } else { // it is a select statement. - // fail fast if the method is not present + // fail fast if the operation is not available requireSelectSupport(path); // submit the query unboundedThreadPool.submit(() -> diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java index 20f25f26c5979..ec0a0cf0aca98 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java @@ -49,6 +49,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Mode; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source; @@ -432,6 +433,42 @@ public void testReadFileChangedOutOfSyncMetadata() throws Throwable { } } + /** + * Verifies that when the openFile builder is passed in a status, + * then that is used to eliminate the getFileStatus call in open(); + * thus the version and etag passed down are still used. 
+ */ + @Test + public void testOpenFileWithStatus() throws Throwable { + final Path testpath = path("testOpenFileWithStatus.dat"); + final byte[] dataset = TEST_DATA_BYTES; + S3AFileStatus originalStatus = + writeFile(testpath, dataset, dataset.length, true); + + // forge a file status with a different tag + S3AFileStatus forgedStatus = + S3AFileStatus.fromFileStatus(originalStatus, Tristate.FALSE, + originalStatus.getETag() + "-fake", + originalStatus.getVersionId() + ""); + fs.getMetadataStore().put( + new PathMetadata(forgedStatus, Tristate.FALSE, false)); + + // By passing in the status open() doesn't need to check s3guard + // And hence the existing file is opened + try (FSDataInputStream instream = fs.openFile(testpath) + .withFileStatus(originalStatus) + .build().get()) { + instream.read(); + } + + // and this holds for S3A Located Status + try (FSDataInputStream instream = fs.openFile(testpath) + .withFileStatus(new S3ALocatedFileStatus(originalStatus, null)) + .build().get()) { + instream.read(); + } + } + /** * Ensures a file can be read when there is no version metadata * (ETag, versionId). 
@@ -902,15 +939,11 @@ public void testEventuallyConsistentReadOnReopen() throws Throwable { private Path writeOutOfSyncFileVersion(String filename) throws IOException { final Path testpath = path(filename); final byte[] dataset = TEST_DATA_BYTES; - writeDataset(fs, testpath, dataset, dataset.length, - 1024, false); - S3AFileStatus originalStatus = (S3AFileStatus) fs.getFileStatus(testpath); + S3AFileStatus originalStatus = + writeFile(testpath, dataset, dataset.length, false); // overwrite with half the content - writeDataset(fs, testpath, dataset, dataset.length / 2, - 1024, true); - - S3AFileStatus newStatus = (S3AFileStatus) fs.getFileStatus(testpath); + S3AFileStatus newStatus = writeFile(testpath, dataset, dataset.length / 2, true); // put back the original etag, versionId S3AFileStatus forgedStatus = @@ -922,6 +955,23 @@ private Path writeOutOfSyncFileVersion(String filename) throws IOException { return testpath; } + /** + * Write data to a file; return the status from the filesystem. + * @param path file path + * @param dataset dataset to write from + * @param length number of bytes from the dataset to write. + * @param overwrite overwrite flag + * @return the retrieved file status. + */ + private S3AFileStatus writeFile(final Path path, + final byte[] dataset, + final int length, + final boolean overwrite) throws IOException { + writeDataset(fs, path, dataset, length, + 1024, overwrite); + return (S3AFileStatus) fs.getFileStatus(path); + } + /** * Writes {@link #TEST_DATA} to a file where the file will be inconsistent * in S3 for a set of operations. 
@@ -1208,9 +1258,8 @@ public ObjectMetadata answer(InvocationOnMock invocation) private Path writeFileWithNoVersionMetadata(String filename) throws IOException { final Path testpath = path(filename); - writeDataset(fs, testpath, TEST_DATA_BYTES, TEST_DATA_BYTES.length, - 1024, false); - S3AFileStatus originalStatus = (S3AFileStatus) fs.getFileStatus(testpath); + S3AFileStatus originalStatus = writeFile(testpath, TEST_DATA_BYTES, + TEST_DATA_BYTES.length, false); // remove ETag and versionId S3AFileStatus newStatus = S3AFileStatus.fromFileStatus(originalStatus, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java index d6058d19521be..18aba69fd33f2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java @@ -256,6 +256,7 @@ public void testSelectEmptyFile() throws Throwable { ContractTestUtils.touch(fs, path); parseToLines(fs.openFile(path) .must(SELECT_SQL, SELECT_EVERYTHING) + .withFileStatus(fs.getFileStatus(path)) .build() .get(), 0); From 19c9d4a1cef6bce8619c46116fa1bf6f1aa7e9c5 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 8 Jan 2020 18:09:46 +0000 Subject: [PATCH 2/4] HADOOP-16759 address Mingliang's comments * ~builder API for openFileParameters * S3AFS moves to Optional where appropriate * Which includes the select() operation too * extra test to verify pickup of invalid status when a valid one is not passed in * also contract test to verify that null value is rejected Change-Id: I391abb6030503fc6288c6494156b85391cb7c196 --- .../org/apache/hadoop/fs/FileContext.java | 13 ++- .../java/org/apache/hadoop/fs/FileSystem.java | 10 +-- .../hadoop/fs/impl/OpenFileParameters.java | 36 ++++---- .../fs/contract/AbstractContractOpenTest.java | 8 ++ 
.../apache/hadoop/fs/s3a/S3AFileSystem.java | 88 +++++++++++-------- .../fs/s3a/ITestS3ARemoteFileChanged.java | 12 ++- 6 files changed, 102 insertions(+), 65 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index 3532ff0e8888a..df93e89750ee0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -2924,19 +2924,18 @@ protected FSDataInputStreamBuilder( @Override public CompletableFuture build() throws IOException { final Path absF = fixRelativePart(getPath()); - OpenFileParameters parameters = new OpenFileParameters(); - parameters.setMandatoryKeys(getMandatoryKeys()); - parameters.setOptions(getOptions()); - parameters.setBufferSize(getBufferSize()); - parameters.setStatus(getStatus()); + OpenFileParameters parameters = new OpenFileParameters() + .withMandatoryKeys(getMandatoryKeys()) + .withOptions(getOptions()) + .withBufferSize(getBufferSize()) + .withStatus(getStatus()); return new FSLinkResolver>() { @Override public CompletableFuture next( final AbstractFileSystem fs, final Path p) throws IOException { - return fs.openFileWithOptions(p, - parameters); + return fs.openFileWithOptions(p, parameters); } }.resolve(FileContext.this, absF); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index c0c75e74988f6..4d1153f96d7df 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4608,11 +4608,11 @@ protected FSDataInputStreamBuilder( @Override public CompletableFuture build() throws IOException 
{ Optional optionalPath = getOptionalPath(); - OpenFileParameters parameters = new OpenFileParameters(); - parameters.setMandatoryKeys(getMandatoryKeys()); - parameters.setOptions(getOptions()); - parameters.setBufferSize(getBufferSize()); - parameters.setStatus(getStatus()); + OpenFileParameters parameters = new OpenFileParameters() + .withMandatoryKeys(getMandatoryKeys()) + .withOptions(getOptions()) + .withBufferSize(getBufferSize()) + .withStatus(getStatus()); if(optionalPath.isPresent()) { return getFS().openFileWithOptions(optionalPath.get(), parameters); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java index 570086d4862f6..ff1409013d81a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -53,35 +53,39 @@ public class OpenFileParameters { public OpenFileParameters() { } - public Set getMandatoryKeys() { - return mandatoryKeys; + public OpenFileParameters withMandatoryKeys(final Set mandatoryKeys) { + this.mandatoryKeys = mandatoryKeys; + return this; } - public void setMandatoryKeys(final Set mandatoryKeys) { - this.mandatoryKeys = mandatoryKeys; + public OpenFileParameters withOptions(final Configuration options) { + this.options = options; + return this; } - public Configuration getOptions() { - return options; + public OpenFileParameters withBufferSize(final int bufferSize) { + this.bufferSize = bufferSize; + return this; } - public void setOptions(final Configuration options) { - this.options = options; + public OpenFileParameters withStatus(final FileStatus status) { + this.status = status; + return this; } - public int getBufferSize() { - return bufferSize; + public Set getMandatoryKeys() { + return mandatoryKeys; } - 
public void setBufferSize(final int bufferSize) { - this.bufferSize = bufferSize; + public Configuration getOptions() { + return options; } - public FileStatus getStatus() { - return status; + public int getBufferSize() { + return bufferSize; } - public void setStatus(final FileStatus status) { - this.status = status; + public FileStatus getStatus() { + return status; } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index d8e2230c82d06..a43053180fbf8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -303,4 +303,12 @@ public void testOpenFileApplyAsyncRead() throws Throwable { accepted.get()); } + @Test + public void testOpenFileNullStatus() throws Throwable { + describe("use openFile() with a null status"); + Path path = path("testOpenFileNullStatus"); + intercept(NullPointerException.class, + () -> getFileSystem().openFile(path).withFileStatus(null)); + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index d91fd465e1e76..bc52cf054ce16 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -979,7 +979,7 @@ protected URI canonicalizeUri(URI rawUri) { @Retries.RetryTranslated public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return open(f, Optional.empty(), null); + return open(f, Optional.empty(), Optional.empty()); } /** @@ -989,27 +989,19 @@ public FSDataInputStream open(Path f, int bufferSize) * * 
@param file the file to open * @param options configuration options if opened with the builder API. - * @param status optional file status. + * @param providedStatus optional file status. * @throws IOException IO failure. */ @Retries.RetryTranslated private FSDataInputStream open( final Path file, final Optional options, - final S3AFileStatus status) + final Optional providedStatus) throws IOException { entryPoint(INVOCATION_OPEN); final Path path = qualify(file); - // if no suitable status was supplied: look for the file. - // this will raise an FNFE if there's no normal file there - final S3AFileStatus fileStatus = status != null - ? status - : (S3AFileStatus) getFileStatus(path); - if (fileStatus.isDirectory()) { - throw new FileNotFoundException("Can't open " + path - + " because it is a directory"); - } + S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, providedStatus); S3AReadOpContext readContext; if (options.isPresent()) { @@ -4314,22 +4306,20 @@ protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) { * @param source path to source data * @param expression select expression * @param options request configuration from the builder. + * @param providedStatus any passed in status * @return the stream of the results * @throws IOException IO failure */ @Retries.RetryTranslated private FSDataInputStream select(final Path source, final String expression, - final Configuration options) + final Configuration options, + final Optional providedStatus) throws IOException { entryPoint(OBJECT_SELECT_REQUESTS); requireSelectSupport(source); final Path path = makeQualified(source); - // call getFileStatus(), which will look at S3Guard first, - // so the operation will fail if it is not there or S3Guard believes it has - // been deleted. - // validation of the file status are delegated to the binding. 
- final S3AFileStatus fileStatus = (S3AFileStatus) getFileStatus(path); + final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, providedStatus); // readahead range can be dynamically set long ra = options.getLong(READAHEAD_RANGE, readAhead); @@ -4337,10 +4327,16 @@ private FSDataInputStream select(final Path source, S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy, changeDetectionPolicy, ra); - if (!fileStatus.isDirectory()) { + if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None + && fileStatus.getETag() != null) { + // if there is change detection, and the status includes at least an + // etag, // check that the object metadata lines up with what is expected // based on the object attributes (which may contain an eTag or - // versionId) from S3Guard + // versionId). + // This is because the select API doesn't offer this. + // (note: this is trouble for version checking as cannot force the old + // version in the final read; nor can we check the etag match) ChangeTracker changeTracker = new ChangeTracker(uri.toString(), changeDetectionPolicy, @@ -4375,6 +4371,26 @@ private void requireSelectSupport(final Path source) throws } } + /** + * Extract the status from the optional parameter, falling + * back to a HEAD for the file only (not the directory). + * @param path path of the status + * @param status optional status + * @return a file status + * @throws FileNotFoundException if there is no normal file at that path + * @throws IOException IO failure + */ + private S3AFileStatus extractOrFetchSimpleFileStatus( + final Path path, final Optional status) + throws IOException { + if (status.isPresent()) { + return status.get(); + } else { + return innerGetFileStatus(path, false, + StatusProbeEnum.HEAD_ONLY); + } + } + /** * Initiate the open or select operation. 
* This is invoked from both the FileSystem and FileContext APIs @@ -4409,35 +4425,35 @@ public CompletableFuture openFileWithOptions( InternalConstants.STANDARD_OPENFILE_KEYS, "for " + path + " in non-select file I/O"); } - FileStatus status = parameters.getStatus(); + FileStatus providedStatus = parameters.getStatus(); S3AFileStatus fileStatus = null; - if (status != null) { - Preconditions.checkArgument(path.equals(status.getPath()), + if (providedStatus != null) { + Preconditions.checkArgument(path.equals(providedStatus.getPath()), "FileStatus parameter is not for the path %s: %s", - path, status); - if (status instanceof S3AFileStatus) { + path, providedStatus); + if (providedStatus instanceof S3AFileStatus) { // can use this status to skip our own probes, // including etag and version. LOG.debug("File was opened with a supplied S3AFileStatus;" - + " skipping getFileStatus call in open() operation: {}", status); - fileStatus = (S3AFileStatus) status; - } else if (status instanceof S3ALocatedFileStatus) { + + " skipping getFileStatus call in open() operation: {}", + providedStatus); + fileStatus = (S3AFileStatus) providedStatus; + } else if (providedStatus instanceof S3ALocatedFileStatus) { LOG.debug("File was opened with a supplied S3ALocatedFileStatus;" - + " skipping getFileStatus call in open() operation: {}", status); - fileStatus = ((S3ALocatedFileStatus) status).toS3AFileStatus(); + + " skipping getFileStatus call in open() operation: {}", + providedStatus); + fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus(); } else { - LOG.debug("Ignoring file status {}", status); + LOG.debug("Ignoring file status {}", providedStatus); } } - // copy reference to stop the compiler complaining about the variable - // not being effectively final. - S3AFileStatus st = fileStatus; + Optional ost = Optional.of(fileStatus); CompletableFuture result = new CompletableFuture<>(); if (!isSelect) { // normal path. 
unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> open(path, Optional.of(options), st))); + () -> open(path, Optional.of(options), ost))); } else { // it is a select statement. // fail fast if the operation is not available @@ -4445,7 +4461,7 @@ public CompletableFuture openFileWithOptions( // submit the query unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> select(path, sql, options))); + () -> select(path, sql, options, ost))); } return result; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java index ec0a0cf0aca98..81a0fc041e1f9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java @@ -49,7 +49,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Mode; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source; @@ -467,6 +466,17 @@ public void testOpenFileWithStatus() throws Throwable { .build().get()) { instream.read(); } + try (FSDataInputStream instream = fs.openFile(testpath) + .build().get()) { + try { + instream.read(); + // No exception only if we don't enforce change detection as exception + assertTrue(changeDetectionMode.equals(CHANGE_DETECT_MODE_NONE) || + changeDetectionMode.equals(CHANGE_DETECT_MODE_WARN)); + } catch (Exception ignored) { + // Ignored. 
+ } + } } /** From 6a1a951c0c519e2b4d067492abc4df22afb04705 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 8 Jan 2020 18:38:22 +0000 Subject: [PATCH 3/4] HADOOP-16759 address Mingliang's comments * approximate builder API for openFileParameters * S3AFS moves to Optional where appropriate, which includes the select() operation too. There's a common method to do the extraction. * Switch to objects.requireNonNull in stream builder / openFileParameters plus error text (NPE debugging...) Extra tests * verify pickup of invalid status when none is supplied * rejection of a status declaring the source is a directory * after an object is deleted, the 404 isn't picked up until the read() call initiates the GET. * And there, if you use server-side versionID, *you get the file still* * +contract test to verify that withFileStatus(null) status value is rejected Change-Id: I326f68538940f245e1af56d3b6055015fd3e1bfe --- .../java/org/apache/hadoop/fs/FileSystem.java | 2 +- .../FutureDataInputStreamBuilderImpl.java | 14 ++-- .../hadoop/fs/impl/OpenFileParameters.java | 9 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 34 +++++--- .../fs/s3a/ITestS3ARemoteFileChanged.java | 84 +++++++++++++++---- 5 files changed, 108 insertions(+), 35 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 4d1153f96d7df..10e3fdda9f12c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4612,7 +4612,7 @@ public CompletableFuture build() throws IOException { .withMandatoryKeys(getMandatoryKeys()) .withOptions(getOptions()) .withBufferSize(getBufferSize()) - .withStatus(getStatus()); + .withStatus(super.getStatus()); // explicit so as to avoid IDE warnings if(optionalPath.isPresent()) { return 
getFS().openFileWithOptions(optionalPath.get(), parameters); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java index 1ef1f4e763213..40fc6bd2efa93 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java @@ -32,7 +32,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathHandle; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; @@ -76,8 +76,8 @@ public abstract class FutureDataInputStreamBuilderImpl */ protected FutureDataInputStreamBuilderImpl(@Nonnull FileContext fc, @Nonnull Path path) throws IOException { - super(checkNotNull(path)); - checkNotNull(fc); + super(requireNonNull(path, "path")); + requireNonNull(fc, "file context"); this.fileSystem = null; bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT; } @@ -89,8 +89,8 @@ protected FutureDataInputStreamBuilderImpl(@Nonnull FileContext fc, */ protected FutureDataInputStreamBuilderImpl(@Nonnull FileSystem fileSystem, @Nonnull Path path) { - super(checkNotNull(path)); - this.fileSystem = checkNotNull(fileSystem); + super(requireNonNull(path, "path")); + this.fileSystem = requireNonNull(fileSystem, "fileSystem"); initFromFS(); } @@ -115,7 +115,7 @@ private void initFromFS() { } protected FileSystem getFS() { - checkNotNull(fileSystem); + requireNonNull(fileSystem, "fileSystem"); return fileSystem; } @@ -148,7 +148,7 @@ public FutureDataInputStreamBuilder getThisBuilder() { @Override public 
FutureDataInputStreamBuilder withFileStatus(FileStatus status) { - this.status = checkNotNull(status); + this.status = requireNonNull(status, "status"); return this; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java index ff1409013d81a..1200974769830 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -23,8 +23,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import static java.util.Objects.requireNonNull; + /** - * All the parameters from the openFile builder for the {@code openFileWithOptions} commands. + * All the parameters from the openFile builder for the + * {@code openFileWithOptions} commands. * * If/when new attributes added to the builder, this class will be extended. 
*/ @@ -54,12 +57,12 @@ public OpenFileParameters() { } public OpenFileParameters withMandatoryKeys(final Set mandatoryKeys) { - this.mandatoryKeys = mandatoryKeys; + this.mandatoryKeys = requireNonNull(mandatoryKeys); return this; } public OpenFileParameters withOptions(final Configuration options) { - this.options = options; + this.options = requireNonNull(options); return this; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index bc52cf054ce16..9a93f8e101b8f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -4372,23 +4372,35 @@ private void requireSelectSupport(final Path source) throws } /** - * Extract the status from the optional parameter, falling - * back to a HEAD for the file only (not the directory). + * Extract the status from the optional parameter, querying + * S3Guard/s3 if it is absent. * @param path path of the status - * @param status optional status + * @param optStatus optional status * @return a file status * @throws FileNotFoundException if there is no normal file at that path * @throws IOException IO failure */ private S3AFileStatus extractOrFetchSimpleFileStatus( - final Path path, final Optional status) + final Path path, final Optional optStatus) throws IOException { - if (status.isPresent()) { - return status.get(); + S3AFileStatus fileStatus; + if (optStatus.isPresent()) { + fileStatus = optStatus.get(); } else { - return innerGetFileStatus(path, false, + // this looks at s3guard and gets any type of status back, + // if it falls back to S3 it does a HEAD only. 
+ // therefore: if there is no S3Guard and there is a dir, this + // will fail + fileStatus = innerGetFileStatus(path, false, StatusProbeEnum.HEAD_ONLY); } + // we check here for the passed in status or the S3Guard value + // for being a directory + if (fileStatus.isDirectory()) { + throw new FileNotFoundException("Can't open " + path + + " because it is a directory"); + } + return fileStatus; } /** @@ -4408,7 +4420,6 @@ public CompletableFuture openFileWithOptions( final Path rawPath, final OpenFileParameters parameters) throws IOException { final Path path = qualify(rawPath); - Configuration options = parameters.getOptions(); Set mandatoryKeys = parameters.getMandatoryKeys(); String sql = options.get(SelectConstants.SELECT_SQL, null); @@ -4426,7 +4437,7 @@ public CompletableFuture openFileWithOptions( "for " + path + " in non-select file I/O"); } FileStatus providedStatus = parameters.getStatus(); - S3AFileStatus fileStatus = null; + S3AFileStatus fileStatus; if (providedStatus != null) { Preconditions.checkArgument(path.equals(providedStatus.getPath()), "FileStatus parameter is not for the path %s: %s", @@ -4445,9 +4456,12 @@ public CompletableFuture openFileWithOptions( fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus(); } else { LOG.debug("Ignoring file status {}", providedStatus); + fileStatus = null; } + } else { + fileStatus = null; } - Optional ost = Optional.of(fileStatus); + Optional ost = Optional.ofNullable(fileStatus); CompletableFuture result = new CompletableFuture<>(); if (!isSelect) { // normal path. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java index 81a0fc041e1f9..5835c4b7285d3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java @@ -20,7 +20,7 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.Optional; @@ -444,7 +444,9 @@ public void testOpenFileWithStatus() throws Throwable { S3AFileStatus originalStatus = writeFile(testpath, dataset, dataset.length, true); - // forge a file status with a different tag + // forge a file status with a different etag + // no attempt is made to change the versionID as it will + // get rejected by S3 as an invalid version S3AFileStatus forgedStatus = S3AFileStatus.fromFileStatus(originalStatus, Tristate.FALSE, originalStatus.getETag() + "-fake", @@ -452,31 +454,83 @@ public void testOpenFileWithStatus() throws Throwable { fs.getMetadataStore().put( new PathMetadata(forgedStatus, Tristate.FALSE, false)); + // verify the bad etag gets picked up...which it does for etag but not + // version ID + LOG.info("Opening stream with s3guard's (invalid) status."); + try (FSDataInputStream instream = fs.openFile(testpath) + .build() + .get()) { + try { + instream.read(); + // No exception only if we don't enforce change detection as exception + assertTrue( + "Read did not raise an exception even though the change detection " + + "mode was " + changeDetectionMode + + " and the inserted file status was invalid", + changeDetectionMode.equals(CHANGE_DETECT_MODE_NONE) || + changeDetectionMode.equals(CHANGE_DETECT_MODE_WARN) + || 
changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID)); + } catch (Exception ignored) { + // Ignored. + } + } + // By passing in the status open() doesn't need to check s3guard // And hence the existing file is opened + LOG.info("Opening stream with the original status."); try (FSDataInputStream instream = fs.openFile(testpath) .withFileStatus(originalStatus) - .build().get()) { + .build() + .get()) { instream.read(); } // and this holds for S3A Located Status + LOG.info("Opening stream with S3ALocatedFileStatus."); try (FSDataInputStream instream = fs.openFile(testpath) .withFileStatus(new S3ALocatedFileStatus(originalStatus, null)) - .build().get()) { + .build() + .get()) { instream.read(); } + + // if you pass in a status of a dir, it will be rejected + S3AFileStatus s2 = new S3AFileStatus(true, testpath, "alice"); + assertTrue("not a directory " + s2, s2.isDirectory()); + LOG.info("Open with directory status"); + interceptFuture(FileNotFoundException.class, "", + fs.openFile(testpath) + .withFileStatus(s2) + .build()); + + // now, we delete the file from the store and s3guard + // when we pass in the status, there's no HEAD request, so it's only + // in the read call where the 404 surfaces. + // and there, when versionID is passed to the GET, the data is returned + LOG.info("Testing opening a deleted file"); + fs.delete(testpath, false); try (FSDataInputStream instream = fs.openFile(testpath) - .build().get()) { - try { - instream.read(); - // No exception only if we don't enforce change detection as exception - assertTrue(changeDetectionMode.equals(CHANGE_DETECT_MODE_NONE) || - changeDetectionMode.equals(CHANGE_DETECT_MODE_WARN)); - } catch (Exception ignored) { - // Ignored. 
+ .withFileStatus(originalStatus) + .build() + .get()) { + if (changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID) + && changeDetectionMode.equals(CHANGE_DETECT_MODE_SERVER)) { + // the deleted file is still there if you know the version ID + // and the check is server-side + instream.read(); + } else { + // all other cases, the read will return 404. + intercept(FileNotFoundException.class, + () -> instream.read()); } + } + + // whereas without that status, you fail in the get() when a HEAD is + // issued + interceptFuture(FileNotFoundException.class, "", + fs.openFile(testpath).build()); + } /** @@ -571,9 +625,11 @@ public void testSelectWithNoVersionMetadata() throws Throwable { writeFileWithNoVersionMetadata("selectnoversion.dat"); try (FSDataInputStream instream = fs.openFile(testpath) - .must(SELECT_SQL, "SELECT * FROM S3OBJECT").build().get()) { + .must(SELECT_SQL, "SELECT * FROM S3OBJECT") + .build() + .get()) { assertEquals(QUOTED_TEST_DATA, - IOUtils.toString(instream, Charset.forName("UTF-8")).trim()); + IOUtils.toString(instream, StandardCharsets.UTF_8).trim()); } } From 76e3d46b67d76df4e86dd6ff6711956dc4c09ea9 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 20 Jan 2020 20:38:21 +0000 Subject: [PATCH 4/4] HADOOP-16759. withFileStatus(): checkstyle and testing Address the checkstyle issues. Add some tests of the API in ITestS3GuardOutOfBandOperations, for auth and non-auth. This includes the discovery (and fix!) of the fact that with the specific s3guard retry logic of HADOOP-16490, we needed to set a different retry option to get the tests to fail fast on deleted file in auth mode. It has been like that for a few months, but we've never noticed...though even in parallel runs it would have reduced performance by using up a process for 2+ minutes. Running in the IDE I initially thought my changes had broken something. Also: ITestS3Select fails as trying to select a dir raises an FNFE, just as open() always has. 
Because we skip looking for dir markers in select or open, attempts to read a nonexistent file will fail faster (though still add a 404 to the S3 cache) Change-Id: I33ffc90ce470c590143cebacc6efd2d2849d2106 --- .../apache/hadoop/fs/AbstractFileSystem.java | 1 - .../apache/hadoop/fs/ChecksumFileSystem.java | 3 +- .../java/org/apache/hadoop/fs/FileSystem.java | 2 +- .../FutureDataInputStreamBuilderImpl.java | 6 +- .../hadoop/fs/impl/OpenFileParameters.java | 16 +++--- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 13 +++-- .../fs/s3a/ITestS3ARemoteFileChanged.java | 24 ++++---- .../s3a/ITestS3GuardOutOfBandOperations.java | 56 +++++++++++++------ .../apache/hadoop/fs/s3a/S3ATestUtils.java | 27 +++++++++ .../hadoop/fs/s3a/select/ITestS3Select.java | 6 +- 10 files changed, 101 insertions(+), 53 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java index b8b3ef8a418fb..1df68b647c99a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 28de73a9c7530..cc9c284c9fa55 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -850,7 +850,8 @@ protected 
CompletableFuture openFileWithOptions( Collections.emptySet(), "for " + path); return LambdaUtils.eval( - new CompletableFuture<>(), () -> open(path, parameters.getBufferSize())); + new CompletableFuture<>(), + () -> open(path, parameters.getBufferSize())); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 10e3fdda9f12c..2cbcfb90b16eb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4612,7 +4612,7 @@ public CompletableFuture build() throws IOException { .withMandatoryKeys(getMandatoryKeys()) .withOptions(getOptions()) .withBufferSize(getBufferSize()) - .withStatus(super.getStatus()); // explicit so as to avoid IDE warnings + .withStatus(super.getStatus()); // explicit to avoid IDE warnings if(optionalPath.isPresent()) { return getFS().openFileWithOptions(optionalPath.get(), parameters); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java index 40fc6bd2efa93..24a8d49747fe6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java @@ -147,13 +147,13 @@ public FutureDataInputStreamBuilder getThisBuilder() { } @Override - public FutureDataInputStreamBuilder withFileStatus(FileStatus status) { - this.status = requireNonNull(status, "status"); + public FutureDataInputStreamBuilder withFileStatus(FileStatus st) { + this.status = requireNonNull(st, "status"); return this; } /** - * Get any status set in 
{@link #withFileStatus(FileStatus)} + * Get any status set in {@link #withFileStatus(FileStatus)}. * @return a status value or null. */ protected FileStatus getStatus() { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java index 1200974769830..77b4ff52696a3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -56,23 +56,23 @@ public class OpenFileParameters { public OpenFileParameters() { } - public OpenFileParameters withMandatoryKeys(final Set mandatoryKeys) { - this.mandatoryKeys = requireNonNull(mandatoryKeys); + public OpenFileParameters withMandatoryKeys(final Set keys) { + this.mandatoryKeys = requireNonNull(keys); return this; } - public OpenFileParameters withOptions(final Configuration options) { - this.options = requireNonNull(options); + public OpenFileParameters withOptions(final Configuration opts) { + this.options = requireNonNull(opts); return this; } - public OpenFileParameters withBufferSize(final int bufferSize) { - this.bufferSize = bufferSize; + public OpenFileParameters withBufferSize(final int size) { + this.bufferSize = size; return this; } - public OpenFileParameters withStatus(final FileStatus status) { - this.status = status; + public OpenFileParameters withStatus(final FileStatus st) { + this.status = st; return this; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 9a93f8e101b8f..7b6117283bc2f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ 
-1001,7 +1001,8 @@ private FSDataInputStream open( entryPoint(INVOCATION_OPEN); final Path path = qualify(file); - S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, providedStatus); + S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, + providedStatus); S3AReadOpContext readContext; if (options.isPresent()) { @@ -4319,7 +4320,8 @@ private FSDataInputStream select(final Path source, entryPoint(OBJECT_SELECT_REQUESTS); requireSelectSupport(source); final Path path = makeQualified(source); - final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, providedStatus); + final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, + providedStatus); // readahead range can be dynamically set long ra = options.getLong(READAHEAD_RANGE, readAhead); @@ -4387,18 +4389,17 @@ private S3AFileStatus extractOrFetchSimpleFileStatus( if (optStatus.isPresent()) { fileStatus = optStatus.get(); } else { - // this looks at s3guard and gets any type of status back, + // this looks at S3guard and gets any type of status back, // if it falls back to S3 it does a HEAD only. 
// therefore: if there is no S3Guard and there is a dir, this - // will fail + // will raise a FileNotFoundException fileStatus = innerGetFileStatus(path, false, StatusProbeEnum.HEAD_ONLY); } // we check here for the passed in status or the S3Guard value // for being a directory if (fileStatus.isDirectory()) { - throw new FileNotFoundException("Can't open " + path - + " because it is a directory"); + throw new FileNotFoundException(path.toString() + " is a directory"); } return fileStatus; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java index 5835c4b7285d3..3fd70be931997 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java @@ -454,8 +454,7 @@ public void testOpenFileWithStatus() throws Throwable { fs.getMetadataStore().put( new PathMetadata(forgedStatus, Tristate.FALSE, false)); - // verify the bad etag gets picked up...which it does for etag but not - // version ID + // verify the bad etag gets picked up. 
LOG.info("Opening stream with s3guard's (invalid) status."); try (FSDataInputStream instream = fs.openFile(testpath) .build() @@ -467,10 +466,10 @@ public void testOpenFileWithStatus() throws Throwable { "Read did not raise an exception even though the change detection " + "mode was " + changeDetectionMode + " and the inserted file status was invalid", - changeDetectionMode.equals(CHANGE_DETECT_MODE_NONE) || - changeDetectionMode.equals(CHANGE_DETECT_MODE_WARN) - || changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID)); - } catch (Exception ignored) { + changeDetectionMode.equals(CHANGE_DETECT_MODE_NONE) + || changeDetectionMode.equals(CHANGE_DETECT_MODE_WARN) + || changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID)); + } catch (RemoteFileChangedException ignored) { // Ignored. } } @@ -482,7 +481,7 @@ public void testOpenFileWithStatus() throws Throwable { .withFileStatus(originalStatus) .build() .get()) { - instream.read(); + instream.read(); } // and this holds for S3A Located Status @@ -515,10 +514,10 @@ public void testOpenFileWithStatus() throws Throwable { .get()) { if (changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID) && changeDetectionMode.equals(CHANGE_DETECT_MODE_SERVER)) { - // the deleted file is still there if you know the version ID - // and the check is server-side - instream.read(); - } else { + // the deleted file is still there if you know the version ID + // and the check is server-side + instream.read(); + } else { // all other cases, the read will return 404. 
intercept(FileNotFoundException.class, () -> instream.read()); @@ -1009,7 +1008,8 @@ private Path writeOutOfSyncFileVersion(String filename) throws IOException { writeFile(testpath, dataset, dataset.length, false); // overwrite with half the content - S3AFileStatus newStatus = writeFile(testpath, dataset, dataset.length / 2, true); + S3AFileStatus newStatus = writeFile(testpath, dataset, dataset.length / 2, + true); // put back the original etag, versionId S3AFileStatus forgedStatus = diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java index 96f913cdb0901..3cdb4e6eeee3b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java @@ -28,13 +28,13 @@ import java.util.stream.Collectors; import org.assertj.core.api.Assertions; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source; import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; @@ -60,15 +59,17 @@ import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL; import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL; 
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_INTERVAL; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT; import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; import static org.apache.hadoop.fs.s3a.S3ATestUtils.PROBE_INTERVAL_MILLIS; import static org.apache.hadoop.fs.s3a.S3ATestUtils.STABILIZATION_TIME; import static org.apache.hadoop.fs.s3a.S3ATestUtils.TIMESTAMP_SLEEP; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitDeletedFileDisappearance; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitFileStatus; import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingContainsPath; import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingDoesNotContainPath; import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.read; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.readWithStatus; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.test.LambdaTestUtils.eventually; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -163,9 +164,14 @@ protected Configuration createConfiguration() { // speeding up the tests removeBaseAndBucketOverrides(conf, RETRY_LIMIT, - RETRY_INTERVAL); + RETRY_INTERVAL, + S3GUARD_CONSISTENCY_RETRY_INTERVAL, + S3GUARD_CONSISTENCY_RETRY_LIMIT); conf.setInt(RETRY_LIMIT, 3); - conf.set(RETRY_INTERVAL, "10ms"); + conf.setInt(S3GUARD_CONSISTENCY_RETRY_LIMIT, 3); + final String delay = "10ms"; + conf.set(RETRY_INTERVAL, delay); + conf.set(S3GUARD_CONSISTENCY_RETRY_INTERVAL, delay); return conf; } @@ -284,11 +290,6 @@ public void testLongerLengthOverwrite() throws Exception { @Test public void testOutOfBandDeletes() throws Exception { - ChangeDetectionPolicy changeDetectionPolicy = - ((S3AFileSystem) 
getFileSystem()).getChangeDetectionPolicy(); - Assume.assumeFalse("FNF not expected when using a bucket with" - + " object versioning", - changeDetectionPolicy.getSource() == Source.VersionId); Path testFileName = path("OutOfBandDelete-" + UUID.randomUUID()); outOfBandDeletes(testFileName, authoritative); @@ -658,8 +659,22 @@ private void outOfBandDeletes( FileStatus status = guardedFs.getFileStatus(testFilePath); LOG.info("Authoritative: {} status path: {}", allowAuthoritative, status.getPath()); - expectExceptionWhenReading(testFilePath, text); - expectExceptionWhenReadingOpenFileAPI(testFilePath, text); + final boolean versionedChangeDetection = + getFileSystem().getChangeDetectionPolicy().getSource() + == Source.VersionId; + if (!versionedChangeDetection) { + expectExceptionWhenReading(testFilePath, text); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, status); + } else { + // FNFE not expected when using a bucket with object versioning + final String read1 = read(guardedFs, testFilePath); + assertEquals("File read from the auth FS", text, read1); + // and when the status is passed in, even the raw FS will ask for it + // via the versionId in the status + final String read2 = readWithStatus(rawFS, status); + assertEquals("File read from the raw FS", text, read2); + } } finally { guardedFs.delete(testFilePath, true); } @@ -957,7 +972,8 @@ private void deleteFileInListing() FileStatus status = guardedFs.getFileStatus(testFilePath); LOG.info("authoritative: {} status: {}", allowAuthoritative, status); expectExceptionWhenReading(testFilePath, text); - expectExceptionWhenReadingOpenFileAPI(testFilePath, text); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, status); } finally { guardedFs.delete(testDirPath, true); } @@ -983,14 +999,18 @@ private void expectExceptionWhenReading(Path testFilePath, String text) * 
We expect the read to fail with an FNFE: open will be happy. * @param testFilePath path of the test file * @param text the context in the file. + * @param status optional status for the withFileStatus operation. * @throws Exception failure other than the FNFE */ private void expectExceptionWhenReadingOpenFileAPI( - Path testFilePath, String text) + Path testFilePath, String text, FileStatus status) throws Exception { - try ( - FSDataInputStream in = guardedFs.openFile(testFilePath).build().get() - ) { + final FutureDataInputStreamBuilder builder + = guardedFs.openFile(testFilePath); + if (status != null) { + builder.withFileStatus(status); + } + try (FSDataInputStream in = builder.build().get()) { intercept(FileNotFoundException.class, () -> { byte[] bytes = new byte[text.length()]; return in.read(bytes, 0, bytes.length); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index db871a5f9177a..f75e37ecaaf37 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -66,12 +66,14 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture; import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -1184,6 +1186,31 @@ public 
static String read(FileSystem fs, } } + /** + * Read in a file and convert to an ascii string, using the openFile + * builder API and the file status. + * If the status is an S3A FileStatus, any etag or versionId used + * will be picked up. + * @param fs filesystem + * @param status file status, including path + * @return the bytes read and converted to a string + * @throws IOException IO problems + */ + public static String readWithStatus( + final FileSystem fs, + final FileStatus status) throws IOException { + final CompletableFuture future = + fs.openFile(status.getPath()) + .withFileStatus(status) + .build(); + + try (FSDataInputStream in = awaitFuture(future)) { + byte[] buf = new byte[(int) status.getLen()]; + in.readFully(0, buf); + return new String(buf); + } + } + /** * List a directory/directory tree. * @param fileSystem FS diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java index 18aba69fd33f2..64974db5a466c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java @@ -549,14 +549,14 @@ public void testSelectDirectoryFails() throws Throwable { FutureDataInputStreamBuilder builder = getFileSystem().openFile(dir) .must(SELECT_SQL, SELECT_ODD_ENTRIES); - interceptFuture(PathIOException.class, + interceptFuture(FileNotFoundException.class, "", builder.build()); // try the parent builder = getFileSystem().openFile(dir.getParent()) .must(SELECT_SQL, SELECT_ODD_ENTRIES); - interceptFuture(PathIOException.class, + interceptFuture(FileNotFoundException.class, "", builder.build()); } @@ -566,7 +566,7 @@ public void testSelectRootFails() throws Throwable { FutureDataInputStreamBuilder builder = getFileSystem().openFile(path("/")) .must(SELECT_SQL, SELECT_ODD_ENTRIES); - 
interceptFuture(PathIOException.class, + interceptFuture(FileNotFoundException.class, "", builder.build()); }