[SPARK-1855] Local checkpointing #7279
Conversation
Test build #36749 has finished for PR 7279 at commit
This is a very cool PR.
Thank you @witgo :)
Test build #36756 has finished for PR 7279 at commit
Test build #36761 has finished for PR 7279 at commit
Test build #36815 has finished for PR 7279 at commit
Test build #36826 has finished for PR 7279 at commit
Test build #36836 has finished for PR 7279 at commit
Test build #36820 has finished for PR 7279 at commit
This commit makes two classes abstract: `RDDCheckpointData` and `CheckpointRDD`. It implements the existing fault-tolerant checkpointing by subclassing these abstract classes. The goal of this commit is to retain as much functionality as possible. Much of the code is just moved from one file to another. The following commits will add an implementation for unreliable checkpointing.
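For illustration, the resulting shape might look like this minimal sketch (signatures and modifiers here are assumptions based on the description above, not the commit's exact code):

```scala
package org.apache.spark.rdd

import scala.reflect.ClassTag
import org.apache.spark.SparkContext

// Base class capturing an RDD's checkpoint state; concrete subclasses
// decide where the data goes (e.g. a reliable file system vs. local disk).
private[spark] abstract class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
  extends Serializable {

  // Materialize the RDD's data and return a CheckpointRDD that reads it back.
  protected def doCheckpoint(): CheckpointRDD[T]
}

// An RDD that reads back previously checkpointed data. It declares no
// dependencies, which is what truncates the lineage.
private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
  extends RDD[T](sc, Nil)
```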
The write path runs a job to put each partition into the disk store, while the read path simply reads these blocks back from the disk store.
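Roughly, the two paths could look like the following sketch (helper names are illustrative, not the PR's exact code):

```scala
import scala.reflect.ClassTag
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

// Write path: run a job that stores each partition in the local disk store.
def writeToLocalDiskStore[T: ClassTag](rdd: RDD[T]): Unit = {
  rdd.sparkContext.runJob(rdd, (ctx: TaskContext, iter: Iterator[T]) => {
    val blockId = RDDBlockId(rdd.id, ctx.partitionId())
    SparkEnv.get.blockManager.putIterator(blockId, iter, StorageLevel.DISK_ONLY)
  })
}

// Read path: fetch a partition's block back from the block manager.
def readFromDiskStore[T](blockId: RDDBlockId): Iterator[T] = {
  SparkEnv.get.blockManager.get(blockId)
    .map(_.data.asInstanceOf[Iterator[T]])
    .getOrElse(throw new SparkException(s"Checkpoint block $blockId not found"))
}
```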
This commit simplifies the previous one in removing the special `LocalCheckpointBlockId`, which is not needed if we use the checkpoint RDD's ID instead of the parent RDD's. This allows us to simply reuse the RDD cleanup code path, which is nice.
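In other words (an assumed, simplified illustration; `checkpointRdd` and `partition` are placeholder names): each written block is keyed by the checkpoint RDD's own ID, so it looks like any other cached RDD block to the cleaner.

```scala
import org.apache.spark.storage.RDDBlockId

// Keyed by the checkpoint RDD's ID rather than the parent's, so the
// standard RDD cleanup path removes these blocks with no special casing.
val blockId = RDDBlockId(checkpointRdd.id, partition.index)
```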
This commit makes each test in CheckpointSuite run twice, once for normal checkpointing and once for local checkpointing. This commit also fixes legitimate test failures after the refactoring.
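A plausible shape for that parameterization (a sketch assuming a ScalaTest-style suite with an active SparkContext `sc`; names are illustrative):

```scala
// Generate each test body twice, once per checkpointing mode.
for (reliable <- Seq(true, false)) {
  val mode = if (reliable) "reliable" else "local"
  test(s"basic checkpointing [$mode]") {
    val rdd = sc.parallelize(1 to 100).map(_ * 2)
    if (reliable) rdd.checkpoint() else rdd.localCheckpoint()
    // The first action materializes the checkpoint; results must be unchanged.
    assert(rdd.collect() === (1 to 100).map(_ * 2).toArray)
    assert(rdd.isCheckpointed)
  }
}
```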
This commit does several things: (1) LocalCheckpointRDD is made significantly simpler: instead of fetching block IDs from everyone and verifying whether the partition indices are contiguous, we simply use the original RDD's partition indices. (2) Many checkpoint-related methods are now documented, and failure conditions in local checkpointing now produce more informative error messages. (3) General code cleanups (reordering things for readability).
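Point (1) could be sketched as follows (an assumed, simplified shape inside the checkpoint RDD; names and `numPartitions` are illustrative):

```scala
import org.apache.spark.Partition

// Partition placeholder that simply carries over the original RDD's index.
private case class LocalCheckpointRDDPartition(index: Int) extends Partition

// Mirror the original RDD's partitioning instead of discovering block IDs
// from executors and checking that their indices are contiguous.
protected def getPartitions: Array[Partition] =
  Array.tabulate[Partition](numPartitions)(i => LocalCheckpointRDDPartition(i))
```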
This augments the existing end-to-end tests in CheckpointSuite.
Test build #36882 has finished for PR 7279 at commit
I did a pass, and there is one critical question that is not clear to me: how does LocalRDDCheckpointData.doCheckpoint() work in a distributed manner?
OK, I fixed the concern with the local
Test build #39406 has finished for PR 7279 at commit
Nit: why would this ever fail if rdd.collect() gives the same result before and after checkpointing has occurred? If the rdd.collect() test above is sufficient, then these further tests seem superfluous.
This is testing the following case:
rdd.localCheckpoint().map(...).filter(...).reduceByKey(...).first()
where the action doesn't happen immediately after the local checkpoint. This is a real case that needs to be tested because we need to look at the last RDD's ancestors to see whether they are checkpointed even if the last RDD is not.
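The check being exercised here is, conceptually, a walk up the dependency graph (a sketch, not the PR's exact code):

```scala
import org.apache.spark.rdd.RDD

// Even if the last RDD is not checkpointed itself, one of its ancestors
// may be, in which case the lineage is truncated at that ancestor.
def hasCheckpointedAncestor(rdd: RDD[_]): Boolean = {
  rdd.isCheckpointed || rdd.dependencies.exists(dep => hasCheckpointedAncestor(dep.rdd))
}
```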
retest this please
I mentioned this in the earlier comment thread; you may have missed it. Re-commenting it here.
This rdd.count isn't great either. Even when the RDD is cached, it may be cached on disk or serialized in memory, in which case running a count may be costly and time-consuming, and pretty much defeats the purpose of making this a cheap checkpointing mechanism. Also, in the majority of cases the RDD will be fully cached, in which case running this job is superfluous. The right thing to do (which isn't too hard) is to find out which partitions are missing and only run the job on those partitions:
// Only recompute the partitions whose blocks are not already in the block manager.
val missingPartitionIds = rdd.partitions.filter { p =>
  !blockManagerMaster.contains(RDDBlockId(rdd.id, p.index))
}.map { _.index }
rdd.sparkContext.runJob(
  rdd,
  (tc: TaskContext, iterator: Iterator[T]) => Utils.getIteratorSize(iterator), // same as count()
  missingPartitionIds
)
Alright. Ideally the fix in SPARK-8582 will make the need for this go away completely, but in the meantime we'll go with your suggestion.
Test build #1282 has finished for PR 7279 at commit
Test build #1281 has finished for PR 7279 at commit
Test build #39442 has finished for PR 7279 at commit
Test build #1284 has finished for PR 7279 at commit
Test build #1285 has finished for PR 7279 at commit
Test build #39466 has finished for PR 7279 at commit
Test build #1286 has finished for PR 7279 at commit
This proves that the test is valid!
Test build #39495 has finished for PR 7279 at commit
Test build #1301 has finished for PR 7279 at commit
Test build #1300 has finished for PR 7279 at commit
Test build #1302 timed out for PR 7279 at commit
retest this please
Test build #39536 has finished for PR 7279 at commit
Test build #1304 has finished for PR 7279 at commit
@tdas good to go?
Yep, LGTM. Merging this to master. Great patch!! Thanks!
Certain use cases of Spark involve RDDs with long lineages that must be truncated periodically (e.g. GraphX). The existing way of doing this is through rdd.checkpoint(), which is expensive because it writes to HDFS. This patch provides an alternative to truncate lineages cheaply without providing the same level of fault tolerance.

Local checkpointing writes checkpointed data to the local file system through the block manager. It is much faster than replicating to reliable storage and provides the same semantics as long as executors do not fail. It is accessible through a new operator rdd.localCheckpoint() and leaves the old one unchanged. Users may even decide to combine the two and call the reliable one less frequently.

The bulk of this patch involves refactoring the checkpointing interface to accept custom implementations of checkpointing. Design doc.
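For a sense of the user-facing API, a short usage sketch (application values are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("local-checkpoint-demo").setMaster("local[2]"))

// Build up a long lineage, as iterative workloads like GraphX do.
var rdd = sc.parallelize(1 to 1000000)
for (_ <- 1 to 50) {
  rdd = rdd.map(_ + 1)
}

// Cheap lineage truncation: data goes through the block manager to local
// storage instead of HDFS, so it is fast but not tolerant of executor loss.
rdd.localCheckpoint()
rdd.count() // the first action materializes the checkpoint

// The reliable operator is unchanged and can still be combined with this:
// sc.setCheckpointDir("hdfs:///checkpoints"); rdd.checkpoint()
```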