[SPARK-8582][Core]Optimize checkpointing to avoid computing an RDD twice #9428
Conversation
Conflicts: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
/cc @andrewor14 @tdas
What if _cpDir exists before mkdirs() is called?
That's how it was before.
If checkpointing fails the first time, _cpDir won't be deleted. The user may then retry, so we should allow checkpointing the same RDD even if _cpDir already exists.
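The retry-friendly behavior described above — tolerating a checkpoint directory left behind by a failed attempt — can be sketched outside Spark in a few lines of Python (function and path names here are hypothetical, not Spark's API):

```python
import os
import tempfile

def ensure_checkpoint_dir(path):
    # Allow the directory to already exist so that a failed
    # checkpoint attempt can simply be retried.
    os.makedirs(path, exist_ok=True)
    return path

base = tempfile.mkdtemp()
cp = os.path.join(base, "rdd-42-checkpoint")
ensure_checkpoint_dir(cp)
ensure_checkpoint_dir(cp)  # second call is a no-op, not an error
assert os.path.isdir(cp)
```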
Test build #44863 has finished for PR 9428 at commit
/cc @JoshRosen
retest this please
Can you indent these 2 lines?
@zsxwing Looks great. All my comments are pretty minor. On second thought, local checkpointing doesn't really have this problem, so as long as we handle the reliable checkpointing case we're good. Have you had a chance to test this on a real cluster?
Test build #45393 has finished for PR 9428 at commit
Use Throwable here because it will be rethrown later. It's better to clean up for fatal errors as well.
Actually, still need to handle ControlThrowable. Updated it.
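The pattern under discussion — catch everything so a partial checkpoint gets cleaned up, then re-raise so the failure still propagates — can be sketched in Python (all names and paths hypothetical; `BaseException` plays roughly the role Scala's `Throwable` plays here, and Scala additionally has to special-case `ControlThrowable`):

```python
import os
import shutil
import tempfile

def write_checkpoint(path, fail=False):
    os.makedirs(path, exist_ok=True)
    try:
        if fail:
            raise RuntimeError("simulated write failure")
        with open(os.path.join(path, "part-00000"), "w") as f:
            f.write("data")
    except BaseException:
        # On *any* error, remove the partial checkpoint directory,
        # then re-raise so the caller still sees the original failure.
        shutil.rmtree(path, ignore_errors=True)
        raise

base = tempfile.mkdtemp()
ok_dir = os.path.join(base, "ok")
write_checkpoint(ok_dir)

bad_dir = os.path.join(base, "bad")
try:
    write_checkpoint(bad_dir, fail=True)
except RuntimeError:
    pass

assert os.path.isdir(ok_dir)        # successful output is kept
assert not os.path.exists(bad_dir)  # partial output is cleaned up
```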
Yes. Tested this PR with Streaming.
Just found a corner case for CheckpointingIterator. In this case, CheckpointingIterator.complete of parCollection will be called before lazyRDD's.
I cannot find a solution for this case, since we cannot run the CheckpointingIterator.complete calls in the correct order. Maybe we should revisit the approach of #9258.
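For context, the idea behind a checkpointing iterator is to persist each element as it is consumed and to finalize the checkpoint only once the source is exhausted; a toy Python sketch of that idea (names hypothetical, not Spark's actual `CheckpointingIterator`):

```python
class CheckpointingIterator:
    """Toy sketch: persist each element as it is consumed and mark
    the checkpoint complete once the source iterator is exhausted."""

    def __init__(self, source, sink):
        self._source = iter(source)
        self._sink = sink          # stand-in for checkpoint storage
        self.completed = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            item = next(self._source)
        except StopIteration:
            self.complete()        # only runs if fully consumed
            raise
        self._sink.append(item)
        return item

    def complete(self):
        self.completed = True

sink = []
it = CheckpointingIterator(range(3), sink)
assert list(it) == [0, 1, 2]
assert sink == [0, 1, 2] and it.completed
```

The ordering problem in the comment above follows directly from this design: `complete` fires whenever a given wrapped iterator happens to be drained, so nothing enforces a particular order of completion across RDDs.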
Test build #45431 has finished for PR 9428 at commit
Test build #45435 has finished for PR 9428 at commit
Test build #45447 has finished for PR 9428 at commit
As discussed offline, we cannot go with this approach. @zsxwing, can you close this PR for now until we decide to tackle it some other way later?
@zsxwing @andrewor14 Would either of you be able to explain briefly why this approach doesn't work?
@michaelmior please take a look at this test
@zsxwing Thanks for the pointer. It's not clear to me why this needs to be supported (and in fact the test no longer exists). However, I'm also not clear why the test fails in the first place (I compiled and ran the code), but that's probably because I'm relatively new to Scala and don't fully understand the semantics of lazy vals.
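For readers new to Scala: a `lazy val` is initialized at most once, on first access, and cached thereafter — which is why the order in which the lazy values are first forced matters in the failing test. A rough, illustrative Python analogue of that semantics (names hypothetical):

```python
class Lazy:
    """Rough analogue of a Scala `lazy val`: the thunk runs once,
    on first access, and the result is cached afterwards."""

    def __init__(self, thunk):
        self._thunk = thunk
        self._evaluated = False
        self._value = None

    def get(self):
        if not self._evaluated:
            self._value = self._thunk()
            self._evaluated = True
        return self._value

calls = []

def compute():
    calls.append("computed")
    return [1, 2, 3]

lazy_rdd = Lazy(compute)
assert calls == []                    # not yet evaluated
assert lazy_rdd.get() == [1, 2, 3]
assert lazy_rdd.get() == [1, 2, 3]
assert calls == ["computed"]          # evaluated exactly once
```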
Hello. I find this feature to be really important and I would be happy to contribute here. Even if we can't support every use case, it would already be a big win if in the majority of cases we could avoid the double computation, while raising a warning in the remaining cases that computation will happen twice. This is especially important for a use case I have where a transformation creates random numbers, so I simply can't recompute things, as the results would be different. So in my case the only option to break lineage seems to be a full write() followed by read().
A task/stage may run multiple times due to failure. Why is this not a problem for you? |
That's the reason why I want to checkpoint when they are first calculated. Further transformations use these results several times. Of course it's not a problem per se to calculate twice for the checkpoint, but doing so for 1+TB of data is nonsense and I can't cache. |
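The nondeterminism concern above can be illustrated in a few lines of Python: recomputing a random transformation yields different data, so replaying lineage is not an option, and the first materialized result must be the one that is kept (this is a sketch of the user's scenario, not Spark code):

```python
import random

def transform(xs):
    # Nondeterministic transformation: replaying it from lineage
    # would produce *different* data each time.
    return [x + random.random() for x in xs]

first = transform(range(1000))
second = transform(range(1000))
assert first != second  # recomputation does not reproduce the data

# "Checkpointing" here means keeping the first materialized result
# instead of ever recomputing it:
checkpointed = list(first)
assert checkpointed == first
```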
## What changes were proposed in this pull request?

This change adds local checkpoint support to datasets and the respective binding from the Python DataFrame API. If reliability requirements can be lowered to favor performance, as in cases of further quick transformations followed by a reliable save, localCheckpoint() fits very well. Furthermore, at the moment reliable checkpoints still incur double computation (see apache#9428). In general it makes the API more complete as well.

## How was this patch tested?

Python land quick use case:

```python
>>> from time import sleep
>>> from pyspark.sql import types as T
>>> from pyspark.sql import functions as F
>>> def f(x):
...     sleep(1)
...     return x*2
>>> df1 = spark.range(30, numPartitions=6)
>>> df2 = df1.select(F.udf(f, T.LongType())("id"))
>>> %time _ = df2.collect()
CPU times: user 7.79 ms, sys: 5.84 ms, total: 13.6 ms
Wall time: 12.2 s
>>> %time df3 = df2.localCheckpoint()
CPU times: user 2.38 ms, sys: 2.3 ms, total: 4.68 ms
Wall time: 10.3 s
>>> %time _ = df3.collect()
CPU times: user 5.09 ms, sys: 410 µs, total: 5.5 ms
Wall time: 148 ms
>>> sc.setCheckpointDir(".")
>>> %time df3 = df2.checkpoint()
CPU times: user 4.04 ms, sys: 1.63 ms, total: 5.67 ms
Wall time: 20.3 s
```

Author: Fernando Pereira <[email protected]>

Closes apache#19805 from ferdonline/feature_dataset_localCheckpoint.
Took over #7021 and fixed the following potential issues in the previous PR: