Closed
Changes from all commits
Commits
34 commits
4ac1896
Refactor checkpoint interface for modularity
Jul 7, 2015
8447454
Fix tests
Jul 7, 2015
2e902e5
First implementation of local checkpointing
Jul 7, 2015
0477eec
Rename a few methods with awkward names (minor)
Jul 7, 2015
4514dc9
Clean local checkpoint files through RDD cleanups
Jul 7, 2015
4dbbab1
Refactor CheckpointSuite to test local checkpointing
Jul 7, 2015
2e59646
Add local checkpoint clean up tests
Jul 8, 2015
56831c5
Add a few warnings and clear exception messages
Jul 8, 2015
e53d964
Fix style
Jul 8, 2015
172cb66
Fix mima?
Jul 8, 2015
d096c67
Fix mima
Jul 8, 2015
4880deb
Fix style
Jul 8, 2015
e4cf071
Simplify LocalCheckpointRDD + docs + clean ups
Jul 9, 2015
53b363b
Rename a few more awkwardly named methods (minor)
Jul 9, 2015
4a182f3
Add fine-grained tests for local checkpointing
Jul 9, 2015
c449b38
Fix style
Jul 9, 2015
87d43c6
Merge branch 'master' of github.com:apache/spark into local-checkpoint
Jul 13, 2015
db70dc2
Express local checkpointing through caching the original RDD
Jul 14, 2015
62aba3f
Merge branch 'master' of github.com:apache/spark into local-checkpoint
Jul 14, 2015
48a9996
Avoid traversing dependency tree + rewrite tests
Jul 15, 2015
1bbe154
Simplify LocalCheckpointRDD
Jul 15, 2015
4eb6eb1
Merge branch 'master' of github.com:apache/spark into local-checkpoint
Jul 15, 2015
e58e3e3
Merge branch 'master' of github.com:apache/spark into local-checkpoint
Jul 15, 2015
a92657d
Update a few comments
Jul 16, 2015
f5be0f3
Use MEMORY_AND_DISK as the default local checkpoint level
Jul 16, 2015
e908a42
Fix tests
Jul 17, 2015
33f167a
Merge branch 'master' of github.com:apache/spark into local-checkpoint
Jul 28, 2015
c2e111b
Address comments
Jul 28, 2015
ab003a3
Fix compile
Jul 28, 2015
bf846a6
Merge branch 'master' of github.com:apache/spark into local-checkpoint
Aug 2, 2015
3be5aea
Address comments
Aug 2, 2015
e43bbb6
Merge branch 'master' of github.com:apache/spark into local-checkpoint
Aug 2, 2015
34bc059
Avoid computing all partitions in local checkpoint
Aug 2, 2015
729600f
Oops, fix tests
Aug 3, 2015
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -22,7 +22,7 @@ import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{RDDCheckpointData, RDD}
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
import org.apache.spark.util.Utils

/**
@@ -231,11 +231,14 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}

/** Perform checkpoint cleanup. */
/**
* Clean up checkpoint files written to a reliable storage.
* Locally checkpointed files are cleaned up separately through RDD cleanups.
*/
def doCleanCheckpoint(rddId: Int): Unit = {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
listeners.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1192,7 +1192,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope {
new CheckpointRDD[T](this, path)
new ReliableCheckpointRDD[T](this, path)
}

/** Build the union of a list of RDDs. */
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -59,6 +59,14 @@ object TaskContext {
* Unset the thread local TaskContext. Internal to Spark.
*/
protected[spark] def unset(): Unit = taskContext.remove()

/**
* Return an empty task context that is not actually used.
* Internal use only.
*/
private[spark] def empty(): TaskContext = {
new TaskContextImpl(0, 0, 0, 0, null, null)
}
}


153 changes: 14 additions & 139 deletions core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -17,156 +17,31 @@

package org.apache.spark.rdd

import java.io.IOException

import scala.reflect.ClassTag

import org.apache.hadoop.fs.Path

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{Partition, SparkContext, TaskContext}

/**
* An RDD partition used to recover checkpointed data.
*/
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition

/**
* This RDD represents a RDD checkpoint file (similar to HadoopRDD).
* An RDD that recovers checkpointed data from storage.
*/
private[spark]
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
private[spark] abstract class CheckpointRDD[T: ClassTag](@transient sc: SparkContext)
extends RDD[T](sc, Nil) {

private val broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

@transient private val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)

override def getCheckpointFile: Option[String] = Some(checkpointPath)

override def getPartitions: Array[Partition] = {
val cpath = new Path(checkpointPath)
val numPartitions =
// listStatus can throw exception if path does not exist.
if (fs.exists(cpath)) {
val dirContents = fs.listStatus(cpath).map(_.getPath)
val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
val numPart = partitionFiles.length
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
}
numPart
} else 0

Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i))
}

override def getPreferredLocations(split: Partition): Seq[String] = {
val status = fs.getFileStatus(new Path(checkpointPath,
CheckpointRDD.splitIdToFile(split.index)))
val locations = fs.getFileBlockLocations(status, 0, status.getLen)
locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
}

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
CheckpointRDD.readFromFile(file, broadcastedConf, context)
}

// CheckpointRDD should not be checkpointed again
override def checkpoint(): Unit = { }
override def doCheckpoint(): Unit = { }
}

private[spark] object CheckpointRDD extends Logging {
def splitIdToFile(splitId: Int): String = {
"part-%05d".format(splitId)
}

def writeToFile[T: ClassTag](
path: String,
broadcastedConf: Broadcast[SerializableConfiguration],
blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(broadcastedConf.value.value)

val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
val tempOutputPath =
new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptNumber)

if (fs.exists(tempOutputPath)) {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
} else {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
Utils.tryWithSafeFinally {
serializeStream.writeAll(iterator)
} {
serializeStream.close()
}

if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
logInfo("Deleting tempOutputPath " + tempOutputPath)
fs.delete(tempOutputPath, false)
throw new IOException("Checkpoint failed: failed to save output of task: "
+ ctx.attemptNumber + " and final output path does not exist")
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
fs.delete(tempOutputPath, false)
}
}
}

def readFromFile[T](
path: Path,
broadcastedConf: Broadcast[SerializableConfiguration],
context: TaskContext
): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(broadcastedConf.value.value)
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => deserializeStream.close())

deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
override def checkpoint(): Unit = { }
override def localCheckpoint(): this.type = this

// Test whether CheckpointRDD generate expected number of partitions despite
// each split file having multiple blocks. This needs to be run on a
// cluster (mesos or standalone) using HDFS.
def main(args: Array[String]) {
import org.apache.spark._
// Note: There is a bug in MiMa that complains about `AbstractMethodProblem`s in the
// base [[org.apache.spark.rdd.RDD]] class if we do not override the following methods.
// scalastyle:off
protected override def getPartitions: Array[Partition] = ???
override def compute(p: Partition, tc: TaskContext): Iterator[T] = ???
// scalastyle:on

val Array(cluster, hdfsPath) = args
val env = SparkEnv.get
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
val broadcastedConf = sc.broadcast(new SerializableConfiguration(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
fs.delete(path, true)
}
}
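For context, the reliable code path that ReliableCheckpointRDD now owns is driven from user code roughly as follows. This is a minimal sketch, not part of the patch; the checkpoint directory and master URL are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

// Reliable checkpointing: partitions are written as part-* files under the
// configured checkpoint directory and read back through ReliableCheckpointRDD.
val sc = new SparkContext(new SparkConf().setAppName("reliable-checkpoint-sketch").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/spark-checkpoints")   // should be an HDFS path on a real cluster
val rdd = sc.parallelize(1 to 1000).map(_ * 2)
rdd.checkpoint()                                // must be called before the first action on this RDD
rdd.count()                                     // runs a job and materializes the checkpoint files
assert(rdd.isCheckpointed)                      // lineage is now truncated at the checkpoint
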
67 changes: 67 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext, SparkEnv, SparkException, TaskContext}
import org.apache.spark.storage.RDDBlockId

/**
* A dummy CheckpointRDD that exists to provide informative error messages during failures.
*
* This is simply a placeholder because the original checkpointed RDD is expected to be
* fully cached. Only if an executor fails or if the user explicitly unpersists the original
* RDD will Spark ever attempt to compute this CheckpointRDD. When this happens, however,
* we must provide an informative error message.
*
* @param sc the active SparkContext
* @param rddId the ID of the checkpointed RDD
* @param numPartitions the number of partitions in the checkpointed RDD
*/
private[spark] class LocalCheckpointRDD[T: ClassTag](
@transient sc: SparkContext,
rddId: Int,
numPartitions: Int)
extends CheckpointRDD[T](sc) {

def this(rdd: RDD[T]) {
this(rdd.context, rdd.id, rdd.partitions.size)
}

protected override def getPartitions: Array[Partition] = {
(0 until numPartitions).toArray.map { i => new CheckpointRDDPartition(i) }
}

/**
* Throw an exception indicating that the relevant block is not found.
*
* This should only be called if the original RDD is explicitly unpersisted or if an
* executor is lost. Under normal circumstances, however, the original RDD (our child)
* is expected to be fully cached and so all partitions should already be computed and
* available in the block storage.
*/
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
throw new SparkException(
s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found! Either the executor " +
s"that originally checkpointed this partition is no longer alive, or the original RDD is " +
s"unpersisted. If this problem persists, you may consider using `rdd.checkpoint()` " +
s"instead, which is slower than local checkpointing but more fault-tolerant.")
}

}
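From the user's side, this placeholder only ever computes when something goes wrong; the normal flow is sketched below. This snippet is not part of the patch, and the master URL is illustrative.

import org.apache.spark.{SparkConf, SparkContext}

// Local checkpointing: no checkpoint directory is needed because the data lives
// only in the executors' block storage (MEMORY_AND_DISK by default).
val sc = new SparkContext(new SparkConf().setAppName("local-checkpoint-sketch").setMaster("local[2]"))
val rdd = sc.parallelize(1 to 1000).map(_ * 2)
rdd.localCheckpoint()   // marks the RDD for local checkpointing and caches it
rdd.count()             // materializes the cached partitions and truncates the lineage
// If the RDD is later unpersisted or an executor is lost, recomputation reaches
// LocalCheckpointRDD.compute and fails with the SparkException shown above.
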
83 changes: 83 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Logging, SparkEnv, SparkException, TaskContext}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.Utils

/**
* An implementation of checkpointing implemented on top of Spark's caching layer.
*
* Local checkpointing trades off fault tolerance for performance by skipping the expensive
* step of saving the RDD data to a reliable and fault-tolerant storage. Instead, the data
* is written to the local, ephemeral block storage that lives in each executor. This is useful
* for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
*/
private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {

/**
* Ensure the RDD is fully cached so the partitions can be recovered later.
*/
protected override def doCheckpoint(): CheckpointRDD[T] = {
val level = rdd.getStorageLevel

// Assume storage level uses disk; otherwise memory eviction may cause data loss
assume(level.useDisk, s"Storage level $level is not appropriate for local checkpointing")

// Not all actions compute all partitions of the RDD (e.g. take). For correctness, we
// must cache any missing partitions. TODO: avoid running another job here (SPARK-8582).
val action = (tc: TaskContext, iterator: Iterator[T]) => Utils.getIteratorSize(iterator)
val missingPartitionIndices = rdd.partitions.map(_.index).filter { i =>
!SparkEnv.get.blockManager.master.contains(RDDBlockId(rdd.id, i))
}
if (missingPartitionIndices.nonEmpty) {
rdd.sparkContext.runJob(rdd, action, missingPartitionIndices)
}

new LocalCheckpointRDD[T](rdd)
}

}

private[spark] object LocalRDDCheckpointData {

val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK

/**
* Transform the specified storage level to one that uses disk.
*
* This guarantees that the RDD can be recomputed multiple times correctly as long as
* executors do not fail. Otherwise, if the RDD is cached in memory only, for instance,
* the checkpoint data will be lost if the relevant block is evicted from memory.
*
* This method is idempotent.
*/
def transformStorageLevel(level: StorageLevel): StorageLevel = {
// If this RDD is to be cached off-heap, fail fast since we cannot provide any
// correctness guarantees about subsequent computations after the first one
if (level.useOffHeap) {
throw new SparkException("Local checkpointing is not compatible with off-heap caching.")
}

StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)
}
}
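The adjustment above only forces the disk flag and preserves the remaining flags. A small sketch of the expected behavior (illustrative only; since LocalRDDCheckpointData is private[spark], this can only run inside Spark's own code or tests):

import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY becomes MEMORY_AND_DISK, MEMORY_ONLY_SER becomes MEMORY_AND_DISK_SER,
// DISK_ONLY is returned unchanged, and OFF_HEAP throws a SparkException.
val adjusted = LocalRDDCheckpointData.transformStorageLevel(StorageLevel.MEMORY_ONLY)
assert(adjusted == StorageLevel.MEMORY_AND_DISK)
assert(LocalRDDCheckpointData.transformStorageLevel(StorageLevel.DISK_ONLY) == StorageLevel.DISK_ONLY)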