Skip to content

Commit 1a5b9a0

Browse files
committed
Remove SPARK_EXTRA_LISTENERS environment variable.
1 parent 2daff9b commit 1a5b9a0

File tree

2 files changed

+6
-44
lines changed

2 files changed

+6
-44
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,19 +1494,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
14941494
if (listenerBus.hasBeenStarted) {
14951495
throw new IllegalStateException("listener bus has already been started")
14961496
}
1497-
// Use reflection to instantiate listeners specified via the `spark.extraListeners`
1498-
// configuration or the SPARK_EXTRA_LISTENERS environment variable. The purpose of
1499-
// SPARK_EXTRA_LISTENERS is to allow the execution environment to inject custom listeners
1500-
// without having to worry about them being overridden by user settings in SparkConf.
1497+
// Use reflection to instantiate listeners specified via `spark.extraListeners`
15011498
try {
1502-
val listenerClassNames: Seq[String] = {
1503-
// Merge configurations from both sources
1504-
val fromSparkConf = conf.get("spark.extraListeners", "").split(',')
1505-
val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',')
1506-
// Filter out empty entries, which could occur when overriding environment variables
1507-
// (e.g. `export SPARK_EXTRA_LISTENERS="foo,$SPARK_EXTRA_LISTENERS"`)
1508-
(fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "")
1509-
}
1499+
val listenerClassNames: Seq[String] =
1500+
conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
15101501
for (className <- listenerClassNames) {
15111502
// Use reflection to find the right constructor
15121503
val constructors = {

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@ import java.util.concurrent.Semaphore
2121

2222
import scala.collection.mutable
2323

24-
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
24+
import org.scalatest.{FunSuite, Matchers}
25+
2526
import org.apache.spark.executor.TaskMetrics
2627
import org.apache.spark.util.ResetSystemProperties
27-
28-
import org.mockito.Mockito._
29-
import org.scalatest.{FunSuite, Matchers}
28+
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
3029

3130
class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
3231
with ResetSystemProperties {
@@ -370,34 +369,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
370369
}.size should be (1)
371370
}
372371

373-
test("registering listeners via SPARK_EXTRA_LISTENERS") {
374-
val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName + "," +
375-
classOf[BasicJobCounter].getName
376-
val conf = spy(new SparkConf().setMaster("local").setAppName("test"))
377-
when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS)
378-
when(conf.clone).thenReturn(conf) // so that our mock is still used
379-
sc = new SparkContext(conf)
380-
sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1)
381-
sc.listenerBus.sparkListeners.collect {
382-
case x: ListenerThatAcceptsSparkConf => x
383-
}.size should be (1)
384-
}
385-
386-
test("spark.extraListeners and SPARK_EXTRA_LISTENERS configurations are merged") {
387-
// This test ensures that we don't accidentally change the behavior such that one setting
388-
// overrides the other:
389-
val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName
390-
val conf = spy(new SparkConf().setMaster("local").setAppName("test")
391-
.set("spark.extraListeners", classOf[BasicJobCounter].getName))
392-
when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS)
393-
when(conf.clone).thenReturn(conf) // so that our mock is still used
394-
sc = new SparkContext(conf)
395-
sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1)
396-
sc.listenerBus.sparkListeners.collect {
397-
case x: ListenerThatAcceptsSparkConf => x
398-
}.size should be (1)
399-
}
400-
401372
/**
402373
* Assert that the given list of numbers has an average that is greater than zero.
403374
*/

0 commit comments

Comments (0)