
Commit 9a7ce70

JoshRosen authored and pwendell committed
[SPARK-5411] Allow SparkListeners to be specified in SparkConf and loaded when creating SparkContext
This patch introduces a new configuration option, `spark.extraListeners`, that allows SparkListeners to be specified in SparkConf and registered before the SparkContext is initialized. From the configuration documentation:

> A comma-separated list of classes that implement SparkListener; when initializing SparkContext, instances of these classes will be created and registered with Spark's listener bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor will be called; otherwise, a zero-argument constructor will be called. If no valid constructor can be found, the SparkContext creation will fail with an exception.

The motivation for this patch is to allow monitoring code to be easily injected into existing Spark programs without having to modify those programs' code.

Author: Josh Rosen <[email protected]>

Closes apache#4111 from JoshRosen/SPARK-5190-register-sparklistener-in-sc-constructor and squashes the following commits:

8370839 [Josh Rosen] Two minor fixes after merging with master
6e0122c [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-5190-register-sparklistener-in-sc-constructor
1a5b9a0 [Josh Rosen] Remove SPARK_EXTRA_LISTENERS environment variable.
2daff9b [Josh Rosen] Add a couple of explanatory comments for SPARK_EXTRA_LISTENERS.
b9973da [Josh Rosen] Add test to ensure that conf and env var settings are merged, not overridden.
d6f3113 [Josh Rosen] Use getConstructors() instead of try-catch to find right constructor.
d0d276d [Josh Rosen] Move code into setupAndStartListenerBus() method
b22b379 [Josh Rosen] Instantiate SparkListeners from classes listed in configurations.
9c0d8f1 [Josh Rosen] Revert "[SPARK-5190] Allow SparkListeners to be registered before SparkContext starts."
217ecc0 [Josh Rosen] Revert "Add addSparkListener to JavaSparkContext"
25988f3 [Josh Rosen] Add addSparkListener to JavaSparkContext
163ba19 [Josh Rosen] [SPARK-5190] Allow SparkListeners to be registered before SparkContext starts.
1 parent dc101b0 · commit 9a7ce70
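As a usage sketch (not part of this commit; the `JobEndLogger` class and the application settings below are hypothetical), a listener wired in through `spark.extraListeners` might look like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical listener. It is declared at the top level (not nested inside
// another class) so that its constructors take no hidden reference to an
// enclosing instance; see the error message in setupAndStartListenerBus().
class JobEndLogger(conf: SparkConf) extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"Job ${jobEnd.jobId} ended in app ${conf.get("spark.app.name")}")
  }
}

object ExtraListenersExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("extra-listeners-example")
      // The SparkConf-taking constructor is preferred; a zero-argument
      // constructor would be used as the fallback.
      .set("spark.extraListeners", classOf[JobEndLogger].getName)
    // The listener is instantiated and registered during SparkContext
    // creation, so it observes events without changes to the job code itself.
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}
```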

File tree

4 files changed: 106 additions & 21 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 54 additions & 3 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.language.implicitConversions
 
 import java.io._
+import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
@@ -387,9 +388,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
   executorAllocationManager.foreach(_.start())
 
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
@@ -399,6 +397,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
   cleaner.foreach(_.start())
 
+  setupAndStartListenerBus()
   postEnvironmentUpdate()
   postApplicationStart()
 
@@ -1563,6 +1562,58 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 
+  /**
+   * Registers listeners specified in spark.extraListeners, then starts the listener bus.
+   * This should be called after all internal listeners have been registered with the listener bus
+   * (e.g. after the web UI and event logging listeners have been registered).
+   */
+  private def setupAndStartListenerBus(): Unit = {
+    // Use reflection to instantiate listeners specified via `spark.extraListeners`
+    try {
+      val listenerClassNames: Seq[String] =
+        conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
+      for (className <- listenerClassNames) {
+        // Use reflection to find the right constructor
+        val constructors = {
+          val listenerClass = Class.forName(className)
+          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
+        }
+        val constructorTakingSparkConf = constructors.find { c =>
+          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
+        }
+        lazy val zeroArgumentConstructor = constructors.find { c =>
+          c.getParameterTypes.isEmpty
+        }
+        val listener: SparkListener = {
+          if (constructorTakingSparkConf.isDefined) {
+            constructorTakingSparkConf.get.newInstance(conf)
+          } else if (zeroArgumentConstructor.isDefined) {
+            zeroArgumentConstructor.get.newInstance()
+          } else {
+            throw new SparkException(
+              s"$className did not have a zero-argument constructor or a" +
+                " single-argument constructor that accepts SparkConf. Note: if the class is" +
+                " defined inside of another Scala class, then its constructors may accept an" +
+                " implicit parameter that references the enclosing class; in this case, you must" +
+                " define the listener as a top-level class in order to prevent this extra" +
+                " parameter from breaking Spark's ability to find a valid constructor.")
+          }
+        }
+        listenerBus.addListener(listener)
+        logInfo(s"Registered listener $className")
+      }
+    } catch {
+      case e: Exception =>
+        try {
+          stop()
+        } finally {
+          throw new SparkException(s"Exception when registering SparkListener", e)
+        }
+    }
+
+    listenerBus.start()
+  }
+
   /** Post the application start event */
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
core/src/main/scala/org/apache/spark/util/ListenerBus.scala

Lines changed: 2 additions & 1 deletion
@@ -28,7 +28,8 @@ import org.apache.spark.Logging
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
-  private val listeners = new CopyOnWriteArrayList[L]
+  // Marked `private[spark]` for access in tests.
+  private[spark] val listeners = new CopyOnWriteArrayList[L]
 
   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 39 additions & 17 deletions
@@ -20,26 +20,22 @@ package org.apache.spark.scheduler
 import java.util.concurrent.Semaphore
 
 import scala.collection.mutable
+import scala.collection.JavaConversions._
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-import org.scalatest.Matchers
+import org.scalatest.{FunSuite, Matchers}
 
-import org.apache.spark.{LocalSparkContext, SparkContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
 
-class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with BeforeAndAfter
-  with BeforeAndAfterAll with ResetSystemProperties {
+class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
+  with ResetSystemProperties {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
   val jobCompletionTime = 1421191296660L
 
-  before {
-    sc = new SparkContext("local", "SparkListenerSuite")
-  }
-
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus
@@ -127,6 +123,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("basic creation of StageInfo") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -148,6 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("basic creation of StageInfo with shuffle") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -185,6 +183,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("StageInfo with fewer tasks than partitions") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -201,6 +200,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("local metrics") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
@@ -267,6 +267,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("onTaskGettingResult() called when result fetched remotely") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
@@ -287,6 +288,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("onTaskGettingResult() not called when result sent directly") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
@@ -302,6 +304,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("onTaskEnd() should be called for all started tasks, even after job has been killed") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val WAIT_TIMEOUT_MILLIS = 10000
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
@@ -356,21 +359,24 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     assert(jobCounter2.count === 5)
   }
 
+  test("registering listeners via spark.extraListeners") {
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+      .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
+        classOf[BasicJobCounter].getName)
+    sc = new SparkContext(conf)
+    sc.listenerBus.listeners.collect { case x: BasicJobCounter => x}.size should be (1)
+    sc.listenerBus.listeners.collect {
+      case x: ListenerThatAcceptsSparkConf => x
+    }.size should be (1)
+  }
+
   /**
    * Assert that the given list of numbers has an average that is greater than zero.
    */
   private def checkNonZeroAvg(m: Traversable[Long], msg: String) {
     assert(m.sum / m.size.toDouble > 0.0, msg)
   }
 
-  /**
-   * A simple listener that counts the number of jobs observed.
-   */
-  private class BasicJobCounter extends SparkListener {
-    var count = 0
-    override def onJobEnd(job: SparkListenerJobEnd) = count += 1
-  }
-
   /**
    * A simple listener that saves all task infos and task metrics.
    */
@@ -423,3 +429,19 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
 }
+
+// These classes can't be declared inside of the SparkListenerSuite class because we don't want
+// their constructors to contain references to SparkListenerSuite:
+
+/**
+ * A simple listener that counts the number of jobs observed.
+ */
+private class BasicJobCounter extends SparkListener {
+  var count = 0
+  override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+}
+
+private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
+  var count = 0
+  override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+}

docs/configuration.md

Lines changed: 11 additions & 0 deletions
@@ -190,6 +190,17 @@ of the most common options to set are:
     Logs the effective SparkConf as INFO when a SparkContext is started.
   </td>
 </tr>
+<tr>
+  <td><code>spark.extraListeners</code></td>
+  <td>(none)</td>
+  <td>
+    A comma-separated list of classes that implement <code>SparkListener</code>; when initializing
+    SparkContext, instances of these classes will be created and registered with Spark's listener
+    bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor
+    will be called; otherwise, a zero-argument constructor will be called. If no valid constructor
+    can be found, the SparkContext creation will fail with an exception.
+  </td>
+</tr>
 </table>
 
 Apart from these, the following properties are also available, and may be useful in some situations:
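As a small companion to the constructor rules documented in this table (an illustrative sketch; the `AppStartLogger` class is hypothetical and not part of the patch), a listener that defines only a zero-argument constructor is also valid, since the SparkConf-taking constructor is merely preferred when present:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Hypothetical listener exercising the zero-argument fallback: with no
// SparkConf-taking constructor available, Spark instantiates it via the
// default constructor.
class AppStartLogger extends SparkListener {
  override def onApplicationStart(start: SparkListenerApplicationStart): Unit = {
    println(s"Application ${start.appName} started")
  }
}
```

It would then be referenced in `spark.extraListeners` by its fully qualified class name, comma-separated alongside any other listeners.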
