From e2d48ded2f16c1c1305e2b50eb7247f19d787968 Mon Sep 17 00:00:00 2001 From: foxish Date: Thu, 23 Feb 2017 23:56:49 -0800 Subject: [PATCH 1/3] Recover from executor death --- .../KubernetesClusterSchedulerBackend.scala | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 898b215b92d04..aaa5df397aad9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -16,11 +16,8 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.UUID -import java.util.concurrent.Executors import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import com.google.common.util.concurrent.ThreadFactoryBuilder import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} @@ -29,7 +26,7 @@ import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.rpc.RpcEndpointAddress +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} @@ -257,8 +254,34 @@ private[spark] class KubernetesClusterSchedulerBackend( } true } + + override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { + new KubernetesDriverEndpoint(rpcEnv, properties) + } + + /** + * Override the DriverEndpoint to add extra logic for the case when + * an executor is disconnected. + */ + private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) + extends DriverEndpoint(rpcEnv, sparkProperties) { + + /** + * We assume for now that we should create a replacement executor pod when it is lost. + * TODO: if spark scales down the number of executors (in dynamic allocation mode), + * we must not attempt to create replacements for them. + */ + override def onDisconnected(rpcAddress: RpcAddress): Unit = { + addressToExecutorId.get(rpcAddress).foreach { executorId => + if (disableExecutor(executorId)) { + allocateNewExecutorPod() + } + } + } + } } + private object KubernetesClusterSchedulerBackend { private val DEFAULT_STATIC_PORT = 10000 private val MEMORY_OVERHEAD_FACTOR = 0.10 From 897df68009879684cfe4b695d72e9d90d30f81ed Mon Sep 17 00:00:00 2001 From: foxish Date: Fri, 24 Feb 2017 00:02:16 -0800 Subject: [PATCH 2/3] Fixes --- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index aaa5df397aad9..f51fa1c8b646b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -274,7 +274,9 @@ private[spark] class KubernetesClusterSchedulerBackend( override def onDisconnected(rpcAddress: RpcAddress): Unit = { addressToExecutorId.get(rpcAddress).foreach { executorId => if (disableExecutor(executorId)) { - allocateNewExecutorPod() + EXECUTOR_MODIFICATION_LOCK.synchronized { + runningExecutorPods += allocateNewExecutorPod() + } } } } From f203c6c158d4ef46c0080ed330f9ad633b5475d8 Mon Sep 17 00:00:00 2001 From: foxish Date: Fri, 24 Feb 2017 16:15:51 -0800 Subject: [PATCH 3/3] Fix scalastyle issue --- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index f51fa1c8b646b..e9271e17361bc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -260,9 +260,9 @@ private[spark] class KubernetesClusterSchedulerBackend( } /** - * Override the DriverEndpoint to add extra logic for the case when - * an executor is disconnected. - */ + * Override the DriverEndpoint to add extra logic for the case when + * an executor is disconnected. + */ private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends DriverEndpoint(rpcEnv, sparkProperties) {