@@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
* the scheduling task.
*/
def start(): Unit = {
listenerBus.addListener(listener)
listenerBus.addListener(listener, true)

val scheduleTask = new Runnable() {
override def run(): Unit = {
22 changes: 15 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -52,6 +52,7 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.bus.BusQueue.GroupOfListener
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.storage._
@@ -522,7 +523,10 @@ class SparkContext(config: SparkConf) extends Logging {
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
listenerBus.addProcessor(
Contributor:
I'm having a hard time finding the declaration of this method. I can't find it in your code or in the existing master branch. Can you link to it?

Author:
In LiveListenerBus.scala line 86
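
For readers following the call site above, here is a plausible shape of addProcessor, reconstructed only from how it is called here; the trait and parameter names below are assumptions, and the real declaration is the one in LiveListenerBus.scala that the author points to.

import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical reconstruction from the call site; not the actual Spark declaration.
trait BusWithProcessors {
  def addProcessor(
      processor: SparkListenerEvent => Unit,                     // invoked for each event delivered to this processor
      name: String,                                              // identifies the processor's queue, e.g. "eventLoggerListener"
      eventFilter: Option[SparkListenerEvent => Boolean] = None  // optional pre-filter; events failing it are dropped before processing
  ): Unit
}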

ev => logger.log(ev),
"eventLoggerListener",
Some(EventLoggingListener.EVENT_FILTER))
Some(logger)
} else {
None
@@ -2349,13 +2353,12 @@ class SparkContext(config: SparkConf) extends Logging {
try {
val listenerClassNames: Seq[String] =
conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
for (className <- listenerClassNames) {
// Use reflection to find the right constructor
val extraListeners = listenerClassNames.map { className =>
val constructors = {
val listenerClass = Utils.classForName(className)
listenerClass
.getConstructors
.asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
.getConstructors
.asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
}
val constructorTakingSparkConf = constructors.find { c =>
c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
@@ -2378,8 +2381,13 @@
" parameter from breaking Spark's ability to find a valid constructor.")
}
}
listenerBus.addListener(listener)
logInfo(s"Registered listener $className")
logInfo(s"listener $className created")
listener
}
if (extraListeners.nonEmpty) {
val group = GroupOfListener(extraListeners, "extraListeners")
listenerBus.addListener(group, true)
logInfo("extra-listeners registered")
}
} catch {
case e: Exception =>
@@ -171,12 +171,6 @@ package object config {
.checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
.createWithDefault(10000)

private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED =
ConfigBuilder("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed")
.internal()
.intConf
.createWithDefault(128)

// This property sets the root namespace for metrics reporting
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
@@ -55,8 +55,7 @@ private[spark] class EventLoggingListener(
appAttemptId : Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {
hadoopConf: Configuration) extends Logging {

import EventLoggingListener._

@@ -90,6 +89,8 @@
// Visible for tests only.
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

private var nbMessageProcessed = 0

/**
* Creates the log file in the configured log directory.
*/
@@ -134,97 +135,38 @@
}

/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
private def logEvent(event: SparkListenerEvent) {
val eventJson = JsonProtocol.sparkEventToJson(event)
// scalastyle:off println
writer.foreach(_.println(compact(render(eventJson))))
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
hadoopDataStream.foreach(ds => ds.getWrappedStream match {
case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
case _ => ds.hflush()
})
}
if (testing) {
loggedEvents += eventJson
flush()
}
}

// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)

override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)

override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = logEvent(event)

override def onTaskEnd(event: SparkListenerTaskEnd): Unit = logEvent(event)

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
logEvent(redactEvent(event))
}

// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
logEvent(event, flushLogger = true)
private def flush(): Unit = {
writer.foreach(_.flush())
hadoopDataStream.foreach(ds => ds.getWrappedStream match {
case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
case _ => ds.hflush()
})
}

override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true)

override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)

override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
logEvent(event, flushLogger = true)
}

override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
logEvent(event, flushLogger = true)
}

override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
logEvent(event, flushLogger = true)
}

override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
logEvent(event, flushLogger = true)
}

override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
logEvent(event, flushLogger = true)
}
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
logEvent(event, flushLogger = true)
}

override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
logEvent(event, flushLogger = true)
}

override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
logEvent(event, flushLogger = true)
}

override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
logEvent(event, flushLogger = true)
}

override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
logEvent(event, flushLogger = true)
}

override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
logEvent(event, flushLogger = true)
}

// No-op because logging every update would be overkill
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}

// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = {
def log(event: SparkListenerEvent): Unit = {
if (event.logEvent) {
Contributor:
Since you're adding an event filter, you could perform this check there...

Author:
To keep the current behavior, it is not simple to move this filtering (if (event.logEvent)) into the event filter: I only want to apply it when the event is not one of the "basic" types. Doing that would complicate the EventFilter considerably, since it acts here as a pre-filter that only discards the subset of events we never want to log.
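
To make the two stages concrete, a sketch of the combined behaviour as it stands in this diff (not a proposed change; the object and method names are illustrative, it assumes this PR's EVENT_FILTER, and it must live under org.apache.spark because logEvent is protected[spark]):

package org.apache.spark.scheduler

// Sketch only: the bus-side pre-filter (EVENT_FILTER, defined in the companion object
// below) runs before an event reaches the logger, and log() then consults the event's
// own logEvent flag. Folding the logEvent check into the shared filter would force the
// filter to re-encode which event types count as "basic", which the author avoids here.
private[spark] object EventLogFilteringSketch {
  def wouldBeWritten(ev: SparkListenerEvent): Boolean =
    EventLoggingListener.EVENT_FILTER(ev) && ev.logEvent
}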

logEvent(event, flushLogger = true)
val toLog = event match {
case update: SparkListenerEnvironmentUpdate =>
redactEvent(update)
case _ => event
}
logEvent(toLog)
nbMessageProcessed = nbMessageProcessed + 1
if (nbMessageProcessed >= FLUSH_FREQUENCY) {
flush()
nbMessageProcessed = 0
}
}
}

@@ -278,6 +220,12 @@ private[spark] object EventLoggingListener extends Logging {
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"

private val FLUSH_FREQUENCY = 200

val EVENT_FILTER: SparkListenerEvent => Boolean =
ev => !(ev.isInstanceOf[SparkListenerBlockUpdated] ||
ev.isInstanceOf[SparkListenerExecutorMetricsUpdate])

private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)

// A cache for compression codecs to avoid creating the same codec many times