core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (45 additions, 0 deletions)
@@ -732,6 +732,35 @@ private[spark] class DAGScheduler(
missing.toList
}

/** Invoke `.partitions` on the given RDD and all of its ancestors */
private def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
val startTime = System.nanoTime
val visitedRdds = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd

def visit(rdd: RDD[_]): Unit = {
if (!visitedRdds(rdd)) {
visitedRdds += rdd

// Eagerly compute:
rdd.partitions

for (dep <- rdd.dependencies) {
waitingForVisit.prepend(dep.rdd)
}
}
}

while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.remove(0))
}
logDebug("eagerlyComputePartitionsForRddAndAncestors for RDD %d took %f seconds"
.format(rdd.id, (System.nanoTime - startTime) / 1e9))
}

/**
* Registers the given jobId among the jobs that need the given stage and
* all of that stage's ancestors.
@@ -841,6 +870,11 @@ private[spark] class DAGScheduler(
"Total number of partitions: " + maxPartitions)
}

// SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
// `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
// is evaluated outside of the DAGScheduler's single-threaded event loop:
eagerlyComputePartitionsForRddAndAncestors(rdd)
Contributor:
Would it be a good idea to add an assertion in the DebugFilesystem we have to check that it's not accessed within the event loop thread? It might help catch other cases where the event loop might be doing heavy blocking operations.

Contributor Author:
I think that's a good idea, but I'd like to defer it to a separate follow-up PR. I've filed https://issues.apache.org/jira/browse/SPARK-37009 to track that.
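
A minimal sketch of the kind of thread check the suggestion describes. The object and method are hypothetical (not part of DebugFilesystem's actual API), and the thread name is an assumption about how the DAGScheduler names its event-loop thread:

```scala
// Hypothetical sketch only, not part of this PR or of DebugFilesystem's API.
// Fails fast if a blocking call is made from the DAGScheduler event-loop thread;
// the thread name below is an assumption about how that thread is named.
object EventLoopGuard {
  def assertNotOnSchedulerEventLoop(): Unit = {
    val threadName = Thread.currentThread().getName
    assert(!threadName.contains("dag-scheduler-event-loop"),
      s"Blocking filesystem access from the scheduler event loop thread: $threadName")
  }
}
```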


val jobId = nextJobId.getAndIncrement()
if (partitions.isEmpty) {
val clonedProperties = Utils.cloneProperties(properties)
@@ -930,6 +964,12 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
return new PartialResult(evaluator.currentResult(), true)
}

// SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
// `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
// is evaluated outside of the DAGScheduler's single-threaded event loop:
eagerlyComputePartitionsForRddAndAncestors(rdd)

val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
eventProcessLoop.post(JobSubmitted(
@@ -962,6 +1002,11 @@ private[spark] class DAGScheduler(
throw SparkCoreErrors.cannotRunSubmitMapStageOnZeroPartitionRDDError()
}

// SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
// `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
// is evaluated outside of the DAGScheduler's single-threaded event loop:
eagerlyComputePartitionsForRddAndAncestors(rdd)

Member:
It looks like before the three places calling eagerlyComputePartitionsForRddAndAncestors there are already checks that access rdd.partitions, so do we actually only need to compute the ancestors' partitions?

Contributor Author:
That's true, but I chose not to make that optimization because (a) it doesn't actually matter from a performance perspective (accessing already-computed .partitions is very cheap) and (b) I think the optimization would make the code more complex.
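
For context on point (a), a simplified sketch of why repeated `.partitions` access is cheap: the result of `getPartitions()` is memoized on first access (Spark's real `RDD.partitions` also handles checkpointing and thread-safety, omitted here):

```scala
import org.apache.spark.Partition

// Simplified illustration of the memoization pattern behind RDD.partitions:
// the expensive getPartitions() call runs at most once per RDD instance,
// so later `.partitions` accesses just return the cached array.
abstract class MemoizedPartitionsSketch {
  protected def getPartitions: Array[Partition] // potentially expensive

  @transient private var partitions_ : Array[Partition] = _

  final def partitions: Array[Partition] = {
    if (partitions_ == null) {
      partitions_ = getPartitions // computed once, then cached
    }
    partitions_
  }
}
```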

// We create a JobWaiter with only one "task", which will be marked as complete when the whole
// map stage has completed, and will be passed the MapOutputStatistics for that stage.
// This makes it easier to avoid race conditions between the user code and the map output
@@ -112,7 +112,7 @@ class HealthTrackerIntegrationSuite extends SchedulerIntegrationSuite[MultiExecu
backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
awaitJobTermination(jobFuture, duration)
val pattern = (
s"""|Aborting TaskSet 0.0 because task .*
Expand Down Expand Up @@ -150,7 +150,7 @@ class MockRDDWithLocalityPrefs(
sc: SparkContext,
numPartitions: Int,
shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps) {
val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps, Nil) {
override def getPreferredLocations(split: Partition): Seq[String] = {
Seq(preferredLoc)
}
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -205,7 +205,13 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
def shuffle(nParts: Int, input: MockRDD): MockRDD = {
val partitioner = new HashPartitioner(nParts)
val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, partitioner)
new MockRDD(sc, nParts, List(shuffleDep))
new MockRDD(sc, nParts, List(shuffleDep), Nil)
}

/** models a one-to-one dependency within a stage, like a map or filter */
def oneToOne(input: MockRDD): MockRDD = {
val dep = new OneToOneDependency[(Int, Int)](input)
new MockRDD(sc, input.numPartitions, Nil, Seq(dep))
}

/** models a stage boundary with multiple dependencies, like a join */
@@ -214,7 +220,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
val shuffleDeps = inputs.map { inputRDD =>
new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner)
}
new MockRDD(sc, nParts, shuffleDeps)
new MockRDD(sc, nParts, shuffleDeps, Nil)
}

val backendException = new AtomicReference[Exception](null)
@@ -449,10 +455,11 @@ case class ExecutorTaskStatus(host: String, executorId: String, var freeCores: I
class MockRDD(
sc: SparkContext,
val numPartitions: Int,
val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]]
) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable {
val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
val oneToOneDeps: Seq[OneToOneDependency[(Int, Int)]]
) extends RDD[(Int, Int)](sc, deps = shuffleDeps ++ oneToOneDeps) with Serializable {

MockRDD.validate(numPartitions, shuffleDeps)
MockRDD.validate(numPartitions, shuffleDeps, oneToOneDeps)

override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
@@ -468,14 +475,25 @@ class MockRDD(
object MockRDD extends AssertionsHelper with TripleEquals with Assertions {
/**
* make sure all the shuffle dependencies have a consistent number of output partitions
* and that one-to-one dependencies have the same partition counts as their parents
* (mostly to make sure the test setup makes sense, not that Spark itself would get this wrong)
*/
def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, _]]): Unit = {
dependencies.foreach { dependency =>
def validate(
numPartitions: Int,
shuffleDependencies: Seq[ShuffleDependency[_, _, _]],
oneToOneDependencies: Seq[OneToOneDependency[_]]): Unit = {
shuffleDependencies.foreach { dependency =>
val partitioner = dependency.partitioner
assert(partitioner != null)
assert(partitioner.numPartitions === numPartitions)
}
oneToOneDependencies.foreach { dependency =>
// In order to support the SPARK-23626 testcase, we cast to MockRDD
// and access `numPartitions` instead of just calling `getNumPartitions`:
// `getNumPartitions` would call `getPartitions`, undermining the intention
// of the SPARK-23626 testcase.
assert(dependency.rdd.asInstanceOf[MockRDD].numPartitions === numPartitions)
}
}
}

@@ -539,7 +557,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
backend.taskSuccess(taskDescription, 42)
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
@@ -564,7 +582,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
}
}

val a = new MockRDD(sc, 2, Nil)
val a = new MockRDD(sc, 2, Nil, Nil)
val b = shuffle(10, a)
val c = shuffle(20, a)
val d = join(30, b, c)
@@ -604,7 +622,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
* (b) we get a second attempt for stage 0 & stage 1
*/
testScheduler("job with fetch failure") {
val input = new MockRDD(sc, 2, Nil)
val input = new MockRDD(sc, 2, Nil, Nil)
val shuffledRdd = shuffle(10, input)
val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId

@@ -646,10 +664,88 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
awaitJobTermination(jobFuture, duration)
assert(failure.getMessage.contains("test task failure"))
}
assertDataStructuresEmpty(noFailure = false)
}

testScheduler("SPARK-23626: RDD with expensive getPartitions() doesn't block scheduler loop") {
// Before SPARK-23626, expensive `RDD.getPartitions()` calls might occur inside of the
// DAGScheduler event loop, causing concurrently-submitted jobs to block. This test case
// reproduces a scenario where that blocking could occur.

// We'll use latches to simulate an RDD with a slow getPartitions() call.
import MockRDDWithSlowGetPartitions._

// DAGScheduler.submitJob calls `.partitions` on the RDD passed to it.
// Therefore to write a proper regression test for SPARK-23626 we must
// ensure that the slow getPartitions() call occurs deeper in the RDD DAG:
val rddWithSlowGetPartitions = oneToOne(new MockRDDWithSlowGetPartitions(sc, 1))

// An RDD whose execution should not be blocked by the other RDD's slow getPartitions():
val simpleRdd = new MockRDD(sc, 1, Nil, Nil)

getPartitionsShouldNotHaveBeenCalledYet.set(false)

def runBackend(): Unit = {
val (taskDescription, _) = backend.beginTask()
backend.taskSuccess(taskDescription, 42)
}

withBackend(runBackend _) {
// Submit a job containing an RDD which will hang in getPartitions() until we release
// the countdown latch:
import scala.concurrent.ExecutionContext.Implicits.global
val slowJobFuture = Future { submit(rddWithSlowGetPartitions, Array(0)) }.flatten

// Block the current thread until the other thread has started the getPartitions() call:
beginGetPartitionsLatch.await(duration.toSeconds, SECONDS)

// Submit a concurrent job. This job's execution should not be blocked by the other job:
val fastJobFuture = submit(simpleRdd, Array(0))
awaitJobTermination(fastJobFuture, duration)

// The slow job should still be blocked in the getPartitions() call:
assert(!slowJobFuture.isCompleted)

// Allow it to complete:
endGetPartitionsLatch.countDown()
awaitJobTermination(slowJobFuture, duration)
}

assertDataStructuresEmpty()
}
}

/** Helper class used in SPARK-23626 test case */
private object MockRDDWithSlowGetPartitions {
// Latch for blocking the test execution thread until getPartitions() has been called:
val beginGetPartitionsLatch = new CountDownLatch(1)

// Latch for blocking the getPartitions() call from completing:
val endGetPartitionsLatch = new CountDownLatch(1)

// Atomic boolean which is used to fail the test in case getPartitions() is called earlier
// than expected. This guards against false-negatives (e.g. the test passing because
// `.getPartitions()` was called in the test setup before we even submitted a job):
val getPartitionsShouldNotHaveBeenCalledYet = new AtomicBoolean(true)
}

/** Helper class used in SPARK-23626 test case */
private class MockRDDWithSlowGetPartitions(
sc: SparkContext,
numPartitions: Int) extends MockRDD(sc, numPartitions, Nil, Nil) {
import MockRDDWithSlowGetPartitions._

override def getPartitions: Array[Partition] = {
if (getPartitionsShouldNotHaveBeenCalledYet.get()) {
throw new Exception("getPartitions() should not have been called at this point")
}
beginGetPartitionsLatch.countDown()
val partitions = super.getPartitions
endGetPartitionsLatch.await()
partitions
}
}