Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,25 @@ private[spark] object ThreadUtils {
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}

/**
* Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, wraps
* and re-throws any exceptions with nice stack track.
*
* Codes running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent
* executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method
* basically prevents ForkJoinPool from running other tasks in the current waiting thread.
*/
@throws(classOf[SparkException])
def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
// See SPARK-13747.
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
awaitable.result(Duration.Inf)(awaitPermission)
} catch {
case NonFatal(t) =>
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}
}
1 change: 1 addition & 0 deletions scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ This file is divided into 3 sections:
// scalastyle:off awaitresult
Await.result(...)
// scalastyle:on awaitresult
If your codes use ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead.
]]></customMessage>
</check>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
}

override def executeCollect(): Array[InternalRow] = {
ThreadUtils.awaitResult(relationFuture, Duration.Inf)
ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ case class BroadcastExchangeExec(
}

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout)
.asInstanceOf[broadcast.Broadcast[T]]
}
}

Expand Down