From 89366619ffb016e1a1f1e66a2ceb63754293a083 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Wed, 2 Aug 2017 12:00:02 -0700 Subject: [PATCH 1/3] Flag-guard expensive DNS lookup of cluster node full names, part of HDFS locality support --- .../spark/deploy/kubernetes/config.scala | 12 +++++++++ .../kubernetes/KubernetesTaskSetManager.scala | 6 ++++- .../KubernetesTaskSetManagerSuite.scala | 27 +++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index c6772c1cb5ae..e0e97fcc70ec 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -491,6 +491,18 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED = + ConfigBuilder("spark.kubernetes.driver.hdfslocality.clusterNodeNameDNSLookup.enabled") + .doc("Whether or not HDFS locality support code should look up DNS for full hostnames of" + + " cluster nodes. In some K8s clusters, notably GKE, cluster node names are short" + + " hostnames, and so comparing them against HDFS datanode hostnames always fail. To fix," + + " enable this flag. This is disabled by default because DNS lookup can be expensive." + + " The driver can slow down and fail to respond to executor heartbeats in time." + + " If enabling this flag, make sure your DNS server has enough capacity" + + " for the workload.") + .booleanConf + .createWithDefault(false) + private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES = ConfigBuilder("spark.kubernetes.executor.limit.cores") .doc("Specify the hard cpu limit for a single executor pod") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala index 51566d03a7a6..2498987d3261 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala @@ -20,6 +20,7 @@ import java.net.InetAddress import scala.collection.mutable.ArrayBuffer +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager} private[spark] class KubernetesTaskSetManager( @@ -29,6 +30,8 @@ private[spark] class KubernetesTaskSetManager( inetAddressUtil: InetAddressUtil = new InetAddressUtil) extends TaskSetManager(sched, taskSet, maxTaskFailures) { + private val conf = sched.sc.conf + /** * Overrides the lookup to use not only the executor pod IP, but also the cluster node * name and host IP address that the pod is running on. The base class may have populated @@ -53,7 +56,8 @@ private[spark] class KubernetesTaskSetManager( } else { val clusterNodeIP = pod.get.getStatus.getHostIP val pendingTasksClusterNodeIP = super.getPendingTasksForHost(clusterNodeIP) - if (pendingTasksClusterNodeIP.nonEmpty) { + if (pendingTasksClusterNodeIP.nonEmpty || + !conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) { logDebug(s"Got preferred task list $pendingTasksClusterNodeIP for executor host " + s"$executorIP using cluster node IP $clusterNodeIP") pendingTasksClusterNodeIP diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala index 7618c137ab22..c6bf96c07105 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} import org.mockito.Mockito._ import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation, TaskLocation} class KubernetesTaskSetManagerSuite extends SparkFunSuite { @@ -76,7 +77,33 @@ class KubernetesTaskSetManagerSuite extends SparkFunSuite { assert(manager.getPendingTasksForHost("10.0.0.1") == ArrayBuffer(1, 0)) } + test("Test DNS lookup is disabled by default for cluster node full hostnames") { + assert(!sc.conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) + } + + test("Find pending tasks for executors, but avoid looking up cluster node FQDNs from DNS") { + sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, false) + val taskSet = FakeTask.createTaskSet(2, + Seq(HostTaskLocation("kube-node1.domain1")), // Task 0's partition belongs to datanode here. + Seq(HostTaskLocation("kube-node1.domain1")) // task 1's partition belongs to datanode here. + ) + val spec1 = mock(classOf[PodSpec]) + when(spec1.getNodeName).thenReturn("kube-node1") + val pod1 = mock(classOf[Pod]) + when(pod1.getSpec).thenReturn(spec1) + val status1 = mock(classOf[PodStatus]) + when(status1.getHostIP).thenReturn("196.0.0.5") + when(pod1.getStatus).thenReturn(status1) + val inetAddressUtil = mock(classOf[InetAddressUtil]) + when(inetAddressUtil.getFullHostName("196.0.0.5")).thenReturn("kube-node1.domain1") + when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1)) + + val manager = new KubernetesTaskSetManager(sched, taskSet, maxTaskFailures = 2, inetAddressUtil) + assert(manager.getPendingTasksForHost("10.0.0.1") == ArrayBuffer()) + } + test("Find pending tasks for executors using cluster node FQDNs that executor pods run on") { + sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, true) val taskSet = FakeTask.createTaskSet(2, Seq(HostTaskLocation("kube-node1.domain1")), // Task 0's partition belongs to datanode here. Seq(HostTaskLocation("kube-node1.domain1")) // task 1's partition belongs to datanode here. From 4c4bfe2c4a20ac7bb9dfdd7608492772bd53480c Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 3 Aug 2017 09:26:20 -0700 Subject: [PATCH 2/3] Clean up a bit --- .../spark/deploy/kubernetes/config.scala | 1 + .../kubernetes/KubernetesTaskSetManager.scala | 21 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index e0e97fcc70ec..f9c4c9c6a1e1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -500,6 +500,7 @@ package object config extends Logging { " The driver can slow down and fail to respond to executor heartbeats in time." + " If enabling this flag, make sure your DNS server has enough capacity" + " for the workload.") + .internal() .booleanConf .createWithDefault(false) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala index 2498987d3261..17710fada287 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManager.scala @@ -56,19 +56,24 @@ private[spark] class KubernetesTaskSetManager( } else { val clusterNodeIP = pod.get.getStatus.getHostIP val pendingTasksClusterNodeIP = super.getPendingTasksForHost(clusterNodeIP) - if (pendingTasksClusterNodeIP.nonEmpty || - !conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) { + if (pendingTasksClusterNodeIP.nonEmpty) { logDebug(s"Got preferred task list $pendingTasksClusterNodeIP for executor host " + s"$executorIP using cluster node IP $clusterNodeIP") pendingTasksClusterNodeIP } else { - val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) - val pendingTasksClusterNodeFullName = super.getPendingTasksForHost(clusterNodeFullName) - if (pendingTasksClusterNodeFullName.nonEmpty) { - logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " + - s"for executor host $executorIP using cluster node full name $clusterNodeFullName") + if (conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) { + val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP) + val pendingTasksClusterNodeFullName = super.getPendingTasksForHost( + clusterNodeFullName) + if (pendingTasksClusterNodeFullName.nonEmpty) { + logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " + + s"for executor host $executorIP using cluster node full name " + + s"$clusterNodeFullName") + } + pendingTasksClusterNodeFullName + } else { + pendingTasksExecutorIP // Empty } - pendingTasksClusterNodeFullName } } } else { From 27f433f7b9cca783fc6ec93315a173f0b22436e2 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 3 Aug 2017 15:16:29 -0700 Subject: [PATCH 3/3] Improve unit tests --- .../cluster/kubernetes/KubernetesTaskSetManagerSuite.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala index c6bf96c07105..864ff40d88c5 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesTaskSetManagerSuite.scala @@ -20,12 +20,13 @@ import scala.collection.mutable.ArrayBuffer import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus} import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkContext, SparkFunSuite} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation, TaskLocation} -class KubernetesTaskSetManagerSuite extends SparkFunSuite { +class KubernetesTaskSetManagerSuite extends SparkFunSuite with BeforeAndAfter { val sc = new SparkContext("local", "test") val sched = new FakeTaskScheduler(sc, @@ -33,6 +34,10 @@ class KubernetesTaskSetManagerSuite extends SparkFunSuite { val backend = mock(classOf[KubernetesClusterSchedulerBackend]) sched.backend = backend + before { + sc.conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED) + } + test("Find pending tasks for executors using executor pod IP addresses") { val taskSet = FakeTask.createTaskSet(3, Seq(TaskLocation("10.0.0.1", "execA")), // Task 0 runs on executor pod 10.0.0.1.