Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1582,14 +1582,15 @@ abstract class RDD[T: ClassTag](
/**
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
*/
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
def isCheckpointed: Boolean = isCheckpointedAndMaterialized

/**
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
* This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to fix the comment here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wait I just realized this is exactly the same as isCheckpointed. It's kind of confusing why we have two methods that do the same thing. Is it because VertexRDD or something overrides one but not the other?

Copy link
Contributor

@apivovarov apivovarov Oct 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewor14 This method left as is in this PR - #15447

* return value. Exposed for testing.
*/
private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
private[spark] def isCheckpointedAndMaterialized: Boolean =
checkpointData.exists(_.isCheckpointed)
Copy link
Contributor

@andrewor14 andrewor14 Oct 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way to fix the duplicate code is to make this one final and have isCheckpointed call this one, then implementations of RDD can feel free to override isCheckpointed without affecting our internal checkpointing logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean. if isCheckpointedAndMaterialized = isCheckpointed, and you override isCheckpointed, then that would change the behavior of isCheckpointedAndMaterialized as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I meant:

private[spark] final def isCheckpointedAndMaterialized: Boolean = {
  checkpointData.exists(_.isCheckpointed)
}

def isCheckpointed: Boolean = isCheckpointedAndMaterialized

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed it. Can you check again?

Copy link
Contributor

@andrewor14 andrewor14 Jan 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good, but you forgot to make this final. No big deal


/**
* Return whether this RDD is marked for local checkpointing.
Expand Down
27 changes: 27 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.graphx

import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {

Expand All @@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("checkpointing") {
withSpark { sc =>
val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
edges.checkpoint()

// EdgeRDD not yet checkpointed
assert(!edges.isCheckpointed)
assert(!edges.isCheckpointedAndMaterialized)
assert(!edges.partitionsRDD.isCheckpointed)
assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)

val data = edges.collect().toSeq // force checkpointing

// EdgeRDD shows up as checkpointed, but internally it is not.
// Only internal partitionsRDD is checkpointed.
assert(edges.isCheckpointed)
assert(!edges.isCheckpointedAndMaterialized)
assert(edges.partitionsRDD.isCheckpointed)
assert(edges.partitionsRDD.isCheckpointedAndMaterialized)

assert(edges.collect().toSeq === data) // test checkpointed RDD
}
}

}
26 changes: 26 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {

Expand Down Expand Up @@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("checkpoint") {
withSpark { sc =>
val n = 100
val verts = vertices(sc, n)
sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
verts.checkpoint()

// VertexRDD not yet checkpointed
assert(!verts.isCheckpointed)
assert(!verts.isCheckpointedAndMaterialized)
assert(!verts.partitionsRDD.isCheckpointed)
assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)

val data = verts.collect().toSeq // force checkpointing

// VertexRDD shows up as checkpointed, but internally it is not.
// Only internal partitionsRDD is checkpointed.
assert(verts.isCheckpointed)
assert(!verts.isCheckpointedAndMaterialized)
assert(verts.partitionsRDD.isCheckpointed)
assert(verts.partitionsRDD.isCheckpointedAndMaterialized)

assert(verts.collect().toSeq === data) // test checkpointed RDD
}
}
}