
Commit 2e49e48

Use traits for InetAddress and RackResolver util classes

1 parent 6872b11
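
This commit splits each utility into a trait plus a concrete implementation: InetAddressUtil becomes a trait backed by the InetAddressUtilImpl object, and RackResolverUtil becomes a trait backed by RackResolverUtilImpl, so unit tests can mock the traits instead of subclassing concrete helper classes (the old InetAddressUtil class carried the comment "To support mocks in unit tests").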

File tree

6 files changed: +158 -80 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.kubernetes
+
+import java.net.InetAddress
+
+/**
+ * Gets full host names of given IP addresses from DNS.
+ */
+private[kubernetes] trait InetAddressUtil {
+
+  def getFullHostName(ipAddress: String): String
+}
+
+private[kubernetes] object InetAddressUtilImpl extends InetAddressUtil {
+
+  // NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress
+  // class for both hits and misses.
+  override def getFullHostName(ipAddress: String): String = {
+    InetAddress.getByName(ipAddress).getCanonicalHostName
+  }
+}
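
With the trait in place, a test can substitute a deterministic resolver for InetAddressUtilImpl through the default constructor parameters changed below. A minimal sketch, assuming a stub is wanted; FakeInetAddressUtil and the host name are illustrative and not part of this commit:

package org.apache.spark.scheduler.cluster.kubernetes

// Hypothetical test stub: resolves every IP address to a fixed host name,
// so no DNS lookup happens. It must live in this package because the
// trait is private[kubernetes].
private[kubernetes] object FakeInetAddressUtil extends InetAddressUtil {
  override def getFullHostName(ipAddress: String): String =
    "kube-node1.example.com"
}

A KubernetesTaskSetManager or KubernetesTaskSchedulerImpl under test can then be constructed with FakeInetAddressUtil in place of the default InetAddressUtilImpl.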

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala

Lines changed: 2 additions & 41 deletions

@@ -16,24 +16,18 @@
  */
 package org.apache.spark.scheduler.cluster.kubernetes
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic
-import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping}
-import org.apache.hadoop.yarn.util.RackResolver
-import org.apache.log4j.{Level, Logger}
-
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, TaskSet, TaskSetManager}
 import org.apache.spark.util.Utils
 import org.apache.spark.SparkContext
 
 private[spark] class KubernetesTaskSchedulerImpl(
     sc: SparkContext,
     rackResolverUtil: RackResolverUtil,
-    inetAddressUtil: InetAddressUtil = new InetAddressUtil) extends TaskSchedulerImpl(sc) {
+    inetAddressUtil: InetAddressUtil = InetAddressUtilImpl) extends TaskSchedulerImpl(sc) {
 
   var kubernetesSchedulerBackend: KubernetesClusterSchedulerBackend = null
 
-  def this(sc: SparkContext) = this(sc, new RackResolverUtil(sc.hadoopConfiguration))
+  def this(sc: SparkContext) = this(sc, new RackResolverUtilImpl(sc.hadoopConfiguration))
 
   override def initialize(backend: SchedulerBackend): Unit = {
     super.initialize(backend)
@@ -73,36 +67,3 @@ private[spark] class KubernetesTaskSchedulerImpl(
     ).getOrElse(rackResolverUtil.resolveRack(hadoopConfiguration, host))
   }
 }
-
-private[kubernetes] class RackResolverUtil(hadoopConfiguration: Configuration) {
-
-  val scriptPlugin : String = classOf[ScriptBasedMapping].getCanonicalName
-  val tablePlugin : String = classOf[TableMapping].getCanonicalName
-  val isConfigured : Boolean = checkConfigured(hadoopConfiguration)
-
-  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
-  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
-    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
-  }
-
-  def checkConfigured(hadoopConfiguration: Configuration): Boolean = {
-    val plugin = hadoopConfiguration.get(
-      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, scriptPlugin)
-    val scriptName = hadoopConfiguration.get(
-      CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "")
-    val tableName = hadoopConfiguration.get(
-      CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, "")
-    plugin == scriptPlugin && scriptName.nonEmpty ||
-      plugin == tablePlugin && tableName.nonEmpty ||
-      plugin != scriptPlugin && plugin != tablePlugin
-  }
-
-  def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] = {
-    val rack = Option(RackResolver.resolve(hadoopConfiguration, host).getNetworkLocation)
-    if (rack.nonEmpty && rack.get != NetworkTopology.DEFAULT_RACK) {
-      rack
-    } else {
-      None
-    }
-  }
-}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala

Lines changed: 1 addition & 12 deletions

@@ -16,8 +16,6 @@
  */
 package org.apache.spark.scheduler.cluster.kubernetes
 
-import java.net.InetAddress
-
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
@@ -26,7 +24,7 @@ private[spark] class KubernetesTaskSetManager(
     sched: TaskSchedulerImpl,
     taskSet: TaskSet,
     maxTaskFailures: Int,
-    inetAddressUtil: InetAddressUtil = new InetAddressUtil)
+    inetAddressUtil: InetAddressUtil = InetAddressUtilImpl)
   extends TaskSetManager(sched, taskSet, maxTaskFailures) {
 
   /**
@@ -74,12 +72,3 @@
   }
 }
 
-// To support mocks in unit tests.
-private[kubernetes] class InetAddressUtil {
-
-  // NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress
-  // class for both hits and misses.
-  def getFullHostName(ipAddress: String): String = {
-    InetAddress.getByName(ipAddress).getCanonicalHostName
-  }
-}
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.kubernetes
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping}
+import org.apache.hadoop.yarn.util.RackResolver
+import org.apache.log4j.{Level, Logger}
+
+/**
+ * Finds rack names that cluster nodes belong to in order to support HDFS rack locality.
+ */
+private[kubernetes] trait RackResolverUtil {
+
+  def isConfigured() : Boolean
+
+  def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String]
+}
+
+private[kubernetes] class RackResolverUtilImpl(hadoopConfiguration: Configuration)
+  extends RackResolverUtil {
+
+  val scriptPlugin : String = classOf[ScriptBasedMapping].getCanonicalName
+  val tablePlugin : String = classOf[TableMapping].getCanonicalName
+  val isResolverConfigured : Boolean = checkConfigured(hadoopConfiguration)
+
+  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
+  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
+    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
+  }
+
+  override def isConfigured() : Boolean = isResolverConfigured
+
+  def checkConfigured(hadoopConfiguration: Configuration): Boolean = {
+    val plugin = hadoopConfiguration.get(
+      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, scriptPlugin)
+    val scriptName = hadoopConfiguration.get(
+      CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "")
+    val tableName = hadoopConfiguration.get(
+      CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, "")
+    plugin == scriptPlugin && scriptName.nonEmpty ||
+      plugin == tablePlugin && tableName.nonEmpty ||
+      plugin != scriptPlugin && plugin != tablePlugin
+  }
+
+  override def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] = {
+    val rack = Option(RackResolver.resolve(hadoopConfiguration, host).getNetworkLocation)
+    if (rack.nonEmpty && rack.get != NetworkTopology.DEFAULT_RACK) {
+      rack
+    } else {
+      None
+    }
+  }
+}
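
checkConfigured treats the topology as configured in three cases: the script plugin with a non-empty script file, the table plugin with a non-empty mapping file, or any custom plugin class. A minimal sketch of those cases follows; the object name and file path are assumptions, and it must sit in the kubernetes package because RackResolverUtilImpl is private[kubernetes]:

package org.apache.spark.scheduler.cluster.kubernetes

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic

object RackResolverConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    val util = new RackResolverUtilImpl(conf)

    // Default: script plugin selected but no script file named -> not configured.
    println(util.isConfigured())        // false

    // Table plugin counts as configured once its mapping file is named.
    conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      util.tablePlugin)
    conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
      "/etc/hadoop/topology.table")
    println(util.checkConfigured(conf)) // true

    // Any plugin other than the script and table mappings is trusted as configured.
    conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      "my.Plugin")
    println(util.checkConfigured(conf)) // true
  }
}

The same three cases are exercised by the new RackResolverUtilImplSuite below.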

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala

Lines changed: 1 addition & 27 deletions

@@ -17,8 +17,6 @@
 package org.apache.spark.scheduler.cluster.kubernetes
 
 import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 
@@ -98,7 +96,7 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite {
 
   test("Does not get racks if plugin is not configured") {
     val rackResolverUtil = mock(classOf[RackResolverUtil])
-    when(rackResolverUtil.checkConfigured(sc.hadoopConfiguration)).thenReturn(false)
+    when(rackResolverUtil.isConfigured()).thenReturn(false)
     val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil)
     sched.kubernetesSchedulerBackend = backend
     when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None)
@@ -115,28 +113,4 @@
     assert(sched.getRackForHost("kube-node1:60010").isEmpty)
     assert(sched.getRackForHost("10.0.0.1:7079").isEmpty)
   }
-
-  test("Detects if topology plugin is configured") {
-    val hadoopConfiguration = new Configuration
-    val rackResolverUtil = new RackResolverUtil(hadoopConfiguration)
-
-    assert(!rackResolverUtil.checkConfigured(hadoopConfiguration))
-    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-      rackResolverUtil.scriptPlugin)
-    assert(!rackResolverUtil.checkConfigured(hadoopConfiguration))
-    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
-      "my-script")
-    assert(rackResolverUtil.checkConfigured(hadoopConfiguration))
-
-    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-      rackResolverUtil.tablePlugin)
-    assert(!rackResolverUtil.checkConfigured(hadoopConfiguration))
-    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
-      "my-table")
-    assert(rackResolverUtil.checkConfigured(hadoopConfiguration))
-
-    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-      "my.Plugin")
-    assert(rackResolverUtil.checkConfigured(hadoopConfiguration))
-  }
 }
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtilImplSuite.scala

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.kubernetes
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+
+import org.apache.spark.SparkFunSuite
+
+class RackResolverUtilImplSuite extends SparkFunSuite {
+
+  test("Detects if topology plugin is configured") {
+    val hadoopConfiguration = new Configuration
+    val rackResolverUtil = new RackResolverUtilImpl(hadoopConfiguration)
+
+    assert(!rackResolverUtil.checkConfigured(hadoopConfiguration))
+    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      rackResolverUtil.scriptPlugin)
+    assert(!rackResolverUtil.checkConfigured(hadoopConfiguration))
+    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+      "my-script")
+    assert(rackResolverUtil.checkConfigured(hadoopConfiguration))
+
+    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      rackResolverUtil.tablePlugin)
+    assert(!rackResolverUtil.checkConfigured(hadoopConfiguration))
+    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
+      "my-table")
+    assert(rackResolverUtil.checkConfigured(hadoopConfiguration))
+
+    hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      "my.Plugin")
+    assert(rackResolverUtil.checkConfigured(hadoopConfiguration))
+  }
+}
