Commit 3bcc554

Add same fix to submitMapStage() and runApproximateJob()

Parent: 39111e1


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 11 additions & 0 deletions
@@ -961,6 +961,12 @@ private[spark] class DAGScheduler(
       listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
       return new PartialResult(evaluator.currentResult(), true)
     }
+
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
@@ -993,6 +999,11 @@
       throw SparkCoreErrors.cannotRunSubmitMapStageOnZeroPartitionRDDError()
     }
 
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     // We create a JobWaiter with only one "task", which will be marked as complete when the whole
     // map stage has completed, and will be passed the MapOutputStatistics for that stage.
     // This makes it easier to avoid race conditions between the user code and the map output
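
For context: eagerlyComputePartitionsForRddAndAncestors is defined elsewhere in DAGScheduler.scala and its body is not part of this diff. The following is only a minimal sketch of what such a helper could look like, assuming it walks the RDD's dependency DAG and forces .partitions on every RDD it reaches; the traversal structure and details here are illustrative, not the actual implementation.

    // Hypothetical sketch, assumed to live inside the DAGScheduler class
    // (so RDD is already imported); NOT the helper's actual body.
    import scala.collection.mutable

    private def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
      val visited = mutable.HashSet[RDD[_]]()
      val stack = mutable.Stack[RDD[_]](rdd)
      while (stack.nonEmpty) {
        val current = stack.pop()
        if (visited.add(current)) {
          // Accessing `.partitions` invokes getPartitions() and caches the
          // result inside the RDD, so later reads from the single-threaded
          // event loop become cheap lookups.
          current.partitions
          current.dependencies.foreach(dep => stack.push(dep.rdd))
        }
      }
    }

An iterative traversal with an explicit stack avoids stack overflows on very deep RDD lineages, and the visited set guards against re-forcing partitions for RDDs that appear multiple times in the DAG.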
