From a49dc74802738c318279e988bd699c8211d50670 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 26 Mar 2014 22:36:45 -0700 Subject: [PATCH 1/4] Blockingly drain all events in LiveListenerBus#stop(). Also, move bus.stop() call to the end of sc.stop(), so that slow listeners would not affect the cleaning of other states. --- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/scheduler/LiveListenerBus.scala | 20 +++++++++++++++++- .../spark/scheduler/SparkListenerSuite.scala | 21 +++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a1003b7925715..5d2867019d02e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -835,13 +835,13 @@ class SparkContext( if (dagSchedulerCopy != null) { metadataCleaner.cancel() dagSchedulerCopy.stop() - listenerBus.stop() taskScheduler = null // TODO: Cache.stop()? env.stop() SparkEnv.set(null) ShuffleMapTask.clearCache() ResultTask.clearCache() + listenerBus.stop() logInfo("Successfully stopped SparkContext") } else { logInfo("SparkContext already stopped") diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 353a48661b0f7..ecc2ce309d318 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -37,6 +37,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private var queueFullErrorMessageLogged = false private var started = false + private var drained = false + private val drainedLock = new Object() + /** * Start sending events to attached listeners. * @@ -55,6 +58,10 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { while (true) { val event = eventQueue.take if (event == SparkListenerShutdown) { + drainedLock.synchronized { + drained = true + drainedLock.notify() + } // Get out of the while loop and shutdown the daemon thread return } @@ -92,10 +99,21 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { true } + /** + * Stop the listener bus; wait until all listener events are processed by the listener bus + * thread. The user has to make sure the listeners finish in a reasonable amount of time. + */ def stop() { if (!started) { throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") } - post(SparkListenerShutdown) + drainedLock.synchronized { + // put post() and wait() in the same synchronized block to ensure wait() happens before + // notify() + post(SparkListenerShutdown) + while (!drained) { + drainedLock.wait() + } + } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index a25ce35736146..f96fbcae75ced 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -72,6 +72,20 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } } + test("bus.stop() waits for the event queue to completely drain") { + val sleepyListener = new SleepyListener(1000) + val bus = new LiveListenerBus + bus.addListener(sleepyListener) + (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) } + + bus.start() + // since the handler is just thread sleep, the queue should not drain immediately + assert(!bus.waitUntilEmpty(0)) + bus.stop() + // bus.stop() should wait until the event queue is drained, ensuring no events are lost + assert(bus.waitUntilEmpty(0)) + } + test("basic creation of StageInfo") { val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) @@ -282,4 +296,11 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc startedGettingResultTasks += taskGettingResult.taskInfo.index } } + + class SleepyListener(val sleepTime: Long) extends SparkListener { + override def onJobEnd(job: SparkListenerJobEnd) = { + Thread.sleep(sleepTime) + } + } + } From 36bcc4beccaa18691f76222c6cfe9a3526595a86 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 26 Mar 2014 22:47:36 -0700 Subject: [PATCH 2/4] LiveListenerBus.scala: newline changes from CRLF to LF. --- .../spark/scheduler/LiveListenerBus.scala | 238 +++++++++--------- 1 file changed, 119 insertions(+), 119 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index ecc2ce309d318..79fbaee415d63 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -1,119 +1,119 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -import java.util.concurrent.LinkedBlockingQueue - -import org.apache.spark.Logging - -/** - * Asynchronously passes SparkListenerEvents to registered SparkListeners. - * - * Until start() is called, all posted events are only buffered. Only after this listener bus - * has started will events be actually propagated to all attached listeners. This listener bus - * is stopped when it receives a SparkListenerShutdown event, which is posted using stop(). - */ -private[spark] class LiveListenerBus extends SparkListenerBus with Logging { - - /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than - * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ - private val EVENT_QUEUE_CAPACITY = 10000 - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) - private var queueFullErrorMessageLogged = false - private var started = false - - private var drained = false - private val drainedLock = new Object() - - /** - * Start sending events to attached listeners. - * - * This first sends out all buffered events posted before this listener bus has started, then - * listens for any additional events asynchronously while the listener bus is still running. - * This should only be called once. - */ - def start() { - if (started) { - throw new IllegalStateException("Listener bus already started!") - } - started = true - new Thread("SparkListenerBus") { - setDaemon(true) - override def run() { - while (true) { - val event = eventQueue.take - if (event == SparkListenerShutdown) { - drainedLock.synchronized { - drained = true - drainedLock.notify() - } - // Get out of the while loop and shutdown the daemon thread - return - } - postToAll(event) - } - } - }.start() - } - - def post(event: SparkListenerEvent) { - val eventAdded = eventQueue.offer(event) - if (!eventAdded && !queueFullErrorMessageLogged) { - logError("Dropping SparkListenerEvent because no remaining room in event queue. " + - "This likely means one of the SparkListeners is too slow and cannot keep up with the " + - "rate at which tasks are being started by the scheduler.") - queueFullErrorMessageLogged = true - } - } - - /** - * Waits until there are no more events in the queue, or until the specified time has elapsed. - * Used for testing only. Returns true if the queue has emptied and false is the specified time - * elapsed before the queue emptied. - */ - def waitUntilEmpty(timeoutMillis: Int): Boolean = { - val finishTime = System.currentTimeMillis + timeoutMillis - while (!eventQueue.isEmpty) { - if (System.currentTimeMillis > finishTime) { - return false - } - /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify - * add overhead in the general case. */ - Thread.sleep(10) - } - true - } - - /** - * Stop the listener bus; wait until all listener events are processed by the listener bus - * thread. The user has to make sure the listeners finish in a reasonable amount of time. - */ - def stop() { - if (!started) { - throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") - } - drainedLock.synchronized { - // put post() and wait() in the same synchronized block to ensure wait() happens before - // notify() - post(SparkListenerShutdown) - while (!drained) { - drainedLock.wait() - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.LinkedBlockingQueue + +import org.apache.spark.Logging + +/** + * Asynchronously passes SparkListenerEvents to registered SparkListeners. + * + * Until start() is called, all posted events are only buffered. Only after this listener bus + * has started will events be actually propagated to all attached listeners. This listener bus + * is stopped when it receives a SparkListenerShutdown event, which is posted using stop(). + */ +private[spark] class LiveListenerBus extends SparkListenerBus with Logging { + + /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than + * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ + private val EVENT_QUEUE_CAPACITY = 10000 + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + private var queueFullErrorMessageLogged = false + private var started = false + + private var drained = false + private val drainedLock = new Object() + + /** + * Start sending events to attached listeners. + * + * This first sends out all buffered events posted before this listener bus has started, then + * listens for any additional events asynchronously while the listener bus is still running. + * This should only be called once. + */ + def start() { + if (started) { + throw new IllegalStateException("Listener bus already started!") + } + started = true + new Thread("SparkListenerBus") { + setDaemon(true) + override def run() { + while (true) { + val event = eventQueue.take + if (event == SparkListenerShutdown) { + drainedLock.synchronized { + drained = true + drainedLock.notify() + } + // Get out of the while loop and shutdown the daemon thread + return + } + postToAll(event) + } + } + }.start() + } + + def post(event: SparkListenerEvent) { + val eventAdded = eventQueue.offer(event) + if (!eventAdded && !queueFullErrorMessageLogged) { + logError("Dropping SparkListenerEvent because no remaining room in event queue. " + + "This likely means one of the SparkListeners is too slow and cannot keep up with the " + + "rate at which tasks are being started by the scheduler.") + queueFullErrorMessageLogged = true + } + } + + /** + * Waits until there are no more events in the queue, or until the specified time has elapsed. + * Used for testing only. Returns true if the queue has emptied and false is the specified time + * elapsed before the queue emptied. + */ + def waitUntilEmpty(timeoutMillis: Int): Boolean = { + val finishTime = System.currentTimeMillis + timeoutMillis + while (!eventQueue.isEmpty) { + if (System.currentTimeMillis > finishTime) { + return false + } + /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify + * add overhead in the general case. */ + Thread.sleep(10) + } + true + } + + /** + * Stop the listener bus; wait until all listener events are processed by the listener bus + * thread. The user has to make sure the listeners finish in a reasonable amount of time. + */ + def stop() { + if (!started) { + throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") + } + drainedLock.synchronized { + // put post() and wait() in the same synchronized block to ensure wait() happens before + // notify() + post(SparkListenerShutdown) + while (!drained) { + drainedLock.wait() + } + } + } +} From 0dc8e5191cba66fb32784cb61afc6fa4b8885fe5 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Sun, 30 Mar 2014 08:30:28 -0700 Subject: [PATCH 3/4] Add isDrained() and fix listener bus stop() test (rely on semaphore instead). --- .../spark/scheduler/LiveListenerBus.scala | 6 +++ .../spark/scheduler/SparkListenerSuite.scala | 48 ++++++++++++++----- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 79fbaee415d63..b64df5f5bfb25 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -99,6 +99,12 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { true } + /** + * Return true if the event queue has been drained, i.e. the listener bus thread has processed + * the SparkListenerShutdown message and has exited. Used for testing only. + */ + def isDrained = drainedLock.synchronized { drained } + /** * Stop the listener bus; wait until all listener events are processed by the listener bus * thread. The user has to make sure the listeners finish in a reasonable amount of time. diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index f96fbcae75ced..e656d22c41486 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.concurrent.Semaphore + import scala.collection.mutable import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} @@ -73,17 +75,41 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } test("bus.stop() waits for the event queue to completely drain") { - val sleepyListener = new SleepyListener(1000) val bus = new LiveListenerBus - bus.addListener(sleepyListener) - (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) } + val blockingListener = new BlockingListener(bus) + val sem = new Semaphore(0) + bus.addListener(blockingListener) + bus.post(SparkListenerJobEnd(0, JobSucceeded)) bus.start() - // since the handler is just thread sleep, the queue should not drain immediately - assert(!bus.waitUntilEmpty(0)) - bus.stop() - // bus.stop() should wait until the event queue is drained, ensuring no events are lost - assert(bus.waitUntilEmpty(0)) + // the queue should not drain immediately + assert(!bus.isDrained) + + new Thread("ListenerBusStopper") { + override def run() { + // stop() would block until cond.notify() is called in the below + bus.stop() + sem.release() + } + }.start() + + val startTime = System.currentTimeMillis() + val waitTime = 200 + var done = false + while (!done) { + if (System.currentTimeMillis() > startTime + waitTime) { + bus.synchronized { + bus.notify() + } + done = true + } else { + // bus.stop() should wait until the event queue is drained + // ensuring no events are lost + assert(!bus.isDrained) + } + } + sem.acquire() + assert(bus.isDrained) } test("basic creation of StageInfo") { @@ -297,9 +323,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } } - class SleepyListener(val sleepTime: Long) extends SparkListener { - override def onJobEnd(job: SparkListenerJobEnd) = { - Thread.sleep(sleepTime) + class BlockingListener(cond: AnyRef) extends SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd) = { + cond.synchronized { cond.wait() } } } From 8f6594927fe19dc060a8a02b0727191cfdcc71a5 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Sun, 30 Mar 2014 08:39:50 -0700 Subject: [PATCH 4/4] Minor comment fix. --- .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index e656d22c41486..3807f4b14c461 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -87,7 +87,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc new Thread("ListenerBusStopper") { override def run() { - // stop() would block until cond.notify() is called in the below + // stop() would block until notify() is called below bus.stop() sem.release() }