Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ object StaticSQLConf {
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
.createWithDefault(16)

val DYNAMIC_PRUNING_MAX_THREAD_THRESHOLD =
buildStaticConf("spark.sql.dynamic.pruning.maxThreadThreshold")
.internal()
.doc("The maximum degree of parallelism to execute the dynamic pruning.")
.intConf
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
.createWithDefault(16)

val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
.doc("Threshold of SQL length beyond which it will be truncated before adding to " +
"event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.joins.{HashedRelation, HashJoin, LongHashedRelation}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.util.ThreadUtils

/**
Expand Down Expand Up @@ -57,7 +58,9 @@ case class SubqueryBroadcastExec(
private lazy val relationFuture: Future[Array[InternalRow]] = {
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
sqlContext.sparkSession,
SubqueryBroadcastExec.executionContext) {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
Expand All @@ -83,7 +86,7 @@ case class SubqueryBroadcastExec(

rows
}
}(SubqueryBroadcastExec.executionContext)
}
}

protected override def doPrepare(): Unit = {
Expand All @@ -104,5 +107,6 @@ case class SubqueryBroadcastExec(

object SubqueryBroadcastExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("dynamicpruning", 16))
ThreadUtils.newDaemonCachedThreadPool("dynamicpruning",
SQLConf.get.getConf(StaticSQLConf.DYNAMIC_PRUNING_MAX_THREAD_THRESHOLD)))
}