Skip to content

Commit cd13fc3

Browse files
steveloughrandeepakdamri
authored andcommitted
HADOOP-16202. Enhanced openFile(): hadoop-common changes. (apache#2584/1)
This defines standard option and values for the openFile() builder API for opening a file: fs.option.openfile.read.policy A list of the desired read policy, in preferred order. standard values are adaptive, default, random, sequential, vector, whole-file fs.option.openfile.length How long the file is. fs.option.openfile.split.start start of a task's split fs.option.openfile.split.end end of a task's split These can be used by filesystem connectors to optimize their reading of the source file, including but not limited to * skipping existence/length probes when opening a file * choosing a policy for prefetching/caching data The hadoop shell commands which read files all declare "whole-file" and "sequential", as appropriate. Contributed by Steve Loughran. Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
1 parent b2f0c3f commit cd13fc3

File tree

18 files changed

+486
-128
lines changed

18 files changed

+486
-128
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
2727

28+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
29+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
30+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
31+
2832
/** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
2933
@InterfaceAudience.Public
3034
@InterfaceStability.Stable
@@ -42,7 +46,12 @@ public AvroFSInput(final FSDataInputStream in, final long len) {
4246
public AvroFSInput(final FileContext fc, final Path p) throws IOException {
4347
FileStatus status = fc.getFileStatus(p);
4448
this.len = status.getLen();
45-
this.stream = fc.open(p);
49+
this.stream = awaitFuture(fc.openFile(p)
50+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
51+
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
52+
.withFileStatus(status)
53+
.build());
54+
fc.open(p);
4655
}
4756

4857
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
6161
*/
6262
B opt(@Nonnull String key, float value);
6363

64+
/**
65+
* Set optional long parameter for the Builder.
66+
*
67+
* @see #opt(String, String)
68+
*/
69+
B opt(@Nonnull String key, long value);
70+
6471
/**
6572
* Set optional double parameter for the Builder.
6673
*
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
104111
*/
105112
B must(@Nonnull String key, float value);
106113

114+
/**
115+
* Set mandatory long option.
116+
*
117+
* @see #must(String, String)
118+
*/
119+
B must(@Nonnull String key, long value);
120+
107121
/**
108122
* Set mandatory double option.
109123
*

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@
7070
import org.slf4j.Logger;
7171
import org.slf4j.LoggerFactory;
7272

73+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
74+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
75+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
76+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
7377
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
78+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
7479

7580
/**
7681
* The FileContext class provides an interface for users of the Hadoop
@@ -2194,7 +2199,12 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource,
21942199
EnumSet<CreateFlag> createFlag = overwrite ? EnumSet.of(
21952200
CreateFlag.CREATE, CreateFlag.OVERWRITE) :
21962201
EnumSet.of(CreateFlag.CREATE);
2197-
InputStream in = open(qSrc);
2202+
InputStream in = awaitFuture(openFile(qSrc)
2203+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
2204+
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
2205+
.opt(FS_OPTION_OPENFILE_LENGTH,
2206+
fs.getLen()) // file length hint for object stores
2207+
.build());
21982208
try (OutputStream out = create(qDst, createFlag)) {
21992209
IOUtils.copyBytes(in, out, conf, true);
22002210
} finally {
@@ -2926,9 +2936,11 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
29262936
final Path absF = fixRelativePart(getPath());
29272937
OpenFileParameters parameters = new OpenFileParameters()
29282938
.withMandatoryKeys(getMandatoryKeys())
2939+
.withOptionalKeys(getOptionalKeys())
29292940
.withOptions(getOptions())
2930-
.withBufferSize(getBufferSize())
2931-
.withStatus(getStatus());
2941+
.withStatus(getStatus())
2942+
.withBufferSize(
2943+
getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
29322944
return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
29332945
@Override
29342946
public CompletableFuture<FSDataInputStream> next(

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.fs;
1919

20+
import javax.annotation.Nullable;
2021
import java.io.IOException;
2122
import java.util.concurrent.CompletableFuture;
2223

@@ -34,7 +35,7 @@
3435
* options accordingly, for example:
3536
*
3637
* If the option is not related to the file system, the option will be ignored.
37-
* If the option is must, but not supported by the file system, a
38+
* If the option is must, but not supported/known by the file system, an
3839
* {@link IllegalArgumentException} will be thrown.
3940
*
4041
*/
@@ -51,10 +52,11 @@ CompletableFuture<FSDataInputStream> build()
5152
/**
5253
* A FileStatus may be provided to the open request.
5354
* It is up to the implementation whether to use this or not.
54-
* @param status status.
55+
* @param status status: may be null
5556
* @return the builder.
5657
*/
57-
default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
58+
default FutureDataInputStreamBuilder withFileStatus(
59+
@Nullable FileStatus status) {
5860
return this;
5961
}
6062

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
*/
1818
package org.apache.hadoop.fs;
1919

20+
import java.util.Collections;
2021
import java.util.Optional;
22+
import java.util.Set;
2123
import java.util.function.Function;
2224
import java.util.function.BiFunction;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.Stream;
2327

2428
import org.apache.hadoop.classification.InterfaceAudience;
2529
import org.apache.hadoop.classification.InterfaceStability;
@@ -518,4 +522,119 @@ public enum ChecksumCombineMode {
518522
MD5MD5CRC, // MD5 of block checksums, which are MD5 over chunk CRCs
519523
COMPOSITE_CRC // Block/chunk-independent composite CRC
520524
}
525+
526+
/**
527+
* The standard {@code openFile()} options.
528+
*/
529+
@InterfaceAudience.Public
530+
@InterfaceStability.Evolving
531+
public static final class OpenFileOptions {
532+
533+
private OpenFileOptions() {
534+
}
535+
536+
/**
537+
* Prefix for all standard filesystem options: {@value}.
538+
*/
539+
private static final String FILESYSTEM_OPTION = "fs.option.";
540+
541+
/**
542+
* Prefix for all openFile options: {@value}.
543+
*/
544+
public static final String FS_OPTION_OPENFILE =
545+
FILESYSTEM_OPTION + "openfile.";
546+
547+
/**
548+
* OpenFile option for file length: {@value}.
549+
*/
550+
public static final String FS_OPTION_OPENFILE_LENGTH =
551+
FS_OPTION_OPENFILE + "length";
552+
553+
/**
554+
* OpenFile option for split start: {@value}.
555+
*/
556+
public static final String FS_OPTION_OPENFILE_SPLIT_START =
557+
FS_OPTION_OPENFILE + "split.start";
558+
559+
/**
560+
* OpenFile option for split end: {@value}.
561+
*/
562+
public static final String FS_OPTION_OPENFILE_SPLIT_END =
563+
FS_OPTION_OPENFILE + "split.end";
564+
565+
/**
566+
* OpenFile option for buffer size: {@value}.
567+
*/
568+
public static final String FS_OPTION_OPENFILE_BUFFER_SIZE =
569+
FS_OPTION_OPENFILE + "buffer.size";
570+
571+
/**
572+
* OpenFile option for read policies: {@value}.
573+
*/
574+
public static final String FS_OPTION_OPENFILE_READ_POLICY =
575+
FS_OPTION_OPENFILE + "read.policy";
576+
577+
/**
578+
* Set of standard options which openFile implementations
579+
* MUST recognize, even if they ignore the actual values.
580+
*/
581+
public static final Set<String> FS_OPTION_OPENFILE_STANDARD_OPTIONS =
582+
Collections.unmodifiableSet(Stream.of(
583+
FS_OPTION_OPENFILE_BUFFER_SIZE,
584+
FS_OPTION_OPENFILE_READ_POLICY,
585+
FS_OPTION_OPENFILE_LENGTH,
586+
FS_OPTION_OPENFILE_SPLIT_START,
587+
FS_OPTION_OPENFILE_SPLIT_END)
588+
.collect(Collectors.toSet()));
589+
590+
/**
591+
* Read policy for adaptive IO: {@value}.
592+
*/
593+
public static final String FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE =
594+
"adaptive";
595+
596+
/**
597+
* Read policy {@value} -whateve the implementation does by default.
598+
*/
599+
public static final String FS_OPTION_OPENFILE_READ_POLICY_DEFAULT =
600+
"default";
601+
602+
/**
603+
* Read policy for random IO: {@value}.
604+
*/
605+
public static final String FS_OPTION_OPENFILE_READ_POLICY_RANDOM =
606+
"random";
607+
608+
/**
609+
* Read policy for sequential IO: {@value}.
610+
*/
611+
public static final String FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL =
612+
"sequential";
613+
614+
/**
615+
* Vectored IO API to be used: {@value}.
616+
*/
617+
public static final String FS_OPTION_OPENFILE_READ_POLICY_VECTOR =
618+
"vector";
619+
620+
/**
621+
* Whole file to be read, end-to-end: {@value}.
622+
*/
623+
public static final String FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE =
624+
"whole-file";
625+
626+
/**
627+
* All the current read policies as a set.
628+
*/
629+
public static final Set<String> FS_OPTION_OPENFILE_READ_POLICIES =
630+
Collections.unmodifiableSet(Stream.of(
631+
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE,
632+
FS_OPTION_OPENFILE_READ_POLICY_DEFAULT,
633+
FS_OPTION_OPENFILE_READ_POLICY_RANDOM,
634+
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL,
635+
FS_OPTION_OPENFILE_READ_POLICY_VECTOR,
636+
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
637+
.collect(Collectors.toSet()));
638+
639+
}
521640
}

0 commit comments

Comments
 (0)