StreamingContext.scala
@@ -436,10 +436,10 @@ class StreamingContext private[streaming] (
 
   /**
    * Start the execution of the streams.
+   *
+   * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
-    // Throw exception if the context has already been started once
-    // or if a stopped context is being started again
     if (state == Started) {
       throw new SparkException("StreamingContext has already been started")
     }
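For context, a minimal sketch of the behavior documented by the new @throws tag. This is illustrative only and not part of the change: the local master, app name, and queue-backed stream are assumptions made so that start() has something to execute.

```scala
import scala.collection.mutable
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative only: local master, arbitrary app name and batch interval.
val ssc = new StreamingContext("local[2]", "start-twice-sketch", Seconds(1))
// Register a trivial queue-backed stream with an output operation so start() can run.
val queue = mutable.Queue.empty[RDD[Int]]
ssc.queueStream(queue).print()

ssc.start()
try {
  ssc.start() // a second start() throws, per the @throws doc above
} catch {
  case e: SparkException => println(e.getMessage)
}
ssc.stop()
```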
@@ -472,8 +472,10 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams immediately (does not wait for all received data
    * to be processed).
-   * @param stopSparkContext Stop the associated SparkContext or not
+   *
+   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   *                         will be stopped regardless of whether this StreamingContext has been
+   *                         started.
    */
   def stop(stopSparkContext: Boolean = true): Unit = synchronized {
     stop(stopSparkContext, false)
@@ -482,25 +484,27 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams, with option of ensuring all received data
    * has been processed.
-   * @param stopSparkContext Stop the associated SparkContext or not
-   * @param stopGracefully Stop gracefully by waiting for the processing of all
+   *
+   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   *                         will be stopped regardless of whether this StreamingContext has been
+   *                         started.
+   * @param stopGracefully if true, stops gracefully by waiting for the processing of all
    *                       received data to be completed
    */
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
-    // Warn (but not fail) if context is stopped twice,
-    // or context is stopped before starting
-    if (state == Initialized) {
-      logWarning("StreamingContext has not been started yet")
-      return
+    state match {
+      case Initialized => logWarning("StreamingContext has not been started yet")
+      case Stopped => logWarning("StreamingContext has already been stopped")
+      case Started =>
+        scheduler.stop(stopGracefully)
+        logInfo("StreamingContext stopped successfully")
+        waiter.notifyStop()
     }
-    if (state == Stopped) {
-      logWarning("StreamingContext has already been stopped")
-      return
-    } // no need to throw an exception as its okay to stop twice
-    scheduler.stop(stopGracefully)
-    logInfo("StreamingContext stopped successfully")
-    waiter.notifyStop()
-    // Even if the streaming context has not been started, we still need to stop the SparkContext.
+    // Even if we have already stopped, we still need to attempt to stop the SparkContext because
+    // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
     if (stopSparkContext) sc.stop()
+    // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
+    state = Stopped
   }
 }
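A short usage sketch of the stop() semantics spelled out in the comments above: stopping with stopSparkContext = false leaves the SparkContext usable, and a later stop(stopSparkContext = true) still shuts it down even though the StreamingContext is already stopped. Illustrative only; the master, app name, and batch interval are assumptions, not part of this change.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative only: local master, arbitrary app name and batch interval.
val ssc = new StreamingContext("local[2]", "stop-semantics-sketch", Seconds(1))

// Stopping before start() only logs a warning; the SparkContext survives.
ssc.stop(stopSparkContext = false)
println(ssc.sparkContext.makeRDD(1 to 100).count()) // still usable: prints 100

// Stopping again with stopSparkContext = true shuts down the underlying SparkContext,
// even though this StreamingContext was already stopped.
ssc.stop(stopSparkContext = true)
```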
StreamingContextSuite.scala
@@ -46,10 +46,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   after {
     if (ssc != null) {
       ssc.stop()
-      if (ssc.sc != null) {
-        // Calling ssc.stop() does not always stop the associated SparkContext.
-        ssc.sc.stop()
-      }
       ssc = null
     }
     if (sc != null) {
@@ -137,11 +133,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc.stop()
   }
 
-  test("stop before start and start after stop") {
+  test("stop before start") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.stop() // stop before start should not throw exception
-    ssc.start()
+  }
+
+  test("start after stop") {
+    // Regression test for SPARK-4301
+    ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register()
     ssc.stop()
     intercept[SparkException] {
       ssc.start() // start after stop should throw exception
@@ -161,6 +162,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc.stop()
   }
 
+  test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register()
+    ssc.stop(stopSparkContext = false)
+    assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
+    ssc.stop(stopSparkContext = true)
+    // Check that the SparkContext is actually stopped:
+    intercept[Exception] {
+      ssc.sc.makeRDD(1 to 100).collect()
+    }
+  }
+
   test("stop gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
     conf.set("spark.cleaner.ttl", "3600")