Skip to content

Commit a49dc74

Browse files
Blockingly drain all events in LiveListenerBus#stop().
Also, move the bus.stop() call to the end of sc.stop(), so that slow listeners do not delay the cleanup of the other SparkContext state.
1 parent dc126f2 commit a49dc74

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -835,13 +835,13 @@ class SparkContext(
835835
if (dagSchedulerCopy != null) {
836836
metadataCleaner.cancel()
837837
dagSchedulerCopy.stop()
838-
listenerBus.stop()
839838
taskScheduler = null
840839
// TODO: Cache.stop()?
841840
env.stop()
842841
SparkEnv.set(null)
843842
ShuffleMapTask.clearCache()
844843
ResultTask.clearCache()
844+
listenerBus.stop()
845845
logInfo("Successfully stopped SparkContext")
846846
} else {
847847
logInfo("SparkContext already stopped")

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
3737
private var queueFullErrorMessageLogged = false
3838
private var started = false
3939

40+
private var drained = false
41+
private val drainedLock = new Object()
42+
4043
/**
4144
* Start sending events to attached listeners.
4245
*
@@ -55,6 +58,10 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
5558
while (true) {
5659
val event = eventQueue.take
5760
if (event == SparkListenerShutdown) {
61+
drainedLock.synchronized {
62+
drained = true
63+
drainedLock.notify()
64+
}
5865
// Get out of the while loop and shutdown the daemon thread
5966
return
6067
}
@@ -92,10 +99,21 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
9299
true
93100
}
94101

102+
/**
103+
* Stop the listener bus; wait until all listener events are processed by the listener bus
104+
* thread. Callers must ensure their listeners finish processing events in a reasonable amount
* of time, since this method blocks until the event queue has been fully drained.
105+
*/
95106
def stop() {
96107
if (!started) {
97108
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
98109
}
99-
post(SparkListenerShutdown)
110+
drainedLock.synchronized {
111+
// Hold the lock across post() and wait(): this guarantees the listener thread's
112+
// notify() cannot fire between our check of `drained` and the call to wait(), so the signal is never missed.
113+
post(SparkListenerShutdown)
114+
while (!drained) {
115+
drainedLock.wait()
116+
}
117+
}
100118
}
101119
}

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,20 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
7272
}
7373
}
7474

75+
test("bus.stop() waits for the event queue to completely drain") {
76+
val sleepyListener = new SleepyListener(1000)
77+
val bus = new LiveListenerBus
78+
bus.addListener(sleepyListener)
79+
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
80+
81+
bus.start()
82+
// Since the handler merely sleeps on each event, the queue should not drain immediately.
83+
assert(!bus.waitUntilEmpty(0))
84+
bus.stop()
85+
// bus.stop() should wait until the event queue is drained, ensuring no events are lost
86+
assert(bus.waitUntilEmpty(0))
87+
}
88+
7589
test("basic creation of StageInfo") {
7690
val listener = new SaveStageAndTaskInfo
7791
sc.addSparkListener(listener)
@@ -282,4 +296,11 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
282296
startedGettingResultTasks += taskGettingResult.taskInfo.index
283297
}
284298
}
299+
300+
class SleepyListener(val sleepTime: Long) extends SparkListener {
301+
override def onJobEnd(job: SparkListenerJobEnd) = {
302+
Thread.sleep(sleepTime)
303+
}
304+
}
305+
285306
}

0 commit comments

Comments (0)