Skip to content

Commit 126310c

Browse files
caneGuyHyukjinKwon
authored andcommitted
[SPARK-26601][SQL] Make broadcast-exchange thread pool configurable
## What changes were proposed in this pull request? Currently,thread number of broadcast-exchange thread pool is fixed and keepAliveSeconds is also fixed as 60s. ``` object BroadcastExchangeExec { private[execution] val executionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128)) } /** * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. */ def newDaemonCachedThreadPool( prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) val threadPool = new ThreadPoolExecutor( maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used keepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable], threadFactory) threadPool.allowCoreThreadTimeOut(true) threadPool } ``` But some times, if the Thead object do not GC quickly it may caused server(driver) OOM. In such case,we need to make this thread pool configurable. A case has described in https://issues.apache.org/jira/browse/SPARK-26601 ## How was this patch tested? UT Closes apache#23670 from caneGuy/zhoukang/make-broadcat-config. Authored-by: zhoukang <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
1 parent 2bc42ad commit 126310c

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,19 @@ object StaticSQLConf {
132132
.intConf
133133
.createWithDefault(1000)
134134

135+
val BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD =
136+
buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
137+
.internal()
138+
.doc("The maximum degree of parallelism to fetch and broadcast the table. " +
139+
"If we encounter memory issue like frequently full GC or OOM when broadcast table " +
140+
"we can decrease this number in order to reduce memory usage. " +
141+
"Notice the number should be carefully chosen since decreasing parallelism might " +
142+
"cause longer waiting for other broadcasting. Also, increasing parallelism may " +
143+
"cause memory problem.")
144+
.intConf
145+
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in [0,128].")
146+
.createWithDefault(128)
147+
135148
val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
136149
.doc("Threshold of SQL length beyond which it will be truncated before adding to " +
137150
"event. Defaults to no truncation. If set to 0, callsite will be logged instead.")

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
3232
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
3333
import org.apache.spark.sql.execution.joins.HashedRelation
3434
import org.apache.spark.sql.execution.metric.SQLMetrics
35-
import org.apache.spark.sql.internal.SQLConf
35+
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
3636
import org.apache.spark.util.{SparkFatalException, ThreadUtils}
3737

3838
/**
@@ -157,5 +157,6 @@ case class BroadcastExchangeExec(
157157

158158
object BroadcastExchangeExec {
159159
private[execution] val executionContext = ExecutionContext.fromExecutorService(
160-
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
160+
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
161+
SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))
161162
}

0 commit comments

Comments
 (0)