
Commit 143ec54

Marcelo Vanzin authored and cmonkey committed
[SPARK-18750][YARN] Avoid using "mapValues" when allocating containers.

That method is prone to stack overflows when the input map is really large; instead, use plain "map". Also includes a unit test that reproduces the stack overflow when run without the fix.

Author: Marcelo Vanzin <[email protected]>

Closes apache#16667 from vanzin/SPARK-18750.
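For readers unfamiliar with the failure mode: on Scala 2.12, which these sources target, Map.mapValues returns a lazy view that wraps the receiver instead of building a new map, so reassigning its result inside a loop (as the code changed by this commit previously did) stacks one wrapper per iteration, and forcing a value afterwards recurses through the whole chain. The sketch below is illustrative only and not part of the patch; the object name and the iteration count are made up. It contrasts the lazy chain with the eager map pattern the fix adopts.

// Illustrative sketch (not from the Spark sources), assuming Scala 2.12,
// where Map.mapValues returns a lazy, Map-typed view over the receiver.
object MapValuesOverflowSketch {
  def main(args: Array[String]): Unit = {
    val iterations = 100000 // made-up size, large enough to exhaust a default stack

    // Lazy route: every reassignment wraps the previous map in another view,
    // so the chain grows one level per iteration.
    var lazyRatios: Map[String, Int] = Map("host_1" -> iterations)
    for (_ <- 0 until iterations) {
      lazyRatios = lazyRatios.mapValues(_ - 1)
    }
    try {
      // Forcing a value walks all the nested views recursively.
      println(lazyRatios("host_1"))
    } catch {
      case _: StackOverflowError =>
        println("mapValues chain overflowed the stack (the SPARK-18750 symptom)")
    }

    // Eager route, the pattern this commit switches to: map materializes a new
    // map on every iteration, so the final lookup is constant-depth.
    var eagerRatios: Map[String, Int] = Map("host_1" -> iterations)
    for (_ <- 0 until iterations) {
      eagerRatios = eagerRatios.map { case (k, v) => (k, v - 1) }
    }
    println(eagerRatios("host_1")) // prints 0
  }
}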
1 parent: 1a67bfc

2 files changed: 93 additions and 5 deletions

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala

Lines changed: 6 additions & 5 deletions
@@ -129,9 +129,9 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
       val largestRatio = updatedHostToContainerCount.values.max
       // Round the ratio of preferred locality to the number of locality required container
       // number, which is used for locality preferred host calculating.
-      var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
+      var preferredLocalityRatio = updatedHostToContainerCount.map { case (k, ratio) =>
         val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
-        adjustedRatio.ceil.toInt
+        (k, adjustedRatio.ceil.toInt)
       }
 
       for (i <- 0 until requiredLocalityAwareContainerNum) {
@@ -145,7 +145,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
 
         // Minus 1 each time when the host is used. When the current ratio is 0,
         // which means all the required ratio is satisfied, this host will not be allocated again.
-        preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+        preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }
       }
     }
 
@@ -218,7 +218,8 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
 
     val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
     val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
-    pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
-      .toMap
+    pendingHostToContainerCount.map { case (k, v) =>
+      (k, v * localityMatchedPendingNum / possibleTotalContainerNum)
+    }.toMap
   }
 }
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{HashMap, HashSet, Set}
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.Mockito._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class LocalityPlacementStrategySuite extends SparkFunSuite {
+
+  test("handle large number of containers and tasks (SPARK-18750)") {
+    // Run the test in a thread with a small stack size, since the original issue
+    // surfaced as a StackOverflowError.
+    var error: Throwable = null
+
+    val runnable = new Runnable() {
+      override def run(): Unit = try {
+        runTest()
+      } catch {
+        case e: Throwable => error = e
+      }
+    }
+
+    val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024)
+    thread.start()
+    thread.join()
+
+    assert(error === null)
+  }
+
+  private def runTest(): Unit = {
+    val yarnConf = new YarnConfiguration()
+    yarnConf.setClass(
+      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+    // The numbers below have been chosen to balance being large enough to replicate the
+    // original issue while not taking too long to run when the issue is fixed. The main
+    // goal is to create enough requests for localized containers (so there should be many
+    // tasks on several hosts that have no allocated containers).
+
+    val resource = Resource.newInstance(8 * 1024, 4)
+    val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
+      yarnConf, resource)
+
+    val totalTasks = 32 * 1024
+    val totalContainers = totalTasks / 16
+    val totalHosts = totalContainers / 16
+
+    val mockId = mock(classOf[ContainerId])
+    val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap
+    val containers = (1 to totalContainers).map { i => mockId }
+    val count = containers.size / hosts.size / 2
+
+    val hostToContainerMap = new HashMap[String, Set[ContainerId]]()
+    hosts.keys.take(hosts.size / 2).zipWithIndex.foreach { case (host, i) =>
+      val hostContainers = new HashSet[ContainerId]()
+      containers.drop(count * i).take(i).foreach { c => hostContainers += c }
+      hostToContainerMap(host) = hostContainers
+    }
+
+    strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
+      hostToContainerMap, Nil)
+  }
+
+}
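A note on how the new suite exercises the bug: the workload runs on a dedicated thread whose stack size is capped at 32 KB (the final argument to the Thread constructor), so that without the fix the recursion through the chained views overflows quickly rather than depending on the JVM's default stack size; any Throwable thrown on that thread is captured in "error" and surfaced by the final assert(error === null).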
