Skip to content

Commit 5e2ce37

Browse files
steveloughranliuml07
authored andcommitted
HADOOP-16759. Filesystem openFile() builder to take a FileStatus param (#1761). Contributed by Steve Loughran
* Enhanced builder + FS spec * s3a FS to use this to skip HEAD on open * and to use version/etag when opening the file works with S3AFileStatus FS and S3ALocatedFileStatus
1 parent 0696828 commit 5e2ce37

File tree

18 files changed

+544
-141
lines changed

18 files changed

+544
-141
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.NoSuchElementException;
34-
import java.util.Set;
3534
import java.util.StringTokenizer;
3635
import java.util.concurrent.CompletableFuture;
3736
import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +44,7 @@
4544
import org.apache.hadoop.fs.Options.CreateOpts;
4645
import org.apache.hadoop.fs.Options.Rename;
4746
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
47+
import org.apache.hadoop.fs.impl.OpenFileParameters;
4848
import org.apache.hadoop.fs.permission.AclEntry;
4949
import org.apache.hadoop.fs.permission.AclStatus;
5050
import org.apache.hadoop.fs.permission.FsAction;
@@ -1355,22 +1355,20 @@ public boolean equals(Object other) {
13551355
* setting up the expectation that the {@code get()} call
13561356
* is needed to evaluate the result.
13571357
* @param path path to the file
1358-
* @param mandatoryKeys set of options declared as mandatory.
1359-
* @param options options set during the build sequence.
1360-
* @param bufferSize buffer size
1358+
* @param parameters open file parameters from the builder.
13611359
* @return a future which will evaluate to the opened file.
13621360
* @throws IOException failure to resolve the link.
13631361
* @throws IllegalArgumentException unknown mandatory key
13641362
*/
13651363
public CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
1366-
Set<String> mandatoryKeys,
1367-
Configuration options,
1368-
int bufferSize) throws IOException {
1369-
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
1364+
final OpenFileParameters parameters) throws IOException {
1365+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
1366+
parameters.getMandatoryKeys(),
13701367
Collections.emptySet(),
13711368
"for " + path);
13721369
return LambdaUtils.eval(
1373-
new CompletableFuture<>(), () -> open(path, bufferSize));
1370+
new CompletableFuture<>(), () ->
1371+
open(path, parameters.getBufferSize()));
13741372
}
13751373

13761374
public boolean hasPathCapability(final Path path,

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import java.util.Collections;
2828
import java.util.EnumSet;
2929
import java.util.List;
30-
import java.util.Locale;
31-
import java.util.Set;
3230
import java.util.concurrent.CompletableFuture;
3331

3432
import com.google.common.base.Preconditions;
@@ -37,6 +35,7 @@
3735
import org.apache.hadoop.conf.Configuration;
3836
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
3937
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
38+
import org.apache.hadoop.fs.impl.OpenFileParameters;
4039
import org.apache.hadoop.fs.permission.AclEntry;
4140
import org.apache.hadoop.fs.permission.FsPermission;
4241
import org.apache.hadoop.util.DataChecksum;
@@ -845,14 +844,14 @@ public FutureDataInputStreamBuilder openFile(final Path path)
845844
@Override
846845
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
847846
final Path path,
848-
final Set<String> mandatoryKeys,
849-
final Configuration options,
850-
final int bufferSize) throws IOException {
851-
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
847+
final OpenFileParameters parameters) throws IOException {
848+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
849+
parameters.getMandatoryKeys(),
852850
Collections.emptySet(),
853851
"for " + path);
854852
return LambdaUtils.eval(
855-
new CompletableFuture<>(), () -> open(path, bufferSize));
853+
new CompletableFuture<>(),
854+
() -> open(path, parameters.getBufferSize()));
856855
}
857856

858857
/**

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import java.util.Arrays;
2525
import java.util.EnumSet;
2626
import java.util.List;
27-
import java.util.Set;
2827
import java.util.concurrent.CompletableFuture;
2928

3029
import org.apache.hadoop.classification.InterfaceAudience;
3130
import org.apache.hadoop.classification.InterfaceStability;
3231
import org.apache.hadoop.conf.Configuration;
3332
import org.apache.hadoop.fs.Options.ChecksumOpt;
33+
import org.apache.hadoop.fs.impl.OpenFileParameters;
3434
import org.apache.hadoop.fs.permission.FsPermission;
3535
import org.apache.hadoop.security.token.Token;
3636
import org.apache.hadoop.util.Progressable;
@@ -266,20 +266,17 @@ public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
266266

267267
/**
268268
* Open a file by delegating to
269-
* {@link FileSystem#openFileWithOptions(Path, Set, Configuration, int)}.
269+
* {@link FileSystem#openFileWithOptions(Path, org.apache.hadoop.fs.impl.OpenFileParameters)}.
270270
* @param path path to the file
271-
* @param mandatoryKeys set of options declared as mandatory.
272-
* @param options options set during the build sequence.
273-
* @param bufferSize buffer size
274-
* @return a future which will evaluate to the opened file.
271+
* @param parameters open file parameters from the builder.
272+
*
273+
* @return a future which will evaluate to the opened file.ControlAlpha
275274
* @throws IOException failure to resolve the link.
276275
* @throws IllegalArgumentException unknown mandatory key
277276
*/
278277
public CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
279-
Set<String> mandatoryKeys,
280-
Configuration options,
281-
int bufferSize) throws IOException {
282-
return fsImpl.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
278+
final OpenFileParameters parameters) throws IOException {
279+
return fsImpl.openFileWithOptions(path, parameters);
283280
}
284281

