From df20ed5fab517d3d9e7f7cc1872c5af57314a5aa Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 13 Jul 2015 20:23:39 -0700
Subject: [PATCH 01/12] Add regression test

---
 .../org/apache/spark/FutureActionSuite.scala | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
index 1102aea96b548..fe9a6e3835932 100644
--- a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark
 
-import scala.concurrent.Await
+import scala.concurrent.{ExecutionContext, Await}
 import scala.concurrent.duration.Duration
 
 import org.scalatest.{BeforeAndAfter, Matchers}
 
+import org.apache.spark.util.ThreadUtils
 
 class FutureActionSuite
   extends SparkFunSuite
@@ -49,4 +50,20 @@ class FutureActionSuite
     job.jobIds.size should be (2)
   }
 
+  test("simple async action callbacks should not tie up execution context threads (SPARK-9026)") {
+    val rdd = sc.parallelize(1 to 10, 2).map(_ => Thread.sleep(1000 * 1000))
+    val pool = ThreadUtils.newDaemonCachedThreadPool("SimpleFutureActionTest")
+    val executionContext = ExecutionContext.fromExecutorService(pool)
+    val job = rdd.countAsync()
+    try {
+      for (_ <- 1 to 10) {
+        job.onComplete(_ => ())(executionContext)
+        assert(pool.getLargestPoolSize < 10)
+      }
+    } finally {
+      job.cancel()
+      executionContext.shutdownNow()
+    }
+  }
+
 }
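
NOTE: the new test registers ten onComplete callbacks against one long-running job and asserts that the cached thread pool never grows to ten threads; with the old implementation each callback immediately pinned a pool thread until the job finished. The fix relies on a standard property of Promise-backed futures: registering a callback on an incomplete future consumes no threads. A minimal standalone sketch of that property (hypothetical demo code, not part of the patch):

    import java.util.concurrent.{Executors, ThreadPoolExecutor}
    import scala.concurrent.{ExecutionContext, Promise}

    val pool = Executors.newCachedThreadPool().asInstanceOf[ThreadPoolExecutor]
    implicit val ec = ExecutionContext.fromExecutor(pool)

    val promise = Promise[Unit]()
    for (_ <- 1 to 10) promise.future.onComplete(_ => ())  // callbacks are queued, not run
    assert(pool.getLargestPoolSize == 0)                   // no pool thread consumed yet

    promise.success(())  // only now do the callbacks execute on the pool
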
From 55c41d3a54171adccc333cf155668b8673b5c15f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 13 Jul 2015 19:37:38 -0700
Subject: [PATCH 02/12] Refactor SimpleFutureAction to not block threads for
 every onComplete callback.

---
 .../scala/org/apache/spark/FutureAction.scala | 50 ++++---------
 .../apache/spark/scheduler/JobWaiter.scala    | 17 +++++++
 core/src/test/resources/log4j.properties     |  6 ++-
 3 files changed, 33 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 48792a958130c..307901516db4e 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.api.java.JavaFutureAction
 import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
+import org.apache.spark.scheduler.JobWaiter
 
 import scala.concurrent._
 import scala.concurrent.duration.Duration
@@ -109,6 +109,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   extends FutureAction[T] {
 
   @volatile private var _cancelled: Boolean = false
+  @volatile private var _value: Try[T] = null
 
   override def cancel() {
     _cancelled = true
@@ -116,57 +117,28 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    if (!atMost.isFinite()) {
-      awaitResult()
-    } else jobWaiter.synchronized {
-      val finishTime = System.currentTimeMillis() + atMost.toMillis
-      while (!isCompleted) {
-        val time = System.currentTimeMillis()
-        if (time >= finishTime) {
-          throw new TimeoutException
-        } else {
-          jobWaiter.wait(finishTime - time)
-        }
-      }
-    }
+    jobWaiter.toFuture.ready(atMost)(permit)
     this
   }
 
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    ready(atMost)(permit)
-    awaitResult() match {
-      case scala.util.Success(res) => res
-      case scala.util.Failure(e) => throw e
-    }
+    jobWaiter.toFuture.result(atMost)(permit)
+    resultFunc
   }
 
-  override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
-    executor.execute(new Runnable {
-      override def run() {
-        func(awaitResult())
-      }
-    })
+  override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
+    jobWaiter.toFuture.onComplete { (jobWaiterResult: Try[Unit]) =>
+      _value = jobWaiterResult.map(_ => resultFunc)
+      func(_value)
+    }
   }
 
   override def isCompleted: Boolean = jobWaiter.jobFinished
 
   override def isCancelled: Boolean = _cancelled
 
-  override def value: Option[Try[T]] = {
-    if (jobWaiter.jobFinished) {
-      Some(awaitResult())
-    } else {
-      None
-    }
-  }
-
-  private def awaitResult(): Try[T] = {
-    jobWaiter.awaitResult() match {
-      case JobSucceeded => scala.util.Success(resultFunc)
-      case JobFailed(e: Exception) => scala.util.Failure(e)
-    }
-  }
+  override def value: Option[Try[T]] = Option(_value)
 
   def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
 }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 382b09422a4a0..1144b65887c73 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.scheduler
 
+import scala.concurrent.{Future, Promise}
+import scala.util.Success
+
 /**
  * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
  * results to the given handler function.
@@ -28,12 +31,18 @@ private[spark] class JobWaiter[T](
     resultHandler: (Int, T) => Unit)
   extends JobListener {
 
+  private val promise = Promise[Unit]
+
   private var finishedTasks = 0
 
   // Is the job as a whole finished (succeeded or failed)?
   @volatile
   private var _jobFinished = totalTasks == 0
 
+  if (_jobFinished) {
+    promise.complete(Success(Unit))
+  }
+
   def jobFinished: Boolean = _jobFinished
 
   // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
   // partition RDDs), we set the jobResult directly to JobSucceeded.
@@ -58,6 +67,7 @@ private[spark] class JobWaiter[T](
     if (finishedTasks == totalTasks) {
       _jobFinished = true
       jobResult = JobSucceeded
+      promise.trySuccess()
       this.notifyAll()
     }
   }
@@ -65,6 +75,7 @@ private[spark] class JobWaiter[T](
   override def jobFailed(exception: Exception): Unit = synchronized {
     _jobFinished = true
     jobResult = JobFailed(exception)
+    promise.tryFailure(exception)
     this.notifyAll()
   }
 
@@ -74,4 +85,10 @@ private[spark] class JobWaiter[T](
     }
     return jobResult
   }
+
+  /**
+   * Return a Future for monitoring the job's success or failure. You can use this method to
+   * avoid blocking your thread.
+   */
+  def toFuture: Future[Unit] = promise.future
 }

diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index eb3b1999eb996..48222f7719689 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -16,7 +16,11 @@
 #
 
 # Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log
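
NOTE: after this refactoring, callback registration is non-blocking end to end -- the callback is parked on the JobWaiter's Promise and only dispatched to the supplied ExecutionContext once the job completes. A hedged usage sketch (hypothetical caller code; assumes an RDD named `rdd` is in scope):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    val future = rdd.countAsync()  // a SimpleFutureAction[Long]
    future.onComplete {            // returns immediately; no thread is parked here
      case Success(count) => println(s"counted $count records")
      case Failure(e)     => println(s"job failed: $e")
    }
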
From 1deed38a9cf7d1b4c34074db1affa32d693bbd61 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 13 Jul 2015 20:08:09 -0700
Subject: [PATCH 03/12] Add some comments

---
 core/src/main/scala/org/apache/spark/FutureAction.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 307901516db4e..0fba6e8f1c7fd 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -108,7 +108,11 @@ trait FutureAction[T] extends Future[T] {
 class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
   extends FutureAction[T] {
 
+  // Note: `resultFunc` is a closure which may contain references to state that's updated by the
+  // JobWaiter's result handler function. It should only be evaluated once the job has succeeded.
+
   @volatile private var _cancelled: Boolean = false
+  // Null until the job has completed, then holds a Try representing success or failure.
   @volatile private var _value: Try[T] = null
 
   override def cancel() {
@@ -117,18 +121,22 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
+    // This call to the JobWaiter's future will throw an exception if the job failed.
     jobWaiter.toFuture.ready(atMost)(permit)
     this
   }
 
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
+    // This call to the JobWaiter's future will throw an exception if the job failed.
     jobWaiter.toFuture.result(atMost)(permit)
+    // At this point, we know that the job succeeded so it's safe to evaluate this function:
     resultFunc
   }
 
   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
     jobWaiter.toFuture.onComplete { (jobWaiterResult: Try[Unit]) =>
+      // If the job succeeded, then evaluate the result function; otherwise, preserve the exception.
       _value = jobWaiterResult.map(_ => resultFunc)
       func(_value)
     }
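
NOTE: the reason `resultFunc` must not run before the job succeeds is that async actions compute their result by mutating state from the per-task result handler. A paraphrased sketch of how countAsync in AsyncRDDActions wires this up (reconstructed from memory of that file -- consult it for the exact code):

    val totalCount = new java.util.concurrent.atomic.AtomicLong
    self.context.submitJob(
      self,
      (iter: Iterator[T]) => {
        var result = 0L
        while (iter.hasNext) {   // count this partition's elements
          result += 1L
          iter.next()
        }
        result
      },
      Range(0, self.partitions.size),
      (index: Int, data: Long) => totalCount.addAndGet(data),  // handler mutates shared state
      totalCount.get())  // resultFunc: only meaningful once every handler has run
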
From d779af8d2fd6b8c64f0e914ee59eb8700d60f8e8 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 13 Jul 2015 20:08:21 -0700
Subject: [PATCH 04/12] Back out log4j.properties changes.

---
 core/src/test/resources/log4j.properties | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 48222f7719689..eb3b1999eb996 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -16,11 +16,7 @@
 #
 
 # Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+log4j.rootCategory=INFO, file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log

From 1e2db7f54952aa9338acea6e2b64ca24d61ed2ab Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 13 Jul 2015 22:52:22 -0700
Subject: [PATCH 05/12] Fix race.

---
 .../scala/org/apache/spark/FutureAction.scala | 31 ++++++++++---------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 0fba6e8f1c7fd..5c27d078d3ac9 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.api.java.JavaFutureAction
 import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.JobWaiter
+import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
 
 import scala.concurrent._
 import scala.concurrent.duration.Duration
@@ -112,8 +112,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   // JobWaiter's result handler function. It should only be evaluated once the job has succeeded.
 
   @volatile private var _cancelled: Boolean = false
-  // Null until the job has completed, then holds a Try representing success or failure.
-  @volatile private var _value: Try[T] = null
+  private[this] val jobWaiterFuture: Future[Unit] = jobWaiter.toFuture
 
   override def cancel() {
     _cancelled = true
@@ -121,32 +120,34 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    // This call to the JobWaiter's future will throw an exception if the job failed.
-    jobWaiter.toFuture.ready(atMost)(permit)
+    jobWaiterFuture.ready(atMost)(permit) // Throws exception if the job failed.
     this
   }
 
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    // This call to the JobWaiter's future will throw an exception if the job failed.
-    jobWaiter.toFuture.result(atMost)(permit)
-    // At this point, we know that the job succeeded so it's safe to evaluate this function:
-    resultFunc
+    jobWaiterFuture.result(atMost)(permit) // Throws exception if the job failed.
+    resultFunc // This function is safe to evaluate because the job must have succeeded.
   }
 
   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
-    jobWaiter.toFuture.onComplete { (jobWaiterResult: Try[Unit]) =>
-      // If the job succeeded, then evaluate the result function; otherwise, preserve the exception.
-      _value = jobWaiterResult.map(_ => resultFunc)
-      func(_value)
-    }
+    jobWaiterFuture.map { _ => resultFunc }.onComplete(func)
   }
 
   override def isCompleted: Boolean = jobWaiter.jobFinished
 
   override def isCancelled: Boolean = _cancelled
 
-  override def value: Option[Try[T]] = Option(_value)
+  override def value: Option[Try[T]] = {
+    if (!isCompleted) {
+      None
+    } else {
+      jobWaiter.awaitResult() match {
+        case JobSucceeded => Some(scala.util.Success(resultFunc))
+        case JobFailed(e) => Some(scala.util.Failure(e))
+      }
+    }
+  }
 
   def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
 }
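
NOTE on the race being fixed: previously `_value` was assigned only inside onComplete callbacks, so a caller that awaited the job without registering a callback could observe value == None on a completed future, violating the Future contract. An illustrative repro sketch (hypothetical; assumes an RDD named `rdd` and the usual scala.concurrent imports):

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    val f = rdd.countAsync()
    Await.ready(f, Duration.Inf)  // the job is certainly finished here
    println(f.value)              // the previous revision could still print None
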
From 1346313c38d8009d849399b654913b95b3646ff5 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Jul 2015 10:54:41 -0700
Subject: [PATCH 06/12] Use lazy val to make it clear that resultFunc should
 only be evaluated once

---
 .../src/main/scala/org/apache/spark/FutureAction.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 5c27d078d3ac9..7a3d29f25d522 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -113,6 +113,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   @volatile private var _cancelled: Boolean = false
 
   private[this] val jobWaiterFuture: Future[Unit] = jobWaiter.toFuture
+  private[this] lazy val resultFuncOutput: T = {
+    assert(isCompleted, "resultFunc should only be evaluated after the job has completed")
+    resultFunc
+  }
 
   override def cancel() {
     _cancelled = true
@@ -127,11 +131,11 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
     jobWaiterFuture.result(atMost)(permit) // Throws exception if the job failed.
-    resultFunc // This function is safe to evaluate because the job must have succeeded.
+    resultFuncOutput // This function is safe to evaluate because the job must have succeeded.
   }
 
   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
-    jobWaiterFuture.map { _ => resultFunc }.onComplete(func)
+    jobWaiterFuture.map { _ => resultFuncOutput }.onComplete(func)
   }
 
   override def isCompleted: Boolean = jobWaiter.jobFinished
@@ -143,7 +147,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
       None
     } else {
       jobWaiter.awaitResult() match {
-        case JobSucceeded => Some(scala.util.Success(resultFunc))
+        case JobSucceeded => Some(scala.util.Success(resultFuncOutput))
         case JobFailed(e) => Some(scala.util.Failure(e))
       }
     }
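
NOTE: `lazy val` gives exactly the guarantee the commit title names (standard Scala semantics): the initializer runs at most once, its result is memoized, and concurrent forcers block on initialization rather than re-evaluating. A tiny demonstration:

    var evaluations = 0
    lazy val once = { evaluations += 1; 42 }
    once; once                 // the second access returns the cached value
    assert(evaluations == 1)   // the initializer ran exactly once
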
From e08623a5c10d3d0201ab0b4d5b5f2fcbde85bfe7 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Jul 2015 22:35:59 -0700
Subject: [PATCH 07/12] Convert JobWaiter into a Future

---
 .../scala/org/apache/spark/FutureAction.scala | 20 +++---
 .../apache/spark/scheduler/DAGScheduler.scala |  7 ++-
 .../apache/spark/scheduler/JobWaiter.scala    | 62 +++++++++----------
 3 files changed, 40 insertions(+), 49 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 7a3d29f25d522..4799aadb49d9b 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.api.java.JavaFutureAction
 import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
+import org.apache.spark.scheduler.JobWaiter
 
 import scala.concurrent._
 import scala.concurrent.duration.Duration
@@ -112,7 +112,6 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   // JobWaiter's result handler function. It should only be evaluated once the job has succeeded.
 
   @volatile private var _cancelled: Boolean = false
-  private[this] val jobWaiterFuture: Future[Unit] = jobWaiter.toFuture
   private[this] lazy val resultFuncOutput: T = {
     assert(isCompleted, "resultFunc should only be evaluated after the job has completed")
     resultFunc
@@ -124,32 +123,27 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    jobWaiterFuture.ready(atMost)(permit) // Throws exception if the job failed.
+    jobWaiter.ready(atMost)(permit)
     this
   }
 
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    jobWaiterFuture.result(atMost)(permit) // Throws exception if the job failed.
+    jobWaiter.result(atMost)(permit) // Throws exception if the job failed.
     resultFuncOutput // This function is safe to evaluate because the job must have succeeded.
   }
 
   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
-    jobWaiterFuture.map { _ => resultFuncOutput }.onComplete(func)
+    jobWaiter.map { _ => resultFuncOutput }.onComplete(func)
   }
 
-  override def isCompleted: Boolean = jobWaiter.jobFinished
+  override def isCompleted: Boolean = jobWaiter.isCompleted
 
   override def isCancelled: Boolean = _cancelled
 
   override def value: Option[Try[T]] = {
-    if (!isCompleted) {
-      None
-    } else {
-      jobWaiter.awaitResult() match {
-        case JobSucceeded => Some(scala.util.Success(resultFuncOutput))
-        case JobFailed(e) => Some(scala.util.Failure(e))
-      }
+    jobWaiter.value.map { valueTry =>
+      valueTry.map(_ => resultFuncOutput)
     }
   }
 
   def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
 }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f3d87ee5c4fd1..6ea2ffa423d42 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
+import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.existentials
 import scala.language.postfixOps
@@ -549,11 +550,11 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
-    waiter.awaitResult() match {
-      case JobSucceeded =>
+    Await.ready(waiter, Duration.Inf).value.get match {
+      case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format
           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
-      case JobFailed(exception: Exception) =>
+      case scala.util.Failure(exception: Exception) =>
         logInfo("Job %d failed: %s, took %f s".format
           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
         throw exception

diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 1144b65887c73..c13cab5b146c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.scheduler
 
-import scala.concurrent.{Future, Promise}
-import scala.util.Success
+import scala.concurrent.duration.Duration
+import scala.concurrent._
+import scala.util.Try
 
 /**
  * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
@@ -29,25 +30,37 @@ private[spark] class JobWaiter[T](
     val jobId: Int,
     totalTasks: Int,
     resultHandler: (Int, T) => Unit)
-  extends JobListener {
+  extends JobListener with Future[Unit] {
 
-  private val promise = Promise[Unit]
+  private[this] val promise: Promise[Unit] = {
+    if (totalTasks == 0) {
+      Promise.successful[Unit]()
+    } else {
+      Promise[Unit]()
+    }
+  }
 
+  private[this] val promiseFuture: Future[Unit] = promise.future
+
+  private[this] var finishedTasks = 0
+
+  override def onComplete[U](func: (Try[Unit]) => U)(implicit executor: ExecutionContext): Unit = {
+    promiseFuture.onComplete(func)
+  }
 
-  private var finishedTasks = 0
+  override def isCompleted: Boolean = promiseFuture.isCompleted
 
-  // Is the job as a whole finished (succeeded or failed)?
-  @volatile
-  private var _jobFinished = totalTasks == 0
+  override def value: Option[Try[Unit]] = promiseFuture.value
 
-  if (_jobFinished) {
-    promise.complete(Success(Unit))
+  @throws(classOf[Exception])
+  override def result(atMost: Duration)(implicit permit: CanAwait): Unit = {
+    promiseFuture.result(atMost)(permit)
   }
 
-  def jobFinished: Boolean = _jobFinished
-
-  // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
-  // partition RDDs), we set the jobResult directly to JobSucceeded.
-  private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
+  @throws(classOf[InterruptedException])
+  @throws(classOf[TimeoutException])
+  override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
+    promiseFuture.ready(atMost)(permit)
+    this
+  }
 
   /**
    * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
@@ -59,36 +72,19 @@ private[spark] class JobWaiter[T](
   }
 
   override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
-    if (_jobFinished) {
+    if (isCompleted) {
       throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
     }
     resultHandler(index, result.asInstanceOf[T])
     finishedTasks += 1
     if (finishedTasks == totalTasks) {
-      _jobFinished = true
-      jobResult = JobSucceeded
       promise.trySuccess()
       this.notifyAll()
     }
   }
 
   override def jobFailed(exception: Exception): Unit = synchronized {
-    _jobFinished = true
-    jobResult = JobFailed(exception)
     promise.tryFailure(exception)
     this.notifyAll()
   }
-
-  def awaitResult(): JobResult = synchronized {
-    while (!_jobFinished) {
-      this.wait()
-    }
-    return jobResult
-  }
-
-  /**
-   * Return a Future for monitoring the job's success or failure. You can use this method to
-   * avoid blocking your thread.
-   */
-  def toFuture: Future[Unit] = promise.future
 }

From b504384eb18169190e6322faaf48ad1a05cf07de Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 15 Jul 2015 08:16:02 -0700
Subject: [PATCH 08/12] Remove unnecessary notifyAll() calls.
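
The notifyAll() calls only existed to wake threads blocked in the old
JobWaiter.awaitResult() loop, which PATCH 07 removed:

    def awaitResult(): JobResult = synchronized {
      while (!_jobFinished) {
        this.wait()
      }
      return jobResult
    }

Blocking callers now park inside the promise's future (for example via
Await.ready(waiter, Duration.Inf)), so no thread waits on the JobWaiter's
monitor anymore.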
---
 core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index c13cab5b146c6..f4dd80b519692 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -79,12 +79,10 @@ private[spark] class JobWaiter[T](
     finishedTasks += 1
     if (finishedTasks == totalTasks) {
       promise.trySuccess()
-      this.notifyAll()
     }
   }
 
   override def jobFailed(exception: Exception): Unit = synchronized {
     promise.tryFailure(exception)
-    this.notifyAll()
   }
 }

From 12ddad67a151b4c54b9e370766ca03cd8d9ca507 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 15 Jul 2015 19:34:11 -0700
Subject: [PATCH 09/12] Remove unnecessary test

---
 .../org/apache/spark/FutureActionSuite.scala | 19 +------------------
 1 file changed, 1 insertion(+), 18 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
index fe9a6e3835932..1102aea96b548 100644
--- a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark
 
-import scala.concurrent.{ExecutionContext, Await}
+import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
 import org.scalatest.{BeforeAndAfter, Matchers}
 
-import org.apache.spark.util.ThreadUtils
 
 class FutureActionSuite
   extends SparkFunSuite
@@ -50,20 +49,4 @@ class FutureActionSuite
     job.jobIds.size should be (2)
   }
 
-  test("simple async action callbacks should not tie up execution context threads (SPARK-9026)") {
-    val rdd = sc.parallelize(1 to 10, 2).map(_ => Thread.sleep(1000 * 1000))
-    val pool = ThreadUtils.newDaemonCachedThreadPool("SimpleFutureActionTest")
-    val executionContext = ExecutionContext.fromExecutorService(pool)
-    val job = rdd.countAsync()
-    try {
-      for (_ <- 1 to 10) {
-        job.onComplete(_ => ())(executionContext)
-        assert(pool.getLargestPoolSize < 10)
-      }
-    } finally {
-      job.cancel()
-      executionContext.shutdownNow()
-    }
-  }
-
 }

From dae8805704c5efd696d1b379b7ab6b5b50d3902f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 15 Jul 2015 19:35:09 -0700
Subject: [PATCH 10/12] Use success instead of trySuccess; etc.
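
Promise.success() and Promise.failure() complete the promise exactly once
and throw IllegalStateException if it is already completed, whereas the
try* variants silently return false. The strict variants surface an
accidental double-completion as a bug instead of hiding it. Standard
scala.concurrent behavior, for reference:

    val p = scala.concurrent.Promise[Unit]()
    p.success(())      // completes the promise
    p.trySuccess(())   // already completed: returns false
    // p.success(())   // would throw IllegalStateException here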
---
 .../src/main/scala/org/apache/spark/scheduler/JobWaiter.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index c13cab5b146c6..483a5fca7ab13 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -78,13 +78,13 @@ private[spark] class JobWaiter[T](
     resultHandler(index, result.asInstanceOf[T])
     finishedTasks += 1
     if (finishedTasks == totalTasks) {
-      promise.trySuccess()
+      promise.success()
       this.notifyAll()
     }
   }
 
   override def jobFailed(exception: Exception): Unit = synchronized {
-    promise.tryFailure(exception)
+    promise.failure(exception)
     this.notifyAll()
   }
 }

From 1a19268fda972f6103bcd50f376c0f57a238b394 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 17 Jul 2015 09:35:46 -0700
Subject: [PATCH 11/12] Make jobFailed idempotent

---
 .../main/scala/org/apache/spark/scheduler/JobWaiter.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 80babbf260bf6..586f9eff5a8ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -83,6 +83,12 @@ private[spark] class JobWaiter[T](
   }
 
   override def jobFailed(exception: Exception): Unit = synchronized {
-    promise.failure(exception)
+    // There are certain situations where jobFailed can be called multiple times for the same
+    // job. We guard against this by making this method idempotent.
+    if (!isCompleted) {
+      promise.failure(exception)
+    } else {
+      assert(promiseFuture.value.get.isFailure)
+    }
   }
 }

From 17edbcd06086b6a8cad922b4c535eb2a6265b2e3 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 23 Jul 2015 16:18:38 -0700
Subject: [PATCH 12/12] Exception -> Throwable

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 61fe92d2d99f5..d5d21f03a77a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -548,7 +548,7 @@ class DAGScheduler(
       case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format
           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
-      case scala.util.Failure(exception: Exception) =>
+      case scala.util.Failure(exception: Throwable) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
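
NOTE on PATCH 12: a Future's Failure can carry any Throwable, not just an
Exception, so the old pattern could fail to match a fatal error and raise a
MatchError instead of rethrowing the real cause. An illustrative
(hypothetical) snippet:

    import scala.util.{Failure, Success, Try}

    val t: Try[Unit] = Failure(new OutOfMemoryError("boom"))
    t match {
      case Success(_)            => println("succeeded")
      case Failure(e: Exception) => println("exception")  // does not match an Error
      case Failure(e)            => println("throwable")  // this branch matches
    }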