
Commit dbff389

zsxwing authored and tdas committed
[SPARK-10071] [STREAMING] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
Output a warning when serializing QueueInputDStream rather than throwing an exception, so that unit tests can still use it. Moreover, this PR throws a better exception when deserializing QueueInputDStream, so that the user can find the problem easily. The previous exception was hard to understand: https://issues.apache.org/jira/browse/SPARK-8553

Author: zsxwing <[email protected]>

Closes #8624 from zsxwing/SPARK-10071 and squashes the following commits:

847cfa8 [zsxwing] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream

(cherry picked from commit 820913f)
Signed-off-by: Tathagata Das <[email protected]>
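For context, here is a minimal sketch of the user scenario this change addresses. It is illustrative only: the app name, master, and checkpoint directory are assumptions, not taken from the commit.

    import scala.collection.mutable.Queue

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical driver combining queueStream with checkpointing: the
    // combination this commit now warns about when the DStream graph is
    // serialized, and rejects with a clear message on recovery.
    object QueueStreamCheckpointRepro {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("repro")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint("/tmp/repro-checkpoint") // illustrative directory
        val rdd = ssc.sparkContext.parallelize(1 to 10)
        ssc.queueStream[Int](Queue(rdd)).print()
        ssc.start() // after this commit: logs a warning here instead of throwing
        ssc.awaitTermination()
      }
    }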
1 parent 8f82bb4 commit dbff389

3 files changed: +30, −13 lines

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 4 additions & 2 deletions
@@ -307,7 +307,7 @@ object CheckpointReader extends Logging {
 
     // Try to read the checkpoint files in the order
     logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
-    val compressionCodec = CompressionCodec.createCodec(conf)
+    var readError: Exception = null
     checkpointFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file " + file)
       try {
@@ -318,13 +318,15 @@
         return Some(cp)
       } catch {
         case e: Exception =>
+          readError = e
           logWarning("Error reading checkpoint from file " + file, e)
       }
     })
 
     // If none of checkpoint files could be read, then throw exception
     if (!ignoreReadError) {
-      throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
+      throw new SparkException(
+        s"Failed to read checkpoint from directory $checkpointPath", readError)
     }
     None
   }
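The hunks above follow a reusable pattern: remember the most recent failure while trying candidates in order, and attach it as the cause when every attempt fails, so the final exception is debuggable. A minimal sketch of that pattern, with illustrative names (readFirstReadable is not a Spark API):

    // Try each candidate in order; if all fail, rethrow with the last
    // error as the cause, mirroring readError in CheckpointReader above.
    def readFirstReadable[T](candidates: Seq[String], read: String => T): T = {
      var lastError: Exception = null
      candidates.foreach { candidate =>
        try {
          return read(candidate)
        } catch {
          case e: Exception =>
            lastError = e // keep the most recent failure for diagnostics
        }
      }
      throw new RuntimeException(
        s"All ${candidates.size} candidates failed to load", lastError)
    }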

streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala

Lines changed: 7 additions & 2 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{NotSerializableException, ObjectOutputStream}
+import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, Queue}
 import scala.reflect.ClassTag
@@ -37,8 +37,13 @@ class QueueInputDStream[T: ClassTag](
 
   override def stop() { }
 
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new NotSerializableException("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled.")
+  }
+
   private def writeObject(oos: ObjectOutputStream): Unit = {
-    throw new NotSerializableException("queueStream doesn't support checkpointing")
+    logWarning("queueStream doesn't support checkpointing")
   }
 
   override def compute(validTime: Time): Option[RDD[T]] = {
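Both methods rely on the JVM's serialization hooks: a Serializable class may declare private writeObject and readObject methods, which ObjectOutputStream and ObjectInputStream invoke reflectively during (de)serialization. A self-contained sketch of the same technique, with an illustrative class name and messages:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      NotSerializableException, ObjectInputStream, ObjectOutputStream}

    // Tolerates being written (warning only) but refuses to be read back,
    // the same split QueueInputDStream uses after this commit.
    class UncheckpointableThing extends Serializable {
      private def writeObject(oos: ObjectOutputStream): Unit = {
        // Downgraded to a warning so serialization can proceed.
        System.err.println("WARN: this class doesn't support checkpointing")
      }

      private def readObject(in: ObjectInputStream): Unit = {
        // Fail recovery with an actionable message.
        throw new NotSerializableException("this class doesn't support checkpointing")
      }
    }

    object SerializationHookDemo extends App {
      val buffer = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(buffer)
      oos.writeObject(new UncheckpointableThing) // only warns
      oos.close()
      val ois = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      try ois.readObject() catch {
        case e: NotSerializableException => println("recovery failed: " + e.getMessage)
      }
    }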

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 19 additions & 9 deletions
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.storage.StorageLevel
@@ -690,16 +690,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts
   }
 
   test("queueStream doesn't support checkpointing") {
-    val checkpointDir = Utils.createTempDir()
-    ssc = new StreamingContext(master, appName, batchDuration)
-    val rdd = ssc.sparkContext.parallelize(1 to 10)
-    ssc.queueStream[Int](Queue(rdd)).print()
-    ssc.checkpoint(checkpointDir.getAbsolutePath)
-    val e = intercept[NotSerializableException] {
-      ssc.start()
+    val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+    def creatingFunction(): StreamingContext = {
+      val _ssc = new StreamingContext(conf, batchDuration)
+      val rdd = _ssc.sparkContext.parallelize(1 to 10)
+      _ssc.checkpoint(checkpointDirectory)
+      _ssc.queueStream[Int](Queue(rdd)).register()
+      _ssc
+    }
+    ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+    ssc.start()
+    eventually(timeout(10000 millis)) {
+      assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
+    }
+    ssc.stop()
+    val e = intercept[SparkException] {
+      ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
     }
     // StreamingContext.validate changes the message, so use "contains" here
-    assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
+    assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled."))
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {
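The rewritten test drives recovery through StreamingContext.getOrCreate, which rebuilds the context by deserializing checkpoint data when it exists and only calls the creating function otherwise; deserialization is where the new, clearer exception surfaces. A condensed sketch of that usage pattern (directory handling and names are illustrative):

    import scala.collection.mutable.Queue

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Fresh directory: createContext builds a new context. Directory with
    // checkpoint data: the graph is deserialized instead, which is the
    // path that now fails fast for queueStream.
    def recoverOrCreate(checkpointDir: String): StreamingContext = {
      def createContext(): StreamingContext = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("demo")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint(checkpointDir)
        val rdd = ssc.sparkContext.parallelize(1 to 10)
        ssc.queueStream[Int](Queue(rdd)).print()
        ssc
      }
      StreamingContext.getOrCreate(checkpointDir, createContext _)
    }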
