diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 442edc732c8e..e483fd5b3a9a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -732,6 +732,35 @@ private[spark] class DAGScheduler(
     missing.toList
   }
 
+  /** Invoke `.partitions` on the given RDD and all of its ancestors */
+  private def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
+    val startTime = System.nanoTime
+    val visitedRdds = new HashSet[RDD[_]]
+    // We are manually maintaining a stack here to prevent StackOverflowError
+    // caused by recursively visiting
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
+
+    def visit(rdd: RDD[_]): Unit = {
+      if (!visitedRdds(rdd)) {
+        visitedRdds += rdd
+
+        // Eagerly compute:
+        rdd.partitions
+
+        for (dep <- rdd.dependencies) {
+          waitingForVisit.prepend(dep.rdd)
+        }
+      }
+    }
+
+    while (waitingForVisit.nonEmpty) {
+      visit(waitingForVisit.remove(0))
+    }
+    logDebug("eagerlyComputePartitionsForRddAndAncestors for RDD %d took %f seconds"
+      .format(rdd.id, (System.nanoTime - startTime) / 1e9))
+  }
+
   /**
    * Registers the given jobId among the jobs that need the given stage and
    * all of that stage's ancestors.
@@ -841,6 +870,11 @@ private[spark] class DAGScheduler(
           "Total number of partitions: " + maxPartitions)
     }
 
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val jobId = nextJobId.getAndIncrement()
     if (partitions.isEmpty) {
       val clonedProperties = Utils.cloneProperties(properties)
@@ -930,6 +964,12 @@ private[spark] class DAGScheduler(
       listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
       return new PartialResult(evaluator.currentResult(), true)
     }
+
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
@@ -962,6 +1002,11 @@ private[spark] class DAGScheduler(
       throw SparkCoreErrors.cannotRunSubmitMapStageOnZeroPartitionRDDError()
     }
 
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     // We create a JobWaiter with only one "task", which will be marked as complete when the whole
     // map stage has completed, and will be passed the MapOutputStatistics for that stage.
     // This makes it easier to avoid race conditions between the user code and the map output
diff --git a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerIntegrationSuite.scala
index 29a8f4be8b72..fd05ff9dfe8a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerIntegrationSuite.scala
@@ -112,7 +112,7 @@ class HealthTrackerIntegrationSuite extends SchedulerIntegrationSuite[MultiExecu
       backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
       val pattern = (
         s"""|Aborting TaskSet 0.0 because task .*
@@ -150,7 +150,7 @@ class MockRDDWithLocalityPrefs(
     sc: SparkContext,
     numPartitions: Int,
     shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
-    val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps) {
+    val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps, Nil) {
   override def getPreferredLocations(split: Partition): Seq[String] = {
     Seq(preferredLoc)
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 88d2868b957f..874abce68c11 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
-import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -205,7 +205,13 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
   def shuffle(nParts: Int, input: MockRDD): MockRDD = {
     val partitioner = new HashPartitioner(nParts)
     val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, partitioner)
-    new MockRDD(sc, nParts, List(shuffleDep))
+    new MockRDD(sc, nParts, List(shuffleDep), Nil)
+  }
+
+  /** models a one-to-one dependency within a stage, like a map or filter */
+  def oneToOne(input: MockRDD): MockRDD = {
+    val dep = new OneToOneDependency[(Int, Int)](input)
+    new MockRDD(sc, input.numPartitions, Nil, Seq(dep))
   }
 
   /** models a stage boundary with multiple dependencies, like a join */
@@ -214,7 +220,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
     val shuffleDeps = inputs.map { inputRDD =>
       new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner)
     }
-    new MockRDD(sc, nParts, shuffleDeps)
+    new MockRDD(sc, nParts, shuffleDeps, Nil)
   }
 
   val backendException = new AtomicReference[Exception](null)
@@ -449,10 +455,11 @@ case class ExecutorTaskStatus(host: String, executorId: String, var freeCores: I
 class MockRDD(
     sc: SparkContext,
     val numPartitions: Int,
-    val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]]
-) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable {
+    val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
+    val oneToOneDeps: Seq[OneToOneDependency[(Int, Int)]]
+) extends RDD[(Int, Int)](sc, deps = shuffleDeps ++ oneToOneDeps) with Serializable {
 
-  MockRDD.validate(numPartitions, shuffleDeps)
+  MockRDD.validate(numPartitions, shuffleDeps, oneToOneDeps)
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
     throw new RuntimeException("should not be reached")
@@ -468,14 +475,25 @@ class MockRDD(
 object MockRDD extends AssertionsHelper with TripleEquals with Assertions {
   /**
    * make sure all the shuffle dependencies have a consistent number of output partitions
+   * and that one-to-one dependencies have the same partition counts as their parents
    * (mostly to make sure the test setup makes sense, not that Spark itself would get this wrong)
    */
-  def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, _]]): Unit = {
-    dependencies.foreach { dependency =>
+  def validate(
+      numPartitions: Int,
+      shuffleDependencies: Seq[ShuffleDependency[_, _, _]],
+      oneToOneDependencies: Seq[OneToOneDependency[_]]): Unit = {
+    shuffleDependencies.foreach { dependency =>
       val partitioner = dependency.partitioner
       assert(partitioner != null)
       assert(partitioner.numPartitions === numPartitions)
     }
+    oneToOneDependencies.foreach { dependency =>
+      // In order to support the SPARK-23626 testcase, we cast to MockRDD
+      // and access `numPartitions` instead of just calling `getNumPartitions`:
+      // `getNumPartitions` would call `getPartitions`, undermining the intention
+      // of the SPARK-23626 testcase.
+      assert(dependency.rdd.asInstanceOf[MockRDD].numPartitions === numPartitions)
+    }
   }
 }
 
@@ -539,7 +557,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       backend.taskSuccess(taskDescription, 42)
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
     }
     assert(results === (0 until 10).map { _ -> 42 }.toMap)
@@ -564,7 +582,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       }
     }
 
-    val a = new MockRDD(sc, 2, Nil)
+    val a = new MockRDD(sc, 2, Nil, Nil)
     val b = shuffle(10, a)
     val c = shuffle(20, a)
     val d = join(30, b, c)
@@ -604,7 +622,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
    * (b) we get a second attempt for stage 0 & stage 1
    */
   testScheduler("job with fetch failure") {
-    val input = new MockRDD(sc, 2, Nil)
+    val input = new MockRDD(sc, 2, Nil, Nil)
     val shuffledRdd = shuffle(10, input)
     val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId
 
@@ -646,10 +664,88 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
       assert(failure.getMessage.contains("test task failure"))
     }
     assertDataStructuresEmpty(noFailure = false)
   }
+
+  testScheduler("SPARK-23626: RDD with expensive getPartitions() doesn't block scheduler loop") {
+    // Before SPARK-23626, expensive `RDD.getPartitions()` calls might occur inside of the
+    // DAGScheduler event loop, causing concurrently-submitted jobs to block. This test case
+    // reproduces a scenario where that blocking could occur.
+
+    // We'll use latches to simulate an RDD with a slow getPartitions() call.
+    import MockRDDWithSlowGetPartitions._
+
+    // DAGScheduler.submitJob calls `.partitions` on the RDD passed to it.
+    // Therefore to write a proper regression test for SPARK-23626 we must
+    // ensure that the slow getPartitions() call occurs deeper in the RDD DAG:
+    val rddWithSlowGetPartitions = oneToOne(new MockRDDWithSlowGetPartitions(sc, 1))
+
+    // A RDD whose execution should not be blocked by the other RDD's slow getPartitions():
+    val simpleRdd = new MockRDD(sc, 1, Nil, Nil)
+
+    getPartitionsShouldNotHaveBeenCalledYet.set(false)
+
+    def runBackend(): Unit = {
+      val (taskDescription, _) = backend.beginTask()
+      backend.taskSuccess(taskDescription, 42)
+    }
+
+    withBackend(runBackend _) {
+      // Submit a job containing an RDD which will hang in getPartitions() until we release
+      // the countdown latch:
+      import scala.concurrent.ExecutionContext.Implicits.global
+      val slowJobFuture = Future { submit(rddWithSlowGetPartitions, Array(0)) }.flatten
+
+      // Block the current thread until the other thread has started the getPartitions() call:
+      beginGetPartitionsLatch.await(duration.toSeconds, SECONDS)
+
+      // Submit a concurrent job. This job's execution should not be blocked by the other job:
+      val fastJobFuture = submit(simpleRdd, Array(0))
+      awaitJobTermination(fastJobFuture, duration)
+
+      // The slow job should still be blocked in the getPartitions() call:
+      assert(!slowJobFuture.isCompleted)
+
+      // Allow it to complete:
+      endGetPartitionsLatch.countDown()
+      awaitJobTermination(slowJobFuture, duration)
+    }
+
+    assertDataStructuresEmpty()
+  }
+}
+
+/** Helper class used in SPARK-23626 test case */
+private object MockRDDWithSlowGetPartitions {
+  // Latch for blocking the test execution thread until getPartitions() has been called:
+  val beginGetPartitionsLatch = new CountDownLatch(1)
+
+  // Latch for blocking the getPartitions() call from completing:
+  val endGetPartitionsLatch = new CountDownLatch(1)
+
+  // Atomic boolean which is used to fail the test in case getPartitions() is called earlier
+  // than expected. This guards against false-negatives (e.g. the test passing because
+  // `.getPartitions()` was called in the test setup before we even submitted a job):
+  val getPartitionsShouldNotHaveBeenCalledYet = new AtomicBoolean(true)
+}
+
+/** Helper class used in SPARK-23626 test case */
+private class MockRDDWithSlowGetPartitions(
+    sc: SparkContext,
+    numPartitions: Int) extends MockRDD(sc, numPartitions, Nil, Nil) {
+  import MockRDDWithSlowGetPartitions._
+
+  override def getPartitions: Array[Partition] = {
+    if (getPartitionsShouldNotHaveBeenCalledYet.get()) {
+      throw new Exception("getPartitions() should not have been called at this point")
+    }
+    beginGetPartitionsLatch.countDown()
+    val partitions = super.getPartitions
+    endGetPartitionsLatch.await()
+    partitions
+  }
 }
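
For context, a minimal caller-side sketch of the scenario this patch targets (not part of the change itself; `SlowPartitionsRDD`, `SimplePartition`, and `Spark23626Sketch` are made-up names for illustration): a user-defined RDD whose getPartitions() stands in for an expensive file listing or metastore lookup, sitting one level deep in the lineage, submitted concurrently with an unrelated job. Because submitJob now calls eagerlyComputePartitionsForRddAndAncestors, the slow getPartitions() is forced on the submitting thread instead of inside the single-threaded event loop, so the unrelated job is not stalled behind it.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative partition type; case classes are Serializable and supply index.
case class SimplePartition(index: Int) extends Partition

// Hypothetical RDD whose getPartitions() is expensive (e.g. a slow listing call).
class SlowPartitionsRDD(sc: SparkContext, numSlices: Int) extends RDD[Int](sc, Nil) {
  override protected def getPartitions: Array[Partition] = {
    Thread.sleep(10000) // stand-in for an expensive file listing / metastore lookup
    Array.tabulate[Partition](numSlices)(i => SimplePartition(i))
  }
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.single(split.index)
}

object Spark23626Sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("spark-23626-sketch"))

    // The map() puts the slow getPartitions() one level deeper in the lineage, as in the
    // regression test above; the eager `.partitions` walk happens on the submitting thread.
    val slow = new SlowPartitionsRDD(sc, 2).map(_ * 2)
    val slowJob = Future { slow.collect() }

    // An unrelated job: with this patch it completes while the other thread is still
    // stuck in getPartitions(); previously it could queue behind the slow job's stage
    // creation inside the DAGScheduler's single-threaded event loop.
    sc.parallelize(1 to 10).count()

    // The slow job finishes once its getPartitions() returns.
    Await.ready(slowJob, 2.minutes)
    sc.stop()
  }
}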