Support HDFS rack locality #349
This is likely the last sub-item of HDFS locality umbrella issue #206.
When using HDFS, the Spark driver looks up which rack a given datanode or executor belongs to, so that it can send tasks to executors that can read their input data from datanodes on the same rack. (This happens as a fallback when node locality fails.) To support rack locality, the Spark driver loads a configurable topology plugin into its JVM.
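For reference, the topology plugin is an implementation of Hadoop's DNSToSwitchMapping interface, selected by the net.topology.node.switch.mapping.impl configuration key (the default is a script-based mapping). Below is a minimal sketch of such a plugin; the class name and the IP-prefix-to-rack rule are made up for illustration:

import java.util.{List => JList}
import scala.collection.JavaConverters._
import org.apache.hadoop.net.DNSToSwitchMapping

class MyTopologyPlugin extends DNSToSwitchMapping {
  // Map each host to a network location such as "/rack1";
  // "/default-rack" is the conventional fallback.
  override def resolve(names: JList[String]): JList[String] =
    names.asScala.map { host =>
      if (host.startsWith("10.0.1.")) "/rack1" else "/default-rack"
    }.asJava

  // Invalidate any cached mappings; this sketch caches nothing.
  override def reloadCachedMappings(): Unit = ()

  override def reloadCachedMappings(names: JList[String]): Unit = ()
}

The driver would pick this up via something like spark.hadoop.net.topology.node.switch.mapping.impl=com.example.MyTopologyPlugin, since Spark copies spark.hadoop.* properties into the driver's Hadoop configuration.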
TaskSchedulerImpl has a getRackForHost method that is meant to be overridden by a subclass to call the topology plugin. TaskSetManager then calls getRackForHost to populate the pendingTasksForRack map; it gets called for each datanode host associated with the input data blocks of pending tasks:
private def addPendingTask(index: Int) {
  ...
  for (rack <- sched.getRackForHost(loc.host)) {
    pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
  }
}
getRackForHost is also called with executor addresses when the driver is about to send tasks to executors:
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] = {
  ...
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL, false))
    }
  }
YARN mode implements getRackForHost in YarnScheduler:
override def getRackForHost(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
We can add similar code in KubernetesTaskSchedulerImpl. The datanode handling will be exactly like the above. For executors, we would map pod IP addresses to the name/IP of the cluster node running the pod, and then call the topology plugin with the cluster node address, as in the sketch below. I'll send a PR soon.
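A rough sketch of what the override could look like; the podIpToNodeAddress helper is hypothetical, not the final design from the PR:

import org.apache.hadoop.yarn.util.RackResolver
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils

class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  // Hypothetical lookup from an executor pod IP to the address of the cluster
  // node running the pod; a real implementation would consult the Kubernetes
  // API server (the pod's status.hostIP) and cache the result.
  private def podIpToNodeAddress(podIp: String): Option[String] = None

  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    // Datanode hosts resolve directly, exactly like YarnScheduler above;
    // executor pod IPs are first translated to their cluster node's address.
    val nodeAddress = podIpToNodeAddress(host).getOrElse(host)
    Option(RackResolver.resolve(sc.hadoopConfiguration, nodeAddress).getNetworkLocation)
  }
}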