[SPARK-18191][CORE] Port RDD API to use commit protocol #15769
Conversation
Test build #68137 has finished for PR 15769 at commit
jiangxb1987 left a comment
I didn't touch SparkHadoopWriter and PairRDDFunctions.saveAsHadoopDataset, because they use the older mapred API, which the FileCommitProtocol framework doesn't currently support.
def createJobTrackerID(time: Date): String = {
  new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
}
We need to generate jobTrackerID separately in SparkNewHadoopWriter.
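For illustration, a minimal sketch of generating the ID inside the writer itself, using the same format string as `createJobTrackerID` above (the object and method names here are assumptions):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

object JobTrackerIdSketch {
  // Mirrors createJobTrackerID, but lives with the new writer so the RDD write path
  // no longer depends on the old SparkHadoopWriter helper.
  def newJobTrackerId(time: Date = new Date()): String =
    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
}
```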
    (2, ArrayBuffer(1))))
  }

  test("saveNewAPIHadoopFile should call setConf if format is configurable") {
This logic is no longer needed.
  /*
    These classes are fakes for testing
    "saveNewAPIHadoopFile should call setConf if format is configurable".
    These classes are fakes for testing saveAsHadoopFile/saveNewAPIHadoopFile.
This comment has been out of date for a while.
Test build #68139 has finished for PR 15769 at commit
 * OutputCommitter, is serializable.
 */
private[spark]
class SparkNewHadoopWriter(
move this into internal/io
Doesn't need to be in this PR, but can you also implement a HadoopMapRedCommitProtocol that supports the older mapred package's committer?
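For reference, a rough sketch of what such a protocol could look like, assuming it can extend HadoopMapReduceCommitProtocol and only swap how the committer is obtained (not part of this PR, and not a definitive implementation):

```scala
import org.apache.hadoop.mapred.{JobConf, OutputCommitter}
import org.apache.hadoop.mapreduce.{TaskAttemptContext => NewTaskAttemptContext}

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Sketch: reuse HadoopMapReduceCommitProtocol wholesale, but look up the committer from
// the old mapred JobConf instead of asking the new-API OutputFormat for one.
class HadoopMapRedCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  override def setupCommitter(context: NewTaskAttemptContext): OutputCommitter = {
    // Assumes the configuration carried by the task attempt context is a JobConf.
    val config = context.getConfiguration.asInstanceOf[JobConf]
    config.getOutputCommitter
  }
}
```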
    // Instantiate writer
    val committer = FileCommitProtocol.instantiate(
      className = classOf[HadoopMapReduceCommitProtocol].getName,
      jobId = stageId.toString,
is this how we determine the old job id? i thought it had some date in it too
In fact I'm not sure what value should be assigned to jobId here: should it be jobTrackerId combined with jobId, or something else? I failed to find existing code to follow on this topic.
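For context, the old SparkHadoopWriter path combined a date-based jobtrackerID with a numeric id when it built the Hadoop JobID, roughly like this (a sketch; the exact wiring is an assumption):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapred.JobID

// Sketch: the old-style job id is a timestamp-based "job tracker" identifier plus a
// numeric id (the stage/RDD id), not the bare integer alone.
val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
val stageId = 0 // illustrative value
val hadoopJobId = new JobID(jobTrackerId, stageId)
```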
      jobFormat.checkOutputSpecs(job)
    }

    // Instantiate writer
do you think you can move most of the logic from saveAsNewAPIHadoopDataset into SparkNewHadoopWriter? It'd be similar to how FileFormatWriter works, but much simpler because there is no dynamic partition insert.
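Sketching what that could look like, modeled loosely on FileFormatWriter (the object name, `executeTask`, and the output-path lookup below are illustrative assumptions, not the merged code):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.task.JobContextImpl

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
import org.apache.spark.rdd.RDD

object SparkNewHadoopWriterSketch {

  // Hypothetical per-partition task: the real version would build a TaskAttemptContext,
  // write every record through the OutputFormat's RecordWriter, and commit the task.
  def executeTask[K, V](
      context: TaskContext,
      committer: FileCommitProtocol,
      iterator: Iterator[(K, V)]): FileCommitProtocol.TaskCommitMessage = ???

  def write[K, V: ClassTag](
      sparkContext: SparkContext,
      rdd: RDD[(K, V)],
      hadoopConf: Configuration): Unit = {
    // Driver side: derive a job id, build the JobContext and the commit protocol.
    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
    val jobContext = new JobContextImpl(hadoopConf, new JobID(jobTrackerId, rdd.id))
    val committer = FileCommitProtocol.instantiate(
      className = classOf[HadoopMapReduceCommitProtocol].getName,
      jobId = rdd.id.toString,
      outputPath = hadoopConf.get("mapreduce.output.fileoutputformat.outputdir"))
    committer.setupJob(jobContext)

    try {
      // One write task per partition; each returns the commit protocol's TaskCommitMessage.
      // (The real code would ship hadoopConf wrapped in a SerializableConfiguration.)
      val ret = sparkContext.runJob(rdd,
        (context: TaskContext, iter: Iterator[(K, V)]) => executeTask(context, committer, iter))
      // Driver side again: commit the whole job with the collected task results.
      committer.commitJob(jobContext, ret)
    } catch {
      case cause: Throwable =>
        committer.abortJob(jobContext)
        throw cause
    }
  }
}
```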
Test build #68168 has finished for PR 15769 at commit

Test build #68213 has finished for PR 15769 at commit
Moved the logic from `saveAsNewAPIHadoopDataset` into `SparkNewHadoopWriter`.
Test build #68214 has finished for PR 15769 at commit
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.TaskType

import org.apache.spark.internal.Logging
i'd move this file into spark/internal/io as well to be closer to SparkNewHadoopWriter.scala
we can do it in a separate pr when you do the mapred committer.
Yes I'll do it later.
 * (from the newer mapreduce API, not the old mapred API).
 */
private[spark]
object SparkNewHadoopWriter extends Logging {
would it make more sense to call it SparkHadoopMapReduceWriter to be more consistent? I understand we use "new" vs "old" in the RDD API, but that's always been fairly confusing to me.
object SparkNewHadoopWriter extends Logging {

  /** A shared job description for all the write tasks. */
  private class WriteJobDescription[K, V](
maybe just remove this, since you have only 3 items here. In SQL there were a lot of items.
  def write[K, V: ClassTag](
      sparkContext: SparkContext,
      rdd: RDD[(K, V)],
      committer: HadoopMapReduceCommitProtocol,
i'd move the creation of the commit protocol here. The reason I put it outside in SQL was because streaming and batch needed to specify different protocols, but that problem doesn't exist in core.
Test build #68215 has finished for PR 15769 at commit

Test build #68219 has finished for PR 15769 at commit
The failed test case was introduced by #15725, which is not related to our changes in this PR.
    // Try to write all RDD partitions as a Hadoop OutputFormat.
    try {
      sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
we need to collect the result coming from the commit protocol here, and pass it into commitJob, don't we?
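For illustration, the shape of that change might be as follows (names follow this diff; `executeTask` stands in for the per-partition write and is assumed to return the protocol's message):

```scala
// Sketch: runJob returns one TaskCommitMessage per partition; the driver then hands the
// collected array straight to the commit protocol when committing the job.
val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
  executeTask(context, committer, iter) // returns a FileCommitProtocol.TaskCommitMessage
})
committer.commitJob(jobContext, ret)
```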
The rest looks good. cc @ericl for another look too

retest this please.

Test build #68258 has finished for PR 15769 at commit

Test build #68265 has finished for PR 15769 at commit

I've disabled the test.

Test build #3417 has finished for PR 15769 at commit
mridulm left a comment
Overall looks good, added a few comments though.
      committer.commitJob(jobContext, ret)
      logInfo(s"Job ${jobContext.getJobID} committed.")
    } catch { case cause: Throwable =>
nit: case in a new line?
        outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
          om.setBytesWritten(callback())
          om.setRecordsWritten(recordsWritten)
        }
This looks like a behavior change - metrics are getting updated even when exceptions are thrown.
Do it after Utils.tryWithSafeFinallyAndFailureCallbacks completes, not in finally.
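A sketch of the suggested ordering, reusing the names visible in this diff (the write loop and the exact callback wiring are assumptions):

```scala
// Sketch: run the write-and-commit block first; only if it returns normally do we touch
// the output metrics, so an aborted task no longer reports bytes/records as written.
val ret = Utils.tryWithSafeFinallyAndFailureCallbacks {
  while (iter.hasNext) {
    val pair = iter.next()
    writer.write(pair._1, pair._2)
    recordsWritten += 1
  }
  committer.commitTask(taskContext)
}(catchBlock = {
  committer.abortTask(taskContext)
}, finallyBlock = {
  writer.close(taskContext)
})

// Reached only on success.
outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
  om.setBytesWritten(callback())
  om.setRecordsWritten(recordsWritten)
}
```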
    if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
      // FileOutputFormat ignores the filesystem parameter
      val jobFormat = format.newInstance
If it is Configurable, we should invoke setConf on it - the earlier code was doing this.
The tests for that seem to have been removed as well.
Any reason why?
The logic is to create an OutputCommitter from the given hadoop conf, which is now handled by HadoopMapReduceCommitProtocol, so this code and the corresponding tests are no longer needed.
I've searched the history and failed to figure out why we were doing this...
I think I added the comment to the wrong location (though it is relevant here, it is probably less serious?).
HadoopMapReduceCommitProtocol.setupCommitter() should be doing a setConf if the OutputFormat is Configurable.
This needs to be fixed to ensure custom OutputFormats work.
I see that the PR has already been committed - can you please file a bug and fix it?
The test will also need to be re-added.
Oh...I see the problem now. Will add that in a follow up, thank you for clarifying.
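For reference, the missing piece being described is roughly this check before the committer and record writer are created (a sketch of the follow-up, not the committed fix):

```scala
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.OutputFormat

object OutputFormatConfigSketch {
  // Sketch: push the Hadoop configuration into the OutputFormat if it is Configurable,
  // so custom formats see their configuration before producing a committer or writer.
  def initOutputFormat[K, V](format: OutputFormat[K, V], conf: Configuration): OutputFormat[K, V] = {
    format match {
      case c: Configurable => c.setConf(conf)
      case _ => // non-Configurable formats need no extra wiring
    }
    format
  }
}
```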
    }
  }

  val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
private, and move it to the top of the object?
@mridulm I've addressed most of your comments. Thank you!

Test build #68330 has finished for PR 15769 at commit

Thanks - merging in master.
…nfigurable`.

## What changes were proposed in this pull request?

We should call `setConf` if `OutputFormat` is `Configurable`, this should be done before we create `OutputCommitter` and `RecordWriter`. This is follow up of #15769, see discussion [here](https://github.com/apache/spark/pull/15769/files#r87064229)

## How was this patch tested?

Add test of this case in `PairRDDFunctionsSuite`.

Author: jiangxingbo <[email protected]>

Closes #15823 from jiangxb1987/config-format.
## What changes were proposed in this pull request?

This PR ports the RDD API to use the commit protocol, the changes made here:

1. Add a new internal helper class, `SparkNewHadoopWriter`, that saves an RDD using a Hadoop OutputFormat; it's similar to `SparkHadoopWriter` but uses the commit protocol. This class supports the newer `mapreduce` API, instead of the old `mapred` API which is supported by `SparkHadoopWriter`;
2. Rewrite the `PairRDDFunctions.saveAsNewAPIHadoopDataset` function, so it uses the commit protocol now.

## How was this patch tested?

Existing test cases.

Author: jiangxingbo <[email protected]>

Closes apache#15769 from jiangxb1987/rdd-commit.
      hadoopConf: Configuration): Unit = {
    // Extract context and configuration from RDD.
    val sparkContext = rdd.context
    val stageId = rdd.id
Is this accurate? It seems weird that stageId is set equal to rdd.id. What is the commit protocol here?
It follows the previous behavior, what's your concern here?
We had a test failure using Spark 2.2 that seems to happen after this commit; the failure didn't occur before, with Spark 2.0. We thought there could be a problem since the jobContext passed to committer.setupJob() (line 84) has a different JobID compared to the task context that is passed to committer.setupTask() (line 126): in the former it comes from rdd.id, and in the latter it comes from stage.id. Just wanted to check: what is the protocol here that requires such a difference? Thanks.
I investigated the problem a little more and filed a JIRA and a fix; see SPARK-22162. Thank you in advance.
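For context, the invariant being discussed is that the driver-side JobContext and the per-task TaskAttemptContext should be built from the same job identifier. A minimal sketch of that invariant (the concrete ids below are illustrative):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}

// Sketch: derive one job id up front and reuse it for both the driver-side JobContext
// (setupJob) and every executor-side TaskAttemptContext (setupTask), so the committer
// sees a single consistent JobID.
val conf = new Configuration()
val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
val commitJobId = 0 // e.g. rdd.id; illustrative here

val jobContext = new JobContextImpl(conf, new JobID(jobTrackerId, commitJobId))
val taskAttemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.REDUCE, 0, 0)
val taskContext = new TaskAttemptContextImpl(conf, taskAttemptId)
```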
What changes were proposed in this pull request?
This PR ports the RDD API to use the commit protocol; the changes made here:
1. Add a new internal helper class, `SparkNewHadoopWriter`, that saves an RDD using a Hadoop OutputFormat; it's similar to `SparkHadoopWriter` but uses the commit protocol. This class supports the newer `mapreduce` API, instead of the old `mapred` API which is supported by `SparkHadoopWriter`;
2. Rewrite the `PairRDDFunctions.saveAsNewAPIHadoopDataset` function, so it uses the commit protocol now.

How was this patch tested?
Existing test cases.
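As a usage note, the user-facing API is unchanged; a save through a new-API (`mapreduce`) OutputFormat simply goes through the commit protocol under the hood now. For example (the output path and types here are illustrative):

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

import org.apache.spark.SparkContext

val sc = new SparkContext("local", "commit-protocol-example")
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"))).map { case (k, v) =>
  (new IntWritable(k), new Text(v))
}
// saveAsNewAPIHadoopFile uses the newer mapreduce API, i.e. the code path rewritten here.
pairs.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]]("/tmp/commit-protocol-example")
sc.stop()
```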