diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 7a5fb9a802354..22aa190fad87c 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager( * the scheduling task. */ def start(): Unit = { - listenerBus.addListener(listener) + listenerBus.addListener(listener, true) val scheduleTask = new Runnable() { override def run(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 136f0af7b2c9e..65f81cb43e78e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -52,6 +52,7 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.bus.BusQueue.GroupOfListener import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.storage._ @@ -522,7 +523,10 @@ class SparkContext(config: SparkConf) extends Logging { new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration) logger.start() - listenerBus.addListener(logger) + listenerBus.addProcessor( + ev => logger.log(ev), + "eventLoggerListener", + Some(EventLoggingListener.EVENT_FILTER)) Some(logger) } else { None @@ -2349,13 +2353,12 @@ class SparkContext(config: SparkConf) extends Logging { try { val listenerClassNames: Seq[String] = conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "") - for (className <- listenerClassNames) { - // Use reflection to find the right constructor + val extraListeners = listenerClassNames.map { className => val constructors = { val listenerClass = Utils.classForName(className) listenerClass - .getConstructors - .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]] + .getConstructors + .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]] } val constructorTakingSparkConf = constructors.find { c => c.getParameterTypes.sameElements(Array(classOf[SparkConf])) @@ -2378,8 +2381,13 @@ class SparkContext(config: SparkConf) extends Logging { " parameter from breaking Spark's ability to find a valid constructor.") } } - listenerBus.addListener(listener) - logInfo(s"Registered listener $className") + logInfo(s"listener $className created") + listener + } + if (extraListeners.nonEmpty) { + val group = GroupOfListener(extraListeners, "extraListeners") + listenerBus.addListener(group, true) + logInfo("extra-listeners registered") } } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0d3769a735869..ce7725c1297fe 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -171,12 +171,6 @@ package object config { .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative") .createWithDefault(10000) - private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED = - 
ConfigBuilder("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed") - .internal() - .intConf - .createWithDefault(128) - // This property sets the root namespace for metrics reporting private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace") .stringConf diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 00ab2a393e17f..594e5ef45f6d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -55,8 +55,7 @@ private[spark] class EventLoggingListener( appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf, - hadoopConf: Configuration) - extends SparkListener with Logging { + hadoopConf: Configuration) extends Logging { import EventLoggingListener._ @@ -90,6 +89,8 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + private var nbMessageProcessed = 0 + /** * Creates the log file in the configured log directory. */ @@ -134,97 +135,38 @@ private[spark] class EventLoggingListener( } /** Log the event as JSON. */ - private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) { + private def logEvent(event: SparkListenerEvent) { val eventJson = JsonProtocol.sparkEventToJson(event) // scalastyle:off println writer.foreach(_.println(compact(render(eventJson)))) // scalastyle:on println - if (flushLogger) { - writer.foreach(_.flush()) - hadoopDataStream.foreach(ds => ds.getWrappedStream match { - case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)) - case _ => ds.hflush() - }) - } if (testing) { loggedEvents += eventJson + flush() } } - // Events that do not trigger a flush - override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event) - - override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event) - - override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = logEvent(event) - - override def onTaskEnd(event: SparkListenerTaskEnd): Unit = logEvent(event) - - override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { - logEvent(redactEvent(event)) - } - - // Events that trigger a flush - override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { - logEvent(event, flushLogger = true) + private def flush(): Unit = { + writer.foreach(_.flush()) + hadoopDataStream.foreach(ds => ds.getWrappedStream match { + case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)) + case _ => ds.hflush() + }) } - override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true) - - override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true) - - override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { - logEvent(event, flushLogger = true) - } - - override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { - logEvent(event, flushLogger = true) - } - - override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { - logEvent(event, flushLogger = true) - } - - override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { - logEvent(event, flushLogger = true) - } - - override def onApplicationEnd(event: 
SparkListenerApplicationEnd): Unit = { - logEvent(event, flushLogger = true) - } - override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { - logEvent(event, flushLogger = true) - } - - override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { - logEvent(event, flushLogger = true) - } - - override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { - logEvent(event, flushLogger = true) - } - - override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { - logEvent(event, flushLogger = true) - } - - override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { - logEvent(event, flushLogger = true) - } - - override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { - logEvent(event, flushLogger = true) - } - - // No-op because logging every update would be overkill - override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {} - - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } - - override def onOtherEvent(event: SparkListenerEvent): Unit = { + def log(event: SparkListenerEvent): Unit = { if (event.logEvent) { - logEvent(event, flushLogger = true) + val toLog = event match { + case update: SparkListenerEnvironmentUpdate => + redactEvent(update) + case _ => event + } + logEvent(toLog) + nbMessageProcessed = nbMessageProcessed + 1 + if (nbMessageProcessed >= FLUSH_FREQUENCY) { + flush() + nbMessageProcessed = 0 + } } } @@ -278,6 +220,12 @@ private[spark] object EventLoggingListener extends Logging { val IN_PROGRESS = ".inprogress" val DEFAULT_LOG_DIR = "/tmp/spark-events" + private val FLUSH_FREQUENCY = 200 + + val EVENT_FILTER: SparkListenerEvent => Boolean = + ev => !(ev.isInstanceOf[SparkListenerBlockUpdated] || + ev.isInstanceOf[SparkListenerExecutorMetricsUpdate]) + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) // A cache for compression codecs to avoid creating the same codec many times diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 7d5e9809dd7b2..60b52674ef5ad 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -18,19 +18,20 @@ package org.apache.spark.scheduler import java.util.concurrent._ -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantLock -import scala.collection.mutable -import scala.util.DynamicVariable - -import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} +import com.codahale.metrics.Timer +import scala.reflect.ClassTag +import scala.util.{DynamicVariable, Try} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.metrics.source.Source -import org.apache.spark.util.Utils +import org.apache.spark.scheduler.bus._ +import org.apache.spark.scheduler.bus.BusQueue.{GenericProcessor, GroupOfListener} +import org.apache.spark.util.WithListenerBus /** * Asynchronously passes SparkListenerEvents to registered SparkListeners. 
@@ -39,98 +40,114 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { - - self => +private[spark] class LiveListenerBus(conf: SparkConf) + extends WithListenerBus[SparkListenerInterface, SparkListenerEvent] with Logging{ import LiveListenerBus._ private var sparkContext: SparkContext = _ + private var metricsSystem: MetricsSystem = _ - // Cap the capacity of the event queue so we get an explicit error (rather than - // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private val eventQueue = - new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + private val defaultListenerPool = GroupOfListener("default") + private val defaultListenerQueue = BusQueue( + conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY), + defaultListenerPool, + BusQueue.ALL_MESSAGES) - private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue) + @volatile private var otherListenerQueues = Seq.empty[BusQueue] - // Indicate if `start()` is called - private val started = new AtomicBoolean(false) - // Indicate if `stop()` is called + // start, stop and add/remove listener should be mutually exclusive + private val startStopAddRemoveLock = new ReentrantLock() + // Will be set modified in a synchronized function + @volatile private var started = false private val stopped = new AtomicBoolean(false) - /** A counter for dropped events. It will be reset every time we log it. */ - private val droppedEventsCounter = new AtomicLong(0L) - - /** When `droppedEventsCounter` was logged last time in milliseconds. */ - @volatile private var lastReportTimestamp = 0L - - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) + /** + * if isolatedIfPossible is true, add the listener to an isolated pool. + * Otherwise add it to the default pool. + * This method is thread-safe and can be called in any thread. + */ + final override def addListener(listener: SparkListenerInterface, + isolatedIfPossible: Boolean): Unit = { + startStopAddRemoveLock.lock() + Try{ + if (isolatedIfPossible) { + addQueue(BusQueue( + conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY), + listener, + BusQueue.ALL_MESSAGES)) + } else { + defaultListenerPool.addListener(listener) + } + } + startStopAddRemoveLock.unlock() + } - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) + /** + * Add a generic listener to an isolated pool. 
+ */ + def addProcessor(processor: SparkListenerEvent => Unit, + busName: String, + eventFilter: Option[SparkListenerEvent => Boolean] = None): Unit = { + addQueue(BusQueue( + busName, + conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY), + processor, + eventFilter.getOrElse(BusQueue.ALL_MESSAGES)) + ) + } - private val listenerThread = new Thread(name) { - setDaemon(true) - override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { - val timer = metrics.eventProcessingTime - while (true) { - eventLock.acquire() - self.synchronized { - processingEvent = true - } - try { - val event = eventQueue.poll - if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return - } - val timerContext = timer.time() - try { - postToAll(event) - } finally { - timerContext.stop() - } - } finally { - self.synchronized { - processingEvent = false - } - } - } - } + def removeProcessor(processorBusName: String): Unit = { + startStopAddRemoveLock.lock() + Try { + val queue = otherListenerQueues + .filter(q => q.processor.isInstanceOf[GenericProcessor]) + .find(_.processor.asInstanceOf[GenericProcessor].name == processorBusName) + queue.foreach { q => + otherListenerQueues = otherListenerQueues.filter(_ != q) + q.askStop() + q.waitForStop() } } + startStopAddRemoveLock.unlock() + } - override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { - metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface])) + private def addQueue(queue : BusQueue): Unit = { + startStopAddRemoveLock.lock() + Try { + if (started) { + queue.start(sparkContext, metricsSystem) + } + otherListenerQueues = otherListenerQueues :+ queue + } + startStopAddRemoveLock.unlock() } - /** - * Start sending events to attached listeners. - * - * This first sends out all buffered events posted before this listener bus has started, then - * listens for any additional events asynchronously while the listener bus is still running. - * This should only be called once. - * - * @param sc Used to stop the SparkContext in case the listener thread dies. - */ - def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = { - if (started.compareAndSet(false, true)) { - sparkContext = sc - metricsSystem.registerSource(metrics) - listenerThread.start() - } else { - throw new IllegalStateException(s"$name already started!") + /** + * Remove a listener + * This method is thread-safe and can be called in any thread + */ + final override def removeListener(listener: SparkListenerInterface): Unit = { + startStopAddRemoveLock.lock() + Try { + // First we try to delete it from the default queue + defaultListenerPool.removeListener(listener) + // Then from the other queue. 
+ val holder = otherListenerQueues.find(q => q.listeners.contains(listener)) + holder.foreach{q => + val listeners = q.listeners + if (listeners.size > 1) { + throw new IllegalArgumentException("Cannot remove a listener from a fixed group") + } else { + // First we remove the queue from the list (no more message will be posted) + otherListenerQueues = otherListenerQueues.filter(_ != q) + // Then stop it + q.askStop() + q.waitForStop() + } + } } + startStopAddRemoveLock.unlock() } def post(event: SparkListenerEvent): Unit = { @@ -139,30 +156,67 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { logDebug(s"$name has already stopped! Dropping event $event") return } - metrics.numEventsPosted.inc() - val eventAdded = eventQueue.offer(event) - if (eventAdded) { - eventLock.release() + defaultListenerQueue.post(event) + otherListenerQueues.foreach(q => q.post(event)) + } + + /** + * For testing only + */ + override private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag] = + defaultListenerQueue.findListenersByClass ++ otherListenerQueues.flatMap(_.findListenersByClass) + + override private[spark] def listeners = + defaultListenerQueue.listeners ++ otherListenerQueues.flatMap(_.listeners) + + /** + * Start sending events to attached listeners. + * + * This first sends out all buffered events posted before this listener bus has started, then + * listens for any additional events asynchronously while the listener bus is still running. + * This should only be called once. + * + */ + def start(sc: SparkContext, ms: MetricsSystem): Unit = { + startStopAddRemoveLock.lock() + if (!started) { + Try { + sparkContext = sc + metricsSystem = ms + defaultListenerQueue.start(sc, ms) + otherListenerQueues.foreach(_.start(sc, ms)) + started = true + } + startStopAddRemoveLock.unlock() } else { - onDropEvent(event) + startStopAddRemoveLock.unlock() + throw new IllegalStateException("LiveListener bus already started!") } + } - val droppedEvents = droppedEventsCounter.get - if (droppedEvents > 0) { - // Don't log too frequently - if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { - // There may be multiple threads trying to decrease droppedEventsCounter. - // Use "compareAndSet" to make sure only one thread can win. - // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and - // then that thread will update it. - if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { - val prevLastReportTimestamp = lastReportTimestamp - lastReportTimestamp = System.currentTimeMillis() - logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + - new java.util.Date(prevLastReportTimestamp)) - } + /** + * Stop the listener bus. It will wait until the queued events have been processed, but drop the + * new events after stopping. + */ + def stop(): Unit = { + startStopAddRemoveLock.lock() + if (!started) { + startStopAddRemoveLock.unlock() + throw new IllegalStateException("Attempted to stop the LiveListener " + + "bus that has not yet started!") + } + Try { + if (!stopped.get) { + stopped.set(true) + defaultListenerQueue.askStop() + otherListenerQueues.foreach(_.askStop()) + defaultListenerQueue.waitForStop() + otherListenerQueues.foreach(_.waitForStop()) + } else { + // Keep quiet } } + startStopAddRemoveLock.unlock() } /** @@ -172,7 +226,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { * Exposed for testing. 
*/ @throws(classOf[TimeoutException]) - def waitUntilEmpty(timeoutMillis: Long): Unit = { + private[spark] def waitUntilEmpty(timeoutMillis: Long): Unit = { val finishTime = System.currentTimeMillis + timeoutMillis while (!queueIsEmpty) { if (System.currentTimeMillis > finishTime) { @@ -189,51 +243,24 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { * For testing only. Return whether the listener daemon thread is still alive. * Exposed for testing. */ - def listenerThreadIsAlive: Boolean = listenerThread.isAlive + private[scheduler] def listenerThreadIsAlive: Boolean = + defaultListenerQueue.isAlive && otherListenerQueues.forall(_.isAlive) + + /** + * Exposed for testing. + */ + private[scheduler] def metricsFromMainQueue: + (QueueMetrics, Map[SparkListenerInterface, Option[Timer]]) = ( + defaultListenerQueue.metrics, + defaultListenerPool.listeners.toMap + ) /** * Return whether the event queue is empty. - * - * The use of synchronized here guarantees that all events that once belonged to this queue - * have already been processed by all attached listeners, if this returns true. */ - private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent } + private def queueIsEmpty: Boolean = + defaultListenerQueue.isQueueEmpty && otherListenerQueues.forall(_.isQueueEmpty) - /** - * Stop the listener bus. It will wait until the queued events have been processed, but drop the - * new events after stopping. - */ - def stop(): Unit = { - if (!started.get()) { - throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") - } - if (stopped.compareAndSet(false, true)) { - // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know - // `stop` is called. - eventLock.release() - listenerThread.join() - } else { - // Keep quiet - } - } - - /** - * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be - * notified with the dropped events. - * - * Note: `onDropEvent` can be called in any thread. - */ - def onDropEvent(event: SparkListenerEvent): Unit = { - metrics.numDroppedEvents.inc() - droppedEventsCounter.incrementAndGet() - if (logDroppedEvent.compareAndSet(false, true)) { - // Only log the following message once to avoid duplicated annoying logs. - logError("Dropping SparkListenerEvent because no remaining room in event queue. " + - "This likely means one of the SparkListeners is too slow and cannot keep up with " + - "the rate at which tasks are being started by the scheduler.") - } - logTrace(s"Dropping event $event") - } } private[spark] object LiveListenerBus { @@ -244,64 +271,3 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } -private[spark] class LiveListenerBusMetrics( - conf: SparkConf, - queue: LinkedBlockingQueue[_]) - extends Source with Logging { - - override val sourceName: String = "LiveListenerBus" - override val metricRegistry: MetricRegistry = new MetricRegistry - - /** - * The total number of events posted to the LiveListenerBus. This is a count of the total number - * of events which have been produced by the application and sent to the listener bus, NOT a - * count of the number of events which have been processed and delivered to listeners (or dropped - * without being delivered). - */ - val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted")) - - /** - * The total number of events that were dropped without being delivered to listeners. 
- */ - val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) - - /** - * The amount of time taken to post a single event to all listeners. - */ - val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) - - /** - * The number of messages waiting in the queue. - */ - val queueSize: Gauge[Int] = { - metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ - override def getValue: Int = queue.size() - }) - } - - // Guarded by synchronization. - private val perListenerClassTimers = mutable.Map[String, Timer]() - - /** - * Returns a timer tracking the processing time of the given listener class. - * events processed by that listener. This method is thread-safe. - */ - def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = { - synchronized { - val className = cls.getName - val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED) - perListenerClassTimers.get(className).orElse { - if (perListenerClassTimers.size == maxTimed) { - logError(s"Not measuring processing time for listener class $className because a " + - s"maximum of $maxTimed listener classes are already timed.") - None - } else { - perListenerClassTimers(className) = - metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className)) - perListenerClassTimers.get(className) - } - } - } - } -} - diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index b76e560669d59..57fe819363d0c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -136,7 +136,10 @@ case class SparkListenerNodeUnblacklisted(time: Long, hostId: String) extends SparkListenerEvent @DeveloperApi -case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent +case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) + extends SparkListenerEvent { + override protected[spark] val logEvent = false +} /** * Periodic updates from executors. 
@@ -147,7 +150,9 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends case class SparkListenerExecutorMetricsUpdate( execId: String, accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) - extends SparkListenerEvent + extends SparkListenerEvent { + override protected[spark] val logEvent = false +} @DeveloperApi case class SparkListenerApplicationStart( diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 056c0cbded435..a82926f5e4085 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -27,7 +27,12 @@ private[spark] trait SparkListenerBus protected override def doPostEvent( listener: SparkListenerInterface, - event: SparkListenerEvent): Unit = { + event: SparkListenerEvent): Unit = SparkListenerEventDispatcher.dispatch(listener, event) +} + +private[spark] object SparkListenerEventDispatcher { + + def dispatch(listener: SparkListenerInterface, event: SparkListenerEvent): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => listener.onStageSubmitted(stageSubmitted) @@ -76,5 +81,4 @@ private[spark] trait SparkListenerBus case _ => listener.onOtherEvent(event) } } - } diff --git a/core/src/main/scala/org/apache/spark/scheduler/bus/BusQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/bus/BusQueue.scala new file mode 100644 index 0000000000000..9db3cc201d55b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/bus/BusQueue.scala @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.bus + +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference} + +import com.codahale.metrics.Timer +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.bus.BusQueue.SparkListenerEventProcessor +import org.apache.spark.util.Utils + + +// One producer one consumer asynchronous queue. 
+private[spark] class BusQueue (
+    val processor: SparkListenerEventProcessor,
+    bufferSize: Int,
+    withEventProcessingTime: Boolean,
+    private val eventFilter: SparkListenerEvent => Boolean) extends Logging {
+
+  import BusQueue._
+
+  private var sparkContext: SparkContext = _
+
+  private val circularBuffer = new ArrayBlockingQueue[SparkListenerEvent](bufferSize)
+  private val numberOfEvents = new AtomicInteger(0)
+
+  private val hasDropped = new AtomicBoolean(false)
+  private val numberOfDrop = new AtomicLong(0L)
+
+  private val stopped = new AtomicBoolean(false)
+
+  private[scheduler] val metrics =
+    new QueueMetrics(processor.name, numberOfEvents, withEventProcessingTime)
+
+  private val consumerThread = new Thread(s"${processor.name} bus consumer") {
+    setDaemon(true)
+    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
+      LiveListenerBus.withinListenerThread.withValue(true) {
+        val oTimer = metrics.eventProcessingTime
+        while (true) {
+          val newElem = circularBuffer.take()
+          if (newElem.eq(LAST_PROCESSED_MESSAGE)) {
+            return
+          }
+          else {
+            val timerContext = oTimer.map(_.time())
+            try {
+              consumeEvent(newElem)
+            } catch {
+              case NonFatal(e) =>
+                logError(s"Listener bus ${processor.name} threw an exception", e)
+            }
+            numberOfEvents.decrementAndGet()
+            timerContext.foreach(_.stop())
+          }
+        }
+      }
+    }
+  }
+
+  // should be called only once
+  private[scheduler] def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = {
+    sparkContext = sc
+    initAdditionalMetrics(metrics)
+    metricsSystem.registerSource(metrics)
+    consumerThread.start()
+  }
+
+  private[scheduler] def askStop(): Unit = stopped.set(true)
+
+  // should be called only once
+  private[scheduler] def waitForStop(): Unit = {
+    if (!stopped.get()) {
+      throw new IllegalStateException(s"${processor.name} was not asked to stop!")
+    }
+    circularBuffer.put(LAST_PROCESSED_MESSAGE)
+    consumerThread.join()
+  }
+
+  private[scheduler] def post(event: SparkListenerEvent): Unit = {
+    if (eventFilter(event)) {
+      val hasPosted = circularBuffer.offer(event)
+      if (hasPosted) {
+        numberOfEvents.incrementAndGet()
+        metrics.numEventsPosted.inc()
+      } else {
+        onDropEvent()
+      }
+    }
+  }
+
+  // For test only
+  private[scheduler] def isAlive: Boolean = consumerThread.isAlive
+
+  // For test only
+  // need to test both values to be sure that the queue is empty and no event is being processed
+  private[scheduler] def isQueueEmpty: Boolean =
+    circularBuffer.size() == 0 && numberOfEvents.get() == 0
+
+  private def onDropEvent(): Unit = {
+    if (hasDropped.compareAndSet(false, true)) {
+      logError(s"Dropping SparkListenerEvent from the bus ${processor.name} because no remaining " +
+        "room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with " +
+        "the rate at which tasks are being started by the scheduler.")
+    }
+    numberOfDrop.incrementAndGet()
+    metrics.numDroppedEvents.inc()
+    if (numberOfDrop.get() >= DROP_MESSAGE_LOG_FREQUENCY) {
+      logWarning(s"$DROP_MESSAGE_LOG_FREQUENCY SparkListenerEvents have been dropped " +
+        s"from the bus ${processor.name}")
+      numberOfDrop.set(0L)
+    }
+  }
+
+  private def consumeEvent(ev: SparkListenerEvent): Unit = processor.consumeEvent(ev)
+
+  private[spark] def listeners: Seq[SparkListenerInterface] =
+    processor.oListener
+      .map {
+        case group: GroupOfListener => group.listeners.map(_._1)
+        case l => Seq(l)
+      }
+      .getOrElse(Seq.empty)
+
+  protected def initAdditionalMetrics(queueMetrics: QueueMetrics): Unit =
+    processor.oListener.foreach {
+      case group: GroupOfListener => group.initTimers(queueMetrics)
+      case _ => // queues wrapping a single listener have no per-listener timers to initialize
+    }
+
+  private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag] = {
+    val c = implicitly[ClassTag[T]].runtimeClass
+    listeners.filter(_.getClass == c).map(_.asInstanceOf[T])
+  }
+
+}
+
+private[spark] object BusQueue {
+
+  private val DROP_MESSAGE_LOG_FREQUENCY = 50
+
+  private object LAST_PROCESSED_MESSAGE extends SparkListenerEvent
+
+  def apply(busName: String,
+      bufferSize: Int,
+      process: SparkListenerEvent => Unit,
+      eventFilter: SparkListenerEvent => Boolean): BusQueue =
+    new BusQueue(GenericProcessor(process, busName), bufferSize, true, eventFilter)
+
+  def apply(bufferSize: Int,
+      listener: SparkListenerInterface,
+      eventFilter: SparkListenerEvent => Boolean): BusQueue =
+    new BusQueue(
+      ListenerProcessor(listener),
+      bufferSize,
+      listener match {
+        case _: GroupOfListener => false
+        case _ => true
+      },
+      eventFilter
+    )
+
+  private[scheduler] trait SparkListenerEventProcessor {
+    def consumeEvent(ev: SparkListenerEvent): Unit
+
+    val oListener: Option[SparkListenerInterface]
+
+    val name: String
+  }
+
+  private case class ListenerProcessor(listener: SparkListenerInterface)
+    extends SparkListenerEventProcessor {
+    override def consumeEvent(ev: SparkListenerEvent): Unit =
+      SparkListenerEventDispatcher.dispatch(listener, ev)
+
+    override val oListener: Option[SparkListenerInterface] = Some(listener)
+    override val name: String = listener match {
+      case group: GroupOfListener => group.busName
+      case simpleListener => simpleListener.getClass.getSimpleName
+    }
+  }
+
+  private[scheduler] case class GenericProcessor(process: SparkListenerEvent => Unit, label: String)
+    extends SparkListenerEventProcessor {
+    override def consumeEvent(ev: SparkListenerEvent): Unit = process(ev)
+
+    override val oListener: Option[SparkListenerInterface] = None
+    override val name: String = label
+  }
+
+  private[scheduler] val ALL_MESSAGES: SparkListenerEvent => Boolean = _ => true
+
+  private[spark] object GroupOfListener {
+
+    def apply(listenerSeq: Seq[SparkListenerInterface], busName: String): GroupOfListener = {
+      val group = GroupOfListener(busName)
+      listenerSeq.foreach(l => group.addListener(l))
+      group
+    }
+
+    def apply(busName: String): GroupOfListener = {
+      // instantiate the class directly; calling GroupOfListener(busName) here would recurse forever
+      new GroupOfListener(busName)
+    }
+  }
+
+  private[bus] class GroupOfListener(val busName: String)
+    extends SparkListenerInterface with Logging {
+
+    private var queueMetrics: Option[QueueMetrics] = None
+
+    private val group: AtomicReference[Seq[(SparkListenerInterface, Option[Timer])]] =
+      new AtomicReference[Seq[(SparkListenerInterface, Option[Timer])]](Seq.empty)
+
+    private[scheduler] def listeners = group.get()
+
+    private[bus] def 
initTimers(metrics: QueueMetrics): Unit = { + queueMetrics = Some(metrics) + val current = listeners + group.set( + current.map(l => (l._1, + Some(metrics.getTimerForIndividualListener(l._1.getClass.getSimpleName))))) + } + + private[scheduler] def addListener(l: SparkListenerInterface): Unit = { + val current = listeners + val newVal = current :+ (l, + queueMetrics.map(_.getTimerForIndividualListener(l.getClass.getSimpleName))) + group.set(newVal) + } + + private[scheduler] def removeListener(l: SparkListenerInterface): Unit = { + val current = listeners + val newVal = current.filter(t => !(t._1 == l)) + group.set(newVal) + } + + override def onStageCompleted( + stageCompleted: SparkListenerStageCompleted): Unit = + postToAll(stageCompleted, _.onStageCompleted) + + override def onStageSubmitted( + stageSubmitted: SparkListenerStageSubmitted): Unit = + postToAll(stageSubmitted, _.onStageSubmitted) + + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = + postToAll(taskStart, _.onTaskStart) + + override def onTaskGettingResult( + taskGettingResult: SparkListenerTaskGettingResult): Unit = + postToAll(taskGettingResult, _.onTaskGettingResult) + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = + postToAll(taskEnd, _.onTaskEnd) + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = + postToAll(jobStart, _.onJobStart) + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = + postToAll(jobEnd, _.onJobEnd) + + override def onEnvironmentUpdate( + environmentUpdate: SparkListenerEnvironmentUpdate): Unit = + postToAll(environmentUpdate, _.onEnvironmentUpdate) + + override def onBlockManagerAdded( + blockManagerAdded: SparkListenerBlockManagerAdded): Unit = + postToAll(blockManagerAdded, _.onBlockManagerAdded) + + override def onBlockManagerRemoved( + blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = + postToAll(blockManagerRemoved, _.onBlockManagerRemoved) + + override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = + postToAll(unpersistRDD, _.onUnpersistRDD) + + override def onApplicationStart( + applicationStart: SparkListenerApplicationStart): Unit = + postToAll(applicationStart, _.onApplicationStart) + + override def onApplicationEnd( + applicationEnd: SparkListenerApplicationEnd): Unit = + postToAll(applicationEnd, _.onApplicationEnd) + + override def onExecutorMetricsUpdate( + executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = + postToAll(executorMetricsUpdate, _.onExecutorMetricsUpdate) + + override def onExecutorAdded( + executorAdded: SparkListenerExecutorAdded): Unit = + postToAll(executorAdded, _.onExecutorAdded) + + override def onExecutorRemoved( + executorRemoved: SparkListenerExecutorRemoved): Unit = + postToAll(executorRemoved, _.onExecutorRemoved) + + override def onExecutorBlacklisted( + executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = + postToAll(executorBlacklisted, _.onExecutorBlacklisted) + + override def onExecutorUnblacklisted( + executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = + postToAll(executorUnblacklisted, _.onExecutorUnblacklisted) + + override def onNodeBlacklisted( + nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = + postToAll(nodeBlacklisted, _.onNodeBlacklisted) + + override def onNodeUnblacklisted( + nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = + postToAll(nodeUnblacklisted, _.onNodeUnblacklisted) + + override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = + postToAll(blockUpdated, 
_.onBlockUpdated) + + override def onOtherEvent(event: SparkListenerEvent): Unit = + postToAll(event, _.onOtherEvent) + + private def postToAll[T <: SparkListenerEvent](ev: T, + func: SparkListenerInterface => T => Unit): Unit = { + val currentCollection = listeners + var i = 0 + while (i < currentCollection.length) { + val listenerAndTimer = currentCollection(i) + val timer = listenerAndTimer._2.map(_.time()) + try { + func(listenerAndTimer._1)(ev) + } catch { + case NonFatal(e) => + logError(s"a listener ${listenerAndTimer._1.getClass.getName} threw an exception", e) + } + timer.foreach(_.stop()) + i = i + 1 + } + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/bus/QueueMetrics.scala b/core/src/main/scala/org/apache/spark/scheduler/bus/QueueMetrics.scala new file mode 100644 index 0000000000000..bbde12017cc1b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/bus/QueueMetrics.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.bus + +import java.util.concurrent.atomic.AtomicInteger + +import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} + +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.Source + +private[spark] class QueueMetrics( + busName: String, + private val nbElements: AtomicInteger, + withEventProcessingTime: Boolean) extends Source with Logging { + + override val sourceName: String = s"${busName}Bus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This is a count of the total number + * of events which have been produced by the application and sent to the listener bus, NOT a + * count of the number of events which have been processed and delivered to listeners (or dropped + * without being delivered). + */ + val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The number of messages waiting in the queue. 
+ */ + val queueSize: Gauge[Int] = { + metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ + override def getValue: Int = nbElements.get() + }) + } + + val eventProcessingTime: Option[Timer] = { + if (withEventProcessingTime) { + Some(metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))) + } else { + None + } + } + + def getTimerForIndividualListener(listenerlabel: String): Timer = { + metricRegistry.timer(MetricRegistry.name( + "listeners", + listenerlabel, + "eventProcessingTime")) + } + +} + diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 589f811145519..3c635fb588a48 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -24,8 +24,8 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ -import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, - UIRoot} +import org.apache.spark.scheduler.bus.BusQueue.GroupOfListener +import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, UIRoot} import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} @@ -33,7 +33,7 @@ import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener import org.apache.spark.ui.storage.{StorageListener, StorageTab} -import org.apache.spark.util.Utils +import org.apache.spark.util.{Utils, WithListenerBus} /** * Top level user interface for a Spark application. @@ -159,24 +159,24 @@ private[spark] object SparkUI { } def createLiveUI( - sc: SparkContext, - conf: SparkConf, - listenerBus: SparkListenerBus, - jobProgressListener: JobProgressListener, - securityManager: SecurityManager, - appName: String, - startTime: Long): SparkUI = { + sc: SparkContext, + conf: SparkConf, + listenerBus: WithListenerBus[SparkListenerInterface, SparkListenerEvent], + jobProgressListener: JobProgressListener, + securityManager: SecurityManager, + appName: String, + startTime: Long): SparkUI = { create(Some(sc), conf, listenerBus, securityManager, appName, jobProgressListener = Some(jobProgressListener), startTime = startTime) } def createHistoryUI( - conf: SparkConf, - listenerBus: SparkListenerBus, - securityManager: SecurityManager, - appName: String, - basePath: String, - startTime: Long): SparkUI = { + conf: SparkConf, + listenerBus: WithListenerBus[SparkListenerInterface, SparkListenerEvent], + securityManager: SecurityManager, + appName: String, + basePath: String, + startTime: Long): SparkUI = { val sparkUI = create( None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) @@ -184,7 +184,7 @@ private[spark] object SparkUI { Utils.getContextOrSparkClassLoader).asScala listenerFactories.foreach { listenerFactory => val listeners = listenerFactory.createListeners(conf, sparkUI) - listeners.foreach(listenerBus.addListener) + listeners.foreach(l => listenerBus.addListener(l, false)) } sparkUI } @@ -197,14 +197,14 @@ private[spark] object SparkUI { * web UI will create and register its own JobProgressListener. 
*/ private def create( - sc: Option[SparkContext], - conf: SparkConf, - listenerBus: SparkListenerBus, - securityManager: SecurityManager, - appName: String, - basePath: String = "", - jobProgressListener: Option[JobProgressListener] = None, - startTime: Long): SparkUI = { + sc: Option[SparkContext], + conf: SparkConf, + listenerBus: WithListenerBus[SparkListenerInterface, SparkListenerEvent], + securityManager: SecurityManager, + appName: String, + basePath: String = "", + jobProgressListener: Option[JobProgressListener] = None, + startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) @@ -218,11 +218,11 @@ private[spark] object SparkUI { val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) - listenerBus.addListener(environmentListener) - listenerBus.addListener(storageStatusListener) - listenerBus.addListener(executorsListener) - listenerBus.addListener(storageListener) - listenerBus.addListener(operationGraphListener) + listenerBus.addListener( + GroupOfListener( + Seq(environmentListener, storageStatusListener, executorsListener, + storageListener, operationGraphListener), + "ui"), true) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index 76a56298aaebc..97dfde7e791b6 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -23,41 +23,40 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal -import com.codahale.metrics.Timer - import org.apache.spark.internal.Logging +private [spark] trait WithListenerBus[L <: AnyRef, E]{ + + def addListener(listener: L, isolatedIfPossible: Boolean = false): Unit + + def removeListener(listener: L): Unit + + private[spark] def findListenersByClass[T <: L : ClassTag]: Seq[T] + + private[spark] def listeners: Seq[L] +} + /** * An event bus which posts events to its listeners. */ -private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { - - private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])] +private[spark] trait ListenerBus[L <: AnyRef, E] extends WithListenerBus[L, E] with Logging { // Marked `private[spark]` for access in tests. - private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava - - /** - * Returns a CodaHale metrics Timer for measuring the listener's event processing time. - * This method is intended to be overridden by subclasses. - */ - protected def getTimer(listener: L): Option[Timer] = None + private[spark] val internalListeners = new CopyOnWriteArrayList[L] /** * Add a listener to listen events. This method is thread-safe and can be called in any thread. */ - final def addListener(listener: L): Unit = { - listenersPlusTimers.add((listener, getTimer(listener))) + final override def addListener(listener: L, isolatedIfPossible: Boolean) : Unit = { + internalListeners.add(listener) } /** * Remove a listener and it won't receive any events. This method is thread-safe and can be called * in any thread. 
*/ - final def removeListener(listener: L): Unit = { - listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer => - listenersPlusTimers.remove(listenerAndTimer) - } + final override def removeListener(listener: L): Unit = { + internalListeners.remove(listener) } /** @@ -68,25 +67,14 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { // JavaConverters can create a JIterableWrapper if we use asScala. // However, this method will be called frequently. To avoid the wrapper cost, here we use // Java Iterator directly. - val iter = listenersPlusTimers.iterator + val iter = internalListeners.iterator while (iter.hasNext) { - val listenerAndMaybeTimer = iter.next() - val listener = listenerAndMaybeTimer._1 - val maybeTimer = listenerAndMaybeTimer._2 - val maybeTimerContext = if (maybeTimer.isDefined) { - maybeTimer.get.time() - } else { - null - } + val listener = iter.next() try { doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) - } finally { - if (maybeTimerContext != null) { - maybeTimerContext.stop() - } } } } @@ -97,9 +85,10 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { */ protected def doPostEvent(listener: L, event: E): Unit - private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { + private[spark] override def findListenersByClass[T <: L : ClassTag]: Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass - listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq + listeners.filter(_.getClass == c).map(_.asInstanceOf[T]) } + override private[spark] def listeners = internalListeners.asScala } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 7da4bae0ab7eb..5276c3e38c2cb 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -95,7 +95,7 @@ class ExecutorAllocationManagerSuite test("add executors") { sc = createSparkContext(1, 10, 1) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Keep adding until the limit is reached assert(numExecutorsTarget(manager) === 1) @@ -140,7 +140,7 @@ class ExecutorAllocationManagerSuite test("add executors capped by num pending tasks") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 5))) // Verify that we're capped at number of tasks in the stage assert(numExecutorsTarget(manager) === 0) @@ -156,10 +156,10 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 3))) + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 6) @@ -172,9 +172,9 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that re-running a task doesn't blow things up - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 3))) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 9) assert(numExecutorsToAdd(manager) === 2) @@ -183,7 +183,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task once we're at our limit doesn't blow things up - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) assert(addExecutors(manager) === 0) assert(numExecutorsTarget(manager) === 10) } @@ -225,7 +225,7 @@ class ExecutorAllocationManagerSuite test("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5))) assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) @@ -236,15 +236,15 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 3) val task1Info = createTaskInfo(0, 0, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info)) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, task1Info)) assert(numExecutorsToAdd(manager) === 4) assert(addExecutors(manager) === 2) val task2Info = createTaskInfo(1, 0, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info)) - sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) - sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info)) + post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) + post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) assert(adjustRequestedExecutors(manager) === -1) } @@ -390,7 +390,7 @@ class ExecutorAllocationManagerSuite test ("interleaving add and remove") { sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Add a few executors assert(addExecutors(manager) === 1) @@ -569,7 +569,7 @@ class ExecutorAllocationManagerSuite val clock = new ManualClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Scheduler queue backlogged onSchedulerBacklogged(manager) @@ -682,26 +682,26 @@ 
class ExecutorAllocationManagerSuite // Starting a stage should start the add timer val numTasks = 10 - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, numTasks))) assert(addTime(manager) !== NOT_SET) // Starting a subset of the tasks should not cancel the add timer val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, "executor-1") } - taskInfos.tail.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) } + taskInfos.tail.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) } assert(addTime(manager) !== NOT_SET) // Starting all remaining tasks should cancel the add timer - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head)) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfos.head)) assert(addTime(manager) === NOT_SET) // Start two different stages // The add timer should be canceled only if all tasks in both stages start running - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks))) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, numTasks))) assert(addTime(manager) !== NOT_SET) - taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) } + taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(1, 0, info)) } assert(addTime(manager) !== NOT_SET) - taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) } + taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(2, 0, info)) } assert(addTime(manager) === NOT_SET) } @@ -715,22 +715,22 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).size === 5) // Starting a task cancel the remove timer for that executor - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) assert(removeTimes(manager).size === 3) assert(!removeTimes(manager).contains("executor-1")) assert(!removeTimes(manager).contains("executor-2")) // Finishing all tasks running on an executor should start the remove timer for that executor - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new TaskMetrics)) - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new TaskMetrics)) assert(removeTimes(manager).size === 4) assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not finished yet assert(removeTimes(manager).contains("executor-2")) - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new TaskMetrics)) assert(removeTimes(manager).size === 5) 
assert(removeTimes(manager).contains("executor-1")) // executor-1 has now finished @@ -743,13 +743,13 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).isEmpty) // New executors have registered - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(removeTimes(manager).contains("executor-1")) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) @@ -757,14 +757,14 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).contains("executor-2")) // Existing executors have disconnected - sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", "")) + post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-1", "")) assert(executorIds(manager).size === 1) assert(!executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(!removeTimes(manager).contains("executor-1")) // Unknown executor has disconnected - sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", "")) + post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-3", "")) assert(executorIds(manager).size === 1) assert(removeTimes(manager).size === 1) } @@ -775,8 +775,8 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) @@ -788,15 +788,15 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) @@ -809,7 +809,7 @@ class ExecutorAllocationManagerSuite sc = createSparkContext(0, 100000, 0) val manager = sc.executorAllocationManager.get val stage1 = createStageInfo(0, 1000) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + post(sc.listenerBus, SparkListenerStageSubmitted(stage1)) assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) @@ -820,12 +820,12 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, s"executor-$i") } 
assert(executorIds(manager).size === 15) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + post(sc.listenerBus, SparkListenerStageCompleted(stage1)) adjustRequestedExecutors(manager) assert(numExecutorsTarget(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 1000))) addExecutors(manager) assert(numExecutorsTarget(manager) === 16) } @@ -842,7 +842,7 @@ class ExecutorAllocationManagerSuite // Verify whether the initial number of executors is kept with no pending tasks assert(numExecutorsTarget(manager) === 3) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2))) clock.advance(100L) assert(maxNumExecutorsNeeded(manager) === 2) @@ -892,7 +892,7 @@ class ExecutorAllocationManagerSuite Seq.empty ) val stageInfo1 = createStageInfo(1, 5, localityPreferences1) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1)) + post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo1)) assert(localityAwareTasks(manager) === 3) assert(hostToLocalTaskCount(manager) === @@ -904,13 +904,13 @@ class ExecutorAllocationManagerSuite Seq.empty ) val stageInfo2 = createStageInfo(2, 3, localityPreferences2) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2)) + post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo2)) assert(localityAwareTasks(manager) === 5) assert(hostToLocalTaskCount(manager) === Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1)) + post(sc.listenerBus, SparkListenerStageCompleted(stageInfo1)) assert(localityAwareTasks(manager) === 2) assert(hostToLocalTaskCount(manager) === Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2)) @@ -921,16 +921,16 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(maxNumExecutorsNeeded(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1))) assert(maxNumExecutorsNeeded(manager) === 1) val taskInfo = createTaskInfo(1, 1, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo)) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo)) assert(maxNumExecutorsNeeded(manager) === 1) // If the task is failed, we expect it to be resubmitted later. val taskEndReason = ExceptionFailure(null, null, null, null, None) - sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null)) + post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null)) assert(maxNumExecutorsNeeded(manager) === 1) } @@ -942,7 +942,7 @@ class ExecutorAllocationManagerSuite // Allocation manager is reset when adding executor requests are sent without reporting back // executor added. - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10))) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 2) @@ -957,7 +957,7 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager) === Set.empty) // Allocation manager is reset when executors are added. 
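The maxNumExecutorsNeeded assertions in these hunks follow the manager's sizing rule: executors needed is the ceiling of (pending + running tasks) divided by tasks-per-executor, which is 1 under this suite's default configuration. A rough sketch of that arithmetic (the helper name is illustrative; the formula is the one ExecutorAllocationManager conventionally applies):

    def maxNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int = 1): Int =
      (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor

    maxNeeded(pendingTasks = 2, runningTasks = 0)   // 2, matching the two-task stage submitted above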
- sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10))) addExecutors(manager) addExecutors(manager) @@ -1045,6 +1045,11 @@ class ExecutorAllocationManagerSuite sc } + private def post(livelistener: LiveListenerBus, ev: SparkListenerEvent): Unit = { + livelistener.post(ev) + livelistener.waitUntilEmpty(200) + } + } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 18da8c18939ed..8253aeb024380 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -388,6 +388,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.update.interval", "1s") .set("spark.eventLog.enabled", "true") .set("spark.history.cache.window", "250ms") + .set("spark.eventLog.testing", "true") .remove("spark.testing") val provider = new FsHistoryProvider(myConf) val securityManager = HistoryServer.createSecurityManager(myConf) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index a136d69b36d6c..4976c9f9be60b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -23,6 +23,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfterEach import org.scalatest.mockito.MockitoSugar +import scala.collection.mutable import org.apache.spark._ import org.apache.spark.internal.config @@ -42,7 +43,6 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M conf = new SparkConf().setAppName("test").setMaster("local") .set(config.BLACKLIST_ENABLED.key, "true") scheduler = mockTaskSchedWithConf(conf) - clock.setTime(0) listenerBusMock = mock[LiveListenerBus] diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 0afd07b851cf9..78b239df0af70 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -164,9 +164,12 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) - listenerBus.addListener(eventLogger) - listenerBus.postToAll(applicationStart) - listenerBus.postToAll(applicationEnd) + listenerBus.addProcessor( + ev => eventLogger.log(ev), + "eventLoggerListener", + Some(EventLoggingListener.EVENT_FILTER)) + listenerBus.post(applicationStart) + listenerBus.post(applicationEnd) listenerBus.stop() eventLogger.stop() diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index d17e3864854a8..18c4c743d37a6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfter +import scala.collection.mutable import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil @@ -61,7 +62,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) - val eventMonster = new EventMonster(conf) + val eventMonster = new EventMonster() try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -69,9 +70,9 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp } finally { logData.close() } - assert(eventMonster.loggedEvents.size === 2) - assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart)) - assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd)) + assert(eventMonster.events.size === 2) + assert(eventMonster.events(0) === applicationStart) + assert(eventMonster.events(1) === applicationEnd) } /** @@ -107,7 +108,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val replayer = new ReplayListenerBus() - val eventMonster = new EventMonster(conf) + val eventMonster = new EventMonster() replayer.addListener(eventMonster) // Verify the replay returns the events given the input maybe truncated. @@ -115,7 +116,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp Utils.tryWithResource(new EarlyEOFInputStream(logData, buffered.size - 10)) { failingStream => replayer.replay(failingStream, logFilePath.toString, true) - assert(eventMonster.loggedEvents.size === 1) + assert(eventMonster.events.size === 1) assert(failingStream.didFail) } @@ -177,7 +178,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Replay events val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) - val eventMonster = new EventMonster(conf) + val eventMonster = new EventMonster() try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -189,11 +190,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Verify the same events are replayed in the same order assert(sc.eventLogger.isDefined) val originalEvents = sc.eventLogger.get.loggedEvents - val replayedEvents = eventMonster.loggedEvents + val replayedEvents = eventMonster.events originalEvents.zip(replayedEvents).foreach { case (e1, e2) => // Don't compare the JSON here because accumulators in StageInfo may be out of order JsonProtocolSuite.assertEquals( - JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2)) + JsonProtocol.sparkEventFromJson(e1), e2) } } @@ -209,11 +210,62 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp * This child listener inherits only the event buffering functionality, but does not actually * log the events. 
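The change from EventMonster(conf) to EventMonster() goes with the class being rewritten in the next hunk as a plain SparkListener that buffers the raw events, rather than reusing EventLoggingListener's JSON buffer, so the replay assertions above can compare SparkListenerEvent instances directly. A condensed sketch of the same idea, using the hypothetical name EventCollector and spelling out only a couple of the callbacks:

    import scala.collection.mutable
    import org.apache.spark.scheduler._

    class EventCollector extends SparkListener {
      val events: mutable.Buffer[SparkListenerEvent] = mutable.Buffer()
      private def addEvent(ev: SparkListenerEvent): Unit = events.append(ev)

      // Every named callback funnels into the buffer; the real EventMonster overrides each one.
      override def onJobStart(event: SparkListenerJobStart): Unit = addEvent(event)
      override def onJobEnd(event: SparkListenerJobEnd): Unit = addEvent(event)
      // Custom events are kept only when they are flagged for logging.
      override def onOtherEvent(event: SparkListenerEvent): Unit =
        if (event.logEvent) addEvent(event)
    }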
*/ - private class EventMonster(conf: SparkConf) - extends EventLoggingListener("test", None, new URI("testdir"), conf) { + private class EventMonster() extends SparkListener { + val events: mutable.Buffer[SparkListenerEvent] = mutable.Buffer() - override def start() { } + def addEvent(ev: SparkListenerEvent): Unit = events.append(ev) + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = addEvent(event) + + override def onTaskStart(event: SparkListenerTaskStart): Unit = addEvent(event) + + override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = + addEvent(event) + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = addEvent(event) + + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = + addEvent(event) + + // Events that trigger a flush + override def onStageCompleted(event: SparkListenerStageCompleted): Unit = addEvent(event) + + override def onJobStart(event: SparkListenerJobStart): Unit = addEvent(event) + + override def onJobEnd(event: SparkListenerJobEnd): Unit = addEvent(event) + + override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = + addEvent(event) + + override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = + addEvent(event) + + override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = addEvent(event) + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = + addEvent(event) + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = addEvent(event) + override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = addEvent(event) + + override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = addEvent(event) + + override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = + addEvent(event) + + override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = + addEvent(event) + + override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = addEvent(event) + + override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = + addEvent(event) + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + if (event.logEvent) { + addEvent(event) + } + } } /* diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 995df1dd52010..01c3feb28d43c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -63,38 +63,34 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val bus = new LiveListenerBus(conf) bus.addListener(counter) - // Metrics are initially empty. - assert(bus.metrics.numEventsPosted.getCount === 0) - assert(bus.metrics.numDroppedEvents.getCount === 0) - assert(bus.metrics.queueSize.getValue === 0) - assert(bus.metrics.eventProcessingTime.getCount === 0) + val metrics = bus.metricsFromMainQueue._1 + assert(metrics.numEventsPosted.getCount === 0) + assert(metrics.numDroppedEvents.getCount === 0) + assert(metrics.queueSize.getValue === 0) // Post five events: (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } // Five messages should be marked as received and queued, but no messages should be posted to // listeners yet because the the listener bus hasn't been started. 
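Since bus.metrics is gone, the suite reaches the main queue's counters through metricsFromMainQueue and unpacks it as a pair: the first element carries the queue-level metrics, and the second (used a little further down) exposes per-listener timers keyed by listener instance. A usage sketch under that reading of the new API, inside the same test context:

    // Assumes metricsFromMainQueue returns (queue-level metrics, per-listener timers),
    // as the assertions in this suite suggest; `counter` is the BasicJobCounter registered above.
    val (queueMetrics, listenerTimers) = bus.metricsFromMainQueue
    assert(queueMetrics.numEventsPosted.getCount === 5)
    assert(queueMetrics.queueSize.getValue === 0)
    assert(listenerTimers.get(counter).flatten.get.getCount === 5)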
- assert(bus.metrics.numEventsPosted.getCount === 5) - assert(bus.metrics.queueSize.getValue === 5) + assert(metrics.numEventsPosted.getCount === 5) + assert(metrics.queueSize.getValue === 5) assert(counter.count === 0) // Starting listener bus should flush all buffered events bus.start(mockSparkContext, mockMetricsSystem) - Mockito.verify(mockMetricsSystem).registerSource(bus.metrics) + Mockito.verify(mockMetricsSystem).registerSource(metrics) bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(counter.count === 5) - assert(bus.metrics.queueSize.getValue === 0) - assert(bus.metrics.eventProcessingTime.getCount === 5) + assert(metrics.queueSize.getValue === 0) + assert(bus.metricsFromMainQueue._2.get(counter).flatten.get.getCount === 5) // After listener bus has stopped, posting events should not increment counter bus.stop() (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } assert(counter.count === 5) - assert(bus.metrics.numEventsPosted.getCount === 5) + assert(metrics.numEventsPosted.getCount === 5) - // Make sure per-listener-class timers were created: - assert(bus.metrics.getTimerForListenerClass( - classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5) // Listener bus must not be started twice intercept[IllegalStateException] { @@ -177,22 +173,23 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match bus.start(mockSparkContext, mockMetricsSystem) + val metrics = bus.metricsFromMainQueue._1 // Post a message to the listener bus and wait for processing to begin: bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) listenerStarted.acquire() - assert(bus.metrics.queueSize.getValue === 0) - assert(bus.metrics.numDroppedEvents.getCount === 0) + assert(metrics.queueSize.getValue === 1) + assert(metrics.numDroppedEvents.getCount === 0) // If we post an additional message then it should remain in the queue because the listener is // busy processing the first event: bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) - assert(bus.metrics.queueSize.getValue === 1) - assert(bus.metrics.numDroppedEvents.getCount === 0) + assert(metrics.queueSize.getValue === 2) + assert(metrics.numDroppedEvents.getCount === 0) // The queue is now full, so any additional events posted to the listener will be dropped: bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) - assert(bus.metrics.queueSize.getValue === 1) - assert(bus.metrics.numDroppedEvents.getCount === 1) + assert(metrics.queueSize.getValue === 2) + assert(metrics.numDroppedEvents.getCount === 1) // Allow the the remaining events to be processed so we can stop the listener bus: @@ -442,10 +439,10 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val conf = new SparkConf().setMaster("local").setAppName("test") .set("spark.extraListeners", listeners.map(_.getName).mkString(",")) sc = new SparkContext(conf) - sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1) - sc.listenerBus.listeners.asScala + sc.listenerBus.listeners.count(_.isInstanceOf[BasicJobCounter]) should be (1) + sc.listenerBus.listeners .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1) - sc.listenerBus.listeners.asScala + sc.listenerBus.listeners .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1) } @@ -517,7 +514,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match */ private class BasicJobCounter extends 
SparkListener { var count = 0 - override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1 + override def onJobEnd(job: SparkListenerJobEnd): Unit = { + count += 1 + } } /** diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 1cb52593e7060..1f0884b308a4e 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.storage._ * Test various functionality in the StorageListener that supports the StorageTab. */ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { - private var bus: LiveListenerBus = _ + private var bus: SparkListenerBus = _ private var storageStatusListener: StorageStatusListener = _ private var storageListener: StorageListener = _ private val memAndDisk = StorageLevel.MEMORY_AND_DISK @@ -43,7 +43,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { before { val conf = new SparkConf() - bus = new LiveListenerBus(conf) + bus = new SparkListenerBus() { } storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 5fb0bd057d0f1..ffa33b091968e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -27,7 +27,7 @@ import org.apache.spark.util.ListenerBus * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners. */ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) - extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] { + extends ListenerBus[StreamingListener, StreamingListenerEvent] { /** * Post a StreamingListenerEvent to the Spark listener bus asynchronously. This event will be @@ -37,14 +37,6 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) sparkListenerBus.post(new WrappedStreamingListenerEvent(event)) } - override def onOtherEvent(event: SparkListenerEvent): Unit = { - event match { - case WrappedStreamingListenerEvent(e) => - postToAll(e) - case _ => - } - } - protected override def doPostEvent( listener: StreamingListener, event: StreamingListenerEvent): Unit = { @@ -76,7 +68,11 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) * forward them to StreamingListeners. */ def start(): Unit = { - sparkListenerBus.addListener(this) // for getting callbacks on spark events + sparkListenerBus.addProcessor( + ev => postToAll(ev.asInstanceOf[WrappedStreamingListenerEvent].streamingListenerEvent), + "streaming", + Some(ev => ev.isInstanceOf[WrappedStreamingListenerEvent]) + ) } /** @@ -84,7 +80,7 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) * events after that. */ def stop(): Unit = { - sparkListenerBus.removeListener(this) + sparkListenerBus.removeProcessor("streaming") } /**
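Registration now goes through the bus-processor API in both places this patch touches it: the event logger attaches a logging function filtered by EventLoggingListener.EVENT_FILTER, and the streaming bus attaches an unwrapping function filtered to its own wrapped events, detaching later by name. A generic sketch of that pattern, with the event type, handler and "my-processor" name as placeholders (the addProcessor/removeProcessor signatures are inferred from the calls above):

    import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerEvent}

    // Placeholder wrapper event, standing in for WrappedStreamingListenerEvent.
    case class MyWrappedEvent(payload: String) extends SparkListenerEvent

    def attach(bus: LiveListenerBus): Unit = {
      bus.addProcessor(
        ev => println(s"processing $ev"),                // processing function
        "my-processor",                                  // processor name, used to detach
        Some(ev => ev.isInstanceOf[MyWrappedEvent]))     // optional event pre-filter
    }

    def detach(bus: LiveListenerBus): Unit = {
      bus.removeProcessor("my-processor")
    }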