From fcb9a087a199445d7cbc2a3a2e19109a320d60ff Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 16 Jun 2017 10:56:47 -0700 Subject: [PATCH 1/6] Support HDFS rack locality --- .../KubernetesTaskSchedulerImpl.scala | 89 ++++++++++- .../KubernetesTaskSchedulerImplSuite.scala | 142 ++++++++++++++++++ 2 files changed, 229 insertions(+), 2 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala index a5e126480b83..4671da9f1038 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala @@ -16,12 +16,97 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import org.apache.spark.SparkContext +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.CommonConfigurationKeysPublic +import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping} +import org.apache.hadoop.yarn.util.RackResolver +import org.apache.log4j.{Level, Logger} + import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager} +import org.apache.spark.util.Utils +import org.apache.spark.SparkContext -private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) { +private[spark] class KubernetesTaskSchedulerImpl( + sc: SparkContext, + rackResolverUtil: RackResolverUtil = new RackResolverUtil, + inetAddressUtil: InetAddressUtil = new InetAddressUtil) extends TaskSchedulerImpl(sc) { + + rackResolverUtil.init(sc.hadoopConfiguration) override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new KubernetesTaskSetManager(this, taskSet, maxTaskFailures) } + + override def getRackForHost(hostPort: String): Option[String] = { + if (!rackResolverUtil.isConfigured) { + // Only calls resolver when it is configured to avoid sending DNS queries for cluster nodes. + // See InetAddressUtil for details. + None + } else { + getRackForDatanodeOrExecutor(hostPort) + } + } + + private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = { + val host = Utils.parseHostPort(hostPort)._1 + val backend = this.backend.asInstanceOf[KubernetesClusterSchedulerBackend] + val executorPod = backend.getExecutorPodByIP(host) + if (executorPod.isEmpty) { + // Find the rack of the datanode host. + rackResolverUtil.resolveRack(sc.hadoopConfiguration, host) + } else { + // Find the rack of the cluster node that the executor pod is running on. 
+ val clusterNodeName = executorPod.get.getSpec.getNodeName + val rackByNodeName = rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeName) + if (rackByNodeName.nonEmpty) { + rackByNodeName + } else { + val clusterNodeIP = executorPod.get.getStatus.getHostIP + val rackByNodeIP = rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeIP) + if (rackByNodeName.nonEmpty) { + rackByNodeIP + } else { + val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) + rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeFullName) + } + } + } + } +} + +private[kubernetes] class RackResolverUtil { + + val scriptPlugin = classOf[ScriptBasedMapping].getCanonicalName + val tablePlugin = classOf[TableMapping].getCanonicalName + + var isConfigured = false + + def init(hadoopConfiguration: Configuration) : Unit = { + isConfigured = checkConfigured(hadoopConfiguration) + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) + } + } + + def checkConfigured(hadoopConfiguration: Configuration): Boolean = { + val plugin = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, scriptPlugin) + val scriptName = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "") + val tableName = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, "") + plugin == scriptPlugin && scriptName.nonEmpty || + plugin == tablePlugin && tableName.nonEmpty || + plugin != scriptPlugin && plugin != tablePlugin + } + + def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] = { + val rack = Option(RackResolver.resolve(hadoopConfiguration, host).getNetworkLocation) + if (rack.nonEmpty && rack.get != NetworkTopology.DEFAULT_RACK) { + rack + } else { + None + } + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala new file mode 100644 index 000000000000..7add492391da --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.kubernetes + +import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.CommonConfigurationKeysPublic +import org.apache.hadoop.net.NetworkTopology +import org.mockito.Matchers._ +import org.mockito.Mockito._ + +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.scheduler.FakeTask + +class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { + + val sc = new SparkContext("local", "test") + val backend = mock(classOf[KubernetesClusterSchedulerBackend]) + + test("Create a k8s task set manager") { + val sched = new KubernetesTaskSchedulerImpl(sc) + sched.backend = backend + val taskSet = FakeTask.createTaskSet(0) + + val manager = sched.createTaskSetManager(taskSet, maxTaskFailures = 3) + assert(manager.isInstanceOf[KubernetesTaskSetManager]) + } + + test("Gets racks for datanodes") { + val rackResolverUtil = mock(classOf[RackResolverUtil]) + when(rackResolverUtil.checkConfigured(sc.hadoopConfiguration)).thenReturn(true) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) + .thenReturn(Option("/rack1")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) + .thenReturn(Option("/rack2")) + val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil) + sched.backend = backend + when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None) + when(backend.getExecutorPodByIP("kube-node2")).thenReturn(None) + + assert(sched.getRackForHost("kube-node1:60010") == Option("/rack1")) + assert(sched.getRackForHost("kube-node2:60010") == Option("/rack2")) + } + + test("Gets racks for executor pods") { + val rackResolverUtil = mock(classOf[RackResolverUtil]) + when(rackResolverUtil.checkConfigured(sc.hadoopConfiguration)).thenReturn(true) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) + .thenReturn(Option("/rack1")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com")) + .thenReturn(Option("/rack2")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) + .thenReturn(Option(NetworkTopology.DEFAULT_RACK)) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5")) + .thenReturn(Option(NetworkTopology.DEFAULT_RACK)) + val inetAddressUtil = mock(classOf[InetAddressUtil]) + val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil) + sched.backend = backend + + val spec1 = mock(classOf[PodSpec]) + when(spec1.getNodeName).thenReturn("kube-node1") + val status1 = mock(classOf[PodStatus]) + when(status1.getHostIP).thenReturn("192.168.1.4") + val pod1 = mock(classOf[Pod]) + when(pod1.getSpec).thenReturn(spec1) + when(pod1.getStatus).thenReturn(status1) + when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1)) + + val spec2 = mock(classOf[PodSpec]) + when(spec2.getNodeName).thenReturn("kube-node2") + val status2 = mock(classOf[PodStatus]) + when(status2.getHostIP).thenReturn("192.168.1.5") + val pod2 = mock(classOf[Pod]) + when(pod2.getSpec).thenReturn(spec2) + when(pod2.getStatus).thenReturn(status2) + when(inetAddressUtil.getFullHostName("192.168.1.5")).thenReturn("kube-node2.mydomain.com") + when(backend.getExecutorPodByIP("10.0.1.1")).thenReturn(Some(pod2)) + + assert(sched.getRackForHost("10.0.0.1:7079") == Option("/rack1")) + assert(sched.getRackForHost("10.0.1.1:7079") == Option("/rack2")) + + verify(inetAddressUtil, times(1)).getFullHostName(anyString()) + } + + 
test("Does not get racks if plugin is not configured") { + val rackResolverUtil = mock(classOf[RackResolverUtil]) + when(rackResolverUtil.checkConfigured(sc.hadoopConfiguration)).thenReturn(false) + val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil) + sched.backend = backend + when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None) + + val spec1 = mock(classOf[PodSpec]) + when(spec1.getNodeName).thenReturn("kube-node1") + val status1 = mock(classOf[PodStatus]) + when(status1.getHostIP).thenReturn("192.168.1.4") + val pod1 = mock(classOf[Pod]) + when(pod1.getSpec).thenReturn(spec1) + when(pod1.getStatus).thenReturn(status1) + when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1)) + + assert(sched.getRackForHost("kube-node1:60010").isEmpty) + assert(sched.getRackForHost("10.0.0.1:7079").isEmpty) + } + + test("Detects if topology plugin is configured") { + val rackResolverUtil = new RackResolverUtil + val hadoopConfiguration = new Configuration + + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + rackResolverUtil.scriptPlugin) + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, + "my-script") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + rackResolverUtil.tablePlugin) + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, + "my-table") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + "my.Plugin") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + } +} From 8b58eed3d99c3026e570cbff976b1f546bc83bdc Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 16 Jun 2017 16:44:37 -0700 Subject: [PATCH 2/6] Fix unit tests --- .../KubernetesTaskSchedulerImplSuite.scala | 14 +++++++------- .../kubernetes/KubernetesTaskSetManagerSuite.scala | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala index 7add492391da..f336c7547fa1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala @@ -19,16 +19,16 @@ package org.apache.spark.scheduler.cluster.kubernetes import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.CommonConfigurationKeysPublic -import org.apache.hadoop.net.NetworkTopology import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.FakeTask class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { - val sc = new SparkContext("local", "test") + val sc = 
new SparkContext(master = "local", appName = "test", + new SparkConf().set("spark.driver.allowMultipleContexts", "true")) val backend = mock(classOf[KubernetesClusterSchedulerBackend]) test("Create a k8s task set manager") { @@ -42,7 +42,7 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { test("Gets racks for datanodes") { val rackResolverUtil = mock(classOf[RackResolverUtil]) - when(rackResolverUtil.checkConfigured(sc.hadoopConfiguration)).thenReturn(true) + when(rackResolverUtil.isConfigured).thenReturn(true) when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) .thenReturn(Option("/rack1")) when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) @@ -58,15 +58,15 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { test("Gets racks for executor pods") { val rackResolverUtil = mock(classOf[RackResolverUtil]) - when(rackResolverUtil.checkConfigured(sc.hadoopConfiguration)).thenReturn(true) + when(rackResolverUtil.isConfigured).thenReturn(true) when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) .thenReturn(Option("/rack1")) when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com")) .thenReturn(Option("/rack2")) when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) - .thenReturn(Option(NetworkTopology.DEFAULT_RACK)) + .thenReturn(None) when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5")) - .thenReturn(Option(NetworkTopology.DEFAULT_RACK)) + .thenReturn(None) val inetAddressUtil = mock(classOf[InetAddressUtil]) val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil) sched.backend = backend diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala index 7618c137ab22..35c5e89a2c5c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala @@ -21,12 +21,13 @@ import scala.collection.mutable.ArrayBuffer import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} import org.mockito.Mockito._ -import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation, TaskLocation} class KubernetesTaskSetManagerSuite extends SparkFunSuite { - val sc = new SparkContext("local", "test") + val sc = new SparkContext(master = "local", appName = "test", + new SparkConf().set("spark.driver.allowMultipleContexts", "true")) val sched = new FakeTaskScheduler(sc, ("execA", "10.0.0.1"), ("execB", "10.0.0.2"), ("execC", "10.0.0.3")) val backend = mock(classOf[KubernetesClusterSchedulerBackend]) From 7d54a8b88b06b7955d7cba35c579cf8240b64684 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 22 Jun 2017 17:31:20 -0700 Subject: [PATCH 3/6] Address review comments --- .../KubernetesTaskSchedulerImpl.scala | 66 ++++++++++--------- .../KubernetesTaskSchedulerImplSuite.scala | 16 ++--- .../KubernetesTaskSetManagerSuite.scala | 6 +- 3 files changed, 45 insertions(+), 43 deletions(-) diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala index 4671da9f1038..91e532085106 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala @@ -22,17 +22,23 @@ import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping} import org.apache.hadoop.yarn.util.RackResolver import org.apache.log4j.{Level, Logger} -import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager} +import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, TaskSet, TaskSetManager} import org.apache.spark.util.Utils import org.apache.spark.SparkContext private[spark] class KubernetesTaskSchedulerImpl( sc: SparkContext, - rackResolverUtil: RackResolverUtil = new RackResolverUtil, + rackResolverUtil: RackResolverUtil, inetAddressUtil: InetAddressUtil = new InetAddressUtil) extends TaskSchedulerImpl(sc) { - rackResolverUtil.init(sc.hadoopConfiguration) + var kubernetesSchedulerBackend: KubernetesClusterSchedulerBackend = null + def this(sc: SparkContext) = this(sc, new RackResolverUtil(sc.hadoopConfiguration)) + + override def initialize(backend: SchedulerBackend): Unit = { + super.initialize(backend) + kubernetesSchedulerBackend = this.backend.asInstanceOf[KubernetesClusterSchedulerBackend] + } override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new KubernetesTaskSetManager(this, taskSet, maxTaskFailures) } @@ -49,44 +55,40 @@ private[spark] class KubernetesTaskSchedulerImpl( private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = { val host = Utils.parseHostPort(hostPort)._1 - val backend = this.backend.asInstanceOf[KubernetesClusterSchedulerBackend] - val executorPod = backend.getExecutorPodByIP(host) - if (executorPod.isEmpty) { - // Find the rack of the datanode host. - rackResolverUtil.resolveRack(sc.hadoopConfiguration, host) - } else { - // Find the rack of the cluster node that the executor pod is running on. - val clusterNodeName = executorPod.get.getSpec.getNodeName - val rackByNodeName = rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeName) - if (rackByNodeName.nonEmpty) { - rackByNodeName - } else { - val clusterNodeIP = executorPod.get.getStatus.getHostIP - val rackByNodeIP = rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeIP) + val executorPod = kubernetesSchedulerBackend.getExecutorPodByIP(host) + executorPod.isEmpty match { + case true => + // Find the rack of the datanode host. + rackResolverUtil.resolveRack(sc.hadoopConfiguration, host) + case false => + // Find the rack of the cluster node that the executor pod is running on. 
+ val clusterNodeName = executorPod.get.getSpec.getNodeName + val rackByNodeName = rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeName) if (rackByNodeName.nonEmpty) { - rackByNodeIP + rackByNodeName } else { - val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) - rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeFullName) + val clusterNodeIP = executorPod.get.getStatus.getHostIP + val rackByNodeIP = rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeIP) + if (rackByNodeName.nonEmpty) { + rackByNodeIP + } else { + val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) + rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeFullName) + } } - } } } } -private[kubernetes] class RackResolverUtil { +private[kubernetes] class RackResolverUtil(hadoopConfiguration: Configuration) { - val scriptPlugin = classOf[ScriptBasedMapping].getCanonicalName - val tablePlugin = classOf[TableMapping].getCanonicalName + val scriptPlugin : String = classOf[ScriptBasedMapping].getCanonicalName + val tablePlugin : String = classOf[TableMapping].getCanonicalName + val isConfigured : Boolean = checkConfigured(hadoopConfiguration) - var isConfigured = false - - def init(hadoopConfiguration: Configuration) : Unit = { - isConfigured = checkConfigured(hadoopConfiguration) - // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. - if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { - Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) - } + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) } def checkConfigured(hadoopConfiguration: Configuration): Boolean = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala index f336c7547fa1..71fc7e1d53eb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala @@ -22,18 +22,18 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{SparkContext, SparkFunSuite} import org.apache.spark.scheduler.FakeTask class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { - val sc = new SparkContext(master = "local", appName = "test", - new SparkConf().set("spark.driver.allowMultipleContexts", "true")) + SparkContext.clearActiveContext() + val sc = new SparkContext("local", "test") val backend = mock(classOf[KubernetesClusterSchedulerBackend]) test("Create a k8s task set manager") { val sched = new KubernetesTaskSchedulerImpl(sc) - sched.backend = backend + sched.kubernetesSchedulerBackend = backend val taskSet = FakeTask.createTaskSet(0) val manager = sched.createTaskSetManager(taskSet, maxTaskFailures = 3) @@ -48,7 +48,7 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) 
.thenReturn(Option("/rack2")) val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil) - sched.backend = backend + sched.kubernetesSchedulerBackend = backend when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None) when(backend.getExecutorPodByIP("kube-node2")).thenReturn(None) @@ -69,7 +69,7 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { .thenReturn(None) val inetAddressUtil = mock(classOf[InetAddressUtil]) val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil) - sched.backend = backend + sched.kubernetesSchedulerBackend = backend val spec1 = mock(classOf[PodSpec]) when(spec1.getNodeName).thenReturn("kube-node1") @@ -100,7 +100,7 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { val rackResolverUtil = mock(classOf[RackResolverUtil]) when(rackResolverUtil.checkConfigured(sc.hadoopConfiguration)).thenReturn(false) val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil) - sched.backend = backend + sched.kubernetesSchedulerBackend = backend when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None) val spec1 = mock(classOf[PodSpec]) @@ -117,8 +117,8 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { } test("Detects if topology plugin is configured") { - val rackResolverUtil = new RackResolverUtil val hadoopConfiguration = new Configuration + val rackResolverUtil = new RackResolverUtil(hadoopConfiguration) assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala index 35c5e89a2c5c..eced9a8e52e2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala @@ -21,13 +21,13 @@ import scala.collection.mutable.ArrayBuffer import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} import org.mockito.Mockito._ -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{SparkContext, SparkFunSuite} import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation, TaskLocation} class KubernetesTaskSetManagerSuite extends SparkFunSuite { - val sc = new SparkContext(master = "local", appName = "test", - new SparkConf().set("spark.driver.allowMultipleContexts", "true")) + SparkContext.clearActiveContext() + val sc = new SparkContext("local", "test") val sched = new FakeTaskScheduler(sc, ("execA", "10.0.0.1"), ("execB", "10.0.0.2"), ("execC", "10.0.0.3")) val backend = mock(classOf[KubernetesClusterSchedulerBackend]) From 6872b118c6f1f840942439a3921426350673af00 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 24 Jul 2017 17:00:30 -0700 Subject: [PATCH 4/6] Address some review comments --- .../KubernetesTaskSchedulerImpl.scala | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala index 91e532085106..723dae3cd52e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala @@ -56,27 +56,21 @@ private[spark] class KubernetesTaskSchedulerImpl( private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = { val host = Utils.parseHostPort(hostPort)._1 val executorPod = kubernetesSchedulerBackend.getExecutorPodByIP(host) - executorPod.isEmpty match { - case true => - // Find the rack of the datanode host. - rackResolverUtil.resolveRack(sc.hadoopConfiguration, host) - case false => - // Find the rack of the cluster node that the executor pod is running on. - val clusterNodeName = executorPod.get.getSpec.getNodeName - val rackByNodeName = rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeName) - if (rackByNodeName.nonEmpty) { - rackByNodeName - } else { - val clusterNodeIP = executorPod.get.getStatus.getHostIP - val rackByNodeIP = rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeIP) - if (rackByNodeName.nonEmpty) { - rackByNodeIP - } else { - val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) - rackResolverUtil.resolveRack(sc.hadoopConfiguration, clusterNodeFullName) - } + val hadoopConfiguration = sc.hadoopConfiguration + executorPod.map( + pod => { + val clusterNodeName = pod.getSpec.getNodeName + val rackByNodeName = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeName) + rackByNodeName.orElse({ + val clusterNodeIP = pod.getStatus.getHostIP + val rackByNodeIP = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeIP) + rackByNodeIP.orElse({ + val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) + rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeFullName) + }) + }) } - } + ).getOrElse(rackResolverUtil.resolveRack(hadoopConfiguration, host)) } } From 2e49e48fd10e1a27aaef06a9ba73544f2af865ea Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 25 Jul 2017 16:00:18 -0700 Subject: [PATCH 5/6] Use traits for InetAddress and RackResolver util classes --- .../cluster/kubernetes/InetAddressUtil.scala | 36 ++++++++++ .../KubernetesTaskSchedulerImpl.scala | 43 +----------- .../kubernetes/KubernetesTaskSetManager.scala | 13 +--- .../cluster/kubernetes/RackResolverUtil.scala | 69 +++++++++++++++++++ .../KubernetesTaskSchedulerImplSuite.scala | 28 +------- .../RackResolverUtilImplSuite.scala | 49 +++++++++++++ 6 files changed, 158 insertions(+), 80 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtilImplSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala new file mode 100644 index 000000000000..e04ab9e54196 --- /dev/null +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import java.net.InetAddress + +/** + * Gets full host names of given IP addresses from DNS. + */ +private[kubernetes] trait InetAddressUtil { + + def getFullHostName(ipAddress: String): String +} + +private[kubernetes] object InetAddressUtilImpl extends InetAddressUtil { + + // NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress + // class for both hits and misses. + override def getFullHostName(ipAddress: String): String = { + InetAddress.getByName(ipAddress).getCanonicalHostName + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala index 723dae3cd52e..2b4024899593 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala @@ -16,12 +16,6 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.CommonConfigurationKeysPublic -import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping} -import org.apache.hadoop.yarn.util.RackResolver -import org.apache.log4j.{Level, Logger} - import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, TaskSet, TaskSetManager} import org.apache.spark.util.Utils import org.apache.spark.SparkContext @@ -29,11 +23,11 @@ import org.apache.spark.SparkContext private[spark] class KubernetesTaskSchedulerImpl( sc: SparkContext, rackResolverUtil: RackResolverUtil, - inetAddressUtil: InetAddressUtil = new InetAddressUtil) extends TaskSchedulerImpl(sc) { + inetAddressUtil: InetAddressUtil = InetAddressUtilImpl) extends TaskSchedulerImpl(sc) { var kubernetesSchedulerBackend: KubernetesClusterSchedulerBackend = null - def this(sc: SparkContext) = this(sc, new RackResolverUtil(sc.hadoopConfiguration)) + def this(sc: SparkContext) = this(sc, new RackResolverUtilImpl(sc.hadoopConfiguration)) override def initialize(backend: SchedulerBackend): Unit = { super.initialize(backend) @@ -73,36 +67,3 @@ private[spark] class KubernetesTaskSchedulerImpl( ).getOrElse(rackResolverUtil.resolveRack(hadoopConfiguration, host)) } } - -private[kubernetes] class RackResolverUtil(hadoopConfiguration: Configuration) { - - val 
scriptPlugin : String = classOf[ScriptBasedMapping].getCanonicalName - val tablePlugin : String = classOf[TableMapping].getCanonicalName - val isConfigured : Boolean = checkConfigured(hadoopConfiguration) - - // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. - if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { - Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) - } - - def checkConfigured(hadoopConfiguration: Configuration): Boolean = { - val plugin = hadoopConfiguration.get( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, scriptPlugin) - val scriptName = hadoopConfiguration.get( - CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "") - val tableName = hadoopConfiguration.get( - CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, "") - plugin == scriptPlugin && scriptName.nonEmpty || - plugin == tablePlugin && tableName.nonEmpty || - plugin != scriptPlugin && plugin != tablePlugin - } - - def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] = { - val rack = Option(RackResolver.resolve(hadoopConfiguration, host).getNetworkLocation) - if (rack.nonEmpty && rack.get != NetworkTopology.DEFAULT_RACK) { - rack - } else { - None - } - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala index 51566d03a7a6..b0d981a153ca 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.net.InetAddress - import scala.collection.mutable.ArrayBuffer import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager} @@ -26,7 +24,7 @@ private[spark] class KubernetesTaskSetManager( sched: TaskSchedulerImpl, taskSet: TaskSet, maxTaskFailures: Int, - inetAddressUtil: InetAddressUtil = new InetAddressUtil) + inetAddressUtil: InetAddressUtil = InetAddressUtilImpl) extends TaskSetManager(sched, taskSet, maxTaskFailures) { /** @@ -74,12 +72,3 @@ private[spark] class KubernetesTaskSetManager( } } -// To support mocks in unit tests. -private[kubernetes] class InetAddressUtil { - - // NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress - // class for both hits and misses. - def getFullHostName(ipAddress: String): String = { - InetAddress.getByName(ipAddress).getCanonicalHostName - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala new file mode 100644 index 000000000000..29a7dc982a5a --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.CommonConfigurationKeysPublic +import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping} +import org.apache.hadoop.yarn.util.RackResolver +import org.apache.log4j.{Level, Logger} + +/** + * Finds rack names that cluster nodes belong to in order to support HDFS rack locality. + */ +private[kubernetes] trait RackResolverUtil { + + def isConfigured() : Boolean + + def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] +} + +private[kubernetes] class RackResolverUtilImpl(hadoopConfiguration: Configuration) + extends RackResolverUtil { + + val scriptPlugin : String = classOf[ScriptBasedMapping].getCanonicalName + val tablePlugin : String = classOf[TableMapping].getCanonicalName + val isResolverConfigured : Boolean = checkConfigured(hadoopConfiguration) + + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) + } + + override def isConfigured() : Boolean = isResolverConfigured + + def checkConfigured(hadoopConfiguration: Configuration): Boolean = { + val plugin = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, scriptPlugin) + val scriptName = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "") + val tableName = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, "") + plugin == scriptPlugin && scriptName.nonEmpty || + plugin == tablePlugin && tableName.nonEmpty || + plugin != scriptPlugin && plugin != tablePlugin + } + + override def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] = { + val rack = Option(RackResolver.resolve(hadoopConfiguration, host).getNetworkLocation) + if (rack.nonEmpty && rack.get != NetworkTopology.DEFAULT_RACK) { + rack + } else { + None + } + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala index 71fc7e1d53eb..d3fa118ca164 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler.cluster.kubernetes import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.CommonConfigurationKeysPublic import 
org.mockito.Matchers._ import org.mockito.Mockito._ @@ -98,7 +96,7 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { test("Does not get racks if plugin is not configured") { val rackResolverUtil = mock(classOf[RackResolverUtil]) - when(rackResolverUtil.checkConfigured(sc.hadoopConfiguration)).thenReturn(false) + when(rackResolverUtil.isConfigured()).thenReturn(false) val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil) sched.kubernetesSchedulerBackend = backend when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None) @@ -115,28 +113,4 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { assert(sched.getRackForHost("kube-node1:60010").isEmpty) assert(sched.getRackForHost("10.0.0.1:7079").isEmpty) } - - test("Detects if topology plugin is configured") { - val hadoopConfiguration = new Configuration - val rackResolverUtil = new RackResolverUtil(hadoopConfiguration) - - assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) - hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - rackResolverUtil.scriptPlugin) - assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) - hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, - "my-script") - assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) - - hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - rackResolverUtil.tablePlugin) - assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) - hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, - "my-table") - assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) - - hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - "my.Plugin") - assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) - } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtilImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtilImplSuite.scala new file mode 100644 index 000000000000..ee671ce7d659 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtilImplSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.CommonConfigurationKeysPublic + +import org.apache.spark.SparkFunSuite + +class RackResolverUtilImplSuite extends SparkFunSuite { + + test("Detects if topology plugin is configured") { + val hadoopConfiguration = new Configuration + val rackResolverUtil = new RackResolverUtilImpl(hadoopConfiguration) + + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + rackResolverUtil.scriptPlugin) + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, + "my-script") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + rackResolverUtil.tablePlugin) + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, + "my-table") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + "my.Plugin") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + } +} From 3e1340201a118035f1187c5e4442e6ab20499178 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 10 Aug 2017 14:52:16 -0700 Subject: [PATCH 6/6] Disables expensive DNS lookup by default --- .../KubernetesTaskSchedulerImpl.scala | 9 +++- .../KubernetesTaskSchedulerImplSuite.scala | 51 ++++++++++++++++++- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala index 2b4024899593..ca40d46400ea 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.scheduler.cluster.kubernetes +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, TaskSet, TaskSetManager} import org.apache.spark.util.Utils import org.apache.spark.SparkContext @@ -59,8 +60,12 @@ private[spark] class KubernetesTaskSchedulerImpl( val clusterNodeIP = pod.getStatus.getHostIP val rackByNodeIP = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeIP) rackByNodeIP.orElse({ - val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) - rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeFullName) + if (conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) { + val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) + rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeFullName) + } else { + Option.empty + } }) }) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala index d3fa118ca164..b31ff4fdbeb3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala @@ -19,16 +19,21 @@ package org.apache.spark.scheduler.cluster.kubernetes import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} import org.mockito.Matchers._ import org.mockito.Mockito._ - import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.scheduler.FakeTask +import org.scalatest.BeforeAndAfter -class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { +class KubernetesTaskSchedulerImplSuite extends SparkFunSuite with BeforeAndAfter { SparkContext.clearActiveContext() val sc = new SparkContext("local", "test") val backend = mock(classOf[KubernetesClusterSchedulerBackend]) + before { + sc.conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED) + } + test("Create a k8s task set manager") { val sched = new KubernetesTaskSchedulerImpl(sc) sched.kubernetesSchedulerBackend = backend @@ -55,6 +60,7 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { } test("Gets racks for executor pods") { + sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, true) val rackResolverUtil = mock(classOf[RackResolverUtil]) when(rackResolverUtil.isConfigured).thenReturn(true) when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) @@ -94,6 +100,47 @@ class KubernetesTaskSchedulerImplSuite extends SparkFunSuite { verify(inetAddressUtil, times(1)).getFullHostName(anyString()) } + test("Gets racks for executor pods while disabling DNS lookup ") { + sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, false) + val rackResolverUtil = mock(classOf[RackResolverUtil]) + when(rackResolverUtil.isConfigured).thenReturn(true) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) + .thenReturn(Option("/rack1")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com")) + .thenReturn(Option("/rack2")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) + .thenReturn(None) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5")) + .thenReturn(None) + val inetAddressUtil = mock(classOf[InetAddressUtil]) + val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil) + sched.kubernetesSchedulerBackend = backend + + val spec1 = mock(classOf[PodSpec]) + when(spec1.getNodeName).thenReturn("kube-node1") + val status1 = mock(classOf[PodStatus]) + when(status1.getHostIP).thenReturn("192.168.1.4") + val pod1 = mock(classOf[Pod]) + when(pod1.getSpec).thenReturn(spec1) + when(pod1.getStatus).thenReturn(status1) + when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1)) + + val spec2 = mock(classOf[PodSpec]) + when(spec2.getNodeName).thenReturn("kube-node2") + val status2 = mock(classOf[PodStatus]) + when(status2.getHostIP).thenReturn("192.168.1.5") + val pod2 = mock(classOf[Pod]) + when(pod2.getSpec).thenReturn(spec2) + when(pod2.getStatus).thenReturn(status2) + when(inetAddressUtil.getFullHostName("192.168.1.5")).thenReturn("kube-node2.mydomain.com") + 
when(backend.getExecutorPodByIP("10.0.1.1")).thenReturn(Some(pod2)) + + assert(sched.getRackForHost("10.0.0.1:7079") == Option("/rack1")) + assert(sched.getRackForHost("10.0.1.1:7079") == None) + + verify(inetAddressUtil, never).getFullHostName(anyString()) + } + test("Does not get racks if plugin is not configured") { val rackResolverUtil = mock(classOf[RackResolverUtil]) when(rackResolverUtil.isConfigured()).thenReturn(false)