Skip to content

Commit 7b41b17

Browse files
JoshRosen authored and tdas committed
[SPARK-4301] StreamingContext should not allow start() to be called after calling stop()
In Spark 1.0.0+, calling `stop()` on a StreamingContext that has not been started is a no-op which has no side-effects. This allows users to call `stop()` on a fresh StreamingContext followed by `start()`. I believe that this almost always indicates an error and is not behavior that we should support. Since we don't allow `start() stop() start()` then I don't think it makes sense to allow `stop() start()`. The current behavior can lead to resource leaks when StreamingContext constructs its own SparkContext: if I call `stop(stopSparkContext=True)`, then I expect StreamingContext's underlying SparkContext to be stopped irrespective of whether the StreamingContext has been started. This is useful when writing unit test fixtures. Prior discussions: - #3053 (diff) - #3121 (comment) Author: Josh Rosen <[email protected]> Closes #3160 from JoshRosen/SPARK-4301 and squashes the following commits: dbcc929 [Josh Rosen] Address more review comments bdbe5da [Josh Rosen] Stop SparkContext after stopping scheduler, not before. 03e9c40 [Josh Rosen] Always stop SparkContext, even if stop(false) has already been called. 832a7f4 [Josh Rosen] Address review comment 5142517 [Josh Rosen] Add tests; improve Scaladoc. 813e471 [Josh Rosen] Revert workaround added in https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49 5558e70 [Josh Rosen] StreamingContext.stop() should stop SparkContext even if StreamingContext has not been started yet.
1 parent 4af5c7e commit 7b41b17

File tree

2 files changed

+40
-23
lines changed

2 files changed

+40
-23
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -436,10 +436,10 @@ class StreamingContext private[streaming] (
436436

437437
/**
438438
* Start the execution of the streams.
439+
*
440+
* @throws SparkException if the context has already been started or stopped.
439441
*/
440442
def start(): Unit = synchronized {
441-
// Throw exception if the context has already been started once
442-
// or if a stopped context is being started again
443443
if (state == Started) {
444444
throw new SparkException("StreamingContext has already been started")
445445
}
@@ -472,8 +472,10 @@ class StreamingContext private[streaming] (
472472
/**
473473
* Stop the execution of the streams immediately (does not wait for all received data
474474
* to be processed).
475-
* @param stopSparkContext Stop the associated SparkContext or not
476475
*
476+
* @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
477+
* will be stopped regardless of whether this StreamingContext has been
478+
* started.
477479
*/
478480
def stop(stopSparkContext: Boolean = true): Unit = synchronized {
479481
stop(stopSparkContext, false)
@@ -482,25 +484,27 @@ class StreamingContext private[streaming] (
482484
/**
483485
* Stop the execution of the streams, with option of ensuring all received data
484486
* has been processed.
485-
* @param stopSparkContext Stop the associated SparkContext or not
486-
* @param stopGracefully Stop gracefully by waiting for the processing of all
487+
*
488+
* @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
489+
* will be stopped regardless of whether this StreamingContext has been
490+
* started.
491+
* @param stopGracefully if true, stops gracefully by waiting for the processing of all
487492
* received data to be completed
488493
*/
489494
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
490-
// Warn (but not fail) if context is stopped twice,
491-
// or context is stopped before starting
492-
if (state == Initialized) {
493-
logWarning("StreamingContext has not been started yet")
494-
return
495+
state match {
496+
case Initialized => logWarning("StreamingContext has not been started yet")
497+
case Stopped => logWarning("StreamingContext has already been stopped")
498+
case Started =>
499+
scheduler.stop(stopGracefully)
500+
logInfo("StreamingContext stopped successfully")
501+
waiter.notifyStop()
495502
}
496-
if (state == Stopped) {
497-
logWarning("StreamingContext has already been stopped")
498-
return
499-
} // no need to throw an exception as its okay to stop twice
500-
scheduler.stop(stopGracefully)
501-
logInfo("StreamingContext stopped successfully")
502-
waiter.notifyStop()
503+
// Even if the streaming context has not been started, we still need to stop the SparkContext.
504+
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
505+
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
503506
if (stopSparkContext) sc.stop()
507+
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
504508
state = Stopped
505509
}
506510
}

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
4646
after {
4747
if (ssc != null) {
4848
ssc.stop()
49-
if (ssc.sc != null) {
50-
// Calling ssc.stop() does not always stop the associated SparkContext.
51-
ssc.sc.stop()
52-
}
5349
ssc = null
5450
}
5551
if (sc != null) {
@@ -137,11 +133,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
137133
ssc.stop()
138134
}
139135

140-
test("stop before start and start after stop") {
136+
test("stop before start") {
141137
ssc = new StreamingContext(master, appName, batchDuration)
142138
addInputStream(ssc).register()
143139
ssc.stop() // stop before start should not throw exception
144-
ssc.start()
140+
}
141+
142+
test("start after stop") {
143+
// Regression test for SPARK-4301
144+
ssc = new StreamingContext(master, appName, batchDuration)
145+
addInputStream(ssc).register()
145146
ssc.stop()
146147
intercept[SparkException] {
147148
ssc.start() // start after stop should throw exception
@@ -161,6 +162,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
161162
ssc.stop()
162163
}
163164

165+
test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
166+
ssc = new StreamingContext(master, appName, batchDuration)
167+
addInputStream(ssc).register()
168+
ssc.stop(stopSparkContext = false)
169+
assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
170+
ssc.stop(stopSparkContext = true)
171+
// Check that the SparkContext is actually stopped:
172+
intercept[Exception] {
173+
ssc.sc.makeRDD(1 to 100).collect()
174+
}
175+
}
176+
164177
test("stop gracefully") {
165178
val conf = new SparkConf().setMaster(master).setAppName(appName)
166179
conf.set("spark.cleaner.ttl", "3600")

0 commit comments

Comments (0)