From 235738c1a8bff39a09b681d627a1a8096a4f4128 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 28 Jan 2015 12:39:20 -0800 Subject: [PATCH 1/6] Added isCheckpointed and getCheckpointFiles to Graph, GraphImpl --- .../main/scala/org/apache/spark/graphx/Graph.scala | 10 ++++++++++ .../org/apache/spark/graphx/impl/GraphImpl.scala | 11 +++++++++++ .../scala/org/apache/spark/graphx/GraphSuite.scala | 2 ++ project/MimaExcludes.scala | 6 ++++++ 4 files changed, 29 insertions(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 84b72b390ca35..622a59ade2da5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -104,6 +104,16 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def checkpoint(): Unit + /** + * Return whether this Graph has been checkpointed or not + */ + def isCheckpointed: Boolean + + /** + * Gets the name of the files to which this Graph was checkpointed + */ + def getCheckpointFiles: Seq[String] + /** * Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that * build a new graph in each iteration. 
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 3f4a900d5b601..90a74d23a26cc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -70,6 +70,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( replicatedVertexView.edges.checkpoint() } + override def isCheckpointed: Boolean = { + vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed + } + + override def getCheckpointFiles: Seq[String] = { + Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap { + case Some(path) => Seq(path) + case None => Seq() + } + } + override def unpersist(blocking: Boolean = true): Graph[VD, ED] = { unpersistVertices(blocking) replicatedVertexView.edges.unpersist(blocking) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index ed9876b8dc21c..b0f670674bbbd 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -375,6 +375,8 @@ class GraphSuite extends FunSuite with LocalSparkContext { val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)} val rdd = sc.parallelize(ring) val graph = Graph.fromEdges(rdd, 1.0F) + assert(!graph.isCheckpointed) + assert(graph.getCheckpointFiles.size === 0) graph.checkpoint() graph.edges.map(_.attr).count() graph.vertices.map(_._2).count() diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index e750fed7448cd..8a5639b3dec61 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -121,6 +121,12 @@ object MimaExcludes { // SPARK-5315 Spark Streaming Java API returns Scala DStream ProblemFilters.exclude[MissingMethodProblem]( 
"org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow") + ) ++ Seq( + // SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.graphx.Graph.getCheckpointFiles"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.graphx.Graph.isCheckpointed") ) case v if v.startsWith("1.2") => From 188665f0717715a1d6c7cdc00e1ef7d4784abec3 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 28 Jan 2015 13:48:24 -0800 Subject: [PATCH 2/6] improved documentation --- graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 622a59ade2da5..4455d2aa0e51a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -105,12 +105,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab def checkpoint(): Unit /** - * Return whether this Graph has been checkpointed or not + * Return whether this Graph has been checkpointed or not. + * This returns true iff both the vertices RDD and edges RDD have been checkpointed. */ def isCheckpointed: Boolean /** - * Gets the name of the files to which this Graph was checkpointed + * Gets the name of the files to which this Graph was checkpointed. + * (The vertices RDD and edges RDD are checkpointed separately.) */ def getCheckpointFiles: Seq[String] From cc007670b7581089f1d1b86a0a28340c6f33a134 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 28 Jan 2015 16:07:30 -0800 Subject: [PATCH 3/6] added overrides for isCheckpointed, getCheckpointFile in EdgeRDDImpl, VertexRDDImpl. The corresponding Graph methods now work. 
--- .../org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 10 +++++++++- .../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index f1550ac2e18ad..02174ec0da3da 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -73,7 +73,15 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( override def checkpoint() = { partitionsRDD.checkpoint() } - + + override def isCheckpointed: Boolean = { + partitionsRDD.isCheckpointed + } + + override def getCheckpointFile: Option[String] = { + partitionsRDD.getCheckpointFile + } + /** The number of edges in the RDD. */ override def count(): Long = { partitionsRDD.map(_._2.size.toLong).reduce(_ + _) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 9732c5b00c6d9..b6fec298b94e6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -74,7 +74,15 @@ class VertexRDDImpl[VD] private[graphx] ( override def checkpoint() = { partitionsRDD.checkpoint() } - + + override def isCheckpointed: Boolean = { + partitionsRDD.isCheckpointed + } + + override def getCheckpointFile: Option[String] = { + partitionsRDD.getCheckpointFile + } + /** The number of vertices in the RDD. */ override def count(): Long = { partitionsRDD.map(_.size).reduce(_ + _) From 695b7a3a060184314e7a30340a84b79c41fda9bf Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Thu, 29 Jan 2015 13:50:49 -0800 Subject: [PATCH 4/6] changed partitionsRDD in EdgeRDDImpl, VertexRDDImpl to be non-transient --- .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 2 +- .../main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 2 +- graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 02174ec0da3da..dfd37c6f2e51b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -26,7 +26,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( - @transient override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], + override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) extends EdgeRDD[ED](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index b6fec298b94e6..ff22daa92415a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -27,7 +27,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ class VertexRDDImpl[VD] private[graphx] ( - @transient val partitionsRDD: RDD[ShippableVertexPartition[VD]], + val partitionsRDD: RDD[ShippableVertexPartition[VD]], val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) (implicit override protected val vdTag: ClassTag[VD]) extends VertexRDD[VD](partitionsRDD.context, 
List(new OneToOneDependency(partitionsRDD))) { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index b0f670674bbbd..59a57ba7a33f1 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -385,6 +385,8 @@ class GraphSuite extends FunSuite with LocalSparkContext { val verticesDependencies = graph.vertices.partitionsRDD.dependencies assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) + assert(graph.isCheckpointed) + assert(graph.getCheckpointFiles.size === 2) } } From 250810ed01fde66b9a53a05f393fa18cf9373ca8 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 2 Feb 2015 12:32:07 -0800 Subject: [PATCH 5/6] In EdgeRDDImpl, VertexRDDImpl, added transient back to partitionsRDD, and made isCheckpointed check firstParent instead of partitionsRDD --- .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 4 ++-- .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index dfd37c6f2e51b..6c35d7029e078 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -26,7 +26,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( - override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], + @transient override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) extends EdgeRDD[ED](partitionsRDD.context, 
List(new OneToOneDependency(partitionsRDD))) { @@ -75,7 +75,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( } override def isCheckpointed: Boolean = { - partitionsRDD.isCheckpointed + firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed } override def getCheckpointFile: Option[String] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index ff22daa92415a..20772623badff 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -27,7 +27,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ class VertexRDDImpl[VD] private[graphx] ( - val partitionsRDD: RDD[ShippableVertexPartition[VD]], + @transient val partitionsRDD: RDD[ShippableVertexPartition[VD]], val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) (implicit override protected val vdTag: ClassTag[VD]) extends VertexRDD[VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { @@ -76,7 +76,7 @@ class VertexRDDImpl[VD] private[graphx] ( } override def isCheckpointed: Boolean = { - partitionsRDD.isCheckpointed + firstParent.isCheckpointed } override def getCheckpointFile: Option[String] = { From b680148c7e40cb6b623c9640e3b30fbec4cec60f Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Mon, 2 Feb 2015 13:36:26 -0800 Subject: [PATCH 6/6] added class tag to firstParent call in VertexRDDImpl.isCheckpointed, though not needed to compile --- .../main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 20772623badff..3e4968d6c0d6f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -76,7 +76,7 @@ class VertexRDDImpl[VD] private[graphx] ( } override def isCheckpointed: Boolean = { - firstParent.isCheckpointed + firstParent[ShippableVertexPartition[VD]].isCheckpointed } override def getCheckpointFile: Option[String] = {