From c87a2f52acb7905ce0fb0513d7d7bbcb5046478e Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 7 Aug 2017 18:13:56 -0700 Subject: [PATCH] Flag-guard expensive DNS lookup of cluster node full names, part of HDFS locality support (#412) * Flag-guard expensive DNS lookup of cluster node full names, part of HDFS locality support * Clean up a bit * Improve unit tests --- .../spark/deploy/kubernetes/config.scala | 13 +++++++ .../kubernetes/KubernetesTaskSetManager.scala | 21 ++++++++---- .../KubernetesTaskSetManagerSuite.scala | 34 ++++++++++++++++++- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index c6772c1cb5ae4..f9c4c9c6a1e18 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -491,6 +491,19 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED = + ConfigBuilder("spark.kubernetes.driver.hdfslocality.clusterNodeNameDNSLookup.enabled") + .doc("Whether or not HDFS locality support code should look up DNS for full hostnames of" + + " cluster nodes. In some K8s clusters, notably GKE, cluster node names are short" + + " hostnames, and so comparing them against HDFS datanode hostnames always fail. To fix," + + " enable this flag. This is disabled by default because DNS lookup can be expensive." + + " The driver can slow down and fail to respond to executor heartbeats in time." + + " If enabling this flag, make sure your DNS server has enough capacity" + + " for the workload.") + .internal() + .booleanConf + .createWithDefault(false) + private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES = ConfigBuilder("spark.kubernetes.executor.limit.cores") .doc("Specify the hard cpu limit for a single executor pod") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala index 51566d03a7a6c..17710fada2876 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala @@ -20,6 +20,7 @@ import java.net.InetAddress import scala.collection.mutable.ArrayBuffer +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager} private[spark] class KubernetesTaskSetManager( @@ -29,6 +30,8 @@ private[spark] class KubernetesTaskSetManager( inetAddressUtil: InetAddressUtil = new InetAddressUtil) extends TaskSetManager(sched, taskSet, maxTaskFailures) { + private val conf = sched.sc.conf + /** * Overrides the lookup to use not only the executor pod IP, but also the cluster node * name and host IP address that the pod is running on. The base class may have populated @@ -58,13 +61,19 @@ private[spark] class KubernetesTaskSetManager( s"$executorIP using cluster node IP $clusterNodeIP") pendingTasksClusterNodeIP } else { - val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) - val pendingTasksClusterNodeFullName = super.getPendingTasksForHost(clusterNodeFullName) - if (pendingTasksClusterNodeFullName.nonEmpty) { - logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " + - s"for executor host $executorIP using cluster node full name $clusterNodeFullName") + if (conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) { + val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) + val pendingTasksClusterNodeFullName = super.getPendingTasksForHost( + clusterNodeFullName) + if (pendingTasksClusterNodeFullName.nonEmpty) { + logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " + + s"for executor host $executorIP using cluster node full name " + + s"$clusterNodeFullName") + } + pendingTasksClusterNodeFullName + } else { + pendingTasksExecutorIP // Empty } - pendingTasksClusterNodeFullName } } } else { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala index 7618c137ab22b..864ff40d88c5c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala @@ -20,11 +20,13 @@ import scala.collection.mutable.ArrayBuffer import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation, TaskLocation} -class KubernetesTaskSetManagerSuite extends SparkFunSuite { +class KubernetesTaskSetManagerSuite extends SparkFunSuite with BeforeAndAfter { val sc = new SparkContext("local", "test") val sched = new FakeTaskScheduler(sc, @@ -32,6 +34,10 @@ class KubernetesTaskSetManagerSuite extends SparkFunSuite { val backend = mock(classOf[KubernetesClusterSchedulerBackend]) sched.backend = backend + before { + sc.conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED) + } + test("Find pending tasks for executors using executor pod IP addresses") { val taskSet = FakeTask.createTaskSet(3, Seq(TaskLocation("10.0.0.1", "execA")), // Task 0 runs on executor pod 10.0.0.1. @@ -76,7 +82,33 @@ class KubernetesTaskSetManagerSuite extends SparkFunSuite { assert(manager.getPendingTasksForHost("10.0.0.1") == ArrayBuffer(1, 0)) } + test("Test DNS lookup is disabled by default for cluster node full hostnames") { + assert(!sc.conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) + } + + test("Find pending tasks for executors, but avoid looking up cluster node FQDNs from DNS") { + sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, false) + val taskSet = FakeTask.createTaskSet(2, + Seq(HostTaskLocation("kube-node1.domain1")), // Task 0's partition belongs to datanode here. + Seq(HostTaskLocation("kube-node1.domain1")) // task 1's partition belongs to datanode here. + ) + val spec1 = mock(classOf[PodSpec]) + when(spec1.getNodeName).thenReturn("kube-node1") + val pod1 = mock(classOf[Pod]) + when(pod1.getSpec).thenReturn(spec1) + val status1 = mock(classOf[PodStatus]) + when(status1.getHostIP).thenReturn("196.0.0.5") + when(pod1.getStatus).thenReturn(status1) + val inetAddressUtil = mock(classOf[InetAddressUtil]) + when(inetAddressUtil.getFullHostName("196.0.0.5")).thenReturn("kube-node1.domain1") + when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1)) + + val manager = new KubernetesTaskSetManager(sched, taskSet, maxTaskFailures = 2, inetAddressUtil) + assert(manager.getPendingTasksForHost("10.0.0.1") == ArrayBuffer()) + } + test("Find pending tasks for executors using cluster node FQDNs that executor pods run on") { + sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, true) val taskSet = FakeTask.createTaskSet(2, Seq(HostTaskLocation("kube-node1.domain1")), // Task 0's partition belongs to datanode here. Seq(HostTaskLocation("kube-node1.domain1")) // task 1's partition belongs to datanode here.