Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.util.Progressable;

import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.ResilientCommitByRename.RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;

/****************************************************************
Expand Down Expand Up @@ -929,6 +930,7 @@ public boolean hasPathCapability(final Path path, final String capability)
switch (validatePathCapabilityArgs(p, capability)) {
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
case RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY:
return false;
default:
return super.hasPathCapability(p, capability);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,25 @@
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.Nullable;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.ResilientCommitByRename;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
Expand Down Expand Up @@ -76,7 +82,9 @@
*****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class RawLocalFileSystem extends FileSystem {
public class RawLocalFileSystem extends FileSystem
implements ResilientCommitByRename {

static final URI NAME = URI.create("file:///");
private Path workingDir;
private long defaultBlockSize;
Expand Down Expand Up @@ -505,6 +513,61 @@ public boolean rename(Path src, Path dst) throws IOException {
return FileUtil.copy(this, src, this, dst, true, getConf());
}

/**
* Use java nio operations to fail meaningfully.
* This is here to test the API without requiring an object
* store which implements it.
* It also provides the reference "what should happen here"
* semantics.
* {@inheritDoc}
*/
@Override
@VisibleForTesting
public CommitByRenameOutcome commitSingleFileByRename(final Path source,
final Path dest,
@Nullable final String sourceEtag,
@Nullable final FileStatus sourceStatus,
final CommitFlags... options)
throws IOException {
LOG.debug("commitSingleFileByRename src: {} dst: {}", source, dest);
Path qualifiedSourcePath = makeQualified(source);
Path qualifiedDestPath = makeQualified(dest);

// initial checks
if (qualifiedSourcePath.equals(qualifiedDestPath)) {
// rename to itself is forbidden
throw new PathIOException(qualifiedSourcePath.toString(), "cannot rename object onto self");
}
final File sourceFile = pathToFile(qualifiedSourcePath);
if (!sourceFile.exists()) {
throw new FileNotFoundException(qualifiedSourcePath.toString());
}
final File destFile = pathToFile(qualifiedDestPath);
Set<CommitFlags> flags = new HashSet<>(Arrays.asList(options));
if (!flags.contains(CommitFlags.DESTINATION_DOES_NOT_EXIST) && destFile.exists()) {
// dest exists reject dirs; files allowed if overwrite is true.
if (!flags.contains(CommitFlags.OVERWRITE) && destFile.isFile()) {
throw new FileAlreadyExistsException(qualifiedDestPath.toString());
}
if (destFile.isDirectory()) {
throw new FileAlreadyExistsException(qualifiedDestPath.toString());
}
}
// do the move
try {
Files.move(sourceFile.toPath(),
destFile.toPath(),
StandardCopyOption.ATOMIC_MOVE);
} catch (UnsupportedOperationException e) {
// raised when flags are not supported
throw new PathIOException(qualifiedSourcePath.toString(),
e.toString(),
e);
}

return new CommitByRenameOutcome(false, false, false);
}

