From 50421567cb90015c659cf566de99fb559214342a Mon Sep 17 00:00:00 2001
From: Ajith
Date: Fri, 17 Jan 2020 22:34:31 +0530
Subject: [PATCH 01/11] Copy localproperties to child thread of subquery execution

---
 .../apache/spark/sql/execution/basicPhysicalOperators.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index e128d59dca6b..27c2528efaa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.{LongType, StructType}
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
 /** Physical plan for Project. */
@@ -749,9 +749,11 @@ case class SubqueryExec(name: String, child: SparkPlan)
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val localProps = Utils.cloneProperties(sparkContext.getLocalProperties)
     Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
+      sparkContext.setLocalProperties(localProps)
       SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
         val beforeCollect = System.nanoTime()
         // Note that we use .executeCollect() because we don't want to convert data to Scala types

From 577904fda354a043ed403e6320e7880c1d7b5594 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 21 Jan 2020 14:27:23 +0530
Subject: [PATCH 02/11] Update with UT

---
 .../spark/sql/internal/StaticSQLConf.scala    |  8 +++++
 .../execution/basicPhysicalOperators.scala    |  4 ++-
 .../internal/ExecutorSideSQLConfSuite.scala   | 33 +++++++++++++++++++
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index d2f27da23901..21b16d33b476 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -148,6 +148,14 @@ object StaticSQLConf {
       .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in [0,128].")
       .createWithDefault(128)
 
+  val SUBQUERY_MAX_THREAD_THRESHOLD =
+    buildStaticConf("spark.sql.subquery.maxThreadThreshold")
+      .internal()
+      .doc("The maximum degree of parallelism to execute the subquery. ")
+      .intConf
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in [0,128].")
+      .createWithDefault(16)
+
   val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
     .doc("Threshold of SQL length beyond which it will be truncated before adding to " +
       "event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 27c2528efaa5..e67fe9a52b9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
@@ -790,7 +791,8 @@ case class SubqueryExec(name: String, child: SparkPlan)
 
 object SubqueryExec {
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+    ThreadUtils.newDaemonCachedThreadPool("subquery",
+      SQLConf.get.getConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD)))
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 776cdb107084..5f0262e7d3d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -125,6 +125,39 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
     val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
     assert(e.getCause.isInstanceOf[NoSuchElementException])
   }
+
+  test("SPARK-30556 propagate local properties to subquery execution thread") {
+    withSQLConf("spark.sql.subquery.maxThreadThreshold" -> "1") {
+      spark.sparkContext.setLocalProperty("spark.sql.y", "e")
+      Seq(true)
+        .toDF()
+        .createOrReplaceTempView("l")
+      Seq(true)
+        .toDF()
+        .mapPartitions { _ =>
+          val conf = SQLConf.get
+          conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString("spark.sql.y") == "e" match {
+            case true => Iterator(true)
+            case false => Iterator.empty
+          }
+        }
+        .createOrReplaceTempView("m")
+      assert(sql("select * from l where exists (select * from m )").collect.size == 1)
+
+      spark.sparkContext.setLocalProperty("spark.sql.y", "f")
+      Seq(true)
+        .toDF()
+        .mapPartitions { _ =>
+          val conf = SQLConf.get
+          conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString("spark.sql.y") == "f" match {
+            case true => Iterator(true)
+            case false => Iterator.empty
+          }
+        }
+        .createOrReplaceTempView("n")
+      assert(sql("select value from l where exists (select * from n )").collect().size == 1)
+    }
+  }
 }
 
 case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {

From b6b5bae43b45a92ae34c177d84fba1a80da8b781 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 21 Jan 2020 16:31:49 +0530
Subject: [PATCH 03/11] Updated as per review comments

---
 .../apache/spark/sql/execution/SQLExecution.scala | 15 +++++++++++++++
 .../sql/execution/basicPhysicalOperators.scala    |  4 +---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 6046805ae95d..4e9c128d977f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.concurrent.{ExecutionContext, Future}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
@@ -164,4 +166,17 @@ object SQLExecution {
       }
     }
   }
+
+  /**
+   * Wrap passed function to ensure sparkContext local properties are forwarded to execution thread
+   */
+  def withThreadLocalCaptured[T](sparkContext: SparkContext)
+      (body: => T)
+      (exec: ExecutionContext): Future[T] = {
+    val localProps = Utils.cloneProperties(sparkContext.getLocalProperties)
+    Future {
+      sparkContext.setLocalProperties(localProps)
+      body
+    }(exec)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index e67fe9a52b9c..034b54a704ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -750,11 +750,9 @@ case class SubqueryExec(name: String, child: SparkPlan)
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    val localProps = Utils.cloneProperties(sparkContext.getLocalProperties)
-    Future {
+    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](sparkContext) {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
-      sparkContext.setLocalProperties(localProps)
       SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
         val beforeCollect = System.nanoTime()
         // Note that we use .executeCollect() because we don't want to convert data to Scala types

From c2b157a943f848e3b0dc394bbddd9f31063d9a93 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 21 Jan 2020 16:34:31 +0530
Subject: [PATCH 04/11] Update format

---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 4e9c128d977f..8907e54a5ad7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -170,9 +170,8 @@ object SQLExecution {
   /**
    * Wrap passed function to ensure sparkContext local properties are forwarded to execution thread
    */
-  def withThreadLocalCaptured[T](sparkContext: SparkContext)
-      (body: => T)
-      (exec: ExecutionContext): Future[T] = {
+  def withThreadLocalCaptured[T](sparkContext: SparkContext)(body: => T)(
+      exec: ExecutionContext): Future[T] = {
     val localProps = Utils.cloneProperties(sparkContext.getLocalProperties)
     Future {
       sparkContext.setLocalProperties(localProps)

From 1ca7981b736e0fe801645b62402b363f086831fc Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 21 Jan 2020 16:59:54 +0530
Subject: [PATCH 05/11] Update to accept active session also

---
 .../org/apache/spark/sql/execution/SQLExecution.scala   | 8 +++++---
 .../spark/sql/execution/basicPhysicalOperators.scala    | 2 +-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 8907e54a5ad7..d63b0fac3c9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -170,11 +170,13 @@ object SQLExecution {
   /**
    * Wrap passed function to ensure sparkContext local properties are forwarded to execution thread
    */
-  def withThreadLocalCaptured[T](sparkContext: SparkContext)(body: => T)(
+  def withThreadLocalCaptured[T](sparkSession: SparkSession)(body: => T)(
       exec: ExecutionContext): Future[T] = {
-    val localProps = Utils.cloneProperties(sparkContext.getLocalProperties)
+    val activeSession = sparkSession
+    val localProps = Utils.cloneProperties(sparkSession.sparkContext.getLocalProperties)
     Future {
-      sparkContext.setLocalProperties(localProps)
+      SparkSession.setActiveSession(activeSession)
+      sparkSession.sparkContext.setLocalProperties(localProps)
       body
     }(exec)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 034b54a704ee..dfd0b4f29806 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -750,7 +750,7 @@ case class SubqueryExec(name: String, child: SparkPlan)
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](sparkContext) {
+    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](sqlContext.sparkSession) {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {

From fdc71e3733f2ae248c20234556c5096c458ae9f1 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 21 Jan 2020 19:40:52 +0530
Subject: [PATCH 06/11] Update comment

---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index d63b0fac3c9c..7716a5c03fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -168,7 +168,8 @@ object SQLExecution {
   }
 
   /**
-   * Wrap passed function to ensure sparkContext local properties are forwarded to execution thread
+   * Wrap passed function to ensure necessary thread-local variables like
+   * SparkContext local properties are forwarded to execution thread
    */
   def withThreadLocalCaptured[T](sparkSession: SparkSession)(body: => T)(
       exec: ExecutionContext): Future[T] = {

From aeea500b69acacc19655245030c40a638d876f86 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 21 Jan 2020 23:09:46 +0530
Subject: [PATCH 07/11] Review Comment

---
 .../spark/sql/execution/SQLExecution.scala    |  4 +-
 .../execution/basicPhysicalOperators.scala    |  6 ++-
 .../internal/ExecutorSideSQLConfSuite.scala   | 42 ++++++++++---------
 3 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 7716a5c03fa6..e2e05d869e74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -171,8 +171,8 @@ object SQLExecution {
    * Wrap passed function to ensure necessary thread-local variables like
    * SparkContext local properties are forwarded to execution thread
    */
-  def withThreadLocalCaptured[T](sparkSession: SparkSession)(body: => T)(
-      exec: ExecutionContext): Future[T] = {
+  def withThreadLocalCaptured[T](sparkSession: SparkSession, exec: ExecutionContext)(
+      body: => T): Future[T] = {
     val activeSession = sparkSession
     val localProps = Utils.cloneProperties(sparkSession.sparkContext.getLocalProperties)
     Future {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index dfd0b4f29806..f3f756425a15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -750,7 +750,9 @@ case class SubqueryExec(name: String, child: SparkPlan)
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](sqlContext.sparkSession) {
+    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
+      sqlContext.sparkSession,
+      SubqueryExec.executionContext) {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
@@ -765,7 +767,7 @@ case class SubqueryExec(name: String, child: SparkPlan)
         SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
         rows
       }
-    }(SubqueryExec.executionContext)
+    }
   }
 
   protected override def doCanonicalize(): SparkPlan = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 5f0262e7d3d0..ff5b7c81e531 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -128,36 +128,38 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
 
   test("SPARK-30556 propagate local properties to subquery execution thread") {
     withSQLConf("spark.sql.subquery.maxThreadThreshold" -> "1") {
-      spark.sparkContext.setLocalProperty("spark.sql.y", "e")
       Seq(true)
         .toDF()
         .createOrReplaceTempView("l")
-      Seq(true)
-        .toDF()
-        .mapPartitions { _ =>
-          val conf = SQLConf.get
-          conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString("spark.sql.y") == "e" match {
-            case true => Iterator(true)
-            case false => Iterator.empty
-          }
-        }
+      val confKey = "spark.sql.y"
+
+      // set local configuration and assert
+      val confValue1 = "e"
+      createDataframe(confKey, confValue1)
         .createOrReplaceTempView("m")
+      spark.sparkContext.setLocalProperty(confKey, confValue1)
       assert(sql("select * from l where exists (select * from m )").collect.size == 1)
 
-      spark.sparkContext.setLocalProperty("spark.sql.y", "f")
-      Seq(true)
-        .toDF()
-        .mapPartitions { _ =>
-          val conf = SQLConf.get
-          conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString("spark.sql.y") == "f" match {
-            case true => Iterator(true)
-            case false => Iterator.empty
-          }
-        }
+      // change the conf value and assert again
+      val confValue2 = "f"
+      createDataframe(confKey, confValue2)
        .createOrReplaceTempView("n")
+      spark.sparkContext.setLocalProperty(confKey, confValue2)
       assert(sql("select value from l where exists (select * from n )").collect().size == 1)
     }
   }
+
+  private def createDataframe(confKey: String, confValue: String) = {
+    Seq(true)
+      .toDF()
+      .mapPartitions { _ =>
+        val conf = SQLConf.get
+        conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString(confKey) == confValue match {
+          case true => Iterator(true)
+          case false => Iterator.empty
+        }
+      }
+  }
 }
 
 case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {

From ade75c63ed1b246d92fdb36a2dc240f2c6c3cd1f Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 21 Jan 2020 23:27:12 +0530
Subject: [PATCH 08/11] Make df creation method local

---
 .../internal/ExecutorSideSQLConfSuite.scala | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index ff5b7c81e531..1936f9266ffe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.Assertions._
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -133,6 +133,18 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
         .createOrReplaceTempView("l")
       val confKey = "spark.sql.y"
 
+      def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
+        Seq(true)
+          .toDF()
+          .mapPartitions { _ =>
+            val conf = SQLConf.get
+            conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString(confKey) == confValue match {
+              case true => Iterator(true)
+              case false => Iterator.empty
+            }
+          }
+      }
+
       // set local configuration and assert
       val confValue1 = "e"
       createDataframe(confKey, confValue1)
@@ -148,18 +160,6 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
       assert(sql("select value from l where exists (select * from n )").collect().size == 1)
     }
   }
-
-  private def createDataframe(confKey: String, confValue: String) = {
-    Seq(true)
-      .toDF()
-      .mapPartitions { _ =>
-        val conf = SQLConf.get
-        conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString(confKey) == confValue match {
-          case true => Iterator(true)
-          case false => Iterator.empty
-        }
-      }
-  }
 }
 
 case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {

From 3f4435638f0f9b09a747cca0a351891777a8ea53 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 22 Jan 2020 16:03:55 +0530
Subject: [PATCH 09/11] Review Comments

---
 .../spark/sql/internal/StaticSQLConf.scala    |  4 +-
 .../spark/sql/execution/SQLExecution.scala    |  5 +-
 .../internal/ExecutorSideSQLConfSuite.scala   | 55 ++++++++++---------
 3 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 21b16d33b476..683d4e4481b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -145,7 +145,7 @@ object StaticSQLConf {
       "cause longer waiting for other broadcasting. Also, increasing parallelism may " +
       "cause memory problem.")
     .intConf
-    .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in [0,128].")
+    .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
     .createWithDefault(128)
 
   val SUBQUERY_MAX_THREAD_THRESHOLD =
     buildStaticConf("spark.sql.subquery.maxThreadThreshold")
       .internal()
       .doc("The maximum degree of parallelism to execute the subquery. ")
       .intConf
-      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in [0,128].")
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
       .createWithDefault(16)
 
   val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index e2e05d869e74..e1ee4bb72741 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -174,10 +174,11 @@ object SQLExecution {
   def withThreadLocalCaptured[T](sparkSession: SparkSession, exec: ExecutionContext)(
       body: => T): Future[T] = {
     val activeSession = sparkSession
-    val localProps = Utils.cloneProperties(sparkSession.sparkContext.getLocalProperties)
+    val sc = sparkSession.sparkContext
+    val localProps = Utils.cloneProperties(sc.getLocalProperties)
     Future {
       SparkSession.setActiveSession(activeSession)
-      sparkSession.sparkContext.setLocalProperties(localProps)
+      sc.setLocalProperties(localProps)
       body
     }(exec)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 1936f9266ffe..240231c179f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -128,36 +128,39 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
 
   test("SPARK-30556 propagate local properties to subquery execution thread") {
     withSQLConf("spark.sql.subquery.maxThreadThreshold" -> "1") {
-      Seq(true)
-        .toDF()
-        .createOrReplaceTempView("l")
-      val confKey = "spark.sql.y"
-
-      def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
+      withTempView("l", "m", "n") {
         Seq(true)
           .toDF()
-          .mapPartitions { _ =>
-            val conf = SQLConf.get
-            conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString(confKey) == confValue match {
-              case true => Iterator(true)
-              case false => Iterator.empty
+          .createOrReplaceTempView("l")
+        val confKey = "spark.sql.y"
+
+        def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
+          Seq(true)
+            .toDF()
+            .mapPartitions { _ =>
+              val conf = SQLConf.get
+              conf
+                .isInstanceOf[ReadOnlySQLConf] && conf.getConfString(confKey) == confValue match {
+                case true => Iterator(true)
+                case false => Iterator.empty
+              }
             }
-          }
-      }
+        }
 
-      // set local configuration and assert
-      val confValue1 = "e"
-      createDataframe(confKey, confValue1)
-        .createOrReplaceTempView("m")
-      spark.sparkContext.setLocalProperty(confKey, confValue1)
-      assert(sql("select * from l where exists (select * from m )").collect.size == 1)
-
-      // change the conf value and assert again
-      val confValue2 = "f"
-      createDataframe(confKey, confValue2)
-        .createOrReplaceTempView("n")
-      spark.sparkContext.setLocalProperty(confKey, confValue2)
-      assert(sql("select value from l where exists (select * from n )").collect().size == 1)
+        // set local configuration and assert
+        val confValue1 = "e"
+        createDataframe(confKey, confValue1)
+          .createOrReplaceTempView("m")
+        spark.sparkContext.setLocalProperty(confKey, confValue1)
+        assert(sql("select * from l where exists (select * from m)").collect.size == 1)
+
+        // change the conf value and assert again
+        val confValue2 = "f"
+        createDataframe(confKey, confValue2)
+          .createOrReplaceTempView("n")
+        spark.sparkContext.setLocalProperty(confKey, confValue2)
+        assert(sql("select value from l where exists (select * from n)").collect().size == 1)
+      }
     }
   }
 }

From 89b95a3eeb33598b36c0004469b41701be9e76eb Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 22 Jan 2020 17:44:30 +0530
Subject: [PATCH 10/11] Update style

---
 .../spark/sql/execution/SQLExecution.scala      |  4 ++--
 .../sql/internal/ExecutorSideSQLConfSuite.scala | 16 +++++-----------
 2 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index e1ee4bb72741..995d94ef5eac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -171,8 +171,8 @@ object SQLExecution {
    * Wrap passed function to ensure necessary thread-local variables like
    * SparkContext local properties are forwarded to execution thread
    */
-  def withThreadLocalCaptured[T](sparkSession: SparkSession, exec: ExecutionContext)(
-      body: => T): Future[T] = {
+  def withThreadLocalCaptured[T](
+      sparkSession: SparkSession, exec: ExecutionContext)(body: => T): Future[T] = {
     val activeSession = sparkSession
     val sc = sparkSession.sparkContext
     val localProps = Utils.cloneProperties(sc.getLocalProperties)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 240231c179f3..f0646902e8c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.internal
 import org.scalatest.Assertions._
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.{SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -129,18 +129,14 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
   test("SPARK-30556 propagate local properties to subquery execution thread") {
     withSQLConf("spark.sql.subquery.maxThreadThreshold" -> "1") {
       withTempView("l", "m", "n") {
-        Seq(true)
-          .toDF()
-          .createOrReplaceTempView("l")
+        Seq(true).toDF().createOrReplaceTempView("l")
         val confKey = "spark.sql.y"
 
         def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
           Seq(true)
             .toDF()
             .mapPartitions { _ =>
-              val conf = SQLConf.get
-              conf
-                .isInstanceOf[ReadOnlySQLConf] && conf.getConfString(confKey) == confValue match {
+              TaskContext.get.getLocalProperty(confKey) == confValue match {
                 case true => Iterator(true)
                 case false => Iterator.empty
               }
@@ -149,15 +145,13 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
         // set local configuration and assert
         val confValue1 = "e"
-        createDataframe(confKey, confValue1)
-          .createOrReplaceTempView("m")
+        createDataframe(confKey, confValue1).createOrReplaceTempView("m")
         spark.sparkContext.setLocalProperty(confKey, confValue1)
         assert(sql("select * from l where exists (select * from m)").collect.size == 1)
 
         // change the conf value and assert again
         val confValue2 = "f"
-        createDataframe(confKey, confValue2)
-          .createOrReplaceTempView("n")
+        createDataframe(confKey, confValue2).createOrReplaceTempView("n")
         spark.sparkContext.setLocalProperty(confKey, confValue2)
         assert(sql("select value from l where exists (select * from n)").collect().size == 1)
       }
     }
   }

From f1cac4de513dbe698604d6157fc8f663ecf8af72 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Thu, 23 Jan 2020 01:00:29 +0530
Subject: [PATCH 11/11] Review

---
 .../scala/org/apache/spark/sql/internal/StaticSQLConf.scala | 2 +-
 .../spark/sql/internal/ExecutorSideSQLConfSuite.scala       | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 683d4e4481b4..66ac9ddb21aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -151,7 +151,7 @@ object StaticSQLConf {
   val SUBQUERY_MAX_THREAD_THRESHOLD =
     buildStaticConf("spark.sql.subquery.maxThreadThreshold")
       .internal()
-      .doc("The maximum degree of parallelism to execute the subquery. ")
+      .doc("The maximum degree of parallelism to execute the subquery.")
      .intConf
       .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
       .createWithDefault(16)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index f0646902e8c7..0cc658c49961 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -127,7 +127,7 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
   }
 
   test("SPARK-30556 propagate local properties to subquery execution thread") {
-    withSQLConf("spark.sql.subquery.maxThreadThreshold" -> "1") {
+    withSQLConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD.key -> "1") {
       withTempView("l", "m", "n") {
         Seq(true).toDF().createOrReplaceTempView("l")
         val confKey = "spark.sql.y"
@@ -147,13 +147,13 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
         val confValue1 = "e"
         createDataframe(confKey, confValue1).createOrReplaceTempView("m")
         spark.sparkContext.setLocalProperty(confKey, confValue1)
-        assert(sql("select * from l where exists (select * from m)").collect.size == 1)
+        assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM m)").collect.size == 1)
 
         // change the conf value and assert again
         val confValue2 = "f"
         createDataframe(confKey, confValue2).createOrReplaceTempView("n")
         spark.sparkContext.setLocalProperty(confKey, confValue2)
-        assert(sql("select value from l where exists (select * from n)").collect().size == 1)
+        assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM n)").collect().size == 1)
       }
     }
   }
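
Note on the mechanism (illustration only, not part of the patch series): the fix captures the submitting thread's SparkContext local properties with Utils.cloneProperties and re-installs them inside the Future that SubqueryExec schedules on its shared "subquery" thread pool, via SQLExecution.withThreadLocalCaptured. The sketch below reproduces that capture-and-restore shape outside Spark as a minimal, standalone Scala program; LocalProps, withCaptured and PropagationSketch are hypothetical names invented for the example and are not Spark APIs.

import java.util.Properties
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Thread-local "local properties", standing in for SparkContext#localProperties.
object LocalProps {
  private val holder = new ThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
  }
  def set(key: String, value: String): Unit = holder.get().setProperty(key, value)
  def get(key: String): String = holder.get().getProperty(key)
  def snapshot(): Properties = holder.get().clone().asInstanceOf[Properties]
  def restore(p: Properties): Unit = holder.set(p)
}

object PropagationSketch {
  // Capture the caller's properties on the submitting thread, then re-install
  // them on the pool thread before running the body -- the same shape as
  // SQLExecution.withThreadLocalCaptured in the patches above.
  def withCaptured[T](exec: ExecutionContext)(body: => T): Future[T] = {
    val captured = LocalProps.snapshot()   // runs on the submitting thread
    Future {
      LocalProps.restore(captured)         // runs on the pool thread
      body
    }(exec)
  }

  def main(args: Array[String]): Unit = {
    val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
    LocalProps.set("spark.sql.y", "e")
    val result = withCaptured(pool) { LocalProps.get("spark.sql.y") }
    // Without the restore step the pool thread would start from an empty
    // Properties object and this lookup would return null.
    println(Await.result(result, 10.seconds))  // prints "e"
    pool.shutdown()
  }
}

Skipping the restore step mirrors the problem the series addresses: properties set by the caller (for example the execution id or the test's spark.sql.y key) are not visible on the subquery execution thread that runs the relationFuture.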