@bxshi bxshi commented Jun 22, 2014

It seems that a VertexRDD cannot be materialized by simply calling count(), because VertexRDD overrides that method. Calling RDD's count() does materialize it.

Is this by design, so that the count can be obtained without materializing the VertexRDD? If so, do you think it is necessary to add a materialize method to VertexRDD?

By the way, is count() the cheapest way to materialize an RDD, or does it cost the same resources as other actions?

Best,

@bxshi
Author

bxshi commented Jun 22, 2014

Here's a simple snippet that reproduces the problem:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.{Edge, Graph, VertexId}

    val conf = new SparkConf().setAppName("HDTM")
      .setMaster("local[4]")

    val sc = new SparkContext(conf)

    sc.setCheckpointDir("./checkpoint")
    val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
    val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
    val g = Graph(v, e)
    // Mark both RDDs for checkpointing, then run an action on each.
    g.vertices.checkpoint()
    g.edges.checkpoint()
    g.vertices.count()
    g.numEdges
    println(s"${g.vertices.isCheckpointed} ${g.edges.isCheckpointed}")

    // materialize() is the method proposed in this PR; it calls RDD's count.
    g.vertices.materialize()
    println(s"${g.vertices.isCheckpointed} ${g.edges.isCheckpointed}")

The first output is false true, and after calling materialize the output is true true, which means the VertexRDD is now correctly checkpointed.

@bxshi bxshi changed the title add a materialize method to materialize VertexRDD by calling RDD's count [SPARK-2245] add a materialize method to materialize VertexRDD by calling RDD's count Jun 23, 2014
@ankurdave
Contributor

Thanks for pointing this out. See my comment on the JIRA issue -- the right solution is to override checkpoint() in VertexRDD.

delegate checkpoint related method to partitionsRDD
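
The delegation idea named above can be illustrated with a toy model in plain Scala. This is only a sketch and not Spark code: `ToyRDD`, `PlainWrapper`, and `DelegatingWrapper` are hypothetical names, and the model assumes, as the thread describes, that the wrapper's overridden `count()` runs its job on an inner partitions dataset rather than on the wrapper itself.

```scala
// Toy model only: these classes are hypothetical stand-ins, not Spark APIs.
class ToyRDD(val size: Long) {
  private var checkpointRequested = false
  private var checkpointDone = false

  // Only records the request; nothing is written yet.
  def checkpoint(): Unit = { checkpointRequested = true }

  def isCheckpointed: Boolean = checkpointDone

  // Running an action on this dataset completes a pending checkpoint request.
  def count(): Long = {
    if (checkpointRequested) checkpointDone = true
    size
  }
}

// count() is forwarded to the inner dataset, but checkpoint() is not.
class PlainWrapper(val partitions: ToyRDD) extends ToyRDD(partitions.size) {
  override def count(): Long = partitions.count()
}

// The checkpoint-related methods are forwarded as well.
class DelegatingWrapper(val partitions: ToyRDD) extends ToyRDD(partitions.size) {
  override def count(): Long = partitions.count()
  override def checkpoint(): Unit = partitions.checkpoint()
  override def isCheckpointed: Boolean = partitions.isCheckpointed
}
```

In this model, calling `checkpoint()` and then `count()` on a `PlainWrapper` leaves `isCheckpointed` false, because the request was recorded on the wrapper while the job ran on the inner dataset. The same calls on a `DelegatingWrapper` report true.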
@bxshi
Author

bxshi commented Jun 26, 2014

I overrode those public checkpoint-related functions, but that leads to other exceptions. I hope you can help me with that. The detailed description is on SPARK-2245.

@pwendell
Contributor

pwendell commented Sep 2, 2014

@bxshi can you add [GraphX] to the title? This isn't getting sorted properly in our PR tool.

@bxshi bxshi changed the title [SPARK-2245] add a materialize method to materialize VertexRDD by calling RDD's count [GraphX][SPARK-2245] add a materialize method to materialize VertexRDD by calling RDD's count Sep 2, 2014
@SparkQA

SparkQA commented Sep 5, 2014

Can one of the admins verify this patch?

@nchammas
Contributor

@ankurdave @bxshi What's the status of this PR?

It hasn't been updated in a while, though it looks simple enough from a review standpoint.

@bxshi
Author

bxshi commented Feb 25, 2015

It's not as simple as I thought...

Here is my reply on JIRA about this PR:

I edited my original comment to add the updates, but I do not know whether you receive edits via email, so I am resubmitting it. I hope that won't bother you.
Hi Ankur Dave, I changed my pull request, but there is another exception: ShippableVertexPartition is not serializable. So I made it serializable, but then there is another exception: org.apache.spark.graphx.impl.RoutingTablePartition is not serializable. I made that serializable as well, but on iteration 2 there is an exception: org.apache.spark.graphx.impl.ShippableVertexPartition cannot be cast to scala.Tuple2
The code I'm using is:

    val conf = new SparkConf().setAppName("HDTM")
      .setMaster("local[4]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("./checkpoint")
    val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
    val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
    var g = Graph(v, e)
    val vertexIds = Seq(0L, 1L, 2L)
    var prevG: Graph[VertexId, Long] = null
    for (i <- 1 to 2000) {
      vertexIds.toStream.foreach(id => {
        prevG = g
        g = Graph(g.vertices, g.edges)
        g.vertices.cache()
        g.edges.cache()
        prevG.unpersistVertices(blocking = false)
        prevG.edges.unpersist(blocking = false)
      })
      g.vertices.checkpoint()
      g.edges.checkpoint()
      g.edges.count()
      g.vertices.count()
      println(s"${g.vertices.isCheckpointed} ${g.edges.isCheckpointed}")
      println(" iter " + i + " finished")
    }
    println(g.vertices.collect().mkString(" "))
    println(g.edges.collect().mkString(" "))

Am I on the right track, or should this be changed in another way?

@AmplabJenkins

Can one of the admins verify this patch?

@bxshi bxshi closed this Jul 14, 2015
mapr-devops pushed a commit to mapr/spark that referenced this pull request May 8, 2025
This reverts commit b9984350a3d2b706db92e50253d66c75f4304bb2.