From eb486ae4c1d95682c0ebeb62edb242109210312e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 24 Apr 2014 17:21:51 -0700 Subject: [PATCH 1/2] Synchronize accesses to the LiveListenerBus' event queue This guards against the race condition in which we (1) dequeue an event, and (2) check for queue emptiness before (3) actually processing the event in all attached listeners. The solution is to make steps (1) and (3) atomic relatively to (2). --- .../spark/scheduler/LiveListenerBus.scala | 35 ++++++++++++++----- .../spark/scheduler/SparkListenerSuite.scala | 5 +-- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index cbac4c13ca6fe..addadc8f028ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.{LinkedBlockingQueue, Semaphore} import org.apache.spark.Logging @@ -36,16 +36,24 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) private var queueFullErrorMessageLogged = false private var started = false + + // A counter that represents the number of events produced and consumed in the queue + private val eventLock = new Semaphore(0) + private val listenerThread = new Thread("SparkListenerBus") { setDaemon(true) override def run() { while (true) { - val event = eventQueue.take - if (event == SparkListenerShutdown) { - // Get out of the while loop and shutdown the daemon thread - return + eventLock.acquire() + // Atomically remove and process this event + LiveListenerBus.this.synchronized { + val event = eventQueue.poll + if (event == SparkListenerShutdown) { + // Get out of the while loop and shutdown the daemon thread + return + } + Option(event).foreach(postToAll) } - postToAll(event) } } } @@ -73,16 +81,17 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { "rate at which tasks are being started by the scheduler.") queueFullErrorMessageLogged = true } + eventLock.release() } /** - * Waits until there are no more events in the queue, or until the specified time has elapsed. - * Used for testing only. Returns true if the queue has emptied and false is the specified time + * For testing only. Wait until there are no more events in the queue, or until the specified + * time has elapsed. Return true if the queue has emptied and false is the specified time * elapsed before the queue emptied. */ def waitUntilEmpty(timeoutMillis: Int): Boolean = { val finishTime = System.currentTimeMillis + timeoutMillis - while (!eventQueue.isEmpty) { + while (!queueIsEmpty) { if (System.currentTimeMillis > finishTime) { return false } @@ -93,6 +102,14 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { true } + /** + * Return whether the event queue is empty. + * + * The use of synchronized here guarantees that all events that once belonged to this queue + * have already been processed by all attached listeners, if this returns true. + */ + def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty } + def stop() { if (!started) { throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index ba048ced32a93..4e9fd07e68a21 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -29,7 +29,8 @@ import org.apache.spark.SparkContext._ import org.apache.spark.executor.TaskMetrics class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers - with BeforeAndAfter with BeforeAndAfterAll { + with BeforeAndAfter with BeforeAndAfterAll { + /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 @@ -37,7 +38,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc sc = new SparkContext("local", "SparkListenerSuite") } - override def afterAll { + override def afterAll() { System.clearProperty("spark.akka.frameSize") } From 56dbbcb8f88ab7b48ec4b238a883bb09bec27965 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 24 Apr 2014 18:52:01 -0700 Subject: [PATCH 2/2] Check if event is actually added before releasing semaphore --- .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index addadc8f028ae..dec3316bf7745 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -75,13 +75,14 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { def post(event: SparkListenerEvent) { val eventAdded = eventQueue.offer(event) - if (!eventAdded && !queueFullErrorMessageLogged) { + if (eventAdded) { + eventLock.release() + } else if (!queueFullErrorMessageLogged) { logError("Dropping SparkListenerEvent because no remaining room in event queue. " + "This likely means one of the SparkListeners is too slow and cannot keep up with the " + "rate at which tasks are being started by the scheduler.") queueFullErrorMessageLogged = true } - eventLock.release() } /**