@@ -68,7 +68,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
"mapreduce.fileoutputcommitter.algorithm.version";
public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2;
public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
// Skip cleanup _temporary folders under job's output directory
public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED =
"mapreduce.fileoutputcommitter.cleanup.skipped";
@@ -93,15 +93,19 @@ public class FileOutputCommitter extends PathOutputCommitter {
// commitJob will recursively delete the entire job temporary directory.
// HDFS has O(1) recursive delete, so this parameter is left false by default.
// Users of object stores, for example, may want to set this to true. Note:
// this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
// this was only used if mapreduce.fileoutputcommitter.algorithm.version=2,
// and so is now unused
@Deprecated
public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED =
"mapreduce.fileoutputcommitter.task.cleanup.enabled";

@Deprecated
public static final boolean
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;

private Path outputPath = null;
private Path workPath = null;
private final int algorithmVersion;
//private final int algorithmVersion;
private final boolean skipCleanup;
private final boolean ignoreCleanupFailures;

@@ -135,13 +139,18 @@ public FileOutputCommitter(Path outputPath,
JobContext context) throws IOException {
super(outputPath, context);
Configuration conf = context.getConfiguration();
algorithmVersion =
int algorithmVersion =
conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
if (algorithmVersion != 1 && algorithmVersion != 2) {
throw new IOException("Only 1 or 2 algorithm version is supported");
}
// Downgrade v2 to v1 with a warning.
if (algorithmVersion == 2) {
LOG.warn("The v2 algorithm has been removed to ensure the output of jobs"
+ " are always correct");
}

// if skip cleanup
skipCleanup = conf.getBoolean(
@@ -399,10 +408,8 @@ protected void commitJobInternal(JobContext context) throws IOException {
Path finalOutput = getOutputPath();
FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());

if (algorithmVersion == 1) {
for (FileStatus stat: getAllCommittedTaskPaths(context)) {
mergePaths(fs, stat, finalOutput, context);
}
for (FileStatus stat: getAllCommittedTaskPaths(context)) {
mergePaths(fs, stat, finalOutput, context);
}

if (skipCleanup) {
@@ -503,16 +510,9 @@ private void reportProgress(JobContext context) {

private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
JobContext context) throws IOException {
if (algorithmVersion == 1) {
if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to rename " + from + " to " + to);
}
} else {
fs.mkdirs(to);
for (FileStatus subFrom : fs.listStatus(from.getPath())) {
Path subTo = new Path(to, subFrom.getPath().getName());
mergePaths(fs, subFrom, subTo, context);
}
// this always does a rename now that the v2 algorithm has been removed.
if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to rename " + from + " to " + to);
}
}

@@ -588,36 +588,17 @@ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
}

if (taskAttemptDirStatus != null) {
if (algorithmVersion == 1) {
Path committedTaskPath = getCommittedTaskPath(context);
if (fs.exists(committedTaskPath)) {
if (!fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete " + committedTaskPath);
}
}
if (!fs.rename(taskAttemptPath, committedTaskPath)) {
throw new IOException("Could not rename " + taskAttemptPath + " to "
+ committedTaskPath);
}
LOG.info("Saved output of task '" + attemptId + "' to " +
committedTaskPath);
} else {
// directly merge everything from taskAttemptPath to output directory
mergePaths(fs, taskAttemptDirStatus, outputPath, context);
LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath);

if (context.getConfiguration().getBoolean(
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) {
LOG.debug(String.format(
"Deleting the temporary directory of '%s': '%s'",
attemptId, taskAttemptPath));
if(!fs.delete(taskAttemptPath, true)) {
LOG.warn("Could not delete " + taskAttemptPath);
}
}
Path committedTaskPath = getCommittedTaskPath(context);
if (fs.exists(committedTaskPath)
&& !fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete " + committedTaskPath);
}
if (!fs.rename(taskAttemptPath, committedTaskPath)) {
throw new IOException("Could not rename " + taskAttemptPath + " to "
+ committedTaskPath);
}
LOG.info("Saved output of task '" + attemptId + "' to " +
committedTaskPath);
} else {
LOG.warn("No Output found for " + attemptId);
}
@@ -682,7 +663,7 @@ public boolean isRecoverySupported() {

@Override
public boolean isCommitJobRepeatable(JobContext context) throws IOException {
return algorithmVersion == 2;
return false;
}

@Override
@@ -702,35 +683,21 @@ public void recoverTask(TaskAttemptContext context)
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
}
if (algorithmVersion == 1) {
if (fs.exists(previousCommittedTaskPath)) {
Path committedTaskPath = getCommittedTaskPath(context);
if (!fs.delete(committedTaskPath, true) &&
fs.exists(committedTaskPath)) {
throw new IOException("Could not delete " + committedTaskPath);
}
//Rename can fail if the parent directory does not yet exist.
Path committedParent = committedTaskPath.getParent();
fs.mkdirs(committedParent);
if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
throw new IOException("Could not rename " + previousCommittedTaskPath +
" to " + committedTaskPath);
}
} else {
LOG.warn(attemptId+" had no output to recover.");
if (fs.exists(previousCommittedTaskPath)) {
Path committedTaskPath = getCommittedTaskPath(context);
if (!fs.delete(committedTaskPath, true) &&
fs.exists(committedTaskPath)) {
throw new IOException("Could not delete " + committedTaskPath);
}
} else {
// essentially a no-op, but for backwards compatibility
// after upgrade to the new fileOutputCommitter,
// check if there are any output left in committedTaskPath
try {
FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
LOG.info("Recovering task for upgrading scenario, moving files from "
+ previousCommittedTaskPath + " to " + outputPath);
mergePaths(fs, from, outputPath, context);
} catch (FileNotFoundException ignored) {
//Rename can fail if the parent directory does not yet exist.
Path committedParent = committedTaskPath.getParent();
fs.mkdirs(committedParent);
if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
throw new IOException("Could not rename " + previousCommittedTaskPath +
" to " + committedTaskPath);
}
LOG.info("Done recovering task " + attemptId);
} else {
LOG.warn(attemptId + " had no output to recover.");
}
} else {
LOG.warn("Output Path is null in recoverTask()");
@@ -744,7 +711,6 @@ public String toString() {
sb.append(super.toString()).append("; ");
sb.append("outputPath=").append(outputPath);
sb.append(", workPath=").append(workPath);
sb.append(", algorithmVersion=").append(algorithmVersion);
sb.append(", skipCleanup=").append(skipCleanup);
sb.append(", ignoreCleanupFailures=").append(ignoreCleanupFailures);
sb.append('}');
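A minimal, hypothetical sketch of the behaviour the committer changes above produce (the class name, the file:///tmp output path and the task-attempt IDs are invented for illustration; only the FileOutputCommitter calls come from the code above): requesting algorithm version 2 is still accepted, but the constructor only logs the removal warning and then follows the v1 path, so isCommitJobRepeatable() always returns false.

// Hypothetical sketch, not part of this patch: the class name, the
// file:///tmp output path and the task-attempt IDs are invented for
// illustration; the FileOutputCommitter calls are the ones changed above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CommitterDowngradeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Explicitly request the removed v2 algorithm.
    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);

    TaskAttemptID attempt = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
    TaskAttemptContextImpl context = new TaskAttemptContextImpl(conf, attempt);

    // The constructor no longer rejects version 2; it logs
    // "The v2 algorithm has been removed ..." and proceeds with v1.
    FileOutputCommitter committer =
        new FileOutputCommitter(new Path("file:///tmp/job-output"), context);

    // Always false now that only the v1 behaviour remains.
    System.out.println(committer.isCommitJobRepeatable(context));
  }
}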
@@ -1562,10 +1562,18 @@

<property>
<name>mapreduce.fileoutputcommitter.algorithm.version</name>
<value>2</value>
<description>The file output committer algorithm version
valid algorithm version number: 1 or 2
default to 2, which is the original algorithm
<value>1</value>
<description>The file output committer algorithm version.

There have been two algorithm versions in Hadoop, "1" and "2".

The version 2 algorithm has been disabled as task commits
were not atomic. If the first task attempt failed partway
through the task commit, the output directory may end up
with data from that failed commit, as well as the data
from any subsequent attempts.

See https://issues.apache.org/jira/browse/MAPREDUCE-7282

In algorithm version 1,

@@ -1585,32 +1593,58 @@
$joboutput/, then it will delete $joboutput/_temporary/
and write $joboutput/_SUCCESS

It has a performance regression, which is discussed in MAPREDUCE-4815.
It has a performance limitation, which is discussed in MAPREDUCE-4815.

If a job generates many files to commit then the commitJob
method call at the end of the job can take minutes.
The commit is single-threaded and waits until all
tasks have completed before commencing.

algorithm version 2 will change the behavior of commitTask,
Algorithm version 2 changed the behavior of commitTask,
recoverTask, and commitJob.

1. commitTask will rename all files in
1. commitTask renamed all files in
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
to $joboutput/

2. recoverTask actually doesn't require to do anything, but for
2. recoverTask didn't do anything, but for
the upgrade from version 1 to version 2 case, it would check if there
are any files in
$joboutput/_temporary/($appAttemptID - 1)/$taskID/
and rename them to $joboutput/

3. commitJob can simply delete $joboutput/_temporary and write
3. commitJob deleted $joboutput/_temporary and wrote
$joboutput/_SUCCESS

This algorithm will reduce the output commit time for
large jobs by having the tasks commit directly to the final
output directory as they were completing and commitJob had
very little to do.
This algorithm reduced the output commit time for large jobs
as the work of renaming files to their destination took place
incrementally as tasks committed. However, it has a key flaw:

Task attempt commit is not atomic. If task attempt 1 failed
partway through its renames, a second task attempt would be
scheduled.

- If task attempts 1 and 2 generated files with different names,
then any files from attempt 1 which had already been renamed
into the destination would still be present; they would not be
overwritten by the second attempt.

- If task attempt 1 was still executing, even though the job
driver considered it to have failed, then a race condition
could arise where the output contained a mix of files from
both task attempts.

Applications which use these committers, including MapReduce
and Apache Spark, expect task commit to be atomic; this
commit algorithm does not meet that expectation.

Note:
* The Apache Hadoop S3A commit algorithms do have atomic task
commit and are safe.
* The Amazon "EMRFS S3-optimized Committer" completes its
multipart uploads in task commit, so it has the same
limitation as algorithm v2.
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-committer-multipart.html
</description>
</property>
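
To make the v1 sequence described above concrete, the following rough sketch (hypothetical driver code, loosely modelled on the existing committer tests; the local output path, file name and IDs are invented) walks the lifecycle: setupJob creates the job's _temporary directory under $joboutput, commitTask renames the task-attempt directory to its committed location under _temporary/$appAttemptID, and commitJob merges the committed task directories into $joboutput, deletes _temporary, and writes _SUCCESS.

// Hypothetical sketch, not part of this patch: a standalone walk through the
// v1 lifecycle, loosely modelled on the committer tests. The class name,
// file:///tmp output path, file name and IDs are invented for illustration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class V1LifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1);
    Path output = new Path("file:///tmp/joboutput");

    TaskAttemptID attempt = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
    TaskAttemptContextImpl context = new TaskAttemptContextImpl(conf, attempt);
    FileOutputCommitter committer = new FileOutputCommitter(output, context);

    committer.setupJob(context);   // creates $joboutput/_temporary/$appAttemptID
    committer.setupTask(context);  // no-op; the work dir is created lazily

    // The task writes its output under the task-attempt work path,
    // $joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/.
    FileSystem fs = output.getFileSystem(conf);
    fs.create(new Path(committer.getWorkPath(), "part-m-00000")).close();

    // v1 commitTask: one rename of the work path to the committed task path.
    committer.commitTask(context);
    // v1 commitJob: merge every committed task path into $joboutput,
    // delete $joboutput/_temporary and write $joboutput/_SUCCESS.
    committer.commitJob(context);

    System.out.println(fs.exists(new Path(output, "part-m-00000"))); // true
    System.out.println(fs.exists(new Path(output, "_SUCCESS")));     // true
  }
}

Because every task's output stays under _temporary until commitJob, a failed task attempt can never leave partial files in $joboutput, which is exactly the property the removed v2 algorithm lacked.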

@@ -161,16 +161,6 @@ public void testRecoveryV1() throws Exception {
testRecoveryInternal(1, 1);
}

@Test
public void testRecoveryV2() throws Exception {
testRecoveryInternal(2, 2);
}

@Test
public void testRecoveryUpgradeV1V2() throws Exception {
testRecoveryInternal(1, 2);
}

private void validateContent(Path dir) throws IOException {
File fdir = new File(dir.toUri().getPath());
File expectedFile = new File(fdir, partFile);
@@ -215,12 +205,6 @@ public void testCommitterWithFailureV1() throws Exception {
testCommitterWithFailureInternal(1, 2);
}

@Test
public void testCommitterWithFailureV2() throws Exception {
testCommitterWithFailureInternal(2, 1);
testCommitterWithFailureInternal(2, 2);
}

private void testCommitterWithFailureInternal(int version, int maxAttempts) throws Exception {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
@@ -269,11 +253,6 @@ public void testCommitterWithDuplicatedCommitV1() throws Exception {
testCommitterWithDuplicatedCommitInternal(1);
}

@Test
public void testCommitterWithDuplicatedCommitV2() throws Exception {
testCommitterWithDuplicatedCommitInternal(2);
}

private void testCommitterWithDuplicatedCommitInternal(int version) throws
Exception {
JobConf conf = new JobConf();
@@ -355,11 +334,6 @@ public void testCommitterV1() throws Exception {
testCommitterInternal(1);
}

@Test
public void testCommitterV2() throws Exception {
testCommitterInternal(2);
}

private void testMapFileOutputCommitterInternal(int version)
throws Exception {
JobConf conf = new JobConf();
@@ -397,21 +371,11 @@ public void testMapFileOutputCommitterV1() throws Exception {
testMapFileOutputCommitterInternal(1);
}

@Test
public void testMapFileOutputCommitterV2() throws Exception {
testMapFileOutputCommitterInternal(2);
}

@Test
public void testMapOnlyNoOutputV1() throws Exception {
testMapOnlyNoOutputInternal(1);
}

@Test
public void testMapOnlyNoOutputV2() throws Exception {
testMapOnlyNoOutputInternal(2);
}

private void testMapOnlyNoOutputInternal(int version) throws Exception {
JobConf conf = new JobConf();
//This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
@@ -477,11 +441,6 @@ public void testAbortV1() throws Exception {
testAbortInternal(1);
}

@Test
public void testAbortV2() throws Exception {
testAbortInternal(2);
}

public static class FakeFileSystem extends RawLocalFileSystem {
public FakeFileSystem() {
super();
@@ -560,10 +519,6 @@ public void testFailAbortV1() throws Exception {
testFailAbortInternal(1);
}

@Test
public void testFailAbortV2() throws Exception {
testFailAbortInternal(2);
}
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];