Skip to content

Commit e208c5f

Browse files
committed
[SPARK-11572] Process outstanding requests after seeing stop flag
1 parent 3e2e187 commit e208c5f

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,24 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
5656

5757
// A counter that represents the number of events produced and consumed in the queue
5858
private val eventLock = new Semaphore(0)
59+
// limit on the number of events to process before exiting. -1 means no limit
60+
private val eventLimit = -1
5961

6062
private val listenerThread = new Thread(name) {
6163
setDaemon(true)
6264
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
63-
while (true) {
65+
while (eventLimit != 0) {
6466
eventLock.acquire()
6567
self.synchronized {
6668
processingEvent = true
6769
}
6870
try {
6971
if (stopped.get()) {
70-
// Get out of the while loop and shutdown the daemon thread
71-
return
72+
eventLimit = eventQueue.size
73+
if (eventLimit == 0) {
74+
// Get out of the while loop and shutdown the daemon thread
75+
return
76+
}
7277
}
7378
val event = eventQueue.poll
7479
assert(event != null, "event queue was empty but the listener bus was not stopped")

0 commit comments

Comments
 (0)