- 
        Couldn't load subscription status. 
- Fork 9.1k
HADOOP-18797: Support Concurrent Writes With S3A Magic Committer #6006
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| 💔 -1 overall 
 
 This message was automatically generated. | 
32cff61    to
    d99eb99      
    Compare
  
    | 💔 -1 overall 
 
 This message was automatically generated. | 
| public static Path getMagicJobAttemptsPath(Path out) { | ||
| return new Path(out, MAGIC); | ||
| public static Path getMagicJobAttemptsPath(Path out, String jobId) { | ||
| return new Path(out, MAGIC + "_" + jobId); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using "MAGIC PATH_PREFIX" here, receiving the actual job uuid as a parameter instead of the "job-uuid" formatted version.. this method formatJobDir(..) could be removed I think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. It makes sense!
btw formatJobDir() cannot be removed yet, Since public static String formatAppAttemptDir still uses it!
d99eb99    to
    9a370a6      
    Compare
  
    | 🎊 +1 overall 
 
 This message was automatically generated. | 
| @steveloughran Could you please review the changes? | 
| will do tomorrow...sorry, been really busy with the v2 upgrade, #5995, and exploration of what breaks. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented. I'd actually like one of the committer tests to verify that given a job ID it creates a path containing it.
| */ | ||
| public static boolean isMagicPath(List<String> elements) { | ||
| return elements.contains(MAGIC); | ||
| for (String element : elements) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or use some java8 stream thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
| */ | ||
| public static int magicElementIndex(List<String> elements) { | ||
| int index = elements.indexOf(MAGIC); | ||
| int index = 0; | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, you could coalesce this with a L114-115 some method to get the index without raising an exception; isMagicPath(elements) calls this and returns true if the index > 0. or you do very fancy java8 Optional stuff...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
| List<String> children = magicPathChildren(elements); | ||
| checkArgument(!children.isEmpty(), "No path found under " + | ||
| MAGIC); | ||
| checkArgument(!children.isEmpty(), "No path found under the prefix" + | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs a space at the end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
| Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(), | ||
| () -> deleteWithWarning(getDestFS(), path, true)); | ||
| // and the job temp directory with manifests | ||
| Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(), | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
job dir cleanup MUST always happen; make the root path cleanup the optional one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per this change, The metadata (.pending set) of all the pending commits will be stored in /__magic_job-. So there is no common root metadata directory unlike the current case where it is stored in __magic/job-.
This change was introduced to fix https://issues.apache.org/jira/browse/HADOOP-18568, Where jobs were bottlenecked by the time to delete the __magic/job-/ directory and in this case it will be __magic_job-.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to keep the scope of this change confined. I will remove this change which can be handled in the original Jira itself. (If it needs more discussion)
        
          
                ...tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | final SuccessData successData) | ||
| throws Exception { | ||
| Path magicDir = new Path(destPath, MAGIC); | ||
| Path magicDir = new Path(destPath, MAGIC_PATH_PREFIX); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is a job id getting down? it should be, for completeness
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack. make sense!
| 
 ack. | 
9a370a6    to
    92518fc      
    Compare
  
    | @steveloughran - Thanks for the reviews. | 
| 💔 -1 overall 
 
 This message was automatically generated. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So one aspect of this change is that __magic on its own is no longer valid. I think I might still like it as "Magic "even if our own committer do not use it any more. That is in case something is, not that they should except in bizarre test ways. But: if this makes the code too complicated then I'm not over worried. We aren't ever going to get into the condition where I an older version of the committee is trying to write through this version of the file system, because they are all bundled in the same jar.
Now, some bad news: the docs are going to need to be updated as well. I think in the committer architecture doc we should actually add a paragraph on supporting parallel commit to discuss why this was needed and what are the change was. But the various strings there and those in doc comments needs to be changed.
Maybe the thing to do is use "Magic Path" in capitals and define it in the architect a document as "path where the filesystem knows to remap paths". So rather than update the various __magic references in source docs with the new prefix, just change to "Magic Path". Gives us the option of easier maintenance in future, if ever needed (hopefully not).
| Assertions.assertThat(pathStr) | ||
| .describedAs("Magic path") | ||
| .contains(MAGIC); | ||
| .contains(MAGIC_PATH_PREFIX + jobId); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should really look for / at both ends of this string, for strictness, shouldn't we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. Ideally we should to avoid misinterpreting the file names or partition names with same prefix
| 
 Ack. | 
92518fc    to
    4981e51      
    Compare
  
    | 🎊 +1 overall 
 
 This message was automatically generated. | 
| @steveloughran , I have addressed your comments. Could you please re-review? | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good, only a few minor changes left.
when you submit these changes, don't bother rebasing and force pushing unless changes in trunk have broken the code. it's a lot easier to review the update when the new changes are isolated. thanks
| **Magic Committer: Name of directory** | ||
|  | ||
| The design proposes the name `__magic` for the directory. HDFS and | ||
| The design proposes the name ``"MAGIC PATH"`` for the directory. HDFS and | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this to
proposes the prefix  "__magic_job-" as the prefix for the magic paths of different jobs. 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack.
| #### Resolved issues | ||
|  | ||
|  | ||
| **Magic Committer: Name of directory** | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace with
**Magic Committer: Directory Naming**
to reflect changes in text
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
        
          
                ...p-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitUtilsWithMR.java
          
            Show resolved
            Hide resolved
        
      | 
 +1. It makes sense! | 
| @steveloughran - I have addressed your comments with the lastest commit. Could you please re-review? | 
| 🎊 +1 overall 
 
 This message was automatically generated. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
+1
| merged...can you do a pr (and retest) against branch-3.3. don't expect any reviews of this, it's just to identify and address any merge problems through yetus. thanks | 
| @steveloughran - Sure, Can we close the Jira as well? | 
| lets close once the 3.3 merge is in...stops us forgetting about it | 
| @steveloughran - i have raised a PR against branch-3.3 (#6122) | 
…che#6006) Jobs which commit their work to S3 thr magic committer now use a unique magic containing the job ID: __magic_job-${jobid} This allows for multiple jobs to write to the same destination simultaneously. Contributed by Syed Shameerur Rahman
Jobs which commit their work to S3 through
the magic committer now use a unique magic path
containing the job ID:
 __magic_job-${jobid}
This allows for multiple jobs to write
to the same destination simultaneously.
Contributed by Syed Shameerur Rahman
    …che#6006) Jobs which commit their work to S3 thr magic committer now use a unique magic containing the job ID: __magic_job-${jobid} This allows for multiple jobs to write to the same destination simultaneously. Contributed by Syed Shameerur Rahman
…che#6006) Jobs which commit their work to S3 thr magic committer now use a unique magic containing the job ID: __magic_job-${jobid} This allows for multiple jobs to write to the same destination simultaneously. Contributed by Syed Shameerur Rahman
Description of PR
Currently concurrent writes are not supported by S3A Magic Committer. When the user tries to write to same parent , but to a different partition/sub-directory, The MPU metadata (.pendingset) of slower running jobs might be deleted by the the jobs which completes first.
This happens because, The __magic directory is common across all the jobs and it gets cleanedup after the job completion which might affect the other jobs.
Proposed Changes
Instead of a global magic directory __magic, Each job will have its own magic directory of the format _magic and all the .pendingset are written to that directory.
The default value of
fs.s3a.committer.abort.pending.uploadsis set to false to support concurrent writes by default.How was this patch tested?
Ran S3A Unit Tests
Ran S3A Integration test in
us-west-1region[INFO] Running org.apache.hadoop.fs.s3a.commit.staging.TestDirectoryCommitterScale [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.TestPaths [INFO] Running org.apache.hadoop.fs.s3a.commit.TestMagicCommitPaths [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.TestStagingCommitter [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.TestStagingPartitionedFileListing [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.TestStagingDirectoryOutputCommitter [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.TestStagingPartitionedJobCommit [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.TestStagingPartitionedTaskCommit [INFO] Tests run: 28, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.473 s - in org.apache.hadoop.fs.s3a.commit.TestMagicCommitPaths [INFO] Tests run: 14, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.611 s - in org.apache.hadoop.fs.s3a.commit.staging.TestPaths [INFO] Tests run: 8, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 20.965 s - in org.apache.hadoop.fs.s3a.commit.staging.TestStagingDirectoryOutputCommitter [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 23.474 s - in org.apache.hadoop.fs.s3a.commit.staging.TestStagingPartitionedFileListing [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 27.627 s - in org.apache.hadoop.fs.s3a.commit.staging.TestStagingPartitionedJobCommit [INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 29.249 s - in org.apache.hadoop.fs.s3a.commit.staging.TestStagingPartitionedTaskCommit [INFO] Tests run: 63, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 61.11 s - in org.apache.hadoop.fs.s3a.commit.staging.TestStagingCommitter [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 84.193 s - in org.apache.hadoop.fs.s3a.commit.staging.TestDirectoryCommitterScale [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.integration.ITestStagingCommitProtocol [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.integration.ITestDirectoryCommitProtocol [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.integration.ITestPartitionedCommitProtocol [INFO] Running org.apache.hadoop.fs.s3a.commit.staging.integration.ITestStagingCommitProtocolFailure [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.901 s - in .hadoop.fs.s3a.commit.staging.integration.ITestStagingCommitProtocolFailure [INFO] Running org.apache.hadoop.fs.s3a.commit.magic.ITestMagicCommitProtocol [INFO] Running org.apache.hadoop.fs.s3a.commit.magic.ITestMagicCommitProtocolFailure [INFO] Running org.apache.hadoop.fs.s3a.commit.integration.ITestS3ACommitterMRJob [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.225 s - in org.apache.hadoop.fs.s3a.commit.magic.ITestMagicCommitProtocolFailure [INFO] Running org.apache.hadoop.fs.s3a.commit.ITestS3ACommitterFactory [INFO] Running org.apache.hadoop.fs.s3a.commit.ITestCommitOperations [INFO] Running org.apache.hadoop.fs.s3a.commit.ITestCommitOperationCost [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 7.05 s - in org.apache.hadoop.fs.s3a.commit.ITestS3ACommitterFactory [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 43.398 s - in org.apache.hadoop.fs.s3a.commit.ITestCommitOperationCost [INFO] Tests run: 18, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 143.308 s - in org.apache.hadoop.fs.s3a.commit.ITestCommitOperations [INFO] Running org.apache.hadoop.fs.s3a.auth.ITestAssumedRoleCommitOperations [INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 218.466 s - in org.apache.hadoop.fs.s3a.commit.integration.ITestS3ACommitterMRJob [WARNING] Tests run: 18, Failures: 0, Errors: 0, Skipped: 18, Time elapsed: 62.036 s - in org.apache.hadoop.fs.s3a.auth.ITestAssumedRoleCommitOperations [WARNING] Tests run: 24, Failures: 0, Errors: 0, Skipped: 1, Time elapsed: 429.367 s - in .hadoop.fs.s3a.commit.staging.integration.ITestPartitionedCommitProtocol [INFO] Tests run: 24, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 472.06 s - in .hadoop.fs.s3a.commit.staging.integration.ITestStagingCommitProtocol [INFO] Tests run: 25, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 498.078 s - in .hadoop.fs.s3a.commit.staging.integration.ITestDirectoryCommitProtocol [INFO] Tests run: 23, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 861.607 s - in org.apache.hadoop.fs.s3a.commit.magic.ITestMagicCommitProtocol [INFO] Running org.apache.hadoop.fs.s3a.commit.magic.ITestS3AHugeMagicCommits [WARNING] Tests run: 10, Failures: 0, Errors: 0, Skipped: 10, Time elapsed: 29.141 s - in org.apache.hadoop.fs.s3a.commit.magic.ITestS3AHugeMagicCommits [INFO] Running org.apache.hadoop.fs.s3a.commit.terasort.ITestTerasortOnS3A [WARNING] Tests run: 14, Failures: 0, Errors: 0, Skipped: 14, Time elapsed: 47.485 s - in org.apache.hadoop.fs.s3a.commit.terasort.ITestTerasortOnS3A