Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.scheduler

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

import org.apache.spark.Logging

Expand All @@ -36,16 +36,24 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false

// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)

private val listenerThread = new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
val event = eventQueue.take
if (event == SparkListenerShutdown) {
// Get out of the while loop and shutdown the daemon thread
return
eventLock.acquire()
// Atomically remove and process this event
LiveListenerBus.this.synchronized {
val event = eventQueue.poll
if (event == SparkListenerShutdown) {
// Get out of the while loop and shutdown the daemon thread
return
}
Option(event).foreach(postToAll)
}
postToAll(event)
}
}
}
Expand All @@ -67,7 +75,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {

def post(event: SparkListenerEvent) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
if (eventAdded) {
eventLock.release()
} else if (!queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
Expand All @@ -76,13 +86,13 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}

/**
* Waits until there are no more events in the queue, or until the specified time has elapsed.
* Used for testing only. Returns true if the queue has emptied and false is the specified time
* For testing only. Wait until there are no more events in the queue, or until the specified
* time has elapsed. Return true if the queue has emptied and false is the specified time
* elapsed before the queue emptied.
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!eventQueue.isEmpty) {
while (!queueIsEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
Expand All @@ -93,6 +103,14 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
true
}

/**
* Return whether the event queue is empty.
*
* The use of synchronized here guarantees that all events that once belonged to this queue
* have already been processed by all attached listeners, if this returns true.
*/
def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

def stop() {
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ import org.apache.spark.SparkContext._
import org.apache.spark.executor.TaskMetrics

class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
with BeforeAndAfter with BeforeAndAfterAll {
with BeforeAndAfter with BeforeAndAfterAll {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

before {
sc = new SparkContext("local", "SparkListenerSuite")
}

override def afterAll {
override def afterAll() {
System.clearProperty("spark.akka.frameSize")
}

Expand Down