
Commit d41902c

Oops, I forgot to add an extra update call in the checkpointer tests after the last commit. This fixes that, and also makes some of the checkpointer methods protected, which I should have done before.
1 parent: 32b23b8

5 files changed (+21, -15 lines)

mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala

Lines changed: 5 additions & 5 deletions

@@ -106,22 +106,22 @@ private[mllib] abstract class PeriodicCheckpointer[T](
   }

   /** Checkpoint the Dataset */
-  def checkpoint(data: T): Unit
+  protected def checkpoint(data: T): Unit

   /** Return true iff the Dataset is checkpointed */
-  def isCheckpointed(data: T): Boolean
+  protected def isCheckpointed(data: T): Boolean

   /**
    * Persist the Dataset.
    * Note: This should handle checking the current [[StorageLevel]] of the Dataset.
    */
-  def persist(data: T): Unit
+  protected def persist(data: T): Unit

   /** Unpersist the Dataset */
-  def unpersist(data: T): Unit
+  protected def unpersist(data: T): Unit

   /** Get list of checkpoint files for this given Dataset */
-  def getCheckpointFiles(data: T): Iterable[String]
+  protected def getCheckpointFiles(data: T): Iterable[String]

   /**
    * Call this at the end to delete any remaining checkpoint files.
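
With every hook protected, the surface left for callers is the inherited public API: update(data) after each iteration of an algorithm, plus the cleanup call that the comment above refers to once the algorithm finishes. The sketch below is illustrative only and not part of this commit: it uses the PeriodicRDDCheckpointer updated later in this diff, assumes the cleanup method is named deleteAllCheckpoints() (its name is not visible in this hunk), and would have to live under org.apache.spark.mllib because the checkpointers are private[mllib].

package org.apache.spark.mllib.example

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.impl.PeriodicRDDCheckpointer
import org.apache.spark.rdd.RDD

object CheckpointerUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    sc.setCheckpointDir("/tmp/checkpoint-sketch")  // hypothetical directory

    // Checkpoint every 10th update; callers never touch the protected hooks.
    val checkpointer = new PeriodicRDDCheckpointer[Double](10, sc)

    var data: RDD[Double] = sc.parallelize(Seq(0.0, 1.0, 2.0, 3.0))
    for (_ <- 1 to 30) {
      data = data.map(_ + 1.0)      // stand-in for one iteration of an iterative algorithm
      checkpointer.update(data)     // persists the new data and periodically checkpoints it
      data.count()                  // materialize so any pending checkpoint is actually written
    }

    checkpointer.deleteAllCheckpoints()  // assumed name of the cleanup method described above
    sc.stop()
  }
}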

mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala

Lines changed: 7 additions & 5 deletions

@@ -80,17 +80,19 @@ private[mllib] class PeriodicGraphCheckpointer[VD, ED](
     sc: SparkContext)
   extends PeriodicCheckpointer[Graph[VD, ED]](checkpointInterval, sc) {

-  override def checkpoint(data: Graph[VD, ED]): Unit = data.checkpoint()
+  override protected def checkpoint(data: Graph[VD, ED]): Unit = data.checkpoint()

-  override def isCheckpointed(data: Graph[VD, ED]): Boolean = data.isCheckpointed
+  override protected def isCheckpointed(data: Graph[VD, ED]): Boolean = data.isCheckpointed

-  override def persist(data: Graph[VD, ED]): Unit = {
+  override protected def persist(data: Graph[VD, ED]): Unit = {
     if (data.vertices.getStorageLevel == StorageLevel.NONE) {
       data.persist()
     }
   }

-  override def unpersist(data: Graph[VD, ED]): Unit = data.unpersist(blocking = false)
+  override protected def unpersist(data: Graph[VD, ED]): Unit = data.unpersist(blocking = false)

-  override def getCheckpointFiles(data: Graph[VD, ED]): Iterable[String] = data.getCheckpointFiles
+  override protected def getCheckpointFiles(data: Graph[VD, ED]): Iterable[String] = {
+    data.getCheckpointFiles
+  }
 }
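
A mechanical detail worth noting in the overrides above (and in PeriodicRDDCheckpointer below): Scala allows an override to widen visibility, so leaving these as plain override def would still compile against the now-protected base methods; the commit instead tightens each concrete hook to override protected def so the subclasses expose the same surface as the base class. A minimal standalone sketch of that rule, with made-up names, purely for illustration:

abstract class Hooks {
  protected def persistHook(): Unit                // protected in the base class
}

class WidenedHooks extends Hooks {
  override def persistHook(): Unit = ()            // legal: an override may widen protected to public
}

class MatchedHooks extends Hooks {
  override protected def persistHook(): Unit = ()  // what this commit does: keep the hook protected
}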

mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

Lines changed: 5 additions & 5 deletions

@@ -79,19 +79,19 @@ private[mllib] class PeriodicRDDCheckpointer[T](
     sc: SparkContext)
   extends PeriodicCheckpointer[RDD[T]](checkpointInterval, sc) {

-  override def checkpoint(data: RDD[T]): Unit = data.checkpoint()
+  override protected def checkpoint(data: RDD[T]): Unit = data.checkpoint()

-  override def isCheckpointed(data: RDD[T]): Boolean = data.isCheckpointed
+  override protected def isCheckpointed(data: RDD[T]): Boolean = data.isCheckpointed

-  override def persist(data: RDD[T]): Unit = {
+  override protected def persist(data: RDD[T]): Unit = {
     if (data.getStorageLevel == StorageLevel.NONE) {
       data.persist()
     }
   }

-  override def unpersist(data: RDD[T]): Unit = data.unpersist(blocking = false)
+  override protected def unpersist(data: RDD[T]): Unit = data.unpersist(blocking = false)

-  override def getCheckpointFiles(data: RDD[T]): Iterable[String] = {
+  override protected def getCheckpointFiles(data: RDD[T]): Iterable[String] = {
     data.getCheckpointFile.map(x => x)
   }
 }

mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala

Lines changed: 2 additions & 0 deletions

@@ -36,6 +36,7 @@ class PeriodicGraphCheckpointerSuite extends SparkFunSuite with MLlibTestSparkCo
     val graph1 = createGraph(sc)
     val checkpointer =
       new PeriodicGraphCheckpointer[Double, Double](10, graph1.vertices.sparkContext)
+    checkpointer.update(graph1)
     graphsToCheck = graphsToCheck :+ GraphToCheck(graph1, 1)
     checkPersistence(graphsToCheck, 1)

@@ -58,6 +59,7 @@ class PeriodicGraphCheckpointerSuite extends SparkFunSuite with MLlibTestSparkCo
     val graph1 = createGraph(sc)
     val checkpointer = new PeriodicGraphCheckpointer[Double, Double](
       checkpointInterval, graph1.vertices.sparkContext)
+    checkpointer.update(graph1)
     graph1.edges.count()
     graph1.vertices.count()
     graphsToCheck = graphsToCheck :+ GraphToCheck(graph1, 1)

mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala

Lines changed: 2 additions & 0 deletions

@@ -35,6 +35,7 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with MLlibTestSparkCont

     val rdd1 = createRDD(sc)
     val checkpointer = new PeriodicRDDCheckpointer[Double](10, rdd1.sparkContext)
+    checkpointer.update(rdd1)
     rddsToCheck = rddsToCheck :+ RDDToCheck(rdd1, 1)
     checkPersistence(rddsToCheck, 1)

@@ -56,6 +57,7 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with MLlibTestSparkCont
     sc.setCheckpointDir(path)
     val rdd1 = createRDD(sc)
     val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, rdd1.sparkContext)
+    checkpointer.update(rdd1)
     rdd1.count()
     rddsToCheck = rddsToCheck :+ RDDToCheck(rdd1, 1)
     checkCheckpoint(rddsToCheck, 1, checkpointInterval)
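
The intent of both test changes is the same: the data has to be registered with the checkpointer via the public update(...) before the persistence and checkpoint assertions run, since constructing the checkpointer alone no longer touches the data. A hedged illustration of the expectation behind the added call, assuming (as the suite's checkPersistence helper effectively verifies) that update() routes new data through the persist hook shown earlier; the assertions here are illustrative, not part of the suite:

import org.apache.spark.storage.StorageLevel

val rdd1 = createRDD(sc)  // suite helper, assumed to return an unpersisted RDD[Double]
val checkpointer = new PeriodicRDDCheckpointer[Double](10, rdd1.sparkContext)

assert(rdd1.getStorageLevel == StorageLevel.NONE)  // construction alone does not persist
checkpointer.update(rdd1)                          // the call this commit adds
assert(rdd1.getStorageLevel != StorageLevel.NONE)  // the persist hook has now run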
