
Commit a432a2b

cloud-fan authored and Andrew Or committed
[SPARK-15116] In REPL we should create SparkSession first and get SparkContext from it
## What changes were proposed in this pull request?

See #12873 (comment). The problem is that if we create `SparkContext` first and then call `SparkSession.builder.enableHiveSupport().getOrCreate()`, we reuse the existing `SparkContext` and the Hive flag won't be set.

## How was this patch tested?

Verified it locally.

Author: Wenchen Fan <[email protected]>

Closes #12890 from cloud-fan/repl.
1 parent eb019af commit a432a2b
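
For context, here is a minimal sketch of the ordering issue described above, written as a plain Spark 2.x application (the object name, `conf` value, and `local[*]` master are illustrative, not part of this patch): creating the `SparkContext` first means the later `getOrCreate()` call reuses it, so `enableHiveSupport()` has no effect; building the `SparkSession` first and taking the context from it avoids that.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object OrderingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repl-ordering-sketch").setMaster("local[*]")

    // Old REPL flow (problematic): the context exists before the session is built,
    // so getOrCreate() reuses it and the Hive flag set on the builder is not applied.
    //   val sc = new SparkContext(conf)
    //   val spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    // New REPL flow: build the session first, then derive the context from it,
    // so enableHiveSupport() takes effect before any SparkContext exists.
    val spark = SparkSession.builder
      .config(conf)
      .enableHiveSupport() // assumes Hive classes are on the classpath
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    println(s"master = ${sc.master}, app id = ${sc.applicationId}")
    spark.stop()
  }
}
```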

File tree

4 files changed, +26 -43 lines


repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala

Lines changed: 8 additions & 12 deletions
```diff
@@ -1003,7 +1003,7 @@ class SparkILoop(
 
   // NOTE: Must be public for visibility
   @DeveloperApi
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     val jars = SparkILoop.getAddedJars
     val conf = new SparkConf()
@@ -1019,22 +1019,18 @@ class SparkILoop(
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
 
-  @DeveloperApi
-  // TODO: don't duplicate this code
-  def createSparkSession(): SparkSession = {
-    if (SparkSession.hiveClassesArePresent) {
+    val builder = SparkSession.builder.config(conf)
+    val sparkSession = if (SparkSession.hiveClassesArePresent) {
       logInfo("Creating Spark session with Hive support")
-      SparkSession.builder.enableHiveSupport().getOrCreate()
+      builder.enableHiveSupport().getOrCreate()
     } else {
       logInfo("Creating Spark session")
-      SparkSession.builder.getOrCreate()
+      builder.getOrCreate()
     }
+    sparkContext = sparkSession.sparkContext
+    Signaling.cancelOnInterrupt(sparkContext)
+    sparkSession
   }
 
   private def getMaster(): String = {
```

repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala

Lines changed: 3 additions & 8 deletions
```diff
@@ -123,19 +123,14 @@ private[repl] trait SparkILoopInit {
   def initializeSpark() {
     intp.beQuietDuring {
       command("""
+         @transient val spark = org.apache.spark.repl.Main.interp.createSparkSession()
          @transient val sc = {
-           val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
+           val _sc = spark.sparkContext
            _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
            println("Spark context available as 'sc' " +
              s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-           _sc
-         }
-        """)
-      command("""
-         @transient val spark = {
-           val _session = org.apache.spark.repl.Main.interp.createSparkSession()
            println("Spark session available as 'spark'.")
-           _session
+           _sc
          }
         """)
       command("import org.apache.spark.SparkContext._")
```

repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala

Lines changed: 12 additions & 15 deletions
```diff
@@ -71,35 +71,32 @@ object Main extends Logging {
     }
   }
 
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     conf.setIfMissing("spark.app.name", "Spark shell")
-      // SparkContext will detect this configuration and register it with the RpcEnv's
-      // file server, setting spark.repl.class.uri to the actual URI for executors to
-      // use. This is sort of ugly but since executors are started as part of SparkContext
-      // initialization in certain cases, there's an initialization order issue that prevents
-      // this from being set after SparkContext is instantiated.
-      .set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
+    // SparkContext will detect this configuration and register it with the RpcEnv's
+    // file server, setting spark.repl.class.uri to the actual URI for executors to
+    // use. This is sort of ugly but since executors are started as part of SparkContext
+    // initialization in certain cases, there's an initialization order issue that prevents
+    // this from being set after SparkContext is instantiated.
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
     if (System.getenv("SPARK_HOME") != null) {
       conf.setSparkHome(System.getenv("SPARK_HOME"))
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
 
-  def createSparkSession(): SparkSession = {
+    val builder = SparkSession.builder.config(conf)
     if (SparkSession.hiveClassesArePresent) {
-      sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+      sparkSession = builder.enableHiveSupport().getOrCreate()
       logInfo("Created Spark session with Hive support")
     } else {
-      sparkSession = SparkSession.builder.getOrCreate()
+      sparkSession = builder.getOrCreate()
       logInfo("Created Spark session")
     }
+    sparkContext = sparkSession.sparkContext
+    Signaling.cancelOnInterrupt(sparkContext)
     sparkSession
   }
 
```

repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala

Lines changed: 3 additions & 8 deletions
```diff
@@ -36,19 +36,14 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def initializeSpark() {
     intp.beQuietDuring {
       processLine("""
+        @transient val spark = org.apache.spark.repl.Main.createSparkSession()
         @transient val sc = {
-          val _sc = org.apache.spark.repl.Main.createSparkContext()
+          val _sc = spark.sparkContext
           _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
           println("Spark context available as 'sc' " +
             s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          _sc
-        }
-        """)
-      processLine("""
-        @transient val spark = {
-          val _session = org.apache.spark.repl.Main.createSparkSession()
           println("Spark session available as 'spark'.")
-          _session
+          _sc
         }
        """)
      processLine("import org.apache.spark.SparkContext._")
```
