Skip to content

Commit 4af96d8

Browse files
committed
Disable checkpoint in PeriodicCheckpointer
1 parent a5c0343 commit 4af96d8

File tree

3 files changed

+11
-14
lines changed

3 files changed

+11
-14
lines changed

mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,9 @@ final class EMLDAOptimizer extends LDAOptimizer {
142142
this.k = k
143143
this.vocabSize = docs.take(1).head._2.size
144144
this.checkpointInterval = lda.getCheckpointInterval
145-
if (this.checkpointInterval != -1) {
146-
this.graphCheckpointer = new PeriodicGraphCheckpointer[TopicCounts, TokenCount](
147-
checkpointInterval, graph.vertices.sparkContext)
148-
this.graphCheckpointer.update(this.graph)
149-
}
145+
this.graphCheckpointer = new PeriodicGraphCheckpointer[TopicCounts, TokenCount](
146+
checkpointInterval, graph.vertices.sparkContext)
147+
this.graphCheckpointer.update(this.graph)
150148
this.globalTopicTotals = computeGlobalTopicTotals()
151149
this
152150
}
@@ -191,9 +189,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
191189
// Update the vertex descriptors with the new counts.
192190
val newGraph = Graph(docTopicDistributions, graph.edges)
193191
graph = newGraph
194-
if (this.checkpointInterval != -1) {
195-
graphCheckpointer.update(newGraph)
196-
}
192+
graphCheckpointer.update(newGraph)
197193
globalTopicTotals = computeGlobalTopicTotals()
198194
this
199195
}
@@ -212,9 +208,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
212208

213209
override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
214210
require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
215-
if (this.checkpointInterval != -1) {
216-
this.graphCheckpointer.deleteAllCheckpoints()
217-
}
211+
this.graphCheckpointer.deleteAllCheckpoints()
218212
// The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
219213
// LDAModel.toLocal conversion
220214
new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,

mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ import org.apache.spark.storage.StorageLevel
5252
* - This class removes checkpoint files once later Datasets have been checkpointed.
5353
* However, references to the older Datasets will still return isCheckpointed = true.
5454
*
55-
* @param checkpointInterval Datasets will be checkpointed at this interval
55+
* @param checkpointInterval Datasets will be checkpointed at this interval.
56+
* If this interval was set as -1, then checkpointing will be disabled.
5657
* @param sc SparkContext for the Datasets given to this checkpointer
5758
* @tparam T Dataset type, such as RDD[Double]
5859
*/
@@ -89,7 +90,8 @@ private[mllib] abstract class PeriodicCheckpointer[T](
8990
updateCount += 1
9091

9192
// Handle checkpointing (after persisting)
92-
if ((updateCount % checkpointInterval) == 0 && sc.getCheckpointDir.nonEmpty) {
93+
if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
94+
&& sc.getCheckpointDir.nonEmpty) {
9395
// Add new checkpoint before removing old checkpoints.
9496
checkpoint(newData)
9597
checkpointQueue.enqueue(newData)

mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ import org.apache.spark.storage.StorageLevel
6969
* // checkpointed: graph4
7070
* }}}
7171
*
72-
* @param checkpointInterval Graphs will be checkpointed at this interval
72+
* @param checkpointInterval Graphs will be checkpointed at this interval.
73+
* If this interval was set as -1, then checkpointing will be disabled.
7374
* @tparam VD Vertex descriptor type
7475
* @tparam ED Edge descriptor type
7576
*

0 commit comments

Comments
 (0)