From 5558e703b53fc13e54280e0bb92b3db6eea664f3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Nov 2014 18:23:22 -0800 Subject: [PATCH 1/7] StreamingContext.stop() should stop SparkContext even if StreamingContext has not been started yet. --- .../spark/streaming/StreamingContext.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 23d6d1c5e50f..d838f03fbae3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -487,20 +487,20 @@ class StreamingContext private[streaming] ( * received data to be completed */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { - // Warn (but not fail) if context is stopped twice, - // or context is stopped before starting - if (state == Initialized) { - logWarning("StreamingContext has not been started yet") - return - } if (state == Stopped) { logWarning("StreamingContext has already been stopped") - return - } // no need to throw an exception as its okay to stop twice - scheduler.stop(stopGracefully) - logInfo("StreamingContext stopped successfully") - waiter.notifyStop() - if (stopSparkContext) sc.stop() + } else { + // Even if the streaming context has not been started, we still need to stop the SparkContext: + if (stopSparkContext) sc.stop() + if (state == Initialized) { + logWarning("StreamingContext has not been started yet") + } else { + scheduler.stop(stopGracefully) + logInfo("StreamingContext stopped successfully") + waiter.notifyStop() + } + } + // The state should always be Stopped after calling `stop()`, even if we haven't started yet: state = Stopped } } From 813e4717472b585dbb4b92795ccca96d374679d9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Nov 2014 10:54:55 -0800 Subject: [PATCH 2/7] Revert workaround added in https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49 --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f47772947d67..655cec1573f5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -46,10 +46,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w after { if (ssc != null) { ssc.stop() - if (ssc.sc != null) { - // Calling ssc.stop() does not always stop the associated SparkContext. - ssc.sc.stop() - } ssc = null } if (sc != null) { From 5142517536ed59eda8fed6609fe263e95c94ad0e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Nov 2014 11:17:29 -0800 Subject: [PATCH 3/7] Add tests; improve Scaladoc. --- .../apache/spark/streaming/StreamingContext.scala | 13 ++++++++----- .../spark/streaming/StreamingContextSuite.scala | 9 +++++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index d838f03fbae3..deb85a28aebd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -436,10 +436,10 @@ class StreamingContext private[streaming] ( /** * Start the execution of the streams. + * + * @throws SparkException if the context has already been started or stopped. */ def start(): Unit = synchronized { - // Throw exception if the context has already been started once - // or if a stopped context is being started again if (state == Started) { throw new SparkException("StreamingContext has already been started") } @@ -472,8 +472,9 @@ class StreamingContext private[streaming] ( /** * Stop the execution of the streams immediately (does not wait for all received data * to be processed). - * @param stopSparkContext Stop the associated SparkContext or not * + * @param stopSparkContext if true, stops the associated SparkContext. The SparkContext will be + * stopped regardless of whether this StreamingContext has been started. */ def stop(stopSparkContext: Boolean = true): Unit = synchronized { stop(stopSparkContext, false) @@ -482,8 +483,10 @@ class StreamingContext private[streaming] ( /** * Stop the execution of the streams, with option of ensuring all received data * has been processed. - * @param stopSparkContext Stop the associated SparkContext or not - * @param stopGracefully Stop gracefully by waiting for the processing of all + * + * @param stopSparkContext if true, stops the associated SparkContext. The SparkContext will be + * stopped regardless of whether this StreamingContext has been started. + * @param stopGracefully if true, stops gracefully by waiting for the processing of all * received data to be completed */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 655cec1573f5..42efdbc86bae 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -133,11 +133,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc.stop() } - test("stop before start and start after stop") { + test("stop before start") { ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register() ssc.stop() // stop before start should not throw exception - ssc.start() + } + + test("start after stop") { + // Regression test for SPARK-4301 + ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register() ssc.stop() intercept[SparkException] { ssc.start() // start after stop should throw exception From 832a7f420bd38a698fbbdc817f23b6815f1c5679 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Nov 2014 11:34:35 -0800 Subject: [PATCH 4/7] Address review comment --- .../org/apache/spark/streaming/StreamingContext.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index deb85a28aebd..749afade6070 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -473,8 +473,9 @@ class StreamingContext private[streaming] ( * Stop the execution of the streams immediately (does not wait for all received data * to be processed). * - * @param stopSparkContext if true, stops the associated SparkContext. The SparkContext will be - * stopped regardless of whether this StreamingContext has been started. + * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext + * will be stopped regardless of whether this StreamingContext has been + * started. */ def stop(stopSparkContext: Boolean = true): Unit = synchronized { stop(stopSparkContext, false) @@ -484,8 +485,9 @@ class StreamingContext private[streaming] ( * Stop the execution of the streams, with option of ensuring all received data * has been processed. * - * @param stopSparkContext if true, stops the associated SparkContext. The SparkContext will be - * stopped regardless of whether this StreamingContext has been started. + * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext + * will be stopped regardless of whether this StreamingContext has been + * started. * @param stopGracefully if true, stops gracefully by waiting for the processing of all * received data to be completed */ From 03e9c402683c421ab987625e2f9de9a627091bf7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Nov 2014 13:25:40 -0800 Subject: [PATCH 5/7] Always stop SparkContext, even if stop(false) has already been called. This strengthens the invariant that calling stop(true) _always_ stops the underlying SparkContext, no matter what sequence of calls may have preceded it. --- .../apache/spark/streaming/StreamingContext.scala | 6 ++++-- .../spark/streaming/StreamingContextSuite.scala | 12 ++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 749afade6070..fdd74baa832f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -492,11 +492,13 @@ class StreamingContext private[streaming] ( * received data to be completed */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { + // Even if the streaming context has not been started, we still need to stop the SparkContext. + // Even if we have already stopped, we still need to attempt to stop the SparkContext because + // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). + if (stopSparkContext) sc.stop() if (state == Stopped) { logWarning("StreamingContext has already been stopped") } else { - // Even if the streaming context has not been started, we still need to stop the SparkContext: - if (stopSparkContext) sc.stop() if (state == Initialized) { logWarning("StreamingContext has not been started yet") } else { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 42efdbc86bae..93ffea81f62e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -139,6 +139,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc.stop() // stop before start should not throw exception } + test("stop(stopContext=true) after stopSparkContext(stopContext=false)") { + ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register() + ssc.stop(stopSparkContext = false) + assert(ssc.sc.makeRDD(1 to 100).collect().size === 100) + ssc.stop(stopSparkContext = true) + // Check that the SparkContext is actually stopped: + intercept[Exception] { + ssc.sc.makeRDD(1 to 100).collect() + } + } + test("start after stop") { // Regression test for SPARK-4301 ssc = new StreamingContext(master, appName, batchDuration) From bdbe5da19e212eee5a2787ecce52c39d0d8c3d30 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Nov 2014 13:43:20 -0800 Subject: [PATCH 6/7] Stop SparkContext after stopping scheduler, not before. --- .../org/apache/spark/streaming/StreamingContext.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index fdd74baa832f..ce0f7e6273b2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -492,10 +492,6 @@ class StreamingContext private[streaming] ( * received data to be completed */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { - // Even if the streaming context has not been started, we still need to stop the SparkContext. - // Even if we have already stopped, we still need to attempt to stop the SparkContext because - // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). - if (stopSparkContext) sc.stop() if (state == Stopped) { logWarning("StreamingContext has already been stopped") } else { @@ -507,6 +503,10 @@ class StreamingContext private[streaming] ( waiter.notifyStop() } } + // Even if the streaming context has not been started, we still need to stop the SparkContext. + // Even if we have already stopped, we still need to attempt to stop the SparkContext because + // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). + if (stopSparkContext) sc.stop() // The state should always be Stopped after calling `stop()`, even if we haven't started yet: state = Stopped } From dbcc9292cdcf860b1e437ef453c7aa0039ef6634 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Nov 2014 14:08:04 -0800 Subject: [PATCH 7/7] Address more review comments --- .../spark/streaming/StreamingContext.scala | 11 ++++----- .../streaming/StreamingContextSuite.scala | 24 +++++++++---------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ce0f7e6273b2..54b219711efb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -492,16 +492,13 @@ class StreamingContext private[streaming] ( * received data to be completed */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { - if (state == Stopped) { - logWarning("StreamingContext has already been stopped") - } else { - if (state == Initialized) { - logWarning("StreamingContext has not been started yet") - } else { + state match { + case Initialized => logWarning("StreamingContext has not been started yet") + case Stopped => logWarning("StreamingContext has already been stopped") + case Started => scheduler.stop(stopGracefully) logInfo("StreamingContext stopped successfully") waiter.notifyStop() - } } // Even if the streaming context has not been started, we still need to stop the SparkContext. // Even if we have already stopped, we still need to attempt to stop the SparkContext because diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 93ffea81f62e..4b49c4d25164 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -139,18 +139,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc.stop() // stop before start should not throw exception } - test("stop(stopContext=true) after stopSparkContext(stopContext=false)") { - ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register() - ssc.stop(stopSparkContext = false) - assert(ssc.sc.makeRDD(1 to 100).collect().size === 100) - ssc.stop(stopSparkContext = true) - // Check that the SparkContext is actually stopped: - intercept[Exception] { - ssc.sc.makeRDD(1 to 100).collect() - } - } - test("start after stop") { // Regression test for SPARK-4301 ssc = new StreamingContext(master, appName, batchDuration) @@ -174,6 +162,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc.stop() } + test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") { + ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register() + ssc.stop(stopSparkContext = false) + assert(ssc.sc.makeRDD(1 to 100).collect().size === 100) + ssc.stop(stopSparkContext = true) + // Check that the SparkContext is actually stopped: + intercept[Exception] { + ssc.sc.makeRDD(1 to 100).collect() + } + } + test("stop gracefully") { val conf = new SparkConf().setMaster(master).setAppName(appName) conf.set("spark.cleaner.ttl", "3600")