From 204814ea2be868257b32f686e1455254f5d60582 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 8 Apr 2015 15:30:48 -0700 Subject: [PATCH 1/5] Added StreamingContext.getOrCreate with existing SparkContext --- .../apache/spark/streaming/Checkpoint.scala | 27 ++- .../spark/streaming/StreamingContext.scala | 76 ++++++++- .../spark/streaming/CheckpointSuite.scala | 3 +- .../streaming/StreamingContextSuite.scala | 159 +++++++++++++++++- 4 files changed, 252 insertions(+), 13 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index f73b463d07779..c0c8ad873fbac 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -77,7 +77,8 @@ object Checkpoint extends Logging { } /** Get checkpoint files present in the give directory, ordered by oldest-first */ - def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = { + def getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path] = { + def sortFunc(path1: Path, path2: Path): Boolean = { val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } @@ -85,6 +86,7 @@ object Checkpoint extends Logging { } val path = new Path(checkpointDir) + val fs = fsOption.getOrElse(path.getFileSystem(new Configuration())) if (fs.exists(path)) { val statuses = fs.listStatus(path) if (statuses != null) { @@ -157,7 +159,7 @@ class CheckpointWriter( } // Delete old checkpoint files - val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs) + val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)) if (allCheckpointFiles.size > 10) { allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => { logInfo("Deleting " + file) @@ -229,15 +231,24 @@ class CheckpointWriter( private[streaming] object CheckpointReader extends Logging { - def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = - { + /** + * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint + * files, then return None, else try to return the latest valid checkpoint object. If no + * checkpoint files could be read correctly, then return None (if ignoreReadError = true), + * or throw exception (if ignoreReadError = false). + */ + def read( + checkpointDir: String, + conf: SparkConf, + hadoopConf: Configuration, + ignoreReadError: Boolean = false): Option[Checkpoint] = { val checkpointPath = new Path(checkpointDir) // TODO(rxin): Why is this a def?! 
def fs = checkpointPath.getFileSystem(hadoopConf) // Try to find the checkpoint files - val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse + val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse if (checkpointFiles.isEmpty) { return None } @@ -270,8 +281,12 @@ object CheckpointReader extends Logging { } }) + // If none of checkpoint files could be read, then throw exception - throw new SparkException("Failed to read checkpoint from directory " + checkpointPath) + if (!ignoreReadError) { + throw new SparkException("Failed to read checkpoint from directory " + checkpointPath) + } + None } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index f57f295874645..0ea8fefed5982 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -107,6 +107,15 @@ class StreamingContext private[streaming] ( */ def this(path: String) = this(path, new Configuration) + + def this(path: String, sparkContext: SparkContext) = { + this( + sparkContext, + CheckpointReader.read(path, new SparkConf(), sparkContext.hadoopConfiguration).get, + null) + } + + if (sc_ == null && cp_ == null) { throw new Exception("Spark Streaming cannot be initialized with " + "both SparkContext and checkpoint as null") @@ -114,11 +123,15 @@ class StreamingContext private[streaming] ( private[streaming] val isCheckpointPresent = (cp_ != null) + private[streaming] val isSparkContextPresent = (sc_ != null) + private[streaming] val sc: SparkContext = { - if (isCheckpointPresent) { + if (isSparkContextPresent) { + sc_ + } else if (isCheckpointPresent) { new SparkContext(cp_.createSparkConf()) } else { - sc_ + throw new SparkException("Cannot create StreamingContext without a SparkContext") } } @@ -129,7 +142,7 @@ class StreamingContext private[streaming] ( private[streaming] val conf = sc.conf - private[streaming] val env = SparkEnv.get + private[streaming] val env = sc.env private[streaming] val graph: DStreamGraph = { if (isCheckpointPresent) { @@ -174,7 +187,9 @@ class StreamingContext private[streaming] ( /** Register streaming source to metrics system */ private val streamingSource = new StreamingSource(this) - SparkEnv.get.metricsSystem.registerSource(streamingSource) + assert(env != null) + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) /** Enumeration to identify current state of the StreamingContext */ private[streaming] object StreamingContextState extends Enumeration { @@ -621,6 +636,7 @@ object StreamingContext extends Logging { hadoopConf: Configuration = new Configuration(), createOnError: Boolean = false ): StreamingContext = { + /* val checkpointOption = try { CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf) } catch { @@ -632,6 +648,58 @@ object StreamingContext extends Logging { } } checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) + */ + val checkpointOption = CheckpointReader.read( + checkpointPath, new SparkConf(), hadoopConf, createOnError) + checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) + } + + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. 
+ * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note + * that the SparkConf configuration in the checkpoint data will not be restored as the + * SparkContext has already been created. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new StreamingContext using the given SparkContext + * @param sparkContext SparkContext using which the StreamingContext will be created + */ + def getOrCreate( + checkpointPath: String, + creatingFunc: SparkContext => StreamingContext, + sparkContext: SparkContext + ): StreamingContext = { + getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note + * that the SparkConf configuration in the checkpoint data will not be restored as the + * SparkContext has already been created. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new StreamingContext using the given SparkContext + * @param sparkContext SparkContext using which the StreamingContext will be created + * @param createOnError Whether to create a new StreamingContext if there is an + * error in reading checkpoint data. By default, an exception will be + * thrown on error. 
+ */ + def getOrCreate( + checkpointPath: String, + creatingFunc: SparkContext => StreamingContext, + sparkContext: SparkContext, + createOnError: Boolean + ): StreamingContext = { + val checkpointOption = CheckpointReader.read( + checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError) + checkpointOption.map(new StreamingContext(sparkContext, _, null)) + .getOrElse(creatingFunc(sparkContext)) } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 91a2b2bba461d..06ee0d16e5a8f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -406,9 +406,8 @@ class CheckpointSuite extends TestSuiteBase { assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3) } // Wait for a checkpoint to be written - val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration) eventually(eventuallyTimeout) { - assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6) + assert(Checkpoint.getCheckpointFiles(checkpointDir).size === 6) } ssc.stop() // Check that we shut down while the third batch was being processed diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 2e5005ef6ff14..abe1c0d79f75f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -17,8 +17,10 @@ package org.apache.spark.streaming +import java.io.File import java.util.concurrent.atomic.AtomicInteger +import org.apache.commons.io.FileUtils import org.scalatest.{Assertions, BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Timeouts import org.scalatest.concurrent.Eventually._ @@ -53,7 +55,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w sc = null } } - +/* test("from no conf constructor") { ssc = new StreamingContext(master, appName, batchDuration) assert(ssc.sparkContext.conf.get("spark.master") === master) @@ -327,6 +329,138 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(ssc.awaitTerminationOrTimeout(10000) === true) } } +*/ + test("getOrCreate") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + conf.set("newContext", "true") // to identify the context as new + + // Function to create StreamingContext that has a config to identify it to be new context + def creatingFunction(): StreamingContext = { + new StreamingContext(conf, batchDuration) + } + + // Call ssc.stop after a body of code + def withSscStop(body: => Unit): Unit = { + try { + body + } finally { + if (ssc != null) { + ssc.stop() + } + ssc = null + } + } + + val emptyPath = Utils.createTempDir().getAbsolutePath() + + // getOrCreate should create new context with empty path + withSscStop { + ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _) + assert(ssc != null, "no context created") + assert(ssc.conf.getBoolean("newContext", false), "new context not created") + } + + val corrutedCheckpointPath = createCorruptedCheckpoint() + + // getOrCreate should throw exception with fake checkpoint file and createOnError = false + intercept[Exception] { + ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, 
creatingFunction _) + } + + // getOrCreate should throw exception with fake checkpoint file + intercept[Exception] { + ssc = StreamingContext.getOrCreate( + corrutedCheckpointPath, creatingFunction _, createOnError = false) + } + + // getOrCreate should create new context with fake checkpoint file and createOnError = true + withSscStop { + ssc = StreamingContext.getOrCreate( + corrutedCheckpointPath, creatingFunction _, createOnError = true) + assert(ssc != null, "no context created") + assert(ssc.conf.getBoolean("newContext", false), "new context not created") + } + + val checkpointPath = createValidCheckpoint() + + // getOrCreate should recover context with checkpoint path + withSscStop { + ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _) + assert(ssc != null, "no context created") + assert(!ssc.conf.contains("newContext"), "old context not recovered") + } + } + + test("getOrCreate with existing SparkContext") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + conf.set("newContext", "true") // to identify the context as new + sc = new SparkContext(conf) + + // Function to create StreamingContext that has a config to identify it to be new context + def creatingFunction(sparkContext: SparkContext): StreamingContext = { + // context with no output ops + new StreamingContext(sparkContext, batchDuration) + } + + // Call ssc.stop(stopSparkContext = false) after a body of cody + def withSscStop(body: => Unit): Unit = { + try { + body + } finally { + if (ssc != null) { + ssc.stop(stopSparkContext = false) + } + ssc = null + } + } + + val emptyPath = Utils.createTempDir().getAbsolutePath() + + // getOrCreate should create new context with empty path + withSscStop { + ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true) + assert(ssc != null, "no context created") + assert(ssc.conf.getBoolean("newContext", false), + "new StreamingContext does not use existing SparkContext") + assert(ssc.graph.getOutputStreams().isEmpty, // number of output ops in new context = 0 + "created StreamingContext is not new!") + } + + val corrutedCheckpointPath = createCorruptedCheckpoint() + + // getOrCreate should throw exception with fake checkpoint file and createOnError = false + intercept[Exception] { + ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc) + } + + // getOrCreate should throw exception with fake checkpoint file + intercept[Exception] { + ssc = StreamingContext.getOrCreate( + corrutedCheckpointPath, creatingFunction _, sc, createOnError = false) + } + + // getOrCreate should create new context with fake checkpoint file and createOnError = true + withSscStop { + ssc = StreamingContext.getOrCreate( + corrutedCheckpointPath, creatingFunction _, sc, createOnError = true) + assert(ssc != null, "no context created") + assert(ssc.conf.getBoolean("newContext", false), + "new StreamingContext does not use existing SparkContext") + assert(ssc.graph.getOutputStreams().isEmpty, // number of output ops in new context = 0 + "created StreamingContext is not new") } + + val checkpointPath = createValidCheckpoint() + + // StreamingContext.getOrCreate should recover context with checkpoint path + withSscStop { + ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc) + assert(ssc != null, "no context created") + assert(ssc.conf.getBoolean("newContext", false), + "recovered StreamingContext does not use existing SparkContext") + assert(ssc.graph.getOutputStreams().nonEmpty, + "new 
StreamingContext created instead of recovering old one") + } + } test("DStream and generated RDD creation sites") { testPackage.test() @@ -337,6 +471,29 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val inputStream = new TestInputStream(s, input, 1) inputStream } + + def createValidCheckpoint(): String = { + val testDirectory = Utils.createTempDir().getAbsolutePath() + val checkpointDirectory = Utils.createTempDir().getAbsolutePath() + val conf = new SparkConf().setMaster(master).setAppName(appName) + ssc = new StreamingContext(conf, batchDuration) + ssc.checkpoint(checkpointDirectory) + ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() } + ssc.start() + eventually(timeout(10000 millis)) { + assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1) + } + ssc.stop() + checkpointDirectory + } + + def createCorruptedCheckpoint(): String = { + val checkpointDirectory = Utils.createTempDir().getAbsolutePath() + val fakeCheckpointFile = Checkpoint.checkpointFile(checkpointDirectory, Time(1000)) + FileUtils.write(new File(fakeCheckpointFile.toString()), "blablabla") + assert(Checkpoint.getCheckpointFiles(checkpointDirectory).nonEmpty) + checkpointDirectory + } } class TestException(msg: String) extends Exception(msg) From 36a782356e9fc032a0d6a42251ae82e2af25aeaa Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 8 Apr 2015 15:32:02 -0700 Subject: [PATCH 2/5] Minor changes. --- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 3 +-- .../org/apache/spark/streaming/StreamingContextSuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index c0c8ad873fbac..d8ff3f4a6f544 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -233,7 +233,7 @@ object CheckpointReader extends Logging { /** * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint - * files, then return None, else try to return the latest valid checkpoint object. If no + * files, then return None, else try to return the latest valid checkpoint object. If no * checkpoint files could be read correctly, then return None (if ignoreReadError = true), * or throw exception (if ignoreReadError = false). 
*/ @@ -281,7 +281,6 @@ object CheckpointReader extends Logging { } }) - // If none of checkpoint files could be read, then throw exception if (!ignoreReadError) { throw new SparkException("Failed to read checkpoint from directory " + checkpointPath) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index abe1c0d79f75f..10132961d12f2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -55,7 +55,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w sc = null } } -/* + test("from no conf constructor") { ssc = new StreamingContext(master, appName, batchDuration) assert(ssc.sparkContext.conf.get("spark.master") === master) @@ -329,7 +329,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(ssc.awaitTerminationOrTimeout(10000) === true) } } -*/ + test("getOrCreate") { val conf = new SparkConf().setMaster(master).setAppName(appName) conf.set("newContext", "true") // to identify the context as new @@ -361,7 +361,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } val corrutedCheckpointPath = createCorruptedCheckpoint() - + // getOrCreate should throw exception with fake checkpoint file and createOnError = false intercept[Exception] { ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _) From eabd092fd794e50c67c82a926b44b173a8dfc5e6 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 15 Apr 2015 18:14:11 -0700 Subject: [PATCH 3/5] Added Function0, Java API and unit tests for StreamingContext.getOrCreate --- .../spark/api/java/function/Function0.java | 27 ++++ .../spark/streaming/StreamingContext.scala | 13 -- .../api/java/JavaStreamingContext.scala | 119 ++++++++++++++- .../apache/spark/streaming/JavaAPISuite.java | 140 +++++++++++++----- 4 files changed, 247 insertions(+), 52 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/api/java/function/Function0.java diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/core/src/main/java/org/apache/spark/api/java/function/Function0.java new file mode 100644 index 0000000000000..38e410c5debe6 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/java/function/Function0.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import java.io.Serializable; + +/** + * A zero-argument function that returns an R. 
+ */ +public interface Function0 extends Serializable { + public R call() throws Exception; +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 0ea8fefed5982..eb8a9b55e8321 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -636,19 +636,6 @@ object StreamingContext extends Logging { hadoopConf: Configuration = new Configuration(), createOnError: Boolean = false ): StreamingContext = { - /* - val checkpointOption = try { - CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf) - } catch { - case e: Exception => - if (createOnError) { - None - } else { - throw e - } - } - checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) - */ val checkpointOption = CheckpointReader.read( checkpointPath, new SparkConf(), hadoopConf, createOnError) checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index e3db01c1e12c6..90ea7285ea1a8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -32,13 +32,14 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.function.{Function0 => JFunction0} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.StreamingListener -import org.apache.hadoop.conf.Configuration -import org.apache.spark.streaming.dstream.{PluggableInputDStream, ReceiverInputDStream, DStream} +import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver +import org.apache.hadoop.conf.Configuration /** * A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main @@ -655,6 +656,7 @@ object JavaStreamingContext { * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext */ + @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0") def getOrCreate( checkpointPath: String, factory: JavaStreamingContextFactory @@ -676,6 +678,7 @@ object JavaStreamingContext { * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible * file system */ + @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0") def getOrCreate( checkpointPath: String, hadoopConf: Configuration, @@ -700,6 +703,7 @@ object JavaStreamingContext { * @param createOnError Whether to create a new JavaStreamingContext if there is an * error in reading checkpoint data. 
*/ + @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0") def getOrCreate( checkpointPath: String, hadoopConf: Configuration, @@ -712,6 +716,117 @@ object JavaStreamingContext { new JavaStreamingContext(ssc) } + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. + * + * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program + * @param creatingFunc Function to create a new JavaStreamingContext + */ + def getOrCreate( + checkpointPath: String, + creatingFunc: JFunction0[JavaStreamingContext] + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + creatingFunc.call().ssc + }) + new JavaStreamingContext(ssc) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new JavaStreamingContext + * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible + * file system + */ + def getOrCreate( + checkpointPath: String, + creatingFunc: JFunction0[JavaStreamingContext], + hadoopConf: Configuration + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + creatingFunc.call().ssc + }, hadoopConf) + new JavaStreamingContext(ssc) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new JavaStreamingContext + * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible + * file system + * @param createOnError Whether to create a new JavaStreamingContext if there is an + * error in reading checkpoint data. + */ + def getOrCreate( + checkpointPath: String, + creatingFunc: JFunction0[JavaStreamingContext], + hadoopConf: Configuration, + createOnError: Boolean + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + creatingFunc.call().ssc + }, hadoopConf, createOnError) + new JavaStreamingContext(ssc) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. 
+ * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new JavaStreamingContext + * @param sparkContext SparkContext using which the StreamingContext will be created + */ + def getOrCreate( + checkpointPath: String, + creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext], + sparkContext: JavaSparkContext + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => { + creatingFunc.call(new JavaSparkContext(sparkContext)).ssc + }, sparkContext.sc) + new JavaStreamingContext(ssc) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new JavaStreamingContext + * @param sparkContext SparkContext using which the StreamingContext will be created + * @param createOnError Whether to create a new JavaStreamingContext if there is an + * error in reading checkpoint data. + */ + def getOrCreate( + checkpointPath: String, + creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext], + sparkContext: JavaSparkContext, + createOnError: Boolean + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => { + creatingFunc.call(new JavaSparkContext(sparkContext)).ssc + }, sparkContext.sc, createOnError) + new JavaStreamingContext(ssc) + } + /** * Find the JAR from which a given class was loaded, to make it easy for users to pass * their JARs to StreamingContext. 
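Taken together, the new overloads above supersede the deprecated
JavaStreamingContextFactory variants. A minimal sketch of the factory-free
form, assuming an illustrative checkpoint directory, master, and batch
interval (none of which come from this patch):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class GetOrCreateSketch {                        // hypothetical driver
      public static void main(String[] args) {
        final String checkpointDir = "/tmp/checkpoint";     // illustrative path
        final SparkConf conf = new SparkConf()
            .setMaster("local[2]").setAppName("sketch");

        JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDir,
            new Function0<JavaStreamingContext>() {
              @Override
              public JavaStreamingContext call() {
                // Runs only if checkpointDir holds no readable checkpoint.
                JavaStreamingContext newSsc =
                    new JavaStreamingContext(conf, Seconds.apply(1));
                newSsc.checkpoint(checkpointDir);
                // define input DStreams and output operations here
                return newSsc;
              }
            });

        ssc.start();
        ssc.awaitTermination();
      }
    }

The hadoopConf and createOnError overloads take the same shape, with the
extra arguments appended after the creating function.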
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 90340753a4eed..cf931ac102ec7 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -45,6 +45,7 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.*; import org.apache.spark.util.Utils; +import org.apache.spark.SparkConf; // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; @@ -929,7 +930,7 @@ public void testPairMap() { // Maps pair -> pair of different type public Tuple2 call(Tuple2 in) throws Exception { return in.swap(); } - }); + }); JavaTestUtils.attachTestOutputStream(reversed); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -987,12 +988,12 @@ public void testPairMap2() { // Maps pair -> single JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); JavaDStream reversed = pairStream.map( - new Function, Integer>() { - @Override - public Integer call(Tuple2 in) throws Exception { - return in._2(); - } - }); + new Function, Integer>() { + @Override + public Integer call(Tuple2 in) throws Exception { + return in._2(); + } + }); JavaTestUtils.attachTestOutputStream(reversed); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1123,7 +1124,7 @@ public void testCombineByKey() { JavaPairDStream combined = pairStream.combineByKey( new Function() { - @Override + @Override public Integer call(Integer i) throws Exception { return i; } @@ -1144,14 +1145,14 @@ public void testCountByValue() { Arrays.asList("hello")); List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("hello", 1L), - new Tuple2("world", 1L)), - Arrays.asList( - new Tuple2("hello", 1L), - new Tuple2("moon", 1L)), - Arrays.asList( - new Tuple2("hello", 1L))); + Arrays.asList( + new Tuple2("hello", 1L), + new Tuple2("world", 1L)), + Arrays.asList( + new Tuple2("hello", 1L), + new Tuple2("moon", 1L)), + Arrays.asList( + new Tuple2("hello", 1L))); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream counted = stream.countByValue(); @@ -1249,17 +1250,17 @@ public void testUpdateStateByKey() { JavaPairDStream updated = pairStream.updateStateByKey( new Function2, Optional, Optional>() { - @Override - public Optional call(List values, Optional state) { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v: values) { - out = out + v; + @Override + public Optional call(List values, Optional state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v : values) { + out = out + v; + } + return Optional.of(out); } - return Optional.of(out); - } }); JavaTestUtils.attachTestOutputStream(updated); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1292,17 +1293,17 @@ public void testUpdateStateByKeyWithInitial() { JavaPairDStream updated = pairStream.updateStateByKey( new Function2, Optional, Optional>() { - @Override - public Optional call(List values, Optional state) { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v: values) { - out = out + v; + @Override + public Optional call(List values, Optional state) { + int out = 0; + if 
(state.isPresent()) { + out = out + state.get(); + } + for (Integer v : values) { + out = out + v; + } + return Optional.of(out); } - return Optional.of(out); - } }, new HashPartitioner(1), initialRDD); JavaTestUtils.attachTestOutputStream(updated); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1328,7 +1329,7 @@ public void testReduceByKeyAndWindowWithInverse() { JavaPairDStream reduceWindowed = pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), - new Duration(2000), new Duration(1000)); + new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1707,6 +1708,71 @@ public Integer call(String s) throws Exception { Utils.deleteRecursively(tempDir); } + @SuppressWarnings("unchecked") + @Test + public void testContextGetOrCreate() throws InterruptedException { + + final SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("newContext", "true"); + + File emptyDir = Files.createTempDir(); + emptyDir.deleteOnExit(); + StreamingContextSuite contextSuite = new StreamingContextSuite(); + String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint(); + String checkpointDir = contextSuite.createValidCheckpoint(); + + // Function to create JavaStreamingContext without any output operations + // (used to detect the new context) + Function0 creatingFunc = new Function0() { + public JavaStreamingContext call() { + return new JavaStreamingContext(conf, Seconds.apply(1)); + } + }; + + ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc); + Assert.assertTrue("new context not created", + ssc.ssc().graph().getOutputStreams().length == 0); + ssc.stop(); + + ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, + new org.apache.hadoop.conf.Configuration()); + Assert.assertFalse("old context not recovered", + ssc.ssc().graph().getOutputStreams().length == 0); + ssc.stop(); + + ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc, + new org.apache.hadoop.conf.Configuration(), true); + Assert.assertTrue("new context not created", + ssc.ssc().graph().getOutputStreams().length == 0); + ssc.stop(); + + // Function to create JavaStreamingContext using existing JavaSparkContext + // without any output operations (used to detect the new context) + Function creatingFunc2 = + new Function() { + public JavaStreamingContext call(JavaSparkContext context) { + return new JavaStreamingContext(context, Seconds.apply(1)); + } + }; + + JavaSparkContext sc = new JavaSparkContext(conf); + ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc); + Assert.assertTrue("new context not created", + ssc.ssc().graph().getOutputStreams().length == 0); + ssc.stop(false); + + ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc); + Assert.assertFalse("old context not recovered", + ssc.ssc().graph().getOutputStreams().length == 0); + ssc.stop(false); + + ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true); + Assert.assertTrue("new context not created", + ssc.ssc().graph().getOutputStreams().length == 0); + ssc.stop(); + } /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD @SuppressWarnings("unchecked") From 524f519ae69d7e1e70a637f826e8f0d859690aaf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 22 Apr 2015 01:23:23 -0700 Subject: [PATCH 4/5] Many changes based on PR comments. 
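Changes in this round of review feedback:

- Document the StreamingContext(path, sparkContext) constructor and read
  the checkpoint with sparkContext.conf rather than a fresh SparkConf.
- Inline the sc_ null check, removing the isSparkContextPresent flag.
- Fix the "JavaStreamingContextFactory" typo in the deprecation messages.
- In the Scala and Java suites, detect whether the creating function
  actually ran through a mutable flag (a var in Scala, a MutableBoolean
  in Java) instead of a marker entry in SparkConf, and assert that
  recovery restores the checkpointed configuration ("someKey") only when
  no existing SparkContext is supplied.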
--- .../apache/spark/streaming/Checkpoint.scala | 2 +- .../spark/streaming/StreamingContext.scala | 12 ++-- .../api/java/JavaStreamingContext.scala | 4 +- .../apache/spark/streaming/JavaAPISuite.java | 41 ++++++++------ .../streaming/StreamingContextSuite.scala | 55 ++++++++++--------- 5 files changed, 61 insertions(+), 53 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index d8ff3f4a6f544..2d62978e2b651 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -283,7 +283,7 @@ object CheckpointReader extends Logging { // If none of checkpoint files could be read, then throw exception if (!ignoreReadError) { - throw new SparkException("Failed to read checkpoint from directory " + checkpointPath) + throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath") } None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index eb8a9b55e8321..90c8b47aebce0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -107,11 +107,15 @@ class StreamingContext private[streaming] ( */ def this(path: String) = this(path, new Configuration) - + /** + * Recreate a StreamingContext from a checkpoint file using an existing SparkContext. + * @param path Path to the directory that was specified as the checkpoint directory + * @param sparkContext Existing SparkContext + */ def this(path: String, sparkContext: SparkContext) = { this( sparkContext, - CheckpointReader.read(path, new SparkConf(), sparkContext.hadoopConfiguration).get, + CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get, null) } @@ -123,10 +127,8 @@ class StreamingContext private[streaming] ( private[streaming] val isCheckpointPresent = (cp_ != null) - private[streaming] val isSparkContextPresent = (sc_ != null) - private[streaming] val sc: SparkContext = { - if (isSparkContextPresent) { + if (sc_ != null) { sc_ } else if (isCheckpointPresent) { new SparkContext(cp_.createSparkConf()) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 90ea7285ea1a8..f8cf3f2a84c90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -678,7 +678,7 @@ object JavaStreamingContext { * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible * file system */ - @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0") + @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( checkpointPath: String, hadoopConf: Configuration, @@ -703,7 +703,7 @@ object JavaStreamingContext { * @param createOnError Whether to create a new JavaStreamingContext if there is an * error in reading checkpoint data. 
*/ - @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0") + @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( checkpointPath: String, hadoopConf: Configuration, diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index cf931ac102ec7..cb2e8380b4933 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -22,10 +22,12 @@ import java.nio.charset.Charset; import java.util.*; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; + import scala.Tuple2; import org.junit.Assert; @@ -1725,27 +1727,29 @@ public void testContextGetOrCreate() throws InterruptedException { // Function to create JavaStreamingContext without any output operations // (used to detect the new context) + final MutableBoolean newContextCreated = new MutableBoolean(false); Function0 creatingFunc = new Function0() { public JavaStreamingContext call() { + newContextCreated.setValue(true); return new JavaStreamingContext(conf, Seconds.apply(1)); } }; + newContextCreated.setValue(false); ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc); - Assert.assertTrue("new context not created", - ssc.ssc().graph().getOutputStreams().length == 0); - ssc.stop(); - - ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration()); - Assert.assertFalse("old context not recovered", - ssc.ssc().graph().getOutputStreams().length == 0); + Assert.assertTrue("new context not created", newContextCreated.isTrue()); ssc.stop(); + newContextCreated.setValue(false); ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc, new org.apache.hadoop.conf.Configuration(), true); - Assert.assertTrue("new context not created", - ssc.ssc().graph().getOutputStreams().length == 0); + Assert.assertTrue("new context not created", newContextCreated.isTrue()); + ssc.stop(); + + newContextCreated.setValue(false); + ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, + new org.apache.hadoop.conf.Configuration()); + Assert.assertTrue("old context not recovered", newContextCreated.isFalse()); ssc.stop(); // Function to create JavaStreamingContext using existing JavaSparkContext @@ -1753,24 +1757,25 @@ public JavaStreamingContext call() { Function creatingFunc2 = new Function() { public JavaStreamingContext call(JavaSparkContext context) { + newContextCreated.setValue(true); return new JavaStreamingContext(context, Seconds.apply(1)); } }; JavaSparkContext sc = new JavaSparkContext(conf); + newContextCreated.setValue(false); ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc); - Assert.assertTrue("new context not created", - ssc.ssc().graph().getOutputStreams().length == 0); + Assert.assertTrue("new context not created", newContextCreated.isTrue()); ssc.stop(false); - ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc); - Assert.assertFalse("old context not recovered", - ssc.ssc().graph().getOutputStreams().length == 0); + newContextCreated.setValue(false); + ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true); + Assert.assertTrue("new 
context not created", newContextCreated.isTrue()); ssc.stop(false); - ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true); - Assert.assertTrue("new context not created", - ssc.ssc().graph().getOutputStreams().length == 0); + newContextCreated.setValue(false); + ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc); + Assert.assertTrue("old context not recovered", newContextCreated.isFalse()); ssc.stop(); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 10132961d12f2..6fe62ca4f3231 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -332,15 +332,17 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("getOrCreate") { val conf = new SparkConf().setMaster(master).setAppName(appName) - conf.set("newContext", "true") // to identify the context as new // Function to create StreamingContext that has a config to identify it to be new context + var newContextCreated = false def creatingFunction(): StreamingContext = { + newContextCreated = true new StreamingContext(conf, batchDuration) } // Call ssc.stop after a body of code - def withSscStop(body: => Unit): Unit = { + def testGetOrCreate(body: => Unit): Unit = { + newContextCreated = false try { body } finally { @@ -354,10 +356,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val emptyPath = Utils.createTempDir().getAbsolutePath() // getOrCreate should create new context with empty path - withSscStop { + testGetOrCreate { ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _) assert(ssc != null, "no context created") - assert(ssc.conf.getBoolean("newContext", false), "new context not created") + assert(newContextCreated, "new context not created") } val corrutedCheckpointPath = createCorruptedCheckpoint() @@ -374,36 +376,38 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } // getOrCreate should create new context with fake checkpoint file and createOnError = true - withSscStop { + testGetOrCreate { ssc = StreamingContext.getOrCreate( corrutedCheckpointPath, creatingFunction _, createOnError = true) assert(ssc != null, "no context created") - assert(ssc.conf.getBoolean("newContext", false), "new context not created") + assert(newContextCreated, "new context not created") } val checkpointPath = createValidCheckpoint() - // getOrCreate should recover context with checkpoint path - withSscStop { + // getOrCreate should recover context with checkpoint path, and recover old configuration + testGetOrCreate { ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _) assert(ssc != null, "no context created") - assert(!ssc.conf.contains("newContext"), "old context not recovered") + assert(!newContextCreated, "old context not recovered") + assert(ssc.conf.get("someKey") === "someValue") } } test("getOrCreate with existing SparkContext") { val conf = new SparkConf().setMaster(master).setAppName(appName) - conf.set("newContext", "true") // to identify the context as new sc = new SparkContext(conf) // Function to create StreamingContext that has a config to identify it to be new context + var newContextCreated = false def creatingFunction(sparkContext: SparkContext): StreamingContext = { - // context with no output 
ops + newContextCreated = true new StreamingContext(sparkContext, batchDuration) } // Call ssc.stop(stopSparkContext = false) after a body of cody - def withSscStop(body: => Unit): Unit = { + def testGetOrCreate(body: => Unit): Unit = { + newContextCreated = false try { body } finally { @@ -417,13 +421,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val emptyPath = Utils.createTempDir().getAbsolutePath() // getOrCreate should create new context with empty path - withSscStop { + testGetOrCreate { ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true) assert(ssc != null, "no context created") - assert(ssc.conf.getBoolean("newContext", false), - "new StreamingContext does not use existing SparkContext") - assert(ssc.graph.getOutputStreams().isEmpty, // number of output ops in new context = 0 - "created StreamingContext is not new!") + assert(newContextCreated, "new context not created") + assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext") } val corrutedCheckpointPath = createCorruptedCheckpoint() @@ -440,25 +442,23 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } // getOrCreate should create new context with fake checkpoint file and createOnError = true - withSscStop { + testGetOrCreate { ssc = StreamingContext.getOrCreate( corrutedCheckpointPath, creatingFunction _, sc, createOnError = true) assert(ssc != null, "no context created") - assert(ssc.conf.getBoolean("newContext", false), - "new StreamingContext does not use existing SparkContext") - assert(ssc.graph.getOutputStreams().isEmpty, // number of output ops in new context = 0 - "created StreamingContext is not new") } + assert(newContextCreated, "new context not created") + assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext") + } val checkpointPath = createValidCheckpoint() // StreamingContext.getOrCreate should recover context with checkpoint path - withSscStop { + testGetOrCreate { ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc) assert(ssc != null, "no context created") - assert(ssc.conf.getBoolean("newContext", false), - "recovered StreamingContext does not use existing SparkContext") - assert(ssc.graph.getOutputStreams().nonEmpty, - "new StreamingContext created instead of recovering old one") + assert(!newContextCreated, "old context not recovered") + assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext") + assert(!ssc.conf.contains("someKey"), "recovered StreamingContext unexpectedly has old config") } } @@ -476,6 +476,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val testDirectory = Utils.createTempDir().getAbsolutePath() val checkpointDirectory = Utils.createTempDir().getAbsolutePath() val conf = new SparkConf().setMaster(master).setAppName(appName) + conf.set("someKey", "someValue") ssc = new StreamingContext(conf, batchDuration) ssc.checkpoint(checkpointDirectory) ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() } From 94db63c7603c159d2156bd5fe55acf1149a3b89b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 22 Apr 2015 03:34:23 -0700 Subject: [PATCH 5/5] Fix long line. 
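Wrap an overlong assert message in StreamingContextSuite and the
SlowTestReceiver class declaration so that both fit within the style
checker's line-length limit.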
--- .../org/apache/spark/streaming/StreamingContextSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 6fe62ca4f3231..5b9edc42fb179 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -458,7 +458,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(ssc != null, "no context created") assert(!newContextCreated, "old context not recovered") assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext") - assert(!ssc.conf.contains("someKey"), "recovered StreamingContext unexpectedly has old config") + assert(!ssc.conf.contains("someKey"), + "recovered StreamingContext unexpectedly has old config") } } @@ -528,7 +529,8 @@ object TestReceiver { } /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */ -class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { +class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) + extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { var receivingThreadOption: Option[Thread] = None
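End to end, the headline feature of this series — recreating a streaming
context from checkpoint data while reusing a long-lived SparkContext —
looks roughly like the following sketch from the Java side; the class
name, checkpoint path, master, and batch interval are illustrative
assumptions rather than part of the patch:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class RecoverableStreamingApp {                  // hypothetical driver
      public static void main(String[] args) {
        // A long-lived SparkContext, e.g. one shared with batch jobs, that
        // must survive the streaming context being stopped and recreated.
        SparkConf conf = new SparkConf()
            .setMaster("local[2]").setAppName("recoverable-app");
        JavaSparkContext sc = new JavaSparkContext(conf);
        final String checkpointDir = "/tmp/streaming-checkpoint";  // illustrative

        JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
            checkpointDir,
            new Function<JavaSparkContext, JavaStreamingContext>() {
              @Override
              public JavaStreamingContext call(JavaSparkContext context) {
                // Invoked only when checkpointDir holds no readable checkpoint.
                JavaStreamingContext newSsc =
                    new JavaStreamingContext(context, Seconds.apply(1));
                newSsc.checkpoint(checkpointDir);
                // define input DStreams and output operations here
                return newSsc;
              }
            },
            sc);

        ssc.start();
        ssc.awaitTerminationOrTimeout(60000);
        ssc.stop(false);  // stop streaming, keep the shared SparkContext alive
      }
    }

As the Scaladoc added in patch 1 notes, the SparkConf recorded in the
checkpoint is not restored in this mode because the SparkContext already
exists; patch 4's tests pin that down by asserting that "someKey" from the
checkpointed configuration is absent after recovery.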