Skip to content

Commit 0d7e385

Browse files
committed
[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD
## What changes were proposed in this pull request? EdgeRDD/VertexRDD override checkpoint() and isCheckpointed() to forward these calls to the internal partitionsRDD. So when checkpoint() is called on them, it's the partitionsRDD that actually gets checkpointed. However, since isCheckpointed() is also overridden to call partitionsRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though the EdgeRDD/VertexRDD itself is not actually checkpointed. This would have been fine, except that the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, Spark tries to read the checkpoint data of the VertexRDD/EdgeRDD when computing it, even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads the checkpoint data of partitionsRDD and tries to cast it to the types in Vertex/EdgeRDD. This leads to a ClassCastException. The minimal fix that does not change any public behavior is to modify the RDD internals so that they do not use the public, overridable API for internal logic. ## How was this patch tested? New unit tests. Author: Tathagata Das <[email protected]> Closes #15396 from tdas/SPARK-14804. (cherry picked from commit 47d5d0d) Signed-off-by: Tathagata Das <[email protected]>
1 parent a5c10ff commit 0d7e385

File tree

3 files changed

+56
-2
lines changed

3 files changed

+56
-2
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1610,14 +1610,15 @@ abstract class RDD[T: ClassTag](
16101610
/**
16111611
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
16121612
*/
1613-
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
1613+
def isCheckpointed: Boolean = isCheckpointedAndMaterialized
16141614

16151615
/**
16161616
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
16171617
* This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
16181618
* return value. Exposed for testing.
16191619
*/
1620-
private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
1620+
private[spark] def isCheckpointedAndMaterialized: Boolean =
1621+
checkpointData.exists(_.isCheckpointed)
16211622

16221623
/**
16231624
* Return whether this RDD is marked for local checkpointing.

graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
1919

2020
import org.apache.spark.SparkFunSuite
2121
import org.apache.spark.storage.StorageLevel
22+
import org.apache.spark.util.Utils
2223

2324
class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
2425

@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
3334
}
3435
}
3536

37+
test("checkpointing") {
38+
withSpark { sc =>
39+
val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
40+
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
41+
sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
42+
edges.checkpoint()
43+
44+
// EdgeRDD not yet checkpointed
45+
assert(!edges.isCheckpointed)
46+
assert(!edges.isCheckpointedAndMaterialized)
47+
assert(!edges.partitionsRDD.isCheckpointed)
48+
assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
49+
50+
val data = edges.collect().toSeq // force checkpointing
51+
52+
// EdgeRDD shows up as checkpointed, but internally it is not.
53+
// Only internal partitionsRDD is checkpointed.
54+
assert(edges.isCheckpointed)
55+
assert(!edges.isCheckpointedAndMaterialized)
56+
assert(edges.partitionsRDD.isCheckpointed)
57+
assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
58+
59+
assert(edges.collect().toSeq === data) // test checkpointed RDD
60+
}
61+
}
62+
3663
}

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
2020
import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
2121
import org.apache.spark.rdd.RDD
2222
import org.apache.spark.storage.StorageLevel
23+
import org.apache.spark.util.Utils
2324

2425
class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
2526

@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
197198
}
198199
}
199200

201+
test("checkpoint") {
202+
withSpark { sc =>
203+
val n = 100
204+
val verts = vertices(sc, n)
205+
sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
206+
verts.checkpoint()
207+
208+
// VertexRDD not yet checkpointed
209+
assert(!verts.isCheckpointed)
210+
assert(!verts.isCheckpointedAndMaterialized)
211+
assert(!verts.partitionsRDD.isCheckpointed)
212+
assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
213+
214+
val data = verts.collect().toSeq // force checkpointing
215+
216+
// VertexRDD shows up as checkpointed, but internally it is not.
217+
// Only internal partitionsRDD is checkpointed.
218+
assert(verts.isCheckpointed)
219+
assert(!verts.isCheckpointedAndMaterialized)
220+
assert(verts.partitionsRDD.isCheckpointed)
221+
assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
222+
223+
assert(verts.collect().toSeq === data) // test checkpointed RDD
224+
}
225+
}
200226
}

0 commit comments

Comments
 (0)