6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,8 +83,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
-  // In order to prevent SparkContext from being created in executors.
-  SparkContext.assertOnDriver()
+  if (!config.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+    // In order to prevent SparkContext from being created in executors.
+    SparkContext.assertOnDriver()
+  }
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
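For context on what the new flag gates: `assertOnDriver()` is not shown in this hunk, but it detects executor-side construction by checking for an active `TaskContext`, which only exists inside a running task. A sketch of the guard's assumed shape (reconstructed from the error message the tests below assert on, not taken from this diff):

```scala
import org.apache.spark.TaskContext

// Assumed shape of the guard that ALLOW_SPARK_CONTEXT_IN_EXECUTORS bypasses;
// not part of this diff.
private def assertOnDriver(): Unit = {
  if (TaskContext.get() != null) {
    // A TaskContext exists, so we are inside a task running on an executor.
    throw new IllegalStateException(
      "SparkContext should only be created and accessed on the driver.")
  }
}
```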
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1908,4 +1908,11 @@ package object config {
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val ALLOW_SPARK_CONTEXT_IN_EXECUTORS =
+    ConfigBuilder("spark.driver.allowSparkContextInExecutors")
+      .doc("If set to true, SparkContext can be created in executors.")
+      .version("3.0.1")
Review comment from @ueshin (Member, Author), Jul 29, 2020:
> I'll submit another PR with the opposite default value against branch-3.0 when this is good to go.

Review comment from @ueshin (Member, Author):
> Also I'll add a migration guide later.

+      .booleanConf
+      .createWithDefault(false)
 }
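For readers unfamiliar with Spark's `ConfigBuilder` pattern: the entry above yields a typed `ConfigEntry[Boolean]` whose declared default (`false`) is returned whenever the key is unset. A minimal sketch of how it is consumed (note that both the entry and the typed `SparkConf.get` overload are `private[spark]`, so this only compiles inside Spark's own packages):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS

val conf = new SparkConf()
// Key unset: the typed getter falls back to createWithDefault(false).
assert(!conf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS))

// User-facing code sets the plain string key; the entry parses it as a Boolean.
conf.set(ALLOW_SPARK_CONTEXT_IN_EXECUTORS.key, "true")
assert(conf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS))
```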
9 changes: 9 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -946,6 +946,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
 
     assert(error.contains("SparkContext should only be created and accessed on the driver."))
   }
+
+  test("SPARK-32160: Allow to create SparkContext in executors if the config is set") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    sc.range(0, 1).foreach { _ =>
+      new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+        .set(ALLOW_SPARK_CONTEXT_IN_EXECUTORS, true)).stop()
+    }
+  }
 }
 
 object SparkContextSuite {
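A note on the test master string used here and in the suites below: `local-cluster[3, 1, 1024]` starts a miniature standalone cluster with 3 worker processes, 1 core per worker, and 1024 MB per worker, so the `foreach` body genuinely runs in separate executor JVMs; a plain `local` master would run tasks in the driver process and never exercise the executor-side assertion.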
4 changes: 4 additions & 0 deletions docs/core-migration-guide.md
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}
 
+## Upgrading from Core 3.0 to 3.1
+
+- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception is thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.driver.allowSparkContextInExecutors` to `true`.
+
 ## Upgrading from Core 2.4 to 3.0
 
 - The `org.apache.spark.ExecutorPlugin` interface and related configuration has been replaced with
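To make the migration note above concrete, here is a minimal sketch (spark-shell style) of the opt-out it describes. The app name is a placeholder, and in real use this code would be running inside a task on an executor, which is exactly the pattern the new default forbids:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("nested-context-example") // placeholder name
  .setMaster("local")
  // Restores the pre-3.1 behavior; the default is false.
  .set("spark.driver.allowSparkContextInExecutors", "true")

// Without the flag set, Spark 3.1 fails here on an executor with:
// "SparkContext should only be created and accessed on the driver."
val sc = new SparkContext(conf)
sc.stop()
```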
6 changes: 4 additions & 2 deletions python/pyspark/context.py
@@ -117,8 +117,10 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ...
         ValueError:...
         """
-        # In order to prevent SparkContext from being created in executors.
-        SparkContext._assert_on_driver()
+        if (conf is None or
+                conf.get("spark.driver.allowSparkContextInExecutors", "false").lower() != "true"):
+            # In order to prevent SparkContext from being created in executors.
+            SparkContext._assert_on_driver()
 
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
11 changes: 11 additions & 0 deletions python/pyspark/tests/test_context.py
@@ -275,6 +275,17 @@ def test_disallow_to_create_spark_context_in_executors(self):
         self.assertIn("SparkContext should only be created and accessed on the driver.",
                       str(context.exception))
 
+    def test_allow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext can be created in executors if the config is set.
+
+        def create_spark_context():
+            conf = SparkConf().set("spark.driver.allowSparkContextInExecutors", "true")
+            with SparkContext(conf=conf):
+                pass
+
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            sc.range(2).foreach(lambda _: create_spark_context())
+
 
 class ContextTestsWithResources(unittest.TestCase):

12 changes: 8 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
@@ -900,7 +901,13 @@ object SparkSession extends Logging {
    * @since 2.0.0
    */
   def getOrCreate(): SparkSession = synchronized {
-    assertOnDriver()
+    val sparkConf = new SparkConf()
+    options.foreach { case (k, v) => sparkConf.set(k, v) }
+
+    if (!sparkConf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+      assertOnDriver()
+    }
+
     // Get the session from current thread's active session.
     var session = activeThreadSession.get()
     if ((session ne null) && !session.sparkContext.isStopped) {
@@ -919,9 +926,6 @@
 
     // No active nor global default session. Create a new one.
     val sparkContext = userSuppliedContext.getOrElse {
-      val sparkConf = new SparkConf()
-      options.foreach { case (k, v) => sparkConf.set(k, v) }
-
       // set a random app name if not given.
       if (!sparkConf.contains("spark.app.name")) {
         sparkConf.setAppName(java.util.UUID.randomUUID().toString)
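Beyond the guard itself, note the small refactoring in `getOrCreate()`: constructing `sparkConf` from `options` is hoisted above the assertion so that a flag supplied through the builder is visible to the check (previously the `SparkConf` was only built later, inside the `userSuppliedContext.getOrElse` block). A sketch of the builder-side usage this enables, mirroring the test below:

```scala
import org.apache.spark.sql.SparkSession

// The flag passed via the builder lands in the SparkConf that
// getOrCreate() now builds *before* deciding whether to assert.
val spark = SparkSession.builder()
  .master("local")
  .config("spark.driver.allowSparkContextInExecutors", "true")
  .getOrCreate()
spark.stop()
```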
26 changes: 25 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -257,4 +258,27 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       context.stop()
     }
   }
+
+  test("SPARK-32160: Disallow to create SparkSession in executors") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    val error = intercept[SparkException] {
+      session.range(1).foreach { v =>
+        SparkSession.builder.master("local").getOrCreate()
+        ()
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkSession should only be created and accessed on the driver."))
+  }
+
+  test("SPARK-32160: Allow to create SparkSession in executors if the config is set") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    session.range(1).foreach { v =>
+      SparkSession.builder.master("local")
+        .config(ALLOW_SPARK_CONTEXT_IN_EXECUTORS.key, true).getOrCreate().stop()
+      ()
+    }
+  }
 }