@@ -16,11 +16,8 @@
*/
package org.apache.spark.scheduler.cluster.kubernetes

import java.util.UUID
import java.util.concurrent.Executors
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
@@ -29,7 +26,7 @@ import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -257,8 +254,36 @@ private[spark] class KubernetesClusterSchedulerBackend(
    }
    true
  }

  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    new KubernetesDriverEndpoint(rpcEnv, properties)
  }

  /**
   * Override the DriverEndpoint to add extra logic for the case when
   * an executor is disconnected.
   */
  private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    extends DriverEndpoint(rpcEnv, sparkProperties) {

    /**
     * We assume for now that we should create a replacement executor pod when it is lost.
     * TODO: if Spark scales down the number of executors (in dynamic allocation mode),
@mccheah (Feb 25, 2017):

I wonder if this case is actually supposed to be handled by CoarseGrainedSchedulerBackend. How does the code there react to a lost executor?

Member Author:

The YARN scheduler backend uses this pattern. However, I think I should be calling super.onDisconnected, because it does bookkeeping such as unregistering the executor from the block manager.
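
For illustration, a rough sketch of that suggestion (not what this patch currently does): capture the executor id before the superclass tears down its bookkeeping, then allocate the replacement pod. Everything referenced below already exists either in this diff or in CoarseGrainedSchedulerBackend's DriverEndpoint.

    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
      // Read the id first; super.onDisconnected removes the executor's bookkeeping,
      // including its addressToExecutorId entry.
      val lostExecutorId = addressToExecutorId.get(rpcAddress)
      super.onDisconnected(rpcAddress)
      lostExecutorId.foreach { _ =>
        EXECUTOR_MODIFICATION_LOCK.synchronized {
          runningExecutorPods += allocateNewExecutorPod()
        }
      }
    }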

Comment:

YARN isn't a good precedent because it has a construct of loss reasons, used to determine why an executor disconnected and to adjust the logic accordingly. For example, when an executor is eliminated by YARN preemption, Spark doesn't count the tasks that were allocated to that executor towards the failure count that fails the entire job past a certain threshold.

I think just calling RemoveExecutor should suffice, which is what the default implementation does. I'm surprised that CoarseGrainedSchedulerBackend isn't trying to re-sync the number of active executors to what is desired - I would expect that to be that class's job. It might be worth tracing how the other implementations eventually do this re-sync - I believe YARN mode depends on logic on the ApplicationMaster side.

@mccheah (Feb 25, 2017):

My understanding is that YARN maintains a target number of executors on the ApplicationMaster. YarnSchedulerBackend#doRequestTotalExecutors (code) synchronizes the requested number of executors from the driver up to the ApplicationMaster, and the ApplicationMaster periodically runs YarnAllocator#allocateResources (code), which synchronizes the required resources (number of executors) with the YARN resource manager.

The periodic polling "self-corrects" when executors are lost during the lifetime of an application. We can follow this precedent, but it's not completely unreasonable to handle the situation in our own way.
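
For illustration, a rough sketch of that kind of periodic self-correction transplanted into this backend; desiredTotalExecutors, runningExecutorCount and requestNewExecutorPod() are placeholders for whatever bookkeeping the backend would keep, not names from this patch.

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Placeholder state; the real backend would use its own counters.
val desiredTotalExecutors = new AtomicInteger(0)
val runningExecutorCount = new AtomicInteger(0)
def requestNewExecutorPod(): Unit = { /* create one executor pod via the k8s client */ }

// Periodically reconcile running executors against the desired total,
// analogous to YarnAllocator#allocateResources polling against the YARN RM.
val allocatorExecutor = Executors.newSingleThreadScheduledExecutor()
allocatorExecutor.scheduleWithFixedDelay(new Runnable {
  override def run(): Unit = {
    val missing = desiredTotalExecutors.get() - runningExecutorCount.get()
    (1 to missing).foreach(_ => requestNewExecutorPod())
  }
}, 0L, 30L, TimeUnit.SECONDS)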

Member Author:

Thanks for investigating. Perhaps we should adopt our own model then: rather than reacting to executor loss in onDisconnected, we could watch all the executor pods with the Kubernetes API and create new executors whenever the count falls below the expected number?
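
Something along those lines could be done with the fabric8 client's watch support. A rough sketch, assuming executor pods carry a "spark-role" -> "executor" label and that the backend exposes some replaceLostExecutor callback (both are placeholders, not code in this patch):

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}

// Sketch: react to executor pod deletions by asking the backend for a replacement.
class ExecutorPodWatcher(replaceLostExecutor: Pod => Unit) extends Watcher[Pod] {

  override def eventReceived(action: Watcher.Action, pod: Pod): Unit = action match {
    case Watcher.Action.DELETED | Watcher.Action.ERROR =>
      // The pod is gone (or errored out); ask for a replacement.
      replaceLostExecutor(pod)
    case _ =>
      // ADDED / MODIFIED need no action for this purpose.
  }

  override def onClose(cause: KubernetesClientException): Unit = {
    // A production version would re-establish the watch here.
  }
}

// Registration, e.g. from the scheduler backend's start():
def watchExecutorPods(client: KubernetesClient): Unit = {
  client.pods()
    .withLabel("spark-role", "executor")
    .watch(new ExecutorPodWatcher(_ => { /* allocateNewExecutorPod() under the modification lock */ }))
}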

Comment:

I think trying to query the Spark UI that runs on the executors is good enough. @ash211 thoughts?

Comment:

We can check that an executor is individually healthy with a liveness probe on it. But another failure mode is when it gets network partitioned from the driver. It might create a lot of traffic on the driver if every probe check on the executor also verified connectivity to the driver with a ping.
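
For illustration only, here is roughly what such a liveness probe could look like with the fabric8 model builders this code already uses for containers; the probe command and timings are placeholders, and as noted it only establishes that the executor process is alive, not that it can still reach the driver.

import io.fabric8.kubernetes.api.model.{Probe, ProbeBuilder}

// Sketch: an exec-based liveness probe for the executor container.
// The command is illustrative; pinging the driver from here is exactly the part
// that could generate a lot of extra traffic.
val executorLivenessProbe: Probe = new ProbeBuilder()
  .withNewExec()
    .withCommand("/bin/sh", "-c", "pgrep -f CoarseGrainedExecutorBackend")
    .endExec()
  .withInitialDelaySeconds(30)
  .withPeriodSeconds(15)
  .build()

// It would be attached when the executor container is built, e.g.
// new ContainerBuilder(...).withLivenessProbe(executorLivenessProbe).build()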

Comment:

I'm tentatively fine with being inaccurate on that particular failure mode if it means we get to use mostly Kubernetes primitives in an elegant way. I'm not sure whether executors detect that they can't reach the driver or that the driver is failing to send them messages, and I don't know whether executors shut themselves down in that case - it would be worth looking into.

Member:

I like the idea of checking pods using the k8s API.

On health checks: executors periodically send heartbeats to the driver so the driver can expire unresponsive executors (see the HeartbeatReceiver code). Maybe we can piggyback on them:

  private def expireDeadHosts(): Unit = {
    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
    val now = clock.getTimeMillis()
    for ((executorId, lastSeenMs) <- executorLastSeen) {
      if (now - lastSeenMs > executorTimeoutMs) {
        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
          s"timed out after ${now - lastSeenMs} ms"))
          // Asynchronously kill the executor to avoid blocking the current thread
        killExecutorThread.submit(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            // Note: we want to get an executor back after expiring this one,
            // so do not simply call `sc.killExecutor` here (SPARK-8119)
            sc.killAndReplaceExecutor(executorId)
          }
        })
        executorLastSeen.remove(executorId)
      }
    }
  }

Comment:

@foxish Let's go with a strategy that watches the K8s API and go from there.

     * we must not attempt to create replacements for them.
     */
    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
      addressToExecutorId.get(rpcAddress).foreach { executorId =>
        if (disableExecutor(executorId)) {
Comment:

The onDisconnected method in the superclass calls removeExecutor, not disableExecutor -- should we be using that one?

Member:

I wonder whether onDisconnected is also called when an executor JVM crashes inside a running pod. (This seems to be the caller code):

          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

We may not want to allocate a new pod in such a case because the JVM will get restarted, i.e. we may want to allocate a new pod only when we have lost the pod itself.

Comment:

I think it's better to be consistent everywhere and always operate at the pod level as opposed to the container level. So if the container JVM exits, we should immediately shut down the whole pod and allocate a new one.
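
A sketch of that pod-level behaviour: if the executor pod is created with restartPolicy "Never", a crashed executor JVM ends the whole pod instead of being restarted in place, so recovery always goes through allocating a new pod. The names, label and image below are placeholders, not this patch's actual pod construction.

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// Sketch only: pod-level restart semantics for an executor.
def buildExecutorPod(name: String, image: String): Pod = new PodBuilder()
  .withNewMetadata()
    .withName(name)
    .addToLabels("spark-role", "executor") // assumed label, matching the watch idea above
    .endMetadata()
  .withNewSpec()
    .withRestartPolicy("Never") // container exit terminates the pod; no in-place restart
    .addNewContainer()
      .withName("executor")
      .withImage(image)
      .endContainer()
    .endSpec()
  .build()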

          EXECUTOR_MODIFICATION_LOCK.synchronized {
            runningExecutorPods += allocateNewExecutorPod()
          }
        }
      }
    }
  }
}


private object KubernetesClusterSchedulerBackend {
  private val DEFAULT_STATIC_PORT = 10000
  private val MEMORY_OVERHEAD_FACTOR = 0.10