diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 1366251d0618..a7007a5ecf93 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS} import org.apache.spark.metrics.source.Source import org.apache.spark.scheduler._ -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util.{Clock, ListenerEventExecutor, SystemClock, ThreadUtils, Utils} /** * An agent that dynamically allocates and removes executors based on the workload. @@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager( * the scheduling task. */ def start(): Unit = { - listenerBus.addListener(listener) + listenerBus.addListener(listener, ListenerEventExecutor.ExecutorAllocationManagerGroup) val scheduleTask = new Runnable() { override def run(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 5242ab6f5523..e0a53ab5ef48 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) this(sc, new SystemClock) } - sc.addSparkListener(this) + sc.listenerBus.addListener(this, ListenerEventExecutor.HeartBeatReceiverGroup) override val rpcEnv: RpcEnv = sc.env.rpcEnv diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index efb5f9d501e4..06908a14fc46 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -532,7 +532,7 @@ class SparkContext(config: SparkConf) extends Logging { new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration) logger.start() - listenerBus.addListener(logger) + listenerBus.addListener(logger, ListenerEventExecutor.EventLoggingGroup) Some(logger) } else { None @@ -1874,7 +1874,7 @@ class SparkContext(config: SparkConf) extends Logging { def stop(): Unit = { if (LiveListenerBus.withinListenerThread.value) { throw new SparkException( - s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}") + s"Cannot stop SparkContext within listener event executor thread") } // Use the stopping variable to ensure no contention for the stop scenario. // Still track the stopped variable for use elsewhere in the code. 
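The hunks above all make the same change: instead of registering every listener on one shared queue, each internal listener is now registered under a named event group so that it gets its own single-threaded ListenerEventExecutor. A minimal sketch of the registration pattern from a caller's point of view (this assumes code living inside the org.apache.spark package, since listenerBus and ListenerEventExecutor are private[spark]; AuditListener is an illustrative listener, not part of this patch):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
    import org.apache.spark.util.ListenerEventExecutor

    // A deliberately slow listener: under this patch it only delays its own group's queue.
    class AuditListener extends SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        // e.g. write the job outcome to an external audit store
      }
    }

    // sc is an existing SparkContext. Listeners added through the public
    // sc.addSparkListener(...) land in the default group; internal components
    // pick a dedicated group, as the hunks above do.
    sc.listenerBus.addListener(new AuditListener, ListenerEventExecutor.DefaultUserEventListenerGroup)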
@@ -2329,7 +2329,7 @@ class SparkContext(config: SparkConf) extends Logging { " parameter from breaking Spark's ability to find a valid constructor.") } } - listenerBus.addListener(listener) + listenerBus.addListener(listener, ListenerEventExecutor.DefaultUserEventListenerGroup) logInfo(s"Registered listener $className") } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 5533f7b1f236..7a791bd64435 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -18,13 +18,12 @@ package org.apache.spark.scheduler import java.util.concurrent._ -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.util.DynamicVariable import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.internal.config._ -import org.apache.spark.util.Utils + /** * Asynchronously passes SparkListenerEvents to registered SparkListeners. @@ -37,12 +36,9 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa self => - import LiveListenerBus._ - // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() - private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + override lazy val eventQueueSize = validateAndGetQueueSize() private def validateAndGetQueueSize(): Int = { val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) @@ -52,102 +48,8 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa queueSize } - // Indicate if `start()` is called - private val started = new AtomicBoolean(false) - // Indicate if `stop()` is called - private val stopped = new AtomicBoolean(false) - - /** A counter for dropped events. It will be reset every time we log it. */ - private val droppedEventsCounter = new AtomicLong(0L) - - /** When `droppedEventsCounter` was logged last time in milliseconds. */ - @volatile private var lastReportTimestamp = 0L - - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread(name) { - setDaemon(true) - override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { - while (true) { - eventLock.acquire() - self.synchronized { - processingEvent = true - } - try { - val event = eventQueue.poll - if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return - } - postToAll(event) - } finally { - self.synchronized { - processingEvent = false - } - } - } - } - } - } - - /** - * Start sending events to attached listeners. - * - * This first sends out all buffered events posted before this listener bus has started, then - * listens for any additional events asynchronously while the listener bus is still running. 
- * This should only be called once. - * - */ - def start(): Unit = { - if (started.compareAndSet(false, true)) { - listenerThread.start() - } else { - throw new IllegalStateException(s"$name already started!") - } - } - def post(event: SparkListenerEvent): Unit = { - if (stopped.get) { - // Drop further events to make `listenerThread` exit ASAP - logError(s"$name has already stopped! Dropping event $event") - return - } - val eventAdded = eventQueue.offer(event) - if (eventAdded) { - eventLock.release() - } else { - onDropEvent(event) - droppedEventsCounter.incrementAndGet() - } - - val droppedEvents = droppedEventsCounter.get - if (droppedEvents > 0) { - // Don't log too frequently - if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { - // There may be multiple threads trying to decrease droppedEventsCounter. - // Use "compareAndSet" to make sure only one thread can win. - // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and - // then that thread will update it. - if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { - val prevLastReportTimestamp = lastReportTimestamp - lastReportTimestamp = System.currentTimeMillis() - logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + - new java.util.Date(prevLastReportTimestamp)) - } - } - } + postToAll(event) } /** @@ -159,7 +61,8 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa @throws(classOf[TimeoutException]) def waitUntilEmpty(timeoutMillis: Long): Unit = { val finishTime = System.currentTimeMillis + timeoutMillis - while (!queueIsEmpty) { + + while (!isListenerBusEmpty) { if (System.currentTimeMillis > finishTime) { throw new TimeoutException( s"The event queue is not empty after $timeoutMillis milliseconds") @@ -169,60 +72,10 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa Thread.sleep(10) } } - - /** - * For testing only. Return whether the listener daemon thread is still alive. - * Exposed for testing. - */ - def listenerThreadIsAlive: Boolean = listenerThread.isAlive - - /** - * Return whether the event queue is empty. - * - * The use of synchronized here guarantees that all events that once belonged to this queue - * have already been processed by all attached listeners, if this returns true. - */ - private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent } - - /** - * Stop the listener bus. It will wait until the queued events have been processed, but drop the - * new events after stopping. - */ - def stop(): Unit = { - if (!started.get()) { - throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") - } - if (stopped.compareAndSet(false, true)) { - // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know - // `stop` is called. - eventLock.release() - listenerThread.join() - } else { - // Keep quiet - } - } - - /** - * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be - * notified with the dropped events. - * - * Note: `onDropEvent` can be called in any thread. - */ - def onDropEvent(event: SparkListenerEvent): Unit = { - if (logDroppedEvent.compareAndSet(false, true)) { - // Only log the following message once to avoid duplicated annoying logs. - logError("Dropping SparkListenerEvent because no remaining room in event queue. 
" + - "This likely means one of the SparkListeners is too slow and cannot keep up with " + - "the rate at which tasks are being started by the scheduler.") - } - } } private[spark] object LiveListenerBus { // Allows for Context to check whether stop() call is made within listener thread val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) - - /** The thread name of Spark listener bus */ - val name = "SparkListenerBus" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 08e05ae0c095..51dc8a59dc26 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -82,7 +82,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { currentLine = entry._1 lineNumber = entry._2 + 1 - postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine))) + postToAllSync(JsonProtocol.sparkEventFromJson(parse(currentLine))) } catch { case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) => // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1. diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 7d31ac54a717..372b93fa9301 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -178,7 +178,7 @@ private[spark] object SparkUI { Utils.getContextOrSparkClassLoader).asScala listenerFactories.foreach { listenerFactory => val listeners = listenerFactory.createListeners(conf, sparkUI) - listeners.foreach(listenerBus.addListener) + listeners.foreach(l => listenerBus.addListener(l)) } sparkUI } diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index 79fc2e94599c..9b43dd1a46b7 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -17,27 +17,156 @@ package org.apache.spark.util -import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.locks.{Condition, ReentrantLock} import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal +import com.google.common.util.concurrent.ThreadFactoryBuilder + import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.LiveListenerBus + +private class ListenerEventExecutor[L <: AnyRef] (listenerName: String, queueCapacity: Int) + extends Logging { + private val threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(listenerName + "-event-executor") + .build() + val listeners = new CopyOnWriteArrayList[L]() + /** Holds the events to be processed by this listener. */ + private val eventQueue = new LinkedBlockingQueue[Runnable](queueCapacity) + /** + * A single threaded executor service guarantees ordered processing + * of the events per listener. + */ + private val executorService: ThreadPoolExecutor = + new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, eventQueue, threadFactory) + /** A counter for dropped events. It will be reset every time we log it. 
*/ + private val droppedEventsCounter = new AtomicLong(0L) + /** When `droppedEventsCounter` was logged last time in milliseconds. */ + @volatile private var lastReportTimestamp = 0L + /** Indicates if we are in the middle of processing some event */ + private val processingEvent = new AtomicBoolean(false) + /** + * Indicates if the event executor is started. The executor thread will be + * blocked on the condition variable until the event executor is started to + * guarantee that we do not process any event before starting the event executor. + */ + private val isStarted = new AtomicBoolean(false) + private val lock = new ReentrantLock() + /** Condition variable which is signaled once the event executor is started */ + private val startCondition: Condition = lock.newCondition + + def addListener(listener: L): Unit = { + listeners.add(listener) + } + + def removeListener(listener: L): Unit = { + listeners.remove(listener) + } + + def start(): Unit = { + isStarted.set(true) + lock.lock() + try { + startCondition.signalAll() + } finally { + lock.unlock() + } + } + + private[this] def waitForStart(): Unit = { + lock.lock() + try { + while (!isStarted.get()) { + startCondition.await() + } + } finally { + lock.unlock() + } + } + + def submit(task: Runnable): Unit = { + try { + executorService.submit(new Runnable { + override def run(): Unit = LiveListenerBus.withinListenerThread.withValue(true) { + waitForStart() + processingEvent.set(true) + try { + task.run() + } finally { + processingEvent.set(false) + } + } + }) + } catch { + case e: RejectedExecutionException => + droppedEventsCounter.incrementAndGet() + if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { + val droppedEvents = droppedEventsCounter.get + // There may be multiple threads trying to decrease droppedEventsCounter. + // Use "compareAndSet" to make sure only one thread can win. + // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and + // then that thread will update it. + if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { + val prevLastReportTimestamp = lastReportTimestamp + lastReportTimestamp = System.currentTimeMillis() + logError(s"Dropping $droppedEvents SparkListenerEvents since " + + new java.util.Date(prevLastReportTimestamp) + + " because no remaining room in event queue. This likely means" + + s" $listenerName event processor is too slow and cannot keep up " + + "with the rate at which tasks are being started by the scheduler.") + } + } + } + } + + private[this] def isProcessingEvent: Boolean = processingEvent.get() + + def isEmpty: Boolean = { + executorService.getQueue.size() == 0 && !isProcessingEvent + } + + def stop(): Unit = { + executorService.shutdown() + executorService.awaitTermination(10, TimeUnit.SECONDS) + } +} /** * An event bus which posts events to its listeners. */ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { - // Marked `private[spark]` for access in tests. - private[spark] val listeners = new CopyOnWriteArrayList[L] + // Cap the capacity of the event queue so we get an explicit error (rather than + // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
+ protected def eventQueueSize = 10000 + private val eventGroupToEventExecutors = + new ConcurrentHashMap[String, ListenerEventExecutor[L]]() + + // Indicate if `start()` is called + private val started = new AtomicBoolean(false) + // Indicate if `stop()` is called + private val stopped = new AtomicBoolean(false) /** * Add a listener to listen events. This method is thread-safe and can be called in any thread. */ - final def addListener(listener: L): Unit = { - listeners.add(listener) + final def addListener( + listener: L, eventListenerGroup: String = ListenerEventExecutor.DefaultEventListenerGroup): + Unit = synchronized { + var listenerEventExecutor = eventGroupToEventExecutors.get(eventListenerGroup) + if (listenerEventExecutor == null) { + listenerEventExecutor = + new ListenerEventExecutor[L](listener.getClass.getName, eventQueueSize) + eventGroupToEventExecutors.put(eventListenerGroup, listenerEventExecutor) + } + listenerEventExecutor.addListener(listener) + if (started.get()) { + listenerEventExecutor.start() + } } /** @@ -45,25 +174,65 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { * in any thread. */ final def removeListener(listener: L): Unit = { - listeners.remove(listener) + for (eventExecutor <- eventGroupToEventExecutors.values().asScala) { + eventExecutor.removeListener(listener) + } + } + + /** + * For testing only. Returns whether there is any event pending to be processed by + * any of the existing listeners. + */ + def isListenerBusEmpty: Boolean = { + for (eventExecutor <- eventGroupToEventExecutors.values().asScala) { + if (!eventExecutor.isEmpty) { + return false + } + } + true } /** - * Post the event to all registered listeners. The `postToAll` caller should guarantee calling - * `postToAll` in the same thread for all events. + * Posts the event to all registered listeners. This is an async call and it does not + * process the event itself. Processing of the event is done in a separate thread in + * the [[ListenerEventExecutor]]. */ final def postToAll(event: E): Unit = { - // JavaConverters can create a JIterableWrapper if we use asScala. - // However, this method will be called frequently. To avoid the wrapper cost, here we use - // Java Iterator directly. - val iter = listeners.iterator - while (iter.hasNext) { - val listener = iter.next() - try { - doPostEvent(listener, event) - } catch { - case NonFatal(e) => - logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) + for (listenerEventProcessor <- eventGroupToEventExecutors.values().asScala) { + // JavaConverters can create a JIterableWrapper if we use asScala. + // However, this method will be called frequently. To avoid the wrapper cost, here we use + // Java Iterator directly. + val iter = listenerEventProcessor.listeners.iterator() + while (iter.hasNext) { + val listener = iter.next() + listenerEventProcessor.submit(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + doPostEvent(listener, event) + } + }) + } + } + } + + /** + * For testing only. Post the event to all registered listeners. + * This guarantees processing the event in the same thread for all + * events. + */ + final def postToAllSync(event: E): Unit = { + for (listenerEventProcessor <- eventGroupToEventExecutors.values().asScala) { + // JavaConverters can create a JIterableWrapper if we use asScala. + // However, this method will be called frequently. To avoid the wrapper cost, here we use + // Java Iterator directly.
+ val iter = listenerEventProcessor.listeners.iterator() + while (iter.hasNext) { + val listener = iter.next() + try { + doPostEvent(listener, event) + } catch { + case NonFatal(e) => + logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) + } } } } @@ -76,7 +245,55 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass - listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq + listeners().filter(_.getClass == c).map(_.asInstanceOf[T]) + } + + private[spark] def listeners(): Seq[L] = { + eventGroupToEventExecutors.values.asScala.flatMap(_.listeners.asScala).toSeq } + /** + * Start sending events to attached listeners. + * + * This first sends out all buffered events posted before this listener bus has started, then + * listens for any additional events asynchronously while the listener bus is still running. + * This should only be called once. + * + */ + def start(): Unit = { + if (!started.compareAndSet(false, true)) { + throw new IllegalStateException("Listener bus already started!") + } + for (eventExecutor <- eventGroupToEventExecutors.values().asScala) { + eventExecutor.start() + } + } + + /** + * Stop the listener bus. It will wait until the queued events have been processed, but drop the + * new events after stopping. + */ + def stop(): Unit = { + if (!started.get()) { + throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") + } + if (!stopped.compareAndSet(false, true)) { + // Keep quiet if stop() is called more than once + return + } + val iter = eventGroupToEventExecutors.values().iterator() + while (iter.hasNext) { + iter.next().stop() + } + } +} + +private[spark] object ListenerEventExecutor { + val DefaultEventListenerGroup = "default-event-listener" + val DefaultUserEventListenerGroup = "default-user-event-listener" + val ExecutorAllocationManagerGroup = "executor-allocation-manager-listener" + val HeartBeatReceiverGroup = "heart-beat-receiver-listener" + val EventLoggingGroup = "event-logging-listener" } + diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ec409712b953..bc9d3d2fe458 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -95,7 +95,7 @@ class ExecutorAllocationManagerSuite test("add executors") { sc = createSparkContext(1, 10, 1) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Keep adding until the limit is reached assert(numExecutorsTarget(manager) === 1) @@ -140,7 +140,7 @@ class ExecutorAllocationManagerSuite test("add executors capped by num pending tasks") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(0, 5))) // Verify that we're capped at number of tasks in the stage assert(numExecutorsTarget(manager) === 0) @@ -156,10 +156,10 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify
that running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(1, 3))) + sc.listenerBus.postToAllSync(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 6) @@ -172,9 +172,9 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that re-running a task doesn't blow things up - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(2, 3))) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 9) assert(numExecutorsToAdd(manager) === 2) @@ -183,7 +183,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task once we're at our limit doesn't blow things up - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) assert(addExecutors(manager) === 0) assert(numExecutorsTarget(manager) === 10) } @@ -191,7 +191,7 @@ class ExecutorAllocationManagerSuite test("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(2, 5))) assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) @@ -202,15 +202,15 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 3) val task1Info = createTaskInfo(0, 0, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info)) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(2, 0, task1Info)) assert(numExecutorsToAdd(manager) === 4) assert(addExecutors(manager) === 2) val task2Info = createTaskInfo(1, 0, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info)) - sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) - sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(2, 0, task2Info)) + sc.listenerBus.postToAllSync(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) + sc.listenerBus.postToAllSync(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) assert(adjustRequestedExecutors(manager) === -1) } @@ -317,7 +317,7 @@ class ExecutorAllocationManagerSuite test ("interleaving add and remove") { sc = createSparkContext(5, 10, 5) val manager = sc.executorAllocationManager.get - 
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Add a few executors assert(addExecutors(manager) === 1) @@ -489,7 +489,7 @@ class ExecutorAllocationManagerSuite val clock = new ManualClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Scheduler queue backlogged onSchedulerBacklogged(manager) @@ -602,26 +602,27 @@ class ExecutorAllocationManagerSuite // Starting a stage should start the add timer val numTasks = 10 - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(0, numTasks))) assert(addTime(manager) !== NOT_SET) // Starting a subset of the tasks should not cancel the add timer val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, "executor-1") } - taskInfos.tail.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) } + taskInfos.tail.foreach { info => sc.listenerBus + .postToAllSync(SparkListenerTaskStart(0, 0, info)) } assert(addTime(manager) !== NOT_SET) // Starting all remaining tasks should cancel the add timer - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head)) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(0, 0, taskInfos.head)) assert(addTime(manager) === NOT_SET) // Start two different stages // The add timer should be canceled only if all tasks in both stages start running - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks))) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(1, numTasks))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(2, numTasks))) assert(addTime(manager) !== NOT_SET) - taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) } + taskInfos.foreach { info => sc.listenerBus.postToAllSync(SparkListenerTaskStart(1, 0, info)) } assert(addTime(manager) !== NOT_SET) - taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) } + taskInfos.foreach { info => sc.listenerBus.postToAllSync(SparkListenerTaskStart(2, 0, info)) } assert(addTime(manager) === NOT_SET) } @@ -635,22 +636,22 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).size === 5) // Starting a task cancel the remove timer for that executor - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) assert(removeTimes(manager).size === 3) assert(!removeTimes(manager).contains("executor-1")) assert(!removeTimes(manager).contains("executor-2")) // Finishing all tasks running on an executor should start the remove timer for that executor - 
sc.listenerBus.postToAll(SparkListenerTaskEnd( + sc.listenerBus.postToAllSync(SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new TaskMetrics)) - sc.listenerBus.postToAll(SparkListenerTaskEnd( + sc.listenerBus.postToAllSync(SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new TaskMetrics)) assert(removeTimes(manager).size === 4) assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not finished yet assert(removeTimes(manager).contains("executor-2")) - sc.listenerBus.postToAll(SparkListenerTaskEnd( + sc.listenerBus.postToAllSync(SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new TaskMetrics)) assert(removeTimes(manager).size === 5) assert(removeTimes(manager).contains("executor-1")) // executor-1 has now finished @@ -663,13 +664,13 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).isEmpty) // New executors have registered - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + sc.listenerBus.postToAllSync(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(removeTimes(manager).contains("executor-1")) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + sc.listenerBus.postToAllSync(SparkListenerExecutorAdded( 0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) @@ -677,14 +678,14 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).contains("executor-2")) // Existing executors have disconnected - sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", "")) + sc.listenerBus.postToAllSync(SparkListenerExecutorRemoved(0L, "executor-1", "")) assert(executorIds(manager).size === 1) assert(!executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(!removeTimes(manager).contains("executor-1")) // Unknown executor has disconnected - sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", "")) + sc.listenerBus.postToAllSync(SparkListenerExecutorRemoved(0L, "executor-3", "")) assert(executorIds(manager).size === 1) assert(removeTimes(manager).size === 1) } @@ -695,8 +696,8 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + sc.listenerBus.postToAllSync(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAllSync(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) @@ -708,15 +709,15 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + sc.listenerBus.postToAllSync(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 
1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + sc.listenerBus.postToAllSync(SparkListenerExecutorAdded( 0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) @@ -729,7 +730,7 @@ class ExecutorAllocationManagerSuite sc = createSparkContext(0, 100000, 0) val manager = sc.executorAllocationManager.get val stage1 = createStageInfo(0, 1000) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(stage1)) assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) @@ -740,12 +741,12 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, s"executor-$i") } assert(executorIds(manager).size === 15) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + sc.listenerBus.postToAllSync(SparkListenerStageCompleted(stage1)) adjustRequestedExecutors(manager) assert(numExecutorsTarget(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(1, 1000))) addExecutors(manager) assert(numExecutorsTarget(manager) === 16) } @@ -762,7 +763,7 @@ class ExecutorAllocationManagerSuite // Verify whether the initial number of executors is kept with no pending tasks assert(numExecutorsTarget(manager) === 3) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(1, 2))) clock.advance(100L) assert(maxNumExecutorsNeeded(manager) === 2) @@ -812,7 +813,7 @@ class ExecutorAllocationManagerSuite Seq.empty ) val stageInfo1 = createStageInfo(1, 5, localityPreferences1) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1)) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(stageInfo1)) assert(localityAwareTasks(manager) === 3) assert(hostToLocalTaskCount(manager) === @@ -824,13 +825,13 @@ class ExecutorAllocationManagerSuite Seq.empty ) val stageInfo2 = createStageInfo(2, 3, localityPreferences2) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2)) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(stageInfo2)) assert(localityAwareTasks(manager) === 5) assert(hostToLocalTaskCount(manager) === Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1)) + sc.listenerBus.postToAllSync(SparkListenerStageCompleted(stageInfo1)) assert(localityAwareTasks(manager) === 2) assert(hostToLocalTaskCount(manager) === Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2)) @@ -841,16 +842,16 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(maxNumExecutorsNeeded(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(0, 1))) assert(maxNumExecutorsNeeded(manager) === 1) val taskInfo = createTaskInfo(1, 1, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo)) + sc.listenerBus.postToAllSync(SparkListenerTaskStart(0, 0, taskInfo)) assert(maxNumExecutorsNeeded(manager) === 1) // If the task is failed, we expect it to be resubmitted later. 
val taskEndReason = ExceptionFailure(null, null, null, null, None) - sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null)) + sc.listenerBus.postToAllSync(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null)) assert(maxNumExecutorsNeeded(manager) === 1) } @@ -862,7 +863,7 @@ class ExecutorAllocationManagerSuite // Allocation manager is reset when adding executor requests are sent without reporting back // executor added. - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(0, 10))) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 2) @@ -877,7 +878,7 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager) === Set.empty) // Allocation manager is reset when executors are added. - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + sc.listenerBus.postToAllSync(SparkListenerStageSubmitted(createStageInfo(0, 10))) addExecutors(manager) addExecutors(manager) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 4c3d0b102152..206b860c4510 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -155,7 +155,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit extraConf.foreach { case (k, v) => conf.set(k, v) } val logName = compressionCodec.map("test-" + _).getOrElse("test") val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) - val listenerBus = new LiveListenerBus(sc) + val listenerBus = new LiveListenerBus(sc) { + override lazy val eventQueueSize = 10000 + } val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey", None) val applicationEnd = SparkListenerApplicationEnd(1000L) @@ -164,8 +166,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit eventLogger.start() listenerBus.start() listenerBus.addListener(eventLogger) - listenerBus.postToAll(applicationStart) - listenerBus.postToAll(applicationEnd) + listenerBus.postToAllSync(applicationStart) + listenerBus.postToAllSync(applicationEnd) eventLogger.stop() // Verify file contains exactly the two events logged diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index e8a88d4909a8..f5f39db50b95 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import java.util.concurrent.Semaphore import scala.collection.mutable -import scala.collection.JavaConverters._ import org.scalatest.Matchers @@ -28,6 +27,7 @@ import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.{ResetSystemProperties, RpcUtils} + class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers with ResetSystemProperties { @@ -364,11 +364,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match bus.start() // Post events to all listeners, and wait until the queue is drained - (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, 
jobCompletionTime, JobSucceeded)) } - bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - // The exception should be caught, and the event should be propagated to other listeners - assert(bus.listenerThreadIsAlive) + (1 to 5).foreach { _ => + bus.postToAllSync(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } assert(jobCounter1.count === 5) assert(jobCounter2.count === 5) } @@ -381,10 +379,10 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val conf = new SparkConf().setMaster("local").setAppName("test") .set("spark.extraListeners", listeners.map(_.getName).mkString(",")) sc = new SparkContext(conf) - sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1) - sc.listenerBus.listeners.asScala + sc.listenerBus.listeners.count(_.isInstanceOf[BasicJobCounter]) should be (1) + sc.listenerBus.listeners .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1) - sc.listenerBus.listeners.asScala + sc.listenerBus.listeners .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1) } diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index f6c8418ba3ac..15aa73e3f2a3 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -57,7 +57,7 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn // 2 RDDs are known, but none are cached val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), Seq.empty, "details") - bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 2) assert(storageListener.rddInfoList.isEmpty) @@ -68,7 +68,7 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn rddInfo3Cached.numCachedPartitions = 1 val stageInfo1 = new StageInfo( 1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), Seq.empty, "details") - bus.postToAll(SparkListenerStageSubmitted(stageInfo1)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo1)) assert(storageListener._rddInfoMap.size === 4) assert(storageListener.rddInfoList.size === 2) @@ -76,12 +76,12 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, Seq(10)) rddInfo0Cached.numCachedPartitions = 1 val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), Seq.empty, "details") - bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo0Cached)) assert(storageListener._rddInfoMap.size === 4) assert(storageListener.rddInfoList.size === 2) // We only keep around the RDDs that are cached - bus.postToAll(SparkListenerStageCompleted(stageInfo0)) + bus.postToAllSync(SparkListenerStageCompleted(stageInfo0)) assert(storageListener._rddInfoMap.size === 2) assert(storageListener.rddInfoList.size === 2) } @@ -93,16 +93,16 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn rddInfo1Cached.numCachedPartitions = 1 val stageInfo0 = new StageInfo( 0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), Seq.empty, "details") - bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 2) 
assert(storageListener.rddInfoList.size === 2) - bus.postToAll(SparkListenerUnpersistRDD(0)) + bus.postToAllSync(SparkListenerUnpersistRDD(0)) assert(storageListener._rddInfoMap.size === 1) assert(storageListener.rddInfoList.size === 1) - bus.postToAll(SparkListenerUnpersistRDD(4)) // doesn't exist + bus.postToAllSync(SparkListenerUnpersistRDD(4)) // doesn't exist assert(storageListener._rddInfoMap.size === 1) assert(storageListener.rddInfoList.size === 1) - bus.postToAll(SparkListenerUnpersistRDD(1)) + bus.postToAllSync(SparkListenerUnpersistRDD(1)) assert(storageListener._rddInfoMap.size === 0) assert(storageListener.rddInfoList.size === 0) } @@ -113,8 +113,8 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn val myRddInfo2 = rddInfo2 val stageInfo0 = new StageInfo( 0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), Seq.empty, "details") - bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) - bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) + bus.postToAllSync(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 3) assert(storageListener.rddInfoList.size === 0) // not cached assert(!storageListener._rddInfoMap(0).isCached) @@ -165,26 +165,26 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details") val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L)) val blockUpdateInfos2 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(1, 1), memOnly, 200L, 0L)) - bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) - bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) + bus.postToAllSync(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener.rddInfoList.size === 0) postUpdateBlocks(bus, blockUpdateInfos1) assert(storageListener.rddInfoList.size === 1) - bus.postToAll(SparkListenerStageSubmitted(stageInfo1)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo1)) assert(storageListener.rddInfoList.size === 1) - bus.postToAll(SparkListenerStageCompleted(stageInfo0)) + bus.postToAllSync(SparkListenerStageCompleted(stageInfo0)) assert(storageListener.rddInfoList.size === 1) postUpdateBlocks(bus, blockUpdateInfos2) assert(storageListener.rddInfoList.size === 2) - bus.postToAll(SparkListenerStageCompleted(stageInfo1)) + bus.postToAllSync(SparkListenerStageCompleted(stageInfo1)) assert(storageListener.rddInfoList.size === 2) } test("verify StorageTab still contains a renamed RDD") { val rddInfo = new RDDInfo(0, "original_name", 1, memOnly, Seq(4)) val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo), Seq.empty, "details") - bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) - bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) + bus.postToAllSync(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo0)) val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L)) postUpdateBlocks(bus, blockUpdateInfos1) assert(storageListener.rddInfoList.size == 1) @@ -192,7 +192,7 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn val newName = "new_name" val rddInfoRenamed = new RDDInfo(0, newName, 1, memOnly, Seq(4)) val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfoRenamed), Seq.empty, "details") - 
bus.postToAll(SparkListenerStageSubmitted(stageInfo1)) + bus.postToAllSync(SparkListenerStageSubmitted(stageInfo1)) assert(storageListener.rddInfoList.size == 1) assert(storageListener.rddInfoList.head.name == newName) } @@ -200,7 +200,7 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn private def postUpdateBlocks( bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]): Unit = { blockUpdateInfos.foreach { blockUpdateInfo => - bus.postToAll(SparkListenerBlockUpdated(blockUpdateInfo)) + bus.postToAllSync(SparkListenerBlockUpdated(blockUpdateInfo)) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index a2153d27e9fe..85f194e37eae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.ListenerBus * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only * those queries that were started in the associated SparkSession. */ -class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) +class StreamingQueryListenerBus(val sparkListenerBus: LiveListenerBus) extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] { import StreamingQueryListener._ @@ -69,7 +69,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) activeQueryRunIds.synchronized { activeQueryRunIds += s.runId } sparkListenerBus.post(s) // post to local listeners to trigger callbacks - postToAll(s) + postToAllSync(s) case _ => sparkListenerBus.post(event) } @@ -83,7 +83,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus // thread if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) { - postToAll(e) + postToAllSync(e) } case _ => } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 0a4c141e5be3..4a15eb838320 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -661,7 +661,7 @@ class StreamingContext private[streaming] ( var shutdownHookRefToRemove: AnyRef = null if (LiveListenerBus.withinListenerThread.value) { throw new SparkException( - s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}") + "Cannot stop StreamingContext within listener event executor thread") } synchronized { // The state should always be Stopped after calling `stop()`, even if we haven't started yet diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 5fb0bd057d0f..a6cbf6921d20 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.ListenerBus * registers itself with Spark listener bus, so that it can receive
WrappedStreamingListenerEvents, * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners. */ -private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) +private[streaming] class StreamingListenerBus(val sparkListenerBus: LiveListenerBus) extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] { /** @@ -40,7 +40,7 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case WrappedStreamingListenerEvent(e) => - postToAll(e) + postToAllSync(e) case _ => } } @@ -75,16 +75,18 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) * Register this one with the Spark listener bus so that it can receive Streaming events and * forward them to StreamingListeners. */ - def start(): Unit = { + override def start(): Unit = { sparkListenerBus.addListener(this) // for getting callbacks on spark events + super.start() } /** * Unregister this one with the Spark listener bus and all StreamingListeners won't receive any * events after that. */ - def stop(): Unit = { + override def stop(): Unit = { sparkListenerBus.removeListener(this) + super.stop() } /**
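Taken together with the test changes above, the intended semantics are: postToAll hands each event to every group's single-threaded executor (one Runnable per listener, so per-listener ordering is preserved and a full queue surfaces as a dropped-event count), while postToAllSync, used by ReplayListenerBus and the updated tests, processes the event inline in the calling thread. A rough lifecycle sketch under those assumptions (again private[spark] APIs; the anonymous listener is illustrative, not from the patch):

    import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerApplicationEnd}

    val bus = new LiveListenerBus(sc)           // sc: an existing SparkContext
    bus.addListener(new SparkListener {})       // registered under the default event group
    bus.start()                                 // each group's executor starts draining its queue
    bus.post(SparkListenerApplicationEnd(42L))  // asynchronous: enqueued, not processed inline
    bus.waitUntilEmpty(10000L)                  // what the tests rely on for determinism
    bus.stop()                                  // shuts down every group's executor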