@@ -60,10 +60,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
   // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
   private val executorPodsByIPs = new mutable.HashMap[String, Pod]
-  private val failedPods: concurrent.Map[String, ExecutorExited] = new
-    ConcurrentHashMap[String, ExecutorExited]().asScala
-  private val executorsToRemove = Collections.newSetFromMap[String](
-    new ConcurrentHashMap[String, java.lang.Boolean]()).asScala
+  private val podsWithKnownExitReasons: concurrent.Map[String, ExecutorExited] =
+    new ConcurrentHashMap[String, ExecutorExited]().asScala
+  private val disconnectedPodsByExecutorIdPendingRemoval =
+    new ConcurrentHashMap[String, Pod]().asScala

   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)

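The two replacement fields above are plain Java ConcurrentHashMaps exposed as a scala.collection.concurrent.Map through asScala. Below is a minimal, self-contained sketch of that wrapping pattern, assuming Scala 2.11/2.12 with scala.collection.JavaConverters (newer Scala versions use scala.jdk.CollectionConverters instead); the object and field names here are illustrative and not part of the patch.

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.concurrent

object ConcurrentMapSketch {
  // Wrapping a Java ConcurrentHashMap yields a thread-safe scala.collection.concurrent.Map,
  // the same shape as the new podsWithKnownExitReasons field.
  val exitReasons: concurrent.Map[String, Int] =
    new ConcurrentHashMap[String, Int]().asScala

  def main(args: Array[String]): Unit = {
    exitReasons.put("pod-1", 137)
    // remove returns an Option, so a missing key is handled without exceptions.
    println(exitReasons.remove("pod-1")) // Some(137)
    println(exitReasons.remove("pod-2")) // None
  }
}
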
@@ -111,18 +111,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
     s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " +
     s"is ${podAllocationSize}, should be a positive integer")

-  private val allocatorRunnable: Runnable = new Runnable {
+  private val allocatorRunnable = new Runnable {

-    // Number of times we are allowed check for the loss reason for an executor before we give up
-    // and assume the executor failed for good, and attribute it to a framework fault.
-    private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
-    private val executorsToRecover = new mutable.HashSet[String]
     // Maintains a map of executor id to count of checks performed to learn the loss reason
     // for an executor.
-    private val executorReasonChecks = new mutable.HashMap[String, Int]
+    private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]

     override def run(): Unit = {
-      removeFailedExecutors()
+      handleDisconnectedExecutors()
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
         if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
           logDebug("Waiting for pending executors before scaling")
@@ -131,7 +127,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         } else {
           val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
           for (i <- 0 until math.min(
-              totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
             val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
             runningExecutorsToPods.put(executorId, pod)
             runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
@@ -142,43 +138,47 @@ private[spark] class KubernetesClusterSchedulerBackend(
       }
     }

-    def removeFailedExecutors(): Unit = {
-      val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorsToPods.toMap
-      }
-      executorsToRemove.foreach { case (executorId) =>
-        localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
-          failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
-            logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
-            removeExecutor(executorId, executorExited)
-            if (!executorExited.exitCausedByApp) {
-              executorsToRecover.add(executorId)
-            }
-          }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
-        }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
-
-        executorsToRecover.foreach(executorId => {
-          executorsToRemove -= executorId
-          executorReasonChecks -= executorId
-          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-            runningExecutorsToPods.remove(executorId).map { pod: Pod =>
-              kubernetesClient.pods().delete(pod)
-              runningPodsToExecutors.remove(pod.getMetadata.getName)
-            }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
+    def handleDisconnectedExecutors(): Unit = {
+      // For each disconnected executor, synchronize with the loss reasons that may have been found
+      // by the executor pod watcher. If the loss reason was discovered by the watcher,
+      // inform the parent class with removeExecutor.
+      val disconnectedPodsByExecutorIdPendingRemovalCopy =
+        Map.empty ++ disconnectedPodsByExecutorIdPendingRemoval
+      disconnectedPodsByExecutorIdPendingRemovalCopy.foreach { case (executorId, executorPod) =>
+        val knownExitReason = podsWithKnownExitReasons.remove(executorPod.getMetadata.getName)
+        knownExitReason.fold {
+          removeExecutorOrIncrementLossReasonCheckCount(executorId)
+        } { executorExited =>
+          logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
+          removeExecutor(executorId, executorExited)
+          // We keep around executors that have exit conditions caused by the application. This
+          // allows them to be debugged later on. Otherwise, mark them as to be deleted from
+          // the API server.
+          if (!executorExited.exitCausedByApp) {
+            deleteExecutorFromApiAndDataStructures(executorId)
           }
-        })
-        executorsToRecover.clear()
+        }
       }
     }

     def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
-      val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
-      if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
-        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
-        executorsToRecover.add(executorId)
-        executorReasonChecks -= executorId
+      val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
+      if (reasonCheckCount >= MAX_EXECUTOR_LOST_REASON_CHECKS) {
+        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
+        deleteExecutorFromApiAndDataStructures(executorId)
       } else {
-        executorReasonChecks.put(executorId, reasonCheckCount + 1)
+        executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1)
+      }
+    }
+
+    def deleteExecutorFromApiAndDataStructures(executorId: String): Unit = {
+      disconnectedPodsByExecutorIdPendingRemoval -= executorId
+      executorReasonCheckAttemptCounts -= executorId
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.remove(executorId).map { pod =>
+          kubernetesClient.pods().delete(pod)
+          runningPodsToExecutors.remove(pod.getMetadata.getName)
+        }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
       }
     }
   }
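handleDisconnectedExecutors and removeExecutorOrIncrementLossReasonCheckCount above pair Option.fold with a bounded attempt counter: if the watcher has not yet reported an exit reason, the executor's check count is bumped, and once it reaches MAX_EXECUTOR_LOST_REASON_CHECKS the executor is written off as lost. The following is a rough standalone sketch of that control flow, assuming only the Scala standard library; the object, method, and field names are simplified stand-ins, not the real scheduler types.

import scala.collection.mutable

object LossReasonCheckSketch {
  private val MaxChecks = 10
  private val checkAttempts = new mutable.HashMap[String, Int]

  // exitReason models what the pod watcher would have recorded, if anything.
  def handle(executorId: String, exitReason: Option[String]): String = {
    // Option.fold takes the "no value" branch first, then the function applied to the value,
    // mirroring knownExitReason.fold { ... } { executorExited => ... } in the patch.
    exitReason.fold {
      val attempts = checkAttempts.getOrElse(executorId, 0)
      if (attempts >= MaxChecks) {
        checkAttempts -= executorId
        s"$executorId lost for unknown reasons"
      } else {
        checkAttempts.put(executorId, attempts + 1)
        s"$executorId still pending ($attempts checks so far)"
      }
    } { reason =>
      checkAttempts -= executorId
      s"$executorId removed: $reason"
    }
  }

  def main(args: Array[String]): Unit = {
    println(handle("exec-1", None))               // still pending, counter incremented
    println(handle("exec-1", Some("OOM killed"))) // removed with a known reason
  }
}
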
@@ -320,6 +320,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
         maybeRemovedExecutor.foreach { executorPod =>
           kubernetesClient.pods().delete(executorPod)
+          disconnectedPodsByExecutorIdPendingRemoval(executor) = executorPod
           runningPodsToExecutors.remove(executorPod.getMetadata.getName)
         }
         if (maybeRemovedExecutor.isEmpty) {
@@ -412,17 +413,24 @@ private[spark] class KubernetesClusterSchedulerBackend(
             // Here we can't be sure that that exit was caused by the application but this seems
             // to be the right default since we know the pod was not explicitly deleted by
             // the user.
-            "Pod exited with following container exit status code " + containerExitStatus
+            s"Pod ${pod.getMetadata.getName}'s executor container exited with exit status " +
+              s"code $containerExitStatus."
         }
         ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
       }
-      failedPods.put(pod.getMetadata.getName, exitReason)
+      podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }

     def handleDeletedPod(pod: Pod): Unit = {
-      val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
-        "Pod " + pod.getMetadata.getName + " deleted or lost.")
-      failedPods.put(pod.getMetadata.getName, exitReason)
+      val alreadyReleased = isPodAlreadyReleased(pod)
+      val exitMessage = if (alreadyReleased) {
+        s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
+      } else {
+        s"Pod ${pod.getMetadata.getName} deleted or lost."
+      }
+      val exitReason = ExecutorExited(
+        getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
+      podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }
   }

@@ -434,12 +442,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
       rpcEnv: RpcEnv,
       sparkProperties: Seq[(String, String)])
     extends DriverEndpoint(rpcEnv, sparkProperties) {
-    private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)

     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       addressToExecutorId.get(rpcAddress).foreach { executorId =>
         if (disableExecutor(executorId)) {
-          executorsToRemove.add(executorId)
+          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+            runningExecutorsToPods.get(executorId).foreach { pod =>
+              disconnectedPodsByExecutorIdPendingRemoval(executorId) = pod
+            }
+          }
         }
       }
     }
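The reworked onDisconnected handler above never deletes anything itself: under the lock that guards the running-pod map it only records the disconnected executor's pod, and the allocator thread later reconciles that record against the exit reasons reported by the pod watcher. A compact sketch of that deferred-cleanup handshake, again with simplified stand-in names and assuming Scala 2.11/2.12 JavaConverters rather than the real scheduler types:

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable

object DeferredCleanupSketch {
  private val RUNNING_LOCK = new Object
  // Guarded by RUNNING_LOCK, mirroring runningExecutorsToPods in the patch.
  private val runningExecutors = new mutable.HashMap[String, String] // executorId -> podName
  // Concurrent, so the RPC thread and the allocator thread can both touch it.
  private val pendingRemoval = new ConcurrentHashMap[String, String]().asScala

  // Called from the RPC layer when an executor drops its connection.
  def onDisconnected(executorId: String): Unit = RUNNING_LOCK.synchronized {
    runningExecutors.get(executorId).foreach { podName =>
      pendingRemoval(executorId) = podName
    }
  }

  // Called periodically from the allocator thread.
  def reconcile(knownExitReasons: Map[String, String]): Unit = {
    (Map.empty ++ pendingRemoval).foreach { case (executorId, podName) =>
      knownExitReasons.get(podName).foreach { reason =>
        println(s"removing $executorId: $reason")
        pendingRemoval -= executorId
        RUNNING_LOCK.synchronized { runningExecutors -= executorId }
      }
    }
  }
}

In the real backend, the reconcile step corresponds to handleDisconnectedExecutors running on the allocator thread.
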
@@ -478,10 +489,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
 }

 private object KubernetesClusterSchedulerBackend {
-  private val DEFAULT_STATIC_PORT = 10000
   private val VMEM_EXCEEDED_EXIT_CODE = -103
   private val PMEM_EXCEEDED_EXIT_CODE = -104
   private val UNKNOWN_EXIT_CODE = -111
+  // Number of times we are allowed to check for the loss reason for an executor before we give up
+  // and assume the executor failed for good, and attribute it to a framework fault.
+  val MAX_EXECUTOR_LOST_REASON_CHECKS = 10

   def memLimitExceededLogMessage(diagnostics: String): String = {
     s"Pod/Container killed for exceeding memory limits. $diagnostics" +