
Commit cf01607

[SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps
The shuffle locality patch made the DAGScheduler aware of shuffle data, but for RDDs that have both narrow and shuffle dependencies, it can cause them to place tasks based on the shuffle dependency instead of the narrow one. This case is common in iterative join-based algorithms like PageRank and ALS, where one RDD is hash-partitioned and one isn't.

Author: Matei Zaharia <[email protected]>

Closes #8220 from mateiz/shuffle-loc-fix.
1 parent 5f9ce73 commit cf01607
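
For context, here is a minimal sketch of the iterative join pattern the commit message describes (illustrative code, not part of the patch; names, data, and partition counts are made up). `links` is hash-partitioned and cached, while `ranks` is rebuilt each iteration without a partitioner, so each iteration's join has a narrow dependency on `links` and a shuffle dependency on `ranks`:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PageRankSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pagerank-sketch").setMaster("local[2]"))

    // Hash-partitioned and cached: joins against this RDD become narrow
    // (one-to-one) dependencies, so tasks should preferentially run where
    // these partitions already live.
    val links = sc.parallelize(Seq(1 -> Seq(2), 2 -> Seq(1)))
      .partitionBy(new HashPartitioner(2))
      .cache()

    // map(identity) drops the partitioner, leaving ranks unpartitioned.
    var ranks = links.mapValues(_ => 1.0).map(identity)

    for (_ <- 1 to 3) {
      // The join reuses links' partitioner: narrow dep on links,
      // shuffle dep on ranks.
      val contribs = links.join(ranks).values.flatMap {
        case (neighbors, rank) => neighbors.map(dst => dst -> rank / neighbors.size)
      }
      ranks = contribs.reduceByKey(_ + _).map(identity)
    }
    ranks.collect().foreach(println)
    sc.stop()
  }
}

Before this fix, the scheduler could place the join's tasks based on the shuffle side (`ranks`) rather than next to the cached `links` partitions.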

2 files changed (+44, -19 lines)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 20 additions & 17 deletions
@@ -1383,33 +1383,36 @@ class DAGScheduler(
       return rddPrefs.map(TaskLocation(_))
     }
 
+    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
+    // that has any placement preferences. Ideally we would choose based on transfer sizes,
+    // but this will do for now.
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
-        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-        // that has any placement preferences. Ideally we would choose based on transfer sizes,
-        // but this will do for now.
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
           if (locs != Nil) {
             return locs
           }
         }
-      case s: ShuffleDependency[_, _, _] =>
-        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
-        // of data as preferred locations
-        if (shuffleLocalityEnabled &&
-            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
-            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
-          // Get the preferred map output locations for this reducer
-          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
-            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
-          if (topLocsForReducer.nonEmpty) {
-            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
-          }
-        }
-
       case _ =>
     }
+
+    // If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that
+    // have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
+    if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+      rdd.dependencies.foreach {
+        case s: ShuffleDependency[_, _, _] =>
+          if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
+            // Get the preferred map output locations for this reducer
+            val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+              partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
+            if (topLocsForReducer.nonEmpty) {
+              return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+            }
+          }
+        case _ =>
+      }
+    }
     Nil
   }
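
The `shuffleLocalityEnabled` flag consulted above is read from the Spark configuration; in this generation of Spark the key is `spark.shuffle.reduceLocality.enabled`, enabled by default (treat the exact key and default as assumptions to verify against your Spark version). A minimal sketch of opting out entirely, leaving only narrow-dependency and RDD-level preferences in play:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("no-shuffle-locality")
  // Assumed config key: disables shuffle-based reduce-task locality
  // in the DAGScheduler so only narrow/RDD preferences are used.
  .set("spark.shuffle.reduceLocality.enabled", "false")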

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 24 additions & 2 deletions
@@ -926,7 +926,7 @@ class DAGSchedulerSuite
     assertLocations(reduceTaskSet, Seq(Seq("hostA")))
     complete(reduceTaskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("reduce task locality preferences should only include machines with largest map outputs") {
@@ -950,7 +950,29 @@ class DAGSchedulerSuite
     assertLocations(reduceTaskSet, Seq(hosts))
     complete(reduceTaskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
+  }
+
+  test("stages with both narrow and shuffle dependencies use narrow ones for locality") {
+    // Create an RDD that has both a shuffle dependency and a narrow dependency (e.g. for a join)
+    val rdd1 = new MyRDD(sc, 1, Nil)
+    val rdd2 = new MyRDD(sc, 1, Nil, locations = Seq(Seq("hostB")))
+    val shuffleDep = new ShuffleDependency(rdd1, null)
+    val narrowDep = new OneToOneDependency(rdd2)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep, narrowDep))
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostA")))
+
+    // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(Seq("hostB")))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
   }
 
   test("Spark exceptions should include call site in stack trace") {
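
Outside the test suite, one way to observe the behavior the new test pins down is `SparkContext.getPreferredLocs`, a `@DeveloperApi` that exposes the DAGScheduler's placement decision for a single partition. A small sketch, reusing the `links` and `ranks` RDDs from the PageRank example above:

// With this patch, any preference contributed by the narrow (co-partitioned)
// parent wins; shuffle-based preferences are consulted only when no narrow
// dependency has any.
val joined = links.join(ranks)  // narrow dep on links, shuffle dep on ranks
val prefs = sc.getPreferredLocs(joined, 0)
println(s"Preferred locations for partition 0: $prefs")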
