diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index c7f8e36c3f675..548e65f7fda0e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -46,6 +46,7 @@ import org.apache.hadoop.util.Progressable; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.apache.hadoop.fs.impl.ResilientCommitByRename.RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; /**************************************************************** @@ -929,6 +930,7 @@ public boolean hasPathCapability(final Path path, final String capability) switch (validatePathCapabilityArgs(p, capability)) { case CommonPathCapabilities.FS_APPEND: case CommonPathCapabilities.FS_CONCAT: + case RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY: return false; default: return super.hasPathCapability(p, capability); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index edcc4a8b99e77..2a4b11f63e14a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -35,19 +35,25 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.StandardCopyOption; import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributeView; import java.nio.file.attribute.FileTime; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashSet; import java.util.Locale; import java.util.Optional; +import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.ResilientCommitByRename; import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -76,7 +82,9 @@ *****************************************************************/ @InterfaceAudience.Public @InterfaceStability.Stable -public class RawLocalFileSystem extends FileSystem { +public class RawLocalFileSystem extends FileSystem + implements ResilientCommitByRename { + static final URI NAME = URI.create("file:///"); private Path workingDir; private long defaultBlockSize; @@ -505,6 +513,61 @@ public boolean rename(Path src, Path dst) throws IOException { return FileUtil.copy(this, src, this, dst, true, getConf()); } + /** + * Use java nio operations to fail meaningfully. + * This is here to test the API without requiring an object + * store which implements it. + * It also provides the reference "what should happen here" + * semantics. 
+   * {@inheritDoc}
+   */
+  @Override
+  @VisibleForTesting
+  public CommitByRenameOutcome commitSingleFileByRename(final Path source,
+      final Path dest,
+      @Nullable final String sourceEtag,
+      @Nullable final FileStatus sourceStatus,
+      final CommitFlags... options)
+      throws IOException {
+    LOG.debug("commitSingleFileByRename src: {} dst: {}", source, dest);
+    Path qualifiedSourcePath = makeQualified(source);
+    Path qualifiedDestPath = makeQualified(dest);
+
+    // initial checks
+    if (qualifiedSourcePath.equals(qualifiedDestPath)) {
+      // rename to itself is forbidden
+      throw new PathIOException(qualifiedSourcePath.toString(),
+          "cannot rename object onto self");
+    }
+    final File sourceFile = pathToFile(qualifiedSourcePath);
+    if (!sourceFile.exists()) {
+      throw new FileNotFoundException(qualifiedSourcePath.toString());
+    }
+    final File destFile = pathToFile(qualifiedDestPath);
+    Set<CommitFlags> flags = new HashSet<>(Arrays.asList(options));
+    if (!flags.contains(CommitFlags.DESTINATION_DOES_NOT_EXIST) && destFile.exists()) {
+      // dest exists: reject dirs; files allowed if overwrite is true.
+      if (!flags.contains(CommitFlags.OVERWRITE) && destFile.isFile()) {
+        throw new FileAlreadyExistsException(qualifiedDestPath.toString());
+      }
+      if (destFile.isDirectory()) {
+        throw new FileAlreadyExistsException(qualifiedDestPath.toString());
+      }
+    }
+    // do the move
+    try {
+      Files.move(sourceFile.toPath(),
+          destFile.toPath(),
+          StandardCopyOption.ATOMIC_MOVE);
+    } catch (UnsupportedOperationException e) {
+      // raised when flags are not supported
+      throw new PathIOException(qualifiedSourcePath.toString(),
+          e.toString(),
+          e);
+    }
+
+    return new CommitByRenameOutcome(false, false, false);
+  }
+
   @VisibleForTesting
   public final boolean handleEmptyDstDirectoryOnWindows(Path src, File srcFile,
       Path dst, File dstFile) throws IOException {
@@ -1183,6 +1246,7 @@ public boolean hasPathCapability(final Path path, final String capability)
     case CommonPathCapabilities.FS_PATHHANDLES:
     case CommonPathCapabilities.FS_PERMISSIONS:
     case CommonPathCapabilities.FS_TRUNCATE:
+    case RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY:
       return true;
     case CommonPathCapabilities.FS_SYMLINKS:
       return FileSystem.areSymlinksEnabled();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/ResilientCommitByRename.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/ResilientCommitByRename.java
new file mode 100644
index 0000000000000..50b5c44630e18
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/ResilientCommitByRename.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * This is something internal to make our rename-based job committers
+ * more resilient to failures.
+ * If you are in the hive team: do not use this, as it lacks
+ * spec, tests, stability, etc. If we find you using it we will change
+ * the signature just to stop your code compiling.
+ * View this as a proof of concept of the functionality we'd want from a
+ * "modern" rename call, but not the API (which would be builder based,
+ * return a future, etc).
+ */
+@InterfaceAudience.LimitedPrivate({"Filesystems", "hadoop-mapreduce-client-core"})
+@InterfaceStability.Unstable
+public interface ResilientCommitByRename {
+
+  /**
+   * Path capability.
+   * FS instances which support the operation MUST return
+   * true and implement the method; FileSystem instances which do not
+   * MUST return false.
+   * There's a risk that wrapper filesystems may pass the probe
+   * through.
+   * Clients MUST check for both the interface and this
+   * capability.
+   */
+  String RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY =
+      "org.apache.hadoop.fs.impl.resilient.commit.by.rename";
+
+  /**
+   * Rename source file to dest path *exactly*; no subdirectory games here.
+   * If the op does not raise an exception, then
+   * the data at dest is the data which was at source.
+   *
+   * Requirements
+   *
+   * <pre>
+   *   exists(FS, source) else raise FileNotFoundException
+   *   source != dest else raise PathIOException
+   *   not exists(FS, dest)
+   *   isDir(FS, dest.getParent)
+   * </pre>
+   *
+   * <ol>
+   *   <li>supported in this instance else raise PathIOException</li>
+   *   <li>source != dest else raise PathIOException</li>
+   *   <li>source must exist else raise FileNotFoundException</li>
+   *   <li>source must exist and be a file</li>
+   *   <li>dest must not exist</li>
+   *   <li>dest.getParent() must be a dir</li>
+   *   <li>if sourceEtag is non-empty, it MAY be used to qualify/validate
+   *   the rename.</li>
+   * </ol>
+   *
+   * The outcome of the operation is undefined if source is not a file, dest exists,
+   * or dest.getParent() doesn't exist/is a file.
+   * That is: implementations SHOULD assume that the code calling this method has
+   * set up the destination directory tree and is only invoking this call on a file.
+   * Accordingly: implementations MAY skip validation checks.
+   *
+   * If sourceStatus is not null, its contents MAY be used to qualify the rename.
+   * <ol>
+   *   <li>Values extracted from sourceStatus SHALL take priority over the
+   *   sourceEtag/sourceLastModified parameters.</li>
+   *   <li>sourceStatus.getPath().getName() MUST equal source.getName()</li>
+   *   <li>If the store has a subclass of FileStatus and sourceStatus is of this type,
+   *   custom information MAY be used to qualify/validate the request.
+   *   This MAY include etag or S3 version ID extraction.</li>
+   * </ol>
+   *
+   * Filesystems MAY support this call on an instance-by-instance basis, depending on
+   * the nature of the remote store.
+   * If not available, the implementation MUST raise
+   * {@code ResilientCommitByRenameUnsupported}.
+   * Callers SHOULD use a check of
+   * {@code hasPathCapability(source, RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY)}
+   * before trying to use this call.
+   *
+   * Postconditions on a successful operation:
+   * <pre>
+   * FS' where:
+   *     not exists(FS', source)
+   *     and exists(FS', dest)
+   *     and data(FS', dest) == data(FS, source)
+   * </pre>
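+   *
+   * For illustration, a minimal sketch of a capability-checking caller
+   * (illustrative only; it uses nothing beyond the names declared on
+   * this interface):
+   * <pre>
+   *   if (fs instanceof ResilientCommitByRename
+   *       &amp;&amp; fs.hasPathCapability(source,
+   *           RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY)) {
+   *     ((ResilientCommitByRename) fs).commitSingleFileByRename(
+   *         source, dest, null, null, CommitFlags.NONE);
+   *   } else {
+   *     fs.rename(source, dest);
+   *   }
+   * </pre>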
+ * This is exactly the same outcome as `FileSystem.rename()` when the same preconditions + * are met. This API call simply restricts the operation to file rename with strict + * conditions, (no need to be 'clever' about dest path calculation) and the ability + * to pass in etags, modtimes and file status values. + * + * @param source path to source file + * @param dest destination of rename. + * @param sourceEtag etag of source file. may be null or empty + * @param sourceStatus nullable FileStatus of source. + * @param options rename flags + * @throws FileNotFoundException source file not found + * @throws ResilientCommitByRenameUnsupported not available on this store. + * @throws PathIOException failure, including source and dest being the same path + * @throws IOException any other exception + */ + default CommitByRenameOutcome commitSingleFileByRename( + Path source, + Path dest, + @Nullable String sourceEtag, + @Nullable FileStatus sourceStatus, + CommitFlags... options) + throws FileNotFoundException, + ResilientCommitByRenameUnsupported, + PathIOException, + IOException { + throw new ResilientCommitByRenameUnsupported(source.toString()); + } + + /** + * The outcome. This is always a success, but it + * may include some information about what happened. + */ + class CommitByRenameOutcome { + + /* Throttling encountered and recovered from. */ + private boolean throttlingEncountered; + + /* The new commit operation has been rejected; falling back. */ + private boolean commitRejected; + + /* Classic rename was used. */ + private boolean classicRenameUsed; + + public CommitByRenameOutcome() { + } + + public CommitByRenameOutcome( + final boolean throttlingEncountered, + final boolean commitRejected, + final boolean classicRenameUsed) { + this.throttlingEncountered = throttlingEncountered; + this.commitRejected = commitRejected; + this.classicRenameUsed = classicRenameUsed; + } + + public boolean isThrottlingEncountered() { + return throttlingEncountered; + } + + public void setThrottlingEncountered(final boolean throttlingEncountered) { + this.throttlingEncountered = throttlingEncountered; + } + + public boolean isCommitRejected() { + return commitRejected; + } + + public void setCommitRejected(final boolean commitRejected) { + this.commitRejected = commitRejected; + } + + public boolean isClassicRenameUsed() { + return classicRenameUsed; + } + + public void setClassicRenameUsed(final boolean classicRenameUsed) { + this.classicRenameUsed = classicRenameUsed; + } + } + + final class ResilientCommitByRenameUnsupported extends PathIOException { + public ResilientCommitByRenameUnsupported(final String path) { + super(path, "ResilientCommit operations not supported"); + } + } + + /** + * Enum of options. + */ + enum CommitFlags { + NONE, // No options + OVERWRITE, // Overwrite the rename destination + DESTINATION_DOES_NOT_EXIST; // dest known to have been deleted + + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/ResilientCommitByRenameHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/ResilientCommitByRenameHelper.java new file mode 100644 index 0000000000000..9c6a7ab310ef3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/ResilientCommitByRenameHelper.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.util.DurationInfo;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Support for committing work through {@link ResilientCommitByRename}
+ * where present.
+ * This is for internal use only and will be removed when there is a
+ * public rename operation which takes etags and FileStatus entries.
+ */
+@InterfaceAudience.LimitedPrivate({"Filesystems", "hadoop-mapreduce-client-core"})
+@InterfaceStability.Unstable
+public class ResilientCommitByRenameHelper {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ResilientCommitByRenameHelper.class);
+
+  /**
+   * Target FS.
+   */
+  private final FileSystem fileSystem;
+
+  /**
+   * Were one or more commits through the new API rejected?
+   */
+  private final AtomicBoolean commitRejected;
+
+  /**
+   * Instantiate.
+   * @param fileSystem filesystem to work with.
+   */
+  public ResilientCommitByRenameHelper(final FileSystem fileSystem) {
+    this.fileSystem = requireNonNull(fileSystem);
+    commitRejected = new AtomicBoolean(false);
+  }
+
+  /**
+   * Is resilient commit available on this filesystem/path?
+   * @param sourcePath path to commit under.
+   * @return true if the resilient commit API can be used
+   */
+  public boolean resilientCommitAvailable(Path sourcePath) {
+    if (commitRejected.get()) {
+      return false;
+    }
+    return filesystemHasResilientCommmit(this.fileSystem, sourcePath);
+  }
+
+  /**
+   * What is the resilience of this filesystem?
+   * @param fs filesystem
+   * @param sourcePath path to use
+   * @return true if the conditions of use are met.
+   */
+  public static boolean filesystemHasResilientCommmit(
+      final FileSystem fs,
+      final Path sourcePath) {
+    try {
+      return fs instanceof ResilientCommitByRename
+          && fs.hasPathCapability(sourcePath,
+          ResilientCommitByRename.RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Commit a file.
+   * If the store supports {@link ResilientCommitByRename} then
+   * its API is used to commit the file, passing in the etag.
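+   * <p>
+   * A sketch of typical use (illustrative only; it simply wires together
+   * the helper and flag types declared in this package):
+   * <pre>
+   *   ResilientCommitByRenameHelper helper =
+   *       new ResilientCommitByRenameHelper(fs);
+   *   FileStatus sourceStatus = fs.getFileStatus(source);
+   *   helper.commitFile(sourceStatus, dest,
+   *       ResilientCommitByRename.CommitFlags.NONE);
+   * </pre>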
+   * @param sourceStatus status of the source file
+   * @param dest destination path
+   * @param options rename flags
+   * @return the outcome
+   * @throws IOException any failure in resilient commit other than rejection
+   * of the operation, and/or classic rename failed.
+   */
+  public ResilientCommitByRename.CommitByRenameOutcome commitFile(
+      final FileStatus sourceStatus, final Path dest,
+      final ResilientCommitByRename.CommitFlags... options)
+      throws IOException {
+    final Path sourcePath = sourceStatus.getPath();
+    boolean rejected = false;
+    if (resilientCommitAvailable(sourcePath)) {
+
+      // use the better file rename operation.
+      try (DurationInfo du = new DurationInfo(LOG, "commit(%s, %s) with status %s",
+          sourcePath, dest, sourceStatus)) {
+        return ((ResilientCommitByRename) fileSystem).commitSingleFileByRename(
+            sourcePath,
+            dest,
+            null,
+            sourceStatus,
+            options);
+      } catch (ResilientCommitByRename.ResilientCommitByRenameUnsupported
+          | UnsupportedOperationException e) {
+        commitWasRejected(sourcePath, e);
+        rejected = true;
+      }
+    }
+    // fall back to rename.
+    try (DurationInfo du = new DurationInfo(LOG, "rename(%s, %s)",
+        sourcePath, dest, sourceStatus)) {
+      Set<ResilientCommitByRename.CommitFlags> flags =
+          new HashSet<>(Arrays.asList(options));
+      if (!flags.contains(ResilientCommitByRename.CommitFlags.DESTINATION_DOES_NOT_EXIST)) {
+        try {
+          final FileStatus destStatus = fileSystem.getFileStatus(dest);
+          if (!flags.contains(ResilientCommitByRename.CommitFlags.OVERWRITE)
+              || destStatus.isDirectory()) {
+            // don't support renaming over a dir or, if not overwriting, a file
+            throw new FileAlreadyExistsException(dest.toUri().toString());
+          }
+        } catch (FileNotFoundException ignored) {
+          // dest doesn't exist.
+        }
+      }
+
+      if (!fileSystem.rename(sourcePath, dest)) {
+        escalateRenameFailure(sourcePath, dest);
+      }
+    }
+    return new ResilientCommitByRename.CommitByRenameOutcome(false, rejected, true);
+  }
+
+  /**
+   * The commit was rejected.
+   * Log once and remember, so don't bother trying again through
+   * the rest of this commit.
+   * @param path source.
+   * @param e exception.
+   */
+  private void commitWasRejected(Path path, final Exception e) {
+    if (!commitRejected.getAndSet(true)) {
+      LOG.warn("Resilient commit to {} rejected as unsupported", path);
+      LOG.debug("full exception", e);
+    }
+  }
+
+  /**
+   * Escalate a rename failure to an exception.
+   * This never returns.
+   * @param source source path
+   * @param dest dest path
+   * @throws IOException always
+   */
+  private void escalateRenameFailure(Path source, Path dest)
+      throws IOException {
+    // rename just returned false.
+    // collect information for a meaningful error message
+    // and include in an exception raised.
+
+    // get the source status; this will implicitly raise
+    // a FNFE.
+ final FileStatus sourceStatus = fileSystem.getFileStatus(source); + + // and look to see if there is anything at the destination + FileStatus destStatus; + try { + destStatus = fileSystem.getFileStatus(dest); + } catch (IOException e) { + destStatus = null; + } + + LOG.error("Failure to rename {} to {} with" + + " source status {} " + + " and destination status {}", + source, dest, + sourceStatus, destStatus); + + throw new PathIOException(source.toString(), + "Failed to rename to " + dest); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractResilientCommitByRenameTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractResilientCommitByRenameTest.java new file mode 100644 index 0000000000000..5d1b2f5fe6879 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractResilientCommitByRenameTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.ResilientCommitByRename; +import org.apache.hadoop.fs.impl.ResilientCommitByRenameHelper; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertListStatusFinds; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.impl.ResilientCommitByRename.RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test {@link ResilientCommitByRename}. + */ +public abstract class AbstractContractResilientCommitByRenameTest extends + AbstractFSContractTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContractResilientCommitByRenameTest.class); + + private ResilientCommitByRenameHelper committer; + + @Override + public void setup() throws Exception { + super.setup(); + final FileSystem fs = getFileSystem(); + final Path path = methodPath(); + committer = new ResilientCommitByRenameHelper(fs); + if (isResilient()) { + assertIsResilient(path); + } else { + assertNotResilient(path); + } + } + + /** + * Is this expected to be resilient? 
+ * @return true iff the FS should be resilient + */ + public boolean isResilient() { + return true; + } + + private void assertIsResilient(final Path path) throws IOException { + final FileSystem fs = getFileSystem(); + + Assertions.assertThat(fs) + .describedAs("FS %s", fs) + .isInstanceOf(ResilientCommitByRename.class); + Assertions.assertThat(fs.hasPathCapability(path, + RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY)) + .describedAs("FS %s path capability %s", fs, + RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY) + .isTrue(); + + Assertions.assertThat(committer.resilientCommitAvailable(path)) + .describedAs("resilient commit available") + .isTrue(); + } + + private void assertNotResilient(final Path path) throws IOException { + final FileSystem fs = getFileSystem(); + + Assertions.assertThat(fs) + .describedAs("FS %s", fs) + .isNotInstanceOf(ResilientCommitByRename.class); + Assertions.assertThat(fs.hasPathCapability(path, + RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY)) + .describedAs("FS %s path capability %s", fs, + RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY) + .isFalse(); + + Assertions.assertThat(committer.resilientCommitAvailable(path)) + .describedAs("resilient commit available") + .isFalse(); + } + + /** + * Commit a file. + * @param source source + * @param dest dest path + * @param options rename options + * @return the outcome + * @throws IOException failure + */ + protected ResilientCommitByRename.CommitByRenameOutcome commit( + final Path source, + final Path dest, + final ResilientCommitByRename.CommitFlags... options) throws IOException { + return committer.commitFile(getFileSystem().getFileStatus(source), dest, options); + } + + @Test + public void testCommitNewFileSameDir() throws Throwable { + describe("rename a file into a new file in the same directory"); + Path base = methodPath(); + Path source = new Path(base, "src"); + Path dest = new Path(base, "dest"); + byte[] data = dataset(256, 'a', 'z'); + final FileSystem fs = getFileSystem(); + writeDataset(fs, source, + data, data.length, 1024 * 1024, false); + commit(source, dest); + + assertListStatusFinds(fs, + dest.getParent(), dest); + verifyFileContents(fs, dest, data); + } + + @Test + public void testCommitMissingFile() throws Throwable { + describe("trying to commit a missing file raises FNFE"); + Path base = methodPath(); + Path source = new Path(base, "src"); + final FileSystem fs = getFileSystem(); + ContractTestUtils.touch(fs, source); + final FileStatus status = fs.getFileStatus(source); + fs.delete(source, false); + Path dest = new Path(base, "dest"); + intercept(FileNotFoundException.class, + () -> committer.commitFile(status, dest)); + assertPathDoesNotExist("rename nonexistent file created a destination file", + dest); + } + + /** + * It is an error if the destination exists. 
+   */
+  @Test
+  public void testCommitExistingFileNoOverwrite() throws Throwable {
+    describe("overwrite in commit is forbidden");
+    Path base = methodPath();
+    Path source = new Path(base, "source-256.txt");
+    byte[] sourceData = dataset(256, 'a', 'z');
+    final FileSystem fs = getFileSystem();
+    writeDataset(fs, source, sourceData, sourceData.length, 1024, false);
+    Path dest = new Path(base, "dest-512.txt");
+    byte[] destData = dataset(512, 'A', 'Z');
+    writeDataset(fs, dest, destData, destData.length, 1024, false);
+    final IOException exception = intercept(FileAlreadyExistsException.class, () ->
+        commit(source, dest, ResilientCommitByRename.CommitFlags.NONE));
+    LOG.info("caught exception", exception);
+  }
+
+  /**
+   * Overwriting an existing destination is allowed when the
+   * OVERWRITE flag is set.
+   */
+  @Test
+  public void testCommitExistingFileWithOverwrite() throws Throwable {
+    describe("overwrite in commit is allowed when requested");
+    Path base = methodPath();
+    Path source = new Path(base, "source-256.txt");
+    byte[] sourceData = dataset(256, 'a', 'z');
+    final FileSystem fs = getFileSystem();
+    writeDataset(fs, source, sourceData, sourceData.length, 1024, false);
+    Path dest = new Path(base, "dest-512.txt");
+    byte[] destData = dataset(512, 'A', 'Z');
+    writeDataset(fs, dest, destData, destData.length, 1024, false);
+    commit(source, dest, ResilientCommitByRename.CommitFlags.OVERWRITE);
+  }
+
+  @Test
+  public void testCommitParentIsFile() throws Exception {
+    String action = "commit where the parent is a file";
+    describe(action);
+    Path base = methodPath();
+    Path grandparent = new Path(base, "file");
+    expectCommitUnderFileFails(action,
+        grandparent,
+        new Path(base, "testCommitSrc"),
+        new Path(grandparent, "testCommitTarget"));
+  }
+
+  @Test
+  public void testCommitGrandparentIsFile() throws Exception {
+    String action = "commit where the grandparent is a file";
+    describe(action);
+    Path base = methodPath();
+    Path grandparent = new Path(base, "file");
+    Path parent = new Path(grandparent, "parent");
+    expectCommitUnderFileFails(action,
+        grandparent,
+        new Path(base, "testCommitSrc"),
+        new Path(parent, "testCommitTarget"));
+  }
+
+  protected void expectCommitUnderFileFails(String action,
+      Path file, Path renameSrc, Path renameTarget)
+      throws Exception {
+    byte[] data = dataset(256, 'a', 'z');
+    FileSystem fs = getFileSystem();
+    writeDataset(fs, file, data, data.length, 1024 * 1024,
+        true);
+    writeDataset(fs, renameSrc, data, data.length, 1024 * 1024,
+        true);
+
+    IOException exception = intercept(IOException.class, () ->
+        commit(renameSrc, renameTarget));
+    LOG.info("caught exception", exception);
+    assertPathDoesNotExist("after failure ", renameTarget);
+    assertPathExists(action, renameSrc);
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractResilientCommitByRename.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractResilientCommitByRename.java
new file mode 100644
index 0000000000000..c8a1ee02900c0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractResilientCommitByRename.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.localfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractResilientCommitByRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Local FS does not support resilient commit; this suite verifies + * the fallback is consistent. + */ +public class TestLocalFSContractResilientCommitByRename extends + AbstractContractResilientCommitByRenameTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new LocalFSContract(conf); + } + + @Override + public boolean isResilient() { + return false; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractResilientCommitByRename.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractResilientCommitByRename.java new file mode 100644 index 0000000000000..433a85ddd8c1d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractResilientCommitByRename.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.rawlocal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractResilientCommitByRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Raw local FS does resilient commit by rename. 
+ */ +public class TestRawlocalContractResilientCommitByRename extends + AbstractContractResilientCommitByRenameTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RawlocalFSContract(conf); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 877d73c5a59de..2d7c9b8d2b2d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -20,6 +20,14 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -29,6 +37,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.impl.ResilientCommitByRename; +import org.apache.hadoop.fs.impl.ResilientCommitByRenameHelper; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -38,12 +49,13 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** An {@link OutputCommitter} that commits files specified +/** An {@link OutputCommitter} that commits files specified * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 
**/ @InterfaceAudience.Public @@ -100,11 +112,24 @@ public class FileOutputCommitter extends PathOutputCommitter { public static final boolean FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false; + public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS = + "mapreduce.fileoutputcommitter.algorithm.version.v1.experimental.mv.threads"; + public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS_DEFAULT = 1; + + public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT = + "mapreduce.fileoutputcommitter.algorithm.version.v1.experimental.parallel.task.commit"; + public static final boolean + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT_DEFAULT = true; + private Path outputPath = null; private Path workPath = null; private final int algorithmVersion; private final boolean skipCleanup; private final boolean ignoreCleanupFailures; + private final int moveThreads; + private final AtomicInteger numberOfTasks = new AtomicInteger(); + private final boolean isParallelTaskCommitEnabled; + private ResilientCommitByRenameHelper resilientCommitHelper; /** * Create a file output committer @@ -139,6 +164,14 @@ public FileOutputCommitter(Path outputPath, algorithmVersion = conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT); + int configuredThreads = + context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS_DEFAULT); + int minThreads = Math.max(1, configuredThreads); + moveThreads = Math.min(minThreads, (Runtime.getRuntime().availableProcessors() * 16)); + isParallelTaskCommitEnabled = context.getConfiguration().getBoolean( + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT, + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT_DEFAULT); LOG.info("File Output Committer Algorithm version is " + algorithmVersion); if (algorithmVersion != 1 && algorithmVersion != 2) { throw new IOException("Only 1 or 2 algorithm version is supported"); @@ -396,6 +429,11 @@ public void commitJob(JobContext context) throws IOException { */ @VisibleForTesting protected void commitJobInternal(JobContext context) throws IOException { + if (isParallelMoveEnabled()) { + // Note: Code paths are intentionally copied + parallelCommitJobInternal(context); + return; + } if (hasOutputPath()) { Path finalOutput = getOutputPath(); FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); @@ -497,6 +535,302 @@ private void mergePaths(FileSystem fs, final FileStatus from, } } + @VisibleForTesting + public int getMoveThreads() { + return moveThreads; + } + + public boolean isParallelMoveEnabled() { + // Only available for algo v1 + return (moveThreads > 1 && algorithmVersion == 1); + } + + void validateParallelMove() throws IOException { + if (!isParallelMoveEnabled()) { + throw new IOException("Parallel file move is not enabled. " + + "moveThreads=" + moveThreads + + ", algo=" + algorithmVersion); + } + } + + void validateThreadPool(ExecutorService pool, BlockingQueue> futures) + throws IOException { + boolean threadPoolEnabled = isParallelMoveEnabled(); + if (!threadPoolEnabled || pool == null || futures == null) { + String errorMsg = "Thread pool is not configured correctly. 
" + + "threadPoolEnabled: " + threadPoolEnabled + + ", pool: " + pool + + ", futures: " + futures; + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + } + + /** + * Get executor service for moving files for v1 algorithm. + * @return executor service + * @throws IOException on error + */ + private ExecutorService createExecutorService() throws IOException { + // intentional validation + validateParallelMove(); + + ExecutorService pool = new ThreadPoolExecutor(moveThreads, moveThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("FileCommitter-v1-move-thread-%d") + .build(), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + LOG.info("Size of move thread pool: {}, pool: {}", moveThreads, pool); + return pool; + } + + private void parallelCommitJobInternal(JobContext context) throws IOException { + // validate to be on safer side. + validateParallelMove(); + + if (hasOutputPath()) { + Path finalOutput = getOutputPath(); + FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); + // created resilient commit helper bonded to the destination FS/path + resilientCommitHelper = new ResilientCommitByRenameHelper(fs); + if (resilientCommitHelper.resilientCommitAvailable(finalOutput)) { + LOG.info("Using resilient commit API to move files"); + } + + // No need to check for algo ver=1, as this entire code path is for V1. + try (DurationInfo d = new DurationInfo(LOG, true, + "Merging data from committed tasks %s", finalOutput)) { + mergeCommittedTasksInParallel(fs, context, finalOutput); + } + + if (skipCleanup) { + LOG.info("Skip cleanup the _temporary folders under job's output " + + "directory in commitJob."); + } else { + // delete the _temporary folder. + try { + cleanupJob(context); + } catch (IOException e) { + if (ignoreCleanupFailures) { + // swallow exceptions in cleanup as user configure to make sure + // commitJob could be success even when cleanup get failure. + LOG.error("Error in cleanup job, manually cleanup is needed.", e); + } else { + // throw back exception to fail commitJob. + throw e; + } + } + } + // True if the job requires output.dir marked on successful job. + // Note that by default it is set to true. + if (context.getConfiguration().getBoolean( + SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) { + Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME); + // If job commit is repeatable and previous/another AM could write + // mark file already, we need to set overwritten to be true explicitly + // in case other FS implementations don't overwritten by default. + fs.create(markerPath, true).close(); + } + } else { + LOG.warn("Output Path is null in commitJob()"); + } + } + + private void mergeCommittedTasksInParallel(FileSystem fs, JobContext context, + Path finalOutput) throws IOException { + + // validate to be on safer side. 
+    validateParallelMove();
+
+    ExecutorService pool = createExecutorService();
+    BlockingQueue<Future<?>> futures = new LinkedBlockingQueue<>();
+    try {
+      for (FileStatus stat : getAllCommittedTaskPaths(context)) {
+        LOG.info("Merging in parallel, from: {}, to: {}, parallelTaskCommitEnabled: {}",
+            stat.getPath(), finalOutput, isParallelTaskCommitEnabled);
+        if (isParallelTaskCommitEnabled) {
+          futures.add(pool.submit(() -> {
+            mergePathsInParallel(fs, stat, finalOutput, context, pool, futures);
+            return null;
+          }));
+          numberOfTasks.getAndIncrement();
+        } else {
+          mergePathsInParallel(fs, stat, finalOutput, context, pool, futures);
+        }
+      }
+      drainFutures(pool, futures);
+    } finally {
+      shutdownThreadPool(pool);
+    }
+  }
+
+  /**
+   * Drain all future tasks and clean up the thread pool.
+   *
+   * @throws IOException if an exception was raised in any pooled task.
+   */
+  private void drainFutures(ExecutorService pool, BlockingQueue<Future<?>> futures)
+      throws IOException {
+    if (futures == null) {
+      return;
+    }
+    try {
+      int i = 0;
+      while (!futures.isEmpty()) {
+        futures.take().get();
+        if (i % 1000 == 0) {
+          LOG.info("Drained task id: {}, overall: {}", i, numberOfTasks.get());
+        }
+        i++;
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      throw cancelTasks(pool, e);
+    }
+  }
+
+  @VisibleForTesting
+  public int getNumCompletedTasks() {
+    return numberOfTasks.get();
+  }
+
+  /**
+   * Cancel pending tasks in case of exception.
+   *
+   * @param pool threadpool
+   * @param e exception
+   * @return IOException to raise
+   */
+  private IOException cancelTasks(ExecutorService pool, Exception e) {
+    if (e == null) {
+      // shouldn't land here
+      return new IOException("exception was null");
+    }
+    LOG.error("Cancelling all tasks and shutting down thread pool.", e);
+    pool.shutdownNow();
+    if (e.getCause() instanceof IOException) {
+      return (IOException) e.getCause();
+    }
+    return new IOException(e);
+  }
+
+  /**
+   * Shutdown the thread pool.
+   */
+  private void shutdownThreadPool(ExecutorService pool) {
+    if (pool != null && !pool.isTerminated()) {
+      LOG.info("Shutting down thread pool");
+      pool.shutdown();
+      numberOfTasks.set(0);
+    }
+  }
+
+  /**
+   * Merge two paths together with parallel threads.
+   * Anything in from will be moved into to; if there
+   * are any name conflicts while merging, the files or directories in from win.
+   * @param fs the FileSystem to use
+   * @param from the path data is coming from.
+   * @param to the path data is going to.
+   * @throws IOException on any error
+   */
+  private void mergePathsInParallel(FileSystem fs, final FileStatus from,
+      final Path to, JobContext context, ExecutorService pool,
+      BlockingQueue<Future<?>> futures) throws IOException {
+    validateThreadPool(pool, futures);
+
+    try (DurationInfo d = new DurationInfo(LOG, false,
+        "Merging data from %s to %s", from.getPath(), to)) {
+      reportProgress(context);
+      FileStatus toStat;
+      try {
+        toStat = fs.getFileStatus(to);
+      } catch (FileNotFoundException fnfe) {
+        toStat = null;
+      }
+
+      if (from.isFile()) {
+        if (toStat != null) {
+          if (!fs.delete(to, true) && fs.exists(to)) {
+            throw new IOException("Failed to delete " + to);
+          }
+        }
+
+        // do the rename
+        moveFileInParallelCommit(from, to);
+      } else if (from.isDirectory()) {
+        if (toStat != null) {
+          if (!toStat.isDirectory()) {
+            if (!fs.delete(to, true)) {
+              throw new IOException("Failed to delete " + to);
+            }
+            boolean dirCreated = fs.mkdirs(to);
+            LOG.debug("Merging from:{} to:{}, destCreated: {}", from.getPath(), to, dirCreated);
+            mergePathsInParallel(fs, from, to, context, pool, futures);
+          } else {
+            // It is a directory, so merge everything in the directories
+            LOG.debug("Dir merge from : {} to: {}", from.getPath(), to);
+            mergeDirInParallel(fs, from, to, context, pool, futures);
+          }
+        } else {
+          // Init dir to avoid conflicting multi-threaded rename
+          boolean dirCreated = fs.mkdirs(to);
+          LOG.debug("Created dir upfront. from: {} --> to:{}, totalTasks:{}, destCreated: {}",
+              from.getPath(), to, futures.size(), dirCreated);
+          mergePathsInParallel(fs, from, to, context, pool, futures);
+        }
+      }
+    }
+  }
+
+  /**
+   * Rename the file via the resilient commit helper.
+   * Because any file at the destination will have been deleted,
+   * tell the commit helper that there is no need to probe the
+   * store for existence.
+   * This assumes that no two tasks created files with the same name,
+   * but so does any overwrite check performed nonatomically
+   * on the client.
+   * @param from source filestatus
+   * @param to destination path
+   * @throws IOException failure to commit or rename.
+   */
+  private void moveFileInParallelCommit(final FileStatus from, final Path to)
+      throws IOException {
+    resilientCommitHelper.commitFile(from, to,
+        ResilientCommitByRename.CommitFlags.DESTINATION_DOES_NOT_EXIST);
+  }
+
+  /**
+   * Merge a directory in parallel fashion.
+   *
+   * @param fs filesystem
+   * @param from source directory status
+   * @param to destination path
+   * @param context job context, used for progress reporting
+   * @throws IOException on any failure
+   */
+  private void mergeDirInParallel(FileSystem fs, FileStatus from, Path to,
+      JobContext context, ExecutorService pool,
+      BlockingQueue<Future<?>> futures) throws IOException {
+
+    validateThreadPool(pool, futures);
+    RemoteIterator<FileStatus> it = fs.listStatusIterator(from.getPath());
+    while (it.hasNext()) {
+      FileStatus subFrom = it.next();
+      Path subTo = new Path(to, subFrom.getPath().getName());
+      LOG.debug("Merge from: {}, to: {} in parallel", subFrom.getPath(), subTo);
+      futures.add(pool.submit(() -> {
+        mergePathsInParallel(fs, subFrom, subTo, context, pool, futures);
+        return null;
+      }));
+      numberOfTasks.getAndIncrement();
+    }
+  }
+
   private void reportProgress(JobContext context) {
     if (context instanceof Progressable) {
       ((Progressable) context).progress();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index 526485df93490..9e438e0ece1e1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -23,14 +23,29 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_FILE_IMPL_KEY;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createSubdirs;
+import static org.apache.hadoop.fs.impl.ResilientCommitByRenameHelper.filesystemHasResilientCommmit;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
 import static org.junit.Assert.*;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.Assert;
@@ -39,7 +54,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -47,6 +66,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import
org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -55,6 +75,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +85,8 @@ import static org.mockito.Mockito.verify; @SuppressWarnings("unchecked") -public class TestFileOutputCommitter { +@RunWith(Parameterized.class) +public class TestFileOutputCommitter extends AbstractHadoopTestBase { private static final Path outDir = new Path( System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), @@ -88,14 +110,39 @@ public class TestFileOutputCommitter { private Text key2 = new Text("key2"); private Text val1 = new Text("val1"); private Text val2 = new Text("val2"); + private final int mvThreads; + /** Use the raw local FS which implements resilient rename. */ + private final boolean useRawLocalFS; + + + public TestFileOutputCommitter(int threads, final boolean useRawLocalFS) { + this.mvThreads = threads; + this.useRawLocalFS = useRawLocalFS; + } + + /** + * Test parameters are on thread count and whether or not to + * use a resilient committer filesystem. + * Intermixed to avoid an explosion of test runs. + * @return test params + */ + @Parameterized.Parameters(name="t-{0}-resilient-{1}") + public static Collection getParameters() { + // -1 is covered in separate test case + return Arrays.asList(new Object[][]{ + {0, false}, + {1, true}, + {4, false}, + {8, true} + }); + } - private static void cleanup() throws IOException { Configuration conf = new Configuration(); FileSystem fs = outDir.getFileSystem(conf); fs.delete(outDir, true); } - + @Before public void setUp() throws IOException { cleanup(); @@ -144,6 +191,7 @@ private void testRecoveryInternal(int commitVersion, int recoveryVersion) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, @@ -204,6 +252,20 @@ private void testRecoveryInternal(int commitVersion, int recoveryVersion) FileUtil.fullyDelete(new File(outDir.toString())); } + /** + * Apply test parameters. + * @param conf configuration to patch. + */ + private void applyParameters(final Configuration conf) { + conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, mvThreads); + conf.setClass(FS_FILE_IMPL_KEY, + useRawLocalFS + ? 
RawLocalFileSystem.class + : LocalFileSystem.class, + FileSystem.class); + conf.setBoolean("fs.file.impl.disable.cache", true); + } + @Test public void testRecoveryV1() throws Exception { testRecoveryInternal(1, 1); @@ -237,6 +299,19 @@ private void validateContent(File dir) throws IOException { assertThat(output).isEqualTo(expectedOutput.toString()); } + private void validateSpecificFile(File expectedFile) throws IOException { + assertTrue("Could not find "+expectedFile, expectedFile.exists()); + StringBuffer expectedOutput = new StringBuffer(); + expectedOutput.append(key1).append('\t').append(val1).append("\n"); + expectedOutput.append(val1).append("\n"); + expectedOutput.append(val2).append("\n"); + expectedOutput.append(key2).append("\n"); + expectedOutput.append(key1).append("\n"); + expectedOutput.append(key2).append('\t').append(val2).append("\n"); + String output = slurp(expectedFile); + assertEquals(output, expectedOutput.toString()); + } + private void validateMapFileOutputContent( FileSystem fs, Path dir) throws IOException { // map output is a directory with index and data files @@ -266,6 +341,7 @@ private void testCommitterInternal(int version, boolean taskCleanup) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt( FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, @@ -288,7 +364,7 @@ private void testCommitterInternal(int version, boolean taskCleanup) // check task and job temp directories exist File jobOutputDir = new File( - new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME).toString()); + new Path(outDir, PENDING_DIR_NAME).toString()); File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority( committer.getWorkPath()).toString()); assertTrue("job temp dir does not exist", jobOutputDir.exists()); @@ -316,6 +392,280 @@ private void testCommitterInternal(int version, boolean taskCleanup) FileUtil.fullyDelete(new File(outDir.toString())); } + @Test + public void testNegativeThreadCount() throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, -1); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + assertFalse("Threadpool disabled for v1 with -1 thread count", + committer.isParallelMoveEnabled()); + assertEquals("Threadpool disabled for thread config of -1", + 1, committer.getMoveThreads()); + } + + @Test + public void testThreadsWithAlgoV2() throws Exception { + testThreadsWithAlgoV2(mvThreads); + } + + @Test + public void testNegativeThreadCountAlgoV2() throws Exception { + testThreadsWithAlgoV2(-1); + } + + public void testThreadsWithAlgoV2(int threads) throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2); + conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, threads); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + assertFalse("Threadpool disabled for algo v2", committer.isParallelMoveEnabled()); + } + + private void createAndCommitTask(Configuration conf, String 
attempt, TaskAttemptID tID,
+      int version, boolean taskCleanup, final boolean setupJob)
+      throws IOException, InterruptedException {
+    applyParameters(conf);
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    conf.setBoolean(
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
+        taskCleanup);
+    JobContext jContext = new JobContextImpl(conf, tID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, tID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    if (setupJob) {
+      committer.setupJob(jContext);
+    }
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    String filename = String.format("dummy-file-%s-", tID.getTaskID());
+    ContractTestUtils.TreeScanResults created =
+        createSubdirs(FileSystem.get(conf),
+            committer.getWorkPath(), 3, 3, 2, 0,
+            "sub-dir-", filename, "0");
+    LOG.info("Created subdirs: {}, toString: {}", created.getDirectories(),
+        created);
+
+    // check task and job temp directories exist
+    File jobOutputDir = new File(
+        new Path(outDir, PENDING_DIR_NAME).toString());
+    File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority(
+        committer.getWorkPath()).toString());
+    assertTrue("job temp dir does not exist", jobOutputDir.exists());
+    assertTrue("task temp dir does not exist", taskOutputDir.exists());
+
+    // do commit
+    committer.commitTask(tContext);
+    assertTrue("job temp dir does not exist", jobOutputDir.exists());
+
+    if (version == 1 || taskCleanup) {
+      // Task temp dir gets renamed in v1 and deleted if taskCleanup is
+      // enabled in v2
+      assertFalse("task temp dir still exists", taskOutputDir.exists());
+    } else {
+      // By default, in v2 the task temp dir is only deleted during commitJob
+      assertTrue("task temp dir does not exist", taskOutputDir.exists());
+    }
+  }
+
+  private void createNTasks(Configuration conf, int version, boolean taskCleanup)
+      throws IOException, InterruptedException {
+    for (int i = 0; i <= 9; i++) {
+      String attempt = String.format("attempt_200707121733_0001_m_%03d_0", i);
+      TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+      createAndCommitTask(conf, attempt, taskID, version, taskCleanup, i == 0);
+    }
+  }
+
+  private void testCommitterInternalWithMultipleTasks(int version, boolean taskCleanup,
+      boolean parallelCommit) throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    applyParameters(conf);
+    conf.setBoolean(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT, parallelCommit);
+
+    // Create multiple tasks and commit
+    createNTasks(conf, version, taskCleanup);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, jContext);
+    try (DurationInfo du = new DurationInfo(LOG,
+        "commit job with mvThreads: %s", mvThreads)) {
+      committer.commitJob(jContext);
+    }
+
+    // Verify that the temp dirs are cleaned up
+    if (committer.hasOutputPath()) {
+      Path path = new Path(committer.getOutputPath(), PENDING_DIR_NAME);
+      final FileSystem fs = path.getFileSystem(conf);
+      ContractTestUtils.assertPathDoesNotExist(fs,
+          "Job attempt path should have been deleted",
+          path);
+    }
+
+    RemoteIterator<LocatedFileStatus> it =
+  private void testCommitterInternalWithMultipleTasks(int version,
+      boolean taskCleanup, boolean parallelCommit) throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    applyParameters(conf);
+    conf.setBoolean(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT,
+        parallelCommit);
+
+    // Create multiple tasks and commit
+    createNTasks(conf, version, taskCleanup);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, jContext);
+    try (DurationInfo du = new DurationInfo(LOG,
+        "commit job with mvThreads: %s", mvThreads)) {
+      committer.commitJob(jContext);
+    }
+
+    // Verify that the temp dirs are cleaned up
+    if (committer.hasOutputPath()) {
+      Path path = new Path(committer.getOutputPath(), PENDING_DIR_NAME);
+      final FileSystem fs = path.getFileSystem(conf);
+      ContractTestUtils.assertPathDoesNotExist(fs,
+          "Job attempt path should have been deleted",
+          path);
+    }
+
+    RemoteIterator<LocatedFileStatus> it =
+        FileSystem.get(conf).listFiles(outDir, true);
+    while (it.hasNext()) {
+      LocatedFileStatus fileStatus = it.next();
+      Path file = fileStatus.getPath();
+      if (file.getName().equals("_SUCCESS")) {
+        continue;
+      }
+      // Validate only real files, ignoring the dummy-file-* entries
+      // created via createSubdirs().
+      if (fileStatus.isFile() && !file.getName().contains("dummy-file-")) {
+        LOG.info("validate file:{}", file);
+        validateSpecificFile(new File(file.toUri().getPath()));
+      } else {
+        LOG.info("Not validating {}", file);
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  private void testAbortWithMultipleTasksV1(int version, boolean taskCleanup,
+      boolean parallelCommit) throws IOException, InterruptedException {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    applyParameters(conf);
+    conf.setBoolean(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT,
+        parallelCommit);
+
+    // Create multiple tasks and commit
+    createNTasks(conf, version, taskCleanup);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, jContext);
+    LOG.info("Running with mvThreads:{}", mvThreads);
+    // Abort the job
+    committer.abortJob(jContext, JobStatus.State.FAILED);
+    File expectedFile = new File(new Path(outDir, PENDING_DIR_NAME)
+        .toString());
+    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertEquals("Output directory not empty", 0, new File(outDir.toString())
+        .listFiles().length);
+    verifyNumScheduledTasks(committer);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  @Test
+  public void testCommitterInternalWithMultipleTasksV1() throws Exception {
+    testCommitterInternalWithMultipleTasks(1, true, false);
+  }
+
+  @Test
+  public void testCommitterInternalWithMultipleTasksV1Parallel() throws Exception {
+    testCommitterInternalWithMultipleTasks(1, true, true);
+  }
+
+  @Test
+  public void testAbortWithMultipleTasksV1() throws IOException, InterruptedException {
+    testAbortWithMultipleTasksV1(1, true, false);
+  }
+
+  @Test
+  public void testAbortWithMultipleTasksV1Parallel() throws IOException, InterruptedException {
+    testAbortWithMultipleTasksV1(1, true, true);
+  }
+
+  @Test
+  public void testVerifyFilesystemResilience() throws Throwable {
+    final Configuration conf = new Configuration();
+    applyParameters(conf);
+    FileSystem fs = outDir.getFileSystem(conf);
+    Assertions.assertThat(filesystemHasResilientCommmit(fs, outDir))
+        .describedAs("filesystem resilience of %s", fs)
+        .isEqualTo(useRawLocalFS);
+  }
+
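Editorial note on the hook that follows: the test relies on FileOutputCommitter reporting progress through the job context while moving task output, so a JobContextImpl which also implements Progressable can observe (or, as below, deliberately break) the parallel move phase. A harmless variant of the same pattern, under that assumption:

    // counts progress() callbacks instead of throwing;
    // java.util.concurrent.atomic.AtomicLong import assumed
    static class ProgressCountingContext extends JobContextImpl
        implements Progressable {
      private final AtomicLong calls = new AtomicLong();
      ProgressCountingContext(Configuration conf, JobID jobId) {
        super(conf, jobId);
      }
      @Override
      public void progress() {
        calls.incrementAndGet();
      }
      long getProgressCalls() {
        return calls.get();
      }
    }
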
moveThreads " + + committer.getMoveThreads()); + } + } + + public void setCommitter(FileOutputCommitter committer) { + this.committer = committer; + } + } + + @Test + public void testV1CommitterInternalWithException() throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + applyParameters(conf); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); + conf.setInt( + FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1); + conf.setBoolean( + FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED, true); + // Custom job context which can be used for triggering exceptions + CustomJobContextImpl jContext = new CustomJobContextImpl(conf, taskID.getJobID()); + TaskAttemptContextImpl tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + //This will help in triggering exceptions when parallel threads are enabled + jContext.setCommitter(committer); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); + writeOutput(theRecordWriter, tContext); + + // check task and job temp directories exist + File jobOutputDir = new File( + new Path(outDir, PENDING_DIR_NAME).toString()); + File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority( + committer.getWorkPath()).toString()); + assertTrue("job temp dir does not exist", jobOutputDir.exists()); + assertTrue("task temp dir does not exist", taskOutputDir.exists()); + + // do commit + committer.commitTask(tContext); + assertTrue("job temp dir does not exist", jobOutputDir.exists()); + + try { + committer.commitJob(jContext); + if (committer.isParallelMoveEnabled()) { + // Exception is thrown from CustomJobContextImpl, only when parallel file moves are enabled. + Assert.fail("Commit successful: wrong behavior for version 1. 
moveThreads:" + mvThreads); + } + } catch(IOException e) { + if (committer.isParallelMoveEnabled()) { + assertTrue("Exception from getProgress should have been caught", + e.getMessage().contains("Throwing exception during progress")); + } + } + + // Clear off output dir + FileUtil.fullyDelete(new File(outDir.toString())); + } + @Test public void testCommitterV1() throws Exception { testCommitterInternal(1, false); @@ -346,6 +696,7 @@ private void testCommitterWithDuplicatedCommitInternal(int version) throws Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -400,6 +751,7 @@ private void testCommitterWithFailureInternal(int version, int maxAttempts) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -444,6 +796,7 @@ public void testProgressDuringMerge() throws Exception { Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2); @@ -484,6 +837,7 @@ private void testCommitterRetryInternal(int version) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -539,6 +893,7 @@ private void testMapFileOutputCommitterInternal(int version) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -587,6 +942,7 @@ public void testInvalidVersionNumber() throws IOException { Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); @@ -598,11 +954,17 @@ public void testInvalidVersionNumber() throws IOException { } } + private void verifyNumScheduledTasks(FileOutputCommitter committer) { + assertEquals("Scheduled tasks should have been 0 after shutting down thread pool", + 0, committer.getNumCompletedTasks()); + } + private void testAbortInternal(int version) throws IOException, InterruptedException { Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -626,11 +988,12 @@ private void testAbortInternal(int version) assertFalse("task temp dir still exists", expectedFile.exists()); committer.abortJob(jContext, JobStatus.State.FAILED); - expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME) + expectedFile = new 
File(new Path(outDir, PENDING_DIR_NAME) .toString()); assertFalse("job temp dir still exists", expectedFile.exists()); assertEquals("Output directory not empty", 0, new File(outDir.toString()) .listFiles().length); + verifyNumScheduledTasks(committer); FileUtil.fullyDelete(new File(outDir.toString())); } @@ -713,6 +1076,7 @@ private void testFailAbortInternal(int version) assertTrue(th.getMessage().contains("fake delete failed")); assertTrue("job temp dir does not exists", jobTmpDir.exists()); FileUtil.fullyDelete(new File(outDir.toString())); + verifyNumScheduledTasks(committer); } @Test @@ -816,6 +1180,7 @@ public Void call() throws IOException, InterruptedException { // validate output validateContent(OUT_SUB_DIR); FileUtil.fullyDelete(new File(outDir.toString())); + verifyNumScheduledTasks(amCommitter); } @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java index 19b712f3da19b..3914db7e77e54 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java @@ -20,6 +20,8 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -36,16 +38,35 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; /** * A JUnit test to test Map-Reduce job committer. 
*/ +@RunWith(Parameterized.class) public class TestJobOutputCommitter extends HadoopTestCase { - public TestJobOutputCommitter() throws IOException { - super(CLUSTER_MR, LOCAL_FS, 1, 1); + public TestJobOutputCommitter(int mrMode, int fsMode, int taskTrackers, int dataNodes, + int mvThreads) throws IOException { + super(mrMode, fsMode, taskTrackers, dataNodes); + this.mvThreads = mvThreads; + } + + @Parameterized.Parameters + public static Collection getParameters() { + // CLUSTER_MR, LOCAL_FS, taskTrackers, dataNodes, mvThreads + return Arrays.asList(new Object[][] { + { 2, 4, 1, 1, 1 }, + { 2, 4, 1, 1, 2 }, + { 2, 4, 1, 1, 4 }, + { 2, 4, 1, 1, 8 }, + { 2, 4, 1, 1, 10 }, + }); } private static String TEST_ROOT_DIR = new File(System.getProperty( @@ -58,11 +79,13 @@ public TestJobOutputCommitter() throws IOException { private static int outDirs = 0; private FileSystem fs; private Configuration conf = null; + private int mvThreads; @Before public void setUp() throws Exception { super.setUp(); conf = createJobConf(); + conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, mvThreads); fs = getFileSystem(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index d30129b7a7d68..139975af9a39b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -27,6 +27,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.file.AccessDeniedException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Hashtable; import java.util.List; import java.util.ArrayList; @@ -34,6 +36,7 @@ import java.util.EnumSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -42,6 +45,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import javax.annotation.Nullable; + import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; @@ -84,6 +89,7 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.impl.ResilientCommitByRename; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -116,7 +122,7 @@ */ @InterfaceStability.Evolving public class AzureBlobFileSystem extends FileSystem - implements IOStatisticsSource { + implements IOStatisticsSource, ResilientCommitByRename { public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class); private URI uri; private Path workingDir; @@ -433,7 +439,7 @@ public boolean rename(final Path src, final Path dst) throws IOException { qualifiedDstPath = makeQualified(adjustedDst); - abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext); + abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null, null); return true; } catch(AzureBlobFileSystemException ex) { LOG.debug("Rename operation failed. 
", ex); @@ -451,6 +457,52 @@ public boolean rename(final Path src, final Path dst) throws IOException { } + @Override + public CommitByRenameOutcome commitSingleFileByRename(final Path src, + final Path dst, + @Nullable final String sourceEtag, + @Nullable final FileStatus sourceStatus, + final CommitFlags... options) throws IOException { + + LOG.debug("AzureBlobFileSystem.commitSingleFileByRename src: {} dst: {}", src, dst); + statIncrement(CALL_RENAME); + + trailingPeriodCheck(dst); + Path qualifiedSrcPath = makeQualified(src); + Path qualifiedDstPath = makeQualified(dst); + + TracingContext tracingContext = new TracingContext(clientCorrelationId, + fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat, + listener); + + if (qualifiedSrcPath.equals(qualifiedDstPath)) { + // rename to itself is forbidden + throw new PathIOException(qualifiedSrcPath.toString(), "cannot rename object onto self"); + } + + // reject non-HNS operations for simplicity + if (!abfsStore.getIsNamespaceEnabled(tracingContext)) { + throw new ResilientCommitByRenameUnsupported(qualifiedSrcPath.toString()); + } + Set flags = new HashSet<>(Arrays.asList(options)); + + if (!flags.contains(CommitFlags.DESTINATION_DOES_NOT_EXIST) + && flags.contains(CommitFlags.OVERWRITE)) { + // nonrecursive delete the path at the destination as this FS is strict. + abfsStore.delete(qualifiedDstPath, false, tracingContext); + } + try { + abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, sourceEtag, sourceStatus); + return new CommitByRenameOutcome(false, false, false); + } catch(AzureBlobFileSystemException ex) { + LOG.debug("Rename operation failed. ", ex); + checkException(src, ex); + // never reached + return null; + } + + } + @Override public boolean delete(final Path f, final boolean recursive) throws IOException { LOG.debug( @@ -1499,6 +1551,9 @@ public boolean hasPathCapability(final Path path, final String capability) case CommonPathCapabilities.FS_PERMISSIONS: case CommonPathCapabilities.FS_APPEND: return true; + + // resilient commit is restricted to heirarchical storage. + case ResilientCommitByRename.RESILIENT_COMMIT_BY_RENAME_PATH_CAPABILITY: case CommonPathCapabilities.FS_ACLS: return getIsNamespaceEnabled( new TracingContext(clientCorrelationId, fileSystemId, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 4fd194b1e01cb..65fe013f6d57b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -877,7 +877,22 @@ public void breakLease(final Path path, final TracingContext tracingContext) thr client.breakLease(getRelativePath(path), tracingContext); } - public void rename(final Path source, final Path destination, TracingContext tracingContext) throws + /** + * Rename a file or dir. + * If the etag/source status are passed in and non-null. + * it is implicit that the source is a file, not a directory. + * @param source path to source file/dir + * @param destination destination of rename. + * @param tracingContext tracing + * @param sourceEtag etag of source file. 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 4fd194b1e01cb..65fe013f6d57b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -877,7 +877,22 @@ public void breakLease(final Path path, final TracingContext tracingContext) thr
     client.breakLease(getRelativePath(path), tracingContext);
   }
 
-  public void rename(final Path source, final Path destination, TracingContext tracingContext) throws
+  /**
+   * Rename a file or dir.
+   * If the etag/source status are passed in and non-null,
+   * it is implicit that the source is a file, not a directory.
+   * @param source path to source file/dir
+   * @param destination destination of rename.
+   * @param tracingContext tracing
+   * @param sourceEtag etag of source file; may be null or empty
+   * @param sourceStatus nullable FileStatus of source
+   * @throws AzureBlobFileSystemException failure
+   */
+  public void rename(final Path source,
+      final Path destination,
+      final TracingContext tracingContext,
+      final String sourceEtag,
+      final FileStatus sourceStatus) throws
       AzureBlobFileSystemException {
     final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
     long countAggregate = 0;
@@ -975,7 +990,7 @@ public FileStatus getFileStatus(final Path path,
     final long blockSize = abfsConfiguration.getAzureBlockSize();
     final AbfsHttpOperation result = op.getResult();
 
-    final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
+    String eTag = extractEtagHeader(result);
     final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
     final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
     final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
@@ -1733,7 +1748,7 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) {
     return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
   }
 
-  private static class VersionedFileStatus extends FileStatus {
+  public static class VersionedFileStatus extends FileStatus {
     private final String version;
 
     VersionedFileStatus(
@@ -1797,6 +1812,15 @@ public String getVersion() {
     return this.version;
   }
 
+  /**
+   * Returns the etag of this FileStatus.
+   *
+   * @return an etag if known.
+   */
+  public String getEtag() {
+    return getVersion();
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
@@ -1902,4 +1926,30 @@ boolean areLeasesFreed() {
     }
     return true;
   }
+
+  /**
+   * Get the etag header from a response, stripping any quotations.
+   * see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
+   * @param result response to process.
+   * @return the quote-unwrapped etag.
+   */
+  private static String extractEtagHeader(AbfsHttpOperation result) {
+    String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
+    if (etag != null) {
+      // strip out any wrapper "" quotes which come back, for consistency with
+      // list calls
+      if (etag.startsWith("W/\"")) {
+        // Weak etag
+        etag = etag.substring(3);
+      } else if (etag.startsWith("\"")) {
+        // strong etag
+        etag = etag.substring(1);
+      }
+      if (etag.endsWith("\"")) {
+        // trailing quote
+        etag = etag.substring(0, etag.length() - 1);
+      }
+    }
+    return etag;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractResilientCommitByRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractResilientCommitByRename.java
new file mode 100644
index 0000000000000..0f4f52754bc71
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractResilientCommitByRename.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractResilientCommitByRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * ABFS FS does resilient commit by rename. + */ +public class ITestAbfsContractResilientCommitByRename extends + AbstractContractResilientCommitByRenameTest { + + private final ABFSContractTestBinding binding; + + public ITestAbfsContractResilientCommitByRename() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return binding.getRawConfiguration(); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsEtagSupport.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsEtagSupport.java new file mode 100644 index 0000000000000..ea3dd1fcd7b48 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsEtagSupport.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contract; + +import java.nio.charset.StandardCharsets; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +/** + * Test requirements for etag support. 
+ */ +public class ITestAbfsEtagSupport extends AbstractFSContractTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(ITestAbfsEtagSupport.class); + + private final ABFSContractTestBinding binding; + + + public ITestAbfsEtagSupport() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + // Base rename contract test class re-uses the test folder + // This leads to failures when the test is re-run as same ABFS test + // containers are re-used for test run and creation of source and + // destination test paths fail, as they are already present. + binding.getFileSystem().delete(binding.getTestPath(), true); + } + + @Override + protected Configuration createConfiguration() { + return binding.getRawConfiguration(); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + + /** + * basic consistency across operations, as well as being non-empty. + */ + @Test + public void testEtagConsistencyAcrossListAndHead() throws Throwable { + describe("Etag values must be non-empty and consistent across LIST and HEAD Calls."); + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + + ContractTestUtils.touch(fs, path); + + final FileStatus st = fs.getFileStatus(path); + final String etag = etagFromStatus(st); + Assertions.assertThat(etag) + .describedAs("Etag of %s", st) + .isNotBlank(); + LOG.info("etag of empty file is \"{}\"", etag); + + final FileStatus[] statuses = fs.listStatus(path); + Assertions.assertThat(statuses) + .describedAs("List(%s)", path) + .hasSize(1); + final FileStatus lsStatus = statuses[0]; + Assertions.assertThat(etagFromStatus(lsStatus)) + .describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st) + .isEqualTo(etag); + } + + /** + * Get an etag from a FileStatus which MUST BE + * an implementation of EtagSource and + * whose etag MUST NOT BE null/empty. + * @param st the status + * @return the etag + */ + String etagFromStatus(FileStatus st) { + Assertions.assertThat(st) + .describedAs("FileStatus %s", st) + .isInstanceOf(AzureBlobFileSystemStore.VersionedFileStatus.class); + final String etag = ((AzureBlobFileSystemStore.VersionedFileStatus) st).getEtag(); + Assertions.assertThat(etag) + .describedAs("Etag of %s", st) + .isNotBlank(); + return etag; + } + + /** + * Overwritten data has different etags. + */ + @Test + public void testEtagsOfDifferentDataDifferent() throws Throwable { + describe("Verify that two different blocks of data written have different tags"); + + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + Path src = new Path(path, "src"); + + ContractTestUtils.createFile(fs, src, true, + "data1234".getBytes(StandardCharsets.UTF_8)); + final FileStatus srcStatus = fs.getFileStatus(src); + final String srcTag = etagFromStatus(srcStatus); + LOG.info("etag of file 1 is \"{}\"", srcTag); + + // now overwrite with data of same length + // (ensure that path or length aren't used exclusively as tag) + ContractTestUtils.createFile(fs, src, true, + "1234data".getBytes(StandardCharsets.UTF_8)); + + // validate + final String tag2 = etagFromStatus(fs.getFileStatus(src)); + LOG.info("etag of file 2 is \"{}\"", tag2); + + Assertions.assertThat(tag2) + .describedAs("etag of updated file") + .isNotEqualTo(srcTag); + } + + /** + * If supported, rename preserves etags. 
+   */
+  @Test
+  public void testEtagConsistencyAcrossRename() throws Throwable {
+    describe("Verify that when a file is renamed, the etag remains unchanged");
+    final Path path = methodPath();
+    final FileSystem fs = getFileSystem();
+
+    Path src = new Path(path, "src");
+    Path dest = new Path(path, "dest");
+
+    ContractTestUtils.createFile(fs, src, true,
+        "sample data".getBytes(StandardCharsets.UTF_8));
+    final FileStatus srcStatus = fs.getFileStatus(src);
+    final String srcTag = etagFromStatus(srcStatus);
+    LOG.info("etag of short file is \"{}\"", srcTag);
+
+    Assertions.assertThat(srcTag)
+        .describedAs("Etag of %s", srcStatus)
+        .isNotBlank();
+
+    // rename
+    fs.rename(src, dest);
+
+    // validate
+    FileStatus destStatus = fs.getFileStatus(dest);
+    final String destTag = etagFromStatus(destStatus);
+    Assertions.assertThat(destTag)
+        .describedAs("etag of renamed file (%s) compared to source etag of %s", destStatus, srcStatus)
+        .isEqualTo(srcTag);
+  }
+
+}
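For reference (editorial, not part of the patch), the etag normalization which extractEtagHeader() applies, shown standalone; this is why etags obtained from HEAD (getFileStatus) and LIST responses compare equal, as testEtagConsistencyAcrossListAndHead asserts:

    // strong etag:  "0x8D9AB..."   ->  0x8D9AB...
    // weak etag:    W/"0x8D9AB..." ->  0x8D9AB...
    static String stripEtagQuotes(String etag) { // illustrative re-implementation
      if (etag == null) {
        return null;
      }
      if (etag.startsWith("W/\"")) {
        etag = etag.substring(3);     // drop weak-validator prefix and quote
      } else if (etag.startsWith("\"")) {
        etag = etag.substring(1);     // drop leading quote of a strong etag
      }
      if (etag.endsWith("\"")) {
        etag = etag.substring(0, etag.length() - 1);  // drop trailing quote
      }
      return etag;
    }
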