@VisibleForTesting
public final boolean handleEmptyDstDirectoryOnWindows(Path src, File srcFile,
Path dst, File dstFile) throws IOException {
Expand Down Expand Up @@ -1183,6 +1246,7 @@ public boolean hasPathCapability(final Path path, final String capability)
case CommonPathCapabilities.FS_PATHHANDLES:
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_TRUNCATE:
case RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY:
return true;
case CommonPathCapabilities.FS_SYMLINKS:
return FileSystem.areSymlinksEnabled();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import java.io.FileNotFoundException;
import java.io.IOException;

import javax.annotation.Nullable;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;

/**
* This is something internal to make our rename-based job committers
* more resilient to failures.
* If you are in the hive team: do not use this as it lacks
* spec, tests, stability, etc. if we find you using it we will change
* the signature just to stop your code compiling.
* View this as a proof of concept of the functionality we'd want from a
* "modern" rename call, but not the API (which would be builder based,
* return a future, etc).
*/
@InterfaceAudience.LimitedPrivate({"Filesystems", "hadoop-mapreduce-client-core"})
@InterfaceStability.Unstable
public interface ResilientCommitByRename {

/**
* Path capability.
* FS Instances which support the operation MUST return
* true and implement the method; FileSystem instances which do not
* MUST return false.
* There's a risk wrapper filesystems may pass the probe
* through.
* Clients MUST check for both the interface and this
* cqpability.
*/
String RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY =
"org.apache.hadoop.fs.impl.resilient.commit.by.rename";

/**
* Rename source file to dest path *Exactly*; no subdirectory games here.
* if the op does not raise an exception,then
* the data at dest is the data which was at source.
*
* Requirements
*
* <pre>
* exists(FS, source) else raise FileNotFoundException
* source != dest else raise PathIOException
* not exists(FS, dest)
* isDir(FS, dest.getParent)
* </pre>
* <ol>
* <li>supported in this instance else raise PathIOException</li>
* <li>source != dest else raise PathIOException</li>
* <li>source must exist else raise FileNotFoundException</li>
* <li>source must exist and be a file</li>
* <li>dest must not exist; </li>
* <li>dest.getParent() must be a dir</li>
* <li>if sourceEtag is non-empty, it MAY be used to qualify/validate the rename.</li>
* </ol>
*
* The outcome of the operation is undefined if source is not a file, dest exists,
* dest.getParent() doesn't exist/is a file.
* That is: implementations SHOULD assume that the code calling this method has
* set up the destination directory tree and is only invoking this call on a file.
* Accordingly: <i>implementations MAY skip validation checks</i>
*
* If sourceStatus is not null, its contents MAY be used to qualify the rename.
* <ol>
* <li>Values extracted from sourceStatus SHALL take priority over
* sourceEtag/sourceLastModified parameter.</li>
* <li>sourceStatus.getPath().getName() MUST equal source.getName()</li>
* <li>If store has a subclass of FileStatus and it is sourceStatus is of this type,
* custom information MAY be used to qualify/validate the request.
* This MAY include etag or S3 version ID extraction,</li>
* </ol>
*
* Filesystems MAY support this call on an instance-by-instance basis, depending on
* the nature of the remote store.
* If not available the implementation MUST {@code ResilientCommitByRenameUnsupported}.
* Callers SHOULD use a check of
* {@code hasPathCapability(source, RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY}
* before trying to use this call.
*
* PostConditions on a successful operation:
* <pre>
* FS' where:
* not exists(FS', source)
* and exists(FS', dest)
* and data(FS', dest) == data (FS, source)
* </pre>
* This is exactly the same outcome as `FileSystem.rename()` when the same preconditions
* are met. This API call simply restricts the operation to file rename with strict
* conditions, (no need to be 'clever' about dest path calculation) and the ability
* to pass in etags, modtimes and file status values.
*
* @param source path to source file
* @param dest destination of rename.
* @param sourceEtag etag of source file. may be null or empty
* @param sourceStatus nullable FileStatus of source.
* @param options rename flags
* @throws FileNotFoundException source file not found
* @throws ResilientCommitByRenameUnsupported not available on this store.
* @throws PathIOException failure, including source and dest being the same path
* @throws IOException any other exception
*/
default CommitByRenameOutcome commitSingleFileByRename(
Path source,
Path dest,
@Nullable String sourceEtag,
@Nullable FileStatus sourceStatus,
CommitFlags... options)
throws FileNotFoundException,
ResilientCommitByRenameUnsupported,
PathIOException,
IOException {
throw new ResilientCommitByRenameUnsupported(source.toString());
}

/**
* The outcome. This is always a success, but it
* may include some information about what happened.
*/
class CommitByRenameOutcome {

/* Throttling encountered and recovered from. */
private boolean throttlingEncountered;

/* The new commit operation has been rejected; falling back. */
private boolean commitRejected;

/* Classic rename was used. */
private boolean classicRenameUsed;

public CommitByRenameOutcome() {
}

public CommitByRenameOutcome(
final boolean throttlingEncountered,
final boolean commitRejected,
final boolean classicRenameUsed) {
this.throttlingEncountered = throttlingEncountered;
this.commitRejected = commitRejected;
this.classicRenameUsed = classicRenameUsed;
}

public boolean isThrottlingEncountered() {
return throttlingEncountered;
}

public void setThrottlingEncountered(final boolean throttlingEncountered) {
this.throttlingEncountered = throttlingEncountered;
}

public boolean isCommitRejected() {
return commitRejected;
}

public void setCommitRejected(final boolean commitRejected) {
this.commitRejected = commitRejected;
}

public boolean isClassicRenameUsed() {
return classicRenameUsed;
}

public void setClassicRenameUsed(final boolean classicRenameUsed) {
this.classicRenameUsed = classicRenameUsed;
}
}

final class ResilientCommitByRenameUnsupported extends PathIOException {
public ResilientCommitByRenameUnsupported(final String path) {
super(path, "ResilientCommit operations not supported");
}
}

/**
* Enum of options.
*/
enum CommitFlags {
NONE, // No options
OVERWRITE, // Overwrite the rename destination
DESTINATION_DOES_NOT_EXIST; // dest known to have been deleted

}
}
Loading