From e208c5fc111553e38db8b60741debdb35a4a41c6 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 15 Nov 2015 15:20:51 -0800 Subject: [PATCH 1/5] [SPARK-11572] Process outstanding requests after seeing stop flag --- .../apache/spark/util/AsynchronousListenerBus.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index b3b54af972cb4..cc58bc5924a7f 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -56,19 +56,24 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri // A counter that represents the number of events produced and consumed in the queue private val eventLock = new Semaphore(0) + // limit on the number of events to process before exiting. -1 means no limit + private val eventLimit = -1 private val listenerThread = new Thread(name) { setDaemon(true) override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - while (true) { + while (eventLimit != 0) { eventLock.acquire() self.synchronized { processingEvent = true } try { if (stopped.get()) { - // Get out of the while loop and shutdown the daemon thread - return + eventLimit = eventQueue.size + if (eventLimit == 0) { + // Get out of the while loop and shutdown the daemon thread + return + } } val event = eventQueue.poll assert(event != null, "event queue was empty but the listener bus was not stopped") From cb4132da49176adf5f98934ea06b41526ccf8cc2 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 15 Nov 2015 15:55:58 -0800 Subject: [PATCH 2/5] Fix compilation --- .../scala/org/apache/spark/util/AsynchronousListenerBus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index cc58bc5924a7f..ed1c65d82bf50 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -57,7 +57,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri // A counter that represents the number of events produced and consumed in the queue private val eventLock = new Semaphore(0) // limit on the number of events to process before exiting. -1 means no limit - private val eventLimit = -1 + private var eventLimit = -1 private val listenerThread = new Thread(name) { setDaemon(true) From 3df8051a154b547055a061d64f46f828ce6f9a8c Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 15 Nov 2015 17:05:38 -0800 Subject: [PATCH 3/5] Adjust eventLimit after getting event --- .../scala/org/apache/spark/util/AsynchronousListenerBus.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index ed1c65d82bf50..79222998d7cd1 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -77,6 +77,9 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri } val event = eventQueue.poll assert(event != null, "event queue was empty but the listener bus was not stopped") + if (eventLimit > 0) { + eventLimit-- + } postToAll(event) } finally { self.synchronized { From 8be6a7ceff31ea774b0b1ce86c041ecdfd99a9e3 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 15 Nov 2015 17:53:17 -0800 Subject: [PATCH 4/5] Remove tab in code --- .../scala/org/apache/spark/util/AsynchronousListenerBus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 79222998d7cd1..9b75c648a95fe 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -77,7 +77,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri } val event = eventQueue.poll assert(event != null, "event queue was empty but the listener bus was not stopped") - if (eventLimit > 0) { + if (eventLimit > 0) { eventLimit-- } postToAll(event) From cd9b2f2b8e2cfaa4e6e9814baea1238f3f8bc1b4 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 15 Nov 2015 18:24:15 -0800 Subject: [PATCH 5/5] Decrement --- .../scala/org/apache/spark/util/AsynchronousListenerBus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 9b75c648a95fe..e753b6683fec2 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -78,7 +78,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri val event = eventQueue.poll assert(event != null, "event queue was empty but the listener bus was not stopped") if (eventLimit > 0) { - eventLimit-- + eventLimit-=1 } postToAll(event) } finally {