
Commit 9c0d8f1

Revert "[SPARK-5190] Allow SparkListeners to be registered before SparkContext starts."
This reverts commit 163ba19.
1 parent 217ecc0 commit 9c0d8f1
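
This revert removes the Seq[SparkListener] constructor parameter, so a SparkContext is once again built from a SparkConf alone and listeners must be registered after construction. A minimal sketch of the post-revert pattern, using SparkContext.addSparkListener (an existing developer API, unaffected by this commit); note that events posted while the context is starting up, such as the initial environment update, are released once the listener bus starts and will not reach a listener added this late:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

  object PostRevertListenerExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("listener-example").setMaster("local"))
      // Registered after construction: only events posted from here on are delivered.
      sc.addSparkListener(new SparkListener {
        override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
          println(s"Job ${jobEnd.jobId} finished")
      })
      sc.parallelize(1 to 10).count() // runs a job, which posts a job-end event
      sc.stop()
    }
  }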

3 files changed: 10 additions & 79 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 8 additions & 43 deletions

@@ -63,12 +63,8 @@ import org.apache.spark.util._
  *
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
- * @param sparkListeners an optional list of [[SparkListener]]s to register.
  */
-class SparkContext(
-    config: SparkConf,
-    sparkListeners: Seq[SparkListener] = Nil
-  ) extends Logging with ExecutorAllocationClient {
+class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
 
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
@@ -93,15 +89,7 @@ class SparkContext(
    * Create a SparkContext that loads settings from system properties (for instance, when
    * launching with ./bin/spark-submit).
    */
-  def this() = this(new SparkConf(), Nil)
-
-  /**
-   * Alternative constructor for binary compatibility.
-   *
-   * @param config a Spark Config object describing the application configuration. Any settings in
-   *   this config overrides the default configs as well as system properties.
-   */
-  def this(config: SparkConf) = this(config, Nil)
+  def this() = this(new SparkConf())
 
   /**
    * :: DeveloperApi ::
@@ -136,40 +124,19 @@ class SparkContext(
    * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
    *   system or HDFS, HTTP, HTTPS, or FTP URLs.
    * @param environment Environment variables to set on worker nodes.
-   * @param sparkListeners an optional list of [[SparkListener]]s to register.
    */
   def this(
       master: String,
       appName: String,
       sparkHome: String = null,
       jars: Seq[String] = Nil,
       environment: Map[String, String] = Map(),
-      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map(),
-      sparkListeners: Seq[SparkListener] = Nil) = {
-    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
-      sparkListeners)
+      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
+  {
+    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
     this.preferredNodeLocationData = preferredNodeLocationData
   }
 
-  /**
-   * Alternative constructor for binary compatibility.
-   *
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your application, to display on the cluster web UI.
-   * @param sparkHome Location where Spark is installed on cluster nodes.
-   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
-   *   system or HDFS, HTTP, HTTPS, or FTP URLs.
-   * @param environment Environment variables to set on worker nodes.
-   */
-  def this(
-      master: String,
-      appName: String,
-      sparkHome: String,
-      jars: Seq[String],
-      environment: Map[String, String],
-      preferredNodeLocationData: Map[String, Set[SplitInfo]]) =
-    this(master, appName, sparkHome, jars, environment, preferredNodeLocationData, Nil)
-
   // NOTE: The below constructors could be consolidated using default arguments. Due to
   // Scala bug SI-8479, however, this causes the compile step to fail when generating docs.
   // Until we have a good workaround for that bug the constructors remain broken out.
@@ -181,7 +148,7 @@ class SparkContext(
    * @param appName A name for your application, to display on the cluster web UI.
    */
   private[spark] def this(master: String, appName: String) =
-    this(master, appName, null, Nil, Map(), Map(), Nil)
+    this(master, appName, null, Nil, Map(), Map())
 
   /**
    * Alternative constructor that allows setting common Spark properties directly
@@ -191,7 +158,7 @@ class SparkContext(
    * @param sparkHome Location where Spark is installed on cluster nodes.
    */
   private[spark] def this(master: String, appName: String, sparkHome: String) =
-    this(master, appName, sparkHome, Nil, Map(), Map(), Nil)
+    this(master, appName, sparkHome, Nil, Map(), Map())
 
   /**
    * Alternative constructor that allows setting common Spark properties directly
@@ -203,7 +170,7 @@ class SparkContext(
    *   system or HDFS, HTTP, HTTPS, or FTP URLs.
    */
   private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
-    this(master, appName, sparkHome, jars, Map(), Map(), Nil)
+    this(master, appName, sparkHome, jars, Map(), Map())
 
   // log out Spark Version in Spark driver log
   logInfo(s"Running Spark version $SPARK_VERSION")
@@ -412,8 +379,6 @@ class SparkContext(
     }
     executorAllocationManager.foreach(_.start())
 
-    sparkListeners.foreach(listenerBus.addListener)
-
     // At this point, all relevant SparkListeners have been registered, so begin releasing events
     listenerBus.start()
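
The operative change is the last hunk: the removed sparkListeners.foreach(listenerBus.addListener) ran just before listenerBus.start(), so constructor-supplied listeners were attached while the bus was still buffering startup events and were guaranteed to see all of them. A toy sketch of that ordering guarantee (hypothetical code for illustration, not Spark's actual LiveListenerBus):

  import scala.collection.mutable

  // Events posted before start() are queued; start() releases the backlog to the
  // listeners attached at that moment. A listener added after start() only sees
  // events posted from then on, which is the guarantee this revert gives up.
  class ToyListenerBus[E] {
    private val backlog = mutable.Queue.empty[E]
    private val listeners = mutable.Buffer.empty[E => Unit]
    private var started = false

    def addListener(l: E => Unit): Unit = listeners += l

    def post(event: E): Unit =
      if (started) listeners.foreach(_.apply(event)) else backlog += event

    def start(): Unit = {
      started = true
      while (backlog.nonEmpty) {
        val event = backlog.dequeue()
        listeners.foreach(_.apply(event))
      }
    }
  }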

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 1 addition & 16 deletions

@@ -38,7 +38,6 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
-import org.apache.spark.scheduler.SparkListener
 
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -105,21 +104,7 @@ class JavaSparkContext(val sc: SparkContext)
    */
   def this(master: String, appName: String, sparkHome: String, jars: Array[String],
       environment: JMap[String, String]) =
-    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(), Nil))
-
-  /**
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your application, to display on the cluster web UI
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
-   *   system or HDFS, HTTP, HTTPS, or FTP URLs.
-   * @param environment Environment variables to set on worker nodes
-   * @param sparkListeners an optional list of [[SparkListener]]s to register.
-   */
-  def this(master: String, appName: String, sparkHome: String, jars: Array[String],
-      environment: JMap[String, String], sparkListeners: Array[SparkListener]) =
-    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(),
-      sparkListeners))
+    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map()))
 
   private[spark] val env = sc.env
 
core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 1 addition & 20 deletions

@@ -17,14 +17,9 @@
 
 package org.apache.spark
 
-import org.apache.hadoop.io.BytesWritable
 import org.scalatest.FunSuite
-import org.scalatest.concurrent.Eventually._
-
-import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
 
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}
+import org.apache.hadoop.io.BytesWritable
 
 class SparkContextSuite extends FunSuite with LocalSparkContext {
 
@@ -77,18 +72,4 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     val byteArray2 = converter.convert(bytesWritable)
     assert(byteArray2.length === 0)
   }
-
-  test("SparkListeners can be registered via the SparkContext constructor (SPARK-5190)") {
-    @volatile var gotEnvironmentUpdate: Boolean = false
-    val listener = new SparkListener {
-      override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
-        gotEnvironmentUpdate = true
-      }
-    }
-    val conf = new SparkConf().setAppName("test").setMaster("local")
-    sc = new SparkContext(conf, Seq(listener))
-    eventually(timeout(10 seconds)) {
-      assert(gotEnvironmentUpdate === true)
-    }
-  }
 }
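
The deleted test relied on the constructor hook: SparkListenerEnvironmentUpdate is posted during startup, so only a listener attached before listenerBus.start() can observe it. A hypothetical replacement (not part of this commit; it would also need the Eventually and scheduler imports that the first hunk removes) has to assert on an event generated after registration:

  test("listeners registered after construction receive subsequent events") {
    @volatile var sawJobEnd = false
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    sc.addSparkListener(new SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { sawJobEnd = true }
    })
    sc.parallelize(1 to 4).count() // posts a job-end event after the listener attaches
    eventually(timeout(10 seconds)) {
      assert(sawJobEnd)
    }
  }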
