Merged
@@ -3654,7 +3654,7 @@ public UserGroupInformation getOwner() {
    * directories. Has the semantics of Unix {@code 'mkdir -p'}.
    * Existence of the directory hierarchy is not an error.
    * Parent elements are scanned to see if any are a file,
-   * <i>except under __magic</i> paths.
+   * <i>except under "MAGIC PATH"</i> paths.
    * There the FS assumes that the destination directory creation
    * did that scan and that paths in job/task attempts are all
    * "well formed"
@@ -4735,7 +4735,7 @@ public boolean isMagicCommitPath(Path path) {
 
   /**
    * Predicate: is a path under a magic commit path?
-   * True if magic commit is enabled and the path is under __magic,
+   * True if magic commit is enabled and the path is under "MAGIC PATH",
    * irrespective of file type.
    * @param path path to examine
    * @return true if the path is in a magic dir and the FS has magic writes enabled.
@@ -39,6 +39,8 @@ private CommitConstants() {
    * {@value}.
    */
   public static final String MAGIC = "__magic";
+  public static final String JOB_ID_PREFIX = "job-";
+  public static final String MAGIC_PATH_PREFIX = MAGIC + "_" + JOB_ID_PREFIX;
 
   /**
    * Marker of the start of a directory tree for calculating
@@ -280,10 +282,12 @@ private CommitConstants() {
   /**
    * Default configuration value for
    * {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
+   * It is disabled by default to support concurrent writes to the same
+   * parent directory but different partition/subdirectories.
    * Value: {@value}.
    */
   public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
-      true;
+      false;
 
   /**
    * The limit to the number of committed objects tracked during
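Where a deployment never has two jobs writing concurrently to the same tree, the old behaviour can be restored through configuration; a minimal sketch, assuming the `fs.s3a.committer.abort.pending.uploads` key that this constant backs:

```java
import org.apache.hadoop.conf.Configuration;

public class RestoreAbortPendingUploads {
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // Abort every pending upload under the destination on job commit/abort,
    // as before this change. Only safe when no other job writes
    // concurrently to the same directory tree.
    conf.setBoolean("fs.s3a.committer.abort.pending.uploads", true);
    return conf;
  }
}
```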
@@ -25,8 +25,8 @@
 import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
 import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
 
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
 
 /**
  * These are internal constants not intended for public use.
@@ -108,7 +108,7 @@ private InternalCommitterConstants() {
 
   /** Error message for a path without a magic element in the list: {@value}. */
   public static final String E_NO_MAGIC_PATH_ELEMENT
-      = "No " + MAGIC + " element in path";
+      = "No " + MAGIC_PATH_PREFIX + " element in path";
 
   /**
    * The UUID for jobs: {@value}.
@@ -21,13 +21,15 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.IntStream;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_MAGIC_PATH_ELEMENT;
 
 /**
@@ -76,7 +78,8 @@ public static List<String> splitPathToElements(Path path) {
    * @return true if a path is considered magic
    */
   public static boolean isMagicPath(List<String> elements) {
-    return elements.contains(MAGIC);
+    return elements.stream()
+        .anyMatch(element -> element.startsWith(MAGIC_PATH_PREFIX));
   }
 
   /**
@@ -96,9 +99,16 @@ public static boolean containsBasePath(List<String> elements) {
    * @throws IllegalArgumentException if there is no magic element
    */
   public static int magicElementIndex(List<String> elements) {
-    int index = elements.indexOf(MAGIC);
-    checkArgument(index >= 0, E_NO_MAGIC_PATH_ELEMENT);
-    return index;
+    Optional<Integer> index = IntStream.range(0, elements.size())
+        .filter(i -> elements.get(i).startsWith(MAGIC_PATH_PREFIX))
+        .boxed()
+        .findFirst();
+
+    if (index.isPresent()) {
+      return index.get();
+    } else {
+      throw new IllegalArgumentException(E_NO_MAGIC_PATH_ELEMENT);
+    }
   }
 
   /**
@@ -182,18 +192,9 @@ public static String lastElement(List<String> strings) {
     return strings.get(strings.size() - 1);
   }
 
-  /**
-   * Get the magic subdirectory of a destination directory.
-   * @param destDir the destination directory
-   * @return a new path.
-   */
-  public static Path magicSubdir(Path destDir) {
-    return new Path(destDir, MAGIC);
-  }
-
   /**
    * Calculates the final destination of a file.
-   * This is the parent of any {@code __magic} element, and the filename
+   * This is the parent of any "MAGIC PATH" element, and the filename
    * of the path. That is: all intermediate child path elements are discarded.
    * Why so? paths under the magic path include job attempt and task attempt
    * subdirectories, which need to be skipped.
@@ -208,8 +209,8 @@ public static List<String> finalDestination(List<String> elements) {
     if (isMagicPath(elements)) {
       List<String> destDir = magicPathParents(elements);
       List<String> children = magicPathChildren(elements);
-      checkArgument(!children.isEmpty(), "No path found under " +
-          MAGIC);
+      checkArgument(!children.isEmpty(), "No path found under the prefix " +
+          MAGIC_PATH_PREFIX);
       ArrayList<String> dest = new ArrayList<>(destDir);
       if (containsBasePath(children)) {
         // there's a base marker in the path
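To illustrate the switch from exact-name matching to prefix matching, a small sketch using the helpers changed above (the path and job UUID are made up; this assumes `MagicCommitPaths` is importable from `org.apache.hadoop.fs.s3a.commit`):

```java
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;

public class MagicPathDemo {
  public static void main(String[] args) {
    // Hypothetical magic path for a job with UUID "400".
    Path p = new Path(
        "s3a://bucket/results/latest/__magic_job-400/task_01_01/latest.orc.lzo");
    List<String> elements = MagicCommitPaths.splitPathToElements(p);
    // Matches on the element prefix "__magic_job-", not the exact
    // name "__magic" as before.
    boolean magic = MagicCommitPaths.isMagicPath(elements);   // true
    int index = MagicCommitPaths.magicElementIndex(elements); // 2
    System.out.println(magic + " at element " + index);
  }
}
```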
@@ -26,9 +26,11 @@
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Preconditions;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.JOB_ID_PREFIX;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
 
 /**
@@ -49,10 +51,13 @@ private CommitUtilsWithMR() {
   /**
    * Get the location of magic job attempts.
    * @param out the base output directory.
+   * @param jobUUID unique Job ID.
    * @return the location of magic job attempts.
    */
-  public static Path getMagicJobAttemptsPath(Path out) {
-    return new Path(out, MAGIC);
+  public static Path getMagicJobAttemptsPath(Path out, String jobUUID) {
+    Preconditions.checkArgument(jobUUID != null && !(jobUUID.isEmpty()),
+        "Invalid job ID: %s", jobUUID);
+    return new Path(out, MAGIC_PATH_PREFIX + jobUUID);
   }
 
   /**
@@ -76,7 +81,7 @@ public static Path getMagicJobAttemptPath(String jobUUID,
       int appAttemptId,
       Path dest) {
     return new Path(
-        getMagicJobAttemptsPath(dest),
+        getMagicJobAttemptsPath(dest, jobUUID),
         formatAppAttemptDir(jobUUID, appAttemptId));
   }
 
@@ -88,9 +93,7 @@ public static Path getMagicJobAttemptPath(String jobUUID,
    */
   public static Path getMagicJobPath(String jobUUID,
       Path dest) {
-    return new Path(
-        getMagicJobAttemptsPath(dest),
-        formatJobDir(jobUUID));
+    return getMagicJobAttemptsPath(dest, jobUUID);
   }
 
   /**
@@ -102,7 +105,7 @@ public static Path getMagicJobPath(String jobUUID,
    */
   public static String formatJobDir(
       String jobUUID) {
-    return String.format("job-%s", jobUUID);
+    return JOB_ID_PREFIX + jobUUID;
   }
 
   /**
@@ -105,7 +105,6 @@ public void setupJob(JobContext context) throws IOException {
       Path jobPath = getJobPath();
       final FileSystem destFS = getDestinationFS(jobPath,
           context.getConfiguration());
-      destFS.delete(jobPath, true);
       destFS.mkdirs(jobPath);
     }
   }
@@ -132,7 +131,7 @@ protected ActiveCommit listPendingUploadsToCommit(
    */
   public void cleanupStagingDirs() {
     final Path out = getOutputPath();
-    Path path = magicSubdir(out);
+    Path path = getMagicJobPath(getUUID(), out);
     try(DurationInfo ignored = new DurationInfo(LOG, true,
         "Deleting magic directory %s", path)) {
       Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),
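Together, these changes let concurrent jobs share one destination directory: each job stages under its own magic path, so job setup no longer needs to wipe a shared tree, and cleanup deletes only the job's own directory. A sketch with hypothetical job UUIDs:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR;

public class ConcurrentJobsDemo {
  public static void main(String[] args) {
    Path dest = new Path("s3a://bucket/results/latest");
    // Two jobs writing to the same destination stage under distinct trees:
    Path jobA = CommitUtilsWithMR.getMagicJobPath("a1b2", dest);
    Path jobB = CommitUtilsWithMR.getMagicJobPath("c3d4", dest);
    System.out.println(jobA); // s3a://bucket/results/latest/__magic_job-a1b2
    System.out.println(jobB); // s3a://bucket/results/latest/__magic_job-c3d4
  }
}
```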
@@ -1308,12 +1308,12 @@ so returning the special new stream.
 
 
 
-This is done with a "magic" temporary directory name, `__magic`, to indicate that all files
+This is done with a "MAGIC PATH" temporary directory name (the filesystem knows to remap paths whose elements carry the prefix `__magic_job-${jobId}`) to indicate that all files
 created under this path are not to be completed during the stream write process.
 Directories created under the path will still be created —this allows job- and
 task-specific directories to be created for individual job and task attempts.
 
-For example, the pattern `__magic/${jobID}/${taskId}` could be used to
+For example, the pattern `${MAGIC PATH}/${jobID}/${taskId}` could be used to
 store pending commits to the final directory for that specific task. If that
 task is committed, all pending commit files stored in that path will be loaded
 and used to commit the final uploads.
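To make the mechanism concrete, a hedged sketch of writing through such a path with the ordinary `FileSystem` API (bucket, job ID, and data are made up; this assumes the magic committer is enabled for the bucket):

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MagicWriteDemo {
  public static void main(String[] args) throws Exception {
    Path magicFile = new Path(
        "s3a://bucket/results/latest/__magic_job-400/job_400_1/task_01_01/latest.orc.lzo");
    FileSystem fs = FileSystem.get(magicFile.toUri(), new Configuration());
    try (FSDataOutputStream out = fs.create(magicFile, true)) {
      // Data is uploaded incrementally as multipart parts...
      out.write("example".getBytes(StandardCharsets.UTF_8));
    }
    // ...but close() only writes the .pending summary; nothing appears at
    // /results/latest/latest.orc.lzo until the job is committed.
  }
}
```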
@@ -1322,19 +1322,19 @@ Consider a job with the final directory `/results/latest`
 
 The intermediate directory for the task 01 attempt 01 of job `job_400_1` would be
 
-    /results/latest/__magic/job_400_1/_task_01_01
+    /results/latest/__magic_job-400/job_400_1/_task_01_01
 
 This would be returned as the temp directory.
 
 When a client attempted to create the file
-`/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo` , the S3A FS would initiate
+`/results/latest/__magic_job-400/job_400_1/task_01_01/latest.orc.lzo`, the S3A FS would initiate
 a multipart request with the final destination of `/results/latest/latest.orc.lzo`.
 
 As data was written to the output stream, it would be incrementally uploaded as
 individual multipart PUT operations.
 
 On `close()`, summary data would be written to the file
-`/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`.
+`/results/latest/__magic_job-400/job400_1/task_01_01/latest.orc.lzo.pending`.
 This would contain the upload ID and all the parts and etags of uploaded data.
 
 A marker file is also created, so that code which verifies that a newly created file
@@ -1358,7 +1358,7 @@ to the job attempt.
 1. These are merged into a single `Pendingset` structure.
 1. Which is saved to a `.pendingset` file in the job attempt directory.
 1. Finally, the task attempt directory is deleted. In the example, this
-would be to `/results/latest/__magic/job400_1/task_01_01.pendingset`;
+would be to `/results/latest/__magic_job-400/job400_1/task_01_01.pendingset`;
 
 
 A failure to load any of the single pending upload files (i.e. the file
@@ -1386,9 +1386,9 @@ file.
 
 To allow tasks to generate data in subdirectories, a special filename `__base`
 will be used to provide an extra cue as to the final path. When mapping an output
-path `/results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending`
+path `/results/latest/__magic_job-400/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending`
 to a final destination path, the path will become `/results/latest/2017/2017-01-01.orc.lzo`.
-That is: all directories between `__magic` and `__base` inclusive will be ignored.
+That is: all directories between `__magic_job-400` and `__base` inclusive will be ignored.


 **Issues**
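A hedged sketch of this `__base` mapping, using the `finalDestination` helper from `MagicCommitPaths` (path and job UUID are illustrative):

```java
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;

public class BaseMarkerDemo {
  public static void main(String[] args) {
    Path pending = new Path("s3a://bucket/results/latest/"
        + "__magic_job-400/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo");
    List<String> elements = MagicCommitPaths.splitPathToElements(pending);
    // Elements between the __magic_job-400 element and __base (inclusive)
    // are dropped; everything under __base is kept:
    // [results, latest, 2017, 2017-01-01.orc.lzo]
    System.out.println(MagicCommitPaths.finalDestination(elements));
  }
}
```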
@@ -1479,16 +1479,16 @@ Job drivers themselves may be preempted.
 
 One failure case is that the entire execution framework failed; a new process
 must identify outstanding jobs with pending work, and abort them, then delete
-the appropriate `__magic` directories.
+the appropriate `"MAGIC PATH"` directories.
 
-This can be done either by scanning the directory tree for `__magic` directories
+This can be done either by scanning the directory tree for `"MAGIC PATH"` directories
 and scanning underneath them, or by using the `listMultipartUploads()` call to
 list multipart uploads under a path, then cancel them. The most efficient solution
 may be to use `listMultipartUploads` to identify all outstanding requests, and use that
-to identify which requests to cancel, and where to scan for `__magic` directories.
+to identify which requests to cancel, and where to scan for `"MAGIC PATH"` directories.
 This strategy should address scalability problems when working with repositories
 with many millions of objects —rather than list all keys searching for those
-with `/__magic/**/*.pending` in their name, work backwards from the active uploads to
+with `/${MAGIC PATH}/**/*.pending` in their name, work backwards from the active uploads to
 the directories with the data.
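As an illustration of the upload-driven strategy, a minimal sketch with the AWS SDK for Java v1 (bucket name, prefix, and client setup are assumptions; the S3A committers have their own internal equivalents):

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.MultipartUploadListing;

public class AbortPendingUploads {
  public static void main(String[] args) {
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    String bucket = "bucket";  // assumed bucket and prefix
    ListMultipartUploadsRequest req =
        new ListMultipartUploadsRequest(bucket).withPrefix("results/latest/");
    MultipartUploadListing listing;
    do {
      listing = s3.listMultipartUploads(req);
      for (MultipartUpload upload : listing.getMultipartUploads()) {
        // Each key identifies both the upload to cancel and the directory
        // to scan for leftover "MAGIC PATH" data.
        s3.abortMultipartUpload(new AbortMultipartUploadRequest(
            bucket, upload.getKey(), upload.getUploadId()));
      }
      req.setKeyMarker(listing.getNextKeyMarker());
      req.setUploadIdMarker(listing.getNextUploadIdMarker());
    } while (listing.isTruncated());
  }
}
```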

 We may also want to consider having a cleanup operation in the S3 CLI to
@@ -1569,11 +1569,11 @@ a directory, then it is not going to work: the existing data will not be cleaned
 up. A cleanup operation would need to be included in the job commit, deleting
 all files in the destination directory which were not being overwritten.
 
-1. It requires a path element, such as `__magic` which cannot be used
+1. It requires a path element, such as `"MAGIC PATH"`, which cannot be used
 for any purpose other than for the storage of pending commit data.
 
 1. Unless extra code is added to every FS operation, it will still be possible
-to manipulate files under the `__magic` tree. That's not bad, just potentially
+to manipulate files under the `"MAGIC PATH"` tree. That's not bad, just potentially
 confusing.
 
 1. As written data is not materialized until the commit, it will not be possible
@@ -1692,9 +1692,9 @@ must be used, which means: the V2 classes.
 #### Resolved issues
 
 
-**Magic Committer: Name of directory**
+**Magic Committer: Directory Naming**
 
-The design proposes the name `__magic` for the directory. HDFS and
+The design proposes the prefix `__magic_job-` for each job's magic directory. HDFS and
 the various scanning routines always treat files and directories starting with `_`
 as temporary/excluded data.
@@ -1705,14 +1705,14 @@ It is legal to create subdirectories in a task work directory, which
 will then be moved into the destination directory, retaining that directory
 tree.
 
-That is, a if the task working dir is `dest/__magic/app1/task1/`, all files
-under `dest/__magic/app1/task1/part-0000/` must end up under the path
+That is, if the task working dir is `dest/${MAGIC PATH}/app1/task1/`, all files
+under `dest/${MAGIC PATH}/app1/task1/part-0000/` must end up under the path
 `dest/part-0000/`.
 
 This behavior is relied upon for the writing of intermediate map data in an MR
 job.
 
-This means it is not simply enough to strip off all elements of under `__magic`,
+This means it is not simply enough to strip off all elements under `"MAGIC PATH"`,
 it is critical to determine the base path.
 
 Proposed: use the special name `__base` as a marker of the base element for
@@ -1918,9 +1918,9 @@ bandwidth and the data upload bandwidth.
 
 No use is made of the cluster filesystem; there are no risks there.
 
-A malicious user with write access to the `__magic` directory could manipulate
+A malicious user with write access to the `"MAGIC PATH"` directory could manipulate
 or delete the metadata of pending uploads, or potentially inject new work into
-the commit. Having access to the `__magic` directory implies write access
+the commit. Having access to the `"MAGIC PATH"` directory implies write access
 to the parent destination directory: a malicious user could just as easily
 manipulate the final output, without needing to attack the committer's intermediate
 files.