From 54078ce1e33a05c740840e74f37834934f085d79 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 17 Oct 2016 13:57:10 -0700 Subject: [PATCH 1/3] Fix concurrent executions in ForkJoinPool for SQL --- .../org/apache/spark/util/ThreadUtils.scala | 21 +++++++++++++++++++ scalastyle-config.xml | 1 + .../execution/basicPhysicalOperators.scala | 2 +- .../exchange/BroadcastExchangeExec.scala | 3 ++- 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5a6dbc830448a..c1dc56d8da7c4 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -194,4 +194,25 @@ private[spark] object ThreadUtils { throw new SparkException("Exception thrown in awaitResult: ", t) } } + + /** + * Calls [[Awaitable.result]] directly, wraps and re-throws any exceptions with nice stack + * track. + * + * Codes running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent + * executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method + * basically prevents ForkJoinPool from running other tasks in the current waiting thread. + */ + @throws(classOf[SparkException]) + def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = { + try { + // `awaitPermission` is not actually used anywhere so it's safe to pass in null here. + // See SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + awaitable.result(Duration.Inf)(awaitPermission) + } catch { + case NonFatal(t) => + throw new SparkException("Exception thrown in awaitResult: ", t) + } + } } diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 7fe0697202cd1..f4638577c9383 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -200,6 +200,7 @@ This file is divided into 3 sections: // scalastyle:off awaitresult Await.result(...) // scalastyle:on awaitresult + If your codes use ThreadLocal and run in a user's thread, use ThreadUtils.awaitResultInForkJoinSafely instead. ]]> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index dd78a784915d2..7255b0925be3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -562,7 +562,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { } override def executeCollect(): Array[InternalRow] = { - ThreadUtils.awaitResult(relationFuture, Duration.Inf) + ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 7be5d31d4a765..ce5013daeb1f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -128,7 +128,8 @@ case class BroadcastExchangeExec( } override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { - ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]] + ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout) + .asInstanceOf[broadcast.Broadcast[T]] } } From 6aa9e2fad0da6848fa9bfff6d3288b604badcd3a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 17 Oct 2016 22:25:30 -0700 Subject: [PATCH 2/3] Update comment --- core/src/main/scala/org/apache/spark/util/ThreadUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index c1dc56d8da7c4..d093e7bfc3dac 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -196,8 +196,8 @@ private[spark] object ThreadUtils { } /** - * Calls [[Awaitable.result]] directly, wraps and re-throws any exceptions with nice stack - * track. + * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, wraps + * and re-throws any exceptions with nice stack track. * * Codes running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent * executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method From e9753ce175ef126646c2ce9b87a7939ccb9fbdb0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 24 Oct 2016 10:35:56 -0700 Subject: [PATCH 3/3] Update --- scalastyle-config.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalastyle-config.xml b/scalastyle-config.xml index f4638577c9383..81d57d723a720 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -200,7 +200,7 @@ This file is divided into 3 sections: // scalastyle:off awaitresult Await.result(...) // scalastyle:on awaitresult - If your codes use ThreadLocal and run in a user's thread, use ThreadUtils.awaitResultInForkJoinSafely instead. + If your codes use ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead. ]]>