12 changes: 3 additions & 9 deletions core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* An exception thrown if RpcTimeout modifies a [[TimeoutException]].
@@ -72,15 +72,9 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
* is still not ready
*/
def awaitResult[T](future: Future[T]): T = {
val wrapAndRethrow: PartialFunction[Throwable, T] = {
case NonFatal(t) =>
throw new SparkException("Exception thrown in awaitResult", t)
}
try {
// scalastyle:off awaitresult
Await.result(future, duration)
// scalastyle:on awaitresult
} catch addMessageIfTimeout.orElse(wrapAndRethrow)
ThreadUtils.awaitResult(future, duration)
} catch addMessageIfTimeout
}
}

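The net effect of this change: RpcTimeout.awaitResult delegates the blocking wait to ThreadUtils.awaitResult and keeps only the TimeoutException post-processing for itself. Below is a minimal, self-contained sketch of that shape; RpcTimeoutSketch, awaitWithProp, and the plain Await.result stand-in are illustrative names, not Spark APIs (the real class rethrows through addMessageIfTimeout, which produces an RpcTimeoutException).

import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RpcTimeoutSketch {
  // Stand-in for ThreadUtils.awaitResult: a single shared place that blocks on a future.
  private def awaitResult[T](future: Future[T], atMost: FiniteDuration): T =
    Await.result(future, atMost)

  // Same shape as the new RpcTimeout.awaitResult: delegate the wait, then post-process only
  // TimeoutException so the error message names the config property controlling the timeout.
  def awaitWithProp[T](future: Future[T], atMost: FiniteDuration, timeoutProp: String): T = {
    val addMessageIfTimeout: PartialFunction[Throwable, T] = {
      case te: TimeoutException =>
        throw new TimeoutException(s"${te.getMessage}. This timeout is controlled by $timeoutProp")
    }
    try {
      awaitResult(future, atMost)
    } catch addMessageIfTimeout
  }
}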
41 changes: 16 additions & 25 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util

import java.util.concurrent._

import scala.concurrent.{Await, Awaitable, ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration.Duration
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal
@@ -180,39 +180,30 @@ private[spark] object ThreadUtils {

// scalastyle:off awaitresult
/**
* Preferred alternative to `Await.result()`. This method wraps and re-throws any exceptions
* thrown by the underlying `Await` call, ensuring that this thread's stack trace appears in
* logs.
*/
@throws(classOf[SparkException])
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
Await.result(awaitable, atMost)
// scalastyle:on awaitresult
} catch {
case NonFatal(t) =>
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}

/**
* Calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s `BlockingContext`, wraps
* and re-throws any exceptions with a nice stack trace.
* Preferred alternative to `Await.result()`.
*
* This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
* that this thread's stack trace appears in logs.
*
* Codes running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent
* executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method
* basically prevents ForkJoinPool from running other tasks in the current waiting thread.
* In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
* `BlockingContext`. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
* As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
* method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
* In general, we should use this method because many places in Spark use [[ThreadLocal]] and it's
* hard to debug when [[ThreadLocal]]s leak to other tasks.
*/
@throws(classOf[SparkException])
def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = {
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
// See SPARK-13747.
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
awaitable.result(Duration.Inf)(awaitPermission)
awaitable.result(atMost)(awaitPermission)
} catch {
case NonFatal(t) =>
// TimeoutException is thrown in the current thread, so no need to wrap the exception.
case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}
// scalastyle:on awaitresult
}
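The heart of the merged method is the direct call to Awaitable.result with a null CanAwait token, which skips the BlockingContext that Await.result would apply; per the docstring above, that is what keeps a ForkJoinPool from running other tasks (and thereby observing this thread's ThreadLocals) in the waiting thread. A stripped-down sketch of just that trick, assuming nothing beyond the Scala standard library (DirectAwaitSketch is an illustrative name, and the SparkException wrapping is omitted):

import scala.concurrent.{Awaitable, CanAwait}
import scala.concurrent.duration.Duration

object DirectAwaitSketch {
  def resultDirectly[T](awaitable: Awaitable[T], atMost: Duration): T = {
    // CanAwait is only a capability marker and is never dereferenced, so null is safe here;
    // this mirrors the SPARK-13747 reasoning in the patch above.
    val awaitPermission = null.asInstanceOf[CanAwait]
    // Calling result directly bypasses the blocking { ... } wrapper that Await.result adds.
    awaitable.result(atMost)(awaitPermission)
  }
}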
@@ -199,10 +199,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
val f = sc.parallelize(1 to 100, 4)
.mapPartitions(itr => { Thread.sleep(20); itr })
.countAsync()
val e = intercept[SparkException] {
intercept[TimeoutException] {
ThreadUtils.awaitResult(f, Duration(20, "milliseconds"))
}
assert(e.getCause.isInstanceOf[TimeoutException])
}

private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = {
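As the updated test shows, the caller-visible contract changes: a timed-out wait now surfaces as a bare TimeoutException instead of a SparkException whose cause must be unwrapped, while other non-fatal failures are still wrapped. A hypothetical caller sketched under that assumption (AwaitContractSketch and describeOutcome are illustrative names, not part of the patch):

import java.util.concurrent.TimeoutException

import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.spark.SparkException
import org.apache.spark.util.ThreadUtils

object AwaitContractSketch {
  def describeOutcome[T](f: Future[T], atMost: FiniteDuration): String =
    try {
      ThreadUtils.awaitResult(f, atMost)
      "completed"
    } catch {
      // Timeouts are no longer wrapped, so they can be matched directly.
      case _: TimeoutException => "timed out"
      // Any other non-fatal failure of the future is still rethrown as SparkException.
      case e: SparkException => s"failed: ${e.getCause}"
    }
}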
@@ -158,10 +158,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
0 until rdd.partitions.size, resultHandler, () => Unit)
// It's an error if the job completes successfully even though no committer was authorized,
// so throw an exception if the job was allowed to complete.
val e = intercept[SparkException] {
intercept[TimeoutException] {
ThreadUtils.awaitResult(futureAction, 5 seconds)
}
assert(e.getCause.isInstanceOf[TimeoutException])
assert(tempDir.list().size === 0)
}

1 change: 0 additions & 1 deletion scalastyle-config.xml
@@ -200,7 +200,6 @@ This file is divided into 3 sections:
// scalastyle:off awaitresult
Await.result(...)
// scalastyle:on awaitresult
If your codes use ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead.
]]></customMessage>
</check>

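With awaitResultInForkJoinSafely gone, the rule's guidance collapses to a single recommendation: call ThreadUtils.awaitResult everywhere and keep the scalastyle suppression confined to ThreadUtils itself. A hypothetical call site illustrating that convention (AwaitStyleSketch and firstResult are illustrative names):

import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.spark.util.ThreadUtils

object AwaitStyleSketch {
  // No "scalastyle:off awaitresult" needed here: Await.result is never called directly.
  def firstResult(f: Future[Int]): Int = ThreadUtils.awaitResult(f, 30.seconds)
}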
@@ -578,7 +578,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
}

override def executeCollect(): Array[InternalRow] = {
ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf)
ThreadUtils.awaitResult(relationFuture, Duration.Inf)
}
}

@@ -128,8 +128,7 @@ case class BroadcastExchangeExec(
}

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout)
.asInstanceOf[broadcast.Broadcast[T]]
ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}
}