285282
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import org.apache.hadoop.fs.Options.CreateOpts;
4848
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
4949
import org.apache.hadoop.fs.impl.FsLinkResolution;
50-
import org.apache.hadoop.fs.impl.PathCapabilitiesSupport;
50+
import org.apache.hadoop.fs.impl.OpenFileParameters;
5151
import org.apache.hadoop.fs.permission.AclEntry;
5252
import org.apache.hadoop.fs.permission.AclStatus;
5353
import org.apache.hadoop.fs.permission.FsAction;
@@ -2924,16 +2924,18 @@ protected FSDataInputStreamBuilder(
29242924
@Override
29252925
public CompletableFuture<FSDataInputStream> build() throws IOException {
29262926
final Path absF = fixRelativePart(getPath());
2927+
OpenFileParameters parameters = new OpenFileParameters()
2928+
.withMandatoryKeys(getMandatoryKeys())
2929+
.withOptions(getOptions())
2930+
.withBufferSize(getBufferSize())
2931+
.withStatus(getStatus());
29272932
return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
29282933
@Override
29292934
public CompletableFuture<FSDataInputStream> next(
29302935
final AbstractFileSystem fs,
29312936
final Path p)
29322937
throws IOException {
2933-
return fs.openFileWithOptions(p,
2934-
getMandatoryKeys(),
2935-
getOptions(),
2936-
getBufferSize());
2938+
return fs.openFileWithOptions(p, parameters);
29372939
}
29382940
}.resolve(FileContext.this, absF);
29392941
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.hadoop.fs.Options.Rename;
5959
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
6060
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
61+
import org.apache.hadoop.fs.impl.OpenFileParameters;
6162
import org.apache.hadoop.fs.permission.AclEntry;
6263
import org.apache.hadoop.fs.permission.AclStatus;
6364
import org.apache.hadoop.fs.permission.FsAction;
@@ -4485,43 +4486,39 @@ public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
44854486
* the action of opening the file should begin.
44864487
*
44874488
* The base implementation performs a blocking
4488-
* call to {@link #open(Path, int)}in this call;
4489+
* call to {@link #open(Path, int)} in this call;
44894490
* the actual outcome is in the returned {@code CompletableFuture}.
44904491
* This avoids having to create some thread pool, while still
44914492
* setting up the expectation that the {@code get()} call
44924493
* is needed to evaluate the result.
44934494
* @param path path to the file
4494-
* @param mandatoryKeys set of options declared as mandatory.
4495-
* @param options options set during the build sequence.
4496-
* @param bufferSize buffer size
4495+
* @param parameters open file parameters from the builder.
44974496
* @return a future which will evaluate to the opened file.
44984497
* @throws IOException failure to resolve the link.
44994498
* @throws IllegalArgumentException unknown mandatory key
45004499
*/
45014500
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
45024501
final Path path,
4503-
final Set<String> mandatoryKeys,
4504-
final Configuration options,
4505-
final int bufferSize) throws IOException {
4506-
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
4502+
final OpenFileParameters parameters) throws IOException {
4503+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
4504+
parameters.getMandatoryKeys(),
45074505
Collections.emptySet(),
45084506
"for " + path);
45094507
return LambdaUtils.eval(
4510-
new CompletableFuture<>(), () -> open(path, bufferSize));
4508+
new CompletableFuture<>(), () ->
4509+
open(path, parameters.getBufferSize()));
45114510
}
45124511

