Skip to content

Commit f4950b3

Browse files
committed
HADOOP-17981. ResilientCommitByRename
Selective cherrypick of MAPREDUCE-7341 files; move etag tests into hadoop-azure. RawLocalFS implements the API; checksum fs does not Change-Id: Ic68337742de6aefd067eda14313fd07088c73ac7
1 parent a85d778 commit f4950b3

File tree

11 files changed

+924
-10
lines changed

11 files changed

+924
-10
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.hadoop.util.Progressable;
4747

4848
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
49+
import static org.apache.hadoop.fs.impl.ResilientCommitByRename.RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY;
4950
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
5051

5152
/****************************************************************
@@ -929,6 +930,7 @@ public boolean hasPathCapability(final Path path, final String capability)
929930
switch (validatePathCapabilityArgs(p, capability)) {
930931
case CommonPathCapabilities.FS_APPEND:
931932
case CommonPathCapabilities.FS_CONCAT:
933+
case RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY:
932934
return false;
933935
default:
934936
return super.hasPathCapability(p, capability);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.nio.ByteBuffer;
3636
import java.nio.file.Files;
3737
import java.nio.file.NoSuchFileException;
38+
import java.nio.file.StandardCopyOption;
3839
import java.nio.file.attribute.BasicFileAttributes;
3940
import java.nio.file.attribute.BasicFileAttributeView;
4041
import java.nio.file.attribute.FileTime;
@@ -45,9 +46,12 @@
4546
import java.util.StringTokenizer;
4647
import java.util.concurrent.atomic.AtomicLong;
4748

49+
import javax.annotation.Nullable;
50+
4851
import org.apache.hadoop.classification.InterfaceAudience;
4952
import org.apache.hadoop.classification.InterfaceStability;
5053
import org.apache.hadoop.conf.Configuration;
54+
import org.apache.hadoop.fs.impl.ResilientCommitByRename;
5155
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
5256
import org.apache.hadoop.fs.permission.FsPermission;
5357
import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -76,7 +80,9 @@
7680
*****************************************************************/
7781
@InterfaceAudience.Public
7882
@InterfaceStability.Stable
79-
public class RawLocalFileSystem extends FileSystem {
83+
public class RawLocalFileSystem extends FileSystem
84+
implements ResilientCommitByRename {
85+
8086
static final URI NAME = URI.create("file:///");
8187
private Path workingDir;
8288
private long defaultBlockSize;
@@ -505,6 +511,49 @@ public boolean rename(Path src, Path dst) throws IOException {
505511
return FileUtil.copy(this, src, this, dst, true, getConf());
506512
}
507513

514+
/**
515+
* Use java nio operations to fail meaningfully.
516+
* {@inheritDoc}
517+
*/
518+
@Override
519+
public CommitByRenameOutcome commitSingleFileByRename(final Path source,
520+
final Path dest,
521+
@Nullable final String sourceEtag,
522+
@Nullable final FileStatus sourceStatus)
523+
throws IOException {
524+
LOG.debug("commitSingleFileByRename src: {} dst: {}", source, dest);
525+
Path qualifiedSourcePath = makeQualified(source);
526+
Path qualifiedDestPath = makeQualified(dest);
527+
528+
// initial checks
529+
if (qualifiedSourcePath.equals(qualifiedDestPath)) {
530+
// rename to itself is forbidden
531+
throw new PathIOException(qualifiedSourcePath.toString(), "cannot rename object onto self");
532+
}
533+
final File sourceFile = pathToFile(qualifiedSourcePath);
534+
if (!sourceFile.exists()) {
535+
throw new FileNotFoundException(qualifiedSourcePath.toString());
536+
}
537+
final File destFile = pathToFile(qualifiedDestPath);
538+
if (destFile.exists()) {
539+
throw new FileAlreadyExistsException(qualifiedDestPath.toString());
540+
}
541+
// do the move
542+
try {
543+
Files.move(sourceFile.toPath(),
544+
destFile.toPath(),
545+
StandardCopyOption.ATOMIC_MOVE);
546+
} catch (UnsupportedOperationException e) {
547+
// raised when flags are not supported
548+
throw new PathIOException(qualifiedSourcePath.toString(),
549+
e.toString(),
550+
e);
551+
552+
}
553+
554+
return new CommitByRenameOutcome();
555+
}
556+
508557
@VisibleForTesting
509558
public final boolean handleEmptyDstDirectoryOnWindows(Path src, File srcFile,
510559
Path dst, File dstFile) throws IOException {
@@ -1183,6 +1232,7 @@ public boolean hasPathCapability(final Path path, final String capability)
11831232
case CommonPathCapabilities.FS_PATHHANDLES:
11841233
case CommonPathCapabilities.FS_PERMISSIONS:
11851234
case CommonPathCapabilities.FS_TRUNCATE:
1235+
case RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY:
11861236
return true;
11871237
case CommonPathCapabilities.FS_SYMLINKS:
11881238
return FileSystem.areSymlinksEnabled();
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.impl;
20+
21+
import java.io.FileNotFoundException;
22+
import java.io.IOException;
23+
24+
import javax.annotation.Nullable;
25+
26+
import org.apache.hadoop.classification.InterfaceAudience;
27+
import org.apache.hadoop.classification.InterfaceStability;
28+
import org.apache.hadoop.fs.FileStatus;
29+
import org.apache.hadoop.fs.Path;
30+
import org.apache.hadoop.fs.PathIOException;
31+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
32+
33+
/**
34+
* This is something internal to make our rename-based job committers
35+
* more resilient to failures.
36+
* If you are in the hive team: do not use this as it lacks
37+
* spec, tests, stability, etc. if we find you using it we will change
38+
* the signature just to stop your code compiling.
39+
* View this as a proof of concept of the functionality we'd want from a
40+
* "modern" rename call, but not the API (which would be builder based,
41+
* return a future, etc).
42+
*/
43+
@InterfaceAudience.LimitedPrivate({"Filesystems", "hadoop-mapreduce-client-core"})
44+
@InterfaceStability.Unstable
45+
public interface ResilientCommitByRename {
46+
47+
/**
48+
* Path capability.
49+
* FS Instances which support the operation MUST return
50+
* true and implement the method; FileSystem instances which do not
51+
* MUST return false.
52+
* There's a risk wrapper filesystems may pass the probe
53+
* through.
54+
* Clients MUST check for both the interface and this
55+
* cqpability.
56+
*/
57+
String RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY =
58+
"org.apache.hadoop.fs.impl.resilient.commit.by.rename";
59+
60+
/**
61+
* Rename source file to dest path *Exactly*; no subdirectory games here.
62+
* if the op does not raise an exception,then
63+
* the data at dest is the data which was at source.
64+
*
65+
* Requirements
66+
*
67+
* <pre>
68+
* exists(FS, source) else raise FileNotFoundException
69+
* source != dest else raise PathIOException
70+
* not exists(FS, dest)
71+
* isDir(FS, dest.getParent)
72+
* </pre>
73+
* <ol>
74+
* <li>supported in this instance else raise PathIOException</li>
75+
* <li>source != dest else raise PathIOException</li>
76+
* <li>source must exist else raise FileNotFoundException</li>
77+
* <li>source must exist and be a file</li>
78+
* <li>dest must not exist; </li>
79+
* <li>dest.getParent() must be a dir</li>
80+
* <li>if sourceEtag is non-empty, it MAY be used to qualify/validate the rename.</li>
81+
* </ol>
82+
*
83+
* The outcome of the operation is undefined if source is not a file, dest exists,
84+
* dest.getParent() doesn't exist/is a file.
85+
* That is: implementations SHOULD assume that the code calling this method has
86+
* set up the destination directory tree and is only invoking this call on a file.
87+
* Accordingly: <i>implementations MAY skip validation checks</i>
88+
*
89+
* If sourceStatus is not null, its contents MAY be used to qualify the rename.
90+
* <ol>
91+
* <li>Values extracted from sourceStatus SHALL take priority over
92+
* sourceEtag/sourceLastModified parameter.</li>
93+
* <li>sourceStatus.getPath().getName() MUST equal source.getName()</li>
94+
* <li>If store has a subclass of FileStatus and it is sourceStatus is of this type,
95+
* custom information MAY be used to qualify/validate the request.
96+
* This MAY include etag or S3 version ID extraction,</li>
97+
* </ol>
98+
*
99+
* Filesystems MAY support this call on an instance-by-instance basis, depending on
100+
* the nature of the remote store.
101+
* If not available the implementation MUST {@code ResilientCommitByRenameUnsupported}.
102+
* Callers SHOULD use a check of
103+
* {@code hasPathCapability(source, RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY}
104+
* before trying to use this call.
105+
*
106+
* PostConditions on a successful operation:
107+
* <pre>
108+
* FS' where:
109+
* not exists(FS', source)
110+
* and exists(FS', dest)
111+
* and data(FS', dest) == data (FS, source)
112+
* </pre>
113+
* This is exactly the same outcome as `FileSystem.rename()` when the same preconditions
114+
* are met. This API call simply restricts the operation to file rename with strict
115+
* conditions, (no need to be 'clever' about dest path calculation) and the ability
116+
* to pass in etags, modtimes and file status values.
117+
*
118+
* @param source path to source file
119+
* @param dest destination of rename.
120+
* @param sourceEtag etag of source file. may be null or empty
121+
* @param sourceStatus nullable FileStatus of source.
122+
* @throws FileNotFoundException source file not found
123+
* @throws ResilientCommitByRenameUnsupported not available on this store.
124+
* @throws PathIOException failure, including source and dest being the same path
125+
* @throws IOException any other exception
126+
*/
127+
default CommitByRenameOutcome commitSingleFileByRename(
128+
Path source,
129+
Path dest,
130+
@Nullable String sourceEtag,
131+
@Nullable FileStatus sourceStatus)
132+
throws FileNotFoundException,
133+
ResilientCommitByRenameUnsupported,
134+
PathIOException,
135+
IOException {
136+
throw new ResilientCommitByRenameUnsupported(source.toString());
137+
}
138+
139+
/**
140+
* The outcome. This is always a success, but it
141+
* may include some information about what happened.
142+
*/
143+
class CommitByRenameOutcome implements IOStatisticsSource {
144+
145+
}
146+
147+
final class ResilientCommitByRenameUnsupported extends PathIOException {
148+
public ResilientCommitByRenameUnsupported(final String path) {
149+
super(path, "ResilientCommit operations not supported");
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)