
Commit 2eeada3

sryza authored and tgravescs committed
SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnAllocator

The goal of this PR is to simplify YarnAllocator as much as possible and get it up to the level of code quality we see in the rest of Spark. In service of this, it does a few things:

* Uses AMRMClient APIs for matching containers to requests (a minimal sketch of this cycle follows the commit message).
* Adds calls to AMRMClient.removeContainerRequest so that, when we use a container, we don't end up requesting it again.
* Removes YarnAllocator's host->rack cache. YARN's RackResolver already does this caching, so it was redundant.
* Adds tests for basic YarnAllocator functionality.
* Breaks up the allocateResources method, which was previously nearly 300 lines.
* A little bit of stylistic cleanup.
* Fixes a bug that filed three times the intended number of requests when preferred host locations were given.

The patch is lossy. In particular, it loses the logic for trying to avoid containers bunching up on nodes. As I understand it, the logic that's gone is: if, in a single response from the RM, we receive a set of containers on a node, and prefer some number of containers on that node greater than 0 but less than the number we received, give back the delta between what we preferred and what we received. This seems like a weird way to avoid bunching; e.g., it does nothing to avoid bunching when we don't request containers on particular nodes.

Author: Sandy Ryza <[email protected]>

Closes apache#3765 from sryza/sandy-spark-1714 and squashes the following commits:

32a5942 [Sandy Ryza] Muffle RackResolver logs
74f56dd [Sandy Ryza] Fix a couple comments and simplify requestTotalExecutors
60ea4bd [Sandy Ryza] Fix scalastyle
ca35b53 [Sandy Ryza] Simplify further
e9cf8a6 [Sandy Ryza] Fix YarnClusterSuite
257acf3 [Sandy Ryza] Remove locality stuff and more cleanup
59a3c5e [Sandy Ryza] Take out rack stuff
5f72fd5 [Sandy Ryza] Further documentation and cleanup
89edd68 [Sandy Ryza] SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnAllocator
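For readers unfamiliar with the AMRMClient calls named in the bullets above, here is a minimal sketch of the request/match/remove cycle, assuming a locality-free request. The object and method names are invented for illustration; addContainerRequest, getMatchingRequests, and removeContainerRequest are the actual Hadoop AMRMClient APIs the patch adopts, and the capability and priority values are arbitrary.

import org.apache.hadoop.yarn.api.records.{Container, Priority, Resource, ResourceRequest}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object MatchingSketch {
  // Made-up resource shape; the real allocator derives these from user args.
  val priority = Priority.newInstance(1)
  val capability = Resource.newInstance(2048, 5)

  // File one ask with no locality preference; AMRMClient tracks it under the
  // "any host" (*) resource name until a container is matched against it.
  def requestOneExecutor(amClient: AMRMClient[ContainerRequest]): Unit = {
    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))
  }

  // On receiving a container, remove one outstanding request it satisfies so
  // the same ask is not re-filed on the next allocate() heartbeat -- the
  // removeContainerRequest behavior the bullet list above refers to.
  def handleAllocated(amClient: AMRMClient[ContainerRequest], container: Container): Unit = {
    val matching = amClient.getMatchingRequests(
      container.getPriority, ResourceRequest.ANY, container.getResource)
    if (!matching.isEmpty && !matching.get(0).isEmpty) {
      amClient.removeContainerRequest(matching.get(0).iterator().next())
    }
  }
}

Matching against a specific host name works the same way; the new test suite at the bottom of this page asserts exactly that with getMatchingRequests(priority, "host1", containerResource).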
1 parent 8c06a5f commit 2eeada3

File tree

7 files changed (+389 additions, -550 deletions)

core/src/main/resources/org/apache/spark/log4j-defaults.properties

Lines changed: 1 addition & 0 deletions
@@ -10,3 +10,4 @@ log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 232 additions & 501 deletions
Large diffs are not rendered by default.
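Since the rewritten YarnAllocator diff is collapsed, here is a hedged sketch, for orientation only, of the decomposed allocation loop the commit message describes. addResourceRequests and handleAllocatedContainers match the names exercised by the new test suite below; the counter fields and processCompletedContainers are assumed placeholders, not quoted from the patch.

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Placeholder state and helpers stand in for the real allocator's internals.
class AllocatorLoopSketch(amClient: AMRMClient[ContainerRequest]) {
  @volatile var targetNumExecutors = 0   // assumed counter names
  @volatile var numPendingAllocate = 0
  @volatile var numExecutorsRunning = 0

  def addResourceRequests(n: Int): Unit = ???                     // files n ContainerRequests
  def handleAllocatedContainers(cs: Array[Container]): Unit = ??? // launches executors
  def processCompletedContainers(cs: Seq[ContainerStatus]): Unit = ???

  // The loop itself: top up outstanding asks, heartbeat the RM, dispatch results.
  def allocateResources(): Unit = synchronized {
    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
    if (missing > 0) {
      addResourceRequests(missing)
    }
    val response = amClient.allocate(0.1f)  // progress indicator is arbitrary here
    handleAllocatedContainers(response.getAllocatedContainers.asScala.toArray)
    processCompletedContainers(response.getCompletedContainersStatuses.asScala)
  }
}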

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 1 addition & 2 deletions
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args,
-      preferredNodeLocations, securityMgr)
+    new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
   }

   /**

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 2 additions & 39 deletions
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration

@@ -99,13 +99,7 @@ object YarnSparkHadoopUtil {

   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
-  val RM_REQUEST_PRIORITY = 1
-
-  // Host to rack map - saved from allocation requests. We are expecting this not to change.
-  // Note that it is possible for this to change : and ResourceManager will indicate that to us via
-  // update response to allocate. But we are punting on handling that for now.
-  private val hostToRack = new ConcurrentHashMap[String, String]()
-  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+  val RM_REQUEST_PRIORITY = Priority.newInstance(1)

   /**
    * Add a path variable to the given environment map.
@@ -184,37 +178,6 @@ object YarnSparkHadoopUtil {
     }
   }

-  def lookupRack(conf: Configuration, host: String): String = {
-    if (!hostToRack.contains(host)) {
-      populateRackInfo(conf, host)
-    }
-    hostToRack.get(host)
-  }
-
-  def populateRackInfo(conf: Configuration, hostname: String) {
-    Utils.checkHost(hostname)
-
-    if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, all to an ignore list.
-      val rackInfo = RackResolver.resolve(conf, hostname)
-      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
-        val rack = rackInfo.getNetworkLocation
-        hostToRack.put(hostname, rack)
-        if (! rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack,
-            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
-        }
-        rackToHostSet.get(rack).add(hostname)
-
-        // TODO(harvey): Figure out what this comment means...
-        // Since RackResolver caches, we are disabling this for now ...
-      } /* else {
-        // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
-        hostToRack.put(hostname, null)
-      } */
-    }
-  }
-
   def getApplicationAclsForYarn(securityMgr: SecurityManager)
     : Map[ApplicationAccessType, String] = {
     Map[ApplicationAccessType, String] (
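A brief, assumed illustration of why the constant becomes a Priority record rather than a bare Int: the AMRMClient matching APIs (and Container itself, as createContainer in the new test suite shows) take Priority objects. The helper name and resource shape below are hypothetical.

import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

object PrioritySketch {
  // Hypothetical helper: lists outstanding asks for `host` at the shared
  // priority. The resource shape (2048 MB, 5 cores) is made up; with the old
  // Int constant, each call site would have needed Priority.newInstance(1).
  def pendingRequestsOn(amClient: AMRMClient[ContainerRequest], host: String) =
    amClient.getMatchingRequests(
      YarnSparkHadoopUtil.RM_REQUEST_PRIORITY, host, Resource.newInstance(2048, 5))
}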

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala

Lines changed: 3 additions & 2 deletions
@@ -17,8 +17,9 @@

 package org.apache.spark.scheduler.cluster

+import org.apache.hadoop.yarn.util.RackResolver
+
 import org.apache.spark._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils

@@ -30,6 +31,6 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSc
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
   }
 }
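The replacement lookup leans on the fact, noted in the commit message, that RackResolver caches host-to-rack mappings itself, which is what let the hand-rolled cache in YarnSparkHadoopUtil be deleted above. A small usage sketch under that assumption (the wrapper object is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

object RackLookupSketch {
  // Repeated lookups for the same host hit RackResolver's internal cache
  // rather than re-invoking the topology script. Option(...) absorbs a null
  // network location when resolution fails.
  def rackFor(conf: Configuration, host: String): Option[String] =
    Option(RackResolver.resolve(conf, host).getNetworkLocation)
}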

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala

Lines changed: 4 additions & 2 deletions
@@ -17,8 +17,10 @@

 package org.apache.spark.scheduler.cluster

+import org.apache.hadoop.yarn.util.RackResolver
+
 import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.ApplicationMaster
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils

@@ -39,7 +41,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedule
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
   }

   override def postStartHook() {

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 146 additions & 4 deletions
@@ -17,18 +17,160 @@

 package org.apache.spark.deploy.yarn

+import java.util.{Arrays, List => JList}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.YarnAllocator._
-import org.scalatest.FunSuite
+import org.apache.spark.scheduler.SplitInfo
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
+
+class MockResolver extends DNSToSwitchMapping {
+
+  override def resolve(names: JList[String]): JList[String] = {
+    if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
+    else Arrays.asList("/rack1")
+  }
+
+  override def reloadCachedMappings() {}
+
+  def reloadCachedMappings(names: JList[String]) {}
+}
+
+class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach {
+  val conf = new Configuration()
+  conf.setClass(
+    CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+    classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+  val sparkConf = new SparkConf()
+  sparkConf.set("spark.driver.host", "localhost")
+  sparkConf.set("spark.driver.port", "4040")
+  sparkConf.set("spark.yarn.jar", "notarealjar.jar")
+  sparkConf.set("spark.yarn.launchContainers", "false")
+
+  val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)
+
+  // Resource returned by YARN. YARN can give larger containers than requested, so give 6 cores
+  // instead of the 5 requested and 3 GB instead of the 2 requested.
+  val containerResource = Resource.newInstance(3072, 6)
+
+  var rmClient: AMRMClient[ContainerRequest] = _
+
+  var containerNum = 0
+
+  override def beforeEach() {
+    rmClient = AMRMClient.createAMRMClient()
+    rmClient.init(conf)
+    rmClient.start()
+  }
+
+  override def afterEach() {
+    rmClient.stop()
+  }
+
+  class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
+    override def equals(other: Any) = false
+  }
+
+  def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
+    val args = Array(
+      "--num-executors", s"$maxExecutors",
+      "--executor-cores", "5",
+      "--executor-memory", "2048",
+      "--jar", "somejar.jar",
+      "--class", "SomeClass")
+    new YarnAllocator(
+      conf,
+      sparkConf,
+      rmClient,
+      appAttemptId,
+      new ApplicationMasterArguments(args),
+      new SecurityManager(sparkConf))
+  }
+
+  def createContainer(host: String): Container = {
+    val containerId = ContainerId.newInstance(appAttemptId, containerNum)
+    containerNum += 1
+    val nodeId = NodeId.newInstance(host, 1000)
+    Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
+  }
+
+  test("single container allocated") {
+    // request a single container and receive it
+    val handler = createAllocator()
+    handler.addResourceRequests(1)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (1)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+    rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size should be (0)
+  }
+
+  test("some containers allocated") {
+    // request a few containers and receive some of them
+    val handler = createAllocator()
+    handler.addResourceRequests(4)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host1")
+    val container3 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2, container3))
+
+    handler.getNumExecutorsRunning should be (3)
+    handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
+    handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
+    handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
+    handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
+  }
+
+  test("receive more containers than requested") {
+    val handler = createAllocator(2)
+    handler.addResourceRequests(2)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (2)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    val container3 = createContainer("host4")
+    handler.handleAllocatedContainers(Array(container1, container2, container3))
+
+    handler.getNumExecutorsRunning should be (2)
+    handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
+    handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
+    handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
+    handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
+    handler.allocatedHostToContainersMap.contains("host4") should be (false)
+  }

-class YarnAllocatorSuite extends FunSuite {
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
-      "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
-      "5.8 GB of 4.2 GB virtual memory used. Killing container."
+        "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
+        "5.8 GB of 4.2 GB virtual memory used. Killing container."
     val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
     val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
+
 }
