From 4a6858f57124a8b405e8ce414f61088a68dc27eb Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 5 Sep 2017 17:07:01 -0700
Subject: [PATCH 01/15] [SPARK-18838][core] Add separate listener queues to
 LiveListenerBus.

This change modifies the live listener bus so that all listeners are added to
queues; each queue has its own thread to dispatch events, making it possible
to separate slow listeners from other, more performance-sensitive ones.

The public API has not changed - all listeners added with the existing
"addListener" method, which after this change mostly means all user-defined
listeners, end up in a default queue. Internally, there's an API that allows
listeners to be added to specific queues, and that API is used to separate
the internal Spark listeners into 3 categories: application status listeners
(e.g. UI), executor management (e.g. dynamic allocation), and the event log.

The queueing logic is abstracted into a separate class but kept hidden from
consumers as much as possible. Aside from choosing their queue, there's no
code change needed to take advantage of queues.

Metrics were also simplified a bit; the live bus now tracks metrics per queue
instead of per individual listener. This is mostly to make integration with
the existing metrics code in `ListenerBus` easier, without having to refactor
that code; it can be done later if queue-level metrics turn out not to be
enough.

Test coverage relies on existing tests; a few tests had to be tweaked because
they relied on `LiveListenerBus.postToAll` being synchronous, and this change
makes that method asynchronous. Other tests were simplified so that they do
not use the asynchronous LiveListenerBus.
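As a quick illustration of the resulting API surface (editorial snippet that
uses only names introduced by this patch; it is not part of the diff below):

    // User code is unchanged and lands in the "default" queue.
    sc.listenerBus.addListener(myListener)

    // Internal code opts into a named queue, isolating slow listeners.
    sc.listenerBus.addToQueue(listener, LiveListenerBus.APP_STATUS_QUEUE)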
---
 .../spark/ExecutorAllocationManager.scala     |   2 +-
 .../org/apache/spark/HeartbeatReceiver.scala  |   2 +-
 .../scala/org/apache/spark/SparkContext.scala |   9 +-
 .../spark/scheduler/ListenerQueue.scala       | 315 ++++++++++++++++++
 .../spark/scheduler/LiveListenerBus.scala     | 277 ++++++---------
 .../scala/org/apache/spark/ui/SparkUI.scala   |  25 +-
 .../org/apache/spark/util/ListenerBus.scala   |   4 +-
 .../ExecutorAllocationManagerSuite.scala      | 128 +++----
 .../spark/scheduler/SparkListenerSuite.scala  |  81 +++--
 .../spark/ui/storage/StorageTabSuite.scala    |   4 +-
 .../spark/sql/internal/SharedState.scala      |   3 +-
 .../spark/streaming/StreamingContext.scala    |   3 +-
 .../scheduler/StreamingListenerBus.scala      |   2 +-
 .../streaming/StreamingContextSuite.scala     |   4 +-
 14 files changed, 569 insertions(+), 290 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/scheduler/ListenerQueue.scala

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 7a5fb9a802354..c5ac9401cec24 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
    * the scheduling task.
    */
  def start(): Unit = {
-    listenerBus.addListener(listener)
+    listenerBus.addToQueue(listener, LiveListenerBus.EXECUTOR_MGMT_QUEUE)
    val scheduleTask = new Runnable() {
      override def run(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 5242ab6f55235..e2340cfc3a57a 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
    this(sc, new SystemClock)
  }

-  sc.addSparkListener(this)
+  sc.listenerBus.addToQueue(this, LiveListenerBus.EXECUTOR_MGMT_QUEUE)

  override val rpcEnv: RpcEnv = sc.env.rpcEnv

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 136f0af7b2c9e..59fd4de20c399 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -419,7 +419,7 @@ class SparkContext(config: SparkConf) extends Logging {
    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
+    listenerBus.addToQueue(jobProgressListener, LiveListenerBus.APP_STATUS_QUEUE)

    // Create the Spark execution environment (cache, map output tracker, etc)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -442,7 +442,7 @@ class SparkContext(config: SparkConf) extends Logging {
    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
-        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
+        Some(SparkUI.createLiveUI(this, _conf, _jobProgressListener,
          _env.securityManager, appName, startTime = startTime))
      } else {
        // For tests, do not enable the UI
@@ -522,7 +522,7 @@ class SparkContext(config: SparkConf) extends Logging {
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
-      listenerBus.addListener(logger)
+      listenerBus.addToQueue(logger, "eventLog")
      Some(logger)
    } else {
      None
@@ -1879,8 +1879,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
  def stop(): Unit = {
    if (LiveListenerBus.withinListenerThread.value) {
-      throw new SparkException(
-        s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
+      throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
    }
    // Use the stopping variable to ensure no contention for the stop scenario.
    // Still track the stopped variable for use elsewhere in the code.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ListenerQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/ListenerQueue.scala
new file mode 100644
index 0000000000000..54fb692606a21
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ListenerQueue.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.{ArrayList, List => JList}
+import java.util.concurrent._
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.util.control.NonFatal
+
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+/**
+ * An asynchronous queue for events. All events posted to this queue will be delivered to the child
+ * listeners in a separate thread.
+ *
+ * Delivery will only begin when the `start()` method is called. The `stop()` method should be
+ * called when no more events need to be delivered.
+ *
+ * Instances of `ListenerQueue` are listeners themselves, but they're not to be used like regular
+ * listeners; they are used internally by `LiveListenerBus`, and are tightly coupled to the
+ * lifecycle of that implementation.
+ */
+private class ListenerQueue(val name: String, conf: SparkConf)
+  extends SparkListenerInterface
+  with Logging {
+
+  import ListenerQueue._
+
+  private val _listeners = new CopyOnWriteArrayList[SparkListenerInterface]()
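+  // (Editorial note) The copy-on-write list lets the dispatch thread iterate over a stable
+  // snapshot of the listeners without locking while listeners are concurrently added or removed.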
+
+  def addListener(l: SparkListenerInterface): Unit = {
+    _listeners.add(l)
+  }
+
+  /**
+   * @return Whether there are remaining listeners in the queue.
+   */
+  def removeListener(l: SparkListenerInterface): Boolean = {
+    _listeners.remove(l)
+    !_listeners.isEmpty()
+  }
+
+  def listeners: JList[SparkListenerInterface] = new ArrayList(_listeners)
+
+  // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
+  // it's perpetually being added to more quickly than it's being drained.
+  private val taskQueue = new LinkedBlockingQueue[SparkListenerInterface => Unit](
+    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+
+  // Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
+  // this allows that method to return only when the events in the queue have been fully
+  // processed (instead of just dequeued).
+  private val eventCount = new AtomicLong()
+
+  /** A counter for dropped events. It will be reset every time we log it. */
+  private val droppedEventsCounter = new AtomicLong(0L)
+
+  /** When `droppedEventsCounter` was last logged, in milliseconds. */
+  @volatile private var lastReportTimestamp = 0L
+
+  private val logDroppedEvent = new AtomicBoolean(false)
+
+  private var sc: SparkContext = null
+
+  private val started = new AtomicBoolean(false)
+  private val stopped = new AtomicBoolean(false)
+
+  private var droppedEvents: Counter = null
+
+  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
+    setDaemon(true)
+    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
+      dispatch()
+    }
+  }
+
+  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
+    try {
+      var task: SparkListenerInterface => Unit = taskQueue.take()
+      while (task != POISON_PILL) {
+        val it = _listeners.iterator()
+        while (it.hasNext()) {
+          val listener = it.next()
+          try {
+            task(listener)
+          } catch {
+            case NonFatal(e) =>
+              logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+          }
+        }
+        eventCount.decrementAndGet()
+        task = taskQueue.take()
+      }
+      eventCount.decrementAndGet()
+    } catch {
+      case ie: InterruptedException =>
+        logInfo(s"Stopping listener queue $name.", ie)
+    }
+  }
+
+  /**
+   * Start an asynchronous thread to dispatch events to the underlying listeners.
+   *
+   * @param sc Used to stop the SparkContext in case a listener fails.
+   * @param metrics Used to report listener performance metrics.
+   */
+  private[scheduler] def start(sc: SparkContext, metrics: LiveListenerBusMetrics): Unit = {
+    if (started.compareAndSet(false, true)) {
+      this.sc = sc
+      this.droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
+
+      // Avoid warnings in the logs if this queue is being re-created; it will reuse the same
+      // gauge as before.
+      val queueSizeGauge = s"queue.$name.size"
+      if (metrics.metricRegistry.getGauges().get(queueSizeGauge) == null) {
+        metrics.metricRegistry.register(queueSizeGauge, new Gauge[Int] {
+          override def getValue: Int = taskQueue.size()
+        })
+      }
+
+      dispatchThread.start()
+    } else {
+      throw new IllegalStateException(s"$name already started!")
+    }
+  }
+
+  /**
+   * Stop the queue. It will wait until the queued events have been processed, but new
+   * events will be dropped.
+   */
+  private[scheduler] def stop(): Unit = {
+    if (!started.get()) {
+      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
+    }
+    if (stopped.compareAndSet(false, true)) {
+      taskQueue.put(POISON_PILL)
+      eventCount.incrementAndGet()
+    }
+    dispatchThread.join()
+  }
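+
+  // Shutdown handshake: stop() enqueues POISON_PILL as a counted event, so dispatchThread.join()
+  // returns only after every event that was queued before the pill has been delivered.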
+
+  private def post(event: SparkListenerEvent)(task: SparkListenerInterface => Unit): Unit = {
+    if (stopped.get()) {
+      return
+    }
+
+    eventCount.incrementAndGet()
+    if (taskQueue.offer(task)) {
+      return
+    }
+
+    eventCount.decrementAndGet()
+    droppedEvents.inc()
+    droppedEventsCounter.incrementAndGet()
+    if (logDroppedEvent.compareAndSet(false, true)) {
+      // Only log the following message once to avoid duplicated annoying logs.
+      logError(s"Dropping event from queue $name. " +
+        "This likely means one of the listeners is too slow and cannot keep up with " +
+        "the rate at which tasks are being started by the scheduler.")
+    }
+    logTrace(s"Dropping event $event")
+
+    val droppedCount = droppedEventsCounter.get
+    if (droppedCount > 0) {
+      // Don't log too frequently
+      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
+        // There may be multiple threads trying to decrease droppedEventsCounter.
+        // Use "compareAndSet" to make sure only one thread can win.
+        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
+        // then that thread will update it.
+        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
+          val prevLastReportTimestamp = lastReportTimestamp
+          lastReportTimestamp = System.currentTimeMillis()
+          val previous = new java.util.Date(prevLastReportTimestamp)
+          logWarning(s"Dropped $droppedCount events from $name since $previous.")
+        }
+      }
+    }
+  }
+
+  /**
+   * For testing only. Wait until there are no more events in the queue.
+   *
+   * @return true if the queue is empty.
+   */
+  def waitUntilEmpty(deadline: Long): Boolean = {
+    while (eventCount.get() != 0) {
+      if (System.currentTimeMillis > deadline) {
+        return false
+      }
+      Thread.sleep(10)
+    }
+    true
+  }
+
+  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    post(event)(_.onStageCompleted(event))
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    post(event)(_.onStageSubmitted(event))
+  }
+
+  override def onTaskStart(event: SparkListenerTaskStart): Unit = {
+    post(event)(_.onTaskStart(event))
+  }
+
+  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = {
+    post(event)(_.onTaskGettingResult(event))
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+    post(event)(_.onTaskEnd(event))
+  }
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    post(event)(_.onJobStart(event))
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    post(event)(_.onJobEnd(event))
+  }
+
+  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
+    post(event)(_.onEnvironmentUpdate(event))
+  }
+
+  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
+    post(event)(_.onBlockManagerAdded(event))
+  }
+
+  override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
+    post(event)(_.onBlockManagerRemoved(event))
+  }
+
+  override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
+    post(event)(_.onUnpersistRDD(event))
+  }
+
+  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+    post(event)(_.onApplicationStart(event))
+  }
+
+  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
+    post(event)(_.onApplicationEnd(event))
+  }
+
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    post(event)(_.onExecutorMetricsUpdate(event))
+  }
+
+  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+    post(event)(_.onExecutorAdded(event))
+  }
+
+  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+    post(event)(_.onExecutorRemoved(event))
+  }
+
+  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+    post(event)(_.onExecutorBlacklisted(event))
+  }
+
+  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+    post(event)(_.onExecutorUnblacklisted(event))
+  }
+
+  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+    post(event)(_.onNodeBlacklisted(event))
+  }
+
+  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+    post(event)(_.onNodeUnblacklisted(event))
+  }
+
+  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
+    post(event)(_.onBlockUpdated(event))
+  }
+
+  override def onSpeculativeTaskSubmitted(event: SparkListenerSpeculativeTaskSubmitted): Unit = {
+
post(event)(_.onSpeculativeTaskSubmitted(event)) + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + post(event)(_.onOtherEvent(event)) + } + +} + +private object ListenerQueue { + + val POISON_PILL: SparkListenerInterface => Unit = { _ => Unit } + +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 7d5e9809dd7b2..b1478b21f290e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -17,20 +17,22 @@ package org.apache.spark.scheduler +import java.util.{List => JList} import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.reflect.ClassTag import scala.util.DynamicVariable -import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} +import com.codahale.metrics.{Counter, MetricRegistry, Timer} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source -import org.apache.spark.util.Utils /** * Asynchronously passes SparkListenerEvents to registered SparkListeners. @@ -41,18 +43,11 @@ import org.apache.spark.util.Utils */ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { - self => - import LiveListenerBus._ private var sparkContext: SparkContext = _ - // Cap the capacity of the event queue so we get an explicit error (rather than - // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private val eventQueue = - new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) - - private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue) + private[spark] val metrics = new LiveListenerBusMetrics(conf) // Indicate if `start()` is called private val started = new AtomicBoolean(false) @@ -65,53 +60,60 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { /** When `droppedEventsCounter` was logged last time in milliseconds. */ @volatile private var lastReportTimestamp = 0L - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread(name) { - setDaemon(true) - override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { - val timer = metrics.eventProcessingTime - while (true) { - eventLock.acquire() - self.synchronized { - processingEvent = true - } - try { - val event = eventQueue.poll - if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return - } - val timerContext = timer.time() - try { - postToAll(event) - } finally { - timerContext.stop() - } - } finally { - self.synchronized { - processingEvent = false - } - } + /** Add a listener to the default queue. 
*/ + override def addListener(listener: SparkListenerInterface): Unit = { + addToQueue(listener, "default") + } + + /** + * Add a listener to a specific queue, creating a new queue if needed. Queues are independent + * of each other (each one uses a separate thread for delivering events), allowing slower + * listeners to be somewhat isolated from others. + */ + def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { + if (stopped.get()) { + throw new IllegalStateException("LiveListenerBus is stopped.") + } + + queues.asScala.find(_.name == queue) match { + case Some(queue) => + queue.addListener(listener) + + case None => + val newQueue = new ListenerQueue(queue, conf) + newQueue.addListener(listener) + if (started.get() && !stopped.get()) { + newQueue.start(sparkContext, metrics) } + super.addListener(newQueue) + } + } + + override def removeListener(listener: SparkListenerInterface): Unit = synchronized { + // Remove listener from all queues it was added to, and stop queues that have become empty. + queues.asScala + .filter(!_.removeListener(listener)) + .foreach { toRemove => + if (started.get() && !stopped.get()) { + toRemove.stop() + } + super.removeListener(toRemove) } + } + + /** An alias for postToAll(), to avoid changing all call sites. */ + def post(event: SparkListenerEvent): Unit = postToAll(event) + + override def postToAll(event: SparkListenerEvent): Unit = { + if (!stopped.get()) { + metrics.numEventsPosted.inc() + super.postToAll(event) } } override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { - metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface])) + val name = listener.asInstanceOf[ListenerQueue].name + metrics.getTimer(s"queue.$name") } /** @@ -123,46 +125,13 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { * * @param sc Used to stop the SparkContext in case the listener thread dies. */ - def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = { - if (started.compareAndSet(false, true)) { - sparkContext = sc - metricsSystem.registerSource(metrics) - listenerThread.start() - } else { - throw new IllegalStateException(s"$name already started!") - } - } - - def post(event: SparkListenerEvent): Unit = { - if (stopped.get) { - // Drop further events to make `listenerThread` exit ASAP - logDebug(s"$name has already stopped! Dropping event $event") - return - } - metrics.numEventsPosted.inc() - val eventAdded = eventQueue.offer(event) - if (eventAdded) { - eventLock.release() - } else { - onDropEvent(event) + def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized { + if (!started.compareAndSet(false, true)) { + throw new IllegalStateException("LiveListenerBus already started.") } - val droppedEvents = droppedEventsCounter.get - if (droppedEvents > 0) { - // Don't log too frequently - if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { - // There may be multiple threads trying to decrease droppedEventsCounter. - // Use "compareAndSet" to make sure only one thread can win. - // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and - // then that thread will update it. 
- if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { - val prevLastReportTimestamp = lastReportTimestamp - lastReportTimestamp = System.currentTimeMillis() - logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + - new java.util.Date(prevLastReportTimestamp)) - } - } - } + queues.asScala.foreach(_.start(sc, metrics)) + metricsSystem.registerSource(metrics) } /** @@ -173,80 +142,63 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { */ @throws(classOf[TimeoutException]) def waitUntilEmpty(timeoutMillis: Long): Unit = { - val finishTime = System.currentTimeMillis + timeoutMillis - while (!queueIsEmpty) { - if (System.currentTimeMillis > finishTime) { - throw new TimeoutException( - s"The event queue is not empty after $timeoutMillis milliseconds") + val deadline = System.currentTimeMillis + timeoutMillis + queues.asScala.foreach { queue => + if (!queue.waitUntilEmpty(deadline)) { + throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.") } - /* Sleep rather than using wait/notify, because this is used only for testing and - * wait/notify add overhead in the general case. */ - Thread.sleep(10) } } - /** - * For testing only. Return whether the listener daemon thread is still alive. - * Exposed for testing. - */ - def listenerThreadIsAlive: Boolean = listenerThread.isAlive - - /** - * Return whether the event queue is empty. - * - * The use of synchronized here guarantees that all events that once belonged to this queue - * have already been processed by all attached listeners, if this returns true. - */ - private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent } - /** * Stop the listener bus. It will wait until the queued events have been processed, but drop the * new events after stopping. */ def stop(): Unit = { if (!started.get()) { - throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") + throw new IllegalStateException(s"Attempted to stop bus that has not yet started!") + } + + if (!stopped.compareAndSet(false, true)) { + return } - if (stopped.compareAndSet(false, true)) { - // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know - // `stop` is called. - eventLock.release() - listenerThread.join() - } else { - // Keep quiet + + synchronized { + queues.asScala.foreach(_.stop()) } } - /** - * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be - * notified with the dropped events. - * - * Note: `onDropEvent` can be called in any thread. - */ - def onDropEvent(event: SparkListenerEvent): Unit = { - metrics.numDroppedEvents.inc() - droppedEventsCounter.incrementAndGet() - if (logDroppedEvent.compareAndSet(false, true)) { - // Only log the following message once to avoid duplicated annoying logs. - logError("Dropping SparkListenerEvent because no remaining room in event queue. 
" + - "This likely means one of the SparkListeners is too slow and cannot keep up with " + - "the rate at which tasks are being started by the scheduler.") + override private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): + Seq[T] = { + val c = implicitly[ClassTag[T]].runtimeClass + queues.asScala.flatMap { queue => + queue.listeners.asScala.filter(_.getClass() == c).map(_.asInstanceOf[T]) } - logTrace(s"Dropping event $event") } + + override private[spark] def listeners: JList[SparkListenerInterface] = { + queues.asScala.flatMap(_.listeners.asScala).asJava + } + + // Exposed for testing. + private[scheduler] def queues: JList[ListenerQueue] = { + super.listeners.asInstanceOf[JList[ListenerQueue]] + } + } private[spark] object LiveListenerBus { // Allows for Context to check whether stop() call is made within listener thread val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) - /** The thread name of Spark listener bus */ - val name = "SparkListenerBus" + /** Queue name where status-related listeners are grouped together. */ + val APP_STATUS_QUEUE = "appStatus" + + /** Queue name where executor management-related listeners are grouped together. */ + val EXECUTOR_MGMT_QUEUE = "executorMgmt" } -private[spark] class LiveListenerBusMetrics( - conf: SparkConf, - queue: LinkedBlockingQueue[_]) +private[spark] class LiveListenerBusMetrics(conf: SparkConf) extends Source with Logging { override val sourceName: String = "LiveListenerBus" @@ -260,48 +212,25 @@ private[spark] class LiveListenerBusMetrics( */ val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted")) - /** - * The total number of events that were dropped without being delivered to listeners. - */ - val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) - - /** - * The amount of time taken to post a single event to all listeners. - */ - val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) - - /** - * The number of messages waiting in the queue. - */ - val queueSize: Gauge[Int] = { - metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ - override def getValue: Int = queue.size() - }) - } - // Guarded by synchronization. - private val perListenerClassTimers = mutable.Map[String, Timer]() + private val allTimers = mutable.Map[String, Timer]() /** * Returns a timer tracking the processing time of the given listener class. * events processed by that listener. This method is thread-safe. 
*/ - def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = { - synchronized { - val className = cls.getName - val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED) - perListenerClassTimers.get(className).orElse { - if (perListenerClassTimers.size == maxTimed) { - logError(s"Not measuring processing time for listener class $className because a " + - s"maximum of $maxTimed listener classes are already timed.") - None - } else { - perListenerClassTimers(className) = - metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className)) - perListenerClassTimers.get(className) - } + def getTimer(name: String): Option[Timer] = synchronized { + val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED) + allTimers.get(name).orElse { + if (allTimers.size == maxTimed) { + logError(s"Not measuring processing time for listener $name because a " + + s"maximum of $maxTimed are already timed.") + None + } else { + allTimers(name) = metricRegistry.timer(s"$name.listenerProcessingTime") + allTimers.get(name) } } } -} +} diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 589f811145519..a4e5109d84196 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -161,13 +161,14 @@ private[spark] object SparkUI { def createLiveUI( sc: SparkContext, conf: SparkConf, - listenerBus: SparkListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, appName: String, startTime: Long): SparkUI = { - create(Some(sc), conf, listenerBus, securityManager, appName, - jobProgressListener = Some(jobProgressListener), startTime = startTime) + create(Some(sc), conf, + l => sc.listenerBus.addToQueue(l, LiveListenerBus.APP_STATUS_QUEUE), + securityManager, appName, jobProgressListener = Some(jobProgressListener), + startTime = startTime) } def createHistoryUI( @@ -177,8 +178,8 @@ private[spark] object SparkUI { appName: String, basePath: String, startTime: Long): SparkUI = { - val sparkUI = create( - None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) + val sparkUI = create(None, conf, listenerBus.addListener, securityManager, appName, basePath, + startTime = startTime) val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], Utils.getContextOrSparkClassLoader).asScala @@ -199,7 +200,7 @@ private[spark] object SparkUI { private def create( sc: Option[SparkContext], conf: SparkConf, - listenerBus: SparkListenerBus, + addListenerFn: SparkListenerInterface => Unit, securityManager: SecurityManager, appName: String, basePath: String = "", @@ -208,7 +209,7 @@ private[spark] object SparkUI { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) - listenerBus.addListener(listener) + addListenerFn(listener) listener } @@ -218,11 +219,11 @@ private[spark] object SparkUI { val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) - listenerBus.addListener(environmentListener) - listenerBus.addListener(storageStatusListener) - listenerBus.addListener(executorsListener) - listenerBus.addListener(storageListener) - listenerBus.addListener(operationGraphListener) + addListenerFn(environmentListener) + addListenerFn(storageStatusListener) + addListenerFn(executorsListener) + 
addListenerFn(storageListener) + addListenerFn(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index 76a56298aaebc..e68a2d3562e77 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -46,7 +46,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { /** * Add a listener to listen events. This method is thread-safe and can be called in any thread. */ - final def addListener(listener: L): Unit = { + def addListener(listener: L): Unit = { listenersPlusTimers.add((listener, getTimer(listener))) } @@ -54,7 +54,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { * Remove a listener and it won't receive any events. This method is thread-safe and can be called * in any thread. */ - final def removeListener(listener: L): Unit = { + def removeListener(listener: L): Unit = { listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer => listenersPlusTimers.remove(listenerAndTimer) } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 7da4bae0ab7eb..a91e09b7cb69f 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -49,6 +49,11 @@ class ExecutorAllocationManagerSuite contexts.foreach(_.stop()) } + private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = { + bus.post(event) + bus.waitUntilEmpty(1000) + } + test("verify min/max executors") { val conf = new SparkConf() .setMaster("myDummyLocalExternalClusterManager") @@ -95,7 +100,7 @@ class ExecutorAllocationManagerSuite test("add executors") { sc = createSparkContext(1, 10, 1) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Keep adding until the limit is reached assert(numExecutorsTarget(manager) === 1) @@ -140,7 +145,7 @@ class ExecutorAllocationManagerSuite test("add executors capped by num pending tasks") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 5))) // Verify that we're capped at number of tasks in the stage assert(numExecutorsTarget(manager) === 0) @@ -156,10 +161,10 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 3))) + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) 
assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 6) @@ -172,9 +177,9 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that re-running a task doesn't blow things up - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 3))) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 9) assert(numExecutorsToAdd(manager) === 2) @@ -183,7 +188,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task once we're at our limit doesn't blow things up - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) assert(addExecutors(manager) === 0) assert(numExecutorsTarget(manager) === 10) } @@ -193,13 +198,13 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get // Verify that we're capped at number of tasks including the speculative ones in the stage - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) + post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1)) assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 1) - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1)) + post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1)) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2))) assert(numExecutorsTarget(manager) === 1) assert(numExecutorsToAdd(manager) === 2) assert(addExecutors(manager) === 2) @@ -210,13 +215,13 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) // Verify that running a speculative task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true))) + post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) @@ -225,7 +230,7 @@ class ExecutorAllocationManagerSuite test("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5))) assert(numExecutorsTarget(manager) === 0) 
assert(numExecutorsToAdd(manager) === 1) @@ -236,15 +241,15 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 3) val task1Info = createTaskInfo(0, 0, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info)) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, task1Info)) assert(numExecutorsToAdd(manager) === 4) assert(addExecutors(manager) === 2) val task2Info = createTaskInfo(1, 0, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info)) - sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) - sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info)) + post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) + post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) assert(adjustRequestedExecutors(manager) === -1) } @@ -352,21 +357,22 @@ class ExecutorAllocationManagerSuite sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 8))) // Remove when numExecutorsTarget is the same as the current number of executors assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach { - info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) } + info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) } assert(executorIds(manager).size === 8) assert(numExecutorsTarget(manager) === 8) assert(maxNumExecutorsNeeded(manager) == 8) assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors // Remove executors when numExecutorsTarget is lower than current number of executors - (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { - info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) } + (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { info => + post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, info, null)) + } adjustRequestedExecutors(manager) assert(executorIds(manager).size === 8) assert(numExecutorsTarget(manager) === 5) @@ -378,7 +384,7 @@ class ExecutorAllocationManagerSuite onExecutorRemoved(manager, "3") // numExecutorsTarget is lower than minNumExecutors - sc.listenerBus.postToAll( + post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null)) assert(executorIds(manager).size === 5) assert(numExecutorsTarget(manager) === 5) @@ -390,7 +396,7 @@ class ExecutorAllocationManagerSuite test ("interleaving add and remove") { sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Add a few executors assert(addExecutors(manager) === 1) @@ -569,7 +575,7 @@ class ExecutorAllocationManagerSuite val clock = new ManualClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Scheduler queue backlogged onSchedulerBacklogged(manager) @@ -682,26 +688,26 @@ class ExecutorAllocationManagerSuite // Starting a 
stage should start the add timer val numTasks = 10 - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, numTasks))) assert(addTime(manager) !== NOT_SET) // Starting a subset of the tasks should not cancel the add timer val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, "executor-1") } - taskInfos.tail.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) } + taskInfos.tail.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) } assert(addTime(manager) !== NOT_SET) // Starting all remaining tasks should cancel the add timer - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head)) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfos.head)) assert(addTime(manager) === NOT_SET) // Start two different stages // The add timer should be canceled only if all tasks in both stages start running - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks))) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, numTasks))) assert(addTime(manager) !== NOT_SET) - taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) } + taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(1, 0, info)) } assert(addTime(manager) !== NOT_SET) - taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) } + taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(2, 0, info)) } assert(addTime(manager) === NOT_SET) } @@ -715,22 +721,22 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).size === 5) // Starting a task cancel the remove timer for that executor - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) assert(removeTimes(manager).size === 3) assert(!removeTimes(manager).contains("executor-1")) assert(!removeTimes(manager).contains("executor-2")) // Finishing all tasks running on an executor should start the remove timer for that executor - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new TaskMetrics)) - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new TaskMetrics)) assert(removeTimes(manager).size === 4) assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not finished yet assert(removeTimes(manager).contains("executor-2")) - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new TaskMetrics)) assert(removeTimes(manager).size === 5) assert(removeTimes(manager).contains("executor-1")) // 
executor-1 has now finished @@ -743,13 +749,13 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).isEmpty) // New executors have registered - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(removeTimes(manager).contains("executor-1")) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) @@ -757,14 +763,14 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).contains("executor-2")) // Existing executors have disconnected - sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", "")) + post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-1", "")) assert(executorIds(manager).size === 1) assert(!executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(!removeTimes(manager).contains("executor-1")) // Unknown executor has disconnected - sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", "")) + post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-3", "")) assert(executorIds(manager).size === 1) assert(removeTimes(manager).size === 1) } @@ -775,8 +781,8 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) @@ -788,15 +794,15 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) @@ -809,7 +815,7 @@ class ExecutorAllocationManagerSuite sc = createSparkContext(0, 100000, 0) val manager = sc.executorAllocationManager.get val stage1 = createStageInfo(0, 1000) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + post(sc.listenerBus, SparkListenerStageSubmitted(stage1)) assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) @@ -820,12 +826,12 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, s"executor-$i") } assert(executorIds(manager).size === 15) - 
sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + post(sc.listenerBus, SparkListenerStageCompleted(stage1)) adjustRequestedExecutors(manager) assert(numExecutorsTarget(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 1000))) addExecutors(manager) assert(numExecutorsTarget(manager) === 16) } @@ -842,7 +848,7 @@ class ExecutorAllocationManagerSuite // Verify whether the initial number of executors is kept with no pending tasks assert(numExecutorsTarget(manager) === 3) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2))) clock.advance(100L) assert(maxNumExecutorsNeeded(manager) === 2) @@ -892,7 +898,7 @@ class ExecutorAllocationManagerSuite Seq.empty ) val stageInfo1 = createStageInfo(1, 5, localityPreferences1) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1)) + post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo1)) assert(localityAwareTasks(manager) === 3) assert(hostToLocalTaskCount(manager) === @@ -904,13 +910,13 @@ class ExecutorAllocationManagerSuite Seq.empty ) val stageInfo2 = createStageInfo(2, 3, localityPreferences2) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2)) + post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo2)) assert(localityAwareTasks(manager) === 5) assert(hostToLocalTaskCount(manager) === Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1)) + post(sc.listenerBus, SparkListenerStageCompleted(stageInfo1)) assert(localityAwareTasks(manager) === 2) assert(hostToLocalTaskCount(manager) === Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2)) @@ -921,16 +927,16 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(maxNumExecutorsNeeded(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1))) assert(maxNumExecutorsNeeded(manager) === 1) val taskInfo = createTaskInfo(1, 1, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo)) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo)) assert(maxNumExecutorsNeeded(manager) === 1) // If the task is failed, we expect it to be resubmitted later. val taskEndReason = ExceptionFailure(null, null, null, null, None) - sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null)) + post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null)) assert(maxNumExecutorsNeeded(manager) === 1) } @@ -942,7 +948,7 @@ class ExecutorAllocationManagerSuite // Allocation manager is reset when adding executor requests are sent without reporting back // executor added. - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10))) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 2) @@ -957,7 +963,7 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager) === Set.empty) // Allocation manager is reset when executors are added. 
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10)))

     addExecutors(manager)
     addExecutors(manager)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 995df1dd52010..4dba28049f435 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -42,18 +42,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext])
   private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem])

+  private def numDroppedEvents(bus: LiveListenerBus): Long = {
+    bus.metrics.metricRegistry.counter("queue.default.numDroppedEvents").getCount
+  }
+
+  private def queueSize(bus: LiveListenerBus): Option[Int] = {
+    Option(bus.metrics.metricRegistry.getGauges().get("queue.default.size"))
+      .map(_.getValue().asInstanceOf[Int])
+  }
+
+  private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
+    bus.metrics.metricRegistry.timer("queue.default.listenerProcessingTime").getCount()
+  }
+
   test("don't call sc.stop in listener") {
     sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val listener = new SparkContextStoppingListener(sc)
-    val bus = new LiveListenerBus(sc.conf)
-    bus.addListener(listener)

-    // Starting listener bus should flush all buffered events
-    bus.start(sc, sc.env.metricsSystem)
-    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.addListener(listener)
+    sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
     sc.stop()

-    bus.stop()
     assert(listener.sparkExSeen)
   }

@@ -65,9 +75,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

     // Metrics are initially empty.
     assert(bus.metrics.numEventsPosted.getCount === 0)
-    assert(bus.metrics.numDroppedEvents.getCount === 0)
-    assert(bus.metrics.queueSize.getValue === 0)
-    assert(bus.metrics.eventProcessingTime.getCount === 0)
+    assert(numDroppedEvents(bus) === 0)
+    assert(queueSize(bus) === None)
+    assert(eventProcessingTimeCount(bus) === 0)

     // Post five events:
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }

     // Five messages should be marked as received and queued, but no messages should be posted to
     // listeners yet because the listener bus hasn't been started.
     assert(bus.metrics.numEventsPosted.getCount === 5)
-    assert(bus.metrics.queueSize.getValue === 5)
+    assert(queueSize(bus) === None)
     assert(counter.count === 0)

     // Starting listener bus should flush all buffered events
     bus.start(mockSparkContext, mockMetricsSystem)
     Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
-    assert(bus.metrics.queueSize.getValue === 0)
-    assert(bus.metrics.eventProcessingTime.getCount === 5)
+    assert(queueSize(bus) === Some(0L))
+    assert(eventProcessingTimeCount(bus) === 5)

     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 5)
-    assert(bus.metrics.numEventsPosted.getCount === 5)
-
-    // Make sure per-listener-class timers were created:
-    assert(bus.metrics.getTimerForListenerClass(
-      classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5)
+    assert(eventProcessingTimeCount(bus) === 5)

     // Listener bus must not be started twice
     intercept[IllegalStateException] {
@@ -180,20 +186,19 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // Post a message to the listener bus and wait for processing to begin:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     listenerStarted.acquire()
-    assert(bus.metrics.queueSize.getValue === 0)
-    assert(bus.metrics.numDroppedEvents.getCount === 0)
+    assert(queueSize(bus) === Some(0))
+    assert(numDroppedEvents(bus) === 0)

     // If we post an additional message then it should remain in the queue because the listener is
     // busy processing the first event:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    assert(bus.metrics.queueSize.getValue === 1)
-    assert(bus.metrics.numDroppedEvents.getCount === 0)
+    assert(queueSize(bus) === Some(1))
+    assert(numDroppedEvents(bus) === 0)

     // The queue is now full, so any additional events posted to the listener will be dropped:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    assert(bus.metrics.queueSize.getValue === 1)
-    assert(bus.metrics.numDroppedEvents.getCount === 1)
-
+    assert(queueSize(bus) === Some(1))
+    assert(numDroppedEvents(bus) === 1)

     // Allow the remaining events to be processed so we can stop the listener bus:
     listenerWait.release(2)
@@ -429,7 +434,6 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

     // The exception should be caught, and the event should be propagated to other listeners
-    assert(bus.listenerThreadIsAlive)
     assert(jobCounter1.count === 5)
     assert(jobCounter2.count === 5)
   }
@@ -449,6 +453,31 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1)
   }

+  test("add and remove listeners to/from LiveListenerBus queues") {
+    val bus = new LiveListenerBus(new SparkConf(false))
+    val counter1 = new BasicJobCounter()
+    val counter2 = new BasicJobCounter()
+    val counter3 = new BasicJobCounter()
+
+    bus.addListener(counter1)
+    bus.addToQueue(counter2, "other")
+    bus.addToQueue(counter3, "other")
+    assert(bus.queues.asScala.map(_.name) === Seq("default", "other"))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 3)
+
+    bus.removeListener(counter1)
+    assert(bus.queues.asScala.map(_.name) === Seq("other"))
+
assert(bus.findListenersByClass[BasicJobCounter]().size === 2) + + bus.removeListener(counter2) + assert(bus.queues.asScala.map(_.name) === Seq("other")) + assert(bus.findListenersByClass[BasicJobCounter]().size === 1) + + bus.removeListener(counter3) + assert(bus.queues.isEmpty) + assert(bus.findListenersByClass[BasicJobCounter]().isEmpty) + } + /** * Assert that the given list of numbers has an average that is greater than zero. */ diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 1cb52593e7060..79f02f2e50bbd 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.storage._ * Test various functionality in the StorageListener that supports the StorageTab. */ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { - private var bus: LiveListenerBus = _ + private var bus: SparkListenerBus = _ private var storageStatusListener: StorageStatusListener = _ private var storageListener: StorageListener = _ private val memAndDisk = StorageLevel.MEMORY_AND_DISK @@ -43,7 +43,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { before { val conf = new SparkConf() - bus = new LiveListenerBus(conf) + bus = new ReplayListenerBus() storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 7202f1222d10f..bf5ba16927e54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager @@ -148,7 +149,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { if (SparkSession.sqlListener.get() == null) { val listener = new SQLListener(sc.conf) if (SparkSession.sqlListener.compareAndSet(null, listener)) { - sc.addSparkListener(listener) + sc.listenerBus.addToQueue(listener, LiveListenerBus.APP_STATUS_QUEUE) sc.ui.foreach(new SQLTab(listener, _)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index f3b4ff2d1d80c..8c7418ec7ac10 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -659,8 +659,7 @@ class StreamingContext private[streaming] ( def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { var shutdownHookRefToRemove: AnyRef = null if (LiveListenerBus.withinListenerThread.value) { - throw new SparkException( - s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}") + throw new SparkException(s"Cannot stop StreamingContext within listener bus thread.") } synchronized { // The state should always 
be Stopped after calling `stop()`, even if we haven't started yet diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 5fb0bd057d0f1..ea7fadbc5417c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -76,7 +76,7 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) * forward them to StreamingListeners. */ def start(): Unit = { - sparkListenerBus.addListener(this) // for getting callbacks on spark events + sparkListenerBus.addToQueue(this, LiveListenerBus.APP_STATUS_QUEUE) } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 96ab5a2080b8e..5810e73f4098b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -575,8 +575,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL test("getActive and getActiveOrCreate") { require(StreamingContext.getActive().isEmpty, "context exists from before") - sc = new SparkContext(conf) - var newContextCreated = false def creatingFunc(): StreamingContext = { @@ -603,6 +601,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL // getActiveOrCreate should create new context and getActive should return it only // after starting the context testGetActiveOrCreate { + sc = new SparkContext(conf) ssc = StreamingContext.getActiveOrCreate(creatingFunc _) assert(ssc != null, "no context created") assert(newContextCreated === true, "new context not created") @@ -622,6 +621,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL // getActiveOrCreate and getActive should return independently created context after activating testGetActiveOrCreate { + sc = new SparkContext(conf) ssc = creatingFunc() // Create assert(StreamingContext.getActive().isEmpty, "new initialized context returned before starting") From 9fc30bad170fd3acde033c02caa7aaf9f473f6df Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Sep 2017 00:35:49 -0700 Subject: [PATCH 02/15] Better name for ListenerQueue. --- .../{ListenerQueue.scala => AsyncEventQueue.scala} | 8 ++++---- .../org/apache/spark/scheduler/LiveListenerBus.scala | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) rename core/src/main/scala/org/apache/spark/scheduler/{ListenerQueue.scala => AsyncEventQueue.scala} (97%) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ListenerQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/scheduler/ListenerQueue.scala rename to core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 54fb692606a21..78451c3fc7e4e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ListenerQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -37,15 +37,15 @@ import org.apache.spark.util.Utils * Delivery will only begin when the `start()` method is called. The `stop()` method should be * called when no more events need to be delivered. 
* - * Instances of `ListenerQueue` are listeners themselves, but they're not to be used like regular + * Instances of `AsyncEventQueue` are listeners themselves, but they're not to be used like regular * listeners; they are used internally by `LiveListenerBus`, and are tightly coupled to the * lifecycle of that implementation. */ -private class ListenerQueue(val name: String, conf: SparkConf) +private class AsyncEventQueue(val name: String, conf: SparkConf) extends SparkListenerInterface with Logging { - import ListenerQueue._ + import AsyncEventQueue._ private val _listeners = new CopyOnWriteArrayList[SparkListenerInterface]() @@ -308,7 +308,7 @@ private class ListenerQueue(val name: String, conf: SparkConf) } -private object ListenerQueue { +private object AsyncEventQueue { val POISON_PILL: SparkListenerInterface => Unit = { _ => Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index b1478b21f290e..d17a2a1e41dbf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -80,7 +80,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { queue.addListener(listener) case None => - val newQueue = new ListenerQueue(queue, conf) + val newQueue = new AsyncEventQueue(queue, conf) newQueue.addListener(listener) if (started.get() && !stopped.get()) { newQueue.start(sparkContext, metrics) @@ -112,7 +112,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { } override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { - val name = listener.asInstanceOf[ListenerQueue].name + val name = listener.asInstanceOf[AsyncEventQueue].name metrics.getTimer(s"queue.$name") } @@ -181,8 +181,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { } // Exposed for testing. - private[scheduler] def queues: JList[ListenerQueue] = { - super.listeners.asInstanceOf[JList[ListenerQueue]] + private[scheduler] def queues: JList[AsyncEventQueue] = { + super.listeners.asInstanceOf[JList[AsyncEventQueue]] } } From 6bee2149ff85214167a98f1a31717b468b30a34b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Sep 2017 00:43:09 -0700 Subject: [PATCH 03/15] Cleanup metric initialization a bit. Also fix a problem where the queue size metric might be out of date. --- .../spark/scheduler/AsyncEventQueue.scala | 24 ++++++++----------- .../spark/scheduler/LiveListenerBus.scala | 6 ++--- .../spark/scheduler/SparkListenerSuite.scala | 17 +++++++------ 3 files changed, 21 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 78451c3fc7e4e..ee3a300f4e116 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -41,7 +41,7 @@ import org.apache.spark.util.Utils * listeners; they are used internally by `LiveListenerBus`, and are tightly coupled to the * lifecycle of that implementation. 
*/ -private class AsyncEventQueue(val name: String, conf: SparkConf) +private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics) extends SparkListenerInterface with Logging { @@ -86,7 +86,14 @@ private class AsyncEventQueue(val name: String, conf: SparkConf) private val started = new AtomicBoolean(false) private val stopped = new AtomicBoolean(false) - private var droppedEvents: Counter = null + private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents") + + // Remove the queue size gauge first, in case it was created by a previous incarnation of + // this queue that was removed from the listener bus. + metrics.metricRegistry.remove(s"queue.$name.size") + metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] { + override def getValue: Int = taskQueue.size() + }) private val dispatchThread = new Thread(s"spark-listener-group-$name") { setDaemon(true) @@ -125,20 +132,9 @@ private class AsyncEventQueue(val name: String, conf: SparkConf) * @param sc Used to stop the SparkContext in case the a listener fails. * @param metrics Used to report listener performance metrics. */ - private[scheduler] def start(sc: SparkContext, metrics: LiveListenerBusMetrics): Unit = { + private[scheduler] def start(sc: SparkContext): Unit = { if (started.compareAndSet(false, true)) { this.sc = sc - this.droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents") - - // Avoid warnings in the logs if this queue is being re-created; it will reuse the same - // gauge as before. - val queueSizeGauge = s"queue.$name.size" - if (metrics.metricRegistry.getGauges().get(queueSizeGauge) == null) { - metrics.metricRegistry.register(queueSizeGauge, new Gauge[Int] { - override def getValue: Int = taskQueue.size() - }) - } - dispatchThread.start() } else { throw new IllegalStateException(s"$name already started!") diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index d17a2a1e41dbf..e8d196f411526 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -80,10 +80,10 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { queue.addListener(listener) case None => - val newQueue = new AsyncEventQueue(queue, conf) + val newQueue = new AsyncEventQueue(queue, conf, metrics) newQueue.addListener(listener) if (started.get() && !stopped.get()) { - newQueue.start(sparkContext, metrics) + newQueue.start(sparkContext) } super.addListener(newQueue) } @@ -130,7 +130,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { throw new IllegalStateException("LiveListenerBus already started.") } - queues.asScala.foreach(_.start(sc, metrics)) + queues.asScala.foreach(_.start(sc)) metricsSystem.registerSource(metrics) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 4dba28049f435..27854464598e7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -46,9 +46,8 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match bus.metrics.metricRegistry.counter("queue.default.numDroppedEvents").getCount } - private def queueSize(bus: 
LiveListenerBus): Option[Int] = {
-    Option(bus.metrics.metricRegistry.getGauges().get("queue.default.size"))
-      .map(_.getValue().asInstanceOf[Int])
+  private def queueSize(bus: LiveListenerBus): Int = {
+    bus.metrics.metricRegistry.getGauges().get("queue.default.size").getValue().asInstanceOf[Int]
   }
 
   private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
@@ -76,7 +75,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // Metrics are initially empty.
     assert(bus.metrics.numEventsPosted.getCount === 0)
     assert(numDroppedEvents(bus) === 0)
-    assert(queueSize(bus) === None)
+    assert(queueSize(bus) === 0)
     assert(eventProcessingTimeCount(bus) === 0)
 
     // Post five events:
@@ -85,7 +84,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // Five messages should be marked as received and queued, but no messages should be posted to
     // listeners yet because the listener bus hasn't been started.
     assert(bus.metrics.numEventsPosted.getCount === 5)
-    assert(queueSize(bus) === None)
+    assert(queueSize(bus) === 5)
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
@@ -93,7 +92,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
-    assert(queueSize(bus) === Some(0L))
+    assert(queueSize(bus) === 0)
     assert(eventProcessingTimeCount(bus) === 5)
 
     // After listener bus has stopped, posting events should not increment counter
@@ -186,18 +185,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // Post a message to the listener bus and wait for processing to begin:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     listenerStarted.acquire()
-    assert(queueSize(bus) === Some(0))
+    assert(queueSize(bus) === 0)
     assert(numDroppedEvents(bus) === 0)
 
     // If we post an additional message then it should remain in the queue because the listener is
     // busy processing the first event:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    assert(queueSize(bus) === Some(1))
+    assert(queueSize(bus) === 1)
     assert(numDroppedEvents(bus) === 0)
 
     // The queue is now full, so any additional events posted to the listener will be dropped:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    assert(queueSize(bus) === Some(1))
+    assert(queueSize(bus) === 1)
     assert(numDroppedEvents(bus) === 1)
     // Allow the remaining events to be processed so we can stop the listener bus:

From 2915a5ec1bd9d4bc7a40b0ad20ca5b0db8f5382e Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 13 Sep 2017 09:10:28 -0700
Subject: [PATCH 04/15] Move dispatching to the event thread.

This change makes the event queue implement SparkListenerBus and inherit all
the metrics and dispatching behavior, making the change easier on the
scheduler and also restoring per-listener metrics.
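For reviewers, a minimal, self-contained sketch of the queue-of-events model
this commit moves to (simplified, hypothetical names; the actual implementation
is the AsyncEventQueue diff below):

    import java.util.concurrent.LinkedBlockingQueue

    // The queue now buffers whole events instead of one closure per event; a
    // single daemon thread drains it and replays each event to the listeners.
    // A sentinel value ("poison pill") tells the thread to exit once the
    // queue has drained.
    object DispatchSketch {
      sealed trait Event
      case object PoisonPill extends Event
      case class JobEnd(id: Int) extends Event

      private val queue = new LinkedBlockingQueue[Event](100)

      private val dispatchThread = new Thread("sketch-dispatcher") {
        setDaemon(true)
        override def run(): Unit = {
          var next = queue.take()
          while (next != PoisonPill) {
            println(s"postToAll($next)") // stands in for super.postToAll(next)
            next = queue.take()
          }
        }
      }

      def main(args: Array[String]): Unit = {
        dispatchThread.start()
        queue.put(JobEnd(1))
        queue.put(PoisonPill) // ask the dispatcher to drain and exit
        dispatchThread.join()
      }
    }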
--- .../spark/scheduler/AsyncEventQueue.scala | 158 +++--------------- .../spark/scheduler/LiveListenerBus.scala | 41 ++--- .../scheduler/EventLoggingListenerSuite.scala | 4 +- .../spark/scheduler/SparkListenerSuite.scala | 8 +- 4 files changed, 49 insertions(+), 162 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index ee3a300f4e116..c8481f8dcf2d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -17,13 +17,10 @@ package org.apache.spark.scheduler -import java.util.{ArrayList, List => JList} -import java.util.concurrent._ +import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import scala.util.control.NonFatal - -import com.codahale.metrics.{Counter, Gauge, MetricRegistry} +import com.codahale.metrics.{Gauge, Timer} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging @@ -36,36 +33,16 @@ import org.apache.spark.util.Utils * * Delivery will only begin when the `start()` method is called. The `stop()` method should be * called when no more events need to be delivered. - * - * Instances of `AsyncEventQueue` are listeners themselves, but they're not to be used like regular - * listeners; they are used internally by `LiveListenerBus`, and are tightly coupled to the - * lifecycle of that implementation. */ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics) - extends SparkListenerInterface + extends SparkListenerBus with Logging { import AsyncEventQueue._ - private val _listeners = new CopyOnWriteArrayList[SparkListenerInterface]() - - def addListener(l: SparkListenerInterface): Unit = { - _listeners.add(l) - } - - /** - * @return Whether there are remainning listeners in the queue. - */ - def removeListener(l: SparkListenerInterface): Boolean = { - _listeners.remove(l) - !_listeners.isEmpty() - } - - def listeners: JList[SparkListenerInterface] = new ArrayList(_listeners) - // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if // it's perpetually being added to more quickly than it's being drained. - private val taskQueue = new LinkedBlockingQueue[SparkListenerInterface => Unit]( + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]( conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) // Keep the event count separately, so that waitUntilEmpty() can be implemented properly; @@ -87,12 +64,13 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi private val stopped = new AtomicBoolean(false) private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents") + private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime") // Remove the queue size gauge first, in case it was created by a previous incarnation of // this queue that was removed from the listener bus. 
metrics.metricRegistry.remove(s"queue.$name.size") metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] { - override def getValue: Int = taskQueue.size() + override def getValue: Int = eventQueue.size() }) private val dispatchThread = new Thread(s"spark-listener-group-$name") { @@ -104,20 +82,16 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) { try { - var task: SparkListenerInterface => Unit = taskQueue.take() - while (task != POISON_PILL) { - val it = _listeners.iterator() - while (it.hasNext()) { - val listener = it.next() - try { - task(listener) - } catch { - case NonFatal(e) => - logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) - } + var next: SparkListenerEvent = eventQueue.take() + while (next != POISON_PILL) { + val ctx = processingTime.time() + try { + super.postToAll(next) + } finally { + ctx.stop() } eventCount.decrementAndGet() - task = taskQueue.take() + next = eventQueue.take() } eventCount.decrementAndGet() } catch { @@ -126,6 +100,10 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi } } + override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { + metrics.getTimer(listener.getClass().getName()) + } + /** * Start an asynchronous thread to dispatch events to the underlying listeners. * @@ -150,19 +128,19 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") } if (stopped.compareAndSet(false, true)) { - taskQueue.put(POISON_PILL) + eventQueue.put(POISON_PILL) eventCount.incrementAndGet() } dispatchThread.join() } - private def post(event: SparkListenerEvent)(task: SparkListenerInterface => Unit): Unit = { + def post(event: SparkListenerEvent): Unit = { if (stopped.get()) { return } eventCount.incrementAndGet() - if (taskQueue.offer(task)) { + if (eventQueue.offer(event)) { return } @@ -210,102 +188,10 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi true } - override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { - post(event)(_.onStageCompleted(event)) - } - - override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { - post(event)(_.onStageSubmitted(event)) - } - - override def onTaskStart(event: SparkListenerTaskStart): Unit = { - post(event)(_.onTaskStart(event)) - } - - override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { - post(event)(_.onTaskGettingResult(event)) - } - - override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { - post(event)(_.onTaskEnd(event)) - } - - override def onJobStart(event: SparkListenerJobStart): Unit = { - post(event)(_.onJobStart(event)) - } - - override def onJobEnd(event: SparkListenerJobEnd): Unit = { - post(event)(_.onJobEnd(event)) - } - - override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { - post(event)(_.onEnvironmentUpdate(event)) - } - - override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { - post(event)(_.onBlockManagerAdded(event)) - } - - override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { - post(event)(_.onBlockManagerRemoved(event)) - } - - override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { - post(event)(_.onUnpersistRDD(event)) - } - - override def 
onApplicationStart(event: SparkListenerApplicationStart): Unit = { - post(event)(_.onApplicationStart(event)) - } - - override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { - post(event)(_.onApplicationEnd(event)) - } - - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { - post(event)(_.onExecutorMetricsUpdate(event)) - } - - override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { - post(event)(_.onExecutorAdded(event)) - } - - override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { - post(event)(_.onExecutorRemoved(event)) - } - - override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { - post(event)(_.onExecutorBlacklisted(event)) - } - - override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { - post(event)(_.onExecutorUnblacklisted(event)) - } - - override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { - post(event)(_.onNodeBlacklisted(event)) - } - - override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { - post(event)(_.onNodeUnblacklisted(event)) - } - - override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { - post(event)(_.onBlockUpdated(event)) - } - - override def onSpeculativeTaskSubmitted(event: SparkListenerSpeculativeTaskSubmitted): Unit = { - post(event)(_.onSpeculativeTaskSubmitted(event)) - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = { - post(event)(_.onOtherEvent(event)) - } - } private object AsyncEventQueue { - val POISON_PILL: SparkListenerInterface => Unit = { _ => Unit } + val POISON_PILL = new SparkListenerEvent() { } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index e8d196f411526..2cf81ce9d8bcd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -41,7 +41,7 @@ import org.apache.spark.metrics.source.Source * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { +private[spark] class LiveListenerBus(conf: SparkConf) { import LiveListenerBus._ @@ -60,8 +60,10 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { /** When `droppedEventsCounter` was logged last time in milliseconds. */ @volatile private var lastReportTimestamp = 0L + private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() + /** Add a listener to the default queue. */ - override def addListener(listener: SparkListenerInterface): Unit = { + def addListener(listener: SparkListenerInterface): Unit = { addToQueue(listener, "default") } @@ -85,37 +87,36 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { if (started.get() && !stopped.get()) { newQueue.start(sparkContext) } - super.addListener(newQueue) + queues.add(newQueue) } } - override def removeListener(listener: SparkListenerInterface): Unit = synchronized { + def removeListener(listener: SparkListenerInterface): Unit = synchronized { // Remove listener from all queues it was added to, and stop queues that have become empty. 
queues.asScala - .filter(!_.removeListener(listener)) + .filter { queue => + queue.removeListener(listener) + queue.listeners.isEmpty() + } .foreach { toRemove => if (started.get() && !stopped.get()) { toRemove.stop() } - super.removeListener(toRemove) + queues.remove(toRemove) } } /** An alias for postToAll(), to avoid changing all call sites. */ - def post(event: SparkListenerEvent): Unit = postToAll(event) - - override def postToAll(event: SparkListenerEvent): Unit = { + def post(event: SparkListenerEvent): Unit = { if (!stopped.get()) { metrics.numEventsPosted.inc() - super.postToAll(event) + val it = queues.iterator() + while (it.hasNext()) { + it.next().post(event) + } } } - override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { - val name = listener.asInstanceOf[AsyncEventQueue].name - metrics.getTimer(s"queue.$name") - } - /** * Start sending events to attached listeners. * @@ -168,7 +169,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { } } - override private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): + private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass queues.asScala.flatMap { queue => @@ -176,13 +177,13 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { } } - override private[spark] def listeners: JList[SparkListenerInterface] = { + private[spark] def listeners: JList[SparkListenerInterface] = { queues.asScala.flatMap(_.listeners.asScala).asJava } - // Exposed for testing. - private[scheduler] def queues: JList[AsyncEventQueue] = { - super.listeners.asInstanceOf[JList[AsyncEventQueue]] + // For testing only. + private[scheduler] def activeQueues(): Seq[String] = { + queues.asScala.map(_.name).toSeq } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 0afd07b851cf9..b14461a2c2873 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -165,8 +165,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit eventLogger.start() listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) listenerBus.addListener(eventLogger) - listenerBus.postToAll(applicationStart) - listenerBus.postToAll(applicationEnd) + listenerBus.post(applicationStart) + listenerBus.post(applicationEnd) listenerBus.stop() eventLogger.stop() diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 27854464598e7..da5e5a7e86cac 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -461,19 +461,19 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match bus.addListener(counter1) bus.addToQueue(counter2, "other") bus.addToQueue(counter3, "other") - assert(bus.queues.asScala.map(_.name) === Seq("default", "other")) + assert(bus.activeQueues() === Seq("default", "other")) assert(bus.findListenersByClass[BasicJobCounter]().size === 3) bus.removeListener(counter1) - assert(bus.queues.asScala.map(_.name) === Seq("other")) + assert(bus.activeQueues() === 
Seq("other")) assert(bus.findListenersByClass[BasicJobCounter]().size === 2) bus.removeListener(counter2) - assert(bus.queues.asScala.map(_.name) === Seq("other")) + assert(bus.activeQueues() === Seq("other")) assert(bus.findListenersByClass[BasicJobCounter]().size === 1) bus.removeListener(counter3) - assert(bus.queues.isEmpty) + assert(bus.activeQueues().isEmpty) assert(bus.findListenersByClass[BasicJobCounter]().isEmpty) } From cf5c6ce74c185ebd90ea0f9040b177c64161cccc Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Sep 2017 09:29:27 -0700 Subject: [PATCH 05/15] Minor cleanup. --- .../org/apache/spark/scheduler/LiveListenerBus.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 2cf81ce9d8bcd..73a7c6bb191f8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -169,12 +169,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) { } } - private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): - Seq[T] = { - val c = implicitly[ClassTag[T]].runtimeClass - queues.asScala.flatMap { queue => - queue.listeners.asScala.filter(_.getClass() == c).map(_.asInstanceOf[T]) - } + private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = { + queues.asScala.flatMap { queue => queue.findListenersByClass[T]() } } private[spark] def listeners: JList[SparkListenerInterface] = { From 24f5c8d0c78a8a362f4690ad03dac9dd07808f85 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Sep 2017 09:43:30 -0700 Subject: [PATCH 06/15] Undo obsolete changes. --- core/src/main/scala/org/apache/spark/util/ListenerBus.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index e68a2d3562e77..76a56298aaebc 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -46,7 +46,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { /** * Add a listener to listen events. This method is thread-safe and can be called in any thread. */ - def addListener(listener: L): Unit = { + final def addListener(listener: L): Unit = { listenersPlusTimers.add((listener, getTimer(listener))) } @@ -54,7 +54,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { * Remove a listener and it won't receive any events. This method is thread-safe and can be called * in any thread. */ - def removeListener(listener: L): Unit = { + final def removeListener(listener: L): Unit = { listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer => listenersPlusTimers.remove(listenerAndTimer) } From ad6ff49de17c204e8d4feb775185a05d7fa9f53b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Sep 2017 10:13:09 -0700 Subject: [PATCH 07/15] Fix a comment after changes. 
---
 .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 73a7c6bb191f8..9baf7aff29b34 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -106,7 +106,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     }
   }
 
-  /** An alias for postToAll(), to avoid changing all call sites. */
+  /** Post an event to all queues. */
   def post(event: SparkListenerEvent): Unit = {
     if (!stopped.get()) {
       metrics.numEventsPosted.inc()
@@ -166,6 +166,7 @@
     synchronized {
       queues.asScala.foreach(_.stop())
+      queues.clear()
     }
   }

From 563be321dff3999a4172499307642ccf8b787093 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 13 Sep 2017 18:01:19 -0700
Subject: [PATCH 08/15] Undo unnecessary changes.

---
 .../spark/scheduler/AsyncEventQueue.scala |  2 +-
 .../spark/scheduler/LiveListenerBus.scala | 26 +++++++++++--------
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index c8481f8dcf2d4..ddfd198fe6bd2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -101,7 +101,7 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
   }
 
   override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
-    metrics.getTimer(listener.getClass().getName())
+    metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 9baf7aff29b34..d696e4fd83a5b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -211,22 +211,26 @@ private[spark] class LiveListenerBusMetrics(conf: SparkConf)
   val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
 
   // Guarded by synchronization.
-  private val allTimers = mutable.Map[String, Timer]()
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
 
   /**
    * Returns a timer tracking the processing time of events processed by
    * listeners of the given class. This method is thread-safe.
*/ - def getTimer(name: String): Option[Timer] = synchronized { - val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED) - allTimers.get(name).orElse { - if (allTimers.size == maxTimed) { - logError(s"Not measuring processing time for listener $name because a " + - s"maximum of $maxTimed are already timed.") - None - } else { - allTimers(name) = metricRegistry.timer(s"$name.listenerProcessingTime") - allTimers.get(name) + def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = { + synchronized { + val className = cls.getName + val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED) + perListenerClassTimers.get(className).orElse { + if (perListenerClassTimers.size == maxTimed) { + logError(s"Not measuring processing time for listener class $className because a " + + s"maximum of $maxTimed listener classes are already timed.") + None + } else { + perListenerClassTimers(className) = + metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className)) + perListenerClassTimers.get(className) + } } } } From 20b83826a70ac8574e289db9fdcae37c305c01bd Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Sep 2017 18:03:58 -0700 Subject: [PATCH 09/15] Make the event log queue name a constant too. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 59fd4de20c399..4ab97c3d0e2af 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -522,7 +522,7 @@ class SparkContext(config: SparkConf) extends Logging { new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration) logger.start() - listenerBus.addToQueue(logger, "eventLog") + listenerBus.addToQueue(logger, LiveListenerBus.EVENT_LOG_QUEUE) Some(logger) } else { None diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index d696e4fd83a5b..3a3612aca5118 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -189,11 +189,14 @@ private[spark] object LiveListenerBus { // Allows for Context to check whether stop() call is made within listener thread val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) - /** Queue name where status-related listeners are grouped together. */ + /** Name of queue where status-related listeners are grouped together. */ val APP_STATUS_QUEUE = "appStatus" - /** Queue name where executor management-related listeners are grouped together. */ + /** Name of queue where executor management-related listeners are grouped together. */ val EXECUTOR_MGMT_QUEUE = "executorMgmt" + + /** Name of queue where the event log listener is placed. */ + val EVENT_LOG_QUEUE = "eventLog" } private[spark] class LiveListenerBusMetrics(conf: SparkConf) From 324041fe38a9b3bf176afa333ee1000d2b5e4415 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 15 Sep 2017 10:08:27 -0700 Subject: [PATCH 10/15] Explicit add methods for each queue. 
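A hypothetical caller's view after this commit (the listener instances are
placeholders; this mirrors how the diffs below and the test suite use the bus,
it is not additional API):

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.{LiveListenerBus, SparkListener}

    val bus = new LiveListenerBus(new SparkConf(false))
    // Internal components now pick their queue through a named method instead
    // of passing a queue-name string:
    bus.addToManagementQueue(new SparkListener {})  // executor management queue
    bus.addToStatusQueue(new SparkListener {})      // application status queue
    bus.addToEventLogQueue(new SparkListener {})    // event log queue
    bus.addListener(new SparkListener {})           // default queue
    // addToQueue(listener, name) itself becomes private, so callers can no
    // longer invent ad-hoc queue names.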
--- .../spark/ExecutorAllocationManager.scala | 2 +- .../org/apache/spark/HeartbeatReceiver.scala | 2 +- .../scala/org/apache/spark/SparkContext.scala | 4 +-- .../spark/scheduler/LiveListenerBus.scala | 34 +++++++++++++------ .../scala/org/apache/spark/ui/SparkUI.scala | 2 +- .../spark/scheduler/SparkListenerSuite.scala | 12 ++++--- .../spark/sql/internal/SharedState.scala | 2 +- .../scheduler/StreamingListenerBus.scala | 2 +- 8 files changed, 38 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index c5ac9401cec24..119b426a9af34 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager( * the scheduling task. */ def start(): Unit = { - listenerBus.addToQueue(listener, LiveListenerBus.EXECUTOR_MGMT_QUEUE) + listenerBus.addToManagementQueue(listener) val scheduleTask = new Runnable() { override def run(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index e2340cfc3a57a..ff960b396dbf1 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) this(sc, new SystemClock) } - sc.listenerBus.addToQueue(this, LiveListenerBus.EXECUTOR_MGMT_QUEUE) + sc.listenerBus.addToManagementQueue(this) override val rpcEnv: RpcEnv = sc.env.rpcEnv diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4ab97c3d0e2af..adf57af19d3d5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -419,7 +419,7 @@ class SparkContext(config: SparkConf) extends Logging { // "_jobProgressListener" should be set up before creating SparkEnv because when creating // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them. _jobProgressListener = new JobProgressListener(_conf) - listenerBus.addToQueue(jobProgressListener, LiveListenerBus.APP_STATUS_QUEUE) + listenerBus.addToStatusQueue(jobProgressListener) // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) @@ -522,7 +522,7 @@ class SparkContext(config: SparkConf) extends Logging { new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration) logger.start() - listenerBus.addToQueue(logger, LiveListenerBus.EVENT_LOG_QUEUE) + listenerBus.addToEventLogQueue(logger) Some(logger) } else { None diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 3a3612aca5118..5a480d0bda632 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -64,7 +64,22 @@ private[spark] class LiveListenerBus(conf: SparkConf) { /** Add a listener to the default queue. 
*/ def addListener(listener: SparkListenerInterface): Unit = { - addToQueue(listener, "default") + addToQueue(listener, DEFAULT_QUEUE) + } + + /** Add a listener to the executor management queue. */ + def addToManagementQueue(listener: SparkListenerInterface): Unit = { + addToQueue(listener, EXECUTOR_MGMT_QUEUE) + } + + /** Add a listener to the application status queue. */ + def addToStatusQueue(listener: SparkListenerInterface): Unit = { + addToQueue(listener, APP_STATUS_QUEUE) + } + + /** Add a listener to the event log queue. */ + def addToEventLogQueue(listener: SparkListenerInterface): Unit = { + addToQueue(listener, EVENT_LOG_QUEUE) } /** @@ -72,7 +87,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) { * of each other (each one uses a separate thread for delivering events), allowing slower * listeners to be somewhat isolated from others. */ - def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { + private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { if (stopped.get()) { throw new IllegalStateException("LiveListenerBus is stopped.") } @@ -179,8 +194,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) { } // For testing only. - private[scheduler] def activeQueues(): Seq[String] = { - queues.asScala.map(_.name).toSeq + private[scheduler] def activeQueues(): Set[String] = { + queues.asScala.map(_.name).toSet } } @@ -189,14 +204,13 @@ private[spark] object LiveListenerBus { // Allows for Context to check whether stop() call is made within listener thread val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) - /** Name of queue where status-related listeners are grouped together. */ - val APP_STATUS_QUEUE = "appStatus" + private[scheduler] val DEFAULT_QUEUE = "default" + + private[scheduler] val APP_STATUS_QUEUE = "appStatus" - /** Name of queue where executor management-related listeners are grouped together. */ - val EXECUTOR_MGMT_QUEUE = "executorMgmt" + private[scheduler] val EXECUTOR_MGMT_QUEUE = "executorMgmt" - /** Name of queue where the event log listener is placed. 
*/ - val EVENT_LOG_QUEUE = "eventLog" + private[scheduler] val EVENT_LOG_QUEUE = "eventLog" } private[spark] class LiveListenerBusMetrics(conf: SparkConf) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 9b2ae95680cf8..6e94073238a56 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -167,7 +167,7 @@ private[spark] object SparkUI { appName: String, startTime: Long): SparkUI = { create(Some(sc), conf, - l => sc.listenerBus.addToQueue(l, LiveListenerBus.APP_STATUS_QUEUE), + sc.listenerBus.addToStatusQueue, securityManager, appName, jobProgressListener = Some(jobProgressListener), startTime = startTime) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index da5e5a7e86cac..94594ff9f16f1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -453,23 +453,25 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("add and remove listeners to/from LiveListenerBus queues") { + import LiveListenerBus._ + val bus = new LiveListenerBus(new SparkConf(false)) val counter1 = new BasicJobCounter() val counter2 = new BasicJobCounter() val counter3 = new BasicJobCounter() bus.addListener(counter1) - bus.addToQueue(counter2, "other") - bus.addToQueue(counter3, "other") - assert(bus.activeQueues() === Seq("default", "other")) + bus.addToStatusQueue(counter2) + bus.addToStatusQueue(counter3) + assert(bus.activeQueues() === Set(DEFAULT_QUEUE, APP_STATUS_QUEUE)) assert(bus.findListenersByClass[BasicJobCounter]().size === 3) bus.removeListener(counter1) - assert(bus.activeQueues() === Seq("other")) + assert(bus.activeQueues() === Set(APP_STATUS_QUEUE)) assert(bus.findListenersByClass[BasicJobCounter]().size === 2) bus.removeListener(counter2) - assert(bus.activeQueues() === Seq("other")) + assert(bus.activeQueues() === Set(APP_STATUS_QUEUE)) assert(bus.findListenersByClass[BasicJobCounter]().size === 1) bus.removeListener(counter3) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index bf5ba16927e54..ad9db308b2627 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -149,7 +149,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { if (SparkSession.sqlListener.get() == null) { val listener = new SQLListener(sc.conf) if (SparkSession.sqlListener.compareAndSet(null, listener)) { - sc.listenerBus.addToQueue(listener, LiveListenerBus.APP_STATUS_QUEUE) + sc.listenerBus.addToStatusQueue(listener) sc.ui.foreach(new SQLTab(listener, _)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index ea7fadbc5417c..6a70bf7406b3c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -76,7 +76,7 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) * 
forward them to StreamingListeners. */ def start(): Unit = { - sparkListenerBus.addToQueue(this, LiveListenerBus.APP_STATUS_QUEUE) + sparkListenerBus.addToStatusQueue(this) } /** From 3b218c22a133767c115b0cca09a0ecc4e861e0b2 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 15 Sep 2017 10:13:52 -0700 Subject: [PATCH 11/15] Rename addListener() to addToSharedQueue(). --- .../scala/org/apache/spark/SparkContext.scala | 4 +-- .../spark/scheduler/LiveListenerBus.scala | 8 ++--- .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../scheduler/EventLoggingListenerSuite.scala | 2 +- .../spark/scheduler/SparkListenerSuite.scala | 29 ++++++++++--------- .../streaming/StreamingQueryListenerBus.scala | 2 +- 6 files changed, 24 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index adf57af19d3d5..1821bc87bf626 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1563,7 +1563,7 @@ class SparkContext(config: SparkConf) extends Logging { */ @DeveloperApi def addSparkListener(listener: SparkListenerInterface) { - listenerBus.addListener(listener) + listenerBus.addToSharedQueue(listener) } /** @@ -2377,7 +2377,7 @@ class SparkContext(config: SparkConf) extends Logging { " parameter from breaking Spark's ability to find a valid constructor.") } } - listenerBus.addListener(listener) + listenerBus.addToSharedQueue(listener) logInfo(s"Registered listener $className") } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 5a480d0bda632..33b344702956e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -62,9 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) { private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() - /** Add a listener to the default queue. */ - def addListener(listener: SparkListenerInterface): Unit = { - addToQueue(listener, DEFAULT_QUEUE) + /** Add a listener to queue shared by all non-internal listeners. */ + def addToSharedQueue(listener: SparkListenerInterface): Unit = { + addToQueue(listener, SHARED_QUEUE) } /** Add a listener to the executor management queue. 
*/ @@ -204,7 +204,7 @@ private[spark] object LiveListenerBus { // Allows for Context to check whether stop() call is made within listener thread val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) - private[scheduler] val DEFAULT_QUEUE = "default" + private[scheduler] val SHARED_QUEUE = "shared" private[scheduler] val APP_STATUS_QUEUE = "appStatus" diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 703fc1b34c387..6222e576d1ce9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -751,7 +751,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Helper functions to extract commonly used code in Fetch Failure test cases private def setupStageAbortTest(sc: SparkContext) { - sc.listenerBus.addListener(new EndListener()) + sc.listenerBus.addToSharedQueue(new EndListener()) ended = false jobResult = null } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index b14461a2c2873..6b42775ccb0f6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -164,7 +164,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) - listenerBus.addListener(eventLogger) + listenerBus.addToEventLogQueue(eventLogger) listenerBus.post(applicationStart) listenerBus.post(applicationEnd) listenerBus.stop() diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 94594ff9f16f1..d061c7845f4a6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -34,6 +34,8 @@ import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers with ResetSystemProperties { + import LiveListenerBus._ + /** Length of time to wait while draining listener events. 
*/ val WAIT_TIMEOUT_MILLIS = 10000 @@ -43,22 +45,23 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem]) private def numDroppedEvents(bus: LiveListenerBus): Long = { - bus.metrics.metricRegistry.counter("queue.default.numDroppedEvents").getCount + bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount } private def queueSize(bus: LiveListenerBus): Int = { - bus.metrics.metricRegistry.getGauges().get("queue.default.size").getValue().asInstanceOf[Int] + bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue() + .asInstanceOf[Int] } private def eventProcessingTimeCount(bus: LiveListenerBus): Long = { - bus.metrics.metricRegistry.timer("queue.default.listenerProcessingTime").getCount() + bus.metrics.metricRegistry.timer(s"queue.$SHARED_QUEUE.listenerProcessingTime").getCount() } test("don't call sc.stop in listener") { sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val listener = new SparkContextStoppingListener(sc) - sc.listenerBus.addListener(listener) + sc.listenerBus.addToSharedQueue(listener) sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) sc.stop() @@ -70,7 +73,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val conf = new SparkConf() val counter = new BasicJobCounter val bus = new LiveListenerBus(conf) - bus.addListener(counter) + bus.addToSharedQueue(counter) // Metrics are initially empty. assert(bus.metrics.numEventsPosted.getCount === 0) @@ -140,7 +143,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val bus = new LiveListenerBus(new SparkConf()) val blockingListener = new BlockingListener - bus.addListener(blockingListener) + bus.addToSharedQueue(blockingListener) bus.start(mockSparkContext, mockMetricsSystem) bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) @@ -173,7 +176,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val listenerStarted = new Semaphore(0) val listenerWait = new Semaphore(0) - bus.addListener(new SparkListener { + bus.addToSharedQueue(new SparkListener { override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { listenerStarted.release() listenerWait.acquire() @@ -423,9 +426,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val bus = new LiveListenerBus(new SparkConf()) // Propagate events to bad listener first - bus.addListener(badListener) - bus.addListener(jobCounter1) - bus.addListener(jobCounter2) + bus.addToSharedQueue(badListener) + bus.addToSharedQueue(jobCounter1) + bus.addToSharedQueue(jobCounter2) bus.start(mockSparkContext, mockMetricsSystem) // Post events to all listeners, and wait until the queue is drained @@ -453,17 +456,15 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("add and remove listeners to/from LiveListenerBus queues") { - import LiveListenerBus._ - val bus = new LiveListenerBus(new SparkConf(false)) val counter1 = new BasicJobCounter() val counter2 = new BasicJobCounter() val counter3 = new BasicJobCounter() - bus.addListener(counter1) + bus.addToSharedQueue(counter1) bus.addToStatusQueue(counter2) bus.addToStatusQueue(counter3) - assert(bus.activeQueues() === Set(DEFAULT_QUEUE, APP_STATUS_QUEUE)) + assert(bus.activeQueues() === Set(SHARED_QUEUE, 
APP_STATUS_QUEUE)) assert(bus.findListenersByClass[BasicJobCounter]().size === 3) bus.removeListener(counter1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 4207013c3f75d..07e39023c8366 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) import StreamingQueryListener._ - sparkListenerBus.addListener(this) + sparkListenerBus.addToSharedQueue(this) /** * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus From ed714778385f8dcb117f3c01a204bd9b024ea83c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 15 Sep 2017 10:18:32 -0700 Subject: [PATCH 12/15] Remove stale comment. --- .../scala/org/apache/spark/scheduler/AsyncEventQueue.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index ddfd198fe6bd2..8605e1da161c7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -107,8 +107,7 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi /** * Start an asynchronous thread to dispatch events to the underlying listeners. * - * @param sc Used to stop the SparkContext in case the a listener fails. - * @param metrics Used to report listener performance metrics. + * @param sc Used to stop the SparkContext in case the async dispatcher fails. */ private[scheduler] def start(sc: SparkContext): Unit = { if (started.compareAndSet(false, true)) { From 77dd8ecb549b8f5d7b7b20ac7384b8f56f1df67e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 18 Sep 2017 08:39:05 -0700 Subject: [PATCH 13/15] Remove redundant check. --- .../main/scala/org/apache/spark/scheduler/LiveListenerBus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 33b344702956e..ada69e7e5f90d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -99,7 +99,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) { case None => val newQueue = new AsyncEventQueue(queue, conf, metrics) newQueue.addListener(listener) - if (started.get() && !stopped.get()) { + if (started.get()) { newQueue.start(sparkContext) } queues.add(newQueue) From 35d3428f6e248c2e21e9190418115fe8c0bbee5d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 18 Sep 2017 13:23:55 -0700 Subject: [PATCH 14/15] Keep reference to SparkContext. 
--- .../main/scala/org/apache/spark/scheduler/LiveListenerBus.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index ada69e7e5f90d..1a4aea15a0aaa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -146,6 +146,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) { throw new IllegalStateException("LiveListenerBus already started.") } + this.sparkContext = sc queues.asScala.foreach(_.start(sc)) metricsSystem.registerSource(metrics) } From 283b733dbcad326898eee2f4629a8fc1891a77c7 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Sep 2017 09:45:01 -0700 Subject: [PATCH 15/15] Nits. --- .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 1a4aea15a0aaa..2f93c497c5771 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -69,7 +69,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) { /** Add a listener to the executor management queue. */ def addToManagementQueue(listener: SparkListenerInterface): Unit = { - addToQueue(listener, EXECUTOR_MGMT_QUEUE) + addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE) } /** Add a listener to the application status queue. */ @@ -186,10 +186,12 @@ private[spark] class LiveListenerBus(conf: SparkConf) { } } + // For testing only. private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = { queues.asScala.flatMap { queue => queue.findListenersByClass[T]() } } + // For testing only. private[spark] def listeners: JList[SparkListenerInterface] = { queues.asScala.flatMap(_.listeners.asScala).asJava } @@ -209,7 +211,7 @@ private[spark] object LiveListenerBus { private[scheduler] val APP_STATUS_QUEUE = "appStatus" - private[scheduler] val EXECUTOR_MGMT_QUEUE = "executorMgmt" + private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement" private[scheduler] val EVENT_LOG_QUEUE = "eventLog" }
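
Note: below is a minimal usage sketch of the queue-routing API as it stands after
this series, reconstructed from the diffs above. It assumes calling code inside
the org.apache.spark.scheduler package (LiveListenerBus is private[spark]);
JobEndCounter is a hypothetical listener used only for illustration, and sc /
metricsSystem stand in for a real SparkContext and MetricsSystem.

import org.apache.spark.SparkConf
import org.apache.spark.scheduler._

// Hypothetical listener: counts the job-end events it receives.
class JobEndCounter extends SparkListener {
  @volatile var count = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { count += 1 }
}

val bus = new LiveListenerBus(new SparkConf(false))

// Each named queue dispatches events on its own thread; a queue is created
// lazily the first time a listener is routed to it.
bus.addToSharedQueue(new JobEndCounter)      // "shared": user-defined listeners
bus.addToStatusQueue(new JobEndCounter)      // "appStatus": UI / job progress
bus.addToManagementQueue(new JobEndCounter)  // "executorManagement": dynamic allocation

// Only the three queues touched above are active at this point.
assert(bus.activeQueues() === Set("shared", "appStatus", "executorManagement"))

// Events are posted once and fanned out to every active queue. Per-queue
// metrics are registered as "queue.<name>.numDroppedEvents",
// "queue.<name>.size" and "queue.<name>.listenerProcessingTime".
bus.start(sc, metricsSystem)
bus.post(SparkListenerJobEnd(0, System.currentTimeMillis(), JobSucceeded))
bus.waitUntilEmpty(10000L)
bus.stop()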