Skip to content

Commit 00c1248

Browse files
author
Marcelo Vanzin
committed
[SPARK-20191][YARN] Crate wrapper for RackResolver so tests can override it.
Current test code tries to override the RackResolver used by setting configuration params, but because YARN libs statically initialize the resolver the first time it's used, that means that those configs don't really take effect during Spark tests. This change adds a wrapper class that easily allows tests to override the behavior of the resolver for the Spark code that uses it. Author: Marcelo Vanzin <[email protected]> Closes #17508 from vanzin/SPARK-20191. (cherry picked from commit 0736980) Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent f9546da commit 00c1248

File tree

6 files changed

+56
-35
lines changed

6 files changed

+56
-35
lines changed

yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
2323
import org.apache.hadoop.conf.Configuration
2424
import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
2525
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
26-
import org.apache.hadoop.yarn.util.RackResolver
2726

2827
import org.apache.spark.SparkConf
2928
import org.apache.spark.internal.config._
@@ -83,7 +82,8 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
8382
private[yarn] class LocalityPreferredContainerPlacementStrategy(
8483
val sparkConf: SparkConf,
8584
val yarnConf: Configuration,
86-
val resource: Resource) {
85+
val resource: Resource,
86+
resolver: SparkRackResolver) {
8787

8888
/**
8989
* Calculate each container's node locality and rack locality
@@ -139,7 +139,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
139139
// still be allocated with new container request.
140140
val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
141141
val racks = hosts.map { h =>
142-
RackResolver.resolve(yarnConf, h).getNetworkLocation
142+
resolver.resolve(yarnConf, h)
143143
}.toSet
144144
containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
145145

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.yarn
19+
20+
import org.apache.hadoop.conf.Configuration
21+
import org.apache.hadoop.yarn.util.RackResolver
22+
import org.apache.log4j.{Level, Logger}
23+
24+
/**
25+
* Wrapper around YARN's [[RackResolver]]. This allows Spark tests to easily override the
26+
* default behavior, since YARN's class self-initializes the first time it's called, and
27+
* future calls all use the initial configuration.
28+
*/
29+
private[yarn] class SparkRackResolver {
30+
31+
// RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
32+
if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
33+
Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
34+
}
35+
36+
def resolve(conf: Configuration, hostName: String): String = {
37+
RackResolver.resolve(conf, hostName).getNetworkLocation()
38+
}
39+
40+
}

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records._
3030
import org.apache.hadoop.yarn.client.api.AMRMClient
3131
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
3232
import org.apache.hadoop.yarn.conf.YarnConfiguration
33-
import org.apache.hadoop.yarn.util.RackResolver
3433
import org.apache.log4j.{Level, Logger}
3534

3635
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -65,16 +64,12 @@ private[yarn] class YarnAllocator(
6564
amClient: AMRMClient[ContainerRequest],
6665
appAttemptId: ApplicationAttemptId,
6766
securityMgr: SecurityManager,
68-
localResources: Map[String, LocalResource])
67+
localResources: Map[String, LocalResource],
68+
resolver: SparkRackResolver)
6969
extends Logging {
7070

7171
import YarnAllocator._
7272

73-
// RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
74-
if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
75-
Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
76-
}
77-
7873
// Visible for testing.
7974
val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
8075
val allocatedContainerToHostMap = new HashMap[ContainerId, String]
@@ -171,7 +166,7 @@ private[yarn] class YarnAllocator(
171166

172167
// A container placement strategy based on pending tasks' locality preference
173168
private[yarn] val containerPlacementStrategy =
174-
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
169+
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
175170

176171
/**
177172
* Use a different clock for YarnAllocator. This is mainly used for testing.
@@ -422,7 +417,7 @@ private[yarn] class YarnAllocator(
422417
// Match remaining by rack
423418
val remainingAfterRackMatches = new ArrayBuffer[Container]
424419
for (allocatedContainer <- remainingAfterHostMatches) {
425-
val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
420+
val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
426421
matchContainerToRequest(allocatedContainer, rack, containersToUse,
427422
remainingAfterRackMatches)
428423
}

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private[spark] class YarnRMClient extends Logging {
7575
registered = true
7676
}
7777
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
78-
localResources)
78+
localResources, new SparkRackResolver())
7979
}
8080

8181
/**

yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
package org.apache.spark.deploy.yarn
1919

20+
import scala.collection.JavaConverters._
2021
import scala.collection.mutable.{HashMap, HashSet, Set}
2122

22-
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
23-
import org.apache.hadoop.net.DNSToSwitchMapping
2423
import org.apache.hadoop.yarn.api.records._
2524
import org.apache.hadoop.yarn.conf.YarnConfiguration
2625
import org.mockito.Mockito._
@@ -51,9 +50,6 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
5150

5251
private def runTest(): Unit = {
5352
val yarnConf = new YarnConfiguration()
54-
yarnConf.setClass(
55-
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
56-
classOf[MockResolver], classOf[DNSToSwitchMapping])
5753

5854
// The numbers below have been chosen to balance being large enough to replicate the
5955
// original issue while not taking too long to run when the issue is fixed. The main
@@ -62,7 +58,7 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
6258

6359
val resource = Resource.newInstance(8 * 1024, 4)
6460
val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
65-
yarnConf, resource)
61+
yarnConf, resource, new MockResolver())
6662

6763
val totalTasks = 32 * 1024
6864
val totalContainers = totalTasks / 16

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717

1818
package org.apache.spark.deploy.yarn
1919

20-
import java.util.{Arrays, List => JList}
21-
22-
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
23-
import org.apache.hadoop.net.DNSToSwitchMapping
20+
import org.apache.hadoop.conf.Configuration
2421
import org.apache.hadoop.yarn.api.records._
2522
import org.apache.hadoop.yarn.client.api.AMRMClient
2623
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -36,24 +33,16 @@ import org.apache.spark.rpc.RpcEndpointRef
3633
import org.apache.spark.scheduler.SplitInfo
3734
import org.apache.spark.util.ManualClock
3835

39-
class MockResolver extends DNSToSwitchMapping {
36+
class MockResolver extends SparkRackResolver {
4037

41-
override def resolve(names: JList[String]): JList[String] = {
42-
if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
43-
else Arrays.asList("/rack1")
38+
override def resolve(conf: Configuration, hostName: String): String = {
39+
if (hostName == "host3") "/rack2" else "/rack1"
4440
}
4541

46-
override def reloadCachedMappings() {}
47-
48-
def reloadCachedMappings(names: JList[String]) {}
4942
}
5043

5144
class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
5245
val conf = new YarnConfiguration()
53-
conf.setClass(
54-
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
55-
classOf[MockResolver], classOf[DNSToSwitchMapping])
56-
5746
val sparkConf = new SparkConf()
5847
sparkConf.set("spark.driver.host", "localhost")
5948
sparkConf.set("spark.driver.port", "4040")
@@ -107,7 +96,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
10796
rmClient,
10897
appAttemptId,
10998
new SecurityManager(sparkConf),
110-
Map())
99+
Map(),
100+
new MockResolver())
111101
}
112102

113103
def createContainer(host: String): Container = {

0 commit comments

Comments
 (0)