From f071d14a1e3b0270b67cbe4ae7f16cdf1ff439b3 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 21 Sep 2020 12:40:31 +0100
Subject: [PATCH 1/3] MAPREDUCE-7282: remove v2 commit algorithm for
 correctness reasons.

* All jobs requesting the v2 algorithm are WARNED and then switched to v1.
* The v2 codepaths in the file output committer have been removed, even
  though some of the method names ("rename or merge") are unchanged.

This patch doesn't fix the tests which will now fail; I have those changes
ready and will submit them after a Yetus run, to show what breaks and what
is fixed.

Change-Id: Ic736e6c3794f40101ac4061173a5fdf70e8f9cca
---
 .../lib/output/FileOutputCommitter.java       | 119 +++++++-----------
 .../src/main/resources/mapred-default.xml     |  60 +++++++--
 2 files changed, 90 insertions(+), 89 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 7f1ea6175ed6e..c15646a3875e8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -68,7 +68,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       "mapreduce.fileoutputcommitter.marksuccessfuljobs";
   public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
       "mapreduce.fileoutputcommitter.algorithm.version";
-  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2;
+  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
   // Skip cleanup _temporary folders under job's output directory
   public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED =
       "mapreduce.fileoutputcommitter.cleanup.skipped";
@@ -93,15 +93,19 @@ public class FileOutputCommitter extends PathOutputCommitter {
   // commitJob will recursively delete the entire job temporary directory.
   // HDFS has O(1) recursive delete, so this parameter is left false by default.
   // Users of object stores, for example, may want to set this to true. Note:
-  // this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
+  // this was only used if mapreduce.fileoutputcommitter.algorithm.version=2,
+  // and so is now unused
+  @Deprecated
   public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED =
       "mapreduce.fileoutputcommitter.task.cleanup.enabled";
+
+  @Deprecated
   public static final boolean
       FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;

   private Path outputPath = null;
   private Path workPath = null;
-  private final int algorithmVersion;
+  //private final int algorithmVersion;
   private final boolean skipCleanup;
   private final boolean ignoreCleanupFailures;

@@ -135,13 +139,18 @@ public FileOutputCommitter(Path outputPath,
       JobContext context) throws IOException {
     super(outputPath, context);
     Configuration conf = context.getConfiguration();
-    algorithmVersion =
+    int algorithmVersion =
         conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
                     FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
     LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
     if (algorithmVersion != 1 && algorithmVersion != 2) {
       throw new IOException("Only 1 or 2 algorithm version is supported");
     }
+    // Downgrade v2 to v1 with a warning.
+    if (algorithmVersion == 2) {
+      LOG.warn("The v2 algorithm has been removed to ensure the output of jobs"
+          + " is always correct");
+    }

     // if skip cleanup
     skipCleanup = conf.getBoolean(
@@ -399,10 +408,8 @@ protected void commitJobInternal(JobContext context) throws IOException {
       Path finalOutput = getOutputPath();
       FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());

-      if (algorithmVersion == 1) {
-        for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-          mergePaths(fs, stat, finalOutput, context);
-        }
+      for (FileStatus stat: getAllCommittedTaskPaths(context)) {
+        mergePaths(fs, stat, finalOutput, context);
       }

       if (skipCleanup) {
@@ -503,16 +510,9 @@ private void reportProgress(JobContext context) {

   private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
       JobContext context) throws IOException {
-    if (algorithmVersion == 1) {
-      if (!fs.rename(from.getPath(), to)) {
-        throw new IOException("Failed to rename " + from + " to " + to);
-      }
-    } else {
-      fs.mkdirs(to);
-      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
-        Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo, context);
-      }
+    // this always does rename now that the V2 algorithm has been removed.
+    if (!fs.rename(from.getPath(), to)) {
+      throw new IOException("Failed to rename " + from + " to " + to);
     }
   }

@@ -588,36 +588,18 @@ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
       }

       if (taskAttemptDirStatus != null) {
-        if (algorithmVersion == 1) {
-          Path committedTaskPath = getCommittedTaskPath(context);
-          if (fs.exists(committedTaskPath)) {
-            if (!fs.delete(committedTaskPath, true)) {
-              throw new IOException("Could not delete " + committedTaskPath);
-            }
-          }
-          if (!fs.rename(taskAttemptPath, committedTaskPath)) {
-            throw new IOException("Could not rename " + taskAttemptPath + " to "
-                + committedTaskPath);
-          }
-          LOG.info("Saved output of task '" + attemptId + "' to " +
-              committedTaskPath);
-        } else {
-          // directly merge everything from taskAttemptPath to output directory
-          mergePaths(fs, taskAttemptDirStatus, outputPath, context);
-          LOG.info("Saved output of task '" + attemptId + "' to " +
-              outputPath);
-
-          if (context.getConfiguration().getBoolean(
-              FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
-              FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) {
-            LOG.debug(String.format(
-                "Deleting the temporary directory of '%s': '%s'",
-                attemptId, taskAttemptPath));
-            if(!fs.delete(taskAttemptPath, true)) {
-              LOG.warn("Could not delete " + taskAttemptPath);
-            }
-          }
+        Path committedTaskPath = getCommittedTaskPath(context);
+        if (fs.exists(committedTaskPath)) {
+          if (!fs.delete(committedTaskPath, true)) {
+            throw new IOException("Could not delete " + committedTaskPath);
+          }
         }
+        if (!fs.rename(taskAttemptPath, committedTaskPath)) {
+          throw new IOException("Could not rename " + taskAttemptPath + " to "
+              + committedTaskPath);
+        }
+        LOG.info("Saved output of task '" + attemptId + "' to " +
+            committedTaskPath);
       } else {
         LOG.warn("No Output found for " + attemptId);
       }
@@ -682,7 +664,7 @@ public boolean isRecoverySupported() {

   @Override
   public boolean isCommitJobRepeatable(JobContext context) throws IOException {
-    return algorithmVersion == 2;
+    return false;
   }

   @Override
@@ -702,35 +684,21 @@ public void recoverTask(TaskAttemptContext context)
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
       }
-      if (algorithmVersion == 1) {
-        if (fs.exists(previousCommittedTaskPath)) {
-          Path committedTaskPath = getCommittedTaskPath(context);
-          if (!fs.delete(committedTaskPath, true) &&
-              fs.exists(committedTaskPath)) {
-            throw new IOException("Could not delete " + committedTaskPath);
-          }
-          //Rename can fail if the parent directory does not yet exist.
-          Path committedParent = committedTaskPath.getParent();
-          fs.mkdirs(committedParent);
-          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
-            throw new IOException("Could not rename " + previousCommittedTaskPath +
-                " to " + committedTaskPath);
-          }
-        } else {
-          LOG.warn(attemptId+" had no output to recover.");
+      if (fs.exists(previousCommittedTaskPath)) {
+        Path committedTaskPath = getCommittedTaskPath(context);
+        if (!fs.delete(committedTaskPath, true) &&
+            fs.exists(committedTaskPath)) {
+          throw new IOException("Could not delete " + committedTaskPath);
         }
-      } else {
-        // essentially a no-op, but for backwards compatibility
-        // after upgrade to the new fileOutputCommitter,
-        // check if there are any output left in committedTaskPath
-        try {
-          FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
-          LOG.info("Recovering task for upgrading scenario, moving files from "
-              + previousCommittedTaskPath + " to " + outputPath);
-          mergePaths(fs, from, outputPath, context);
-        } catch (FileNotFoundException ignored) {
+        //Rename can fail if the parent directory does not yet exist.
+        Path committedParent = committedTaskPath.getParent();
+        fs.mkdirs(committedParent);
+        if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+          throw new IOException("Could not rename " + previousCommittedTaskPath +
+              " to " + committedTaskPath);
         }
-        LOG.info("Done recovering task " + attemptId);
+      } else {
+        LOG.warn(attemptId+" had no output to recover.");
       }
     } else {
       LOG.warn("Output Path is null in recoverTask()");
@@ -744,7 +712,6 @@ public String toString() {
     sb.append(super.toString()).append("; ");
     sb.append("outputPath=").append(outputPath);
     sb.append(", workPath=").append(workPath);
-    sb.append(", algorithmVersion=").append(algorithmVersion);
     sb.append(", skipCleanup=").append(skipCleanup);
     sb.append(", ignoreCleanupFailures=").append(ignoreCleanupFailures);
     sb.append('}');
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index c40bb0b19c88b..7b40768a116b0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1562,10 +1562,18 @@
 <property>
   <name>mapreduce.fileoutputcommitter.algorithm.version</name>
-  <value>2</value>
-  <description>The file output committer algorithm version
-  valid algorithm version number: 1 or 2
-  default to 2, which is the original algorithm
+  <value>21/value>
+  <description>The file output committer algorithm version.
+
+  There have been two algorithm versions in Hadoop, "1" and "2".
+
+  The version 2 algorithm has been disabled as task commits
+  were not atomic. If the first task attempt failed partway
+  through the task commit, the output directory may end up
+  with data from that failed commit, as well as the data
+  from any subsequent attempts.
+
+  See https://issues.apache.org/jira/browse/MAPREDUCE-7282

   In algorithm version 1,
@@ -1585,32 +1593,58 @@
   $joboutput/, then it will delete $joboutput/_temporary/
   and write $joboutput/_SUCCESS

-  It has a performance regression, which is discussed in MAPREDUCE-4815.
+  It has a performance limitation, which is discussed in MAPREDUCE-4815.
+
+  If a job generates many files to commit then the commitJob
+  method call at the end of the job can take minutes.
+  The commit is single-threaded and waits until all tasks have
+  completed before commencing.

-  algorithm version 2 will change the behavior of commitTask,
+  The algorithm version 2 changed the behavior of commitTask,
   recoverTask, and commitJob.
-  1. commitTask will rename all files in
+  1. commitTask renamed all files in
   $joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
   to $joboutput/
-  2. recoverTask actually doesn't require to do anything, but for
+  2. recoverTask didn't need to do anything, but for
   upgrade from version 1 to version 2 case, it will check if there
   are any files in
   $joboutput/_temporary/($appAttemptID - 1)/$taskID/
   and rename them to $joboutput/
-  3. commitJob can simply delete $joboutput/_temporary and write
+  3. commitJob deleted $joboutput/_temporary and wrote
   $joboutput/_SUCCESS

-  This algorithm will reduce the output commit time for
-  large jobs by having the tasks commit directly to the final
-  output directory as they were completing and commitJob had
-  very little to do.
+  This algorithm reduced the output commit time for large jobs
+  as the work of renaming files to their destination took place
+  incrementally as tasks committed. However, it had a key flaw:
+
+  Task attempt commit is not atomic. If task attempt 1 failed
+  partway through the rename, a second task attempt would be
+  scheduled.
+
+  - If task attempts 1 and 2 generated files with different names,
+  then those files from attempt 1 which had already been renamed
+  into the destination would still be present. They would not be
+  overwritten by the second attempt.
+
+  - If task attempt 1 was still executing (even though the job
+  driver considered it to have failed) then a race condition
+  could arise where the output contained a mix of both task
+  attempts.
+
+  The applications which use these committers, including MapReduce
+  and Apache Spark, expect task commit to be atomic; this
+  commit algorithm is not compatible with that expectation.
+
+  Note:
+  * The Apache Hadoop S3A commit algorithms do have atomic task
+  commit and are safe.
+  * The Amazon "EMRFS S3-optimized Committer" completes its
+  multipart uploads in task commit, so has the same
+  limitation as algorithm v2.
+  https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-committer-multipart.html
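To make the atomicity argument above concrete, the sketch below contrasts the two task-commit strategies using the public Hadoop FileSystem API. This is an illustration only, not the committer's real code: the class and method names are invented, and error handling is reduced to the minimum.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Illustrative sketch of the two commit strategies; not part of the patch. */
    public final class CommitStrategySketch {

      /**
       * v1: a single rename of the whole task attempt directory.
       * Either the committed task path exists afterwards or it does not,
       * so a retried attempt starts from a clean slate.
       */
      static void commitTaskV1(FileSystem fs, Path attemptDir, Path committedDir)
          throws IOException {
        if (!fs.rename(attemptDir, committedDir)) {
          throw new IOException("Failed to rename " + attemptDir);
        }
      }

      /**
       * v2 (removed): one rename per file, straight into the job output.
       * A failure partway through leaves some of this attempt's files in the
       * destination; a second attempt generating different file names will
       * never overwrite them, which is the correctness flaw described above.
       */
      static void commitTaskV2(FileSystem fs, Path attemptDir, Path jobOutput)
          throws IOException {
        for (FileStatus status : fs.listStatus(attemptDir)) {
          Path dest = new Path(jobOutput, status.getPath().getName());
          if (!fs.rename(status.getPath(), dest)) { // a crash here = partial commit
            throw new IOException("Failed to rename " + status.getPath());
          }
        }
      }
    }

The real v2 path recursed into subdirectories via mergePaths(); the flat loop here is enough to show where a partial failure leaves mixed output.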
From 048e17182001f9011598f4a4a136f0f74bc0db68 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 21 Sep 2020 13:30:41 +0100
Subject: [PATCH 2/3] MAPREDUCE-7282: remove V2 committer tests; fix XML

Not done the full purge; the asserts on the v2 outcomes are still there.

Change-Id: Id789429cf4fa7cf7908abc346b8a4109e84fa4b2
---
 .../src/main/resources/mapred-default.xml     |  2 +-
 .../mapred/TestFileOutputCommitter.java       | 45 --------------
 .../lib/output/TestFileOutputCommitter.java   | 59 -------------------
 3 files changed, 1 insertion(+), 105 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 7b40768a116b0..a92719b8d2614 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1562,7 +1562,7 @@
 <property>
   <name>mapreduce.fileoutputcommitter.algorithm.version</name>
-  <value>21/value>
+  <value>1</value>
   <description>The file output committer algorithm version.

   There have been two algorithm versions in Hadoop, "1" and "2".

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
index bb5c30e9511fa..36917531b4021 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
@@ -161,16 +161,6 @@ public void testRecoveryV1() throws Exception {
     testRecoveryInternal(1, 1);
   }

-  @Test
-  public void testRecoveryV2() throws Exception {
-    testRecoveryInternal(2, 2);
-  }
-
-  @Test
-  public void testRecoveryUpgradeV1V2() throws Exception {
-    testRecoveryInternal(1, 2);
-  }
-
   private void validateContent(Path dir) throws IOException {
     File fdir = new File(dir.toUri().getPath());
     File expectedFile = new File(fdir, partFile);
@@ -215,12 +205,6 @@ public void testCommitterWithFailureV1() throws Exception {
     testCommitterWithFailureInternal(1, 2);
   }

-  @Test
-  public void testCommitterWithFailureV2() throws Exception {
-    testCommitterWithFailureInternal(2, 1);
-    testCommitterWithFailureInternal(2, 2);
-  }
-
   private void testCommitterWithFailureInternal(int version, int maxAttempts)
       throws Exception {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
@@ -269,11 +253,6 @@ public void testCommitterWithDuplicatedCommitV1() throws Exception {
     testCommitterWithDuplicatedCommitInternal(1);
   }

-  @Test
-  public void testCommitterWithDuplicatedCommitV2() throws Exception {
-    testCommitterWithDuplicatedCommitInternal(2);
-  }
-
   private void testCommitterWithDuplicatedCommitInternal(int version)
       throws Exception {
     JobConf conf = new JobConf();
@@ -355,11 +334,6 @@ public void testCommitterV1() throws Exception {
     testCommitterInternal(1);
   }

-  @Test
-  public void testCommitterV2() throws Exception {
-    testCommitterInternal(2);
-  }
-
   private void testMapFileOutputCommitterInternal(int version)
       throws Exception {
     JobConf conf = new JobConf();
@@ -397,21 +371,11 @@ public void testMapFileOutputCommitterV1() throws Exception {
     testMapFileOutputCommitterInternal(1);
   }

-  @Test
-  public void testMapFileOutputCommitterV2() throws Exception {
-    testMapFileOutputCommitterInternal(2);
-  }
-
   @Test
   public void testMapOnlyNoOutputV1() throws Exception {
     testMapOnlyNoOutputInternal(1);
   }

-  @Test
-  public void testMapOnlyNoOutputV2() throws Exception {
-    testMapOnlyNoOutputInternal(2);
-  }
-
   private void testMapOnlyNoOutputInternal(int version) throws Exception {
     JobConf conf = new JobConf();
     //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
@@ -477,11 +441,6 @@ public void testAbortV1() throws Exception {
     testAbortInternal(1);
   }

-  @Test
-  public void testAbortV2() throws Exception {
-    testAbortInternal(2);
-  }
-
   public static class FakeFileSystem extends RawLocalFileSystem {
     public FakeFileSystem() {
       super();
@@ -560,10 +519,6 @@ public void testFailAbortV1() throws Exception {
     testFailAbortInternal(1);
   }

-  @Test
-  public void testFailAbortV2() throws Exception {
-    testFailAbortInternal(2);
-  }
   public static String slurp(File f) throws IOException {
     int len = (int) f.length();
     byte[] buf = new byte[len];
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index 526485df93490..134823b6be671 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -209,16 +209,6 @@ public void testRecoveryV1() throws Exception {
     testRecoveryInternal(1, 1);
   }

-  @Test
-  public void testRecoveryV2() throws Exception {
-    testRecoveryInternal(2, 2);
-  }
-
-  @Test
-  public void testRecoveryUpgradeV1V2() throws Exception {
-    testRecoveryInternal(1, 2);
-  }
-
   private void validateContent(Path dir) throws IOException {
     validateContent(new File(dir.toUri().getPath()));
   }
@@ -270,9 +260,6 @@ private void testCommitterInternal(int version, boolean taskCleanup)
     conf.setInt(
         FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
-    conf.setBoolean(
-        FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
-        taskCleanup);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -321,26 +308,11 @@ public void testCommitterV1() throws Exception {
     testCommitterInternal(1, false);
   }

-  @Test
-  public void testCommitterV2() throws Exception {
-    testCommitterInternal(2, false);
-  }
-
-  @Test
-  public void testCommitterV2TaskCleanupEnabled() throws Exception {
-    testCommitterInternal(2, true);
-  }
-
   @Test
   public void testCommitterWithDuplicatedCommitV1() throws Exception {
     testCommitterWithDuplicatedCommitInternal(1);
   }

-  @Test
-  public void testCommitterWithDuplicatedCommitV2() throws Exception {
-    testCommitterWithDuplicatedCommitInternal(2);
-  }
-
   private void testCommitterWithDuplicatedCommitInternal(int version)
       throws Exception {
     Job job = Job.getInstance();
@@ -389,12 +361,6 @@ public void testCommitterWithFailureV1() throws Exception {
     testCommitterWithFailureInternal(1, 2);
   }

-  @Test
-  public void testCommitterWithFailureV2() throws Exception {
-    testCommitterWithFailureInternal(2, 1);
-    testCommitterWithFailureInternal(2, 2);
-  }
-
   private void testCommitterWithFailureInternal(int version, int maxAttempts)
       throws Exception {
     Job job = Job.getInstance();
@@ -473,11 +439,6 @@ public void testCommitterRepeatableV1() throws Exception {
     testCommitterRetryInternal(1);
   }

-  @Test
-  public void testCommitterRepeatableV2() throws Exception {
-    testCommitterRetryInternal(2);
-  }
-
   // retry committer for 2 times.
   private void testCommitterRetryInternal(int version)
       throws Exception {
@@ -577,11 +538,6 @@ public void testMapFileOutputCommitterV1() throws Exception {
     testMapFileOutputCommitterInternal(1);
   }

-  @Test
-  public void testMapFileOutputCommitterV2() throws Exception {
-    testMapFileOutputCommitterInternal(2);
-  }
-
   @Test
   public void testInvalidVersionNumber() throws IOException {
     Job job = Job.getInstance();
@@ -639,11 +595,6 @@ public void testAbortV1() throws IOException, InterruptedException {
     testAbortInternal(1);
   }

-  @Test
-  public void testAbortV2() throws IOException, InterruptedException {
-    testAbortInternal(2);
-  }
-
   public static class FakeFileSystem extends RawLocalFileSystem {
     public FakeFileSystem() {
       super();
@@ -720,11 +671,6 @@ public void testFailAbortV1() throws Exception {
     testFailAbortInternal(1);
   }

-  @Test
-  public void testFailAbortV2() throws Exception {
-    testFailAbortInternal(2);
-  }
-
   static class RLFS extends RawLocalFileSystem {
     private final ThreadLocal<Boolean> needNull = new ThreadLocal<Boolean>() {
       @Override
@@ -823,11 +769,6 @@ public void testConcurrentCommitTaskWithSubDirV1() throws Exception {
     testConcurrentCommitTaskWithSubDir(1);
   }

-  @Test
-  public void testConcurrentCommitTaskWithSubDirV2() throws Exception {
-    testConcurrentCommitTaskWithSubDir(2);
-  }
-
   public static String slurp(File f) throws IOException {
     int len = (int) f.length();
     byte[] buf = new byte[len];
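Patch 2 deletes the v2 test entry points but, as its commit message notes, keeps the asserts on the v2 outcomes. A test along the following lines could later verify the new downgrade behaviour. This is a hypothetical sketch, not part of the series: the test name, output path and attempt ID string are invented, and only APIs already used in the tests above appear in it.

    import static org.junit.Assert.assertFalse;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    import org.junit.Test;

    public class TestV2Downgrade {

      @Test
      public void testV2RequestDowngradesToV1() throws Exception {
        Configuration conf = new Configuration();
        // Ask for the removed v2 algorithm; after this series the committer
        // should warn and then behave exactly as v1.
        conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
        TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
            TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0"));
        FileOutputCommitter committer =
            new FileOutputCommitter(new Path("/tmp/out"), tContext);
        // Under v2 this returned true; it is now unconditionally false.
        assertFalse(committer.isCommitJobRepeatable(tContext));
      }
    }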
From 25661187a69067cead60c7d0e766d927bbc1dfe7 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 23 Sep 2020 19:34:42 +0100
Subject: [PATCH 3/3] MAPREDUCE-7282. checkstyle

Change-Id: I6790e6e370526a25ddae159097fd14f548b1bf77
---
 .../hadoop/mapreduce/lib/output/FileOutputCommitter.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index c15646a3875e8..2f1fc0ba61853 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -589,10 +589,9 @@ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)

       if (taskAttemptDirStatus != null) {
         Path committedTaskPath = getCommittedTaskPath(context);
-        if (fs.exists(committedTaskPath)) {
-          if (!fs.delete(committedTaskPath, true)) {
-            throw new IOException("Could not delete " + committedTaskPath);
-          }
+        if (fs.exists(committedTaskPath)
+            && !fs.delete(committedTaskPath, true)) {
+          throw new IOException("Could not delete " + committedTaskPath);
         }
         if (!fs.rename(taskAttemptPath, committedTaskPath)) {
           throw new IOException("Could not rename " + taskAttemptPath + " to "
               + committedTaskPath);
         }
@@ -698,7 +697,7 @@ public void recoverTask(TaskAttemptContext context)
             " to " + committedTaskPath);
         }
       } else {
-        LOG.warn(attemptId+" had no output to recover.");
+        LOG.warn(attemptId + " had no output to recover.");
       }
     } else {
       LOG.warn("Output Path is null in recoverTask()");
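For jobs that adopted the v2 algorithm for its faster job commit against object stores, the description in patch 1 points at the S3A committers as the safe alternative. Below is a minimal sketch of switching a job over, using the configuration keys documented for the hadoop-aws module; treat the key values as assumptions to verify against your Hadoop release.

    import org.apache.hadoop.conf.Configuration;

    public class S3ACommitterConfig {
      public static Configuration withS3ACommitter(Configuration conf) {
        // Route s3a:// output through the S3A committer factory instead of
        // the classic FileOutputCommitter.
        conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
        // Choose one of the S3A committers: "directory", "partitioned"
        // or "magic".
        conf.set("fs.s3a.committer.name", "directory");
        return conf;
      }
    }

As the mapred-default.xml description above notes, the task commits of these committers are atomic, which is exactly the property the removed v2 algorithm lacked.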