Skip to content

Commit eba011a

Browse files
committed
[SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL
Calling `Await.result` will allow other tasks to be run on the same thread when using ForkJoinPool. However, SQL uses a `ThreadLocal` execution id to trace Spark jobs launched by a query, which doesn't work perfectly in ForkJoinPool. This PR just uses `Awaitable.result` instead to prevent ForkJoinPool from running other tasks in the current waiting thread. Jenkins Author: Shixiong Zhu <[email protected]> Closes #15520 from zsxwing/SPARK-13747. (cherry picked from commit 7ac70e7) Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 773fbfe commit eba011a

File tree

4 files changed

+25
-2
lines changed

4 files changed

+25
-2
lines changed

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,25 @@ private[spark] object ThreadUtils {
194194
throw new SparkException("Exception thrown in awaitResult: ", t)
195195
}
196196
}
197+
198+
/**
 * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, then
 * wraps and re-throws any non-fatal exception so that the caller gets a useful stack trace.
 *
 * Code running in the user's thread may be in a thread of a Scala ForkJoinPool. As concurrent
 * executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method
 * basically prevents ForkJoinPool from running other tasks in the current waiting thread.
 *
 * @param awaitable the computation to wait for
 * @param atMost the maximum time to wait; pass `Duration.Inf` to wait without a time limit
 * @return the value produced by `awaitable`
 * @throws SparkException wrapping any non-fatal exception raised while waiting, including the
 *                        timeout raised by `awaitable.result` when `atMost` elapses
 */
@throws(classOf[SparkException])
def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = {
  try {
    // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
    // See SPARK-13747.
    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
    // Honor the caller-supplied timeout. Waiting with `Duration.Inf` here would silently
    // discard `atMost` and could block forever on a computation that never completes.
    awaitable.result(atMost)(awaitPermission)
  } catch {
    case NonFatal(t) =>
      throw new SparkException("Exception thrown in awaitResult: ", t)
  }
}
197218
}

scalastyle-config.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ This file is divided into 3 sections:
200200
// scalastyle:off awaitresult
201201
Await.result(...)
202202
// scalastyle:on awaitresult
203+
If your code uses ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead.
203204
]]></customMessage>
204205
</check>
205206

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
166166
protected def waitForSubqueries(): Unit = synchronized {
167167
// fill in the result of subqueries
168168
subqueryResults.foreach { case (e, futureResult) =>
169-
val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf)
169+
val rows = ThreadUtils.awaitResultInForkJoinSafely(futureResult, Duration.Inf)
170170
if (rows.length > 1) {
171171
sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
172172
}

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ case class BroadcastExchangeExec(
117117
}
118118

119119
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
120-
ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
120+
ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout)
121+
.asInstanceOf[broadcast.Broadcast[T]]
121122
}
122123
}
123124

0 commit comments

Comments
 (0)