Closed
Changes from all commits
Commits
36 commits
cdef539
Merge pull request #1 from apache/master
YanTangZhai Aug 6, 2014
cbcba66
Merge pull request #3 from apache/master
YanTangZhai Aug 20, 2014
8a00106
Merge pull request #6 from apache/master
YanTangZhai Sep 12, 2014
03b62b0
Merge pull request #7 from apache/master
YanTangZhai Sep 16, 2014
76d4027
Merge pull request #8 from apache/master
YanTangZhai Oct 20, 2014
d26d982
Merge pull request #9 from apache/master
YanTangZhai Nov 4, 2014
e249846
Merge pull request #10 from apache/master
YanTangZhai Nov 11, 2014
6e643f8
Merge pull request #11 from apache/master
YanTangZhai Dec 1, 2014
92242c7
Update HiveQl.scala
YanTangZhai Dec 2, 2014
74175b4
Update HiveQuerySuite.scala
YanTangZhai Dec 3, 2014
950b21e
Update HiveQuerySuite.scala
YanTangZhai Dec 3, 2014
718afeb
Merge pull request #12 from apache/master
YanTangZhai Dec 5, 2014
59e4de9
make hive test
marmbrus Dec 17, 2014
1893956
Merge pull request #14 from marmbrus/pr/3555
YanTangZhai Dec 18, 2014
bd2c444
Update HiveQuerySuite.scala
YanTangZhai Dec 22, 2014
efc4210
Update HiveQuerySuite.scala
YanTangZhai Dec 22, 2014
1e1ebb4
Update HiveQuerySuite.scala
YanTangZhai Dec 22, 2014
e4c2c0a
Merge pull request #15 from apache/master
YanTangZhai Dec 24, 2014
5601a8b
[SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAG…
YanTangZhai Dec 25, 2014
af5abda
[SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAG…
YanTangZhai Dec 25, 2014
6e95955
[SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAG…
YanTangZhai Dec 30, 2014
d4bca32
Merge pull request #19 from apache/master
YanTangZhai Dec 31, 2014
5e3ef70
Merge branch 'SPARK-4692'
YanTangZhai Dec 31, 2014
fd87518
[SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAG…
YanTangZhai Dec 31, 2014
74c1dec
update HadoopRDD.scala
YanTangZhai Dec 31, 2014
5041b35
Merge pull request #24 from apache/master
YanTangZhai Jan 12, 2015
09afdff
[SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAG…
YanTangZhai Jan 14, 2015
b535a53
[SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAG…
YanTangZhai Jan 19, 2015
e2880f9
Merge pull request #27 from apache/master
YanTangZhai Jan 19, 2015
aed530b
[SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAG…
YanTangZhai Jan 20, 2015
5b27571
Merge pull request #28 from apache/master
YanTangZhai Jan 24, 2015
267e375
Merge branch 'master' into SPARK-4961
YanTangZhai Jan 24, 2015
e2c2494
Merge branch 'master' of https://github.com/YanTangZhai/spark
YanTangZhai Jan 24, 2015
572079b
update
YanTangZhai Jan 24, 2015
ae7c139
update
YanTangZhai Jan 24, 2015
d5c0e84
update
YanTangZhai Jan 24, 2015
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -483,6 +483,20 @@ class DAGScheduler(
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

// The reason for performing this call here is that computing the partitions
// may be very expensive for certain types of RDDs (e.g. HadoopRDDs), so
// we'd like that computation to take place outside of the DAGScheduler to
// avoid blocking its event processing loop. See SPARK-4961 for details.
try {
getParentStages(rdd, jobId).foreach(_.rdd.partitions)
Contributor
I just realized that this could be a thread-safety issue: getParentStages could call getShuffleMapStage, which mutates the non-thread-safe shuffleToMapStage map. Even if that map were synchronized, we could still have race conditions between calls from the event processing loop and external calls.

Do you think we could just call rdd.partitions on the final RDD (e.g. the rdd local variable here) instead of calling getParentStages?
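A minimal sketch of that suggestion, using hypothetical stand-in classes rather than Spark's actual RDD or scheduler types: because `partitions` is computed lazily and cached, forcing it on just the final RDD does the expensive work on the submitting thread without walking parent stages or touching any scheduler-internal map.

```scala
// Hypothetical sketch, NOT Spark's real classes: illustrates triggering
// partition computation on only the final RDD, as suggested above.
object EagerPartitionsSketch {
  // Stand-in for an RDD whose partition listing is expensive (e.g. an
  // HDFS split enumeration). `lazy val` computes once and caches.
  final class Rdd(val parents: Seq[Rdd]) {
    lazy val partitions: Array[Int] = (0 until 2).toArray
  }

  def main(args: Array[String]): Unit = {
    val parent = new Rdd(Nil)
    val finalRdd = new Rdd(Seq(parent))
    // Force only the final RDD's partitions on the caller's thread;
    // no parent-stage traversal, so no shared scheduler state is read.
    finalRdd.partitions
    println(finalRdd.partitions.length)
  }
}
```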

} catch {
case e: Exception =>
logWarning("Failed to compute the partitions of parent stages' RDDs for job "
+ jobId, e)
waiter.jobFailed(e)
return waiter
}
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
waiter
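The diff above relies on partition computation being memoized: the eager call pays the cost once on the submitting thread, and the later call from inside the scheduler's event loop returns the cached result immediately. A minimal sketch of that caching behavior, with a hypothetical `SlowRdd` class (not Spark's actual RDD) and a counter standing in for the expensive work:

```scala
// Hypothetical sketch of compute-once partition caching, the property
// that makes the eager call in the diff above worthwhile.
object PartitionsCachingSketch {
  final class SlowRdd {
    private var cached: Option[Array[Int]] = None
    var computeCount = 0 // how many times the expensive listing actually ran

    // Mirrors the compute-once contract: first call does the work,
    // subsequent calls serve the cached array.
    def partitions: Array[Int] = cached.getOrElse {
      computeCount += 1
      val parts = (0 until 4).toArray // stand-in for listing HDFS splits
      cached = Some(parts)
      parts
    }
  }

  def main(args: Array[String]): Unit = {
    val rdd = new SlowRdd
    rdd.partitions // eager call on the submitting thread (expensive)
    rdd.partitions // later "event loop" call hits the cache (cheap)
    println(rdd.computeCount)
  }
}
```

With this contract, the scheduler's event loop never blocks on partition enumeration, which is the point of SPARK-4961.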