[SPARK-8582][Core] Add CheckpointingIterator to optimize checkpointing #7021
Test build #35794 has finished for PR 7021 at commit
Test build #35796 has finished for PR 7021 at commit
The partitioners of the RDDs might have different numPartitions. This will cause an error later.
Test build #35854 has finished for PR 7021 at commit
retest this please.
Test build #35859 has finished for PR 7021 at commit
Unrelated failure again. Looks like Jenkins is unstable now?
retest this please.
Test build #35862 has finished for PR 7021 at commit
@viirya Thanks for working on this. Could you add some tests for this new iterator? In particular, we should have a test that fails before but no longer fails afterwards.
these should all be indented two spaces
Conflicts: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@andrewor14 Thanks. I have added a few tests for the new iterator. The other comments are addressed too.
Test build #36508 has finished for PR 7021 at commit
Test build #36509 has finished for PR 7021 at commit
this could be
checkpointData
  .map(_.getCheckpointIterator(iter, context, split.index))
  .getOrElse(iter)
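For readers unfamiliar with the idiom suggested above, here is a minimal, self-contained sketch of wrapping an iterator only when an optional holder is present. `FakeCheckpointData`, its `wrapped` counter, and `maybeWrap` are hypothetical stand-ins invented for illustration; the real `getCheckpointIterator` in the patch also takes a task context and may differ in other ways.

```scala
object OptionWrapSketch {
  // Hypothetical stand-in for the checkpoint data holder; not Spark's class.
  final class FakeCheckpointData {
    // Counts how many iterators were wrapped, so the effect is observable.
    var wrapped = 0

    // Assumed simplified signature: returns the iterator unchanged after
    // recording that it was asked to wrap it.
    def getCheckpointIterator[T](iter: Iterator[T], partitionIndex: Int): Iterator[T] = {
      wrapped += 1
      iter
    }
  }

  // Wrap the iterator if checkpoint data is defined, else return it as-is.
  def maybeWrap[T](
      data: Option[FakeCheckpointData],
      iter: Iterator[T],
      partitionIndex: Int): Iterator[T] =
    data
      .map(_.getCheckpointIterator(iter, partitionIndex))
      .getOrElse(iter)
}
```

The `map` / `getOrElse` chain replaces an explicit `if (checkpointData.isDefined) ... else ...` branch and keeps the fallback (the original iterator) in one place.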
shouldn't this read from RDDCheckpointData.rddCheckpointDataPath?
actually, the path here should just be checkpointPath. Right now this duplicates some code.
@viirya Thanks for tackling this issue, but I believe the existing implementation is not fully correct. There are two high-level problems:
First, if the checkpointing iterator is not fully consumed by the user, then we end up checkpointing only a subset of the computed data. I think we should ensure that the iterator is fully drained before we can safely truncate the RDD's lineage through
Second, the state transition from
I actually don't have a great idea on how to fix the first issue, however. We do not really have any visibility into how the higher-level caller will use the iterator, and if we consume it eagerly ourselves then the application might fail. @tdas this seems like a fundamentally difficult problem.
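To make the first problem concrete, here is a small sketch of an iterator wrapper that copies elements to a sink as they are consumed and marks the sink complete only once the underlying iterator is exhausted. It is not the implementation in this patch: `FakeCheckpointSink`, `RecordingIterator`, and the `committed` flag are hypothetical names, and an in-memory buffer stands in for the checkpoint file.

```scala
import scala.collection.mutable.ArrayBuffer

object PartialDrainSketch {
  // Hypothetical in-memory stand-in for a checkpoint file.
  final class FakeCheckpointSink[T] {
    val written = ArrayBuffer.empty[T]
    // True only after the wrapped iterator has been observed to be exhausted.
    var committed = false
  }

  // Copies each element into the sink as the caller consumes it.
  // The sink is marked committed only when hasNext first returns false,
  // so a caller that stops early leaves the sink uncommitted.
  final class RecordingIterator[T](underlying: Iterator[T], sink: FakeCheckpointSink[T])
      extends Iterator[T] {

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more) sink.committed = true
      more
    }

    override def next(): T = {
      val elem = underlying.next()
      sink.written += elem
      elem
    }
  }
}
```

If a caller pulls only two elements out of five, `written` holds just those two and `committed` stays false. Under the assumptions of this sketch, that flag is the signal that it is not yet safe to treat the sink as a complete checkpoint.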
Ah, one thing we could do is the following: in
@viirya by the way, I'm currently working on a major refactoring of all of this code in parallel. There will likely be a lot of conflicts to resolve at this rate. If you prefer, I could take up this issue and use your patch as a basis. In the release we'll be sure to give you credit for this fix. What do you think?
@andrewor14 no problem, thanks.
As discussed, I have opened a patch #7279 that refactors all of this. After that one is merged I'll fix SPARK-8582 in the new refactored code based on the changes here. @viirya would you mind closing this then? Thanks for your time.
JIRA: https://issues.apache.org/jira/browse/SPARK-8582