@@ -31,7 +31,8 @@ class CheckpointManager extends Logging {
3131 /** Keys of RDD partitions that are being checkpointed. */
3232 private val checkpointingRDDPartitions = new mutable.HashSet [RDDBlockId ]
3333
34- /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
34+ /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is about to be
35+ * checkpointed. */
3536 def getOrCompute [T : ClassTag ](
3637 rdd : RDD [T ],
3738 checkpointData : ReliableRDDCheckpointData [T ],
@@ -49,7 +50,7 @@ class CheckpointManager extends Logging {
4950 } else {
5051 // Acquire a lock for loading this partition
5152 // If another thread already holds the lock, wait for it to finish return its results
52- val checkpoint = acquireLockForPartition[T ](key, path, conf , context)
53+ val checkpoint = acquireLockForPartition[T ](rdd, partition, key , context)
5354 if (checkpoint.isDefined) {
5455 return new InterruptibleIterator [T ](context, checkpoint.get)
5556 }
@@ -61,7 +62,7 @@ class CheckpointManager extends Logging {
6162 val computedValues = rdd.computeOrReadCache(partition, context)
6263 ReliableCheckpointRDD .writeCheckpointFile(
6364 context, computedValues, checkpointData.cpDir, conf, partition.index)
64- ReliableCheckpointRDD .readCheckpointFile(path, conf , context)
65+ rdd.computeOrReadCache(partition , context)
6566 } finally {
6667 checkpointingRDDPartitions.synchronized {
6768 checkpointingRDDPartitions.remove(key)
@@ -78,8 +79,10 @@ class CheckpointManager extends Logging {
7879 * thread.
7980 */
8081 private def acquireLockForPartition [T ](
81- id : RDDBlockId , path : Path , conf : Configuration , context : TaskContext ): Option [Iterator [T ]] =
82- {
82+ rdd : RDD [T ],
83+ partition : Partition ,
84+ id : RDDBlockId ,
85+ context : TaskContext ): Option [Iterator [T ]] = {
8386 checkpointingRDDPartitions.synchronized {
8487 if (! checkpointingRDDPartitions.contains(id)) {
8588 // If the partition is free, acquire its lock to compute its value
@@ -94,7 +97,7 @@ class CheckpointManager extends Logging {
9497 logInfo(s " Finished waiting for $id" )
9598 }
9699 }
97- Some (ReliableCheckpointRDD .readCheckpointFile(path, conf , context))
100+ Some (rdd.computeOrReadCache(partition , context))
98101 }
99102
100103}
0 commit comments