
Conversation

@erikerlandson (Collaborator) commented Nov 12, 2016

This PR adds support for dynamic executors. It works by running the shuffle server in the same container as the executor.

To use it, you will want to run with my latest image:
manyangled/kube-spark:dynamic

You must enable dynamic allocation, for example by submitting with --conf spark.dynamicAllocation.enabled=true
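The same settings can also be set programmatically on the SparkConf instead of via --conf flags; a minimal sketch (the app name and min/max values are illustrative, not defaults from this PR):

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch: enable dynamic allocation programmatically.
    // Stock Spark's allocation manager also expects the external shuffle
    // service flag; this PR runs that service inside each executor container.
    val conf = new SparkConf()
      .setAppName("dynamic-executors-demo")                  // illustrative name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")      // illustrative
      .set("spark.dynamicAllocation.maxExecutors", "10")     // illustrative
    val sc = new SparkContext(conf)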

@erikerlandson (Collaborator, Author) commented:

@foxish Here is a basic dynamic executor capability, using the per-executor shuffle server we talked about as a starting point.


if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  submitArgs ++= Vector(
    "--conf spark.dynamicAllocation.enabled=true",
Review comment:
We should actually try to propagate all Spark configurations from the user.

erikerlandson (author):

Definitely, that is on my to-do list
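One possible shape for that to-do, as a hypothetical sketch rather than code from this PR (conf and submitArgs are the values from the snippet above):

    // Hypothetical sketch: forward every user-supplied Spark setting to the
    // submission arguments instead of hard-coding individual --conf flags.
    val userConfs = conf.getAll.map { case (k, v) => s"--conf $k=$v" }
    submitArgs ++= userConfs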

  logInfo(s"Adding $delta new executor Pods")
  createExecutorPods(delta)
} else if (delta < 0) {
  logInfo(s"Deleting ${-delta} new executor Pods")
Review comment:

I don't think Spark should give you a requested total of executors that is less than what's already running; we should just assert this.

erikerlandson (author):

In the dynamic-executor case, Spark will scale back what it's requesting. For example, if a job starts to complete and there are fewer partitions left to compute, it will start requesting numbers of executors fewer than what it is running, and eventually drops to zero.
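For context, the reconciliation under discussion can be sketched roughly as follows, assuming the Spark 2.0-era synchronous signature; runningExecutorPods and deleteExecutorPods are hypothetical stand-ins for the backend's actual bookkeeping:

    // Sketch: reconcile Spark's requested total with the pods currently running.
    override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
      val delta = requestedTotal - runningExecutorPods.size   // hypothetical bookkeeping
      if (delta > 0) {
        logInfo(s"Adding $delta new executor Pods")
        createExecutorPods(delta)
      } else if (delta < 0) {
        logInfo(s"Deleting ${-delta} executor Pods")
        deleteExecutorPods(-delta)   // hypothetical helper; favors idle executors for graceful shutdown
      }
      true
    }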

A collaborator:

In this case, how do you plan to pick the executor pods to delete? I can think of a few ways to do this, but ultimately the external shuffle service would be needed here to prevent losing shuffle blocks (for safe deletion of executors). If this is in place, then perhaps we don't need to be particularly mindful of which executor pods get removed, so long as the resources are returned to the cluster.
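One relevant detail: for explicit removals, Spark's ExecutorAllocationManager already picks executors that have been idle past spark.dynamicAllocation.executorIdleTimeout and passes their IDs to killExecutors, so the backend mostly has to delete the pods backing those IDs. A hedged sketch, assuming a fabric8 Kubernetes client and a hypothetical executor-ID-to-pod map:

    // Sketch only: delete the pods that back the executors Spark asked to kill.
    override def doKillExecutors(executorIds: Seq[String]): Boolean = {
      executorIds.foreach { id =>
        executorPodsByExecutorId.get(id).foreach { podName =>   // hypothetical map
          kubernetesClient.pods().inNamespace(namespace).withName(podName).delete()
        }
      }
      true
    }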


if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  submitArgs ++= Vector(
    "dynamic-executors")
Review comment:

What's this conf for?

erikerlandson (author):

That is a special argument that flags to the shim script that it needs to spin up the shuffle service before it launches the executor backend. It's a bit of a hack, but putting it right at the beginning made it easy to check for and remove. Shell handling of argument lists isn't very sophisticated :)

erikerlandson (author):

My original plan was to just detect the --conf spark.dynamicAllocation.enabled flag, but the ExecutorBackend doesn't recognize --conf args (it appears to expect its conf sent from the driver), so I needed something easy to strip off the argument list. Alternatively, the pod could be configured with some additional environment variable, but that isn't any simpler, afaict.

Review comment:

How about just setting an environment variable? An argument makes it hard to tell what this is really for; at least we could use a descriptive name like "SPARK_LAUNCH_SHUFFLE_SERVICE=1".
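If the executor pod spec is built with the fabric8 Kubernetes client (an assumption here), attaching such a variable to the container could look roughly like this; SPARK_LAUNCH_SHUFFLE_SERVICE is the name suggested above, not something the PR defines:

    import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder}

    // Sketch: mark the executor container so its entrypoint knows to start the
    // shuffle service before launching the executor backend.
    val shuffleFlag = new EnvVarBuilder()
      .withName("SPARK_LAUNCH_SHUFFLE_SERVICE")
      .withValue("1")
      .build()

    val executorContainer = new ContainerBuilder()
      .withName("spark-executor")                  // hypothetical container name
      .withImage("manyangled/kube-spark:dynamic")  // image from the PR description
      .addToEnv(shuffleFlag)
      .build()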

Review comment:

And before we go too far, we should ask perhaps @rxin or someone familiar with shuffle whether they see a long-term problem with this.
Where is the code that launches the shuffle service? I don't see it in the PR.

  clientJarUri,
  s"--class=${args.userClass}",
  s"--master=$kubernetesHost",
  s"--executor-memory=${driverDescription.mem}",
Review comment:

Also note that we need to set cores too; however, we should just forward all user-specified Spark conf to the executors and override where needed.

erikerlandson (author):

Note to self: another to-do is to make sure the executor pods are configured with core/memory requirements that align with the corresponding Spark configurations for cores and memory.
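A rough sketch of that alignment, again assuming the fabric8 client and using illustrative defaults and a deliberately crude unit translation:

    import io.fabric8.kubernetes.api.model.{Quantity, ResourceRequirementsBuilder}

    // Sketch: derive the pod's resource requests from the Spark executor settings
    // so that Kubernetes scheduling matches what Spark believes it has.
    val executorCores  = conf.get("spark.executor.cores", "1")      // illustrative default
    val executorMemory = conf.get("spark.executor.memory", "1g")
      .toLowerCase.replace("g", "Gi").replace("m", "Mi")            // crude translation, illustrative only

    val executorResources = new ResourceRequirementsBuilder()
      .addToRequests("cpu", new Quantity(executorCores))
      .addToRequests("memory", new Quantity(executorMemory))
      .build()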

@foxish (Owner) commented Nov 15, 2016

Thanks, merging this for now in order to iterate.

foxish merged this pull request into foxish:k8s-support Nov 15, 2016
foxish pushed a commit that referenced this pull request Nov 15, 2016
* Add support for dynamic executors

* fill in some sane logic for doKillExecutors

* doRequestTotalExecutors signals graceful executor shutdown, and favors idle executors
@foxish (Owner) commented Nov 15, 2016

@erikerlandson, I just tried the old example with this change, passing in --conf spark.dynamicAllocation.enabled=true. It appears to grow the requested executor total without bound.

2016-11-15 17:51:17 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 6
2016-11-15 17:51:17 INFO  KubernetesClusterSchedulerBackend:54 - Adding 1 new executors
2016-11-15 17:51:17 INFO  ExecutorAllocationManager:54 - Requesting 1 new executor because tasks are backlogged (new desired total will be 6)
2016-11-15 17:51:18 INFO  MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 1832.0 B, free 366.3 MB)
2016-11-15 17:51:18 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 8
2016-11-15 17:51:18 INFO  KubernetesClusterSchedulerBackend:54 - Adding 2 new executors
2016-11-15 17:51:18 INFO  MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 1172.0 B, free 366.3 MB)
2016-11-15 17:51:18 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 10.180.4.127:33798 (size: 1172.0 B, free: 366.3 MB)
2016-11-15 17:51:18 INFO  ExecutorAllocationManager:54 - Requesting 2 new executors because tasks are backlogged (new desired total will be 8)
2016-11-15 17:51:18 INFO  SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:996
2016-11-15 17:51:19 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 12
2016-11-15 17:51:19 INFO  KubernetesClusterSchedulerBackend:54 - Adding 4 new executors
2016-11-15 17:51:19 INFO  DAGScheduler:54 - Submitting 10000 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34)
2016-11-15 17:51:20 INFO  ExecutorAllocationManager:54 - Requesting 4 new executors because tasks are backlogged (new desired total will be 12)
2016-11-15 17:51:20 INFO  TaskSchedulerImpl:54 - Adding task set 0.0 with 10000 tasks
2016-11-15 17:51:20 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 20
2016-11-15 17:51:20 INFO  KubernetesClusterSchedulerBackend:54 - Adding 8 new executors
2016-11-15 17:51:21 INFO  ExecutorAllocationManager:54 - Requesting 8 new executors because tasks are backlogged (new desired total will be 20)
2016-11-15 17:51:22 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 36
2016-11-15 17:51:22 INFO  KubernetesClusterSchedulerBackend:54 - Adding 16 new executors
2016-11-15 17:51:23 INFO  ExecutorAllocationManager:54 - Requesting 16 new executors because tasks are backlogged (new desired total will be 36)
2016-11-15 17:51:23 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 68
2016-11-15 17:51:23 INFO  KubernetesClusterSchedulerBackend:54 - Adding 32 new executors
2016-11-15 17:51:28 INFO  ExecutorAllocationManager:54 - Requesting 32 new executors because tasks are backlogged (new desired total will be 68)
2016-11-15 17:51:28 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 132
2016-11-15 17:51:28 INFO  KubernetesClusterSchedulerBackend:54 - Adding 64 new executors
2016-11-15 17:51:35 INFO  ExecutorAllocationManager:54 - Requesting 64 new executors because tasks are backlogged (new desired total will be 132)
2016-11-15 17:51:35 WARN  TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2016-11-15 17:51:36 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 260
2016-11-15 17:51:36 INFO  KubernetesClusterSchedulerBackend:54 - Adding 128 new executors
2016-11-15 17:51:48 INFO  ExecutorAllocationManager:54 - Requesting 128 new executors because tasks are backlogged (new desired total will be 260)
2016-11-15 17:51:48 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 516
2016-11-15 17:51:48 INFO  KubernetesClusterSchedulerBackend:54 - Adding 256 new executors
2016-11-15 17:51:50 WARN  TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2016-11-15 17:52:05 WARN  TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2016-11-15 17:52:06 INFO  ExecutorAllocationManager:54 - Requesting 256 new executors because tasks are backlogged (new desired total will be 516)
2016-11-15 17:52:06 INFO  KubernetesClusterSchedulerBackend:54 - Received doRequestTotalExecutors: 1028
2016-11-15 17:52:06 INFO  KubernetesClusterSchedulerBackend:54 - Adding 512 new executors

@erikerlandson (Collaborator, Author) commented:

@foxish I've seen that once or twice. It needs two things:

  1. pass through all dynamic-executor settings (min executors, max executors, delay between scaling calls, etc.)
  2. set pod resource requests to align with executor cores and executor memory

I started on those today.

@erikerlandson (Collaborator, Author) commented:

@foxish, I'll put those on a new PR

@foxish (Owner) commented Nov 15, 2016

Should we wait for the executors from the previous scaling round to come up before we scale further?

@erikerlandson (Collaborator, Author) commented Nov 15, 2016

@foxish, that isn't really under the control of the scheduler back-end, but the configuration spark.dynamicAllocation.sustainedSchedulerBacklogTimeout controls how long it will wait between re-scaling attempts. Its default is only 1 second, which in my opinion ought to be more like at least 5 or 10 sec.

@tnachen commented Nov 15, 2016

I don't think we really need to, since dynamic allocation gives you the total number it wants; if we detect that a good number are still launching, just running a few more should be fine.

Regarding failure detection and handling: on the Mesos/Spark side we have bad-host detection for when launching on a host continuously has problems.

@erikerlandson (Collaborator, Author) commented:

For reference, these are the dynamic allocation parameters, which I will explicitly pass to the executors via sc.getConf.setExecutorEnv:
http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

@erikerlandson (Collaborator, Author) commented:

What's happening currently is that it takes rather longer than 1 second for new executor pods to spin up and register with the driver. So it just keeps scaling up, because it is still backlogged. Then the new pods start to thrash with the actual computation, and it goes into a positive feedback loop. Setting the backlog timeout to some appropriate value will fix it, as would setting some maximum number of executors.
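Concretely, the two guardrails being discussed look something like this (the values are illustrative, not recommendations from this PR):

    import org.apache.spark.SparkConf

    // Illustrative guardrails against the scale-up feedback loop: give new pods
    // time to register before re-scaling, and cap the total number of executors.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "10s")
      .set("spark.dynamicAllocation.maxExecutors", "20")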

@foxish (Owner) commented Nov 15, 2016

I'm just concerned that even with a larger timeout, it may not be sufficient, and we would always have to specify a reasonable upper bound (spark.dynamicAllocation.maxExecutors). We can have scheduling delays or just slow networks taking a long time to pull the docker image, which would land us in the same feedback loop, with the tasks continuing to stay pending, and the allocation trying to scale up aggressively.

@erikerlandson (Collaborator, Author) commented:

@foxish as long as the timeout is reasonably large, it will be able to keep up.
However, the second part, setting pod resource requests, will also keep a check on it, since the cluster will start rejecting pod startups if it tries to create more than is allowed by namespace or cluster limits, etc.

@erikerlandson (Collaborator, Author) commented:

@foxish This code that checks for the environment variable SPARK_EXECUTOR_INSTANCES executes on the driver running inside the cluster. The only way to set it would be on the container description. Unless I'm missing some angle here, I'm thinking of removing the env-var check and replacing it with something along the lines of conf.get("spark.executor.instances", 2).

      val targetNumExecutors =
        sys.env
          .get("SPARK_EXECUTOR_INSTANCES")
          .map(_.toInt)
          .getOrElse(numExecutors)
      conf.get(EXECUTOR_INSTANCES).getOrElse(targetNumExecutors)

@foxish (Owner) commented Nov 15, 2016

@erikerlandson Yes, you're right. We don't want to get that value from env vars, so it can be replaced by reading it from SparkConf.
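A minimal sketch of the agreed change, reading only from SparkConf (EXECUTOR_INSTANCES is the config entry already referenced in the snippet above; 2 is the fallback mentioned earlier):

      // Sketch: drop the SPARK_EXECUTOR_INSTANCES env-var lookup and rely on SparkConf alone.
      val targetNumExecutors = conf.get(EXECUTOR_INSTANCES).getOrElse(2)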

foxish pushed commits that referenced this pull request Dec 2, 2016 and Dec 7, 2016.