diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ec49d0f42d12..0a24eb332a2a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -204,6 +204,8 @@ class StreamingContext private[streaming] ( private var shutdownHookRef: AnyRef = _ + conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint) + /** * Return the associated Spark context */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 56b4ce5638a5..5cf7b84a2778 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -112,6 +112,15 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10) } + test("checkPoint from conf") { + val checkpointDirectory = Utils.createTempDir().getAbsolutePath() + + val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) + myConf.set("spark.streaming.checkpoint.directory", checkpointDirectory) + val ssc = new StreamingContext(myConf, batchDuration) + assert(ssc.checkpointDir != null) + } + test("state matching") { import StreamingContextState._ assert(INITIALIZED === INITIALIZED)