diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 898b215b92d04..e9271e17361bc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -16,11 +16,8 @@
  */
 package org.apache.spark.scheduler.cluster.kubernetes
 
-import java.util.UUID
-import java.util.concurrent.Executors
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
 import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder}
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
@@ -29,7 +26,7 @@ import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -257,8 +254,36 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
     true
   }
+
+  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+    new KubernetesDriverEndpoint(rpcEnv, properties)
+  }
+
+  /**
+   * Override the DriverEndpoint to add extra logic for the case when
+   * an executor is disconnected.
+   */
+  private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
+    extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+    /**
+     * For now, we assume that a replacement executor pod should be created when one is lost.
+     * TODO: if Spark scales down the number of executors (in dynamic allocation mode),
+     * we must not attempt to create replacements for them.
+     */
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      addressToExecutorId.get(rpcAddress).foreach { executorId =>
+        if (disableExecutor(executorId)) {
+          EXECUTOR_MODIFICATION_LOCK.synchronized {
+            runningExecutorPods += allocateNewExecutorPod()
+          }
+        }
+      }
+    }
+  }
 }
 
+
 private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
   private val MEMORY_OVERHEAD_FACTOR = 0.10
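
A possible shape for the TODO above, sketched against the names this diff already uses: record executor IDs that the backend removes deliberately (e.g. from a doKillExecutors path) and consult that set in onDisconnected, so a replacement pod is only allocated for an unexpected loss. This is a hedged sketch, not part of the change; the executorsPendingRemoval field is hypothetical, while the other identifiers come from the diff.

    // Sketch only: assumes it lives inside KubernetesClusterSchedulerBackend's
    // KubernetesDriverEndpoint. `executorsPendingRemoval` is a hypothetical field
    // that a doKillExecutors override would populate before killing pods, e.g.:
    //   EXECUTOR_MODIFICATION_LOCK.synchronized { executorsPendingRemoval ++= executorIds }
    private val executorsPendingRemoval = scala.collection.mutable.HashSet.empty[String]

    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
      addressToExecutorId.get(rpcAddress).foreach { executorId =>
        if (disableExecutor(executorId)) {
          EXECUTOR_MODIFICATION_LOCK.synchronized {
            // Replace the pod only if this executor was not removed on purpose.
            if (!executorsPendingRemoval.remove(executorId)) {
              runningExecutorPods += allocateNewExecutorPod()
            }
          }
        }
      }
    }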