Skip to content

Commit f1859fc

Browse files
JerryLeadankurdave
authored andcommitted
[SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage
The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672 Iterative GraphX applications always have long lineage, while checkpoint() on EdgeRDD and VertexRDD themselves cannot shorten the lineage. In contrast, if we perform checkpoint() on their ParitionsRDD, the long lineage can be cut off. Moreover, the existing operations such as cache() in this code is performed on the PartitionsRDD, so checkpoint() should do the same way. More details and explanation can be found in the JIRA. Author: JerryLead <[email protected]> Author: Lijie Xu <[email protected]> Closes apache#3549 from JerryLead/my_graphX_checkpoint and squashes the following commits: d1aa8d8 [JerryLead] Perform checkpoint() on PartitionsRDD not VertexRDD and EdgeRDD themselves ff08ed4 [JerryLead] Merge branch 'master' of https://github.com/apache/spark c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark 52799e3 [Lijie Xu] Merge pull request #1 from apache/master (cherry picked from commit fc0a147) Signed-off-by: Ankur Dave <[email protected]>
1 parent 5e026a3 commit f1859fc

File tree

2 files changed

+8
-0
lines changed

2 files changed

+8
-0
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
7070
this
7171
}
7272

73+
override def checkpoint() = {
74+
partitionsRDD.checkpoint()
75+
}
76+
7377
/** The number of edges in the RDD. */
7478
override def count(): Long = {
7579
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ class VertexRDDImpl[VD] private[graphx] (
7171
this
7272
}
7373

74+
override def checkpoint() = {
75+
partitionsRDD.checkpoint()
76+
}
77+
7478
/** The number of vertices in the RDD. */
7579
override def count(): Long = {
7680
partitionsRDD.map(_.size).reduce(_ + _)

0 commit comments

Comments
 (0)