Skip to content

Commit d0d276d

Browse files
committed
Move code into setupAndStartListenerBus() method
1 parent b22b379 commit d0d276d

File tree

2 files changed

+50
-38
lines changed

2 files changed

+50
-38
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -379,44 +379,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
379379
}
380380
executorAllocationManager.foreach(_.start())
381381

382-
// Use reflection to instantiate listeners specified via the `spark.extraListeners` configuration
383-
// or the SPARK_EXTRA_LISTENERS environment variable
384-
try {
385-
val listenerClassNames: Seq[String] = {
386-
val fromSparkConf = conf.get("spark.extraListeners", "").split(',')
387-
val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',')
388-
(fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "")
389-
}
390-
for (className <- listenerClassNames) {
391-
val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]]
392-
val listener = try {
393-
listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf)
394-
} catch {
395-
case e: NoSuchMethodException =>
396-
try {
397-
listenerClass.newInstance()
398-
} catch {
399-
case e: NoSuchMethodException =>
400-
throw new SparkException(
401-
s"$listenerClass did not have a zero-argument constructor or a" +
402-
" single-argument constructor that accepts SparkConf (is it a nested Scala class?)")
403-
}
404-
}
405-
listenerBus.addListener(listener)
406-
logInfo(s"Registered listener $listenerClass")
407-
}
408-
} catch {
409-
case e: Exception =>
410-
try {
411-
stop()
412-
} finally {
413-
throw new SparkException(s"Exception when registering SparkListener", e)
414-
}
415-
}
416-
417-
// At this point, all relevant SparkListeners have been registered, so begin releasing events
418-
listenerBus.start()
419-
420382
private[spark] val cleaner: Option[ContextCleaner] = {
421383
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
422384
Some(new ContextCleaner(this))
@@ -426,6 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
426388
}
427389
cleaner.foreach(_.start())
428390

391+
setupAndStartListenerBus()
429392
postEnvironmentUpdate()
430393
postApplicationStart()
431394

@@ -1521,6 +1484,53 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
15211484
/** Register a new RDD, returning its RDD ID */
15221485
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
15231486

1487+
/**
1488+
* Registers listeners specified in spark.extraListeners, then starts the listener bus.
1489+
* This should be called after all internal listeners have been registered with the listener bus
1490+
* (e.g. after the web UI and event logging listeners have been registered).
1491+
*/
1492+
private def setupAndStartListenerBus(): Unit = {
1493+
if (listenerBus.hasBeenStarted) {
1494+
throw new IllegalStateException("listener bus has already been started")
1495+
}
1496+
// Use reflection to instantiate listeners specified via the `spark.extraListeners`
1497+
// configuration or the SPARK_EXTRA_LISTENERS environment variable
1498+
try {
1499+
val listenerClassNames: Seq[String] = {
1500+
val fromSparkConf = conf.get("spark.extraListeners", "").split(',')
1501+
val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',')
1502+
(fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "")
1503+
}
1504+
for (className <- listenerClassNames) {
1505+
val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]]
1506+
val listener = try {
1507+
listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf)
1508+
} catch {
1509+
case e: NoSuchMethodException =>
1510+
try {
1511+
listenerClass.newInstance()
1512+
} catch {
1513+
case e: NoSuchMethodException =>
1514+
throw new SparkException(
1515+
s"$listenerClass did not have a zero-argument constructor or a" +
1516+
" single-argument constructor that accepts SparkConf (is it a nested Scala class?)")
1517+
}
1518+
}
1519+
listenerBus.addListener(listener)
1520+
logInfo(s"Registered listener $listenerClass")
1521+
}
1522+
} catch {
1523+
case e: Exception =>
1524+
try {
1525+
stop()
1526+
} finally {
1527+
throw new SparkException(s"Exception when registering SparkListener", e)
1528+
}
1529+
}
1530+
1531+
listenerBus.start()
1532+
}
1533+
15241534
/** Post the application start event */
15251535
private def postApplicationStart() {
15261536
// Note: this code assumes that the task scheduler has been initialized and has contacted

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
3838
private var queueFullErrorMessageLogged = false
3939
private var started = false
4040

41+
private[spark] def hasBeenStarted: Boolean = started
42+
4143
// A counter that represents the number of events produced and consumed in the queue
4244
private val eventLock = new Semaphore(0)
4345

0 commit comments

Comments
 (0)