diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1cfe98673773..0f565f23a118 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -483,6 +483,20 @@ class DAGScheduler(
     assert(partitions.size > 0)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
+
+    // Computing the partitions may be very expensive for certain types of RDDs
+    // (e.g. HadoopRDDs), so we perform that computation here, on the caller's
+    // thread, rather than inside the DAGScheduler's event processing loop,
+    // which it would otherwise block. See SPARK-4961 for details.
+    try {
+      getParentStages(rdd, jobId).foreach(_.rdd.partitions)
+    } catch {
+      case e: Exception =>
+        logWarning("Failed to get the partitions of parent stages' RDDs for job: " +
+          jobId, e)
+        waiter.jobFailed(e)
+        return waiter
+    }
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
     waiter
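
To make the intent of the change concrete, here is a minimal, self-contained sketch of the pattern being applied. The names (EagerInitSketch, Job, submit, eventLoop) are hypothetical stand-ins, not the actual DAGScheduler internals: an expensive, cached computation is forced on the caller's thread before the request is posted to a single-threaded event loop, so the loop itself never blocks on it.

import java.util.concurrent.LinkedBlockingQueue

object EagerInitSketch {

  // Stands in for an RDD: `partitions` is expensive on first access
  // (e.g. listing HDFS input splits for a HadoopRDD) and cached afterwards,
  // mirroring the behavior of RDD.partitions.
  final class Job(val id: Int) {
    lazy val partitions: Array[Int] = {
      Thread.sleep(1000) // simulate the expensive metadata lookup
      Array.tabulate(4)(i => i)
    }
  }

  private val queue = new LinkedBlockingQueue[Job]()

  // Stands in for the single-threaded event processing loop.
  private val eventLoop = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        val job = queue.take()
        // The submitter already materialized `partitions`, so this access is
        // a cheap cache hit and the loop stays responsive for other events.
        println(s"job ${job.id}: ${job.partitions.length} partitions")
      }
    }
  })

  // Stands in for submitJob: pay the cost on the caller's thread, then post.
  def submit(job: Job): Unit = {
    job.partitions // force the expensive computation before enqueueing
    queue.put(job)
  }

  def main(args: Array[String]): Unit = {
    eventLoop.setDaemon(true)
    eventLoop.start()
    (1 to 3).map(new Job(_)).foreach(submit)
    Thread.sleep(2000) // give the loop time to drain before the JVM exits
  }
}

The property the sketch (and the patch) relies on is that the expensive result is cached after the first access (here via lazy val, in Spark via RDD.partitions), so the eager call on the submitter's thread makes the later access inside the event loop effectively free.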