Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2245,6 +2245,9 @@ object SparkContext extends Logging {
if (activeContext.get() == null) {
setActiveContext(new SparkContext(config), allowMultipleContexts = false)
}
if (config.getAll.nonEmpty) {
logWarning("Use an existing SparkContext, some configuration may not take effect.")
}
activeContext.get()
}
}
Expand Down
14 changes: 12 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ object SparkSession {
/**
* Builder for [[SparkSession]].
*/
class Builder {
class Builder extends Logging {

private[this] val options = new scala.collection.mutable.HashMap[String, String]

Expand Down Expand Up @@ -750,6 +750,9 @@ object SparkSession {
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.conf.set(k, v) }
if (options.nonEmpty) {
logWarning("Use an existing SparkSession, some configuration may not take effect.")
}
return session
}

Expand All @@ -759,6 +762,9 @@ object SparkSession {
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.conf.set(k, v) }
if (options.nonEmpty) {
logWarning("Use an existing SparkSession, some configuration may not take effect.")
}
return session
}

Expand All @@ -771,7 +777,11 @@ object SparkSession {

val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }
SparkContext.getOrCreate(sparkConf)
val sc = SparkContext.getOrCreate(sparkConf)
// This may be an existing SparkContext; update its SparkConf, which may be used
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can check whether the sc was pre-existing, right?

In that case the foreach below is not needed.

// by SparkSession
options.foreach { case (k, v) => sc.conf.set(k, v) }
sc
}
session = new SparkSession(sparkContext)
options.foreach { case (k, v) => session.conf.set(k, v) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql

import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}

/**
* Test cases for the builder pattern of [[SparkSession]].
Expand Down Expand Up @@ -90,4 +90,16 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
assert(newSession != activeSession)
newSession.stop()
}

test("create SparkContext first then SparkSession") {
  // Stop the suite-wide context so that a brand-new SparkContext exists
  // before the session builder runs.
  sparkContext.stop()
  val sparkConf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1")
  val preExistingContext = new SparkContext(sparkConf)
  // The builder must reuse the pre-existing context and propagate its own
  // options into the session and the context's SparkConf.
  val newSession = SparkSession.builder().config("key2", "value2").getOrCreate()
  // Options from both the original SparkConf and the builder are visible
  // through the session configuration...
  assert(newSession.conf.get("key1") == "value1")
  assert(newSession.conf.get("key2") == "value2")
  // ...and through the underlying SparkContext's SparkConf.
  assert(newSession.sparkContext.conf.get("key1") == "value1")
  assert(newSession.sparkContext.conf.get("key2") == "value2")
  newSession.stop()
}
}