@@ -491,6 +491,19 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED =
+    ConfigBuilder("spark.kubernetes.driver.hdfslocality.clusterNodeNameDNSLookup.enabled")
+      .doc("Whether or not HDFS locality support code should look up DNS for full hostnames of" +
+        " cluster nodes. In some K8s clusters, notably GKE, cluster node names are short" +
+        " hostnames, and so comparing them against HDFS datanode hostnames always fails. To fix," +
+        " enable this flag. This is disabled by default because DNS lookups can be expensive." +
+        " The driver can slow down and fail to respond to executor heartbeats in time." +
+        " If you enable this flag, make sure your DNS server has enough capacity" +
+        " for the workload.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
     ConfigBuilder("spark.kubernetes.executor.limit.cores")
       .doc("Specify the hard cpu limit for a single executor pod")
@@ -20,6 +20,7 @@ import java.net.InetAddress

 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
 
 private[spark] class KubernetesTaskSetManager(
@@ -29,6 +30,8 @@ private[spark] class KubernetesTaskSetManager(
     inetAddressUtil: InetAddressUtil = new InetAddressUtil)
   extends TaskSetManager(sched, taskSet, maxTaskFailures) {
 
+  private val conf = sched.sc.conf
+
   /**
    * Overrides the lookup to use not only the executor pod IP, but also the cluster node
    * name and host IP address that the pod is running on. The base class may have populated
@@ -58,13 +61,19 @@
s"$executorIP using cluster node IP $clusterNodeIP")
pendingTasksClusterNodeIP
} else {
val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP)
val pendingTasksClusterNodeFullName = super.getPendingTasksForHost(clusterNodeFullName)
if (pendingTasksClusterNodeFullName.nonEmpty) {
logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " +
s"for executor host $executorIP using cluster node full name $clusterNodeFullName")
if (conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) {
val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP)
val pendingTasksClusterNodeFullName = super.getPendingTasksForHost(
clusterNodeFullName)
if (pendingTasksClusterNodeFullName.nonEmpty) {
logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " +
s"for executor host $executorIP using cluster node full name " +
s"$clusterNodeFullName")
}
pendingTasksClusterNodeFullName
} else {
pendingTasksExecutorIP // Empty
}
pendingTasksClusterNodeFullName
}
}
} else {
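
Note: the lookup that the flag gates goes through the injectable inetAddressUtil constructor parameter. A minimal sketch of that helper, assuming it is a thin wrapper over java.net.InetAddress (the wrapper is what lets the suite below stub DNS with Mockito):

  import java.net.InetAddress

  // Sketch: reverse-DNS helper guarded by the new flag. getCanonicalHostName
  // performs the reverse lookup and falls back to the literal IP string when
  // the name cannot be resolved, so callers always get a non-null host string.
  private[kubernetes] class InetAddressUtil {
    def getFullHostName(ipAddress: String): String = {
      InetAddress.getByName(ipAddress).getCanonicalHostName
    }
  }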
@@ -20,18 +20,24 @@ import scala.collection.mutable.ArrayBuffer

 import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus}
 import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation, TaskLocation}
 
-class KubernetesTaskSetManagerSuite extends SparkFunSuite {
+class KubernetesTaskSetManagerSuite extends SparkFunSuite with BeforeAndAfter {
 
   val sc = new SparkContext("local", "test")
   val sched = new FakeTaskScheduler(sc,
     ("execA", "10.0.0.1"), ("execB", "10.0.0.2"), ("execC", "10.0.0.3"))
   val backend = mock(classOf[KubernetesClusterSchedulerBackend])
   sched.backend = backend
 
+  before {
+    sc.conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)
+  }
+
   test("Find pending tasks for executors using executor pod IP addresses") {
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("10.0.0.1", "execA")), // Task 0 runs on executor pod 10.0.0.1.
@@ -76,7 +82,33 @@ class KubernetesTaskSetManagerSuite extends SparkFunSuite {
     assert(manager.getPendingTasksForHost("10.0.0.1") == ArrayBuffer(1, 0))
   }
 
+  test("Test DNS lookup is disabled by default for cluster node full hostnames") {
+    assert(!sc.conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED))
+  }
+
+  test("Find pending tasks for executors, but avoid looking up cluster node FQDNs from DNS") {
+    sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, false)
+    val taskSet = FakeTask.createTaskSet(2,
+      Seq(HostTaskLocation("kube-node1.domain1")), // Task 0's partition belongs to datanode here.
+      Seq(HostTaskLocation("kube-node1.domain1")) // Task 1's partition belongs to datanode here.
+    )
+    val spec1 = mock(classOf[PodSpec])
+    when(spec1.getNodeName).thenReturn("kube-node1")
+    val pod1 = mock(classOf[Pod])
+    when(pod1.getSpec).thenReturn(spec1)
+    val status1 = mock(classOf[PodStatus])
+    when(status1.getHostIP).thenReturn("196.0.0.5")
+    when(pod1.getStatus).thenReturn(status1)
+    val inetAddressUtil = mock(classOf[InetAddressUtil])
+    when(inetAddressUtil.getFullHostName("196.0.0.5")).thenReturn("kube-node1.domain1")
+    when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1))
+
+    val manager = new KubernetesTaskSetManager(sched, taskSet, maxTaskFailures = 2, inetAddressUtil)
+    assert(manager.getPendingTasksForHost("10.0.0.1") == ArrayBuffer())
+  }
+
   test("Find pending tasks for executors using cluster node FQDNs that executor pods run on") {
+    sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, true)
     val taskSet = FakeTask.createTaskSet(2,
       Seq(HostTaskLocation("kube-node1.domain1")), // Task 0's partition belongs to datanode here.
       Seq(HostTaskLocation("kube-node1.domain1")) // Task 1's partition belongs to datanode here.
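
Note: the before {} hook resets the flag with remove() rather than set(..., false) so each test starts from the ConfigBuilder default; that is what keeps the "disabled by default" assertion order-independent. The same semantics in isolation (a standalone sketch assuming the suite's private[spark] access to the typed entry, not part of the diff):

  // An explicit set() would leak into later tests; remove() clears the entry
  // so the typed get() falls back to createWithDefault(false).
  val conf = new org.apache.spark.SparkConf()
  conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED.key, "true")
  conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED.key)
  assert(!conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED))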
Expand Down