Skip to content

Conversation

@jkbradley
Copy link
Member

Added the 2 methods to Graph and GraphImpl. Both make calls to the underlying vertex and edge RDDs.

This is needed for another PR (for LDA): [https://github.com//pull/4047]

Notes:

  • getCheckpointedFiles is plural and returns a Seq[String] instead of an Option[String].
  • I attempted to test to make sure the methods returned the correct values after checkpointing. It did not work; I guess that checkpointing does not occur quickly enough? I noticed that there are not checkpointing tests for RDDs; is it just hard to test well?

CC: @rxin

CC: @mengxr (since related to LDA)

@jkbradley
Copy link
Member Author

Unrelated failure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe beef up the documentation to say this only returns true if both vertices and edges are checkpointed?

@jkbradley
Copy link
Member Author

@rxin Added more doc. Thanks!

@rxin
Copy link
Contributor

rxin commented Jan 28, 2015

lgtm.

@SparkQA
Copy link

SparkQA commented Jan 28, 2015

Test build #568 has finished for PR 4253 at commit 235738c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley
Copy link
Member Author

Failure in org.apache.spark.sql.CachedTableSuite...we'll see how the 2nd test fares

@jkbradley
Copy link
Member Author

I'm not sure if this patch actually works. The following code works with RDD, but the similar code (below) does not work with Graph:

// For RDD:
val rdd = sc.parallelize(Seq(1,2,3))
sc.setCheckpointDir("tmpcheckpoints")
rdd.checkpoint()
rdd.count
rdd.isCheckpointed // returns true
// For Graph
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Edge
sc.setCheckpointDir("tmpcheckpoints")
val edges = sc.parallelize(Seq(Edge[Int](0,1), Edge[Int](1,2), Edge[Int](2,3)))
val g = Graph.fromEdges(edges, 0)
g.checkpoint()
g.vertices.count
g.edges.count
g.isCheckpointed // returns false
g.vertices.isCheckpointed // returns false
g.edges.isCheckpointed // returns false

@rxin
Copy link
Contributor

rxin commented Jan 28, 2015

I think the problem is that VertexRDD didn't override isCheckpointed

@SparkQA
Copy link

SparkQA commented Jan 28, 2015

Test build #26253 has finished for PR 4253 at commit 188665f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

… VertexRDDImpl. The corresponding Graph methods now work.
@jkbradley
Copy link
Member Author

That broke some stuff...will fix soon

@jkbradley
Copy link
Member Author

OK, I'm actually confused about how to check isCheckpointed. It looks like it's called by workers, which is a problem when I try to use "partitionsRDD." What should I check instead?

@SparkQA
Copy link

SparkQA commented Jan 29, 2015

Test build #26269 has finished for PR 4253 at commit cc00767.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Copy link
Contributor

rxin commented Jan 29, 2015

is the problem partitionsRDD transient?

@jkbradley
Copy link
Member Author

@rxin I believe so, but I'm not sure what alternatives there are.

@SparkQA
Copy link

SparkQA commented Jan 29, 2015

Test build #26336 has finished for PR 4253 at commit 695b7a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley
Copy link
Member Author

@ankurdave I guess making partitionsRDD non-transient didn't cause any problems.

@mengxr
Copy link
Contributor

mengxr commented Jan 30, 2015

@jkbradley @rxin It seems that there was a long discussion about this transient:

https://issues.apache.org/jira/browse/SPARK-4672

@JerryLead Do you know any workaround to fix unit tests while keeping transient? Or would checkpointing added in this PR solve the serialization problem you described in SPARK-4672?

@uncleGen
Copy link
Contributor

uncleGen commented Feb 2, 2015

@jkbradley IMHO, we do not need to override the isCheckpointed() in EdgeRDDImpl and VertexRDDImpl, and only define a normal isCheckpointed(). IIUC, the func isCheckpointed() is just be used in driver.

@mengxr
Copy link
Contributor

mengxr commented Feb 2, 2015

isCheckedpointed is call in computeOrReadCheckpoint, which happens on workers:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L248

I think the correct solution won't be using transient to cut lineage but remove the dependencies properly.

…, and made isCheckpointed check firstParent instead of partitionsRDD
@jkbradley
Copy link
Member Author

That last commit added the transient tag back and uses firstParent in isCheckpointed, per @mengxr 's suggestion.

@SparkQA
Copy link

SparkQA commented Feb 2, 2015

Test build #26526 has finished for PR 4253 at commit 250810e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley
Copy link
Member Author

That latest test passing was before I added the class tag to firstParent; the next will be with the class tag added.

@SparkQA
Copy link

SparkQA commented Feb 2, 2015

Test build #26542 has finished for PR 4253 at commit b680148.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mengxr
Copy link
Contributor

mengxr commented Feb 2, 2015

The failed test is in Spark SQL. Already pinged @liancheng to take a look. I'm going to merge this PR.

@asfgit asfgit closed this in 842d000 Feb 2, 2015
@liancheng
Copy link
Contributor

PR #4173 unpersists tables in a non-blocking way, and causes race condition in CachedTableSuite. Filed SPARK-5538 to track this.

@jkbradley jkbradley deleted the graphx-checkpoint branch May 4, 2015 23:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants