Skip to content

Commit dbcc929

Browse files
committed
Address more review comments
1 parent bdbe5da commit dbcc929

File tree

2 files changed

+16
-19
lines changed

2 files changed

+16
-19
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -492,16 +492,13 @@ class StreamingContext private[streaming] (
    * received data to be completed
    */
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
-    if (state == Stopped) {
-      logWarning("StreamingContext has already been stopped")
-    } else {
-      if (state == Initialized) {
-        logWarning("StreamingContext has not been started yet")
-      } else {
+    state match {
+      case Initialized => logWarning("StreamingContext has not been started yet")
+      case Stopped => logWarning("StreamingContext has already been stopped")
+      case Started =>
         scheduler.stop(stopGracefully)
         logInfo("StreamingContext stopped successfully")
         waiter.notifyStop()
-      }
     }
     // Even if the streaming context has not been started, we still need to stop the SparkContext.
     // Even if we have already stopped, we still need to attempt to stop the SparkContext because

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc.stop() // stop before start should not throw exception
   }
 
-  test("stop(stopContext=true) after stopSparkContext(stopContext=false)") {
-    ssc = new StreamingContext(master, appName, batchDuration)
-    addInputStream(ssc).register()
-    ssc.stop(stopSparkContext = false)
-    assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
-    ssc.stop(stopSparkContext = true)
-    // Check that the SparkContext is actually stopped:
-    intercept[Exception] {
-      ssc.sc.makeRDD(1 to 100).collect()
-    }
-  }
-
   test("start after stop") {
     // Regression test for SPARK-4301
     ssc = new StreamingContext(master, appName, batchDuration)
@@ -174,6 +162,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc.stop()
   }
 
+  test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register()
+    ssc.stop(stopSparkContext = false)
+    assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
+    ssc.stop(stopSparkContext = true)
+    // Check that the SparkContext is actually stopped:
+    intercept[Exception] {
+      ssc.sc.makeRDD(1 to 100).collect()
+    }
+  }
+
   test("stop gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
     conf.set("spark.cleaner.ttl", "3600")

0 commit comments

Comments (0)