Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1747,7 +1747,7 @@

<property>
<name>fs.s3a.committer.staging.conflict-mode</name>
<value>fail</value>
<value>append</value>
<description>
Staging committer conflict resolution policy.
Supported: "fail", "append", "replace".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private CommitConstants() {
public static final String CONFLICT_MODE_REPLACE = "replace";

/** Default conflict mode: {@value}. */
public static final String DEFAULT_CONFLICT_MODE = CONFLICT_MODE_FAIL;
public static final String DEFAULT_CONFLICT_MODE = CONFLICT_MODE_APPEND;

/**
* Number of threads in committers for parallel operations on files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

package org.apache.hadoop.fs.s3a.commit.staging;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
Expand Down Expand Up @@ -65,11 +69,31 @@ public void setupJob(JobContext context) throws IOException {
super.setupJob(context);
Path outputPath = getOutputPath();
FileSystem fs = getDestFS();
if (getConflictResolutionMode(context, fs.getConf())
== ConflictResolution.FAIL
&& fs.exists(outputPath)) {
throw failDestinationExists(outputPath,
"Setting job as " + getRole());
ConflictResolution conflictResolution = getConflictResolutionMode(
context, fs.getConf());
LOG.info("Conflict Resolution mode is {}", conflictResolution);
try {
final FileStatus status = fs.getFileStatus(outputPath);

// if it is not a directory, fail fast for all conflict options.
if (!status.isDirectory()) {
throw new PathExistsException(outputPath.toString(),
"output path is not a directory: "
+ InternalCommitterConstants.E_DEST_EXISTS);
}
switch(conflictResolution) {
case FAIL:
throw failDestinationExists(outputPath,
"Setting job as " + getRole());
case APPEND:
case REPLACE:
LOG.debug("Destination directory exists; conflict policy permits this");
}
} catch (FileNotFoundException ignored) {
// there is no destination path, hence, no conflict.
// make the parent directory, which also triggers a recursive directory
// creation operation
fs.mkdirs(outputPath);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ public final ConflictResolution getConflictResolutionMode(
Configuration fsConf) {
if (conflictResolution == null) {
this.conflictResolution = ConflictResolution.valueOf(
getConfictModeOption(context, fsConf));
getConfictModeOption(context, fsConf, DEFAULT_CONFLICT_MODE));
}
return conflictResolution;
}
Expand Down Expand Up @@ -889,14 +889,15 @@ protected PathExistsException failDestinationExists(final Path path,
* Get the conflict mode option string.
* @param context context with the config
* @param fsConf filesystem config
* @param defVal default value.
* @return the trimmed configuration option, upper case.
*/
public static String getConfictModeOption(JobContext context,
Configuration fsConf) {
Configuration fsConf, String defVal) {
return getConfigurationOption(context,
fsConf,
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
DEFAULT_CONFLICT_MODE).toUpperCase(Locale.ENGLISH);
defVal).toUpperCase(Locale.ENGLISH);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ This then is the problem which the S3A committers address:
*How to safely and reliably commit work to Amazon S3 or compatible object store*


## Meet the S3A Commmitters
## Meet the S3A Committers

Since Hadoop 3.1, the S3A FileSystem has been accompanied by classes
designed to integrate with the Hadoop and Spark job commit protocols, classes
Expand Down Expand Up @@ -226,8 +226,8 @@ it is committed through the standard "v1" commit algorithm.
When the Job is committed, the Job Manager reads the lists of pending writes from its
HDFS Job destination directory and completes those uploads.

Cancelling a task is straightforward: the local directory is deleted with
its staged data. Cancelling a job is achieved by reading in the lists of
Canceling a task is straightforward: the local directory is deleted with
its staged data. Canceling a job is achieved by reading in the lists of
pending writes from the HDFS job attempt directory, and aborting those
uploads. For extra safety, all outstanding multipart writes to the destination directory
are aborted.
Expand Down Expand Up @@ -537,9 +537,8 @@ Conflict management is left to the execution engine itself.
|--------|-------|-----------|-------------|---------|---------|
| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a `_SUCCESS` file at the end of each job | `true` |
| `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for parallel operations on files. | 8 |
| `fs.s3a.committer.staging.conflict-mode` | | X | X | Conflict resolution: `fail`, `abort` or `overwrite`| `fail` |
| `fs.s3a.committer.staging.conflict-mode` | | X | X | Conflict resolution: `fail`, `append` or `replace`| `append` |
| `fs.s3a.committer.staging.unique-filenames` | | X | X | Generate unique filenames | `true` |

| `fs.s3a.committer.magic.enabled` | X | | | Enable "magic committer" support in the filesystem | `false` |


Expand Down Expand Up @@ -607,7 +606,7 @@ Conflict management is left to the execution engine itself.

<property>
<name>fs.s3a.committer.staging.conflict-mode</name>
<value>fail</value>
<value>append</value>
<description>
Staging committer conflict resolution policy.
Supported: "fail", "append", "replace".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.util.Progressable;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Relays FS calls to the mocked FS, allows for some extra logging with
* stack traces to be included, stubbing out other methods
Expand Down Expand Up @@ -240,6 +242,12 @@ public void setWorkingDirectory(Path newDir) {
mock.setWorkingDirectory(newDir);
}

@Override
public boolean mkdirs(Path f) throws IOException {
event("mkdirs(%s)", f);
return mock.mkdirs(f);
}

@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
event("mkdirs(%s)", f);
Expand All @@ -249,7 +257,8 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
event("getFileStatus(%s)", f);
return mock.getFileStatus(f);
return checkNotNull(mock.getFileStatus(f),
"Mock getFileStatus(%s) returned null", f);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

package org.apache.hadoop.fs.s3a.commit;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -53,7 +51,6 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;

/**
Expand Down Expand Up @@ -109,6 +106,13 @@ protected Path path(String filepath) throws IOException {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
String bucketName = getTestBucketName(conf);
removeBucketOverrides(bucketName, conf,
MAGIC_COMMITTER_ENABLED,
S3A_COMMITTER_FACTORY_KEY,
FS_S3A_COMMITTER_NAME,
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);

conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
Expand Down Expand Up @@ -355,25 +359,7 @@ private String pathToPrefix(Path path) {
* @throws IOException IO Failure
*/
protected SuccessData verifySuccessMarker(Path dir) throws IOException {
assertPathExists("Success marker",
new Path(dir, _SUCCESS));
SuccessData successData = loadSuccessMarker(dir);
log().info("Success data {}", successData.toString());
log().info("Metrics\n{}",
successData.dumpMetrics(" ", " = ", "\n"));
log().info("Diagnostics\n{}",
successData.dumpDiagnostics(" ", " = ", "\n"));
return successData;
}

/**
* Load the success marker and return the data inside it.
* @param dir directory containing the marker
* @return the loaded data
* @throws IOException on any failure to load or validate the data
*/
protected SuccessData loadSuccessMarker(Path dir) throws IOException {
return SuccessData.load(getFileSystem(), new Path(dir, _SUCCESS));
return validateSuccessFile(dir, "", getFileSystem(), "query");
}

/**
Expand Down Expand Up @@ -447,58 +433,59 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId,
/**
* Load in the success data marker: this guarantees that an S3A
* committer was used.
* @param fs filesystem
* @param outputPath path of job
* @param committerName name of committer to match
* @param committerName name of committer to match, or ""
* @param fs filesystem
* @param origin origin (e.g. "teragen" for messages)
* @return the success data
* @throws IOException IO failure
*/
public static SuccessData validateSuccessFile(final S3AFileSystem fs,
final Path outputPath, final String committerName) throws IOException {
SuccessData successData = null;
try {
successData = loadSuccessFile(fs, outputPath);
} catch (FileNotFoundException e) {
// either the output path is missing or, if its the success file,
// somehow the relevant committer wasn't picked up.
String dest = outputPath.toString();
LOG.error("No _SUCCESS file found under {}", dest);
List<String> files = new ArrayList<>();
applyLocatedFiles(fs.listFiles(outputPath, true),
(status) -> {
files.add(status.getPath().toString());
LOG.error("{} {}", status.getPath(), status.getLen());
});
throw new AssertionError("No _SUCCESS file in " + dest
+ "; found : " + files.stream().collect(Collectors.joining("\n")),
e);
}
public static SuccessData validateSuccessFile(final Path outputPath,
final String committerName,
final S3AFileSystem fs,
final String origin) throws IOException {
SuccessData successData = loadSuccessFile(fs, outputPath, origin);
String commitDetails = successData.toString();
LOG.info("Committer name " + committerName + "\n{}",
commitDetails);
LOG.info("Committer statistics: \n{}",
successData.dumpMetrics(" ", " = ", "\n"));
LOG.info("Diagnostics\n{}",
successData.dumpDiagnostics(" ", " = ", "\n"));
assertEquals("Wrong committer in " + commitDetails,
committerName, successData.getCommitter());
if (!committerName.isEmpty()) {
assertEquals("Wrong committer in " + commitDetails,
committerName, successData.getCommitter());
}
return successData;
}

/**
* Load a success file; fail if the file is empty/nonexistent.
* @param fs filesystem
* @param outputPath directory containing the success file.
* @param origin origin of the file
* @return the loaded file.
* @throws IOException failure to find/load the file
* @throws AssertionError file is 0-bytes long
* @throws AssertionError file is 0-bytes long,
*/
public static SuccessData loadSuccessFile(final S3AFileSystem fs,
final Path outputPath) throws IOException {
final Path outputPath, final String origin) throws IOException {
ContractTestUtils.assertPathExists(fs,
"Output directory " + outputPath
+ " from " + origin
+ " not found: Job may not have executed",
outputPath);
Path success = new Path(outputPath, _SUCCESS);
ContractTestUtils.assertIsFile(fs, success);
FileStatus status = fs.getFileStatus(success);
assertTrue("0 byte success file - not a s3guard committer " + success,
FileStatus status = ContractTestUtils.verifyPathExists(fs,
"job completion marker " + success
+ " from " + origin
+ " not found: Job may have failed",
success);
assertTrue("_SUCCESS outout from " + origin + " is not a file " + status,
status.isFile());
assertTrue("0 byte success file "
+ success + " from " + origin
+ "; a s3guard committer was not used",
status.getLen() > 0);
return SuccessData.load(fs, success);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ public void testMRJob() throws Exception {
}
Collections.sort(actualFiles);

SuccessData successData = validateSuccessFile(fs, outputPath,
committerName());
SuccessData successData = validateSuccessFile(outputPath, committerName(),
fs, "MR job");
List<String> successFiles = successData.getFilenames();
String commitData = successData.toString();
assertTrue("No filenames in " + commitData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.commit.staging;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
Expand Down Expand Up @@ -55,6 +56,7 @@

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -196,9 +198,34 @@ public static void pathExists(FileSystem mockS3, Path path)
when(mockS3.exists(path)).thenReturn(true);
}

public static void pathIsDirectory(FileSystem mockS3, Path path)
throws IOException {
hasFileStatus(mockS3, path,
new FileStatus(0, true, 0, 0, 0, path));
}

public static void pathIsFile(FileSystem mockS3, Path path)
throws IOException {
pathExists(mockS3, path);
hasFileStatus(mockS3, path,
new FileStatus(0, false, 0, 0, 0, path));
}

public static void pathDoesNotExist(FileSystem mockS3, Path path)
throws IOException {
when(mockS3.exists(path)).thenReturn(false);
when(mockS3.getFileStatus(path)).thenThrow(
new FileNotFoundException("mock fnfe of " + path));
}

public static void hasFileStatus(FileSystem mockS3,
Path path, FileStatus status) throws IOException {
when(mockS3.getFileStatus(path)).thenReturn(status);
}

public static void mkdirsHasOutcome(FileSystem mockS3,
Path path, boolean outcome) throws IOException {
when(mockS3.mkdirs(path)).thenReturn(outcome);
}

public static void canDelete(FileSystem mockS3, String... children)
Expand All @@ -221,7 +248,12 @@ public static void verifyExistenceChecked(FileSystem mockS3, String child)

public static void verifyExistenceChecked(FileSystem mockS3, Path path)
throws IOException {
verify(mockS3).exists(path);
verify(mockS3).getFileStatus(path);
}

public static void verifyMkdirsInvoked(FileSystem mockS3, Path path)
throws IOException {
verify(mockS3).mkdirs(path);
}

/**
Expand Down
Loading