45134512
/**
45144513
* Execute the actual open file operation.
45154514
* The base implementation performs a blocking
4516-
* call to {@link #open(Path, int)}in this call;
4515+
* call to {@link #open(Path, int)} in this call;
45174516
* the actual outcome is in the returned {@code CompletableFuture}.
45184517
* This avoids having to create some thread pool, while still
45194518
* setting up the expectation that the {@code get()} call
45204519
* is needed to evaluate the result.
45214520
* @param pathHandle path to the file
4522-
* @param mandatoryKeys set of options declared as mandatory.
4523-
* @param options options set during the build sequence.
4524-
* @param bufferSize buffer size
4521+
* @param parameters open file parameters from the builder.
45254522
* @return a future which will evaluate to the opened file.
45264523
* @throws IOException failure to resolve the link.
45274524
* @throws IllegalArgumentException unknown mandatory key
@@ -4530,14 +4527,13 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
45304527
*/
45314528
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
45324529
final PathHandle pathHandle,
4533-
final Set<String> mandatoryKeys,
4534-
final Configuration options,
4535-
final int bufferSize) throws IOException {
4536-
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
4530+
final OpenFileParameters parameters) throws IOException {
4531+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
4532+
parameters.getMandatoryKeys(),
45374533
Collections.emptySet(), "");
45384534
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
45394535
try {
4540-
result.complete(open(pathHandle, bufferSize));
4536+
result.complete(open(pathHandle, parameters.getBufferSize()));
45414537
} catch (UnsupportedOperationException tx) {
45424538
// fail fast here
45434539
throw tx;
@@ -4639,12 +4635,17 @@ protected FSDataInputStreamBuilder(
46394635
@Override
46404636
public CompletableFuture<FSDataInputStream> build() throws IOException {
46414637
Optional<Path> optionalPath = getOptionalPath();
4638+
OpenFileParameters parameters = new OpenFileParameters()
4639+
.withMandatoryKeys(getMandatoryKeys())
4640+
.withOptions(getOptions())
4641+
.withBufferSize(getBufferSize())
4642+
.withStatus(super.getStatus()); // explicit to avoid IDE warnings
46424643
if(optionalPath.isPresent()) {
46434644
return getFS().openFileWithOptions(optionalPath.get(),
4644-
getMandatoryKeys(), getOptions(), getBufferSize());
4645+
parameters);
46454646
} else {
46464647
return getFS().openFileWithOptions(getPathHandle(),
4647-
getMandatoryKeys(), getOptions(), getBufferSize());
4648+
parameters);
46484649
}
46494650
}
46504651

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import java.util.EnumSet;
2626
import java.util.List;
2727
import java.util.Map;
28-
import java.util.Set;
2928
import java.util.concurrent.CompletableFuture;
3029

3130
import org.apache.hadoop.classification.InterfaceAudience;
3231
import org.apache.hadoop.classification.InterfaceStability;
3332
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.fs.impl.OpenFileParameters;
3434
import org.apache.hadoop.fs.permission.AclEntry;
3535
import org.apache.hadoop.fs.permission.AclStatus;
3636
import org.apache.hadoop.fs.permission.FsAction;
@@ -714,20 +714,15 @@ public FutureDataInputStreamBuilder openFile(final PathHandle pathHandle)
714714
@Override
715715
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
716716
final Path path,
717-
final Set<String> mandatoryKeys,
718-
final Configuration options,
719-
final int bufferSize) throws IOException {
720-
return fs.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
717+
final OpenFileParameters parameters) throws IOException {
718+
return fs.openFileWithOptions(path, parameters);
721719
}
722720

723721
@Override
724722
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
725723
final PathHandle pathHandle,
726-
final Set<String> mandatoryKeys,
727-
final Configuration options,
728-
final int bufferSize) throws IOException {
729-
return fs.openFileWithOptions(pathHandle, mandatoryKeys, options,
730-
bufferSize);
724+
final OpenFileParameters parameters) throws IOException {
725+
return fs.openFileWithOptions(pathHandle, parameters);
731726
}
732727

733728
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@
2626
import java.util.EnumSet;
2727
import java.util.List;
2828
import java.util.Map;
29-
import java.util.Set;
3029
import java.util.concurrent.CompletableFuture;
3130

3231
import org.apache.hadoop.classification.InterfaceAudience;
3332
import org.apache.hadoop.classification.InterfaceStability;
34-
import org.apache.hadoop.conf.Configuration;
3533
import org.apache.hadoop.fs.FileSystem.Statistics;
34+
import org.apache.hadoop.fs.impl.OpenFileParameters;
3635
import org.apache.hadoop.fs.permission.AclEntry;
3736
import org.apache.hadoop.fs.permission.AclStatus;
3837
import org.apache.hadoop.fs.permission.FsAction;
@@ -440,10 +439,8 @@ public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
440439
@Override
441440
public CompletableFuture<FSDataInputStream> openFileWithOptions(
442441
final Path path,
443-
final Set<String> mandatoryKeys,
444-
final Configuration options,
445-
final int bufferSize) throws IOException {
446-
return myFs.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
442+
final OpenFileParameters parameters) throws IOException {
443+
return myFs.openFileWithOptions(path, parameters);
447444
}
448445

449446
public boolean hasPathCapability(final Path path,

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,15 @@ public interface FutureDataInputStreamBuilder
4747
CompletableFuture<FSDataInputStream> build()
4848
throws IllegalArgumentException, UnsupportedOperationException,
4949
IOException;
50+
51+
/**
52+
* A FileStatus may be provided to the open request.
53+
* It is up to the implementation whether to use this or not.
54+
* @param status status.
55+
* @return the builder.
56+
*/
57+
default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
58+
return this;
59+
}
60+
5061
}

0 commit comments

Comments
 (0)