Skip to content

Commit 0b3dbc0

Browse files
committed
Changed checkpointer constructor not to take initial data.
1 parent 568918c commit 0b3dbc0

File tree

5 files changed

+18
-47
lines changed

5 files changed

+18
-47
lines changed

mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -51,37 +51,11 @@ import org.apache.spark.storage.StorageLevel
5151
* - This class removes checkpoint files once later Datasets have been checkpointed.
5252
* However, references to the older Datasets will still return isCheckpointed = true.
5353
*
54-
* Example usage:
55-
* {{{
56-
* val (data1, data2, data3, ...) = ...
57-
* val cp = new PeriodicCheckpointer(data1, dir, 2)
58-
* data1.count();
59-
* // persisted: data1
60-
* cp.update(data2)
61-
* data2.count();
62-
* // persisted: data1, data2
63-
* // checkpointed: data2
64-
* cp.update(data3)
65-
* data3.count();
66-
* // persisted: data1, data2, data3
67-
* // checkpointed: data2
68-
* cp.update(data4)
69-
* data4.count();
70-
* // persisted: data2, data3, data4
71-
* // checkpointed: data4
72-
* cp.update(data5)
73-
* data5.count();
74-
* // persisted: data3, data4, data5
75-
* // checkpointed: data4
76-
* }}}
77-
*
78-
* @param currentData Initial Dataset
7954
* @param checkpointInterval Datasets will be checkpointed at this interval
8055
* @param sc SparkContext for the Datasets given to this checkpointer
8156
* @tparam T Dataset type, such as RDD[Double]
8257
*/
8358
private[mllib] abstract class PeriodicCheckpointer[T](
84-
var currentData: T,
8559
val checkpointInterval: Int,
8660
val sc: SparkContext) extends Logging {
8761

@@ -94,10 +68,8 @@ private[mllib] abstract class PeriodicCheckpointer[T](
9468
/** Number of times [[update()]] has been called */
9569
private var updateCount = 0
9670

97-
update(currentData)
98-
9971
/**
100-
* Update [[currentData]] with a new Dataset. Handle persistence and checkpointing as needed.
72+
* Update with a new Dataset. Handle persistence and checkpointing as needed.
10173
* Since this handles persistence and checkpointing, this should be called before the Dataset
10274
* has been materialized.
10375
*
@@ -124,15 +96,13 @@ private[mllib] abstract class PeriodicCheckpointer[T](
12496
var canDelete = true
12597
while (checkpointQueue.size > 1 && canDelete) {
12698
// Delete the oldest checkpoint only if the next checkpoint exists.
127-
if (isCheckpointed(checkpointQueue.get(1).get)) {
99+
if (isCheckpointed(checkpointQueue.head)) {
128100
removeCheckpointFile()
129101
} else {
130102
canDelete = false
131103
}
132104
}
133105
}
134-
135-
currentData = newData
136106
}
137107

138108
/** Checkpoint the Dataset */

mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.mllib.impl
1919

20+
import org.apache.spark.SparkContext
2021
import org.apache.spark.graphx.Graph
2122
import org.apache.spark.storage.StorageLevel
2223

@@ -47,7 +48,7 @@ import org.apache.spark.storage.StorageLevel
4748
* Example usage:
4849
* {{{
4950
* val (graph1, graph2, graph3, ...) = ...
50-
* val cp = new PeriodicGraphCheckpointer(graph1, dir, 2)
51+
* val cp = new PeriodicGraphCheckpointer(2, sc)
5152
* graph1.vertices.count(); graph1.edges.count()
5253
* // persisted: graph1
5354
 * cp.update(graph2)
@@ -68,18 +69,16 @@ import org.apache.spark.storage.StorageLevel
6869
* // checkpointed: graph4
6970
* }}}
7071
*
71-
* @param initGraph Initial graph
7272
* @param checkpointInterval Graphs will be checkpointed at this interval
7373
* @tparam VD Vertex descriptor type
7474
* @tparam ED Edge descriptor type
7575
*
7676
* TODO: Move this out of MLlib?
7777
*/
7878
private[mllib] class PeriodicGraphCheckpointer[VD, ED](
79-
initGraph: Graph[VD, ED],
80-
checkpointInterval: Int)
81-
extends PeriodicCheckpointer[Graph[VD, ED]](initGraph, checkpointInterval,
82-
initGraph.vertices.sparkContext) {
79+
checkpointInterval: Int,
80+
sc: SparkContext)
81+
extends PeriodicCheckpointer[Graph[VD, ED]](checkpointInterval, sc) {
8382

8483
override def checkpoint(data: Graph[VD, ED]): Unit = data.checkpoint()
8584

mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.mllib.impl
1919

20+
import org.apache.spark.SparkContext
2021
import org.apache.spark.rdd.RDD
2122
import org.apache.spark.storage.StorageLevel
2223

@@ -47,7 +48,7 @@ import org.apache.spark.storage.StorageLevel
4748
* Example usage:
4849
* {{{
4950
* val (rdd1, rdd2, rdd3, ...) = ...
50-
* val cp = new PeriodicRDDCheckpointer(rdd1, dir, 2)
51+
* val cp = new PeriodicRDDCheckpointer(2, sc)
5152
* rdd1.count();
5253
* // persisted: rdd1
5354
* cp.update(rdd2)
@@ -68,16 +69,15 @@ import org.apache.spark.storage.StorageLevel
6869
* // checkpointed: rdd4
6970
* }}}
7071
*
71-
* @param initRDD Initial RDD
7272
* @param checkpointInterval RDDs will be checkpointed at this interval
7373
* @tparam T RDD element type
7474
*
7575
* TODO: Move this out of MLlib?
7676
*/
7777
private[mllib] class PeriodicRDDCheckpointer[T](
78-
initRDD: RDD[T],
79-
checkpointInterval: Int)
80-
extends PeriodicCheckpointer[RDD[T]](initRDD, checkpointInterval, initRDD.sparkContext) {
78+
checkpointInterval: Int,
79+
sc: SparkContext)
80+
extends PeriodicCheckpointer[RDD[T]](checkpointInterval, sc) {
8181

8282
override def checkpoint(data: RDD[T]): Unit = data.checkpoint()
8383

mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ class PeriodicGraphCheckpointerSuite extends SparkFunSuite with MLlibTestSparkCo
3434
var graphsToCheck = Seq.empty[GraphToCheck]
3535

3636
val graph1 = createGraph(sc)
37-
val checkpointer = new PeriodicGraphCheckpointer(graph1, 10)
37+
val checkpointer =
38+
new PeriodicGraphCheckpointer[Double, Double](10, graph1.vertices.sparkContext)
3839
graphsToCheck = graphsToCheck :+ GraphToCheck(graph1, 1)
3940
checkPersistence(graphsToCheck, 1)
4041

@@ -55,7 +56,8 @@ class PeriodicGraphCheckpointerSuite extends SparkFunSuite with MLlibTestSparkCo
5556
var graphsToCheck = Seq.empty[GraphToCheck]
5657
sc.setCheckpointDir(path)
5758
val graph1 = createGraph(sc)
58-
val checkpointer = new PeriodicGraphCheckpointer(graph1, checkpointInterval)
59+
val checkpointer = new PeriodicGraphCheckpointer[Double, Double](
60+
checkpointInterval, graph1.vertices.sparkContext)
5961
graph1.edges.count()
6062
graph1.vertices.count()
6163
graphsToCheck = graphsToCheck :+ GraphToCheck(graph1, 1)

mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with MLlibTestSparkCont
3434
var rddsToCheck = Seq.empty[RDDToCheck]
3535

3636
val rdd1 = createRDD(sc)
37-
val checkpointer = new PeriodicRDDCheckpointer(rdd1, 10)
37+
val checkpointer = new PeriodicRDDCheckpointer[Double](10, rdd1.sparkContext)
3838
rddsToCheck = rddsToCheck :+ RDDToCheck(rdd1, 1)
3939
checkPersistence(rddsToCheck, 1)
4040

@@ -55,7 +55,7 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with MLlibTestSparkCont
5555
var rddsToCheck = Seq.empty[RDDToCheck]
5656
sc.setCheckpointDir(path)
5757
val rdd1 = createRDD(sc)
58-
val checkpointer = new PeriodicRDDCheckpointer(rdd1, checkpointInterval)
58+
val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, rdd1.sparkContext)
5959
rdd1.count()
6060
rddsToCheck = rddsToCheck :+ RDDToCheck(rdd1, 1)
6161
checkCheckpoint(rddsToCheck, 1, checkpointInterval)

0 commit comments

Comments (0)