Skip to content

Commit 22d16b4

Browse files
committed
Fixed checkpointing
1 parent 9d8ae85 commit 22d16b4

File tree

3 files changed

+55
-1
lines changed

3 files changed

+55
-1
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1589,7 +1589,8 @@ abstract class RDD[T: ClassTag](
15891589
* This reads `checkpointData` directly instead of delegating to `isCheckpointed`, so the two
15901590
* can differ (as they do for GraphX's EdgeRDD and VertexRDD). Exposed for testing.
15911591
*/
1592-
private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
1592+
private[spark] def isCheckpointedAndMaterialized: Boolean =
1593+
checkpointData.exists(_.isCheckpointed)
15931594

15941595
/**
15951596
* Return whether this RDD is marked for local checkpointing.

graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
1919

2020
import org.apache.spark.SparkFunSuite
2121
import org.apache.spark.storage.StorageLevel
22+
import org.apache.spark.util.Utils
2223

2324
class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
2425

@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
3334
}
3435
}
3536

37+
test("checkpointing") { // checks isCheckpointed vs. isCheckpointedAndMaterialized on an EdgeRDD and its partitionsRDD
38+
withSpark { sc =>
39+
val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3))) // NOTE(review): `verts` is never used in this test
40+
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
41+
sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
42+
edges.checkpoint()
43+
44+
// No action has run yet: neither the EdgeRDD nor its partitionsRDD reports a checkpoint.
45+
assert(!edges.isCheckpointed)
46+
assert(!edges.isCheckpointedAndMaterialized)
47+
assert(!edges.partitionsRDD.isCheckpointed)
48+
assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
49+
50+
val data = edges.collect().toSeq // run an action to force checkpointing
51+
52+
// The EdgeRDD now reports isCheckpointed, but isCheckpointedAndMaterialized stays false for it.
53+
// Only the internal partitionsRDD is both checkpointed and materialized.
54+
assert(edges.isCheckpointed)
55+
assert(!edges.isCheckpointedAndMaterialized)
56+
assert(edges.partitionsRDD.isCheckpointed)
57+
assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
58+
59+
assert(edges.collect().toSeq === data) // re-read after checkpointing; NOTE(review): `edges` is built from an empty list, so both sides are empty
60+
}
61+
}
62+
3663
}

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
2020
import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
2121
import org.apache.spark.rdd.RDD
2222
import org.apache.spark.storage.StorageLevel
23+
import org.apache.spark.util.Utils
2324

2425
class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
2526

@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
197198
}
198199
}
199200

201+
test("checkpoint") { // checks isCheckpointed vs. isCheckpointedAndMaterialized on a VertexRDD and its partitionsRDD
202+
withSpark { sc =>
203+
val n = 100
204+
val verts = vertices(sc, n)
205+
sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
206+
verts.checkpoint()
207+
208+
// No action has run yet: neither the VertexRDD nor its partitionsRDD reports a checkpoint.
209+
assert(!verts.isCheckpointed)
210+
assert(!verts.isCheckpointedAndMaterialized)
211+
assert(!verts.partitionsRDD.isCheckpointed)
212+
assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
213+
214+
val data = verts.collect().toSeq // run an action to force checkpointing
215+
216+
// The VertexRDD now reports isCheckpointed, but isCheckpointedAndMaterialized stays false for it.
217+
// Only the internal partitionsRDD is both checkpointed and materialized.
218+
assert(verts.isCheckpointed)
219+
assert(!verts.isCheckpointedAndMaterialized)
220+
assert(verts.partitionsRDD.isCheckpointed)
221+
assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
222+
223+
assert(verts.collect().toSeq === data) // re-reading after checkpointing must return the same elements
224+
}
225+
}
200226
}

0 commit comments

Comments
 (0)