Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {

Expand Down Expand Up @@ -132,12 +132,18 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
executorInitContainerBootstrap,
executorInitContainerSecretVolumePlugin,
kubernetesShuffleManager)
val allocatorExecutor = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-executor-requests")
new KubernetesClusterSchedulerBackend(
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl],
sc,
scheduler.asInstanceOf[TaskSchedulerImpl],
sc.env.rpcEnv,
executorPodFactory,
kubernetesShuffleManager,
kubernetesClient)
kubernetesClient,
allocatorExecutor,
requestExecutorsService)
}

override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster.kubernetes
import java.io.Closeable
import java.net.InetAddress
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

import io.fabric8.kubernetes.api.model._
Expand All @@ -29,25 +29,28 @@ import scala.collection.{concurrent, mutable}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.Utils

private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
val sc: SparkContext,
rpcEnv: RpcEnv,
executorPodFactory: ExecutorPodFactory,
shuffleManager: Option[KubernetesExternalShuffleManager],
kubernetesClient: KubernetesClient)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
kubernetesClient: KubernetesClient,
allocatorExecutor: ScheduledExecutorService,
requestExecutorsService: ExecutorService)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

import KubernetesClusterSchedulerBackend._

private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this here so there's less static state?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct - it's much more difficult to unit test if the counter is global, because between different tests one needs to know what the counter is set to.

private val RUNNING_EXECUTOR_PODS_LOCK = new Object
// Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
Expand All @@ -57,21 +60,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
// Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
private val executorPodsByIPs = new mutable.HashMap[String, Pod]
private val failedPods: concurrent.Map[String, ExecutorExited] = new
ConcurrentHashMap[String, ExecutorExited]().asScala
private val executorsToRemove = Collections.newSetFromMap[String](
new ConcurrentHashMap[String, java.lang.Boolean]()).asScala
private val podsWithKnownExitReasons: concurrent.Map[String, ExecutorExited] =
new ConcurrentHashMap[String, ExecutorExited]().asScala
private val disconnectedPodsByExecutorIdPendingRemoval =
new ConcurrentHashMap[String, Pod]().asScala

private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)

private val kubernetesDriverPodName = conf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(
throw new SparkException("Must specify the driver pod name"))
private val executorPodNamePrefix = conf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)

private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
requestExecutorsService)

private val driverPod = try {
kubernetesClient.pods().inNamespace(kubernetesNamespace).
Expand All @@ -93,9 +94,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
protected var totalExpectedExecutors = new AtomicInteger(0)

private val driverUrl = RpcEndpointAddress(
sc.getConf.get("spark.driver.host"),
sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
conf.get("spark.driver.host"),
conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

private val initialExecutors = getInitialTargetExecutorNumber()

Expand All @@ -109,21 +110,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " +
s"is ${podAllocationSize}, should be a positive integer")

private val allocator = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
private val allocatorRunnable = new Runnable {

private val allocatorRunnable: Runnable = new Runnable {

// Number of times we are allowed check for the loss reason for an executor before we give up
// and assume the executor failed for good, and attribute it to a framework fault.
private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
private val executorsToRecover = new mutable.HashSet[String]
// Maintains a map of executor id to count of checks performed to learn the loss reason
// for an executor.
private val executorReasonChecks = new mutable.HashMap[String, Int]
private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]

override def run(): Unit = {
removeFailedExecutors()
handleDisconnectedExecutors()
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
logDebug("Waiting for pending executors before scaling")
Expand All @@ -132,7 +126,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
for (i <- 0 until math.min(
totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
runningExecutorsToPods.put(executorId, pod)
runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
Expand All @@ -143,43 +137,47 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
}

def removeFailedExecutors(): Unit = {
val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.toMap
}
executorsToRemove.foreach { case (executorId) =>
localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
removeExecutor(executorId, executorExited)
if (!executorExited.exitCausedByApp) {
executorsToRecover.add(executorId)
}
}.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
}.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))

executorsToRecover.foreach(executorId => {
executorsToRemove -= executorId
executorReasonChecks -= executorId
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.remove(executorId).map { pod: Pod =>
kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod.getMetadata.getName)
}.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
def handleDisconnectedExecutors(): Unit = {
// For each disconnected executor, synchronize with the loss reasons that may have been found
// by the executor pod watcher. If the loss reason was discovered by the watcher,
// inform the parent class with removeExecutor.
val disconnectedPodsByExecutorIdPendingRemovalCopy =
Map.empty ++ disconnectedPodsByExecutorIdPendingRemoval
disconnectedPodsByExecutorIdPendingRemovalCopy.foreach { case (executorId, executorPod) =>
val knownExitReason = podsWithKnownExitReasons.remove(executorPod.getMetadata.getName)
knownExitReason.fold {
removeExecutorOrIncrementLossReasonCheckCount(executorId)
} { executorExited =>
logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
removeExecutor(executorId, executorExited)
// We keep around executors that have exit conditions caused by the application. This
// allows them to be debugged later on. Otherwise, mark them as to be deleted from the
// the API server.
if (!executorExited.exitCausedByApp) {
deleteExecutorFromClusterAndDataStructures(executorId)
}
})
executorsToRecover.clear()
}
}
}

def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
executorsToRecover.add(executorId)
executorReasonChecks -= executorId
val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
if (reasonCheckCount >= MAX_EXECUTOR_LOST_REASON_CHECKS) {
removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
deleteExecutorFromClusterAndDataStructures(executorId)
Copy link
Member

@varunkatta varunkatta Sep 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for the change. Thanks for catching the omission here.

} else {
executorReasonChecks.put(executorId, reasonCheckCount + 1)
executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1)
}
}

