@@ -319,7 +319,8 @@ object CheckpointReader extends Logging {
 
     // Try to read the checkpoint files in the order
     logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
     val compressionCodec = CompressionCodec.createCodec(conf)
+    var readError: Exception = null
     checkpointFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file " + file)
       try {
@@ -330,13 +330,15 @@ object CheckpointReader extends Logging {
         return Some(cp)
       } catch {
         case e: Exception =>
+          readError = e
           logWarning("Error reading checkpoint from file " + file, e)
       }
     })
 
     // If none of checkpoint files could be read, then throw exception
     if (!ignoreReadError) {
-      throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
+      throw new SparkException(
+        s"Failed to read checkpoint from directory $checkpointPath", readError)
     }
     None
   }
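Keeping the last readError and chaining it into the SparkException means the underlying failure is no longer swallowed: callers can inspect getCause to decide how to react. A minimal sketch of such a caller; the helper name and fallback policy here are illustrative, not part of this patch:

import java.io.NotSerializableException

import org.apache.spark.SparkException
import org.apache.spark.streaming.StreamingContext

// Illustrative only: fall back to a fresh context when the checkpoint
// holds something that can't be deserialized (e.g. a queueStream).
def recoverOrRecreate(
    checkpointDir: String,
    creatingFunc: () => StreamingContext): StreamingContext = {
  try {
    StreamingContext.getOrCreate(checkpointDir, creatingFunc)
  } catch {
    case e: SparkException if e.getCause.isInstanceOf[NotSerializableException] =>
      creatingFunc()  // checkpoint is unusable; start from scratch
  }
}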
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{NotSerializableException, ObjectOutputStream}
+import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, Queue}
 import scala.reflect.ClassTag
@@ -37,8 +37,13 @@ class QueueInputDStream[T: ClassTag](
 
   override def stop() { }
 
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new NotSerializableException("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled.")
+  }
+
   private def writeObject(oos: ObjectOutputStream): Unit = {
-    throw new NotSerializableException("queueStream doesn't support checkpointing")
+    logWarning("queueStream doesn't support checkpointing")
   }
 
   override def compute(validTime: Time): Option[RDD[T]] = {
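This works because Java serialization invokes a class's private writeObject/readObject hooks via reflection: writing a checkpoint now only logs a warning, while the hard failure is deferred to deserialization, i.e. to checkpoint recovery. A self-contained sketch of the hook mechanism, with illustrative class names:

import java.io._

// A class whose instances can be written but never read back,
// mirroring the new QueueInputDStream behavior.
class WriteOnly extends Serializable {
  private def writeObject(oos: ObjectOutputStream): Unit = {
    Console.err.println("warning: WriteOnly won't be restorable")  // stand-in for logWarning
    oos.defaultWriteObject()
  }
  private def readObject(in: ObjectInputStream): Unit = {
    throw new NotSerializableException("WriteOnly can't be deserialized")
  }
}

object SerializationHookDemo extends App {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(new WriteOnly)  // succeeds, only warns
  out.close()

  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  in.readObject()  // throws NotSerializableException
}

Deferring the failure this way lets a running job keep going; only an attempt to restart from the checkpoint fails.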
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.storage.StorageLevel
@@ -726,16 +726,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts
   }
 
   test("queueStream doesn't support checkpointing") {
-    val checkpointDir = Utils.createTempDir()
-    ssc = new StreamingContext(master, appName, batchDuration)
-    val rdd = ssc.sparkContext.parallelize(1 to 10)
-    ssc.queueStream[Int](Queue(rdd)).print()
-    ssc.checkpoint(checkpointDir.getAbsolutePath)
-    val e = intercept[NotSerializableException] {
-      ssc.start()
-    }
-    // StreamingContext.validate changes the message, so use "contains" here
-    assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
+    val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+    def creatingFunction(): StreamingContext = {
+      val _ssc = new StreamingContext(conf, batchDuration)
+      val rdd = _ssc.sparkContext.parallelize(1 to 10)
+      _ssc.checkpoint(checkpointDirectory)
+      _ssc.queueStream[Int](Queue(rdd)).register()
+      _ssc
+    }
+    ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+    ssc.start()
+    eventually(timeout(10000 millis)) {
+      assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
+    }
+    ssc.stop()
+    val e = intercept[SparkException] {
+      ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+    }
+    assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled."))
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {
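The rewritten test drives the real recovery path: the first getOrCreate finds no checkpoint and calls creatingFunction, eventually() waits until checkpoint files exist, and the second getOrCreate fails inside CheckpointReader.read, which is why the assertion looks at e.getCause. For applications that would rather fall back to a fresh context than fail hard, getOrCreate also accepts a createOnError flag, which is forwarded to CheckpointReader.read as ignoreReadError. A usage sketch under that assumption; the master, app name, host/port, and path below are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableStart {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/streaming-checkpoint"  // illustrative path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("RecoverableStart")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDir)
      // Any checkpointable input works here; queueStream would not.
      ssc.socketTextStream("localhost", 9999).print()
      ssc
    }

    // createOnError = true: if the checkpoint directory is unreadable,
    // fall back to createContext instead of rethrowing the SparkException.
    val ssc = StreamingContext.getOrCreate(
      checkpointDir, createContext _, createOnError = true)
    ssc.start()
    ssc.awaitTermination()
  }
}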