
Commit 01187f5
[SPARK-7217] [STREAMING] Add configuration to control the default behavior of StreamingContext.stop() implicitly calling SparkContext.stop()
In environments like notebooks, the SparkContext is managed by the underlying infrastructure and is expected not to be stopped. However, StreamingContext.stop() calls SparkContext.stop() as a non-intuitive side effect. This PR adds a configuration to SparkConf that sets the default StreamingContext stop behavior; the default preserves the existing behavior for existing users.

Author: Tathagata Das <[email protected]>

Closes apache#5929 from tdas/SPARK-7217 and squashes the following commits:

869a763 [Tathagata Das] Changed implementation.
685fe00 [Tathagata Das] Added configuration
Parent: cfdadcb
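For illustration, here is a minimal sketch of how a notebook-style deployment might use the new flag; the setup code below is hypothetical and not part of this commit:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical notebook-style setup: the SparkContext is owned by the
    // hosting infrastructure, so StreamingContext.stop() must not kill it.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("NotebookStreamingApp")
      .set("spark.streaming.stopSparkContextByDefault", "false")

    val ssc = new StreamingContext(conf, Seconds(1))
    // ... define input streams, start the context, do work ...
    ssc.stop()  // stops only the streaming machinery; the SparkContext survives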

2 files changed (+24, -5 lines)

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 7 additions & 3 deletions
@@ -563,13 +563,17 @@ class StreamingContext private[streaming] (
 
   /**
    * Stop the execution of the streams immediately (does not wait for all received data
-   * to be processed).
+   * to be processed). By default, if `stopSparkContext` is not specified, the underlying
+   * SparkContext will also be stopped. This implicit behavior can be configured using the
+   * SparkConf configuration spark.streaming.stopSparkContextByDefault.
    *
-   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
    *                         will be stopped regardless of whether this StreamingContext has been
    *                         started.
    */
-  def stop(stopSparkContext: Boolean = true): Unit = synchronized {
+  def stop(
+      stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
+     ): Unit = synchronized {
     stop(stopSparkContext, false)
   }
 
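Since the default-parameter expression is evaluated on each call where the argument is omitted, the conf is consulted at every no-argument stop(), and an explicit argument always wins over the configured default. A small sketch, assuming a started ssc whose conf sets spark.streaming.stopSparkContextByDefault to "false" (the two calls are alternatives, not a sequence):

    // Omitted argument: the default is read from the conf, so the
    // SparkContext is kept alive here.
    ssc.stop()

    // Explicit argument: overrides the conf-driven default, so the
    // SparkContext is stopped as well.
    ssc.stop(stopSparkContext = true)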

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 17 additions & 2 deletions
@@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.state === ssc.StreamingContextState.Started)
     ssc.stop()
     assert(ssc.state === ssc.StreamingContextState.Stopped)
+
+    // Make sure that the SparkContext is also stopped by default
+    intercept[Exception] {
+      ssc.sparkContext.makeRDD(1 to 10)
+    }
   }
 
   test("start multiple times") {
@@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }
 
   test("stop only streaming context") {
-    ssc = new StreamingContext(master, appName, batchDuration)
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+
+    // Explicitly do not stop SparkContext
+    ssc = new StreamingContext(conf, batchDuration)
     sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop(stopSparkContext = false)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
-    ssc = new StreamingContext(sc, batchDuration)
+    sc.stop()
+
+    // Implicitly do not stop SparkContext
+    conf.set("spark.streaming.stopSparkContextByDefault", "false")
+    ssc = new StreamingContext(conf, batchDuration)
+    sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
+    assert(sc.makeRDD(1 to 100).collect().size === 100)
+    sc.stop()
   }
 
   test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