def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
disconnectedPodsByExecutorIdPendingRemoval -= executorId
executorReasonCheckAttemptCounts -= executorId
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.remove(executorId).map { pod =>
kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod.getMetadata.getName)
}.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
}
}
}
Expand Down Expand Up @@ -214,18 +212,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.watch(new ExecutorPodsWatcher()))

allocator.scheduleWithFixedDelay(
allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS)
allocatorExecutor.scheduleWithFixedDelay(
allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
shuffleManager.foreach(_.start(applicationId()))

if (!Utils.isDynamicAllocationEnabled(sc.conf)) {
if (!Utils.isDynamicAllocationEnabled(conf)) {
doRequestTotalExecutors(initialExecutors)
}
}

override def stop(): Unit = {
// stop allocation of new resources and caches.
allocator.shutdown()
allocatorExecutor.shutdown()
shuffleManager.foreach(_.stop())

// send stop message to executors so they shut down cleanly
Expand Down Expand Up @@ -298,7 +296,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
executorId,
applicationId(),
driverUrl,
sc.conf.getExecutorEnv,
conf.getExecutorEnv,
driverPod,
nodeToLocalTaskCount)
try {
Expand All @@ -318,11 +316,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
for (executor <- executorIds) {
runningExecutorsToPods.remove(executor) match {
case Some(pod) =>
kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod.getMetadata.getName)
case None => logWarning(s"Unable to remove pod for unknown executor $executor")
val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
maybeRemovedExecutor.foreach { executorPod =>
kubernetesClient.pods().delete(executorPod)
disconnectedPodsByExecutorIdPendingRemoval(executor) = executorPod
runningPodsToExecutors.remove(executorPod.getMetadata.getName)
}
if (maybeRemovedExecutor.isEmpty) {
logWarning(s"Unable to remove pod for unknown executor $executor")
}
}
}
Expand Down Expand Up @@ -396,10 +397,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

def handleErroredPod(pod: Pod): Unit = {
val alreadyReleased = isPodAlreadyReleased(pod)
val containerExitStatus = getExecutorExitStatus(pod)
// container was probably actively killed by the driver.
val exitReason = if (alreadyReleased) {
val exitReason = if (isPodAlreadyReleased(pod)) {
ExecutorExited(containerExitStatus, exitCausedByApp = false,
s"Container in pod " + pod.getMetadata.getName +
" exited from explicit termination request.")
Expand All @@ -411,17 +411,23 @@ private[spark] class KubernetesClusterSchedulerBackend(
// Here we can't be sure that that exit was caused by the application but this seems
// to be the right default since we know the pod was not explicitly deleted by
// the user.
"Pod exited with following container exit status code " + containerExitStatus
s"Pod ${pod.getMetadata.getName}'s executor container exited with exit status" +
s" code $containerExitStatus."
}
ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
}
failedPods.put(pod.getMetadata.getName, exitReason)
podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
}

def handleDeletedPod(pod: Pod): Unit = {
val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
"Pod " + pod.getMetadata.getName + " deleted or lost.")
failedPods.put(pod.getMetadata.getName, exitReason)
val exitMessage = if (isPodAlreadyReleased(pod)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this change. It makes it consistent with the case of ErroredPod handling.

s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
} else {
s"Pod ${pod.getMetadata.getName} deleted or lost."
}
val exitReason = ExecutorExited(
getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
}
}

Expand All @@ -433,12 +439,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
rpcEnv: RpcEnv,
sparkProperties: Seq[(String, String)])
extends DriverEndpoint(rpcEnv, sparkProperties) {
private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)

override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
if (disableExecutor(executorId)) {
executorsToRemove.add(executorId)
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.get(executorId).foreach { pod =>
disconnectedPodsByExecutorIdPendingRemoval(executorId) = pod
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain a bit why it's safe to remove the executor directly here, rather than going through the executorsToRemove set first?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't remove the executor directly here. Some of the logic has changed and variables have been renamed, so executorsToRemove doesn't exactly exist anymore anyway.

Copy link
Author

@mccheah mccheah Aug 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are marking the executor as disconnected and the allocator thread will clean it up once the exit reason is known. I believe this is the same semantics as before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. No change in semantics, just that the marking is done after verifying that the executor is still running/tracked.

}
}
}
}
}
Expand All @@ -448,7 +457,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
new PartialFunction[Any, Unit]() {
override def isDefinedAt(msg: Any): Boolean = {
msg match {
case RetrieveSparkAppConfig(executorId) =>
case RetrieveSparkAppConfig(_) =>
shuffleManager.isDefined
case _ => false
}
Expand Down Expand Up @@ -477,11 +486,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

private object KubernetesClusterSchedulerBackend {
private val DEFAULT_STATIC_PORT = 10000
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
private val VMEM_EXCEEDED_EXIT_CODE = -103
private val PMEM_EXCEEDED_EXIT_CODE = -104
private val UNKNOWN_EXIT_CODE = -111
// Number of times we are allowed check for the loss reason for an executor before we give up
// and assume the executor failed for good, and attribute it to a framework fault.
val MAX_EXECUTOR_LOST_REASON_CHECKS = 10

def memLimitExceededLogMessage(diagnostics: String): String = {
s"Pod/Container killed for exceeding memory limits. $diagnostics" +
Expand Down
Loading