diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala new file mode 100644 index 000000000000..e04ab9e54196 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/InetAddressUtil.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import java.net.InetAddress + +/** + * Gets full host names of given IP addresses from DNS. + */ +private[kubernetes] trait InetAddressUtil { + + def getFullHostName(ipAddress: String): String +} + +private[kubernetes] object InetAddressUtilImpl extends InetAddressUtil { + + // NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress + // class for both hits and misses. + override def getFullHostName(ipAddress: String): String = { + InetAddress.getByName(ipAddress).getCanonicalHostName + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala index a5e126480b83..ca40d46400ea 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImpl.scala @@ -16,12 +16,59 @@ */ package org.apache.spark.scheduler.cluster.kubernetes +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, TaskSet, TaskSetManager} +import org.apache.spark.util.Utils import org.apache.spark.SparkContext -import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager} -private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) { +private[spark] class KubernetesTaskSchedulerImpl( + sc: SparkContext, + rackResolverUtil: RackResolverUtil, + inetAddressUtil: InetAddressUtil = InetAddressUtilImpl) extends TaskSchedulerImpl(sc) { + var kubernetesSchedulerBackend: KubernetesClusterSchedulerBackend = null + + def this(sc: SparkContext) = this(sc, new RackResolverUtilImpl(sc.hadoopConfiguration)) + + override def initialize(backend: SchedulerBackend): Unit = { + super.initialize(backend) + kubernetesSchedulerBackend = this.backend.asInstanceOf[KubernetesClusterSchedulerBackend] + } override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new KubernetesTaskSetManager(this, taskSet, maxTaskFailures) } + + override def getRackForHost(hostPort: String): Option[String] = { + if (!rackResolverUtil.isConfigured) { + // Only calls resolver when it is configured to avoid sending DNS queries for cluster nodes. + // See InetAddressUtil for details. + None + } else { + getRackForDatanodeOrExecutor(hostPort) + } + } + + private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = { + val host = Utils.parseHostPort(hostPort)._1 + val executorPod = kubernetesSchedulerBackend.getExecutorPodByIP(host) + val hadoopConfiguration = sc.hadoopConfiguration + executorPod.map( + pod => { + val clusterNodeName = pod.getSpec.getNodeName + val rackByNodeName = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeName) + rackByNodeName.orElse({ + val clusterNodeIP = pod.getStatus.getHostIP + val rackByNodeIP = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeIP) + rackByNodeIP.orElse({ + if (conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) { + val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) + rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeFullName) + } else { + Option.empty + } + }) + }) + } + ).getOrElse(rackResolverUtil.resolveRack(hadoopConfiguration, host)) + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala index 17710fada287..44d01a5d76b0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.net.InetAddress - import scala.collection.mutable.ArrayBuffer import org.apache.spark.deploy.kubernetes.config._ @@ -27,7 +25,7 @@ private[spark] class KubernetesTaskSetManager( sched: TaskSchedulerImpl, taskSet: TaskSet, maxTaskFailures: Int, - inetAddressUtil: InetAddressUtil = new InetAddressUtil) + inetAddressUtil: InetAddressUtil = InetAddressUtilImpl) extends TaskSetManager(sched, taskSet, maxTaskFailures) { private val conf = sched.sc.conf @@ -83,12 +81,3 @@ private[spark] class KubernetesTaskSetManager( } } -// To support mocks in unit tests. -private[kubernetes] class InetAddressUtil { - - // NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress - // class for both hits and misses. - def getFullHostName(ipAddress: String): String = { - InetAddress.getByName(ipAddress).getCanonicalHostName - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala new file mode 100644 index 000000000000..29a7dc982a5a --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtil.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.CommonConfigurationKeysPublic +import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping} +import org.apache.hadoop.yarn.util.RackResolver +import org.apache.log4j.{Level, Logger} + +/** + * Finds rack names that cluster nodes belong to in order to support HDFS rack locality. + */ +private[kubernetes] trait RackResolverUtil { + + def isConfigured() : Boolean + + def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] +} + +private[kubernetes] class RackResolverUtilImpl(hadoopConfiguration: Configuration) + extends RackResolverUtil { + + val scriptPlugin : String = classOf[ScriptBasedMapping].getCanonicalName + val tablePlugin : String = classOf[TableMapping].getCanonicalName + val isResolverConfigured : Boolean = checkConfigured(hadoopConfiguration) + + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) + } + + override def isConfigured() : Boolean = isResolverConfigured + + def checkConfigured(hadoopConfiguration: Configuration): Boolean = { + val plugin = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, scriptPlugin) + val scriptName = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "") + val tableName = hadoopConfiguration.get( + CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, "") + plugin == scriptPlugin && scriptName.nonEmpty || + plugin == tablePlugin && tableName.nonEmpty || + plugin != scriptPlugin && plugin != tablePlugin + } + + override def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] = { + val rack = Option(RackResolver.resolve(hadoopConfiguration, host).getNetworkLocation) + if (rack.nonEmpty && rack.get != NetworkTopology.DEFAULT_RACK) { + rack + } else { + None + } + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala new file mode 100644 index 000000000000..b31ff4fdbeb3 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSchedulerImplSuite.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.scheduler.FakeTask +import org.scalatest.BeforeAndAfter + +class KubernetesTaskSchedulerImplSuite extends SparkFunSuite with BeforeAndAfter { + + SparkContext.clearActiveContext() + val sc = new SparkContext("local", "test") + val backend = mock(classOf[KubernetesClusterSchedulerBackend]) + + before { + sc.conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED) + } + + test("Create a k8s task set manager") { + val sched = new KubernetesTaskSchedulerImpl(sc) + sched.kubernetesSchedulerBackend = backend + val taskSet = FakeTask.createTaskSet(0) + + val manager = sched.createTaskSetManager(taskSet, maxTaskFailures = 3) + assert(manager.isInstanceOf[KubernetesTaskSetManager]) + } + + test("Gets racks for datanodes") { + val rackResolverUtil = mock(classOf[RackResolverUtil]) + when(rackResolverUtil.isConfigured).thenReturn(true) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) + .thenReturn(Option("/rack1")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) + .thenReturn(Option("/rack2")) + val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil) + sched.kubernetesSchedulerBackend = backend + when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None) + when(backend.getExecutorPodByIP("kube-node2")).thenReturn(None) + + assert(sched.getRackForHost("kube-node1:60010") == Option("/rack1")) + assert(sched.getRackForHost("kube-node2:60010") == Option("/rack2")) + } + + test("Gets racks for executor pods") { + sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, true) + val rackResolverUtil = mock(classOf[RackResolverUtil]) + when(rackResolverUtil.isConfigured).thenReturn(true) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) + .thenReturn(Option("/rack1")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com")) + .thenReturn(Option("/rack2")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) + .thenReturn(None) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5")) + .thenReturn(None) + val inetAddressUtil = mock(classOf[InetAddressUtil]) + val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil) + sched.kubernetesSchedulerBackend = backend + + val spec1 = mock(classOf[PodSpec]) + when(spec1.getNodeName).thenReturn("kube-node1") + val status1 = mock(classOf[PodStatus]) + when(status1.getHostIP).thenReturn("192.168.1.4") + val pod1 = mock(classOf[Pod]) + when(pod1.getSpec).thenReturn(spec1) + when(pod1.getStatus).thenReturn(status1) + when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1)) + + val spec2 = mock(classOf[PodSpec]) + when(spec2.getNodeName).thenReturn("kube-node2") + val status2 = mock(classOf[PodStatus]) + when(status2.getHostIP).thenReturn("192.168.1.5") + val pod2 = mock(classOf[Pod]) + when(pod2.getSpec).thenReturn(spec2) + when(pod2.getStatus).thenReturn(status2) + when(inetAddressUtil.getFullHostName("192.168.1.5")).thenReturn("kube-node2.mydomain.com") + when(backend.getExecutorPodByIP("10.0.1.1")).thenReturn(Some(pod2)) + + assert(sched.getRackForHost("10.0.0.1:7079") == Option("/rack1")) + assert(sched.getRackForHost("10.0.1.1:7079") == Option("/rack2")) + + verify(inetAddressUtil, times(1)).getFullHostName(anyString()) + } + + test("Gets racks for executor pods while disabling DNS lookup ") { + sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, false) + val rackResolverUtil = mock(classOf[RackResolverUtil]) + when(rackResolverUtil.isConfigured).thenReturn(true) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1")) + .thenReturn(Option("/rack1")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com")) + .thenReturn(Option("/rack2")) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2")) + .thenReturn(None) + when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5")) + .thenReturn(None) + val inetAddressUtil = mock(classOf[InetAddressUtil]) + val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil) + sched.kubernetesSchedulerBackend = backend + + val spec1 = mock(classOf[PodSpec]) + when(spec1.getNodeName).thenReturn("kube-node1") + val status1 = mock(classOf[PodStatus]) + when(status1.getHostIP).thenReturn("192.168.1.4") + val pod1 = mock(classOf[Pod]) + when(pod1.getSpec).thenReturn(spec1) + when(pod1.getStatus).thenReturn(status1) + when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1)) + + val spec2 = mock(classOf[PodSpec]) + when(spec2.getNodeName).thenReturn("kube-node2") + val status2 = mock(classOf[PodStatus]) + when(status2.getHostIP).thenReturn("192.168.1.5") + val pod2 = mock(classOf[Pod]) + when(pod2.getSpec).thenReturn(spec2) + when(pod2.getStatus).thenReturn(status2) + when(inetAddressUtil.getFullHostName("192.168.1.5")).thenReturn("kube-node2.mydomain.com") + when(backend.getExecutorPodByIP("10.0.1.1")).thenReturn(Some(pod2)) + + assert(sched.getRackForHost("10.0.0.1:7079") == Option("/rack1")) + assert(sched.getRackForHost("10.0.1.1:7079") == None) + + verify(inetAddressUtil, never).getFullHostName(anyString()) + } + + test("Does not get racks if plugin is not configured") { + val rackResolverUtil = mock(classOf[RackResolverUtil]) + when(rackResolverUtil.isConfigured()).thenReturn(false) + val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil) + sched.kubernetesSchedulerBackend = backend + when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None) + + val spec1 = mock(classOf[PodSpec]) + when(spec1.getNodeName).thenReturn("kube-node1") + val status1 = mock(classOf[PodStatus]) + when(status1.getHostIP).thenReturn("192.168.1.4") + val pod1 = mock(classOf[Pod]) + when(pod1.getSpec).thenReturn(spec1) + when(pod1.getStatus).thenReturn(status1) + when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1)) + + assert(sched.getRackForHost("kube-node1:60010").isEmpty) + assert(sched.getRackForHost("10.0.0.1:7079").isEmpty) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala index 864ff40d88c5..889758731a6c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation class KubernetesTaskSetManagerSuite extends SparkFunSuite with BeforeAndAfter { + SparkContext.clearActiveContext() val sc = new SparkContext("local", "test") val sched = new FakeTaskScheduler(sc, ("execA", "10.0.0.1"), ("execB", "10.0.0.2"), ("execC", "10.0.0.3")) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtilImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtilImplSuite.scala new file mode 100644 index 000000000000..ee671ce7d659 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/RackResolverUtilImplSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.CommonConfigurationKeysPublic + +import org.apache.spark.SparkFunSuite + +class RackResolverUtilImplSuite extends SparkFunSuite { + + test("Detects if topology plugin is configured") { + val hadoopConfiguration = new Configuration + val rackResolverUtil = new RackResolverUtilImpl(hadoopConfiguration) + + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + rackResolverUtil.scriptPlugin) + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, + "my-script") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + rackResolverUtil.tablePlugin) + assert(!rackResolverUtil.checkConfigured(hadoopConfiguration)) + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, + "my-table") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + + hadoopConfiguration.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + "my.Plugin") + assert(rackResolverUtil.checkConfigured(hadoopConfiguration)) + } +}