[SPARK-18294][CORE] Implement commit protocol to support mapred package's committer
#15861
Conversation
jiangxb1987 left a comment
This PR is ready for review.
This file is moved to internal/io/
Since most of the classes in mapred extend their mapreduce counterparts, we only have to override setupCommitter to make it support the OutputCommitter from the mapred API. SparkHadoopWriter, however, requires extensive refactoring.
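A minimal sketch of that idea, using the names mentioned in this PR (the constructor arguments and the exact placement of setupCommitter are assumptions, not the final code):

```scala
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

/**
 * Commit protocol for the old mapred API. Because mapred.OutputCommitter extends
 * mapreduce.OutputCommitter, only the committer lookup needs to change.
 */
class HadoopMapRedCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  override def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
    // With the old API the task's configuration is a JobConf, which already knows
    // which OutputCommitter the job was configured with.
    val config = context.getConfiguration.asInstanceOf[JobConf]
    config.getOutputCommitter
  }
}
```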
This has been merged with the original SparkHadoopWriter; the basic workflow doesn't change, but a SparkHadoopWriterConfig class is introduced to create the output Format/Committer/Writer from a JobConf/Configuration.
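Roughly, the new class sits between the shared write path and the two Hadoop APIs. A sketch of the shape it could take (only the names that appear in this discussion - createJobContext, initOutputFormat, write, assertConf - come from the PR; the rest of the method set is illustrative):

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.internal.io.FileCommitProtocol

/**
 * Sketch of the abstraction: one interface, two implementations, one backed by
 * mapred's JobConf and one by mapreduce's Configuration.
 */
trait SparkHadoopWriterConfig[K, V] extends Serializable {

  // Contexts are built from the wrapped JobConf/Configuration.
  def createJobContext(jobTrackerId: String, jobId: Int): JobContext
  def createTaskAttemptContext(
      jobTrackerId: String,
      jobId: Int,
      splitId: Int,
      taskAttemptId: Int): TaskAttemptContext

  // The committer and the output format are also derived from the configuration.
  def createCommitter(jobId: Int): FileCommitProtocol
  def initOutputFormat(jobContext: JobContext): Unit

  // Per-task writer lifecycle used by the shared write loop in SparkHadoopWriter.
  def initWriter(taskContext: TaskAttemptContext, splitId: Int): Unit
  def write(pair: (K, V)): Unit
  def closeWriter(taskContext: TaskAttemptContext): Unit

  // Output-spec validation; a no-op for the new mapreduce API.
  def assertConf(): Unit
}
```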
This is moved to a separate file, with the content unchanged.
We call initOutputFormat here and encapsulate the OutputFormat inside SparkHadoopWriterConfig, because the output format classes from the mapred and mapreduce packages don't have a common superclass.
This logic is duplicated in PairRDDFunctions; let's move it here and delete it from all other places.
This class creates the output Format/Committer/Writer from a JobConf using the mapred API; most of them are obtained directly from the conf.
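A sketch of that mapred-flavoured implementation, assuming the class holds a plain JobConf (the real code would need a serializable wrapper around the conf before shipping it to tasks, and the class and method names here are illustrative):

```scala
import java.text.NumberFormat
import java.util.Locale

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.{JobConf, RecordWriter, Reporter}

import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapRedCommitProtocol}

/** Sketch: with the old API, everything is pulled straight out of the JobConf. */
class HadoopMapRedWriteConfig[K, V](conf: JobConf) {

  private var writer: RecordWriter[K, V] = _

  // The committer is the mapred-aware protocol introduced by this PR.
  def createCommitter(jobId: Int): FileCommitProtocol = {
    FileCommitProtocol.instantiate(
      className = classOf[HadoopMapRedCommitProtocol].getName,
      jobId = jobId.toString,
      outputPath = conf.get("mapred.output.dir"))
  }

  // The record writer comes from the OutputFormat configured on the JobConf.
  def initWriter(splitId: Int): Unit = {
    val outputName = "part-" + NumberFormat.getInstance(Locale.US).format(splitId.toLong)
    // FileOutputFormat ignores the FileSystem parameter, so any instance works here.
    val fs = FileSystem.get(conf)
    writer = conf.getOutputFormat
      .getRecordWriter(fs, conf, outputName, Reporter.NULL)
      .asInstanceOf[RecordWriter[K, V]]
  }

  def write(pair: (K, V)): Unit = writer.write(pair._1, pair._2)
}
```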
This supports the new mapreduce API, which creates OutputFormat from jobContext.
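And a sketch of the mapreduce-flavoured counterpart, where the OutputFormat class is looked up on the job context and instantiated reflectively (class and method names are illustrative, mirroring the sketch above):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, OutputFormat, RecordWriter, TaskAttemptContext}

/** Sketch: with the new API, the job context knows which OutputFormat class to use. */
class HadoopMapReduceWriteConfig[K, V](conf: Configuration) {

  private var outputFormat: Class[_ <: OutputFormat[K, V]] = _
  private var writer: RecordWriter[K, V] = _

  def initOutputFormat(jobContext: JobContext): Unit = {
    if (outputFormat == null) {
      // The new API stores the output format class in the job context's configuration.
      outputFormat = jobContext.getOutputFormatClass
        .asInstanceOf[Class[_ <: OutputFormat[K, V]]]
    }
  }

  def initWriter(taskContext: TaskAttemptContext, splitId: Int): Unit = {
    // Each task instantiates its own OutputFormat and asks it for a RecordWriter.
    val format = outputFormat.getConstructor().newInstance()
    writer = format.getRecordWriter(taskContext).asInstanceOf[RecordWriter[K, V]]
  }

  def write(pair: (K, V)): Unit = writer.write(pair._1, pair._2)
}
```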
We are not supposed to create a SparkHadoopWriter inside this method; we can just create a HadoopMapRedCommitProtocol instead.
Test build #68558 has finished for PR 15861 at commit
Test build #68562 has finished for PR 15861 at commit
retest this please - looks like it has passed all the test cases but the build didn't commit.
Test build #68576 has finished for PR 15861 at commit
cc @mridulm too
do we need this? it seems like we don't want to make this configurable and there will only be two places that call this. Why not just have those two callers invoke the right constructor?
Reasonable, I'll address this.
One thing that confuses me: why is this named "Config"?
If I understand this correctly, this is basically an abstraction that makes both the old mapred API and the new mapreduce API work, isn't it?
Yes, it's an abstraction that conceals the differences between using the mapred and the mapreduce API. It is called SparkHadoopWriterConfig because we create everything from a JobConf/Configuration, but I believe there is a more concise name for it - any suggestions?
Maybe just HadoopWriteConfigUtil?
Sure - I'll update that.
Looks like the failed test suite
Test build #3423 has finished for PR 15861 at commit
Test build #68600 has finished for PR 15861 at commit
@mridulm Would you please look at this when you have time? Thank you!
Would anyone look at this PR please?
Test build #68762 has finished for PR 15861 at commit
f826a5a to bedcd10
Test build #68894 has finished for PR 15861 at commit
cc @mridulm can you take a look?
@rxin I did see this PR; unfortunately it is a bit big and I am tied up with other things - can't get to it for the next few days.
Split into multiple lines
Do we need a setupJob on the committer here?
There is a behavior change here - in earlier code, a new instance was used to check the output specs against.
Here, it is the same instance. IMO this should be fine, but I wanted to call it out in case someone has thoughts on it.
Also, why not move this into assertConf with an isOutputSpecValidationEnabled check?
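A sketch of what that suggestion could look like on the old-API side (the placement of isOutputSpecValidationEnabled in SparkHadoopWriterUtils and the exact assertConf signature here are assumptions):

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.JobConf

import org.apache.spark.SparkConf
import org.apache.spark.internal.io.SparkHadoopWriterUtils

object OutputSpecCheck {
  // Fold the output-spec check into assertConf and guard it with the validation flag.
  def assertConf(hadoopConf: JobConf, sparkConf: SparkConf): Unit = {
    if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
      // FileOutputFormat ignores the filesystem parameter.
      val ignoredFs = FileSystem.get(hadoopConf)
      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
    }
  }
}
```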
```scala
 * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
 * not use output committer that writes data directly.
 * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
 * result of using direct output committer with speculation enabled.
```
Why was this removed? It is still relevant now, even if it is checked in a different method invoked from here.
```scala
// --------------------------------------------------------------------------

def createJobContext(jobTrackerId: String, jobId: Int): NewJobContext = {
  val jobAttemptId = new SerializableWritable(new JobID(jobTrackerId, jobId))
```
Why wrap it in SerializableWritable?
```scala
FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = jobId.toString,
  outputPath = getConf().get("mapred.output.dir"),
```
This should be org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, shouldn't it?
If yes, we should have a test which fails for this case to catch future bugs.
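Applied to the snippet above, the suggestion would amount to something like this (getConf() and jobId are assumed to be the same values used in that snippet):

```scala
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

// FileOutputFormat.OUTDIR is "mapreduce.output.fileoutputformat.outputdir",
// the new-API key, instead of the hard-coded old-API "mapred.output.dir".
FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = jobId.toString,
  outputPath = getConf().get(FileOutputFormat.OUTDIR))
```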
```scala
// --------------------------------------------------------------------------

def assertConf(): Unit = {
  // Do nothing for mapreduce API.
```
I see a bunch of validations being done in saveAsHadoopDataset - shouldn't they be here as well?
That includes SparkHadoopUtil.get.addCredentials, etc.
```scala
val ret = Utils.tryWithSafeFinallyAndFailureCallbacks {
  while (iterator.hasNext) {
    val pair = iterator.next()
    config.write(pair)
```
I hope this gets JIT'ed away ...
```scala
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(hadoopConf)
hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
}
```
These validations should go into HadoopMapReduceWriteConfigUtil
| "ignored", pairs.keyClass, pairs.valueClass, classOf[FakeFormatWithCallback], conf) | ||
| } | ||
| assert(e.getMessage contains "failed to write") | ||
| assert(e.getCause.getMessage contains "failed to write") |
Curious, how/why did this change?
@jiangxb1987 I did a single-pass review - particularly given the similarities in both the codepaths and the classnames, I will need to go over it again to ensure we don't miss anything.
(gentle ping @jiangxb1987)
This PR should be separated into some smaller ones; I'll do this around March.
What changes were proposed in this pull request?
This PR makes the following changes:
- Implement a new commit protocol HadoopMapRedCommitProtocol which supports the old mapred package's committer;
- Refactor SparkHadoopWriter and SparkHadoopMapReduceWriter: they are now combined, so writes through both the mapred and mapreduce APIs are supported by the new SparkHadoopWriter, and a lot of duplicated code is removed;
- Move SparkHadoopWriterUtils to a separate file.

How was this patch tested?
This PR is not changing any behavior, so it is tested by the existing test cases.