Skip to content

Commit 4d5c12a

Browse files
Lars Albertssonpwendell
authored andcommitted
SPARK-2113: awaitTermination() after stop() will hang in Spark Stremaing
Author: Lars Albertsson <[email protected]> Closes apache#1001 from lallea/contextwaiter_stopped and squashes the following commits: 93cd314 [Lars Albertsson] Mend StreamingContext stop() followed by awaitTermination().
1 parent e508f59 commit 4d5c12a

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ private[streaming] class ContextWaiter {
2727
}
2828

2929
def notifyStop() = synchronized {
30+
stopped = true
3031
notifyAll()
3132
}
3233

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
223223
}
224224
}
225225

226+
test("awaitTermination after stop") {
227+
ssc = new StreamingContext(master, appName, batchDuration)
228+
val inputStream = addInputStream(ssc)
229+
inputStream.map(x => x).register()
230+
231+
failAfter(10000 millis) {
232+
ssc.start()
233+
ssc.stop()
234+
ssc.awaitTermination()
235+
}
236+
}
237+
226238
test("awaitTermination with error in task") {
227239
ssc = new StreamingContext(master, appName, batchDuration)
228240
val inputStream = addInputStream(ssc)

0 commit comments

Comments
 (0)