-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-16202. Enhance/Stabilize openFile() #2168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a408a4b
89a1642
8d03de3
72437de
7d0bd3a
66243dc
f56ea29
0e88cec
0966a28
194c1fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> { | |
| */ | ||
| B opt(@Nonnull String key, float value); | ||
|
|
||
| /** | ||
| * Set optional long parameter for the Builder. | ||
| * | ||
| * @see #opt(String, String) | ||
| */ | ||
| B opt(@Nonnull String key, long value); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could use properties (getter and setter) for the standard options. We'd have one for buffer size, one for file length, etc. |
||
|
|
||
| /** | ||
| * Set optional double parameter for the Builder. | ||
| * | ||
|
|
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> { | |
| */ | ||
| B must(@Nonnull String key, float value); | ||
|
|
||
| /** | ||
| * Set mandatory long option. | ||
| * | ||
| * @see #must(String, String) | ||
| */ | ||
| B must(@Nonnull String key, long value); | ||
|
|
||
| /** | ||
| * Set mandatory double option. | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,6 +91,7 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; | ||
| import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument; | ||
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; | ||
| import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; | ||
|
|
@@ -4627,7 +4628,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions( | |
| final OpenFileParameters parameters) throws IOException { | ||
| AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( | ||
| parameters.getMandatoryKeys(), | ||
| Collections.emptySet(), | ||
| Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, | ||
| "for " + path); | ||
| return LambdaUtils.eval( | ||
| new CompletableFuture<>(), () -> | ||
|
|
@@ -4655,7 +4656,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions( | |
| final OpenFileParameters parameters) throws IOException { | ||
| AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This rejectUnknownMandatoryKeys function would not be necessary if the mandatory keys were strongly typed fields. |
||
| parameters.getMandatoryKeys(), | ||
| Collections.emptySet(), ""); | ||
| Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, ""); | ||
| CompletableFuture<FSDataInputStream> result = new CompletableFuture<>(); | ||
| try { | ||
| result.complete(open(pathHandle, parameters.getBufferSize())); | ||
|
|
@@ -4762,9 +4763,13 @@ public CompletableFuture<FSDataInputStream> build() throws IOException { | |
| Optional<Path> optionalPath = getOptionalPath(); | ||
| OpenFileParameters parameters = new OpenFileParameters() | ||
| .withMandatoryKeys(getMandatoryKeys()) | ||
| .withOptionalKeys(getOptionalKeys()) | ||
| .withOptions(getOptions()) | ||
| .withBufferSize(getBufferSize()) | ||
| .withStatus(super.getStatus()); // explicit to avoid IDE warnings | ||
| // buffer size can be configured | ||
| parameters.withBufferSize( | ||
| getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, | ||
| getBufferSize())); | ||
| if(optionalPath.isPresent()) { | ||
| return getFS().openFileWithOptions(optionalPath.get(), | ||
| parameters); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,6 +71,11 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_FADVISE; | ||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_FADVISE_SEQUENTIAL; | ||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; | ||
| import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture; | ||
|
|
||
| /** | ||
| * A collection of file-processing util methods | ||
| */ | ||
|
|
@@ -390,7 +395,32 @@ public static boolean copy(FileSystem srcFS, Path src, | |
| return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf); | ||
| } | ||
|
|
||
| /** Copy files between FileSystems. */ | ||
| /** | ||
| * Copy a file/directory tree within/between filesystems. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Copy a file or directory between filesystems. |
||
| * <p></p> | ||
| * returns true if the operation succeeded. When deleteSource is true, | ||
| * this means "after the copy, delete(source) returned true". | ||
| * If the destination is a directory, and mkdirs (dest) fails, | ||
| * the operation will return false rather than raise any exception. | ||
| * <p></p> | ||
| * The overwrite flag is about overwriting files; it has no effect about | ||
| * handling an attempt to copy a file atop a directory (expect an IOException), | ||
| * or a directory over a path which contains a file (mkdir will fail, so | ||
| * "false"). | ||
| * <p></p> | ||
| * The operation is recursive, and the deleteSource operation takes place | ||
| * as each subdirectory is copied. Therefore, if an operation fails partway | ||
| * through, the source tree may be partially deleted. | ||
| * @param srcFS source filesystem | ||
| * @param srcStatus status of source | ||
| * @param dstFS destination filesystem | ||
| * @param dst path of destination | ||
| * @param deleteSource delete the source? | ||
| * @param overwrite overwrite files at destination? | ||
| * @param conf configuration to use when opening files | ||
| * @return true if the operation succeeded. | ||
| * @throws IOException failure | ||
|
Comment on lines
+421
to
+422
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the benefit of returning true on success and throwing on a failure, as opposed to returning void and throwing on failure? |
||
| */ | ||
| public static boolean copy(FileSystem srcFS, FileStatus srcStatus, | ||
| FileSystem dstFS, Path dst, | ||
| boolean deleteSource, | ||
|
|
@@ -409,22 +439,27 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus, | |
| if (!dstFS.mkdirs(dst)) { | ||
| return false; | ||
| } | ||
| FileStatus contents[] = srcFS.listStatus(src); | ||
| for (int i = 0; i < contents.length; i++) { | ||
| copy(srcFS, contents[i], dstFS, | ||
| new Path(dst, contents[i].getPath().getName()), | ||
| deleteSource, overwrite, conf); | ||
| RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src); | ||
| while (contents.hasNext()) { | ||
| FileStatus next = contents.next(); | ||
| copy(srcFS, next, dstFS, | ||
| new Path(dst, next.getPath().getName()), | ||
| deleteSource, overwrite, conf); | ||
| } | ||
| } else { | ||
| InputStream in=null; | ||
| InputStream in = null; | ||
| OutputStream out = null; | ||
| try { | ||
| in = srcFS.open(src); | ||
| in = awaitFuture(srcFS.openFile(src) | ||
| .opt(FS_OPTION_OPENFILE_FADVISE, | ||
| FS_OPTION_OPENFILE_FADVISE_SEQUENTIAL) | ||
| .opt(FS_OPTION_OPENFILE_LENGTH, | ||
| srcStatus.getLen()) // file length hint for object stores | ||
| .build()); | ||
| out = dstFS.create(dst, overwrite); | ||
| IOUtils.copyBytes(in, out, conf, true); | ||
| } catch (IOException e) { | ||
| IOUtils.closeStream(out); | ||
| IOUtils.closeStream(in); | ||
| IOUtils.cleanupWithLogger(LOG, in, out); | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,9 +17,13 @@ | |
| */ | ||
| package org.apache.hadoop.fs; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.function.Function; | ||
| import java.util.function.BiFunction; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import org.apache.hadoop.classification.InterfaceAudience; | ||
| import org.apache.hadoop.classification.InterfaceStability; | ||
|
|
@@ -518,4 +522,94 @@ public enum ChecksumCombineMode { | |
| MD5MD5CRC, // MD5 of block checksums, which are MD5 over chunk CRCs | ||
| COMPOSITE_CRC // Block/chunk-independent composite CRC | ||
| } | ||
|
|
||
| /** | ||
| * The standard {@code openFile()} options. | ||
| */ | ||
| @InterfaceAudience.Public | ||
| @InterfaceStability.Evolving | ||
| public static final class OpenFileOptions { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be time to break this file out into multiple files, each with one class.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. did have that initially, then pulled back in. No real preference |
||
|
|
||
| private OpenFileOptions() { | ||
| } | ||
|
|
||
| /** | ||
| * Prefix for all standard filesystem options: {@value}. | ||
| */ | ||
| public static final String FILESYSTEM_OPTION = "fs.option."; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you could remove this and define the constants below with a string literal, or at least make it private. |
||
|
|
||
| /** | ||
| * Prefix for all openFile options: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE = | ||
| FILESYSTEM_OPTION + "openfile."; | ||
|
|
||
| /** | ||
| * OpenFile option for file length: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE_LENGTH = | ||
| FS_OPTION_OPENFILE + "length"; | ||
|
|
||
| /** | ||
| * OpenFile option for split start: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE_SPLIT_START = | ||
| FS_OPTION_OPENFILE + "split.start"; | ||
|
|
||
| /** | ||
| * OpenFile option for split end: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE_SPLIT_END = | ||
| FS_OPTION_OPENFILE + "split.end"; | ||
|
|
||
| /** | ||
| * OpenFile option for buffer size: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE_BUFFER_SIZE = | ||
| FS_OPTION_OPENFILE + "buffer.size"; | ||
|
|
||
| /** | ||
| * OpenFile option for seek policies: {@value}. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe you changed the terminology. "Seek policies" should be replaced with the new nomenclature, not only here, but everywhere. I'm ok with any nomenclature, but it helps to be consistent, so "read strategy", "read mode", or "read policy" all sound good to me. |
||
| */ | ||
| public static final String FS_OPTION_OPENFILE_FADVISE = | ||
| FS_OPTION_OPENFILE + "fadvise"; | ||
|
|
||
| /** | ||
| * fadvise policy 'normal' - up to the implementation: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE_FADVISE_NORMAL = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. None or default might be a better name for this than normal. |
||
| "normal"; | ||
|
|
||
| /** | ||
| * fadvise policy for sequential IO: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE_FADVISE_SEQUENTIAL = | ||
| "sequential"; | ||
|
|
||
| /** | ||
| * fadvise policy for random: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE_FADVISE_RANDOM = | ||
| "random"; | ||
|
|
||
| /** | ||
| * fadvise policy for adaptive IO: {@value}. | ||
| */ | ||
| public static final String FS_OPTION_OPENFILE_FADVISE_ADAPTIVE = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need adaptive? If the caller requests sequential, random, or normal, I think that will suffice. |
||
| "adaptive"; | ||
|
|
||
| /** | ||
| * Set of standard options which openFile implementations | ||
| * MUST recognize, even if they ignore the actual values. | ||
|
Comment on lines
+602
to
+603
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comment in the builder about possibly making these properties (with getter and setter). |
||
| */ | ||
| public static final Set<String> FS_OPTION_OPENFILE_STANDARD_OPTIONS = | ||
| Collections.unmodifiableSet(Stream.of( | ||
| FS_OPTION_OPENFILE_BUFFER_SIZE, | ||
| FS_OPTION_OPENFILE_FADVISE, | ||
| FS_OPTION_OPENFILE_LENGTH, | ||
| FS_OPTION_OPENFILE_SPLIT_START, | ||
| FS_OPTION_OPENFILE_SPLIT_END) | ||
| .collect(Collectors.toSet())); | ||
|
|
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems ChecksumFileSystem should not override the base class implementation, since it has the same implementation as the base class.