@@ -145,9 +145,17 @@ object StaticSQLConf {
"cause longer waiting for other broadcasting. Also, increasing parallelism may " +
"cause memory problem.")
.intConf
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in [0,128].")
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
.createWithDefault(128)

val SUBQUERY_MAX_THREAD_THRESHOLD =
Contributor: This is actually a different change, right?

Contributor: It should also be a static configuration since we can only change it at startup.

Contributor Author (@ajithme, Jan 23, 2020):

  1. This is just exposing a configuration to make the change testable. Do you want me to raise a separate PR just to keep this configuration change separate?

  2. This is part of StaticSQLConf, which is defined at startup. Is there any other mechanism to define a static conf?

Contributor: If it's a static conf, then why isn't your unit test failing? Moreover, if it's static, setting it in your test probably has no effect: we use the same JVM/SparkContext to run most tests, so the chances are pretty high that it has been set before.

Contributor Author: I see. As per the documentation ("Static SQL configuration is a cross-session, immutable Spark configuration. External users can ..."), it should not be modified. I followed the same way BroadcastExchangeExec creates its executionContext. My initial guess is that the executionContext is not created until the first subquery, hence it works for the UT. I will investigate further and get back with the analysis.

Member: I think we should submit a separate PR for this change.

Member: I think we could keep this in StaticSQLConf.

  • I locally tested that the static config takes effect; it can only be set at startup.
  • The UT can pass because the config is used in the lazy val SubqueryExec.relationFuture on the executor side, so the withSQLConf in the UT can set the config before the executor starts.

cc @hvanhovell
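
A simplified, hypothetical illustration of the point above (made-up names, not Spark code): a value that is only read when its holder object is first initialized behaves like a startup-only setting, so whatever the test sets before first use is what the thread pool gets.

  // Hypothetical sketch: the pool size is read exactly once, when the holder object is
  // first referenced, so only the value set before that first use matters.
  object ThreadPoolHolder {
    val maxThreads: Int = sys.props.getOrElse("example.subquery.maxThreads", "16").toInt
  }

  object Demo extends App {
    sys.props("example.subquery.maxThreads") = "1" // set before first use: takes effect
    println(ThreadPoolHolder.maxThreads)           // prints 1
    sys.props("example.subquery.maxThreads") = "8" // set after first use: ignored
    println(ThreadPoolHolder.maxThreads)           // still prints 1
  }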

buildStaticConf("spark.sql.subquery.maxThreadThreshold")
.internal()
.doc("The maximum degree of parallelism to execute the subquery.")
.intConf
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
.createWithDefault(16)

val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
.doc("Threshold of SQL length beyond which it will be truncated before adding to " +
"event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.SparkContext
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.SparkSession
@@ -164,4 +166,20 @@ object SQLExecution {
}
}
}

/**
* Wrap passed function to ensure necessary thread-local variables like
* SparkContext local properties are forwarded to execution thread
*/
def withThreadLocalCaptured[T](
sparkSession: SparkSession, exec: ExecutionContext)(body: => T): Future[T] = {
val activeSession = sparkSession
val sc = sparkSession.sparkContext
val localProps = Utils.cloneProperties(sc.getLocalProperties)
Future {
SparkSession.setActiveSession(activeSession)
Contributor: Can you please unset/restore both of them to their old state?

Contributor Author: Do you mean I should clear the thread locals? As these are worker threads, they do not have any old state as such. For example, when a thread is created for the first time it inherits from the parent thread, but when it is reused, restoring would set the previous, now stale, properties. IMO, setting this on every execution should suffice. Please correct me if I am wrong.

Contributor: I think we should restore them to their previous state. In this particular case you can hold on to a SparkSession that is no longer in use.

sc.setLocalProperties(localProps)
Contributor: @hvanhovell two questions:

  • Shouldn't we clone localProps here, in the sense that what if a concurrent thread modifies them?
  • Does the order of setting localProps and activeSession matter?

Contributor: localProps is already a clone: https://github.com/apache/spark/pull/27267/files#diff-ab49028253e599e6e74cc4f4dcb2e3a8R178

And I think the order doesn't matter.
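
For context, a small hypothetical illustration (made-up names, not Spark code) of why the Future is handed a cloned snapshot: a later change on the calling thread cannot leak into the captured copy, which is roughly what Utils.cloneProperties is used for in the PR.

  import java.util.Properties

  object CloneDemo extends App {
    val shared = new Properties()
    shared.setProperty("spark.sql.y", "e")

    // Take a shallow snapshot before scheduling the work.
    val captured = shared.clone().asInstanceOf[Properties]

    // A later modification on the calling thread ...
    shared.setProperty("spark.sql.y", "f")

    // ... does not affect the snapshot the subquery thread would see.
    assert(captured.getProperty("spark.sql.y") == "e")
  }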

body
}(exec)
}
}
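
For reference, a minimal sketch of what the restore-to-previous-state suggestion from the review might look like. This is hypothetical and not what the PR implements; the helper name withThreadLocalCapturedRestoring is made up, while SparkSession.getActiveSession/clearActiveSession and the property helpers are the same Spark APIs used in the diff above.

  // Hypothetical sketch only: capture the caller's state as in the PR, but remember and
  // restore whatever the pooled worker thread held before, so it does not keep a stale
  // SparkSession or stale local properties after the subquery finishes.
  def withThreadLocalCapturedRestoring[T](
      sparkSession: SparkSession, exec: ExecutionContext)(body: => T): Future[T] = {
    val activeSession = sparkSession
    val sc = sparkSession.sparkContext
    val localProps = Utils.cloneProperties(sc.getLocalProperties)
    Future {
      val previousSession = SparkSession.getActiveSession
      val previousProps = sc.getLocalProperties
      SparkSession.setActiveSession(activeSession)
      sc.setLocalProperties(localProps)
      try {
        body
      } finally {
        previousSession match {
          case Some(s) => SparkSession.setActiveSession(s)
          case None => SparkSession.clearActiveSession()
        }
        sc.setLocalProperties(previousProps)
      }
    }(exec)
  }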
@@ -31,8 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}

/** Physical plan for Project. */
@@ -749,7 +750,9 @@ case class SubqueryExec(name: String, child: SparkPlan)
private lazy val relationFuture: Future[Array[InternalRow]] = {
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
sqlContext.sparkSession,
SubqueryExec.executionContext) {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
@@ -764,7 +767,7 @@ case class SubqueryExec(name: String, child: SparkPlan)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
rows
}
}(SubqueryExec.executionContext)
}
}

protected override def doCanonicalize(): SparkPlan = {
@@ -788,7 +791,8 @@ case class SubqueryExec(name: String, child: SparkPlan)

object SubqueryExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
ThreadUtils.newDaemonCachedThreadPool("subquery",
SQLConf.get.getConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD)))
}

/**
@@ -19,9 +19,9 @@ package org.apache.spark.sql.internal

import org.scalatest.Assertions._

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.{SparkException, SparkFunSuite, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -125,6 +125,38 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
assert(e.getCause.isInstanceOf[NoSuchElementException])
}

test("SPARK-30556 propagate local properties to subquery execution thread") {
withSQLConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD.key -> "1") {
withTempView("l", "m", "n") {
Seq(true).toDF().createOrReplaceTempView("l")
val confKey = "spark.sql.y"

def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
Seq(true)
.toDF()
.mapPartitions { _ =>
TaskContext.get.getLocalProperty(confKey) == confValue match {
case true => Iterator(true)
case false => Iterator.empty
}
}
}

// set local configuration and assert
val confValue1 = "e"
Contributor: IMO it is better to use something unique here to avoid a fluke. How about using UUID.randomUUID().toString()?

Contributor Author: I am neutral about both approaches. I just wanted a fixed input value so the test case has a predictable output. I can update this and raise a follow-up if you insist.
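
A minimal sketch of how the reviewer's suggestion could look in this test (hypothetical, not part of the PR): unique values mean the check cannot accidentally match a local property left behind by an unrelated test running in the same JVM.

  // Hypothetical replacement for the fixed "e"/"f" values used below.
  val confValue1 = java.util.UUID.randomUUID().toString
  val confValue2 = java.util.UUID.randomUUID().toString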

createDataframe(confKey, confValue1).createOrReplaceTempView("m")
spark.sparkContext.setLocalProperty(confKey, confValue1)
assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM m)").collect.size == 1)

// change the conf value and assert again
val confValue2 = "f"
createDataframe(confKey, confValue2).createOrReplaceTempView("n")
spark.sparkContext.setLocalProperty(confKey, confValue2)
assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM n)").collect().size == 1)
}
}
}
}

case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {